PeerInteractionCommand.cc 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - a simple utility for downloading files faster
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  20. */
  21. /* copyright --> */
  22. #include "PeerInteractionCommand.h"
  23. #include "PeerInitiateConnectionCommand.h"
  24. #include "PeerMessageUtil.h"
  25. #include "HandshakeMessage.h"
  26. #include "KeepAliveMessage.h"
  27. #include "ChokeMessage.h"
  28. #include "UnchokeMessage.h"
  29. #include "HaveMessage.h"
  30. #include "DlAbortEx.h"
  31. #include "Util.h"
  32. #include "message.h"
  33. #include "prefs.h"
  34. #include <algorithm>
  35. PeerInteractionCommand::PeerInteractionCommand(int cuid,
  36. const PeerHandle& p,
  37. TorrentDownloadEngine* e,
  38. const SocketHandle& s,
  39. int sequence)
  40. :PeerAbstractCommand(cuid, p, e, s), sequence(sequence) {
  41. if(sequence == INITIATOR_SEND_HANDSHAKE) {
  42. disableReadCheckSocket();
  43. setWriteCheckSocket(socket);
  44. setTimeout(e->option->getAsInt(PREF_PEER_CONNECTION_TIMEOUT));
  45. }
  46. peerInteraction = new PeerInteraction(cuid, peer, socket, e->option,
  47. e->torrentMan);
  48. peerInteraction->setUploadLimit(e->option->getAsInt(PREF_UPLOAD_LIMIT));
  49. setUploadLimit(e->option->getAsInt(PREF_UPLOAD_LIMIT));
  50. chokeUnchokeCount = 0;
  51. haveCount = 0;
  52. keepAliveCount = 0;
  53. e->torrentMan->addActivePeer(peer);
  54. }
  55. PeerInteractionCommand::~PeerInteractionCommand() {
  56. delete peerInteraction;
  57. e->torrentMan->deleteActivePeer(peer);
  58. }
  59. bool PeerInteractionCommand::executeInternal() {
  60. disableWriteCheckSocket();
  61. setUploadLimitCheck(false);
  62. setNoCheck(false);
  63. switch(sequence) {
  64. case INITIATOR_SEND_HANDSHAKE:
  65. if(!socket->isWritable(0)) {
  66. setWriteCheckSocket(socket);
  67. break;
  68. }
  69. socket->setBlockingMode();
  70. setReadCheckSocket(socket);
  71. setTimeout(e->option->getAsInt(PREF_TIMEOUT));
  72. peerInteraction->sendHandshake();
  73. sequence = INITIATOR_WAIT_HANDSHAKE;
  74. break;
  75. case INITIATOR_WAIT_HANDSHAKE: {
  76. if(peerInteraction->countMessageInQueue() > 0) {
  77. peerInteraction->sendMessages(e->getUploadSpeed());
  78. if(peerInteraction->countMessageInQueue() > 0) {
  79. break;
  80. }
  81. }
  82. PeerMessageHandle handshakeMessage =
  83. peerInteraction->receiveHandshake();
  84. if(handshakeMessage.get() == 0) {
  85. break;
  86. }
  87. peer->setPeerId(((HandshakeMessage*)handshakeMessage.get())->peerId);
  88. logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
  89. peer->ipaddr.c_str(), peer->port,
  90. handshakeMessage->toString().c_str());
  91. haveCheckTime.reset();
  92. peerInteraction->sendBitfield();
  93. peerInteraction->sendAllowedFast();
  94. sequence = WIRED;
  95. break;
  96. }
  97. case RECEIVER_WAIT_HANDSHAKE: {
  98. PeerMessageHandle handshakeMessage =
  99. peerInteraction->receiveHandshake(true);
  100. if(handshakeMessage.get() == 0) {
  101. break;
  102. }
  103. peer->setPeerId(((HandshakeMessage*)handshakeMessage.get())->peerId);
  104. logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
  105. peer->ipaddr.c_str(), peer->port,
  106. handshakeMessage->toString().c_str());
  107. haveCheckTime.reset();
  108. peerInteraction->sendBitfield();
  109. peerInteraction->sendAllowedFast();
  110. sequence = WIRED;
  111. break;
  112. }
  113. case WIRED:
  114. peerInteraction->syncPiece();
  115. decideChoking();
  116. if(periodicExecPoint.elapsedInMillis(500)) {
  117. periodicExecPoint.reset();
  118. detectMessageFlooding();
  119. peerInteraction->checkRequestSlot();
  120. checkHave();
  121. sendKeepAlive();
  122. }
  123. receiveMessages();
  124. peerInteraction->addRequests();
  125. peerInteraction->sendMessages(e->getUploadSpeed());
  126. break;
  127. }
  128. if(peerInteraction->countMessageInQueue() > 0) {
  129. if(peerInteraction->isSendingMessageInProgress()) {
  130. setUploadLimitCheck(true);
  131. }
  132. setNoCheck(true);
  133. }
  134. e->commands.push_back(this);
  135. return false;
  136. }
  137. #define FLOODING_CHECK_INTERVAL 5
  138. void PeerInteractionCommand::detectMessageFlooding() {
  139. if(freqCheckPoint.elapsed(FLOODING_CHECK_INTERVAL)) {
  140. if(chokeUnchokeCount*1.0/FLOODING_CHECK_INTERVAL >= 0.4
  141. //|| haveCount*1.0/elapsed >= 20.0
  142. || keepAliveCount*1.0/FLOODING_CHECK_INTERVAL >= 1.0) {
  143. throw new DlAbortEx("Flooding detected.");
  144. } else {
  145. chokeUnchokeCount = 0;
  146. haveCount = 0;
  147. keepAliveCount = 0;
  148. freqCheckPoint.reset();
  149. }
  150. }
  151. }
  152. /*
  153. void PeerInteractionCommand::checkLongTimePeerChoking() {
  154. if(e->torrentMan->downloadComplete()) {
  155. return;
  156. }
  157. if(peer->amInterested && peer->peerChoking) {
  158. if(chokeCheckPoint.elapsed(MAX_PEER_CHOKING_INTERVAL)) {
  159. logger->info("CUID#%d - The peer is choking too long.", cuid);
  160. peer->snubbing = true;
  161. }
  162. } else {
  163. chokeCheckPoint.reset();
  164. }
  165. }
  166. */
  167. void PeerInteractionCommand::decideChoking() {
  168. if(peer->shouldBeChoking()) {
  169. if(!peer->amChoking) {
  170. peerInteraction->addMessage(peerInteraction->getPeerMessageFactory()->
  171. createChokeMessage());
  172. }
  173. } else {
  174. if(peer->amChoking) {
  175. peerInteraction->addMessage(peerInteraction->getPeerMessageFactory()->
  176. createUnchokeMessage());
  177. }
  178. }
  179. }
  180. void PeerInteractionCommand::receiveMessages() {
  181. for(int i = 0; i < 50; i++) {
  182. PeerMessageHandle message = peerInteraction->receiveMessage();
  183. if(message.get() == NULL) {
  184. return;
  185. }
  186. logger->info(MSG_RECEIVE_PEER_MESSAGE, cuid,
  187. peer->ipaddr.c_str(), peer->port,
  188. message->toString().c_str());
  189. // to detect flooding
  190. switch(message->getId()) {
  191. case KeepAliveMessage::ID:
  192. keepAliveCount++;
  193. break;
  194. case ChokeMessage::ID:
  195. if(!peer->peerChoking) {
  196. chokeUnchokeCount++;
  197. }
  198. break;
  199. case UnchokeMessage::ID:
  200. if(peer->peerChoking) {
  201. chokeUnchokeCount++;
  202. }
  203. break;
  204. case HaveMessage::ID:
  205. haveCount++;
  206. break;
  207. }
  208. message->receivedAction();
  209. }
  210. }
  211. // TODO this method removed when PeerBalancerCommand is implemented
  212. bool PeerInteractionCommand::prepareForNextPeer(int wait) {
  213. if(e->torrentMan->isPeerAvailable()) {
  214. PeerHandle peer = e->torrentMan->getPeer();
  215. int newCuid = e->torrentMan->getNewCuid();
  216. peer->cuid = newCuid;
  217. PeerInitiateConnectionCommand* command =
  218. new PeerInitiateConnectionCommand(newCuid, peer, e);
  219. e->commands.push_back(command);
  220. }
  221. return true;
  222. }
  223. bool PeerInteractionCommand::prepareForRetry(int wait) {
  224. e->commands.push_back(this);
  225. return false;
  226. }
  227. void PeerInteractionCommand::onAbort(Exception* ex) {
  228. peerInteraction->abortAllPieces();
  229. PeerAbstractCommand::onAbort(ex);
  230. }
  231. void PeerInteractionCommand::sendKeepAlive() {
  232. if(keepAliveCheckPoint.elapsed(KEEP_ALIVE_INTERVAL)) {
  233. if(peerInteraction->countMessageInQueue() == 0) {
  234. peerInteraction->addMessage(peerInteraction->getPeerMessageFactory()->
  235. createKeepAliveMessage());
  236. peerInteraction->sendMessages(e->getUploadSpeed());
  237. }
  238. keepAliveCheckPoint.reset();
  239. }
  240. }
  241. void PeerInteractionCommand::checkHave() {
  242. PieceIndexes indexes =
  243. e->torrentMan->getAdvertisedPieceIndexes(cuid, haveCheckTime);
  244. haveCheckTime.reset();
  245. if(indexes.size() >= 20) {
  246. if(peer->isFastExtensionEnabled()) {
  247. if(e->torrentMan->hasAllPieces()) {
  248. peerInteraction->addMessage(peerInteraction->getPeerMessageFactory()->
  249. createHaveAllMessage());
  250. } else {
  251. peerInteraction->addMessage(peerInteraction->getPeerMessageFactory()->
  252. createBitfieldMessage());
  253. }
  254. } else {
  255. peerInteraction->addMessage(peerInteraction->getPeerMessageFactory()->
  256. createBitfieldMessage());
  257. }
  258. } else {
  259. for(PieceIndexes::iterator itr = indexes.begin(); itr != indexes.end(); itr++) {
  260. peerInteraction->addMessage(peerInteraction->getPeerMessageFactory()->
  261. createHaveMessage(*itr));
  262. }
  263. }
  264. }