/* */
// NOTE(review): this file had lost its line structure and every <...>
// sequence (system header names, template arguments).  They are restored
// below from how the names are used in this file; SharedHandle<AsyncNameResolver>
// and the system header names should be confirmed against EpollEventPoll.h.
#include "EpollEventPoll.h"

#include <unistd.h>

#include <cerrno>
#include <cstring>
#include <algorithm>
#include <numeric>

#include "Command.h"
#include "LogFactory.h"
#include "Logger.h"

namespace aria2 {

// Creates the bookkeeping entry for socket s.
EpollEventPoll::KSocketEntry::KSocketEntry(sock_t s):
  SocketEntry(s) {}

// Binary operation for std::accumulate: ORs the event mask of event into
// the running mask events.
int accumulateEvent(int events, const EpollEventPoll::KEvent& event)
{
  return events|event.getEvents();
}

// Builds the epoll_event to register for this socket.  data.ptr points back
// to this entry so that poll() can find it again, and events is the union of
// the masks of all command events (and, with ENABLE_ASYNC_DNS, all async DNS
// events) attached to this entry.
struct epoll_event EpollEventPoll::KSocketEntry::getEvents()
{
  struct epoll_event epEvent;
  memset(&epEvent, 0, sizeof(struct epoll_event));
  epEvent.data.ptr = this;

#ifdef ENABLE_ASYNC_DNS

  epEvent.events =
    std::accumulate(_adnsEvents.begin(),
                    _adnsEvents.end(),
                    std::accumulate(_commandEvents.begin(),
                                    _commandEvents.end(), 0, accumulateEvent),
                    accumulateEvent);

#else // !ENABLE_ASYNC_DNS

  epEvent.events =
    std::accumulate(_commandEvents.begin(), _commandEvents.end(), 0,
                    accumulateEvent);

#endif // !ENABLE_ASYNC_DNS
  return epEvent;
}

// Allocates the result buffer with EPOLL_EVENTS_MAX slots and creates the
// epoll descriptor.  Whether creation succeeded is reported by good().
EpollEventPoll::EpollEventPoll():
  _epEventsSize(EPOLL_EVENTS_MAX),
  _epEvents(new struct epoll_event[_epEventsSize]),
  _logger(LogFactory::getInstance())
{
  _epfd = epoll_create(EPOLL_EVENTS_MAX);
}

// Closes the epoll descriptor (if one was created) and frees the result
// buffer.
EpollEventPoll::~EpollEventPoll()
{
  if(_epfd != -1) {
    // close() is deliberately not retried on EINTR: on Linux the descriptor
    // is released even when the call is interrupted, so a second close()
    // would either fail with EBADF or close a descriptor that has been
    // reused in the meantime.  EINTR is therefore treated as "closed".
    if(close(_epfd) == -1 && errno != EINTR) {
      const int errNum = errno;
      _logger->error("Error occurred while closing epoll file descriptor"
                     " %d: %s",
                     _epfd, strerror(errNum));
    }
  }
  delete [] _epEvents;
}

// Returns true if the epoll descriptor is usable.
bool EpollEventPoll::good() const
{
  return _epfd != -1;
}

// Waits for events for at most tv and forwards each reported event mask to
// the KSocketEntry that was registered for the socket.
void EpollEventPoll::poll(const struct timeval& tv)
{
  // timeout is millisec
  int timeout = tv.tv_sec*1000+tv.tv_usec/1000;

  int res;
  // Offer the whole result buffer to the kernel.  addEvents() grows
  // _epEvents together with _epEventsSize, so _epEventsSize is always the
  // number of slots available.  Interrupted waits are restarted.
  while((res = epoll_wait(_epfd, _epEvents,
                          static_cast<int>(_epEventsSize), timeout)) == -1 &&
        errno == EINTR);

  if(res > 0) {
    for(int i = 0; i < res; ++i) {
      // data.ptr was set to the owning entry in KSocketEntry::getEvents().
      KSocketEntry* p = reinterpret_cast<KSocketEntry*>(_epEvents[i].data.ptr);
      p->processEvents(_epEvents[i].events);
    }
  }

#ifdef ENABLE_ASYNC_DNS
  // It turns out that we have to call ares_process_fd before ares's
  // own timeout and ares may create new sockets or closes socket in
  // their API. So we call ares_process_fd for all ares_channel and
  // re-register their sockets.
  for(std::deque<SharedHandle<KAsyncNameResolverEntry> >::iterator i =
        _nameResolverEntries.begin(), eoi = _nameResolverEntries.end();
      i != eoi; ++i) {
    (*i)->processTimeout();
    (*i)->removeSocketEvents(this);
    (*i)->addSocketEvents(this);
  }
#endif // ENABLE_ASYNC_DNS

  // TODO timeout of name resolver is determined in Command(AbstractCommand,
  // DHTEntryPoint...Command)
}

// Converts a mask of EventPoll::EVENT_* flags into the corresponding mask of
// EPOLL* flags.
static int translateEvents(EventPoll::EventType events)
{
  int newEvents = 0;
  if(EventPoll::EVENT_READ&events) {
    newEvents |= EPOLLIN;
  }
  if(EventPoll::EVENT_WRITE&events) {
    newEvents |= EPOLLOUT;
  }
  if(EventPoll::EVENT_ERROR&events) {
    newEvents |= EPOLLERR;
  }
  if(EventPoll::EVENT_HUP&events) {
    newEvents |= EPOLLHUP;
  }
  return newEvents;
}

// Attaches event to the entry for socket, creating the entry if there is
// none yet, and (re-)registers the socket with epoll using the combined
// mask.  Returns true if the epoll registration succeeded.
bool EpollEventPoll::addEvents(sock_t socket,
                               const EpollEventPoll::KEvent& event)
{
  // _socketEntries is searched with lower_bound, i.e. it is kept sorted; a
  // temporary entry serves as the search key.
  SharedHandle<KSocketEntry> socketEntry(new KSocketEntry(socket));
  std::deque<SharedHandle<KSocketEntry> >::iterator i =
    std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry);
  int r = 0;
  if(i != _socketEntries.end() && (*i) == socketEntry) {

    event.addSelf(*i);

    struct epoll_event epEvent = (*i)->getEvents();
    r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), &epEvent);
    if(r == -1) {
      // try EPOLL_CTL_ADD: There is a chance that previously socket X is
      // added to epoll, but it is closed and is not yet removed from
      // SocketEntries. In this case, EPOLL_CTL_MOD is failed with ENOENT.

      r = epoll_ctl(_epfd, EPOLL_CTL_ADD, (*i)->getSocket(),
                    &epEvent);
    }
  } else {
    _socketEntries.insert(i, socketEntry);
    if(_socketEntries.size() > _epEventsSize) {
      // Allocate the larger buffer before releasing the old one so that a
      // failed allocation leaves _epEvents and _epEventsSize consistent.
      struct epoll_event* grown = new struct epoll_event[_epEventsSize*2];
      delete [] _epEvents;
      _epEvents = grown;
      _epEventsSize *= 2;
    }

    event.addSelf(socketEntry);

    struct epoll_event epEvent = socketEntry->getEvents();
    r = epoll_ctl(_epfd, EPOLL_CTL_ADD, socketEntry->getSocket(), &epEvent);
  }
  if(r == -1) {
    // Save errno before any other call has a chance to overwrite it.
    const int errNum = errno;
    if(_logger->debug()) {
      _logger->debug("Failed to add socket event %d:%s",
                     socket, strerror(errNum));
    }
    return false;
  } else {
    return true;
  }
}

// Registers command for events (EventPoll::EVENT_* flags) on socket.
bool EpollEventPoll::addEvents(sock_t socket, Command* command,
                               EventPoll::EventType events)
{
  int epEvents = translateEvents(events);
  return addEvents(socket, KCommandEvent(command, epEvents));
}

#ifdef ENABLE_ASYNC_DNS
// Registers an async DNS event for resolver rs on socket.  events is passed
// through to KADNSEvent without translation.
bool EpollEventPoll::addEvents(sock_t socket, Command* command, int events,
                               const SharedHandle<AsyncNameResolver>& rs)
{
  return addEvents(socket, KADNSEvent(rs, command, socket, events));
}
#endif // ENABLE_ASYNC_DNS

// Detaches event from the entry for socket.  If no events remain the socket
// is removed from epoll and its entry is erased; otherwise the epoll
// registration is updated to the remaining mask.  Returns false if the
// socket has no entry or if the epoll_ctl call failed.
bool EpollEventPoll::deleteEvents(sock_t socket,
                                  const EpollEventPoll::KEvent& event)
{
  SharedHandle<KSocketEntry> socketEntry(new KSocketEntry(socket));
  std::deque<SharedHandle<KSocketEntry> >::iterator i =
    std::lower_bound(_socketEntries.begin(), _socketEntries.end(), socketEntry);
  if(i != _socketEntries.end() && (*i) == socketEntry) {

    event.removeSelf(*i);

    int r = 0;
    // errno of a failed epoll_ctl, saved immediately so that the erase and
    // logging calls below cannot overwrite it.
    int errNum = 0;
    if((*i)->eventEmpty()) {
      // In kernel before 2.6.9, epoll_ctl with EPOLL_CTL_DEL requires non-null
      // pointer of epoll_event.
      struct epoll_event ev = {0,{0}};
      r = epoll_ctl(_epfd, EPOLL_CTL_DEL, (*i)->getSocket(), &ev);
      if(r == -1) {
        errNum = errno;
      }
      _socketEntries.erase(i);
    } else {
      // If socket is closed, then it seems it is automatically removed from
      // epoll, so following EPOLL_CTL_MOD may fail.
      struct epoll_event epEvent = (*i)->getEvents();
      r = epoll_ctl(_epfd, EPOLL_CTL_MOD, (*i)->getSocket(), &epEvent);
      if(r == -1) {
        errNum = errno;
        if(_logger->debug()) {
          _logger->debug("Failed to delete socket event, but may be ignored:%s",
                         strerror(errNum));
        }
      }
    }
    if(r == -1) {
      if(_logger->debug()) {
        _logger->debug("Failed to delete socket event:%s", strerror(errNum));
      }
      return false;
    } else {
      return true;
    }
  } else {
    if(_logger->debug()) {
      _logger->debug("Socket %d is not found in SocketEntries.", socket);
    }
    return false;
  }
}

#ifdef ENABLE_ASYNC_DNS
// Removes the async DNS event of resolver rs from socket.
bool EpollEventPoll::deleteEvents(sock_t socket, Command* command,
                                  const SharedHandle<AsyncNameResolver>& rs)
{
  return deleteEvents(socket, KADNSEvent(rs, command, socket, 0));
}
#endif // ENABLE_ASYNC_DNS

// Removes the registration of command for events on socket.
bool EpollEventPoll::deleteEvents(sock_t socket, Command* command,
                                  EventPoll::EventType events)
{
  int epEvents = translateEvents(events);
  return deleteEvents(socket, KCommandEvent(command, epEvents));
}

#ifdef ENABLE_ASYNC_DNS
// Starts tracking resolver on behalf of command and registers its sockets.
// Returns false if an equal entry is already tracked.
bool EpollEventPoll::addNameResolver
(const SharedHandle<AsyncNameResolver>& resolver, Command* command)
{
  SharedHandle<KAsyncNameResolverEntry> entry
    (new KAsyncNameResolverEntry(resolver, command));
  std::deque<SharedHandle<KAsyncNameResolverEntry> >::iterator itr =
    std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry);
  if(itr == _nameResolverEntries.end()) {
    _nameResolverEntries.push_back(entry);
    entry->addSocketEvents(this);
    return true;
  } else {
    return false;
  }
}

// Stops tracking resolver for command and unregisters its sockets.  Returns
// false if no equal entry is tracked.
bool EpollEventPoll::deleteNameResolver
(const SharedHandle<AsyncNameResolver>& resolver, Command* command)
{
  SharedHandle<KAsyncNameResolverEntry> entry
    (new KAsyncNameResolverEntry(resolver, command));
  std::deque<SharedHandle<KAsyncNameResolverEntry> >::iterator itr =
    std::find(_nameResolverEntries.begin(), _nameResolverEntries.end(), entry);
  if(itr == _nameResolverEntries.end()) {
    return false;
  } else {
    (*itr)->removeSocketEvents(this);
    _nameResolverEntries.erase(itr);
    return true;
  }
}
#endif // ENABLE_ASYNC_DNS

} // namespace aria2