PeerInteraction.cc 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - a simple utility for downloading files faster
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  20. */
  21. /* copyright --> */
  22. #include "PeerInteraction.h"
  23. #include "LogFactory.h"
  24. #include "DlAbortEx.h"
  25. #include "KeepAliveMessage.h"
  26. #include "PeerMessageUtil.h"
  27. #include "Util.h"
  28. #include <netinet/in.h>
  29. PeerInteraction::PeerInteraction(int cuid,
  30. const PeerHandle& peer,
  31. const SocketHandle& socket,
  32. const Option* op,
  33. TorrentMan* torrentMan)
  34. :cuid(cuid),
  35. uploadLimit(0),
  36. torrentMan(torrentMan),
  37. peer(peer),
  38. quickReplied(false) {
  39. peerConnection = new PeerConnection(cuid, socket, op);
  40. peerMessageFactory = new PeerMessageFactory(cuid, this, peer);
  41. logger = LogFactory::getInstance();
  42. }
  43. PeerInteraction::~PeerInteraction() {
  44. delete peerConnection;
  45. delete peerMessageFactory;
  46. }
  47. bool PeerInteraction::isSendingMessageInProgress() const {
  48. if(messageQueue.size() > 0) {
  49. const PeerMessageHandle& peerMessage = messageQueue.front();
  50. if(peerMessage->isInProgress()) {
  51. return true;
  52. }
  53. }
  54. return false;
  55. }
  56. void PeerInteraction::sendMessages(int uploadSpeed) {
  57. MessageQueue tempQueue;
  58. while(messageQueue.size() > 0) {
  59. PeerMessageHandle msg = messageQueue.front();
  60. messageQueue.pop_front();
  61. if(uploadLimit != 0 && uploadLimit*1024 <= uploadSpeed &&
  62. msg->isUploading() && !msg->isInProgress()) {
  63. tempQueue.push_back(msg);
  64. } else {
  65. msg->send();
  66. if(msg->isInProgress()) {
  67. messageQueue.push_front(msg);
  68. break;
  69. }
  70. }
  71. }
  72. copy(tempQueue.begin(), tempQueue.end(), back_inserter(messageQueue));
  73. }
  74. void PeerInteraction::addMessage(const PeerMessageHandle& peerMessage) {
  75. peerMessage->onPush();
  76. messageQueue.push_back(peerMessage);
  77. }
  78. void PeerInteraction::addRequestSlot(const RequestSlot& requestSlot) {
  79. requestSlots.push_back(requestSlot);
  80. }
  81. void PeerInteraction::rejectAllPieceMessageInQueue() {
  82. int size = messageQueue.size();
  83. for(int i = 0; i < size; i++) {
  84. messageQueue.at(i)->onChoked();
  85. }
  86. }
  87. void PeerInteraction::rejectPieceMessageInQueue(int index, int begin, int length) {
  88. int size = messageQueue.size();
  89. for(int i = 0; i < size; i++) {
  90. messageQueue.at(i)->onCanceled(index, begin, length);
  91. }
  92. }
  93. void PeerInteraction::onChoked() {
  94. for(Pieces::iterator itr = pieces.begin(); itr != pieces.end();) {
  95. Piece& piece = *itr;
  96. if(!peer->isInFastSet(piece.getIndex())) {
  97. abortPiece(piece);
  98. itr = pieces.erase(itr);
  99. } else {
  100. itr++;
  101. }
  102. }
  103. }
  104. void PeerInteraction::abortAllPieces() {
  105. for(Pieces::iterator itr = pieces.begin(); itr != pieces.end();) {
  106. abortPiece(*itr);
  107. itr = pieces.erase(itr);
  108. }
  109. }
  110. void PeerInteraction::abortPiece(Piece& piece) {
  111. if(!Piece::isNull(piece)) {
  112. int size = messageQueue.size();
  113. for(int i = 0; i < size; i++) {
  114. messageQueue.at(i)->onAbortPiece(piece);
  115. }
  116. for(RequestSlots::iterator itr = requestSlots.begin();
  117. itr != requestSlots.end();) {
  118. if(itr->getIndex() == piece.getIndex()) {
  119. logger->debug("CUID#%d - Deleting request slot blockIndex=%d"
  120. " because piece was canceled",
  121. cuid,
  122. itr->getBlockIndex());
  123. piece.cancelBlock(itr->getBlockIndex());
  124. itr = requestSlots.erase(itr);
  125. } else {
  126. itr++;
  127. }
  128. }
  129. torrentMan->cancelPiece(piece);
  130. }
  131. }
  132. void PeerInteraction::deleteRequestSlot(const RequestSlot& requestSlot) {
  133. RequestSlots::iterator itr = find(requestSlots.begin(), requestSlots.end(),
  134. requestSlot);
  135. if(itr != requestSlots.end()) {
  136. requestSlots.erase(itr);
  137. }
  138. }
  139. void PeerInteraction::checkRequestSlot() {
  140. for(RequestSlots::iterator itr = requestSlots.begin();
  141. itr != requestSlots.end();) {
  142. RequestSlot& slot = *itr;
  143. if(slot.isTimeout(REQUEST_TIME_OUT)) {
  144. logger->debug("CUID#%d - Deleting request slot blockIndex=%d"
  145. " because of time out",
  146. cuid,
  147. slot.getBlockIndex());
  148. Piece& piece = getDownloadPiece(slot.getIndex());
  149. piece.cancelBlock(slot.getBlockIndex());
  150. itr = requestSlots.erase(itr);
  151. peer->snubbing = true;
  152. } else {
  153. Piece piece = getDownloadPiece(slot.getIndex());
  154. if(piece.hasBlock(slot.getBlockIndex()) ||
  155. torrentMan->hasPiece(piece.getIndex())) {
  156. logger->debug("CUID#%d - Deleting request slot blockIndex=%d because"
  157. " the block has been acquired.", cuid,
  158. slot.getBlockIndex());
  159. addMessage(peerMessageFactory->createCancelMessage(slot.getIndex(),
  160. slot.getBegin(),
  161. slot.getLength()));
  162. itr = requestSlots.erase(itr);
  163. } else {
  164. itr++;
  165. }
  166. }
  167. }
  168. updatePiece();
  169. }
  170. bool PeerInteraction::isInRequestSlot(int index, int blockIndex) const {
  171. for(RequestSlots::const_iterator itr = requestSlots.begin();
  172. itr != requestSlots.end(); itr++) {
  173. const RequestSlot& slot = *itr;
  174. if(slot.getIndex() == index && slot.getBlockIndex() == blockIndex) {
  175. return true;
  176. }
  177. }
  178. return false;
  179. }
  180. RequestSlot PeerInteraction::getCorrespondingRequestSlot(int index,
  181. int begin,
  182. int length) const {
  183. for(RequestSlots::const_iterator itr = requestSlots.begin();
  184. itr != requestSlots.end(); itr++) {
  185. const RequestSlot& slot = *itr;
  186. if(slot.getIndex() == index &&
  187. slot.getBegin() == begin &&
  188. slot.getLength() == length) {
  189. return slot;
  190. }
  191. }
  192. return RequestSlot::nullSlot;
  193. }
  194. int PeerInteraction::countMessageInQueue() const {
  195. return messageQueue.size();
  196. }
  197. int PeerInteraction::countRequestSlot() const {
  198. return requestSlots.size();
  199. }
  200. PeerMessageHandle PeerInteraction::receiveHandshake(bool quickReply) {
  201. char msg[HANDSHAKE_MESSAGE_LENGTH];
  202. int msgLength = HANDSHAKE_MESSAGE_LENGTH;
  203. bool retval = peerConnection->receiveHandshake(msg, msgLength);
  204. // To handle tracker's NAT-checking feature
  205. if(!quickReplied && quickReply && msgLength >= 48) {
  206. quickReplied = true;
  207. // check info_hash
  208. if(memcmp(torrentMan->getInfoHash(), &msg[28], INFO_HASH_LENGTH) == 0) {
  209. sendHandshake();
  210. }
  211. }
  212. if(!retval) {
  213. return NULL;
  214. }
  215. PeerMessageHandle handshakeMessage(peerMessageFactory->createHandshakeMessage(msg, msgLength));
  216. handshakeMessage->check();
  217. if(((HandshakeMessage*)handshakeMessage.get())->isFastExtensionSupported()) {
  218. peer->setFastExtensionEnabled(true);
  219. logger->info("CUID#%d - Fast extension enabled.", cuid);
  220. }
  221. return handshakeMessage;
  222. }
  223. PeerMessageHandle PeerInteraction::receiveMessage() {
  224. char msg[MAX_PAYLOAD_LEN];
  225. int msgLength = 0;
  226. if(!peerConnection->receiveMessage(msg, msgLength)) {
  227. return NULL;
  228. }
  229. PeerMessageHandle peerMessage =
  230. peerMessageFactory->createPeerMessage(msg, msgLength);
  231. peerMessage->check();
  232. return peerMessage;
  233. }
  234. void PeerInteraction::syncPiece() {
  235. for(Pieces::iterator itr = pieces.begin(); itr != pieces.end(); itr++) {
  236. torrentMan->syncPiece(*itr);
  237. }
  238. }
  239. void PeerInteraction::updatePiece() {
  240. for(Pieces::iterator itr = pieces.begin(); itr != pieces.end(); itr++) {
  241. torrentMan->updatePiece(*itr);
  242. }
  243. }
  244. void PeerInteraction::getNewPieceAndSendInterest(int pieceNum) {
  245. if(pieces.empty() && !torrentMan->hasMissingPiece(peer)) {
  246. if(peer->amInterested) {
  247. logger->debug("CUID#%d - Not interested in the peer", cuid);
  248. addMessage(peerMessageFactory->createNotInterestedMessage());
  249. }
  250. } else {
  251. if(peer->peerChoking) {
  252. onChoked();
  253. if(peer->isFastExtensionEnabled()) {
  254. while((int)pieces.size() < pieceNum) {
  255. Piece piece = torrentMan->getMissingFastPiece(peer);
  256. if(Piece::isNull(piece)) {
  257. break;
  258. } else {
  259. pieces.push_back(piece);
  260. }
  261. }
  262. }
  263. } else {
  264. while((int)pieces.size() < pieceNum) {
  265. Piece piece = torrentMan->getMissingPiece(peer);
  266. if(Piece::isNull(piece)) {
  267. break;
  268. } else {
  269. pieces.push_back(piece);
  270. }
  271. }
  272. }
  273. if(!peer->amInterested) {
  274. logger->debug("CUID#%d - Interested in the peer", cuid);
  275. addMessage(peerMessageFactory->createInterestedMessage());
  276. }
  277. }
  278. }
  279. void PeerInteraction::addRequests() {
  280. // Abort downloading of completed piece.
  281. for(Pieces::iterator itr = pieces.begin(); itr != pieces.end();) {
  282. Piece& piece = *itr;
  283. if(piece.pieceComplete()) {
  284. abortPiece(piece);
  285. itr = pieces.erase(itr);
  286. } else {
  287. itr++;
  288. }
  289. }
  290. int MAX_PENDING_REQUEST;
  291. if(peer->getLatency() < 500) {
  292. MAX_PENDING_REQUEST = 24;
  293. } else if(peer->getLatency() < 1500) {
  294. MAX_PENDING_REQUEST = 12;
  295. } else {
  296. MAX_PENDING_REQUEST = 6;
  297. }
  298. int pieceNum;
  299. if(torrentMan->isEndGame()) {
  300. pieceNum = 1;
  301. } else {
  302. int blocks = DIV_FLOOR(torrentMan->pieceLength, BLOCK_LENGTH);
  303. pieceNum = DIV_FLOOR(MAX_PENDING_REQUEST, blocks);
  304. }
  305. getNewPieceAndSendInterest(pieceNum);
  306. for(Pieces::iterator itr = pieces.begin(); itr != pieces.end(); itr++) {
  307. Piece& piece = *itr;
  308. if(torrentMan->isEndGame()) {
  309. BlockIndexes missingBlockIndexes = piece.getAllMissingBlockIndexes();
  310. random_shuffle(missingBlockIndexes.begin(), missingBlockIndexes.end());
  311. int count = countRequestSlot();
  312. for(BlockIndexes::const_iterator bitr = missingBlockIndexes.begin();
  313. bitr != missingBlockIndexes.end() && count < MAX_PENDING_REQUEST;
  314. bitr++) {
  315. int blockIndex = *bitr;
  316. if(!isInRequestSlot(piece.getIndex(), blockIndex)) {
  317. addMessage(peerMessageFactory->createRequestMessage(piece,
  318. blockIndex));
  319. count++;
  320. }
  321. }
  322. } else {
  323. while(countRequestSlot() < MAX_PENDING_REQUEST) {
  324. int blockIndex = piece.getMissingUnusedBlockIndex();
  325. if(blockIndex == -1) {
  326. break;
  327. }
  328. addMessage(peerMessageFactory->createRequestMessage(piece,
  329. blockIndex));
  330. }
  331. }
  332. if(countRequestSlot() >= MAX_PENDING_REQUEST) {
  333. break;
  334. }
  335. }
  336. updatePiece();
  337. }
  338. void PeerInteraction::sendHandshake() {
  339. PeerMessageHandle handle =
  340. peerMessageFactory->createHandshakeMessage(torrentMan->getInfoHash(),
  341. torrentMan->peerId.c_str());
  342. addMessage(handle);
  343. sendMessages(0);
  344. }
  345. void PeerInteraction::sendBitfield() {
  346. if(peer->isFastExtensionEnabled()) {
  347. if(torrentMan->hasAllPieces()) {
  348. addMessage(peerMessageFactory->createHaveAllMessage());
  349. } else if(torrentMan->getDownloadLength() > 0) {
  350. addMessage(peerMessageFactory->createBitfieldMessage());
  351. } else {
  352. addMessage(peerMessageFactory->createHaveNoneMessage());
  353. }
  354. } else {
  355. if(torrentMan->getDownloadLength() > 0) {
  356. addMessage(peerMessageFactory->createBitfieldMessage());
  357. }
  358. }
  359. sendMessages(0);
  360. }
  361. void PeerInteraction::sendAllowedFast() {
  362. if(peer->isFastExtensionEnabled()) {
  363. Integers fastSet = Util::computeFastSet(peer->ipaddr, torrentMan->getInfoHash(),
  364. torrentMan->pieces, ALLOWED_FAST_SET_SIZE);
  365. for(Integers::const_iterator itr = fastSet.begin();
  366. itr != fastSet.end(); itr++) {
  367. addMessage(peerMessageFactory->createAllowedFastMessage(*itr));
  368. }
  369. }
  370. }
  371. Piece& PeerInteraction::getDownloadPiece(int index) {
  372. for(Pieces::iterator itr = pieces.begin(); itr != pieces.end(); itr++) {
  373. if(itr->getIndex() == index) {
  374. return *itr;
  375. }
  376. }
  377. throw new DlAbortEx("No such piece index=%d", index);
  378. }
  379. bool PeerInteraction::hasDownloadPiece(int index) const {
  380. for(Pieces::const_iterator itr = pieces.begin(); itr != pieces.end(); itr++) {
  381. if(itr->getIndex() == index) {
  382. return true;
  383. }
  384. }
  385. return false;
  386. }
  387. bool PeerInteraction::isInFastSet(int index) const {
  388. return find(fastSet.begin(), fastSet.end(), index) != fastSet.end();
  389. }
  390. void PeerInteraction::addFastSetIndex(int index) {
  391. if(!isInFastSet(index)) {
  392. fastSet.push_back(index);
  393. }
  394. }