/* */
#include "DHTPeerAnnounceStorage.h"

#include <cstring>
#include <algorithm>

#include "DHTPeerAnnounceEntry.h"
#include "Peer.h"
#include "PeerStorage.h"
#include "DHTConstants.h"
#include "DHTTaskQueue.h"
#include "DHTTaskFactory.h"
#include "DHTTask.h"
#include "LogFactory.h"
#include "Logger.h"
#include "util.h"
#include "a2functional.h"

namespace aria2 {

// Obtains the logger from LogFactory; the task queue and task factory are
// supplied later through setTaskQueue() and setTaskFactory().
DHTPeerAnnounceStorage::DHTPeerAnnounceStorage():
  _logger(LogFactory::getInstance()) {}

DHTPeerAnnounceStorage::~DHTPeerAnnounceStorage() {}

// Strict weak ordering over peer announce entries: compares the first
// DHT_ID_LENGTH bytes of the two info hashes with memcmp.  Every lookup in
// this file (lower_bound / binary_search) relies on _entries being ordered
// by this predicate.
class InfoHashLess
{
public:
  bool operator()(const SharedHandle<DHTPeerAnnounceEntry>& lhs,
                  const SharedHandle<DHTPeerAnnounceEntry>& rhs) const
  {
    return memcmp(lhs->getInfoHash(), rhs->getInfoHash(), DHT_ID_LENGTH) < 0;
  }
};

// Returns the entry whose info hash equals infoHash.  If no such entry is
// stored, a new one is inserted at its lower-bound position (which keeps
// _entries ordered by info hash) and returned.
SharedHandle<DHTPeerAnnounceEntry>
DHTPeerAnnounceStorage::getPeerAnnounceEntry(const unsigned char* infoHash)
{
  // The freshly created entry serves both as the search key and, when
  // nothing matches, as the element that gets inserted.
  SharedHandle<DHTPeerAnnounceEntry> entry(new DHTPeerAnnounceEntry(infoHash));

  std::deque<SharedHandle<DHTPeerAnnounceEntry> >::iterator i =
    std::lower_bound(_entries.begin(), _entries.end(), entry, InfoHashLess());

  if(i != _entries.end() &&
     memcmp(infoHash, (*i)->getInfoHash(), DHT_ID_LENGTH) == 0) {
    entry = *i;
  } else {
    _entries.insert(i, entry);
  }
  return entry;
}

// Records ipaddr:port as a peer address for infoHash.  The entry for
// infoHash is created first if it does not exist yet.
void
DHTPeerAnnounceStorage::addPeerAnnounce(const unsigned char* infoHash,
                                        const std::string& ipaddr,
                                        uint16_t port)
{
  _logger->debug("Adding %s:%u to peer announce list: infoHash=%s",
                 ipaddr.c_str(), port,
                 util::toHex(infoHash, DHT_ID_LENGTH).c_str());

  getPeerAnnounceEntry(infoHash)->addPeerAddrEntry(PeerAddrEntry(ipaddr, port));
}

// add peer announce as localhost downloading the content
// The entry for infoHash is created if necessary and peerStorage is attached
// to it via setPeerStorage().
void DHTPeerAnnounceStorage::addPeerAnnounce
(const unsigned char* infoHash,
 const SharedHandle<PeerStorage>& peerStorage)
{
  _logger->debug("Adding localhost to peer announce list: infoHash=%s",
                 util::toHex(infoHash, DHT_ID_LENGTH).c_str());

  SharedHandle<DHTPeerAnnounceEntry> entry = getPeerAnnounceEntry(infoHash);
  entry->setPeerStorage(peerStorage);
}

// Calls clearLocal() on the entry for infoHash, if one is stored, and erases
// the entry when it reports empty() afterwards.  Does nothing when no entry
// matches.
void DHTPeerAnnounceStorage::removeLocalPeerAnnounce
(const unsigned char* infoHash)
{
  // Temporary entry used only as the search key.
  SharedHandle<DHTPeerAnnounceEntry> entry(new DHTPeerAnnounceEntry(infoHash));

  std::deque<SharedHandle<DHTPeerAnnounceEntry> >::iterator i =
    std::lower_bound(_entries.begin(), _entries.end(), entry, InfoHashLess());

  if(i != _entries.end() &&
     memcmp(infoHash, (*i)->getInfoHash(), DHT_ID_LENGTH) == 0) {
    (*i)->clearLocal();
    if((*i)->empty()) {
      _entries.erase(i);
    }
  }
}

// Returns true if an entry with the given info hash is stored.
bool DHTPeerAnnounceStorage::contains(const unsigned char* infoHash) const
{
  // Temporary entry used only as the search key.
  SharedHandle<DHTPeerAnnounceEntry> entry(new DHTPeerAnnounceEntry(infoHash));

  return
    std::binary_search(_entries.begin(), _entries.end(), entry, InfoHashLess());
}

// Asks the entry for infoHash to fill peers through its getPeers().  peers is
// left untouched when no entry matches or the matching entry is empty().
void DHTPeerAnnounceStorage::getPeers(std::deque<SharedHandle<Peer> >& peers,
                                      const unsigned char* infoHash)
{
  // Temporary entry used only as the search key.
  SharedHandle<DHTPeerAnnounceEntry> entry(new DHTPeerAnnounceEntry(infoHash));

  std::deque<SharedHandle<DHTPeerAnnounceEntry> >::iterator i =
    std::lower_bound(_entries.begin(), _entries.end(), entry, InfoHashLess());

  if(i != _entries.end() &&
     memcmp(infoHash, (*i)->getInfoHash(), DHT_ID_LENGTH) == 0 &&
     !(*i)->empty()) {
    (*i)->getPeers(peers);
  }
}

// Functor for std::for_each: asks an entry to drop its stale peer addresses,
// passing DHT_PEER_ANNOUNCE_PURGE_INTERVAL as the threshold.
class RemoveStalePeerAddrEntry
{
public:
  void operator()(const SharedHandle<DHTPeerAnnounceEntry>& e) const
  {
    e->removeStalePeerAddrEntry(DHT_PEER_ANNOUNCE_PURGE_INTERVAL);
  }
};

// Purges stale peer addresses from every entry, then erases the entries that
// report empty().  The entry count is logged before and after.
void DHTPeerAnnounceStorage::handleTimeout()
{
  _logger->debug("Now purge peer announces(%lu entries) which are timed out.",
                 static_cast<unsigned long>(_entries.size()));

  std::for_each(_entries.begin(), _entries.end(), RemoveStalePeerAddrEntry());

  // remove_if moves the surviving entries to the front in their original
  // relative order; erase then drops the tail.
  _entries.erase(std::remove_if(_entries.begin(), _entries.end(),
                                mem_fun_sh(&DHTPeerAnnounceEntry::empty)),
                 _entries.end());

  _logger->debug("Currently %lu peer announce entries",
                 static_cast<unsigned long>(_entries.size()));
}

// For every entry whose getLastUpdated() reports that
// DHT_PEER_ANNOUNCE_INTERVAL has elapsed: calls notifyUpdate() on it, creates
// a peer announce task for its info hash through the task factory and hands
// the task to the task queue with addPeriodicTask2().
// NOTE(review): _taskFactory and _taskQueue are dereferenced unconditionally;
// presumably setTaskFactory() and setTaskQueue() are called beforehand --
// confirm against the callers.
void DHTPeerAnnounceStorage::announcePeer()
{
  _logger->debug("Now announcing peer.");

  for(std::deque<SharedHandle<DHTPeerAnnounceEntry> >::iterator i =
        _entries.begin(); i != _entries.end(); ++i) {
    if((*i)->getLastUpdated().elapsed(DHT_PEER_ANNOUNCE_INTERVAL)) {
      (*i)->notifyUpdate();

      SharedHandle<DHTTask> task =
        _taskFactory->createPeerAnnounceTask((*i)->getInfoHash());
      _taskQueue->addPeriodicTask2(task);

      _logger->debug("Added 1 peer announce: infoHash=%s",
                     util::toHex((*i)->getInfoHash(), DHT_ID_LENGTH).c_str());
    }
  }
}

// Stores the task queue used by announcePeer().
void DHTPeerAnnounceStorage::setTaskQueue
(const SharedHandle<DHTTaskQueue>& taskQueue)
{
  _taskQueue = taskQueue;
}

// Stores the task factory used by announcePeer().
void DHTPeerAnnounceStorage::setTaskFactory
(const SharedHandle<DHTTaskFactory>& taskFactory)
{
  _taskFactory = taskFactory;
}

} // namespace aria2