Browse Source

SocketRecvBuffer: Eliminate memmove

Since we don't try to read into buffer if buffer is not empty, we
don't have to memmove things.  This commit mostly rewrites
SocketRecvBuffer.
Tatsuhiro Tsujikawa 10 năm trước cách đây
mục cha
commit
692a674fe0

+ 1 - 1
src/DownloadCommand.cc

@@ -187,7 +187,7 @@ bool DownloadCommand::executeInternal() {
                                getSocketRecvBuffer()->getBufferLength());
                                getSocketRecvBuffer()->getBufferLength());
       bufSize = streamFilter_->getBytesProcessed();
       bufSize = streamFilter_->getBytesProcessed();
     }
     }
-    getSocketRecvBuffer()->shiftBuffer(bufSize);
+    getSocketRecvBuffer()->drain(bufSize);
     peerStat_->updateDownloadLength(bufSize);
     peerStat_->updateDownloadLength(bufSize);
     getDownloadContext()->updateDownloadLength(bufSize);
     getDownloadContext()->updateDownloadLength(bufSize);
   }
   }

+ 2 - 2
src/HttpConnection.cc

@@ -157,12 +157,12 @@ std::unique_ptr<HttpResponse> HttpConnection::receiveResponse()
     httpResponse->setHttpHeader(proc->getResult());
     httpResponse->setHttpHeader(proc->getResult());
     httpResponse->setHttpRequest(outstandingHttpRequests_.front()->
     httpResponse->setHttpRequest(outstandingHttpRequests_.front()->
                                  popHttpRequest());
                                  popHttpRequest());
-    socketRecvBuffer_->shiftBuffer(proc->getLastBytesProcessed());
+    socketRecvBuffer_->drain(proc->getLastBytesProcessed());
     outstandingHttpRequests_.pop_front();
     outstandingHttpRequests_.pop_front();
     return httpResponse;
     return httpResponse;
   }
   }
 
 
-  socketRecvBuffer_->shiftBuffer(proc->getLastBytesProcessed());
+  socketRecvBuffer_->drain(proc->getLastBytesProcessed());
   return nullptr;
   return nullptr;
 }
 }
 
 

+ 3 - 3
src/HttpServer.cc

@@ -142,7 +142,7 @@ bool HttpServer::receiveRequest()
     lastRequestHeader_ = headerProcessor_->getResult();
     lastRequestHeader_ = headerProcessor_->getResult();
     A2_LOG_INFO(fmt("HTTP Server received request\n%s",
     A2_LOG_INFO(fmt("HTTP Server received request\n%s",
                     headerProcessor_->getHeaderString().c_str()));
                     headerProcessor_->getHeaderString().c_str()));
-    socketRecvBuffer_->shiftBuffer(headerProcessor_->getLastBytesProcessed());
+    socketRecvBuffer_->drain(headerProcessor_->getLastBytesProcessed());
     bodyConsumed_ = 0;
     bodyConsumed_ = 0;
     if(setupResponseRecv() < 0) {
     if(setupResponseRecv() < 0) {
       A2_LOG_INFO("Request path is invaild. Ignore the request body.");
       A2_LOG_INFO("Request path is invaild. Ignore the request body.");
@@ -175,7 +175,7 @@ bool HttpServer::receiveRequest()
     }
     }
     return true;
     return true;
   } else {
   } else {
-    socketRecvBuffer_->shiftBuffer(headerProcessor_->getLastBytesProcessed());
+    socketRecvBuffer_->drain(headerProcessor_->getLastBytesProcessed());
     return false;
     return false;
   }
   }
 }
 }
@@ -197,7 +197,7 @@ bool HttpServer::receiveBody()
   if(lastBody_) {
   if(lastBody_) {
     lastBody_->writeData(socketRecvBuffer_->getBuffer(), length, 0);
     lastBody_->writeData(socketRecvBuffer_->getBuffer(), length, 0);
   }
   }
-  socketRecvBuffer_->shiftBuffer(length);
+  socketRecvBuffer_->drain(length);
   bodyConsumed_ += length;
   bodyConsumed_ += length;
   return lastContentLength_ == bodyConsumed_;
   return lastContentLength_ == bodyConsumed_;
 }
 }

+ 1 - 1
src/HttpSkipResponseCommand.cc

@@ -138,7 +138,7 @@ bool HttpSkipResponseCommand::executeInternal()
                                  getSocketRecvBuffer()->getBufferLength());
                                  getSocketRecvBuffer()->getBufferLength());
         bufSize = streamFilter_->getBytesProcessed();
         bufSize = streamFilter_->getBytesProcessed();
       }
       }
-      getSocketRecvBuffer()->shiftBuffer(bufSize);
+      getSocketRecvBuffer()->drain(bufSize);
     }
     }
     if(totalLength_ != 0 && eof) {
     if(totalLength_ != 0 && eof) {
       throw DL_RETRY_EX(EX_GOT_EOF);
       throw DL_RETRY_EX(EX_GOT_EOF);

+ 19 - 18
src/SocketRecvBuffer.cc

@@ -39,17 +39,11 @@
 
 
 #include "SocketCore.h"
 #include "SocketCore.h"
 #include "LogFactory.h"
 #include "LogFactory.h"
-#include "a2functional.h"
 
 
 namespace aria2 {
 namespace aria2 {
 
 
-SocketRecvBuffer::SocketRecvBuffer
-(std::shared_ptr<SocketCore> socket,
- size_t capacity)
-  : socket_(std::move(socket)),
-    capacity_(capacity),
-    buf_(make_unique<unsigned char[]>(capacity_)),
-    bufLen_(0)
+SocketRecvBuffer::SocketRecvBuffer(std::shared_ptr<SocketCore> socket)
+  : socket_(std::move(socket)), pos_(buf_.data()), last_(pos_)
 {}
 {}
 
 
 SocketRecvBuffer::~SocketRecvBuffer()
 SocketRecvBuffer::~SocketRecvBuffer()
@@ -57,21 +51,28 @@ SocketRecvBuffer::~SocketRecvBuffer()
 
 
 ssize_t SocketRecvBuffer::recv()
 ssize_t SocketRecvBuffer::recv()
 {
 {
-  size_t len = capacity_ - bufLen_;
-  if(len > 0) {
-    socket_->readData(buf_.get() + bufLen_, len);
-    bufLen_ += len;
-  } else {
+  size_t n = std::end(buf_) - last_;
+  if (n == 0) {
     A2_LOG_DEBUG("Buffer full");
     A2_LOG_DEBUG("Buffer full");
+    return 0;
   }
   }
-  return len;
+  socket_->readData(last_, n);
+  last_ += n;
+  return n;
 }
 }
 
 
-void SocketRecvBuffer::shiftBuffer(size_t offset)
+void SocketRecvBuffer::drain(size_t n)
 {
 {
-  assert(offset <= bufLen_);
-  memmove(buf_.get(), buf_.get() + offset, bufLen_ - offset);
-  bufLen_ -= offset;
+  assert(pos_ + n <= last_);
+  pos_ += n;
+  if (pos_ == last_) {
+    truncateBuffer();
+  }
+}
+
+void SocketRecvBuffer::truncateBuffer()
+{
+  pos_ = last_ = buf_.data();
 }
 }
 
 
 } // namespace aria2
 } // namespace aria2

+ 13 - 17
src/SocketRecvBuffer.h

@@ -38,6 +38,7 @@
 #include "common.h"
 #include "common.h"
 
 
 #include <memory>
 #include <memory>
+#include <array>
 
 
 namespace aria2 {
 namespace aria2 {
 
 
@@ -45,21 +46,17 @@ class SocketCore;
 
 
 class SocketRecvBuffer {
 class SocketRecvBuffer {
 public:
 public:
-  SocketRecvBuffer
-  (std::shared_ptr<SocketCore> socket,
-   size_t capacity = 16*1024);
+  SocketRecvBuffer(std::shared_ptr<SocketCore> socket);
   ~SocketRecvBuffer();
   ~SocketRecvBuffer();
   // Reads data from socket as much as capacity allows. Returns the
   // Reads data from socket as much as capacity allows. Returns the
   // number of bytes read.
   // number of bytes read.
   ssize_t recv();
   ssize_t recv();
-  // Shifts buffer by offset bytes. offset must satisfy offset <=
-  // getBufferLength().
-  void shiftBuffer(size_t offset);
   // Truncates the contents of buffer to 0.
   // Truncates the contents of buffer to 0.
-  void clearBuffer()
-  {
-    bufLen_ = 0;
-  }
+  void truncateBuffer();
+  // Drains first n bytes of data from buffer.  It is an programmer's
+  // responsibility to ensure that n is smaller or equal to the
+  // buffered data.
+  void drain(size_t n);
 
 
   const std::shared_ptr<SocketCore>& getSocket() const
   const std::shared_ptr<SocketCore>& getSocket() const
   {
   {
@@ -68,25 +65,24 @@ public:
 
 
   const unsigned char* getBuffer() const
   const unsigned char* getBuffer() const
   {
   {
-    return buf_.get();
+    return pos_;
   }
   }
 
 
   size_t getBufferLength() const
   size_t getBufferLength() const
   {
   {
-    return bufLen_;
+    return last_ - pos_;
   }
   }
 
 
   bool bufferEmpty() const
   bool bufferEmpty() const
   {
   {
-    return bufLen_ == 0;
+    return pos_ == last_;
   }
   }
 
 
-  void pushBuffer(const unsigned char* data, size_t len);
 private:
 private:
+  std::array<unsigned char, 16384> buf_;
   std::shared_ptr<SocketCore> socket_;
   std::shared_ptr<SocketCore> socket_;
-  size_t capacity_;
-  std::unique_ptr<unsigned char[]> buf_;
-  size_t bufLen_;
+  unsigned char *pos_;
+  unsigned char *last_;
 };
 };
 
 
 } // namespace aria2
 } // namespace aria2