123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- /* <!-- copyright */
- /*
- * aria2 - The high speed download utility
- *
- * Copyright (C) 2006 Tatsuhiro Tsujikawa
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- *
- * In addition, as a special exception, the copyright holders give
- * permission to link the code of portions of this program with the
- * OpenSSL library under certain conditions as described in each
- * individual source file, and distribute linked combinations
- * including the two.
- * You must obey the GNU General Public License in all respects
- * for all of the code used other than OpenSSL. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you
- * do not wish to do so, delete this exception statement from your
- * version. If you delete this exception statement from all source
- * files in the program, then also delete it here.
- */
- /* copyright --> */
- #include "DownloadCommand.h"
- #include <cassert>
- #include "Request.h"
- #include "RequestGroup.h"
- #include "DownloadEngine.h"
- #include "PeerStat.h"
- #include "DlAbortEx.h"
- #include "DlRetryEx.h"
- #include "SegmentMan.h"
- #include "Segment.h"
- #include "Logger.h"
- #include "LogFactory.h"
- #include "ChecksumCheckIntegrityEntry.h"
- #include "PieceStorage.h"
- #include "CheckIntegrityCommand.h"
- #include "DiskAdaptor.h"
- #include "DownloadContext.h"
- #include "Option.h"
- #include "util.h"
- #include "SocketCore.h"
- #include "message.h"
- #include "prefs.h"
- #include "fmt.h"
- #include "RequestGroupMan.h"
- #include "wallclock.h"
- #include "SinkStreamFilter.h"
- #include "FileEntry.h"
- #include "SocketRecvBuffer.h"
- #include "Piece.h"
- #include "WrDiskCacheEntry.h"
- #include "DownloadFailureException.h"
- #include "MessageDigest.h"
- #include "message_digest_helper.h"
- #ifdef ENABLE_BITTORRENT
- # include "bittorrent_helper.h"
- #endif // ENABLE_BITTORRENT
- namespace aria2 {
- DownloadCommand::DownloadCommand(
- cuid_t cuid, const std::shared_ptr<Request>& req,
- const std::shared_ptr<FileEntry>& fileEntry, RequestGroup* requestGroup,
- DownloadEngine* e, const std::shared_ptr<SocketCore>& s,
- const std::shared_ptr<SocketRecvBuffer>& socketRecvBuffer)
- : AbstractCommand(cuid, req, fileEntry, requestGroup, e, s,
- socketRecvBuffer),
- startupIdleTime_(10),
- lowestDownloadSpeedLimit_(0),
- pieceHashValidationEnabled_(false)
- {
- {
- if (getOption()->getAsBool(PREF_REALTIME_CHUNK_CHECKSUM)) {
- const std::string& algo = getDownloadContext()->getPieceHashType();
- if (MessageDigest::supports(algo)) {
- messageDigest_ = MessageDigest::create(algo);
- pieceHashValidationEnabled_ = true;
- }
- }
- }
- peerStat_ = req->initPeerStat();
- peerStat_->downloadStart();
- getSegmentMan()->registerPeerStat(peerStat_);
- streamFilter_ = make_unique<SinkStreamFilter>(
- getPieceStorage()->getWrDiskCache(), pieceHashValidationEnabled_);
- streamFilter_->init();
- sinkFilterOnly_ = true;
- checkSocketRecvBuffer();
- }
- DownloadCommand::~DownloadCommand()
- {
- peerStat_->downloadStop();
- getSegmentMan()->updateFastestPeerStat(peerStat_);
- }
- namespace {
- void flushWrDiskCacheEntry(WrDiskCache* wrDiskCache,
- const std::shared_ptr<Segment>& segment)
- {
- const std::shared_ptr<Piece>& piece = segment->getPiece();
- if (piece->getWrDiskCacheEntry()) {
- piece->flushWrCache(wrDiskCache);
- if (piece->getWrDiskCacheEntry()->getError() !=
- WrDiskCacheEntry::CACHE_ERR_SUCCESS) {
- segment->clear(wrDiskCache);
- throw DOWNLOAD_FAILURE_EXCEPTION2(
- fmt("Write disk cache flush failure index=%lu",
- static_cast<unsigned long>(piece->getIndex())),
- piece->getWrDiskCacheEntry()->getErrorCode());
- }
- }
- }
- } // namespace
- bool DownloadCommand::executeInternal()
- {
- if (getDownloadEngine()
- ->getRequestGroupMan()
- ->doesOverallDownloadSpeedExceed() ||
- getRequestGroup()->doesDownloadSpeedExceed()) {
- addCommandSelf();
- disableReadCheckSocket();
- disableWriteCheckSocket();
- return false;
- }
- setReadCheckSocket(getSocket());
- const std::shared_ptr<DiskAdaptor>& diskAdaptor =
- getPieceStorage()->getDiskAdaptor();
- std::shared_ptr<Segment> segment = getSegments().front();
- bool eof = false;
- if (getSocketRecvBuffer()->bufferEmpty()) {
- // Only read from socket when buffer is empty. Imagine that When
- // segment length is *short* and we are using HTTP pilelining. We
- // issued 2 requests in pipeline. When reading first response
- // header, we may read its response body and 2nd response header
- // and 2nd response body in buffer if they are small enough to fit
- // in buffer. And then server may sends EOF. In this case, we
- // read data from socket here, we will get EOF and leaves 2nd
- // response unprocessed. To prevent this, we don't read from
- // socket when buffer is not empty.
- eof = getSocketRecvBuffer()->recv() == 0 && !getSocket()->wantRead() &&
- !getSocket()->wantWrite();
- }
- if (!eof) {
- size_t bufSize;
- if (sinkFilterOnly_) {
- if (segment->getLength() > 0) {
- if (segment->getPosition() + segment->getLength() <=
- getFileEntry()->getLastOffset()) {
- bufSize = std::min(static_cast<size_t>(segment->getLength() -
- segment->getWrittenLength()),
- getSocketRecvBuffer()->getBufferLength());
- }
- else {
- bufSize =
- std::min(static_cast<size_t>(getFileEntry()->getLastOffset() -
- segment->getPositionToWrite()),
- getSocketRecvBuffer()->getBufferLength());
- }
- }
- else {
- bufSize = getSocketRecvBuffer()->getBufferLength();
- }
- streamFilter_->transform(diskAdaptor, segment,
- getSocketRecvBuffer()->getBuffer(), bufSize);
- }
- else {
- // It is possible that segment is completed but we have some bytes
- // of stream to read. For example, chunked encoding has "0"+CRLF
- // after data. After we read data(at this moment segment is
- // completed), we need another 3bytes(or more if it has trailers).
- streamFilter_->transform(diskAdaptor, segment,
- getSocketRecvBuffer()->getBuffer(),
- getSocketRecvBuffer()->getBufferLength());
- bufSize = streamFilter_->getBytesProcessed();
- }
- getSocketRecvBuffer()->drain(bufSize);
- peerStat_->updateDownload(bufSize);
- getDownloadContext()->updateDownload(bufSize);
- }
- bool segmentPartComplete = false;
- // Note that GrowSegment::complete() always returns false.
- if (sinkFilterOnly_) {
- if (segment->complete() ||
- (getFileEntry()->getLength() != 0 &&
- segment->getPositionToWrite() == getFileEntry()->getLastOffset())) {
- segmentPartComplete = true;
- }
- else if (segment->getLength() == 0 && eof) {
- segmentPartComplete = true;
- }
- }
- else {
- int64_t loff = getFileEntry()->gtoloff(segment->getPositionToWrite());
- if (getFileEntry()->getLength() > 0 && !sinkFilterOnly_ &&
- ((loff == getRequestEndOffset() && streamFilter_->finished()) ||
- loff < getRequestEndOffset()) &&
- (segment->complete() ||
- segment->getPositionToWrite() == getFileEntry()->getLastOffset())) {
- // In this case, StreamFilter other than *SinkStreamFilter is
- // used and Content-Length is known. We check
- // streamFilter_->finished() only if the requested end offset
- // equals to written position in file local offset; in other
- // words, data in the requested range is all received. If
- // requested end offset is greater than this segment, then
- // streamFilter_ is not finished in this segment.
- segmentPartComplete = true;
- }
- else if (streamFilter_->finished()) {
- segmentPartComplete = true;
- }
- }
- if (!segmentPartComplete && eof) {
- throw DL_RETRY_EX(EX_GOT_EOF);
- }
- if (segmentPartComplete) {
- if (segment->complete() || segment->getLength() == 0) {
- // If segment->getLength() == 0, the server doesn't provide
- // content length, but the client detected that download
- // completed.
- A2_LOG_INFO(fmt(MSG_SEGMENT_DOWNLOAD_COMPLETED, getCuid()));
- {
- const std::string& expectedPieceHash =
- getDownloadContext()->getPieceHash(segment->getIndex());
- if (pieceHashValidationEnabled_ && !expectedPieceHash.empty()) {
- if (
- #ifdef ENABLE_BITTORRENT
- // TODO Is this necessary?
- (!getPieceStorage()->isEndGame() ||
- !getDownloadContext()->hasAttribute(CTX_ATTR_BT)) &&
- #endif // ENABLE_BITTORRENT
- segment->isHashCalculated()) {
- A2_LOG_DEBUG(fmt("Hash is available! index=%lu",
- static_cast<unsigned long>(segment->getIndex())));
- validatePieceHash(segment, expectedPieceHash, segment->getDigest());
- }
- else {
- try {
- std::string actualHash =
- segment->getPiece()->getDigestWithWrCache(
- segment->getSegmentLength(), diskAdaptor);
- validatePieceHash(segment, expectedPieceHash, actualHash);
- }
- catch (RecoverableException& e) {
- segment->clear(getPieceStorage()->getWrDiskCache());
- getSegmentMan()->cancelSegment(getCuid());
- throw;
- }
- }
- }
- else {
- completeSegment(getCuid(), segment);
- }
- }
- }
- else {
- // If segment is not canceled here, in the next pipelining
- // request, aria2 requests bad range
- // [FileEntry->getLastOffset(), FileEntry->getLastOffset())
- getSegmentMan()->cancelSegment(getCuid(), segment);
- }
- checkLowestDownloadSpeed();
- // this unit is going to download another segment.
- return prepareForNextSegment();
- }
- else {
- checkLowestDownloadSpeed();
- setWriteCheckSocketIf(getSocket(), shouldEnableWriteCheck());
- checkSocketRecvBuffer();
- addCommandSelf();
- return false;
- }
- }
- bool DownloadCommand::shouldEnableWriteCheck()
- {
- return getSocket()->wantWrite();
- }
- void DownloadCommand::checkLowestDownloadSpeed() const
- {
- if (lowestDownloadSpeedLimit_ > 0 &&
- peerStat_->getDownloadStartTime().difference(global::wallclock()) >=
- startupIdleTime_) {
- int nowSpeed = peerStat_->calculateDownloadSpeed();
- if (nowSpeed <= lowestDownloadSpeedLimit_) {
- throw DL_ABORT_EX2(fmt(EX_TOO_SLOW_DOWNLOAD_SPEED, nowSpeed,
- lowestDownloadSpeedLimit_,
- getRequest()->getHost().c_str()),
- error_code::TOO_SLOW_DOWNLOAD_SPEED);
- }
- }
- }
- bool DownloadCommand::prepareForNextSegment()
- {
- if (getRequestGroup()->downloadFinished()) {
- // Remove in-flight request here.
- getFileEntry()->poolRequest(getRequest());
- // If this is a single file download, and file size becomes known
- // just after downloading, set total length to FileEntry object
- // here.
- if (getDownloadContext()->getFileEntries().size() == 1) {
- if (getFileEntry()->getLength() == 0) {
- getFileEntry()->setLength(getPieceStorage()->getCompletedLength());
- }
- }
- if (getDownloadContext()->getPieceHashType().empty()) {
- auto entry = make_unique<ChecksumCheckIntegrityEntry>(getRequestGroup());
- if (entry->isValidationReady()) {
- entry->initValidator();
- entry->cutTrailingGarbage();
- getDownloadEngine()->getCheckIntegrityMan()->pushEntry(
- std::move(entry));
- }
- }
- // Following 2lines are needed for DownloadEngine to detect
- // completed RequestGroups without 1sec delay.
- getDownloadEngine()->setNoWait(true);
- getDownloadEngine()->setRefreshInterval(std::chrono::milliseconds(0));
- return true;
- }
- else {
- // The number of segments should be 1 in order to pass through the next
- // segment.
- if (getSegments().size() == 1) {
- std::shared_ptr<Segment> tempSegment = getSegments().front();
- if (!tempSegment->complete()) {
- return prepareForRetry(0);
- }
- if (getRequestEndOffset() ==
- getFileEntry()->gtoloff(tempSegment->getPosition() +
- tempSegment->getLength())) {
- return prepareForRetry(0);
- }
- std::shared_ptr<Segment> nextSegment =
- getSegmentMan()->getSegmentWithIndex(getCuid(),
- tempSegment->getIndex() + 1);
- if (!nextSegment) {
- nextSegment = getSegmentMan()->getCleanSegmentIfOwnerIsIdle(
- getCuid(), tempSegment->getIndex() + 1);
- }
- if (!nextSegment || nextSegment->getWrittenLength() > 0) {
- // If nextSegment->getWrittenLength() > 0, current socket must
- // be closed because writing incoming data at
- // nextSegment->getWrittenLength() corrupts file.
- return prepareForRetry(0);
- }
- else {
- checkSocketRecvBuffer();
- addCommandSelf();
- return false;
- }
- }
- else {
- return prepareForRetry(0);
- }
- }
- }
- void DownloadCommand::validatePieceHash(const std::shared_ptr<Segment>& segment,
- const std::string& expectedHash,
- const std::string& actualHash)
- {
- if (actualHash == expectedHash) {
- A2_LOG_INFO(fmt(MSG_GOOD_CHUNK_CHECKSUM, util::toHex(actualHash).c_str()));
- completeSegment(getCuid(), segment);
- }
- else {
- A2_LOG_INFO(fmt(EX_INVALID_CHUNK_CHECKSUM,
- static_cast<unsigned long>(segment->getIndex()),
- segment->getPosition(), util::toHex(expectedHash).c_str(),
- util::toHex(actualHash).c_str()));
- segment->clear(getPieceStorage()->getWrDiskCache());
- getSegmentMan()->cancelSegment(getCuid());
- throw DL_RETRY_EX(fmt("Invalid checksum index=%lu",
- static_cast<unsigned long>(segment->getIndex())));
- }
- }
- void DownloadCommand::completeSegment(cuid_t cuid,
- const std::shared_ptr<Segment>& segment)
- {
- flushWrDiskCacheEntry(getPieceStorage()->getWrDiskCache(), segment);
- getSegmentMan()->completeSegment(cuid, segment);
- }
- void DownloadCommand::installStreamFilter(
- std::unique_ptr<StreamFilter> streamFilter)
- {
- if (!streamFilter) {
- return;
- }
- streamFilter->installDelegate(std::move(streamFilter_));
- streamFilter_ = std::move(streamFilter);
- const std::string& name = streamFilter_->getName();
- sinkFilterOnly_ = util::endsWith(name, SinkStreamFilter::NAME);
- }
- // We need to override noCheck() to return true in order to measure
- // download speed to check lowest speed.
- bool DownloadCommand::noCheck() const { return lowestDownloadSpeedLimit_ > 0; }
- } // namespace aria2
|