Procházet zdrojové kódy

Ignore peer which possibly sent bad data in next few minuts.

In aria2, one piece is downloaded from several peers, so when hash
check failed for that piece, it cannot determine which peer(s) sent
bad data. So, we ignore peer who sent last block of data in random
minutes. We use randomized timeout because all peers get waken up at
the almost same time.
Tatsuhiro Tsujikawa před 14 roky
rodič
revize
d1bb828066

+ 8 - 0
src/BtPieceMessage.cc

@@ -55,6 +55,7 @@
 #include "PeerConnection.h"
 #include "fmt.h"
 #include "DownloadContext.h"
+#include "PeerStorage.h"
 
 namespace aria2 {
 
@@ -132,6 +133,7 @@ void BtPieceMessage::doReceivedAction()
         onNewPiece(piece);
       } else {
         onWrongPiece(piece);
+        peerStorage_->addBadPeer(getPeer()->getIPAddress());
         throw DL_ABORT_EX("Bad piece hash.");
       }
     }
@@ -320,4 +322,10 @@ void BtPieceMessage::setDownloadContext
   downloadContext_ = downloadContext;
 }
 
+void BtPieceMessage::setPeerStorage
+(const SharedHandle<PeerStorage>& peerStorage)
+{
+  peerStorage_ = peerStorage;
+}
+
 } // namespace aria2

+ 4 - 0
src/BtPieceMessage.h

@@ -42,6 +42,7 @@ namespace aria2 {
 class Piece;
 class BtPieceMessage;
 class DownloadContext;
+class PeerStorage;
 
 typedef SharedHandle<BtPieceMessage> BtPieceMessageHandle;
 
@@ -53,6 +54,7 @@ private:
   unsigned char* block_;
   unsigned char* rawData_;
   SharedHandle<DownloadContext> downloadContext_;
+  SharedHandle<PeerStorage> peerStorage_;
 
   static size_t MESSAGE_HEADER_LENGTH;
 
@@ -95,6 +97,8 @@ public:
 
   void setDownloadContext(const SharedHandle<DownloadContext>& downloadContext);
 
+  void setPeerStorage(const SharedHandle<PeerStorage>& peerStorage);
+
   static BtPieceMessageHandle create
   (const unsigned char* data, size_t dataLength);
 

+ 1 - 0
src/DefaultBtMessageFactory.cc

@@ -340,6 +340,7 @@ DefaultBtMessageFactory::createPieceMessage
                                  pieceStorage_->getPieceLength(index)));
   msg->setBtMessageValidator(validator);
   msg->setDownloadContext(downloadContext_);
+  msg->setPeerStorage(peerStorage_);
   setCommonProperty(msg);
   return msg;
 }

+ 44 - 0
src/DefaultPeerStorage.cc

@@ -47,6 +47,7 @@
 #include "wallclock.h"
 #include "a2functional.h"
 #include "fmt.h"
+#include "SimpleRandomizer.h"
 
 namespace aria2 {
 
@@ -100,6 +101,11 @@ bool DefaultPeerStorage::addPeer(const SharedHandle<Peer>& peer) {
                      peer->getIPAddress().c_str(), peer->getPort()));
     return false;
   }
+  if(isBadPeer(peer->getIPAddress())) {
+    A2_LOG_DEBUG(fmt("Adding %s:%u is rejected because it is marked bad.",
+                     peer->getIPAddress().c_str(), peer->getPort()));
+    return false;
+  }
   const size_t peerListSize = peers_.size();
   if(peerListSize >= maxPeerListSize_) {
     deleteUnusedPeer(peerListSize-maxPeerListSize_+1);
@@ -122,6 +128,10 @@ void DefaultPeerStorage::addPeer(const std::vector<SharedHandle<Peer> >& peers)
                        " added.",
                        peer->getIPAddress().c_str(), peer->getPort()));
       continue;
+    } else if(isBadPeer(peer->getIPAddress())) {
+      A2_LOG_DEBUG(fmt("Adding %s:%u is rejected because it is marked bad.",
+                       peer->getIPAddress().c_str(), peer->getPort()));
+      continue;
     } else {
       A2_LOG_DEBUG(fmt(MSG_ADDING_PEER,
                        peer->getIPAddress().c_str(), peer->getPort()));
@@ -301,6 +311,40 @@ TransferStat DefaultPeerStorage::getTransferStatFor
   }
 }
 
+bool DefaultPeerStorage::isBadPeer(const std::string& ipaddr)
+{
+  std::map<std::string, time_t>::iterator i = badPeers_.find(ipaddr);
+  if(i == badPeers_.end()) {
+    return false;
+  } else if(global::wallclock().getTime() >= (*i).second) {
+    badPeers_.erase(i);
+    return false;
+  } else {
+    return true;
+  }
+}
+
+void DefaultPeerStorage::addBadPeer(const std::string& ipaddr)
+{
+  if(lastBadPeerCleaned_.difference(global::wallclock()) >= 3600) {
+    for(std::map<std::string, time_t>::iterator i = badPeers_.begin(),
+          eoi = badPeers_.end(); i != eoi;) {
+      if(global::wallclock().getTime() >= (*i).second) {
+        A2_LOG_DEBUG(fmt("Purge %s from bad peer", (*i).first.c_str()));
+        badPeers_.erase(i++);
+        eoi = badPeers_.end();
+      } else {
+        ++i;
+      }
+    }
+    lastBadPeerCleaned_ = global::wallclock();
+  }
+  A2_LOG_DEBUG(fmt("Added %s as bad peer", ipaddr.c_str()));
+  // We use variable timeout to avoid many bad peers wake up at once.
+  badPeers_[ipaddr] = global::wallclock().getTime()+
+    std::max(SimpleRandomizer::getInstance()->getRandomNumber(601), 120L);
+}
+
 void DefaultPeerStorage::deleteUnusedPeer(size_t delSize) {
   std::deque<SharedHandle<Peer> > temp;
   for(std::deque<SharedHandle<Peer> >::const_reverse_iterator itr =

+ 7 - 0
src/DefaultPeerStorage.h

@@ -68,6 +68,9 @@ private:
 
   TransferStat cachedTransferStat_;
 
+  std::map<std::string, time_t> badPeers_;
+  Timer lastBadPeerCleaned_;
+
   bool isPeerAlreadyAdded(const SharedHandle<Peer>& peer);
 
   void addDroppedPeer(const SharedHandle<Peer>& peer);
@@ -100,6 +103,10 @@ public:
 
   virtual TransferStat getTransferStatFor(const SharedHandle<Peer>& peer);
 
+  virtual bool isBadPeer(const std::string& ipaddr);
+
+  virtual void addBadPeer(const std::string& ipaddr);
+
   virtual void returnPeer(const SharedHandle<Peer>& peer);
 
   virtual bool chokeRoundIntervalElapsed();

+ 11 - 0
src/PeerStorage.h

@@ -103,6 +103,17 @@ public:
 
   virtual TransferStat getTransferStatFor(const SharedHandle<Peer>& peer) = 0;
 
+  /**
+   * Returns true if peer with ipaddr should be ignored because, for
+   * example, it sends bad data.
+   */
+  virtual bool isBadPeer(const std::string& ipaddr) = 0;
+
+  /**
+   * Adds peer with ipaddr in bad peer set.
+   */
+  virtual void addBadPeer(const std::string& ipaddr) = 0;
+
   /**
    * Tells PeerStorage object that peer is no longer used in the session.
    */

+ 10 - 0
test/DefaultPeerStorageTest.cc

@@ -25,6 +25,7 @@ class DefaultPeerStorageTest:public CppUnit::TestFixture {
   CPPUNIT_TEST(testCalculateStat);
   CPPUNIT_TEST(testReturnPeer);
   CPPUNIT_TEST(testOnErasingPeer);
+  CPPUNIT_TEST(testAddBadPeer);
   CPPUNIT_TEST_SUITE_END();
 private:
   SharedHandle<BtRuntime> btRuntime;
@@ -48,6 +49,7 @@ public:
   void testCalculateStat();
   void testReturnPeer();
   void testOnErasingPeer();
+  void testAddBadPeer();
 };
 
 
@@ -262,4 +264,12 @@ void DefaultPeerStorageTest::testOnErasingPeer()
   // test this
 }
 
+void DefaultPeerStorageTest::testAddBadPeer()
+{
+  DefaultPeerStorage ps;
+  ps.addBadPeer("192.168.0.1");
+  CPPUNIT_ASSERT(ps.isBadPeer("192.168.0.1"));
+  CPPUNIT_ASSERT(!ps.isBadPeer("192.168.0.2"));
+}
+
 } // namespace aria2

+ 9 - 0
test/MockPeerStorage.h

@@ -71,6 +71,15 @@ public:
     this->stat = stat;
   }
 
+  virtual bool isBadPeer(const std::string& ipaddr)
+  {
+    return false;
+  }
+
+  virtual void addBadPeer(const std::string& ipaddr)
+  {
+  }
+
   virtual void returnPeer(const SharedHandle<Peer>& peer)
   {
   }