/* */ #include "DownloadCommand.h" #include #include "Request.h" #include "RequestGroup.h" #include "DownloadEngine.h" #include "PeerStat.h" #include "DlAbortEx.h" #include "DlRetryEx.h" #include "SegmentMan.h" #include "Segment.h" #include "Logger.h" #include "ChecksumCheckIntegrityEntry.h" #include "PieceStorage.h" #include "CheckIntegrityCommand.h" #include "DiskAdaptor.h" #include "DownloadContext.h" #include "Option.h" #include "util.h" #include "Socket.h" #include "message.h" #include "prefs.h" #include "StringFormat.h" #include "Decoder.h" #include "RequestGroupMan.h" #include "wallclock.h" #include "ServerStatMan.h" #include "FileAllocationEntry.h" #ifdef ENABLE_MESSAGE_DIGEST # include "MessageDigestHelper.h" #endif // ENABLE_MESSAGE_DIGEST namespace aria2 { static const size_t BUFSIZE = 16*1024; DownloadCommand::DownloadCommand(cuid_t cuid, const SharedHandle& req, const SharedHandle& fileEntry, RequestGroup* requestGroup, DownloadEngine* e, const SocketHandle& s): AbstractCommand(cuid, req, fileEntry, requestGroup, e, s), buf_(new unsigned char[BUFSIZE]), startupIdleTime_(10), lowestDownloadSpeedLimit_(0) #ifdef ENABLE_MESSAGE_DIGEST , pieceHashValidationEnabled_(false) #endif // ENABLE_MESSAGE_DIGEST { #ifdef ENABLE_MESSAGE_DIGEST { if(getOption()->getAsBool(PREF_REALTIME_CHUNK_CHECKSUM)) { const std::string& algo = getDownloadContext()->getPieceHashAlgo(); if(MessageDigestContext::supports(algo)) { messageDigestContext_.reset(new MessageDigestContext()); messageDigestContext_->trySetAlgo(algo); messageDigestContext_->digestInit(); pieceHashValidationEnabled_ = true; } } } #endif // ENABLE_MESSAGE_DIGEST peerStat_ = req->initPeerStat(); peerStat_->downloadStart(); getSegmentMan()->registerPeerStat(peerStat_); } DownloadCommand::~DownloadCommand() { peerStat_->downloadStop(); getSegmentMan()->updateFastestPeerStat(peerStat_); delete [] buf_; } bool DownloadCommand::executeInternal() { if(getDownloadEngine()->getRequestGroupMan()->doesOverallDownloadSpeedExceed() || getRequestGroup()->doesDownloadSpeedExceed()) { getDownloadEngine()->addCommand(this); disableReadCheckSocket(); return false; } setReadCheckSocket(getSocket()); SharedHandle segment = getSegments().front(); size_t bufSize; if(segment->getLength() > 0) { if(static_cast(segment->getPosition()+segment->getLength()) <= static_cast(getFileEntry()->getLastOffset())) { bufSize = std::min(segment->getLength()-segment->getWrittenLength(), BUFSIZE); } else { bufSize = std::min (static_cast (getFileEntry()->getLastOffset()-segment->getPositionToWrite()), BUFSIZE); } } else { bufSize = BUFSIZE; } getSocket()->readData(buf_, bufSize); const SharedHandle& diskAdaptor = getPieceStorage()->getDiskAdaptor(); const unsigned char* bufFinal; size_t bufSizeFinal; std::string decoded; if(transferEncodingDecoder_.isNull()) { bufFinal = buf_; bufSizeFinal = bufSize; } else { decoded = transferEncodingDecoder_->decode(buf_, bufSize); bufFinal = reinterpret_cast(decoded.c_str()); bufSizeFinal = decoded.size(); } if(contentEncodingDecoder_.isNull()) { diskAdaptor->writeData(bufFinal, bufSizeFinal, segment->getPositionToWrite()); } else { std::string out = contentEncodingDecoder_->decode(bufFinal, bufSizeFinal); diskAdaptor->writeData(reinterpret_cast(out.data()), out.size(), segment->getPositionToWrite()); bufSizeFinal = out.size(); } #ifdef ENABLE_MESSAGE_DIGEST if(pieceHashValidationEnabled_) { segment->updateHash(segment->getWrittenLength(), bufFinal, bufSizeFinal); } #endif // ENABLE_MESSAGE_DIGEST if(bufSizeFinal > 0) { segment->updateWrittenLength(bufSizeFinal); } peerStat_->updateDownloadLength(bufSize); getSegmentMan()->updateDownloadSpeedFor(peerStat_); bool segmentPartComplete = false; // Note that GrowSegment::complete() always returns false. if(transferEncodingDecoder_.isNull() && contentEncodingDecoder_.isNull()) { if(segment->complete() || segment->getPositionToWrite() == getFileEntry()->getLastOffset()) { segmentPartComplete = true; } else if(segment->getLength() == 0 && bufSize == 0 && !getSocket()->wantRead() && !getSocket()->wantWrite()) { segmentPartComplete = true; } } else if(!transferEncodingDecoder_.isNull() && (segment->complete() || segment->getPositionToWrite() == getFileEntry()->getLastOffset())){ // In this case, transferEncodingDecoder is used and // Content-Length is known. segmentPartComplete = true; } else if((transferEncodingDecoder_.isNull() || transferEncodingDecoder_->finished()) && (contentEncodingDecoder_.isNull() || contentEncodingDecoder_->finished())) { segmentPartComplete = true; } if(!segmentPartComplete && bufSize == 0 && !getSocket()->wantRead() && !getSocket()->wantWrite()) { throw DL_RETRY_EX(EX_GOT_EOF); } if(segmentPartComplete) { if(segment->complete() || segment->getLength() == 0) { // If segment->getLength() == 0, the server doesn't provide // content length, but the client detected that download // completed. if(getLogger()->info()) { getLogger()->info(MSG_SEGMENT_DOWNLOAD_COMPLETED, util::itos(getCuid()).c_str()); } #ifdef ENABLE_MESSAGE_DIGEST { const std::string& expectedPieceHash = getDownloadContext()->getPieceHash(segment->getIndex()); if(pieceHashValidationEnabled_ && !expectedPieceHash.empty()) { if(segment->isHashCalculated()) { if(getLogger()->debug()) { getLogger()->debug ("Hash is available! index=%lu", static_cast(segment->getIndex())); } validatePieceHash (segment, expectedPieceHash, segment->getHashString()); } else { messageDigestContext_->digestReset(); validatePieceHash (segment, expectedPieceHash, MessageDigestHelper::digest (messageDigestContext_.get(), getPieceStorage()->getDiskAdaptor(), segment->getPosition(), segment->getLength())); } } else { getSegmentMan()->completeSegment(getCuid(), segment); } } #else // !ENABLE_MESSAGE_DIGEST getSegmentMan()->completeSegment(getCuid(), segment); #endif // !ENABLE_MESSAGE_DIGEST } else { // If segment is not canceled here, in the next pipelining // request, aria2 requests bad range // [FileEntry->getLastOffset(), FileEntry->getLastOffset()) getSegmentMan()->cancelSegment(getCuid(), segment); } checkLowestDownloadSpeed(); // this unit is going to download another segment. return prepareForNextSegment(); } else { checkLowestDownloadSpeed(); setWriteCheckSocketIf(getSocket(), getSocket()->wantWrite()); getDownloadEngine()->addCommand(this); return false; } } void DownloadCommand::checkLowestDownloadSpeed() const { // calculate downloading speed if(peerStat_->getDownloadStartTime().difference(global::wallclock) >= startupIdleTime_) { unsigned int nowSpeed = peerStat_->calculateDownloadSpeed(); if(lowestDownloadSpeedLimit_ > 0 && nowSpeed <= lowestDownloadSpeedLimit_) { throw DL_ABORT_EX2(StringFormat(EX_TOO_SLOW_DOWNLOAD_SPEED, nowSpeed, lowestDownloadSpeedLimit_, getRequest()->getHost().c_str()).str(), downloadresultcode::TOO_SLOW_DOWNLOAD_SPEED); } } } bool DownloadCommand::prepareForNextSegment() { if(getRequestGroup()->downloadFinished()) { // Remove in-flight request here. getFileEntry()->poolRequest(getRequest()); // If this is a single file download, and file size becomes known // just after downloading, set total length to FileEntry object // here. if(getDownloadContext()->getFileEntries().size() == 1) { if(getFileEntry()->getLength() == 0) { getFileEntry()->setLength(getPieceStorage()->getCompletedLength()); } } #ifdef ENABLE_MESSAGE_DIGEST if(getDownloadContext()->getPieceHashAlgo().empty()) { SharedHandle entry (new ChecksumCheckIntegrityEntry(getRequestGroup())); if(entry->isValidationReady()) { entry->initValidator(); entry->cutTrailingGarbage(); getDownloadEngine()->getCheckIntegrityMan()->pushEntry(entry); } } // Following 2lines are needed for DownloadEngine to detect // completed RequestGroups without 1sec delay. getDownloadEngine()->setNoWait(true); getDownloadEngine()->setRefreshInterval(0); #endif // ENABLE_MESSAGE_DIGEST return true; } else { // The number of segments should be 1 in order to pass through the next // segment. if(getSegments().size() == 1) { SharedHandle tempSegment = getSegments().front(); if(!tempSegment->complete()) { return prepareForRetry(0); } SharedHandle nextSegment = getSegmentMan()->getSegment (getCuid(), tempSegment->getIndex()+1); if(nextSegment.isNull()) { nextSegment = getSegmentMan()->getCleanSegmentIfOwnerIsIdle (getCuid(), tempSegment->getIndex()+1); } if(nextSegment.isNull() || nextSegment->getWrittenLength() > 0) { // If nextSegment->getWrittenLength() > 0, current socket must // be closed because writing incoming data at // nextSegment->getWrittenLength() corrupts file. return prepareForRetry(0); } else { getDownloadEngine()->addCommand(this); return false; } } else { return prepareForRetry(0); } } } #ifdef ENABLE_MESSAGE_DIGEST void DownloadCommand::validatePieceHash(const SharedHandle& segment, const std::string& expectedPieceHash, const std::string& actualPieceHash) { if(actualPieceHash == expectedPieceHash) { getLogger()->info(MSG_GOOD_CHUNK_CHECKSUM, actualPieceHash.c_str()); getSegmentMan()->completeSegment(getCuid(), segment); } else { getLogger()->info(EX_INVALID_CHUNK_CHECKSUM, segment->getIndex(), util::itos(segment->getPosition(), true).c_str(), expectedPieceHash.c_str(), actualPieceHash.c_str()); segment->clear(); getSegmentMan()->cancelSegment(getCuid()); throw DL_RETRY_EX (StringFormat("Invalid checksum index=%d", segment->getIndex()).str()); } } #endif // ENABLE_MESSAGE_DIGEST void DownloadCommand::setTransferEncodingDecoder (const SharedHandle& decoder) { this->transferEncodingDecoder_ = decoder; } void DownloadCommand::setContentEncodingDecoder (const SharedHandle& decoder) { contentEncodingDecoder_ = decoder; } } // namespace aria2