Procházet zdrojové kódy

2008-05-11 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

	Implemented rarest piece first piece selection strategy.
	* src/AbstractBtMessage.cc
	* src/AbstractBtMessage.h
	* src/BitfieldMan.cc
	* src/BitfieldMan.h
	* src/BtBitfieldMessage.cc
	* src/BtHaveAllMessage.cc
	* src/BtHaveMessage.cc
	* src/DefaultPieceStorage.cc
	* src/DefaultPieceStorage.h
	* src/PeerInteractionCommand.cc
	* src/PieceStorage.h
	* src/UnknownLengthPieceStorage.h
Tatsuhiro Tsujikawa před 17 roky
rodič
revize
a702d60666

+ 16 - 0
ChangeLog

@@ -1,3 +1,19 @@
+2008-05-11  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
+
+	Implemented rarest piece first piece selection strategy.
+	* src/AbstractBtMessage.cc
+	* src/AbstractBtMessage.h
+	* src/BitfieldMan.cc
+	* src/BitfieldMan.h
+	* src/BtBitfieldMessage.cc
+	* src/BtHaveAllMessage.cc
+	* src/BtHaveMessage.cc
+	* src/DefaultPieceStorage.cc
+	* src/DefaultPieceStorage.h
+	* src/PeerInteractionCommand.cc
+	* src/PieceStorage.h
+	* src/UnknownLengthPieceStorage.h
+
 2008-05-11  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
 
 	Rewritten readData and writeData.

+ 5 - 0
src/AbstractBtMessage.cc

@@ -107,6 +107,11 @@ SharedHandle<BtContext> AbstractBtMessage::getBtContext() const
   return btContext;
 }
 
+void AbstractBtMessage::setPieceStorage(const SharedHandle<PieceStorage>& pieceStorage)
+{
+  this->pieceStorage = pieceStorage;
+}
+
 void AbstractBtMessage::setBtMessageDispatcher(const WeakHandle<BtMessageDispatcher>& dispatcher)
 {
   this->dispatcher = dispatcher;

+ 2 - 0
src/AbstractBtMessage.h

@@ -136,6 +136,8 @@ public:
 
   SharedHandle<BtContext> getBtContext() const;
 
+  void setPieceStorage(const SharedHandle<PieceStorage>& pieceStorage);
+
   void setBtMessageDispatcher(const WeakHandle<BtMessageDispatcher>& dispatcher);
 
   void setPeerConnection(const WeakHandle<PeerConnection>& peerConnection);

+ 13 - 0
src/BitfieldMan.cc

@@ -424,6 +424,19 @@ std::deque<size_t> BitfieldMan::getAllMissingIndexes(const unsigned char* peerBi
   return getAllMissingIndexes(bf, bitfieldLength);
 }
 
+std::deque<size_t> BitfieldMan::getAllMissingUnusedIndexes(const unsigned char* peerBitfield, size_t peerBitfieldLength) const {
+  if(bitfieldLength != peerBitfieldLength) {
+    return std::deque<size_t>();
+  }
+  array_fun<unsigned char> bf = array_and(array_and(array_negate(bitfield),
+						    array_negate(useBitfield)),
+					  peerBitfield);
+  if(filterEnabled) {
+    bf = array_and(bf, filterBitfield);
+  }
+  return getAllMissingIndexes(bf, bitfieldLength);
+}
+
 size_t BitfieldMan::countMissingBlock() const {
   return cachedNumMissingBlock;
 }

+ 5 - 0
src/BitfieldMan.h

@@ -142,6 +142,11 @@ public:
    * affected by filter
    */
   std::deque<size_t> getAllMissingIndexes(const unsigned char* bitfield, size_t len) const;
+  /**
+   * affected by filter
+   */
+  std::deque<size_t> getAllMissingUnusedIndexes(const unsigned char* bitfield,
+						size_t len) const;
   /**
    * affected by filter
    */

+ 2 - 0
src/BtBitfieldMessage.cc

@@ -39,6 +39,7 @@
 #include "message.h"
 #include "Peer.h"
 #include "StringFormat.h"
+#include "PieceStorage.h"
 #include <cstring>
 
 namespace aria2 {
@@ -72,6 +73,7 @@ BtBitfieldMessage::create(const unsigned char* data, size_t dataLength)
 }
 
 void BtBitfieldMessage::doReceivedAction() {
+  pieceStorage->updatePieceStats(bitfield, bitfieldLength, peer->getBitfield());
   peer->setBitfield(bitfield, bitfieldLength);
 }
 

+ 3 - 0
src/BtHaveAllMessage.cc

@@ -38,6 +38,7 @@
 #include "message.h"
 #include "Peer.h"
 #include "StringFormat.h"
+#include "PieceStorage.h"
 
 namespace aria2 {
 
@@ -61,7 +62,9 @@ void BtHaveAllMessage::doReceivedAction() {
       (StringFormat("%s received while fast extension is disabled",
 		    toString().c_str()).str());
   }
+  pieceStorage->subtractPieceStats(peer->getBitfield(), peer->getBitfieldLength());
   peer->setAllBitfield();
+  pieceStorage->addPieceStats(peer->getBitfield(), peer->getBitfieldLength());
 }
 
 size_t BtHaveAllMessage::MESSAGE_LENGTH = 5;

+ 2 - 0
src/BtHaveMessage.cc

@@ -39,6 +39,7 @@
 #include "message.h"
 #include "Peer.h"
 #include "StringFormat.h"
+#include "PieceStorage.h"
 
 namespace aria2 {
 
@@ -59,6 +60,7 @@ BtHaveMessageHandle BtHaveMessage::create(const unsigned char* data, size_t data
 
 void BtHaveMessage::doReceivedAction() {
   peer->updateBitfield(index, 1);
+  pieceStorage->addPieceStats(index);
 }
 
 size_t BtHaveMessage::MESSAGE_LENGTH = 9;

+ 177 - 3
src/DefaultPieceStorage.cc

@@ -58,16 +58,57 @@
 
 namespace aria2 {
 
+class GenPieceStat {
+private:
+  size_t _index;
+public:
+  GenPieceStat():_index(0) {}
+
+  SharedHandle<PieceStat> operator()()
+  {
+    return SharedHandle<PieceStat>(new PieceStat(_index++));
+  }
+};
+
+class PieceRarer
+{
+public:
+  bool operator()(const SharedHandle<PieceStat>& left,
+		  const SharedHandle<PieceStat>& right)
+  {
+    if(left->getCount() == right->getCount()) {
+      return left->getOrder() < right->getOrder();
+    } else {
+      return left->getCount() < right->getCount();
+    }
+  }
+};
+
 DefaultPieceStorage::DefaultPieceStorage(const DownloadContextHandle& downloadContext, const Option* option):
   downloadContext(downloadContext),
   _diskWriterFactory(new DefaultDiskWriterFactory()),
   endGamePieceNum(END_GAME_PIECE_NUM),
-  option(option)
+  option(option),
+  _pieceStats(downloadContext->getNumPieces())
 {
   bitfieldMan =
     BitfieldManFactory::getFactoryInstance()->
     createBitfieldMan(downloadContext->getPieceLength(),
 		      downloadContext->getTotalLength());
+
+  std::generate(_pieceStats.begin(), _pieceStats.end(), GenPieceStat());
+  _sortedPieceStats = _pieceStats;
+  // we need some randomness in ordering.
+  std::random_shuffle(_sortedPieceStats.begin(), _sortedPieceStats.end());
+  {
+    size_t order = 0;
+    for(std::deque<SharedHandle<PieceStat> >::iterator i = _sortedPieceStats.begin();
+	i != _sortedPieceStats.end(); ++i) {
+      (*i)->setOrder(order++);
+    }
+  }
+  std::sort(_sortedPieceStats.begin(), _sortedPieceStats.end(), PieceRarer());
+
   logger = LogFactory::getInstance();
 }
 
@@ -86,14 +127,38 @@ bool DefaultPieceStorage::isEndGame()
   return bitfieldMan->countMissingBlock() <= endGamePieceNum;
 }
 
+class FindRarestPiece
+{
+private:
+  const std::deque<size_t>& _indexes;
+public:
+  FindRarestPiece(const std::deque<size_t>& indexes):_indexes(indexes) {}
+
+  bool operator()(const SharedHandle<PieceStat>& pieceStat)
+  {
+    return std::binary_search(_indexes.begin(), _indexes.end(), pieceStat->getIndex());
+  }
+};
+
 bool DefaultPieceStorage::getMissingPieceIndex(size_t& index, const PeerHandle& peer)
 {
   if(isEndGame()) {
     return bitfieldMan->getMissingIndex(index, peer->getBitfield(),
 					peer->getBitfieldLength());
   } else {
-    return bitfieldMan->getMissingUnusedIndex(index, peer->getBitfield(),
+    std::deque<size_t> indexes =
+      bitfieldMan->getAllMissingUnusedIndexes(peer->getBitfield(),
 					      peer->getBitfieldLength());
+    if(indexes.empty()) {
+      return false;
+    } else {
+      std::sort(indexes.begin(), indexes.end());
+      std::deque<SharedHandle<PieceStat> >::const_iterator i =
+	std::find_if(_sortedPieceStats.begin(), _sortedPieceStats.end(),
+		     FindRarestPiece(indexes));
+      index = (*i)->getIndex();
+      return true;
+    }
   }
 }
 
@@ -285,6 +350,7 @@ void DefaultPieceStorage::completePiece(const PieceHandle& piece)
   }
   bitfieldMan->setBit(piece->getIndex());
   bitfieldMan->unsetUseBit(piece->getIndex());
+  addPieceStats(piece->getIndex());
   if(downloadFinished()) {
     diskAdaptor->onDownloadComplete();
     if(isSelectiveDownloadingMode()) {
@@ -455,8 +521,9 @@ void DefaultPieceStorage::setBitfield(const unsigned char* bitfield,
 				      size_t bitfieldLength)
 {
   bitfieldMan->setBitfield(bitfield, bitfieldLength);
+  addPieceStats(bitfield, bitfieldLength);
 }
-  
+
 size_t DefaultPieceStorage::getBitfieldLength()
 {
   return bitfieldMan->getBitfieldLength();
@@ -576,4 +643,111 @@ void DefaultPieceStorage::setDiskWriterFactory(const DiskWriterFactoryHandle& di
   _diskWriterFactory = diskWriterFactory;
 }
 
+void DefaultPieceStorage::addPieceStats(const unsigned char* bitfield,
+					size_t bitfieldLength)
+{
+  size_t index = 0;
+  for(size_t bi = 0; bi < bitfieldLength; ++bi) {
+    
+    for(size_t i = 0; i < 8; ++i, ++index) {
+      unsigned char mask = 128 >> i;
+      if(bitfield[bi]&mask) {
+	_pieceStats[index]->addCount();
+      }
+    }
+
+  }
+  std::sort(_sortedPieceStats.begin(), _sortedPieceStats.end(), PieceRarer());
+}
+
+void DefaultPieceStorage::subtractPieceStats(const unsigned char* bitfield,
+					     size_t bitfieldLength)
+{
+  size_t index = 0;
+  for(size_t bi = 0; bi < bitfieldLength; ++bi) {
+    
+    for(size_t i = 0; i < 8; ++i, ++index) {
+      unsigned char mask = 128 >> i;
+      if(bitfield[bi]&mask) {
+	_pieceStats[index]->subCount();
+      }
+    }
+
+  }
+  std::sort(_sortedPieceStats.begin(), _sortedPieceStats.end(), PieceRarer());
+}
+
+void DefaultPieceStorage::updatePieceStats(const unsigned char* newBitfield,
+					   size_t newBitfieldLength,
+					   const unsigned char* oldBitfield)
+{
+  size_t index = 0;
+  for(size_t bi = 0; bi < newBitfieldLength; ++bi) {
+    
+    for(size_t i = 0; i < 8; ++i, ++index) {
+      unsigned char mask = 128 >> i;
+      if((newBitfield[bi]&mask) && !(oldBitfield[bi]&mask)) {
+	_pieceStats[index]->addCount();
+      } else if(!(newBitfield[bi]&mask) && (oldBitfield[bi]&mask)) {
+	_pieceStats[index]->subCount();
+      }
+    }
+
+  }
+  std::sort(_sortedPieceStats.begin(), _sortedPieceStats.end(), PieceRarer());
+}
+
+void DefaultPieceStorage::addPieceStats(size_t index)
+{
+  std::deque<SharedHandle<PieceStat> >::iterator cur =
+    std::lower_bound(_sortedPieceStats.begin(), _sortedPieceStats.end(),
+		     _pieceStats[index], PieceRarer());
+
+  (*cur)->addCount();
+
+  std::deque<SharedHandle<PieceStat> >::iterator last =
+    std::upper_bound(cur+1, _sortedPieceStats.end(), *cur, PieceRarer());
+
+  std::sort(cur, last, PieceRarer());
+//   for(std::deque<SharedHandle<PieceStat> >::const_iterator i = _sortedPieceStats.begin(); i != _sortedPieceStats.end(); ++i) {
+//     logger->debug("index = %u, count = %u", (*i)->getIndex(), (*i)->getCount());
+//   }
+}
+
+PieceStat::PieceStat(size_t index):_order(0), _index(index), _count(0) {}
+
+void PieceStat::addCount()
+{
+  if(_count < SIZE_MAX) {
+    ++_count;
+  }
+}
+
+void PieceStat::subCount()
+{
+  if(_count > 0) {
+    --_count;
+  }
+}
+
+size_t PieceStat::getIndex() const
+{
+  return _index;
+}
+
+size_t PieceStat::getCount() const
+{
+  return _count;
+}
+
+void PieceStat::setOrder(size_t order)
+{
+  _order = order;
+}
+
+size_t PieceStat::getOrder() const
+{
+  return _order;
+}
+
 } // namespace aria2

+ 33 - 0
src/DefaultPieceStorage.h

@@ -67,6 +67,23 @@ public:
 
 typedef std::deque<HaveEntry> Haves;
 
+class PieceStat {
+private:
+  size_t _order;
+  size_t _index;
+  size_t _count;
+public:
+  PieceStat(size_t index);
+
+  void addCount();
+  void subCount();
+
+  size_t getOrder() const;
+  void setOrder(size_t order);
+  size_t getIndex() const;
+  size_t getCount() const;
+};
+
 class DefaultPieceStorage : public PieceStorage {
 private:
   SharedHandle<DownloadContext> downloadContext;
@@ -74,10 +91,14 @@ private:
   SharedHandle<DiskAdaptor> diskAdaptor;
   SharedHandle<DiskWriterFactory> _diskWriterFactory;
   std::deque<SharedHandle<Piece> > usedPieces;
+
   size_t endGamePieceNum;
   Logger* logger;
   const Option* option;
   Haves haves;
+
+  std::deque<SharedHandle<PieceStat> > _pieceStats;
+  std::deque<SharedHandle<PieceStat> > _sortedPieceStats;
   
   bool getMissingPieceIndex(size_t& index, const SharedHandle<Peer>& peer);
   bool getMissingFastPieceIndex(size_t& index, const SharedHandle<Peer>& peer);
@@ -177,6 +198,18 @@ public:
 
   virtual std::deque<SharedHandle<Piece> > getInFlightPieces();
 
+  virtual void addPieceStats(size_t index);
+
+  virtual void addPieceStats(const unsigned char* bitfield,
+			     size_t bitfieldLength);
+  
+  virtual void subtractPieceStats(const unsigned char* bitfield,
+				  size_t bitfieldLength);
+
+  virtual void updatePieceStats(const unsigned char* newBitfield,
+				size_t newBitfieldLength,
+				const unsigned char* oldBitfield);
+
   /**
    * This method is made private for test purpose only.
    */

+ 3 - 0
src/PeerInteractionCommand.cc

@@ -62,6 +62,7 @@
 #include "DHTNode.h"
 #include "DHTSetup.h"
 #include "DHTRegistry.h"
+#include "PieceStorage.h"
 #include <algorithm>
 
 namespace aria2 {
@@ -169,6 +170,8 @@ PeerInteractionCommand::PeerInteractionCommand(int32_t cuid,
 }
 
 PeerInteractionCommand::~PeerInteractionCommand() {
+  pieceStorage->subtractPieceStats(peer->getBitfield(),
+				   peer->getBitfieldLength());
   peer->releaseSessionResource();
   PEER_OBJECT_CLUSTER(btContext)->unregisterHandle(peer->getID());
 					

+ 13 - 0
src/PieceStorage.h

@@ -204,6 +204,19 @@ public:
   virtual size_t countInFlightPiece() = 0;
 
   virtual std::deque<SharedHandle<Piece> > getInFlightPieces() = 0;
+
+  virtual void addPieceStats(size_t index) = 0;
+
+  virtual void addPieceStats(const unsigned char* bitfield,
+			     size_t bitfieldLength) = 0;
+
+  virtual void subtractPieceStats(const unsigned char* bitfield,
+				  size_t bitfieldLength) = 0;
+
+  virtual void updatePieceStats(const unsigned char* newBitfield,
+				size_t newBitfieldLength,
+				const unsigned char* oldBitfield) = 0;
+
 };
 
 typedef SharedHandle<PieceStorage> PieceStorageHandle;

+ 13 - 0
src/UnknownLengthPieceStorage.h

@@ -250,6 +250,19 @@ public:
 
   virtual std::deque<SharedHandle<Piece> > getInFlightPieces();
 
+  virtual void addPieceStats(size_t index) {}
+
+  virtual void addPieceStats(const unsigned char* bitfield,
+			     size_t bitfieldLength) {}
+
+  virtual void subtractPieceStats(const unsigned char* bitfield,
+				  size_t bitfieldLength) {}
+
+  virtual void updatePieceStats(const unsigned char* newBitfield,
+				size_t newBitfieldLength,
+				const unsigned char* oldBitfield) {}
+
+
   void setDiskWriterFactory(const SharedHandle<DiskWriterFactory>& diskWriterFactory);
 };
 

+ 3 - 0
test/BtBitfieldMessageTest.cc

@@ -2,6 +2,7 @@
 #include "PeerMessageUtil.h"
 #include "Util.h"
 #include "Peer.h"
+#include "MockPieceStorage.h"
 #include <cstring>
 #include <cppunit/extensions/HelperMacros.h>
 
@@ -74,6 +75,8 @@ void BtBitfieldMessageTest::testDoReceivedAction() {
   peer->allocateSessionResource(16*1024, 16*16*1024);
   BtBitfieldMessage msg;
   msg.setPeer(peer);
+  SharedHandle<MockPieceStorage> pieceStorage(new MockPieceStorage());
+  msg.setPieceStorage(pieceStorage);
   unsigned char bitfield[] = { 0xff, 0xff };
   msg.setBitfield(bitfield, sizeof(bitfield));
   

+ 4 - 1
test/BtHaveAllMessageTest.cc

@@ -1,6 +1,7 @@
 #include "BtHaveAllMessage.h"
 #include "PeerMessageUtil.h"
 #include "Peer.h"
+#include "MockPieceStorage.h"
 #include <cstring>
 #include <cppunit/extensions/HelperMacros.h>
 
@@ -64,7 +65,9 @@ void BtHaveAllMessageTest::testDoReceivedAction() {
   peer->allocateSessionResource(16*1024, 256*1024);
   peer->setFastExtensionEnabled(true);
   msg.setPeer(peer);
-  
+  SharedHandle<MockPieceStorage> pieceStorage(new MockPieceStorage());
+  msg.setPieceStorage(pieceStorage);
+
   msg.doReceivedAction();
   
   CPPUNIT_ASSERT(peer->isSeeder());

+ 3 - 0
test/BtHaveMessageTest.cc

@@ -1,6 +1,7 @@
 #include "BtHaveMessage.h"
 #include "PeerMessageUtil.h"
 #include "Peer.h"
+#include "MockPieceStorage.h"
 #include <cstring>
 #include <cppunit/extensions/HelperMacros.h>
 
@@ -70,6 +71,8 @@ void BtHaveMessageTest::testDoReceivedAction() {
   BtHaveMessage msg;
   msg.setIndex(1);
   msg.setPeer(peer);
+  SharedHandle<MockPieceStorage> pieceStorage(new MockPieceStorage());
+  msg.setPieceStorage(pieceStorage);
 
   CPPUNIT_ASSERT(!peer->hasPiece(msg.getIndex()));
 

+ 7 - 3
test/DefaultPieceStorageTest.cc

@@ -80,14 +80,16 @@ void DefaultPieceStorageTest::testGetMissingPiece() {
   pss.setEndGamePieceNum(0);
 
   peer->setAllBitfield();
+  // TODO the ordering of piece may vary depending on the system, so the test
+  // may fail.
   SharedHandle<Piece> piece = pss.getMissingPiece(peer);
-  CPPUNIT_ASSERT_EQUAL(std::string("piece: index=0, length=128"),
+  CPPUNIT_ASSERT_EQUAL(std::string("piece: index=2, length=128"),
 		       piece->toString());
   piece = pss.getMissingPiece(peer);
   CPPUNIT_ASSERT_EQUAL(std::string("piece: index=1, length=128"),
 		       piece->toString());
   piece = pss.getMissingPiece(peer);
-  CPPUNIT_ASSERT_EQUAL(std::string("piece: index=2, length=128"),
+  CPPUNIT_ASSERT_EQUAL(std::string("piece: index=0, length=128"),
 		       piece->toString());
   piece = pss.getMissingPiece(peer);
   CPPUNIT_ASSERT(piece.isNull());
@@ -122,8 +124,10 @@ void DefaultPieceStorageTest::testCompletePiece() {
 
   peer->setAllBitfield();
 
+  // TODO the ordering of piece may vary depending on the system, so the test
+  // may fail.
   SharedHandle<Piece> piece = pss.getMissingPiece(peer);
-  CPPUNIT_ASSERT_EQUAL(std::string("piece: index=0, length=128"),
+  CPPUNIT_ASSERT_EQUAL(std::string("piece: index=2, length=128"),
 		       piece->toString());
 
   CPPUNIT_ASSERT_EQUAL(0ULL, pss.getCompletedLength());

+ 11 - 0
test/MockPieceStorage.h

@@ -207,6 +207,17 @@ public:
     return inFlightPieces;
   }
 
+  virtual void addPieceStats(size_t index) {}
+
+  virtual void addPieceStats(const unsigned char* bitfield,
+			     size_t bitfieldLength) {}
+
+  virtual void subtractPieceStats(const unsigned char* bitfield,
+				  size_t bitfieldLength) {}
+
+  virtual void updatePieceStats(const unsigned char* newBitfield,
+				size_t newBitfieldLength,
+				const unsigned char* oldBitfield) {}
 };
 
 } // namespace aria2