瀏覽代碼

2008-09-28 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

	Fixed the bug that aria2 may send duplicate data to BitTorrent 
peer if
	whole message data is not sent in one send() call.
	* src/BtPieceMessage.cc
	* src/BtPieceMessage.h
	* src/PeerConnection.cc
	* src/PeerConnection.h
	* src/SimpleBtMessage.cc
	* src/SimpleBtMessage.h
Tatsuhiro Tsujikawa 17 年之前
父節點
當前提交
4f280af10b
共有 7 個文件被更改,包括 67 次插入74 次删除
  1. 11 0
      ChangeLog
  2. 22 48
      src/BtPieceMessage.cc
  3. 0 4
      src/BtPieceMessage.h
  4. 15 5
      src/PeerConnection.cc
  5. 4 0
      src/PeerConnection.h
  6. 15 15
      src/SimpleBtMessage.cc
  7. 0 2
      src/SimpleBtMessage.h

+ 11 - 0
ChangeLog

@@ -1,3 +1,14 @@
+2008-09-28  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
+
+	Fixed the bug that aria2 may send duplicate data to BitTorrent peer if
+	whole message data is not sent in one send() call.
+	* src/BtPieceMessage.cc
+	* src/BtPieceMessage.h
+	* src/PeerConnection.cc
+	* src/PeerConnection.h
+	* src/SimpleBtMessage.cc
+	* src/SimpleBtMessage.h
+	
 2008-09-28  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
 
 	Fixed the bug that aria2 downloads faster than max-download-limit when

+ 22 - 48
src/BtPieceMessage.cc

@@ -53,6 +53,7 @@
 #include "StringFormat.h"
 #include <cstring>
 #include <cstdlib>
+#include <cassert>
 
 namespace aria2 {
 
@@ -140,61 +141,34 @@ void BtPieceMessage::send() {
   if(invalidate) {
     return;
   }
-  if(!headerSent) {
-    if(!sendingInProgress) {
-      logger->info(MSG_SEND_PEER_MESSAGE,
-		   cuid, peer->ipaddr.c_str(), peer->port,
-		   toString().c_str());
-      getMessageHeader();
-      leftDataLength = getMessageHeaderLength();
-      sendingInProgress = true;
-    }
-    size_t writtenLength
-      = peerConnection->sendMessage(msgHeader+getMessageHeaderLength()-leftDataLength,
-				    leftDataLength);
-    if(writtenLength == leftDataLength) {
-      headerSent = true;
-      leftDataLength = blockLength;
-    } else {
-      leftDataLength -= writtenLength;
-    }
-  }
-  if(headerSent) {
-    sendingInProgress = false;
+  if(!sendingInProgress) {
+    logger->info(MSG_SEND_PEER_MESSAGE,
+		 cuid, peer->ipaddr.c_str(), peer->port,
+		 toString().c_str());
+    getMessageHeader();
+    peerConnection->sendMessage(msgHeader, getMessageHeaderLength());
     off_t pieceDataOffset =
-      (off_t)index*btContext->getPieceLength()+begin+blockLength-leftDataLength;
-    size_t writtenLength = sendPieceData(pieceDataOffset, leftDataLength);
+      (off_t)index*btContext->getPieceLength()+begin;
+    size_t writtenLength = sendPieceData(pieceDataOffset, blockLength);
+    logger->debug("msglength = %zu bytes",
+		  getMessageHeaderLength()+blockLength);
+    peer->updateUploadLength(writtenLength);
+  } else {
+    ssize_t writtenLength = peerConnection->sendPendingData();
     peer->updateUploadLength(writtenLength);
-    if(writtenLength < leftDataLength) {
-      sendingInProgress = true;
-    }
-    leftDataLength -= writtenLength;
   }
+  sendingInProgress = !peerConnection->sendBufferIsEmpty();
 }
 
 size_t BtPieceMessage::sendPieceData(off_t offset, size_t length) const {
-  size_t BUF_SIZE = 256;
-  unsigned char buf[BUF_SIZE];
-  div_t res = div(length, BUF_SIZE);
-  size_t writtenLength = 0;
-  for(int i = 0; i < res.quot; i++) {
-    if((size_t)pieceStorage->getDiskAdaptor()->readData(buf, BUF_SIZE, offset+i*BUF_SIZE) < BUF_SIZE) {
-      throw DlAbortEx(EX_DATA_READ);
-    }
-    size_t ws = peerConnection->sendMessage(buf, BUF_SIZE);
-    writtenLength += ws;
-    if(ws != BUF_SIZE) {
-      return writtenLength;
-    }
-  }
-  if(res.rem > 0) {
-    if(pieceStorage->getDiskAdaptor()->readData(buf, res.rem, offset+res.quot*BUF_SIZE) < res.rem) {
-      throw DlAbortEx(EX_DATA_READ);
-    }
-    size_t ws = peerConnection->sendMessage(buf, res.rem);
-    writtenLength += ws;
+  assert(length <= 16*1024);
+  unsigned char buf[16*1024];
+  if(pieceStorage->getDiskAdaptor()->readData(buf, length, offset) ==
+     static_cast<ssize_t>(length)) {
+    return peerConnection->sendMessage(buf, length);
+  } else {
+    throw DlAbortEx(EX_DATA_READ);
   }
-  return writtenLength;
 }
 
 std::string BtPieceMessage::toString() const {

+ 0 - 4
src/BtPieceMessage.h

@@ -52,8 +52,6 @@ private:
   uint32_t begin;
   uint32_t blockLength;
   unsigned char* block;
-  size_t leftDataLength;
-  bool headerSent;
   unsigned char* msgHeader;
 
   static size_t MESSAGE_HEADER_LENGTH;
@@ -99,8 +97,6 @@ public:
      begin(begin),
      blockLength(blockLength),
      block(0),
-     leftDataLength(0),
-     headerSent(false),
      msgHeader(0)
   {
     uploading = true;

+ 15 - 5
src/PeerConnection.cc

@@ -69,11 +69,9 @@ PeerConnection::~PeerConnection() {}
 ssize_t PeerConnection::sendMessage(const unsigned char* data,
 				    size_t dataLength)
 {
-  if(socket->isWritable(0)) {
-    return sendData(data, dataLength, _encryptionEnabled);
-  } else {
-    return 0;
-  }
+  ssize_t writtenLength = sendData(data, dataLength, _encryptionEnabled);
+  logger->debug("sent %d byte(s).", writtenLength);
+  return writtenLength;
 }
 
 bool PeerConnection::receiveMessage(unsigned char* data, size_t& dataLength) {
@@ -231,4 +229,16 @@ void PeerConnection::presetBuffer(const unsigned char* data, size_t length)
   resbufLength = length;
 }
 
+bool PeerConnection::sendBufferIsEmpty() const
+{
+  return _socketBuffer.sendBufferIsEmpty();
+}
+
+ssize_t PeerConnection::sendPendingData()
+{
+  ssize_t writtenLength = _socketBuffer.send();
+  logger->debug("sent %d byte(s).", writtenLength);
+  return writtenLength;
+}
+
 } // namespace aria2

+ 4 - 0
src/PeerConnection.h

@@ -99,6 +99,10 @@ public:
 			const SharedHandle<ARC4Decryptor>& decryptor);
 
   void presetBuffer(const unsigned char* data, size_t length);
+
+  bool sendBufferIsEmpty() const;
+  
+  ssize_t sendPendingData();
 };
 
 typedef SharedHandle<PeerConnection> PeerConnectionHandle;

+ 15 - 15
src/SimpleBtMessage.cc

@@ -41,7 +41,7 @@
 
 namespace aria2 {
 
-SimpleBtMessage::SimpleBtMessage():leftDataLength(0) {}
+SimpleBtMessage::SimpleBtMessage() {}//:leftDataLength(0) {}
 
 SimpleBtMessage::~SimpleBtMessage() {}
 
@@ -49,22 +49,22 @@ void SimpleBtMessage::send() {
   if(invalidate) {
     return;
   }
-  if(sendPredicate() || sendingInProgress) {
+  if(!sendPredicate() && !sendingInProgress) {
+    return;
+  }
+  if(!sendingInProgress) {
     const unsigned char* msg = getMessage();
     size_t msgLength = getMessageLength();
-    if(!sendingInProgress) {
-      logger->info(MSG_SEND_PEER_MESSAGE,
-		   cuid, peer->ipaddr.c_str(), peer->port, toString().c_str());
-      leftDataLength = getMessageLength();
-    }
-    sendingInProgress = false;
-    size_t writtenLength = peerConnection->sendMessage(msg+msgLength-leftDataLength, leftDataLength);
-    if(writtenLength == leftDataLength) {
-      onSendComplete();
-    } else {
-      leftDataLength -= writtenLength;
-      sendingInProgress = true;
-    }
+    logger->info(MSG_SEND_PEER_MESSAGE,
+		 cuid, peer->ipaddr.c_str(), peer->port, toString().c_str());
+    logger->debug("msglength = %zu bytes", msgLength);
+    peerConnection->sendMessage(msg, msgLength);
+  } else {
+    peerConnection->sendPendingData();
+  }
+  sendingInProgress = !peerConnection->sendBufferIsEmpty();
+  if(!sendingInProgress) {
+    onSendComplete();
   }
 }
 

+ 0 - 2
src/SimpleBtMessage.h

@@ -40,8 +40,6 @@
 namespace aria2 {
 
 class SimpleBtMessage : public AbstractBtMessage {
-private:
-  size_t leftDataLength;
 public:
   SimpleBtMessage();