/* */ #include "AbstractCommand.h" #include #include "Request.h" #include "DownloadEngine.h" #include "Option.h" #include "PeerStat.h" #include "SegmentMan.h" #include "Logger.h" #include "Segment.h" #include "DlAbortEx.h" #include "DlRetryEx.h" #include "DownloadFailureException.h" #include "CreateRequestCommand.h" #include "InitiateConnectionCommandFactory.h" #include "StreamCheckIntegrityEntry.h" #include "PieceStorage.h" #include "SocketCore.h" #include "message.h" #include "prefs.h" #include "fmt.h" #include "ServerStat.h" #include "RequestGroupMan.h" #include "A2STR.h" #include "util.h" #include "LogFactory.h" #include "DownloadContext.h" #include "wallclock.h" #include "NameResolver.h" #include "uri.h" #include "FileEntry.h" #include "error_code.h" #include "SocketRecvBuffer.h" #include "ChecksumCheckIntegrityEntry.h" #ifdef ENABLE_ASYNC_DNS #include "AsyncNameResolver.h" #include "AsyncNameResolverMan.h" #endif // ENABLE_ASYNC_DNS namespace aria2 { AbstractCommand::AbstractCommand( cuid_t cuid, const std::shared_ptr& req, const std::shared_ptr& fileEntry, RequestGroup* requestGroup, DownloadEngine* e, const std::shared_ptr& s, const std::shared_ptr& socketRecvBuffer, bool incNumConnection) : Command(cuid), req_(req), fileEntry_(fileEntry), socket_(s), socketRecvBuffer_(socketRecvBuffer), #ifdef ENABLE_ASYNC_DNS asyncNameResolverMan_(make_unique()), #endif // ENABLE_ASYNC_DNS requestGroup_(requestGroup), e_(e), checkPoint_(global::wallclock()), serverStatTimer_(global::wallclock()), timeout_(requestGroup->getTimeout()), checkSocketIsReadable_(false), checkSocketIsWritable_(false), incNumConnection_(incNumConnection) { if (socket_ && socket_->isOpen()) { setReadCheckSocket(socket_); } if (incNumConnection_) { requestGroup->increaseStreamConnection(); } requestGroup_->increaseStreamCommand(); requestGroup_->increaseNumCommand(); #ifdef ENABLE_ASYNC_DNS configureAsyncNameResolverMan(asyncNameResolverMan_.get(), e_->getOption()); #endif // ENABLE_ASYNC_DNS } AbstractCommand::~AbstractCommand() { disableReadCheckSocket(); disableWriteCheckSocket(); #ifdef ENABLE_ASYNC_DNS asyncNameResolverMan_->disableNameResolverCheck(e_, this); #endif // ENABLE_ASYNC_DNS requestGroup_->decreaseNumCommand(); requestGroup_->decreaseStreamCommand(); if (incNumConnection_) { requestGroup_->decreaseStreamConnection(); } } void AbstractCommand::useFasterRequest( const std::shared_ptr& fasterRequest) { A2_LOG_INFO(fmt("CUID#%" PRId64 " - Use faster Request hostname=%s, port=%u", getCuid(), fasterRequest->getHost().c_str(), fasterRequest->getPort())); // Cancel current Request object and use faster one. fileEntry_->removeRequest(req_); e_->setNoWait(true); e_->addCommand( InitiateConnectionCommandFactory::createInitiateConnectionCommand( getCuid(), fasterRequest, fileEntry_, requestGroup_, e_)); } bool AbstractCommand::shouldProcess() const { if (checkSocketIsReadable_) { if (readEventEnabled()) { return true; } if (socketRecvBuffer_ && !socketRecvBuffer_->bufferEmpty()) { return true; } if (socket_ && socket_->getRecvBufferedLength()) { return true; } } if (checkSocketIsWritable_ && writeEventEnabled()) { return true; } #ifdef ENABLE_ASYNC_DNS const auto resolverChecked = asyncNameResolverMan_->resolverChecked(); if (resolverChecked && asyncNameResolverMan_->getStatus() != 0) { return true; } if (!checkSocketIsReadable_ && !checkSocketIsWritable_ && !resolverChecked) { return true; } #else // ENABLE_ASYNC_DNS if (!checkSocketIsReadable_ && !checkSocketIsWritable_) { return true; } #endif // ENABLE_ASYNC_DNS return noCheck(); } bool AbstractCommand::execute() { A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - socket: read:%d, write:%d, hup:%d, err:%d", getCuid(), readEventEnabled(), writeEventEnabled(), hupEventEnabled(), errorEventEnabled())); try { if (requestGroup_->downloadFinished() || requestGroup_->isHaltRequested()) { return true; } if (req_ && req_->removalRequested()) { A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - Discard original URI=%s because it is" " requested.", getCuid(), req_->getUri().c_str())); return prepareForRetry(0); } auto sm = getSegmentMan(); if (getPieceStorage()) { segments_.clear(); sm->getInFlightSegment(segments_, getCuid()); if (req_ && segments_.empty()) { // This command previously has assigned segments, but it is // canceled. So discard current request chain. Plus, if no // segment is available when http pipelining is used. A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - It seems previously assigned segments" " are canceled. Restart.", getCuid())); // Request::isPipeliningEnabled() == true means aria2 // accessed the remote server and discovered that the server // supports pipelining. if (req_ && req_->isPipeliningEnabled()) { e_->poolSocket(req_, createProxyRequest(), socket_); } return prepareForRetry(0); } // TODO it is not needed to check other PeerStats every time. // Find faster Request when no segment split is allowed. if (req_ && fileEntry_->countPooledRequest() > 0 && requestGroup_->getPendingLength() < calculateMinSplitSize() * 2) { auto fasterRequest = fileEntry_->findFasterRequest(req_); if (fasterRequest) { useFasterRequest(fasterRequest); return true; } } // Don't use this feature if PREF_MAX_{OVERALL_}DOWNLOAD_LIMIT // is used or total length is unknown. if (req_ && fileEntry_->getLength() > 0 && e_->getRequestGroupMan()->getMaxOverallDownloadSpeedLimit() == 0 && requestGroup_->getMaxDownloadSpeedLimit() == 0 && serverStatTimer_.difference(global::wallclock()) >= 10_s) { serverStatTimer_ = global::wallclock(); std::vector> usedHosts; if (getOption()->getAsBool(PREF_SELECT_LEAST_USED_HOST)) { getDownloadEngine()->getRequestGroupMan()->getUsedHosts(usedHosts); } auto fasterRequest = fileEntry_->findFasterRequest( req_, usedHosts, e_->getRequestGroupMan()->getServerStatMan()); if (fasterRequest) { useFasterRequest(fasterRequest); return true; } } } if (shouldProcess()) { checkPoint_ = global::wallclock(); if (!getPieceStorage()) { return executeInternal(); } if (!req_ || req_->getMaxPipelinedRequest() == 1 || // Why the following condition is necessary? That's because // For single file download, SegmentMan::getSegment(cuid) // is more efficient. getDownloadContext()->getFileEntries().size() == 1) { size_t maxSegments = req_ ? req_->getMaxPipelinedRequest() : 1; size_t minSplitSize = calculateMinSplitSize(); while (segments_.size() < maxSegments) { auto segment = sm->getSegment(getCuid(), minSplitSize); if (!segment) { break; } segments_.push_back(segment); } if (segments_.empty()) { // TODO socket could be pooled here if pipelining is // enabled... Hmm, I don't think if pipelining is enabled // it does not go here. A2_LOG_INFO(fmt(MSG_NO_SEGMENT_AVAILABLE, getCuid())); // When all segments are ignored in SegmentMan, there are // no URIs available, so don't retry. if (sm->allSegmentsIgnored()) { A2_LOG_DEBUG("All segments are ignored."); // This will execute other idle Commands and let them // finish quickly. e_->setRefreshInterval(std::chrono::milliseconds(0)); return true; } return prepareForRetry(1); } } else { // For multi-file downloads size_t minSplitSize = calculateMinSplitSize(); size_t maxSegments = req_->getMaxPipelinedRequest(); if (segments_.size() < maxSegments) { sm->getSegment(segments_, getCuid(), minSplitSize, fileEntry_, maxSegments); } if (segments_.empty()) { return prepareForRetry(0); } } return executeInternal(); } if (errorEventEnabled()) { // older kernel may report "connection refused" here. auto ss = e_->getRequestGroupMan()->getOrCreateServerStat( req_->getHost(), req_->getProtocol()); ss->setError(); throw DL_RETRY_EX( fmt(MSG_NETWORK_PROBLEM, socket_->getSocketError().c_str())); } if (checkPoint_.difference(global::wallclock()) >= timeout_) { // timeout triggers ServerStat error state. auto ss = e_->getRequestGroupMan()->getOrCreateServerStat( req_->getHost(), req_->getProtocol()); ss->setError(); // When DNS query was timeout, req_->getConnectedAddr() is // empty. if (!req_->getConnectedAddr().empty()) { // Purging IP address cache to renew IP address. A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - Marking IP address %s as bad", getCuid(), req_->getConnectedAddr().c_str())); e_->markBadIPAddress(req_->getConnectedHostname(), req_->getConnectedAddr(), req_->getConnectedPort()); } if (e_->findCachedIPAddress(req_->getConnectedHostname(), req_->getConnectedPort()) .empty()) { A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - All IP addresses were marked bad." " Removing Entry.", getCuid())); e_->removeCachedIPAddress(req_->getConnectedHostname(), req_->getConnectedPort()); } throw DL_RETRY_EX2(EX_TIME_OUT, error_code::TIME_OUT); } addCommandSelf(); return false; } catch (DlAbortEx& err) { requestGroup_->setLastErrorCode(err.getErrorCode(), err.what()); if (req_) { A2_LOG_ERROR_EX( fmt(MSG_DOWNLOAD_ABORTED, getCuid(), req_->getUri().c_str()), DL_ABORT_EX2(fmt("URI=%s", req_->getCurrentUri().c_str()), err)); fileEntry_->addURIResult(req_->getUri(), err.getErrorCode()); if (err.getErrorCode() == error_code::CANNOT_RESUME) { requestGroup_->increaseResumeFailureCount(); } } else { A2_LOG_DEBUG_EX(EX_EXCEPTION_CAUGHT, err); } onAbort(); tryReserved(); return true; } catch (DlRetryEx& err) { assert(req_); A2_LOG_INFO_EX( fmt(MSG_RESTARTING_DOWNLOAD, getCuid(), req_->getUri().c_str()), DL_RETRY_EX2(fmt("URI=%s", req_->getCurrentUri().c_str()), err)); req_->addTryCount(); req_->resetRedirectCount(); req_->resetUri(); const int maxTries = getOption()->getAsInt(PREF_MAX_TRIES); bool isAbort = maxTries != 0 && req_->getTryCount() >= maxTries; if (isAbort) { A2_LOG_INFO(fmt(MSG_MAX_TRY, getCuid(), req_->getTryCount())); A2_LOG_ERROR_EX( fmt(MSG_DOWNLOAD_ABORTED, getCuid(), req_->getUri().c_str()), err); fileEntry_->addURIResult(req_->getUri(), err.getErrorCode()); requestGroup_->setLastErrorCode(err.getErrorCode(), err.what()); if (err.getErrorCode() == error_code::CANNOT_RESUME) { requestGroup_->increaseResumeFailureCount(); } onAbort(); tryReserved(); return true; } if (err.getErrorCode() == error_code::HTTP_SERVICE_UNAVAILABLE) { Timer wakeTime(global::wallclock()); wakeTime.advance( std::chrono::seconds(getOption()->getAsInt(PREF_RETRY_WAIT))); req_->setWakeTime(wakeTime); } return prepareForRetry(0); } catch (DownloadFailureException& err) { requestGroup_->setLastErrorCode(err.getErrorCode(), err.what()); if (req_) { A2_LOG_ERROR_EX( fmt(MSG_DOWNLOAD_ABORTED, getCuid(), req_->getUri().c_str()), DL_ABORT_EX2(fmt("URI=%s", req_->getCurrentUri().c_str()), err)); fileEntry_->addURIResult(req_->getUri(), err.getErrorCode()); } else { A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, err); } requestGroup_->setHaltRequested(true); getDownloadEngine()->setRefreshInterval(std::chrono::milliseconds(0)); return true; } } void AbstractCommand::tryReserved() { if (getDownloadContext()->getFileEntries().size() == 1) { const auto& entry = getDownloadContext()->getFirstFileEntry(); // Don't create new command if currently file length is unknown // and there are no URI left. Because file length is unknown, we // can assume that there are no in-flight request object. if (entry->getLength() == 0 && entry->getRemainingUris().empty()) { A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - Not trying next request." " No reserved/pooled request is remaining and" " total length is still unknown.", getCuid())); return; } } A2_LOG_DEBUG( fmt("CUID#%" PRId64 " - Trying reserved/pooled request.", getCuid())); std::vector> commands; requestGroup_->createNextCommand(commands, e_, 1); e_->setNoWait(true); e_->addCommand(std::move(commands)); } bool AbstractCommand::prepareForRetry(time_t wait) { if (getPieceStorage()) { getSegmentMan()->cancelSegment(getCuid()); } if (req_) { // Reset persistentConnection and maxPipelinedRequest to handle // the situation where remote server returns Connection: close // after several pipelined requests. req_->supportsPersistentConnection(true); req_->setMaxPipelinedRequest(1); fileEntry_->poolRequest(req_); A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - Pooling request URI=%s", getCuid(), req_->getUri().c_str())); if (getSegmentMan()) { getSegmentMan()->recognizeSegmentFor(fileEntry_); } } auto command = make_unique(getCuid(), requestGroup_, e_); if (wait == 0) { e_->setNoWait(true); } else { // We don't use wait so that Command can be executed by // DownloadEngine::setRefreshInterval(std::chrono::milliseconds(0)). command->setStatus(Command::STATUS_INACTIVE); } e_->addCommand(std::move(command)); return true; } void AbstractCommand::onAbort() { if (req_) { fileEntry_->removeIdenticalURI(req_->getUri()); fileEntry_->removeRequest(req_); } A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - Aborting download", getCuid())); if (!getPieceStorage()) { return; } getSegmentMan()->cancelSegment(getCuid()); // Don't do following process if BitTorrent is involved or files // in DownloadContext is more than 1. The latter condition is // limitation of current implementation. if (getOption()->getAsBool(PREF_ALWAYS_RESUME) || !fileEntry_ || getDownloadContext()->getNetStat().getSessionDownloadLength() != 0 || requestGroup_->p2pInvolved() || getDownloadContext()->getFileEntries().size() != 1) { return; } const int maxTries = getOption()->getAsInt(PREF_MAX_RESUME_FAILURE_TRIES); if (!(maxTries > 0 && requestGroup_->getResumeFailureCount() >= maxTries) && !fileEntry_->emptyRequestUri()) { return; } // Local file exists, but given servers(or at least contacted // ones) doesn't support resume. Let's restart download from // scratch. A2_LOG_NOTICE(fmt(_("CUID#%" PRId64 " - Failed to resume download." " Download from scratch."), getCuid())); A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - Gathering URIs that has CANNOT_RESUME" " error", getCuid())); // Set PREF_ALWAYS_RESUME to A2_V_TRUE to avoid repeating this // process. getOption()->put(PREF_ALWAYS_RESUME, A2_V_TRUE); std::deque res; fileEntry_->extractURIResult(res, error_code::CANNOT_RESUME); if (res.empty()) { return; } getSegmentMan()->cancelAllSegments(); getSegmentMan()->eraseSegmentWrittenLengthMemo(); getPieceStorage()->markPiecesDone(0); std::vector uris; uris.reserve(res.size()); std::transform(std::begin(res), std::end(res), std::back_inserter(uris), std::mem_fn(&URIResult::getURI)); A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - %lu URIs found.", getCuid(), static_cast(uris.size()))); fileEntry_->addUris(std::begin(uris), std::end(uris)); getSegmentMan()->recognizeSegmentFor(fileEntry_); } void AbstractCommand::disableReadCheckSocket() { if (!checkSocketIsReadable_) { return; } e_->deleteSocketForReadCheck(readCheckTarget_, this); checkSocketIsReadable_ = false; readCheckTarget_.reset(); } void AbstractCommand::setReadCheckSocket( const std::shared_ptr& socket) { if (!socket->isOpen()) { disableReadCheckSocket(); return; } if (checkSocketIsReadable_) { if (*readCheckTarget_ != *socket) { e_->deleteSocketForReadCheck(readCheckTarget_, this); e_->addSocketForReadCheck(socket, this); readCheckTarget_ = socket; } return; } e_->addSocketForReadCheck(socket, this); checkSocketIsReadable_ = true; readCheckTarget_ = socket; } void AbstractCommand::setReadCheckSocketIf( const std::shared_ptr& socket, bool pred) { if (pred) { setReadCheckSocket(socket); return; } disableReadCheckSocket(); } void AbstractCommand::disableWriteCheckSocket() { if (!checkSocketIsWritable_) { return; } e_->deleteSocketForWriteCheck(writeCheckTarget_, this); checkSocketIsWritable_ = false; writeCheckTarget_.reset(); } void AbstractCommand::setWriteCheckSocket( const std::shared_ptr& socket) { if (!socket->isOpen()) { disableWriteCheckSocket(); return; } if (checkSocketIsWritable_) { if (*writeCheckTarget_ != *socket) { e_->deleteSocketForWriteCheck(writeCheckTarget_, this); e_->addSocketForWriteCheck(socket, this); writeCheckTarget_ = socket; } return; } e_->addSocketForWriteCheck(socket, this); checkSocketIsWritable_ = true; writeCheckTarget_ = socket; } void AbstractCommand::setWriteCheckSocketIf( const std::shared_ptr& socket, bool pred) { if (pred) { setWriteCheckSocket(socket); return; } disableWriteCheckSocket(); } void AbstractCommand::swapSocket(std::shared_ptr& socket) { disableReadCheckSocket(); disableWriteCheckSocket(); socket_.swap(socket); } namespace { // Constructs proxy URI, merging username and password if they are // defined. std::string makeProxyUri(PrefPtr proxyPref, PrefPtr proxyUser, PrefPtr proxyPasswd, const Option* option) { uri::UriStruct us; if (!uri::parse(us, option->get(proxyPref))) { return ""; } if (option->defined(proxyUser)) { us.username = option->get(proxyUser); } if (option->defined(proxyPasswd)) { us.password = option->get(proxyPasswd); us.hasPassword = true; } return uri::construct(us); } } // namespace namespace { // Returns proxy option value for the given protocol. std::string getProxyOptionFor(PrefPtr proxyPref, PrefPtr proxyUser, PrefPtr proxyPasswd, const Option* option) { std::string uri = makeProxyUri(proxyPref, proxyUser, proxyPasswd, option); if (uri.empty()) { return makeProxyUri(PREF_ALL_PROXY, PREF_ALL_PROXY_USER, PREF_ALL_PROXY_PASSWD, option); } return uri; } } // namespace // Returns proxy URI for given protocol. If no proxy URI is defined, // then returns an empty string. std::string getProxyUri(const std::string& protocol, const Option* option) { if (protocol == "http") { return getProxyOptionFor(PREF_HTTP_PROXY, PREF_HTTP_PROXY_USER, PREF_HTTP_PROXY_PASSWD, option); } if (protocol == "https") { return getProxyOptionFor(PREF_HTTPS_PROXY, PREF_HTTPS_PROXY_USER, PREF_HTTPS_PROXY_PASSWD, option); } if (protocol == "ftp" || protocol == "sftp") { return getProxyOptionFor(PREF_FTP_PROXY, PREF_FTP_PROXY_USER, PREF_FTP_PROXY_PASSWD, option); } return A2STR::NIL; } namespace { // Returns true if proxy is defined for the given protocol. Otherwise // returns false. bool isProxyRequest(const std::string& protocol, const std::shared_ptr