Bladeren bron

2007-11-28 Tatsuhiro Tsujikawa <tujikawa at rednoah dot com>

	Added direct I/O support. The current implementation uses O_DIRECT,
	which is not posix standard and is tested on linux 2.6.21.
	Currently only file allocation uses direct I/O.
	* src/SingleFileAllocationIterator.{h, cc}
	* test/SingleFileAllocationIteratorTest.cc
	* src/MultiFileAllocationIterator.{h, cc}
	* test/MultiFileAllocationIteratorTest.cc
	* src/BinaryStream.h
	* src/DiskWriter.h
	* src/AbstractDiskWriter.{h, cc}
	* src/ByteArrayDiskWriter.h
	* src/DiskAdaptor.h
	* src/AbstractSingleDiskAdaptor.{h, cc}
	* src/MultiDiskAdaptor.{h, cc}
	* src/FileAllocationEntry.cc
	* src/Util.{h, cc}
	* src/OptionHandlerFactory.cc
	* src/prefs.h
	* src/version_usage.cc
	* src/option_processing.cc

	Moved FileAllocationMan::markCurrentFileAllocationEntryDone() to
	handleException.
	* src/MultiFileAllocationIterator.cc

	Added EINTR handling
	* src/SocketCore.cc
Tatsuhiro Tsujikawa 18 jaren geleden
bovenliggende
commit
5a2f398eca

+ 30 - 0
ChangeLog

@@ -1,3 +1,33 @@
+2007-11-28  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
+
+	Added direct I/O support. The current implementation uses O_DIRECT,
+	which is not posix standard and is tested on linux 2.6.21.
+	Currently only file allocation uses direct I/O.
+	* src/SingleFileAllocationIterator.{h, cc}
+	* test/SingleFileAllocationIteratorTest.cc
+	* src/MultiFileAllocationIterator.{h, cc}
+	* test/MultiFileAllocationIteratorTest.cc
+	* src/BinaryStream.h
+	* src/DiskWriter.h
+	* src/AbstractDiskWriter.{h, cc}
+	* src/ByteArrayDiskWriter.h
+	* src/DiskAdaptor.h
+	* src/AbstractSingleDiskAdaptor.{h, cc}
+	* src/MultiDiskAdaptor.{h, cc}
+	* src/FileAllocationEntry.cc
+	* src/Util.{h, cc}
+	* src/OptionHandlerFactory.cc
+	* src/prefs.h
+	* src/version_usage.cc
+	* src/option_processing.cc
+
+	Moved FileAllocationMan::markCurrentFileAllocationEntryDone() to
+	handleException.
+	* src/MultiFileAllocationIterator.cc
+
+	Added EINTR handling
+	* src/SocketCore.cc
+	
 2007-11-28  Tatsuhiro Tsujikawa  <tujikawa at rednoah dot com>
 
 	Updated usage

+ 3 - 0
config.h.in

@@ -211,6 +211,9 @@
 /* Define to 1 if you have old openssl. */
 #undef HAVE_OLD_LIBSSL
 
+/* Define to 1 if you have the `posix_memalign' function. */
+#undef HAVE_POSIX_MEMALIGN
+
 /* Define to 1 if you have the `putenv' function. */
 #undef HAVE_PUTENV
 

+ 2 - 1
configure

@@ -12769,7 +12769,8 @@ done
 
 
 
-for ac_func in __argz_count __argz_next __argz_stringify daemon ftruncate getcwd getpagesize inet_ntoa memchr mempcpy memset mkdir munmap nl_langinfo random rmdir select setlocale sigaction sleep socket srandom stpcpy strcasecmp strchr strcspn strdup strerror strstr strtol strtoul unsetenv usleep
+
+for ac_func in __argz_count __argz_next __argz_stringify daemon ftruncate getcwd getpagesize inet_ntoa memchr mempcpy memset mkdir munmap nl_langinfo posix_memalign random rmdir select setlocale sigaction sleep socket srandom stpcpy strcasecmp strchr strcspn strdup strerror strstr strtol strtoul unsetenv usleep
 do
 as_ac_var=`echo "ac_cv_func_$ac_func" | $as_tr_sh`
 { echo "$as_me:$LINENO: checking for $ac_func" >&5

+ 1 - 1
configure.ac

@@ -148,7 +148,7 @@ AC_FUNC_MMAP
 AC_FUNC_SELECT_ARGTYPES
 AC_FUNC_STAT
 AC_FUNC_VPRINTF
-AC_CHECK_FUNCS([__argz_count __argz_next __argz_stringify daemon ftruncate getcwd getpagesize inet_ntoa memchr mempcpy memset mkdir munmap nl_langinfo random rmdir select setlocale sigaction sleep socket srandom stpcpy strcasecmp strchr strcspn strdup strerror strstr strtol strtoul unsetenv usleep])
+AC_CHECK_FUNCS([__argz_count __argz_next __argz_stringify daemon ftruncate getcwd getpagesize inet_ntoa memchr mempcpy memset mkdir munmap nl_langinfo posix_memalign random rmdir select setlocale sigaction sleep socket srandom stpcpy strcasecmp strchr strcspn strdup strerror strstr strtol strtoul unsetenv usleep])
 
 AC_CHECK_FUNCS([basename],
 	[AM_CONDITIONAL([HAVE_BASENAME], true)],

+ 18 - 0
src/AbstractDiskWriter.cc

@@ -163,3 +163,21 @@ int64_t AbstractDiskWriter::size() const
   }
   return fileStat.st_size;
 }
+
+void AbstractDiskWriter::enableDirectIO()
+{
+#ifdef ENABLE_DIRECT_IO
+  int32_t flg;
+  while((flg = fcntl(fd, F_GETFL)) == -1 && errno == EINTR);
+  while(fcntl(fd, F_SETFL, flg|O_DIRECT) == -1 && errno == EINTR);
+#endif // ENABLE_DIRECT_IO
+}
+
+void AbstractDiskWriter::disableDirectIO()
+{
+#ifdef ENABLE_DIRECT_IO
+  int32_t flg;
+  while((flg = fcntl(fd, F_GETFL)) == -1 && errno == EINTR);
+  while(fcntl(fd, F_SETFL, flg&(~O_DIRECT)) == -1 && errno == EINTR);
+#endif // ENABLE_DIRECT_IO
+}

+ 4 - 0
src/AbstractDiskWriter.h

@@ -69,6 +69,10 @@ public:
   virtual void truncate(int64_t length);
 
   virtual int64_t size() const;
+  
+  virtual void enableDirectIO();
+
+  virtual void disableDirectIO();
 };
 
 #endif // _D_ABSTRACT_DISK_WRITER_H_

+ 16 - 1
src/AbstractSingleDiskAdaptor.cc

@@ -73,5 +73,20 @@ bool AbstractSingleDiskAdaptor::fileExists()
 
 FileAllocationIteratorHandle AbstractSingleDiskAdaptor::fileAllocationIterator()
 {
-  return new SingleFileAllocationIterator(this);
+  SingleFileAllocationIteratorHandle h =
+    new SingleFileAllocationIterator(this,
+				     size(),
+				     totalLength);
+  h->init();
+  return h;
+}
+
+void AbstractSingleDiskAdaptor::enableDirectIO()
+{
+  diskWriter->enableDirectIO();
+}
+
+void AbstractSingleDiskAdaptor::disableDirectIO()
+{
+  diskWriter->disableDirectIO();
 }

+ 4 - 0
src/AbstractSingleDiskAdaptor.h

@@ -74,6 +74,10 @@ public:
   
   virtual FileAllocationIteratorHandle fileAllocationIterator();
 
+  virtual void enableDirectIO();
+
+  virtual void disableDirectIO();
+
   void setDiskWriter(const DiskWriterHandle diskWriter) {
     this->diskWriter = diskWriter;
   }

+ 2 - 0
src/BinaryStream.h

@@ -45,6 +45,8 @@ public:
   virtual void writeData(const unsigned char* data, int32_t len, int64_t offset) = 0;
 
   virtual int32_t readData(unsigned char* data, int32_t len, int64_t offset) = 0;
+
+  virtual void truncate(int64_t length) = 0;
 };
 
 typedef SharedHandle<BinaryStream> BinaryStreamHandle;

+ 4 - 0
src/ByteArrayDiskWriter.h

@@ -65,6 +65,10 @@ public:
   {
     return buf.str().size();
   }
+
+  virtual void enableDirectIO() {}
+
+  virtual void disableDirectIO() {}
 };
 
 #endif // _D_BYTE_ARRAY_DISK_WRITER_H_

+ 1 - 0
src/DefaultPieceStorage.cc

@@ -394,6 +394,7 @@ void DefaultPieceStorage::setFileFilter(const Strings& filePaths)
 void DefaultPieceStorage::setFileFilter(IntSequence seq)
 {
   Integers fileIndexes = seq.flush();
+  // TODO Is sorting necessary?
   sort(fileIndexes.begin(), fileIndexes.end());
   fileIndexes.erase(unique(fileIndexes.begin(), fileIndexes.end()), fileIndexes.end());
   Strings filePaths;

+ 4 - 0
src/DiskAdaptor.h

@@ -89,6 +89,10 @@ public:
   const string& getStoreDir() const { return this->storeDir; }
 
   virtual FileAllocationIteratorHandle fileAllocationIterator() = 0;
+
+  virtual void enableDirectIO() {};
+
+  virtual void disableDirectIO() {};
 };
 
 typedef SharedHandle<DiskAdaptor> DiskAdaptorHandle;

+ 4 - 0
src/DiskWriter.h

@@ -71,6 +71,10 @@ public:
 
   // Returns file length
   virtual int64_t size() const = 0;
+
+  virtual void enableDirectIO() = 0;
+
+  virtual void disableDirectIO() = 0;
 };
 
 typedef SharedHandle<DiskWriter> DiskWriterHandle;

+ 13 - 17
src/FileAllocationCommand.cc

@@ -51,29 +51,25 @@ bool FileAllocationCommand::executeInternal()
   if(_e->isHaltRequested()) {
     return true;
   }
-  try {
-    _fileAllocationEntry->allocateChunk();
-    if(_fileAllocationEntry->finished()) {
-      logger->debug(MSG_ALLOCATION_COMPLETED,
-		    _timer.difference(),
-		    Util::llitos(_requestGroup->getTotalLength(), true).c_str());
-      _e->_fileAllocationMan->markCurrentFileAllocationEntryDone();
-      
-      _e->addCommand(_fileAllocationEntry->prepareForNextAction(_e));
-      
-      return true;
-    } else {
-      _e->commands.push_back(this);
-      return false;
-    }
-  } catch(Exception* e) {
+  _fileAllocationEntry->allocateChunk();
+  if(_fileAllocationEntry->finished()) {
+    logger->debug(MSG_ALLOCATION_COMPLETED,
+		  _timer.difference(),
+		  Util::llitos(_requestGroup->getTotalLength(), true).c_str());
     _e->_fileAllocationMan->markCurrentFileAllocationEntryDone();
-    throw;
+    
+    _e->addCommand(_fileAllocationEntry->prepareForNextAction(_e));
+    
+    return true;
+  } else {
+    _e->commands.push_back(this);
+    return false;
   }
 }
 
 bool FileAllocationCommand::handleException(Exception* e)
 {
+  _e->_fileAllocationMan->markCurrentFileAllocationEntryDone();
   logger->error(MSG_FILE_ALLOCATION_FAILURE, e, cuid);
   logger->error(MSG_DOWNLOAD_NOT_COMPLETE, cuid, _requestGroup->getFilePath().c_str());
   return true;

+ 13 - 2
src/FileAllocationEntry.cc

@@ -38,13 +38,24 @@
 #include "RequestGroup.h"
 #include "PieceStorage.h"
 #include "DiskAdaptor.h"
+#include "prefs.h"
+#include "Option.h"
 
 FileAllocationEntry::FileAllocationEntry(RequestGroup* requestGroup, Command* nextCommand):
   RequestGroupEntry(requestGroup, nextCommand),
   _fileAllocationIterator(requestGroup->getPieceStorage()->getDiskAdaptor()->fileAllocationIterator())
-{}
+{
+  if(_requestGroup->getOption()->getAsBool(PREF_ENABLE_DIRECT_IO)) {
+    _requestGroup->getPieceStorage()->getDiskAdaptor()->enableDirectIO();
+  }
+}
 
-FileAllocationEntry:: ~FileAllocationEntry() {}
+FileAllocationEntry:: ~FileAllocationEntry()
+{
+  if(_requestGroup->getOption()->getAsBool(PREF_ENABLE_DIRECT_IO)) {
+    _requestGroup->getPieceStorage()->getDiskAdaptor()->disableDirectIO();
+  }
+}
 
 int64_t FileAllocationEntry::getCurrentLength()
 {

+ 16 - 0
src/MultiDiskAdaptor.cc

@@ -207,3 +207,19 @@ FileAllocationIteratorHandle MultiDiskAdaptor::fileAllocationIterator()
 {
   return new MultiFileAllocationIterator(this);
 }
+
+void MultiDiskAdaptor::enableDirectIO()
+{
+  for(DiskWriterEntries::const_iterator itr = diskWriterEntries.begin();
+      itr != diskWriterEntries.end(); ++itr) {
+    (*itr)->getDiskWriter()->enableDirectIO();
+  }
+}
+
+void MultiDiskAdaptor::disableDirectIO()
+{
+  for(DiskWriterEntries::const_iterator itr = diskWriterEntries.begin();
+      itr != diskWriterEntries.end(); ++itr) {
+    (*itr)->getDiskWriter()->disableDirectIO();
+  }
+}

+ 12 - 0
src/MultiDiskAdaptor.h

@@ -39,6 +39,8 @@
 #include "DiskWriter.h"
 #include "File.h"
 
+class MultiFileAllocationIterator;
+
 class DiskWriterEntry {
 private:
   FileEntryHandle fileEntry;
@@ -96,6 +98,11 @@ public:
   DiskWriterHandle getDiskWriter() const {
     return diskWriter;
   }
+
+  bool operator<(const DiskWriterEntry& entry) const
+  {
+    return fileEntry < entry.fileEntry;
+  }
 };
 
 typedef SharedHandle<DiskWriterEntry> DiskWriterEntryHandle;
@@ -103,6 +110,7 @@ typedef SharedHandle<DiskWriterEntry> DiskWriterEntryHandle;
 typedef deque<DiskWriterEntryHandle> DiskWriterEntries;
 
 class MultiDiskAdaptor : public DiskAdaptor {
+  friend class MultiFileAllocationIterator;
 private:
   string topDir;
   int32_t pieceLength;
@@ -151,6 +159,10 @@ public:
 
   virtual FileAllocationIteratorHandle fileAllocationIterator();
 
+  virtual void enableDirectIO();
+
+  virtual void disableDirectIO();
+
   void setTopDir(const string& topDir) {
     this->topDir = topDir;
   }

+ 44 - 48
src/MultiFileAllocationIterator.cc

@@ -35,96 +35,92 @@
 #include "MultiFileAllocationIterator.h"
 #include "MultiDiskAdaptor.h"
 #include "FileEntry.h"
-
-#define BUFSIZE 16*1024
+#include "SingleFileAllocationIterator.h"
 
 MultiFileAllocationIterator::MultiFileAllocationIterator(MultiDiskAdaptor* diskAdaptor):
   _diskAdaptor(diskAdaptor),
-  _entries(makeFileEntries(diskAdaptor->getFileEntries(), diskAdaptor->getPieceLength())),
-  _currentEntry(0),
+  _entries(makeDiskWriterEntries(diskAdaptor->diskWriterEntries,
+				 diskAdaptor->getPieceLength())),
+  _fileAllocationIterator(0),
   _offset(0)
 {}
 
 MultiFileAllocationIterator::~MultiFileAllocationIterator() {}
 
-void MultiFileAllocationIterator::prepareNextEntry()
-{
-  _currentEntry = 0;
-  _offset = 0;
-  if(!_entries.empty()) {
-    FileEntryHandle entry = _entries.front();
-    _entries.pop_front();
-
-    _currentEntry = entry;
-    _offset = File(_diskAdaptor->getStoreDir()+"/"+
-		   _diskAdaptor->getTopDir()+"/"+
-		   _currentEntry->getPath()).size();
-  }
-}
-
-
 void MultiFileAllocationIterator::allocateChunk()
 {
-  while(_currentEntry.isNull() || _currentEntry->getLength() <= _offset) {
-    prepareNextEntry();
-    if(_currentEntry.isNull()) {
+  while(_fileAllocationIterator.isNull() || _fileAllocationIterator->finished()) {
+    if(_entries.empty()) {
       break;
     }
+    DiskWriterEntryHandle entry = _entries.front();
+    _entries.pop_front();
+    FileEntryHandle fileEntry = entry->getFileEntry();
+    if(entry->size() < fileEntry->getLength()) {
+      _fileAllocationIterator =
+	new SingleFileAllocationIterator(entry->getDiskWriter().get(),
+					 entry->size(),
+					 fileEntry->getLength());
+      _fileAllocationIterator->init();
+    }
   }
   if(finished()) {
     return;
   }
-  int32_t bufSize = BUFSIZE;
-  unsigned char buf[BUFSIZE];
-  memset(buf, 0, bufSize);
-  
-  int32_t wsize = _offset+bufSize > _currentEntry->getLength() ?
-    _currentEntry->getLength()-_offset:bufSize;
-  _diskAdaptor->writeData(buf, wsize, _offset+_currentEntry->getOffset());
-  _offset += wsize;  
+  _fileAllocationIterator->allocateChunk();
 }
 
 bool MultiFileAllocationIterator::finished()
 {
-  return _entries.empty() && _currentEntry.isNull();
+  return _entries.empty() && (_fileAllocationIterator.isNull() || _fileAllocationIterator->finished());
+}
+
+int64_t MultiFileAllocationIterator::getCurrentLength()
+{
+  if(_fileAllocationIterator.isNull()) {
+    return 0;
+  } else {
+    return _fileAllocationIterator->getCurrentLength();
+  }
 }
 
 int64_t MultiFileAllocationIterator::getTotalLength()
 {
-  if(_currentEntry.isNull()) {
+  if(_fileAllocationIterator.isNull()) {
     return 0;
   } else {
-    return _currentEntry->getLength();
+    return _fileAllocationIterator->getTotalLength();
   }
 }
 
-const FileEntries& MultiFileAllocationIterator::getFileEntries() const
+const DiskWriterEntries& MultiFileAllocationIterator::getDiskWriterEntries() const
 {
   return _entries;
 }
 
-FileEntries MultiFileAllocationIterator::makeFileEntries(const FileEntries& srcEntries, int32_t pieceLength) const
+DiskWriterEntries MultiFileAllocationIterator::makeDiskWriterEntries(const DiskWriterEntries& srcEntries, int32_t pieceLength) const
 {
   if(pieceLength == 0) {
-    FileEntries entries;
-    for(FileEntries::const_iterator itr = srcEntries.begin(); itr != srcEntries.end(); ++itr) {
-      if((*itr)->isRequested()) {
+    DiskWriterEntries entries;
+    for(DiskWriterEntries::const_iterator itr = srcEntries.begin(); itr != srcEntries.end(); ++itr) {
+      if((*itr)->getFileEntry()->isRequested()) {
 	entries.push_back(*itr);
       }
     }
     return entries;
   }
-  FileEntries temp(srcEntries);
-  temp.push_front(new FileEntry());
-  FileEntries entries;
-  FileEntries::const_iterator done = temp.begin();
-  for(FileEntries::const_iterator itr = temp.begin()+1; itr != temp.end(); ++itr) {
-    if(!(*itr)->isRequested()) {
+  DiskWriterEntries temp(srcEntries);
+  temp.push_front(new DiskWriterEntry(new FileEntry()));
+  DiskWriterEntries entries;
+  DiskWriterEntries::const_iterator done = temp.begin();
+  for(DiskWriterEntries::const_iterator itr = temp.begin()+1; itr != temp.end(); ++itr) {
+    FileEntryHandle fileEntry = (*itr)->getFileEntry();
+    if(!fileEntry->isRequested()) {
       continue;
     }
-    int64_t pieceStartOffset = ((*itr)->getOffset()/pieceLength)*pieceLength;
-    for(FileEntries::const_iterator i = itr-1; i != done; --i) {
-      if(pieceStartOffset < (*i)->getOffset()+(*i)->getLength()) {
+    int64_t pieceStartOffset = (fileEntry->getOffset()/pieceLength)*pieceLength;
+    for(DiskWriterEntries::const_iterator i = itr-1; i != done; --i) {
+      if(pieceStartOffset < (*i)->getFileEntry()->getOffset()+(*i)->getFileEntry()->getLength()) {
 	entries.push_back(*i);
       } else {
 	break;

+ 10 - 13
src/MultiFileAllocationIterator.h

@@ -38,38 +38,35 @@
 #include "FileAllocationIterator.h"
 
 class MultiDiskAdaptor;
-class FileEntry;
-typedef SharedHandle<FileEntry> FileEntryHandle;
-typedef deque<FileEntryHandle> FileEntries;
+class DiskWriterEntry;
+typedef SharedHandle<DiskWriterEntry> DiskWriterEntryHandle;
+typedef deque<DiskWriterEntryHandle> DiskWriterEntries;
+class SingleFileAllocationIterator;
+typedef SharedHandle<SingleFileAllocationIterator> SingleFileAllocationIteratorHandle;
 
 class MultiFileAllocationIterator:public FileAllocationIterator
 {
 private:
   MultiDiskAdaptor* _diskAdaptor;
-  FileEntries _entries;
-  FileEntryHandle _currentEntry;
+  DiskWriterEntries _entries;
+  SingleFileAllocationIteratorHandle _fileAllocationIterator;
   int64_t _offset;
 
-  FileEntries makeFileEntries(const FileEntries& srcEntries, int32_t pieceLength) const;
+  DiskWriterEntries makeDiskWriterEntries(const DiskWriterEntries& srcEntries, int32_t pieceLength) const;
 public:
   MultiFileAllocationIterator(MultiDiskAdaptor* diskAdaptor);
 
   virtual ~MultiFileAllocationIterator();
 
-  void prepareNextEntry();
-
   virtual void allocateChunk();
   
   virtual bool finished();
 
-  virtual int64_t getCurrentLength()
-  {
-    return _offset;
-  }
+  virtual int64_t getCurrentLength();
 
   virtual int64_t getTotalLength();
 
-  const FileEntries& getFileEntries() const;
+  const DiskWriterEntries& getDiskWriterEntries() const;
 };
 
 typedef SharedHandle<MultiFileAllocationIterator> MultiFileAllocationIteratorHandle;

+ 1 - 0
src/OptionHandlerFactory.cc

@@ -100,6 +100,7 @@ OptionHandlers OptionHandlerFactory::createOptionHandlers()
   handlers.push_back(new BooleanOptionHandler(PREF_ENABLE_HTTP_KEEP_ALIVE));
   handlers.push_back(new BooleanOptionHandler(PREF_ENABLE_HTTP_PIPELINING));
   handlers.push_back(new UnitNumberOptionHandler(PREF_NO_FILE_ALLOCATION_LIMIT, 0));
+  handlers.push_back(new BooleanOptionHandler(PREF_ENABLE_DIRECT_IO));
 
   return handlers;
 }

+ 2 - 2
src/RequestGroup.cc

@@ -199,7 +199,7 @@ Commands RequestGroup::createInitialCommand(DownloadEngine* e)
 	     _option->get(PREF_ALLOW_OVERWRITE) != V_TRUE) {
 	    // TODO we need this->haltRequested = true?
 	    throw new DownloadFailureException(MSG_FILE_ALREADY_EXISTS,
-					       getFilePath().c_str());
+					       _pieceStorage->getDiskAdaptor()->getFilePath().c_str());
 	  } else {
 	    _pieceStorage->getDiskAdaptor()->openFile();
 	  }
@@ -237,7 +237,7 @@ Commands RequestGroup::createInitialCommand(DownloadEngine* e)
 Commands RequestGroup::processCheckIntegrityEntry(const CheckIntegrityEntryHandle& entry, DownloadEngine* e)
 {
 #ifdef ENABLE_MESSAGE_DIGEST
-  if(File(getFilePath()).size() > 0 &&
+  if(//File(getFilePath()).size() > 0 &&
      e->option->get(PREF_CHECK_INTEGRITY) == V_TRUE &&
      entry->isValidationReady()) {
     entry->initValidator();

+ 27 - 20
src/SingleFileAllocationIterator.cc

@@ -33,35 +33,42 @@
  */
 /* copyright --> */
 #include "SingleFileAllocationIterator.h"
-#include "AbstractSingleDiskAdaptor.h"
+#include "BinaryStream.h"
+#include "Util.h"
+#include "a2io.h"
 
-#define BUFSIZE 16*1024
+#define BUFSIZE (256*1024)
 
-SingleFileAllocationIterator::SingleFileAllocationIterator(AbstractSingleDiskAdaptor* diskAdaptor):_diskAdaptor(diskAdaptor), _offset(diskAdaptor->size()) {}
+SingleFileAllocationIterator::SingleFileAllocationIterator(BinaryStream* stream, int64_t offset, int64_t totalLength):_stream(stream), _offset(offset), _totalLength(totalLength), _buffer(0)
+{}
 
-SingleFileAllocationIterator::~SingleFileAllocationIterator() {}
+SingleFileAllocationIterator::~SingleFileAllocationIterator()
+{
+  delete [] _buffer;
+}
 
-void SingleFileAllocationIterator::allocateChunk()
+void SingleFileAllocationIterator::init()
 {
-  int32_t bufSize = BUFSIZE;
-  unsigned char buf[BUFSIZE];
-  memset(buf, 0, bufSize);
-  
-  _diskAdaptor->writeData(buf, bufSize, _offset);
-  _offset += bufSize;
-  
-  if(_diskAdaptor->getTotalLength() < _offset) {
-    _diskAdaptor->truncate(getTotalLength());
-    _offset = getTotalLength();
-  }
+#ifdef HAVE_POSIX_MEMALIGN
+  _buffer = (unsigned char*)Util::allocateAlignedMemory(512, BUFSIZE);
+#else
+  _buffer = new unsigned char[BUFSIZE];
+#endif // HAVE_POSIX_MEMALIGN
+  memset(_buffer, 0, BUFSIZE);
 }
 
-bool SingleFileAllocationIterator::finished()
+void SingleFileAllocationIterator::allocateChunk()
 {
-  return getCurrentLength() >= getTotalLength();
+  _stream->writeData(_buffer, BUFSIZE, _offset);
+  _offset += BUFSIZE;
+
+  if(_totalLength < _offset) {
+    _stream->truncate(_totalLength);
+    _offset = _totalLength;
+  }
 }
 
-int64_t SingleFileAllocationIterator::getTotalLength()
+bool SingleFileAllocationIterator::finished()
 {
-  return _diskAdaptor->getTotalLength();
+  return _offset >= _totalLength;
 }

+ 17 - 4
src/SingleFileAllocationIterator.h

@@ -37,15 +37,20 @@
 
 #include "FileAllocationIterator.h"
 
-class AbstractSingleDiskAdaptor;
+class BinaryStream;
 
 class SingleFileAllocationIterator:public FileAllocationIterator
 {
 private:
-  AbstractSingleDiskAdaptor* _diskAdaptor;
+  BinaryStream* _stream;
+
   int64_t _offset;
+
+  int64_t _totalLength;
+
+  unsigned char* _buffer;
 public:
-  SingleFileAllocationIterator(AbstractSingleDiskAdaptor* diskAdaptor);
+  SingleFileAllocationIterator(BinaryStream* stream, int64_t offset, int64_t totalLength);
 
   virtual ~SingleFileAllocationIterator();
 
@@ -58,7 +63,15 @@ public:
     return _offset;
   }
 
-  virtual int64_t getTotalLength();
+  virtual int64_t getTotalLength()
+  {
+    return _totalLength;
+  }
+
+  /**
+   * Must be called only once, before calling allocateChunk()
+   */
+  void init();
 };
 
 typedef SharedHandle<SingleFileAllocationIterator> SingleFileAllocationIteratorHandle;

+ 6 - 4
src/SocketCore.cc

@@ -213,9 +213,10 @@ void SocketCore::setNonBlockingMode()
     throw new DlAbortEx(EX_SOCKET_NONBLOCKING, errorMsg());
   }
 #else
-  int32_t flags = fcntl(sockfd, F_GETFL, 0);
+  int32_t flags;
+  while((flags = fcntl(sockfd, F_GETFL, 0)) == -1 && errno == EINTR);
   // TODO add error handling
-  fcntl(sockfd, F_SETFL, flags|O_NONBLOCK);
+  while(fcntl(sockfd, F_SETFL, flags|O_NONBLOCK) == -1 && errno == EINTR);
 #endif // __MINGW32__
   blocking = false;
 }
@@ -228,9 +229,10 @@ void SocketCore::setBlockingMode()
     throw new DlAbortEx(EX_SOCKET_BLOCKING, errorMsg());
   }
 #else
-  int32_t flags = fcntl(sockfd, F_GETFL, 0);
+  int32_t flags;
+  while((flags = fcntl(sockfd, F_GETFL, 0)) == -1 && errno == EINTR);
   // TODO add error handling
-  fcntl(sockfd, F_SETFL, flags&(~O_NONBLOCK));
+  while(fcntl(sockfd, F_SETFL, flags&(~O_NONBLOCK)) == -1 && errno == EINTR);
 #endif // __MINGW32__
   blocking = true;
 }

+ 16 - 0
src/Util.cc

@@ -42,6 +42,7 @@
 #include "BitfieldMan.h"
 #include "DefaultDiskWriter.h"
 #include "BinaryStream.h"
+#include "FatalException.h"
 #include <ctype.h>
 #include <errno.h>
 #include <fcntl.h>
@@ -849,3 +850,18 @@ string Util::toString(const BinaryStreamHandle& binaryStream)
   }
   return strm.str();
 }
+
+#ifdef HAVE_POSIX_MEMALIGN
+/**
+ * In linux 2.6, alignment and size should be a multiple of 512.
+ */
+void* Util::allocateAlignedMemory(size_t alignment, size_t size)
+{
+  void* buffer;
+  int32_t res;
+  if((res = posix_memalign(&buffer, alignment, size)) != 0) {
+    throw new FatalException("Error in posix_memalign: %s", strerror(res));
+  }
+  return buffer;
+}
+#endif // HAVE_POSIX_MEMALIGN

+ 4 - 0
src/Util.h

@@ -168,6 +168,10 @@ public:
 
   // binaryStream has to be opened before calling this function.
   static string toString(const BinaryStreamHandle& binaryStream);
+
+#ifdef HAVE_POSIX_MEMALIGN
+  static void* allocateAlignedMemory(size_t alignment, size_t size);
+#endif // HAVE_POSIX_MEMALIGN
 };
 
 #endif // _D_UTIL_H_

+ 4 - 0
src/a2io.h

@@ -116,4 +116,8 @@
 # define a2mkdir(path, openMode) mkdir(path, openMode)
 #endif // __MINGW32__
 
+#if defined HAVE_POSIX_MEMALIGN && O_DIRECT
+//# define ENABLE_DIRECT_IO 1
+#endif // HAVE_POSIX_MEMALIGN && O_DIRECT
+#undef HAVE_POSIX_MEMALIGN
 #endif // _D_A2IO_H_

+ 8 - 0
src/option_processing.cc

@@ -40,6 +40,7 @@
 #include "Util.h"
 #include "message.h"
 #include "Exception.h"
+#include "a2io.h"
 #include <fstream>
 #include <sstream>
 
@@ -125,6 +126,7 @@ Option* option_processing(int argc, char* const argv[])
   op->put(PREF_ENABLE_HTTP_PIPELINING, V_FALSE);
   op->put(PREF_MAX_HTTP_PIPELINING, "2");
   op->put(PREF_SEED_RATIO, "1.0");
+  op->put(PREF_ENABLE_DIRECT_IO, V_FALSE);
   while(1) {
     int optIndex = 0;
     int lopt;
@@ -173,6 +175,9 @@ Option* option_processing(int argc, char* const argv[])
       { "enable-http-keep-alive", optional_argument, &lopt, 207 },
       { "enable-http-pipelining", optional_argument, &lopt, 208 },
       { "no-file-allocation-limit", required_argument, &lopt, 209 },
+#ifdef ENABLE_DIRECT_IO
+      { PREF_ENABLE_DIRECT_IO, optional_argument, &lopt, 210 },
+#endif // ENABLE_DIRECT_IO
 #if defined ENABLE_BITTORRENT || ENABLE_METALINK
       { "show-files", no_argument, NULL, 'S' },
       { "select-file", required_argument, &lopt, 21 },
@@ -323,6 +328,9 @@ Option* option_processing(int argc, char* const argv[])
       case 209:
 	cmdstream << PREF_NO_FILE_ALLOCATION_LIMIT << "=" << optarg << "\n";
 	break;
+      case 210:
+	cmdstream << PREF_ENABLE_DIRECT_IO << "=" << toBoolArg(optarg) << "\n";
+	break;
       }
       break;
     }

+ 2 - 0
src/prefs.h

@@ -115,6 +115,8 @@
 #define PREF_AUTO_FILE_RENAMING "auto-file-renaming"
 // value: true | false
 #define PREF_PARAMETERIZED_URI "parameterized-uri"
+// value: true | false
+#define PREF_ENABLE_DIRECT_IO "enable-direct-io"
 
 /**
  * FTP related preferences

+ 10 - 3
src/version_usage.cc

@@ -33,6 +33,7 @@
  */
 /* copyright --> */
 #include "common.h"
+#include "a2io.h"
 #include "FeatureConfig.h"
 #ifdef ENABLE_MESSAGE_DIGEST
 # include "messageDigest.h"
@@ -161,9 +162,15 @@ void showUsage() {
 	    "                              Default: prealloc") << endl;
   cout << _(" --no-file-allocation-limit=SIZE No file allocation is made for files whose\n"
 	    "                              size is smaller than SIZE.\n"
-	    "                              You can append K or M(1K = 1024, 1M = 1024K).\n"
-	    "                              BitTorrent downloads ignore this option.\n"
-	    "                              Default: 5M") << endl;
+	    "                              You can append K or M(1K = 1024, 1M = 1024K).") << "\n"
+       << DEFAULT_MSG << "5M" << "\n";
+#ifdef ENABLE_DIRECT_IO
+  cout << _(" --enable-direct-io[=true|false] Enable directI/O, which lowers cpu usage while\n"
+	    "                              allocating files.\n"
+	    "                              Turn off if you encounter any error") << "\n"
+	    "                              Tested in Linux 2.6.21" << "\n"
+       << DEFAULT_MSG << "false" << "\n";
+#endif // ENABLE_DIRECT_IO
   cout << _(" --allow-overwrite=true|false If false, aria2 doesn't download a file which\n"
   		"                              already exists but the corresponding .aria2 file\n"
   		"                              doesn't exist.\n"

+ 13 - 10
test/MultiFileAllocationIteratorTest.cc

@@ -7,7 +7,7 @@ class MultiFileAllocationIteratorTest:public CppUnit::TestFixture {
 
   CPPUNIT_TEST_SUITE(MultiFileAllocationIteratorTest);
   CPPUNIT_TEST(testAllocate);
-  CPPUNIT_TEST(testMakeFileEntries);
+  CPPUNIT_TEST(testMakeDiskWriterEntries);
   CPPUNIT_TEST_SUITE_END();
 private:
 
@@ -15,13 +15,13 @@ public:
   void setUp() {}
 
   void testAllocate();
-  void testMakeFileEntries();
+  void testMakeDiskWriterEntries();
 };
 
 
 CPPUNIT_TEST_SUITE_REGISTRATION( MultiFileAllocationIteratorTest );
 
-void MultiFileAllocationIteratorTest::testMakeFileEntries()
+void MultiFileAllocationIteratorTest::testMakeDiskWriterEntries()
 {
   FileEntryHandle fs[] = {
     new FileEntry("file1", 1536, 0),
@@ -43,24 +43,27 @@ void MultiFileAllocationIteratorTest::testMakeFileEntries()
   fs[8]->setRequested(false);
   fs[9]->setRequested(false);
   
+  string storeDir = "/tmp/aria2_MultiFileAllocationIteratorTest_testMakeDiskWriterEntries";
   MultiDiskAdaptorHandle diskAdaptor = new MultiDiskAdaptor();
   diskAdaptor->setFileEntries(FileEntries(&fs[0], &fs[10]));
   diskAdaptor->setPieceLength(1024);
+  diskAdaptor->setStoreDir(storeDir);
+  diskAdaptor->openFile();
 
   MultiFileAllocationIteratorHandle itr = diskAdaptor->fileAllocationIterator();
 
-  FileEntries entries = itr->getFileEntries();
+  DiskWriterEntries entries = itr->getDiskWriterEntries();
 
   sort(entries.begin(), entries.end());
 
   CPPUNIT_ASSERT_EQUAL((size_t)6, entries.size());
 
-  CPPUNIT_ASSERT_EQUAL(string("file1"), entries[0]->getPath());
-  CPPUNIT_ASSERT_EQUAL(string("file2"), entries[1]->getPath());
-  CPPUNIT_ASSERT_EQUAL(string("file3"), entries[2]->getPath());
-  CPPUNIT_ASSERT_EQUAL(string("file6"), entries[3]->getPath());
-  CPPUNIT_ASSERT_EQUAL(string("file7"), entries[4]->getPath());
-  CPPUNIT_ASSERT_EQUAL(string("file8"), entries[5]->getPath());
+  CPPUNIT_ASSERT_EQUAL(storeDir+string("/file1"), entries[0]->getFilePath(storeDir));
+  CPPUNIT_ASSERT_EQUAL(storeDir+string("/file2"), entries[1]->getFilePath(storeDir));
+  CPPUNIT_ASSERT_EQUAL(storeDir+string("/file3"), entries[2]->getFilePath(storeDir));
+  CPPUNIT_ASSERT_EQUAL(storeDir+string("/file6"), entries[3]->getFilePath(storeDir));
+  CPPUNIT_ASSERT_EQUAL(storeDir+string("/file7"), entries[4]->getFilePath(storeDir));
+  CPPUNIT_ASSERT_EQUAL(storeDir+string("/file8"), entries[5]->getFilePath(storeDir));
 }
 
 void MultiFileAllocationIteratorTest::testAllocate()

+ 9 - 24
test/SingleFileAllocationIteratorTest.cc

@@ -1,7 +1,6 @@
 #include "SingleFileAllocationIterator.h"
 #include "File.h"
 #include "DefaultDiskWriter.h"
-#include "DirectDiskAdaptor.h"
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <fcntl.h>
@@ -31,38 +30,24 @@ void SingleFileAllocationIteratorTest::testAllocate()
   string fname = "aria2_SingleFileAllocationIteratorTest_testAllocate";
   string fn = dir+"/"+fname;
   ofstream of(fn.c_str());
-
   of << "0123456789";
-
   of.close();
 
   File x(fn);
   CPPUNIT_ASSERT_EQUAL((int64_t)10, x.size());
 
-
-  DefaultDiskWriterHandle writer = new DefaultDiskWriter();
-  DirectDiskAdaptorHandle diskAdaptor = new DirectDiskAdaptor();
-  diskAdaptor->setDiskWriter(writer);
-  diskAdaptor->setTotalLength(16*1024*2+8*1024);
-  diskAdaptor->setStoreDir(dir);
-  FileEntryHandle fileEntry = new FileEntry(fname,
-					    diskAdaptor->getTotalLength(),
-					    0);
-  FileEntries fs;
-  fs.push_back(fileEntry);
-  diskAdaptor->setFileEntries(fs);
+  DefaultDiskWriter writer;
+  int64_t offset = 10;
+  int64_t totalLength = 16*1024*2+8*1024;
 
   // we have to open file first.
-  diskAdaptor->openFile();
-  SingleFileAllocationIteratorHandle itr = diskAdaptor->fileAllocationIterator();
-
-  itr->allocateChunk();
-  CPPUNIT_ASSERT(!itr->finished());
-  itr->allocateChunk();
-  CPPUNIT_ASSERT(!itr->finished());
-  itr->allocateChunk();
-  CPPUNIT_ASSERT(itr->finished());
+  writer.openExistingFile(fn);
+  SingleFileAllocationIterator itr(&writer, offset, totalLength);
+  itr.init();
 
+  while(!itr.finished()) {
+    itr.allocateChunk();
+  }
   File f(fn);
   CPPUNIT_ASSERT_EQUAL((int64_t)40960, f.size());
 }