瀏覽代碼

2008-05-17 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

	Sort RequestSlot in ascending order and manipulate them using
	lower_bound.
	* src/DefaultBtMessageDispatcher.cc
	* src/DefaultBtMessageDispatcher.h
	(getMessageQueue): Added const qualifier.
	(getRequestSlots): Added const qualifier.
	(sendMessages): Use empty() instead of size().
	(doCancelSendingPieceAction): Use HandleEvent object.
	(doAbortOutstandingRequestAction): Rewritten.
	(doChokedAction): Rewritten.
	(checkRequestSlotAndDoNecessaryThing): Rewritten.
	(isOutstandingRequest): Rewritten.
	(getOutstandingRequest): Rewritten.
	(removeOutstandingRequest): Rewritten.
	(addOutstandingRequest): Rewritten.
	* src/RequestSlot.cc
	* src/RequestSlot.h
	(operator=): Rewritten.
	(operator!=): New function.
	(operator<): New function.
Tatsuhiro Tsujikawa 17 年之前
父節點
當前提交
b5f1ca6f66
共有 6 個文件被更改,包括 260 次插入91 次删除
  1. 23 0
      ChangeLog
  2. 215 85
      src/DefaultBtMessageDispatcher.cc
  3. 2 2
      src/DefaultBtMessageDispatcher.h
  4. 15 3
      src/RequestSlot.cc
  5. 4 0
      src/RequestSlot.h
  6. 1 1
      test/DefaultBtMessageDispatcherTest.cc

+ 23 - 0
ChangeLog

@@ -1,3 +1,26 @@
+2008-05-17  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
+
+	Sort RequestSlot in ascending order and manipulate them using
+	lower_bound.
+	* src/DefaultBtMessageDispatcher.cc
+	* src/DefaultBtMessageDispatcher.h
+	(getMessageQueue): Added const qualifier.
+	(getRequestSlots): Added const qualifier.
+	(sendMessages): Use empty() instead of size().
+	(doCancelSendingPieceAction): Use HandleEvent object.
+	(doAbortOutstandingRequestAction): Rewritten.
+	(doChokedAction): Rewritten.
+	(checkRequestSlotAndDoNecessaryThing): Rewritten.
+	(isOutstandingRequest): Rewritten.
+	(getOutstandingRequest): Rewritten.
+	(removeOutstandingRequest): Rewritten.
+	(addOutstandingRequest): Rewritten.
+	* src/RequestSlot.cc
+	* src/RequestSlot.h
+	(operator=): Rewritten.
+	(operator!=): New function.
+	(operator<): New function.
+	
 2008-05-17  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
 
 	* src/DefaultBtRequestFactory.cc

+ 215 - 85
src/DefaultBtMessageDispatcher.cc

@@ -81,7 +81,7 @@ void DefaultBtMessageDispatcher::addMessageToQueue(const BtMessages& btMessages)
 
 void DefaultBtMessageDispatcher::sendMessages() {
   BtMessages tempQueue;
-  while(messageQueue.size() > 0) {
+  while(!messageQueue.empty()) {
     BtMessageHandle msg = messageQueue.front();
     messageQueue.pop_front();
     if(maxUploadSpeedLimit > 0 &&
@@ -101,6 +101,18 @@ void DefaultBtMessageDispatcher::sendMessages() {
   std::copy(tempQueue.begin(), tempQueue.end(), std::back_inserter(messageQueue));
 }
 
+class HandleEvent {
+private:
+  SharedHandle<BtEvent> _event;
+public:
+  HandleEvent(const SharedHandle<BtEvent>& event):_event(event) {}
+
+  void operator()(const SharedHandle<BtMessage>& msg) const
+  {
+    msg->handleEvent(_event);
+  }
+};
+
 // Cancel sending piece message to peer.
 void DefaultBtMessageDispatcher::doCancelSendingPieceAction(size_t index, uint32_t begin, size_t length)
 {
@@ -108,9 +120,7 @@ void DefaultBtMessageDispatcher::doCancelSendingPieceAction(size_t index, uint32
     (new BtCancelSendingPieceEvent(index, begin, length));
 
   BtMessages tempQueue = messageQueue;
-  for(BtMessages::iterator itr = tempQueue.begin(); itr != tempQueue.end(); itr++) {
-    (*itr)->handleEvent(event);
-  }
+  std::for_each(tempQueue.begin(), tempQueue.end(), HandleEvent(event));
 }
 
 // Cancel sending piece message to peer.
@@ -119,57 +129,105 @@ void DefaultBtMessageDispatcher::doCancelSendingPieceAction(const PieceHandle& p
 {
 }
 
+class AbortOutstandingRequest {
+private:
+  SharedHandle<Piece> _piece;
+  int32_t _cuid;
+  Logger* _logger;
+public:
+  AbortOutstandingRequest(const SharedHandle<Piece>& piece, int32_t cuid):
+    _piece(piece),
+    _cuid(cuid),
+    _logger(LogFactory::getInstance()) {}
+
+  void operator()(const RequestSlot& slot) const
+  {
+    _logger->debug(MSG_DELETING_REQUEST_SLOT,
+		   _cuid,
+		   slot.getIndex(),
+		   slot.getBlockIndex());
+    _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin());
+    _piece->cancelBlock(slot.getBlockIndex());
+  }
+};
+
 // localhost cancels outstanding download requests to the peer.
 void DefaultBtMessageDispatcher::doAbortOutstandingRequestAction(const PieceHandle& piece) {
-  for(RequestSlots::iterator itr = requestSlots.begin();
-      itr != requestSlots.end();) {
-    RequestSlot& slot = *itr;
-    if(slot.getIndex() == piece->getIndex()) {
-      logger->debug(MSG_DELETING_REQUEST_SLOT,
-		    cuid,
-		    slot.getIndex(),
-		    slot.getBlockIndex());
-      piece->cancelBlock(slot.getBlockIndex());
-      itr = requestSlots.erase(itr);
-    } else {
-      itr++;
-    }
-  }
+  RequestSlot rs(piece->getIndex(), 0, 0, 0);
+  std::deque<RequestSlot>::iterator first =
+    std::lower_bound(requestSlots.begin(), requestSlots.end(), rs);
+
+  rs.setIndex(piece->getIndex()+1);
+  std::deque<RequestSlot>::iterator last =
+    std::lower_bound(requestSlots.begin(), requestSlots.end(), rs);
+
+  std::for_each(first, last, AbortOutstandingRequest(piece, cuid));
+  requestSlots.erase(first, last);
 
   BtAbortOutstandingRequestEventHandle event
     (new BtAbortOutstandingRequestEvent(piece));
 
   BtMessages tempQueue = messageQueue;
-  for(BtMessages::iterator itr = tempQueue.begin(); itr != tempQueue.end(); ++itr) {
-    (*itr)->handleEvent(event);
-  }  
+  std::for_each(tempQueue.begin(), tempQueue.end(), HandleEvent(event));
 }
 
-// localhost received choke message from the peer.
-void DefaultBtMessageDispatcher::doChokedAction()
-{
-  for(RequestSlots::iterator itr = requestSlots.begin();
-      itr != requestSlots.end();) {
-    RequestSlot& slot = *itr;
-    if(peer->isInPeerAllowedIndexSet(slot.getIndex())) {
-      itr++;
-    } else {
-      logger->debug(MSG_DELETING_REQUEST_SLOT_CHOKED,
-		    cuid,
-		    slot.getIndex(),
-		    slot.getBlockIndex());
-      PieceHandle piece = pieceStorage->getPiece(slot.getIndex());
+class ProcessChokedRequestSlot {
+private:
+  int32_t _cuid;
+  SharedHandle<Peer> _peer;
+  SharedHandle<PieceStorage> _pieceStorage;
+  Logger* _logger;
+public:
+  ProcessChokedRequestSlot(int32_t cuid,
+			   const SharedHandle<Peer>& peer,
+			   const SharedHandle<PieceStorage>& pieceStorage):
+    _cuid(cuid),
+    _peer(peer),
+    _pieceStorage(pieceStorage),
+    _logger(LogFactory::getInstance()) {}
+  
+  void operator()(const RequestSlot& slot) const
+  {
+    if(!_peer->isInPeerAllowedIndexSet(slot.getIndex())) {
+      _logger->debug(MSG_DELETING_REQUEST_SLOT_CHOKED,
+		     _cuid,
+		     slot.getIndex(),
+		     slot.getBlockIndex());
+      _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin());
+      SharedHandle<Piece> piece = _pieceStorage->getPiece(slot.getIndex());
       piece->cancelBlock(slot.getBlockIndex());
-      itr = requestSlots.erase(itr);
     }
   }
 
+};
+
+class FindChokedRequestSlot {
+private:
+  SharedHandle<Peer> _peer;
+public:
+  FindChokedRequestSlot(const SharedHandle<Peer>& peer):
+    _peer(peer) {}
+  
+  bool operator()(const RequestSlot& slot) const
+  {
+    return !_peer->isInPeerAllowedIndexSet(slot.getIndex());
+  }
+};
+
+// localhost received choke message from the peer.
+void DefaultBtMessageDispatcher::doChokedAction()
+{
+  std::for_each(requestSlots.begin(), requestSlots.end(),
+		ProcessChokedRequestSlot(cuid, peer, pieceStorage));
+
+  requestSlots.erase(std::remove_if(requestSlots.begin(), requestSlots.end(),
+				    FindChokedRequestSlot(peer)),
+		     requestSlots.end());
+
   BtChokedEventHandle event(new BtChokedEvent());
 
   BtMessages tempQueue = messageQueue;
-  for(BtMessages::iterator itr = tempQueue.begin(); itr != tempQueue.end(); ++itr) {
-    (*itr)->handleEvent(event);
-  }
+  std::for_each(tempQueue.begin(), tempQueue.end(), HandleEvent(event));
 }
 
 // localhost dispatched choke message to the peer.
@@ -178,36 +236,93 @@ void DefaultBtMessageDispatcher::doChokingAction()
   BtChokingEventHandle event(new BtChokingEvent());
 
   BtMessages tempQueue = messageQueue;
-  for(BtMessages::iterator itr = tempQueue.begin(); itr != tempQueue.end(); ++itr) {
-    (*itr)->handleEvent(event);
-  }
+  std::for_each(tempQueue.begin(), tempQueue.end(), HandleEvent(event));
 }
 
-void DefaultBtMessageDispatcher::checkRequestSlotAndDoNecessaryThing()
-{
-  for(RequestSlots::iterator itr = requestSlots.begin();
-      itr != requestSlots.end();) {
-    RequestSlot& slot = *itr;
-    PieceHandle piece = pieceStorage->getPiece(slot.getIndex());
-    if(slot.isTimeout(requestTimeout)) {
-      logger->debug(MSG_DELETING_REQUEST_SLOT_TIMEOUT,
-		    cuid,
-		    slot.getBlockIndex());
+class ProcessStaleRequestSlot {
+private:
+  int32_t _cuid;
+  SharedHandle<Peer> _peer;
+  SharedHandle<PieceStorage> _pieceStorage;
+  BtMessageDispatcher* _messageDispatcher;
+  WeakHandle<BtMessageFactory> _messageFactory;
+  time_t _requestTimeout;
+  Logger* _logger;
+public:
+  ProcessStaleRequestSlot(int32_t cuid, const SharedHandle<Peer>& peer,
+			  const SharedHandle<PieceStorage>& pieceStorage,
+			  BtMessageDispatcher* dispatcher,
+			  const WeakHandle<BtMessageFactory>& factory,
+			  time_t requestTimeout):
+    _cuid(cuid),
+    _peer(peer),
+    _pieceStorage(pieceStorage),
+    _messageDispatcher(dispatcher),
+    _messageFactory(factory),
+    _requestTimeout(requestTimeout),
+    _logger(LogFactory::getInstance()) {}
+
+  void operator()(const RequestSlot& slot)
+  {
+    SharedHandle<Piece> piece = _pieceStorage->getPiece(slot.getIndex());
+    if(slot.isTimeout(_requestTimeout)) {
+      _logger->debug(MSG_DELETING_REQUEST_SLOT_TIMEOUT,
+		     _cuid,
+		     slot.getBlockIndex());
+      _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin());
       piece->cancelBlock(slot.getBlockIndex());
-      peer->snubbing(true);
-      itr = requestSlots.erase(itr);
+      _peer->snubbing(true);
     } else if(piece->hasBlock(slot.getBlockIndex())) {
-      logger->debug(MSG_DELETING_REQUEST_SLOT_ACQUIRED,
-		    cuid,
-		    slot.getBlockIndex());
-      addMessageToQueue(messageFactory->createCancelMessage(slot.getIndex(),
-							    slot.getBegin(),
-							    slot.getLength()));
-      itr = requestSlots.erase(itr);
+      _logger->debug(MSG_DELETING_REQUEST_SLOT_ACQUIRED,
+		     _cuid,
+		     slot.getBlockIndex());
+      _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin());
+      _messageDispatcher->addMessageToQueue
+	(_messageFactory->createCancelMessage(slot.getIndex(),
+					      slot.getBegin(),
+					      slot.getLength()));
+    }
+  }
+};
+
+class FindStaleRequestSlot {
+private:
+  SharedHandle<PieceStorage> _pieceStorage;
+  time_t _requestTimeout;
+public:
+  FindStaleRequestSlot(const SharedHandle<PieceStorage>& pieceStorage,
+		       time_t requestTimeout):
+    _pieceStorage(pieceStorage),
+    _requestTimeout(requestTimeout) {}
+
+  bool operator()(const RequestSlot& slot)
+  {
+    if(slot.isTimeout(_requestTimeout)) {
+      return true;
     } else {
-      itr++;
+      SharedHandle<Piece> piece = _pieceStorage->getPiece(slot.getIndex());
+      if(piece->hasBlock(slot.getBlockIndex())) {
+	return true;
+      } else {
+	return false;
+      }
     }
   }
+};
+
+void DefaultBtMessageDispatcher::checkRequestSlotAndDoNecessaryThing()
+{
+  std::for_each(requestSlots.begin(), requestSlots.end(),
+		ProcessStaleRequestSlot(cuid,
+					peer,
+					pieceStorage,
+					this,
+					messageFactory,
+					requestTimeout));
+  requestSlots.erase(std::remove_if(requestSlots.begin(), requestSlots.end(),
+				    FindStaleRequestSlot(pieceStorage,
+							 requestTimeout)),
+		     requestSlots.end());
 }
 
 bool DefaultBtMessageDispatcher::isSendingInProgress()
@@ -224,42 +339,57 @@ size_t DefaultBtMessageDispatcher::countOutstandingRequest()
   return requestSlots.size();
 }
 
-bool DefaultBtMessageDispatcher::isOutstandingRequest(size_t index, size_t blockIndex) {
-  for(RequestSlots::const_iterator itr = requestSlots.begin();
-      itr != requestSlots.end(); itr++) {
-    const RequestSlot& slot = *itr;
-    if(slot.getIndex() == index && slot.getBlockIndex() == blockIndex) {
-      return true;
+class BlockIndexLess {
+public:
+  bool operator()(const RequestSlot& lhs, const RequestSlot& rhs) const
+  {
+    if(lhs.getIndex() == rhs.getIndex()) {
+      return lhs.getBlockIndex() < rhs.getBlockIndex();
+    } else {
+      return lhs.getIndex() < rhs.getIndex();
     }
   }
-  return false;
+};
+
+bool DefaultBtMessageDispatcher::isOutstandingRequest(size_t index, size_t blockIndex) {
+  RequestSlot rs(index, 0, 0, blockIndex);
+
+  std::deque<RequestSlot>::iterator i =
+    std::lower_bound(requestSlots.begin(), requestSlots.end(), rs, BlockIndexLess());
+  return i != requestSlots.end() &&
+    (*i).getIndex() == index && (*i).getBlockIndex() == blockIndex;
 }
 
 RequestSlot
 DefaultBtMessageDispatcher::getOutstandingRequest(size_t index, uint32_t begin, size_t length)
 {
-  for(RequestSlots::iterator itr = requestSlots.begin();
-      itr != requestSlots.end(); itr++) {
-    if(itr->getIndex() == index &&
-       itr->getBegin() == begin &&
-       itr->getLength() == length) {
-      return *itr;
-    }
+  RequestSlot ret(0, 0, 0, 0);
+  RequestSlot rs(index, begin, length, 0);
+  std::deque<RequestSlot>::iterator i =
+    std::lower_bound(requestSlots.begin(), requestSlots.end(), rs);
+  if(i != requestSlots.end() && (*i) == rs) {
+    ret = *i;
+  } else {
+    ret = RequestSlot::nullSlot;
   }
-  return RequestSlot::nullSlot;
+  return ret;
 }
 
 void DefaultBtMessageDispatcher::removeOutstandingRequest(const RequestSlot& slot)
 {
-  RequestSlots temp;
-  std::remove_copy(requestSlots.begin(), requestSlots.end(), std::back_inserter(temp), slot);
-  requestSlots = temp;
+  std::deque<RequestSlot>::iterator i =
+    std::lower_bound(requestSlots.begin(), requestSlots.end(), slot);
+  if(i != requestSlots.end() && (*i) == slot) {
+    requestSlots.erase(i);
+  }
 }
 
-void DefaultBtMessageDispatcher::addOutstandingRequest(const RequestSlot& requestSlot)
+void DefaultBtMessageDispatcher::addOutstandingRequest(const RequestSlot& slot)
 {
-  if(!isOutstandingRequest(requestSlot.getIndex(), requestSlot.getBlockIndex())) {
-    requestSlots.push_back(requestSlot);
+  std::deque<RequestSlot>::iterator i =
+    std::lower_bound(requestSlots.begin(), requestSlots.end(), slot);
+  if(i == requestSlots.end() || (*i) != slot) {
+    requestSlots.insert(i, slot);
   }
 }
 
@@ -269,13 +399,13 @@ size_t DefaultBtMessageDispatcher::countOutstandingUpload()
 		       mem_fun_sh(&BtMessage::isUploading));
 }
 
-std::deque<SharedHandle<BtMessage> >&
+const std::deque<SharedHandle<BtMessage> >&
 DefaultBtMessageDispatcher::getMessageQueue()
 {
   return messageQueue;
 }
 
-std::deque<RequestSlot>& DefaultBtMessageDispatcher::getRequestSlots()
+const std::deque<RequestSlot>& DefaultBtMessageDispatcher::getRequestSlots()
 {
   return requestSlots;
 }

+ 2 - 2
src/DefaultBtMessageDispatcher.h

@@ -103,9 +103,9 @@ public:
 
   virtual size_t countOutstandingUpload();
 
-  std::deque<SharedHandle<BtMessage> >& getMessageQueue();
+  const std::deque<SharedHandle<BtMessage> >& getMessageQueue();
 
-  RequestSlots& getRequestSlots();
+  const RequestSlots& getRequestSlots();
 
   void setPeer(const SharedHandle<Peer>& peer);
 

+ 15 - 3
src/RequestSlot.cc

@@ -64,9 +64,21 @@ RequestSlot& RequestSlot::operator=(const RequestSlot& requestSlot)
 
 bool RequestSlot::operator==(const RequestSlot& requestSlot) const
 {
-  return index == requestSlot.index &&
-    begin == requestSlot.begin &&
-    length == requestSlot.length;
+  return index == requestSlot.index && begin == requestSlot.begin;
+}
+
+bool RequestSlot::operator!=(const RequestSlot& requestSlot) const
+{
+  return !(*this == requestSlot);
+}
+
+bool RequestSlot::operator<(const RequestSlot& requestSlot) const
+{
+  if(index == requestSlot.index) {
+    return begin < requestSlot.begin;
+  } else {
+    return index < requestSlot.index;
+  }
 }
 
 void RequestSlot::setDispatchedTime() {

+ 4 - 0
src/RequestSlot.h

@@ -59,6 +59,10 @@ public:
 
   bool operator==(const RequestSlot& requestSlot) const;
 
+  bool operator!=(const RequestSlot& requestSlot) const;
+
+  bool operator<(const RequestSlot& requestSlot) const;
+
   void setDispatchedTime();
   void setDispatchedTime(time_t secFromEpoch);
 

+ 1 - 1
test/DefaultBtMessageDispatcherTest.cc

@@ -399,7 +399,7 @@ void DefaultBtMessageDispatcherTest::testGetOutstandingRequest() {
   CPPUNIT_ASSERT(!RequestSlot::isNull(s2));
 
   RequestSlot s3 = btMessageDispatcher->getOutstandingRequest(1, 1024, 17*1024);
-  CPPUNIT_ASSERT(RequestSlot::isNull(s3));
+  CPPUNIT_ASSERT(!RequestSlot::isNull(s3));
 
   RequestSlot s4 = btMessageDispatcher->getOutstandingRequest(1, 2*1024, 16*1024);
   CPPUNIT_ASSERT(RequestSlot::isNull(s4));