PeerInteractionCommand.cc 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - The high speed download utility
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * In addition, as a special exception, the copyright holders give
  22. * permission to link the code of portions of this program with the
  23. * OpenSSL library under certain conditions as described in each
  24. * individual source file, and distribute linked combinations
  25. * including the two.
  26. * You must obey the GNU General Public License in all respects
  27. * for all of the code used other than OpenSSL. If you modify
  28. * file(s) with this exception, you may extend this exception to your
  29. * version of the file(s), but you are not obligated to do so. If you
  30. * do not wish to do so, delete this exception statement from your
  31. * version. If you delete this exception statement from all source
  32. * files in the program, then also delete it here.
  33. */
  34. /* copyright --> */
  35. #include "PeerInteractionCommand.h"
  36. #include <algorithm>
  37. #include "DownloadEngine.h"
  38. #include "PeerInitiateConnectionCommand.h"
  39. #include "DefaultBtInteractive.h"
  40. #include "DlAbortEx.h"
  41. #include "message.h"
  42. #include "prefs.h"
  43. #include "SocketCore.h"
  44. #include "Option.h"
  45. #include "DownloadContext.h"
  46. #include "Peer.h"
  47. #include "BtMessage.h"
  48. #include "BtRuntime.h"
  49. #include "PeerStorage.h"
  50. #include "DefaultBtMessageDispatcher.h"
  51. #include "DefaultBtMessageReceiver.h"
  52. #include "DefaultBtRequestFactory.h"
  53. #include "DefaultBtMessageFactory.h"
  54. #include "DefaultBtInteractive.h"
  55. #include "PeerConnection.h"
  56. #include "ExtensionMessageFactory.h"
  57. #include "DHTRoutingTable.h"
  58. #include "DHTTaskQueue.h"
  59. #include "DHTTaskFactory.h"
  60. #include "DHTNode.h"
  61. #include "DHTRegistry.h"
  62. #include "DHTPeerAnnounceStorage.h"
  63. #include "DHTTokenTracker.h"
  64. #include "DHTMessageDispatcher.h"
  65. #include "DHTMessageReceiver.h"
  66. #include "DHTMessageFactory.h"
  67. #include "DHTMessageCallback.h"
  68. #include "PieceStorage.h"
  69. #include "RequestGroup.h"
  70. #include "DefaultExtensionMessageFactory.h"
  71. #include "RequestGroupMan.h"
  72. #include "ExtensionMessageRegistry.h"
  73. #include "bittorrent_helper.h"
  74. #include "UTMetadataRequestFactory.h"
  75. #include "UTMetadataRequestTracker.h"
  76. #include "BtRegistry.h"
  77. namespace aria2 {
  78. PeerInteractionCommand::PeerInteractionCommand
  79. (cuid_t cuid,
  80. RequestGroup* requestGroup,
  81. const std::shared_ptr<Peer>& p,
  82. DownloadEngine* e,
  83. const std::shared_ptr<BtRuntime>& btRuntime,
  84. const std::shared_ptr<PieceStorage>& pieceStorage,
  85. const std::shared_ptr<PeerStorage>& peerStorage,
  86. const std::shared_ptr<SocketCore>& s,
  87. Seq sequence,
  88. const std::shared_ptr<PeerConnection>& passedPeerConnection)
  89. :PeerAbstractCommand(cuid, p, e, s),
  90. requestGroup_(requestGroup),
  91. btRuntime_(btRuntime),
  92. pieceStorage_(pieceStorage),
  93. peerStorage_(peerStorage),
  94. sequence_(sequence)
  95. {
  96. // TODO move following bunch of processing to separate method, like init()
  97. if(sequence_ == INITIATOR_SEND_HANDSHAKE) {
  98. disableReadCheckSocket();
  99. setWriteCheckSocket(getSocket());
  100. setTimeout(getOption()->getAsInt(PREF_PEER_CONNECTION_TIMEOUT));
  101. }
  102. int family;
  103. unsigned char compact[COMPACT_LEN_IPV6];
  104. int compactlen = bittorrent::packcompact
  105. (compact, getPeer()->getIPAddress(), getPeer()->getPort());
  106. if(compactlen == COMPACT_LEN_IPV6) {
  107. family = AF_INET6;
  108. } else {
  109. family = AF_INET;
  110. }
  111. auto torrentAttrs =
  112. bittorrent::getTorrentAttrs(requestGroup_->getDownloadContext());
  113. bool metadataGetMode = torrentAttrs->metadata.empty();
  114. std::shared_ptr<ExtensionMessageRegistry> exMsgRegistry
  115. (new ExtensionMessageRegistry());
  116. exMsgRegistry->setExtensionMessageID(ExtensionMessageRegistry::UT_PEX, 8);
  117. // http://www.bittorrent.org/beps/bep_0009.html
  118. exMsgRegistry->setExtensionMessageID(ExtensionMessageRegistry::UT_METADATA,
  119. 9);
  120. std::shared_ptr<UTMetadataRequestFactory> utMetadataRequestFactory;
  121. std::shared_ptr<UTMetadataRequestTracker> utMetadataRequestTracker;
  122. if(metadataGetMode) {
  123. utMetadataRequestFactory.reset(new UTMetadataRequestFactory());
  124. utMetadataRequestTracker.reset(new UTMetadataRequestTracker());
  125. }
  126. DefaultExtensionMessageFactory* extensionMessageFactoryPtr
  127. (new DefaultExtensionMessageFactory(getPeer(), exMsgRegistry));
  128. extensionMessageFactoryPtr->setPeerStorage(peerStorage);
  129. extensionMessageFactoryPtr->setDownloadContext
  130. (requestGroup_->getDownloadContext());
  131. extensionMessageFactoryPtr->setUTMetadataRequestTracker
  132. (utMetadataRequestTracker.get());
  133. // PieceStorage will be set later.
  134. std::shared_ptr<ExtensionMessageFactory> extensionMessageFactory
  135. (extensionMessageFactoryPtr);
  136. DefaultBtMessageFactory* factoryPtr(new DefaultBtMessageFactory());
  137. factoryPtr->setCuid(cuid);
  138. factoryPtr->setDownloadContext(requestGroup_->getDownloadContext().get());
  139. factoryPtr->setPieceStorage(pieceStorage.get());
  140. factoryPtr->setPeerStorage(peerStorage.get());
  141. factoryPtr->setExtensionMessageFactory(extensionMessageFactory);
  142. factoryPtr->setPeer(getPeer());
  143. if(family == AF_INET) {
  144. factoryPtr->setLocalNode(DHTRegistry::getData().localNode.get());
  145. factoryPtr->setRoutingTable(DHTRegistry::getData().routingTable.get());
  146. factoryPtr->setTaskQueue(DHTRegistry::getData().taskQueue.get());
  147. factoryPtr->setTaskFactory(DHTRegistry::getData().taskFactory.get());
  148. } else {
  149. factoryPtr->setLocalNode(DHTRegistry::getData6().localNode.get());
  150. factoryPtr->setRoutingTable(DHTRegistry::getData6().routingTable.get());
  151. factoryPtr->setTaskQueue(DHTRegistry::getData6().taskQueue.get());
  152. factoryPtr->setTaskFactory(DHTRegistry::getData6().taskFactory.get());
  153. }
  154. if(metadataGetMode) {
  155. factoryPtr->enableMetadataGetMode();
  156. }
  157. std::shared_ptr<BtMessageFactory> factory(factoryPtr);
  158. std::shared_ptr<PeerConnection> peerConnection;
  159. if(!passedPeerConnection) {
  160. peerConnection.reset(new PeerConnection(cuid, getPeer(), getSocket()));
  161. } else {
  162. peerConnection = passedPeerConnection;
  163. if(sequence_ == RECEIVER_WAIT_HANDSHAKE &&
  164. peerConnection->getBufferLength() > 0) {
  165. setStatus(Command::STATUS_ONESHOT_REALTIME);
  166. getDownloadEngine()->setNoWait(true);
  167. }
  168. }
  169. // If the number of pieces gets bigger, the length of Bitfield
  170. // message payload exceeds the initial buffer capacity of
  171. // PeerConnection, which is MAX_PAYLOAD_LEN. We expand buffer as
  172. // necessary so that PeerConnection can receive the Bitfield
  173. // message.
  174. size_t bitfieldPayloadSize =
  175. 1+(requestGroup_->getDownloadContext()->getNumPieces()+7)/8;
  176. peerConnection->reserveBuffer(bitfieldPayloadSize);
  177. DefaultBtMessageDispatcher* dispatcherPtr(new DefaultBtMessageDispatcher());
  178. dispatcherPtr->setCuid(cuid);
  179. dispatcherPtr->setPeer(getPeer());
  180. dispatcherPtr->setDownloadContext(requestGroup_->getDownloadContext());
  181. dispatcherPtr->setPieceStorage(pieceStorage);
  182. dispatcherPtr->setPeerStorage(peerStorage);
  183. dispatcherPtr->setRequestTimeout(getOption()->
  184. getAsInt(PREF_BT_REQUEST_TIMEOUT));
  185. dispatcherPtr->setBtMessageFactory(factoryPtr);
  186. dispatcherPtr->setRequestGroupMan
  187. (getDownloadEngine()->getRequestGroupMan().get());
  188. dispatcherPtr->setPeerConnection(peerConnection);
  189. std::shared_ptr<BtMessageDispatcher> dispatcher(dispatcherPtr);
  190. DefaultBtMessageReceiver* receiverPtr(new DefaultBtMessageReceiver());
  191. receiverPtr->setDownloadContext(requestGroup_->getDownloadContext());
  192. receiverPtr->setPeerConnection(peerConnection.get());
  193. receiverPtr->setDispatcher(dispatcherPtr);
  194. receiverPtr->setBtMessageFactory(factoryPtr);
  195. std::shared_ptr<BtMessageReceiver> receiver(receiverPtr);
  196. DefaultBtRequestFactory* reqFactoryPtr(new DefaultBtRequestFactory());
  197. reqFactoryPtr->setPeer(getPeer());
  198. reqFactoryPtr->setPieceStorage(pieceStorage);
  199. reqFactoryPtr->setBtMessageDispatcher(dispatcherPtr);
  200. reqFactoryPtr->setBtMessageFactory(factoryPtr);
  201. reqFactoryPtr->setCuid(cuid);
  202. std::shared_ptr<BtRequestFactory> reqFactory(reqFactoryPtr);
  203. DefaultBtInteractive* btInteractivePtr
  204. (new DefaultBtInteractive(requestGroup_->getDownloadContext(), getPeer()));
  205. btInteractivePtr->setBtRuntime(btRuntime_);
  206. btInteractivePtr->setPieceStorage(pieceStorage_);
  207. btInteractivePtr->setPeerStorage(peerStorage); // Note: Not a member variable.
  208. btInteractivePtr->setCuid(cuid);
  209. btInteractivePtr->setBtMessageReceiver(receiver);
  210. btInteractivePtr->setDispatcher(dispatcher);
  211. btInteractivePtr->setBtRequestFactory(reqFactory);
  212. btInteractivePtr->setPeerConnection(peerConnection);
  213. btInteractivePtr->setExtensionMessageFactory(extensionMessageFactory);
  214. btInteractivePtr->setExtensionMessageRegistry(exMsgRegistry);
  215. btInteractivePtr->setKeepAliveInterval
  216. (getOption()->getAsInt(PREF_BT_KEEP_ALIVE_INTERVAL));
  217. btInteractivePtr->setRequestGroupMan
  218. (getDownloadEngine()->getRequestGroupMan().get());
  219. btInteractivePtr->setBtMessageFactory(factory);
  220. if((metadataGetMode || !torrentAttrs->privateTorrent) &&
  221. !getPeer()->isLocalPeer()) {
  222. if(getOption()->getAsBool(PREF_ENABLE_PEER_EXCHANGE)) {
  223. btInteractivePtr->setUTPexEnabled(true);
  224. }
  225. if(family == AF_INET) {
  226. if(DHTRegistry::isInitialized()) {
  227. btInteractivePtr->setDHTEnabled(true);
  228. factoryPtr->setDHTEnabled(true);
  229. btInteractivePtr->setLocalNode(DHTRegistry::getData().localNode.get());
  230. }
  231. } else {
  232. if(DHTRegistry::isInitialized6()) {
  233. btInteractivePtr->setDHTEnabled(true);
  234. factoryPtr->setDHTEnabled(true);
  235. btInteractivePtr->setLocalNode(DHTRegistry::getData6().localNode.get());
  236. }
  237. }
  238. }
  239. btInteractivePtr->setUTMetadataRequestFactory(utMetadataRequestFactory);
  240. btInteractivePtr->setUTMetadataRequestTracker(utMetadataRequestTracker);
  241. btInteractivePtr->setTcpPort(e->getBtRegistry()->getTcpPort());
  242. if(metadataGetMode) {
  243. btInteractivePtr->enableMetadataGetMode();
  244. }
  245. std::shared_ptr<BtInteractive> btInteractive(btInteractivePtr);
  246. btInteractive_ = btInteractive;
  247. // reverse depends
  248. factoryPtr->setBtMessageDispatcher(dispatcherPtr);
  249. factoryPtr->setBtRequestFactory(reqFactoryPtr);
  250. factoryPtr->setPeerConnection(peerConnection.get());
  251. extensionMessageFactoryPtr->setBtMessageDispatcher(dispatcherPtr);
  252. extensionMessageFactoryPtr->setBtMessageFactory(factoryPtr);
  253. if(metadataGetMode) {
  254. utMetadataRequestFactory->setCuid(cuid);
  255. utMetadataRequestFactory->setDownloadContext
  256. (requestGroup_->getDownloadContext());
  257. utMetadataRequestFactory->setBtMessageDispatcher(dispatcherPtr);
  258. utMetadataRequestFactory->setBtMessageFactory(factoryPtr);
  259. utMetadataRequestFactory->setPeer(getPeer());
  260. utMetadataRequestFactory->setUTMetadataRequestTracker
  261. (utMetadataRequestTracker.get());
  262. }
  263. getPeer()->allocateSessionResource
  264. (requestGroup_->getDownloadContext()->getPieceLength(),
  265. requestGroup_->getDownloadContext()->getTotalLength());
  266. getPeer()->setBtMessageDispatcher(dispatcherPtr);
  267. btRuntime_->increaseConnections();
  268. requestGroup_->increaseNumCommand();
  269. }
  270. PeerInteractionCommand::~PeerInteractionCommand() {
  271. if(getPeer()->getCompletedLength() > 0) {
  272. pieceStorage_->subtractPieceStats(getPeer()->getBitfield(),
  273. getPeer()->getBitfieldLength());
  274. }
  275. getPeer()->releaseSessionResource();
  276. requestGroup_->decreaseNumCommand();
  277. btRuntime_->decreaseConnections();
  278. }
  279. bool PeerInteractionCommand::executeInternal() {
  280. setNoCheck(false);
  281. bool done = false;
  282. while(!done) {
  283. switch(sequence_) {
  284. case INITIATOR_SEND_HANDSHAKE:
  285. if(!getSocket()->isWritable(0)) {
  286. done = true;
  287. break;
  288. }
  289. disableWriteCheckSocket();
  290. setReadCheckSocket(getSocket());
  291. //socket->setBlockingMode();
  292. setTimeout(getOption()->getAsInt(PREF_BT_TIMEOUT));
  293. btInteractive_->initiateHandshake();
  294. sequence_ = INITIATOR_WAIT_HANDSHAKE;
  295. break;
  296. case INITIATOR_WAIT_HANDSHAKE: {
  297. if(btInteractive_->countPendingMessage() > 0) {
  298. btInteractive_->sendPendingMessage();
  299. if(btInteractive_->countPendingMessage() > 0) {
  300. done = true;
  301. break;
  302. }
  303. }
  304. std::shared_ptr<BtMessage> handshakeMessage =
  305. btInteractive_->receiveHandshake();
  306. if(!handshakeMessage) {
  307. done = true;
  308. break;
  309. }
  310. btInteractive_->doPostHandshakeProcessing();
  311. sequence_ = WIRED;
  312. break;
  313. }
  314. case RECEIVER_WAIT_HANDSHAKE: {
  315. std::shared_ptr<BtMessage> handshakeMessage =
  316. btInteractive_->receiveAndSendHandshake();
  317. if(!handshakeMessage) {
  318. done = true;
  319. break;
  320. }
  321. btInteractive_->doPostHandshakeProcessing();
  322. sequence_ = WIRED;
  323. break;
  324. }
  325. case WIRED:
  326. // See the comment for writable check below.
  327. disableWriteCheckSocket();
  328. btInteractive_->doInteractionProcessing();
  329. if(btInteractive_->countReceivedMessageInIteration() > 0) {
  330. updateKeepAlive();
  331. }
  332. if((getPeer()->amInterested() && !getPeer()->peerChoking()) ||
  333. btInteractive_->countOutstandingRequest() ||
  334. (getPeer()->peerInterested() && !getPeer()->amChoking())) {
  335. // Writable check to avoid slow seeding
  336. if(btInteractive_->isSendingMessageInProgress()) {
  337. setWriteCheckSocket(getSocket());
  338. }
  339. if(getDownloadEngine()->getRequestGroupMan()->
  340. doesOverallDownloadSpeedExceed() ||
  341. requestGroup_->doesDownloadSpeedExceed()) {
  342. disableReadCheckSocket();
  343. setNoCheck(true);
  344. } else {
  345. setReadCheckSocket(getSocket());
  346. }
  347. } else {
  348. disableReadCheckSocket();
  349. }
  350. done = true;
  351. break;
  352. }
  353. }
  354. if(btInteractive_->countPendingMessage() > 0) {
  355. setNoCheck(true);
  356. }
  357. getDownloadEngine()->addCommand(this);
  358. return false;
  359. }
  360. // TODO this method removed when PeerBalancerCommand is implemented
  361. bool PeerInteractionCommand::prepareForNextPeer(time_t wait) {
  362. if(peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) {
  363. cuid_t ncuid = getDownloadEngine()->newCUID();
  364. std::shared_ptr<Peer> peer = peerStorage_->checkoutPeer(ncuid);
  365. // sanity check
  366. if(peer) {
  367. PeerInitiateConnectionCommand* command;
  368. command = new PeerInitiateConnectionCommand(ncuid, requestGroup_, peer,
  369. getDownloadEngine(),
  370. btRuntime_);
  371. command->setPeerStorage(peerStorage_);
  372. command->setPieceStorage(pieceStorage_);
  373. getDownloadEngine()->addCommand(command);
  374. }
  375. }
  376. return true;
  377. }
  378. void PeerInteractionCommand::onAbort() {
  379. btInteractive_->cancelAllPiece();
  380. peerStorage_->returnPeer(getPeer());
  381. }
  382. void PeerInteractionCommand::onFailure(const Exception& err)
  383. {
  384. requestGroup_->setLastErrorCode(err.getErrorCode());
  385. requestGroup_->setHaltRequested(true);
  386. getDownloadEngine()->setRefreshInterval(0);
  387. }
  388. bool PeerInteractionCommand::exitBeforeExecute()
  389. {
  390. return btRuntime_->isHalt();
  391. }
  392. const std::shared_ptr<Option>& PeerInteractionCommand::getOption() const
  393. {
  394. return requestGroup_->getOption();
  395. }
  396. } // namespace aria2