Browse Source

Append new data to the contagious cache data

If we receive small data (e.g., 1 or 2 bytes), cache entry becomes a
headache. To mitigate this problem, we allocate cache buffer at least
4KiB and append the data to the contagious cache data.
Tatsuhiro Tsujikawa 13 years ago
parent
commit
838949ab85

+ 1 - 1
src/BtPieceMessage.cc

@@ -128,7 +128,7 @@ void BtPieceMessage::doReceivedAction()
       unsigned char* dataCopy = new unsigned char[blockLength_];
       memcpy(dataCopy, data_+9, blockLength_);
       piece->updateWrCache(getPieceStorage()->getWrDiskCache(),
-                           dataCopy, 0, blockLength_, offset);
+                           dataCopy, 0, blockLength_, blockLength_, offset);
     } else {
       getPieceStorage()->getDiskAdaptor()->writeData(data_+9, blockLength_,
                                                      offset);

+ 19 - 1
src/Piece.cc

@@ -351,7 +351,8 @@ void Piece::clearWrCache(WrDiskCache* diskCache)
 }
 
 void Piece::updateWrCache(WrDiskCache* diskCache, unsigned char* data,
-                          size_t offset, size_t len, int64_t goff)
+                          size_t offset, size_t len, size_t capacity,
+                          int64_t goff)
 {
   if(!diskCache) {
     return;
@@ -363,6 +364,7 @@ void Piece::updateWrCache(WrDiskCache* diskCache, unsigned char* data,
   cell->data = data;
   cell->offset = offset;
   cell->len = len;
+  cell->capacity = capacity;
   bool rv;
   rv = wrCache_->cacheData(cell);
   assert(rv);
@@ -370,6 +372,22 @@ void Piece::updateWrCache(WrDiskCache* diskCache, unsigned char* data,
   assert(rv);
 }
 
+size_t Piece::appendWrCache(WrDiskCache* diskCache, int64_t goff,
+                            const unsigned char* data, size_t len)
+{
+  if(!diskCache) {
+    return 0;
+  }
+  assert(wrCache_);
+  size_t delta = wrCache_->append(goff, data, len);
+  bool rv;
+  if(delta > 0) {
+    rv = diskCache->update(wrCache_, delta);
+    assert(rv);
+  }
+  return delta;
+}
+
 void Piece::releaseWrCache(WrDiskCache* diskCache)
 {
   if(diskCache && wrCache_) {

+ 8 - 1
src/Piece.h

@@ -208,7 +208,14 @@ public:
   void flushWrCache(WrDiskCache* diskCache);
   void clearWrCache(WrDiskCache* diskCache);
   void updateWrCache(WrDiskCache* diskCache, unsigned char* data,
-                     size_t offset, size_t len, int64_t goff);
+                     size_t offset, size_t len, size_t capacity, int64_t goff);
+  void updateWrCache(WrDiskCache* diskCache, unsigned char* data,
+                     size_t offset, size_t len, int64_t goff)
+  {
+    updateWrCache(diskCache, data, offset, len, len, goff);
+  }
+  size_t appendWrCache(WrDiskCache* diskCache, int64_t goff,
+                       const unsigned char* data, size_t len);
   void releaseWrCache(WrDiskCache* diskCache);
   WrDiskCacheEntry* getWrDiskCacheEntry() const
   {

+ 16 - 5
src/SinkStreamFilter.cc

@@ -67,12 +67,23 @@ ssize_t SinkStreamFilter::transform
       wlen = inlen;
     }
     const SharedHandle<Piece>& piece = segment->getPiece();
-    if(piece && piece->getWrDiskCacheEntry()) {
+    if(piece->getWrDiskCacheEntry()) {
       assert(wrDiskCache_);
-      unsigned char* dataCopy = new unsigned char[wlen];
-      memcpy(dataCopy, inbuf, wlen);
-      piece->updateWrCache(wrDiskCache_, dataCopy, 0, wlen,
-                           segment->getPositionToWrite());
+      // If we receive small data (e.g., 1 or 2 bytes), cache entry
+      // becomes a headache. To mitigate this problem, we allocate
+      // cache buffer at least 4KiB and append the data to the
+      // contagious cache data.
+      size_t alen = piece->appendWrCache(wrDiskCache_,
+                                         segment->getPositionToWrite(),
+                                         inbuf, wlen);
+      if(alen < wlen) {
+        size_t len = wlen - alen;
+        size_t capacity = std::max(len, static_cast<size_t>(4096));
+        unsigned char* dataCopy = new unsigned char[capacity];
+        memcpy(dataCopy, inbuf + alen, len);
+        piece->updateWrCache(wrDiskCache_, dataCopy, 0, len, capacity,
+                             segment->getPositionToWrite() + alen);
+      }
     } else {
       out->writeData(inbuf, wlen, segment->getPositionToWrite());
     }

+ 21 - 0
src/WrDiskCacheEntry.cc

@@ -33,6 +33,9 @@
  */
 /* copyright --> */
 #include "WrDiskCacheEntry.h"
+
+#include <cstring>
+
 #include "DiskAdaptor.h"
 #include "RecoverableException.h"
 #include "DownloadFailureException.h"
@@ -101,4 +104,22 @@ bool WrDiskCacheEntry::cacheData(DataCell* dataCell)
   }
 }
 
+size_t WrDiskCacheEntry::append(int64_t goff, const unsigned char *data,
+                                size_t len)
+{
+  if(set_.empty()) {
+    return 0;
+  }
+  DataCellSet::iterator i = set_.end();
+  --i;
+  if(static_cast<int64_t>((*i)->goff + (*i)->len) == goff) {
+    size_t wlen = std::min((*i)->capacity - (*i)->len, len);
+    memcpy((*i)->data + (*i)->offset + (*i)->len, data, wlen);
+    (*i)->len += wlen;
+    return wlen;
+  } else {
+    return 0;
+  }
+}
+
 } // namespace aria2

+ 7 - 0
src/WrDiskCacheEntry.h

@@ -58,6 +58,8 @@ public:
     unsigned char *data;
     size_t offset;
     size_t len;
+    // valid memory range from data+offset
+    size_t capacity;
     bool operator<(const DataCell& rhs) const
     {
       return goff < rhs.goff;
@@ -76,6 +78,11 @@ public:
 
   // Caches |dataCell|
   bool cacheData(DataCell* dataCell);
+
+  // Appends into last dataCell in set_ if the region is
+  // contagious. Returns the number of copied bytes.
+  size_t append(int64_t goff, const unsigned char *data, size_t len);
+
   size_t getSize() const
   {
     return size_;

+ 1 - 1
test/MockSegment.h

@@ -71,7 +71,7 @@ public:
 
   virtual SharedHandle<Piece> getPiece() const
   {
-    return SharedHandle<Piece>();
+    return SharedHandle<Piece>(new Piece());
   }
 };
 

+ 19 - 0
test/PieceTest.cc

@@ -17,6 +17,7 @@ class PieceTest:public CppUnit::TestFixture {
   CPPUNIT_TEST(testCompleteBlock);
   CPPUNIT_TEST(testGetCompletedLength);
   CPPUNIT_TEST(testFlushWrCache);
+  CPPUNIT_TEST(testAppendWrCache);
 #ifdef ENABLE_MESSAGE_DIGEST
 
   CPPUNIT_TEST(testGetDigestWithWrCache);
@@ -39,6 +40,7 @@ public:
   void testCompleteBlock();
   void testGetCompletedLength();
   void testFlushWrCache();
+  void testAppendWrCache();
 
 #ifdef ENABLE_MESSAGE_DIGEST
 
@@ -100,6 +102,23 @@ void PieceTest::testFlushWrCache()
   CPPUNIT_ASSERT(!p.getWrDiskCacheEntry());
 }
 
+void PieceTest::testAppendWrCache()
+{
+  unsigned char* data;
+  Piece p(0, 1024);
+  WrDiskCache dc(1024);
+  p.initWrCache(&dc, adaptor_);
+  size_t capacity = 6;
+  data = new unsigned char[capacity];
+  memcpy(data, "foo", 3);
+  p.updateWrCache(&dc, data, 0, 3, capacity, 0);
+  size_t alen = p.appendWrCache
+    (&dc, 3, reinterpret_cast<const unsigned char*>("barbaz"), 6);
+  CPPUNIT_ASSERT_EQUAL((size_t)3, alen);
+  p.flushWrCache(&dc);
+  CPPUNIT_ASSERT_EQUAL(std::string("foobar"), writer_->getString());
+}
+
 #ifdef ENABLE_MESSAGE_DIGEST
 
 void PieceTest::testGetDigestWithWrCache()

+ 1 - 1
test/TestUtil.cc

@@ -101,7 +101,7 @@ WrDiskCacheEntry::DataCell* createDataCell(int64_t goff,
   cell->data = new unsigned char[len];
   memcpy(cell->data, data, len);
   cell->offset = offset;
-  cell->len = len - offset;
+  cell->len = cell->capacity = len - offset;
   return cell;
 }
 

+ 23 - 0
test/WrDiskCacheEntryTest.cc

@@ -14,6 +14,7 @@ class WrDiskCacheEntryTest:public CppUnit::TestFixture {
 
   CPPUNIT_TEST_SUITE(WrDiskCacheEntryTest);
   CPPUNIT_TEST(testWriteToDisk);
+  CPPUNIT_TEST(testAppend);
   CPPUNIT_TEST(testClear);
   CPPUNIT_TEST_SUITE_END();
 
@@ -28,6 +29,7 @@ public:
   }
 
   void testWriteToDisk();
+  void testAppend();
   void testClear();
 };
 
@@ -43,6 +45,27 @@ void WrDiskCacheEntryTest::testWriteToDisk()
   CPPUNIT_ASSERT_EQUAL(std::string("01234567890"), writer_->getString());
 }
 
+void WrDiskCacheEntryTest::testAppend()
+{
+  WrDiskCacheEntry e(adaptor_);
+  WrDiskCacheEntry::DataCell* cell = new WrDiskCacheEntry::DataCell();
+  cell->goff = 0;
+  size_t capacity = 6;
+  size_t offset = 2;
+  cell->data = new unsigned char[offset+capacity];
+  memcpy(cell->data, "??foo", 3);
+  cell->offset = offset;
+  cell->len = 3;
+  cell->capacity = capacity;
+  e.cacheData(cell);
+  CPPUNIT_ASSERT_EQUAL((size_t)3,
+                       e.append(3, (const unsigned char*)"barbaz", 6));
+  CPPUNIT_ASSERT_EQUAL((size_t)6, cell->len);
+
+  CPPUNIT_ASSERT_EQUAL((size_t)0,
+                       e.append(7, (const unsigned char*)"FOO", 3));
+}
+
 void WrDiskCacheEntryTest::testClear()
 {
   WrDiskCacheEntry e(adaptor_);