| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661 | /* <!-- copyright *//* * aria2 - The high speed download utility * * Copyright (C) 2006 Tatsuhiro Tsujikawa * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * * In addition, as a special exception, the copyright holders give * permission to link the code of portions of this program with the * OpenSSL library under certain conditions as described in each * individual source file, and distribute linked combinations * including the two. * You must obey the GNU General Public License in all respects * for all of the code used other than OpenSSL.  If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so.  If you * do not wish to do so, delete this exception statement from your * version.  If you delete this exception statement from all source * files in the program, then also delete it here. *//* copyright --> */#include "DefaultBtInteractive.h"#include <cstring>#include <vector>#include "prefs.h"#include "message.h"#include "BtHandshakeMessage.h"#include "util.h"#include "BtKeepAliveMessage.h"#include "BtChokeMessage.h"#include "BtUnchokeMessage.h"#include "BtRequestMessage.h"#include "BtPieceMessage.h"#include "DlAbortEx.h"#include "BtExtendedMessage.h"#include "HandshakeExtensionMessage.h"#include "UTPexExtensionMessage.h"#include "DefaultExtensionMessageFactory.h"#include "ExtensionMessageRegistry.h"#include "DHTNode.h"#include "Peer.h"#include "Piece.h"#include "DownloadContext.h"#include "PieceStorage.h"#include "PeerStorage.h"#include "BtRuntime.h"#include "BtMessageReceiver.h"#include "BtMessageDispatcher.h"#include "BtMessageFactory.h"#include "BtRequestFactory.h"#include "PeerConnection.h"#include "Logger.h"#include "LogFactory.h"#include "fmt.h"#include "RequestGroup.h"#include "RequestGroupMan.h"#include "bittorrent_helper.h"#include "UTMetadataRequestFactory.h"#include "UTMetadataRequestTracker.h"#include "wallclock.h"namespace aria2 {DefaultBtInteractive::DefaultBtInteractive(const std::shared_ptr<DownloadContext>& downloadContext, const std::shared_ptr<Peer>& peer)  : cuid_(0),    downloadContext_(downloadContext),    peer_(peer),    metadataGetMode_(false),    localNode_(0),    allowedFastSetSize_(10),    haveTimer_(global::wallclock()),    keepAliveTimer_(global::wallclock()),    floodingTimer_(global::wallclock()),    inactiveTimer_(global::wallclock()),    pexTimer_(global::wallclock()),    perSecTimer_(global::wallclock()),    keepAliveInterval_(120),    utPexEnabled_(false),    dhtEnabled_(false),    numReceivedMessage_(0),    maxOutstandingRequest_(DEFAULT_MAX_OUTSTANDING_REQUEST),    requestGroupMan_(0),    tcpPort_(0),    haveLastSent_(global::wallclock()){}DefaultBtInteractive::~DefaultBtInteractive() {}void DefaultBtInteractive::initiateHandshake() {  std::shared_ptr<BtMessage> message =    messageFactory_->createHandshakeMessage    (bittorrent::getInfoHash(downloadContext_), bittorrent::getStaticPeerId());  dispatcher_->addMessageToQueue(message);  dispatcher_->sendMessages();}std::shared_ptr<BtMessage> DefaultBtInteractive::receiveHandshake(bool quickReply) {  std::shared_ptr<BtHandshakeMessage> message =    btMessageReceiver_->receiveHandshake(quickReply);  if(!message) {    return std::shared_ptr<BtMessage>();  }  if(memcmp(message->getPeerId(), bittorrent::getStaticPeerId(),            PEER_ID_LENGTH) == 0) {    throw DL_ABORT_EX      (fmt("CUID#%" PRId64 " - Drop connection from the same Peer ID",           cuid_));  }  const PeerSet& usedPeers = peerStorage_->getUsedPeers();  for(PeerSet::const_iterator i = usedPeers.begin(), eoi = usedPeers.end();      i != eoi; ++i) {    if((*i)->isActive() &&       memcmp((*i)->getPeerId(), message->getPeerId(), PEER_ID_LENGTH) == 0) {      throw DL_ABORT_EX        (fmt("CUID#%" PRId64 " - Same Peer ID has been already seen.",             cuid_));    }  }  peer_->setPeerId(message->getPeerId());  if(message->isFastExtensionSupported()) {    peer_->setFastExtensionEnabled(true);    A2_LOG_INFO(fmt(MSG_FAST_EXTENSION_ENABLED, cuid_));  }  if(message->isExtendedMessagingEnabled()) {    peer_->setExtendedMessagingEnabled(true);    if(!utPexEnabled_) {      extensionMessageRegistry_->removeExtension        (ExtensionMessageRegistry::UT_PEX);    }    A2_LOG_INFO(fmt(MSG_EXTENDED_MESSAGING_ENABLED, cuid_));  }  if(message->isDHTEnabled()) {    peer_->setDHTEnabled(true);    A2_LOG_INFO(fmt(MSG_DHT_ENABLED_PEER, cuid_));  }  A2_LOG_INFO(fmt(MSG_RECEIVE_PEER_MESSAGE, cuid_,                  peer_->getIPAddress().c_str(), peer_->getPort(),                  message->toString().c_str()));  return message;}std::shared_ptr<BtMessage> DefaultBtInteractive::receiveAndSendHandshake() {  return receiveHandshake(true);}void DefaultBtInteractive::doPostHandshakeProcessing() {  // Set time 0 to haveTimer to cache http/ftp download piece completion  haveTimer_.reset(0);  keepAliveTimer_ = global::wallclock();  floodingTimer_ = global::wallclock();  pexTimer_.reset(0);  if(peer_->isExtendedMessagingEnabled()) {    addHandshakeExtendedMessageToQueue();  }  if(!metadataGetMode_) {    addBitfieldMessageToQueue();  }  if(peer_->isDHTEnabled() && dhtEnabled_) {    addPortMessageToQueue();  }  if(!metadataGetMode_) {    addAllowedFastMessageToQueue();  }  sendPendingMessage();}void DefaultBtInteractive::addPortMessageToQueue(){  dispatcher_->addMessageToQueue    (messageFactory_->createPortMessage(localNode_->getPort()));}void DefaultBtInteractive::addHandshakeExtendedMessageToQueue(){  std::shared_ptr<HandshakeExtensionMessage> m(new HandshakeExtensionMessage());  m->setClientVersion("aria2/" PACKAGE_VERSION);  m->setTCPPort(tcpPort_);  m->setExtensions(extensionMessageRegistry_->getExtensions());  auto attrs = bittorrent::getTorrentAttrs(downloadContext_);  if(!attrs->metadata.empty()) {    m->setMetadataSize(attrs->metadataSize);  }  std::shared_ptr<BtMessage> msg = messageFactory_->createBtExtendedMessage(m);  dispatcher_->addMessageToQueue(msg);}void DefaultBtInteractive::addBitfieldMessageToQueue() {  if(peer_->isFastExtensionEnabled()) {    if(pieceStorage_->allDownloadFinished()) {      dispatcher_->addMessageToQueue(messageFactory_->createHaveAllMessage());    } else if(pieceStorage_->getCompletedLength() > 0) {      dispatcher_->addMessageToQueue(messageFactory_->createBitfieldMessage());    } else {      dispatcher_->addMessageToQueue(messageFactory_->createHaveNoneMessage());    }  } else {    if(pieceStorage_->getCompletedLength() > 0) {      dispatcher_->addMessageToQueue(messageFactory_->createBitfieldMessage());    }  }}void DefaultBtInteractive::addAllowedFastMessageToQueue() {  if(peer_->isFastExtensionEnabled()) {    std::vector<size_t> fastSet;    bittorrent::computeFastSet(fastSet, peer_->getIPAddress(),                               downloadContext_->getNumPieces(),                               bittorrent::getInfoHash(downloadContext_),                               allowedFastSetSize_);    for(std::vector<size_t>::const_iterator itr = fastSet.begin(),          eoi = fastSet.end(); itr != eoi; ++itr) {      dispatcher_->addMessageToQueue        (messageFactory_->createAllowedFastMessage(*itr));    }  }}void DefaultBtInteractive::decideChoking() {  if(peer_->shouldBeChoking()) {    if(!peer_->amChoking()) {      dispatcher_->addMessageToQueue(messageFactory_->createChokeMessage());    }  } else {    if(peer_->amChoking()) {      dispatcher_->addMessageToQueue(messageFactory_->createUnchokeMessage());    }  }}void DefaultBtInteractive::checkHave() {  const size_t MIN_HAVE_PACK_SIZE = 20;  const time_t MAX_HAVE_DELAY_SEC = 10;  pieceStorage_->getAdvertisedPieceIndexes(haveIndexes_, cuid_, haveTimer_);  haveTimer_ = global::wallclock();  if(haveIndexes_.size() >= MIN_HAVE_PACK_SIZE) {    if(peer_->isFastExtensionEnabled() &&       pieceStorage_->allDownloadFinished()) {      dispatcher_->addMessageToQueue(messageFactory_->createHaveAllMessage());    } else {      dispatcher_->addMessageToQueue(messageFactory_->createBitfieldMessage());    }    haveIndexes_.clear();  } else {    if(haveIndexes_.size() >= MIN_HAVE_PACK_SIZE ||       haveLastSent_.difference(global::wallclock()) >= MAX_HAVE_DELAY_SEC) {      haveLastSent_ = global::wallclock();      for(std::vector<size_t>::const_iterator itr = haveIndexes_.begin(),            eoi = haveIndexes_.end(); itr != eoi; ++itr) {        dispatcher_->addMessageToQueue(messageFactory_->                                       createHaveMessage(*itr));      }      haveIndexes_.clear();    }  }}void DefaultBtInteractive::sendKeepAlive() {  if(keepAliveTimer_.difference(global::wallclock()) >= keepAliveInterval_) {    dispatcher_->addMessageToQueue(messageFactory_->createKeepAliveMessage());    dispatcher_->sendMessages();    keepAliveTimer_ = global::wallclock();  }}size_t DefaultBtInteractive::receiveMessages() {  size_t countOldOutstandingRequest = dispatcher_->countOutstandingRequest();  size_t msgcount = 0;  while(1) {    if(requestGroupMan_->doesOverallDownloadSpeedExceed() ||       downloadContext_->getOwnerRequestGroup()->doesDownloadSpeedExceed()) {      break;    }    std::shared_ptr<BtMessage> message = btMessageReceiver_->receiveMessage();    if(!message) {      break;    }    ++msgcount;    A2_LOG_INFO(fmt(MSG_RECEIVE_PEER_MESSAGE,                    cuid_,                    peer_->getIPAddress().c_str(), peer_->getPort(),                    message->toString().c_str()));    message->doReceivedAction();    switch(message->getId()) {    case BtKeepAliveMessage::ID:      floodingStat_.incKeepAliveCount();      break;    case BtChokeMessage::ID:      if(!peer_->peerChoking()) {        floodingStat_.incChokeUnchokeCount();      }      break;    case BtUnchokeMessage::ID:      if(peer_->peerChoking()) {        floodingStat_.incChokeUnchokeCount();      }      break;    case BtPieceMessage::ID:    case BtRequestMessage::ID:      inactiveTimer_ = global::wallclock();      break;    }  }  if(!pieceStorage_->isEndGame() &&     countOldOutstandingRequest > dispatcher_->countOutstandingRequest() &&     (countOldOutstandingRequest - dispatcher_->countOutstandingRequest())*4 >=     maxOutstandingRequest_) {    maxOutstandingRequest_ =      std::min((size_t)UB_MAX_OUTSTANDING_REQUEST,               maxOutstandingRequest_*2);  }  return msgcount;}void DefaultBtInteractive::decideInterest() {  if(pieceStorage_->hasMissingPiece(peer_)) {    if(!peer_->amInterested()) {      A2_LOG_DEBUG(fmt(MSG_PEER_INTERESTED, cuid_));      dispatcher_->        addMessageToQueue(messageFactory_->createInterestedMessage());    }  } else {    if(peer_->amInterested()) {      A2_LOG_DEBUG(fmt(MSG_PEER_NOT_INTERESTED, cuid_));      dispatcher_->        addMessageToQueue(messageFactory_->createNotInterestedMessage());    }  }}void DefaultBtInteractive::fillPiece(size_t maxMissingBlock) {  if(pieceStorage_->hasMissingPiece(peer_)) {    size_t numMissingBlock = btRequestFactory_->countMissingBlock();    if(numMissingBlock >= maxMissingBlock) {      return;    }    size_t diffMissingBlock = maxMissingBlock-numMissingBlock;    std::vector<std::shared_ptr<Piece> > pieces;    if(peer_->peerChoking()) {      if(peer_->isFastExtensionEnabled()) {        if(pieceStorage_->isEndGame()) {          std::vector<size_t> excludedIndexes;          excludedIndexes.reserve(btRequestFactory_->countTargetPiece());          btRequestFactory_->getTargetPieceIndexes(excludedIndexes);          pieceStorage_->getMissingFastPiece            (pieces, diffMissingBlock, peer_, excludedIndexes, cuid_);        } else {          pieces.reserve(diffMissingBlock);          pieceStorage_->getMissingFastPiece            (pieces, diffMissingBlock, peer_, cuid_);        }      }    } else {      if(pieceStorage_->isEndGame()) {        std::vector<size_t> excludedIndexes;        excludedIndexes.reserve(btRequestFactory_->countTargetPiece());        btRequestFactory_->getTargetPieceIndexes(excludedIndexes);        pieceStorage_->getMissingPiece          (pieces, diffMissingBlock, peer_, excludedIndexes, cuid_);      } else {        pieces.reserve(diffMissingBlock);        pieceStorage_->getMissingPiece(pieces, diffMissingBlock, peer_, cuid_);      }    }    for(std::vector<std::shared_ptr<Piece> >::const_iterator i =          pieces.begin(), eoi = pieces.end(); i != eoi; ++i) {      btRequestFactory_->addTargetPiece(*i);    }  }}void DefaultBtInteractive::addRequests() {  if(!pieceStorage_->isEndGame() && !pieceStorage_->hasMissingUnusedPiece()) {    pieceStorage_->enterEndGame();  }  fillPiece(maxOutstandingRequest_);  size_t reqNumToCreate =    maxOutstandingRequest_ <= dispatcher_->countOutstandingRequest() ?    0 : maxOutstandingRequest_-dispatcher_->countOutstandingRequest();  if(reqNumToCreate > 0) {    std::vector<std::shared_ptr<BtMessage> > requests;    requests.reserve(reqNumToCreate);    if(pieceStorage_->isEndGame()) {      btRequestFactory_->createRequestMessagesOnEndGame(requests,reqNumToCreate);    } else {      btRequestFactory_->createRequestMessages(requests, reqNumToCreate);    }    dispatcher_->addMessageToQueue(requests);  }}void DefaultBtInteractive::cancelAllPiece() {  btRequestFactory_->removeAllTargetPiece();  if(metadataGetMode_ && downloadContext_->getTotalLength() > 0) {    std::vector<size_t> metadataRequests =      utMetadataRequestTracker_->getAllTrackedIndex();    for(std::vector<size_t>::const_iterator i = metadataRequests.begin(),          eoi = metadataRequests.end(); i != eoi; ++i) {      A2_LOG_DEBUG(fmt("Cancel metadata: piece=%lu",                       static_cast<unsigned long>(*i)));      pieceStorage_->cancelPiece(pieceStorage_->getPiece(*i), cuid_);    }  }}void DefaultBtInteractive::sendPendingMessage() {  dispatcher_->sendMessages();}void DefaultBtInteractive::detectMessageFlooding() {  if(floodingTimer_.     difference(global::wallclock()) >= FLOODING_CHECK_INTERVAL) {    if(floodingStat_.getChokeUnchokeCount() >= 2 ||       floodingStat_.getKeepAliveCount() >= 2) {      throw DL_ABORT_EX(EX_FLOODING_DETECTED);    } else {      floodingStat_.reset();    }    floodingTimer_ = global::wallclock();  }}void DefaultBtInteractive::checkActiveInteraction(){  time_t inactiveTime = inactiveTimer_.difference(global::wallclock());  // To allow aria2 to accept mutially interested peer, disconnect unintersted  // peer.  {    const time_t interval = 30;    if(!peer_->amInterested() && !peer_->peerInterested() &&       inactiveTime >= interval) {      peer_->setDisconnectedGracefully(true);      // TODO change the message      throw DL_ABORT_EX        (fmt("Disconnect peer because we are not interested each other"             " after %ld second(s).",             static_cast<long int>(interval)));    }  }  // Since the peers which are *just* connected and do nothing to improve  // mutual download progress are completely waste of resources, those peers  // are disconnected in a certain time period.  {    const time_t interval = 60;    if(inactiveTime >= interval) {      peer_->setDisconnectedGracefully(true);      throw DL_ABORT_EX        (fmt(EX_DROP_INACTIVE_CONNECTION,             static_cast<long int>(interval)));    }  }  // If both of us are seeders, drop connection.  if(peer_->isSeeder() && pieceStorage_->downloadFinished()) {    throw DL_ABORT_EX(MSG_GOOD_BYE_SEEDER);  }}void DefaultBtInteractive::addPeerExchangeMessage(){  if(pexTimer_.     difference(global::wallclock()) >= UTPexExtensionMessage::DEFAULT_INTERVAL) {    std::shared_ptr<UTPexExtensionMessage> m      (new UTPexExtensionMessage(peer_->getExtensionMessageID                                 (ExtensionMessageRegistry::UT_PEX)));    const PeerSet& usedPeers = peerStorage_->getUsedPeers();    for(PeerSet::const_iterator i = usedPeers.begin(), eoi = usedPeers.end();        i != eoi && !m->freshPeersAreFull(); ++i) {      if((*i)->isActive() && peer_->getIPAddress() != (*i)->getIPAddress()) {        m->addFreshPeer(*i);      }    }    const std::deque<std::shared_ptr<Peer> >& droppedPeers =      peerStorage_->getDroppedPeers();    for(std::deque<std::shared_ptr<Peer> >::const_iterator i =          droppedPeers.begin(), eoi = droppedPeers.end();        i != eoi && !m->droppedPeersAreFull();        ++i) {      if(peer_->getIPAddress() != (*i)->getIPAddress()) {        m->addDroppedPeer(*i);      }    }    std::shared_ptr<BtMessage> msg = messageFactory_->createBtExtendedMessage(m);    dispatcher_->addMessageToQueue(msg);    pexTimer_ = global::wallclock();  }}void DefaultBtInteractive::doInteractionProcessing() {  if(metadataGetMode_) {    sendKeepAlive();    numReceivedMessage_ = receiveMessages();    // PieceStorage is re-initialized with metadata_size in    // HandshakeExtensionMessage::doReceivedAction().    pieceStorage_ =      downloadContext_->getOwnerRequestGroup()->getPieceStorage();    if(peer_->getExtensionMessageID(ExtensionMessageRegistry::UT_METADATA) &&       downloadContext_->getTotalLength() > 0) {      size_t num = utMetadataRequestTracker_->avail();      if(num > 0) {        std::vector<std::shared_ptr<BtMessage> > requests;        utMetadataRequestFactory_->create(requests, num, pieceStorage_);        dispatcher_->addMessageToQueue(requests);      }      if(perSecTimer_.difference(global::wallclock()) >= 1) {        perSecTimer_ = global::wallclock();        // Drop timeout request after queuing message to give a chance        // to other connection to request piece.        std::vector<size_t> indexes =          utMetadataRequestTracker_->removeTimeoutEntry();        for(std::vector<size_t>::const_iterator i = indexes.begin(),              eoi = indexes.end(); i != eoi; ++i) {          pieceStorage_->cancelPiece(pieceStorage_->getPiece(*i), cuid_);        }      }      if(pieceStorage_->downloadFinished()) {        downloadContext_->getOwnerRequestGroup()->setForceHaltRequested          (true, RequestGroup::NONE);      }    }  } else {    checkActiveInteraction();    decideChoking();    detectMessageFlooding();    if(perSecTimer_.difference(global::wallclock()) >= 1) {      perSecTimer_ = global::wallclock();      dispatcher_->checkRequestSlotAndDoNecessaryThing();    }    checkHave();    sendKeepAlive();    numReceivedMessage_ = receiveMessages();    btRequestFactory_->removeCompletedPiece();    decideInterest();    if(!pieceStorage_->downloadFinished()) {      addRequests();    }  }  if(peer_->getExtensionMessageID(ExtensionMessageRegistry::UT_PEX) &&     utPexEnabled_) {    addPeerExchangeMessage();  }  sendPendingMessage();}void DefaultBtInteractive::setLocalNode(DHTNode* node){  localNode_ = node;}size_t DefaultBtInteractive::countPendingMessage(){  return dispatcher_->countMessageInQueue();}bool DefaultBtInteractive::isSendingMessageInProgress(){  return dispatcher_->isSendingInProgress();}size_t DefaultBtInteractive::countReceivedMessageInIteration() const{  return numReceivedMessage_;}size_t DefaultBtInteractive::countOutstandingRequest(){  if(metadataGetMode_) {    return utMetadataRequestTracker_->count();  } else {    return dispatcher_->countOutstandingRequest();  }}void DefaultBtInteractive::setBtRuntime(const std::shared_ptr<BtRuntime>& btRuntime){  btRuntime_ = btRuntime;}void DefaultBtInteractive::setPieceStorage(const std::shared_ptr<PieceStorage>& pieceStorage){  pieceStorage_ = pieceStorage;}void DefaultBtInteractive::setPeerStorage(const std::shared_ptr<PeerStorage>& peerStorage){  peerStorage_ = peerStorage;}void DefaultBtInteractive::setPeer(const std::shared_ptr<Peer>& peer){  peer_ = peer;}void DefaultBtInteractive::setBtMessageReceiver(const std::shared_ptr<BtMessageReceiver>& receiver){  btMessageReceiver_ = receiver;}void DefaultBtInteractive::setDispatcher(const std::shared_ptr<BtMessageDispatcher>& dispatcher){  dispatcher_ = dispatcher;}void DefaultBtInteractive::setBtRequestFactory(const std::shared_ptr<BtRequestFactory>& factory){  btRequestFactory_ = factory;}void DefaultBtInteractive::setPeerConnection(const std::shared_ptr<PeerConnection>& peerConnection){  peerConnection_ = peerConnection;}void DefaultBtInteractive::setExtensionMessageFactory(const std::shared_ptr<ExtensionMessageFactory>& factory){  extensionMessageFactory_ = factory;}void DefaultBtInteractive::setBtMessageFactory(const std::shared_ptr<BtMessageFactory>& factory){  messageFactory_ = factory;}void DefaultBtInteractive::setRequestGroupMan(RequestGroupMan* rgman){  requestGroupMan_ = rgman;}} // namespace aria2
 |