/* */
#include "DownloadEngine.h"
#include "Util.h"
#include "LogFactory.h"
#include "TimeA2.h"
// NOTE(review): the names of the six system headers were not recoverable from
// the file as received. The list below covers what this translation unit
// uses (select/fd_set/timeval, memcpy, assert, find/for_each) -- confirm
// against the upstream file.
#include <unistd.h>
#include <sys/types.h>
#include <sys/time.h>
#include <cstring>
#include <cassert>
#include <algorithm>

using namespace std;

// Returns true when fd may be handed to FD_SET / FD_ISSET.
// POSIX leaves those macros undefined for fd < 0 or fd >= FD_SETSIZE.
static bool isSelectableFd(int fd) {
#ifdef _WIN32
  // Winsock's fd_set is a list of socket handles, so FD_SETSIZE limits the
  // number of entries, not the handle value.
  return fd >= 0;
#else
  return 0 <= fd && fd < static_cast<int>(FD_SETSIZE);
#endif
}

// Appends value to container unless an equal element is already present.
template<typename Container, typename Value>
static void pushBackUnique(Container& container, const Value& value) {
  if(std::find(container.begin(), container.end(), value) == container.end()) {
    container.push_back(value);
  }
}

// Creates an engine with waiting enabled and no segment manager.
DownloadEngine::DownloadEngine():noWait(false), segmentMan(0) {
  logger = LogFactory::getInstance();
  // Put fdmax/rfdset/wfdset into a defined (empty) state. Otherwise
  // waitData() would read them uninitialised whenever run() reaches it before
  // any socket or name resolver has been registered.
  updateFdSet();
}

// Deletes every queued command and the segment manager.
DownloadEngine::~DownloadEngine() {
  cleanQueue();
  delete segmentMan;
}

// Applies Deleter to every queued command, then empties the queue.
void DownloadEngine::cleanQueue() {
  for_each(commands.begin(), commands.end(), Deleter());
  commands.clear();
}

// Main loop: runs until the command queue is empty.
//
// Each iteration either executes every queued command (when cp.elapsed(1)
// reports true) or only the commands that the previous waitData() call put
// into activeCommands. A command is removed from the queue before execute()
// is called and is deleted when execute() returns true.
// NOTE(review): a command returning false is neither deleted nor re-queued
// here; presumably it re-enqueues itself -- confirm against Command
// implementations.
void DownloadEngine::run() {
  initStatistics();
  Time cp;
  // NOTE(review): presumably makes the first cp.elapsed(1) check succeed so
  // that the first iteration executes every command -- confirm in TimeA2.h.
  cp.setTimeInSec(0);
  Commands activeCommands;
  while(!commands.empty()) {
    if(cp.elapsed(1)) {
      cp.reset();
      // Snapshot the size so that only commands queued before this sweep are
      // executed by it.
      const Commands::size_type max = commands.size();
      for(Commands::size_type i = 0; i < max; ++i) {
        Command* com = commands.front();
        commands.pop_front();
        if(com->execute()) {
          delete com;
        }
      }
    } else {
      for(Commands::iterator itr = activeCommands.begin();
          itr != activeCommands.end(); ++itr) {
        Commands::iterator comItr =
          find(commands.begin(), commands.end(), *itr);
        assert(comItr != commands.end());
        if(comItr == commands.end()) {
          // With NDEBUG the assert above vanishes; erase(end()) would be
          // undefined behaviour, so skip a command that is not queued.
          continue;
        }
        Command* command = *itr;
        commands.erase(comItr);
        if(command->execute()) {
          delete command;
        }
      }
    }
    afterEachIteration();
    activeCommands.clear();
    if(!noWait && !commands.empty()) {
      waitData(activeCommands);
    }
    noWait = false;
    calculateStatistics();
  }
  onEndOfRun();
}

// Blocks for up to one millisecond by calling select() with an empty read
// set and a 1000-microsecond timeout.
void DownloadEngine::shortSleep() const {
  struct timeval tv;
  tv.tv_sec = 0;
  tv.tv_usec = 1000;
  fd_set rfds;
  FD_ZERO(&rfds);
  select(0, &rfds, NULL, NULL, &tv);
}

// Waits up to one second for activity on the registered descriptors and
// appends, without duplicates, the commands to be executed next to
// activeCommands.
//
// A socket entry's command is added when its descriptor is reported readable
// or writable. With ENABLE_ASYNC_DNS, each name resolver is processed and its
// command is added once the resolver reports STATUS_SUCCESS or STATUS_ERROR.
void DownloadEngine::waitData(Commands& activeCommands) {
  // select() overwrites the sets it is given, so work on copies.
  fd_set rfds;
  fd_set wfds;
  memcpy(&rfds, &rfdset, sizeof(fd_set));
  memcpy(&wfds, &wfdset, sizeof(fd_set));

  struct timeval tv;
  tv.tv_sec = 1;
  tv.tv_usec = 0;
  int retval = select(fdmax+1, &rfds, &wfds, NULL, &tv);
  if(retval > 0) {
    for(SocketEntries::iterator itr = socketEntries.begin();
        itr != socketEntries.end(); ++itr) {
      SocketEntry& entry = *itr;
      int fd = entry.socket->getSockfd();
      if(!isSelectableFd(fd)) {
        // updateFdSet() never registers such a descriptor, and FD_ISSET on it
        // would be undefined.
        continue;
      }
      if(FD_ISSET(fd, &rfds) || FD_ISSET(fd, &wfds)) {
        pushBackUnique(activeCommands, entry.command);
      }
    }
  } else if(retval < 0) {
    // On failure the sets are not a readiness report; clear them so the
    // resolvers below are not told that descriptors are ready.
    FD_ZERO(&rfds);
    FD_ZERO(&wfds);
  }
#ifdef ENABLE_ASYNC_DNS
  // Resolvers are driven on every call, including a select() timeout.
  // NOTE(review): assumes NameResolver::process forwards to c-ares'
  // ares_process, which also needs to be called on timeout so that pending
  // queries can expire or be retried -- confirm in NameResolver.
  for(NameResolverEntries::iterator itr = nameResolverEntries.begin();
      itr != nameResolverEntries.end(); ++itr) {
    NameResolverEntry& entry = *itr;
    entry.nameResolver->process(&rfds, &wfds);
    switch(entry.nameResolver->getStatus()) {
    case NameResolver::STATUS_SUCCESS:
    case NameResolver::STATUS_ERROR:
      pushBackUnique(activeCommands, entry.command);
      break;
    default:
      break;
    }
  }
#endif // ENABLE_ASYNC_DNS
}

// Rebuilds rfdset, wfdset and fdmax from the registered name resolvers (when
// ENABLE_ASYNC_DNS is defined) and socket entries.
void DownloadEngine::updateFdSet() {
  fdmax = 0;
  FD_ZERO(&rfdset);
  FD_ZERO(&wfdset);
#ifdef ENABLE_ASYNC_DNS
  for(NameResolverEntries::iterator itr = nameResolverEntries.begin();
      itr != nameResolverEntries.end(); ++itr) {
    NameResolverEntry& entry = *itr;
    int fd = entry.nameResolver->getFds(&rfdset, &wfdset);
    if(fdmax < fd) {
      fdmax = fd;
    }
  }
#endif // ENABLE_ASYNC_DNS
  for(SocketEntries::iterator itr = socketEntries.begin();
      itr != socketEntries.end(); ++itr) {
    SocketEntry& entry = *itr;
    int fd = entry.socket->getSockfd();
    if(!isSelectableFd(fd)) {
      // FD_SET on this descriptor would be undefined behaviour; leave it out
      // of the sets. Its command is still executed by the periodic full sweep
      // in run().
      continue;
    }
    switch(entry.type) {
    case SocketEntry::TYPE_RD:
      FD_SET(fd, &rfdset);
      break;
    case SocketEntry::TYPE_WR:
      FD_SET(fd, &wfdset);
      break;
    }
    if(fdmax < fd) {
      fdmax = fd;
    }
  }
}

// Registers entry unless an equal entry already exists.
// Returns true if the entry was added (the fd sets are rebuilt), false if it
// was already present.
bool DownloadEngine::addSocket(const SocketEntry& entry) {
  SocketEntries::iterator itr =
    find(socketEntries.begin(), socketEntries.end(), entry);
  if(itr == socketEntries.end()) {
    socketEntries.push_back(entry);
    updateFdSet();
    return true;
  } else {
    return false;
  }
}

// Unregisters the entry equal to the given one.
// Returns true if an entry was removed (the fd sets are rebuilt), false if
// none was found.
bool DownloadEngine::deleteSocket(const SocketEntry& entry) {
  SocketEntries::iterator itr =
    find(socketEntries.begin(), socketEntries.end(), entry);
  if(itr == socketEntries.end()) {
    return false;
  } else {
    socketEntries.erase(itr);
    updateFdSet();
    return true;
  }
}

// Registers socket/command as a TYPE_RD entry; see addSocket() for the
// return value.
bool DownloadEngine::addSocketForReadCheck(const SocketHandle& socket,
                                           Command* command) {
  SocketEntry entry(socket, command, SocketEntry::TYPE_RD);
  return addSocket(entry);
}

// Unregisters the TYPE_RD entry for socket/command; see deleteSocket() for
// the return value.
bool DownloadEngine::deleteSocketForReadCheck(const SocketHandle& socket,
                                              Command* command) {
  SocketEntry entry(socket, command, SocketEntry::TYPE_RD);
  return deleteSocket(entry);
}

// Registers socket/command as a TYPE_WR entry; see addSocket() for the
// return value.
bool DownloadEngine::addSocketForWriteCheck(const SocketHandle& socket,
                                            Command* command) {
  SocketEntry entry(socket, command, SocketEntry::TYPE_WR);
  return addSocket(entry);
}

// Unregisters the TYPE_WR entry for socket/command; see deleteSocket() for
// the return value.
bool DownloadEngine::deleteSocketForWriteCheck(const SocketHandle& socket,
                                               Command* command) {
  SocketEntry entry(socket, command, SocketEntry::TYPE_WR);
  return deleteSocket(entry);
}

#ifdef ENABLE_ASYNC_DNS
// Registers resolver/command unless an equal entry already exists.
// Returns true if the entry was added (the fd sets are rebuilt), false if it
// was already present.
bool DownloadEngine::addNameResolverCheck(const NameResolverHandle& resolver,
                                          Command* command) {
  NameResolverEntry entry(resolver, command);
  NameResolverEntries::iterator itr =
    find(nameResolverEntries.begin(), nameResolverEntries.end(), entry);
  if(itr == nameResolverEntries.end()) {
    nameResolverEntries.push_back(entry);
    updateFdSet();
    return true;
  } else {
    return false;
  }
}

// Unregisters the entry equal to resolver/command.
// Returns true if an entry was removed (the fd sets are rebuilt), false if
// none was found.
bool DownloadEngine::deleteNameResolverCheck(const NameResolverHandle& resolver,
                                             Command* command) {
  NameResolverEntry entry(resolver, command);
  NameResolverEntries::iterator itr =
    find(nameResolverEntries.begin(), nameResolverEntries.end(), entry);
  if(itr == nameResolverEntries.end()) {
    return false;
  } else {
    nameResolverEntries.erase(itr);
    updateFdSet();
    return true;
  }
}
#endif // ENABLE_ASYNC_DNS