/* */ #ifndef _D_DHT_ABSTRACT_NODE_LOOKUP_TASK_H_ #define _D_DHT_ABSTRACT_NODE_LOOKUP_TASK_H_ #include "DHTAbstractTask.h" #include #include #include #include #include "DHTConstants.h" #include "DHTNodeLookupEntry.h" #include "DHTRoutingTable.h" #include "DHTMessageDispatcher.h" #include "DHTMessageFactory.h" #include "DHTMessage.h" #include "DHTNode.h" #include "DHTBucket.h" #include "LogFactory.h" #include "Logger.h" #include "util.h" #include "DHTIDCloser.h" namespace aria2 { class DHTNode; class DHTMessage; template class DHTAbstractNodeLookupTask:public DHTAbstractTask { private: unsigned char _targetID[DHT_ID_LENGTH]; std::deque > _entries; size_t _inFlightMessage; template void toEntries (Container& entries, const std::vector >& nodes) const { for(std::vector >::const_iterator i = nodes.begin(), eoi = nodes.end(); i != eoi; ++i) { SharedHandle e(new DHTNodeLookupEntry(*i)); entries.push_back(e); } } void sendMessage() { for(std::deque >::iterator i = _entries.begin(), eoi = _entries.end(); i != eoi && _inFlightMessage < ALPHA; ++i) { if((*i)->used == false) { ++_inFlightMessage; (*i)->used = true; SharedHandle m = createMessage((*i)->node); SharedHandle callback(createCallback()); getMessageDispatcher()->addMessageToQueue(m, callback); } } } void sendMessageAndCheckFinish() { if(needsAdditionalOutgoingMessage()) { sendMessage(); } if(_inFlightMessage == 0) { if(getLogger()->debug()) { getLogger()->debug("Finished node_lookup for node ID %s", util::toHex(_targetID, DHT_ID_LENGTH).c_str()); } onFinish(); updateBucket(); setFinished(true); } else { if(getLogger()->debug()) { getLogger()->debug("%d in flight message for node ID %s", _inFlightMessage, util::toHex(_targetID, DHT_ID_LENGTH).c_str()); } } } void updateBucket() {} protected: const unsigned char* getTargetID() const { return _targetID; } const std::deque >& getEntries() const { return _entries; } virtual void getNodesFromMessage (std::vector >& nodes, const ResponseMessage* message) = 0; virtual void onReceivedInternal (const ResponseMessage* message) {} virtual bool needsAdditionalOutgoingMessage() { return true; } virtual void onFinish() {} virtual SharedHandle createMessage (const SharedHandle& remoteNode) = 0; virtual SharedHandle createCallback() = 0; public: DHTAbstractNodeLookupTask(const unsigned char* targetID): _inFlightMessage(0) { memcpy(_targetID, targetID, DHT_ID_LENGTH); } static const size_t ALPHA = 3; virtual void startup() { std::vector > nodes; getRoutingTable()->getClosestKNodes(nodes, _targetID); _entries.clear(); toEntries(_entries, nodes); if(_entries.empty()) { setFinished(true); } else { // TODO use RTT here _inFlightMessage = 0; sendMessage(); if(_inFlightMessage == 0) { if(getLogger()->debug()) { getLogger()->debug("No message was sent in this lookup stage. Finished."); } setFinished(true); } } } void onReceived(const ResponseMessage* message) { --_inFlightMessage; onReceivedInternal(message); std::vector > nodes; getNodesFromMessage(nodes, message); std::vector > newEntries; toEntries(newEntries, nodes); size_t count = 0; for(std::vector >::const_iterator i = newEntries.begin(), eoi = newEntries.end(); i != eoi; ++i) { if(memcmp(getLocalNode()->getID(), (*i)->node->getID(), DHT_ID_LENGTH) != 0) { _entries.push_front(*i); ++count; if(getLogger()->debug()) { getLogger()->debug("Received nodes: id=%s, ip=%s", util::toHex((*i)->node->getID(), DHT_ID_LENGTH).c_str(), (*i)->node->getIPAddress().c_str()); } } } if(getLogger()->debug()) { getLogger()->debug("%u node lookup entries added.", count); } std::stable_sort(_entries.begin(), _entries.end(), DHTIDCloser(_targetID)); _entries.erase(std::unique(_entries.begin(), _entries.end()), _entries.end()); if(getLogger()->debug()) { getLogger()->debug("%u node lookup entries are unique.", _entries.size()); } if(_entries.size() > DHTBucket::K) { _entries.erase(_entries.begin()+DHTBucket::K, _entries.end()); } sendMessageAndCheckFinish(); } void onTimeout(const SharedHandle& node) { if(getLogger()->debug()) { getLogger()->debug("node lookup message timeout for node ID=%s", util::toHex(node->getID(), DHT_ID_LENGTH).c_str()); } --_inFlightMessage; for(std::deque >::iterator i = _entries.begin(), eoi = _entries.end(); i != eoi; ++i) { if((*i)->node == node) { _entries.erase(i); break; } } sendMessageAndCheckFinish(); } }; } // namespace aria2 #endif // _D_DHT_ABSTRACT_NODE_LOOKUP_TASK_H_