| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 | /* <!-- copyright *//* * aria2 - The high speed download utility * * Copyright (C) 2006 Tatsuhiro Tsujikawa * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * * In addition, as a special exception, the copyright holders give * permission to link the code of portions of this program with the * OpenSSL library under certain conditions as described in each * individual source file, and distribute linked combinations * including the two. * You must obey the GNU General Public License in all respects * for all of the code used other than OpenSSL.  If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so.  If you * do not wish to do so, delete this exception statement from your * version.  If you delete this exception statement from all source * files in the program, then also delete it here. *//* copyright --> */#include "DownloadCommand.h"#include <cassert>#include "Request.h"#include "RequestGroup.h"#include "DownloadEngine.h"#include "PeerStat.h"#include "DlAbortEx.h"#include "DlRetryEx.h"#include "SegmentMan.h"#include "Segment.h"#include "Logger.h"#include "ChecksumCheckIntegrityEntry.h"#include "PieceStorage.h"#include "CheckIntegrityCommand.h"#include "DiskAdaptor.h"#include "DownloadContext.h"#include "Option.h"#include "util.h"#include "Socket.h"#include "message.h"#include "prefs.h"#include "StringFormat.h"#include "Decoder.h"#include "RequestGroupMan.h"#include "wallclock.h"#include "ServerStatMan.h"#include "FileAllocationEntry.h"#ifdef ENABLE_MESSAGE_DIGEST# include "MessageDigestHelper.h"#endif // ENABLE_MESSAGE_DIGESTnamespace aria2 {static const size_t BUFSIZE = 16*1024;DownloadCommand::DownloadCommand(cuid_t cuid,                                 const SharedHandle<Request>& req,                                 const SharedHandle<FileEntry>& fileEntry,                                 RequestGroup* requestGroup,                                 DownloadEngine* e,                                 const SocketHandle& s):  AbstractCommand(cuid, req, fileEntry, requestGroup, e, s),  buf_(new unsigned char[BUFSIZE]),  startupIdleTime_(10),  lowestDownloadSpeedLimit_(0)#ifdef ENABLE_MESSAGE_DIGEST  , pieceHashValidationEnabled_(false)#endif // ENABLE_MESSAGE_DIGEST{#ifdef ENABLE_MESSAGE_DIGEST  {    if(getOption()->getAsBool(PREF_REALTIME_CHUNK_CHECKSUM)) {      const std::string& algo = getDownloadContext()->getPieceHashAlgo();      if(MessageDigestContext::supports(algo)) {        messageDigestContext_.reset(new MessageDigestContext());        messageDigestContext_->trySetAlgo(algo);        messageDigestContext_->digestInit();                pieceHashValidationEnabled_ = true;      }    }  }#endif // ENABLE_MESSAGE_DIGEST  peerStat_ = req->initPeerStat();  peerStat_->downloadStart();  getSegmentMan()->registerPeerStat(peerStat_);}DownloadCommand::~DownloadCommand() {  peerStat_->downloadStop();  getSegmentMan()->updateFastestPeerStat(peerStat_);  delete [] buf_;}bool DownloadCommand::executeInternal() {  if(getDownloadEngine()->getRequestGroupMan()->doesOverallDownloadSpeedExceed()     || getRequestGroup()->doesDownloadSpeedExceed()) {    getDownloadEngine()->addCommand(this);    disableReadCheckSocket();    return false;  }  setReadCheckSocket(getSocket());  SharedHandle<Segment> segment = getSegments().front();  size_t bufSize;  if(segment->getLength() > 0) {    if(static_cast<uint64_t>(segment->getPosition()+segment->getLength()) <=       static_cast<uint64_t>(getFileEntry()->getLastOffset())) {      bufSize = std::min(segment->getLength()-segment->getWrittenLength(),                         BUFSIZE);    } else {      bufSize =        std::min        (static_cast<size_t>         (getFileEntry()->getLastOffset()-segment->getPositionToWrite()),         BUFSIZE);    }  } else {    bufSize = BUFSIZE;  }  getSocket()->readData(buf_, bufSize);  const SharedHandle<DiskAdaptor>& diskAdaptor =    getPieceStorage()->getDiskAdaptor();  const unsigned char* bufFinal;  size_t bufSizeFinal;  std::string decoded;  if(transferEncodingDecoder_.isNull()) {    bufFinal = buf_;    bufSizeFinal = bufSize;  } else {    decoded = transferEncodingDecoder_->decode(buf_, bufSize);    bufFinal = reinterpret_cast<const unsigned char*>(decoded.c_str());    bufSizeFinal = decoded.size();  }  if(contentEncodingDecoder_.isNull()) {    diskAdaptor->writeData(bufFinal, bufSizeFinal,                           segment->getPositionToWrite());  } else {    std::string out = contentEncodingDecoder_->decode(bufFinal, bufSizeFinal);    diskAdaptor->writeData(reinterpret_cast<const unsigned char*>(out.data()),                           out.size(),                           segment->getPositionToWrite());    bufSizeFinal = out.size();  }#ifdef ENABLE_MESSAGE_DIGEST  if(pieceHashValidationEnabled_) {    segment->updateHash(segment->getWrittenLength(), bufFinal, bufSizeFinal);  }#endif // ENABLE_MESSAGE_DIGEST  if(bufSizeFinal > 0) {    segment->updateWrittenLength(bufSizeFinal);  }  peerStat_->updateDownloadLength(bufSize);  getSegmentMan()->updateDownloadSpeedFor(peerStat_);  bool segmentPartComplete = false;  // Note that GrowSegment::complete() always returns false.  if(transferEncodingDecoder_.isNull() && contentEncodingDecoder_.isNull()) {    if(segment->complete() ||       segment->getPositionToWrite() == getFileEntry()->getLastOffset()) {      segmentPartComplete = true;    } else if(segment->getLength() == 0 && bufSize == 0 &&              !getSocket()->wantRead() && !getSocket()->wantWrite()) {      segmentPartComplete = true;    }  } else if(!transferEncodingDecoder_.isNull() &&            (segment->complete() ||             segment->getPositionToWrite() == getFileEntry()->getLastOffset())){    // In this case, transferEncodingDecoder is used and    // Content-Length is known.    segmentPartComplete = true;  } else if((transferEncodingDecoder_.isNull() ||             transferEncodingDecoder_->finished()) &&            (contentEncodingDecoder_.isNull() ||             contentEncodingDecoder_->finished())) {    segmentPartComplete = true;  }  if(!segmentPartComplete && bufSize == 0 &&     !getSocket()->wantRead() && !getSocket()->wantWrite()) {    throw DL_RETRY_EX(EX_GOT_EOF);  }  if(segmentPartComplete) {    if(segment->complete() || segment->getLength() == 0) {      // If segment->getLength() == 0, the server doesn't provide      // content length, but the client detected that download      // completed.      if(getLogger()->info()) {        getLogger()->info(MSG_SEGMENT_DOWNLOAD_COMPLETED,                          util::itos(getCuid()).c_str());      }#ifdef ENABLE_MESSAGE_DIGEST      {        const std::string& expectedPieceHash =          getDownloadContext()->getPieceHash(segment->getIndex());        if(pieceHashValidationEnabled_ && !expectedPieceHash.empty()) {          if(segment->isHashCalculated()) {            if(getLogger()->debug()) {              getLogger()->debug                ("Hash is available! index=%lu",                 static_cast<unsigned long>(segment->getIndex()));            }            validatePieceHash              (segment, expectedPieceHash, segment->getHashString());          } else {            messageDigestContext_->digestReset();            validatePieceHash              (segment, expectedPieceHash,               MessageDigestHelper::digest               (messageDigestContext_.get(),                getPieceStorage()->getDiskAdaptor(),                segment->getPosition(),                segment->getLength()));          }        } else {          getSegmentMan()->completeSegment(getCuid(), segment);        }      }#else // !ENABLE_MESSAGE_DIGEST      getSegmentMan()->completeSegment(getCuid(), segment);#endif // !ENABLE_MESSAGE_DIGEST    } else {      // If segment is not canceled here, in the next pipelining      // request, aria2 requests bad range      // [FileEntry->getLastOffset(), FileEntry->getLastOffset())      getSegmentMan()->cancelSegment(getCuid(), segment);    }    checkLowestDownloadSpeed();    // this unit is going to download another segment.    return prepareForNextSegment();  } else {    checkLowestDownloadSpeed();    setWriteCheckSocketIf(getSocket(), getSocket()->wantWrite());    getDownloadEngine()->addCommand(this);    return false;  }}void DownloadCommand::checkLowestDownloadSpeed() const{  // calculate downloading speed  if(peerStat_->getDownloadStartTime().difference(global::wallclock) >=     startupIdleTime_) {    unsigned int nowSpeed = peerStat_->calculateDownloadSpeed();    if(lowestDownloadSpeedLimit_ > 0 && nowSpeed <= lowestDownloadSpeedLimit_) {      throw DL_ABORT_EX2(StringFormat(EX_TOO_SLOW_DOWNLOAD_SPEED,                                      nowSpeed,                                      lowestDownloadSpeedLimit_,                                      getRequest()->getHost().c_str()).str(),                         downloadresultcode::TOO_SLOW_DOWNLOAD_SPEED);    }  }}bool DownloadCommand::prepareForNextSegment() {  if(getRequestGroup()->downloadFinished()) {    // Remove in-flight request here.    getFileEntry()->poolRequest(getRequest());    // If this is a single file download, and file size becomes known    // just after downloading, set total length to FileEntry object    // here.    if(getDownloadContext()->getFileEntries().size() == 1) {      if(getFileEntry()->getLength() == 0) {        getFileEntry()->setLength(getPieceStorage()->getCompletedLength());      }    }#ifdef ENABLE_MESSAGE_DIGEST    if(getDownloadContext()->getPieceHashAlgo().empty()) {      SharedHandle<CheckIntegrityEntry> entry        (new ChecksumCheckIntegrityEntry(getRequestGroup()));      if(entry->isValidationReady()) {        entry->initValidator();        entry->cutTrailingGarbage();        getDownloadEngine()->getCheckIntegrityMan()->pushEntry(entry);      }    }    // Following 2lines are needed for DownloadEngine to detect    // completed RequestGroups without 1sec delay.    getDownloadEngine()->setNoWait(true);    getDownloadEngine()->setRefreshInterval(0);#endif // ENABLE_MESSAGE_DIGEST    return true;  } else {    // The number of segments should be 1 in order to pass through the next    // segment.    if(getSegments().size() == 1) {      SharedHandle<Segment> tempSegment = getSegments().front();      if(!tempSegment->complete()) {        return prepareForRetry(0);      }      SharedHandle<Segment> nextSegment = getSegmentMan()->getSegment        (getCuid(), tempSegment->getIndex()+1);      if(nextSegment.isNull()) {        nextSegment = getSegmentMan()->getCleanSegmentIfOwnerIsIdle        (getCuid(), tempSegment->getIndex()+1);      }      if(nextSegment.isNull() || nextSegment->getWrittenLength() > 0) {        // If nextSegment->getWrittenLength() > 0, current socket must        // be closed because writing incoming data at        // nextSegment->getWrittenLength() corrupts file.        return prepareForRetry(0);      } else {        getDownloadEngine()->addCommand(this);        return false;      }    } else {      return prepareForRetry(0);    }  }}#ifdef ENABLE_MESSAGE_DIGESTvoid DownloadCommand::validatePieceHash(const SharedHandle<Segment>& segment,                                        const std::string& expectedPieceHash,                                        const std::string& actualPieceHash){  if(actualPieceHash == expectedPieceHash) {    getLogger()->info(MSG_GOOD_CHUNK_CHECKSUM, actualPieceHash.c_str());    getSegmentMan()->completeSegment(getCuid(), segment);  } else {    getLogger()->info(EX_INVALID_CHUNK_CHECKSUM,                      segment->getIndex(),                      util::itos(segment->getPosition(), true).c_str(),                      expectedPieceHash.c_str(),                      actualPieceHash.c_str());    segment->clear();    getSegmentMan()->cancelSegment(getCuid());    throw DL_RETRY_EX      (StringFormat("Invalid checksum index=%d", segment->getIndex()).str());  }}#endif // ENABLE_MESSAGE_DIGESTvoid DownloadCommand::setTransferEncodingDecoder(const SharedHandle<Decoder>& decoder){  this->transferEncodingDecoder_ = decoder;}void DownloadCommand::setContentEncodingDecoder(const SharedHandle<Decoder>& decoder){  contentEncodingDecoder_ = decoder;}} // namespace aria2
 |