/* */ #include "DownloadCommand.h" #include #include "Request.h" #include "RequestGroup.h" #include "DownloadEngine.h" #include "PeerStat.h" #include "DlAbortEx.h" #include "DlRetryEx.h" #include "SegmentMan.h" #include "Segment.h" #include "Logger.h" #include "ChecksumCheckIntegrityEntry.h" #include "PieceStorage.h" #include "CheckIntegrityCommand.h" #include "DiskAdaptor.h" #include "DownloadContext.h" #include "Option.h" #include "util.h" #include "Socket.h" #include "message.h" #include "prefs.h" #include "StringFormat.h" #include "Decoder.h" #include "RequestGroupMan.h" #include "wallclock.h" #include "ServerStatMan.h" #include "FileAllocationEntry.h" #ifdef ENABLE_MESSAGE_DIGEST # include "MessageDigestHelper.h" #endif // ENABLE_MESSAGE_DIGEST namespace aria2 { static const size_t BUFSIZE = 16*1024; DownloadCommand::DownloadCommand(cuid_t cuid, const SharedHandle& req, const SharedHandle& fileEntry, RequestGroup* requestGroup, DownloadEngine* e, const SocketHandle& s): AbstractCommand(cuid, req, fileEntry, requestGroup, e, s), _buf(new unsigned char[BUFSIZE]), _startupIdleTime(10), _lowestDownloadSpeedLimit(0) #ifdef ENABLE_MESSAGE_DIGEST , _pieceHashValidationEnabled(false) #endif // ENABLE_MESSAGE_DIGEST { #ifdef ENABLE_MESSAGE_DIGEST { if(getOption()->getAsBool(PREF_REALTIME_CHUNK_CHECKSUM)) { const std::string& algo = getDownloadContext()->getPieceHashAlgo(); if(MessageDigestContext::supports(algo)) { _messageDigestContext.reset(new MessageDigestContext()); _messageDigestContext->trySetAlgo(algo); _messageDigestContext->digestInit(); _pieceHashValidationEnabled = true; } } } #endif // ENABLE_MESSAGE_DIGEST _peerStat = req->initPeerStat(); _peerStat->downloadStart(); getSegmentMan()->registerPeerStat(_peerStat); } DownloadCommand::~DownloadCommand() { _peerStat->downloadStop(); getSegmentMan()->updateFastestPeerStat(_peerStat); delete [] _buf; } bool DownloadCommand::executeInternal() { if(getDownloadEngine()->getRequestGroupMan()->doesOverallDownloadSpeedExceed() || getRequestGroup()->doesDownloadSpeedExceed()) { getDownloadEngine()->addCommand(this); disableReadCheckSocket(); return false; } setReadCheckSocket(getSocket()); SharedHandle segment = getSegments().front(); size_t bufSize; if(segment->getLength() > 0) { if(static_cast(segment->getPosition()+segment->getLength()) <= static_cast(getFileEntry()->getLastOffset())) { bufSize = std::min(segment->getLength()-segment->getWrittenLength(), BUFSIZE); } else { bufSize = std::min (static_cast (getFileEntry()->getLastOffset()-segment->getPositionToWrite()), BUFSIZE); } } else { bufSize = BUFSIZE; } getSocket()->readData(_buf, bufSize); const SharedHandle& diskAdaptor = getPieceStorage()->getDiskAdaptor(); const unsigned char* bufFinal; size_t bufSizeFinal; std::string decoded; if(_transferEncodingDecoder.isNull()) { bufFinal = _buf; bufSizeFinal = bufSize; } else { decoded = _transferEncodingDecoder->decode(_buf, bufSize); bufFinal = reinterpret_cast(decoded.c_str()); bufSizeFinal = decoded.size(); } if(_contentEncodingDecoder.isNull()) { diskAdaptor->writeData(bufFinal, bufSizeFinal, segment->getPositionToWrite()); } else { std::string out = _contentEncodingDecoder->decode(bufFinal, bufSizeFinal); diskAdaptor->writeData(reinterpret_cast(out.data()), out.size(), segment->getPositionToWrite()); bufSizeFinal = out.size(); } #ifdef ENABLE_MESSAGE_DIGEST if(_pieceHashValidationEnabled) { segment->updateHash(segment->getWrittenLength(), bufFinal, bufSizeFinal); } #endif // ENABLE_MESSAGE_DIGEST if(bufSizeFinal > 0) { segment->updateWrittenLength(bufSizeFinal); } _peerStat->updateDownloadLength(bufSize); getSegmentMan()->updateDownloadSpeedFor(_peerStat); bool segmentPartComplete = false; // Note that GrowSegment::complete() always returns false. if(_transferEncodingDecoder.isNull() && _contentEncodingDecoder.isNull()) { if(segment->complete() || segment->getPositionToWrite() == getFileEntry()->getLastOffset()) { segmentPartComplete = true; } else if(segment->getLength() == 0 && bufSize == 0 && !getSocket()->wantRead() && !getSocket()->wantWrite()) { segmentPartComplete = true; } } else if(!_transferEncodingDecoder.isNull() && (segment->complete() || segment->getPositionToWrite() == getFileEntry()->getLastOffset())){ // In this case, transferEncodingDecoder is used and // Content-Length is known. segmentPartComplete = true; } else if((_transferEncodingDecoder.isNull() || _transferEncodingDecoder->finished()) && (_contentEncodingDecoder.isNull() || _contentEncodingDecoder->finished())) { segmentPartComplete = true; } if(!segmentPartComplete && bufSize == 0 && !getSocket()->wantRead() && !getSocket()->wantWrite()) { throw DL_RETRY_EX(EX_GOT_EOF); } if(segmentPartComplete) { if(segment->complete() || segment->getLength() == 0) { // If segment->getLength() == 0, the server doesn't provide // content length, but the client detected that download // completed. if(getLogger()->info()) { getLogger()->info(MSG_SEGMENT_DOWNLOAD_COMPLETED, util::itos(getCuid()).c_str()); } #ifdef ENABLE_MESSAGE_DIGEST { const std::string& expectedPieceHash = getDownloadContext()->getPieceHash(segment->getIndex()); if(_pieceHashValidationEnabled && !expectedPieceHash.empty()) { if(segment->isHashCalculated()) { if(getLogger()->debug()) { getLogger()->debug ("Hash is available! index=%lu", static_cast(segment->getIndex())); } validatePieceHash (segment, expectedPieceHash, segment->getHashString()); } else { _messageDigestContext->digestReset(); validatePieceHash (segment, expectedPieceHash, MessageDigestHelper::digest (_messageDigestContext.get(), getPieceStorage()->getDiskAdaptor(), segment->getPosition(), segment->getLength())); } } else { getSegmentMan()->completeSegment(getCuid(), segment); } } #else // !ENABLE_MESSAGE_DIGEST getSegmentMan()->completeSegment(getCuid(), segment); #endif // !ENABLE_MESSAGE_DIGEST } else { // If segment is not canceled here, in the next pipelining // request, aria2 requests bad range // [FileEntry->getLastOffset(), FileEntry->getLastOffset()) getSegmentMan()->cancelSegment(getCuid(), segment); } checkLowestDownloadSpeed(); // this unit is going to download another segment. return prepareForNextSegment(); } else { checkLowestDownloadSpeed(); setWriteCheckSocketIf(getSocket(), getSocket()->wantWrite()); getDownloadEngine()->addCommand(this); return false; } } void DownloadCommand::checkLowestDownloadSpeed() const { // calculate downloading speed if(_peerStat->getDownloadStartTime().difference(global::wallclock) >= _startupIdleTime) { unsigned int nowSpeed = _peerStat->calculateDownloadSpeed(); if(_lowestDownloadSpeedLimit > 0 && nowSpeed <= _lowestDownloadSpeedLimit) { throw DL_ABORT_EX2(StringFormat(EX_TOO_SLOW_DOWNLOAD_SPEED, nowSpeed, _lowestDownloadSpeedLimit, getRequest()->getHost().c_str()).str(), downloadresultcode::TOO_SLOW_DOWNLOAD_SPEED); } } } bool DownloadCommand::prepareForNextSegment() { if(getRequestGroup()->downloadFinished()) { // Remove in-flight request here. getFileEntry()->poolRequest(getRequest()); // If this is a single file download, and file size becomes known // just after downloading, set total length to FileEntry object // here. if(getDownloadContext()->getFileEntries().size() == 1) { if(getFileEntry()->getLength() == 0) { getFileEntry()->setLength(getPieceStorage()->getCompletedLength()); } } #ifdef ENABLE_MESSAGE_DIGEST if(getDownloadContext()->getPieceHashAlgo().empty()) { SharedHandle entry (new ChecksumCheckIntegrityEntry(getRequestGroup())); if(entry->isValidationReady()) { entry->initValidator(); // TODO do we need cuttrailinggarbage here? getDownloadEngine()->getCheckIntegrityMan()->pushEntry(entry); } } // Following 2lines are needed for DownloadEngine to detect // completed RequestGroups without 1sec delay. getDownloadEngine()->setNoWait(true); getDownloadEngine()->setRefreshInterval(0); #endif // ENABLE_MESSAGE_DIGEST return true; } else { // The number of segments should be 1 in order to pass through the next // segment. if(getSegments().size() == 1) { SharedHandle tempSegment = getSegments().front(); if(!tempSegment->complete()) { return prepareForRetry(0); } SharedHandle nextSegment = getSegmentMan()->getCleanSegmentIfOwnerIsIdle (getCuid(), tempSegment->getIndex()+1); if(nextSegment.isNull()) { return prepareForRetry(0); } else { getDownloadEngine()->addCommand(this); return false; } } else { return prepareForRetry(0); } } } #ifdef ENABLE_MESSAGE_DIGEST void DownloadCommand::validatePieceHash(const SharedHandle& segment, const std::string& expectedPieceHash, const std::string& actualPieceHash) { if(actualPieceHash == expectedPieceHash) { getLogger()->info(MSG_GOOD_CHUNK_CHECKSUM, actualPieceHash.c_str()); getSegmentMan()->completeSegment(getCuid(), segment); } else { getLogger()->info(EX_INVALID_CHUNK_CHECKSUM, segment->getIndex(), util::itos(segment->getPosition(), true).c_str(), expectedPieceHash.c_str(), actualPieceHash.c_str()); segment->clear(); getSegmentMan()->cancelSegment(getCuid()); throw DL_RETRY_EX (StringFormat("Invalid checksum index=%d", segment->getIndex()).str()); } } #endif // ENABLE_MESSAGE_DIGEST void DownloadCommand::setTransferEncodingDecoder (const SharedHandle& decoder) { this->_transferEncodingDecoder = decoder; } void DownloadCommand::setContentEncodingDecoder (const SharedHandle& decoder) { _contentEncodingDecoder = decoder; } } // namespace aria2