Browse Source

2008-04-22 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

	Reuse socket if the origin server supports HTTP keep-alive and
	--enable-http-keep-alive or --enable-http-pipelining is given.
	The current implementation doesn't reuse the connections 
established
	against proxy server.

	* src/AbstractCommand.cc
	* src/DownloadEngine.cc
	* src/DownloadEngine.h
	* src/HttpDownloadCommand.cc
	* src/HttpInitiateConnectionCommand.cc
	* src/HttpResponseCommand.cc
Tatsuhiro Tsujikawa 17 years ago
parent
commit
5f55bc197f

+ 14 - 0
ChangeLog

@@ -1,3 +1,17 @@
+2008-04-22  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
+
+	Reuse socket if the origin server supports HTTP keep-alive and
+	--enable-http-keep-alive or --enable-http-pipelining is given.
+	The current implementation doesn't reuse the connections established
+	against proxy server.
+
+	* src/AbstractCommand.cc
+	* src/DownloadEngine.cc
+	* src/DownloadEngine.h
+	* src/HttpDownloadCommand.cc
+	* src/HttpInitiateConnectionCommand.cc
+	* src/HttpResponseCommand.cc
+
 2008-04-22  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
 
 	Rewritten HTTP keep-alive and pipelining routine.

+ 1 - 0
src/AbstractCommand.cc

@@ -133,6 +133,7 @@ bool AbstractCommand::execute() {
 	  _segments.push_back(segment);
 	}
 	if(_segments.empty()) {
+	  // TODO socket could be pooled here if pipelining is enabled...
 	  logger->info(MSG_NO_SEGMENT_AVAILABLE, cuid);
 	  return prepareForRetry(1);
 	}

+ 27 - 0
src/DownloadEngine.cc

@@ -46,6 +46,7 @@
 #include "TimeA2.h"
 #include "a2time.h"
 #include "Socket.h"
+#include "Util.h"
 #include <signal.h>
 #include <cstring>
 #include <algorithm>
@@ -361,4 +362,30 @@ void DownloadEngine::addRoutineCommand(Command* command)
   _routineCommands.push_back(command);
 }
 
+void DownloadEngine::poolSocket(const std::string& ipaddr, uint16_t port,
+                               const SharedHandle<SocketCore>& sock)
+{
+  std::string addr = ipaddr+":"+Util::uitos(port);
+  logger->info("Pool socket for %s", addr.c_str());
+  std::multimap<std::string, SharedHandle<SocketCore> >::value_type newPair
+    (addr, sock);
+  _socketPool.insert(newPair);
+}
+
+SharedHandle<SocketCore>
+DownloadEngine::popPooledSocket(const std::string& ipaddr, uint16_t port)
+{
+  std::string addr = ipaddr+":"+Util::uitos(port);
+  std::multimap<std::string, SharedHandle<SocketCore> >::iterator i =
+    _socketPool.find(addr);
+  if(i == _socketPool.end()) {
+    return SharedHandle<SocketCore>();
+  } else {
+    logger->info("Reuse socket for %s", addr.c_str());
+    SharedHandle<SocketCore> s = (*i).second;
+    _socketPool.erase(i);
+    return s;
+  }
+}
+
 } // namespace aria2

+ 10 - 0
src/DownloadEngine.h

@@ -39,6 +39,7 @@
 #include "SharedHandle.h"
 #include "Command.h"
 #include <deque>
+#include <map>
 #include "a2netcompat.h"
 
 namespace aria2 {
@@ -104,6 +105,9 @@ private:
 
   bool _haltRequested;
 
+  // key = IP address:port, value = Socket
+  std::multimap<std::string, SharedHandle<SocketCore> > _socketPool;
+ 
   void shortSleep() const;
   bool addSocket(const SocketEntry& socketEntry);
   bool deleteSocket(const SocketEntry& socketEntry);
@@ -170,6 +174,12 @@ public:
   void setNoWait(bool b);
 
   void addRoutineCommand(Command* command);
+
+  void poolSocket(const std::string& ipaddr, uint16_t port,
+		  const SharedHandle<SocketCore>& sock);
+
+  SharedHandle<SocketCore> popPooledSocket(const std::string& ipaddr,
+					   uint16_t port);
 };
 
 typedef SharedHandle<DownloadEngine> DownloadEngineHandle;

+ 13 - 0
src/HttpDownloadCommand.cc

@@ -41,6 +41,8 @@
 #include "HttpRequest.h"
 #include "Segment.h"
 #include "Socket.h"
+#include "prefs.h"
+#include "Option.h"
 
 namespace aria2 {
 
@@ -61,6 +63,17 @@ bool HttpDownloadCommand::prepareForNextSegment() {
     e->commands.push_back(command);
     return true;
   } else {
+    if(!e->option->getAsBool(PREF_HTTP_PROXY_ENABLED)) {
+      if(req->isPipeliningEnabled() ||
+	 (req->isKeepAliveEnabled() &&
+	  ((!transferDecoder.isNull() && _requestGroup->downloadFinished()) ||
+	   (uint64_t)_segments.front()->getPositionToWrite() == _requestGroup->getTotalLength()))) {
+	std::pair<std::string, uint16_t> peerInfo;
+	socket->getPeerInfo(peerInfo);
+	e->poolSocket(peerInfo.first, peerInfo.second, socket);
+      }
+    }
+
     return DownloadCommand::prepareForNextSegment();
   }
 }

+ 10 - 3
src/HttpInitiateConnectionCommand.cc

@@ -103,9 +103,16 @@ bool HttpInitiateConnectionCommand::executeInternal() {
       throw new DlAbortEx("ERROR");
     }
   } else {
-    logger->info(MSG_CONNECTING_TO_SERVER, cuid, req->getHost().c_str(),
-		 req->getPort());
-    socket->establishConnection(hostname, req->getPort());
+    SharedHandle<SocketCore> pooledSocket =
+      e->popPooledSocket(hostname, req->getPort());
+
+    if(pooledSocket.isNull()) {
+      logger->info(MSG_CONNECTING_TO_SERVER, cuid, req->getHost().c_str(),
+		   req->getPort());
+      socket->establishConnection(hostname, req->getPort());
+    } else {
+      socket = pooledSocket;
+    }
     SharedHandle<HttpConnection> httpConnection(new HttpConnection(cuid, socket, e->option));
     command = new HttpRequestCommand(cuid, req, _requestGroup, httpConnection,
 				     e, socket);

+ 7 - 1
src/HttpResponseCommand.cc

@@ -145,7 +145,13 @@ bool HttpResponseCommand::handleDefaultEncoding(const HttpResponseHandle& httpRe
     File file(_requestGroup->getFilePath());
 
     SegmentHandle segment = _requestGroup->getSegmentMan()->getSegment(cuid, 0);
-    if(!segment.isNull() && segment->getPositionToWrite() == 0) {
+    // pipelining requires implicit range specified. But the request for
+    // this response most likely dones't contains range header. This means
+    // we can't continue to use this socket because server sends all entity
+    // body instead of a segment.
+    // Therefore, we shutdown the socket here if pipelining is enabled.
+    if(!segment.isNull() && segment->getPositionToWrite() == 0 &&
+       !req->isPipeliningEnabled()) {
       command = createHttpDownloadCommand(httpResponse);
     } else {
       _requestGroup->getSegmentMan()->cancelSegment(cuid);