Browse Source

Write data in 4K aligned offset in write with disk cache enabled

This greatly reduces disk activity especially on Win + NTFS.  Not so
much difference on Linux.
Tatsuhiro Tsujikawa 13 years ago
parent
commit
9ed8502e74

+ 47 - 0
src/AbstractSingleDiskAdaptor.cc

@@ -38,6 +38,8 @@
 #include "DiskWriter.h"
 #include "FileEntry.h"
 #include "TruncFileAllocationIterator.h"
+#include "WrDiskCacheEntry.h"
+#include "LogFactory.h"
 #ifdef HAVE_SOME_FALLOCATE
 # include "FallocFileAllocationIterator.h"
 #endif // HAVE_SOME_FALLOCATE
@@ -81,6 +83,51 @@ ssize_t AbstractSingleDiskAdaptor::readData
   return diskWriter_->readData(data, len, offset);
 }
 
+void AbstractSingleDiskAdaptor::writeCache(const WrDiskCacheEntry* entry)
+{
+  // Write cached data in 4KiB aligned offset. This reduces disk
+  // activity especially on Windows 7 NTFS. In this code, we assume
+  // that maximum length of DataCell data is 16KiB to simplify the
+  // code.
+  unsigned char buf[16*1024];
+  int64_t start = 0;
+  size_t buflen = 0;
+  size_t buffoffset = 0;
+  const WrDiskCacheEntry::DataCellSet& dataSet = entry->getDataSet();
+  for(WrDiskCacheEntry::DataCellSet::const_iterator i = dataSet.begin(),
+        eoi = dataSet.end(); i != eoi; ++i) {
+    if(start+static_cast<ssize_t>(buflen) < (*i)->goff) {
+      A2_LOG_DEBUG(fmt("Cache flush goff=%"PRId64", len=%lu",
+                       start, static_cast<unsigned long>(buflen)));
+      writeData(buf+buffoffset, buflen-buffoffset, start);
+      start = (*i)->goff;
+      buflen = buffoffset = 0;
+    }
+    if(buflen == 0 && ((*i)->goff & 0xfff) == 0 && ((*i)->len & 0xfff) == 0) {
+      // Already aligned. Write it without copy.
+      writeData((*i)->data + (*i)->offset, (*i)->len, start);
+      start += (*i)->len;
+    } else {
+      if(buflen == 0) {
+        buflen = buffoffset = (*i)->goff & 0xfff;
+      }
+      size_t wlen = std::min(sizeof(buf) - buflen, (*i)->len);
+      memcpy(buf+buflen, (*i)->data+(*i)->offset, wlen);
+      buflen += wlen;
+      if(buflen == sizeof(buf)) {
+        A2_LOG_DEBUG(fmt("Cache flush goff=%"PRId64", len=%lu",
+                         start, static_cast<unsigned long>(buflen)));
+        writeData(buf+buffoffset, buflen-buffoffset, start);
+        memcpy(buf, (*i)->data + (*i)->offset + wlen, (*i)->len - wlen);
+        start += sizeof(buf) - buffoffset;
+        buflen = (*i)->len - wlen;
+        buffoffset = 0;
+      }
+    }
+  }
+  writeData(buf+buffoffset, buflen-buffoffset, start);
+}
+
 bool AbstractSingleDiskAdaptor::fileExists()
 {
   return File(getFilePath()).exists();

+ 2 - 0
src/AbstractSingleDiskAdaptor.h

@@ -65,6 +65,8 @@ public:
 
   virtual ssize_t readData(unsigned char* data, size_t len, int64_t offset);
 
+  virtual void writeCache(const WrDiskCacheEntry* entry);
+
   virtual bool fileExists();
 
   virtual int64_t size();

+ 4 - 0
src/DiskAdaptor.h

@@ -47,6 +47,7 @@ namespace aria2 {
 
 class FileEntry;
 class FileAllocationIterator;
+class WrDiskCacheEntry;
 
 class DiskAdaptor:public BinaryStream {
 public:
@@ -105,6 +106,9 @@ public:
   // successfully changed.
   virtual size_t utime(const Time& actime, const Time& modtime) = 0;
 
+  // Writes cached data to the underlying disk.
+  virtual void writeCache(const WrDiskCacheEntry* entry) = 0;
+
   void setFileAllocationMethod(FileAllocationMethod method)
   {
     fileAllocationMethod_ = method;

+ 98 - 0
src/MultiDiskAdaptor.cc

@@ -50,6 +50,7 @@
 #include "Logger.h"
 #include "LogFactory.h"
 #include "SimpleRandomizer.h"
+#include "WrDiskCacheEntry.h"
 
 namespace aria2 {
 
@@ -419,6 +420,103 @@ ssize_t MultiDiskAdaptor::readData
   return totalReadLength;
 }
 
+void MultiDiskAdaptor::writeCache(const WrDiskCacheEntry* entry)
+{
+  // Write cached data in 4KiB aligned offset. This reduces disk
+  // activity especially on Windows 7 NTFS.
+  unsigned char buf[16*1024];
+  size_t buflen = 0;
+  size_t buffoffset = 0;
+  const WrDiskCacheEntry::DataCellSet& dataSet = entry->getDataSet();
+  if(dataSet.empty()) {
+    return;
+  }
+  DiskWriterEntries::const_iterator dent =
+    findFirstDiskWriterEntry(diskWriterEntries_, (*dataSet.begin())->goff),
+    eod = diskWriterEntries_.end();
+  WrDiskCacheEntry::DataCellSet::const_iterator i = dataSet.begin(),
+    eoi = dataSet.end();
+  size_t celloff = 0;
+  for(; dent != eod; ++dent) {
+    int64_t lstart = 0, lp = 0;
+    const SharedHandle<FileEntry>& fent = (*dent)->getFileEntry();
+    for(; i != eoi;) {
+      if(std::max(fent->getOffset(),
+                  static_cast<int64_t>((*i)->goff + celloff)) <
+         std::min(fent->getLastOffset(),
+                  static_cast<int64_t>((*i)->goff + (*i)->len))) {
+        openIfNot(*dent, &DiskWriterEntry::openFile);
+        if(!(*dent)->isOpen()) {
+          throwOnDiskWriterNotOpened(*dent, (*i)->goff + celloff);
+        }
+      } else {
+        A2_LOG_DEBUG(fmt("%s Cache flush loff=%"PRId64", len=%lu",
+                         fent->getPath().c_str(),
+                         lstart,
+                         static_cast<unsigned long>(buflen-buffoffset)));
+        (*dent)->getDiskWriter()->
+          writeData(buf + buffoffset, buflen - buffoffset, lstart);
+        buflen = buffoffset = 0;
+        break;
+      }
+      int64_t loff = fent->gtoloff((*i)->goff + celloff);
+      if(static_cast<int64_t>(lstart + buflen) < loff) {
+        A2_LOG_DEBUG(fmt("%s Cache flush loff=%"PRId64", len=%lu",
+                         fent->getPath().c_str(),
+                         lstart,
+                         static_cast<unsigned long>(buflen-buffoffset)));
+        (*dent)->getDiskWriter()->
+          writeData(buf + buffoffset, buflen - buffoffset, lstart);
+        lstart = lp = loff;
+        buflen = buffoffset = 0;
+      }
+      // If the position of the cache data is not aligned, offset
+      // buffer so that next write can be aligned.
+      if(buflen == 0) {
+        buflen = buffoffset = loff & 0xfff;
+      }
+      assert((*i)->len > celloff);
+      for(;;) {
+        size_t wlen = std::min(static_cast<int64_t>((*i)->len - celloff),
+                               fent->getLength() - lp);
+        wlen = std::min(wlen, sizeof(buf) - buflen);
+        memcpy(buf + buflen, (*i)->data + (*i)->offset + celloff, wlen);
+        buflen += wlen;
+        celloff += wlen;
+        lp += wlen;
+        if(lp == fent->getLength() || buflen == sizeof(buf)) {
+          A2_LOG_DEBUG(fmt("%s Cache flush loff=%"PRId64", len=%lu",
+                           fent->getPath().c_str(),
+                           lstart,
+                           static_cast<unsigned long>(buflen-buffoffset)));
+          (*dent)->getDiskWriter()->
+            writeData(buf + buffoffset, buflen - buffoffset, lstart);
+          lstart += buflen - buffoffset;
+          lp = lstart;
+          buflen = buffoffset = 0;
+        }
+        if(lp == fent->getLength() || celloff == (*i)->len) {
+          break;
+        }
+      }
+      if(celloff == (*i)->len) {
+        ++i;
+        celloff = 0;
+      }
+    }
+    if(i == eoi) {
+      A2_LOG_DEBUG(fmt("%s Cache flush loff=%"PRId64", len=%lu",
+                       fent->getPath().c_str(),
+                       lstart,
+                       static_cast<unsigned long>(buflen - buffoffset)));
+      (*dent)->getDiskWriter()->
+        writeData(buf + buffoffset, buflen - buffoffset, lstart);
+      break;
+    }
+  }
+  assert(i == eoi);
+}
+
 bool MultiDiskAdaptor::fileExists()
 {
   return std::find_if(getFileEntries().begin(), getFileEntries().end(),

+ 2 - 0
src/MultiDiskAdaptor.h

@@ -135,6 +135,8 @@ public:
 
   virtual ssize_t readData(unsigned char* data, size_t len, int64_t offset);
 
+  virtual void writeCache(const WrDiskCacheEntry* entry);
+
   virtual bool fileExists();
 
   virtual int64_t size();

+ 2 - 8
src/WrDiskCacheEntry.cc

@@ -75,15 +75,9 @@ void WrDiskCacheEntry::writeToDisk()
 {
   DataCellSet::iterator i = set_.begin(), eoi = set_.end();
   try {
-    for(; i != eoi; ++i) {
-      A2_LOG_DEBUG(fmt("WrDiskCacheEntry flush goff=%"PRId64", len=%lu",
-                       (*i)->goff, static_cast<unsigned long>((*i)->len)));
-      diskAdaptor_->writeData((*i)->data+(*i)->offset, (*i)->len,
-                              (*i)->goff);
-    }
+    diskAdaptor_->writeCache(this);
   } catch(RecoverableException& e) {
-    A2_LOG_ERROR(fmt("WrDiskCacheEntry flush error goff=%"PRId64", len=%lu",
-                     (*i)->goff, static_cast<unsigned long>((*i)->len)));
+    A2_LOG_ERROR("WrDiskCacheEntry flush error");
     error_ = CACHE_ERR_ERROR;
     errorCode_ = e.getErrorCode();
   }

+ 30 - 0
test/DirectDiskAdaptorTest.cc

@@ -7,6 +7,8 @@
 #include "Exception.h"
 #include "util.h"
 #include "TestUtil.h"
+#include "ByteArrayDiskWriter.h"
+#include "WrDiskCacheEntry.h"
 
 namespace aria2 {
 
@@ -14,6 +16,7 @@ class DirectDiskAdaptorTest:public CppUnit::TestFixture {
 
   CPPUNIT_TEST_SUITE(DirectDiskAdaptorTest);
   CPPUNIT_TEST(testCutTrailingGarbage);
+  CPPUNIT_TEST(testWriteCache);
   CPPUNIT_TEST_SUITE_END();
 public:
   void setUp() {}
@@ -21,6 +24,7 @@ public:
   void tearDown() {}
 
   void testCutTrailingGarbage();
+  void testWriteCache();
 };
 
 
@@ -50,4 +54,30 @@ void DirectDiskAdaptorTest::testCutTrailingGarbage()
                        File(entry->getPath()).size());
 }
 
+void DirectDiskAdaptorTest::testWriteCache()
+{
+  SharedHandle<DirectDiskAdaptor> adaptor(new DirectDiskAdaptor());
+  SharedHandle<ByteArrayDiskWriter> dw(new ByteArrayDiskWriter());
+  adaptor->setDiskWriter(dw);
+  WrDiskCacheEntry cache(adaptor);
+  std::string data1(4096, '1'), data2(4094, '2');
+  cache.cacheData(createDataCell(5, data1.c_str()));
+  cache.cacheData(createDataCell(5+data1.size(), data2.c_str()));
+  adaptor->writeCache(&cache);
+  CPPUNIT_ASSERT_EQUAL(data1+data2, dw->getString().substr(5));
+
+  cache.clear();
+  dw->setString("");
+  cache.cacheData(createDataCell(4096, data1.c_str()));
+  adaptor->writeCache(&cache);
+  CPPUNIT_ASSERT_EQUAL(data1, dw->getString().substr(4096));
+
+  cache.clear();
+  dw->setString("???????");
+  cache.cacheData(createDataCell(0, "abc"));
+  cache.cacheData(createDataCell(4, "efg"));
+  adaptor->writeCache(&cache);
+  CPPUNIT_ASSERT_EQUAL(std::string("abc?efg"), dw->getString());
+}
+
 } // namespace aria2

+ 45 - 0
test/MultiDiskAdaptorTest.cc

@@ -12,6 +12,7 @@
 #include "array_fun.h"
 #include "TestUtil.h"
 #include "DiskWriter.h"
+#include "WrDiskCacheEntry.h"
 
 namespace aria2 {
 
@@ -24,6 +25,7 @@ class MultiDiskAdaptorTest:public CppUnit::TestFixture {
   CPPUNIT_TEST(testSize);
   CPPUNIT_TEST(testUtime);
   CPPUNIT_TEST(testResetDiskWriterEntries);
+  CPPUNIT_TEST(testWriteCache);
   CPPUNIT_TEST_SUITE_END();
 private:
   SharedHandle<MultiDiskAdaptor> adaptor;
@@ -39,6 +41,7 @@ public:
   void testSize();
   void testUtime();
   void testResetDiskWriterEntries();
+  void testWriteCache();
 };
 
 
@@ -453,4 +456,46 @@ void MultiDiskAdaptorTest::testUtime()
                  File(entries[2]->getPath()).getModifiedTime().getTime());
 }
 
+void MultiDiskAdaptorTest::testWriteCache()
+{
+  std::string storeDir =
+    A2_TEST_OUT_DIR"/aria2_MultiDiskAdaptorTest_testWriteCache";
+  SharedHandle<FileEntry> entries[] = {
+    SharedHandle<FileEntry>(new FileEntry(storeDir+"/file1", 16385, 0)),
+    SharedHandle<FileEntry>(new FileEntry(storeDir+"/file2", 4098, 16385))
+  };
+  for(int i = 0; i < 2; ++i) {
+    File(entries[i]->getPath()).remove();
+  }
+  SharedHandle<MultiDiskAdaptor> adaptor(new MultiDiskAdaptor());
+  adaptor->setFileEntries(vbegin(entries), vend(entries));
+  WrDiskCacheEntry cache(adaptor);
+  std::string data1(16383, '1'), data2(100, '2'), data3(4000, '3');
+  cache.cacheData(createDataCell(0, data1.c_str()));
+  cache.cacheData(createDataCell(data1.size(), data2.c_str()));
+  cache.cacheData(createDataCell(data1.size()+data2.size(), data3.c_str()));
+  adaptor->openFile();
+  adaptor->writeCache(&cache);
+  for(int i = 0; i < 2; ++i) {
+    CPPUNIT_ASSERT_EQUAL(entries[i]->getLength(),
+                         File(entries[i]->getPath()).size());
+  }
+  CPPUNIT_ASSERT_EQUAL(data1+data2.substr(0, 2),
+                       readFile(entries[0]->getPath()));
+  CPPUNIT_ASSERT_EQUAL(data2.substr(2)+data3,
+                       readFile(entries[1]->getPath()));
+
+  adaptor->closeFile();
+  for(int i = 0; i < 2; ++i) {
+    File(entries[i]->getPath()).remove();
+  }
+  cache.clear();
+  cache.cacheData(createDataCell(123, data2.c_str()));
+  adaptor->openFile();
+  adaptor->writeCache(&cache);
+  CPPUNIT_ASSERT_EQUAL((int64_t)(123+data2.size()),
+                       File(entries[0]->getPath()).size());
+  CPPUNIT_ASSERT_EQUAL(data2, readFile(entries[0]->getPath()).substr(123));
+}
+
 } // namespace aria2

+ 1 - 1
test/TestUtil.cc

@@ -101,7 +101,7 @@ WrDiskCacheEntry::DataCell* createDataCell(int64_t goff,
   cell->data = new unsigned char[len];
   memcpy(cell->data, data, len);
   cell->offset = offset;
-  cell->len = len;
+  cell->len = len - offset;
   return cell;
 }