DownloadEngine.cc 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947
  1. /* <!-- copyright */
  2. /*
  3. * aria2 - The high speed download utility
  4. *
  5. * Copyright (C) 2006 Tatsuhiro Tsujikawa
  6. *
  7. * This program is free software; you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation; either version 2 of the License, or
  10. * (at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program; if not, write to the Free Software
  19. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  20. *
  21. * In addition, as a special exception, the copyright holders give
  22. * permission to link the code of portions of this program with the
  23. * OpenSSL library under certain conditions as described in each
  24. * individual source file, and distribute linked combinations
  25. * including the two.
  26. * You must obey the GNU General Public License in all respects
  27. * for all of the code used other than OpenSSL. If you modify
  28. * file(s) with this exception, you may extend this exception to your
  29. * version of the file(s), but you are not obligated to do so. If you
  30. * do not wish to do so, delete this exception statement from your
  31. * version. If you delete this exception statement from all source
  32. * files in the program, then also delete it here.
  33. */
  34. /* copyright --> */
  35. #include "DownloadEngine.h"
  36. #ifdef ENABLE_ASYNC_DNS
  37. #include "AsyncNameResolver.h"
  38. #endif // ENABLE_ASYNC_DNS
  39. #include "StatCalc.h"
  40. #include "RequestGroup.h"
  41. #include "RequestGroupMan.h"
  42. #include "FileAllocationMan.h"
  43. #include "CheckIntegrityMan.h"
  44. #include "DownloadResult.h"
  45. #include "StatCalc.h"
  46. #include "LogFactory.h"
  47. #include "Logger.h"
  48. #include "TimeA2.h"
  49. #include "a2time.h"
  50. #include "Socket.h"
  51. #include "Util.h"
  52. #include "a2functional.h"
  53. #include "DlAbortEx.h"
  54. #include <signal.h>
  55. #include <cstring>
  56. #include <algorithm>
  57. #include <numeric>
  58. #include <cerrno>
  59. namespace aria2 {
  60. // 0 ... running
  61. // 1 ... stop signal detected
  62. // 2 ... stop signal processed by DownloadEngine
  63. // 3 ... 2nd stop signal(force shutdown) detected
  64. // 4 ... 2nd stop signal processed by DownloadEngine
  65. volatile sig_atomic_t globalHaltRequested = 0;
  66. CommandEvent::CommandEvent(Command* command, int events):
  67. _command(command), _events(events) {}
  68. bool CommandEvent::operator==(const CommandEvent& commandEvent) const
  69. {
  70. return _command == commandEvent._command;
  71. }
  72. int CommandEvent::getEvents() const
  73. {
  74. return _events;
  75. }
  76. void CommandEvent::addEvents(int events)
  77. {
  78. _events |= events;
  79. }
  80. void CommandEvent::removeEvents(int events)
  81. {
  82. _events &= (~events);
  83. }
  84. bool CommandEvent::eventsEmpty() const
  85. {
  86. return _events == 0;
  87. }
  88. void CommandEvent::processEvents(int events)
  89. {
  90. if((_events&events) ||
  91. ((SocketEntry::EVENT_ERROR|SocketEntry::EVENT_HUP)&events)) {
  92. _command->setStatusActive();
  93. }
  94. if(SocketEntry::EVENT_READ&events) {
  95. _command->readEventReceived();
  96. }
  97. if(SocketEntry::EVENT_WRITE&events) {
  98. _command->writeEventReceived();
  99. }
  100. if((SocketEntry::EVENT_ERROR|SocketEntry::EVENT_HUP)&events) {
  101. _command->errorEventRecieved();
  102. }
  103. }
  104. #if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
  105. ADNSEvent::ADNSEvent(const SharedHandle<AsyncNameResolver>& resolver,
  106. Command* command,
  107. sock_t socket, int events):
  108. _resolver(resolver), _command(command), _socket(socket), _events(events) {}
  109. bool ADNSEvent::operator==(const ADNSEvent& event) const
  110. {
  111. return _resolver == event._resolver;
  112. }
  113. int ADNSEvent::getEvents() const
  114. {
  115. return _events;
  116. }
  117. void ADNSEvent::processEvents(int events)
  118. {
  119. ares_socket_t readfd;
  120. ares_socket_t writefd;
  121. if(events&(SocketEntry::EVENT_READ|SocketEntry::EVENT_ERROR|SocketEntry::EVENT_HUP)) {
  122. readfd = _socket;
  123. } else {
  124. readfd = ARES_SOCKET_BAD;
  125. }
  126. if(events&(SocketEntry::EVENT_WRITE|SocketEntry::EVENT_ERROR|SocketEntry::EVENT_HUP)) {
  127. writefd = _socket;
  128. } else {
  129. writefd = ARES_SOCKET_BAD;
  130. }
  131. _resolver->process(readfd, writefd);
  132. _command->setStatusActive();
  133. }
  134. #endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
  135. SocketEntry::SocketEntry(sock_t socket):_socket(socket)
  136. {
  137. #ifdef HAVE_EPOLL
  138. memset(&_epEvent, 0, sizeof(struct epoll_event));
  139. #endif // HAVE_EPOLL
  140. }
  141. bool SocketEntry::operator==(const SocketEntry& entry) const
  142. {
  143. return _socket == entry._socket;
  144. }
  145. bool SocketEntry::operator<(const SocketEntry& entry) const
  146. {
  147. return _socket < entry._socket;
  148. }
  149. void SocketEntry::addCommandEvent(Command* command, int events)
  150. {
  151. CommandEvent cev(command, events);
  152. std::deque<CommandEvent>::iterator i = std::find(_commandEvents.begin(),
  153. _commandEvents.end(),
  154. cev);
  155. if(i == _commandEvents.end()) {
  156. _commandEvents.push_back(cev);
  157. } else {
  158. (*i).addEvents(events);
  159. }
  160. }
  161. void SocketEntry::removeCommandEvent(Command* command, int events)
  162. {
  163. CommandEvent cev(command, events);
  164. std::deque<CommandEvent>::iterator i = std::find(_commandEvents.begin(),
  165. _commandEvents.end(),
  166. cev);
  167. if(i == _commandEvents.end()) {
  168. // not found
  169. } else {
  170. (*i).removeEvents(events);
  171. if((*i).eventsEmpty()) {
  172. _commandEvents.erase(i);
  173. }
  174. }
  175. }
  176. #if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
  177. void SocketEntry::addADNSEvent(const SharedHandle<AsyncNameResolver>& resolver,
  178. Command* command, int events)
  179. {
  180. ADNSEvent aev(resolver, command, _socket, events);
  181. std::deque<ADNSEvent>::iterator i = std::find(_adnsEvents.begin(),
  182. _adnsEvents.end(),
  183. aev);
  184. if(i == _adnsEvents.end()) {
  185. _adnsEvents.push_back(aev);
  186. }
  187. }
  188. void SocketEntry::removeADNSEvent(const SharedHandle<AsyncNameResolver>& resolver,
  189. Command* command)
  190. {
  191. ADNSEvent aev(resolver, command, _socket, 0);
  192. std::deque<ADNSEvent>::iterator i = std::find(_adnsEvents.begin(),
  193. _adnsEvents.end(),
  194. aev);
  195. if(i == _adnsEvents.end()) {
  196. // not found
  197. } else {
  198. _adnsEvents.erase(i);
  199. }
  200. }
  201. #endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
  202. void SocketEntry::processEvents(int events)
  203. {
  204. std::for_each(_commandEvents.begin(), _commandEvents.end(),
  205. std::bind2nd(std::mem_fun_ref(&CommandEvent::processEvents),
  206. events));
  207. #if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
  208. std::for_each(_adnsEvents.begin(), _adnsEvents.end(),
  209. std::bind2nd(std::mem_fun_ref(&ADNSEvent::processEvents),
  210. events));
  211. #endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
  212. }
  213. sock_t SocketEntry::getSocket() const
  214. {
  215. return _socket;
  216. }
  217. bool SocketEntry::eventEmpty() const
  218. {
  219. #if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
  220. return _commandEvents.empty() && _adnsEvents.empty();
  221. #else // !(HAVE_EPOLL && ENABLE_ASYNC_DNS)
  222. return _commandEvents.empty();
  223. #endif // !(HAVE_EPOLL && ENABLE_ASYNC_DNS)
  224. }
  225. class AccEvent {
  226. public:
  227. int operator()(int events, const CommandEvent& commandEvent) const
  228. {
  229. return events|commandEvent.getEvents();
  230. }
  231. #if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
  232. int operator()(int events, const ADNSEvent& adnsEvent) const
  233. {
  234. return events|adnsEvent.getEvents();
  235. }
  236. #endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
  237. };
  238. #ifdef HAVE_EPOLL
  239. struct epoll_event& SocketEntry::getEpEvent()
  240. {
  241. _epEvent.data.ptr = this;
  242. #ifdef ENABLE_ASYNC_DNS
  243. _epEvent.events =
  244. std::accumulate(_adnsEvents.begin(),
  245. _adnsEvents.end(),
  246. std::accumulate(_commandEvents.begin(),
  247. _commandEvents.end(), 0, AccEvent()),
  248. AccEvent());
  249. #else // !ENABLE_ASYNC_DNS
  250. _epEvent.events =
  251. std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0, AccEvent());
  252. #endif // !ENABLE_ASYNC_DNS
  253. return _epEvent;
  254. }
  255. #else // !HAVE_EPOLL
  256. int SocketEntry::getEvents()
  257. {
  258. return
  259. std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0, AccEvent());
  260. }
  261. #endif // !HAVE_EPOLL
  262. #ifdef ENABLE_ASYNC_DNS
  263. AsyncNameResolverEntry::AsyncNameResolverEntry
  264. (const SharedHandle<AsyncNameResolver>& nameResolver, Command* command):
  265. _nameResolver(nameResolver), _command(command)
  266. #ifdef HAVE_EPOLL
  267. , _socketsSize(0)
  268. #endif // HAVE_EPOLL
  269. {}
  270. bool AsyncNameResolverEntry::operator==(const AsyncNameResolverEntry& entry)
  271. {
  272. return _nameResolver == entry._nameResolver &&
  273. _command == entry._command;
  274. }
  275. #ifdef HAVE_EPOLL
  276. void AsyncNameResolverEntry::addSocketEvents(DownloadEngine* e)
  277. {
  278. _socketsSize = 0;
  279. int mask = _nameResolver->getsock(_sockets);
  280. if(mask == 0) {
  281. return;
  282. }
  283. size_t i;
  284. for(i = 0; i < ARES_GETSOCK_MAXNUM; ++i) {
  285. //epoll_event_t* epEventPtr = &_epEvents[_socketsSize];
  286. int events = 0;
  287. if(ARES_GETSOCK_READABLE(mask, i)) {
  288. events |= EPOLLIN;
  289. }
  290. if(ARES_GETSOCK_WRITABLE(mask, i)) {
  291. events |= EPOLLOUT;
  292. }
  293. if(events == 0) {
  294. // assume no further sockets are returned.
  295. break;
  296. }
  297. e->addSocketEvents(_sockets[i], _command, events, _nameResolver);
  298. }
  299. _socketsSize = i;
  300. }
  301. void AsyncNameResolverEntry::removeSocketEvents(DownloadEngine* e)
  302. {
  303. for(size_t i = 0; i < _socketsSize; ++i) {
  304. e->deleteSocketEvents(_sockets[i], _command, 0, _nameResolver);
  305. }
  306. }
  307. #else // !HAVE_EPOLL
  308. int AsyncNameResolverEntry::getFds(fd_set* rfdsPtr, fd_set* wfdsPtr)
  309. {
  310. return _nameResolver->getFds(rfdsPtr, wfdsPtr);
  311. }
  312. void AsyncNameResolverEntry::process(fd_set* rfdsPtr, fd_set* wfdsPtr)
  313. {
  314. _nameResolver->process(rfdsPtr, wfdsPtr);
  315. switch(_nameResolver->getStatus()) {
  316. case AsyncNameResolver::STATUS_SUCCESS:
  317. case AsyncNameResolver::STATUS_ERROR:
  318. _command->setStatusActive();
  319. break;
  320. default:
  321. break;
  322. }
  323. }
  324. #endif // !HAVE_EPOLL
  325. #endif // ENABLE_ASYNC_DNS
  326. DownloadEngine::DownloadEngine():logger(LogFactory::getInstance()),
  327. _haltRequested(false),
  328. _noWait(false)
  329. {
  330. #ifdef HAVE_EPOLL
  331. _epfd = epoll_create(EPOLL_EVENTS_MAX);
  332. _epEvents = new struct epoll_event[EPOLL_EVENTS_MAX];
  333. #else // !HAVE_EPOLL
  334. updateFdSet();
  335. #endif // !HAVE_EPOLL
  336. }
  337. DownloadEngine::~DownloadEngine() {
  338. cleanQueue();
  339. #ifdef HAVE_EPOLL
  340. if(_epfd != -1) {
  341. int r;
  342. while((r = close(_epfd)) == -1 && errno == EINTR);
  343. if(r == -1) {
  344. logger->error("Error occurred while closing epoll file descriptor %d: %s",
  345. _epfd, strerror(errno));
  346. }
  347. }
  348. delete [] _epEvents;
  349. #endif // HAVE_EPOLL
  350. }
  351. void DownloadEngine::cleanQueue() {
  352. std::for_each(commands.begin(), commands.end(), Deleter());
  353. commands.clear();
  354. }
  355. static void executeCommand(std::deque<Command*>& commands,
  356. Command::STATUS statusFilter)
  357. {
  358. size_t max = commands.size();
  359. for(size_t i = 0; i < max; i++) {
  360. Command* com = commands.front();
  361. commands.pop_front();
  362. if(com->statusMatch(statusFilter)) {
  363. com->transitStatus();
  364. if(com->execute()) {
  365. delete com;
  366. com = 0;
  367. }
  368. } else {
  369. commands.push_back(com);
  370. }
  371. if(com) {
  372. com->clearIOEvents();
  373. }
  374. }
  375. }
  376. void DownloadEngine::run() {
  377. #ifdef HAVE_EPOLL
  378. if(_epfd == -1) {
  379. throw DlAbortEx("epoll_init() failed.");
  380. }
  381. #endif // HAVE_EPOLL
  382. Time cp;
  383. cp.setTimeInSec(0);
  384. while(!commands.empty() || !_routineCommands.empty()) {
  385. if(cp.elapsed(1)) {
  386. cp.reset();
  387. executeCommand(commands, Command::STATUS_ALL);
  388. } else {
  389. executeCommand(commands, Command::STATUS_ACTIVE);
  390. }
  391. executeCommand(_routineCommands, Command::STATUS_ALL);
  392. afterEachIteration();
  393. if(!commands.empty()) {
  394. waitData();
  395. }
  396. _noWait = false;
  397. calculateStatistics();
  398. }
  399. onEndOfRun();
  400. }
  401. void DownloadEngine::shortSleep() const {
  402. struct timeval tv;
  403. tv.tv_sec = 0;
  404. tv.tv_usec = 1000;
  405. fd_set rfds;
  406. FD_ZERO(&rfds);
  407. select(0, &rfds, NULL, NULL, &tv);
  408. }
  409. void DownloadEngine::waitData()
  410. {
  411. #ifdef HAVE_EPOLL
  412. // timeout is millisec
  413. int timeout = _noWait ? 0 : 1000;
  414. int res;
  415. while((res = epoll_wait(_epfd, _epEvents, EPOLL_EVENTS_MAX, timeout)) == -1 &&
  416. errno == EINTR);
  417. if(res > 0) {
  418. for(int i = 0; i < res; ++i) {
  419. SocketEntry* p = (SocketEntry*)_epEvents[i].data.ptr;
  420. p->processEvents(_epEvents[i].events);
  421. }
  422. }
  423. // TODO timeout of name resolver is determined in Command(AbstractCommand,
  424. // DHTEntryPoint...Command)
  425. #else // !HAVE_EPOLL
  426. fd_set rfds;
  427. fd_set wfds;
  428. struct timeval tv;
  429. memcpy(&rfds, &rfdset, sizeof(fd_set));
  430. memcpy(&wfds, &wfdset, sizeof(fd_set));
  431. #ifdef ENABLE_ASYNC_DNS
  432. for(std::deque<SharedHandle<AsyncNameResolverEntry> >::iterator itr =
  433. nameResolverEntries.begin(); itr != nameResolverEntries.end(); ++itr) {
  434. SharedHandle<AsyncNameResolverEntry>& entry = *itr;
  435. int fd = entry->getFds(&rfds, &wfds);
  436. // TODO force error if fd == 0
  437. if(fdmax < fd) {
  438. fdmax = fd;
  439. }
  440. }
  441. #endif // ENABLE_ASYNC_DNS
  442. tv.tv_sec = _noWait ? 0 : 1;
  443. tv.tv_usec = 0;
  444. int retval = select(fdmax+1, &rfds, &wfds, NULL, &tv);
  445. if(retval > 0) {
  446. for(std::deque<SharedHandle<SocketEntry> >::iterator i =
  447. socketEntries.begin(); i != socketEntries.end(); ++i) {
  448. int events = 0;
  449. if(FD_ISSET((*i)->getSocket(), &rfds)) {
  450. events |= SocketEntry::EVENT_READ;
  451. }
  452. if(FD_ISSET((*i)->getSocket(), &wfds)) {
  453. events |= SocketEntry::EVENT_WRITE;
  454. }
  455. (*i)->processEvents(events);
  456. }
  457. }
  458. #ifdef ENABLE_ASYNC_DNS
  459. for(std::deque<SharedHandle<AsyncNameResolverEntry> >::iterator i =
  460. nameResolverEntries.begin(); i != nameResolverEntries.end(); ++i) {
  461. (*i)->process(&rfds, &wfds);
  462. }
  463. #endif // ENABLE_ASYNC_DNS
  464. #endif // !HAVE_EPOLL
  465. }
  466. #ifndef HAVE_EPOLL
  467. void DownloadEngine::updateFdSet() {
  468. fdmax = 0;
  469. FD_ZERO(&rfdset);
  470. FD_ZERO(&wfdset);
  471. for(std::deque<SharedHandle<SocketEntry> >::iterator i =
  472. socketEntries.begin(); i != socketEntries.end(); ++i) {
  473. sock_t fd = (*i)->getSocket();
  474. int events = (*i)->getEvents();
  475. if(events&SocketEntry::EVENT_READ) {
  476. FD_SET(fd, &rfdset);
  477. }
  478. if(events&SocketEntry::EVENT_WRITE) {
  479. FD_SET(fd, &wfdset);
  480. }
  481. if(fdmax < fd) {
  482. fdmax = fd;
  483. }
  484. }
  485. }
  486. #endif // !HAVE_EPOLL
  487. bool DownloadEngine::addSocketEvents(sock_t socket, Command* command, int events
  488. #if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
  489. ,const SharedHandle<AsyncNameResolver>& rs
  490. #endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
  491. )
  492. {
  493. SharedHandle<SocketEntry> socketEntry(new SocketEntry(socket));
  494. std::deque<SharedHandle<SocketEntry> >::iterator i =
  495. std::lower_bound(socketEntries.begin(), socketEntries.end(), socketEntry);
  496. int r = 0;
  497. if(i != socketEntries.end() && (*i) == socketEntry) {
  498. #ifdef HAVE_EPOLL
  499. #ifdef ENABLE_ASYNC_DNS
  500. if(rs.isNull()) {
  501. (*i)->addCommandEvent(command, events);
  502. } else {
  503. (*i)->addADNSEvent(rs, command, events);
  504. }
  505. #else // !ENABLE_ASYNC_DNS
  506. (*i)->addCommandEvent(command, events);
  507. #endif // !ENABLE_ASYNC_DNS
  508. r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), &(*i)->getEpEvent());
  509. if(r == -1) {
  510. // try EPOLL_CTL_ADD: There is a chance that previously socket X is
  511. // added to epoll, but it is closed and is not yet removed from
  512. // SocketEntries. In this case, EPOLL_CTL_MOD is failed with ENOENT.
  513. r = epoll_ctl(_epfd, EPOLL_CTL_ADD, (*i)->getSocket(), &(*i)->getEpEvent());
  514. }
  515. #else // !HAVE_EPOLL
  516. (*i)->addCommandEvent(command, events);
  517. #endif // !HAVE_EPOLL
  518. } else {
  519. socketEntries.insert(i, socketEntry);
  520. #ifdef HAVE_EPOLL
  521. #ifdef ENABLE_ASYNC_DNS
  522. if(rs.isNull()) {
  523. socketEntry->addCommandEvent(command, events);
  524. } else {
  525. socketEntry->addADNSEvent(rs, command, events);
  526. }
  527. #else // !ENABLE_ASYNC_DNS
  528. socketEntry->addCommandEvent(command, events);
  529. #endif // !ENABLE_ASYNC_DNS
  530. r = epoll_ctl(_epfd, EPOLL_CTL_ADD, socketEntry->getSocket(), &socketEntry->getEpEvent());
  531. #else // !HAVE_EPOLL
  532. socketEntry->addCommandEvent(command, events);
  533. #endif // !HAVE_EPOLL
  534. }
  535. #ifndef HAVE_EPOLL
  536. updateFdSet();
  537. #endif // !HAVE_EPOLL
  538. if(r == -1) {
  539. logger->debug("Failed to add socket event %d:%s", socket, strerror(errno));
  540. return false;
  541. } else {
  542. return true;
  543. }
  544. }
  545. bool DownloadEngine::deleteSocketEvents(sock_t socket, Command* command, int events
  546. #if defined HAVE_EPOLL && defined ENABLE_ASYNC_DNS
  547. ,const SharedHandle<AsyncNameResolver>& rs
  548. #endif // HAVE_EPOLL && ENABLE_ASYNC_DNS
  549. )
  550. {
  551. SharedHandle<SocketEntry> socketEntry(new SocketEntry(socket));
  552. std::deque<SharedHandle<SocketEntry> >::iterator i =
  553. std::lower_bound(socketEntries.begin(), socketEntries.end(), socketEntry);
  554. if(i != socketEntries.end() && (*i) == socketEntry) {
  555. #ifdef HAVE_EPOLL
  556. #ifdef ENABLE_ASYNC_DNS
  557. if(rs.isNull()) {
  558. (*i)->removeCommandEvent(command, events);
  559. } else {
  560. (*i)->removeADNSEvent(rs, command);
  561. }
  562. #else // !ENABLE_ASYNC_DNS
  563. (*i)->removeCommandEvent(command, events);
  564. #endif // !ENABLE_ASYNC_DNS
  565. #else // !HAVE_EPOLL
  566. (*i)->removeCommandEvent(command, events);
  567. #endif // !HAVE_EPOLL
  568. int r = 0;
  569. if((*i)->eventEmpty()) {
  570. #ifdef HAVE_EPOLL
  571. r = epoll_ctl(_epfd, EPOLL_CTL_DEL, (*i)->getSocket(), 0);
  572. #endif // HAVE_EPOLL
  573. socketEntries.erase(i);
  574. } else {
  575. #ifdef HAVE_EPOLL
  576. // If socket is closed, then it seems it is automatically removed from
  577. // epoll, so following EPOLL_CTL_MOD may fail.
  578. r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), &(*i)->getEpEvent());
  579. if(r == -1) {
  580. logger->debug("Failed to delete socket event, but may be ignored:%s", strerror(errno));
  581. }
  582. #endif // HAVE_EPOLL
  583. }
  584. #ifndef HAVE_EPOLL
  585. updateFdSet();
  586. #endif // !HAVE_EPOLL
  587. if(r == -1) {
  588. logger->debug("Failed to delete socket event:%s", strerror(errno));
  589. return false;
  590. } else {
  591. return true;
  592. }
  593. } else {
  594. logger->debug("Socket %d is not found in SocketEntries.", socket);
  595. return false;
  596. }
  597. }
  598. bool DownloadEngine::addSocketForReadCheck(const SocketHandle& socket,
  599. Command* command)
  600. {
  601. return addSocketEvents(socket->getSockfd(), command, SocketEntry::EVENT_READ);
  602. }
  603. bool DownloadEngine::deleteSocketForReadCheck(const SocketHandle& socket,
  604. Command* command)
  605. {
  606. return deleteSocketEvents(socket->getSockfd(), command, SocketEntry::EVENT_READ);
  607. }
  608. bool DownloadEngine::addSocketForWriteCheck(const SocketHandle& socket,
  609. Command* command)
  610. {
  611. return addSocketEvents(socket->getSockfd(), command, SocketEntry::EVENT_WRITE);
  612. }
  613. bool DownloadEngine::deleteSocketForWriteCheck(const SocketHandle& socket,
  614. Command* command)
  615. {
  616. return deleteSocketEvents(socket->getSockfd(), command, SocketEntry::EVENT_WRITE);
  617. }
  618. void DownloadEngine::calculateStatistics()
  619. {
  620. if(!_statCalc.isNull()) {
  621. _statCalc->calculateStat(_requestGroupMan, _fileAllocationMan, _checkIntegrityMan);
  622. }
  623. }
  624. void DownloadEngine::onEndOfRun()
  625. {
  626. _requestGroupMan->closeFile();
  627. _requestGroupMan->save();
  628. }
  629. void DownloadEngine::afterEachIteration()
  630. {
  631. if(globalHaltRequested == 1) {
  632. logger->notice(_("Shutdown sequence commencing... Press Ctrl-C again for emergency shutdown."));
  633. requestHalt();
  634. globalHaltRequested = 2;
  635. } else if(globalHaltRequested == 3) {
  636. logger->notice(_("Emergency shutdown sequence commencing..."));
  637. _requestGroupMan->forceHalt();
  638. globalHaltRequested = 4;
  639. }
  640. }
  641. void DownloadEngine::requestHalt()
  642. {
  643. _haltRequested = true;
  644. _requestGroupMan->halt();
  645. }
  646. void DownloadEngine::fillCommand()
  647. {
  648. std::deque<Command*> commands;
  649. _requestGroupMan->getInitialCommands(commands, this);
  650. addCommand(commands);
  651. }
  652. void DownloadEngine::setStatCalc(const StatCalcHandle& statCalc)
  653. {
  654. _statCalc = statCalc;
  655. }
  656. void DownloadEngine::addCommand(const Commands& commands)
  657. {
  658. this->commands.insert(this->commands.end(), commands.begin(), commands.end());
  659. }
  660. #ifdef ENABLE_ASYNC_DNS
  661. bool DownloadEngine::addNameResolverCheck
  662. (const SharedHandle<AsyncNameResolver>& resolver, Command* command)
  663. {
  664. SharedHandle<AsyncNameResolverEntry> entry
  665. (new AsyncNameResolverEntry(resolver, command));
  666. std::deque<SharedHandle<AsyncNameResolverEntry> >::iterator itr =
  667. std::find(nameResolverEntries.begin(), nameResolverEntries.end(), entry);
  668. if(itr == nameResolverEntries.end()) {
  669. nameResolverEntries.push_back(entry);
  670. #ifdef HAVE_EPOLL
  671. entry->addSocketEvents(this);
  672. #endif // HAVE_EPOLL
  673. return true;
  674. } else {
  675. return false;
  676. }
  677. }
  678. bool DownloadEngine::deleteNameResolverCheck
  679. (const SharedHandle<AsyncNameResolver>& resolver, Command* command)
  680. {
  681. SharedHandle<AsyncNameResolverEntry> entry
  682. (new AsyncNameResolverEntry(resolver, command));
  683. std::deque<SharedHandle<AsyncNameResolverEntry> >::iterator itr =
  684. std::find(nameResolverEntries.begin(), nameResolverEntries.end(), entry);
  685. if(itr == nameResolverEntries.end()) {
  686. return false;
  687. } else {
  688. #ifdef HAVE_EPOLL
  689. (*itr)->removeSocketEvents(this);
  690. #endif // HAVE_EPOLL
  691. nameResolverEntries.erase(itr);
  692. return true;
  693. }
  694. }
  695. #endif // ENABLE_ASYNC_DNS
  696. void DownloadEngine::setNoWait(bool b)
  697. {
  698. _noWait = b;
  699. }
  700. void DownloadEngine::addRoutineCommand(Command* command)
  701. {
  702. _routineCommands.push_back(command);
  703. }
  704. void DownloadEngine::poolSocket(const std::string& ipaddr, uint16_t port,
  705. const SharedHandle<SocketCore>& sock,
  706. time_t timeout)
  707. {
  708. std::string addr = ipaddr+":"+Util::uitos(port);
  709. logger->info("Pool socket for %s", addr.c_str());
  710. SocketPoolEntry e(sock, timeout);
  711. std::multimap<std::string, SocketPoolEntry>::value_type p(addr, e);
  712. _socketPool.insert(p);
  713. }
  714. SharedHandle<SocketCore>
  715. DownloadEngine::popPooledSocket(const std::string& ipaddr, uint16_t port)
  716. {
  717. SharedHandle<SocketCore> s;
  718. std::string addr = ipaddr+":"+Util::uitos(port);
  719. std::multimap<std::string, SocketPoolEntry>::iterator first = _socketPool.find(addr);
  720. for(std::multimap<std::string, SocketPoolEntry>::iterator i = first;
  721. i != _socketPool.end() && (*i).first == addr; ++i) {
  722. const SocketPoolEntry& e = (*i).second;
  723. if(!e.isTimeout()) {
  724. logger->info("Reuse socket for %s", addr.c_str());
  725. s = e.getSocket();
  726. _socketPool.erase(first, ++i);
  727. break;
  728. }
  729. }
  730. return s;
  731. }
  732. SharedHandle<SocketCore>
  733. DownloadEngine::popPooledSocket
  734. (const std::deque<std::string>& ipaddrs, uint16_t port)
  735. {
  736. for(std::deque<std::string>::const_iterator i = ipaddrs.begin();
  737. i != ipaddrs.end(); ++i) {
  738. SharedHandle<SocketCore> s = popPooledSocket(*i, port);
  739. if(!s.isNull()) {
  740. return s;
  741. }
  742. }
  743. return SharedHandle<SocketCore>();
  744. }
  745. DownloadEngine::SocketPoolEntry::SocketPoolEntry
  746. (const SharedHandle<SocketCore>& socket,
  747. time_t timeout):
  748. _socket(socket),
  749. _timeout(timeout) {}
  750. DownloadEngine::SocketPoolEntry::~SocketPoolEntry() {}
  751. bool DownloadEngine::SocketPoolEntry::isTimeout() const
  752. {
  753. return _registeredTime.elapsed(_timeout);
  754. }
  755. SharedHandle<SocketCore> DownloadEngine::SocketPoolEntry::getSocket() const
  756. {
  757. return _socket;
  758. }
  759. } // namespace aria2