| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456 | /* <!-- copyright *//* * aria2 - The high speed download utility * * Copyright (C) 2006 Tatsuhiro Tsujikawa * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * * In addition, as a special exception, the copyright holders give * permission to link the code of portions of this program with the * OpenSSL library under certain conditions as described in each * individual source file, and distribute linked combinations * including the two. * You must obey the GNU General Public License in all respects * for all of the code used other than OpenSSL.  If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so.  If you * do not wish to do so, delete this exception statement from your * version.  If you delete this exception statement from all source * files in the program, then also delete it here. *//* copyright --> */#include "DefaultBtMessageDispatcher.h"#include <algorithm>#include "prefs.h"#include "BtAbortOutstandingRequestEvent.h"#include "BtCancelSendingPieceEvent.h"#include "BtChokingEvent.h"#include "BtMessageFactory.h"#include "message.h"#include "DownloadContext.h"#include "PeerStorage.h"#include "PieceStorage.h"#include "BtMessage.h"#include "Peer.h"#include "Piece.h"#include "LogFactory.h"#include "Logger.h"#include "a2functional.h"#include "a2algo.h"#include "RequestGroupMan.h"#include "RequestGroup.h"namespace aria2 {DefaultBtMessageDispatcher::DefaultBtMessageDispatcher():  cuid(0),  requestTimeout(0),  logger(LogFactory::getInstance()) {}DefaultBtMessageDispatcher::~DefaultBtMessageDispatcher(){  if(logger->debug()) {    logger->debug("DefaultBtMessageDispatcher::deleted");  }}void DefaultBtMessageDispatcher::addMessageToQueue(const BtMessageHandle& btMessage){  btMessage->onQueued();  messageQueue.push_back(btMessage);}void DefaultBtMessageDispatcher::addMessageToQueue(const std::vector<SharedHandle<BtMessage> >& btMessages){  for(std::vector<SharedHandle<BtMessage> >::const_iterator itr =        btMessages.begin(); itr != btMessages.end(); ++itr) {    addMessageToQueue(*itr);  }}void DefaultBtMessageDispatcher::sendMessages() {  std::vector<SharedHandle<BtMessage> > tempQueue;  while(!messageQueue.empty()) {    BtMessageHandle msg = messageQueue.front();    messageQueue.pop_front();    if(msg->isUploading() && !msg->isSendingInProgress()) {      if(_requestGroupMan->doesOverallUploadSpeedExceed() ||         _downloadContext->getOwnerRequestGroup()->doesUploadSpeedExceed()) {        tempQueue.push_back(msg);        continue;      }    }    msg->send();    if(msg->isUploading()) {      _peerStorage->updateTransferStatFor(peer);    }    if(msg->isSendingInProgress()) {      messageQueue.push_front(msg);      break;    }  }  if(!tempQueue.empty()) {    // Insert pending message to the front, so that message is likely sent in    // the same order as it is queued.    if(!messageQueue.empty() && messageQueue.front()->isSendingInProgress()) {      messageQueue.insert(messageQueue.begin()+1,                          tempQueue.begin(), tempQueue.end());    } else {      messageQueue.insert(messageQueue.begin(),                          tempQueue.begin(), tempQueue.end());    }  }}// Cancel sending piece message to peer.void DefaultBtMessageDispatcher::doCancelSendingPieceAction(size_t index, uint32_t begin, size_t length){  BtCancelSendingPieceEvent event(index, begin, length);  std::vector<SharedHandle<BtMessage> > tempQueue    (messageQueue.begin(), messageQueue.end());  forEachMemFunSH(tempQueue.begin(), tempQueue.end(),                  &BtMessage::onCancelSendingPieceEvent, event);}// Cancel sending piece message to peer.// TODO Is this method really necessary?void DefaultBtMessageDispatcher::doCancelSendingPieceAction(const SharedHandle<Piece>& piece){}class AbortOutstandingRequest {private:  SharedHandle<Piece> _piece;  int32_t _cuid;  Logger* _logger;public:  AbortOutstandingRequest(const SharedHandle<Piece>& piece, int32_t cuid):    _piece(piece),    _cuid(cuid),    _logger(LogFactory::getInstance()) {}  void operator()(const RequestSlot& slot) const  {    if(_logger->debug()) {      _logger->debug(MSG_DELETING_REQUEST_SLOT,                     _cuid,                     slot.getIndex(),                     slot.getBlockIndex());      _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin());    }    _piece->cancelBlock(slot.getBlockIndex());  }};// localhost cancels outstanding download requests to the peer.void DefaultBtMessageDispatcher::doAbortOutstandingRequestAction(const SharedHandle<Piece>& piece) {  RequestSlot rs(piece->getIndex(), 0, 0, 0);  std::deque<RequestSlot>::iterator first =    std::lower_bound(requestSlots.begin(), requestSlots.end(), rs);  rs.setIndex(piece->getIndex()+1);  std::deque<RequestSlot>::iterator last =    std::lower_bound(requestSlots.begin(), requestSlots.end(), rs);  std::for_each(first, last, AbortOutstandingRequest(piece, cuid));  requestSlots.erase(first, last);  BtAbortOutstandingRequestEvent event(piece);  std::vector<SharedHandle<BtMessage> > tempQueue    (messageQueue.begin(), messageQueue.end());  forEachMemFunSH(tempQueue.begin(), tempQueue.end(),                  &BtMessage::onAbortOutstandingRequestEvent, event);}class ProcessChokedRequestSlot {private:  int32_t _cuid;  SharedHandle<Peer> _peer;  SharedHandle<PieceStorage> _pieceStorage;  Logger* _logger;public:  ProcessChokedRequestSlot(int32_t cuid,                           const SharedHandle<Peer>& peer,                           const SharedHandle<PieceStorage>& pieceStorage):    _cuid(cuid),    _peer(peer),    _pieceStorage(pieceStorage),    _logger(LogFactory::getInstance()) {}    void operator()(const RequestSlot& slot) const  {    if(!_peer->isInPeerAllowedIndexSet(slot.getIndex())) {      if(_logger->debug()) {        _logger->debug(MSG_DELETING_REQUEST_SLOT_CHOKED,                       _cuid,                       slot.getIndex(),                       slot.getBlockIndex());        _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin());      }      SharedHandle<Piece> piece = _pieceStorage->getPiece(slot.getIndex());      piece->cancelBlock(slot.getBlockIndex());    }  }};class FindChokedRequestSlot {private:  SharedHandle<Peer> _peer;public:  FindChokedRequestSlot(const SharedHandle<Peer>& peer):    _peer(peer) {}    bool operator()(const RequestSlot& slot) const  {    return !_peer->isInPeerAllowedIndexSet(slot.getIndex());  }};// localhost received choke message from the peer.void DefaultBtMessageDispatcher::doChokedAction(){  std::for_each(requestSlots.begin(), requestSlots.end(),                ProcessChokedRequestSlot(cuid, peer, _pieceStorage));  requestSlots.erase(std::remove_if(requestSlots.begin(), requestSlots.end(),                                    FindChokedRequestSlot(peer)),                     requestSlots.end());}// localhost dispatched choke message to the peer.void DefaultBtMessageDispatcher::doChokingAction(){  BtChokingEvent event;  std::vector<SharedHandle<BtMessage> > tempQueue    (messageQueue.begin(), messageQueue.end());  forEachMemFunSH(tempQueue.begin(), tempQueue.end(),                  &BtMessage::onChokingEvent, event);}class ProcessStaleRequestSlot {private:  int32_t _cuid;  SharedHandle<Peer> _peer;  SharedHandle<PieceStorage> _pieceStorage;  BtMessageDispatcher* _messageDispatcher;  WeakHandle<BtMessageFactory> _messageFactory;  const struct timeval& _now;  time_t _requestTimeout;  Logger* _logger;public:  ProcessStaleRequestSlot(int32_t cuid, const SharedHandle<Peer>& peer,                          const SharedHandle<PieceStorage>& pieceStorage,                          BtMessageDispatcher* dispatcher,                          const WeakHandle<BtMessageFactory>& factory,                          const struct timeval& now,                          time_t requestTimeout):    _cuid(cuid),    _peer(peer),    _pieceStorage(pieceStorage),    _messageDispatcher(dispatcher),    _messageFactory(factory),    _now(now),    _requestTimeout(requestTimeout),    _logger(LogFactory::getInstance()) {}  void operator()(const RequestSlot& slot)  {    if(slot.isTimeout(_now, _requestTimeout)) {      if(_logger->debug()) {        _logger->debug(MSG_DELETING_REQUEST_SLOT_TIMEOUT,                       _cuid,                       slot.getBlockIndex());        _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin());      }      slot.getPiece()->cancelBlock(slot.getBlockIndex());      _peer->snubbing(true);    } else if(slot.getPiece()->hasBlock(slot.getBlockIndex())) {      if(_logger->debug()) {        _logger->debug(MSG_DELETING_REQUEST_SLOT_ACQUIRED,                       _cuid,                       slot.getBlockIndex());        _logger->debug("index=%d, begin=%d", slot.getIndex(), slot.getBegin());      }      _messageDispatcher->addMessageToQueue        (_messageFactory->createCancelMessage(slot.getIndex(),                                              slot.getBegin(),                                              slot.getLength()));    }  }};class FindStaleRequestSlot {private:  SharedHandle<PieceStorage> _pieceStorage;  const struct timeval& _now;  time_t _requestTimeout;public:  FindStaleRequestSlot(const SharedHandle<PieceStorage>& pieceStorage,                       const struct timeval& now,                       time_t requestTimeout):    _pieceStorage(pieceStorage),    _now(now),    _requestTimeout(requestTimeout) {}  bool operator()(const RequestSlot& slot)  {    if(slot.isTimeout(_now, _requestTimeout)) {      return true;    } else {      if(slot.getPiece()->hasBlock(slot.getBlockIndex())) {        return true;      } else {        return false;      }    }  }};void DefaultBtMessageDispatcher::checkRequestSlotAndDoNecessaryThing(){  struct timeval now;  gettimeofday(&now, 0);  std::for_each(requestSlots.begin(), requestSlots.end(),                ProcessStaleRequestSlot(cuid,                                        peer,                                        _pieceStorage,                                        this,                                        messageFactory,                                        now,                                        requestTimeout));  requestSlots.erase(std::remove_if(requestSlots.begin(), requestSlots.end(),                                    FindStaleRequestSlot(_pieceStorage,                                                         now,                                                         requestTimeout)),                     requestSlots.end());}bool DefaultBtMessageDispatcher::isSendingInProgress(){  if(messageQueue.size() > 0) {    return messageQueue.front()->isSendingInProgress();  } else {    return false;  }}class BlockIndexLess {public:  bool operator()(const RequestSlot& lhs, const RequestSlot& rhs) const  {    if(lhs.getIndex() == rhs.getIndex()) {      return lhs.getBlockIndex() < rhs.getBlockIndex();    } else {      return lhs.getIndex() < rhs.getIndex();    }  }};bool DefaultBtMessageDispatcher::isOutstandingRequest(size_t index, size_t blockIndex) {  RequestSlot rs(index, 0, 0, blockIndex);  std::deque<RequestSlot>::iterator i =    std::lower_bound(requestSlots.begin(), requestSlots.end(), rs, BlockIndexLess());  return i != requestSlots.end() &&    (*i).getIndex() == index && (*i).getBlockIndex() == blockIndex;}RequestSlotDefaultBtMessageDispatcher::getOutstandingRequest(size_t index, uint32_t begin, size_t length){  RequestSlot ret;  RequestSlot rs(index, begin, length, 0);  std::deque<RequestSlot>::iterator i =    std::lower_bound(requestSlots.begin(), requestSlots.end(), rs);  if(i != requestSlots.end() && (*i) == rs) {    ret = *i;  } else {    ret = RequestSlot::nullSlot;  }  return ret;}void DefaultBtMessageDispatcher::removeOutstandingRequest(const RequestSlot& slot){  std::deque<RequestSlot>::iterator i =    std::lower_bound(requestSlots.begin(), requestSlots.end(), slot);  if(i != requestSlots.end() && (*i) == slot) {    AbortOutstandingRequest(slot.getPiece(), cuid)(*i);    requestSlots.erase(i);  }}void DefaultBtMessageDispatcher::addOutstandingRequest(const RequestSlot& slot){  std::deque<RequestSlot>::iterator i =    std::lower_bound(requestSlots.begin(), requestSlots.end(), slot);  if(i == requestSlots.end() || (*i) != slot) {    requestSlots.insert(i, slot);  }}size_t DefaultBtMessageDispatcher::countOutstandingUpload(){  return std::count_if(messageQueue.begin(), messageQueue.end(),                       mem_fun_sh(&BtMessage::isUploading));}void DefaultBtMessageDispatcher::setPeer(const SharedHandle<Peer>& peer){  this->peer = peer;}void DefaultBtMessageDispatcher::setDownloadContext(const SharedHandle<DownloadContext>& downloadContext){  _downloadContext = downloadContext;}void DefaultBtMessageDispatcher::setPieceStorage(const SharedHandle<PieceStorage>& pieceStorage){  _pieceStorage = pieceStorage;}void DefaultBtMessageDispatcher::setPeerStorage(const SharedHandle<PeerStorage>& peerStorage){  _peerStorage = peerStorage;}void DefaultBtMessageDispatcher::setBtMessageFactory(const WeakHandle<BtMessageFactory>& factory){  this->messageFactory = factory;}void DefaultBtMessageDispatcher::setRequestGroupMan(const WeakHandle<RequestGroupMan>& rgman){  _requestGroupMan = rgman;}} // namespace aria2
 |