RequestGroupMan.cc 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - The high speed download utility
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * In addition, as a special exception, the copyright holders give
  22. * permission to link the code of portions of this program with the
  23. * OpenSSL library under certain conditions as described in each
  24. * individual source file, and distribute linked combinations
  25. * including the two.
  26. * You must obey the GNU General Public License in all respects
  27. * for all of the code used other than OpenSSL. If you modify
  28. * file(s) with this exception, you may extend this exception to your
  29. * version of the file(s), but you are not obligated to do so. If you
  30. * do not wish to do so, delete this exception statement from your
  31. * version. If you delete this exception statement from all source
  32. * files in the program, then also delete it here.
  33. */
  34. /* copyright --> */
  35. #include "RequestGroupMan.h"
  36. #include <unistd.h>
  37. #include <cstring>
  38. #include <iomanip>
  39. #include <sstream>
  40. #include <numeric>
  41. #include <algorithm>
  42. #include <utility>
  43. #include "BtProgressInfoFile.h"
  44. #include "RecoverableException.h"
  45. #include "RequestGroup.h"
  46. #include "LogFactory.h"
  47. #include "Logger.h"
  48. #include "DownloadEngine.h"
  49. #include "message.h"
  50. #include "a2functional.h"
  51. #include "DownloadResult.h"
  52. #include "DownloadContext.h"
  53. #include "ServerStatMan.h"
  54. #include "ServerStat.h"
  55. #include "SegmentMan.h"
  56. #include "FeedbackURISelector.h"
  57. #include "InorderURISelector.h"
  58. #include "AdaptiveURISelector.h"
  59. #include "Option.h"
  60. #include "prefs.h"
  61. #include "File.h"
  62. #include "util.h"
  63. #include "Command.h"
  64. #include "FileEntry.h"
  65. #include "fmt.h"
  66. #include "FileAllocationEntry.h"
  67. #include "CheckIntegrityEntry.h"
  68. #include "Segment.h"
  69. #include "DlAbortEx.h"
  70. #include "uri.h"
  71. #include "Signature.h"
  72. #include "OutputFile.h"
  73. #include "download_helper.h"
  74. #include "UriListParser.h"
  75. #include "SingletonHolder.h"
  76. #include "Notifier.h"
  77. #include "PeerStat.h"
  78. #include "WrDiskCache.h"
  79. #include "PieceStorage.h"
  80. #include "DiskAdaptor.h"
  81. #include "SimpleRandomizer.h"
  82. #include "array_fun.h"
  83. #include "OpenedFileCounter.h"
  84. #include "wallclock.h"
  85. #ifdef ENABLE_BITTORRENT
  86. #include "bittorrent_helper.h"
  87. #endif // ENABLE_BITTORRENT
  88. namespace aria2 {
  89. namespace {
  90. template <typename InputIterator>
  91. void appendReservedGroup(RequestGroupList& list, InputIterator first,
  92. InputIterator last)
  93. {
  94. for (; first != last; ++first) {
  95. list.push_back((*first)->getGID(), *first);
  96. }
  97. }
  98. } // namespace
  99. RequestGroupMan::RequestGroupMan(
  100. std::vector<std::shared_ptr<RequestGroup>> requestGroups,
  101. int maxConcurrentDownloads, const Option* option)
  102. : maxConcurrentDownloads_(maxConcurrentDownloads),
  103. optimizeConcurrentDownloads_(false),
  104. optimizeConcurrentDownloadsCoeffA_(5.),
  105. optimizeConcurrentDownloadsCoeffB_(25.),
  106. optimizationSpeed_(0),
  107. numActive_(0),
  108. option_(option),
  109. serverStatMan_(std::make_shared<ServerStatMan>()),
  110. maxOverallDownloadSpeedLimit_(
  111. option->getAsInt(PREF_MAX_OVERALL_DOWNLOAD_LIMIT)),
  112. maxOverallUploadSpeedLimit_(
  113. option->getAsInt(PREF_MAX_OVERALL_UPLOAD_LIMIT)),
  114. keepRunning_(option->getAsBool(PREF_ENABLE_RPC)),
  115. queueCheck_(true),
  116. removedErrorResult_(0),
  117. removedLastErrorResult_(error_code::FINISHED),
  118. maxDownloadResult_(option->getAsInt(PREF_MAX_DOWNLOAD_RESULT)),
  119. openedFileCounter_(std::make_shared<OpenedFileCounter>(
  120. this, option->getAsInt(PREF_BT_MAX_OPEN_FILES))),
  121. numStoppedTotal_(0)
  122. {
  123. setupOptimizeConcurrentDownloads();
  124. appendReservedGroup(reservedGroups_, requestGroups.begin(),
  125. requestGroups.end());
  126. }
  127. RequestGroupMan::~RequestGroupMan() { openedFileCounter_->deactivate(); }
  128. bool RequestGroupMan::setupOptimizeConcurrentDownloads(void)
  129. {
  130. optimizeConcurrentDownloads_ = option_->getAsBool(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS);
  131. if (optimizeConcurrentDownloads_) {
  132. if (option_->defined(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA)) {
  133. optimizeConcurrentDownloadsCoeffA_ = std::stod(option_->get(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA));
  134. optimizeConcurrentDownloadsCoeffB_ = std::stod(option_->get(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB));
  135. }
  136. }
  137. return optimizeConcurrentDownloads_;
  138. }
  139. bool RequestGroupMan::downloadFinished()
  140. {
  141. if (keepRunning_) {
  142. return false;
  143. }
  144. return requestGroups_.empty() && reservedGroups_.empty();
  145. }
  146. void RequestGroupMan::addRequestGroup(
  147. const std::shared_ptr<RequestGroup>& group)
  148. {
  149. ++numActive_;
  150. requestGroups_.push_back(group->getGID(), group);
  151. }
  152. void RequestGroupMan::addReservedGroup(
  153. const std::vector<std::shared_ptr<RequestGroup>>& groups)
  154. {
  155. requestQueueCheck();
  156. appendReservedGroup(reservedGroups_, groups.begin(), groups.end());
  157. }
  158. void RequestGroupMan::addReservedGroup(
  159. const std::shared_ptr<RequestGroup>& group)
  160. {
  161. requestQueueCheck();
  162. reservedGroups_.push_back(group->getGID(), group);
  163. }
  164. namespace {
  165. struct RequestGroupKeyFunc {
  166. a2_gid_t operator()(const std::shared_ptr<RequestGroup>& rg) const
  167. {
  168. return rg->getGID();
  169. }
  170. };
  171. } // namespace
  172. void RequestGroupMan::insertReservedGroup(
  173. size_t pos, const std::vector<std::shared_ptr<RequestGroup>>& groups)
  174. {
  175. requestQueueCheck();
  176. pos = std::min(reservedGroups_.size(), pos);
  177. reservedGroups_.insert(pos, RequestGroupKeyFunc(), groups.begin(),
  178. groups.end());
  179. }
  180. void RequestGroupMan::insertReservedGroup(
  181. size_t pos, const std::shared_ptr<RequestGroup>& group)
  182. {
  183. requestQueueCheck();
  184. pos = std::min(reservedGroups_.size(), pos);
  185. reservedGroups_.insert(pos, group->getGID(), group);
  186. }
  187. size_t RequestGroupMan::countRequestGroup() const
  188. {
  189. return requestGroups_.size();
  190. }
  191. std::shared_ptr<RequestGroup> RequestGroupMan::findGroup(a2_gid_t gid) const
  192. {
  193. std::shared_ptr<RequestGroup> rg = requestGroups_.get(gid);
  194. if (!rg) {
  195. rg = reservedGroups_.get(gid);
  196. }
  197. return rg;
  198. }
  199. size_t RequestGroupMan::changeReservedGroupPosition(a2_gid_t gid, int pos,
  200. OffsetMode how)
  201. {
  202. ssize_t dest = reservedGroups_.move(gid, pos, how);
  203. if (dest == -1) {
  204. throw DL_ABORT_EX(fmt("GID#%s not found in the waiting queue.",
  205. GroupId::toHex(gid).c_str()));
  206. }
  207. else {
  208. return dest;
  209. }
  210. }
  211. bool RequestGroupMan::removeReservedGroup(a2_gid_t gid)
  212. {
  213. return reservedGroups_.remove(gid);
  214. }
  215. namespace {
  216. void notifyDownloadEvent(DownloadEvent event,
  217. const std::shared_ptr<RequestGroup>& group)
  218. {
  219. // Check NULL to make unit test easier.
  220. if (SingletonHolder<Notifier>::instance()) {
  221. SingletonHolder<Notifier>::instance()->notifyDownloadEvent(event, group);
  222. }
  223. }
  224. } // namespace
  225. namespace {
  226. void executeStopHook(const std::shared_ptr<RequestGroup>& group,
  227. const Option* option, error_code::Value result)
  228. {
  229. PrefPtr hookPref = nullptr;
  230. if (!option->blank(PREF_ON_DOWNLOAD_STOP)) {
  231. hookPref = PREF_ON_DOWNLOAD_STOP;
  232. }
  233. if (result == error_code::FINISHED) {
  234. if (!option->blank(PREF_ON_DOWNLOAD_COMPLETE)) {
  235. hookPref = PREF_ON_DOWNLOAD_COMPLETE;
  236. }
  237. }
  238. else if (result != error_code::IN_PROGRESS && result != error_code::REMOVED) {
  239. if (!option->blank(PREF_ON_DOWNLOAD_ERROR)) {
  240. hookPref = PREF_ON_DOWNLOAD_ERROR;
  241. }
  242. }
  243. if (hookPref) {
  244. util::executeHookByOptName(group, option, hookPref);
  245. }
  246. if (result == error_code::FINISHED) {
  247. notifyDownloadEvent(EVENT_ON_DOWNLOAD_COMPLETE, group);
  248. }
  249. else if (result != error_code::IN_PROGRESS && result != error_code::REMOVED) {
  250. notifyDownloadEvent(EVENT_ON_DOWNLOAD_ERROR, group);
  251. }
  252. else {
  253. notifyDownloadEvent(EVENT_ON_DOWNLOAD_STOP, group);
  254. }
  255. }
  256. } // namespace
  257. namespace {
  258. class ProcessStoppedRequestGroup {
  259. private:
  260. DownloadEngine* e_;
  261. RequestGroupList& reservedGroups_;
  262. void saveSignature(const std::shared_ptr<RequestGroup>& group)
  263. {
  264. auto& sig = group->getDownloadContext()->getSignature();
  265. if (sig && !sig->getBody().empty()) {
  266. // filename of signature file is the path to download file followed by
  267. // ".sig".
  268. std::string signatureFile = group->getFirstFilePath() + ".sig";
  269. if (sig->save(signatureFile)) {
  270. A2_LOG_NOTICE(fmt(MSG_SIGNATURE_SAVED, signatureFile.c_str()));
  271. }
  272. else {
  273. A2_LOG_NOTICE(fmt(MSG_SIGNATURE_NOT_SAVED, signatureFile.c_str()));
  274. }
  275. }
  276. }
  277. // Collect statistics during download in PeerStats and update/register
  278. // ServerStatMan
  279. void collectStat(const std::shared_ptr<RequestGroup>& group)
  280. {
  281. if (group->getSegmentMan()) {
  282. bool singleConnection =
  283. group->getSegmentMan()->getPeerStats().size() == 1;
  284. const std::vector<std::shared_ptr<PeerStat>>& peerStats =
  285. group->getSegmentMan()->getFastestPeerStats();
  286. for (auto& stat : peerStats) {
  287. if (stat->getHostname().empty() || stat->getProtocol().empty()) {
  288. continue;
  289. }
  290. int speed = stat->getAvgDownloadSpeed();
  291. if (speed == 0)
  292. continue;
  293. std::shared_ptr<ServerStat> ss =
  294. e_->getRequestGroupMan()->getOrCreateServerStat(
  295. stat->getHostname(), stat->getProtocol());
  296. ss->increaseCounter();
  297. ss->updateDownloadSpeed(speed);
  298. if (singleConnection) {
  299. ss->updateSingleConnectionAvgSpeed(speed);
  300. }
  301. else {
  302. ss->updateMultiConnectionAvgSpeed(speed);
  303. }
  304. }
  305. }
  306. }
  307. public:
  308. ProcessStoppedRequestGroup(DownloadEngine* e,
  309. RequestGroupList& reservedGroups)
  310. : e_(e), reservedGroups_(reservedGroups)
  311. {
  312. }
  313. bool operator()(const RequestGroupList::value_type& group)
  314. {
  315. if (group->getNumCommand() == 0) {
  316. collectStat(group);
  317. const std::shared_ptr<DownloadContext>& dctx =
  318. group->getDownloadContext();
  319. if (!group->isSeedOnlyEnabled()) {
  320. e_->getRequestGroupMan()->decreaseNumActive();
  321. }
  322. // DownloadContext::resetDownloadStopTime() is only called when
  323. // download completed. If
  324. // DownloadContext::getDownloadStopTime().isZero() is true, then
  325. // there is a possibility that the download is error or
  326. // in-progress and resetDownloadStopTime() is not called. So
  327. // call it here.
  328. if (dctx->getDownloadStopTime().isZero()) {
  329. dctx->resetDownloadStopTime();
  330. }
  331. try {
  332. group->closeFile();
  333. if (group->isPauseRequested()) {
  334. A2_LOG_NOTICE(fmt(_("Download GID#%s paused"),
  335. GroupId::toHex(group->getGID()).c_str()));
  336. group->saveControlFile();
  337. }
  338. else if (group->downloadFinished() &&
  339. !group->getDownloadContext()->isChecksumVerificationNeeded()) {
  340. group->applyLastModifiedTimeToLocalFiles();
  341. group->reportDownloadFinished();
  342. if (group->allDownloadFinished() &&
  343. !group->getOption()->getAsBool(PREF_FORCE_SAVE)) {
  344. group->removeControlFile();
  345. saveSignature(group);
  346. }
  347. else {
  348. group->saveControlFile();
  349. }
  350. std::vector<std::shared_ptr<RequestGroup>> nextGroups;
  351. group->postDownloadProcessing(nextGroups);
  352. if (!nextGroups.empty()) {
  353. A2_LOG_DEBUG(fmt("Adding %lu RequestGroups as a result of"
  354. " PostDownloadHandler.",
  355. static_cast<unsigned long>(nextGroups.size())));
  356. e_->getRequestGroupMan()->insertReservedGroup(0, nextGroups);
  357. }
  358. #ifdef ENABLE_BITTORRENT
  359. // For in-memory download (e.g., Magnet URI), the
  360. // FileEntry::getPath() does not return actual file path, so
  361. // we don't remove it.
  362. if (group->getOption()->getAsBool(PREF_BT_REMOVE_UNSELECTED_FILE) &&
  363. !group->inMemoryDownload() && dctx->hasAttribute(CTX_ATTR_BT)) {
  364. A2_LOG_INFO(fmt(MSG_REMOVING_UNSELECTED_FILE,
  365. GroupId::toHex(group->getGID()).c_str()));
  366. const std::vector<std::shared_ptr<FileEntry>>& files =
  367. dctx->getFileEntries();
  368. for (auto& file : files) {
  369. if (!file->isRequested()) {
  370. if (File(file->getPath()).remove()) {
  371. A2_LOG_INFO(fmt(MSG_FILE_REMOVED, file->getPath().c_str()));
  372. }
  373. else {
  374. A2_LOG_INFO(
  375. fmt(MSG_FILE_COULD_NOT_REMOVED, file->getPath().c_str()));
  376. }
  377. }
  378. }
  379. }
  380. #endif // ENABLE_BITTORRENT
  381. }
  382. else {
  383. A2_LOG_NOTICE(
  384. fmt(_("Download GID#%s not complete: %s"),
  385. GroupId::toHex(group->getGID()).c_str(),
  386. group->getDownloadContext()->getBasePath().c_str()));
  387. group->saveControlFile();
  388. }
  389. }
  390. catch (RecoverableException& ex) {
  391. A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, ex);
  392. }
  393. if (group->isPauseRequested()) {
  394. group->setState(RequestGroup::STATE_WAITING);
  395. reservedGroups_.push_front(group->getGID(), group);
  396. group->releaseRuntimeResource(e_);
  397. group->setForceHaltRequested(false);
  398. util::executeHookByOptName(group, e_->getOption(),
  399. PREF_ON_DOWNLOAD_PAUSE);
  400. notifyDownloadEvent(EVENT_ON_DOWNLOAD_PAUSE, group);
  401. // TODO Should we have to prepend spend uris to remaining uris
  402. // in case PREF_REUSE_URI is disabled?
  403. }
  404. else {
  405. std::shared_ptr<DownloadResult> dr = group->createDownloadResult();
  406. e_->getRequestGroupMan()->addDownloadResult(dr);
  407. executeStopHook(group, e_->getOption(), dr->result);
  408. group->releaseRuntimeResource(e_);
  409. }
  410. return true;
  411. }
  412. else {
  413. return false;
  414. }
  415. }
  416. };
  417. } // namespace
  418. void RequestGroupMan::removeStoppedGroup(DownloadEngine* e)
  419. {
  420. size_t numPrev = requestGroups_.size();
  421. requestGroups_.remove_if(ProcessStoppedRequestGroup(e, reservedGroups_));
  422. size_t numRemoved = numPrev - requestGroups_.size();
  423. if (numRemoved > 0) {
  424. A2_LOG_DEBUG(fmt("%lu RequestGroup(s) deleted.",
  425. static_cast<unsigned long>(numRemoved)));
  426. }
  427. }
  428. void RequestGroupMan::configureRequestGroup(
  429. const std::shared_ptr<RequestGroup>& requestGroup) const
  430. {
  431. const std::string& uriSelectorValue =
  432. requestGroup->getOption()->get(PREF_URI_SELECTOR);
  433. if (uriSelectorValue == V_FEEDBACK) {
  434. requestGroup->setURISelector(
  435. make_unique<FeedbackURISelector>(serverStatMan_));
  436. }
  437. else if (uriSelectorValue == V_INORDER) {
  438. requestGroup->setURISelector(make_unique<InorderURISelector>());
  439. }
  440. else if (uriSelectorValue == V_ADAPTIVE) {
  441. requestGroup->setURISelector(
  442. make_unique<AdaptiveURISelector>(serverStatMan_, requestGroup.get()));
  443. }
  444. }
  445. namespace {
  446. std::vector<std::unique_ptr<Command>>
  447. createInitialCommand(const std::shared_ptr<RequestGroup>& requestGroup,
  448. DownloadEngine* e)
  449. {
  450. std::vector<std::unique_ptr<Command>> res;
  451. requestGroup->createInitialCommand(res, e);
  452. return res;
  453. }
  454. } // namespace
  455. void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e)
  456. {
  457. removeStoppedGroup(e);
  458. int maxConcurrentDownloads = optimizeConcurrentDownloads_ ? optimizeConcurrentDownloads() : maxConcurrentDownloads_;
  459. if (static_cast<size_t>(maxConcurrentDownloads) <= numActive_) {
  460. return;
  461. }
  462. int count = 0;
  463. int num = maxConcurrentDownloads - numActive_;
  464. std::vector<std::shared_ptr<RequestGroup>> pending;
  465. while (count < num && (uriListParser_ || !reservedGroups_.empty())) {
  466. if (uriListParser_ && reservedGroups_.empty()) {
  467. std::vector<std::shared_ptr<RequestGroup>> groups;
  468. // May throw exception
  469. bool ok = createRequestGroupFromUriListParser(groups, option_,
  470. uriListParser_.get());
  471. if (ok) {
  472. appendReservedGroup(reservedGroups_, groups.begin(), groups.end());
  473. }
  474. else {
  475. uriListParser_.reset();
  476. if (reservedGroups_.empty()) {
  477. break;
  478. }
  479. }
  480. }
  481. std::shared_ptr<RequestGroup> groupToAdd = *reservedGroups_.begin();
  482. reservedGroups_.pop_front();
  483. if ((keepRunning_ && groupToAdd->isPauseRequested()) ||
  484. !groupToAdd->isDependencyResolved()) {
  485. pending.push_back(groupToAdd);
  486. continue;
  487. }
  488. // Drop pieceStorage here because paused download holds its
  489. // reference.
  490. groupToAdd->dropPieceStorage();
  491. configureRequestGroup(groupToAdd);
  492. groupToAdd->setRequestGroupMan(this);
  493. groupToAdd->setState(RequestGroup::STATE_ACTIVE);
  494. ++numActive_;
  495. requestGroups_.push_back(groupToAdd->getGID(), groupToAdd);
  496. try {
  497. auto res = createInitialCommand(groupToAdd, e);
  498. ++count;
  499. if (res.empty()) {
  500. requestQueueCheck();
  501. }
  502. else {
  503. e->addCommand(std::move(res));
  504. }
  505. }
  506. catch (RecoverableException& ex) {
  507. A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, ex);
  508. A2_LOG_DEBUG("Deleting temporal commands.");
  509. groupToAdd->setLastErrorCode(ex.getErrorCode(), ex.what());
  510. // We add groupToAdd to e later in order to it is processed in
  511. // removeStoppedGroup().
  512. requestQueueCheck();
  513. }
  514. util::executeHookByOptName(groupToAdd, e->getOption(),
  515. PREF_ON_DOWNLOAD_START);
  516. notifyDownloadEvent(EVENT_ON_DOWNLOAD_START, groupToAdd);
  517. }
  518. if (!pending.empty()) {
  519. reservedGroups_.insert(reservedGroups_.begin(), RequestGroupKeyFunc(),
  520. pending.begin(), pending.end());
  521. }
  522. if (count > 0) {
  523. e->setNoWait(true);
  524. e->setRefreshInterval(std::chrono::milliseconds(0));
  525. A2_LOG_DEBUG(fmt("%d RequestGroup(s) added.", count));
  526. }
  527. }
  528. void RequestGroupMan::save()
  529. {
  530. for (auto& rg : requestGroups_) {
  531. if (rg->allDownloadFinished() &&
  532. !rg->getDownloadContext()->isChecksumVerificationNeeded() &&
  533. !rg->getOption()->getAsBool(PREF_FORCE_SAVE)) {
  534. rg->removeControlFile();
  535. }
  536. else {
  537. try {
  538. rg->saveControlFile();
  539. }
  540. catch (RecoverableException& e) {
  541. A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, e);
  542. }
  543. }
  544. }
  545. }
  546. void RequestGroupMan::closeFile()
  547. {
  548. for (auto& elem : requestGroups_) {
  549. elem->closeFile();
  550. }
  551. }
  552. RequestGroupMan::DownloadStat RequestGroupMan::getDownloadStat() const
  553. {
  554. int finished = 0;
  555. int error = removedErrorResult_;
  556. int inprogress = 0;
  557. int removed = 0;
  558. error_code::Value lastError = removedLastErrorResult_;
  559. for (auto& dr : downloadResults_) {
  560. if (dr->belongsTo != 0) {
  561. continue;
  562. }
  563. if (dr->result == error_code::FINISHED) {
  564. ++finished;
  565. }
  566. else if (dr->result == error_code::IN_PROGRESS) {
  567. ++inprogress;
  568. }
  569. else if (dr->result == error_code::REMOVED) {
  570. ++removed;
  571. }
  572. else {
  573. ++error;
  574. lastError = dr->result;
  575. }
  576. }
  577. return DownloadStat(error, inprogress, reservedGroups_.size(), lastError);
  578. }
  579. enum DownloadResultStatus {
  580. A2_STATUS_OK,
  581. A2_STATUS_INPR,
  582. A2_STATUS_RM,
  583. A2_STATUS_ERR
  584. };
  585. namespace {
  586. const char* getStatusStr(DownloadResultStatus status, bool useColor)
  587. {
  588. // status string is formatted in 4 characters wide.
  589. switch (status) {
  590. case (A2_STATUS_OK):
  591. if (useColor) {
  592. return "\033[1;32mOK\033[0m ";
  593. }
  594. else {
  595. return "OK ";
  596. }
  597. case (A2_STATUS_INPR):
  598. if (useColor) {
  599. return "\033[1;34mINPR\033[0m";
  600. }
  601. else {
  602. return "INPR";
  603. }
  604. case (A2_STATUS_RM):
  605. if (useColor) {
  606. return "\033[1mRM\033[0m ";
  607. }
  608. else {
  609. return "RM ";
  610. }
  611. case (A2_STATUS_ERR):
  612. if (useColor) {
  613. return "\033[1;31mERR\033[0m ";
  614. }
  615. else {
  616. return "ERR ";
  617. }
  618. default:
  619. return "";
  620. }
  621. }
  622. } // namespace
  623. void RequestGroupMan::showDownloadResults(OutputFile& o, bool full) const
  624. {
  625. int pathRowSize = 55;
  626. // Download Results:
  627. // idx|stat|path/length
  628. // ===+====+=======================================================================
  629. o.printf("\n%s"
  630. "\ngid |stat|avg speed |",
  631. _("Download Results:"));
  632. if (full) {
  633. o.write(" %|path/URI"
  634. "\n======+====+===========+===+");
  635. pathRowSize -= 4;
  636. }
  637. else {
  638. o.write("path/URI"
  639. "\n======+====+===========+");
  640. }
  641. std::string line(pathRowSize, '=');
  642. o.printf("%s\n", line.c_str());
  643. bool useColor = o.supportsColor() && option_->getAsBool(PREF_ENABLE_COLOR);
  644. int ok = 0;
  645. int err = 0;
  646. int inpr = 0;
  647. int rm = 0;
  648. for (auto& dr : downloadResults_) {
  649. if (dr->belongsTo != 0) {
  650. continue;
  651. }
  652. const char* status;
  653. switch (dr->result) {
  654. case error_code::FINISHED:
  655. status = getStatusStr(A2_STATUS_OK, useColor);
  656. ++ok;
  657. break;
  658. case error_code::IN_PROGRESS:
  659. status = getStatusStr(A2_STATUS_INPR, useColor);
  660. ++inpr;
  661. break;
  662. case error_code::REMOVED:
  663. status = getStatusStr(A2_STATUS_RM, useColor);
  664. ++rm;
  665. break;
  666. default:
  667. status = getStatusStr(A2_STATUS_ERR, useColor);
  668. ++err;
  669. }
  670. if (full) {
  671. formatDownloadResultFull(o, status, dr);
  672. }
  673. else {
  674. o.write(formatDownloadResult(status, dr).c_str());
  675. o.write("\n");
  676. }
  677. }
  678. if (ok > 0 || err > 0 || inpr > 0 || rm > 0) {
  679. o.printf("\n%s\n", _("Status Legend:"));
  680. if (ok > 0) {
  681. o.write(_("(OK):download completed."));
  682. }
  683. if (err > 0) {
  684. o.write(_("(ERR):error occurred."));
  685. }
  686. if (inpr > 0) {
  687. o.write(_("(INPR):download in-progress."));
  688. }
  689. if (rm > 0) {
  690. o.write(_("(RM):download removed."));
  691. }
  692. o.write("\n");
  693. }
  694. }
  695. namespace {
  696. void formatDownloadResultCommon(
  697. std::ostream& o, const char* status,
  698. const std::shared_ptr<DownloadResult>& downloadResult)
  699. {
  700. o << std::setw(3) << downloadResult->gid->toAbbrevHex() << "|" << std::setw(4)
  701. << status << "|";
  702. if (downloadResult->sessionTime.count() > 0) {
  703. o << std::setw(8)
  704. << util::abbrevSize(downloadResult->sessionDownloadLength * 1000 /
  705. downloadResult->sessionTime.count()) << "B/s";
  706. }
  707. else {
  708. o << std::setw(11);
  709. o << "n/a";
  710. }
  711. o << "|";
  712. }
  713. } // namespace
  714. void RequestGroupMan::formatDownloadResultFull(
  715. OutputFile& out, const char* status,
  716. const std::shared_ptr<DownloadResult>& downloadResult) const
  717. {
  718. BitfieldMan bt(downloadResult->pieceLength, downloadResult->totalLength);
  719. bt.setBitfield(
  720. reinterpret_cast<const unsigned char*>(downloadResult->bitfield.data()),
  721. downloadResult->bitfield.size());
  722. bool head = true;
  723. const std::vector<std::shared_ptr<FileEntry>>& fileEntries =
  724. downloadResult->fileEntries;
  725. for (auto& f : fileEntries) {
  726. if (!f->isRequested()) {
  727. continue;
  728. }
  729. std::stringstream o;
  730. if (head) {
  731. formatDownloadResultCommon(o, status, downloadResult);
  732. head = false;
  733. }
  734. else {
  735. o << " | | |";
  736. }
  737. if (f->getLength() == 0 || downloadResult->bitfield.empty()) {
  738. o << " -|";
  739. }
  740. else {
  741. int64_t completedLength =
  742. bt.getOffsetCompletedLength(f->getOffset(), f->getLength());
  743. o << std::setw(3) << 100 * completedLength / f->getLength() << "|";
  744. }
  745. writeFilePath(o, f, downloadResult->inMemoryDownload);
  746. o << "\n";
  747. out.write(o.str().c_str());
  748. }
  749. if (head) {
  750. std::stringstream o;
  751. formatDownloadResultCommon(o, status, downloadResult);
  752. o << " -|n/a\n";
  753. out.write(o.str().c_str());
  754. }
  755. }
  756. std::string RequestGroupMan::formatDownloadResult(
  757. const char* status,
  758. const std::shared_ptr<DownloadResult>& downloadResult) const
  759. {
  760. std::stringstream o;
  761. formatDownloadResultCommon(o, status, downloadResult);
  762. const std::vector<std::shared_ptr<FileEntry>>& fileEntries =
  763. downloadResult->fileEntries;
  764. writeFilePath(fileEntries.begin(), fileEntries.end(), o,
  765. downloadResult->inMemoryDownload);
  766. return o.str();
  767. }
  768. namespace {
  769. template <typename StringInputIterator, typename FileEntryInputIterator>
  770. bool sameFilePathExists(StringInputIterator sfirst, StringInputIterator slast,
  771. FileEntryInputIterator ffirst,
  772. FileEntryInputIterator flast)
  773. {
  774. for (; ffirst != flast; ++ffirst) {
  775. if (std::binary_search(sfirst, slast, (*ffirst)->getPath())) {
  776. return true;
  777. }
  778. }
  779. return false;
  780. }
  781. } // namespace
  782. bool RequestGroupMan::isSameFileBeingDownloaded(
  783. RequestGroup* requestGroup) const
  784. {
  785. // TODO it may be good to use dedicated method rather than use
  786. // isPreLocalFileCheckEnabled
  787. if (!requestGroup->isPreLocalFileCheckEnabled()) {
  788. return false;
  789. }
  790. std::vector<std::string> files;
  791. for (auto& rg : requestGroups_) {
  792. if (rg.get() != requestGroup) {
  793. const std::vector<std::shared_ptr<FileEntry>>& entries =
  794. rg->getDownloadContext()->getFileEntries();
  795. std::transform(entries.begin(), entries.end(), std::back_inserter(files),
  796. std::mem_fn(&FileEntry::getPath));
  797. }
  798. }
  799. std::sort(files.begin(), files.end());
  800. const std::vector<std::shared_ptr<FileEntry>>& entries =
  801. requestGroup->getDownloadContext()->getFileEntries();
  802. return sameFilePathExists(files.begin(), files.end(), entries.begin(),
  803. entries.end());
  804. }
  805. void RequestGroupMan::halt()
  806. {
  807. for (auto& elem : requestGroups_) {
  808. elem->setHaltRequested(true);
  809. }
  810. }
  811. void RequestGroupMan::forceHalt()
  812. {
  813. for (auto& elem : requestGroups_) {
  814. elem->setForceHaltRequested(true);
  815. }
  816. }
  817. TransferStat RequestGroupMan::calculateStat()
  818. {
  819. // TODO Currently, all time upload length is not set.
  820. return netStat_.toTransferStat();
  821. }
  822. std::shared_ptr<DownloadResult>
  823. RequestGroupMan::findDownloadResult(a2_gid_t gid) const
  824. {
  825. return downloadResults_.get(gid);
  826. }
  827. bool RequestGroupMan::removeDownloadResult(a2_gid_t gid)
  828. {
  829. return downloadResults_.remove(gid);
  830. }
  831. void RequestGroupMan::addDownloadResult(
  832. const std::shared_ptr<DownloadResult>& dr)
  833. {
  834. ++numStoppedTotal_;
  835. bool rv = downloadResults_.push_back(dr->gid->getNumericId(), dr);
  836. assert(rv);
  837. while (downloadResults_.size() > maxDownloadResult_) {
  838. DownloadResultList::iterator i = downloadResults_.begin();
  839. // Save last encountered error code so that we can report it
  840. // later.
  841. const std::shared_ptr<DownloadResult>& dr = *i;
  842. if (dr->belongsTo == 0 && dr->result != error_code::FINISHED) {
  843. removedLastErrorResult_ = dr->result;
  844. ++removedErrorResult_;
  845. }
  846. downloadResults_.pop_front();
  847. }
  848. }
  849. void RequestGroupMan::purgeDownloadResult() { downloadResults_.clear(); }
  850. std::shared_ptr<ServerStat>
  851. RequestGroupMan::findServerStat(const std::string& hostname,
  852. const std::string& protocol) const
  853. {
  854. return serverStatMan_->find(hostname, protocol);
  855. }
  856. std::shared_ptr<ServerStat>
  857. RequestGroupMan::getOrCreateServerStat(const std::string& hostname,
  858. const std::string& protocol)
  859. {
  860. std::shared_ptr<ServerStat> ss = findServerStat(hostname, protocol);
  861. if (!ss) {
  862. ss = std::make_shared<ServerStat>(hostname, protocol);
  863. addServerStat(ss);
  864. }
  865. return ss;
  866. }
  867. bool RequestGroupMan::addServerStat(
  868. const std::shared_ptr<ServerStat>& serverStat)
  869. {
  870. return serverStatMan_->add(serverStat);
  871. }
  872. bool RequestGroupMan::loadServerStat(const std::string& filename)
  873. {
  874. return serverStatMan_->load(filename);
  875. }
  876. bool RequestGroupMan::saveServerStat(const std::string& filename) const
  877. {
  878. return serverStatMan_->save(filename);
  879. }
  880. void RequestGroupMan::removeStaleServerStat(const std::chrono::seconds& timeout)
  881. {
  882. serverStatMan_->removeStaleServerStat(timeout);
  883. }
  884. bool RequestGroupMan::doesOverallDownloadSpeedExceed()
  885. {
  886. return maxOverallDownloadSpeedLimit_ > 0 &&
  887. maxOverallDownloadSpeedLimit_ < netStat_.calculateDownloadSpeed();
  888. }
  889. bool RequestGroupMan::doesOverallUploadSpeedExceed()
  890. {
  891. return maxOverallUploadSpeedLimit_ > 0 &&
  892. maxOverallUploadSpeedLimit_ < netStat_.calculateUploadSpeed();
  893. }
  894. void RequestGroupMan::getUsedHosts(
  895. std::vector<std::pair<size_t, std::string>>& usedHosts)
  896. {
  897. // vector of tuple which consists of use count, -download speed,
  898. // hostname. We want to sort by least used and faster download
  899. // speed. We use -download speed so that we can sort them using
  900. // operator<().
  901. std::vector<std::tuple<size_t, int, std::string>> tempHosts;
  902. for (const auto& rg : requestGroups_) {
  903. const auto& inFlightReqs =
  904. rg->getDownloadContext()->getFirstFileEntry()->getInFlightRequests();
  905. for (const auto& req : inFlightReqs) {
  906. uri_split_result us;
  907. if (uri_split(&us, req->getUri().c_str()) == 0) {
  908. std::string host =
  909. uri::getFieldString(us, USR_HOST, req->getUri().c_str());
  910. auto k = tempHosts.begin();
  911. auto eok = tempHosts.end();
  912. for (; k != eok; ++k) {
  913. if (std::get<2>(*k) == host) {
  914. ++std::get<0>(*k);
  915. break;
  916. }
  917. }
  918. if (k == eok) {
  919. std::string protocol =
  920. uri::getFieldString(us, USR_SCHEME, req->getUri().c_str());
  921. auto ss = findServerStat(host, protocol);
  922. int invDlSpeed = (ss && ss->isOK())
  923. ? -(static_cast<int>(ss->getDownloadSpeed()))
  924. : 0;
  925. tempHosts.emplace_back(1, invDlSpeed, host);
  926. }
  927. }
  928. }
  929. }
  930. std::sort(tempHosts.begin(), tempHosts.end());
  931. std::transform(tempHosts.begin(), tempHosts.end(),
  932. std::back_inserter(usedHosts),
  933. [](const std::tuple<size_t, int, std::string>& x) {
  934. return std::make_pair(std::get<0>(x), std::get<2>(x));
  935. });
  936. }
  937. void RequestGroupMan::setUriListParser(
  938. const std::shared_ptr<UriListParser>& uriListParser)
  939. {
  940. uriListParser_ = uriListParser;
  941. }
  942. void RequestGroupMan::initWrDiskCache()
  943. {
  944. assert(!wrDiskCache_);
  945. size_t limit = option_->getAsInt(PREF_DISK_CACHE);
  946. if (limit > 0) {
  947. wrDiskCache_ = make_unique<WrDiskCache>(limit);
  948. }
  949. }
  950. void RequestGroupMan::decreaseNumActive()
  951. {
  952. assert(numActive_ > 0);
  953. --numActive_;
  954. }
  955. int RequestGroupMan::optimizeConcurrentDownloads()
  956. {
  957. // gauge the current speed
  958. int currentSpeed = getNetStat().calculateDownloadSpeed();
  959. const auto& now = global::wallclock();
  960. if (currentSpeed >= optimizationSpeed_) {
  961. optimizationSpeed_ = currentSpeed;
  962. optimizationSpeedTimer_ = now;
  963. } else if (std::chrono::duration_cast<std::chrono::seconds>(optimizationSpeedTimer_.difference(now)) >= 5_s) {
  964. // we keep using the reference speed for minimum 5 seconds so reset the timer
  965. optimizationSpeedTimer_ = now;
  966. // keep the reference speed as long as the speed tends to augment or to maintain itself within 10%
  967. if (currentSpeed >= 1.1 * getNetStat().calculateNewestDownloadSpeed(5)) {
  968. // else assume a possible congestion and record a new optimization speed by dichotomy
  969. optimizationSpeed_ = (optimizationSpeed_ + currentSpeed)/2.;
  970. }
  971. }
  972. if (optimizationSpeed_ <= 0) {
  973. return 1;
  974. }
  975. // apply the rule
  976. if ((maxOverallDownloadSpeedLimit_ > 0) && (optimizationSpeed_ > maxOverallDownloadSpeedLimit_)) {
  977. optimizationSpeed_ = maxOverallDownloadSpeedLimit_;
  978. }
  979. int maxConcurrentDownloads = ceil(
  980. optimizeConcurrentDownloadsCoeffA_
  981. + optimizeConcurrentDownloadsCoeffB_ * log10(optimizationSpeed_ * 8. / 1000000.)
  982. );
  983. // bring the value in bound between 1 and the defined maximum
  984. maxConcurrentDownloads = std::min(std::max(1, maxConcurrentDownloads), maxConcurrentDownloads_);
  985. A2_LOG_DEBUG
  986. (fmt("Max concurrent downloads optimized at %d (%lu currently active) "
  987. "[optimization speed %sB/s, current speed %sB/s]",
  988. maxConcurrentDownloads, numActive_, util::abbrevSize(optimizationSpeed_).c_str(),
  989. util::abbrevSize(currentSpeed).c_str()));
  990. return maxConcurrentDownloads;
  991. }
  992. } // namespace aria2