fix up event loop

pull/35/head
Jeff Becker 6 years ago
parent c483ec5231
commit f873b18036
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -122,7 +122,9 @@ llarp_tcp_conn_async_write(struct llarp_tcp_conn *conn, const void *pkt,
{
constexpr size_t buffsz = llarp::ev_io::WriteBuffer::BufferSize;
const byte_t *ptr = (const byte_t *)pkt;
llarp::ev_io *impl = static_cast< llarp::ev_io * >(conn->impl);
llarp::tcp_conn *impl = static_cast< llarp::tcp_conn * >(conn->impl);
if(impl->_shouldClose)
return false;
while(sz > buffsz)
{
if(!impl->queue_write((const byte_t *)ptr, buffsz))
@ -170,25 +172,26 @@ llarp_ev_tun_async_write(struct llarp_tun_io *tun, const void *buf, size_t sz)
void
llarp_tcp_conn_close(struct llarp_tcp_conn *conn)
{
if(!conn)
return;
if(conn->impl)
{
llarp::ev_io *impl = static_cast< llarp::ev_io * >(conn->impl);
// deregister and dealloc
conn->loop->close_ev(impl);
conn->impl = nullptr;
}
// call hook if needed
if(conn->closed)
conn->closed(conn);
// delete conn
delete conn;
static_cast< llarp::tcp_conn * >(conn->impl)->_shouldClose = true;
}
namespace llarp
{
bool
tcp_conn::tick()
{
if(_shouldClose)
{
if(tcp->closed)
tcp->closed(tcp);
delete tcp;
return false;
}
else if(tcp->tick)
tcp->tick(tcp);
return true;
}
int
tcp_serv::read(void *, size_t)
{

@ -85,7 +85,7 @@ namespace llarp
#ifndef _WIN32
int fd;
int flags = 0;
ev_io(int f) : fd(f)
{
}
@ -123,8 +123,12 @@ namespace llarp
return -1;
};
virtual void
tick(){};
/// return false if we want to deregister and remove ourselves
virtual bool
tick()
{
return true;
};
/// used for tun interface and tcp conn
ssize_t
@ -216,6 +220,7 @@ namespace llarp
struct tcp_conn : public ev_io
{
bool _shouldClose = false;
llarp_tcp_conn* tcp;
tcp_conn(int fd, llarp_tcp_conn* conn)
: ev_io(fd, new LosslessWriteQueue_t()), tcp(conn)
@ -246,12 +251,8 @@ namespace llarp
return 0;
}
void
tick()
{
if(tcp->tick)
tcp->tick(tcp);
}
bool
tick();
int
sendto(const sockaddr*, const void*, size_t)
@ -269,11 +270,12 @@ namespace llarp
{
}
void
bool
tick()
{
if(tcp->tick)
tcp->tick(tcp);
return true;
}
/// actually does accept() :^)
@ -318,6 +320,7 @@ struct llarp_ev_loop
virtual bool
udp_close(llarp_udp_io* l) = 0;
/// deregister event listener
virtual bool
close_ev(llarp::ev_io* ev) = 0;
@ -327,6 +330,7 @@ struct llarp_ev_loop
virtual llarp::ev_io*
bind_tcp(llarp_tcp_acceptor* tcp, const sockaddr* addr) = 0;
/// register event listener
virtual bool
add_ev(llarp::ev_io* ev, bool write = false) = 0;
@ -340,8 +344,17 @@ struct llarp_ev_loop
void
tick_listeners()
{
for(const auto& h : handlers)
h->tick();
auto itr = handlers.begin();
while(itr != handlers.end())
{
if((*itr)->tick())
++itr;
else
{
close_ev(itr->get());
itr = handlers.erase(itr);
}
}
}
};

@ -28,11 +28,12 @@ namespace llarp
{
}
void
bool
tick()
{
if(udp->tick)
udp->tick(udp);
return true;
}
int
@ -93,12 +94,12 @@ namespace llarp
return -1;
}
void
bool
tick()
{
if(t->tick)
t->tick(t);
flush_write();
return true;
}
void
@ -201,14 +202,14 @@ struct llarp_epoll_loop : public llarp_ev_loop
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr);
if(ev)
{
if(events[idx].events & EPOLLOUT)
{
ev->flush_write();
}
if(events[idx].events & EPOLLIN)
{
ev->read(readbuf, sizeof(readbuf));
}
if(events[idx].events & EPOLLOUT)
{
ev->flush_write();
}
}
++idx;
}
@ -234,14 +235,14 @@ struct llarp_epoll_loop : public llarp_ev_loop
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].data.ptr);
if(ev)
{
if(events[idx].events & EPOLLOUT)
{
ev->flush_write();
}
if(events[idx].events & EPOLLIN)
{
ev->read(readbuf, sizeof(readbuf));
}
if(events[idx].events & EPOLLOUT)
{
ev->flush_write();
}
}
++idx;
}
@ -301,15 +302,7 @@ struct llarp_epoll_loop : public llarp_ev_loop
bool
close_ev(llarp::ev_io* ev)
{
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, nullptr) == -1)
return false;
// deallocate
handlers.erase(
std::remove_if(handlers.begin(), handlers.end(),
[ev](const std::unique_ptr< llarp::ev_io >& i) -> bool {
return i.get() == ev;
}));
return true;
return epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, nullptr) != -1;
}
llarp::ev_io*
@ -391,6 +384,15 @@ struct llarp_epoll_loop : public llarp_ev_loop
if(listener)
{
close_ev(listener);
// remove handler
auto itr = handlers.begin();
while(itr != handlers.end())
{
if(itr->get() == listener)
itr = handlers.erase(itr);
else
++itr;
}
l->impl = nullptr;
ret = true;
}

@ -34,12 +34,14 @@ namespace llarp
{
}
void
bool
tick()
{
if(udp->tick)
udp->tick(udp);
return true;
}
virtual int
read(void* buf, size_t sz)
{
@ -131,12 +133,12 @@ namespace llarp
}
}
void
bool
tick()
{
if(t->tick)
t->tick(t);
flush_write();
return true;
}
int
@ -227,11 +229,10 @@ struct llarp_kqueue_loop : public llarp_ev_loop
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].udata);
if(ev)
{
ev->read(readbuf, sizeof(readbuf));
}
else
{
llarp::LogWarn("event[", idx, "] udata is not an ev_io");
if(events[idx].filter & EVFILT_READ)
ev->read(readbuf, sizeof(readbuf));
if(events[idx].filter & EVFILT_WRITE)
ev->flush_write();
}
++idx;
}
@ -261,8 +262,10 @@ struct llarp_kqueue_loop : public llarp_ev_loop
llarp::ev_io* ev = static_cast< llarp::ev_io* >(events[idx].udata);
if(ev)
{
// printf("reading_ev [%x] fd[%d]\n", ev, ev->fd);
ev->read(readbuf, sizeof(readbuf));
if(events[idx].filter & EVFILT_READ)
ev->read(readbuf, sizeof(readbuf));
if(events[idx].filter & EVFILT_WRITE)
ev->flush_write();
}
else
{
@ -338,17 +341,8 @@ struct llarp_kqueue_loop : public llarp_ev_loop
bool
close_ev(llarp::ev_io* ev)
{
EV_SET(&change, ev->fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr);
if(kevent(kqueuefd, &change, 1, nullptr, 0, nullptr) != -1)
{
handlers.erase(std::remove_if(
handlers.begin(), handlers.end(),
[ev](const std::unique_ptr< llarp::ev_io >& i) -> bool {
return i.get() == ev;
}));
return true;
}
return false;
EV_SET(&change, ev->fd, ev->flags, EV_DELETE, 0, 0, nullptr);
return kevent(kqueuefd, &change, 1, nullptr, 0, nullptr) != -1;
}
llarp::ev_io*
@ -365,10 +359,11 @@ struct llarp_kqueue_loop : public llarp_ev_loop
bool
add_ev(llarp::ev_io* ev, bool write)
{
ev->flags = EVFILT_READ;
if(write)
EV_SET(&change, ev->fd, EVFILT_READ | EVFILT_WRITE, EV_ADD, 0, 0, ev);
else
EV_SET(&change, ev->fd, EVFILT_READ, EV_ADD, 0, 0, ev);
ev->flags |= EVFILT_WRITE;
EV_SET(&change, ev->fd, ev->flags, EV_ADD, 0, 0, ev);
if(kevent(kqueuefd, &change, 1, nullptr, 0, nullptr) == -1)
{
delete ev;
@ -386,9 +381,20 @@ struct llarp_kqueue_loop : public llarp_ev_loop
if(listener)
{
// printf("Calling close_ev for [%x] fd[%d]\n", listener, listener->fd);
ret = close_ev(listener);
ret = close_ev(listener);
// remove handler
auto itr = handlers.begin();
while(itr != handlers.end())
{
if(itr->get() == listener)
{
itr = handlers.erase(itr);
ret = true;
}
else
++itr;
}
l->impl = nullptr;
ret = true;
}
return ret;
}

Loading…
Cancel
Save