浏览代码

2008-03-11 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

	Fixed high memory footprint when DHT is enabled.
	This is not a memory leak, but DHTReplaceNodeTask is more frequently
	queued than it is processed and the queue is getting longer. As a
	consequence, momory usage is increased.
	As for a fix, instead of issuing DHTReplaceNodeTask, I've implemented
	replacement cache in DHTBucket which is described in Kademlia paper.
	* src/DHTRoutingTable.cc (addNode): Removed the issuing of
	DHTReplaceNodeTask.
	* src/DHTBucket.{h, cc}
	(cacheNode): New function.
	(getCachedNodes): New function.
	(dropNode): Push back cached node to _nodes.
	* test/DHTBucketTest.cc
	(testCacheNode): New test
	(testDropNode): New test
Tatsuhiro Tsujikawa 17 年之前
父节点
当前提交
852315bf11
共有 5 个文件被更改,包括 115 次插入30 次删除
  1. 18 0
      ChangeLog
  2. 20 26
      src/DHTBucket.cc
  3. 7 1
      src/DHTBucket.h
  4. 3 3
      src/DHTRoutingTable.cc
  5. 67 0
      test/DHTBucketTest.cc

+ 18 - 0
ChangeLog

@@ -1,3 +1,21 @@
+2008-03-11  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
+
+	Fixed high memory footprint when DHT is enabled.
+	This is not a memory leak, but DHTReplaceNodeTask is more frequently
+	queued than it is processed and the queue is getting longer. As a
+	consequence, momory usage is increased.
+	As for a fix, instead of issuing DHTReplaceNodeTask, I've implemented
+	replacement cache in DHTBucket which is described in Kademlia paper.
+	* src/DHTRoutingTable.cc (addNode): Removed the issuing of
+	DHTReplaceNodeTask.
+	* src/DHTBucket.{h, cc}
+	(cacheNode): New function.
+	(getCachedNodes): New function.
+	(dropNode): Push back cached node to _nodes.
+	* test/DHTBucketTest.cc
+	(testCacheNode): New test
+	(testDropNode): New test
+	
 2008-03-11  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
 
 	Allocate memory for peekBuf in initiateSecureConnection() to avoid

+ 20 - 26
src/DHTBucket.cc

@@ -119,23 +119,6 @@ bool DHTBucket::addNode(const SharedHandle<DHTNode>& node)
       } else {
 	return false;
       }
-	/*
-      } else if(splitAllowed()) {
-	return false;
-      } else {
-	std::deque<SharedHandle<DHTNode> >::iterator ci = find(_cachedNodes.begin(), _cachedNodes.end(), node);
-	if(ci == _cachedNodes.end()) {
-	  _cachedNodes.push_back(node);
-	  if(_cachedNodes.size() > CACHE_SIZE) {
-	    _cachedNodes.erase(_cachedNodes.begin(), _cachedNodes().begin()+CACHE_SIZE-_cachedNodes.size());
-	  }
-	} else {
-	  _cachedNodes.erase(ci);
-	  _cachedNodes.push_back(node);
-	}
-	return true;
-      }
-	*/
     }
   } else {
     _nodes.erase(itr);
@@ -144,19 +127,25 @@ bool DHTBucket::addNode(const SharedHandle<DHTNode>& node)
   }
 }
 
+void DHTBucket::cacheNode(const SharedHandle<DHTNode>& node)
+{
+  // _cachedNodes are sorted by last time seen
+  _cachedNodes.push_front(node);
+  if(_cachedNodes.size() > CACHE_SIZE) {
+    _cachedNodes.resize(CACHE_SIZE, 0);
+  }
+}
+
 void DHTBucket::dropNode(const SharedHandle<DHTNode>& node)
 {
-  return;
-  /*
-  std::deque<SharedHandle<DHTNode> >::iterator itr = find(_nodes.begin(), _nodes.end(), node);
-  if(itr != _nodes.end()) {
-    _nodes.erase(itr);
-    if(_cachedNodes.size()) {
-      _nodes.push_back(_cachedNodes.back());
-      _cachedNodes.erase(_cachedNodes.begin()+_cachedNodes.size()-1);
+  if(_cachedNodes.size()) {
+    std::deque<SharedHandle<DHTNode> >::iterator itr = find(_nodes.begin(), _nodes.end(), node);
+    if(itr != _nodes.end()) {
+      _nodes.erase(itr);
+      _nodes.push_back(_cachedNodes.front());
+      _cachedNodes.erase(_cachedNodes.begin());
     }
   }
-  */
 }
 
 void DHTBucket::moveToHead(const SharedHandle<DHTNode>& node)
@@ -291,4 +280,9 @@ SharedHandle<DHTNode> DHTBucket::getLRUQuestionableNode() const
   }
 }
 
+const std::deque<SharedHandle<DHTNode> >& DHTBucket::getCachedNodes() const
+{
+  return _cachedNodes;
+}
+
 } // namespace aria2

+ 7 - 1
src/DHTBucket.h

@@ -61,7 +61,9 @@ private:
   // sorted in ascending order
   std::deque<SharedHandle<DHTNode> > _nodes;
 
-  //std::deque<SharedHandle<DHTNode> > _cachedNodes;
+  // a replacement cache. The maximum size is specified by CACHE_SIZE.
+  // This is sorted by last time seen.
+  std::deque<SharedHandle<DHTNode> > _cachedNodes;
 
   Time _lastUpdated;
 
@@ -89,6 +91,8 @@ public:
 
   bool addNode(const SharedHandle<DHTNode>& node);
 
+  void cacheNode(const SharedHandle<DHTNode>& node);
+
   bool splitAllowed() const;
   
   size_t getPrefixLength() const
@@ -131,6 +135,8 @@ public:
   bool containsQuestionableNode() const;
 
   SharedHandle<DHTNode> getLRUQuestionableNode() const;
+
+  const std::deque<SharedHandle<DHTNode> >& getCachedNodes() const;
 };
 
 } // namespace aria2

+ 3 - 3
src/DHTRoutingTable.cc

@@ -98,9 +98,9 @@ bool DHTRoutingTable::addNode(const SharedHandle<DHTNode>& node, bool good)
 	bnode = lbnode;
       }
     } else {
-      if(good && bucket->containsQuestionableNode()) {
-	_logger->debug("Issuing ReplaceNodeTask: new node=%s", node->toString().c_str());
-	_taskQueue->addImmediateTask(_taskFactory->createReplaceNodeTask(bucket, node));
+      if(good) {
+	bucket->cacheNode(node);
+	_logger->debug("Cached node=%s", node->toString().c_str());
       }
       return false;
     }

+ 67 - 0
test/DHTBucketTest.cc

@@ -17,6 +17,8 @@ class DHTBucketTest:public CppUnit::TestFixture {
   CPPUNIT_TEST(testMoveToHead);
   CPPUNIT_TEST(testMoveToTail);
   CPPUNIT_TEST(testGetGoodNodes);
+  CPPUNIT_TEST(testCacheNode);
+  CPPUNIT_TEST(testDropNode);
   CPPUNIT_TEST_SUITE_END();
 public:
   void setUp() {}
@@ -31,6 +33,8 @@ public:
   void testMoveToHead();
   void testMoveToTail();
   void testGetGoodNodes();
+  void testCacheNode();
+  void testDropNode();
 };
 
 
@@ -337,4 +341,67 @@ void DHTBucketTest::testGetGoodNodes()
   CPPUNIT_ASSERT_EQUAL((uint16_t)6888, goodNodes[5]->getPort());
 }
 
+void DHTBucketTest::testCacheNode()
+{
+  unsigned char localNodeID[DHT_ID_LENGTH];
+  memset(localNodeID, 0, DHT_ID_LENGTH);
+  SharedHandle<DHTNode> localNode = new DHTNode(localNodeID);
+  DHTBucket bucket(localNode);
+
+  SharedHandle<DHTNode> n1 = new DHTNode();
+  SharedHandle<DHTNode> n2 = new DHTNode();
+  SharedHandle<DHTNode> n3 = new DHTNode();
+
+  bucket.cacheNode(n1);
+  bucket.cacheNode(n2);
+  CPPUNIT_ASSERT_EQUAL((size_t)2, bucket.getCachedNodes().size());
+  CPPUNIT_ASSERT(n2 == bucket.getCachedNodes()[0]);
+
+  bucket.cacheNode(n3);
+  CPPUNIT_ASSERT_EQUAL((size_t)2, bucket.getCachedNodes().size());
+  CPPUNIT_ASSERT(n3 == bucket.getCachedNodes()[0]);
+  CPPUNIT_ASSERT(n2 == bucket.getCachedNodes()[1]);
+}
+
+void DHTBucketTest::testDropNode()
+{
+  unsigned char localNodeID[DHT_ID_LENGTH];
+  memset(localNodeID, 0, DHT_ID_LENGTH);
+  SharedHandle<DHTNode> localNode = new DHTNode(localNodeID);
+  DHTBucket bucket(localNode);
+
+  unsigned char id[DHT_ID_LENGTH];
+  SharedHandle<DHTNode> nodes[] = { 0, 0, 0, 0, 0, 0, 0, 0 };
+  for(size_t i = 0; i < DHTBucket::K; ++i) {
+    createID(id, 0xf0, i);
+    nodes[i] = new DHTNode(id);
+    nodes[i]->setPort(6881+i);
+    CPPUNIT_ASSERT(bucket.addNode(nodes[i]));
+  }
+
+  SharedHandle<DHTNode> cachedNode1 = new DHTNode();
+  SharedHandle<DHTNode> cachedNode2 = new DHTNode();
+
+  bucket.dropNode(nodes[3]);
+  // nothing happens because the replacement cache is empty.
+  {
+    std::deque<SharedHandle<DHTNode> > tnodes = bucket.getNodes();
+    CPPUNIT_ASSERT_EQUAL((size_t)8, tnodes.size());
+    CPPUNIT_ASSERT(nodes[3] == tnodes[3]);
+  }
+
+  bucket.cacheNode(cachedNode1);
+  bucket.cacheNode(cachedNode2);
+
+  bucket.dropNode(nodes[3]);
+  {
+    std::deque<SharedHandle<DHTNode> > tnodes = bucket.getNodes();
+    CPPUNIT_ASSERT_EQUAL((size_t)8, tnodes.size());
+    CPPUNIT_ASSERT(tnodes.end() == std::find(tnodes.begin(), tnodes.end(), nodes[3]));
+    CPPUNIT_ASSERT(cachedNode2 == tnodes[7]);
+  }
+  CPPUNIT_ASSERT_EQUAL((size_t)1, bucket.getCachedNodes().size());
+  CPPUNIT_ASSERT(cachedNode1 == bucket.getCachedNodes()[0]);
+}
+
 } // namespace aria2