SegmentMan.cc 15 KB


  1. /* <!-- copyright */
  2. /*
  3. * aria2 - The high speed download utility
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * In addition, as a special exception, the copyright holders give
  22. * permission to link the code of portions of this program with the
  23. * OpenSSL library under certain conditions as described in each
  24. * individual source file, and distribute linked combinations
  25. * including the two.
  26. * You must obey the GNU General Public License in all respects
  27. * for all of the code used other than OpenSSL. If you modify
  28. * file(s) with this exception, you may extend this exception to your
  29. * version of the file(s), but you are not obligated to do so. If you
  30. * do not wish to do so, delete this exception statement from your
  31. * version. If you delete this exception statement from all source
  32. * files in the program, then also delete it here.
  33. */
  34. /* copyright --> */
  35. #include "SegmentMan.h"
  36. #include <cassert>
  37. #include <algorithm>
  38. #include <numeric>
  39. #include "util.h"
  40. #include "message.h"
  41. #include "prefs.h"
  42. #include "PiecedSegment.h"
  43. #include "GrowSegment.h"
  44. #include "LogFactory.h"
  45. #include "Logger.h"
  46. #include "PieceStorage.h"
  47. #include "PeerStat.h"
  48. #include "Option.h"
  49. #include "DownloadContext.h"
  50. #include "Piece.h"
  51. #include "FileEntry.h"
  52. #include "wallclock.h"
  53. #include "fmt.h"
  54. #include "WrDiskCacheEntry.h"
  55. #include "DownloadFailureException.h"
  56. namespace aria2 {
  57. SegmentEntry::SegmentEntry(cuid_t cuid, const std::shared_ptr<Segment>& segment)
  58. : cuid(cuid), segment(segment)
  59. {
  60. }
  61. SegmentEntry::~SegmentEntry() = default;
  62. SegmentMan::SegmentMan(const std::shared_ptr<DownloadContext>& downloadContext,
  63. const std::shared_ptr<PieceStorage>& pieceStorage)
  64. : downloadContext_(downloadContext),
  65. pieceStorage_(pieceStorage),
  66. ignoreBitfield_(downloadContext->getPieceLength(),
  67. downloadContext->getTotalLength())
  68. {
  69. ignoreBitfield_.enableFilter();
  70. }
  71. SegmentMan::~SegmentMan() = default;
  72. bool SegmentMan::downloadFinished() const
  73. {
  74. if (!pieceStorage_) {
  75. return false;
  76. }
  77. else {
  78. return pieceStorage_->downloadFinished();
  79. }
  80. }
  81. void SegmentMan::init()
  82. {
  83. // TODO Do we have to do something about DownloadContext and
  84. // PieceStorage here?
  85. }
  86. int64_t SegmentMan::getTotalLength() const
  87. {
  88. if (!pieceStorage_) {
  89. return 0;
  90. }
  91. else {
  92. return pieceStorage_->getTotalLength();
  93. }
  94. }
  95. void SegmentMan::setPieceStorage(
  96. const std::shared_ptr<PieceStorage>& pieceStorage)
  97. {
  98. pieceStorage_ = pieceStorage;
  99. }
  100. void SegmentMan::setDownloadContext(
  101. const std::shared_ptr<DownloadContext>& downloadContext)
  102. {
  103. downloadContext_ = downloadContext;
  104. }
  105. namespace {
  106. void flushWrDiskCache(WrDiskCache* wrDiskCache,
  107. const std::shared_ptr<Piece>& piece)
  108. {
  109. piece->flushWrCache(wrDiskCache);
  110. if (piece->getWrDiskCacheEntry()->getError() !=
  111. WrDiskCacheEntry::CACHE_ERR_SUCCESS) {
  112. piece->clearAllBlock(wrDiskCache);
  113. throw DOWNLOAD_FAILURE_EXCEPTION2(
  114. fmt("Write disk cache flush failure index=%lu",
  115. static_cast<unsigned long>(piece->getIndex())),
  116. piece->getWrDiskCacheEntry()->getErrorCode());
  117. }
  118. }
  119. } // namespace
  120. std::shared_ptr<Segment>
  121. SegmentMan::checkoutSegment(cuid_t cuid, const std::shared_ptr<Piece>& piece)
  122. {
  123. if (!piece) {
  124. return nullptr;
  125. }
  126. A2_LOG_DEBUG(fmt("Attach segment#%lu to CUID#%" PRId64 ".",
  127. static_cast<unsigned long>(piece->getIndex()), cuid));
  128. if (piece->getWrDiskCacheEntry()) {
  129. // Flush cached data here, because the cached data may be overlapped
  130. // if BT peers are involved.
  131. A2_LOG_DEBUG(fmt(
  132. "Flushing cached data, size=%lu",
  133. static_cast<unsigned long>(piece->getWrDiskCacheEntry()->getSize())));
  134. flushWrDiskCache(pieceStorage_->getWrDiskCache(), piece);
  135. }
  136. piece->setUsedBySegment(true);
  137. std::shared_ptr<Segment> segment;
  138. if (piece->getLength() == 0) {
  139. segment = std::make_shared<GrowSegment>(piece);
  140. }
  141. else {
  142. segment = std::make_shared<PiecedSegment>(
  143. downloadContext_->getPieceLength(), piece);
  144. }
  145. auto entry = std::make_shared<SegmentEntry>(cuid, segment);
  146. usedSegmentEntries_.push_back(entry);
  147. A2_LOG_DEBUG(fmt("index=%lu, length=%" PRId64 ", segmentLength=%" PRId64 ","
  148. " writtenLength=%" PRId64,
  149. static_cast<unsigned long>(segment->getIndex()),
  150. segment->getLength(), segment->getSegmentLength(),
  151. segment->getWrittenLength()));
  152. if (piece->getLength() > 0) {
  153. auto positr = segmentWrittenLengthMemo_.find(segment->getIndex());
  154. if (positr != segmentWrittenLengthMemo_.end()) {
  155. const auto writtenLength = (*positr).second;
  156. A2_LOG_DEBUG(fmt("writtenLength(in memo)=%" PRId64
  157. ", writtenLength=%" PRId64,
  158. writtenLength, segment->getWrittenLength()));
  159. // If the difference between cached writtenLength and segment's
  160. // writtenLength is less than one block, we assume that these
  161. // missing bytes are already downloaded.
  162. if (segment->getWrittenLength() < writtenLength &&
  163. writtenLength - segment->getWrittenLength() <
  164. piece->getBlockLength()) {
  165. segment->updateWrittenLength(writtenLength -
  166. segment->getWrittenLength());
  167. }
  168. }
  169. }
  170. return segment;
  171. }
  172. void SegmentMan::getInFlightSegment(
  173. std::vector<std::shared_ptr<Segment>>& segments, cuid_t cuid)
  174. {
  175. for (SegmentEntries::const_iterator itr = usedSegmentEntries_.begin(),
  176. eoi = usedSegmentEntries_.end();
  177. itr != eoi; ++itr) {
  178. const std::shared_ptr<SegmentEntry>& segmentEntry = *itr;
  179. if (segmentEntry->cuid == cuid) {
  180. segments.push_back(segmentEntry->segment);
  181. }
  182. }
  183. }
  184. std::shared_ptr<Segment> SegmentMan::getSegment(cuid_t cuid,
  185. size_t minSplitSize)
  186. {
  187. std::shared_ptr<Piece> piece = pieceStorage_->getMissingPiece(
  188. minSplitSize, ignoreBitfield_.getFilterBitfield(),
  189. ignoreBitfield_.getBitfieldLength(), cuid);
  190. return checkoutSegment(cuid, piece);
  191. }
  192. void SegmentMan::getSegment(std::vector<std::shared_ptr<Segment>>& segments,
  193. cuid_t cuid, size_t minSplitSize,
  194. const std::shared_ptr<FileEntry>& fileEntry,
  195. size_t maxSegments)
  196. {
  197. BitfieldMan filter(ignoreBitfield_);
  198. filter.enableFilter();
  199. filter.addNotFilter(fileEntry->getOffset(), fileEntry->getLength());
  200. std::vector<std::shared_ptr<Segment>> pending;
  201. while (segments.size() < maxSegments) {
  202. std::shared_ptr<Segment> segment = checkoutSegment(
  203. cuid,
  204. pieceStorage_->getMissingPiece(minSplitSize, filter.getFilterBitfield(),
  205. filter.getBitfieldLength(), cuid));
  206. if (!segment) {
  207. break;
  208. }
  209. if (segment->getPositionToWrite() < fileEntry->getOffset() ||
  210. fileEntry->getLastOffset() <= segment->getPositionToWrite()) {
  211. pending.push_back(segment);
  212. }
  213. else {
  214. segments.push_back(segment);
  215. }
  216. }
  217. for (std::vector<std::shared_ptr<Segment>>::const_iterator
  218. i = pending.begin(),
  219. eoi = pending.end();
  220. i != eoi; ++i) {
  221. cancelSegment(cuid, *i);
  222. }
  223. }
  224. std::shared_ptr<Segment> SegmentMan::getSegmentWithIndex(cuid_t cuid,
  225. size_t index)
  226. {
  227. if (index > 0 && downloadContext_->getNumPieces() <= index) {
  228. return nullptr;
  229. }
  230. return checkoutSegment(cuid, pieceStorage_->getMissingPiece(index, cuid));
  231. }
  232. std::shared_ptr<Segment> SegmentMan::getCleanSegmentIfOwnerIsIdle(cuid_t cuid,
  233. size_t index)
  234. {
  235. if (index > 0 && downloadContext_->getNumPieces() <= index) {
  236. return nullptr;
  237. }
  238. for (SegmentEntries::const_iterator itr = usedSegmentEntries_.begin(),
  239. eoi = usedSegmentEntries_.end();
  240. itr != eoi; ++itr) {
  241. const std::shared_ptr<SegmentEntry>& segmentEntry = *itr;
  242. if (segmentEntry->segment->getIndex() == index) {
  243. if (segmentEntry->segment->getWrittenLength() > 0) {
  244. return nullptr;
  245. }
  246. if (segmentEntry->cuid == cuid) {
  247. return segmentEntry->segment;
  248. }
  249. cuid_t owner = segmentEntry->cuid;
  250. std::shared_ptr<PeerStat> ps = getPeerStat(owner);
  251. if (!ps || ps->getStatus() == NetStat::IDLE) {
  252. cancelSegment(owner);
  253. return getSegmentWithIndex(cuid, index);
  254. }
  255. else {
  256. return nullptr;
  257. }
  258. }
  259. }
  260. return nullptr;
  261. }
  262. void SegmentMan::cancelSegmentInternal(cuid_t cuid,
  263. const std::shared_ptr<Segment>& segment)
  264. {
  265. A2_LOG_DEBUG(fmt("Canceling segment#%lu",
  266. static_cast<unsigned long>(segment->getIndex())));
  267. const std::shared_ptr<Piece>& piece = segment->getPiece();
  268. // TODO In PieceStorage::cancelPiece(), WrDiskCacheEntry may be
  269. // released. Flush first.
  270. if (piece->getWrDiskCacheEntry()) {
  271. // Flush cached data here, because the cached data may be overlapped
  272. // if BT peers are involved.
  273. A2_LOG_DEBUG(fmt(
  274. "Flushing cached data, size=%lu",
  275. static_cast<unsigned long>(piece->getWrDiskCacheEntry()->getSize())));
  276. flushWrDiskCache(pieceStorage_->getWrDiskCache(), piece);
  277. // TODO Exception may cause some segments (pieces) are not
  278. // canceled.
  279. }
  280. piece->setUsedBySegment(false);
  281. pieceStorage_->cancelPiece(piece, cuid);
  282. segmentWrittenLengthMemo_[segment->getIndex()] = segment->getWrittenLength();
  283. A2_LOG_DEBUG(fmt("Memorized segment index=%lu, writtenLength=%" PRId64,
  284. static_cast<unsigned long>(segment->getIndex()),
  285. segment->getWrittenLength()));
  286. }
  287. void SegmentMan::cancelSegment(cuid_t cuid)
  288. {
  289. for (auto itr = usedSegmentEntries_.begin(), eoi = usedSegmentEntries_.end();
  290. itr != eoi;) {
  291. if ((*itr)->cuid == cuid) {
  292. cancelSegmentInternal(cuid, (*itr)->segment);
  293. itr = usedSegmentEntries_.erase(itr);
  294. eoi = usedSegmentEntries_.end();
  295. }
  296. else {
  297. ++itr;
  298. }
  299. }
  300. }
  301. void SegmentMan::cancelSegment(cuid_t cuid,
  302. const std::shared_ptr<Segment>& segment)
  303. {
  304. for (auto itr = usedSegmentEntries_.begin(), eoi = usedSegmentEntries_.end();
  305. itr != eoi;) {
  306. if ((*itr)->cuid == cuid && *(*itr)->segment == *segment) {
  307. cancelSegmentInternal(cuid, (*itr)->segment);
  308. itr = usedSegmentEntries_.erase(itr);
  309. break;
  310. }
  311. else {
  312. ++itr;
  313. }
  314. }
  315. }
  316. void SegmentMan::cancelAllSegments()
  317. {
  318. for (auto& e : usedSegmentEntries_) {
  319. cancelSegmentInternal(e->cuid, e->segment);
  320. }
  321. usedSegmentEntries_.clear();
  322. }
  323. void SegmentMan::eraseSegmentWrittenLengthMemo()
  324. {
  325. segmentWrittenLengthMemo_.clear();
  326. }
  327. namespace {
  328. class FindSegmentEntry {
  329. private:
  330. std::shared_ptr<Segment> segment_;
  331. public:
  332. FindSegmentEntry(std::shared_ptr<Segment> segment)
  333. : segment_(std::move(segment))
  334. {
  335. }
  336. bool operator()(const std::shared_ptr<SegmentEntry>& segmentEntry) const
  337. {
  338. return segmentEntry->segment->getIndex() == segment_->getIndex();
  339. }
  340. };
  341. } // namespace
  342. bool SegmentMan::completeSegment(cuid_t cuid,
  343. const std::shared_ptr<Segment>& segment)
  344. {
  345. pieceStorage_->completePiece(segment->getPiece());
  346. pieceStorage_->advertisePiece(cuid, segment->getPiece()->getIndex(),
  347. global::wallclock());
  348. auto itr = std::find_if(usedSegmentEntries_.begin(),
  349. usedSegmentEntries_.end(), FindSegmentEntry(segment));
  350. if (itr == usedSegmentEntries_.end()) {
  351. return false;
  352. }
  353. else {
  354. usedSegmentEntries_.erase(itr);
  355. return true;
  356. }
  357. }
  358. bool SegmentMan::hasSegment(size_t index) const
  359. {
  360. return pieceStorage_->hasPiece(index);
  361. }
  362. int64_t SegmentMan::getDownloadLength() const
  363. {
  364. if (!pieceStorage_) {
  365. return 0;
  366. }
  367. else {
  368. return pieceStorage_->getCompletedLength();
  369. }
  370. }
  371. void SegmentMan::registerPeerStat(const std::shared_ptr<PeerStat>& peerStat)
  372. {
  373. peerStats_.push_back(peerStat);
  374. }
  375. std::shared_ptr<PeerStat> SegmentMan::getPeerStat(cuid_t cuid) const
  376. {
  377. for (auto& e : peerStats_) {
  378. if (e->getCuid() == cuid) {
  379. return e;
  380. }
  381. }
  382. return nullptr;
  383. }
  384. namespace {
  385. class PeerStatHostProtoEqual {
  386. private:
  387. const std::shared_ptr<PeerStat>& peerStat_;
  388. public:
  389. PeerStatHostProtoEqual(const std::shared_ptr<PeerStat>& peerStat)
  390. : peerStat_(peerStat)
  391. {
  392. }
  393. bool operator()(const std::shared_ptr<PeerStat>& p) const
  394. {
  395. return peerStat_->getHostname() == p->getHostname() &&
  396. peerStat_->getProtocol() == p->getProtocol();
  397. }
  398. };
  399. } // namespace
  400. void SegmentMan::updateFastestPeerStat(
  401. const std::shared_ptr<PeerStat>& peerStat)
  402. {
  403. auto i = std::find_if(fastestPeerStats_.begin(), fastestPeerStats_.end(),
  404. PeerStatHostProtoEqual(peerStat));
  405. if (i == fastestPeerStats_.end()) {
  406. fastestPeerStats_.push_back(peerStat);
  407. }
  408. else if ((*i)->getAvgDownloadSpeed() < peerStat->getAvgDownloadSpeed()) {
  409. // *i's SessionDownloadLength must be added to peerStat
  410. peerStat->addSessionDownloadLength((*i)->getSessionDownloadLength());
  411. *i = peerStat;
  412. }
  413. else {
  414. // peerStat's SessionDownloadLength must be added to *i
  415. (*i)->addSessionDownloadLength(peerStat->getSessionDownloadLength());
  416. }
  417. }
  418. size_t SegmentMan::countFreePieceFrom(size_t index) const
  419. {
  420. size_t numPieces = downloadContext_->getNumPieces();
  421. for (size_t i = index; i < numPieces; ++i) {
  422. if (pieceStorage_->hasPiece(i) || pieceStorage_->isPieceUsed(i)) {
  423. return i - index;
  424. }
  425. }
  426. return downloadContext_->getNumPieces() - index;
  427. }
  428. void SegmentMan::ignoreSegmentFor(const std::shared_ptr<FileEntry>& fileEntry)
  429. {
  430. A2_LOG_DEBUG(fmt("ignoring segment for path=%s, offset=%" PRId64
  431. ", length=%" PRId64 "",
  432. fileEntry->getPath().c_str(), fileEntry->getOffset(),
  433. fileEntry->getLength()));
  434. ignoreBitfield_.addFilter(fileEntry->getOffset(), fileEntry->getLength());
  435. }
  436. void SegmentMan::recognizeSegmentFor(
  437. const std::shared_ptr<FileEntry>& fileEntry)
  438. {
  439. ignoreBitfield_.removeFilter(fileEntry->getOffset(), fileEntry->getLength());
  440. }
  441. bool SegmentMan::allSegmentsIgnored() const
  442. {
  443. return ignoreBitfield_.isAllFilterBitSet();
  444. }
  445. } // namespace aria2