Przeglądaj źródła

Added support for a dynamic management of the number of concurrent downloads as a function of the overall bandwidth observed

oliviercommelarbre 9 lat temu
rodzic
commit
9e05371fb6

+ 12 - 0
src/FillRequestGroupCommand.cc

@@ -42,6 +42,7 @@
 #include "LogFactory.h"
 #include "DownloadContext.h"
 #include "fmt.h"
+#include "wallclock.h"
 
 namespace aria2 {
 
@@ -80,6 +81,17 @@ bool FillRequestGroupCommand::execute()
     }
   }
   e_->addRoutineCommand(std::unique_ptr<Command>(this));
+
+  // let's make sure we come back here every second or so
+  // if we use the optimize-concurrent-download option
+  if (rgman->getOptimizeConcurrentDownloads()) {
+    const auto& now = global::wallclock();
+    if (std::chrono::duration_cast<std::chrono::seconds>(lastExecTime.difference(now)) >= 1_s) {
+       lastExecTime = now;
+       rgman->requestQueueCheck();
+    }
+  }
+
   return false;
 }
 

+ 2 - 0
src/FillRequestGroupCommand.h

@@ -37,6 +37,7 @@
 
 #include "Command.h"
 #include "a2time.h"
+#include "TimerA2.h"
 
 namespace aria2 {
 
@@ -46,6 +47,7 @@ class DownloadEngine;
 class FillRequestGroupCommand : public Command {
 private:
   DownloadEngine* e_;
+  Timer lastExecTime;
 
 public:
   FillRequestGroupCommand(cuid_t cuid, DownloadEngine* e);

+ 10 - 0
src/NetStat.cc

@@ -56,6 +56,11 @@ int NetStat::calculateDownloadSpeed()
   return downloadSpeed_.calculateSpeed();
 }
 
+int NetStat::calculateNewestDownloadSpeed(int seconds)
+{
+  return downloadSpeed_.calculateNewestSpeed(seconds);
+}
+
 int NetStat::calculateAvgDownloadSpeed()
 {
   return avgDownloadSpeed_ = downloadSpeed_.calculateAvgSpeed();
@@ -63,6 +68,11 @@ int NetStat::calculateAvgDownloadSpeed()
 
 int NetStat::calculateUploadSpeed() { return uploadSpeed_.calculateSpeed(); }
 
+int NetStat::calculateNewestUploadSpeed(int seconds)
+{
+  return uploadSpeed_.calculateNewestSpeed(seconds); 
+}
+
 int NetStat::calculateAvgUploadSpeed()
 {
   return avgUploadSpeed_ = uploadSpeed_.calculateAvgSpeed();

+ 4 - 0
src/NetStat.h

@@ -61,10 +61,14 @@ public:
    */
   int calculateDownloadSpeed();
 
+  int calculateNewestDownloadSpeed(int seconds);
+
   int calculateAvgDownloadSpeed();
 
   int calculateUploadSpeed();
 
+  int calculateNewestUploadSpeed(int seconds);
+
   int calculateAvgUploadSpeed();
 
   void updateDownload(size_t bytes);

+ 10 - 0
src/OptionHandlerFactory.cc

@@ -416,6 +416,16 @@ std::vector<OptionHandler*> OptionHandlerFactory::createOptionHandlers()
     op->setChangeGlobalOption(true);
     handlers.push_back(op);
   }
+  {
+    OptionHandler* op(new OptimizeConcurrentDownloadsOptionHandler
+                      (PREF_OPTIMIZE_CONCURRENT_DOWNLOADS,
+                       TEXT_OPTIMIZE_CONCURRENT_DOWNLOADS,
+                       A2_V_FALSE,
+                       OptionHandler::OPT_ARG));
+    op->addTag(TAG_BASIC);
+    op->setChangeGlobalOption(true);
+    handlers.push_back(op);
+  }
   {
     OptionHandler* op(new NumberOptionHandler(PREF_MAX_CONNECTION_PER_SERVER,
                                               TEXT_MAX_CONNECTION_PER_SERVER,

+ 55 - 0
src/OptionHandlerImpl.cc

@@ -586,6 +586,61 @@ std::string PrioritizePieceOptionHandler::createPossibleValuesString() const
   return "head[=SIZE], tail[=SIZE]";
 }
 
+OptimizeConcurrentDownloadsOptionHandler::OptimizeConcurrentDownloadsOptionHandler(
+    PrefPtr pref, const char* description, const std::string& defaultValue, char shortName)
+    : AbstractOptionHandler(pref, description, defaultValue,
+                            OptionHandler::OPT_ARG, shortName)
+{
+}
+
+void OptimizeConcurrentDownloadsOptionHandler::parseArg(Option& option,
+                                     const std::string& optarg) const
+{
+  if (optarg == "true" || optarg.empty()) {
+    option.put(pref_, A2_V_TRUE);
+  }
+  else if (optarg == "false") {
+    option.put(pref_, A2_V_FALSE);
+  }
+  else {
+    auto p = util::divide(std::begin(optarg), std::end(optarg), ':');
+
+    std::string coeff_b(p.second.first,p.second.second);
+    if(coeff_b.empty()) {
+      std::string msg = pref_->k;
+      msg += " ";
+      msg += _("must be either 'true', 'false' or a pair numeric coefficients A and B under the form 'A:B'.");
+      throw DL_ABORT_EX(msg);
+    }
+
+    std::string coeff_a(p.first.first,p.first.second);
+
+
+    PrefPtr pref=PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA;
+    std::string *sptr = &coeff_a;
+    for(;;) {
+      try {
+        double dbl = std::stod(*sptr);
+      } catch(std::invalid_argument & ex) {
+        throw DL_ABORT_EX(fmt("Bad number '%s'", sptr->c_str()));
+      }
+      option.put(pref,*sptr);
+
+      if(pref == PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB) {
+        break;
+      }
+      pref = PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB;
+      sptr = &coeff_b;
+    }
+    option.put(pref_, A2_V_TRUE);
+  }
+}
+
+std::string OptimizeConcurrentDownloadsOptionHandler::createPossibleValuesString() const
+{
+  return "true, false, A:B";
+}
+
 DeprecatedOptionHandler::DeprecatedOptionHandler(
     OptionHandler* depOptHandler, const OptionHandler* repOptHandler,
     bool stillWork, std::string additionalMessage)

+ 11 - 0
src/OptionHandlerImpl.h

@@ -254,6 +254,17 @@ public:
   virtual std::string createPossibleValuesString() const CXX11_OVERRIDE;
 };
 
+class OptimizeConcurrentDownloadsOptionHandler : public AbstractOptionHandler {
+public:
+  OptimizeConcurrentDownloadsOptionHandler(
+      PrefPtr pref, const char* description = NO_DESCRIPTION,
+      const std::string& defaultValue = NO_DEFAULT_VALUE, char shortName = 0);
+  virtual void parseArg(Option& option,
+                        const std::string& optarg) const CXX11_OVERRIDE;
+  virtual std::string createPossibleValuesString() const CXX11_OVERRIDE;
+};
+
+
 // This class is used to deprecate option and optionally handle its
 // option value using replacing option.
 class DeprecatedOptionHandler : public OptionHandler {

+ 70 - 4
src/RequestGroupMan.cc

@@ -83,6 +83,7 @@
 #include "SimpleRandomizer.h"
 #include "array_fun.h"
 #include "OpenedFileCounter.h"
+#include "wallclock.h"
 #ifdef ENABLE_BITTORRENT
 #include "bittorrent_helper.h"
 #endif // ENABLE_BITTORRENT
@@ -102,8 +103,12 @@ void appendReservedGroup(RequestGroupList& list, InputIterator first,
 
 RequestGroupMan::RequestGroupMan(
     std::vector<std::shared_ptr<RequestGroup>> requestGroups,
-    int maxSimultaneousDownloads, const Option* option)
-    : maxSimultaneousDownloads_(maxSimultaneousDownloads),
+    int maxConcurrentDownloads, const Option* option)
+    : maxConcurrentDownloads_(maxConcurrentDownloads),
+      optimizeConcurrentDownloads_(false),
+      optimizeConcurrentDownloadsCoeffA_(5.),
+      optimizeConcurrentDownloadsCoeffB_(25.),
+      optimizationSpeed_(0),
       numActive_(0),
       option_(option),
       serverStatMan_(std::make_shared<ServerStatMan>()),
@@ -120,12 +125,25 @@ RequestGroupMan::RequestGroupMan(
           this, option->getAsInt(PREF_BT_MAX_OPEN_FILES))),
       numStoppedTotal_(0)
 {
+  setupOptimizeConcurrentDownloads();
   appendReservedGroup(reservedGroups_, requestGroups.begin(),
                       requestGroups.end());
 }
 
 RequestGroupMan::~RequestGroupMan() { openedFileCounter_->deactivate(); }
 
+bool RequestGroupMan::setupOptimizeConcurrentDownloads(void)
+{
+    optimizeConcurrentDownloads_ = option_->getAsBool(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS);
+    if (optimizeConcurrentDownloads_) {
+      if (option_->defined(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA)) { 
+        optimizeConcurrentDownloadsCoeffA_ = std::stod(option_->get(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA));
+        optimizeConcurrentDownloadsCoeffB_ = std::stod(option_->get(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB));
+      }
+    }
+    return optimizeConcurrentDownloads_;
+}
+
 bool RequestGroupMan::downloadFinished()
 {
   if (keepRunning_) {
@@ -474,11 +492,14 @@ createInitialCommand(const std::shared_ptr<RequestGroup>& requestGroup,
 void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e)
 {
   removeStoppedGroup(e);
-  if (static_cast<size_t>(maxSimultaneousDownloads_) <= numActive_) {
+
+  int maxConcurrentDownloads = optimizeConcurrentDownloads_ ? optimizeConcurrentDownloads() : maxConcurrentDownloads_;
+
+  if (static_cast<size_t>(maxConcurrentDownloads) <= numActive_) {
     return;
   }
   int count = 0;
-  int num = maxSimultaneousDownloads_ - numActive_;
+  int num = maxConcurrentDownloads - numActive_;
   std::vector<std::shared_ptr<RequestGroup>> pending;
 
   while (count < num && (uriListParser_ || !reservedGroups_.empty())) {
@@ -1005,4 +1026,49 @@ void RequestGroupMan::decreaseNumActive()
   --numActive_;
 }
 
+
+int RequestGroupMan::optimizeConcurrentDownloads()
+{
+  // gauge the current speed
+  int currentSpeed = getNetStat().calculateDownloadSpeed();
+
+  const auto& now = global::wallclock();
+  if (currentSpeed >= optimizationSpeed_) {
+    optimizationSpeed_ = currentSpeed;
+    optimizationSpeedTimer_ = now;
+  } else if (std::chrono::duration_cast<std::chrono::seconds>(optimizationSpeedTimer_.difference(now)) >= 5_s) {
+    // we keep using the reference speed for minimum 5 seconds so reset the timer
+    optimizationSpeedTimer_ = now;
+
+    // keep the reference speed as long as the speed tends to augment or to maintain itself within 10%
+    if (currentSpeed >= 1.1 * getNetStat().calculateNewestDownloadSpeed(5)) {
+      // else assume a possible congestion and record a new optimization speed by dichotomy
+      optimizationSpeed_ = (optimizationSpeed_ + currentSpeed)/2.;
+    }
+  }
+
+  if (optimizationSpeed_ <= 0) {
+    return 1;
+  }
+
+  // apply the rule
+  if ((maxOverallDownloadSpeedLimit_ > 0) && (optimizationSpeed_ > maxOverallDownloadSpeedLimit_)) {
+    optimizationSpeed_ = maxOverallDownloadSpeedLimit_;
+  }
+  int maxConcurrentDownloads = ceil(
+    optimizeConcurrentDownloadsCoeffA_
+    + optimizeConcurrentDownloadsCoeffB_ * log10(optimizationSpeed_ * 8. / 1000000.)
+  );
+
+  // bring the value in bound between 1 and the defined maximum
+  maxConcurrentDownloads = std::min(std::max(1, maxConcurrentDownloads), maxConcurrentDownloads_);
+  
+  A2_LOG_DEBUG
+    (fmt("Max concurrent downloads optimized at %d (%lu currently active) "
+         "[optimization speed %sB/s, current speed %sB/s]",
+         maxConcurrentDownloads, numActive_, util::abbrevSize(optimizationSpeed_).c_str(), 
+         util::abbrevSize(currentSpeed).c_str()));
+  
+  return maxConcurrentDownloads;
+}
 } // namespace aria2

+ 18 - 3
src/RequestGroupMan.h

@@ -72,7 +72,13 @@ private:
   RequestGroupList reservedGroups_;
   DownloadResultList downloadResults_;
 
-  int maxSimultaneousDownloads_;
+  int maxConcurrentDownloads_;
+
+  bool optimizeConcurrentDownloads_;
+  double optimizeConcurrentDownloadsCoeffA_;
+  double optimizeConcurrentDownloadsCoeffB_;
+  int optimizationSpeed_;
+  Timer optimizationSpeedTimer_;
 
   // The number of simultaneous active downloads, excluding seed only
   // item if PREF_BT_DETACH_SEED_ONLY is true.  We rely on this
@@ -135,9 +141,11 @@ private:
   void addRequestGroupIndex(
       const std::vector<std::shared_ptr<RequestGroup>>& groups);
 
+  int optimizeConcurrentDownloads();
+
 public:
   RequestGroupMan(std::vector<std::shared_ptr<RequestGroup>> requestGroups,
-                  int maxSimultaneousDownloads, const Option* option);
+                  int maxConcurrentDownloads, const Option* option);
 
   ~RequestGroupMan();
 
@@ -195,6 +203,13 @@ public:
 
   bool removeReservedGroup(a2_gid_t gid);
 
+  bool getOptimizeConcurrentDownloads() const
+  {
+    return optimizeConcurrentDownloads_;
+  }
+
+  bool setupOptimizeConcurrentDownloads();
+
   void showDownloadResults(OutputFile& o, bool full) const;
 
   bool isSameFileBeingDownloaded(RequestGroup* requestGroup) const;
@@ -291,7 +306,7 @@ public:
     return maxOverallUploadSpeedLimit_;
   }
 
-  void setMaxSimultaneousDownloads(int max) { maxSimultaneousDownloads_ = max; }
+  void setMaxConcurrentDownloads(int max) { maxConcurrentDownloads_ = max; }
 
   // Call this function if requestGroups_ queue should be maintained.
   // This function is added to reduce the call of maintenance, but at

+ 5 - 1
src/RpcMethodImpl.cc

@@ -1592,10 +1592,14 @@ void changeGlobalOption(const Option& option, DownloadEngine* e)
         option.getAsInt(PREF_MAX_OVERALL_UPLOAD_LIMIT));
   }
   if (option.defined(PREF_MAX_CONCURRENT_DOWNLOADS)) {
-    e->getRequestGroupMan()->setMaxSimultaneousDownloads(
+    e->getRequestGroupMan()->setMaxConcurrentDownloads(
         option.getAsInt(PREF_MAX_CONCURRENT_DOWNLOADS));
     e->getRequestGroupMan()->requestQueueCheck();
   }
+  if(option.defined(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS)) {
+    e->getRequestGroupMan()->setupOptimizeConcurrentDownloads();
+    e->getRequestGroupMan()->requestQueueCheck();
+  }
   if (option.defined(PREF_MAX_DOWNLOAD_RESULT)) {
     e->getRequestGroupMan()->setMaxDownloadResult(
         option.getAsInt(PREF_MAX_DOWNLOAD_RESULT));

+ 26 - 1
src/SpeedCalc.cc

@@ -42,7 +42,7 @@
 namespace aria2 {
 
 namespace {
-constexpr auto WINDOW_TIME = 15_s;
+constexpr auto WINDOW_TIME = 10_s;
 } // namespace
 
 SpeedCalc::SpeedCalc() : accumulatedLength_(0), bytesWindow_(0), maxSpeed_(0) {}
@@ -84,6 +84,31 @@ int SpeedCalc::calculateSpeed()
   return speed;
 }
 
+int SpeedCalc::calculateNewestSpeed(int seconds)
+{
+  const auto& now = global::wallclock();
+  removeStaleTimeSlot(now);
+
+  int64_t bytesCount(0);
+  auto it = timeSlots_.rbegin();
+  while (it != timeSlots_.rend()) {
+    if (it->first.difference(now) > seconds * 1_s) {
+      break;
+    }
+    bytesCount += (*it++).second;
+  }
+  if (it == timeSlots_.rbegin()) {
+    return 0;
+  }
+
+  auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+                     (*--it).first.difference(now)).count();
+  if (elapsed <= 0) {
+    elapsed = 1;
+  }
+  return bytesCount * (1000. / elapsed);
+}
+
 void SpeedCalc::update(size_t bytes)
 {
   const auto& now = global::wallclock();

+ 2 - 0
src/SpeedCalc.h

@@ -61,6 +61,8 @@ public:
    */
   int calculateSpeed();
 
+  int calculateNewestSpeed(int seconds);
+
   int getMaxSpeed() const { return maxSpeed_; }
 
   int calculateAvgSpeed() const;

+ 6 - 0
src/prefs.cc

@@ -216,6 +216,12 @@ PrefPtr PREF_INPUT_FILE = makePref("input-file");
 PrefPtr PREF_DEFERRED_INPUT = makePref("deferred-input");
 // value: 1*digit
 PrefPtr PREF_MAX_CONCURRENT_DOWNLOADS = makePref("max-concurrent-downloads");
+// value: true | false | A:B
+PrefPtr PREF_OPTIMIZE_CONCURRENT_DOWNLOADS = makePref("optimize-concurrent-downloads");
+// values: 1*digit ['.' [ 1*digit ] ]
+PrefPtr PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA = makePref("optimize-concurrent-downloads-coeffA");
+// values: 1*digit ['.' [ 1*digit ] ]
+PrefPtr PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB = makePref("optimize-concurrent-downloads-coeffB");
 // value: true | false
 PrefPtr PREF_FORCE_SEQUENTIAL = makePref("force-sequential");
 // value: true | false

+ 6 - 0
src/prefs.h

@@ -173,6 +173,12 @@ extern PrefPtr PREF_DEFERRED_INPUT;
 // value: 1*digit
 extern PrefPtr PREF_MAX_CONCURRENT_DOWNLOADS;
 // value: true | false
+extern PrefPtr PREF_OPTIMIZE_CONCURRENT_DOWNLOADS;
+// value: 1*digit ['.' [ 1*digit ] ]
+extern PrefPtr PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA;
+// value: 1*digit ['.' [ 1*digit ] ]
+extern PrefPtr PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB;
+// value: true | false
 extern PrefPtr PREF_FORCE_SEQUENTIAL;
 // value: true | false
 extern PrefPtr PREF_AUTO_FILE_RENAMING;

+ 15 - 1
src/usage_text.h

@@ -242,7 +242,21 @@
 #define TEXT_MAX_CONCURRENT_DOWNLOADS                                   \
   _(" -j, --max-concurrent-downloads=N Set maximum number of parallel downloads for\n" \
     "                              every static (HTTP/FTP) URL, torrent and metalink.\n" \
-    "                              See also --split option.")
+    "                              See also --split and --optimize-concurrent-downloads options.")
+#define TEXT_OPTIMIZE_CONCURRENT_DOWNLOADS\
+  _(" --optimize-concurrent-downloads[=true|false|A:B] Optimizes the number of\n" \
+    "                              concurrent downloads according to the bandwidth\n" \
+    "                              available. aria2 uses the download speed observed\n" \
+    "                              in the previous downloads to adapt the number of\n" \
+    "                              downloads launched in parallel according to the rule\n" \
+    "                              N = A + B Log10(speed in Mbps). The coefficients\n" \
+    "                              A and B can be customized in the option arguments\n" \
+    "                              with A and B separated by a colon. The default values\n" \
+    "                              (A=5,B=25) lead to using typically 5 parallel\n" \
+    "                              downloads on 1Mbps networks and above 50 on 100Mbps\n" \
+    "                              networks. The number of parallel downloads remains\n" \
+    "                              constrained under the maximum defined by the\n" \
+    "                              max-concurrent-downloads parameter.")
 #define TEXT_LOAD_COOKIES                                               \
   _(" --load-cookies=FILE          Load Cookies from FILE using the Firefox3 format\n" \
     "                              and Mozilla/Firefox(1.x/2.x)/Netscape format.")