// PeerInteractionCommand.cc (14 KB)
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - The high speed download utility
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * In addition, as a special exception, the copyright holders give
  22. * permission to link the code of portions of this program with the
  23. * OpenSSL library under certain conditions as described in each
  24. * individual source file, and distribute linked combinations
  25. * including the two.
  26. * You must obey the GNU General Public License in all respects
  27. * for all of the code used other than OpenSSL. If you modify
  28. * file(s) with this exception, you may extend this exception to your
  29. * version of the file(s), but you are not obligated to do so. If you
  30. * do not wish to do so, delete this exception statement from your
  31. * version. If you delete this exception statement from all source
  32. * files in the program, then also delete it here.
  33. */
  34. /* copyright --> */
  35. #include "PeerInteractionCommand.h"
  36. #include <algorithm>
  37. #include "DownloadEngine.h"
  38. #include "PeerInitiateConnectionCommand.h"
  39. #include "DefaultBtInteractive.h"
  40. #include "DlAbortEx.h"
  41. #include "message.h"
  42. #include "prefs.h"
  43. #include "Socket.h"
  44. #include "Option.h"
  45. #include "DownloadContext.h"
  46. #include "Peer.h"
  47. #include "BtMessage.h"
  48. #include "BtRuntime.h"
  49. #include "PeerStorage.h"
  50. #include "DefaultBtMessageDispatcher.h"
  51. #include "DefaultBtMessageReceiver.h"
  52. #include "DefaultBtRequestFactory.h"
  53. #include "DefaultBtMessageFactory.h"
  54. #include "DefaultBtInteractive.h"
  55. #include "PeerConnection.h"
  56. #include "ExtensionMessageFactory.h"
  57. #include "DHTRoutingTable.h"
  58. #include "DHTTaskQueue.h"
  59. #include "DHTTaskFactory.h"
  60. #include "DHTNode.h"
  61. #include "DHTRegistry.h"
  62. #include "DHTPeerAnnounceStorage.h"
  63. #include "DHTTokenTracker.h"
  64. #include "DHTMessageDispatcher.h"
  65. #include "DHTMessageReceiver.h"
  66. #include "DHTMessageFactory.h"
  67. #include "DHTMessageCallback.h"
  68. #include "PieceStorage.h"
  69. #include "RequestGroup.h"
  70. #include "DefaultExtensionMessageFactory.h"
  71. #include "RequestGroupMan.h"
  72. #include "ExtensionMessageRegistry.h"
  73. #include "bittorrent_helper.h"
  74. #include "UTMetadataRequestFactory.h"
  75. #include "UTMetadataRequestTracker.h"
  76. namespace aria2 {
  77. PeerInteractionCommand::PeerInteractionCommand
  78. (cuid_t cuid,
  79. RequestGroup* requestGroup,
  80. const SharedHandle<Peer>& p,
  81. DownloadEngine* e,
  82. const SharedHandle<BtRuntime>& btRuntime,
  83. const SharedHandle<PieceStorage>& pieceStorage,
  84. const SharedHandle<PeerStorage>& peerStorage,
  85. const SocketHandle& s,
  86. Seq sequence,
  87. const PeerConnectionHandle& passedPeerConnection)
  88. :PeerAbstractCommand(cuid, p, e, s),
  89. requestGroup_(requestGroup),
  90. btRuntime_(btRuntime),
  91. pieceStorage_(pieceStorage),
  92. peerStorage_(peerStorage),
  93. sequence_(sequence)
  94. {
  95. // TODO move following bunch of processing to separate method, like init()
  96. if(sequence_ == INITIATOR_SEND_HANDSHAKE) {
  97. disableReadCheckSocket();
  98. setWriteCheckSocket(getSocket());
  99. setTimeout(getOption()->getAsInt(PREF_PEER_CONNECTION_TIMEOUT));
  100. }
  101. int family;
  102. unsigned char compact[COMPACT_LEN_IPV6];
  103. int compactlen = bittorrent::packcompact
  104. (compact, getPeer()->getIPAddress(), getPeer()->getPort());
  105. if(compactlen == COMPACT_LEN_IPV6) {
  106. family = AF_INET6;
  107. } else {
  108. family = AF_INET;
  109. }
  110. SharedHandle<TorrentAttribute> torrentAttrs =
  111. bittorrent::getTorrentAttrs(requestGroup_->getDownloadContext());
  112. bool metadataGetMode = torrentAttrs->metadata.empty();
  113. SharedHandle<ExtensionMessageRegistry> exMsgRegistry
  114. (new ExtensionMessageRegistry());
  115. SharedHandle<UTMetadataRequestFactory> utMetadataRequestFactory;
  116. SharedHandle<UTMetadataRequestTracker> utMetadataRequestTracker;
  117. if(metadataGetMode) {
  118. utMetadataRequestFactory.reset(new UTMetadataRequestFactory());
  119. utMetadataRequestTracker.reset(new UTMetadataRequestTracker());
  120. }
  121. SharedHandle<DefaultExtensionMessageFactory> extensionMessageFactory
  122. (new DefaultExtensionMessageFactory(getPeer(), exMsgRegistry));
  123. extensionMessageFactory->setPeerStorage(peerStorage);
  124. extensionMessageFactory->setDownloadContext
  125. (requestGroup_->getDownloadContext());
  126. extensionMessageFactory->setUTMetadataRequestTracker
  127. (utMetadataRequestTracker.get());
  128. // PieceStorage will be set later.
  129. SharedHandle<DefaultBtMessageFactory> factory(new DefaultBtMessageFactory());
  130. factory->setCuid(cuid);
  131. factory->setDownloadContext(requestGroup_->getDownloadContext());
  132. factory->setPieceStorage(pieceStorage);
  133. factory->setPeerStorage(peerStorage);
  134. factory->setExtensionMessageFactory(extensionMessageFactory);
  135. factory->setPeer(getPeer());
  136. if(family == AF_INET) {
  137. factory->setLocalNode(DHTRegistry::getData().localNode.get());
  138. factory->setRoutingTable(DHTRegistry::getData().routingTable.get());
  139. factory->setTaskQueue(DHTRegistry::getData().taskQueue.get());
  140. factory->setTaskFactory(DHTRegistry::getData().taskFactory.get());
  141. } else {
  142. factory->setLocalNode(DHTRegistry::getData6().localNode.get());
  143. factory->setRoutingTable(DHTRegistry::getData6().routingTable.get());
  144. factory->setTaskQueue(DHTRegistry::getData6().taskQueue.get());
  145. factory->setTaskFactory(DHTRegistry::getData6().taskFactory.get());
  146. }
  147. if(metadataGetMode) {
  148. factory->enableMetadataGetMode();
  149. }
  150. PeerConnectionHandle peerConnection;
  151. if(!passedPeerConnection) {
  152. peerConnection.reset(new PeerConnection(cuid, getPeer(), getSocket()));
  153. } else {
  154. peerConnection = passedPeerConnection;
  155. if(sequence_ == RECEIVER_WAIT_HANDSHAKE &&
  156. peerConnection->getBufferLength() > 0) {
  157. setStatus(Command::STATUS_ONESHOT_REALTIME);
  158. getDownloadEngine()->setNoWait(true);
  159. }
  160. }
  161. SharedHandle<DefaultBtMessageDispatcher> dispatcher
  162. (new DefaultBtMessageDispatcher());
  163. dispatcher->setCuid(cuid);
  164. dispatcher->setPeer(getPeer());
  165. dispatcher->setDownloadContext(requestGroup_->getDownloadContext());
  166. dispatcher->setPieceStorage(pieceStorage);
  167. dispatcher->setPeerStorage(peerStorage);
  168. dispatcher->setRequestTimeout(getOption()->getAsInt(PREF_BT_REQUEST_TIMEOUT));
  169. dispatcher->setBtMessageFactory(factory.get());
  170. dispatcher->setRequestGroupMan
  171. (getDownloadEngine()->getRequestGroupMan().get());
  172. DefaultBtMessageReceiverHandle receiver(new DefaultBtMessageReceiver());
  173. receiver->setDownloadContext(requestGroup_->getDownloadContext());
  174. receiver->setPeerConnection(peerConnection.get());
  175. receiver->setDispatcher(dispatcher.get());
  176. receiver->setBtMessageFactory(factory.get());
  177. SharedHandle<DefaultBtRequestFactory> reqFactory
  178. (new DefaultBtRequestFactory());
  179. reqFactory->setPeer(getPeer());
  180. reqFactory->setPieceStorage(pieceStorage);
  181. reqFactory->setBtMessageDispatcher(dispatcher.get());
  182. reqFactory->setBtMessageFactory(factory.get());
  183. DefaultBtInteractiveHandle btInteractive
  184. (new DefaultBtInteractive(requestGroup_->getDownloadContext(), getPeer()));
  185. btInteractive->setBtRuntime(btRuntime_);
  186. btInteractive->setPieceStorage(pieceStorage_);
  187. btInteractive->setPeerStorage(peerStorage); // Note: Not a member variable.
  188. btInteractive->setCuid(cuid);
  189. btInteractive->setBtMessageReceiver(receiver);
  190. btInteractive->setDispatcher(dispatcher);
  191. btInteractive->setBtRequestFactory(reqFactory);
  192. btInteractive->setPeerConnection(peerConnection);
  193. btInteractive->setExtensionMessageFactory(extensionMessageFactory);
  194. btInteractive->setExtensionMessageRegistry(exMsgRegistry);
  195. btInteractive->setKeepAliveInterval
  196. (getOption()->getAsInt(PREF_BT_KEEP_ALIVE_INTERVAL));
  197. btInteractive->setRequestGroupMan
  198. (getDownloadEngine()->getRequestGroupMan().get());
  199. btInteractive->setBtMessageFactory(factory);
  200. if((metadataGetMode || !torrentAttrs->privateTorrent) &&
  201. !getPeer()->isLocalPeer()) {
  202. if(getOption()->getAsBool(PREF_ENABLE_PEER_EXCHANGE)) {
  203. btInteractive->setUTPexEnabled(true);
  204. }
  205. if(family == AF_INET) {
  206. if(DHTRegistry::isInitialized()) {
  207. btInteractive->setDHTEnabled(true);
  208. factory->setDHTEnabled(true);
  209. btInteractive->setLocalNode(DHTRegistry::getData().localNode.get());
  210. }
  211. } else {
  212. if(DHTRegistry::isInitialized6()) {
  213. btInteractive->setDHTEnabled(true);
  214. factory->setDHTEnabled(true);
  215. btInteractive->setLocalNode(DHTRegistry::getData6().localNode.get());
  216. }
  217. }
  218. }
  219. btInteractive->setUTMetadataRequestFactory(utMetadataRequestFactory);
  220. btInteractive->setUTMetadataRequestTracker(utMetadataRequestTracker);
  221. if(metadataGetMode) {
  222. btInteractive->enableMetadataGetMode();
  223. }
  224. btInteractive_ = btInteractive;
  225. // reverse depends
  226. factory->setBtMessageDispatcher(dispatcher.get());
  227. factory->setBtRequestFactory(reqFactory.get());
  228. factory->setPeerConnection(peerConnection.get());
  229. extensionMessageFactory->setBtMessageDispatcher(dispatcher.get());
  230. extensionMessageFactory->setBtMessageFactory(factory.get());
  231. if(metadataGetMode) {
  232. utMetadataRequestFactory->setDownloadContext
  233. (requestGroup_->getDownloadContext());
  234. utMetadataRequestFactory->setBtMessageDispatcher(dispatcher.get());
  235. utMetadataRequestFactory->setBtMessageFactory(factory.get());
  236. utMetadataRequestFactory->setPeer(getPeer());
  237. utMetadataRequestFactory->setUTMetadataRequestTracker
  238. (utMetadataRequestTracker.get());
  239. }
  240. getPeer()->allocateSessionResource
  241. (requestGroup_->getDownloadContext()->getPieceLength(),
  242. requestGroup_->getDownloadContext()->getTotalLength());
  243. getPeer()->setBtMessageDispatcher(dispatcher.get());
  244. btRuntime_->increaseConnections();
  245. requestGroup_->increaseNumCommand();
  246. }
  247. PeerInteractionCommand::~PeerInteractionCommand() {
  248. if(getPeer()->getCompletedLength() > 0) {
  249. pieceStorage_->subtractPieceStats(getPeer()->getBitfield(),
  250. getPeer()->getBitfieldLength());
  251. }
  252. getPeer()->releaseSessionResource();
  253. requestGroup_->decreaseNumCommand();
  254. btRuntime_->decreaseConnections();
  255. }
  256. bool PeerInteractionCommand::executeInternal() {
  257. setNoCheck(false);
  258. bool done = false;
  259. while(!done) {
  260. switch(sequence_) {
  261. case INITIATOR_SEND_HANDSHAKE:
  262. if(!getSocket()->isWritable(0)) {
  263. done = true;
  264. break;
  265. }
  266. disableWriteCheckSocket();
  267. setReadCheckSocket(getSocket());
  268. //socket->setBlockingMode();
  269. setTimeout(getOption()->getAsInt(PREF_BT_TIMEOUT));
  270. btInteractive_->initiateHandshake();
  271. sequence_ = INITIATOR_WAIT_HANDSHAKE;
  272. break;
  273. case INITIATOR_WAIT_HANDSHAKE: {
  274. if(btInteractive_->countPendingMessage() > 0) {
  275. btInteractive_->sendPendingMessage();
  276. if(btInteractive_->countPendingMessage() > 0) {
  277. done = true;
  278. break;
  279. }
  280. }
  281. BtMessageHandle handshakeMessage = btInteractive_->receiveHandshake();
  282. if(!handshakeMessage) {
  283. done = true;
  284. break;
  285. }
  286. btInteractive_->doPostHandshakeProcessing();
  287. sequence_ = WIRED;
  288. break;
  289. }
  290. case RECEIVER_WAIT_HANDSHAKE: {
  291. BtMessageHandle handshakeMessage =
  292. btInteractive_->receiveAndSendHandshake();
  293. if(!handshakeMessage) {
  294. done = true;
  295. break;
  296. }
  297. btInteractive_->doPostHandshakeProcessing();
  298. sequence_ = WIRED;
  299. break;
  300. }
  301. case WIRED:
  302. // See the comment for writable check below.
  303. disableWriteCheckSocket();
  304. btInteractive_->doInteractionProcessing();
  305. if(btInteractive_->countReceivedMessageInIteration() > 0) {
  306. updateKeepAlive();
  307. }
  308. if((getPeer()->amInterested() && !getPeer()->peerChoking()) ||
  309. btInteractive_->countOutstandingRequest() ||
  310. (getPeer()->peerInterested() && !getPeer()->amChoking())) {
  311. // Writable check to avoid slow seeding
  312. if(btInteractive_->isSendingMessageInProgress()) {
  313. setWriteCheckSocket(getSocket());
  314. }
  315. if(getDownloadEngine()->getRequestGroupMan()->
  316. doesOverallDownloadSpeedExceed() ||
  317. requestGroup_->doesDownloadSpeedExceed()) {
  318. disableReadCheckSocket();
  319. setNoCheck(true);
  320. } else {
  321. setReadCheckSocket(getSocket());
  322. }
  323. } else {
  324. disableReadCheckSocket();
  325. }
  326. done = true;
  327. break;
  328. }
  329. }
  330. if(btInteractive_->countPendingMessage() > 0) {
  331. setNoCheck(true);
  332. }
  333. getDownloadEngine()->addCommand(this);
  334. return false;
  335. }
  336. // TODO this method removed when PeerBalancerCommand is implemented
  337. bool PeerInteractionCommand::prepareForNextPeer(time_t wait) {
  338. if(peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) {
  339. SharedHandle<Peer> peer = peerStorage_->getUnusedPeer();
  340. peer->usedBy(getDownloadEngine()->newCUID());
  341. PeerInitiateConnectionCommand* command =
  342. new PeerInitiateConnectionCommand
  343. (peer->usedBy(), requestGroup_, peer, getDownloadEngine(), btRuntime_);
  344. command->setPeerStorage(peerStorage_);
  345. command->setPieceStorage(pieceStorage_);
  346. getDownloadEngine()->addCommand(command);
  347. }
  348. return true;
  349. }
  350. void PeerInteractionCommand::onAbort() {
  351. btInteractive_->cancelAllPiece();
  352. peerStorage_->returnPeer(getPeer());
  353. }
  354. void PeerInteractionCommand::onFailure(const Exception& err)
  355. {
  356. requestGroup_->setLastErrorCode(err.getErrorCode());
  357. requestGroup_->setHaltRequested(true);
  358. }
  359. bool PeerInteractionCommand::exitBeforeExecute()
  360. {
  361. return btRuntime_->isHalt();
  362. }
  363. const SharedHandle<Option>& PeerInteractionCommand::getOption() const
  364. {
  365. return requestGroup_->getOption();
  366. }
  367. } // namespace aria2