PeerInteractionCommand.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - The high speed download utility
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * In addition, as a special exception, the copyright holders give
  22. * permission to link the code of portions of this program with the
  23. * OpenSSL library under certain conditions as described in each
  24. * individual source file, and distribute linked combinations
  25. * including the two.
  26. * You must obey the GNU General Public License in all respects
  27. * for all of the code used other than OpenSSL. If you modify
  28. * file(s) with this exception, you may extend this exception to your
  29. * version of the file(s), but you are not obligated to do so. If you
  30. * do not wish to do so, delete this exception statement from your
  31. * version. If you delete this exception statement from all source
  32. * files in the program, then also delete it here.
  33. */
  34. /* copyright --> */
  35. #include "PeerInteractionCommand.h"
  36. #include <algorithm>
  37. #include "DownloadEngine.h"
  38. #include "PeerInitiateConnectionCommand.h"
  39. #include "DefaultBtInteractive.h"
  40. #include "DlAbortEx.h"
  41. #include "message.h"
  42. #include "prefs.h"
  43. #include "Socket.h"
  44. #include "Option.h"
  45. #include "DownloadContext.h"
  46. #include "Peer.h"
  47. #include "BtMessage.h"
  48. #include "BtRuntime.h"
  49. #include "PeerStorage.h"
  50. #include "DefaultBtMessageDispatcher.h"
  51. #include "DefaultBtMessageReceiver.h"
  52. #include "DefaultBtRequestFactory.h"
  53. #include "DefaultBtMessageFactory.h"
  54. #include "DefaultBtInteractive.h"
  55. #include "PeerConnection.h"
  56. #include "ExtensionMessageFactory.h"
  57. #include "DHTRoutingTable.h"
  58. #include "DHTTaskQueue.h"
  59. #include "DHTTaskFactory.h"
  60. #include "DHTNode.h"
  61. #include "DHTRegistry.h"
  62. #include "DHTPeerAnnounceStorage.h"
  63. #include "DHTTokenTracker.h"
  64. #include "DHTMessageDispatcher.h"
  65. #include "DHTMessageReceiver.h"
  66. #include "DHTMessageFactory.h"
  67. #include "DHTMessageCallback.h"
  68. #include "PieceStorage.h"
  69. #include "RequestGroup.h"
  70. #include "BtAnnounce.h"
  71. #include "BtProgressInfoFile.h"
  72. #include "DefaultExtensionMessageFactory.h"
  73. #include "RequestGroupMan.h"
  74. #include "ExtensionMessageRegistry.h"
  75. #include "bittorrent_helper.h"
  76. #include "UTMetadataRequestFactory.h"
  77. #include "UTMetadataRequestTracker.h"
  78. #include "ServerStatMan.h"
  79. #include "FileAllocationEntry.h"
  80. #include "CheckIntegrityEntry.h"
  81. namespace aria2 {
  82. PeerInteractionCommand::PeerInteractionCommand
  83. (cuid_t cuid,
  84. RequestGroup* requestGroup,
  85. const SharedHandle<Peer>& p,
  86. DownloadEngine* e,
  87. const SharedHandle<BtRuntime>& btRuntime,
  88. const SharedHandle<PieceStorage>& pieceStorage,
  89. const SharedHandle<PeerStorage>& peerStorage,
  90. const SocketHandle& s,
  91. Seq sequence,
  92. const PeerConnectionHandle& passedPeerConnection)
  93. :PeerAbstractCommand(cuid, p, e, s),
  94. requestGroup_(requestGroup),
  95. btRuntime_(btRuntime),
  96. pieceStorage_(pieceStorage),
  97. peerStorage_(peerStorage),
  98. sequence_(sequence)
  99. {
  100. // TODO move following bunch of processing to separate method, like init()
  101. if(sequence_ == INITIATOR_SEND_HANDSHAKE) {
  102. disableReadCheckSocket();
  103. setWriteCheckSocket(getSocket());
  104. setTimeout(getOption()->getAsInt(PREF_PEER_CONNECTION_TIMEOUT));
  105. }
  106. int family;
  107. unsigned char compact[COMPACT_LEN_IPV6];
  108. int compactlen = bittorrent::packcompact
  109. (compact, getPeer()->getIPAddress(), getPeer()->getPort());
  110. if(compactlen == COMPACT_LEN_IPV6) {
  111. family = AF_INET6;
  112. } else {
  113. family = AF_INET;
  114. }
  115. SharedHandle<TorrentAttribute> torrentAttrs =
  116. bittorrent::getTorrentAttrs(requestGroup_->getDownloadContext());
  117. bool metadataGetMode = torrentAttrs->metadata.empty();
  118. SharedHandle<ExtensionMessageRegistry> exMsgRegistry
  119. (new ExtensionMessageRegistry());
  120. SharedHandle<UTMetadataRequestFactory> utMetadataRequestFactory;
  121. SharedHandle<UTMetadataRequestTracker> utMetadataRequestTracker;
  122. if(metadataGetMode) {
  123. utMetadataRequestFactory.reset(new UTMetadataRequestFactory());
  124. utMetadataRequestTracker.reset(new UTMetadataRequestTracker());
  125. }
  126. SharedHandle<DefaultExtensionMessageFactory> extensionMessageFactory
  127. (new DefaultExtensionMessageFactory(getPeer(), exMsgRegistry));
  128. extensionMessageFactory->setPeerStorage(peerStorage);
  129. extensionMessageFactory->setDownloadContext
  130. (requestGroup_->getDownloadContext());
  131. extensionMessageFactory->setUTMetadataRequestTracker
  132. (utMetadataRequestTracker);
  133. // PieceStorage will be set later.
  134. SharedHandle<DefaultBtMessageFactory> factory(new DefaultBtMessageFactory());
  135. factory->setCuid(cuid);
  136. factory->setDownloadContext(requestGroup_->getDownloadContext());
  137. factory->setPieceStorage(pieceStorage);
  138. factory->setPeerStorage(peerStorage);
  139. factory->setExtensionMessageFactory(extensionMessageFactory);
  140. factory->setPeer(getPeer());
  141. if(family == AF_INET) {
  142. factory->setLocalNode(DHTRegistry::getData().localNode);
  143. factory->setRoutingTable(DHTRegistry::getData().routingTable);
  144. factory->setTaskQueue(DHTRegistry::getData().taskQueue);
  145. factory->setTaskFactory(DHTRegistry::getData().taskFactory);
  146. } else {
  147. factory->setLocalNode(DHTRegistry::getData6().localNode);
  148. factory->setRoutingTable(DHTRegistry::getData6().routingTable);
  149. factory->setTaskQueue(DHTRegistry::getData6().taskQueue);
  150. factory->setTaskFactory(DHTRegistry::getData6().taskFactory);
  151. }
  152. if(metadataGetMode) {
  153. factory->enableMetadataGetMode();
  154. }
  155. PeerConnectionHandle peerConnection;
  156. if(passedPeerConnection.isNull()) {
  157. peerConnection.reset(new PeerConnection(cuid, getPeer(), getSocket()));
  158. } else {
  159. peerConnection = passedPeerConnection;
  160. }
  161. SharedHandle<DefaultBtMessageDispatcher> dispatcher
  162. (new DefaultBtMessageDispatcher());
  163. dispatcher->setCuid(cuid);
  164. dispatcher->setPeer(getPeer());
  165. dispatcher->setDownloadContext(requestGroup_->getDownloadContext());
  166. dispatcher->setPieceStorage(pieceStorage);
  167. dispatcher->setPeerStorage(peerStorage);
  168. dispatcher->setRequestTimeout(getOption()->getAsInt(PREF_BT_REQUEST_TIMEOUT));
  169. dispatcher->setBtMessageFactory(factory);
  170. dispatcher->setRequestGroupMan(getDownloadEngine()->getRequestGroupMan());
  171. DefaultBtMessageReceiverHandle receiver(new DefaultBtMessageReceiver());
  172. receiver->setDownloadContext(requestGroup_->getDownloadContext());
  173. receiver->setPeerConnection(peerConnection);
  174. receiver->setDispatcher(dispatcher);
  175. receiver->setBtMessageFactory(factory);
  176. SharedHandle<DefaultBtRequestFactory> reqFactory
  177. (new DefaultBtRequestFactory());
  178. reqFactory->setPeer(getPeer());
  179. reqFactory->setPieceStorage(pieceStorage);
  180. reqFactory->setBtMessageDispatcher(dispatcher);
  181. reqFactory->setBtMessageFactory(factory);
  182. DefaultBtInteractiveHandle btInteractive
  183. (new DefaultBtInteractive(requestGroup_->getDownloadContext(), getPeer()));
  184. btInteractive->setBtRuntime(btRuntime_);
  185. btInteractive->setPieceStorage(pieceStorage_);
  186. btInteractive->setPeerStorage(peerStorage); // Note: Not a member variable.
  187. btInteractive->setCuid(cuid);
  188. btInteractive->setBtMessageReceiver(receiver);
  189. btInteractive->setDispatcher(dispatcher);
  190. btInteractive->setBtRequestFactory(reqFactory);
  191. btInteractive->setPeerConnection(peerConnection);
  192. btInteractive->setExtensionMessageFactory(extensionMessageFactory);
  193. btInteractive->setExtensionMessageRegistry(exMsgRegistry);
  194. btInteractive->setKeepAliveInterval
  195. (getOption()->getAsInt(PREF_BT_KEEP_ALIVE_INTERVAL));
  196. btInteractive->setRequestGroupMan(getDownloadEngine()->getRequestGroupMan());
  197. btInteractive->setBtMessageFactory(factory);
  198. if((metadataGetMode || !torrentAttrs->privateTorrent) &&
  199. !getPeer()->isLocalPeer()) {
  200. if(getOption()->getAsBool(PREF_ENABLE_PEER_EXCHANGE)) {
  201. btInteractive->setUTPexEnabled(true);
  202. }
  203. if(family == AF_INET) {
  204. if(DHTRegistry::isInitialized()) {
  205. btInteractive->setDHTEnabled(true);
  206. factory->setDHTEnabled(true);
  207. btInteractive->setLocalNode(DHTRegistry::getData().localNode);
  208. }
  209. } else {
  210. if(DHTRegistry::isInitialized6()) {
  211. btInteractive->setDHTEnabled(true);
  212. factory->setDHTEnabled(true);
  213. btInteractive->setLocalNode(DHTRegistry::getData6().localNode);
  214. }
  215. }
  216. }
  217. btInteractive->setUTMetadataRequestFactory(utMetadataRequestFactory);
  218. btInteractive->setUTMetadataRequestTracker(utMetadataRequestTracker);
  219. if(metadataGetMode) {
  220. btInteractive->enableMetadataGetMode();
  221. }
  222. btInteractive_ = btInteractive;
  223. // reverse depends
  224. factory->setBtMessageDispatcher(dispatcher);
  225. factory->setBtRequestFactory(reqFactory);
  226. factory->setPeerConnection(peerConnection);
  227. extensionMessageFactory->setBtMessageDispatcher(dispatcher);
  228. extensionMessageFactory->setBtMessageFactory(factory);
  229. if(metadataGetMode) {
  230. utMetadataRequestFactory->setDownloadContext
  231. (requestGroup_->getDownloadContext());
  232. utMetadataRequestFactory->setBtMessageDispatcher(dispatcher);
  233. utMetadataRequestFactory->setBtMessageFactory(factory);
  234. utMetadataRequestFactory->setPeer(getPeer());
  235. utMetadataRequestFactory->setUTMetadataRequestTracker
  236. (utMetadataRequestTracker);
  237. }
  238. getPeer()->allocateSessionResource
  239. (requestGroup_->getDownloadContext()->getPieceLength(),
  240. requestGroup_->getDownloadContext()->getTotalLength());
  241. getPeer()->setBtMessageDispatcher(dispatcher);
  242. btRuntime_->increaseConnections();
  243. requestGroup_->increaseNumCommand();
  244. }
  245. PeerInteractionCommand::~PeerInteractionCommand() {
  246. if(getPeer()->getCompletedLength() > 0) {
  247. pieceStorage_->subtractPieceStats(getPeer()->getBitfield(),
  248. getPeer()->getBitfieldLength());
  249. }
  250. getPeer()->releaseSessionResource();
  251. requestGroup_->decreaseNumCommand();
  252. btRuntime_->decreaseConnections();
  253. }
  254. bool PeerInteractionCommand::executeInternal() {
  255. setNoCheck(false);
  256. switch(sequence_) {
  257. case INITIATOR_SEND_HANDSHAKE:
  258. if(!getSocket()->isWritable(0)) {
  259. break;
  260. }
  261. disableWriteCheckSocket();
  262. setReadCheckSocket(getSocket());
  263. //socket->setBlockingMode();
  264. setTimeout(getOption()->getAsInt(PREF_BT_TIMEOUT));
  265. btInteractive_->initiateHandshake();
  266. sequence_ = INITIATOR_WAIT_HANDSHAKE;
  267. break;
  268. case INITIATOR_WAIT_HANDSHAKE: {
  269. if(btInteractive_->countPendingMessage() > 0) {
  270. btInteractive_->sendPendingMessage();
  271. if(btInteractive_->countPendingMessage() > 0) {
  272. break;
  273. }
  274. }
  275. BtMessageHandle handshakeMessage = btInteractive_->receiveHandshake();
  276. if(handshakeMessage.isNull()) {
  277. break;
  278. }
  279. btInteractive_->doPostHandshakeProcessing();
  280. sequence_ = WIRED;
  281. break;
  282. }
  283. case RECEIVER_WAIT_HANDSHAKE: {
  284. BtMessageHandle handshakeMessage =btInteractive_->receiveAndSendHandshake();
  285. if(handshakeMessage.isNull()) {
  286. break;
  287. }
  288. btInteractive_->doPostHandshakeProcessing();
  289. sequence_ = WIRED;
  290. break;
  291. }
  292. case WIRED:
  293. // See the comment for writable check below.
  294. disableWriteCheckSocket();
  295. btInteractive_->doInteractionProcessing();
  296. if(btInteractive_->countReceivedMessageInIteration() > 0) {
  297. updateKeepAlive();
  298. }
  299. if((getPeer()->amInterested() && !getPeer()->peerChoking()) ||
  300. btInteractive_->countOutstandingRequest() ||
  301. (getPeer()->peerInterested() && !getPeer()->amChoking())) {
  302. // Writable check to avoid slow seeding
  303. if(btInteractive_->isSendingMessageInProgress()) {
  304. setWriteCheckSocket(getSocket());
  305. }
  306. if(getDownloadEngine()->getRequestGroupMan()->
  307. doesOverallDownloadSpeedExceed() ||
  308. requestGroup_->doesDownloadSpeedExceed()) {
  309. disableReadCheckSocket();
  310. setNoCheck(true);
  311. } else {
  312. setReadCheckSocket(getSocket());
  313. }
  314. } else {
  315. disableReadCheckSocket();
  316. }
  317. break;
  318. }
  319. if(btInteractive_->countPendingMessage() > 0) {
  320. setNoCheck(true);
  321. }
  322. getDownloadEngine()->addCommand(this);
  323. return false;
  324. }
  325. // TODO this method removed when PeerBalancerCommand is implemented
  326. bool PeerInteractionCommand::prepareForNextPeer(time_t wait) {
  327. if(peerStorage_->isPeerAvailable() && btRuntime_->lessThanEqMinPeers()) {
  328. SharedHandle<Peer> peer = peerStorage_->getUnusedPeer();
  329. peer->usedBy(getDownloadEngine()->newCUID());
  330. PeerInitiateConnectionCommand* command =
  331. new PeerInitiateConnectionCommand
  332. (peer->usedBy(), requestGroup_, peer, getDownloadEngine(), btRuntime_);
  333. command->setPeerStorage(peerStorage_);
  334. command->setPieceStorage(pieceStorage_);
  335. getDownloadEngine()->addCommand(command);
  336. }
  337. return true;
  338. }
  339. void PeerInteractionCommand::onAbort() {
  340. btInteractive_->cancelAllPiece();
  341. peerStorage_->returnPeer(getPeer());
  342. }
  343. void PeerInteractionCommand::onFailure()
  344. {
  345. requestGroup_->setHaltRequested(true);
  346. }
  347. bool PeerInteractionCommand::exitBeforeExecute()
  348. {
  349. return btRuntime_->isHalt();
  350. }
  351. const SharedHandle<Option>& PeerInteractionCommand::getOption() const
  352. {
  353. return requestGroup_->getOption();
  354. }
  355. } // namespace aria2