From f873b1803608955ff60952640a4a91c427077b3d Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 25 Oct 2018 08:39:32 -0400 Subject: [PATCH] fix up event loop --- llarp/ev.cpp | 35 ++++++++++++++------------- llarp/ev.hpp | 37 +++++++++++++++++++---------- llarp/ev_epoll.hpp | 42 ++++++++++++++++---------------- llarp/ev_kqueue.hpp | 58 +++++++++++++++++++++++++-------------------- 4 files changed, 98 insertions(+), 74 deletions(-) diff --git a/llarp/ev.cpp b/llarp/ev.cpp index 4778b384f..d3592d2ee 100644 --- a/llarp/ev.cpp +++ b/llarp/ev.cpp @@ -122,7 +122,9 @@ llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const void *pkt, { constexpr size_t buffsz = llarp::ev_io::WriteBuffer::BufferSize; const byte_t *ptr = (const byte_t *)pkt; - llarp::ev_io *impl = static_cast< llarp::ev_io * >(conn->impl); + llarp::tcp_conn *impl = static_cast< llarp::tcp_conn * >(conn->impl); + if(impl->_shouldClose) + return false; while(sz > buffsz) { if(!impl->queue_write((const byte_t *)ptr, buffsz)) @@ -170,25 +172,26 @@ llarp_ev_tun_async_write(struct llarp_tun_io *tun, const void *buf, size_t sz) void llarp_tcp_conn_close(struct llarp_tcp_conn *conn) { - if(!conn) - return; - if(conn->impl) - { - llarp::ev_io *impl = static_cast< llarp::ev_io * >(conn->impl); - // deregister and dealloc - conn->loop->close_ev(impl); - conn->impl = nullptr; - } - // call hook if needed - if(conn->closed) - conn->closed(conn); - - // delete conn - delete conn; + static_cast< llarp::tcp_conn * >(conn->impl)->_shouldClose = true; } namespace llarp { + bool + tcp_conn::tick() + { + if(_shouldClose) + { + if(tcp->closed) + tcp->closed(tcp); + delete tcp; + return false; + } + else if(tcp->tick) + tcp->tick(tcp); + return true; + } + int tcp_serv::read(void *, size_t) { diff --git a/llarp/ev.hpp b/llarp/ev.hpp index 94cd51a53..72a3f9b13 100644 --- a/llarp/ev.hpp +++ b/llarp/ev.hpp @@ -85,7 +85,7 @@ namespace llarp #ifndef _WIN32 int fd; - + int flags = 0; ev_io(int f) : fd(f) { } @@ -123,8 +123,12 @@ namespace llarp return -1; }; - virtual void - tick(){}; + /// return false if we want to deregister and remove ourselves + virtual bool + tick() + { + return true; + }; /// used for tun interface and tcp conn ssize_t @@ -216,6 +220,7 @@ namespace llarp struct tcp_conn : public ev_io { + bool _shouldClose = false; llarp_tcp_conn* tcp; tcp_conn(int fd, llarp_tcp_conn* conn) : ev_io(fd, new LosslessWriteQueue_t()), tcp(conn) @@ -246,12 +251,8 @@ namespace llarp return 0; } - void - tick() - { - if(tcp->tick) - tcp->tick(tcp); - } + bool + tick(); int sendto(const sockaddr*, const void*, size_t) @@ -269,11 +270,12 @@ namespace llarp { } - void + bool tick() { if(tcp->tick) tcp->tick(tcp); + return true; } /// actually does accept() :^) @@ -318,6 +320,7 @@ struct llarp_ev_loop virtual bool udp_close(llarp_udp_io* l) = 0; + /// deregister event listener virtual bool close_ev(llarp::ev_io* ev) = 0; @@ -327,6 +330,7 @@ struct llarp_ev_loop virtual llarp::ev_io* bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* addr) = 0; + /// register event listener virtual bool add_ev(llarp::ev_io* ev, bool write = false) = 0; @@ -340,8 +344,17 @@ struct llarp_ev_loop void tick_listeners() { - for(const auto& h : handlers) - h->tick(); + auto itr = handlers.begin(); + while(itr != handlers.end()) + { + if((*itr)->tick()) + ++itr; + else + { + close_ev(itr->get()); + itr = handlers.erase(itr); + } + } } }; diff --git a/llarp/ev_epoll.hpp b/llarp/ev_epoll.hpp index feec92444..3215de012 100644 --- a/llarp/ev_epoll.hpp +++ b/llarp/ev_epoll.hpp @@ -28,11 +28,12 @@ namespace llarp { } - void + bool tick() { if(udp->tick) udp->tick(udp); + return true; } int @@ -93,12 +94,12 @@ namespace llarp return -1; } - void + bool tick() { if(t->tick) t->tick(t); - flush_write(); + return true; } void @@ -201,14 +202,14 @@ struct llarp_epoll_loop : public llarp_ev_loop llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr); if(ev) { - if(events[idx].events & EPOLLOUT) - { - ev->flush_write(); - } if(events[idx].events & EPOLLIN) { ev->read(readbuf, sizeof(readbuf)); } + if(events[idx].events & EPOLLOUT) + { + ev->flush_write(); + } } ++idx; } @@ -234,14 +235,14 @@ struct llarp_epoll_loop : public llarp_ev_loop llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr); if(ev) { - if(events[idx].events & EPOLLOUT) - { - ev->flush_write(); - } if(events[idx].events & EPOLLIN) { ev->read(readbuf, sizeof(readbuf)); } + if(events[idx].events & EPOLLOUT) + { + ev->flush_write(); + } } ++idx; } @@ -301,15 +302,7 @@ struct llarp_epoll_loop : public llarp_ev_loop bool close_ev(llarp::ev_io* ev) { - if(epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, nullptr) == -1) - return false; - // deallocate - handlers.erase( - std::remove_if(handlers.begin(), handlers.end(), - [ev](const std::unique_ptr< llarp::ev_io >& i) -> bool { - return i.get() == ev; - })); - return true; + return epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, nullptr) != -1; } llarp::ev_io* @@ -391,6 +384,15 @@ struct llarp_epoll_loop : public llarp_ev_loop if(listener) { close_ev(listener); + // remove handler + auto itr = handlers.begin(); + while(itr != handlers.end()) + { + if(itr->get() == listener) + itr = handlers.erase(itr); + else + ++itr; + } l->impl = nullptr; ret = true; } diff --git a/llarp/ev_kqueue.hpp b/llarp/ev_kqueue.hpp index db0ae7f76..f225b383e 100644 --- a/llarp/ev_kqueue.hpp +++ b/llarp/ev_kqueue.hpp @@ -34,12 +34,14 @@ namespace llarp { } - void + bool tick() { if(udp->tick) udp->tick(udp); + return true; } + virtual int read(void* buf, size_t sz) { @@ -131,12 +133,12 @@ namespace llarp } } - void + bool tick() { if(t->tick) t->tick(t); - flush_write(); + return true; } int @@ -227,11 +229,10 @@ struct llarp_kqueue_loop : public llarp_ev_loop llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].udata); if(ev) { - ev->read(readbuf, sizeof(readbuf)); - } - else - { - llarp::LogWarn("event[", idx, "] udata is not an ev_io"); + if(events[idx].filter & EVFILT_READ) + ev->read(readbuf, sizeof(readbuf)); + if(events[idx].filter & EVFILT_WRITE) + ev->flush_write(); } ++idx; } @@ -261,8 +262,10 @@ struct llarp_kqueue_loop : public llarp_ev_loop llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].udata); if(ev) { - // printf("reading_ev [%x] fd[%d]\n", ev, ev->fd); - ev->read(readbuf, sizeof(readbuf)); + if(events[idx].filter & EVFILT_READ) + ev->read(readbuf, sizeof(readbuf)); + if(events[idx].filter & EVFILT_WRITE) + ev->flush_write(); } else { @@ -338,17 +341,8 @@ struct llarp_kqueue_loop : public llarp_ev_loop bool close_ev(llarp::ev_io* ev) { - EV_SET(&change, ev->fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr); - if(kevent(kqueuefd, &change, 1, nullptr, 0, nullptr) != -1) - { - handlers.erase(std::remove_if( - handlers.begin(), handlers.end(), - [ev](const std::unique_ptr< llarp::ev_io >& i) -> bool { - return i.get() == ev; - })); - return true; - } - return false; + EV_SET(&change, ev->fd, ev->flags, EV_DELETE, 0, 0, nullptr); + return kevent(kqueuefd, &change, 1, nullptr, 0, nullptr) != -1; } llarp::ev_io* @@ -365,10 +359,11 @@ struct llarp_kqueue_loop : public llarp_ev_loop bool add_ev(llarp::ev_io* ev, bool write) { + ev->flags = EVFILT_READ; if(write) - EV_SET(&change, ev->fd, EVFILT_READ | EVFILT_WRITE, EV_ADD, 0, 0, ev); - else - EV_SET(&change, ev->fd, EVFILT_READ, EV_ADD, 0, 0, ev); + ev->flags |= EVFILT_WRITE; + + EV_SET(&change, ev->fd, ev->flags, EV_ADD, 0, 0, ev); if(kevent(kqueuefd, &change, 1, nullptr, 0, nullptr) == -1) { delete ev; @@ -386,9 +381,20 @@ struct llarp_kqueue_loop : public llarp_ev_loop if(listener) { // printf("Calling close_ev for [%x] fd[%d]\n", listener, listener->fd); - ret = close_ev(listener); + ret = close_ev(listener); + // remove handler + auto itr = handlers.begin(); + while(itr != handlers.end()) + { + if(itr->get() == listener) + { + itr = handlers.erase(itr); + ret = true; + } + else + ++itr; + } l->impl = nullptr; - ret = true; } return ret; }