PeerInteractionCommand.cc 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - The high speed download utility
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * In addition, as a special exception, the copyright holders give
  22. * permission to link the code of portions of this program with the
  23. * OpenSSL library under certain conditions as described in each
  24. * individual source file, and distribute linked combinations
  25. * including the two.
  26. * You must obey the GNU General Public License in all respects
  27. * for all of the code used other than OpenSSL. If you modify
  28. * file(s) with this exception, you may extend this exception to your
  29. * version of the file(s), but you are not obligated to do so. If you
  30. * do not wish to do so, delete this exception statement from your
  31. * version. If you delete this exception statement from all source
  32. * files in the program, then also delete it here.
  33. */
  34. /* copyright --> */
  35. #include "PeerInteractionCommand.h"
  36. #include <algorithm>
  37. #include "DownloadEngine.h"
  38. #include "PeerInitiateConnectionCommand.h"
  39. #include "DefaultBtInteractive.h"
  40. #include "DlAbortEx.h"
  41. #include "message.h"
  42. #include "prefs.h"
  43. #include "SocketCore.h"
  44. #include "Option.h"
  45. #include "DownloadContext.h"
  46. #include "Peer.h"
  47. #include "BtMessage.h"
  48. #include "BtHandshakeMessage.h"
  49. #include "BtRuntime.h"
  50. #include "PeerStorage.h"
  51. #include "DefaultBtMessageDispatcher.h"
  52. #include "DefaultBtMessageReceiver.h"
  53. #include "DefaultBtRequestFactory.h"
  54. #include "DefaultBtMessageFactory.h"
  55. #include "DefaultBtInteractive.h"
  56. #include "PeerConnection.h"
  57. #include "ExtensionMessageFactory.h"
  58. #include "DHTRoutingTable.h"
  59. #include "DHTTaskQueue.h"
  60. #include "DHTTaskFactory.h"
  61. #include "DHTNode.h"
  62. #include "DHTRegistry.h"
  63. #include "DHTPeerAnnounceStorage.h"
  64. #include "DHTTokenTracker.h"
  65. #include "DHTMessageDispatcher.h"
  66. #include "DHTMessageReceiver.h"
  67. #include "DHTMessageFactory.h"
  68. #include "DHTMessageCallback.h"
  69. #include "PieceStorage.h"
  70. #include "RequestGroup.h"
  71. #include "DefaultExtensionMessageFactory.h"
  72. #include "RequestGroupMan.h"
  73. #include "ExtensionMessageRegistry.h"
  74. #include "bittorrent_helper.h"
  75. #include "UTMetadataRequestFactory.h"
  76. #include "UTMetadataRequestTracker.h"
  77. #include "BtRegistry.h"
  78. namespace aria2 {
  79. PeerInteractionCommand::PeerInteractionCommand(
  80. cuid_t cuid, RequestGroup* requestGroup, const std::shared_ptr<Peer>& p,
  81. DownloadEngine* e, const std::shared_ptr<BtRuntime>& btRuntime,
  82. const std::shared_ptr<PieceStorage>& pieceStorage,
  83. const std::shared_ptr<PeerStorage>& peerStorage,
  84. const std::shared_ptr<SocketCore>& s, Seq sequence,
  85. std::unique_ptr<PeerConnection> peerConnection)
  86. : PeerAbstractCommand{cuid, p, e, s},
  87. requestGroup_{requestGroup},
  88. btRuntime_{btRuntime},
  89. pieceStorage_{pieceStorage},
  90. peerStorage_{peerStorage},
  91. sequence_{sequence}
  92. {
  93. // TODO move following bunch of processing to separate method, like init()
  94. if (sequence_ == INITIATOR_SEND_HANDSHAKE) {
  95. disableReadCheckSocket();
  96. setWriteCheckSocket(getSocket());
  97. setTimeout(std::chrono::seconds(
  98. getOption()->getAsInt(PREF_PEER_CONNECTION_TIMEOUT)));
  99. }
  100. int family;
  101. unsigned char compact[COMPACT_LEN_IPV6];
  102. int compactlen = bittorrent::packcompact(compact, getPeer()->getIPAddress(),
  103. getPeer()->getPort());
  104. if (compactlen == COMPACT_LEN_IPV6) {
  105. family = AF_INET6;
  106. }
  107. else {
  108. family = AF_INET;
  109. }
  110. auto torrentAttrs =
  111. bittorrent::getTorrentAttrs(requestGroup_->getDownloadContext());
  112. bool metadataGetMode = torrentAttrs->metadata.empty();
  113. auto exMsgRegistry = make_unique<ExtensionMessageRegistry>();
  114. exMsgRegistry->setExtensionMessageID(ExtensionMessageRegistry::UT_PEX, 8);
  115. // http://www.bittorrent.org/beps/bep_0009.html
  116. exMsgRegistry->setExtensionMessageID(ExtensionMessageRegistry::UT_METADATA,
  117. 9);
  118. auto extensionMessageFactory = make_unique<DefaultExtensionMessageFactory>(
  119. getPeer(), exMsgRegistry.get());
  120. auto extensionMessageFactoryPtr = extensionMessageFactory.get();
  121. extensionMessageFactory->setPeerStorage(peerStorage.get());
  122. extensionMessageFactory->setDownloadContext(
  123. requestGroup_->getDownloadContext().get());
  124. // PieceStorage will be set later.
  125. auto factory = make_unique<DefaultBtMessageFactory>();
  126. auto factoryPtr = factory.get();
  127. factory->setCuid(cuid);
  128. factory->setDownloadContext(requestGroup_->getDownloadContext().get());
  129. factory->setPieceStorage(pieceStorage.get());
  130. factory->setPeerStorage(peerStorage.get());
  131. factory->setExtensionMessageFactory(extensionMessageFactory.get());
  132. factory->setPeer(getPeer());
  133. if (family == AF_INET) {
  134. factory->setLocalNode(DHTRegistry::getData().localNode.get());
  135. factory->setRoutingTable(DHTRegistry::getData().routingTable.get());
  136. factory->setTaskQueue(DHTRegistry::getData().taskQueue.get());
  137. factory->setTaskFactory(DHTRegistry::getData().taskFactory.get());
  138. }
  139. else {
  140. factory->setLocalNode(DHTRegistry::getData6().localNode.get());
  141. factory->setRoutingTable(DHTRegistry::getData6().routingTable.get());
  142. factory->setTaskQueue(DHTRegistry::getData6().taskQueue.get());
  143. factory->setTaskFactory(DHTRegistry::getData6().taskFactory.get());
  144. }
  145. if (metadataGetMode) {
  146. factory->enableMetadataGetMode();
  147. }
  148. if (!peerConnection) {
  149. peerConnection = make_unique<PeerConnection>(cuid, getPeer(), getSocket());
  150. }
  151. else {
  152. if (sequence_ == RECEIVER_WAIT_HANDSHAKE &&
  153. peerConnection->getBufferLength() > 0) {
  154. setStatus(Command::STATUS_ONESHOT_REALTIME);
  155. getDownloadEngine()->setNoWait(true);
  156. }
  157. }
  158. // If the number of pieces gets bigger, the length of Bitfield
  159. // message payload exceeds the initial buffer capacity of
  160. // PeerConnection, which is MAX_PAYLOAD_LEN. We expand buffer as
  161. // necessary so that PeerConnection can receive the Bitfield
  162. // message.
  163. size_t bitfieldPayloadSize =
  164. 1 + (requestGroup_->getDownloadContext()->getNumPieces() + 7) / 8;
  165. peerConnection->reserveBuffer(bitfieldPayloadSize);
  166. auto dispatcher = make_unique<DefaultBtMessageDispatcher>();
  167. auto dispatcherPtr = dispatcher.get();
  168. dispatcher->setCuid(cuid);
  169. dispatcher->setPeer(getPeer());
  170. dispatcher->setDownloadContext(requestGroup_->getDownloadContext().get());
  171. dispatcher->setRequestTimeout(
  172. std::chrono::seconds(getOption()->getAsInt(PREF_BT_REQUEST_TIMEOUT)));
  173. dispatcher->setBtMessageFactory(factory.get());
  174. dispatcher->setRequestGroupMan(
  175. getDownloadEngine()->getRequestGroupMan().get());
  176. dispatcher->setPeerConnection(peerConnection.get());
  177. auto receiver = make_unique<DefaultBtMessageReceiver>();
  178. receiver->setDownloadContext(requestGroup_->getDownloadContext().get());
  179. receiver->setPeerConnection(peerConnection.get());
  180. receiver->setDispatcher(dispatcher.get());
  181. receiver->setBtMessageFactory(factory.get());
  182. auto reqFactory = make_unique<DefaultBtRequestFactory>();
  183. reqFactory->setPeer(getPeer());
  184. reqFactory->setPieceStorage(pieceStorage.get());
  185. reqFactory->setBtMessageDispatcher(dispatcher.get());
  186. reqFactory->setBtMessageFactory(factory.get());
  187. reqFactory->setCuid(cuid);
  188. // reverse depends
  189. factory->setBtMessageDispatcher(dispatcher.get());
  190. factory->setBtRequestFactory(reqFactory.get());
  191. factory->setPeerConnection(peerConnection.get());
  192. extensionMessageFactory->setBtMessageDispatcher(dispatcher.get());
  193. extensionMessageFactory->setBtMessageFactory(factory.get());
  194. getPeer()->allocateSessionResource(
  195. requestGroup_->getDownloadContext()->getPieceLength(),
  196. requestGroup_->getDownloadContext()->getTotalLength());
  197. getPeer()->setBtMessageDispatcher(dispatcher.get());
  198. auto btInteractive = make_unique<DefaultBtInteractive>(
  199. requestGroup_->getDownloadContext(), getPeer());
  200. btInteractive->setBtRuntime(btRuntime_);
  201. btInteractive->setPieceStorage(pieceStorage_);
  202. btInteractive->setPeerStorage(peerStorage); // Note: Not a member variable.
  203. btInteractive->setCuid(cuid);
  204. btInteractive->setBtMessageReceiver(std::move(receiver));
  205. btInteractive->setDispatcher(std::move(dispatcher));
  206. btInteractive->setBtRequestFactory(std::move(reqFactory));
  207. btInteractive->setPeerConnection(std::move(peerConnection));
  208. btInteractive->setExtensionMessageFactory(std::move(extensionMessageFactory));
  209. btInteractive->setExtensionMessageRegistry(std::move(exMsgRegistry));
  210. btInteractive->setKeepAliveInterval(
  211. std::chrono::seconds(getOption()->getAsInt(PREF_BT_KEEP_ALIVE_INTERVAL)));
  212. btInteractive->setRequestGroupMan(
  213. getDownloadEngine()->getRequestGroupMan().get());
  214. btInteractive->setBtMessageFactory(std::move(factory));
  215. if ((metadataGetMode || !torrentAttrs->privateTorrent) &&
  216. !getPeer()->isLocalPeer()) {
  217. if (getOption()->getAsBool(PREF_ENABLE_PEER_EXCHANGE)) {
  218. btInteractive->setUTPexEnabled(true);
  219. }
  220. if (family == AF_INET) {
  221. if (DHTRegistry::isInitialized()) {
  222. btInteractive->setDHTEnabled(true);
  223. factoryPtr->setDHTEnabled(true);
  224. btInteractive->setLocalNode(DHTRegistry::getData().localNode.get());
  225. }
  226. }
  227. else {
  228. if (DHTRegistry::isInitialized6()) {
  229. btInteractive->setDHTEnabled(true);
  230. factoryPtr->setDHTEnabled(true);
  231. btInteractive->setLocalNode(DHTRegistry::getData6().localNode.get());
  232. }
  233. }
  234. }
  235. if (metadataGetMode) {
  236. auto utMetadataRequestFactory = make_unique<UTMetadataRequestFactory>();
  237. auto utMetadataRequestTracker = make_unique<UTMetadataRequestTracker>();
  238. utMetadataRequestFactory->setCuid(cuid);
  239. utMetadataRequestFactory->setDownloadContext(
  240. requestGroup_->getDownloadContext().get());
  241. utMetadataRequestFactory->setBtMessageDispatcher(dispatcherPtr);
  242. utMetadataRequestFactory->setBtMessageFactory(factoryPtr);
  243. utMetadataRequestFactory->setPeer(getPeer());
  244. utMetadataRequestFactory->setUTMetadataRequestTracker(
  245. utMetadataRequestTracker.get());
  246. extensionMessageFactoryPtr->setUTMetadataRequestTracker(
  247. utMetadataRequestTracker.get());
  248. btInteractive->setUTMetadataRequestFactory(
  249. std::move(utMetadataRequestFactory));
  250. btInteractive->setUTMetadataRequestTracker(
  251. std::move(utMetadataRequestTracker));
  252. }
  253. btInteractive->setTcpPort(e->getBtRegistry()->getTcpPort());
  254. if (metadataGetMode) {
  255. btInteractive->enableMetadataGetMode();
  256. }
  257. btInteractive_ = std::move(btInteractive);
  258. btRuntime_->increaseConnections();
  259. requestGroup_->increaseNumCommand();
  260. }
  261. PeerInteractionCommand::~PeerInteractionCommand()
  262. {
  263. if (getPeer()->getCompletedLength() > 0) {
  264. pieceStorage_->subtractPieceStats(getPeer()->getBitfield(),
  265. getPeer()->getBitfieldLength());
  266. }
  267. getPeer()->releaseSessionResource();
  268. requestGroup_->decreaseNumCommand();
  269. btRuntime_->decreaseConnections();
  270. }
  271. bool PeerInteractionCommand::executeInternal()
  272. {
  273. setNoCheck(false);
  274. bool done = false;
  275. while (!done) {
  276. switch (sequence_) {
  277. case INITIATOR_SEND_HANDSHAKE:
  278. if (!getSocket()->isWritable(0)) {
  279. done = true;
  280. break;
  281. }
  282. disableWriteCheckSocket();
  283. setReadCheckSocket(getSocket());
  284. // socket->setBlockingMode();
  285. setTimeout(std::chrono::seconds(getOption()->getAsInt(PREF_BT_TIMEOUT)));
  286. btInteractive_->initiateHandshake();
  287. sequence_ = INITIATOR_WAIT_HANDSHAKE;
  288. break;
  289. case INITIATOR_WAIT_HANDSHAKE: {
  290. if (btInteractive_->countPendingMessage() > 0) {
  291. btInteractive_->sendPendingMessage();
  292. if (btInteractive_->countPendingMessage() > 0) {
  293. done = true;
  294. break;
  295. }
  296. }
  297. auto handshakeMessage = btInteractive_->receiveHandshake();
  298. if (!handshakeMessage) {
  299. done = true;
  300. break;
  301. }
  302. btInteractive_->doPostHandshakeProcessing();
  303. sequence_ = WIRED;
  304. break;
  305. }
  306. case RECEIVER_WAIT_HANDSHAKE: {
  307. auto handshakeMessage = btInteractive_->receiveAndSendHandshake();
  308. if (!handshakeMessage) {
  309. done = true;
  310. break;
  311. }
  312. btInteractive_->doPostHandshakeProcessing();
  313. sequence_ = WIRED;
  314. break;
  315. }
  316. case WIRED:
  317. // See the comment for writable check below.
  318. disableWriteCheckSocket();
  319. btInteractive_->doInteractionProcessing();
  320. if (btInteractive_->countReceivedMessageInIteration() > 0) {
  321. updateKeepAlive();
  322. }
  323. if ((getPeer()->amInterested() && !getPeer()->peerChoking()) ||
  324. btInteractive_->countOutstandingRequest() ||
  325. (getPeer()->peerInterested() && !getPeer()->amChoking())) {
  326. // Writable check to avoid slow seeding
  327. if (btInteractive_->isSendingMessageInProgress()) {
  328. setWriteCheckSocket(getSocket());
  329. }
  330. if (getDownloadEngine()
  331. ->getRequestGroupMan()
  332. ->doesOverallDownloadSpeedExceed() ||
  333. requestGroup_->doesDownloadSpeedExceed()) {
  334. disableReadCheckSocket();
  335. setNoCheck(true);
  336. }
  337. else {
  338. setReadCheckSocket(getSocket());
  339. }
  340. }
  341. else {
  342. disableReadCheckSocket();
  343. }
  344. done = true;
  345. break;
  346. }
  347. }
  348. if (btInteractive_->countPendingMessage() > 0) {
  349. setNoCheck(true);
  350. }
  351. addCommandSelf();
  352. return false;
  353. }
  354. // TODO this method removed when PeerBalancerCommand is implemented
  355. bool PeerInteractionCommand::prepareForNextPeer(time_t wait)
  356. {
  357. if (peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) {
  358. cuid_t ncuid = getDownloadEngine()->newCUID();
  359. std::shared_ptr<Peer> peer = peerStorage_->checkoutPeer(ncuid);
  360. // sanity check
  361. if (peer) {
  362. auto command = make_unique<PeerInitiateConnectionCommand>(
  363. ncuid, requestGroup_, peer, getDownloadEngine(), btRuntime_);
  364. command->setPeerStorage(peerStorage_);
  365. command->setPieceStorage(pieceStorage_);
  366. getDownloadEngine()->addCommand(std::move(command));
  367. }
  368. }
  369. return true;
  370. }
  371. void PeerInteractionCommand::onAbort()
  372. {
  373. btInteractive_->cancelAllPiece();
  374. peerStorage_->returnPeer(getPeer());
  375. }
  376. void PeerInteractionCommand::onFailure(const Exception& err)
  377. {
  378. requestGroup_->setLastErrorCode(err.getErrorCode(), err.what());
  379. requestGroup_->setHaltRequested(true);
  380. getDownloadEngine()->setRefreshInterval(std::chrono::milliseconds(0));
  381. }
  382. bool PeerInteractionCommand::exitBeforeExecute()
  383. {
  384. return btRuntime_->isHalt();
  385. }
  386. const std::shared_ptr<Option>& PeerInteractionCommand::getOption() const
  387. {
  388. return requestGroup_->getOption();
  389. }
  390. } // namespace aria2