PeerConnection.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - a simple utility for downloading files faster
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  20. */
  21. /* copyright --> */
  22. #include "PeerConnection.h"
  23. #include "message.h"
  24. #include "DlAbortEx.h"
  25. #include "PeerMessageUtil.h"
  26. #include "Util.h"
  27. #include <netinet/in.h>
  28. PeerConnection::PeerConnection(int cuid, const Socket* socket,
  29. const Option* op, const Logger* logger,
  30. Peer* peer, TorrentMan* torrentMan)
  31. :cuid(cuid), socket(socket), option(op), logger(logger), peer(peer),
  32. torrentMan(torrentMan),
  33. resbufLength(0), currentPayloadLength(0), lenbufLength(0) {}
  34. PeerConnection::~PeerConnection() {}
  35. void PeerConnection::sendHandshake() const {
  36. /**
  37. * pstrlen --- '19', 1byte
  38. * pstr --- "BitTorrent protocol", 19bytes
  39. * reserved --- \0 * 8, 8bytes
  40. * info_hash --- info_hash, 20bytes
  41. * peer_id --- peer_id, 20bytes
  42. * total: 68bytes
  43. */
  44. char msg[HANDSHAKE_MESSAGE_LENGTH];
  45. memset(msg, 0, sizeof(msg));
  46. msg[0] = 19;
  47. memcpy(&msg[1], PSTR, strlen(PSTR));
  48. memcpy(&msg[28], torrentMan->getInfoHash(), 20);
  49. memcpy(&msg[48], torrentMan->peerId.c_str(), 20);
  50. writeOutgoingMessageLog("handshake");
  51. socket->writeData(msg, sizeof(msg));
  52. }
  53. void PeerConnection::sendKeepAlive() const {
  54. /**
  55. * len --- 0, 4bytes
  56. * total: 4bytes
  57. */
  58. char msg[4];
  59. memset(msg, 0, sizeof(msg));
  60. writeOutgoingMessageLog("keep-alive");
  61. socket->writeData(msg, sizeof(msg));
  62. }
  63. void PeerConnection::createNLengthMessage(char* msg, int msgLen, int payloadLen, int id) const {
  64. assert(msgLen >= 5);
  65. memset(msg, 0, msgLen);
  66. setIntParam(msg, payloadLen);
  67. msg[4] = (char)id;
  68. }
  69. void PeerConnection::setIntParam(char* dest, int param) const {
  70. int nParam = htonl(param);
  71. memcpy(dest, &nParam, 4);
  72. }
  73. void PeerConnection::writeOutgoingMessageLog(const char* msg) const {
  74. logger->info(MSG_SEND_PEER_MESSAGE, cuid, peer->ipaddr.c_str(), peer->port, msg);
  75. }
  76. void PeerConnection::writeOutgoingMessageLog(const char* msg, int index) const {
  77. logger->info(MSG_SEND_PEER_MESSAGE_WITH_INDEX, cuid, peer->ipaddr.c_str(), peer->port, msg, index);
  78. }
  79. void PeerConnection::writeOutgoingMessageLog(const char* msg, const unsigned char* bitfield, int bitfieldLength) const {
  80. logger->info(MSG_SEND_PEER_MESSAGE_WITH_BITFIELD, cuid, peer->ipaddr.c_str(), peer->port, msg, Util::toHex(bitfield, bitfieldLength).c_str());
  81. }
  82. void PeerConnection::writeOutgoingMessageLog(const char* msg, int index, int begin, int length) const {
  83. logger->info(MSG_SEND_PEER_MESSAGE_WITH_INDEX_BEGIN_LENGTH, cuid, peer->ipaddr.c_str(), peer->port, msg, index, begin, length);
  84. }
  85. void PeerConnection::sendChoke() const {
  86. /**
  87. * len --- 1, 4bytes
  88. * id --- 0, 1byte
  89. * total: 5bytes
  90. */
  91. char msg[5];
  92. createNLengthMessage(msg, sizeof(msg), 1, 0);
  93. writeOutgoingMessageLog("choke");
  94. socket->writeData(msg, sizeof(msg));
  95. }
  96. void PeerConnection::sendUnchoke() const {
  97. /**
  98. * len --- 1, 4bytes
  99. * id --- 1, 1byte
  100. * total: 5bytes
  101. */
  102. char msg[5];
  103. createNLengthMessage(msg, sizeof(msg), 1, 1);
  104. writeOutgoingMessageLog("unchoke");
  105. socket->writeData(msg, sizeof(msg));
  106. }
  107. void PeerConnection::sendInterested() const {
  108. /**
  109. * len --- 1, 4bytes
  110. * id --- 2, 1byte
  111. * total: 5bytes
  112. */
  113. char msg[5];
  114. createNLengthMessage(msg, sizeof(msg), 1, 2);
  115. writeOutgoingMessageLog("interested");
  116. socket->writeData(msg, sizeof(msg));
  117. }
  118. void PeerConnection::sendNotInterested() const {
  119. /**
  120. * len --- 1, 4bytes
  121. * id --- 3, 1byte
  122. * total: 5bytes
  123. */
  124. char msg[5];
  125. createNLengthMessage(msg, sizeof(msg), 1, 3);
  126. writeOutgoingMessageLog("not interested");
  127. socket->writeData(msg, sizeof(msg));
  128. }
  129. void PeerConnection::sendHave(int index) const {
  130. /**
  131. * len --- 5, 4bytes
  132. * id --- 4, 1byte
  133. * piece index --- index, 4bytes
  134. * total: 9bytes
  135. */
  136. char msg[9];
  137. createNLengthMessage(msg, sizeof(msg), 5, 4);
  138. setIntParam(&msg[5], index);
  139. writeOutgoingMessageLog("have", index);
  140. socket->writeData(msg, sizeof(msg));
  141. }
  142. void PeerConnection::sendBitfield() const {
  143. int len = torrentMan->getBitfieldLength();
  144. const unsigned char* bitfield = torrentMan->getBitfield();
  145. /**
  146. * len --- 1+len, 4bytes
  147. * id --- 5, 1byte
  148. * bitfield --- bitfield, len bytes
  149. * total: 5+len bytes
  150. */
  151. int msgLen = 5+len;
  152. char* msg = new char[msgLen];
  153. try {
  154. createNLengthMessage(msg, msgLen, 1+len, 5);
  155. writeOutgoingMessageLog("bitfield", bitfield, len);
  156. socket->writeData(msg, msgLen);
  157. delete [] msg;
  158. } catch(Exception* ex) {
  159. delete [] msg;
  160. throw;
  161. }
  162. }
  163. void PeerConnection::sendRequest(int index, int begin, int length) const {
  164. /**
  165. * len --- 13, 4bytes
  166. * id --- 6, 1byte
  167. * index --- index, 4bytes
  168. * begin --- begin, 4bytes
  169. * length --- length, 4bytes
  170. * total: 17bytes
  171. */
  172. char msg[17];
  173. createNLengthMessage(msg, sizeof(msg), 13, 6);
  174. setIntParam(&msg[5], index);
  175. setIntParam(&msg[9], begin);
  176. setIntParam(&msg[13], length);
  177. writeOutgoingMessageLog("request", index, begin, length);
  178. socket->writeData(msg, sizeof(msg));
  179. }
  180. void PeerConnection::sendPiece(int index, int begin, int length) const {
  181. /**
  182. * len --- 9+length, 4bytes
  183. * id --- 7, 1byte
  184. * index --- index, 4bytes
  185. * begin --- begin, 4bytes
  186. * sub total: 13bytes
  187. * additionally,
  188. * block --- data, X bytes
  189. */
  190. char msg[13];
  191. createNLengthMessage(msg, sizeof(msg), 9+length, 7);
  192. setIntParam(&msg[5], index);
  193. setIntParam(&msg[9], begin);
  194. writeOutgoingMessageLog("piece", index, begin, length);
  195. socket->writeData(msg, sizeof(msg));
  196. int BUF_SIZE = 4096;
  197. char buf[BUF_SIZE];
  198. int iteration = length/BUF_SIZE;
  199. long long int pieceOffset = ((long long int)index*torrentMan->pieceLength)+begin;
  200. for(int i = 0; i < iteration; i++) {
  201. if(torrentMan->diskWriter->readData(buf, BUF_SIZE, pieceOffset+i*BUF_SIZE) < BUF_SIZE) {
  202. throw new DlAbortEx("piece reading failed.");
  203. }
  204. socket->writeData(buf, BUF_SIZE);
  205. }
  206. int rem = length%BUF_SIZE;
  207. if(rem > 0) {
  208. if(torrentMan->diskWriter->readData(buf, rem, pieceOffset+iteration*BUF_SIZE) < rem) {
  209. throw new DlAbortEx("piece reading failed.");
  210. }
  211. socket->writeData(buf, rem);
  212. }
  213. }
  214. void PeerConnection::sendPieceHeader(int index, int begin, int length) const {
  215. /**
  216. * len --- 9+length, 4bytes
  217. * id --- 7, 1byte
  218. * index --- index, 4bytes
  219. * begin --- begin, 4bytes
  220. * sub total: 13bytes
  221. * additionally,
  222. * block --- data, X bytes
  223. */
  224. char msg[13];
  225. createNLengthMessage(msg, sizeof(msg), 9+length, 7);
  226. setIntParam(&msg[5], index);
  227. setIntParam(&msg[9], begin);
  228. writeOutgoingMessageLog("piece", index, begin, length);
  229. socket->writeData(msg, sizeof(msg));
  230. }
  231. int PeerConnection::sendPieceData(long long int offset, int length) const {
  232. int BUF_SIZE = 256;
  233. char buf[BUF_SIZE];
  234. int iteration = length/BUF_SIZE;
  235. int writtenLength = 0;
  236. bool isWritable = true;
  237. for(int i = 0; i < iteration; i++) {
  238. isWritable = socket->isWritable(0);
  239. if(!isWritable) {
  240. return writtenLength;
  241. }
  242. if(torrentMan->diskWriter->readData(buf, BUF_SIZE, offset+i*BUF_SIZE) < BUF_SIZE) {
  243. throw new DlAbortEx("piece reading failed.");
  244. }
  245. socket->writeData(buf, BUF_SIZE);
  246. writtenLength += BUF_SIZE;
  247. }
  248. if(socket->isWritable(0)) {
  249. int rem = length%BUF_SIZE;
  250. if(rem > 0) {
  251. if(torrentMan->diskWriter->readData(buf, rem, offset+iteration*BUF_SIZE) < rem) {
  252. throw new DlAbortEx("piece reading failed.");
  253. }
  254. socket->writeData(buf, rem);
  255. writtenLength += rem;
  256. }
  257. }
  258. return writtenLength;
  259. }
  260. void PeerConnection::sendCancel(int index, int begin, int length) const {
  261. /**
  262. * len --- 13, 4bytes
  263. * id --- 8, 1byte
  264. * index --- index, 4bytes
  265. * begin --- begin, 4bytes
  266. * length -- length, 4bytes
  267. * total: 17bytes
  268. */
  269. char msg[17];
  270. createNLengthMessage(msg, sizeof(msg), 13, 8);
  271. setIntParam(&msg[5], index);
  272. setIntParam(&msg[9], begin);
  273. setIntParam(&msg[13], length);
  274. writeOutgoingMessageLog("cancel", index, begin, length);
  275. socket->writeData(msg, sizeof(msg));
  276. }
  277. PeerMessage* PeerConnection::receiveMessage() {
  278. if(!socket->isReadable(0)) {
  279. return NULL;
  280. }
  281. if(resbufLength == 0 && lenbufLength != 4) {
  282. // read payload size, 4-byte integer
  283. int remain = 4-lenbufLength;
  284. int temp = remain;
  285. socket->readData(lenbuf+lenbufLength, temp);
  286. if(temp == 0) {
  287. // we got EOF
  288. throw new DlAbortEx(EX_EOF_FROM_PEER);
  289. }
  290. if(remain != temp) {
  291. // still 4-temp bytes to go
  292. lenbufLength += temp;
  293. return NULL;
  294. }
  295. //payloadLen = ntohl(nPayloadLen);
  296. int payloadLength = ntohl(*((int*)lenbuf));
  297. if(payloadLength > MAX_PAYLOAD_LEN) {
  298. throw new DlAbortEx("max payload length exceeded. length = %d",
  299. payloadLength);
  300. }
  301. currentPayloadLength = payloadLength;
  302. }
  303. // we have currentPayloadLen-resbufLen bytes to read
  304. int remaining = currentPayloadLength-resbufLength;
  305. if(remaining > 0) {
  306. socket->readData(resbuf+resbufLength, remaining);
  307. if(remaining == 0) {
  308. // we got EOF
  309. throw new DlAbortEx(EX_EOF_FROM_PEER);
  310. }
  311. resbufLength += remaining;
  312. if(currentPayloadLength != resbufLength) {
  313. return NULL;
  314. }
  315. }
  316. // we got whole payload.
  317. resbufLength = 0;
  318. lenbufLength = 0;
  319. PeerMessage* peerMessage = PeerMessageUtil::createPeerMessage(resbuf, currentPayloadLength);
  320. try {
  321. PeerMessageUtil::checkIntegrity(peerMessage, torrentMan->pieceLength,
  322. torrentMan->pieces, torrentMan->totalSize);
  323. } catch(Exception* e) {
  324. delete peerMessage;
  325. throw;
  326. }
  327. return peerMessage;
  328. }
  329. HandshakeMessage* PeerConnection::receiveHandshake() {
  330. if(!socket->isReadable(0)) {
  331. return NULL;
  332. }
  333. int remain = HANDSHAKE_MESSAGE_LENGTH-resbufLength;
  334. int temp = remain;
  335. socket->readData(resbuf+resbufLength, temp);
  336. if(temp == 0) {
  337. // we got EOF
  338. throw new DlAbortEx(EX_EOF_FROM_PEER);
  339. }
  340. if(remain != temp) {
  341. resbufLength += temp;
  342. return NULL;
  343. }
  344. // we got whole handshake payload
  345. resbufLength = 0;
  346. HandshakeMessage* handshakeMessage = PeerMessageUtil::createHandshakeMessage(resbuf);
  347. try {
  348. PeerMessageUtil::checkHandshake(handshakeMessage, torrentMan->getInfoHash());
  349. } catch(Exception* e) {
  350. delete handshakeMessage;
  351. throw;
  352. }
  353. return handshakeMessage;
  354. }