فهرست منبع

2008-09-14 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

	Use non-blocking socket for TCP communication to avoid possible
	blocking due to buffer full.
	* src/AbstractProxyRequestCommand.cc
	* src/FtpConnection.cc
	* src/FtpConnection.h
	* src/FtpNegotiationCommand.cc
	* src/FtpNegotiationCommand.h
	* src/HttpConnection.cc
	* src/HttpConnection.h
	* src/HttpRequestCommand.cc
	* src/InitiatorMSEHandshakeCommand.cc
	* src/InitiatorMSEHandshakeCommand.h
	* src/MSEHandshake.cc
	* src/MSEHandshake.h
	* src/Makefile.am
	* src/PeerConnection.cc
	* src/PeerConnection.h
	* src/PeerInteractionCommand.cc
	* src/PeerListenCommand.cc
	* src/ReceiverMSEHandshakeCommand.cc
	* src/ReceiverMSEHandshakeCommand.h
	* src/SocketBuffer.cc
	* src/SocketBuffer.h
	* src/SocketCore.cc
	* src/SocketCore.h
Tatsuhiro Tsujikawa 17 سال پیش
والد
کامیت
495f02ebbc

+ 28 - 0
ChangeLog

@@ -1,3 +1,31 @@
+2008-09-14  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
+
+	Use non-blocking socket for TCP communication to avoid possible
+	blocking due to buffer full.
+	* src/AbstractProxyRequestCommand.cc
+	* src/FtpConnection.cc
+	* src/FtpConnection.h
+	* src/FtpNegotiationCommand.cc
+	* src/FtpNegotiationCommand.h
+	* src/HttpConnection.cc
+	* src/HttpConnection.h
+	* src/HttpRequestCommand.cc
+	* src/InitiatorMSEHandshakeCommand.cc
+	* src/InitiatorMSEHandshakeCommand.h
+	* src/MSEHandshake.cc
+	* src/MSEHandshake.h
+	* src/Makefile.am
+	* src/PeerConnection.cc
+	* src/PeerConnection.h
+	* src/PeerInteractionCommand.cc
+	* src/PeerListenCommand.cc
+	* src/ReceiverMSEHandshakeCommand.cc
+	* src/ReceiverMSEHandshakeCommand.h
+	* src/SocketBuffer.cc
+	* src/SocketBuffer.h
+	* src/SocketCore.cc
+	* src/SocketCore.h
+
 2008-09-14  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
 
 	Call setStatusRealtime() in constructor.

+ 21 - 14
src/AbstractProxyRequestCommand.cc

@@ -51,7 +51,8 @@ AbstractProxyRequestCommand::AbstractProxyRequestCommand(int cuid,
 							 RequestGroup* requestGroup,
 							 DownloadEngine* e,
 							 const SocketHandle& s)
-  :AbstractCommand(cuid, req, requestGroup, e, s)
+  :AbstractCommand(cuid, req, requestGroup, e, s),
+   httpConnection(new HttpConnection(cuid, s, e->option))
 {
   setTimeout(e->option->getAsInt(PREF_CONNECT_TIMEOUT));
   disableReadCheckSocket();
@@ -61,19 +62,25 @@ AbstractProxyRequestCommand::AbstractProxyRequestCommand(int cuid,
 AbstractProxyRequestCommand::~AbstractProxyRequestCommand() {}
 
 bool AbstractProxyRequestCommand::executeInternal() {
-  socket->setBlockingMode();
-
-  HttpRequestHandle httpRequest(new HttpRequest());
-  httpRequest->setUserAgent(e->option->get(PREF_USER_AGENT));
-  httpRequest->setRequest(req);
-  httpRequest->configure(e->option);
-
-  httpConnection.reset(new HttpConnection(cuid, socket, e->option));
-
-  httpConnection->sendProxyRequest(httpRequest);
-
-  e->commands.push_back(getNextCommand());
-  return true;
+  //socket->setBlockingMode();
+  if(httpConnection->sendBufferIsEmpty()) {
+    HttpRequestHandle httpRequest(new HttpRequest());
+    httpRequest->setUserAgent(e->option->get(PREF_USER_AGENT));
+    httpRequest->setRequest(req);
+    httpRequest->configure(e->option);
+    
+    httpConnection->sendProxyRequest(httpRequest);
+  } else {
+    httpConnection->sendPendingData();
+  }
+  if(httpConnection->sendBufferIsEmpty()) {
+    e->commands.push_back(getNextCommand());
+    return true;
+  } else {
+    setWriteCheckSocket(socket);
+    e->commands.push_back(this);
+    return false;
+  }
 }
 
 } // namespace aria2

+ 112 - 62
src/FtpConnection.cc

@@ -48,6 +48,7 @@
 #include "Socket.h"
 #include "A2STR.h"
 #include <cstring>
+#include <cassert>
 
 namespace aria2 {
 
@@ -58,106 +59,155 @@ const std::string FtpConnection::I("I");
 FtpConnection::FtpConnection(int32_t cuid, const SocketHandle& socket,
 			     const RequestHandle& req, const Option* op):
   cuid(cuid), socket(socket), req(req), option(op),
-  logger(LogFactory::getInstance()) {}
+  logger(LogFactory::getInstance()),
+  _socketBuffer(socket) {}
 
 FtpConnection::~FtpConnection() {}
 
-void FtpConnection::sendUser() const
+bool FtpConnection::sendUser()
 {
-  std::string request = "USER "+AuthConfigFactorySingleton::instance()->createAuthConfig(req)->getUser()+"\r\n";
-  logger->info(MSG_SENDING_REQUEST, cuid, "USER ********");
-  socket->writeData(request);
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    std::string request = "USER "+
+      AuthConfigFactorySingleton::instance()->createAuthConfig(req)->
+      getUser()+"\r\n";
+    logger->info(MSG_SENDING_REQUEST, cuid, "USER ********");
+    _socketBuffer.feedSendBuffer(request);
+  }
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
 }
 
-void FtpConnection::sendPass() const
+bool FtpConnection::sendPass()
 {
-  std::string request = "PASS "+AuthConfigFactorySingleton::instance()->createAuthConfig(req)->getPassword()+"\r\n";
-  logger->info(MSG_SENDING_REQUEST, cuid, "PASS ********");
-  socket->writeData(request);
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    std::string request = "PASS "+
+      AuthConfigFactorySingleton::instance()->createAuthConfig(req)->
+      getPassword()+"\r\n";
+    logger->info(MSG_SENDING_REQUEST, cuid, "PASS ********");
+    _socketBuffer.feedSendBuffer(request);
+  }
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
 }
 
-void FtpConnection::sendType() const
+bool FtpConnection::sendType()
 {
-  std::string type;
-  if(option->get(PREF_FTP_TYPE) == V_ASCII) {
-    type = FtpConnection::A;
-  } else {
-    type = FtpConnection::I;
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    std::string type;
+    if(option->get(PREF_FTP_TYPE) == V_ASCII) {
+      type = FtpConnection::A;
+    } else {
+      type = FtpConnection::I;
+    }
+    std::string request = "TYPE "+type+"\r\n";
+    logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
+    _socketBuffer.feedSendBuffer(request);
   }
-  std::string request = "TYPE "+type+"\r\n";
-  logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
-  socket->writeData(request);
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
 }
 
-void FtpConnection::sendCwd() const
+bool FtpConnection::sendCwd()
 {
-  std::string request = "CWD "+Util::urldecode(req->getDir())+"\r\n";
-  logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
-  socket->writeData(request);
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    std::string request = "CWD "+Util::urldecode(req->getDir())+"\r\n";
+    logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
+    _socketBuffer.feedSendBuffer(request);
+  }
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
 }
 
-void FtpConnection::sendMdtm() const
+bool FtpConnection::sendMdtm()
 {
-  std::string request = "MDTM "+Util::urlencode(req->getFile())+"\r\n";
-  logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
-  socket->writeData(request);
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    std::string request = "MDTM "+Util::urlencode(req->getFile())+"\r\n";
+    logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
+    _socketBuffer.feedSendBuffer(request);
+  }
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
 }
 
-void FtpConnection::sendSize() const
+bool FtpConnection::sendSize()
 {
-  std::string request = "SIZE "+Util::urldecode(req->getFile())+"\r\n";
-  logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
-  socket->writeData(request);
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    std::string request = "SIZE "+Util::urldecode(req->getFile())+"\r\n";
+    logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
+    _socketBuffer.feedSendBuffer(request);
+  }
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
 }
 
-void FtpConnection::sendPasv() const
+bool FtpConnection::sendPasv()
 {
-  static const std::string request("PASV\r\n");
-  logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
-  socket->writeData(request);
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    static const std::string request("PASV\r\n");
+    logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
+    _socketBuffer.feedSendBuffer(request);
+  }
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
 }
 
-SocketHandle FtpConnection::sendPort() const
+SharedHandle<SocketCore> FtpConnection::createServerSocket()
 {
-  SocketHandle serverSocket(new SocketCore());
+  SharedHandle<SocketCore> serverSocket(new SocketCore());
   serverSocket->bind(0);
   serverSocket->beginListen();
   serverSocket->setNonBlockingMode();
-
-  std::pair<std::string, uint16_t> addrinfo;
-  socket->getAddrInfo(addrinfo);
-  unsigned int ipaddr[4]; 
-  sscanf(addrinfo.first.c_str(), "%u.%u.%u.%u",
-	 &ipaddr[0], &ipaddr[1], &ipaddr[2], &ipaddr[3]);
-  serverSocket->getAddrInfo(addrinfo);
-  std::string request = "PORT "+
-    Util::uitos(ipaddr[0])+","+Util::uitos(ipaddr[1])+","+
-    Util::uitos(ipaddr[2])+","+Util::uitos(ipaddr[3])+","+
-    Util::uitos(addrinfo.second/256)+","+Util::uitos(addrinfo.second%256)+"\r\n";
-  logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
-  socket->writeData(request);
   return serverSocket;
 }
 
-void FtpConnection::sendRest(const SegmentHandle& segment) const
+bool FtpConnection::sendPort(const SharedHandle<SocketCore>& serverSocket)
 {
-  std::string request = "REST ";
-  if(segment.isNull()) {
-    request += "0";
-  } else {
-    request += Util::itos(segment->getPositionToWrite());
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    std::pair<std::string, uint16_t> addrinfo;
+    socket->getAddrInfo(addrinfo);
+    unsigned int ipaddr[4]; 
+    sscanf(addrinfo.first.c_str(), "%u.%u.%u.%u",
+	   &ipaddr[0], &ipaddr[1], &ipaddr[2], &ipaddr[3]);
+    serverSocket->getAddrInfo(addrinfo);
+    std::string request = "PORT "+
+      Util::uitos(ipaddr[0])+","+Util::uitos(ipaddr[1])+","+
+      Util::uitos(ipaddr[2])+","+Util::uitos(ipaddr[3])+","+
+      Util::uitos(addrinfo.second/256)+","+
+      Util::uitos(addrinfo.second%256)+"\r\n";
+    logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
+    _socketBuffer.feedSendBuffer(request);
   }
-  request += "\r\n";
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
+}
 
-  logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
-  socket->writeData(request);
+bool FtpConnection::sendRest(const SegmentHandle& segment)
+{
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    std::string request = "REST ";
+    if(segment.isNull()) {
+      request += "0";
+    } else {
+      request += Util::itos(segment->getPositionToWrite());
+    }
+    request += "\r\n";
+    
+    logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
+    _socketBuffer.feedSendBuffer(request);
+  }
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
 }
 
-void FtpConnection::sendRetr() const
+bool FtpConnection::sendRetr()
 {
-  std::string request = "RETR "+Util::urldecode(req->getFile())+"\r\n";
-  logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
-  socket->writeData(request);
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    std::string request = "RETR "+Util::urldecode(req->getFile())+"\r\n";
+    logger->info(MSG_SENDING_REQUEST, cuid, request.c_str());
+    _socketBuffer.feedSendBuffer(request);
+  }
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
 }
 
 unsigned int FtpConnection::getStatus(const std::string& response) const

+ 14 - 10
src/FtpConnection.h

@@ -38,6 +38,7 @@
 #include "common.h"
 #include "SharedHandle.h"
 #include "TimeA2.h"
+#include "SocketBuffer.h"
 #include <utility>
 #include <string>
 
@@ -59,6 +60,8 @@ private:
 
   std::string strbuf;
 
+  SocketBuffer _socketBuffer;
+
   unsigned int getStatus(const std::string& response) const;
   std::string::size_type findEndOfResponse(unsigned int status,
 					   const std::string& buf) const;
@@ -72,16 +75,17 @@ public:
   FtpConnection(int32_t cuid, const SharedHandle<SocketCore>& socket,
 		const SharedHandle<Request>& req, const Option* op);
   ~FtpConnection();
-  void sendUser() const;
-  void sendPass() const;
-  void sendType() const;
-  void sendCwd() const;
-  void sendMdtm() const;
-  void sendSize() const;
-  void sendPasv() const;
-  SharedHandle<SocketCore> sendPort() const;
-  void sendRest(const SharedHandle<Segment>& segment) const;
-  void sendRetr() const;
+  bool sendUser();
+  bool sendPass();
+  bool sendType();
+  bool sendCwd();
+  bool sendMdtm();
+  bool sendSize();
+  bool sendPasv();
+  SharedHandle<SocketCore> createServerSocket();
+  bool sendPort(const SharedHandle<SocketCore>& serverSocket);
+  bool sendRest(const SharedHandle<Segment>& segment);
+  bool sendRetr();
 
   unsigned int receiveResponse();
   unsigned int receiveSizeResponse(uint64_t& size);

+ 77 - 26
src/FtpNegotiationCommand.cc

@@ -105,7 +105,7 @@ bool FtpNegotiationCommand::executeInternal() {
     if(e->option->getAsBool(PREF_FTP_PASV)) {
       sequence = SEQ_SEND_PASV;
     } else {
-      sequence = SEQ_SEND_PORT;
+      sequence = SEQ_PREPARE_SERVER_SOCKET;
     }
     return false;
   } else {
@@ -116,7 +116,7 @@ bool FtpNegotiationCommand::executeInternal() {
 
 bool FtpNegotiationCommand::recvGreeting() {
   setTimeout(e->option->getAsInt(PREF_TIMEOUT));
-  socket->setBlockingMode();
+  //socket->setBlockingMode();
   disableWriteCheckSocket();
   setReadCheckSocket(socket);
 
@@ -133,8 +133,12 @@ bool FtpNegotiationCommand::recvGreeting() {
 }
 
 bool FtpNegotiationCommand::sendUser() {
-  ftp->sendUser();
-  sequence = SEQ_RECV_USER;
+  if(ftp->sendUser()) {
+    disableWriteCheckSocket();
+    sequence = SEQ_RECV_USER;
+  } else {
+    setWriteCheckSocket(socket);
+  }
   return false;
 }
 
@@ -156,8 +160,12 @@ bool FtpNegotiationCommand::recvUser() {
 }
 
 bool FtpNegotiationCommand::sendPass() {
-  ftp->sendPass();
-  sequence = SEQ_RECV_PASS;
+  if(ftp->sendPass()) {
+    disableWriteCheckSocket();
+    sequence = SEQ_RECV_PASS;
+  } else {
+    setWriteCheckSocket(socket);
+  }
   return false;
 }
 
@@ -174,8 +182,12 @@ bool FtpNegotiationCommand::recvPass() {
 }
 
 bool FtpNegotiationCommand::sendType() {
-  ftp->sendType();
-  sequence = SEQ_RECV_TYPE;
+  if(ftp->sendType()) {
+    disableWriteCheckSocket();
+    sequence = SEQ_RECV_TYPE;
+  } else {
+    setWriteCheckSocket(socket);
+  }
   return false;
 }
 
@@ -192,8 +204,14 @@ bool FtpNegotiationCommand::recvType() {
 }
 
 bool FtpNegotiationCommand::sendCwd() {
-  ftp->sendCwd();
-  sequence = SEQ_RECV_CWD;
+  // Calling setReadCheckSocket() is needed when the socket is reused, 
+  setReadCheckSocket(socket);
+  if(ftp->sendCwd()) {
+    disableWriteCheckSocket();
+    sequence = SEQ_RECV_CWD;
+  } else {
+    setWriteCheckSocket(socket);
+  }
   return false;
 }
 
@@ -217,8 +235,12 @@ bool FtpNegotiationCommand::recvCwd() {
 
 bool FtpNegotiationCommand::sendMdtm()
 {
-  ftp->sendMdtm();
-  sequence = SEQ_RECV_MDTM;
+  if(ftp->sendMdtm()) {
+    disableWriteCheckSocket();
+    sequence = SEQ_RECV_MDTM;
+  } else {
+    setWriteCheckSocket(socket);
+  }
   return false;
 }
 
@@ -251,8 +273,12 @@ bool FtpNegotiationCommand::recvMdtm()
 }
 
 bool FtpNegotiationCommand::sendSize() {
-  ftp->sendSize();
-  sequence = SEQ_RECV_SIZE;
+  if(ftp->sendSize()) {
+    disableWriteCheckSocket();
+    sequence = SEQ_RECV_SIZE;
+  } else {
+    setWriteCheckSocket(socket);
+  }
   return false;
 }
 
@@ -334,7 +360,7 @@ bool FtpNegotiationCommand::recvSize() {
       if(e->option->getAsBool(PREF_FTP_PASV)) {
 	sequence = SEQ_SEND_PASV;
       } else {
-	sequence = SEQ_SEND_PORT;
+	sequence = SEQ_PREPARE_SERVER_SOCKET;
       }
       return onFileSizeDetermined(0);
 
@@ -345,7 +371,7 @@ bool FtpNegotiationCommand::recvSize() {
   if(e->option->getAsBool(PREF_FTP_PASV)) {
     sequence = SEQ_SEND_PASV;
   } else {
-    sequence = SEQ_SEND_PORT;
+    sequence = SEQ_PREPARE_SERVER_SOCKET;
   }
   return true;
 }
@@ -355,10 +381,21 @@ void FtpNegotiationCommand::afterFileAllocation()
   setReadCheckSocket(socket);
 }
 
+bool FtpNegotiationCommand::prepareServerSocket()
+{
+  serverSocket = ftp->createServerSocket();
+  sequence = SEQ_SEND_PORT;
+  return true;
+}
+
 bool FtpNegotiationCommand::sendPort() {
   afterFileAllocation();
-  serverSocket = ftp->sendPort();
-  sequence = SEQ_RECV_PORT;
+  if(ftp->sendPort(serverSocket)) {
+    disableWriteCheckSocket();
+    sequence = SEQ_RECV_PORT;
+  } else {
+    setWriteCheckSocket(socket);
+  }
   return false;
 }
 
@@ -376,8 +413,12 @@ bool FtpNegotiationCommand::recvPort() {
 
 bool FtpNegotiationCommand::sendPasv() {
   afterFileAllocation();
-  ftp->sendPasv();
-  sequence = SEQ_RECV_PASV;
+  if(ftp->sendPasv()) {
+    disableWriteCheckSocket();
+    sequence = SEQ_RECV_PASV;
+  } else {
+    setWriteCheckSocket(socket);
+  }
   return false;
 }
 
@@ -405,15 +446,19 @@ bool FtpNegotiationCommand::recvPasv() {
 }
 
 bool FtpNegotiationCommand::sendRestPasv(const SegmentHandle& segment) {
-  dataSocket->setBlockingMode();
+  //dataSocket->setBlockingMode();
   setReadCheckSocket(socket);
   disableWriteCheckSocket();
   return sendRest(segment);
 }
 
 bool FtpNegotiationCommand::sendRest(const SegmentHandle& segment) {
-  ftp->sendRest(segment);
-  sequence = SEQ_RECV_REST;
+  if(ftp->sendRest(segment)) {
+    disableWriteCheckSocket();
+    sequence = SEQ_RECV_REST;
+  } else {
+    setWriteCheckSocket(socket);
+  }
   return false;
 }
 
@@ -434,8 +479,12 @@ bool FtpNegotiationCommand::recvRest(const SharedHandle<Segment>& segment) {
 }
 
 bool FtpNegotiationCommand::sendRetr() {
-  ftp->sendRetr();
-  sequence = SEQ_RECV_RETR;
+  if(ftp->sendRetr()) {
+    disableWriteCheckSocket();
+    sequence = SEQ_RECV_RETR;
+  } else {
+    setWriteCheckSocket(socket);
+  }
   return false;
 }
 
@@ -464,7 +513,7 @@ bool FtpNegotiationCommand::waitConnection()
   disableReadCheckSocket();
   setReadCheckSocket(socket);
   dataSocket.reset(serverSocket->acceptConnection());
-  dataSocket->setBlockingMode();
+  //dataSocket->setBlockingMode();
   sequence = SEQ_NEGOTIATION_COMPLETED;
   return false;
 }
@@ -498,6 +547,8 @@ bool FtpNegotiationCommand::processSequence(const SegmentHandle& segment) {
     return sendSize();
   case SEQ_RECV_SIZE:
     return recvSize();
+  case SEQ_PREPARE_SERVER_SOCKET:
+    return prepareServerSocket();
   case SEQ_SEND_PORT:
     return sendPort();
   case SEQ_RECV_PORT:

+ 2 - 0
src/FtpNegotiationCommand.h

@@ -58,6 +58,7 @@ public:
     SEQ_RECV_MDTM,
     SEQ_SEND_SIZE,
     SEQ_RECV_SIZE,
+    SEQ_PREPARE_SERVER_SOCKET,
     SEQ_SEND_PORT,
     SEQ_RECV_PORT,
     SEQ_SEND_PASV,
@@ -88,6 +89,7 @@ private:
   bool recvMdtm();
   bool sendSize();
   bool recvSize();
+  bool prepareServerSocket();
   bool sendPort();
   bool recvPort();
   bool sendPasv();

+ 15 - 3
src/HttpConnection.cc

@@ -72,7 +72,9 @@ HttpHeaderProcessorHandle HttpRequestEntry::getHttpHeaderProcessor() const
 HttpConnection::HttpConnection(int32_t cuid,
 			       const SocketHandle& socket,
 			       const Option* op):
-  cuid(cuid), socket(socket), option(op), logger(LogFactory::getInstance())
+  cuid(cuid), socket(socket),
+  _socketBuffer(socket),
+  option(op), logger(LogFactory::getInstance())
 {}
 
 std::string HttpConnection::eraseConfidentialInfo(const std::string& request)
@@ -98,7 +100,7 @@ void HttpConnection::sendRequest(const HttpRequestHandle& httpRequest)
 {
   std::string request = httpRequest->createRequest();
   logger->info(MSG_SENDING_REQUEST, cuid, eraseConfidentialInfo(request).c_str());
-  socket->writeData(request.c_str(), request.size());
+  _socketBuffer.feedAndSend(request);
   SharedHandle<HttpRequestEntry> entry(new HttpRequestEntry(httpRequest));
   outstandingHttpRequests.push_back(entry);
 }
@@ -107,7 +109,7 @@ void HttpConnection::sendProxyRequest(const HttpRequestHandle& httpRequest)
 {
   std::string request = httpRequest->createProxyRequest();
   logger->info(MSG_SENDING_REQUEST, cuid, eraseConfidentialInfo(request).c_str());
-  socket->writeData(request.c_str(), request.size());
+  _socketBuffer.feedAndSend(request);
   SharedHandle<HttpRequestEntry> entry(new HttpRequestEntry(httpRequest));
   outstandingHttpRequests.push_back(entry);
 }
@@ -179,4 +181,14 @@ SharedHandle<HttpRequest> HttpConnection::getFirstHttpRequest() const
   }
 }
 
+bool HttpConnection::sendBufferIsEmpty() const
+{
+  return _socketBuffer.sendBufferIsEmpty();
+}
+
+void HttpConnection::sendPendingData()
+{
+  _socketBuffer.send();
+}
+
 } // namespace aria2

+ 6 - 0
src/HttpConnection.h

@@ -37,6 +37,7 @@
 
 #include "common.h"
 #include "SharedHandle.h"
+#include "SocketBuffer.h"
 #include <string>
 #include <deque>
 
@@ -71,6 +72,7 @@ class HttpConnection {
 private:
   int32_t cuid;
   SharedHandle<SocketCore> socket;
+  SocketBuffer _socketBuffer;
   const Option* option;
   Logger* logger;
 
@@ -112,6 +114,10 @@ public:
   SharedHandle<HttpRequest> getFirstHttpRequest() const;
 
   bool isIssued(const SharedHandle<Segment>& segment) const;
+
+  bool sendBufferIsEmpty() const;
+
+  void sendPendingData();
 };
 
 typedef SharedHandle<HttpConnection> HttpConnectionHandle;

+ 35 - 24
src/HttpRequestCommand.cc

@@ -98,34 +98,45 @@ createHttpRequest(const SharedHandle<Request>& req,
 }
 
 bool HttpRequestCommand::executeInternal() {
-  socket->setBlockingMode();
-  if(req->getProtocol() == Request::PROTO_HTTPS) {
-    socket->initiateSecureConnection();
-  }
+  //socket->setBlockingMode();
+  if(_httpConnection->sendBufferIsEmpty()) {
+    if(req->getProtocol() == Request::PROTO_HTTPS) {
+      socket->initiateSecureConnection();
+    }
 
-  if(_segments.empty()) {
-    HttpRequestHandle httpRequest
-      (createHttpRequest(req, SharedHandle<Segment>(),
-			 _requestGroup->getTotalLength(), e->option,
-			 _requestGroup,
-			 e->getCookieStorage()));
-    _httpConnection->sendRequest(httpRequest);
-  } else {
-    for(Segments::iterator itr = _segments.begin(); itr != _segments.end(); ++itr) {
-      const SegmentHandle& segment = *itr;
-      if(!_httpConnection->isIssued(segment)) {
-	HttpRequestHandle httpRequest
-	  (createHttpRequest(req, segment,
-			     _requestGroup->getTotalLength(), e->option,
-			     _requestGroup,
-			     e->getCookieStorage()));
-	_httpConnection->sendRequest(httpRequest);
+    if(_segments.empty()) {
+      HttpRequestHandle httpRequest
+	(createHttpRequest(req, SharedHandle<Segment>(),
+			   _requestGroup->getTotalLength(), e->option,
+			   _requestGroup,
+			   e->getCookieStorage()));
+      _httpConnection->sendRequest(httpRequest);
+    } else {
+      for(Segments::iterator itr = _segments.begin(); itr != _segments.end(); ++itr) {
+	const SegmentHandle& segment = *itr;
+	if(!_httpConnection->isIssued(segment)) {
+	  HttpRequestHandle httpRequest
+	    (createHttpRequest(req, segment,
+			       _requestGroup->getTotalLength(), e->option,
+			       _requestGroup,
+			       e->getCookieStorage()));
+	  _httpConnection->sendRequest(httpRequest);
+	}
       }
     }
+  } else {
+    _httpConnection->sendPendingData();
+  }
+  if(_httpConnection->sendBufferIsEmpty()) {
+    Command* command = new HttpResponseCommand(cuid, req, _requestGroup,
+					       _httpConnection, e, socket);
+    e->commands.push_back(command);
+    return true;
+  } else {
+    setWriteCheckSocket(socket);
+    e->commands.push_back(this);
+    return false;
   }
-  Command* command = new HttpResponseCommand(cuid, req, _requestGroup, _httpConnection, e, socket);
-  e->commands.push_back(command);
-  return true;
 }
 
 } // namespace aria2

+ 25 - 5
src/InitiatorMSEHandshakeCommand.cc

@@ -93,21 +93,41 @@ bool InitiatorMSEHandshakeCommand::executeInternal() {
     }
     disableWriteCheckSocket();
     setReadCheckSocket(socket);
-    socket->setBlockingMode();
+    //socket->setBlockingMode();
     setTimeout(e->option->getAsInt(PREF_BT_TIMEOUT));
     _mseHandshake->initEncryptionFacility(true);
-    _mseHandshake->sendPublicKey();
-    _sequence = INITIATOR_WAIT_KEY;    
+    if(_mseHandshake->sendPublicKey()) {
+      _sequence = INITIATOR_WAIT_KEY;
+    } else {
+      setWriteCheckSocket(socket);
+      _sequence = INITIATOR_SEND_KEY_PENDING;
+    }
     break;
   }
+  case INITIATOR_SEND_KEY_PENDING:
+    if(_mseHandshake->sendPublicKey()) {
+      disableWriteCheckSocket();
+      _sequence = INITIATOR_WAIT_KEY;
+    }
+    break;
   case INITIATOR_WAIT_KEY: {
     if(_mseHandshake->receivePublicKey()) {
       _mseHandshake->initCipher(btContext->getInfoHash());
-      _mseHandshake->sendInitiatorStep2();
-      _sequence = INITIATOR_FIND_VC_MARKER;
+      if(_mseHandshake->sendInitiatorStep2()) {
+	_sequence = INITIATOR_FIND_VC_MARKER;
+      } else {
+	setWriteCheckSocket(socket);
+	_sequence = INITIATOR_SEND_STEP2_PENDING;
+      }
     }
     break;
   }
+  case INITIATOR_SEND_STEP2_PENDING:
+    if(_mseHandshake->sendInitiatorStep2()) {
+      disableWriteCheckSocket();
+      _sequence = INITIATOR_FIND_VC_MARKER;
+    }
+    break;
   case INITIATOR_FIND_VC_MARKER: {
     if(_mseHandshake->findInitiatorVCMarker()) {
       _sequence = INITIATOR_RECEIVE_PAD_D_LENGTH;

+ 2 - 0
src/InitiatorMSEHandshakeCommand.h

@@ -50,7 +50,9 @@ class InitiatorMSEHandshakeCommand : public PeerAbstractCommand,
 public:
   enum Seq {
     INITIATOR_SEND_KEY,
+    INITIATOR_SEND_KEY_PENDING,
     INITIATOR_WAIT_KEY,
+    INITIATOR_SEND_STEP2_PENDING,
     INITIATOR_FIND_VC_MARKER,
     INITIATOR_RECEIVE_PAD_D_LENGTH,
     INITIATOR_RECEIVE_PAD_D,

+ 75 - 61
src/MSEHandshake.cc

@@ -70,6 +70,7 @@ MSEHandshake::MSEHandshake(int32_t cuid,
   _option(op),
   _logger(LogFactory::getInstance()),
   _rbufLength(0),
+  _socketBuffer(socket),
   _negotiatedCryptoType(CRYPTO_NONE),
   _dh(0),
   _initiator(true),
@@ -120,15 +121,20 @@ void MSEHandshake::initEncryptionFacility(bool initiator)
   _initiator = initiator;
 }
 
-void MSEHandshake::sendPublicKey()
+bool MSEHandshake::sendPublicKey()
 {
-  _logger->debug("CUID#%d - Sending public key.", _cuid);
-  unsigned char buffer[KEY_LENGTH+MAX_PAD_LENGTH];
-  _dh->getPublicKey(buffer, KEY_LENGTH);
-
-  size_t padLength = SimpleRandomizer::getInstance()->getRandomNumber(MAX_PAD_LENGTH+1);
-  _dh->generateNonce(buffer+KEY_LENGTH, padLength);
-  _socket->writeData(buffer, KEY_LENGTH+padLength);
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    _logger->debug("CUID#%d - Sending public key.", _cuid);
+    unsigned char buffer[KEY_LENGTH+MAX_PAD_LENGTH];
+    _dh->getPublicKey(buffer, KEY_LENGTH);
+
+    size_t padLength = SimpleRandomizer::getInstance()->getRandomNumber(MAX_PAD_LENGTH+1);
+    _dh->generateNonce(buffer+KEY_LENGTH, padLength);
+    _socketBuffer.feedSendBuffer(std::string(&buffer[0],
+					     &buffer[KEY_LENGTH+padLength]));
+  }
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
 }
 
 bool MSEHandshake::receivePublicKey()
@@ -193,7 +199,7 @@ void MSEHandshake::encryptAndSendData(const unsigned char* data, size_t length)
   while(r > 0) {
     s = std::min(r, sizeof(temp));
     _encryptor->encrypt(temp, s, dptr, s);
-    _socket->writeData(temp, s);
+    _socketBuffer.feedSendBuffer(std::string(&temp[0], &temp[s]));
     dptr += s;
     r -= s;
   }
@@ -238,47 +244,51 @@ uint16_t MSEHandshake::decodeLength16(const unsigned char* buffer)
   return ntohs(be);
 }
 
-void MSEHandshake::sendInitiatorStep2()
+bool MSEHandshake::sendInitiatorStep2()
 {
-  _logger->debug("CUID#%d - Sending negotiation step2.", _cuid);
-  unsigned char md[20];
-  createReq1Hash(md);
-  _socket->writeData(md, sizeof(md));
-
-  createReq23Hash(md, _infoHash);
-  _socket->writeData(md, sizeof(md));
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    _logger->debug("CUID#%d - Sending negotiation step2.", _cuid);
+    unsigned char md[20];
+    createReq1Hash(md);
+    _socketBuffer.feedSendBuffer(std::string(&md[0], &md[sizeof(md)]));
 
-  {
-    unsigned char buffer[8+4+2+MAX_PAD_LENGTH+2];
+    createReq23Hash(md, _infoHash);
+    _socketBuffer.feedSendBuffer(std::string(&md[0], &md[sizeof(md)]));
 
-    // VC
-    memcpy(buffer, VC, sizeof(VC));
-    // crypto_provide
-    unsigned char cryptoProvide[4];
-    memset(cryptoProvide, 0, sizeof(cryptoProvide));
-    if(_option->get(PREF_BT_MIN_CRYPTO_LEVEL) == V_PLAIN) {
-      cryptoProvide[3] = CRYPTO_PLAIN_TEXT;
-    }
-    cryptoProvide[3] |= CRYPTO_ARC4;
-    memcpy(buffer+8, cryptoProvide, sizeof(cryptoProvide));
-
-    // len(padC)
-    uint16_t padCLength = SimpleRandomizer::getInstance()->getRandomNumber(MAX_PAD_LENGTH+1);
-    {
-      uint16_t padCLengthBE = htons(padCLength);
-      memcpy(buffer+8+4, &padCLengthBE, sizeof(padCLengthBE));
-    }
-    // padC
-    memset(buffer+8+4+2, 0, padCLength);
-    // len(IA)
-    // currently, IA is zero-length.
-    uint16_t iaLength = 0;
     {
-      uint16_t iaLengthBE = htons(iaLength);
-      memcpy(buffer+8+4+2+padCLength, &iaLengthBE, sizeof(iaLengthBE));
+      unsigned char buffer[8+4+2+MAX_PAD_LENGTH+2];
+
+      // VC
+      memcpy(buffer, VC, sizeof(VC));
+      // crypto_provide
+      unsigned char cryptoProvide[4];
+      memset(cryptoProvide, 0, sizeof(cryptoProvide));
+      if(_option->get(PREF_BT_MIN_CRYPTO_LEVEL) == V_PLAIN) {
+	cryptoProvide[3] = CRYPTO_PLAIN_TEXT;
+      }
+      cryptoProvide[3] |= CRYPTO_ARC4;
+      memcpy(buffer+8, cryptoProvide, sizeof(cryptoProvide));
+
+      // len(padC)
+      uint16_t padCLength = SimpleRandomizer::getInstance()->getRandomNumber(MAX_PAD_LENGTH+1);
+      {
+	uint16_t padCLengthBE = htons(padCLength);
+	memcpy(buffer+8+4, &padCLengthBE, sizeof(padCLengthBE));
+      }
+      // padC
+      memset(buffer+8+4+2, 0, padCLength);
+      // len(IA)
+      // currently, IA is zero-length.
+      uint16_t iaLength = 0;
+      {
+	uint16_t iaLengthBE = htons(iaLength);
+	memcpy(buffer+8+4+2+padCLength, &iaLengthBE, sizeof(iaLengthBE));
+      }
+      encryptAndSendData(buffer, 8+4+2+padCLength+2);
     }
-    encryptAndSendData(buffer, 8+4+2+padCLength+2);
   }
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
 }
 
 // This function reads exactly until the end of VC marker is reached.
@@ -498,25 +508,29 @@ bool MSEHandshake::receiveReceiverIA()
   return true;
 }
 
-void MSEHandshake::sendReceiverStep2()
+bool MSEHandshake::sendReceiverStep2()
 {
-  unsigned char buffer[8+4+2+MAX_PAD_LENGTH];
-  // VC
-  memcpy(buffer, VC, sizeof(VC));
-  // crypto_select
-  unsigned char cryptoSelect[4];
-  memset(cryptoSelect, 0, sizeof(cryptoSelect));
-  cryptoSelect[3] = _negotiatedCryptoType;
-  memcpy(buffer+8, cryptoSelect, sizeof(cryptoSelect));
-  // len(padD)
-  uint16_t padDLength = SimpleRandomizer::getInstance()->getRandomNumber(MAX_PAD_LENGTH+1);
-  {
-    uint16_t padDLengthBE = htons(padDLength);
-    memcpy(buffer+8+4, &padDLengthBE, sizeof(padDLengthBE));
+  if(_socketBuffer.sendBufferIsEmpty()) {
+    unsigned char buffer[8+4+2+MAX_PAD_LENGTH];
+    // VC
+    memcpy(buffer, VC, sizeof(VC));
+    // crypto_select
+    unsigned char cryptoSelect[4];
+    memset(cryptoSelect, 0, sizeof(cryptoSelect));
+    cryptoSelect[3] = _negotiatedCryptoType;
+    memcpy(buffer+8, cryptoSelect, sizeof(cryptoSelect));
+    // len(padD)
+    uint16_t padDLength = SimpleRandomizer::getInstance()->getRandomNumber(MAX_PAD_LENGTH+1);
+    {
+      uint16_t padDLengthBE = htons(padDLength);
+      memcpy(buffer+8+4, &padDLengthBE, sizeof(padDLengthBE));
+    }
+    // padD, all zeroed
+    memset(buffer+8+4+2, 0, padDLength);
+    encryptAndSendData(buffer, 8+4+2+padDLength);
   }
-  // padD, all zeroed
-  memset(buffer+8+4+2, 0, padDLength);
-  encryptAndSendData(buffer, 8+4+2+padDLength);
+  _socketBuffer.send();
+  return _socketBuffer.sendBufferIsEmpty();
 }
 
 uint16_t MSEHandshake::verifyPadLength(const unsigned char* padlenbuf, const char* padName)

+ 6 - 3
src/MSEHandshake.h

@@ -38,6 +38,7 @@
 #include "common.h"
 #include "SharedHandle.h"
 #include "BtConstants.h"
+#include "SocketBuffer.h"
 
 namespace aria2 {
 
@@ -83,6 +84,8 @@ private:
   unsigned char _rbuf[MAX_BUFFER_LENGTH];
   size_t _rbufLength;
 
+  SocketBuffer _socketBuffer;
+
   CRYPTO_TYPE _negotiatedCryptoType;
   DHKeyExchange* _dh;
   SharedHandle<ARC4Encryptor> _encryptor;
@@ -134,13 +137,13 @@ public:
 
   void initEncryptionFacility(bool initiator);
 
-  void sendPublicKey();
+  bool sendPublicKey();
 
   bool receivePublicKey();
 
   void initCipher(const unsigned char* infoHash);
 
-  void sendInitiatorStep2();
+  bool sendInitiatorStep2();
 
   bool findInitiatorVCMarker();
 
@@ -156,7 +159,7 @@ public:
 
   bool receiveReceiverIA();
 
-  void sendReceiverStep2();
+  bool sendReceiverStep2();
 
   // returns plain text IA
   const unsigned char* getIA() const;

+ 2 - 1
src/Makefile.am

@@ -194,7 +194,8 @@ SRCS =  Socket.h\
 	InOrderURISelector.cc InOrderURISelector.h\
 	ServerStatURISelector.cc ServerStatURISelector.h\
 	NsCookieParser.cc NsCookieParser.h\
-	CookieStorage.cc CookieStorage.h
+	CookieStorage.cc CookieStorage.h\
+	SocketBuffer.cc SocketBuffer.h
 
 if HAVE_LIBZ
 SRCS += GZipDecoder.cc GZipDecoder.h

+ 17 - 14
src/Makefile.in

@@ -413,7 +413,8 @@ am__libaria2c_a_SOURCES_DIST = Socket.h SocketCore.cc SocketCore.h \
 	ServerStatMan.h URISelector.h InOrderURISelector.cc \
 	InOrderURISelector.h ServerStatURISelector.cc \
 	ServerStatURISelector.h NsCookieParser.cc NsCookieParser.h \
-	CookieStorage.cc CookieStorage.h GZipDecoder.cc GZipDecoder.h \
+	CookieStorage.cc CookieStorage.h SocketBuffer.cc \
+	SocketBuffer.h GZipDecoder.cc GZipDecoder.h \
 	Sqlite3MozCookieParser.cc Sqlite3MozCookieParser.h \
 	AsyncNameResolver.cc AsyncNameResolver.h \
 	IteratableChunkChecksumValidator.cc \
@@ -807,12 +808,13 @@ am__objects_18 = SocketCore.$(OBJEXT) Command.$(OBJEXT) \
 	Signature.$(OBJEXT) ServerStat.$(OBJEXT) \
 	ServerStatMan.$(OBJEXT) InOrderURISelector.$(OBJEXT) \
 	ServerStatURISelector.$(OBJEXT) NsCookieParser.$(OBJEXT) \
-	CookieStorage.$(OBJEXT) $(am__objects_1) $(am__objects_2) \
-	$(am__objects_3) $(am__objects_4) $(am__objects_5) \
-	$(am__objects_6) $(am__objects_7) $(am__objects_8) \
-	$(am__objects_9) $(am__objects_10) $(am__objects_11) \
-	$(am__objects_12) $(am__objects_13) $(am__objects_14) \
-	$(am__objects_15) $(am__objects_16) $(am__objects_17)
+	CookieStorage.$(OBJEXT) SocketBuffer.$(OBJEXT) \
+	$(am__objects_1) $(am__objects_2) $(am__objects_3) \
+	$(am__objects_4) $(am__objects_5) $(am__objects_6) \
+	$(am__objects_7) $(am__objects_8) $(am__objects_9) \
+	$(am__objects_10) $(am__objects_11) $(am__objects_12) \
+	$(am__objects_13) $(am__objects_14) $(am__objects_15) \
+	$(am__objects_16) $(am__objects_17)
 am_libaria2c_a_OBJECTS = $(am__objects_18)
 libaria2c_a_OBJECTS = $(am_libaria2c_a_OBJECTS)
 am__installdirs = "$(DESTDIR)$(bindir)"
@@ -1135,13 +1137,13 @@ SRCS = Socket.h SocketCore.cc SocketCore.h BinaryStream.h Command.cc \
 	ServerStatMan.h URISelector.h InOrderURISelector.cc \
 	InOrderURISelector.h ServerStatURISelector.cc \
 	ServerStatURISelector.h NsCookieParser.cc NsCookieParser.h \
-	CookieStorage.cc CookieStorage.h $(am__append_1) \
-	$(am__append_2) $(am__append_3) $(am__append_4) \
-	$(am__append_5) $(am__append_6) $(am__append_7) \
-	$(am__append_8) $(am__append_9) $(am__append_10) \
-	$(am__append_11) $(am__append_12) $(am__append_13) \
-	$(am__append_14) $(am__append_15) $(am__append_16) \
-	$(am__append_17)
+	CookieStorage.cc CookieStorage.h SocketBuffer.cc \
+	SocketBuffer.h $(am__append_1) $(am__append_2) $(am__append_3) \
+	$(am__append_4) $(am__append_5) $(am__append_6) \
+	$(am__append_7) $(am__append_8) $(am__append_9) \
+	$(am__append_10) $(am__append_11) $(am__append_12) \
+	$(am__append_13) $(am__append_14) $(am__append_15) \
+	$(am__append_16) $(am__append_17)
 noinst_LIBRARIES = libaria2c.a
 libaria2c_a_SOURCES = $(SRCS)
 aria2c_LDADD = libaria2c.a @LIBINTL@ @ALLOCA@ @LIBGNUTLS_LIBS@\
@@ -1491,6 +1493,7 @@ distclean-compile:
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SizeMetalinkParserState.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SkipTagMetalinkParserState.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SleepCommand.Po@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SocketBuffer.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SocketCore.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/SpeedCalc.Po@am__quote@
 @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/Sqlite3MozCookieParser.Po@am__quote@

+ 10 - 7
src/PeerConnection.cc

@@ -59,17 +59,18 @@ PeerConnection::PeerConnection(int32_t cuid,
    resbufLength(0),
    currentPayloadLength(0),
    lenbufLength(0),
+   _socketBuffer(socket),
    _encryptionEnabled(false),
    _prevPeek(false)
 {}
 
 PeerConnection::~PeerConnection() {}
 
-ssize_t PeerConnection::sendMessage(const unsigned char* data, size_t dataLength) {
+ssize_t PeerConnection::sendMessage(const unsigned char* data,
+				    size_t dataLength)
+{
   if(socket->isWritable(0)) {
-    // TODO fix this
-    sendData(data, dataLength, _encryptionEnabled);
-    return dataLength;
+    return sendData(data, dataLength, _encryptionEnabled);
   } else {
     return 0;
   }
@@ -185,7 +186,8 @@ void PeerConnection::readData(unsigned char* data, size_t& length, bool encrypti
   }
 }
 
-void PeerConnection::sendData(const unsigned char* data, size_t length, bool encryption)
+ssize_t PeerConnection::sendData(const unsigned char* data,
+				 size_t length, bool encryption)
 {
   if(encryption) {
     unsigned char temp[4096];
@@ -194,13 +196,14 @@ void PeerConnection::sendData(const unsigned char* data, size_t length, bool enc
     while(r > 0) {
       size_t s = std::min(r, sizeof(temp));
       _encryptor->encrypt(temp, s, dptr, s);
-      socket->writeData(temp, s);
+      _socketBuffer.feedSendBuffer(std::string(&temp[0], &temp[s]));
       dptr += s;
       r -= s;
     }
   } else {
-    socket->writeData(data, length);
+    _socketBuffer.feedSendBuffer(std::string(&data[0], &data[length]));
   }
+  return _socketBuffer.send();
 }
 
 void PeerConnection::enableEncryption(const SharedHandle<ARC4Encryptor>& encryptor,

+ 4 - 1
src/PeerConnection.h

@@ -37,6 +37,7 @@
 
 #include "common.h"
 #include "SharedHandle.h"
+#include "SocketBuffer.h"
 #include <unistd.h>
 
 namespace aria2 {
@@ -64,6 +65,8 @@ private:
   unsigned char lenbuf[4];
   size_t lenbufLength;
 
+  SocketBuffer _socketBuffer;
+
   bool _encryptionEnabled;
   SharedHandle<ARC4Encryptor> _encryptor;
   SharedHandle<ARC4Decryptor> _decryptor;
@@ -72,7 +75,7 @@ private:
 
   void readData(unsigned char* data, size_t& length, bool encryption);
 
-  void sendData(const unsigned char* data, size_t length, bool encryption);
+  ssize_t sendData(const unsigned char* data, size_t length, bool encryption);
 
 public:
   PeerConnection(int32_t cuid, const SharedHandle<SocketCore>& socket, const Option* op);

+ 1 - 1
src/PeerInteractionCommand.cc

@@ -193,7 +193,7 @@ bool PeerInteractionCommand::executeInternal() {
     }
     disableWriteCheckSocket();
     setReadCheckSocket(socket);
-    socket->setBlockingMode();
+    //socket->setBlockingMode();
     setTimeout(e->option->getAsInt(PREF_BT_TIMEOUT));
     btInteractive->initiateHandshake();
     sequence = INITIATOR_WAIT_HANDSHAKE;

+ 1 - 1
src/PeerListenCommand.cc

@@ -99,7 +99,7 @@ bool PeerListenCommand::execute() {
 
       // Since peerSocket may be in non-blocking mode, make it blocking mode
       // here.
-      peerSocket->setBlockingMode();
+      //peerSocket->setBlockingMode();
 
       PeerHandle peer(new Peer(peerInfo.first, peerInfo.second, true));
       int32_t cuid = CUIDCounterSingletonHolder::instance()->newID();

+ 45 - 19
src/ReceiverMSEHandshakeCommand.cc

@@ -106,11 +106,21 @@ bool ReceiverMSEHandshakeCommand::executeInternal()
   }
   case RECEIVER_WAIT_KEY: {
     if(_mseHandshake->receivePublicKey()) {
-      _mseHandshake->sendPublicKey();
-      _sequence = RECEIVER_FIND_HASH_MARKER;
+      if(_mseHandshake->sendPublicKey()) {
+	_sequence = RECEIVER_FIND_HASH_MARKER;
+      } else {
+	setWriteCheckSocket(socket);
+	_sequence = RECEIVER_SEND_KEY_PENDING;
+      }
     }
     break;
   }
+  case RECEIVER_SEND_KEY_PENDING:
+    if(_mseHandshake->sendPublicKey()) {
+      disableWriteCheckSocket();
+      _sequence = RECEIVER_FIND_HASH_MARKER;
+    }
+    break;
   case RECEIVER_FIND_HASH_MARKER: {
     if(_mseHandshake->findReceiverHashMarker()) {
       _sequence = RECEIVER_RECEIVE_PAD_C_LENGTH;
@@ -137,30 +147,46 @@ bool ReceiverMSEHandshakeCommand::executeInternal()
   }
   case RECEIVER_RECEIVE_IA: {
     if(_mseHandshake->receiveReceiverIA()) {
-      _mseHandshake->sendReceiverStep2();
-      SharedHandle<PeerConnection> peerConnection
-	(new PeerConnection(cuid, socket, e->option));
-      if(_mseHandshake->getNegotiatedCryptoType() == MSEHandshake::CRYPTO_ARC4) {
-	peerConnection->enableEncryption(_mseHandshake->getEncryptor(),
-					 _mseHandshake->getDecryptor());
+      if(_mseHandshake->sendReceiverStep2()) {
+	createCommand();
+	return true;
+      } else {
+	setWriteCheckSocket(socket);
+	_sequence = RECEIVER_SEND_STEP2_PENDING;
       }
-      if(_mseHandshake->getIALength() > 0) {
-	peerConnection->presetBuffer(_mseHandshake->getIA(),
-				     _mseHandshake->getIALength());
-      }
-      // TODO add _mseHandshake->getInfoHash() to PeerReceiveHandshakeCommand
-      // as a hint. If this info hash and one in BitTorrent Handshake does not
-      // match, then drop connection.
-      Command* c =
-	new PeerReceiveHandshakeCommand(cuid, peer, e, socket, peerConnection);
-      e->commands.push_back(c);
-      return true;
     }
     break;
   }
+  case RECEIVER_SEND_STEP2_PENDING:
+    if(_mseHandshake->sendReceiverStep2()) {
+      disableWriteCheckSocket();
+      createCommand();
+      return true;
+    }
+    break;
   }
   e->commands.push_back(this);
   return false;
 }
 
+void ReceiverMSEHandshakeCommand::createCommand()
+{
+  SharedHandle<PeerConnection> peerConnection
+    (new PeerConnection(cuid, socket, e->option));
+  if(_mseHandshake->getNegotiatedCryptoType() == MSEHandshake::CRYPTO_ARC4) {
+    peerConnection->enableEncryption(_mseHandshake->getEncryptor(),
+				     _mseHandshake->getDecryptor());
+  }
+  if(_mseHandshake->getIALength() > 0) {
+    peerConnection->presetBuffer(_mseHandshake->getIA(),
+				 _mseHandshake->getIALength());
+  }
+  // TODO add _mseHandshake->getInfoHash() to PeerReceiveHandshakeCommand
+  // as a hint. If this info hash and one in BitTorrent Handshake does not
+  // match, then drop connection.
+  Command* c =
+    new PeerReceiveHandshakeCommand(cuid, peer, e, socket, peerConnection);
+  e->commands.push_back(c);
+}
+
 } // namespace aria2

+ 5 - 1
src/ReceiverMSEHandshakeCommand.h

@@ -49,16 +49,20 @@ public:
   enum Seq {
     RECEIVER_IDENTIFY_HANDSHAKE,
     RECEIVER_WAIT_KEY,
+    RECEIVER_SEND_KEY_PENDING,
     RECEIVER_FIND_HASH_MARKER,
     RECEIVER_RECEIVE_PAD_C_LENGTH,
     RECEIVER_RECEIVE_PAD_C,
     RECEIVER_RECEIVE_IA_LENGTH,
-    RECEIVER_RECEIVE_IA
+    RECEIVER_RECEIVE_IA,
+    RECEIVER_SEND_STEP2_PENDING,
   };
 private:
   Seq _sequence;
 
   MSEHandshake* _mseHandshake;
+
+  void createCommand();
 protected:
   virtual bool executeInternal();
   virtual bool exitBeforeExecute();

+ 74 - 0
src/SocketBuffer.cc

@@ -0,0 +1,74 @@
+/* <!-- copyright */
+/*
+ * aria2 - The high speed download utility
+ *
+ * Copyright (C) 2006 Tatsuhiro Tsujikawa
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ * In addition, as a special exception, the copyright holders give
+ * permission to link the code of portions of this program with the
+ * OpenSSL library under certain conditions as described in each
+ * individual source file, and distribute linked combinations
+ * including the two.
+ * You must obey the GNU General Public License in all respects
+ * for all of the code used other than OpenSSL.  If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so.  If you
+ * do not wish to do so, delete this exception statement from your
+ * version.  If you delete this exception statement from all source
+ * files in the program, then also delete it here.
+ */
+/* copyright --> */
+#include "SocketBuffer.h"
+#include "SocketCore.h"
+#include <cassert>
+
+namespace aria2 {
+
+SocketBuffer::SocketBuffer(const SharedHandle<SocketCore>& socket):
+  _socket(socket) {}
+
+SocketBuffer::~SocketBuffer() {}
+
+void SocketBuffer::feedSendBuffer(const std::string& data)
+{
+  _sendbuf += data;
+}
+
+ssize_t SocketBuffer::feedAndSend(const std::string& data)
+{
+  feedSendBuffer(data);
+  return send();
+}
+
+ssize_t SocketBuffer::send()
+{
+  if(_sendbuf.empty()) {
+    return 0;
+  }
+  ssize_t len = _socket->writeData(_sendbuf.c_str(),
+				   _sendbuf.size());
+  assert(len <= _sendbuf.size());
+  _sendbuf.erase(0, len);
+  return len;
+}
+
+bool SocketBuffer::sendBufferIsEmpty() const
+{
+  return _sendbuf.empty();
+}
+
+} // namespace aria2

+ 68 - 0
src/SocketBuffer.h

@@ -0,0 +1,68 @@
+/* <!-- copyright */
+/*
+ * aria2 - The high speed download utility
+ *
+ * Copyright (C) 2006 Tatsuhiro Tsujikawa
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ * In addition, as a special exception, the copyright holders give
+ * permission to link the code of portions of this program with the
+ * OpenSSL library under certain conditions as described in each
+ * individual source file, and distribute linked combinations
+ * including the two.
+ * You must obey the GNU General Public License in all respects
+ * for all of the code used other than OpenSSL.  If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so.  If you
+ * do not wish to do so, delete this exception statement from your
+ * version.  If you delete this exception statement from all source
+ * files in the program, then also delete it here.
+ */
+/* copyright --> */
+#ifndef _D_SOCKET_BUFFER_H_
+#define _D_SOCKET_BUFFER_H_
+
+#include "common.h"
+#include "SharedHandle.h"
+#include <string>
+
+namespace aria2 {
+
+class SocketCore;
+
+class SocketBuffer {
+private:
+  SharedHandle<SocketCore> _socket;
+
+  std::string _sendbuf;
+public:
+  SocketBuffer(const SharedHandle<SocketCore>& socket);
+
+  ~SocketBuffer();
+
+  void feedSendBuffer(const std::string& data);
+
+  ssize_t feedAndSend(const std::string& data);
+
+  ssize_t send();
+
+  bool sendBufferIsEmpty() const;
+
+};
+
+} // namespace aria2
+
+#endif // _D_SOCKET_BUFFER_H_

+ 23 - 7
src/SocketCore.cc

@@ -440,14 +440,16 @@ bool SocketCore::isReadable(time_t timeout)
 
 }
 
-void SocketCore::writeData(const char* data, size_t len)
+ssize_t SocketCore::writeData(const char* data, size_t len)
 {
   ssize_t ret = 0;
 
   if(!secure) {
     while((ret = send(sockfd, data, len, 0)) == -1 && errno == EINTR);
-    // TODO assuming Blocking mode.
-    if(ret == -1 || (size_t)ret != len) {
+    if(ret == -1 && errno == EAGAIN) {
+      ret = 0;
+    }
+    if(ret == -1) {
       throw DlRetryEx(StringFormat(EX_SOCKET_SEND, errorMsg()).str());
     }
   } else {
@@ -455,17 +457,31 @@ void SocketCore::writeData(const char* data, size_t len)
      // for SSL
      // TODO handling len == 0 case required
     ret = SSL_write(ssl, data, len);
-    if(ret <= 0 || (size_t)ret != len) {
-      throw DlRetryEx(StringFormat(EX_SOCKET_SEND, ERR_error_string(ERR_get_error(), NULL)).str());
+    if(ret < 0) {
+      switch(SSL_get_error(ssl, ret)) {
+      case SSL_ERROR_WANT_READ:
+      case SSL_ERROR_WANT_WRITE:
+	ret = 0;
+      }
+    }
+    if(ret <= 0) {
+      throw DlRetryEx(StringFormat(EX_SOCKET_SEND,
+				   ERR_error_string(ERR_get_error(), 0)).str());
     }
 #endif // HAVE_LIBSSL
 #ifdef HAVE_LIBGNUTLS
-    ret = gnutls_record_send(sslSession, data, len);
-    if(ret < 0 || (size_t)ret != len) {
+    while((ret = gnutls_record_send(sslSession, data, len)) ==
+	  GNUTLS_E_INTERRUPTED);
+    if(ret == GNUTLS_E_AGAIN) {
+      ret = 0;
+    }
+    if(ret < 0) {
       throw DlRetryEx(StringFormat(EX_SOCKET_SEND, gnutls_strerror(ret)).str());
     }
 #endif // HAVE_LIBGNUTLS
   }
+
+  return ret;
 }
 
 void SocketCore::readData(char* data, size_t& len)

+ 5 - 5
src/SocketCore.h

@@ -196,14 +196,14 @@ public:
    * @param data data to write
    * @param len length of data
    */
-  void writeData(const char* data, size_t len);
-  void writeData(const std::string& msg)
+  ssize_t writeData(const char* data, size_t len);
+  ssize_t writeData(const std::string& msg)
   {
-    writeData(msg.c_str(), msg.size());
+    return writeData(msg.c_str(), msg.size());
   }
-  void writeData(const unsigned char* data, size_t len)
+  ssize_t writeData(const unsigned char* data, size_t len)
   {
-    writeData(reinterpret_cast<const char*>(data), len);
+    return writeData(reinterpret_cast<const char*>(data), len);
   }
 
   void writeData(const char* data, size_t len, const std::string& host, uint16_t port);