BtPieceMessage.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - The high speed download utility
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * In addition, as a special exception, the copyright holders give
  22. * permission to link the code of portions of this program with the
  23. * OpenSSL library under certain conditions as described in each
  24. * individual source file, and distribute linked combinations
  25. * including the two.
  26. * You must obey the GNU General Public License in all respects
  27. * for all of the code used other than OpenSSL. If you modify
  28. * file(s) with this exception, you may extend this exception to your
  29. * version of the file(s), but you are not obligated to do so. If you
  30. * do not wish to do so, delete this exception statement from your
  31. * version. If you delete this exception statement from all source
  32. * files in the program, then also delete it here.
  33. */
  34. /* copyright --> */
  35. #include "BtPieceMessage.h"
  36. #include <cstring>
  37. #include <cstdlib>
  38. #include <cassert>
  39. #include "bittorrent_helper.h"
  40. #include "util.h"
  41. #include "message.h"
  42. #include "DlAbortEx.h"
  43. #include "message_digest_helper.h"
  44. #include "DiskAdaptor.h"
  45. #include "Logger.h"
  46. #include "LogFactory.h"
  47. #include "Peer.h"
  48. #include "Piece.h"
  49. #include "PieceStorage.h"
  50. #include "BtMessageDispatcher.h"
  51. #include "BtMessageFactory.h"
  52. #include "BtRequestFactory.h"
  53. #include "PeerConnection.h"
  54. #include "fmt.h"
  55. #include "DownloadContext.h"
  56. #include "PeerStorage.h"
  57. #include "array_fun.h"
  58. #include "WrDiskCache.h"
  59. #include "WrDiskCacheEntry.h"
  60. #include "DownloadFailureException.h"
  61. namespace aria2 {
  62. const char BtPieceMessage::NAME[] = "piece";
  63. BtPieceMessage::BtPieceMessage
  64. (size_t index, int32_t begin, int32_t blockLength)
  65. : AbstractBtMessage(ID, NAME),
  66. index_(index),
  67. begin_(begin),
  68. blockLength_(blockLength),
  69. data_(0)
  70. {
  71. setUploading(true);
  72. }
  73. BtPieceMessage::~BtPieceMessage()
  74. {}
  75. void BtPieceMessage::setMsgPayload(const unsigned char* data)
  76. {
  77. data_ = data;
  78. }
  79. BtPieceMessage* BtPieceMessage::create
  80. (const unsigned char* data, size_t dataLength)
  81. {
  82. bittorrent::assertPayloadLengthGreater(9, dataLength, NAME);
  83. bittorrent::assertID(ID, data, NAME);
  84. BtPieceMessage* message(new BtPieceMessage());
  85. message->setIndex(bittorrent::getIntParam(data, 1));
  86. message->setBegin(bittorrent::getIntParam(data, 5));
  87. message->setBlockLength(dataLength-9);
  88. return message;
  89. }
  90. void BtPieceMessage::doReceivedAction()
  91. {
  92. if(isMetadataGetMode()) {
  93. return;
  94. }
  95. RequestSlot slot = getBtMessageDispatcher()->getOutstandingRequest
  96. (index_, begin_, blockLength_);
  97. getPeer()->updateDownloadLength(blockLength_);
  98. downloadContext_->updateDownloadLength(blockLength_);
  99. if(!RequestSlot::isNull(slot)) {
  100. getPeer()->snubbing(false);
  101. SharedHandle<Piece> piece = getPieceStorage()->getPiece(index_);
  102. int64_t offset =
  103. static_cast<int64_t>(index_)*downloadContext_->getPieceLength()+begin_;
  104. A2_LOG_DEBUG(fmt(MSG_PIECE_RECEIVED,
  105. getCuid(),
  106. static_cast<unsigned long>(index_),
  107. begin_,
  108. blockLength_,
  109. static_cast<int64_t>(offset),
  110. static_cast<unsigned long>(slot.getBlockIndex())));
  111. if(piece->hasBlock(slot.getBlockIndex())) {
  112. A2_LOG_DEBUG("Already have this block.");
  113. return;
  114. }
  115. if(piece->getWrDiskCacheEntry()) {
  116. // Write Disk Cache enabled. Unfortunately, it incurs extra data
  117. // copy.
  118. unsigned char* dataCopy = new unsigned char[blockLength_];
  119. memcpy(dataCopy, data_+9, blockLength_);
  120. piece->updateWrCache(getPieceStorage()->getWrDiskCache(),
  121. dataCopy, 0, blockLength_, blockLength_, offset);
  122. } else {
  123. getPieceStorage()->getDiskAdaptor()->writeData(data_+9, blockLength_,
  124. offset);
  125. }
  126. piece->completeBlock(slot.getBlockIndex());
  127. A2_LOG_DEBUG(fmt(MSG_PIECE_BITFIELD, getCuid(),
  128. util::toHex(piece->getBitfield(),
  129. piece->getBitfieldLength()).c_str()));
  130. piece->updateHash(begin_, data_+9, blockLength_);
  131. getBtMessageDispatcher()->removeOutstandingRequest(slot);
  132. if(piece->pieceComplete()) {
  133. if(checkPieceHash(piece)) {
  134. onNewPiece(piece);
  135. } else {
  136. onWrongPiece(piece);
  137. peerStorage_->addBadPeer(getPeer()->getIPAddress());
  138. throw DL_ABORT_EX("Bad piece hash.");
  139. }
  140. }
  141. } else {
  142. A2_LOG_DEBUG(fmt("CUID#%" PRId64 " - RequestSlot not found, index=%lu, begin=%d",
  143. getCuid(),
  144. static_cast<unsigned long>(index_),
  145. begin_));
  146. }
  147. }
  148. size_t BtPieceMessage::MESSAGE_HEADER_LENGTH = 13;
  149. unsigned char* BtPieceMessage::createMessageHeader()
  150. {
  151. /**
  152. * len --- 9+blockLength, 4bytes
  153. * id --- 7, 1byte
  154. * index --- index, 4bytes
  155. * begin --- begin, 4bytes
  156. * total: 13bytes
  157. */
  158. unsigned char* msgHeader = new unsigned char[MESSAGE_HEADER_LENGTH];
  159. bittorrent::createPeerMessageString(msgHeader, MESSAGE_HEADER_LENGTH,
  160. 9+blockLength_, ID);
  161. bittorrent::setIntParam(&msgHeader[5], index_);
  162. bittorrent::setIntParam(&msgHeader[9], begin_);
  163. return msgHeader;
  164. }
  165. size_t BtPieceMessage::getMessageHeaderLength()
  166. {
  167. return MESSAGE_HEADER_LENGTH;
  168. }
  169. namespace {
  170. struct PieceSendUpdate : public ProgressUpdate {
  171. PieceSendUpdate(const SharedHandle<Peer>& peer)
  172. : peer(peer) {}
  173. virtual void update(size_t length, bool complete)
  174. {
  175. peer->updateUploadLength(length);
  176. }
  177. SharedHandle<Peer> peer;
  178. };
  179. } // namespace
  180. void BtPieceMessage::send()
  181. {
  182. if(isInvalidate()) {
  183. return;
  184. }
  185. A2_LOG_INFO(fmt(MSG_SEND_PEER_MESSAGE,
  186. getCuid(),
  187. getPeer()->getIPAddress().c_str(),
  188. getPeer()->getPort(),
  189. toString().c_str()));
  190. getPeerConnection()->pushBytes(createMessageHeader(),
  191. getMessageHeaderLength());
  192. int64_t pieceDataOffset =
  193. static_cast<int64_t>(index_)*downloadContext_->getPieceLength()+begin_;
  194. pushPieceData(pieceDataOffset, blockLength_);
  195. }
  196. void BtPieceMessage::pushPieceData(int64_t offset, int32_t length) const
  197. {
  198. assert(length <= 16*1024);
  199. array_ptr<unsigned char> buf(new unsigned char[length]);
  200. ssize_t r;
  201. r = getPieceStorage()->getDiskAdaptor()->readData(buf, length, offset);
  202. if(r == length) {
  203. unsigned char* dbuf = buf;
  204. buf.reset(0);
  205. getPeerConnection()->pushBytes(dbuf, length,
  206. new PieceSendUpdate(getPeer()));
  207. // To avoid upload rate overflow, we update the length here at
  208. // once.
  209. downloadContext_->updateUploadLength(length);
  210. } else {
  211. throw DL_ABORT_EX(EX_DATA_READ);
  212. }
  213. }
  214. std::string BtPieceMessage::toString() const
  215. {
  216. return fmt("%s index=%lu, begin=%d, length=%d",
  217. NAME,
  218. static_cast<unsigned long>(index_),
  219. begin_, blockLength_);
  220. }
  221. bool BtPieceMessage::checkPieceHash(const SharedHandle<Piece>& piece)
  222. {
  223. if(!getPieceStorage()->isEndGame() && piece->isHashCalculated()) {
  224. A2_LOG_DEBUG(fmt("Hash is available!! index=%lu",
  225. static_cast<unsigned long>(piece->getIndex())));
  226. return
  227. piece->getDigest() == downloadContext_->getPieceHash(piece->getIndex());
  228. } else {
  229. A2_LOG_DEBUG(fmt("Calculating hash index=%lu",
  230. static_cast<unsigned long>(piece->getIndex())));
  231. try {
  232. return piece->getDigestWithWrCache(downloadContext_->getPieceLength(),
  233. getPieceStorage()->getDiskAdaptor())
  234. == downloadContext_->getPieceHash(piece->getIndex());
  235. } catch(RecoverableException& e) {
  236. piece->clearAllBlock(getPieceStorage()->getWrDiskCache());
  237. throw;
  238. }
  239. }
  240. }
  241. void BtPieceMessage::onNewPiece(const SharedHandle<Piece>& piece)
  242. {
  243. if(piece->getWrDiskCacheEntry()) {
  244. // We flush cached data whenever an whole piece is retrieved.
  245. piece->flushWrCache(getPieceStorage()->getWrDiskCache());
  246. if(piece->getWrDiskCacheEntry()->getError() !=
  247. WrDiskCacheEntry::CACHE_ERR_SUCCESS) {
  248. piece->clearAllBlock(getPieceStorage()->getWrDiskCache());
  249. throw DOWNLOAD_FAILURE_EXCEPTION2
  250. (fmt("Write disk cache flush failure index=%lu",
  251. static_cast<unsigned long>(piece->getIndex())),
  252. piece->getWrDiskCacheEntry()->getErrorCode());
  253. }
  254. }
  255. A2_LOG_INFO(fmt(MSG_GOT_NEW_PIECE,
  256. getCuid(),
  257. static_cast<unsigned long>(piece->getIndex())));
  258. getPieceStorage()->completePiece(piece);
  259. getPieceStorage()->advertisePiece(getCuid(), piece->getIndex());
  260. }
  261. void BtPieceMessage::onWrongPiece(const SharedHandle<Piece>& piece)
  262. {
  263. A2_LOG_INFO(fmt(MSG_GOT_WRONG_PIECE,
  264. getCuid(),
  265. static_cast<unsigned long>(piece->getIndex())));
  266. piece->clearAllBlock(getPieceStorage()->getWrDiskCache());
  267. piece->destroyHashContext();
  268. getBtRequestFactory()->removeTargetPiece(piece);
  269. }
  270. void BtPieceMessage::onChokingEvent(const BtChokingEvent& event)
  271. {
  272. if(!isInvalidate() &&
  273. !getPeer()->isInAmAllowedIndexSet(index_)) {
  274. A2_LOG_DEBUG(fmt(MSG_REJECT_PIECE_CHOKED,
  275. getCuid(),
  276. static_cast<unsigned long>(index_),
  277. begin_,
  278. blockLength_));
  279. if(getPeer()->isFastExtensionEnabled()) {
  280. SharedHandle<BtMessage> rej =
  281. getBtMessageFactory()->createRejectMessage
  282. (index_, begin_, blockLength_);
  283. getBtMessageDispatcher()->addMessageToQueue(rej);
  284. }
  285. setInvalidate(true);
  286. }
  287. }
  288. void BtPieceMessage::onCancelSendingPieceEvent
  289. (const BtCancelSendingPieceEvent& event)
  290. {
  291. if(!isInvalidate() &&
  292. index_ == event.getIndex() &&
  293. begin_ == event.getBegin() &&
  294. blockLength_ == event.getLength()) {
  295. A2_LOG_DEBUG(fmt(MSG_REJECT_PIECE_CANCEL,
  296. getCuid(),
  297. static_cast<unsigned long>(index_),
  298. begin_,
  299. blockLength_));
  300. if(getPeer()->isFastExtensionEnabled()) {
  301. SharedHandle<BtMessage> rej =
  302. getBtMessageFactory()->createRejectMessage
  303. (index_, begin_, blockLength_);
  304. getBtMessageDispatcher()->addMessageToQueue(rej);
  305. }
  306. setInvalidate(true);
  307. }
  308. }
  309. void BtPieceMessage::setDownloadContext
  310. (const SharedHandle<DownloadContext>& downloadContext)
  311. {
  312. downloadContext_ = downloadContext;
  313. }
  314. void BtPieceMessage::setPeerStorage
  315. (const SharedHandle<PeerStorage>& peerStorage)
  316. {
  317. peerStorage_ = peerStorage;
  318. }
  319. } // namespace aria2