/* */
#include "TrackerWatcherCommand.h"

#include <cstring>
#include <sstream>

#include "DownloadEngine.h"
#include "BtAnnounce.h"
#include "BtRuntime.h"
#include "PieceStorage.h"
#include "PeerStorage.h"
#include "Peer.h"
#include "prefs.h"
#include "message.h"
#include "ByteArrayDiskWriterFactory.h"
#include "RecoverableException.h"
#include "PeerInitiateConnectionCommand.h"
#include "DiskAdaptor.h"
#include "FileEntry.h"
#include "RequestGroup.h"
#include "Option.h"
#include "DlAbortEx.h"
#include "Logger.h"
#include "LogFactory.h"
#include "A2STR.h"
#include "SocketCore.h"
#include "Request.h"
#include "AnnounceTier.h"
#include "DownloadContext.h"
#include "bittorrent_helper.h"
#include "a2functional.h"
#include "util.h"
#include "fmt.h"
#include "UDPTrackerRequest.h"
#include "UDPTrackerClient.h"
#include "BtRegistry.h"
#include "NameResolveCommand.h"

namespace aria2 {

// NOTE(review): the template arguments spelled out in this file
// (RequestGroup, Command, BtAnnounce, UDPTrackerRequest, AnnRequest,
// DownloadContext) were reconstructed from how each object is used
// here -- confirm against the declarations in TrackerWatcherCommand.h.

// Keeps a shared reference to the RequestGroup that carries out the
// tracker request.
HTTPAnnRequest::HTTPAnnRequest(const std::shared_ptr<RequestGroup>& rg)
  : rg_(rg)
{}

HTTPAnnRequest::~HTTPAnnRequest() {}

// The request counts as stopped once its RequestGroup reports that no
// command is attached to it any more.
bool HTTPAnnRequest::stopped() const
{
  return rg_->getNumCommand() == 0;
}

// The request succeeded when its RequestGroup reports a finished
// download.
bool HTTPAnnRequest::success() const
{
  return rg_->downloadFinished();
}

// Asks the RequestGroup to halt forcibly.  The engine argument is not
// used by this implementation.
void HTTPAnnRequest::stop(DownloadEngine* e)
{
  rg_->setForceHaltRequested(true);
}

// Creates the initial commands of the RequestGroup and hands them to
// the engine.  Returns true on success; a RecoverableException is
// logged and reported as false.
bool HTTPAnnRequest::issue(DownloadEngine* e)
{
  try {
    std::vector<Command*>* commands = new std::vector<Command*>();
    // Guard for the case that createInitialCommand() or addCommand()
    // throws.
    auto_delete_container<std::vector<Command*> > commandsDel(commands);
    rg_->createInitialCommand(*commands, e);
    e->addCommand(*commands);
    e->setNoWait(true);
    // The commands have been handed to the engine; empty the vector
    // so the guard presumably has nothing left to free.
    commands->clear();
    A2_LOG_DEBUG("added tracker request command");
    return true;
  }
  catch(RecoverableException& ex) {
    A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, ex);
    return false;
  }
}

// Reads everything the RequestGroup's disk adaptor holds and passes it
// to btAnnounce->processAnnounceResponse().  Returns true on success;
// a RecoverableException is logged and reported as false.
bool HTTPAnnRequest::processResponse
(const std::shared_ptr<BtAnnounce>& btAnnounce)
{
  try {
    std::stringstream strm;
    unsigned char data[2048];
    auto diskAdaptor = rg_->getPieceStorage()->getDiskAdaptor();
    diskAdaptor->openFile();
    for(;;) {
      // The number of bytes written so far doubles as the read offset.
      ssize_t dataLength =
        diskAdaptor->readData(data, sizeof(data), strm.tellp());
      if(dataLength == 0) {
        break;
      }
      strm.write(reinterpret_cast<const char*>(data), dataLength);
    }
    std::string res = strm.str();
    btAnnounce->processAnnounceResponse
      (reinterpret_cast<const unsigned char*>(res.c_str()), res.size());
    return true;
  }
  catch(RecoverableException& e) {
    A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, e);
    return false;
  }
}

// Keeps a shared reference to the UDP tracker request; req may be
// null, in which case the request reports itself as stopped.
UDPAnnRequest::UDPAnnRequest(const std::shared_ptr<UDPTrackerRequest>& req)
  : req_(req)
{}

UDPAnnRequest::~UDPAnnRequest() {}

// Stopped means either no request is held or the held request reached
// UDPT_STA_COMPLETE.
bool UDPAnnRequest::stopped() const
{
  return !req_ || req_->state == UDPT_STA_COMPLETE;
}

// Success requires a completed request without error.
bool UDPAnnRequest::success() const
{
  return req_ && req_->state == UDPT_STA_COMPLETE &&
    req_->error == UDPT_ERR_SUCCESS;
}

// Drops the reference to the request, after which stopped() returns
// true.  The engine argument is not used by this implementation.
void UDPAnnRequest::stop(DownloadEngine* e)
{
  req_.reset();
}

// Queues a NameResolveCommand for the held request.  Returns false
// when no request is held.
bool UDPAnnRequest::issue(DownloadEngine* e)
{
  if(!req_) {
    return false;
  }
  NameResolveCommand* command =
    new NameResolveCommand(e->newCUID(), e, req_);
  e->addCommand(command);
  e->setNoWait(true);
  return true;
}

// Forwards the held request to btAnnounce->processUDPTrackerResponse().
// Returns false when no request is held.
bool UDPAnnRequest::processResponse
(const std::shared_ptr<BtAnnounce>& btAnnounce)
{
  if(!req_) {
    return false;
  }
  btAnnounce->processUDPTrackerResponse(req_);
  return true;
}

// Registers this command with the RequestGroup and, when the registry
// provides a UDP tracker client, registers as one of its watchers.
TrackerWatcherCommand::TrackerWatcherCommand
(cuid_t cuid, RequestGroup* requestGroup, DownloadEngine* e)
  : Command(cuid),
    requestGroup_(requestGroup),
    e_(e),
    udpTrackerClient_(e_->getBtRegistry()->getUDPTrackerClient())
{
  requestGroup_->increaseNumCommand();
  if(udpTrackerClient_) {
    udpTrackerClient_->increaseWatchers();
  }
}

// Undoes the registrations made in the constructor.
TrackerWatcherCommand::~TrackerWatcherCommand()
{
  requestGroup_->decreaseNumCommand();
  if(udpTrackerClient_) {
    udpTrackerClient_->decreaseWatchers();
  }
}

// Drives one step of the announce cycle.
//
// Returns true, without re-queuing itself, when a forced halt was
// requested and no tracker request is still running, or when
// btAnnounce_ reports that no more announces are needed.  In every
// other case the command re-adds itself to the engine and returns
// false.
bool TrackerWatcherCommand::execute()
{
  if(requestGroup_->isForceHaltRequested()) {
    if(!trackerRequest_) {
      return true;
    }
    else if(trackerRequest_->stopped() || trackerRequest_->success()) {
      return true;
    }
    else {
      // A tracker request is still running: ask it to stop and run
      // this command again with a zero refresh interval.
      trackerRequest_->stop(e_);
      e_->setRefreshInterval(0);
      e_->addCommand(this);
      return false;
    }
  }
  if(btAnnounce_->noMoreAnnounce()) {
    A2_LOG_DEBUG("no more announce");
    return true;
  }
  if(!trackerRequest_) {
    trackerRequest_ = createAnnounce(e_);
    if(trackerRequest_) {
      trackerRequest_->issue(e_);
    }
  }
  else if(trackerRequest_->stopped()) {
    // We really want to make sure that the tracker request has
    // finished before trackerRequest_ is reset below (for an HTTP
    // request, stopped() checks getNumCommand() == 0).  If it is still
    // used in another Command, we will get a segmentation fault.
    if(trackerRequest_->success()) {
      if(trackerRequest_->processResponse(btAnnounce_)) {
        btAnnounce_->announceSuccess();
        btAnnounce_->resetAnnounce();
        addConnection();
      }
      else {
        btAnnounce_->announceFailure();
        if(btAnnounce_->isAllAnnounceFailed()) {
          btAnnounce_->resetAnnounce();
        }
      }
      trackerRequest_.reset();
    }
    else {
      // handle errors here
      btAnnounce_->announceFailure(); // inside it, trackers = 0.
      trackerRequest_.reset();
      if(btAnnounce_->isAllAnnounceFailed()) {
        btAnnounce_->resetAnnounce();
      }
    }
  }
  e_->addCommand(this);
  return false;
}

// Starts outgoing peer connections for as long as the runtime is not
// halted, is below its minimum peer count, and the peer storage can
// still supply a peer.
void TrackerWatcherCommand::addConnection()
{
  while(!btRuntime_->isHalt() && btRuntime_->lessThanMinPeers()) {
    if(!peerStorage_->isPeerAvailable()) {
      break;
    }
    cuid_t ncuid = e_->newCUID();
    auto peer = peerStorage_->checkoutPeer(ncuid);
    // sanity check
    if(!peer) {
      break;
    }
    PeerInitiateConnectionCommand* command =
      new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer, e_,
                                        btRuntime_);
    command->setPeerStorage(peerStorage_);
    command->setPieceStorage(pieceStorage_);
    e_->addCommand(command);
    A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - Adding new command CUID#%" PRId64 "",
                     getCuid(), peer->usedBy()));
  }
}

// Builds a request for the next announce URL that can be parsed.
//
// URLs that fail to parse are marked as failed and skipped.  The
// result is null when no announce is ready or every announce has
// failed; in the latter case the announce state is reset before
// returning.
std::shared_ptr<AnnRequest>
TrackerWatcherCommand::createAnnounce(DownloadEngine* e)
{
  std::shared_ptr<AnnRequest> treq;
  while(!btAnnounce_->isAllAnnounceFailed() &&
        btAnnounce_->isAnnounceReady()) {
    std::string uri = btAnnounce_->getAnnounceUrl();
    uri_split_result res;
    memset(&res, 0, sizeof(res));
    if(uri_split(&res, uri.c_str()) == 0) {
      // Without UDP tracker support, send it to normal tracker flow
      // and make it fail.
      if(udpTrackerClient_ &&
         uri::getFieldString(res, USR_SCHEME, uri.c_str()) == "udp") {
        uint16_t localPort = e->getBtRegistry()->getUdpPort();
        treq = createUDPAnnRequest
          (uri::getFieldString(res, USR_HOST, uri.c_str()), res.port,
           localPort);
      }
      else {
        treq = createHTTPAnnRequest(btAnnounce_->getAnnounceUrl());
      }
      btAnnounce_->announceStart(); // inside it, trackers++.
      break;
    }
    else {
      btAnnounce_->announceFailure();
    }
  }
  if(btAnnounce_->isAllAnnounceFailed()) {
    btAnnounce_->resetAnnounce();
  }
  return treq;
}

// Wraps the UDP tracker request that btAnnounce_ creates for the given
// endpoint in a UDPAnnRequest.
std::shared_ptr<AnnRequest>
TrackerWatcherCommand::createUDPAnnRequest(const std::string& host,
                                           uint16_t port,
                                           uint16_t localPort)
{
  auto req = btAnnounce_->createUDPTrackerRequest(host, port, localPort);
  return std::shared_ptr<AnnRequest>(new UDPAnnRequest(req));
}

namespace {
// Returns true when the torrent's announce list has at least two
// tiers, or its first tier has at least two entries.
bool backupTrackerIsAvailable
(const std::shared_ptr<DownloadContext>& context)
{
  auto torrentAttrs = bittorrent::getTorrentAttrs(context);
  const auto& tiers = torrentAttrs->announceList;
  if(tiers.empty()) {
    return false;
  }
  return tiers.size() >= 2 || tiers[0].size() >= 2;
}
} // namespace

std::shared_ptr TrackerWatcherCommand::createHTTPAnnRequest(const std::string& uri) { std::vector uris; uris.push_back(uri); std::shared_ptr