/* <!-- copyright */
/*
 * aria2 - The high speed download utility
 *
 * Copyright (C) 2006 Tatsuhiro Tsujikawa
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * In addition, as a special exception, the copyright holders give
 * permission to link the code of portions of this program with the
 * OpenSSL library under certain conditions as described in each
 * individual source file, and distribute linked combinations
 * including the two.
 * You must obey the GNU General Public License in all respects
 * for all of the code used other than OpenSSL.  If you modify
 * file(s) with this exception, you may extend this exception to your
 * version of the file(s), but you are not obligated to do so.  If you
 * do not wish to do so, delete this exception statement from your
 * version.  If you delete this exception statement from all source
 * files in the program, then also delete it here.
*//* copyright --> */#include "DownloadEngine.h"#include <signal.h>#include <cstring>#include <cerrno>#include <algorithm>#include <numeric>#include <iterator>#include "StatCalc.h"#include "RequestGroup.h"#include "RequestGroupMan.h"#include "DownloadResult.h"#include "StatCalc.h"#include "LogFactory.h"#include "Logger.h"#include "SocketCore.h"#include "util.h"#include "a2functional.h"#include "DlAbortEx.h"#include "ServerStatMan.h"#include "CookieStorage.h"#include "A2STR.h"#include "AuthConfigFactory.h"#include "AuthConfig.h"#include "Request.h"#include "EventPoll.h"#include "Command.h"#include "FileAllocationEntry.h"#include "CheckIntegrityEntry.h"#include "BtProgressInfoFile.h"#include "DownloadContext.h"#include "fmt.h"#include "wallclock.h"#ifdef ENABLE_BITTORRENT#include "BtRegistry.h"#endif // ENABLE_BITTORRENT#ifdef ENABLE_WEBSOCKET#include "WebSocketSessionMan.h"#endif // ENABLE_WEBSOCKET#include "Option.h"#include "util_security.h"namespace aria2 {namespace global {// 0 ... running// 1 ... stop signal detected// 2 ... stop signal processed by DownloadEngine// 3 ... 2nd stop signal(force shutdown) detected// 4 ... 2nd stop signal processed by DownloadEngine// 5 ... 
main loop exitedvolatile sig_atomic_t globalHaltRequested = 0;} // namespace globalnamespace {constexpr auto DEFAULT_REFRESH_INTERVAL = 1_s;} // namespaceDownloadEngine::DownloadEngine(std::unique_ptr<EventPoll> eventPoll)    : eventPoll_(std::move(eventPoll)),      haltRequested_(0),      noWait_(true),      refreshInterval_(DEFAULT_REFRESH_INTERVAL),      lastRefresh_(Timer::zero()),      cookieStorage_(make_unique<CookieStorage>()),#ifdef ENABLE_BITTORRENT      btRegistry_(make_unique<BtRegistry>()),#endif // ENABLE_BITTORRENT#ifdef HAVE_ARES_ADDR_NODE      asyncDNSServers_(nullptr),#endif // HAVE_ARES_ADDR_NODE      dnsCache_(make_unique<DNSCache>()),      option_(nullptr){  unsigned char sessionId[20];  util::generateRandomKey(sessionId);  sessionId_.assign(&sessionId[0], &sessionId[sizeof(sessionId)]);}DownloadEngine::~DownloadEngine(){#ifdef HAVE_ARES_ADDR_NODE  setAsyncDNSServers(nullptr);#endif // HAVE_ARES_ADDR_NODE}namespace {void executeCommand(std::deque<std::unique_ptr<Command>>& commands,                    Command::STATUS statusFilter){  size_t max = commands.size();  for (size_t i = 0; i < max; ++i) {    auto com = std::move(commands.front());    commands.pop_front();    if (!com->statusMatch(statusFilter)) {      com->clearIOEvents();      commands.push_back(std::move(com));      continue;    }    com->transitStatus();    if (com->execute()) {      com.reset();    }    else {      com->clearIOEvents();      com.release();    }  }}} // namespacenamespace {class GlobalHaltRequestedFinalizer {public:  GlobalHaltRequestedFinalizer() = default;  ~GlobalHaltRequestedFinalizer() { global::globalHaltRequested = 5; }};} // namespaceint DownloadEngine::run(bool oneshot){  GlobalHaltRequestedFinalizer ghrf;  while (!commands_.empty() || !routineCommands_.empty()) {    if (!commands_.empty()) {      waitData();    }    noWait_ = false;    global::wallclock().reset();    calculateStatistics();    if (lastRefresh_.difference(global::wallclock()) + 
A2_DELTA_MILLIS >=        refreshInterval_) {      refreshInterval_ = DEFAULT_REFRESH_INTERVAL;      lastRefresh_ = global::wallclock();      executeCommand(commands_, Command::STATUS_ALL);    }    else {      executeCommand(commands_, Command::STATUS_ACTIVE);    }    executeCommand(routineCommands_, Command::STATUS_ALL);    afterEachIteration();    if (!noWait_ && oneshot) {      return 1;    }  }  onEndOfRun();  return 0;}void DownloadEngine::waitData(){  struct timeval tv;  if (noWait_) {    tv.tv_sec = tv.tv_usec = 0;  }  else {    auto t =        std::chrono::duration_cast<std::chrono::microseconds>(refreshInterval_);    tv.tv_sec = t.count() / 1000000;    tv.tv_usec = t.count() % 1000000;  }  eventPoll_->poll(tv);}bool DownloadEngine::addSocketForReadCheck(    const std::shared_ptr<SocketCore>& socket, Command* command){  return eventPoll_->addEvents(socket->getSockfd(), command,                               EventPoll::EVENT_READ);}bool DownloadEngine::deleteSocketForReadCheck(    const std::shared_ptr<SocketCore>& socket, Command* command){  return eventPoll_->deleteEvents(socket->getSockfd(), command,                                  EventPoll::EVENT_READ);}bool DownloadEngine::addSocketForWriteCheck(    const std::shared_ptr<SocketCore>& socket, Command* command){  return eventPoll_->addEvents(socket->getSockfd(), command,                               EventPoll::EVENT_WRITE);}bool DownloadEngine::deleteSocketForWriteCheck(    const std::shared_ptr<SocketCore>& socket, Command* command){  return eventPoll_->deleteEvents(socket->getSockfd(), command,                                  EventPoll::EVENT_WRITE);}void DownloadEngine::calculateStatistics(){  if (statCalc_) {    statCalc_->calculateStat(this);  }}void DownloadEngine::onEndOfRun(){  requestGroupMan_->removeStoppedGroup(this);  requestGroupMan_->closeFile();  requestGroupMan_->save();}void DownloadEngine::afterEachIteration(){  if (global::globalHaltRequested == 1) {    A2_LOG_NOTICE(_("Shutdown 
sequence commencing..."                    " Press Ctrl-C again for emergency shutdown."));    requestHalt();    global::globalHaltRequested = 2;    setNoWait(true);    setRefreshInterval(std::chrono::milliseconds(0));    return;  }  if (global::globalHaltRequested == 3) {    A2_LOG_NOTICE(_("Emergency shutdown sequence commencing..."));    requestForceHalt();    global::globalHaltRequested = 4;    setNoWait(true);    setRefreshInterval(std::chrono::milliseconds(0));    return;  }}void DownloadEngine::requestHalt(){  haltRequested_ = std::max(haltRequested_, 1);  requestGroupMan_->halt();}void DownloadEngine::requestForceHalt(){  haltRequested_ = std::max(haltRequested_, 2);  requestGroupMan_->forceHalt();}void DownloadEngine::setStatCalc(std::unique_ptr<StatCalc> statCalc){  statCalc_ = std::move(statCalc);}#ifdef ENABLE_ASYNC_DNSbool DownloadEngine::addNameResolverCheck(    const std::shared_ptr<AsyncNameResolver>& resolver, Command* command){  return eventPoll_->addNameResolver(resolver, command);}bool DownloadEngine::deleteNameResolverCheck(    const std::shared_ptr<AsyncNameResolver>& resolver, Command* command){  return eventPoll_->deleteNameResolver(resolver, command);}#endif // ENABLE_ASYNC_DNSvoid DownloadEngine::setNoWait(bool b) { noWait_ = b; }void DownloadEngine::addRoutineCommand(std::unique_ptr<Command> command){  routineCommands_.push_back(std::move(command));}void DownloadEngine::poolSocket(const std::string& key,                                const SocketPoolEntry& entry){  A2_LOG_INFO(fmt("Pool socket for %s", key.c_str()));  std::multimap<std::string, SocketPoolEntry>::value_type p(key, entry);  socketPool_.insert(p);}void DownloadEngine::evictSocketPool(){  if (socketPool_.empty()) {    return;  }  std::multimap<std::string, SocketPoolEntry> newPool;  A2_LOG_DEBUG("Scaning SocketPool and erasing timed out entry.");  for (auto& elem : socketPool_) {    if (!elem.second.isTimeout()) {      newPool.insert(elem);    }  }  A2_LOG_DEBUG(      
fmt("%lu entries removed.",          static_cast<unsigned long>(socketPool_.size() - newPool.size())));  socketPool_ = std::move(newPool);}namespace {std::string createSockPoolKey(const std::string& host, uint16_t port,                              const std::string& username,                              const std::string& proxyhost, uint16_t proxyport){  std::string key;  if (!username.empty()) {    key += util::percentEncode(username);    key += "@";  }  key += fmt("%s(%u)", host.c_str(), port);  if (!proxyhost.empty()) {    key += fmt("/%s(%u)", proxyhost.c_str(), proxyport);  }  return key;}} // namespacevoid DownloadEngine::poolSocket(const std::string& ipaddr, uint16_t port,                                const std::string& username,                                const std::string& proxyhost,                                uint16_t proxyport,                                const std::shared_ptr<SocketCore>& sock,                                const std::string& options,                                std::chrono::seconds timeout){  SocketPoolEntry e(sock, options, std::move(timeout));  poolSocket(createSockPoolKey(ipaddr, port, username, proxyhost, proxyport),             e);}void DownloadEngine::poolSocket(const std::string& ipaddr, uint16_t port,                                const std::string& proxyhost,                                uint16_t proxyport,                                const std::shared_ptr<SocketCore>& sock,                                std::chrono::seconds timeout){  SocketPoolEntry e(sock, std::move(timeout));  poolSocket(createSockPoolKey(ipaddr, port, A2STR::NIL, proxyhost, proxyport),             e);}namespace {bool getPeerInfo(Endpoint& res, const std::shared_ptr<SocketCore>& socket){  try {    res = socket->getPeerInfo();    return true;  }  catch (RecoverableException& e) {    // socket->getPeerInfo() can fail if the socket has been    // disconnected.    A2_LOG_INFO_EX("Getting peer info failed. 
Pooling socket canceled.", e);    return false;  }}} // namespacevoid DownloadEngine::poolSocket(const std::shared_ptr<Request>& request,                                const std::shared_ptr<Request>& proxyRequest,                                const std::shared_ptr<SocketCore>& socket,                                std::chrono::seconds timeout){  if (proxyRequest) {    // If proxy is defined, then pool socket with its hostname.    poolSocket(request->getHost(), request->getPort(), proxyRequest->getHost(),               proxyRequest->getPort(), socket, std::move(timeout));    return;  }  Endpoint peerInfo;  if (getPeerInfo(peerInfo, socket)) {    poolSocket(peerInfo.addr, peerInfo.port, A2STR::NIL, 0, socket,               std::move(timeout));  }}void DownloadEngine::poolSocket(const std::shared_ptr<Request>& request,                                const std::string& username,                                const std::shared_ptr<Request>& proxyRequest,                                const std::shared_ptr<SocketCore>& socket,                                const std::string& options,                                std::chrono::seconds timeout){  if (proxyRequest) {    // If proxy is defined, then pool socket with its hostname.    
poolSocket(request->getHost(), request->getPort(), username,               proxyRequest->getHost(), proxyRequest->getPort(), socket,               options, std::move(timeout));    return;  }  Endpoint peerInfo;  if (getPeerInfo(peerInfo, socket)) {    poolSocket(peerInfo.addr, peerInfo.port, username, A2STR::NIL, 0, socket,               options, std::move(timeout));  }}std::multimap<std::string, DownloadEngine::SocketPoolEntry>::iteratorDownloadEngine::findSocketPoolEntry(const std::string& key){  std::pair<std::multimap<std::string, SocketPoolEntry>::iterator,            std::multimap<std::string, SocketPoolEntry>::iterator> range =      socketPool_.equal_range(key);  for (auto i = range.first, eoi = range.second; i != eoi; ++i) {    const SocketPoolEntry& e = (*i).second;    // We assume that if socket is readable it means peer shutdowns    // connection and the socket will receive EOF. So skip it.    if (!e.isTimeout() && !e.getSocket()->isReadable(0)) {      A2_LOG_INFO(fmt("Found socket for %s", key.c_str()));      return i;    }  }  return socketPool_.end();}std::shared_ptr<SocketCore>DownloadEngine::popPooledSocket(const std::string& ipaddr, uint16_t port,                                const std::string& proxyhost,                                uint16_t proxyport){  std::shared_ptr<SocketCore> s;  auto i = findSocketPoolEntry(      createSockPoolKey(ipaddr, port, A2STR::NIL, proxyhost, proxyport));  if (i != socketPool_.end()) {    s = (*i).second.getSocket();    socketPool_.erase(i);  }  return s;}std::shared_ptr<SocketCore>DownloadEngine::popPooledSocket(std::string& options, const std::string& ipaddr,                                uint16_t port, const std::string& username,                                const std::string& proxyhost,                                uint16_t proxyport){  std::shared_ptr<SocketCore> s;  auto i = findSocketPoolEntry(      createSockPoolKey(ipaddr, port, username, proxyhost, proxyport));  if (i != socketPool_.end()) {    s 
= (*i).second.getSocket();    options = (*i).second.getOptions();    socketPool_.erase(i);  }  return s;}std::shared_ptr<SocketCore>DownloadEngine::popPooledSocket(const std::vector<std::string>& ipaddrs,                                uint16_t port){  std::shared_ptr<SocketCore> s;  for (const auto& ipaddr : ipaddrs) {    s = popPooledSocket(ipaddr, port, A2STR::NIL, 0);    if (s) {      break;    }  }  return s;}std::shared_ptr<SocketCore>DownloadEngine::popPooledSocket(std::string& options,                                const std::vector<std::string>& ipaddrs,                                uint16_t port, const std::string& username){  std::shared_ptr<SocketCore> s;  for (const auto& ipaddr : ipaddrs) {    s = popPooledSocket(options, ipaddr, port, username, A2STR::NIL, 0);    if (s) {      break;    }  }  return s;}DownloadEngine::SocketPoolEntry::SocketPoolEntry(    const std::shared_ptr<SocketCore>& socket, const std::string& options,    std::chrono::seconds timeout)    : socket_(socket), options_(options), timeout_(std::move(timeout)){}DownloadEngine::SocketPoolEntry::SocketPoolEntry(    const std::shared_ptr<SocketCore>& socket, std::chrono::seconds timeout)    : socket_(socket), timeout_(std::move(timeout)){}DownloadEngine::SocketPoolEntry::~SocketPoolEntry() = default;bool DownloadEngine::SocketPoolEntry::isTimeout() const{  return registeredTime_.difference(global::wallclock()) >= timeout_;}cuid_t DownloadEngine::newCUID() { return cuidCounter_.newID(); }const std::string&DownloadEngine::findCachedIPAddress(const std::string& hostname,                                    uint16_t port) const{  return dnsCache_->find(hostname, port);}void DownloadEngine::cacheIPAddress(const std::string& hostname,                                    const std::string& ipaddr, uint16_t port){  dnsCache_->put(hostname, ipaddr, port);}void DownloadEngine::markBadIPAddress(const std::string& hostname,                                      const std::string& ipaddr, uint16_t 
port){  dnsCache_->markBad(hostname, ipaddr, port);}void DownloadEngine::removeCachedIPAddress(const std::string& hostname,                                           uint16_t port){  dnsCache_->remove(hostname, port);}void DownloadEngine::setAuthConfigFactory(    std::unique_ptr<AuthConfigFactory> factory){  authConfigFactory_ = std::move(factory);}const std::unique_ptr<AuthConfigFactory>&DownloadEngine::getAuthConfigFactory() const{  return authConfigFactory_;}const std::unique_ptr<CookieStorage>& DownloadEngine::getCookieStorage() const{  return cookieStorage_;}void DownloadEngine::setRefreshInterval(std::chrono::milliseconds interval){  refreshInterval_ = std::move(interval);}void DownloadEngine::addCommand(std::vector<std::unique_ptr<Command>> commands){  commands_.insert(commands_.end(),                   std::make_move_iterator(std::begin(commands)),                   std::make_move_iterator(std::end(commands)));}void DownloadEngine::addCommand(std::unique_ptr<Command> command){  commands_.push_back(std::move(command));}void DownloadEngine::setRequestGroupMan(std::unique_ptr<RequestGroupMan> rgman){  requestGroupMan_ = std::move(rgman);}void DownloadEngine::setFileAllocationMan(    std::unique_ptr<FileAllocationMan> faman){  fileAllocationMan_ = std::move(faman);}void DownloadEngine::setCheckIntegrityMan(    std::unique_ptr<CheckIntegrityMan> ciman){  checkIntegrityMan_ = std::move(ciman);}#ifdef HAVE_ARES_ADDR_NODEvoid DownloadEngine::setAsyncDNSServers(ares_addr_node* asyncDNSServers){  ares_addr_node* node = asyncDNSServers_;  while (node) {    ares_addr_node* next = node->next;    delete node;    node = next;  }  asyncDNSServers_ = asyncDNSServers;}#endif // HAVE_ARES_ADDR_NODE#ifdef ENABLE_WEBSOCKETvoid DownloadEngine::setWebSocketSessionMan(    std::unique_ptr<rpc::WebSocketSessionMan> wsman){  webSocketSessionMan_ = std::move(wsman);}#endif // ENABLE_WEBSOCKETbool DownloadEngine::validateToken(const std::string& token){  using namespace util::security; 
 if (!option_->defined(PREF_RPC_SECRET)) {    return true;  }  if (!tokenHMAC_) {    tokenHMAC_ = HMAC::createRandom();    if (!tokenHMAC_) {      A2_LOG_ERROR("Failed to create HMAC");      return false;    }    tokenExpected_ = make_unique<HMACResult>(        tokenHMAC_->getResult(option_->get(PREF_RPC_SECRET)));  }  return *tokenExpected_ == tokenHMAC_->getResult(token);}} // namespace aria2
 |