/* */ #include "DHTPeerAnnounceStorage.h" #include #include #include "DHTPeerAnnounceEntry.h" #include "Peer.h" #include "DHTConstants.h" #include "DHTTaskQueue.h" #include "DHTTaskFactory.h" #include "DHTTask.h" #include "LogFactory.h" #include "Logger.h" #include "util.h" #include "a2functional.h" #include "wallclock.h" namespace aria2 { DHTPeerAnnounceStorage::DHTPeerAnnounceStorage(): logger_(LogFactory::getInstance()) {} DHTPeerAnnounceStorage::~DHTPeerAnnounceStorage() {} namespace { class InfoHashLess { public: bool operator()(const SharedHandle& lhs, const SharedHandle& rhs) { return memcmp(lhs->getInfoHash(), rhs->getInfoHash(), DHT_ID_LENGTH) < 0; } }; } SharedHandle DHTPeerAnnounceStorage::getPeerAnnounceEntry(const unsigned char* infoHash) { SharedHandle entry(new DHTPeerAnnounceEntry(infoHash)); std::deque >::iterator i = std::lower_bound(entries_.begin(), entries_.end(), entry, InfoHashLess()); if(i != entries_.end() && memcmp(infoHash, (*i)->getInfoHash(), DHT_ID_LENGTH) == 0) { entry = *i; } else { entries_.insert(i, entry); } return entry; } void DHTPeerAnnounceStorage::addPeerAnnounce(const unsigned char* infoHash, const std::string& ipaddr, uint16_t port) { if(logger_->debug()) { logger_->debug("Adding %s:%u to peer announce list: infoHash=%s", ipaddr.c_str(), port, util::toHex(infoHash, DHT_ID_LENGTH).c_str()); } getPeerAnnounceEntry(infoHash)->addPeerAddrEntry(PeerAddrEntry(ipaddr, port)); } bool DHTPeerAnnounceStorage::contains(const unsigned char* infoHash) const { SharedHandle entry(new DHTPeerAnnounceEntry(infoHash)); return std::binary_search(entries_.begin(), entries_.end(), entry, InfoHashLess()); } void DHTPeerAnnounceStorage::getPeers(std::vector >& peers, const unsigned char* infoHash) { SharedHandle entry(new DHTPeerAnnounceEntry(infoHash)); std::deque >::iterator i = std::lower_bound(entries_.begin(), entries_.end(), entry, InfoHashLess()); if(i != entries_.end() && memcmp(infoHash, (*i)->getInfoHash(), DHT_ID_LENGTH) == 0 && !(*i)->empty()) { (*i)->getPeers(peers); } } namespace { class RemoveStalePeerAddrEntry { public: void operator()(const SharedHandle& e) { e->removeStalePeerAddrEntry(DHT_PEER_ANNOUNCE_PURGE_INTERVAL); } }; } void DHTPeerAnnounceStorage::handleTimeout() { if(logger_->debug()) { logger_->debug("Now purge peer announces(%lu entries) which are timed out.", static_cast(entries_.size())); } std::for_each(entries_.begin(), entries_.end(), RemoveStalePeerAddrEntry()); entries_.erase(std::remove_if(entries_.begin(), entries_.end(), mem_fun_sh(&DHTPeerAnnounceEntry::empty)), entries_.end()); if(logger_->debug()) { logger_->debug("Currently %lu peer announce entries", static_cast(entries_.size())); } } void DHTPeerAnnounceStorage::announcePeer() { if(logger_->debug()) { logger_->debug("Now announcing peer."); } for(std::deque >::iterator i = entries_.begin(), eoi = entries_.end(); i != eoi; ++i) { if((*i)->getLastUpdated(). difference(global::wallclock) >= DHT_PEER_ANNOUNCE_INTERVAL) { (*i)->notifyUpdate(); SharedHandle task = taskFactory_->createPeerAnnounceTask((*i)->getInfoHash()); taskQueue_->addPeriodicTask2(task); if(logger_->debug()) { logger_->debug("Added 1 peer announce: infoHash=%s", util::toHex((*i)->getInfoHash(), DHT_ID_LENGTH).c_str()); } } } } void DHTPeerAnnounceStorage::setTaskQueue(const SharedHandle& taskQueue) { taskQueue_ = taskQueue; } void DHTPeerAnnounceStorage::setTaskFactory(const SharedHandle& taskFactory) { taskFactory_ = taskFactory; } } // namespace aria2