DownloadEngine.cc 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - a simple utility for downloading files faster
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
  20. */
  21. /* copyright --> */
  22. #include "DownloadEngine.h"
  23. #include "Util.h"
  24. #include "LogFactory.h"
  25. #include "TimeA2.h"
  26. #include <unistd.h>
  27. #include <sys/types.h>
  28. #include <sys/stat.h>
  29. #include <fcntl.h>
  30. #include <errno.h>
  31. #include <algorithm>
  32. using namespace std;
  33. DownloadEngine::DownloadEngine():noWait(false), segmentMan(0) {
  34. logger = LogFactory::getInstance();
  35. }
  36. DownloadEngine::~DownloadEngine() {
  37. delete segmentMan;
  38. cleanQueue();
  39. }
  40. void DownloadEngine::cleanQueue() {
  41. for_each(commands.begin(), commands.end(), Deleter());
  42. commands.clear();
  43. }
  44. void DownloadEngine::run() {
  45. initStatistics();
  46. Time cp;
  47. cp.setTimeInSec(0);
  48. Commands activeCommands;
  49. while(!commands.empty()) {
  50. if(cp.elapsed(1)) {
  51. cp.reset();
  52. int max = commands.size();
  53. for(int i = 0; i < max; i++) {
  54. Command* com = commands.front();
  55. commands.pop_front();
  56. if(com->execute()) {
  57. delete com;
  58. }
  59. }
  60. } else {
  61. for(Commands::iterator itr = activeCommands.begin();
  62. itr != activeCommands.end(); itr++) {
  63. Commands::iterator comItr = find(commands.begin(), commands.end(),
  64. *itr);
  65. assert(comItr != commands.end());
  66. Command* command = *itr;
  67. commands.erase(comItr);
  68. if(command->execute()) {
  69. delete command;
  70. }
  71. }
  72. }
  73. afterEachIteration();
  74. activeCommands.clear();
  75. if(!noWait && !commands.empty()) {
  76. waitData(activeCommands);
  77. }
  78. noWait = false;
  79. calculateStatistics();
  80. }
  81. onEndOfRun();
  82. }
  83. void DownloadEngine::shortSleep() const {
  84. struct timeval tv;
  85. tv.tv_sec = 0;
  86. tv.tv_usec = 1000;
  87. fd_set rfds;
  88. FD_ZERO(&rfds);
  89. select(0, &rfds, NULL, NULL, &tv);
  90. }
  91. class SetDescriptor {
  92. private:
  93. int* max_ptr;
  94. fd_set* rfds_ptr;
  95. fd_set* wfds_ptr;
  96. public:
  97. SetDescriptor(int* max_ptr, fd_set* rfds_ptr, fd_set* wfds_ptr):
  98. max_ptr(max_ptr),
  99. rfds_ptr(rfds_ptr),
  100. wfds_ptr(wfds_ptr) {}
  101. void operator()(const SocketEntry& entry) {
  102. int fd = entry.socket->getSockfd();
  103. switch(entry.type) {
  104. case SocketEntry::TYPE_RD:
  105. FD_SET(fd, rfds_ptr);
  106. break;
  107. case SocketEntry::TYPE_WR:
  108. FD_SET(fd, wfds_ptr);
  109. break;
  110. }
  111. if(*max_ptr < fd) {
  112. *max_ptr = fd;
  113. }
  114. }
  115. #ifdef HAVE_LIBARES
  116. void operator()(const NameResolverEntry& entry) {
  117. int tempFd = entry.nameResolver->getFds(rfds_ptr, wfds_ptr);
  118. if(*max_ptr < tempFd) {
  119. *max_ptr = tempFd;
  120. }
  121. }
  122. #endif // HAVE_LIBARES
  123. };
  124. class AccumulateActiveCommand {
  125. private:
  126. Commands* activeCommands_ptr;
  127. fd_set* rfds_ptr;
  128. fd_set* wfds_ptr;
  129. public:
  130. AccumulateActiveCommand(Commands* activeCommands_ptr,
  131. fd_set* rfds_ptr,
  132. fd_set* wfds_ptr):
  133. activeCommands_ptr(activeCommands_ptr),
  134. rfds_ptr(rfds_ptr),
  135. wfds_ptr(wfds_ptr) {}
  136. void operator()(const SocketEntry& entry) {
  137. if(FD_ISSET(entry.socket->getSockfd(), rfds_ptr) ||
  138. FD_ISSET(entry.socket->getSockfd(), wfds_ptr)) {
  139. activeCommands_ptr->push_back(entry.command);
  140. }
  141. /*
  142. switch(entry.type) {
  143. case SocketEntry::TYPE_RD:
  144. if(FD_ISSET(entry.socket->getSockfd(), rfds_ptr)) {
  145. activeCommands_ptr->push_back(entry.command);
  146. }
  147. break;
  148. case SocketEntry::TYPE_WR:
  149. if(FD_ISSET(entry.socket->getSockfd(), wfds_ptr)) {
  150. activeCommands_ptr->push_back(entry.command);
  151. }
  152. break;
  153. }
  154. */
  155. }
  156. #ifdef HAVE_LIBARES
  157. void operator()(const NameResolverEntry& entry) {
  158. entry.nameResolver->process(rfds_ptr, wfds_ptr);
  159. switch(entry.nameResolver->getStatus()) {
  160. case NameResolver::STATUS_SUCCESS:
  161. case NameResolver::STATUS_ERROR:
  162. activeCommands_ptr->push_back(entry.command);
  163. break;
  164. default:
  165. break;
  166. }
  167. }
  168. #endif // HAVE_LIBARES
  169. };
  170. void DownloadEngine::waitData(Commands& activeCommands) {
  171. fd_set rfds;
  172. fd_set wfds;
  173. int retval = 0;
  174. struct timeval tv;
  175. memcpy(&rfds, &rfdset, sizeof(fd_set));
  176. memcpy(&wfds, &wfdset, sizeof(fd_set));
  177. tv.tv_sec = 1;
  178. tv.tv_usec = 0;
  179. retval = select(fdmax+1, &rfds, &wfds, NULL, &tv);
  180. if(retval > 0) {
  181. for_each(socketEntries.begin(), socketEntries.end(),
  182. AccumulateActiveCommand(&activeCommands, &rfds, &wfds));
  183. #ifdef HAVE_LIBARES
  184. for_each(nameResolverEntries.begin(), nameResolverEntries.end(),
  185. AccumulateActiveCommand(&activeCommands, &rfds, &wfds));
  186. #endif // HAVE_LIBARES
  187. sort(activeCommands.begin(), activeCommands.end());
  188. activeCommands.erase(unique(activeCommands.begin(),
  189. activeCommands.end()),
  190. activeCommands.end());
  191. }
  192. }
  193. void DownloadEngine::updateFdSet() {
  194. fdmax = 0;
  195. FD_ZERO(&rfdset);
  196. FD_ZERO(&wfdset);
  197. #ifdef HAVE_LIBARES
  198. for_each(nameResolverEntries.begin(), nameResolverEntries.end(),
  199. SetDescriptor(&fdmax, &rfdset, &wfdset));
  200. #endif // HAVE_LIBARES
  201. for_each(socketEntries.begin(), socketEntries.end(),
  202. SetDescriptor(&fdmax, &rfdset, &wfdset));
  203. }
  204. bool DownloadEngine::addSocket(const SocketEntry& entry) {
  205. SocketEntries::iterator itr =
  206. find(socketEntries.begin(), socketEntries.end(), entry);
  207. if(itr == socketEntries.end()) {
  208. socketEntries.push_back(entry);
  209. updateFdSet();
  210. return true;
  211. } else {
  212. return false;
  213. }
  214. }
  215. bool DownloadEngine::deleteSocket(const SocketEntry& entry) {
  216. SocketEntries::iterator itr =
  217. find(socketEntries.begin(), socketEntries.end(), entry);
  218. if(itr == socketEntries.end()) {
  219. return false;
  220. } else {
  221. socketEntries.erase(itr);
  222. updateFdSet();
  223. return true;
  224. }
  225. }
  226. bool DownloadEngine::addSocketForReadCheck(const SocketHandle& socket,
  227. Command* command) {
  228. SocketEntry entry(socket, command, SocketEntry::TYPE_RD);
  229. return addSocket(entry);
  230. }
  231. bool DownloadEngine::deleteSocketForReadCheck(const SocketHandle& socket,
  232. Command* command) {
  233. SocketEntry entry(socket, command, SocketEntry::TYPE_RD);
  234. return deleteSocket(entry);
  235. }
  236. bool DownloadEngine::addSocketForWriteCheck(const SocketHandle& socket,
  237. Command* command) {
  238. SocketEntry entry(socket, command, SocketEntry::TYPE_WR);
  239. return addSocket(entry);
  240. }
  241. bool DownloadEngine::deleteSocketForWriteCheck(const SocketHandle& socket,
  242. Command* command) {
  243. SocketEntry entry(socket, command, SocketEntry::TYPE_WR);
  244. return deleteSocket(entry);
  245. }
  246. #ifdef HAVE_LIBARES
  247. bool DownloadEngine::addNameResolverCheck(const NameResolverHandle& resolver,
  248. Command* command) {
  249. NameResolverEntry entry(resolver, command);
  250. NameResolverEntries::iterator itr = find(nameResolverEntries.begin(),
  251. nameResolverEntries.end(),
  252. entry);
  253. if(itr == nameResolverEntries.end()) {
  254. nameResolverEntries.push_back(entry);
  255. updateFdSet();
  256. return true;
  257. } else {
  258. return false;
  259. }
  260. }
  261. bool DownloadEngine::deleteNameResolverCheck(const NameResolverHandle& resolver,
  262. Command* command) {
  263. NameResolverEntry entry(resolver, command);
  264. NameResolverEntries::iterator itr = find(nameResolverEntries.begin(),
  265. nameResolverEntries.end(),
  266. entry);
  267. if(itr == nameResolverEntries.end()) {
  268. return false;
  269. } else {
  270. nameResolverEntries.erase(itr);
  271. updateFdSet();
  272. return true;
  273. }
  274. }
  275. #endif // HAVE_LIBARES