/* */
#include "AbstractCommand.h"

#include <algorithm>   // std::transform
#include <cassert>     // assert
#include <functional>  // std::mem_fun_ref
#include <iterator>    // std::back_inserter

#include "Request.h"
#include "DownloadEngine.h"
#include "Option.h"
#include "PeerStat.h"
#include "SegmentMan.h"
#include "Logger.h"
#include "Segment.h"
#include "DlAbortEx.h"
#include "DlRetryEx.h"
#include "DownloadFailureException.h"
#include "CreateRequestCommand.h"
#include "InitiateConnectionCommandFactory.h"
#include "SleepCommand.h"
#ifdef ENABLE_ASYNC_DNS
#include "AsyncNameResolver.h"
#endif // ENABLE_ASYNC_DNS
#include "StreamCheckIntegrityEntry.h"
#include "PieceStorage.h"
#include "Socket.h"
#include "message.h"
#include "prefs.h"
#include "StringFormat.h"
#include "ServerStat.h"
#include "RequestGroupMan.h"
#include "A2STR.h"
#include "util.h"
#include "LogFactory.h"
#include "DownloadContext.h"
#include "wallclock.h"
#include "NameResolver.h"
#include "ServerStatMan.h"
#include "FileAllocationEntry.h"

namespace aria2 {

// Constructs a command bound to the request |req|, the file entry
// |fileEntry|, the owning |requestGroup|, the engine |e| and the
// socket |s|.
//
// The check point is initialized from global::wallclock and the
// timeout from requestGroup->getTimeout().  If |s| is non-null and
// open, it is registered for read checking.  The stream connection
// counter and the command counter of |requestGroup| are incremented;
// the destructor decrements both.
AbstractCommand::AbstractCommand(cuid_t cuid,
                                 const SharedHandle<Request>& req,
                                 const SharedHandle<FileEntry>& fileEntry,
                                 RequestGroup* requestGroup,
                                 DownloadEngine* e,
                                 const SocketHandle& s):
  Command(cuid),
  _checkPoint(global::wallclock),
  _timeout(requestGroup->getTimeout()),
  _requestGroup(requestGroup),
  _req(req),
  _fileEntry(fileEntry),
  _e(e),
  _socket(s),
  _checkSocketIsReadable(false),
  _checkSocketIsWritable(false),
  _nameResolverCheck(false)
{
  if(!_socket.isNull() && _socket->isOpen()) {
    setReadCheckSocket(_socket);
  }
  _requestGroup->increaseStreamConnection();
  _requestGroup->increaseNumCommand();
}

// Unregisters the read/write check sockets (and, when
// ENABLE_ASYNC_DNS is defined, the name resolver check), then
// decrements the counters incremented by the constructor.
AbstractCommand::~AbstractCommand() {
  disableReadCheckSocket();
  disableWriteCheckSocket();
#ifdef ENABLE_ASYNC_DNS
  disableNameResolverCheck(_asyncNameResolver);
#endif // ENABLE_ASYNC_DNS
  _requestGroup->decreaseNumCommand();
  _requestGroup->decreaseStreamConnection();
}

// Runs one step of this command.
//
// In order:
//  1. Returns true immediately if the download is finished or a halt
//     was requested.
//  2. If removal of the current request was requested, hands over to
//     prepareForRetry(0).
//  3. If pooled requests exist and no missing unused piece is left,
//     looks for a faster request and, if one is found, replaces this
//     command with a new connection command for it.
//  4. If a watched event fired (or nothing is being watched), resets
//     the check point, makes sure segments are assigned and calls
//     executeInternal().
//  5. Otherwise throws a retry exception on socket error or timeout,
//     or re-adds this command to the engine and returns false.
//
// DlAbortEx, DlRetryEx and DownloadFailureException are handled here;
// each of those handlers ends by returning true.
//
// NOTE(review): the timeout branch and the DlRetryEx handler
// dereference _req without a null check (the handler only asserts) --
// confirm that a null _req can never reach those paths.
bool AbstractCommand::execute() {
  if(getLogger()->debug()) {
    getLogger()->debug("CUID#%s - socket: read:%d, write:%d, hup:%d, err:%d",
                       util::itos(getCuid()).c_str(),
                       readEventEnabled(),
                       writeEventEnabled(),
                       hupEventEnabled(),
                       errorEventEnabled());
  }
  try {
    if(_requestGroup->downloadFinished() || _requestGroup->isHaltRequested()) {
      return true;
    }
    if(!_req.isNull() && _req->removalRequested()) {
      if(getLogger()->debug()) {
        getLogger()->debug
          ("CUID#%s - Discard original URI=%s because it is requested.",
           util::itos(getCuid()).c_str(), _req->getUri().c_str());
      }
      return prepareForRetry(0);
    }
    // TODO it is not needed to check other PeerStats every time.
    // Find faster Request when no segment is available.
    if(!_req.isNull() && _fileEntry->countPooledRequest() > 0 &&
       !getPieceStorage()->hasMissingUnusedPiece()) {
      SharedHandle<Request> fasterRequest =
        _fileEntry->findFasterRequest(_req);
      if(!fasterRequest.isNull()) {
        if(getLogger()->info()) {
          getLogger()->info("CUID#%s - Use faster Request hostname=%s, port=%u",
                            util::itos(getCuid()).c_str(),
                            fasterRequest->getHost().c_str(),
                            fasterRequest->getPort());
        }
        // Cancel current Request object and use faster one.
        _fileEntry->removeRequest(_req);
        Command* command =
          InitiateConnectionCommandFactory::createInitiateConnectionCommand
          (getCuid(), fasterRequest, _fileEntry, _requestGroup, _e);
        _e->setNoWait(true);
        _e->addCommand(command);
        return true;
      }
    }
    // Proceed when a watched event fired, or when nothing is being
    // watched at all.
    if((_checkSocketIsReadable && readEventEnabled()) ||
       (_checkSocketIsWritable && writeEventEnabled()) ||
       hupEventEnabled() ||
#ifdef ENABLE_ASYNC_DNS
       (_nameResolverCheck && nameResolveFinished()) ||
#endif // ENABLE_ASYNC_DNS
       (!_checkSocketIsReadable && !_checkSocketIsWritable &&
        !_nameResolverCheck)) {
      _checkPoint = global::wallclock;
      if(!getPieceStorage().isNull()) {
        _segments.clear();
        getSegmentMan()->getInFlightSegment(_segments, getCuid());
        if(!_req.isNull() && _segments.empty()) {
          // This command previously has assigned segments, but it is
          // canceled. So discard current request chain.
          if(getLogger()->debug()) {
            getLogger()->debug("CUID#%s - It seems previously assigned segments"
                               " are canceled. Restart.",
                               util::itos(getCuid()).c_str());
          }
          return prepareForRetry(0);
        }
        if(_req.isNull() || _req->getMaxPipelinedRequest() == 1 ||
           getDownloadContext()->getFileEntries().size() == 1) {
          if(_segments.empty()) {
            SharedHandle<Segment> segment =
              getSegmentMan()->getSegment(getCuid());
            if(!segment.isNull()) {
              _segments.push_back(segment);
            }
          }
          if(_segments.empty()) {
            // TODO socket could be pooled here if pipelining is enabled...
            if(getLogger()->info()) {
              getLogger()->info(MSG_NO_SEGMENT_AVAILABLE,
                                util::itos(getCuid()).c_str());
            }
            // When all segments are ignored in SegmentMan, there are
            // no URIs available, so don't retry.
            if(getSegmentMan()->allSegmentsIgnored()) {
              if(getLogger()->debug()) {
                getLogger()->debug("All segments are ignored.");
              }
              return true;
            } else {
              return prepareForRetry(1);
            }
          }
        } else {
          // Pipelining over multiple file entries: top up the
          // assigned segments to the pipelining limit.
          size_t maxSegments = _req->getMaxPipelinedRequest();
          if(_segments.size() < maxSegments) {
            getSegmentMan()->getSegment
              (_segments, getCuid(), _fileEntry, maxSegments);
          }
          if(_segments.empty()) {
            return prepareForRetry(0);
          }
        }
      }
      return executeInternal();
    } else if(errorEventEnabled()) {
      throw DL_RETRY_EX
        (StringFormat(MSG_NETWORK_PROBLEM,
                      _socket->getSocketError().c_str()).str());
    } else {
      if(_checkPoint.difference(global::wallclock) >= _timeout) {
        // timeout triggers ServerStat error state.
        SharedHandle<ServerStat> ss =
          _e->getRequestGroupMan()->getOrCreateServerStat
          (_req->getHost(), _req->getProtocol());
        ss->setError();
        throw DL_RETRY_EX2(EX_TIME_OUT, downloadresultcode::TIME_OUT);
      }
      // Nothing happened and no timeout yet: wait for the next round.
      _e->addCommand(this);
      return false;
    }
  } catch(DlAbortEx& err) {
    if(_req.isNull()) {
      if(getLogger()->debug()) {
        getLogger()->debug(EX_EXCEPTION_CAUGHT, err);
      }
    } else {
      getLogger()->error
        (MSG_DOWNLOAD_ABORTED,
         DL_ABORT_EX2(StringFormat
                      ("URI=%s", _req->getCurrentUri().c_str()).str(),err),
         util::itos(getCuid()).c_str(), _req->getUri().c_str());
      _fileEntry->addURIResult(_req->getUri(), err.getCode());
      _requestGroup->setLastUriResult(_req->getUri(), err.getCode());
      if(err.getCode() == downloadresultcode::CANNOT_RESUME) {
        _requestGroup->increaseResumeFailureCount();
      }
    }
    onAbort();
    tryReserved();
    return true;
  } catch(DlRetryEx& err) {
    assert(!_req.isNull());
    if(getLogger()->info()) {
      getLogger()->info
        (MSG_RESTARTING_DOWNLOAD,
         DL_RETRY_EX2(StringFormat
                      ("URI=%s", _req->getCurrentUri().c_str()).str(),
                      err),
         util::itos(getCuid()).c_str(), _req->getUri().c_str());
    }
    _req->addTryCount();
    _req->resetRedirectCount();
    // A PREF_MAX_TRIES value of 0 disables the abort-on-max-tries check.
    const unsigned int maxTries = getOption()->getAsInt(PREF_MAX_TRIES);
    bool isAbort = maxTries != 0 && _req->getTryCount() >= maxTries;
    if(isAbort) {
      if(getLogger()->info()) {
        getLogger()->info(MSG_MAX_TRY,
                          util::itos(getCuid()).c_str(), _req->getTryCount());
      }
      getLogger()->error(MSG_DOWNLOAD_ABORTED, err,
                         util::itos(getCuid()).c_str(),
                         _req->getUri().c_str());
      _fileEntry->addURIResult(_req->getUri(), err.getCode());
      _requestGroup->setLastUriResult(_req->getUri(), err.getCode());
      if(err.getCode() == downloadresultcode::CANNOT_RESUME) {
        _requestGroup->increaseResumeFailureCount();
      }
      onAbort();
      tryReserved();
      return true;
    } else {
      return prepareForRetry(0);
    }
  } catch(DownloadFailureException& err) {
    getLogger()->error(EX_EXCEPTION_CAUGHT, err);
    if(!_req.isNull()) {
      _fileEntry->addURIResult(_req->getUri(), err.getCode());
      _requestGroup->setLastUriResult(_req->getUri(), err.getCode());
    }
    _requestGroup->setHaltRequested(true);
    return true;
  }
}

// Asks the request group to create one next command and adds it to
// the engine.  Does nothing (apart from logging) when the download
// has exactly one file entry whose length is 0 and which has no
// remaining URIs.
void AbstractCommand::tryReserved() {
  if(getDownloadContext()->getFileEntries().size() == 1) {
    const SharedHandle<FileEntry>& entry =
      getDownloadContext()->getFirstFileEntry();
    // Don't create new command if currently file length is unknown
    // and there are no URI left. Because file length is unknown, we
    // can assume that there are no in-flight request object.
    if(entry->getLength() == 0 && entry->getRemainingUris().empty()) {
      if(getLogger()->debug()) {
        getLogger()->debug("CUID#%s - Not trying next request."
                           " No reserved/pooled request is remaining and"
                           " total length is still unknown.",
                           util::itos(getCuid()).c_str());
      }
      return;
    }
  }
  if(getLogger()->debug()) {
    getLogger()->debug("CUID#%s - Trying reserved/pooled request.",
                       util::itos(getCuid()).c_str());
  }
  std::vector<Command*> commands;
  _requestGroup->createNextCommand(commands, _e, 1);
  _e->setNoWait(true);
  _e->addCommand(commands);
}

// Cancels the segments held by this command, pools the current
// request (if any) back into the file entry and schedules a
// CreateRequestCommand.  When |wait| is 0 the new command is added
// directly and the engine is told not to wait; otherwise the new
// command is wrapped in a SleepCommand constructed with |wait|.
// Always returns true.
bool AbstractCommand::prepareForRetry(time_t wait) {
  if(!getPieceStorage().isNull()) {
    getSegmentMan()->cancelSegment(getCuid());
  }
  if(!_req.isNull()) {
    _fileEntry->poolRequest(_req);
    if(getLogger()->debug()) {
      getLogger()->debug("CUID#%s - Pooling request URI=%s",
                         util::itos(getCuid()).c_str(),
                         _req->getUri().c_str());
    }
    if(!getSegmentMan().isNull()) {
      getSegmentMan()->recognizeSegmentFor(_fileEntry);
    }
  }

  Command* command = new CreateRequestCommand(getCuid(), _requestGroup, _e);
  if(wait == 0) {
    _e->setNoWait(true);
    _e->addCommand(command);
  } else {
    SleepCommand* scom =
      new SleepCommand(getCuid(), _e, _requestGroup, command, wait);
    _e->addCommand(scom);
  }
  return true;
}

// Cleans up after an aborted request: marks the server stat of the
// current request as error, removes the request and its identical
// URIs from the file entry and cancels the segments of this command.
//
// Additionally, when PREF_ALWAYS_RESUME is not set, nothing was
// downloaded in this session, no P2P is involved and there is exactly
// one file entry, the download may be restarted from scratch with the
// URIs that failed with CANNOT_RESUME (see the inner condition).
void AbstractCommand::onAbort() {
  if(!_req.isNull()) {
    // TODO This might be a problem if the failure is caused by proxy.
    _e->getRequestGroupMan()->getOrCreateServerStat
      (_req->getHost(), _req->getProtocol())->setError();
    _fileEntry->removeIdenticalURI(_req->getUri());
    _fileEntry->removeRequest(_req);
  }
  if(getLogger()->debug()) {
    getLogger()->debug("CUID#%s - Aborting download",
                       util::itos(getCuid()).c_str());
  }
  if(!getPieceStorage().isNull()) {
    getSegmentMan()->cancelSegment(getCuid());
    // Don't do following process if BitTorrent is involved or files
    // in DownloadContext is more than 1. The latter condition is
    // limitation of current implementation.
    if(!getOption()->getAsBool(PREF_ALWAYS_RESUME) &&
       !_fileEntry.isNull() &&
       getSegmentMan()->calculateSessionDownloadLength() == 0 &&
       !_requestGroup->p2pInvolved() &&
       getDownloadContext()->getFileEntries().size() == 1) {
      const int maxTries =
        getOption()->getAsInt(PREF_MAX_RESUME_FAILURE_TRIES);
      if((maxTries > 0 && _requestGroup->getResumeFailureCount() >= maxTries)||
         _fileEntry->emptyRequestUri()) {
        // Local file exists, but given servers(or at least contacted
        // ones) doesn't support resume. Let's restart download from
        // scratch.
        getLogger()->notice("CUID#%s - Failed to resume download."
                            " Download from scratch.",
                            util::itos(getCuid()).c_str());
        if(getLogger()->debug()) {
          getLogger()->debug
            ("CUID#%s - Gathering URIs that has CANNOT_RESUME error",
             util::itos(getCuid()).c_str());
        }
        // Set PREF_ALWAYS_RESUME to V_TRUE to avoid repeating this
        // process.
        getOption()->put(PREF_ALWAYS_RESUME, V_TRUE);
        std::deque<URIResult> res;
        _fileEntry->extractURIResult(res, downloadresultcode::CANNOT_RESUME);
        if(!res.empty()) {
          getSegmentMan()->cancelAllSegments();
          getSegmentMan()->eraseSegmentWrittenLengthMemo();
          getPieceStorage()->markPiecesDone(0);
          // Re-queue the URIs that failed with CANNOT_RESUME.
          std::vector<std::string> uris;
          uris.reserve(res.size());
          std::transform(res.begin(), res.end(), std::back_inserter(uris),
                         std::mem_fun_ref(&URIResult::getURI));
          if(getLogger()->debug()) {
            getLogger()->debug("CUID#%s - %lu URIs found.",
                               util::itos(getCuid()).c_str(),
                               static_cast<unsigned long>(uris.size()));
          }
          _fileEntry->addUris(uris.begin(), uris.end());
          getSegmentMan()->recognizeSegmentFor(_fileEntry);
        }
      }
    }
  }
}

// Stops read checking: unregisters the current read-check target from
// the engine and resets it.  No-op when read checking is not active.
void AbstractCommand::disableReadCheckSocket() {
  if(_checkSocketIsReadable) {
    _e->deleteSocketForReadCheck(_readCheckTarget, this);
    _checkSocketIsReadable = false;
    _readCheckTarget.reset();
  }
}

// Makes |socket| the read-check target.  A socket that is not open
// disables read checking instead.  If another socket is currently
// registered it is swapped for |socket|; registering the same socket
// again is a no-op.
void AbstractCommand::setReadCheckSocket(const SocketHandle& socket) {
  if(!socket->isOpen()) {
    disableReadCheckSocket();
  } else {
    if(_checkSocketIsReadable) {
      if(_readCheckTarget != socket) {
        _e->deleteSocketForReadCheck(_readCheckTarget, this);
        _e->addSocketForReadCheck(socket, this);
        _readCheckTarget = socket;
      }
    } else {
      _e->addSocketForReadCheck(socket, this);
      _checkSocketIsReadable = true;
      _readCheckTarget = socket;
    }
  }
}

// Calls setReadCheckSocket(socket) when |pred| is true, otherwise
// disableReadCheckSocket().
void AbstractCommand::setReadCheckSocketIf
(const SharedHandle<SocketCore>& socket, bool pred)
{
  if(pred) {
    setReadCheckSocket(socket);
  } else {
    disableReadCheckSocket();
  }
}

// Stops write checking: unregisters the current write-check target
// from the engine and resets it.  No-op when write checking is not
// active.
void AbstractCommand::disableWriteCheckSocket() {
  if(_checkSocketIsWritable) {
    _e->deleteSocketForWriteCheck(_writeCheckTarget, this);
    _checkSocketIsWritable = false;
    _writeCheckTarget.reset();
  }
}

// Makes |socket| the write-check target.  A socket that is not open
// disables write checking instead.  If another socket is currently
// registered it is swapped for |socket|; registering the same socket
// again is a no-op.
void AbstractCommand::setWriteCheckSocket(const SocketHandle& socket) {
  if(!socket->isOpen()) {
    disableWriteCheckSocket();
  } else {
    if(_checkSocketIsWritable) {
      if(_writeCheckTarget != socket) {
        _e->deleteSocketForWriteCheck(_writeCheckTarget, this);
        _e->addSocketForWriteCheck(socket, this);
        _writeCheckTarget = socket;
      }
    } else {
      _e->addSocketForWriteCheck(socket, this);
      _checkSocketIsWritable = true;
      _writeCheckTarget = socket;
    }
  }
}

// Calls setWriteCheckSocket(socket) when |pred| is true, otherwise
// disableWriteCheckSocket().
void AbstractCommand::setWriteCheckSocketIf
(const SharedHandle<SocketCore>& socket, bool pred)
{
  if(pred) {
    setWriteCheckSocket(socket);
  } else {
    disableWriteCheckSocket();
  }
}

// Returns proxy option value for the given protocol.
static const std::string& getProxyOptionFor
(const std::string& proxyPref, const SharedHandle