diff --git a/.gitignore b/.gitignore index 5b39871..eef930e 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,7 @@ /tests/testproxy/lp/lp /nbproject/ +/tests/testproxy/lp/nbproject/ # CVS default ignores begin tags diff --git a/tests/testproxy/lp/log.h b/tests/testproxy/lp/log.h index 7eedebc..d5c7cb0 100644 --- a/tests/testproxy/lp/log.h +++ b/tests/testproxy/lp/log.h @@ -123,6 +123,8 @@ extern logger_t *connect_log; #define log_connect_write_free(buf, sz) \ logger_write_freebuf(connect_log, NULL, 0, (buf), (sz)) +#define log_err_level(level, str) { log_err_level_printf((level), (str"\n")); log_fine((str)); } + int log_stats(const char *); int log_conn(const char *); diff --git a/tests/testproxy/lp/prototcp.c b/tests/testproxy/lp/prototcp.c index 863d1b7..437dcc7 100644 --- a/tests/testproxy/lp/prototcp.c +++ b/tests/testproxy/lp/prototcp.c @@ -33,6 +33,10 @@ #include #include +#ifdef __linux__ +#include +#endif /* __linux__ */ + /* * Set up a bufferevent structure for either a dst or src connection, * optionally with or without SSL. Sets all callbacks, enables read @@ -49,10 +53,9 @@ prototcp_bufferevent_setup(pxy_conn_ctx_t *ctx, evutil_socket_t fd) { log_finest_va("ENTER, fd=%d", fd); - struct bufferevent *bev = bufferevent_socket_new(ctx->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE); + struct bufferevent *bev = bufferevent_socket_new(ctx->thr->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE); if (!bev) { - log_err_level_printf(LOG_CRIT, "Error creating bufferevent socket\n"); - log_fine_va("bufferevent_socket_new failed, fd=%d", fd); + log_err_level(LOG_CRIT, "Error creating bufferevent socket"); return NULL; } return bev; @@ -96,38 +99,180 @@ prototcp_setup_dst(pxy_conn_ctx_t *ctx) return 0; } -static int NONNULL(1) WUNRES -prototcp_conn_connect(pxy_conn_ctx_t *ctx) +#if defined(__APPLE__) || defined(__FreeBSD__) +#define getdtablecount() 0 + +/* + * Copied from: + * opensmtpd-201801101641p1/openbsd-compat/imsg.c + * + * Copyright (c) 2003, 2004 Henning Brauer + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ +static int +available_fds(unsigned int n) { - log_finest("ENTER"); + unsigned int i; + int ret, fds[256]; - if (prototcp_setup_src(ctx) == -1) { + if (n > (sizeof(fds)/sizeof(fds[0]))) return -1; + + ret = 0; + for (i = 0; i < n; i++) { + fds[i] = -1; + if ((fds[i] = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { + ret = -1; + break; + } } - if (prototcp_setup_dst(ctx) == -1) { - return -1; + for (i = 0; i < n && fds[i] >= 0; i++) + close(fds[i]); + + return ret; +} +#endif /* __APPLE__ || __FreeBSD__ */ + +#ifdef __linux__ +/* + * Copied from: + * https://github.com/tmux/tmux/blob/master/compat/getdtablecount.c + * + * Copyright (c) 2017 Nicholas Marriott + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER + * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING + * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ +int +getdtablecount() +{ + char path[PATH_MAX]; + glob_t g; + int n = 0; + + if (snprintf(path, sizeof path, "/proc/%ld/fd/*", (long)getpid()) < 0) { + log_err_level_printf(LOG_CRIT, "snprintf overflow\n"); + return 0; } + if (glob(path, 0, NULL, &g) == 0) + n = g.gl_pathc; + globfree(&g); + return n; +} +#endif /* __linux__ */ - // Conn setup is successful, so add the conn to the conn list of its thread now - pxy_thrmgr_add_conn(ctx); +/* + * Check if we are out of file descriptors to close the conn, or else libevent will crash us + * @attention We cannot guess the number of children in a connection at conn setup time. So, FD_RESERVE is just a ball park figure. + * But what if a connection passes the check below, but eventually tries to create more children than FD_RESERVE allows for? This will crash us the same. + * Beware, this applies to all current conns, not just the last connection setup. + * For example, 20x conns pass the check below before creating any children, at which point we reach the last FD_RESERVE fds, + * then they all start creating children, which crashes us again. + * So, no matter how large an FD_RESERVE we choose, there will always be a risk of running out of fds, if we check the number of fds during parent conn setup only. + * If we are left with less than FD_RESERVE fds, we should not create more children than FD_RESERVE allows for either. + * Therefore, we check if we are out of fds in pxy_listener_acceptcb_child() and close the conn there too. + * @attention These checks are expected to slow us further down, but it is critical to avoid a crash in case we run out of fds. + */ +static int +check_fd_usage( +#ifdef DEBUG_PROXY + pxy_conn_ctx_t *ctx +#endif /* DEBUG_PROXY */ + ) +{ + int dtable_count = getdtablecount(); - bufferevent_setcb(ctx->src.bev, pxy_bev_readcb, pxy_bev_writecb, pxy_bev_eventcb, ctx); - bufferevent_setcb(ctx->dst.bev, pxy_bev_readcb, pxy_bev_writecb, pxy_bev_eventcb, ctx); + log_finer_va("descriptor_table_size=%d, dtablecount=%d, reserve=%d", descriptor_table_size, dtable_count, FD_RESERVE); - log_finer("Enabling src"); + if (dtable_count + FD_RESERVE >= descriptor_table_size) { + goto out; + } + +#if defined(__APPLE__) || defined(__FreeBSD__) + if (available_fds(FD_RESERVE) == -1) { + goto out; + } +#endif /* __APPLE__ || __FreeBSD__ */ - // Now open the gates - bufferevent_enable(ctx->src.bev, EV_READ|EV_WRITE); return 0; +out: + errno = EMFILE; + log_err_level_printf(LOG_CRIT, "Out of file descriptors\n"); + return -1; } -static void -prototcp_fd_readcb(UNUSED evutil_socket_t fd, UNUSED short what, void *arg) +void +prototcp_connect(UNUSED evutil_socket_t fd, UNUSED short what, void *arg) { pxy_conn_ctx_t *ctx = arg; + log_finest("ENTER"); - pxy_conn_connect(ctx); + + event_free(ctx->ev); + ctx->ev = NULL; + + // Always keep thr load and conns list in sync + ctx->thr->load++; + + ctx->next = ctx->thr->conns; + ctx->thr->conns = ctx; + if (ctx->next) + ctx->next->prev = ctx; + + if (check_fd_usage( +#ifdef DEBUG_PROXY + ctx +#endif /* DEBUG_PROXY */ + ) == -1) { + goto out; + } + + if (sys_sockaddr_str((struct sockaddr *)&ctx->srcaddr, ctx->srcaddrlen, &ctx->srchost_str, &ctx->srcport_str) != 0) { + log_err_level_printf(LOG_CRIT, "Aborting connection setup (out of memory)!\n"); + goto out; + } + log_finest_va("srcaddr= [%s]:%s", ctx->srchost_str, ctx->srcport_str); + + if (prototcp_setup_src(ctx) == -1) { + goto out; + } + + if (prototcp_setup_dst(ctx) == -1) { + goto out; + } + + bufferevent_setcb(ctx->src.bev, pxy_bev_readcb, pxy_bev_writecb, pxy_bev_eventcb, ctx); + bufferevent_setcb(ctx->dst.bev, pxy_bev_readcb, pxy_bev_writecb, pxy_bev_eventcb, ctx); + + log_finer("Enabling src"); + + // Now open the gates + bufferevent_enable(ctx->src.bev, EV_READ|EV_WRITE); + return; +out: + evutil_closesocket(ctx->fd); + pxy_conn_free(ctx, 1); } static int @@ -239,8 +384,7 @@ prototcp_bev_readcb_src(struct bufferevent *bev, pxy_conn_ctx_t *ctx) /* initiate connection */ bufferevent_enable(ctx->dst.bev, EV_READ|EV_WRITE); if (bufferevent_socket_connect(ctx->dst.bev, (struct sockaddr *)&ctx->dstaddr, ctx->dstaddrlen) == -1) { - log_err_level_printf(LOG_CRIT, "prototcp_bev_readcb_src: bufferevent_socket_connect for dst failed\n"); - log_fine("bufferevent_socket_connect for dst failed"); + log_err_level(LOG_CRIT, "bufferevent_socket_connect for dst failed"); } } @@ -362,8 +506,7 @@ prototcp_bev_eventcb_eof_src(struct bufferevent *bev, pxy_conn_ctx_t *ctx) #endif /* DEBUG_PROXY */ if (!ctx->src_connected) { - log_err_level_printf(LOG_WARNING, "EOF on connection before connection establishment\n"); - log_fine("EOF on connection before connection establishment"); + log_err_level(LOG_WARNING, "EOF on connection before connection establishment"); ctx->dst.closed = 1; } else if (!ctx->dst.closed) { log_finest("!dst->closed, terminate conn"); @@ -385,8 +528,7 @@ prototcp_bev_eventcb_eof_dst(struct bufferevent *bev, pxy_conn_ctx_t *ctx) #endif /* DEBUG_PROXY */ if (!ctx->dst_connected) { - log_err_level_printf(LOG_WARNING, "EOF on connection before connection establishment\n"); - log_fine("EOF on connection before connection establishment"); + log_err_level(LOG_WARNING, "EOF on connection before connection establishment"); ctx->src.closed = 1; } else if (!ctx->src.closed) { log_finest("!src->closed, terminate conn"); @@ -497,13 +639,9 @@ protocol_t prototcp_setup(pxy_conn_ctx_t *ctx) { ctx->protoctx->proto = PROTO_TCP; - ctx->protoctx->connectcb = prototcp_conn_connect; - ctx->protoctx->fd_readcb = prototcp_fd_readcb; - ctx->protoctx->bev_readcb = prototcp_bev_readcb; ctx->protoctx->bev_writecb = prototcp_bev_writecb; ctx->protoctx->bev_eventcb = prototcp_bev_eventcb; - return PROTO_TCP; } diff --git a/tests/testproxy/lp/prototcp.h b/tests/testproxy/lp/prototcp.h index 2d8bd96..46fee4e 100644 --- a/tests/testproxy/lp/prototcp.h +++ b/tests/testproxy/lp/prototcp.h @@ -32,6 +32,7 @@ #include "pxyconn.h" +void prototcp_connect(evutil_socket_t, short, void *); protocol_t prototcp_setup(pxy_conn_ctx_t *) NONNULL(1); #endif /* PROTOTCP_H */ diff --git a/tests/testproxy/lp/proxy.c b/tests/testproxy/lp/proxy.c index 0c62724..0a1c860 100644 --- a/tests/testproxy/lp/proxy.c +++ b/tests/testproxy/lp/proxy.c @@ -28,6 +28,7 @@ #include "proxy.h" +#include "prototcp.h" #include "privsep.h" #include "pxythrmgr.h" #include "pxyconn.h" @@ -51,7 +52,6 @@ #include #include - /* * Proxy engine, built around libevent 2.x. */ @@ -92,8 +92,85 @@ proxy_listener_ctx_free(proxy_listener_ctx_t *ctx) free(ctx); } +static protocol_t NONNULL(1) +proxy_setup_proto(pxy_conn_ctx_t *ctx) +{ + ctx->protoctx = malloc(sizeof(proto_ctx_t)); + if (!ctx->protoctx) { + return PROTO_ERROR; + } + memset(ctx->protoctx, 0, sizeof(proto_ctx_t)); + + protocol_t proto = prototcp_setup(ctx); + + if (proto == PROTO_ERROR) { + free(ctx->protoctx); + } + return proto; +} + +static pxy_conn_ctx_t * MALLOC NONNULL(2,3) +proxy_conn_ctx_new(evutil_socket_t fd, + pxy_thrmgr_ctx_t *thrmgr, opts_t *opts) +{ + log_finest_main_va("ENTER, fd=%d", fd); + + pxy_conn_ctx_t *ctx = malloc(sizeof(pxy_conn_ctx_t)); + if (!ctx) { + return NULL; + } + memset(ctx, 0, sizeof(pxy_conn_ctx_t)); + + ctx->id = thrmgr->conn_count++; + + log_finest_main_va("id=%llu, fd=%d", ctx->id, fd); + + ctx->fd = fd; + ctx->thrmgr = thrmgr; + + ctx->proto = proxy_setup_proto(ctx); + if (ctx->proto == PROTO_ERROR) { + free(ctx); + return NULL; + } + + ctx->opts = opts; + + ctx->ctime = time(NULL); + ctx->atime = ctx->ctime; + + ctx->next = NULL; + return ctx; +} + +/* + * Does minimal clean-up, called on error by proxy_listener_acceptcb() only. + * We call this function instead of pxy_conn_ctx_free(), because + * proxy_listener_acceptcb() runs on thrmgr, whereas pxy_conn_ctx_free() + * runs on conn handling thr. This is necessary to prevent multithreading issues. + */ +static void NONNULL(1) +proxy_conn_ctx_free(pxy_conn_ctx_t *ctx) +{ + log_finest("ENTER"); + + if (ctx->ev) { + event_free(ctx->ev); + } + free(ctx->protoctx); + free(ctx); +} + /* * Callback for accept events on the socket listener bufferevent. + * Called when a new incoming connection has been accepted. + * Initiates the connection to the server. The incoming connection + * from the client is not being activated until we have a successful + * connection to the server, because we need the server's certificate + * in order to set up the SSL session to the client. + * For consistency, plain TCP works the same way, even if we could + * start reading from the client while waiting on the connection to + * the server to connect. */ static void proxy_listener_acceptcb(UNUSED struct evconnlistener *listener, @@ -103,7 +180,36 @@ proxy_listener_acceptcb(UNUSED struct evconnlistener *listener, { proxy_listener_ctx_t *lctx = arg; log_finest_main_va("ENTER, fd=%d", fd); - pxy_conn_setup(fd, peeraddr, peeraddrlen, lctx->thrmgr, lctx->opts); + + /* create per connection state */ + pxy_conn_ctx_t *ctx = proxy_conn_ctx_new(fd, lctx->thrmgr, lctx->opts); + if (!ctx) { + log_err_level_printf(LOG_CRIT, "Error allocating memory\n"); + evutil_closesocket(fd); + return; + } + + // Choose the conn handling thr + pxy_thrmgr_attach(ctx); + + ctx->srcaddrlen = peeraddrlen; + memcpy(&ctx->srcaddr, peeraddr, ctx->srcaddrlen); + + // Switch from thrmgr to connection handling thread, i.e. change the event base, asap + // This prevents possible multithreading issues between thrmgr and conn handling threads + ctx->ev = event_new(ctx->thr->evbase, -1, 0, prototcp_connect, ctx); + if (!ctx->ev) { + log_err_level(LOG_CRIT, "Error creating connect event, aborting connection"); + goto out; + } + // The only purpose of this event is to change the event base, so it is a one-shot event + if (event_add(ctx->ev, NULL) == -1) + goto out; + event_active(ctx->ev, 0, 0); + return; +out: + evutil_closesocket(fd); + proxy_conn_ctx_free(ctx); } /* diff --git a/tests/testproxy/lp/pxyconn.c b/tests/testproxy/lp/pxyconn.c index 2d0e57b..a692fbd 100644 --- a/tests/testproxy/lp/pxyconn.c +++ b/tests/testproxy/lp/pxyconn.c @@ -29,8 +29,6 @@ #include "pxyconn.h" -#include "prototcp.h" - #include "privsep.h" #include "sys.h" #include "log.h" @@ -44,20 +42,6 @@ #include -#ifdef __linux__ -#include -#endif /* __linux__ */ - -#include -#include -#include -#include -#include -#include -#ifdef __OpenBSD__ -#include -#endif /* __OpenBSD__ */ - /* * Maximum size of data to buffer per connection direction before * temporarily stopping to read data from the other end. @@ -72,60 +56,6 @@ char *protocol_names[] = { "TCP", // = 0 }; -static protocol_t NONNULL(1) -pxy_setup_proto(pxy_conn_ctx_t *ctx) -{ - ctx->protoctx = malloc(sizeof(proto_ctx_t)); - if (!ctx->protoctx) { - return PROTO_ERROR; - } - memset(ctx->protoctx, 0, sizeof(proto_ctx_t)); - - // Default to tcp - protocol_t proto = prototcp_setup(ctx); - - if (proto == PROTO_ERROR) { - free(ctx->protoctx); - } - return proto; -} - -static pxy_conn_ctx_t * MALLOC NONNULL(2,3) -pxy_conn_ctx_new(evutil_socket_t fd, - pxy_thrmgr_ctx_t *thrmgr, opts_t *opts) -{ - log_finest_main_va("ENTER, fd=%d", fd); - - pxy_conn_ctx_t *ctx = malloc(sizeof(pxy_conn_ctx_t)); - if (!ctx) { - return NULL; - } - memset(ctx, 0, sizeof(pxy_conn_ctx_t)); - - ctx->id = thrmgr->conn_count++; - - log_finest_main_va("id=%llu, fd=%d", ctx->id, fd); - - ctx->fd = fd; - ctx->thrmgr = thrmgr; - - ctx->proto = pxy_setup_proto(ctx); - if (ctx->proto == PROTO_ERROR) { - free(ctx); - return NULL; - } - - ctx->opts = opts; - - ctx->ctime = time(NULL); - ctx->atime = ctx->ctime; - - ctx->next = NULL; - - pxy_thrmgr_attach(ctx); - return ctx; -} - void pxy_conn_ctx_free(pxy_conn_ctx_t *ctx, int by_requestor) { @@ -137,7 +67,7 @@ pxy_conn_ctx_free(pxy_conn_ctx_t *ctx, int by_requestor) } } - pxy_thrmgr_detach(ctx); + pxy_thr_detach(ctx); if (ctx->srchost_str) { free(ctx->srchost_str); @@ -151,9 +81,8 @@ pxy_conn_ctx_free(pxy_conn_ctx_t *ctx, int by_requestor) if (ctx->dstport_str) { free(ctx->dstport_str); } - // If the proto doesn't have special args, proto_free() callback is NULL - if (ctx->protoctx->proto_free) { - ctx->protoctx->proto_free(ctx); + if (ctx->ev) { + event_free(ctx->ev); } free(ctx->protoctx); free(ctx); @@ -400,129 +329,6 @@ pxy_discard_inbuf(struct bufferevent *bev) evbuffer_drain(inbuf, inbuf_size); } -#if defined(__APPLE__) || defined(__FreeBSD__) -#define getdtablecount() 0 - -/* - * Copied from: - * opensmtpd-201801101641p1/openbsd-compat/imsg.c - * - * Copyright (c) 2003, 2004 Henning Brauer - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - */ -static int -available_fds(unsigned int n) -{ - unsigned int i; - int ret, fds[256]; - - if (n > (sizeof(fds)/sizeof(fds[0]))) - return -1; - - ret = 0; - for (i = 0; i < n; i++) { - fds[i] = -1; - if ((fds[i] = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { - ret = -1; - break; - } - } - - for (i = 0; i < n && fds[i] >= 0; i++) - close(fds[i]); - - return ret; -} -#endif /* __APPLE__ || __FreeBSD__ */ - -#ifdef __linux__ -/* - * Copied from: - * https://github.com/tmux/tmux/blob/master/compat/getdtablecount.c - * - * Copyright (c) 2017 Nicholas Marriott - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER - * IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING - * OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - */ -int -getdtablecount() -{ - char path[PATH_MAX]; - glob_t g; - int n = 0; - - if (snprintf(path, sizeof path, "/proc/%ld/fd/*", (long)getpid()) < 0) { - log_err_level_printf(LOG_CRIT, "snprintf overflow\n"); - return 0; - } - if (glob(path, 0, NULL, &g) == 0) - n = g.gl_pathc; - globfree(&g); - return n; -} -#endif /* __linux__ */ - -/* - * Check if we are out of file descriptors to close the conn, or else libevent will crash us - * @attention We cannot guess the number of children in a connection at conn setup time. So, FD_RESERVE is just a ball park figure. - * But what if a connection passes the check below, but eventually tries to create more children than FD_RESERVE allows for? This will crash us the same. - * Beware, this applies to all current conns, not just the last connection setup. - * For example, 20x conns pass the check below before creating any children, at which point we reach the last FD_RESERVE fds, - * then they all start creating children, which crashes us again. - * So, no matter how large an FD_RESERVE we choose, there will always be a risk of running out of fds, if we check the number of fds during parent conn setup only. - * If we are left with less than FD_RESERVE fds, we should not create more children than FD_RESERVE allows for either. - * Therefore, we check if we are out of fds in pxy_listener_acceptcb_child() and close the conn there too. - * @attention These checks are expected to slow us further down, but it is critical to avoid a crash in case we run out of fds. - */ -static int -check_fd_usage( -#ifdef DEBUG_PROXY - evutil_socket_t fd -#endif /* DEBUG_PROXY */ - ) -{ - int dtable_count = getdtablecount(); - - log_finer_main_va("descriptor_table_size=%d, dtablecount=%d, reserve=%d, fd=%d", descriptor_table_size, dtable_count, FD_RESERVE, fd); - - if (dtable_count + FD_RESERVE >= descriptor_table_size) { - goto out; - } - -#if defined(__APPLE__) || defined(__FreeBSD__) - if (available_fds(FD_RESERVE) == -1) { - goto out; - } -#endif /* __APPLE__ || __FreeBSD__ */ - - return 0; -out: - errno = EMFILE; - log_err_level_printf(LOG_CRIT, "Out of file descriptors\n"); - return -1; -} - int pxy_try_close_conn_end(pxy_conn_desc_t *conn_end, pxy_conn_ctx_t *ctx) { @@ -679,80 +485,4 @@ pxy_bev_eventcb(struct bufferevent *bev, short events, void *arg) } } -/* - * Complete the connection. - */ -void -pxy_conn_connect(pxy_conn_ctx_t *ctx) -{ - log_finest("ENTER"); - - if (ctx->protoctx->connectcb(ctx) == -1) { - // @attention Do not try to close conns or do anything else with conn ctx on the thrmgr thread after setting event callbacks and/or socket connect. - // The return value of -1 from connectcb indicates that there was a fatal error before event callbacks were set, so we can terminate the connection. - // Otherwise, it is up to the event callbacks to terminate the connection. This is necessary to avoid multithreading issues. - if (ctx->term || ctx->enomem) { - pxy_conn_free(ctx, ctx->term ? ctx->term_requestor : 1); - } - } - // @attention Do not do anything with the conn ctx after this point on the thrmgr thread -} - -/* - * Callback for accept events on the socket listener bufferevent. - * Called when a new incoming connection has been accepted. - * Initiates the connection to the server. The incoming connection - * from the client is not being activated until we have a successful - * connection to the server, because we need the server's certificate - * in order to set up the SSL session to the client. - * For consistency, plain TCP works the same way, even if we could - * start reading from the client while waiting on the connection to - * the server to connect. - */ -void -pxy_conn_setup(evutil_socket_t fd, - struct sockaddr *peeraddr, int peeraddrlen, - pxy_thrmgr_ctx_t *thrmgr, opts_t *opts) -{ -#ifdef DEBUG_PROXY - log_finest_main_va("ENTER, fd=%d", fd); - - char *host, *port; - if (sys_sockaddr_str(peeraddr, peeraddrlen, &host, &port) == 0) { - log_finest_main_va("peer addr=[%s]:%s, fd=%d", host, port, fd); - free(host); - free(port); - } -#endif /* DEBUG_PROXY */ - - if (check_fd_usage( -#ifdef DEBUG_PROXY - fd -#endif /* DEBUG_PROXY */ - ) == -1) { - evutil_closesocket(fd); - return; - } - - /* create per connection state and attach to thread */ - pxy_conn_ctx_t *ctx = pxy_conn_ctx_new(fd, thrmgr, opts); - if (!ctx) { - log_err_level_printf(LOG_CRIT, "Error allocating memory\n"); - evutil_closesocket(fd); - return; - } - - if (sys_sockaddr_str(peeraddr, peeraddrlen, &ctx->srchost_str, &ctx->srcport_str) != 0) { - log_err_level_printf(LOG_CRIT, "Aborting connection setup (out of memory)!\n"); - goto out; - } - - ctx->protoctx->fd_readcb(ctx->fd, 0, ctx); - return; - -out: - evutil_closesocket(fd); - pxy_conn_ctx_free(ctx, 1); -} - /* vim: set noet ft=c: */ diff --git a/tests/testproxy/lp/pxyconn.h b/tests/testproxy/lp/pxyconn.h index b997248..43130f8 100644 --- a/tests/testproxy/lp/pxyconn.h +++ b/tests/testproxy/lp/pxyconn.h @@ -49,16 +49,11 @@ #define WANT_CONNECT_LOG(ctx) ((ctx)->opts->connectlog||!(ctx)->opts->detach||(ctx)->opts->statslog) #define WANT_CONTENT_LOG(ctx) ((ctx)->opts->contentlog) -typedef void (*fd_readcb_func_t)(evutil_socket_t, short, void *); -typedef int (*connect_func_t)(pxy_conn_ctx_t *); - typedef void (*callback_func_t)(struct bufferevent *, void *); typedef void (*eventcb_func_t)(struct bufferevent *, short, void *); typedef void (*bev_free_func_t)(struct bufferevent *, pxy_conn_ctx_t *); -typedef void (*proto_free_func_t)(pxy_conn_ctx_t *); - /* * Proxy connection context state, describes a proxy connection * with source and destination socket bufferevents, SSL context and @@ -82,18 +77,9 @@ typedef struct proto_ctx proto_ctx_t; struct proto_ctx { protocol_t proto; - - connect_func_t connectcb; - fd_readcb_func_t fd_readcb; - callback_func_t bev_readcb; callback_func_t bev_writecb; eventcb_func_t bev_eventcb; - - proto_free_func_t proto_free; - - // For protocol specific fields, if any - void *arg; }; /* connection state consisting of two connection descriptors, @@ -128,13 +114,16 @@ struct pxy_conn_ctx { unsigned int term_requestor : 1; /* 1 client, 0 server side */ unsigned int seen_sslproxy_line : 1; /* 1 if seen sslproxy line */ - /* destination address */ + struct event *ev; + + /* original source and destination address */ + struct sockaddr_storage srcaddr; + socklen_t srcaddrlen; struct sockaddr_storage dstaddr; socklen_t dstaddrlen; // Thread that the conn is attached to pxy_thr_ctx_t *thr; - unsigned int in_thr_conns : 1; /* 1 to prevent adding twice */ // Unique id of the conn long long unsigned int id; @@ -142,8 +131,6 @@ struct pxy_conn_ctx { pxy_thrmgr_ctx_t *thrmgr; opts_t *opts; - struct event_base *evbase; - evutil_socket_t dst_fd; // Conn create time @@ -155,6 +142,7 @@ struct pxy_conn_ctx { // Per-thread conn list, used to determine idle and expired conns, and to close them pxy_conn_ctx_t *next; + pxy_conn_ctx_t *prev; }; void pxy_log_connect(pxy_conn_ctx_t *) NONNULL(1); @@ -184,11 +172,6 @@ void pxy_bev_readcb(struct bufferevent *, void *); void pxy_bev_writecb(struct bufferevent *, void *); void pxy_bev_eventcb(struct bufferevent *, short, void *); -void pxy_conn_connect(pxy_conn_ctx_t *) NONNULL(1); -void pxy_conn_setup(evutil_socket_t, struct sockaddr *, int, - pxy_thrmgr_ctx_t *, opts_t *) - NONNULL(2,4,5); - #endif /* !PXYCONN_H */ /* vim: set noet ft=c: */ diff --git a/tests/testproxy/lp/pxythr.c b/tests/testproxy/lp/pxythr.c new file mode 100644 index 0000000..0157d7e --- /dev/null +++ b/tests/testproxy/lp/pxythr.c @@ -0,0 +1,177 @@ +/*- + * SSLsplit - transparent SSL/TLS interception + * https://www.roe.ch/SSLsplit + * + * Copyright (c) 2017-2020, Soner Tari . + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "pxythr.h" + +#include "log.h" +#include "pxyconn.h" + +#include +#include + +/* + * Detach a connection from a thread by index. + * This function cannot fail. + */ +void +pxy_thr_detach(pxy_conn_ctx_t *ctx) +{ + assert(ctx != NULL); + // If this function is called, the thr conns list cannot be empty + assert(ctx->thr->conns != NULL); + + log_finest("Removing conn"); + + ctx->thr->load--; + + if (ctx->prev) { + ctx->prev->next = ctx->next; + } else { + ctx->thr->conns = ctx->next; + } + if (ctx->next) + ctx->next->prev = ctx->prev; +} + +/* + * Proxy thread manager: manages the connection handling worker threads + * and the per-thread resources (i.e. event bases). The load is shared + * across num_cpu * 2 connection handling threads, using the number of + * currently assigned connections as the sole metric. + * + * The attach and detach functions are thread-safe. + */ + +static void +pxy_thr_print_thr_info(pxy_thr_ctx_t *tctx) +{ + log_finest_main_va("thr=%d, load=%lu", tctx->thridx, tctx->load); + + unsigned int idx = 1; + evutil_socket_t max_fd = 0; + time_t max_atime = 0; + time_t max_ctime = 0; + + char *smsg = NULL; + + if (tctx->conns) { + time_t now = time(NULL); + + pxy_conn_ctx_t *ctx = tctx->conns; + while (ctx) { + time_t atime = now - ctx->atime; + time_t ctime = now - ctx->ctime; + + log_finest_main_va("CONN: thr=%d, id=%u, fd=%d, dst=%d, p=%d-%d, at=%lld ct=%lld, src_addr=%s:%s, dst_addr=%s:%s", + tctx->thridx, idx, ctx->fd, ctx->dst_fd, ctx->src.closed, ctx->dst.closed, (long long)atime, (long long)ctime, + STRORDASH(ctx->srchost_str), STRORDASH(ctx->srcport_str), STRORDASH(ctx->dsthost_str), STRORDASH(ctx->dstport_str)); + + max_fd = MAX(max_fd, MAX(ctx->fd, ctx->dst_fd)); + max_atime = MAX(max_atime, atime); + max_ctime = MAX(max_ctime, ctime); + + idx++; + ctx = ctx->next; + } + } + + log_finest_main_va("STATS: thr=%d, mld=%zu, mfd=%d, mat=%lld, mct=%lld, iib=%llu, iob=%llu, eib=%llu, eob=%llu, swm=%zu, uwm=%zu, err=%zu, si=%u", + tctx->thridx, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes, + tctx->set_watermarks, tctx->unset_watermarks, tctx->errors, tctx->stats_id); + + if (asprintf(&smsg, "STATS: thr=%d, mld=%zu, mfd=%d, mat=%lld, mct=%lld, iib=%llu, iob=%llu, eib=%llu, eob=%llu, swm=%zu, uwm=%zu, err=%zu, si=%u\n", + tctx->thridx, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes, + tctx->set_watermarks, tctx->unset_watermarks, tctx->errors, tctx->stats_id) < 0) { + return; + } + + if (log_stats(smsg) == -1) { + log_err_level_printf(LOG_WARNING, "Stats logging failed\n"); + } + free(smsg); + + tctx->stats_id++; + + tctx->errors = 0; + tctx->set_watermarks = 0; + tctx->unset_watermarks = 0; + + tctx->intif_in_bytes = 0; + tctx->intif_out_bytes = 0; + tctx->extif_in_bytes = 0; + tctx->extif_out_bytes = 0; + + // Reset these stats with the current values (do not reset to 0 directly, there may be active conns) + tctx->max_fd = max_fd; + tctx->max_load = tctx->load; +} + +/* + * Recurring timer event to prevent the event loops from exiting when + * they run out of events. + */ +static void +pxy_thr_timer_cb(UNUSED evutil_socket_t fd, UNUSED short what, UNUSED void *arg) +{ + pxy_thr_ctx_t *ctx = arg; + + log_finest_main_va("thr=%d, load=%lu, to=%u", ctx->thridx, ctx->load, ctx->timeout_count); + + // @attention Print thread info only if stats logging is enabled, if disabled debug logs are not printed either + if (ctx->thrmgr->opts->statslog) { + ctx->timeout_count++; + if (ctx->timeout_count >= ctx->thrmgr->opts->stats_period) { + ctx->timeout_count = 0; + pxy_thr_print_thr_info(ctx); + } + } +} + +/* + * Thread entry point; runs the event loop of the event base. + * Does not exit until the libevent loop is broken explicitly. + */ +void * +pxy_thr(void *arg) +{ + pxy_thr_ctx_t *ctx = arg; + struct timeval timer_delay = {10, 0}; + struct event *ev; + + ev = event_new(ctx->evbase, -1, EV_PERSIST, pxy_thr_timer_cb, ctx); + if (!ev) + return NULL; + evtimer_add(ev, &timer_delay); + ctx->running = 1; + event_base_dispatch(ctx->evbase); + event_free(ev); + + return NULL; +} + +/* vim: set noet ft=c: */ diff --git a/tests/testproxy/lp/pxythr.h b/tests/testproxy/lp/pxythr.h new file mode 100644 index 0000000..cff6dfd --- /dev/null +++ b/tests/testproxy/lp/pxythr.h @@ -0,0 +1,76 @@ +/*- + * SSLsplit - transparent SSL/TLS interception + * https://www.roe.ch/SSLsplit + * + * Copyright (c) 2017-2020, Soner Tari . + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS ``AS IS'' + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef PXYTHR_H +#define PXYTHR_H + +#include "attrib.h" + +#include +#include + +#include +#include +#include + +typedef struct pxy_conn_ctx pxy_conn_ctx_t; +typedef struct pxy_thrmgr_ctx pxy_thrmgr_ctx_t; + +typedef struct pxy_thr_ctx { + pthread_t thr; + int thridx; + pxy_thrmgr_ctx_t *thrmgr; + size_t load; + struct event_base *evbase; + int running; + + // Statistics + evutil_socket_t max_fd; + size_t max_load; + size_t errors; + size_t set_watermarks; + size_t unset_watermarks; + long long unsigned int intif_in_bytes; + long long unsigned int intif_out_bytes; + long long unsigned int extif_in_bytes; + long long unsigned int extif_out_bytes; + // Each stats has an id, incremented on each stats print + unsigned short stats_id; + // Used to print statistics, compared against stats_period + unsigned int timeout_count; + + // List of active connections on the thread + pxy_conn_ctx_t *conns; +} pxy_thr_ctx_t; + +void pxy_thr_detach(pxy_conn_ctx_t *) NONNULL(1); +void *pxy_thr(void *); + +#endif /* !PXYTHR_H */ + +/* vim: set noet ft=c: */ diff --git a/tests/testproxy/lp/pxythrmgr.c b/tests/testproxy/lp/pxythrmgr.c index e588165..6873f6c 100644 --- a/tests/testproxy/lp/pxythrmgr.c +++ b/tests/testproxy/lp/pxythrmgr.c @@ -35,127 +35,6 @@ #include #include -#include -#include -#include - -/* - * Proxy thread manager: manages the connection handling worker threads - * and the per-thread resources (i.e. event bases). The load is shared - * across num_cpu * 2 connection handling threads, using the number of - * currently assigned connections as the sole metric. - * - * The attach and detach functions are thread-safe. - */ - -static void -pxy_thrmgr_print_thr_info(pxy_thr_ctx_t *tctx) -{ - log_finest_main_va("thr=%d, load=%lu", tctx->thridx, tctx->load); - - unsigned int idx = 1; - evutil_socket_t max_fd = 0; - time_t max_atime = 0; - time_t max_ctime = 0; - - char *smsg = NULL; - - if (tctx->conns) { - time_t now = time(NULL); - - pxy_conn_ctx_t *ctx = tctx->conns; - while (ctx) { - time_t atime = now - ctx->atime; - time_t ctime = now - ctx->ctime; - - log_finest_main_va("CONN: thr=%d, id=%u, fd=%d, dst=%d, p=%d-%d, at=%lld ct=%lld, src_addr=%s:%s, dst_addr=%s:%s", - tctx->thridx, idx, ctx->fd, ctx->dst_fd, ctx->src.closed, ctx->dst.closed, (long long)atime, (long long)ctime, - STRORDASH(ctx->srchost_str), STRORDASH(ctx->srcport_str), STRORDASH(ctx->dsthost_str), STRORDASH(ctx->dstport_str)); - - max_fd = MAX(max_fd, MAX(ctx->fd, ctx->dst_fd)); - max_atime = MAX(max_atime, atime); - max_ctime = MAX(max_ctime, ctime); - - idx++; - ctx = ctx->next; - } - } - - log_finest_main_va("STATS: thr=%d, mld=%zu, mfd=%d, mat=%lld, mct=%lld, iib=%llu, iob=%llu, eib=%llu, eob=%llu, swm=%zu, uwm=%zu, err=%zu, si=%u", - tctx->thridx, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes, - tctx->set_watermarks, tctx->unset_watermarks, tctx->errors, tctx->stats_id); - - if (asprintf(&smsg, "STATS: thr=%d, mld=%zu, mfd=%d, mat=%lld, mct=%lld, iib=%llu, iob=%llu, eib=%llu, eob=%llu, swm=%zu, uwm=%zu, err=%zu, si=%u\n", - tctx->thridx, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes, - tctx->set_watermarks, tctx->unset_watermarks, tctx->errors, tctx->stats_id) < 0) { - return; - } - - if (log_stats(smsg) == -1) { - log_err_level_printf(LOG_WARNING, "Stats logging failed\n"); - } - free(smsg); - - tctx->stats_id++; - - tctx->errors = 0; - tctx->set_watermarks = 0; - tctx->unset_watermarks = 0; - - tctx->intif_in_bytes = 0; - tctx->intif_out_bytes = 0; - tctx->extif_in_bytes = 0; - tctx->extif_out_bytes = 0; - - // Reset these stats with the current values (do not reset to 0 directly, there may be active conns) - tctx->max_fd = max_fd; - tctx->max_load = tctx->load; -} - -/* - * Recurring timer event to prevent the event loops from exiting when - * they run out of events. - */ -static void -pxy_thrmgr_timer_cb(UNUSED evutil_socket_t fd, UNUSED short what, UNUSED void *arg) -{ - pxy_thr_ctx_t *ctx = arg; - - pthread_mutex_lock(&ctx->mutex); - log_finest_main_va("thr=%d, load=%lu, to=%u", ctx->thridx, ctx->load, ctx->timeout_count); - - // @attention Print thread info only if stats logging is enabled, if disabled debug logs are not printed either - if (ctx->thrmgr->opts->statslog) { - ctx->timeout_count++; - if (ctx->timeout_count >= ctx->thrmgr->opts->stats_period) { - ctx->timeout_count = 0; - pxy_thrmgr_print_thr_info(ctx); - } - } - pthread_mutex_unlock(&ctx->mutex); -} - -/* - * Thread entry point; runs the event loop of the event base. - * Does not exit until the libevent loop is broken explicitly. - */ -static void * -pxy_thrmgr_thr(void *arg) -{ - pxy_thr_ctx_t *ctx = arg; - struct timeval timer_delay = {10, 0}; - struct event *ev; - - ev = event_new(ctx->evbase, -1, EV_PERSIST, pxy_thrmgr_timer_cb, ctx); - if (!ev) - return NULL; - evtimer_add(ev, &timer_delay); - ctx->running = 1; - event_base_dispatch(ctx->evbase); - event_free(ev); - - return NULL; -} /* * Create new thread manager but do not start any threads yet. @@ -209,11 +88,6 @@ pxy_thrmgr_run(pxy_thrmgr_ctx_t *ctx) ctx->thr[idx]->thridx = idx; ctx->thr[idx]->timeout_count = 0; ctx->thr[idx]->thrmgr = ctx; - - if (pthread_mutex_init(&ctx->thr[idx]->mutex, NULL)) { - log_dbg_printf("Failed to initialize thr mutex\n"); - goto leave; - } } log_dbg_printf("Initialized %d connection handling threads\n", @@ -221,7 +95,7 @@ pxy_thrmgr_run(pxy_thrmgr_ctx_t *ctx) for (idx = 0; idx < ctx->num_thr; idx++) { if (pthread_create(&ctx->thr[idx]->thr, NULL, - pxy_thrmgr_thr, ctx->thr[idx])) + pxy_thr, ctx->thr[idx])) goto leave_thr; while (!ctx->thr[idx]->running) { sched_yield(); @@ -248,7 +122,6 @@ leave: if (ctx->thr[idx]->evbase) { event_base_free(ctx->thr[idx]->evbase); } - pthread_mutex_destroy(&ctx->thr[idx]->mutex); free(ctx->thr[idx]); } idx--; @@ -278,7 +151,6 @@ pxy_thrmgr_free(pxy_thrmgr_ctx_t *ctx) if (ctx->thr[idx]->evbase) { event_base_free(ctx->thr[idx]->evbase); } - pthread_mutex_destroy(&ctx->thr[idx]->mutex); free(ctx->thr[idx]); } free(ctx->thr); @@ -286,67 +158,6 @@ pxy_thrmgr_free(pxy_thrmgr_ctx_t *ctx) free(ctx); } -void -pxy_thrmgr_add_conn(pxy_conn_ctx_t *ctx) -{ - pthread_mutex_lock(&ctx->thr->mutex); - if (!ctx->in_thr_conns) { - log_finest("Adding conn"); - - ctx->in_thr_conns = 1; - // Always keep thr load and conns list in sync - ctx->thr->load++; - ctx->next = ctx->thr->conns; - ctx->thr->conns = ctx; - } else { - // Do not add conns twice - log_finest("Will not add conn twice"); - } - pthread_mutex_unlock(&ctx->thr->mutex); -} - -static void NONNULL(1) -pxy_thrmgr_remove_conn_unlocked(pxy_conn_ctx_t *ctx) -{ - assert(ctx != NULL); - - if (ctx->in_thr_conns) { - log_finest("Removing conn"); - - // Thr conns list cannot be empty, if the in_thr_conns flag of a conn is set - assert(ctx->thr->conns != NULL); - - // Shouldn't need to reset the in_thr_conns flag, because the conn ctx will be freed next, but just in case - ctx->in_thr_conns = 0; - // We increment thr load in pxy_thrmgr_add_conn() only - ctx->thr->load--; - - // @attention We may get multiple conns with the same fd combinations, so fds cannot uniquely define a conn; hence the need for unique ids. - if (ctx->id == ctx->thr->conns->id) { - ctx->thr->conns = ctx->thr->conns->next; - return; - } else { - pxy_conn_ctx_t *current = ctx->thr->conns->next; - pxy_conn_ctx_t *previous = ctx->thr->conns; - while (current != NULL && previous != NULL) { - if (ctx->id == current->id) { - previous->next = current->next; - return; - } - previous = current; - current = current->next; - } - // This should never happen - log_err_level_printf(LOG_CRIT, "Cannot find conn in thr conns\n"); - log_fine_main_va("Cannot find conn in thr conns, id=%llu, fd=%d", ctx->id, ctx->fd); - assert(0); - } - } else { - // This can happen if we are closing the conn after a fatal error before setting its event callback - log_finest("Conn not in thr conns"); - } -} - /* * Attach a new connection to a thread. Chooses the thread with the fewest * currently active connections, returns the appropriate event bases. @@ -360,51 +171,32 @@ pxy_thrmgr_attach(pxy_conn_ctx_t *ctx) { log_finest("ENTER"); - int thridx = 0; - size_t minload; - pxy_thrmgr_ctx_t *tmctx = ctx->thrmgr; - pthread_mutex_lock(&tmctx->thr[0]->mutex); - minload = tmctx->thr[0]->load; - pthread_mutex_unlock(&tmctx->thr[0]->mutex); + size_t minload = tmctx->thr[0]->load; #ifdef DEBUG_THREAD - log_dbg_printf("===> Proxy connection handler thread status:\n" - "thr[0]: %zu\n", minload); + log_dbg_printf("===> Proxy connection handler thread status:\nthr[0]: %zu\n", minload); #endif /* DEBUG_THREAD */ + + int thridx = 0; for (int idx = 1; idx < tmctx->num_thr; idx++) { - pthread_mutex_lock(&tmctx->thr[idx]->mutex); -#ifdef DEBUG_THREAD - log_dbg_printf("thr[%d]: %zu\n", idx, tmctx->thr[idx]->load); -#endif /* DEBUG_THREAD */ - if (minload > tmctx->thr[idx]->load) { - minload = tmctx->thr[idx]->load; + size_t thrload = tmctx->thr[idx]->load; + + if (minload > thrload) { + minload = thrload; thridx = idx; } - pthread_mutex_unlock(&tmctx->thr[idx]->mutex); + +#ifdef DEBUG_THREAD + log_dbg_printf("thr[%d]: %zu\n", idx, thrload); +#endif /* DEBUG_THREAD */ } - // Defer adding the conn to the conn list of its thread until after a successful conn setup while returning from pxy_conn_connect() - // otherwise pxy_thrmgr_timer_cb() may try to access the conn ctx while it is being freed on failure (signal 6 crash) ctx->thr = tmctx->thr[thridx]; - ctx->evbase = ctx->thr->evbase; #ifdef DEBUG_THREAD log_dbg_printf("thridx: %d\n", thridx); #endif /* DEBUG_THREAD */ } -/* - * Detach a connection from a thread by index. - * This function cannot fail. - */ -void -pxy_thrmgr_detach(pxy_conn_ctx_t *ctx) -{ - pthread_mutex_lock(&ctx->thr->mutex); - log_finest("ENTER"); - pxy_thrmgr_remove_conn_unlocked(ctx); - pthread_mutex_unlock(&ctx->thr->mutex); -} - /* vim: set noet ft=c: */ diff --git a/tests/testproxy/lp/pxythrmgr.h b/tests/testproxy/lp/pxythrmgr.h index 5cef6cc..5e6edd5 100644 --- a/tests/testproxy/lp/pxythrmgr.h +++ b/tests/testproxy/lp/pxythrmgr.h @@ -31,51 +31,11 @@ #include "opts.h" #include "attrib.h" - -#include -#include - -#include -#include -#include +#include "pxythr.h" extern int descriptor_table_size; #define FD_RESERVE 10 -typedef struct pxy_conn_ctx pxy_conn_ctx_t; -typedef struct pxy_thrmgr_ctx pxy_thrmgr_ctx_t; - -typedef struct pxy_thr_ctx { - pthread_t thr; - int thridx; - pxy_thrmgr_ctx_t *thrmgr; - size_t load; - struct event_base *evbase; - int running; - - // Per-thread locking is necessary during connection setup and termination - // to prevent multithreading issues between thrmgr thread and conn handling threads - pthread_mutex_t mutex; - - // Statistics - evutil_socket_t max_fd; - size_t max_load; - size_t errors; - size_t set_watermarks; - size_t unset_watermarks; - long long unsigned int intif_in_bytes; - long long unsigned int intif_out_bytes; - long long unsigned int extif_in_bytes; - long long unsigned int extif_out_bytes; - // Each stats has an id, incremented on each stats print - unsigned short stats_id; - // Used to print statistics, compared against stats_period - unsigned int timeout_count; - - // List of active connections on the thread - pxy_conn_ctx_t *conns; -} pxy_thr_ctx_t; - struct pxy_thrmgr_ctx { int num_thr; opts_t *opts; @@ -89,10 +49,7 @@ pxy_thrmgr_ctx_t * pxy_thrmgr_new(opts_t *) MALLOC; int pxy_thrmgr_run(pxy_thrmgr_ctx_t *) NONNULL(1) WUNRES; void pxy_thrmgr_free(pxy_thrmgr_ctx_t *) NONNULL(1); -void pxy_thrmgr_add_conn(pxy_conn_ctx_t *) NONNULL(1); - void pxy_thrmgr_attach(pxy_conn_ctx_t *) NONNULL(1); -void pxy_thrmgr_detach(pxy_conn_ctx_t *) NONNULL(1); #endif /* !PXYTHRMGR_H */