RequestGroupMan.cc 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - The high speed download utility
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * In addition, as a special exception, the copyright holders give
  22. * permission to link the code of portions of this program with the
  23. * OpenSSL library under certain conditions as described in each
  24. * individual source file, and distribute linked combinations
  25. * including the two.
  26. * You must obey the GNU General Public License in all respects
  27. * for all of the code used other than OpenSSL. If you modify
  28. * file(s) with this exception, you may extend this exception to your
  29. * version of the file(s), but you are not obligated to do so. If you
  30. * do not wish to do so, delete this exception statement from your
  31. * version. If you delete this exception statement from all source
  32. * files in the program, then also delete it here.
  33. */
  34. /* copyright --> */
  35. #include "RequestGroupMan.h"
  36. #include <unistd.h>
  37. #include <cstring>
  38. #include <iomanip>
  39. #include <sstream>
  40. #include <numeric>
  41. #include <algorithm>
  42. #include <utility>
  43. #include "BtProgressInfoFile.h"
  44. #include "RecoverableException.h"
  45. #include "RequestGroup.h"
  46. #include "LogFactory.h"
  47. #include "Logger.h"
  48. #include "DownloadEngine.h"
  49. #include "message.h"
  50. #include "a2functional.h"
  51. #include "DownloadResult.h"
  52. #include "DownloadContext.h"
  53. #include "ServerStatMan.h"
  54. #include "ServerStat.h"
  55. #include "SegmentMan.h"
  56. #include "FeedbackURISelector.h"
  57. #include "InorderURISelector.h"
  58. #include "AdaptiveURISelector.h"
  59. #include "Option.h"
  60. #include "prefs.h"
  61. #include "File.h"
  62. #include "util.h"
  63. #include "Command.h"
  64. #include "FileEntry.h"
  65. #include "fmt.h"
  66. #include "FileAllocationEntry.h"
  67. #include "CheckIntegrityEntry.h"
  68. #include "Segment.h"
  69. #include "DlAbortEx.h"
  70. #include "uri.h"
  71. #include "Signature.h"
  72. #include "OutputFile.h"
  73. #include "download_helper.h"
  74. #include "UriListParser.h"
  75. #include "SingletonHolder.h"
  76. #include "Notifier.h"
  77. #include "PeerStat.h"
  78. #include "WrDiskCache.h"
  79. #include "PieceStorage.h"
  80. #include "DiskAdaptor.h"
  81. #include "SimpleRandomizer.h"
  82. #include "array_fun.h"
  83. #include "OpenedFileCounter.h"
  84. #include "wallclock.h"
  85. #include "RpcMethodImpl.h"
  86. #ifdef ENABLE_BITTORRENT
  87. #include "bittorrent_helper.h"
  88. #endif // ENABLE_BITTORRENT
  89. namespace aria2 {
  90. namespace {
  91. template <typename InputIterator>
  92. void appendReservedGroup(RequestGroupList& list, InputIterator first,
  93. InputIterator last)
  94. {
  95. for (; first != last; ++first) {
  96. list.push_back((*first)->getGID(), *first);
  97. }
  98. }
  99. } // namespace
  100. RequestGroupMan::RequestGroupMan(
  101. std::vector<std::shared_ptr<RequestGroup>> requestGroups,
  102. int maxConcurrentDownloads, const Option* option)
  103. : maxConcurrentDownloads_(maxConcurrentDownloads),
  104. optimizeConcurrentDownloads_(false),
  105. optimizeConcurrentDownloadsCoeffA_(5.),
  106. optimizeConcurrentDownloadsCoeffB_(25.),
  107. optimizationSpeed_(0),
  108. numActive_(0),
  109. option_(option),
  110. serverStatMan_(std::make_shared<ServerStatMan>()),
  111. maxOverallDownloadSpeedLimit_(
  112. option->getAsInt(PREF_MAX_OVERALL_DOWNLOAD_LIMIT)),
  113. maxOverallUploadSpeedLimit_(
  114. option->getAsInt(PREF_MAX_OVERALL_UPLOAD_LIMIT)),
  115. keepRunning_(option->getAsBool(PREF_ENABLE_RPC)),
  116. queueCheck_(true),
  117. removedErrorResult_(0),
  118. removedLastErrorResult_(error_code::FINISHED),
  119. maxDownloadResult_(option->getAsInt(PREF_MAX_DOWNLOAD_RESULT)),
  120. openedFileCounter_(std::make_shared<OpenedFileCounter>(
  121. this, option->getAsInt(PREF_BT_MAX_OPEN_FILES))),
  122. numStoppedTotal_(0)
  123. {
  124. setupOptimizeConcurrentDownloads();
  125. appendReservedGroup(reservedGroups_, requestGroups.begin(),
  126. requestGroups.end());
  127. }
  128. RequestGroupMan::~RequestGroupMan() { openedFileCounter_->deactivate(); }
  129. bool RequestGroupMan::setupOptimizeConcurrentDownloads(void)
  130. {
  131. optimizeConcurrentDownloads_ =
  132. option_->getAsBool(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS);
  133. if (optimizeConcurrentDownloads_) {
  134. if (option_->defined(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA)) {
  135. optimizeConcurrentDownloadsCoeffA_ = strtod(
  136. option_->get(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA).c_str(),
  137. nullptr);
  138. optimizeConcurrentDownloadsCoeffB_ = strtod(
  139. option_->get(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB).c_str(),
  140. nullptr);
  141. }
  142. }
  143. return optimizeConcurrentDownloads_;
  144. }
  145. bool RequestGroupMan::downloadFinished()
  146. {
  147. if (keepRunning_) {
  148. return false;
  149. }
  150. return requestGroups_.empty() && reservedGroups_.empty();
  151. }
  152. void RequestGroupMan::addRequestGroup(
  153. const std::shared_ptr<RequestGroup>& group)
  154. {
  155. ++numActive_;
  156. requestGroups_.push_back(group->getGID(), group);
  157. }
  158. void RequestGroupMan::addReservedGroup(
  159. const std::vector<std::shared_ptr<RequestGroup>>& groups)
  160. {
  161. requestQueueCheck();
  162. appendReservedGroup(reservedGroups_, groups.begin(), groups.end());
  163. }
  164. void RequestGroupMan::addReservedGroup(
  165. const std::shared_ptr<RequestGroup>& group)
  166. {
  167. requestQueueCheck();
  168. reservedGroups_.push_back(group->getGID(), group);
  169. }
  170. namespace {
  171. struct RequestGroupKeyFunc {
  172. a2_gid_t operator()(const std::shared_ptr<RequestGroup>& rg) const
  173. {
  174. return rg->getGID();
  175. }
  176. };
  177. } // namespace
  178. void RequestGroupMan::insertReservedGroup(
  179. size_t pos, const std::vector<std::shared_ptr<RequestGroup>>& groups)
  180. {
  181. requestQueueCheck();
  182. pos = std::min(reservedGroups_.size(), pos);
  183. reservedGroups_.insert(pos, RequestGroupKeyFunc(), groups.begin(),
  184. groups.end());
  185. }
  186. void RequestGroupMan::insertReservedGroup(
  187. size_t pos, const std::shared_ptr<RequestGroup>& group)
  188. {
  189. requestQueueCheck();
  190. pos = std::min(reservedGroups_.size(), pos);
  191. reservedGroups_.insert(pos, group->getGID(), group);
  192. }
  193. size_t RequestGroupMan::countRequestGroup() const
  194. {
  195. return requestGroups_.size();
  196. }
  197. std::shared_ptr<RequestGroup> RequestGroupMan::findGroup(a2_gid_t gid) const
  198. {
  199. std::shared_ptr<RequestGroup> rg = requestGroups_.get(gid);
  200. if (!rg) {
  201. rg = reservedGroups_.get(gid);
  202. }
  203. return rg;
  204. }
  205. size_t RequestGroupMan::changeReservedGroupPosition(a2_gid_t gid, int pos,
  206. OffsetMode how)
  207. {
  208. ssize_t dest = reservedGroups_.move(gid, pos, how);
  209. if (dest == -1) {
  210. throw DL_ABORT_EX(fmt("GID#%s not found in the waiting queue.",
  211. GroupId::toHex(gid).c_str()));
  212. }
  213. else {
  214. return dest;
  215. }
  216. }
  217. bool RequestGroupMan::removeReservedGroup(a2_gid_t gid)
  218. {
  219. return reservedGroups_.remove(gid);
  220. }
  221. namespace {
  222. void notifyDownloadEvent(DownloadEvent event,
  223. const std::shared_ptr<RequestGroup>& group)
  224. {
  225. // Check NULL to make unit test easier.
  226. if (SingletonHolder<Notifier>::instance()) {
  227. SingletonHolder<Notifier>::instance()->notifyDownloadEvent(event, group);
  228. }
  229. }
  230. } // namespace
  231. namespace {
  232. void executeStopHook(const std::shared_ptr<RequestGroup>& group,
  233. const Option* option, error_code::Value result)
  234. {
  235. PrefPtr hookPref = nullptr;
  236. if (!option->blank(PREF_ON_DOWNLOAD_STOP)) {
  237. hookPref = PREF_ON_DOWNLOAD_STOP;
  238. }
  239. if (result == error_code::FINISHED) {
  240. if (!option->blank(PREF_ON_DOWNLOAD_COMPLETE)) {
  241. hookPref = PREF_ON_DOWNLOAD_COMPLETE;
  242. }
  243. }
  244. else if (result != error_code::IN_PROGRESS && result != error_code::REMOVED) {
  245. if (!option->blank(PREF_ON_DOWNLOAD_ERROR)) {
  246. hookPref = PREF_ON_DOWNLOAD_ERROR;
  247. }
  248. }
  249. if (hookPref) {
  250. util::executeHookByOptName(group, option, hookPref);
  251. }
  252. if (result == error_code::FINISHED) {
  253. notifyDownloadEvent(EVENT_ON_DOWNLOAD_COMPLETE, group);
  254. }
  255. else if (result != error_code::IN_PROGRESS && result != error_code::REMOVED) {
  256. notifyDownloadEvent(EVENT_ON_DOWNLOAD_ERROR, group);
  257. }
  258. else {
  259. notifyDownloadEvent(EVENT_ON_DOWNLOAD_STOP, group);
  260. }
  261. }
  262. } // namespace
  263. namespace {
  264. class ProcessStoppedRequestGroup {
  265. private:
  266. DownloadEngine* e_;
  267. RequestGroupList& reservedGroups_;
  268. void saveSignature(const std::shared_ptr<RequestGroup>& group)
  269. {
  270. auto& sig = group->getDownloadContext()->getSignature();
  271. if (sig && !sig->getBody().empty()) {
  272. // filename of signature file is the path to download file followed by
  273. // ".sig".
  274. std::string signatureFile = group->getFirstFilePath() + ".sig";
  275. if (sig->save(signatureFile)) {
  276. A2_LOG_NOTICE(fmt(MSG_SIGNATURE_SAVED, signatureFile.c_str()));
  277. }
  278. else {
  279. A2_LOG_NOTICE(fmt(MSG_SIGNATURE_NOT_SAVED, signatureFile.c_str()));
  280. }
  281. }
  282. }
  283. // Collect statistics during download in PeerStats and update/register
  284. // ServerStatMan
  285. void collectStat(const std::shared_ptr<RequestGroup>& group)
  286. {
  287. if (group->getSegmentMan()) {
  288. bool singleConnection =
  289. group->getSegmentMan()->getPeerStats().size() == 1;
  290. const std::vector<std::shared_ptr<PeerStat>>& peerStats =
  291. group->getSegmentMan()->getFastestPeerStats();
  292. for (auto& stat : peerStats) {
  293. if (stat->getHostname().empty() || stat->getProtocol().empty()) {
  294. continue;
  295. }
  296. int speed = stat->getAvgDownloadSpeed();
  297. if (speed == 0)
  298. continue;
  299. std::shared_ptr<ServerStat> ss =
  300. e_->getRequestGroupMan()->getOrCreateServerStat(
  301. stat->getHostname(), stat->getProtocol());
  302. ss->increaseCounter();
  303. ss->updateDownloadSpeed(speed);
  304. if (singleConnection) {
  305. ss->updateSingleConnectionAvgSpeed(speed);
  306. }
  307. else {
  308. ss->updateMultiConnectionAvgSpeed(speed);
  309. }
  310. }
  311. }
  312. }
  313. public:
  314. ProcessStoppedRequestGroup(DownloadEngine* e,
  315. RequestGroupList& reservedGroups)
  316. : e_(e), reservedGroups_(reservedGroups)
  317. {
  318. }
  319. bool operator()(const RequestGroupList::value_type& group)
  320. {
  321. if (group->getNumCommand() == 0) {
  322. collectStat(group);
  323. const std::shared_ptr<DownloadContext>& dctx =
  324. group->getDownloadContext();
  325. if (!group->isSeedOnlyEnabled()) {
  326. e_->getRequestGroupMan()->decreaseNumActive();
  327. }
  328. // DownloadContext::resetDownloadStopTime() is only called when
  329. // download completed. If
  330. // DownloadContext::getDownloadStopTime().isZero() is true, then
  331. // there is a possibility that the download is error or
  332. // in-progress and resetDownloadStopTime() is not called. So
  333. // call it here.
  334. if (dctx->getDownloadStopTime().isZero()) {
  335. dctx->resetDownloadStopTime();
  336. }
  337. try {
  338. group->closeFile();
  339. if (group->isPauseRequested()) {
  340. if (!group->isRestartRequested()) {
  341. A2_LOG_NOTICE(fmt(_("Download GID#%s paused"),
  342. GroupId::toHex(group->getGID()).c_str()));
  343. }
  344. group->saveControlFile();
  345. }
  346. else if (group->downloadFinished() &&
  347. !group->getDownloadContext()->isChecksumVerificationNeeded()) {
  348. group->applyLastModifiedTimeToLocalFiles();
  349. group->reportDownloadFinished();
  350. if (group->allDownloadFinished() &&
  351. !group->getOption()->getAsBool(PREF_FORCE_SAVE)) {
  352. group->removeControlFile();
  353. saveSignature(group);
  354. }
  355. else {
  356. group->saveControlFile();
  357. }
  358. std::vector<std::shared_ptr<RequestGroup>> nextGroups;
  359. group->postDownloadProcessing(nextGroups);
  360. if (!nextGroups.empty()) {
  361. A2_LOG_DEBUG(fmt("Adding %lu RequestGroups as a result of"
  362. " PostDownloadHandler.",
  363. static_cast<unsigned long>(nextGroups.size())));
  364. e_->getRequestGroupMan()->insertReservedGroup(0, nextGroups);
  365. }
  366. #ifdef ENABLE_BITTORRENT
  367. // For in-memory download (e.g., Magnet URI), the
  368. // FileEntry::getPath() does not return actual file path, so
  369. // we don't remove it.
  370. if (group->getOption()->getAsBool(PREF_BT_REMOVE_UNSELECTED_FILE) &&
  371. !group->inMemoryDownload() && dctx->hasAttribute(CTX_ATTR_BT)) {
  372. A2_LOG_INFO(fmt(MSG_REMOVING_UNSELECTED_FILE,
  373. GroupId::toHex(group->getGID()).c_str()));
  374. const std::vector<std::shared_ptr<FileEntry>>& files =
  375. dctx->getFileEntries();
  376. for (auto& file : files) {
  377. if (!file->isRequested()) {
  378. if (File(file->getPath()).remove()) {
  379. A2_LOG_INFO(fmt(MSG_FILE_REMOVED, file->getPath().c_str()));
  380. }
  381. else {
  382. A2_LOG_INFO(
  383. fmt(MSG_FILE_COULD_NOT_REMOVED, file->getPath().c_str()));
  384. }
  385. }
  386. }
  387. }
  388. #endif // ENABLE_BITTORRENT
  389. }
  390. else {
  391. A2_LOG_NOTICE(
  392. fmt(_("Download GID#%s not complete: %s"),
  393. GroupId::toHex(group->getGID()).c_str(),
  394. group->getDownloadContext()->getBasePath().c_str()));
  395. group->saveControlFile();
  396. }
  397. }
  398. catch (RecoverableException& ex) {
  399. A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, ex);
  400. }
  401. if (group->isPauseRequested()) {
  402. group->setState(RequestGroup::STATE_WAITING);
  403. reservedGroups_.push_front(group->getGID(), group);
  404. group->releaseRuntimeResource(e_);
  405. group->setForceHaltRequested(false);
  406. auto pendingOption = group->getPendingOption();
  407. if (pendingOption) {
  408. changeOption(group, *pendingOption, e_);
  409. }
  410. if (group->isRestartRequested()) {
  411. group->setPauseRequested(false);
  412. }
  413. else {
  414. util::executeHookByOptName(group, e_->getOption(),
  415. PREF_ON_DOWNLOAD_PAUSE);
  416. notifyDownloadEvent(EVENT_ON_DOWNLOAD_PAUSE, group);
  417. }
  418. // TODO Should we have to prepend spend uris to remaining uris
  419. // in case PREF_REUSE_URI is disabled?
  420. }
  421. else {
  422. std::shared_ptr<DownloadResult> dr = group->createDownloadResult();
  423. e_->getRequestGroupMan()->addDownloadResult(dr);
  424. executeStopHook(group, e_->getOption(), dr->result);
  425. group->releaseRuntimeResource(e_);
  426. }
  427. group->setRestartRequested(false);
  428. group->setPendingOption(nullptr);
  429. return true;
  430. }
  431. else {
  432. return false;
  433. }
  434. }
  435. };
  436. } // namespace
  437. void RequestGroupMan::removeStoppedGroup(DownloadEngine* e)
  438. {
  439. size_t numPrev = requestGroups_.size();
  440. requestGroups_.remove_if(ProcessStoppedRequestGroup(e, reservedGroups_));
  441. size_t numRemoved = numPrev - requestGroups_.size();
  442. if (numRemoved > 0) {
  443. A2_LOG_DEBUG(fmt("%lu RequestGroup(s) deleted.",
  444. static_cast<unsigned long>(numRemoved)));
  445. }
  446. }
  447. void RequestGroupMan::configureRequestGroup(
  448. const std::shared_ptr<RequestGroup>& requestGroup) const
  449. {
  450. const std::string& uriSelectorValue =
  451. requestGroup->getOption()->get(PREF_URI_SELECTOR);
  452. if (uriSelectorValue == V_FEEDBACK) {
  453. requestGroup->setURISelector(
  454. make_unique<FeedbackURISelector>(serverStatMan_));
  455. }
  456. else if (uriSelectorValue == V_INORDER) {
  457. requestGroup->setURISelector(make_unique<InorderURISelector>());
  458. }
  459. else if (uriSelectorValue == V_ADAPTIVE) {
  460. requestGroup->setURISelector(
  461. make_unique<AdaptiveURISelector>(serverStatMan_, requestGroup.get()));
  462. }
  463. }
  464. namespace {
  465. std::vector<std::unique_ptr<Command>>
  466. createInitialCommand(const std::shared_ptr<RequestGroup>& requestGroup,
  467. DownloadEngine* e)
  468. {
  469. std::vector<std::unique_ptr<Command>> res;
  470. requestGroup->createInitialCommand(res, e);
  471. return res;
  472. }
  473. } // namespace
  474. void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e)
  475. {
  476. removeStoppedGroup(e);
  477. int maxConcurrentDownloads = optimizeConcurrentDownloads_
  478. ? optimizeConcurrentDownloads()
  479. : maxConcurrentDownloads_;
  480. if (static_cast<size_t>(maxConcurrentDownloads) <= numActive_) {
  481. return;
  482. }
  483. int count = 0;
  484. int num = maxConcurrentDownloads - numActive_;
  485. std::vector<std::shared_ptr<RequestGroup>> pending;
  486. while (count < num && (uriListParser_ || !reservedGroups_.empty())) {
  487. if (uriListParser_ && reservedGroups_.empty()) {
  488. std::vector<std::shared_ptr<RequestGroup>> groups;
  489. // May throw exception
  490. bool ok = createRequestGroupFromUriListParser(groups, option_,
  491. uriListParser_.get());
  492. if (ok) {
  493. appendReservedGroup(reservedGroups_, groups.begin(), groups.end());
  494. }
  495. else {
  496. uriListParser_.reset();
  497. if (reservedGroups_.empty()) {
  498. break;
  499. }
  500. }
  501. }
  502. std::shared_ptr<RequestGroup> groupToAdd = *reservedGroups_.begin();
  503. reservedGroups_.pop_front();
  504. if ((keepRunning_ && groupToAdd->isPauseRequested()) ||
  505. !groupToAdd->isDependencyResolved()) {
  506. pending.push_back(groupToAdd);
  507. continue;
  508. }
  509. // Drop pieceStorage here because paused download holds its
  510. // reference.
  511. groupToAdd->dropPieceStorage();
  512. configureRequestGroup(groupToAdd);
  513. groupToAdd->setRequestGroupMan(this);
  514. groupToAdd->setState(RequestGroup::STATE_ACTIVE);
  515. ++numActive_;
  516. requestGroups_.push_back(groupToAdd->getGID(), groupToAdd);
  517. try {
  518. auto res = createInitialCommand(groupToAdd, e);
  519. ++count;
  520. if (res.empty()) {
  521. requestQueueCheck();
  522. }
  523. else {
  524. e->addCommand(std::move(res));
  525. }
  526. }
  527. catch (RecoverableException& ex) {
  528. A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, ex);
  529. A2_LOG_DEBUG("Deleting temporal commands.");
  530. groupToAdd->setLastErrorCode(ex.getErrorCode(), ex.what());
  531. // We add groupToAdd to e later in order to it is processed in
  532. // removeStoppedGroup().
  533. requestQueueCheck();
  534. }
  535. util::executeHookByOptName(groupToAdd, e->getOption(),
  536. PREF_ON_DOWNLOAD_START);
  537. notifyDownloadEvent(EVENT_ON_DOWNLOAD_START, groupToAdd);
  538. }
  539. if (!pending.empty()) {
  540. reservedGroups_.insert(reservedGroups_.begin(), RequestGroupKeyFunc(),
  541. pending.begin(), pending.end());
  542. }
  543. if (count > 0) {
  544. e->setNoWait(true);
  545. e->setRefreshInterval(std::chrono::milliseconds(0));
  546. A2_LOG_DEBUG(fmt("%d RequestGroup(s) added.", count));
  547. }
  548. }
  549. void RequestGroupMan::save()
  550. {
  551. for (auto& rg : requestGroups_) {
  552. if (rg->allDownloadFinished() &&
  553. !rg->getDownloadContext()->isChecksumVerificationNeeded() &&
  554. !rg->getOption()->getAsBool(PREF_FORCE_SAVE)) {
  555. rg->removeControlFile();
  556. }
  557. else {
  558. try {
  559. rg->saveControlFile();
  560. }
  561. catch (RecoverableException& e) {
  562. A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, e);
  563. }
  564. }
  565. }
  566. }
  567. void RequestGroupMan::closeFile()
  568. {
  569. for (auto& elem : requestGroups_) {
  570. elem->closeFile();
  571. }
  572. }
  573. RequestGroupMan::DownloadStat RequestGroupMan::getDownloadStat() const
  574. {
  575. int finished = 0;
  576. int error = removedErrorResult_;
  577. int inprogress = 0;
  578. int removed = 0;
  579. error_code::Value lastError = removedLastErrorResult_;
  580. for (auto& dr : downloadResults_) {
  581. if (dr->belongsTo != 0) {
  582. continue;
  583. }
  584. if (dr->result == error_code::FINISHED) {
  585. ++finished;
  586. }
  587. else if (dr->result == error_code::IN_PROGRESS) {
  588. ++inprogress;
  589. }
  590. else if (dr->result == error_code::REMOVED) {
  591. ++removed;
  592. }
  593. else {
  594. ++error;
  595. lastError = dr->result;
  596. }
  597. }
  598. return DownloadStat(error, inprogress, reservedGroups_.size(), lastError);
  599. }
  600. enum DownloadResultStatus {
  601. A2_STATUS_OK,
  602. A2_STATUS_INPR,
  603. A2_STATUS_RM,
  604. A2_STATUS_ERR
  605. };
  606. namespace {
  607. const char* getStatusStr(DownloadResultStatus status, bool useColor)
  608. {
  609. // status string is formatted in 4 characters wide.
  610. switch (status) {
  611. case (A2_STATUS_OK):
  612. if (useColor) {
  613. return "\033[1;32mOK\033[0m ";
  614. }
  615. else {
  616. return "OK ";
  617. }
  618. case (A2_STATUS_INPR):
  619. if (useColor) {
  620. return "\033[1;34mINPR\033[0m";
  621. }
  622. else {
  623. return "INPR";
  624. }
  625. case (A2_STATUS_RM):
  626. if (useColor) {
  627. return "\033[1mRM\033[0m ";
  628. }
  629. else {
  630. return "RM ";
  631. }
  632. case (A2_STATUS_ERR):
  633. if (useColor) {
  634. return "\033[1;31mERR\033[0m ";
  635. }
  636. else {
  637. return "ERR ";
  638. }
  639. default:
  640. return "";
  641. }
  642. }
  643. } // namespace
  644. void RequestGroupMan::showDownloadResults(OutputFile& o, bool full) const
  645. {
  646. int pathRowSize = 55;
  647. // Download Results:
  648. // idx|stat|path/length
  649. // ===+====+=======================================================================
  650. o.printf("\n%s"
  651. "\ngid |stat|avg speed |",
  652. _("Download Results:"));
  653. if (full) {
  654. o.write(" %|path/URI"
  655. "\n======+====+===========+===+");
  656. pathRowSize -= 4;
  657. }
  658. else {
  659. o.write("path/URI"
  660. "\n======+====+===========+");
  661. }
  662. std::string line(pathRowSize, '=');
  663. o.printf("%s\n", line.c_str());
  664. bool useColor = o.supportsColor() && option_->getAsBool(PREF_ENABLE_COLOR);
  665. int ok = 0;
  666. int err = 0;
  667. int inpr = 0;
  668. int rm = 0;
  669. for (auto& dr : downloadResults_) {
  670. if (dr->belongsTo != 0) {
  671. continue;
  672. }
  673. const char* status;
  674. switch (dr->result) {
  675. case error_code::FINISHED:
  676. status = getStatusStr(A2_STATUS_OK, useColor);
  677. ++ok;
  678. break;
  679. case error_code::IN_PROGRESS:
  680. status = getStatusStr(A2_STATUS_INPR, useColor);
  681. ++inpr;
  682. break;
  683. case error_code::REMOVED:
  684. status = getStatusStr(A2_STATUS_RM, useColor);
  685. ++rm;
  686. break;
  687. default:
  688. status = getStatusStr(A2_STATUS_ERR, useColor);
  689. ++err;
  690. }
  691. if (full) {
  692. formatDownloadResultFull(o, status, dr);
  693. }
  694. else {
  695. o.write(formatDownloadResult(status, dr).c_str());
  696. o.write("\n");
  697. }
  698. }
  699. if (ok > 0 || err > 0 || inpr > 0 || rm > 0) {
  700. o.printf("\n%s\n", _("Status Legend:"));
  701. if (ok > 0) {
  702. o.write(_("(OK):download completed."));
  703. }
  704. if (err > 0) {
  705. o.write(_("(ERR):error occurred."));
  706. }
  707. if (inpr > 0) {
  708. o.write(_("(INPR):download in-progress."));
  709. }
  710. if (rm > 0) {
  711. o.write(_("(RM):download removed."));
  712. }
  713. o.write("\n");
  714. }
  715. }
  716. namespace {
  717. void formatDownloadResultCommon(
  718. std::ostream& o, const char* status,
  719. const std::shared_ptr<DownloadResult>& downloadResult)
  720. {
  721. o << std::setw(3) << downloadResult->gid->toAbbrevHex() << "|" << std::setw(4)
  722. << status << "|";
  723. if (downloadResult->sessionTime.count() > 0) {
  724. o << std::setw(8)
  725. << util::abbrevSize(downloadResult->sessionDownloadLength * 1000 /
  726. downloadResult->sessionTime.count())
  727. << "B/s";
  728. }
  729. else {
  730. o << std::setw(11);
  731. o << "n/a";
  732. }
  733. o << "|";
  734. }
  735. } // namespace
  736. void RequestGroupMan::formatDownloadResultFull(
  737. OutputFile& out, const char* status,
  738. const std::shared_ptr<DownloadResult>& downloadResult) const
  739. {
  740. BitfieldMan bt(downloadResult->pieceLength, downloadResult->totalLength);
  741. bt.setBitfield(
  742. reinterpret_cast<const unsigned char*>(downloadResult->bitfield.data()),
  743. downloadResult->bitfield.size());
  744. bool head = true;
  745. const std::vector<std::shared_ptr<FileEntry>>& fileEntries =
  746. downloadResult->fileEntries;
  747. for (auto& f : fileEntries) {
  748. if (!f->isRequested()) {
  749. continue;
  750. }
  751. std::stringstream o;
  752. if (head) {
  753. formatDownloadResultCommon(o, status, downloadResult);
  754. head = false;
  755. }
  756. else {
  757. o << " | | |";
  758. }
  759. if (f->getLength() == 0 || downloadResult->bitfield.empty()) {
  760. o << " -|";
  761. }
  762. else {
  763. int64_t completedLength =
  764. bt.getOffsetCompletedLength(f->getOffset(), f->getLength());
  765. o << std::setw(3) << 100 * completedLength / f->getLength() << "|";
  766. }
  767. writeFilePath(o, f, downloadResult->inMemoryDownload);
  768. o << "\n";
  769. out.write(o.str().c_str());
  770. }
  771. if (head) {
  772. std::stringstream o;
  773. formatDownloadResultCommon(o, status, downloadResult);
  774. o << " -|n/a\n";
  775. out.write(o.str().c_str());
  776. }
  777. }
  778. std::string RequestGroupMan::formatDownloadResult(
  779. const char* status,
  780. const std::shared_ptr<DownloadResult>& downloadResult) const
  781. {
  782. std::stringstream o;
  783. formatDownloadResultCommon(o, status, downloadResult);
  784. const std::vector<std::shared_ptr<FileEntry>>& fileEntries =
  785. downloadResult->fileEntries;
  786. writeFilePath(fileEntries.begin(), fileEntries.end(), o,
  787. downloadResult->inMemoryDownload);
  788. return o.str();
  789. }
  790. namespace {
  791. template <typename StringInputIterator, typename FileEntryInputIterator>
  792. bool sameFilePathExists(StringInputIterator sfirst, StringInputIterator slast,
  793. FileEntryInputIterator ffirst,
  794. FileEntryInputIterator flast)
  795. {
  796. for (; ffirst != flast; ++ffirst) {
  797. if (std::binary_search(sfirst, slast, (*ffirst)->getPath())) {
  798. return true;
  799. }
  800. }
  801. return false;
  802. }
  803. } // namespace
  804. bool RequestGroupMan::isSameFileBeingDownloaded(
  805. RequestGroup* requestGroup) const
  806. {
  807. // TODO it may be good to use dedicated method rather than use
  808. // isPreLocalFileCheckEnabled
  809. if (!requestGroup->isPreLocalFileCheckEnabled()) {
  810. return false;
  811. }
  812. std::vector<std::string> files;
  813. for (auto& rg : requestGroups_) {
  814. if (rg.get() != requestGroup) {
  815. const std::vector<std::shared_ptr<FileEntry>>& entries =
  816. rg->getDownloadContext()->getFileEntries();
  817. std::transform(entries.begin(), entries.end(), std::back_inserter(files),
  818. std::mem_fn(&FileEntry::getPath));
  819. }
  820. }
  821. std::sort(files.begin(), files.end());
  822. const std::vector<std::shared_ptr<FileEntry>>& entries =
  823. requestGroup->getDownloadContext()->getFileEntries();
  824. return sameFilePathExists(files.begin(), files.end(), entries.begin(),
  825. entries.end());
  826. }
  827. void RequestGroupMan::halt()
  828. {
  829. for (auto& elem : requestGroups_) {
  830. elem->setHaltRequested(true);
  831. }
  832. }
  833. void RequestGroupMan::forceHalt()
  834. {
  835. for (auto& elem : requestGroups_) {
  836. elem->setForceHaltRequested(true);
  837. }
  838. }
  839. TransferStat RequestGroupMan::calculateStat()
  840. {
  841. // TODO Currently, all time upload length is not set.
  842. return netStat_.toTransferStat();
  843. }
  844. std::shared_ptr<DownloadResult>
  845. RequestGroupMan::findDownloadResult(a2_gid_t gid) const
  846. {
  847. return downloadResults_.get(gid);
  848. }
  849. bool RequestGroupMan::removeDownloadResult(a2_gid_t gid)
  850. {
  851. return downloadResults_.remove(gid);
  852. }
  853. void RequestGroupMan::addDownloadResult(
  854. const std::shared_ptr<DownloadResult>& dr)
  855. {
  856. ++numStoppedTotal_;
  857. bool rv = downloadResults_.push_back(dr->gid->getNumericId(), dr);
  858. assert(rv);
  859. while (downloadResults_.size() > maxDownloadResult_) {
  860. // Save last encountered error code so that we can report it
  861. // later.
  862. const auto& dr = downloadResults_[0];
  863. if (dr->belongsTo == 0 && dr->result != error_code::FINISHED) {
  864. removedLastErrorResult_ = dr->result;
  865. ++removedErrorResult_;
  866. // Keep unfinished download result, so that we can save them by
  867. // SessionSerializer.
  868. if (option_->getAsBool(PREF_KEEP_UNFINISHED_DOWNLOAD_RESULT)) {
  869. if (dr->result != error_code::REMOVED ||
  870. dr->option->getAsBool(PREF_FORCE_SAVE)) {
  871. unfinishedDownloadResults_.push_back(dr);
  872. }
  873. }
  874. }
  875. downloadResults_.pop_front();
  876. }
  877. }
  878. void RequestGroupMan::purgeDownloadResult() { downloadResults_.clear(); }
  879. std::shared_ptr<ServerStat>
  880. RequestGroupMan::findServerStat(const std::string& hostname,
  881. const std::string& protocol) const
  882. {
  883. return serverStatMan_->find(hostname, protocol);
  884. }
  885. std::shared_ptr<ServerStat>
  886. RequestGroupMan::getOrCreateServerStat(const std::string& hostname,
  887. const std::string& protocol)
  888. {
  889. std::shared_ptr<ServerStat> ss = findServerStat(hostname, protocol);
  890. if (!ss) {
  891. ss = std::make_shared<ServerStat>(hostname, protocol);
  892. addServerStat(ss);
  893. }
  894. return ss;
  895. }
  896. bool RequestGroupMan::addServerStat(
  897. const std::shared_ptr<ServerStat>& serverStat)
  898. {
  899. return serverStatMan_->add(serverStat);
  900. }
  901. bool RequestGroupMan::loadServerStat(const std::string& filename)
  902. {
  903. return serverStatMan_->load(filename);
  904. }
  905. bool RequestGroupMan::saveServerStat(const std::string& filename) const
  906. {
  907. return serverStatMan_->save(filename);
  908. }
  909. void RequestGroupMan::removeStaleServerStat(const std::chrono::seconds& timeout)
  910. {
  911. serverStatMan_->removeStaleServerStat(timeout);
  912. }
  913. bool RequestGroupMan::doesOverallDownloadSpeedExceed()
  914. {
  915. return maxOverallDownloadSpeedLimit_ > 0 &&
  916. maxOverallDownloadSpeedLimit_ < netStat_.calculateDownloadSpeed();
  917. }
  918. bool RequestGroupMan::doesOverallUploadSpeedExceed()
  919. {
  920. return maxOverallUploadSpeedLimit_ > 0 &&
  921. maxOverallUploadSpeedLimit_ < netStat_.calculateUploadSpeed();
  922. }
  923. void RequestGroupMan::getUsedHosts(
  924. std::vector<std::pair<size_t, std::string>>& usedHosts)
  925. {
  926. // vector of tuple which consists of use count, -download speed,
  927. // hostname. We want to sort by least used and faster download
  928. // speed. We use -download speed so that we can sort them using
  929. // operator<().
  930. std::vector<std::tuple<size_t, int, std::string>> tempHosts;
  931. for (const auto& rg : requestGroups_) {
  932. const auto& inFlightReqs =
  933. rg->getDownloadContext()->getFirstFileEntry()->getInFlightRequests();
  934. for (const auto& req : inFlightReqs) {
  935. uri_split_result us;
  936. if (uri_split(&us, req->getUri().c_str()) == 0) {
  937. std::string host =
  938. uri::getFieldString(us, USR_HOST, req->getUri().c_str());
  939. auto k = tempHosts.begin();
  940. auto eok = tempHosts.end();
  941. for (; k != eok; ++k) {
  942. if (std::get<2>(*k) == host) {
  943. ++std::get<0>(*k);
  944. break;
  945. }
  946. }
  947. if (k == eok) {
  948. std::string protocol =
  949. uri::getFieldString(us, USR_SCHEME, req->getUri().c_str());
  950. auto ss = findServerStat(host, protocol);
  951. int invDlSpeed = (ss && ss->isOK())
  952. ? -(static_cast<int>(ss->getDownloadSpeed()))
  953. : 0;
  954. tempHosts.emplace_back(1, invDlSpeed, host);
  955. }
  956. }
  957. }
  958. }
  959. std::sort(tempHosts.begin(), tempHosts.end());
  960. std::transform(tempHosts.begin(), tempHosts.end(),
  961. std::back_inserter(usedHosts),
  962. [](const std::tuple<size_t, int, std::string>& x) {
  963. return std::make_pair(std::get<0>(x), std::get<2>(x));
  964. });
  965. }
  966. void RequestGroupMan::setUriListParser(
  967. const std::shared_ptr<UriListParser>& uriListParser)
  968. {
  969. uriListParser_ = uriListParser;
  970. }
  971. void RequestGroupMan::initWrDiskCache()
  972. {
  973. assert(!wrDiskCache_);
  974. size_t limit = option_->getAsInt(PREF_DISK_CACHE);
  975. if (limit > 0) {
  976. wrDiskCache_ = make_unique<WrDiskCache>(limit);
  977. }
  978. }
  979. void RequestGroupMan::decreaseNumActive()
  980. {
  981. assert(numActive_ > 0);
  982. --numActive_;
  983. }
  984. int RequestGroupMan::optimizeConcurrentDownloads()
  985. {
  986. // gauge the current speed
  987. int currentSpeed = getNetStat().calculateDownloadSpeed();
  988. const auto& now = global::wallclock();
  989. if (currentSpeed >= optimizationSpeed_) {
  990. optimizationSpeed_ = currentSpeed;
  991. optimizationSpeedTimer_ = now;
  992. }
  993. else if (std::chrono::duration_cast<std::chrono::seconds>(
  994. optimizationSpeedTimer_.difference(now)) >= 5_s) {
  995. // we keep using the reference speed for minimum 5 seconds so reset the
  996. // timer
  997. optimizationSpeedTimer_ = now;
  998. // keep the reference speed as long as the speed tends to augment or to
  999. // maintain itself within 10%
  1000. if (currentSpeed >= 1.1 * getNetStat().calculateNewestDownloadSpeed(5)) {
  1001. // else assume a possible congestion and record a new optimization speed
  1002. // by dichotomy
  1003. optimizationSpeed_ = (optimizationSpeed_ + currentSpeed) / 2.;
  1004. }
  1005. }
  1006. if (optimizationSpeed_ <= 0) {
  1007. return optimizeConcurrentDownloadsCoeffA_;
  1008. }
  1009. // apply the rule
  1010. if ((maxOverallDownloadSpeedLimit_ > 0) &&
  1011. (optimizationSpeed_ > maxOverallDownloadSpeedLimit_)) {
  1012. optimizationSpeed_ = maxOverallDownloadSpeedLimit_;
  1013. }
  1014. int maxConcurrentDownloads =
  1015. ceil(optimizeConcurrentDownloadsCoeffA_ +
  1016. optimizeConcurrentDownloadsCoeffB_ *
  1017. log10(optimizationSpeed_ * 8. / 1000000.));
  1018. // bring the value in bound between 1 and the defined maximum
  1019. maxConcurrentDownloads =
  1020. std::min(std::max(1, maxConcurrentDownloads), maxConcurrentDownloads_);
  1021. A2_LOG_DEBUG(
  1022. fmt("Max concurrent downloads optimized at %d (%lu currently active) "
  1023. "[optimization speed %sB/s, current speed %sB/s]",
  1024. maxConcurrentDownloads, static_cast<unsigned long>(numActive_),
  1025. util::abbrevSize(optimizationSpeed_).c_str(),
  1026. util::abbrevSize(currentSpeed).c_str()));
  1027. return maxConcurrentDownloads;
  1028. }
  1029. } // namespace aria2