/* */
// Implementation of DownloadCommand: the command that moves bytes from the
// socket receive buffer through the installed stream filter into the current
// segment, decides when that segment (or part of it) is complete, optionally
// validates the piece hash, and then either continues with the next segment
// or hands control back to the retry logic.
//
// NOTE(review): in this copy of the file several template arguments appear
// to be missing (e.g. "SharedHandle&", "static_cast(") and one #include
// below has no header name -- possibly lost during extraction. Confirm
// against the original file before building.
#include "DownloadCommand.h"

#include

#include "Request.h"
#include "RequestGroup.h"
#include "DownloadEngine.h"
#include "PeerStat.h"
#include "DlAbortEx.h"
#include "DlRetryEx.h"
#include "SegmentMan.h"
#include "Segment.h"
#include "Logger.h"
#include "LogFactory.h"
#include "ChecksumCheckIntegrityEntry.h"
#include "PieceStorage.h"
#include "CheckIntegrityCommand.h"
#include "DiskAdaptor.h"
#include "DownloadContext.h"
#include "Option.h"
#include "util.h"
#include "SocketCore.h"
#include "message.h"
#include "prefs.h"
#include "fmt.h"
#include "RequestGroupMan.h"
#include "wallclock.h"
#include "SinkStreamFilter.h"
#include "FileEntry.h"
#include "SocketRecvBuffer.h"
#include "Piece.h"
#include "WrDiskCacheEntry.h"
#include "DownloadFailureException.h"
#ifdef ENABLE_MESSAGE_DIGEST
# include "MessageDigest.h"
# include "message_digest_helper.h"
#endif // ENABLE_MESSAGE_DIGEST
#ifdef ENABLE_BITTORRENT
# include "bittorrent_helper.h"
#endif // ENABLE_BITTORRENT

namespace aria2 {

namespace {
// 16 KiB. Not referenced anywhere in the code visible in this file.
const size_t BUFSIZE = 16*1024;
} // namespace

// Constructs the command and forwards all arguments to AbstractCommand.
//
// Beyond the base-class initialisation this:
//  - starts with no lowest-speed limit (0) and a startup idle time of 10
//    (presumably seconds -- confirm against the wallclock/Timer API);
//  - when built with ENABLE_MESSAGE_DIGEST and PREF_REALTIME_CHUNK_CHECKSUM
//    is set, enables per-piece hash validation if the download context's
//    piece hash type is supported by MessageDigest;
//  - obtains a PeerStat from the request, marks the download as started and
//    registers the stat with the SegmentMan;
//  - installs a SinkStreamFilter as the initial (and only) stream filter.
DownloadCommand::DownloadCommand
(cuid_t cuid,
 const SharedHandle& req,
 const SharedHandle& fileEntry,
 RequestGroup* requestGroup,
 DownloadEngine* e,
 const SharedHandle& s,
 const SharedHandle& socketRecvBuffer)
  : AbstractCommand(cuid, req, fileEntry, requestGroup, e, s, socketRecvBuffer),
    startupIdleTime_(10),
    lowestDownloadSpeedLimit_(0),
    pieceHashValidationEnabled_(false)
{
#ifdef ENABLE_MESSAGE_DIGEST
  {
    if(getOption()->getAsBool(PREF_REALTIME_CHUNK_CHECKSUM)) {
      const std::string& algo = getDownloadContext()->getPieceHashType();
      // Validation stays disabled when the hash algorithm is not supported.
      if(MessageDigest::supports(algo)) {
        messageDigest_ = MessageDigest::create(algo);
        pieceHashValidationEnabled_ = true;
      }
    }
  }
#endif // ENABLE_MESSAGE_DIGEST

  peerStat_ = req->initPeerStat();
  peerStat_->downloadStart();
  getSegmentMan()->registerPeerStat(peerStat_);

  WrDiskCache* wrDiskCache = getPieceStorage()->getWrDiskCache();
  streamFilter_.reset(new SinkStreamFilter(wrDiskCache,
                                           pieceHashValidationEnabled_));
  streamFilter_->init();
  // Only the sink filter is installed at this point; installStreamFilter()
  // recomputes this flag when another filter is placed in front of it.
  sinkFilterOnly_ = true;
  checkSocketRecvBuffer();
}

// Marks the peer statistics as stopped and reports them to the SegmentMan
// via updateFastestPeerStat().
DownloadCommand::~DownloadCommand() {
  peerStat_->downloadStop();
  getSegmentMan()->updateFastestPeerStat(peerStat_);
}

namespace {
// Flushes the write-disk-cache entry of the piece behind `segment`, if the
// piece has one.
//
// If the cache entry reports an error after flushing, the segment is cleared
// and a DOWNLOAD_FAILURE_EXCEPTION2 carrying the entry's error code is
// thrown. Does nothing when the piece has no cache entry.
void flushWrDiskCacheEntry(WrDiskCache* wrDiskCache,
                           const SharedHandle& segment)
{
  const SharedHandle& piece = segment->getPiece();
  if(piece->getWrDiskCacheEntry()) {
    piece->flushWrCache(wrDiskCache);
    if(piece->getWrDiskCacheEntry()->getError() !=
       WrDiskCacheEntry::CACHE_ERR_SUCCESS) {
      segment->clear(wrDiskCache);
      throw DOWNLOAD_FAILURE_EXCEPTION2
        (fmt("Write disk cache flush failure index=%lu",
             static_cast(piece->getIndex())),
         piece->getWrDiskCacheEntry()->getErrorCode());
    }
  }
}
} // namespace

// One step of the download loop.
//
// Flow:
//  1. If the overall or per-group download speed limit is exceeded, re-queue
//     this command, stop watching the socket for reads, and return false.
//  2. If the receive buffer is empty, read from the socket (EOF detection
//     happens here).
//  3. Unless EOF was seen, push buffered bytes through streamFilter_ into the
//     front segment and account the consumed bytes.
//  4. Decide whether the current segment part is complete.
//  5. On completion: complete/validate or cancel the segment, then return the
//     result of prepareForNextSegment(). Otherwise re-queue and return false.
//
// Throws DL_RETRY_EX(EX_GOT_EOF) when EOF arrives before the segment part is
// complete. May also propagate exceptions from checkLowestDownloadSpeed(),
// validatePieceHash() and completeSegment().
bool DownloadCommand::executeInternal() {
  if(getDownloadEngine()->getRequestGroupMan()->doesOverallDownloadSpeedExceed()
     || getRequestGroup()->doesDownloadSpeedExceed()) {
    getDownloadEngine()->addCommand(this);
    disableReadCheckSocket();
    return false;
  }
  setReadCheckSocket(getSocket());

  const SharedHandle& diskAdaptor =
    getPieceStorage()->getDiskAdaptor();
  // Only the front segment is processed in this call.
  SharedHandle segment = getSegments().front();
  bool eof = false;
  if(getSocketRecvBuffer()->bufferEmpty()) {
    // Only read from the socket when the buffer is empty. Imagine that
    // the segment length is *short* and we are using HTTP pipelining,
    // with 2 requests issued in the pipeline. When reading the first
    // response header, we may also read its response body plus the 2nd
    // response header and body into the buffer if they are small enough
    // to fit. The server may then send EOF. In this case, if we read
    // from the socket here, we would get EOF and leave the 2nd response
    // unprocessed. To prevent this, we don't read from the socket when
    // the buffer is not empty.
    //
    // EOF is only declared when recv() returned 0 and the socket does not
    // want to read or write.
    eof = getSocketRecvBuffer()->recv() == 0 &&
      !getSocket()->wantRead() && !getSocket()->wantWrite();
  }
  if(!eof) {
    size_t bufSize;
    if(sinkFilterOnly_) {
      if(segment->getLength() > 0) {
        // Known segment length: never feed more bytes than the segment (or
        // the file entry, whichever ends first) can still take.
        if(segment->getPosition()+segment->getLength() <=
           getFileEntry()->getLastOffset()) {
          // Segment lies within the file entry: cap at the bytes remaining
          // in the segment.
          bufSize =
            std::min(static_cast(segment->getLength()
                                 -segment->getWrittenLength()),
                     getSocketRecvBuffer()->getBufferLength());
        } else {
          // Segment extends past the file entry's last offset: cap at the
          // bytes remaining up to that offset.
          bufSize =
            std::min
            (static_cast
             (getFileEntry()->getLastOffset()-segment->getPositionToWrite()),
             getSocketRecvBuffer()->getBufferLength());
        }
      } else {
        // Segment length is 0 (unknown): consume everything buffered.
        bufSize = getSocketRecvBuffer()->getBufferLength();
      }
      streamFilter_->transform(diskAdaptor, segment,
                               getSocketRecvBuffer()->getBuffer(), bufSize);
    } else {
      // It is possible that the segment is completed but we still have
      // some bytes of stream to read. For example, chunked encoding has
      // "0"+CRLF after the data. After we read the data (at this moment
      // the segment is completed), we need another 3 bytes (or more if
      // it has trailers).
      streamFilter_->transform(diskAdaptor, segment,
                               getSocketRecvBuffer()->getBuffer(),
                               getSocketRecvBuffer()->getBufferLength());
      // The filter reports how many input bytes it actually consumed.
      bufSize = streamFilter_->getBytesProcessed();
    }
    // Drop the consumed bytes from the buffer and update the statistics.
    getSocketRecvBuffer()->shiftBuffer(bufSize);
    peerStat_->updateDownloadLength(bufSize);
    getDownloadContext()->updateDownloadLength(bufSize);
  }
  bool segmentPartComplete = false;
  // Note that GrowSegment::complete() always returns false.
  if(sinkFilterOnly_) {
    if(segment->complete() ||
       segment->getPositionToWrite() == getFileEntry()->getLastOffset()) {
      segmentPartComplete = true;
    } else if(segment->getLength() == 0 && eof) {
      // Unknown length: EOF is the only end-of-data signal.
      segmentPartComplete = true;
    }
  } else {
    // File-local offset of the next write position.
    int64_t loff = getFileEntry()->gtoloff(segment->getPositionToWrite());
    // NOTE(review): "!sinkFilterOnly_" is always true in this else branch.
    if(getFileEntry()->getLength() > 0 && !sinkFilterOnly_ &&
       ((loff == getRequestEndOffset() && streamFilter_->finished())
        || loff < getRequestEndOffset()) &&
       (segment->complete() ||
        segment->getPositionToWrite() == getFileEntry()->getLastOffset())) {
      // In this case, a StreamFilter other than *SinkStreamFilter is
      // used and Content-Length is known. We check
      // streamFilter_->finished() only if the requested end offset
      // equals the written position in file local offset; in other
      // words, data in the requested range is all received. If the
      // requested end offset is greater than this segment, then
      // streamFilter_ is not finished in this segment.
      segmentPartComplete = true;
    } else if(streamFilter_->finished()) {
      segmentPartComplete = true;
    }
  }

  // EOF before the segment part is complete: ask for a retry.
  if(!segmentPartComplete && eof) {
    throw DL_RETRY_EX(EX_GOT_EOF);
  }

  if(segmentPartComplete) {
    if(segment->complete() || segment->getLength() == 0) {
      // If segment->getLength() == 0, the server doesn't provide
      // content length, but the client detected that download
      // completed.
      A2_LOG_INFO(fmt(MSG_SEGMENT_DOWNLOAD_COMPLETED,
                      getCuid()));
#ifdef ENABLE_MESSAGE_DIGEST

      {
        const std::string& expectedPieceHash =
          getDownloadContext()->getPieceHash(segment->getIndex());
        // Validate only when validation is enabled and a hash is known for
        // this piece; otherwise complete the segment unchecked.
        if(pieceHashValidationEnabled_ && !expectedPieceHash.empty()) {
          if(
#ifdef ENABLE_BITTORRENT
             // TODO Is this necessary?
             (!getPieceStorage()->isEndGame() ||
              !getDownloadContext()->hasAttribute(CTX_ATTR_BT)) &&
#endif // ENABLE_BITTORRENT
             segment->isHashCalculated()) {
            // The digest was already computed for the segment; use it
            // directly.
            A2_LOG_DEBUG(fmt("Hash is available! index=%lu",
                             static_cast(segment->getIndex())));
            validatePieceHash
              (segment, expectedPieceHash, segment->getDigest());
          } else {
            // No precomputed digest: obtain it from the piece via
            // getDigestWithWrCache().
            try {
              std::string actualHash =
                segment->getPiece()->getDigestWithWrCache
                (segment->getSegmentLength(), diskAdaptor);
              validatePieceHash(segment, expectedPieceHash, actualHash);
            } catch(RecoverableException& e) {
              // Discard the segment data and cancel by cuid before
              // propagating the error.
              segment->clear(getPieceStorage()->getWrDiskCache());
              getSegmentMan()->cancelSegment(getCuid());
              throw;
            }
          }
        } else {
          completeSegment(getCuid(), segment);
        }
      }

#else // !ENABLE_MESSAGE_DIGEST
      completeSegment(getCuid(), segment);
#endif // !ENABLE_MESSAGE_DIGEST
    } else {
      // If the segment is not canceled here, in the next pipelining
      // request, aria2 requests a bad range
      // [FileEntry->getLastOffset(), FileEntry->getLastOffset())
      getSegmentMan()->cancelSegment(getCuid(), segment);
    }
    checkLowestDownloadSpeed();
    // this unit is going to download another segment.
    return prepareForNextSegment();
  } else {
    // Not complete yet: check the speed floor, keep watching the socket and
    // re-queue this command.
    checkLowestDownloadSpeed();
    setWriteCheckSocketIf(getSocket(), getSocket()->wantWrite());
    checkSocketRecvBuffer();
    getDownloadEngine()->addCommand(this);
    return false;
  }
}

// Enforces the lowest-download-speed limit.
//
// The check is skipped while lowestDownloadSpeedLimit_ is 0 and until at
// least startupIdleTime_ has elapsed since the peer stat's download start
// time. After that, throws DL_ABORT_EX2 with
// error_code::TOO_SLOW_DOWNLOAD_SPEED when the calculated speed is at or
// below the limit.
void DownloadCommand::checkLowestDownloadSpeed() const
{
  if(lowestDownloadSpeedLimit_ > 0 &&
     peerStat_->getDownloadStartTime().difference(global::wallclock()) >=
     startupIdleTime_) {
    int nowSpeed = peerStat_->calculateDownloadSpeed();
    if(nowSpeed <= lowestDownloadSpeedLimit_) {
      throw DL_ABORT_EX2(fmt(EX_TOO_SLOW_DOWNLOAD_SPEED,
                             nowSpeed,
                             lowestDownloadSpeedLimit_,
                             getRequest()->getHost().c_str()),
                         error_code::TOO_SLOW_DOWNLOAD_SPEED);
    }
  }
}

// Decides what happens after a segment part has been completed.
//
// Returns true when the whole request group has finished downloading.
// Otherwise returns either the result of prepareForRetry(0) or, when the
// same connection can continue straight into the following segment, false
// after re-queueing this command.
// NOTE(review): presumably "true" tells the engine this command is done --
// confirm against the Command contract.
bool DownloadCommand::prepareForNextSegment() {
  if(getRequestGroup()->downloadFinished()) {
    // Remove in-flight request here.
    getFileEntry()->poolRequest(getRequest());
    // If this is a single file download, and file size becomes known
    // just after downloading, set total length to FileEntry object
    // here.
    if(getDownloadContext()->getFileEntries().size() == 1) {
      if(getFileEntry()->getLength() == 0) {
        getFileEntry()->setLength(getPieceStorage()->getCompletedLength());
      }
    }
#ifdef ENABLE_MESSAGE_DIGEST
    // Without a piece hash type, queue a ChecksumCheckIntegrityEntry for the
    // request group, provided the entry reports it is ready for validation.
    if(getDownloadContext()->getPieceHashType().empty()) {
      SharedHandle entry
        (new ChecksumCheckIntegrityEntry(getRequestGroup()));
      if(entry->isValidationReady()) {
        entry->initValidator();
        entry->cutTrailingGarbage();
        getDownloadEngine()->getCheckIntegrityMan()->pushEntry(entry);
      }
    }
#endif // ENABLE_MESSAGE_DIGEST
    // The following 2 lines are needed for DownloadEngine to detect
    // completed RequestGroups without a 1 sec delay.
    getDownloadEngine()->setNoWait(true);
    getDownloadEngine()->setRefreshInterval(0);
    return true;
  } else {
    // The number of segments should be 1 in order to pass through to the
    // next segment.
    if(getSegments().size() == 1) {
      SharedHandle tempSegment = getSegments().front();
      if(!tempSegment->complete()) {
        return prepareForRetry(0);
      }
      // The request ended exactly at the end of this segment, so there is
      // nothing more to read for it on this connection.
      if(getRequestEndOffset() ==
         getFileEntry()->gtoloff
         (tempSegment->getPosition()+tempSegment->getLength())) {
        return prepareForRetry(0);
      }
      // Try to obtain the segment that directly follows the current one.
      SharedHandle nextSegment = getSegmentMan()->getSegmentWithIndex
        (getCuid(), tempSegment->getIndex()+1);
      if(!nextSegment) {
        nextSegment = getSegmentMan()->getCleanSegmentIfOwnerIsIdle
          (getCuid(), tempSegment->getIndex()+1);
      }
      if(!nextSegment || nextSegment->getWrittenLength() > 0) {
        // If nextSegment->getWrittenLength() > 0, current socket must
        // be closed because writing incoming data at
        // nextSegment->getWrittenLength() corrupts file.
        return prepareForRetry(0);
      } else {
        // Continue on the same connection with the next segment.
        checkSocketRecvBuffer();
        getDownloadEngine()->addCommand(this);
        return false;
      }
    } else {
      return prepareForRetry(0);
    }
  }
}

#ifdef ENABLE_MESSAGE_DIGEST

// Compares `actualHash` with `expectedHash` for `segment`.
//
// On a match the segment is completed via completeSegment(). On a mismatch
// the segment data is cleared, the segment is cancelled by cuid, and
// DL_RETRY_EX is thrown. Both outcomes are logged at INFO level.
void DownloadCommand::validatePieceHash(const SharedHandle& segment,
                                        const std::string& expectedHash,
                                        const std::string& actualHash)
{
  if(actualHash == expectedHash) {
    A2_LOG_INFO(fmt(MSG_GOOD_CHUNK_CHECKSUM, util::toHex(actualHash).c_str()));
    completeSegment(getCuid(), segment);
  } else {
    A2_LOG_INFO(fmt(EX_INVALID_CHUNK_CHECKSUM,
                    static_cast(segment->getIndex()),
                    segment->getPosition(),
                    util::toHex(expectedHash).c_str(),
                    util::toHex(actualHash).c_str()));
    segment->clear(getPieceStorage()->getWrDiskCache());
    getSegmentMan()->cancelSegment(getCuid());
    throw DL_RETRY_EX
      (fmt("Invalid checksum index=%lu",
           static_cast(segment->getIndex())));
  }
}

#endif // ENABLE_MESSAGE_DIGEST

// Flushes any write-disk-cache entry of the segment's piece, then marks the
// segment complete in the SegmentMan. The flush may throw (see
// flushWrDiskCacheEntry), in which case the segment is not marked complete.
void DownloadCommand::completeSegment(cuid_t cuid,
                                      const SharedHandle& segment)
{
  flushWrDiskCacheEntry(getPieceStorage()->getWrDiskCache(), segment);
  getSegmentMan()->completeSegment(cuid, segment);
}

// Puts `streamFilter` in front of the current filter: the current filter is
// installed as its delegate and `streamFilter` becomes streamFilter_.
// A null handle is ignored. sinkFilterOnly_ is then recomputed from the new
// filter's name (true when the name ends with SinkStreamFilter::NAME).
void DownloadCommand::installStreamFilter
(const SharedHandle& streamFilter)
{
  if(!streamFilter) {
    return;
  }
  streamFilter->installDelegate(streamFilter_);
  streamFilter_ = streamFilter;
  const std::string& name = streamFilter_->getName();
  sinkFilterOnly_ = util::endsWith(name, SinkStreamFilter::NAME);
}

} // namespace aria2