DefaultPeerStorage.cc 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - The high speed download utility
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * In addition, as a special exception, the copyright holders give
  22. * permission to link the code of portions of this program with the
  23. * OpenSSL library under certain conditions as described in each
  24. * individual source file, and distribute linked combinations
  25. * including the two.
  26. * You must obey the GNU General Public License in all respects
  27. * for all of the code used other than OpenSSL. If you modify
  28. * file(s) with this exception, you may extend this exception to your
  29. * version of the file(s), but you are not obligated to do so. If you
  30. * do not wish to do so, delete this exception statement from your
  31. * version. If you delete this exception statement from all source
  32. * files in the program, then also delete it here.
  33. */
  34. /* copyright --> */
  35. #include "DefaultPeerStorage.h"
  36. #include <algorithm>
  37. #include "LogFactory.h"
  38. #include "Logger.h"
  39. #include "message.h"
  40. #include "Peer.h"
  41. #include "BtRuntime.h"
  42. #include "BtSeederStateChoke.h"
  43. #include "BtLeecherStateChoke.h"
  44. #include "PieceStorage.h"
  45. #include "wallclock.h"
  46. #include "a2functional.h"
  47. #include "fmt.h"
  48. #include "SimpleRandomizer.h"
  49. namespace aria2 {
  50. namespace {
  51. const size_t MAX_PEER_LIST_SIZE = 512;
  52. const size_t MAX_PEER_LIST_UPDATE = 100;
  53. } // namespace
  54. DefaultPeerStorage::DefaultPeerStorage()
  55. : maxPeerListSize_(MAX_PEER_LIST_SIZE),
  56. seederStateChoke_(make_unique<BtSeederStateChoke>()),
  57. leecherStateChoke_(make_unique<BtLeecherStateChoke>()),
  58. lastTransferStatMapUpdated_(Timer::zero())
  59. {
  60. }
  61. DefaultPeerStorage::~DefaultPeerStorage()
  62. {
  63. assert(uniqPeers_.size() == unusedPeers_.size() + usedPeers_.size());
  64. }
  65. size_t DefaultPeerStorage::countAllPeer() const
  66. {
  67. return unusedPeers_.size() + usedPeers_.size();
  68. }
  69. bool DefaultPeerStorage::isPeerAlreadyAdded(const std::shared_ptr<Peer>& peer)
  70. {
  71. return uniqPeers_.count(
  72. std::make_pair(peer->getIPAddress(), peer->getOrigPort()));
  73. }
  74. void DefaultPeerStorage::addUniqPeer(const std::shared_ptr<Peer>& peer)
  75. {
  76. uniqPeers_.insert(std::make_pair(peer->getIPAddress(), peer->getOrigPort()));
  77. }
  78. bool DefaultPeerStorage::addPeer(const std::shared_ptr<Peer>& peer)
  79. {
  80. if (isPeerAlreadyAdded(peer)) {
  81. A2_LOG_DEBUG(fmt("Adding %s:%u is rejected because it has been already"
  82. " added.",
  83. peer->getIPAddress().c_str(), peer->getPort()));
  84. return false;
  85. }
  86. if (isBadPeer(peer->getIPAddress())) {
  87. A2_LOG_DEBUG(fmt("Adding %s:%u is rejected because it is marked bad.",
  88. peer->getIPAddress().c_str(), peer->getPort()));
  89. return false;
  90. }
  91. const size_t peerListSize = unusedPeers_.size();
  92. if (peerListSize >= maxPeerListSize_) {
  93. deleteUnusedPeer(peerListSize - maxPeerListSize_ + 1);
  94. }
  95. unusedPeers_.push_front(peer);
  96. addUniqPeer(peer);
  97. A2_LOG_DEBUG(fmt("Now unused peer list contains %lu peers",
  98. static_cast<unsigned long>(unusedPeers_.size())));
  99. return true;
  100. }
  101. void DefaultPeerStorage::addPeer(
  102. const std::vector<std::shared_ptr<Peer>>& peers)
  103. {
  104. size_t added = 0;
  105. size_t addMax = std::min(maxPeerListSize_, MAX_PEER_LIST_UPDATE);
  106. for (auto itr = std::begin(peers), eoi = std::end(peers);
  107. itr != eoi && added < addMax; ++itr) {
  108. auto& peer = *itr;
  109. if (isPeerAlreadyAdded(peer)) {
  110. A2_LOG_DEBUG(fmt("Adding %s:%u is rejected because it has been already"
  111. " added.",
  112. peer->getIPAddress().c_str(), peer->getPort()));
  113. continue;
  114. }
  115. else if (isBadPeer(peer->getIPAddress())) {
  116. A2_LOG_DEBUG(fmt("Adding %s:%u is rejected because it is marked bad.",
  117. peer->getIPAddress().c_str(), peer->getPort()));
  118. continue;
  119. }
  120. else {
  121. A2_LOG_DEBUG(
  122. fmt(MSG_ADDING_PEER, peer->getIPAddress().c_str(), peer->getPort()));
  123. }
  124. unusedPeers_.push_front(peer);
  125. addUniqPeer(peer);
  126. ++added;
  127. }
  128. const size_t peerListSize = unusedPeers_.size();
  129. if (peerListSize > maxPeerListSize_) {
  130. deleteUnusedPeer(peerListSize - maxPeerListSize_);
  131. }
  132. A2_LOG_DEBUG(fmt("Now unused peer list contains %lu peers",
  133. static_cast<unsigned long>(unusedPeers_.size())));
  134. }
  135. void DefaultPeerStorage::addDroppedPeer(const std::shared_ptr<Peer>& peer)
  136. {
  137. // Make sure that no duplicated peer exists in droppedPeers_. If
  138. // exists, erase older one.
  139. for (auto i = std::begin(droppedPeers_), eoi = std::end(droppedPeers_);
  140. i != eoi; ++i) {
  141. if ((*i)->getIPAddress() == peer->getIPAddress() &&
  142. (*i)->getPort() == peer->getPort()) {
  143. droppedPeers_.erase(i);
  144. break;
  145. }
  146. }
  147. droppedPeers_.push_front(peer);
  148. if (droppedPeers_.size() > 50) {
  149. droppedPeers_.pop_back();
  150. }
  151. }
  152. const std::deque<std::shared_ptr<Peer>>& DefaultPeerStorage::getUnusedPeers()
  153. {
  154. return unusedPeers_;
  155. }
  156. const PeerSet& DefaultPeerStorage::getUsedPeers() { return usedPeers_; }
  157. const std::deque<std::shared_ptr<Peer>>& DefaultPeerStorage::getDroppedPeers()
  158. {
  159. return droppedPeers_;
  160. }
  161. bool DefaultPeerStorage::isPeerAvailable() { return !unusedPeers_.empty(); }
  162. bool DefaultPeerStorage::isBadPeer(const std::string& ipaddr)
  163. {
  164. auto i = badPeers_.find(ipaddr);
  165. if (i == std::end(badPeers_)) {
  166. return false;
  167. }
  168. if ((*i).second <= global::wallclock()) {
  169. badPeers_.erase(i);
  170. return false;
  171. }
  172. return true;
  173. }
  174. void DefaultPeerStorage::addBadPeer(const std::string& ipaddr)
  175. {
  176. if (lastBadPeerCleaned_.difference(global::wallclock()) >= 1_h) {
  177. for (auto i = std::begin(badPeers_); i != std::end(badPeers_);) {
  178. if ((*i).second <= global::wallclock()) {
  179. A2_LOG_DEBUG(fmt("Purge %s from bad peer", (*i).first.c_str()));
  180. badPeers_.erase(i++);
  181. // badPeers_.end() will not be invalidated.
  182. }
  183. else {
  184. ++i;
  185. }
  186. }
  187. lastBadPeerCleaned_ = global::wallclock();
  188. }
  189. A2_LOG_DEBUG(fmt("Added %s as bad peer", ipaddr.c_str()));
  190. // We use variable timeout to avoid many bad peers wake up at once.
  191. auto t = global::wallclock();
  192. t.advance(std::chrono::seconds(
  193. std::max(SimpleRandomizer::getInstance()->getRandomNumber(601), 120L)));
  194. badPeers_[ipaddr] = std::move(t);
  195. }
  196. void DefaultPeerStorage::deleteUnusedPeer(size_t delSize)
  197. {
  198. for (; delSize > 0 && !unusedPeers_.empty(); --delSize) {
  199. onErasingPeer(unusedPeers_.back());
  200. unusedPeers_.pop_back();
  201. }
  202. }
  203. std::shared_ptr<Peer> DefaultPeerStorage::checkoutPeer(cuid_t cuid)
  204. {
  205. if (!isPeerAvailable()) {
  206. return nullptr;
  207. }
  208. auto peer = unusedPeers_.front();
  209. unusedPeers_.pop_front();
  210. if (peer->usedBy() != 0) {
  211. A2_LOG_WARN(fmt("CUID#%" PRId64 " is already set for peer %s:%u",
  212. peer->usedBy(), peer->getIPAddress().c_str(),
  213. peer->getOrigPort()));
  214. }
  215. peer->usedBy(cuid);
  216. usedPeers_.insert(peer);
  217. A2_LOG_DEBUG(fmt("Checkout peer %s:%u to CUID#%" PRId64,
  218. peer->getIPAddress().c_str(), peer->getOrigPort(),
  219. peer->usedBy()));
  220. return peer;
  221. }
  222. void DefaultPeerStorage::onErasingPeer(const std::shared_ptr<Peer>& peer)
  223. {
  224. uniqPeers_.erase(std::make_pair(peer->getIPAddress(), peer->getOrigPort()));
  225. }
  226. void DefaultPeerStorage::onReturningPeer(const std::shared_ptr<Peer>& peer)
  227. {
  228. if (peer->isActive()) {
  229. if (peer->isDisconnectedGracefully() && !peer->isIncomingPeer()) {
  230. peer->startDrop();
  231. addDroppedPeer(peer);
  232. }
  233. // Execute choking algorithm if unchoked and interested peer is
  234. // disconnected.
  235. if (!peer->amChoking() && peer->peerInterested()) {
  236. executeChoke();
  237. }
  238. }
  239. peer->usedBy(0);
  240. }
  241. void DefaultPeerStorage::returnPeer(const std::shared_ptr<Peer>& peer)
  242. {
  243. A2_LOG_DEBUG(fmt("Peer %s:%u returned from CUID#%" PRId64,
  244. peer->getIPAddress().c_str(), peer->getOrigPort(),
  245. peer->usedBy()));
  246. if (usedPeers_.erase(peer)) {
  247. onReturningPeer(peer);
  248. onErasingPeer(peer);
  249. }
  250. else {
  251. A2_LOG_WARN(fmt("Cannot find peer %s:%u in usedPeers_",
  252. peer->getIPAddress().c_str(), peer->getOrigPort()));
  253. }
  254. }
  255. bool DefaultPeerStorage::chokeRoundIntervalElapsed()
  256. {
  257. constexpr auto CHOKE_ROUND_INTERVAL = 10_s;
  258. if (pieceStorage_->downloadFinished()) {
  259. return seederStateChoke_->getLastRound().difference(global::wallclock()) >=
  260. CHOKE_ROUND_INTERVAL;
  261. }
  262. else {
  263. return leecherStateChoke_->getLastRound().difference(global::wallclock()) >=
  264. CHOKE_ROUND_INTERVAL;
  265. }
  266. }
  267. void DefaultPeerStorage::executeChoke()
  268. {
  269. if (pieceStorage_->downloadFinished()) {
  270. return seederStateChoke_->executeChoke(usedPeers_);
  271. }
  272. else {
  273. return leecherStateChoke_->executeChoke(usedPeers_);
  274. }
  275. }
  276. void DefaultPeerStorage::setPieceStorage(
  277. const std::shared_ptr<PieceStorage>& ps)
  278. {
  279. pieceStorage_ = ps;
  280. }
  281. void DefaultPeerStorage::setBtRuntime(
  282. const std::shared_ptr<BtRuntime>& btRuntime)
  283. {
  284. btRuntime_ = btRuntime;
  285. }
  286. } // namespace aria2