DefaultPeerStorage.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - The high speed download utility
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * In addition, as a special exception, the copyright holders give
  22. * permission to link the code of portions of this program with the
  23. * OpenSSL library under certain conditions as described in each
  24. * individual source file, and distribute linked combinations
  25. * including the two.
  26. * You must obey the GNU General Public License in all respects
  27. * for all of the code used other than OpenSSL. If you modify
  28. * file(s) with this exception, you may extend this exception to your
  29. * version of the file(s), but you are not obligated to do so. If you
  30. * do not wish to do so, delete this exception statement from your
  31. * version. If you delete this exception statement from all source
  32. * files in the program, then also delete it here.
  33. */
  34. /* copyright --> */
  35. #include "DefaultPeerStorage.h"
  36. #include <algorithm>
  37. #include "LogFactory.h"
  38. #include "Logger.h"
  39. #include "message.h"
  40. #include "Peer.h"
  41. #include "BtRuntime.h"
  42. #include "BtSeederStateChoke.h"
  43. #include "BtLeecherStateChoke.h"
  44. #include "PieceStorage.h"
  45. #include "wallclock.h"
  46. #include "a2functional.h"
  47. #include "fmt.h"
  48. namespace aria2 {
  49. namespace {
  50. const size_t MAX_PEER_LIST_SIZE = 1024;
  51. const size_t MAX_PEER_LIST_UPDATE = 100;
  52. } // namespace
  53. DefaultPeerStorage::DefaultPeerStorage()
  54. : maxPeerListSize_(MAX_PEER_LIST_SIZE),
  55. removedPeerSessionDownloadLength_(0),
  56. removedPeerSessionUploadLength_(0),
  57. seederStateChoke_(new BtSeederStateChoke()),
  58. leecherStateChoke_(new BtLeecherStateChoke()),
  59. lastTransferStatMapUpdated_(0)
  60. {}
  61. DefaultPeerStorage::~DefaultPeerStorage()
  62. {
  63. delete seederStateChoke_;
  64. delete leecherStateChoke_;
  65. }
  66. namespace {
  67. class FindIdenticalPeer {
  68. private:
  69. SharedHandle<Peer> peer_;
  70. public:
  71. FindIdenticalPeer(const SharedHandle<Peer>& peer):peer_(peer) {}
  72. bool operator()(const SharedHandle<Peer>& peer) const {
  73. return (*peer_ == *peer) ||
  74. ((peer_->getIPAddress() == peer->getIPAddress()) &&
  75. (peer_->getPort() == peer->getPort()));
  76. }
  77. };
  78. } // namespace
  79. bool DefaultPeerStorage::isPeerAlreadyAdded(const SharedHandle<Peer>& peer)
  80. {
  81. return std::find_if(peers_.begin(), peers_.end(),
  82. FindIdenticalPeer(peer)) != peers_.end();
  83. }
  84. bool DefaultPeerStorage::addPeer(const SharedHandle<Peer>& peer) {
  85. if(isPeerAlreadyAdded(peer)) {
  86. A2_LOG_DEBUG(fmt("Adding %s:%u is rejected because it has been already"
  87. " added.",
  88. peer->getIPAddress().c_str(), peer->getPort()));
  89. return false;
  90. }
  91. const size_t peerListSize = peers_.size();
  92. if(peerListSize >= maxPeerListSize_) {
  93. deleteUnusedPeer(peerListSize-maxPeerListSize_+1);
  94. }
  95. peers_.push_front(peer);
  96. A2_LOG_DEBUG(fmt("Now peer list contains %lu peers",
  97. static_cast<unsigned long>(peers_.size())));
  98. return true;
  99. }
  100. void DefaultPeerStorage::addPeer(const std::vector<SharedHandle<Peer> >& peers)
  101. {
  102. size_t added = 0;
  103. size_t addMax = std::min(maxPeerListSize_, MAX_PEER_LIST_UPDATE);
  104. for(std::vector<SharedHandle<Peer> >::const_iterator itr = peers.begin(),
  105. eoi = peers.end(); itr != eoi && added < addMax; ++itr) {
  106. const SharedHandle<Peer>& peer = *itr;
  107. if(isPeerAlreadyAdded(peer)) {
  108. A2_LOG_DEBUG(fmt("Adding %s:%u is rejected because it has been already"
  109. " added.",
  110. peer->getIPAddress().c_str(), peer->getPort()));
  111. continue;
  112. } else {
  113. A2_LOG_DEBUG(fmt(MSG_ADDING_PEER,
  114. peer->getIPAddress().c_str(), peer->getPort()));
  115. }
  116. peers_.push_front(peer);
  117. ++added;
  118. }
  119. const size_t peerListSize = peers_.size();
  120. if(peerListSize >= maxPeerListSize_) {
  121. deleteUnusedPeer(peerListSize-maxPeerListSize_);
  122. }
  123. A2_LOG_DEBUG(fmt("Now peer list contains %lu peers",
  124. static_cast<unsigned long>(peers_.size())));
  125. }
  126. void DefaultPeerStorage::addDroppedPeer(const SharedHandle<Peer>& peer)
  127. {
  128. droppedPeers_.push_front(peer);
  129. if(droppedPeers_.size() > 50) {
  130. droppedPeers_.pop_back();
  131. }
  132. }
  133. const std::deque<SharedHandle<Peer> >& DefaultPeerStorage::getPeers()
  134. {
  135. return peers_;
  136. }
  137. const std::deque<SharedHandle<Peer> >& DefaultPeerStorage::getDroppedPeers()
  138. {
  139. return droppedPeers_;
  140. }
  141. namespace {
  142. class FindFinePeer {
  143. public:
  144. bool operator()(const SharedHandle<Peer>& peer) const {
  145. return peer->unused() && peer->isGood();
  146. }
  147. };
  148. } // namespace
  149. SharedHandle<Peer> DefaultPeerStorage::getUnusedPeer() {
  150. std::deque<SharedHandle<Peer> >::const_iterator itr =
  151. std::find_if(peers_.begin(), peers_.end(), FindFinePeer());
  152. if(itr == peers_.end()) {
  153. return SharedHandle<Peer>();
  154. } else {
  155. return *itr;
  156. }
  157. }
  158. namespace {
  159. class FindPeer {
  160. private:
  161. std::string ipaddr;
  162. uint16_t port;
  163. public:
  164. FindPeer(const std::string& ipaddr, uint16_t port):
  165. ipaddr(ipaddr), port(port) {}
  166. bool operator()(const SharedHandle<Peer>& peer) const {
  167. return ipaddr == peer->getIPAddress() && port == peer->getPort();
  168. }
  169. };
  170. } // namespace
  171. SharedHandle<Peer> DefaultPeerStorage::getPeer(const std::string& ipaddr,
  172. uint16_t port) const {
  173. std::deque<SharedHandle<Peer> >::const_iterator itr =
  174. std::find_if(peers_.begin(), peers_.end(), FindPeer(ipaddr, port));
  175. if(itr == peers_.end()) {
  176. return SharedHandle<Peer>();
  177. } else {
  178. return *itr;
  179. }
  180. }
  181. size_t DefaultPeerStorage::countPeer() const {
  182. return peers_.size();
  183. }
  184. bool DefaultPeerStorage::isPeerAvailable() {
  185. return getUnusedPeer();
  186. }
  187. namespace {
  188. class CollectActivePeer {
  189. private:
  190. std::vector<SharedHandle<Peer> >& activePeers_;
  191. public:
  192. CollectActivePeer(std::vector<SharedHandle<Peer> >& activePeers):
  193. activePeers_(activePeers) {}
  194. void operator()(const SharedHandle<Peer>& peer)
  195. {
  196. if(peer->isActive()) {
  197. activePeers_.push_back(peer);
  198. }
  199. }
  200. };
  201. } // namespace
  202. void DefaultPeerStorage::getActivePeers
  203. (std::vector<SharedHandle<Peer> >& activePeers)
  204. {
  205. std::for_each(peers_.begin(), peers_.end(), CollectActivePeer(activePeers));
  206. }
  207. namespace {
  208. TransferStat calculateStatFor(const SharedHandle<Peer>& peer)
  209. {
  210. TransferStat s;
  211. s.downloadSpeed = peer->calculateDownloadSpeed();
  212. s.uploadSpeed = peer->calculateUploadSpeed();
  213. s.sessionDownloadLength = peer->getSessionDownloadLength();
  214. s.sessionUploadLength = peer->getSessionUploadLength();
  215. return s;
  216. }
  217. } // namespace
  218. TransferStat DefaultPeerStorage::calculateStat()
  219. {
  220. TransferStat stat;
  221. if(lastTransferStatMapUpdated_.differenceInMillis(global::wallclock)+
  222. A2_DELTA_MILLIS >= 250) {
  223. A2_LOG_DEBUG("Updating TransferStat of PeerStorage");
  224. lastTransferStatMapUpdated_ = global::wallclock;
  225. peerTransferStatMap_.clear();
  226. std::vector<SharedHandle<Peer> > activePeers;
  227. getActivePeers(activePeers);
  228. for(std::vector<SharedHandle<Peer> >::const_iterator i =
  229. activePeers.begin(), eoi = activePeers.end(); i != eoi; ++i) {
  230. TransferStat s;
  231. s.downloadSpeed = (*i)->calculateDownloadSpeed();
  232. s.uploadSpeed = (*i)->calculateUploadSpeed();
  233. s.sessionDownloadLength = (*i)->getSessionDownloadLength();
  234. s.sessionUploadLength = (*i)->getSessionUploadLength();
  235. peerTransferStatMap_[(*i)->getID()] = calculateStatFor(*i);
  236. stat += s;
  237. }
  238. cachedTransferStat_ = stat;
  239. } else {
  240. stat = cachedTransferStat_;
  241. }
  242. stat.sessionDownloadLength += removedPeerSessionDownloadLength_;
  243. stat.sessionUploadLength += removedPeerSessionUploadLength_;
  244. stat.setAllTimeUploadLength(btRuntime_->getUploadLengthAtStartup()+
  245. stat.getSessionUploadLength());
  246. return stat;
  247. }
  248. void DefaultPeerStorage::updateTransferStatFor(const SharedHandle<Peer>& peer)
  249. {
  250. A2_LOG_DEBUG(fmt("Updating TransferStat for peer %s", peer->getID().c_str()));
  251. std::map<std::string, TransferStat>::iterator itr =
  252. peerTransferStatMap_.find(peer->getID());
  253. if(itr == peerTransferStatMap_.end()) {
  254. return;
  255. }
  256. cachedTransferStat_ -= (*itr).second;
  257. TransferStat s = calculateStatFor(peer);
  258. cachedTransferStat_ += s;
  259. (*itr).second = s;
  260. }
  261. TransferStat DefaultPeerStorage::getTransferStatFor
  262. (const SharedHandle<Peer>& peer)
  263. {
  264. std::map<std::string, TransferStat>::const_iterator itr =
  265. peerTransferStatMap_.find(peer->getID());
  266. if(itr == peerTransferStatMap_.end()) {
  267. return TransferStat();
  268. } else {
  269. return (*itr).second;
  270. }
  271. }
  272. void DefaultPeerStorage::deleteUnusedPeer(size_t delSize) {
  273. std::deque<SharedHandle<Peer> > temp;
  274. for(std::deque<SharedHandle<Peer> >::const_reverse_iterator itr =
  275. peers_.rbegin(), eoi = peers_.rend(); itr != eoi; ++itr) {
  276. const SharedHandle<Peer>& p = *itr;
  277. if(p->unused() && delSize > 0) {
  278. onErasingPeer(p);
  279. --delSize;
  280. } else {
  281. temp.push_front(p);
  282. }
  283. }
  284. peers_.swap(temp);
  285. }
  286. void DefaultPeerStorage::onErasingPeer(const SharedHandle<Peer>& peer) {}
  287. void DefaultPeerStorage::onReturningPeer(const SharedHandle<Peer>& peer)
  288. {
  289. if(peer->isActive()) {
  290. TransferStat removedStat(calculateStatFor(peer));
  291. removedPeerSessionDownloadLength_ += removedStat.getSessionDownloadLength();
  292. removedPeerSessionUploadLength_ += removedStat.getSessionUploadLength();
  293. cachedTransferStat_ -= removedStat;
  294. if(peer->isDisconnectedGracefully() && !peer->isIncomingPeer()) {
  295. peer->startBadCondition();
  296. addDroppedPeer(peer);
  297. }
  298. // Execute choking algorithm if unchoked and interested peer is
  299. // disconnected.
  300. if(!peer->amChoking() && peer->peerInterested()) {
  301. executeChoke();
  302. }
  303. }
  304. }
  305. void DefaultPeerStorage::returnPeer(const SharedHandle<Peer>& peer)
  306. {
  307. std::deque<SharedHandle<Peer> >::iterator itr =
  308. std::find_if(peers_.begin(), peers_.end(), derefEqual(peer));
  309. if(itr == peers_.end()) {
  310. A2_LOG_DEBUG(fmt("Cannot find peer %s:%u in PeerStorage.",
  311. peer->getIPAddress().c_str(), peer->getPort()));
  312. } else {
  313. peers_.erase(itr);
  314. onReturningPeer(peer);
  315. onErasingPeer(peer);
  316. }
  317. }
  318. bool DefaultPeerStorage::chokeRoundIntervalElapsed()
  319. {
  320. const time_t CHOKE_ROUND_INTERVAL = 10;
  321. if(pieceStorage_->downloadFinished()) {
  322. return seederStateChoke_->getLastRound().
  323. difference(global::wallclock) >= CHOKE_ROUND_INTERVAL;
  324. } else {
  325. return leecherStateChoke_->getLastRound().
  326. difference(global::wallclock) >= CHOKE_ROUND_INTERVAL;
  327. }
  328. }
  329. void DefaultPeerStorage::executeChoke()
  330. {
  331. std::vector<SharedHandle<Peer> > activePeers;
  332. getActivePeers(activePeers);
  333. if(pieceStorage_->downloadFinished()) {
  334. return seederStateChoke_->executeChoke(activePeers);
  335. } else {
  336. return leecherStateChoke_->executeChoke(activePeers);
  337. }
  338. }
  339. void DefaultPeerStorage::setPieceStorage(const SharedHandle<PieceStorage>& ps)
  340. {
  341. pieceStorage_ = ps;
  342. }
  343. void DefaultPeerStorage::setBtRuntime(const SharedHandle<BtRuntime>& btRuntime)
  344. {
  345. btRuntime_ = btRuntime;
  346. }
  347. } // namespace aria2