/* */
#include "SegmentMan.h"

// NOTE(review): the names of these three system headers were lost when the
// file was extracted (everything between '<' and '>' was stripped).
// <algorithm> is required by std::find_if below; <cassert> and <numeric> are
// presumed -- confirm against the upstream file.
#include <cassert>
#include <algorithm>
#include <numeric>

#include "util.h"
#include "message.h"
#include "prefs.h"
#include "PiecedSegment.h"
#include "GrowSegment.h"
#include "LogFactory.h"
#include "Logger.h"
#include "PieceStorage.h"
#include "PeerStat.h"
#include "Option.h"
#include "DownloadContext.h"
#include "Piece.h"
#include "FileEntry.h"
#include "wallclock.h"
#include "fmt.h"
#include "WrDiskCacheEntry.h"
#include "DownloadFailureException.h"

namespace aria2 {

// Pairs a checked-out segment with the CUID that checked it out.
SegmentEntry::SegmentEntry(cuid_t cuid,
                           const std::shared_ptr<Segment>& segment)
    : cuid(cuid), segment(segment)
{
}

SegmentEntry::~SegmentEntry() = default;

// Stores the download context and piece storage and sizes the ignore
// bitfield from the context's piece length and total length. The filter on
// the ignore bitfield is enabled right away.
//
// downloadContext is dereferenced here and therefore must not be null.
SegmentMan::SegmentMan(const std::shared_ptr<DownloadContext>& downloadContext,
                       const std::shared_ptr<PieceStorage>& pieceStorage)
    : downloadContext_(downloadContext),
      pieceStorage_(pieceStorage),
      ignoreBitfield_(downloadContext->getPieceLength(),
                      downloadContext->getTotalLength())
{
  ignoreBitfield_.enableFilter();
}

SegmentMan::~SegmentMan() = default;

// Returns false when no PieceStorage is attached; otherwise forwards to
// PieceStorage::downloadFinished().
bool SegmentMan::downloadFinished() const
{
  if (!pieceStorage_) {
    return false;
  }
  return pieceStorage_->downloadFinished();
}

// Currently a no-op.
void SegmentMan::init()
{
  // TODO Do we have to do something about DownloadContext and
  // PieceStorage here?
}

// Returns 0 when no PieceStorage is attached; otherwise forwards to
// PieceStorage::getTotalLength().
int64_t SegmentMan::getTotalLength() const
{
  if (!pieceStorage_) {
    return 0;
  }
  return pieceStorage_->getTotalLength();
}

// Replaces the PieceStorage used by this object.
void SegmentMan::setPieceStorage(
    const std::shared_ptr<PieceStorage>& pieceStorage)
{
  pieceStorage_ = pieceStorage;
}

// Replaces the DownloadContext used by this object. The ignore bitfield,
// which was sized from the context given to the constructor, is not touched.
void SegmentMan::setDownloadContext(
    const std::shared_ptr<DownloadContext>& downloadContext)
{
  downloadContext_ = downloadContext;
}

namespace {
// Flushes the write cache of |piece| through |wrDiskCache|.
//
// If the piece's cache entry reports anything other than CACHE_ERR_SUCCESS
// afterwards, all blocks of the piece are cleared and a download failure
// exception carrying the entry's error code is thrown.
//
// The caller must ensure piece->getWrDiskCacheEntry() is non-null.
void flushWrDiskCache(WrDiskCache* wrDiskCache,
                      const std::shared_ptr<Piece>& piece)
{
  piece->flushWrCache(wrDiskCache);
  if (piece->getWrDiskCacheEntry()->getError() !=
      WrDiskCacheEntry::CACHE_ERR_SUCCESS) {
    piece->clearAllBlock(wrDiskCache);
    throw DOWNLOAD_FAILURE_EXCEPTION2(
        fmt("Write disk cache flush failure index=%lu",
            static_cast<unsigned long>(piece->getIndex())),
        piece->getWrDiskCacheEntry()->getErrorCode());
  }
}
} // namespace

// Wraps |piece| in a Segment owned by |cuid| and records the pair in
// usedSegmentEntries_.
//
// Returns nullptr when |piece| is null. May throw if flushing the piece's
// write cache fails (see flushWrDiskCache()).
std::shared_ptr<Segment>
SegmentMan::checkoutSegment(cuid_t cuid, const std::shared_ptr<Piece>& piece)
{
  if (!piece) {
    return nullptr;
  }
  A2_LOG_DEBUG(fmt("Attach segment#%lu to CUID#%" PRId64 ".",
                   static_cast<unsigned long>(piece->getIndex()), cuid));

  if (piece->getWrDiskCacheEntry()) {
    // Flush cached data here, because the cached data may be overlapped
    // if BT peers are involved.
    A2_LOG_DEBUG(fmt("Flushing cached data, size=%lu",
                     static_cast<unsigned long>(
                         piece->getWrDiskCacheEntry()->getSize())));
    flushWrDiskCache(pieceStorage_->getWrDiskCache(), piece);
  }
  piece->setUsedBySegment(true);

  // NOTE(review): the concrete segment types were stripped from the
  // make_shared calls; they are reconstructed from the included headers and
  // the constructor argument lists -- confirm against upstream.
  std::shared_ptr<Segment> segment;
  if (piece->getLength() == 0) {
    segment = std::make_shared<GrowSegment>(piece);
  }
  else {
    segment = std::make_shared<PiecedSegment>(
        downloadContext_->getPieceLength(), piece);
  }
  auto entry = std::make_shared<SegmentEntry>(cuid, segment);
  usedSegmentEntries_.push_back(entry);

  A2_LOG_DEBUG(fmt("index=%lu, length=%" PRId64 ", segmentLength=%" PRId64 ","
                   " writtenLength=%" PRId64,
                   static_cast<unsigned long>(segment->getIndex()),
                   segment->getLength(), segment->getSegmentLength(),
                   segment->getWrittenLength()));

  if (piece->getLength() > 0) {
    auto positr = segmentWrittenLengthMemo_.find(segment->getIndex());
    if (positr != segmentWrittenLengthMemo_.end()) {
      const auto writtenLength = (*positr).second;
      A2_LOG_DEBUG(fmt("writtenLength(in memo)=%" PRId64
                       ", writtenLength=%" PRId64,
                       writtenLength, segment->getWrittenLength()));
      // If the difference between cached writtenLength and segment's
      // writtenLength is less than one block, we assume that these
      // missing bytes are already downloaded.
      if (segment->getWrittenLength() < writtenLength &&
          writtenLength - segment->getWrittenLength() <
              piece->getBlockLength()) {
        segment->updateWrittenLength(writtenLength -
                                     segment->getWrittenLength());
      }
    }
  }
  return segment;
}

// Appends to |segments| every segment currently checked out by |cuid|.
void SegmentMan::getInFlightSegment(
    std::vector<std::shared_ptr<Segment>>& segments, cuid_t cuid)
{
  for (const auto& segmentEntry : usedSegmentEntries_) {
    if (segmentEntry->cuid == cuid) {
      segments.push_back(segmentEntry->segment);
    }
  }
}

// Asks the PieceStorage for a missing piece, passing the ignore bitfield's
// filter, and checks it out for |cuid|. Returns nullptr when the storage
// yields no piece.
std::shared_ptr<Segment> SegmentMan::getSegment(cuid_t cuid,
                                                size_t minSplitSize)
{
  std::shared_ptr<Piece> piece = pieceStorage_->getMissingPiece(
      minSplitSize, ignoreBitfield_.getFilterBitfield(),
      ignoreBitfield_.getBitfieldLength(), cuid);
  return checkoutSegment(cuid, piece);
}

// Checks out segments for |cuid| until |segments| holds |maxSegments|
// entries or the PieceStorage yields no more pieces.
//
// The piece request uses a copy of the ignore bitfield with the range of
// |fileEntry| added via addNotFilter(). A checked-out segment whose write
// position falls outside [fileEntry->getOffset(), fileEntry->getLastOffset())
// is not returned; it is held until the loop ends and then canceled.
void SegmentMan::getSegment(std::vector<std::shared_ptr<Segment>>& segments,
                            cuid_t cuid, size_t minSplitSize,
                            const std::shared_ptr<FileEntry>& fileEntry,
                            size_t maxSegments)
{
  BitfieldMan filter(ignoreBitfield_);
  filter.enableFilter();
  filter.addNotFilter(fileEntry->getOffset(), fileEntry->getLength());

  std::vector<std::shared_ptr<Segment>> pending;
  while (segments.size() < maxSegments) {
    std::shared_ptr<Segment> segment = checkoutSegment(
        cuid, pieceStorage_->getMissingPiece(
                  minSplitSize, filter.getFilterBitfield(),
                  filter.getBitfieldLength(), cuid));
    if (!segment) {
      break;
    }
    if (segment->getPositionToWrite() < fileEntry->getOffset() ||
        fileEntry->getLastOffset() <= segment->getPositionToWrite()) {
      // Keep it checked out for now so that the storage does not hand the
      // same piece back on the next iteration.
      pending.push_back(segment);
    }
    else {
      segments.push_back(segment);
    }
  }
  for (const auto& segment : pending) {
    cancelSegment(cuid, segment);
  }
}

// Checks out the segment for piece |index| on behalf of |cuid|.
//
// Returns nullptr when |index| is non-zero and not below the number of
// pieces, or when the PieceStorage yields no piece for that index. Index 0
// is always passed through to the storage.
std::shared_ptr<Segment> SegmentMan::getSegmentWithIndex(cuid_t cuid,
                                                         size_t index)
{
  if (index > 0 && downloadContext_->getNumPieces() <= index) {
    return nullptr;
  }
  return checkoutSegment(cuid, pieceStorage_->getMissingPiece(index, cuid));
}

// Tries to obtain the in-use segment |index| for |cuid|, provided nothing
// has been written to it yet.
//
// Looks at the first in-use entry with that index:
//  - written length > 0: nullptr;
//  - already owned by |cuid|: that segment;
//  - owner has no PeerStat or its status is NetStat::IDLE: all segments of
//    the owner are canceled and the segment is checked out again for |cuid|;
//  - otherwise: nullptr.
// Also returns nullptr if |index| is out of range (same rule as
// getSegmentWithIndex()) or no in-use entry has that index.
std::shared_ptr<Segment>
SegmentMan::getCleanSegmentIfOwnerIsIdle(cuid_t cuid, size_t index)
{
  if (index > 0 && downloadContext_->getNumPieces() <= index) {
    return nullptr;
  }
  for (const auto& segmentEntry : usedSegmentEntries_) {
    if (segmentEntry->segment->getIndex() != index) {
      continue;
    }
    if (segmentEntry->segment->getWrittenLength() > 0) {
      return nullptr;
    }
    if (segmentEntry->cuid == cuid) {
      return segmentEntry->segment;
    }
    // Copy the owner before cancelSegment() mutates usedSegmentEntries_ and
    // invalidates segmentEntry. Every path below returns immediately, so the
    // loop never touches the container again.
    cuid_t owner = segmentEntry->cuid;
    std::shared_ptr<PeerStat> ps = getPeerStat(owner);
    if (!ps || ps->getStatus() == NetStat::IDLE) {
      cancelSegment(owner);
      return getSegmentWithIndex(cuid, index);
    }
    return nullptr;
  }
  return nullptr;
}

// Releases the piece behind |segment|: flushes its write cache if it has
// one, marks it as no longer used by a segment, cancels it in the
// PieceStorage and memorizes the segment's written length by index.
//
// Does not remove anything from usedSegmentEntries_; callers do that.
// May throw if the flush fails (see flushWrDiskCache()).
void SegmentMan::cancelSegmentInternal(cuid_t cuid,
                                       const std::shared_ptr<Segment>& segment)
{
  A2_LOG_DEBUG(fmt("Canceling segment#%lu",
                   static_cast<unsigned long>(segment->getIndex())));
  const std::shared_ptr<Piece>& piece = segment->getPiece();
  // TODO In PieceStorage::cancelPiece(), WrDiskCacheEntry may be
  // released. Flush first.
  if (piece->getWrDiskCacheEntry()) {
    // Flush cached data here, because the cached data may be overlapped
    // if BT peers are involved.
    A2_LOG_DEBUG(fmt("Flushing cached data, size=%lu",
                     static_cast<unsigned long>(
                         piece->getWrDiskCacheEntry()->getSize())));
    flushWrDiskCache(pieceStorage_->getWrDiskCache(), piece);
    // TODO Exception may cause some segments (pieces) are not
    // canceled.
  }
  piece->setUsedBySegment(false);
  pieceStorage_->cancelPiece(piece, cuid);
  segmentWrittenLengthMemo_[segment->getIndex()] = segment->getWrittenLength();
  A2_LOG_DEBUG(fmt("Memorized segment index=%lu, writtenLength=%" PRId64,
                   static_cast<unsigned long>(segment->getIndex()),
                   segment->getWrittenLength()));
}

// Cancels and removes every in-use segment owned by |cuid|.
void SegmentMan::cancelSegment(cuid_t cuid)
{
  for (auto itr = usedSegmentEntries_.begin(),
            eoi = usedSegmentEntries_.end();
       itr != eoi;) {
    if ((*itr)->cuid == cuid) {
      cancelSegmentInternal(cuid, (*itr)->segment);
      itr = usedSegmentEntries_.erase(itr);
      // erase() may invalidate the cached end iterator; fetch it again.
      eoi = usedSegmentEntries_.end();
    }
    else {
      ++itr;
    }
  }
}

// Cancels and removes the first in-use entry that is owned by |cuid| and
// whose segment compares equal to |*segment|. Does nothing if there is none.
void SegmentMan::cancelSegment(cuid_t cuid,
                               const std::shared_ptr<Segment>& segment)
{
  for (auto itr = usedSegmentEntries_.begin(),
            eoi = usedSegmentEntries_.end();
       itr != eoi; ++itr) {
    if ((*itr)->cuid == cuid && *(*itr)->segment == *segment) {
      cancelSegmentInternal(cuid, (*itr)->segment);
      usedSegmentEntries_.erase(itr);
      break;
    }
  }
}

// Cancels every in-use segment, then empties usedSegmentEntries_. If a
// cancel throws, the list is left untouched.
void SegmentMan::cancelAllSegments()
{
  for (auto& e : usedSegmentEntries_) {
    cancelSegmentInternal(e->cuid, e->segment);
  }
  usedSegmentEntries_.clear();
}

// Forgets all memorized written lengths.
void SegmentMan::eraseSegmentWrittenLengthMemo()
{
  segmentWrittenLengthMemo_.clear();
}

namespace {
// Predicate: true for a SegmentEntry whose segment has the same index as
// the segment given at construction.
class FindSegmentEntry {
private:
  std::shared_ptr<Segment> segment_;

public:
  explicit FindSegmentEntry(std::shared_ptr<Segment> segment)
      : segment_(std::move(segment))
  {
  }

  bool operator()(const std::shared_ptr<SegmentEntry>& segmentEntry) const
  {
    return segmentEntry->segment->getIndex() == segment_->getIndex();
  }
};
} // namespace

// Marks the piece behind |segment| as complete in the PieceStorage,
// advertises it, and removes the first in-use entry with the same segment
// index (regardless of which CUID owns that entry).
//
// Returns true if such an entry was found and removed, false otherwise. The
// piece is completed and advertised in both cases.
bool SegmentMan::completeSegment(cuid_t cuid,
                                 const std::shared_ptr<Segment>& segment)
{
  pieceStorage_->completePiece(segment->getPiece());
  pieceStorage_->advertisePiece(cuid, segment->getPiece()->getIndex(),
                                global::wallclock());
  auto itr = std::find_if(usedSegmentEntries_.begin(),
                          usedSegmentEntries_.end(),
                          FindSegmentEntry(segment));
  if (itr == usedSegmentEntries_.end()) {
    return false;
  }
  usedSegmentEntries_.erase(itr);
  return true;
}

// Forwards to PieceStorage::hasPiece(). Unlike the other accessors, this one
// does not check pieceStorage_ for null.
bool SegmentMan::hasSegment(size_t index) const
{
  return pieceStorage_->hasPiece(index);
}

// Returns 0 when no PieceStorage is attached; otherwise forwards to
// PieceStorage::getCompletedLength().
int64_t SegmentMan::getDownloadLength() const
{
  if (!pieceStorage_) {
    return 0;
  }
  return pieceStorage_->getCompletedLength();
}

// Appends |peerStat| to peerStats_. No check for duplicates is made.
void SegmentMan::registerPeerStat(const std::shared_ptr<PeerStat>& peerStat)
{
  peerStats_.push_back(peerStat);
}

// Returns the first registered PeerStat whose CUID equals |cuid|, or nullptr
// if there is none.
std::shared_ptr<PeerStat> SegmentMan::getPeerStat(cuid_t cuid) const
{
  for (auto& e : peerStats_) {
    if (e->getCuid() == cuid) {
      return e;
    }
  }
  return nullptr;
}

namespace {
// Predicate: true for a PeerStat with the same hostname and protocol as the
// one given at construction.
//
// Only a reference to the caller's shared_ptr is kept, so an instance must
// not outlive that shared_ptr.
class PeerStatHostProtoEqual {
private:
  const std::shared_ptr<PeerStat>& peerStat_;

public:
  explicit PeerStatHostProtoEqual(const std::shared_ptr<PeerStat>& peerStat)
      : peerStat_(peerStat)
  {
  }

  bool operator()(const std::shared_ptr<PeerStat>& p) const
  {
    return peerStat_->getHostname() == p->getHostname() &&
           peerStat_->getProtocol() == p->getProtocol();
  }
};
} // namespace

// Merges |peerStat| into fastestPeerStats_, which keeps one entry per
// (hostname, protocol) pair.
//
// With no existing entry for the pair, |peerStat| is appended. Otherwise the
// entry with the higher average download speed is kept (the existing one on
// a tie), and the session download length of the other is added to it.
void SegmentMan::updateFastestPeerStat(
    const std::shared_ptr<PeerStat>& peerStat)
{
  auto i = std::find_if(fastestPeerStats_.begin(), fastestPeerStats_.end(),
                        PeerStatHostProtoEqual(peerStat));
  if (i == fastestPeerStats_.end()) {
    fastestPeerStats_.push_back(peerStat);
  }
  else if ((*i)->getAvgDownloadSpeed() < peerStat->getAvgDownloadSpeed()) {
    // *i's SessionDownloadLength must be added to peerStat
    peerStat->addSessionDownloadLength((*i)->getSessionDownloadLength());
    *i = peerStat;
  }
  else {
    // peerStat's SessionDownloadLength must be added to *i
    (*i)->addSessionDownloadLength(peerStat->getSessionDownloadLength());
  }
}

// Counts consecutive pieces, starting at |index|, that the PieceStorage
// neither has nor reports as used. Counting stops at the first piece that
// fails this test or at the last piece.
//
// Returns 0 when |index| is at or past the number of pieces. The subtraction
// below would otherwise wrap around for an index past the end.
size_t SegmentMan::countFreePieceFrom(size_t index) const
{
  const size_t numPieces = downloadContext_->getNumPieces();
  if (index >= numPieces) {
    return 0;
  }
  for (size_t i = index; i < numPieces; ++i) {
    if (pieceStorage_->hasPiece(i) || pieceStorage_->isPieceUsed(i)) {
      return i - index;
    }
  }
  return numPieces - index;
}

// Adds the byte range of |fileEntry| (offset, length) to the ignore
// bitfield's filter.
void SegmentMan::ignoreSegmentFor(const std::shared_ptr<FileEntry>& fileEntry)
{
  A2_LOG_DEBUG(fmt("ignoring segment for path=%s, offset=%" PRId64
                   ", length=%" PRId64 "",
                   fileEntry->getPath().c_str(), fileEntry->getOffset(),
                   fileEntry->getLength()));
  ignoreBitfield_.addFilter(fileEntry->getOffset(), fileEntry->getLength());
}
void SegmentMan::recognizeSegmentFor( const std::shared_ptr& fileEntry) { ignoreBitfield_.removeFilter(fileEntry->getOffset(), fileEntry->getLength()); } bool SegmentMan::allSegmentsIgnored() const { return ignoreBitfield_.isAllFilterBitSet(); } } // namespace aria2