/* */
#include "DefaultPeerStorage.h"

#include <algorithm>

#include "LogFactory.h"
#include "Logger.h"
#include "message.h"
#include "Peer.h"
#include "BtRuntime.h"
#include "BtSeederStateChoke.h"
#include "BtLeecherStateChoke.h"
#include "PieceStorage.h"
#include "wallclock.h"

namespace aria2 {

// Peer list capacity used when no BtRuntime is attached, or when the
// attached runtime reports getMaxPeers() == 0.
static const int MAX_PEER_LIST_SIZE = 1024;

// Creates an empty storage. The two choke strategy objects are
// allocated here and released in the destructor.
DefaultPeerStorage::DefaultPeerStorage():
  _logger(LogFactory::getInstance()),
  _removedPeerSessionDownloadLength(0),
  _removedPeerSessionUploadLength(0),
  _seederStateChoke(new BtSeederStateChoke()),
  _leecherStateChoke(new BtLeecherStateChoke()),
  _lastTransferStatMapUpdated(0)
{}

// Releases the choke strategy objects allocated by the constructor.
DefaultPeerStorage::~DefaultPeerStorage()
{
  delete _seederStateChoke;
  delete _leecherStateChoke;
}

// Predicate: true if the examined peer is the same handle as the
// reference peer, or has the same IP address and port.
class FindIdenticalPeer {
private:
  SharedHandle<Peer> _peer;
public:
  FindIdenticalPeer(const SharedHandle<Peer>& peer):_peer(peer) {}

  bool operator()(const SharedHandle<Peer>& peer) const {
    return (_peer == peer) ||
      ((_peer->getIPAddress() == peer->getIPAddress()) &&
       (_peer->getPort() == peer->getPort()));
  }
};

// Returns true if the peer list already holds this handle or a peer
// with the same IP address and port.
bool DefaultPeerStorage::isPeerAlreadyAdded(const SharedHandle<Peer>& peer)
{
  return std::find_if(_peers.begin(), _peers.end(),
                      FindIdenticalPeer(peer)) != _peers.end();
}

// Computes the peer list capacity: MAX_PEER_LIST_SIZE when btRuntime
// is null or its getMaxPeers() is 0, otherwise getMaxPeers() plus a
// quarter of it (integer shift).
static size_t calculateMaxPeerListSize
(const SharedHandle<BtRuntime>& btRuntime)
{
  if(btRuntime.isNull()) {
    return MAX_PEER_LIST_SIZE;
  }
  return btRuntime->getMaxPeers() == 0 ?
    MAX_PEER_LIST_SIZE :
    btRuntime->getMaxPeers()+(btRuntime->getMaxPeers() >> 2);
}

// Adds peer to the front of the list. Returns false, without modifying
// the list, if an identical peer is already stored. If the list is at
// or above capacity, unused peers are dropped first to make room for
// the new entry.
bool DefaultPeerStorage::addPeer(const SharedHandle<Peer>& peer)
{
  if(isPeerAlreadyAdded(peer)) {
    if(_logger->debug()) {
      _logger->debug("Adding %s:%u is rejected because it has been already"
                     " added.", peer->getIPAddress().c_str(), peer->getPort());
    }
    return false;
  }
  size_t maxPeerListSize = calculateMaxPeerListSize(_btRuntime);
  if(_peers.size() >= maxPeerListSize) {
    // +1 so that there is room for the peer being inserted below.
    deleteUnusedPeer(_peers.size()-maxPeerListSize+1);
  }
  _peers.push_front(peer);
  return true;
}

// Adds every peer in peers via addPeer(const SharedHandle<Peer>&) and
// emits a debug log entry for each peer that was accepted.
void DefaultPeerStorage::addPeer(const std::vector<SharedHandle<Peer> >& peers)
{
  for(std::vector<SharedHandle<Peer> >::const_iterator itr = peers.begin(),
        eoi = peers.end(); itr != eoi; ++itr) {
    const SharedHandle<Peer>& peer = *itr;
    if(addPeer(peer)) {
      if(_logger->debug()) {
        _logger->debug(MSG_ADDING_PEER,
                       peer->getIPAddress().c_str(), peer->getPort());
      }
    }
  }
}

// Returns a read-only reference to the stored peer list.
const std::deque<SharedHandle<Peer> >& DefaultPeerStorage::getPeers()
{
  return _peers;
}

// Predicate: true for a peer for which both unused() and isGood() hold.
class FindFinePeer {
public:
  bool operator()(const SharedHandle<Peer>& peer) const {
    return peer->unused() && peer->isGood();
  }
};

// Returns the first stored peer that is unused and good, or a null
// handle if there is none.
SharedHandle<Peer> DefaultPeerStorage::getUnusedPeer()
{
  std::deque<SharedHandle<Peer> >::const_iterator itr =
    std::find_if(_peers.begin(), _peers.end(), FindFinePeer());
  if(itr == _peers.end()) {
    return SharedHandle<Peer>();
  } else {
    return *itr;
  }
}

// Predicate: true for a peer whose IP address and port equal the ones
// given at construction.
class FindPeer {
private:
  std::string ipaddr;
  uint16_t port;
public:
  FindPeer(const std::string& ipaddr, uint16_t port):
    ipaddr(ipaddr), port(port) {}

  bool operator()(const SharedHandle<Peer>& peer) const {
    return ipaddr == peer->getIPAddress() && port == peer->getPort();
  }
};

// Returns the first stored peer with the given IP address and port, or
// a null handle if no such peer is stored.
SharedHandle<Peer> DefaultPeerStorage::getPeer(const std::string& ipaddr,
                                               uint16_t port) const
{
  std::deque<SharedHandle<Peer> >::const_iterator itr =
    std::find_if(_peers.begin(), _peers.end(), FindPeer(ipaddr, port));
  if(itr == _peers.end()) {
    return SharedHandle<Peer>();
  } else {
    return *itr;
  }
}

// Returns the number of stored peers.
size_t DefaultPeerStorage::countPeer() const
{
  return _peers.size();
}

// Returns true if getUnusedPeer() would return a non-null peer.
bool DefaultPeerStorage::isPeerAvailable()
{
  return !getUnusedPeer().isNull();
}

// Functor that appends every peer for which isActive() is true to the
// vector supplied at construction.
class CollectActivePeer {
private:
  std::vector<SharedHandle<Peer> >& _activePeers;
public:
  CollectActivePeer(std::vector<SharedHandle<Peer> >& activePeers):
    _activePeers(activePeers) {}

  void operator()(const SharedHandle<Peer>& peer)
  {
    if(peer->isActive()) {
      _activePeers.push_back(peer);
    }
  }
};

// Appends all active stored peers to activePeers. Existing contents of
// activePeers are left in place.
void DefaultPeerStorage::getActivePeers
(std::vector<SharedHandle<Peer> >& activePeers)
{
  std::for_each(_peers.begin(), _peers.end(), CollectActivePeer(activePeers));
}

// Builds a TransferStat snapshot (speeds and session lengths) for a
// single peer.
static TransferStat calculateStatFor(const SharedHandle<Peer>& peer)
{
  TransferStat s;
  s.downloadSpeed = peer->calculateDownloadSpeed();
  s.uploadSpeed = peer->calculateUploadSpeed();
  s.sessionDownloadLength = peer->getSessionDownloadLength();
  s.sessionUploadLength = peer->getSessionUploadLength();
  return s;
}

// Returns the aggregated transfer statistics of this storage.
//
// The per-peer map and the cached aggregate are rebuilt from the
// active peers only when at least 250 (milliseconds, per
// differenceInMillis) have passed since the last rebuild; otherwise
// the cached aggregate is reused. In both cases the session lengths of
// peers already returned are added on top, and the all-time upload
// length is derived from the runtime's upload length at startup.
//
// NOTE(review): _btRuntime is dereferenced here without a null check,
// whereas calculateMaxPeerListSize() tolerates a null runtime; confirm
// that setBtRuntime() is always called before this method.
TransferStat DefaultPeerStorage::calculateStat()
{
  TransferStat stat;
  if(_lastTransferStatMapUpdated.differenceInMillis(global::wallclock) >= 250) {
    if(_logger->debug()) {
      _logger->debug("Updating TransferStat of PeerStorage");
    }
    _lastTransferStatMapUpdated = global::wallclock;
    _peerTransferStatMap.clear();
    std::vector<SharedHandle<Peer> > activePeers;
    getActivePeers(activePeers);
    for(std::vector<SharedHandle<Peer> >::const_iterator i =
          activePeers.begin(), eoi = activePeers.end(); i != eoi; ++i) {
      // Take exactly one snapshot per peer and use it for both the map
      // entry and the aggregate, so that the value later subtracted in
      // updateTransferStatFor() is the same value that was added here.
      const TransferStat s = calculateStatFor(*i);
      _peerTransferStatMap[(*i)->getID()] = s;
      stat += s;
    }
    _cachedTransferStat = stat;
  } else {
    stat = _cachedTransferStat;
  }
  stat.sessionDownloadLength += _removedPeerSessionDownloadLength;
  stat.sessionUploadLength += _removedPeerSessionUploadLength;
  stat.setAllTimeUploadLength(_btRuntime->getUploadLengthAtStartup()+
                              stat.getSessionUploadLength());
  return stat;
}

// Refreshes the cached statistics of a single peer: the peer's old map
// entry is subtracted from the cached aggregate, a new snapshot is
// taken, added to the aggregate and stored in the map. Does nothing
// (apart from the debug log) if the peer has no entry in the map.
void DefaultPeerStorage::updateTransferStatFor(const SharedHandle<Peer>& peer)
{
  if(_logger->debug()) {
    _logger->debug("Updating TransferStat for peer %s", peer->getID().c_str());
  }
  std::map<std::string, TransferStat>::iterator itr =
    _peerTransferStatMap.find(peer->getID());
  if(itr == _peerTransferStatMap.end()) {
    return;
  }
  _cachedTransferStat -= (*itr).second;
  TransferStat s = calculateStatFor(peer);
  _cachedTransferStat += s;
  (*itr).second = s;
}

// Returns the cached TransferStat stored for peer, or a
// default-constructed TransferStat if the peer has no map entry.
TransferStat DefaultPeerStorage::getTransferStatFor
(const SharedHandle<Peer>& peer)
{
  std::map<std::string, TransferStat>::const_iterator itr =
    _peerTransferStatMap.find(peer->getID());
  if(itr == _peerTransferStatMap.end()) {
    return TransferStat();
  } else {
    return (*itr).second;
  }
}

// Removes up to delSize peers for which unused() is true, scanning
// from the back of the list towards the front. addPeer() inserts at
// the front, so the back holds the peers that were added earliest.
// onErasingPeer() is invoked for each removed peer; the remaining
// peers keep their relative order.
void DefaultPeerStorage::deleteUnusedPeer(size_t delSize) {
  std::deque<SharedHandle<Peer> > temp;
  for(std::deque<SharedHandle<Peer> >::const_reverse_iterator itr =
        _peers.rbegin(), eoi = _peers.rend(); itr != eoi; ++itr) {
    const SharedHandle<Peer>& p = *itr;
    if(p->unused() && delSize > 0) {
      onErasingPeer(p);
      delSize--;
    } else {
      // push_front while iterating in reverse preserves original order.
      temp.push_front(p);
    }
  }
  _peers.swap(temp);
}

// Hook invoked for every peer removed from the list. Intentionally
// empty in this implementation.
void DefaultPeerStorage::onErasingPeer(const SharedHandle<Peer>& peer) {}

// Bookkeeping for a peer being returned. For an active peer, its
// session lengths are accumulated into the "removed peer" totals and
// its current snapshot is subtracted from the cached aggregate.
void DefaultPeerStorage::onReturningPeer(const SharedHandle<Peer>& peer)
{
  if(peer->isActive()) {
    TransferStat removedStat(calculateStatFor(peer));
    _removedPeerSessionDownloadLength += removedStat.getSessionDownloadLength();
    _removedPeerSessionUploadLength += removedStat.getSessionUploadLength();
    _cachedTransferStat -= removedStat;

    // Execute choking algorithm if unchoked and interested peer is
    // disconnected.
    if(!peer->amChoking() && peer->peerInterested()) {
      executeChoke();
    }
  }
}

// Removes peer (matched by handle equality) from the list and runs the
// returning/erasing hooks. If the peer is not stored, only a debug
// message is logged.
void DefaultPeerStorage::returnPeer(const SharedHandle<Peer>& peer)
{
  std::deque<SharedHandle<Peer> >::iterator itr =
    std::find(_peers.begin(), _peers.end(), peer);
  if(itr == _peers.end()) {
    if(_logger->debug()) {
      _logger->debug("Cannot find peer %s:%u in PeerStorage.",
                     peer->getIPAddress().c_str(), peer->getPort());
    }
  } else {
    _peers.erase(itr);

    onReturningPeer(peer);
    onErasingPeer(peer);
  }
}

// Returns true if at least CHOKE_ROUND_INTERVAL (presumably seconds;
// confirm against the timer's difference() contract) has elapsed since
// the last round of the choke strategy that is currently applicable:
// the seeder strategy when the download is finished, the leecher
// strategy otherwise.
bool DefaultPeerStorage::chokeRoundIntervalElapsed()
{
  const time_t CHOKE_ROUND_INTERVAL = 10;
  if(_pieceStorage->downloadFinished()) {
    return _seederStateChoke->getLastRound().
      difference(global::wallclock) >= CHOKE_ROUND_INTERVAL;
  } else {
    return _leecherStateChoke->getLastRound().
      difference(global::wallclock) >= CHOKE_ROUND_INTERVAL;
  }
}

// Runs the choke algorithm over the currently active peers, using the
// seeder strategy when the download is finished and the leecher
// strategy otherwise.
void DefaultPeerStorage::executeChoke()
{
  std::vector<SharedHandle<Peer> > activePeers;
  getActivePeers(activePeers);
  if(_pieceStorage->downloadFinished()) {
    _seederStateChoke->executeChoke(activePeers);
  } else {
    _leecherStateChoke->executeChoke(activePeers);
  }
}

// Sets the piece storage consulted for downloadFinished().
void DefaultPeerStorage::setPieceStorage(const SharedHandle<PieceStorage>& ps)
{
  _pieceStorage = ps;
}

// Sets the runtime used for the peer list capacity and for the upload
// length at startup.
void DefaultPeerStorage::setBtRuntime(const SharedHandle<BtRuntime>& btRuntime)
{
  _btRuntime = btRuntime;
}

} // namespace aria2