Update lp with sslproxy changes and clean up

pull/48/head
Soner Tari 4 years ago
parent d3c3059c26
commit 6c5165fa6e

1
.gitignore vendored

@ -26,6 +26,7 @@
/tests/testproxy/lp/lp
/nbproject/
/tests/testproxy/lp/nbproject/
# CVS default ignores begin
tags

@ -123,6 +123,8 @@ extern logger_t *connect_log;
#define log_connect_write_free(buf, sz) \
logger_write_freebuf(connect_log, NULL, 0, (buf), (sz))
#define log_err_level(level, str) { log_err_level_printf((level), (str"\n")); log_fine((str)); }
int log_stats(const char *);
int log_conn(const char *);

@ -33,6 +33,10 @@
#include <sys/param.h>
#include <string.h>
#ifdef __linux__
#include <glob.h>
#endif /* __linux__ */
/*
* Set up a bufferevent structure for either a dst or src connection,
* optionally with or without SSL. Sets all callbacks, enables read
@ -49,10 +53,9 @@ prototcp_bufferevent_setup(pxy_conn_ctx_t *ctx, evutil_socket_t fd)
{
log_finest_va("ENTER, fd=%d", fd);
struct bufferevent *bev = bufferevent_socket_new(ctx->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);
struct bufferevent *bev = bufferevent_socket_new(ctx->thr->evbase, fd, BEV_OPT_DEFER_CALLBACKS|BEV_OPT_THREADSAFE);
if (!bev) {
log_err_level_printf(LOG_CRIT, "Error creating bufferevent socket\n");
log_fine_va("bufferevent_socket_new failed, fd=%d", fd);
log_err_level(LOG_CRIT, "Error creating bufferevent socket");
return NULL;
}
return bev;
@ -96,38 +99,180 @@ prototcp_setup_dst(pxy_conn_ctx_t *ctx)
return 0;
}
static int NONNULL(1) WUNRES
prototcp_conn_connect(pxy_conn_ctx_t *ctx)
#if defined(__APPLE__) || defined(__FreeBSD__)
#define getdtablecount() 0
/*
* Copied from:
* opensmtpd-201801101641p1/openbsd-compat/imsg.c
*
* Copyright (c) 2003, 2004 Henning Brauer <henning@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
static int
available_fds(unsigned int n)
{
log_finest("ENTER");
unsigned int i;
int ret, fds[256];
if (prototcp_setup_src(ctx) == -1) {
if (n > (sizeof(fds)/sizeof(fds[0])))
return -1;
ret = 0;
for (i = 0; i < n; i++) {
fds[i] = -1;
if ((fds[i] = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
ret = -1;
break;
}
}
if (prototcp_setup_dst(ctx) == -1) {
return -1;
for (i = 0; i < n && fds[i] >= 0; i++)
close(fds[i]);
return ret;
}
#endif /* __APPLE__ || __FreeBSD__ */
#ifdef __linux__
/*
* Copied from:
* https://github.com/tmux/tmux/blob/master/compat/getdtablecount.c
*
* Copyright (c) 2017 Nicholas Marriott <nicholas.marriott@gmail.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
* IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
* OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
int
getdtablecount()
{
char path[PATH_MAX];
glob_t g;
int n = 0;
if (snprintf(path, sizeof path, "/proc/%ld/fd/*", (long)getpid()) < 0) {
log_err_level_printf(LOG_CRIT, "snprintf overflow\n");
return 0;
}
if (glob(path, 0, NULL, &g) == 0)
n = g.gl_pathc;
globfree(&g);
return n;
}
#endif /* __linux__ */
// Conn setup is successful, so add the conn to the conn list of its thread now
pxy_thrmgr_add_conn(ctx);
/*
* Check if we are out of file descriptors to close the conn, or else libevent will crash us
* @attention We cannot guess the number of children in a connection at conn setup time. So, FD_RESERVE is just a ball park figure.
* But what if a connection passes the check below, but eventually tries to create more children than FD_RESERVE allows for? This will crash us the same.
* Beware, this applies to all current conns, not just the last connection setup.
* For example, 20x conns pass the check below before creating any children, at which point we reach the last FD_RESERVE fds,
* then they all start creating children, which crashes us again.
* So, no matter how large an FD_RESERVE we choose, there will always be a risk of running out of fds, if we check the number of fds during parent conn setup only.
* If we are left with less than FD_RESERVE fds, we should not create more children than FD_RESERVE allows for either.
* Therefore, we check if we are out of fds in pxy_listener_acceptcb_child() and close the conn there too.
* @attention These checks are expected to slow us further down, but it is critical to avoid a crash in case we run out of fds.
*/
static int
check_fd_usage(
#ifdef DEBUG_PROXY
pxy_conn_ctx_t *ctx
#endif /* DEBUG_PROXY */
)
{
int dtable_count = getdtablecount();
bufferevent_setcb(ctx->src.bev, pxy_bev_readcb, pxy_bev_writecb, pxy_bev_eventcb, ctx);
bufferevent_setcb(ctx->dst.bev, pxy_bev_readcb, pxy_bev_writecb, pxy_bev_eventcb, ctx);
log_finer_va("descriptor_table_size=%d, dtablecount=%d, reserve=%d", descriptor_table_size, dtable_count, FD_RESERVE);
log_finer("Enabling src");
if (dtable_count + FD_RESERVE >= descriptor_table_size) {
goto out;
}
#if defined(__APPLE__) || defined(__FreeBSD__)
if (available_fds(FD_RESERVE) == -1) {
goto out;
}
#endif /* __APPLE__ || __FreeBSD__ */
// Now open the gates
bufferevent_enable(ctx->src.bev, EV_READ|EV_WRITE);
return 0;
out:
errno = EMFILE;
log_err_level_printf(LOG_CRIT, "Out of file descriptors\n");
return -1;
}
static void
prototcp_fd_readcb(UNUSED evutil_socket_t fd, UNUSED short what, void *arg)
void
prototcp_connect(UNUSED evutil_socket_t fd, UNUSED short what, void *arg)
{
pxy_conn_ctx_t *ctx = arg;
log_finest("ENTER");
pxy_conn_connect(ctx);
event_free(ctx->ev);
ctx->ev = NULL;
// Always keep thr load and conns list in sync
ctx->thr->load++;
ctx->next = ctx->thr->conns;
ctx->thr->conns = ctx;
if (ctx->next)
ctx->next->prev = ctx;
if (check_fd_usage(
#ifdef DEBUG_PROXY
ctx
#endif /* DEBUG_PROXY */
) == -1) {
goto out;
}
if (sys_sockaddr_str((struct sockaddr *)&ctx->srcaddr, ctx->srcaddrlen, &ctx->srchost_str, &ctx->srcport_str) != 0) {
log_err_level_printf(LOG_CRIT, "Aborting connection setup (out of memory)!\n");
goto out;
}
log_finest_va("srcaddr= [%s]:%s", ctx->srchost_str, ctx->srcport_str);
if (prototcp_setup_src(ctx) == -1) {
goto out;
}
if (prototcp_setup_dst(ctx) == -1) {
goto out;
}
bufferevent_setcb(ctx->src.bev, pxy_bev_readcb, pxy_bev_writecb, pxy_bev_eventcb, ctx);
bufferevent_setcb(ctx->dst.bev, pxy_bev_readcb, pxy_bev_writecb, pxy_bev_eventcb, ctx);
log_finer("Enabling src");
// Now open the gates
bufferevent_enable(ctx->src.bev, EV_READ|EV_WRITE);
return;
out:
evutil_closesocket(ctx->fd);
pxy_conn_free(ctx, 1);
}
static int
@ -239,8 +384,7 @@ prototcp_bev_readcb_src(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
/* initiate connection */
bufferevent_enable(ctx->dst.bev, EV_READ|EV_WRITE);
if (bufferevent_socket_connect(ctx->dst.bev, (struct sockaddr *)&ctx->dstaddr, ctx->dstaddrlen) == -1) {
log_err_level_printf(LOG_CRIT, "prototcp_bev_readcb_src: bufferevent_socket_connect for dst failed\n");
log_fine("bufferevent_socket_connect for dst failed");
log_err_level(LOG_CRIT, "bufferevent_socket_connect for dst failed");
}
}
@ -362,8 +506,7 @@ prototcp_bev_eventcb_eof_src(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
#endif /* DEBUG_PROXY */
if (!ctx->src_connected) {
log_err_level_printf(LOG_WARNING, "EOF on connection before connection establishment\n");
log_fine("EOF on connection before connection establishment");
log_err_level(LOG_WARNING, "EOF on connection before connection establishment");
ctx->dst.closed = 1;
} else if (!ctx->dst.closed) {
log_finest("!dst->closed, terminate conn");
@ -385,8 +528,7 @@ prototcp_bev_eventcb_eof_dst(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
#endif /* DEBUG_PROXY */
if (!ctx->dst_connected) {
log_err_level_printf(LOG_WARNING, "EOF on connection before connection establishment\n");
log_fine("EOF on connection before connection establishment");
log_err_level(LOG_WARNING, "EOF on connection before connection establishment");
ctx->src.closed = 1;
} else if (!ctx->src.closed) {
log_finest("!src->closed, terminate conn");
@ -497,13 +639,9 @@ protocol_t
prototcp_setup(pxy_conn_ctx_t *ctx)
{
ctx->protoctx->proto = PROTO_TCP;
ctx->protoctx->connectcb = prototcp_conn_connect;
ctx->protoctx->fd_readcb = prototcp_fd_readcb;
ctx->protoctx->bev_readcb = prototcp_bev_readcb;
ctx->protoctx->bev_writecb = prototcp_bev_writecb;
ctx->protoctx->bev_eventcb = prototcp_bev_eventcb;
return PROTO_TCP;
}

@ -32,6 +32,7 @@
#include "pxyconn.h"
void prototcp_connect(evutil_socket_t, short, void *);
protocol_t prototcp_setup(pxy_conn_ctx_t *) NONNULL(1);
#endif /* PROTOTCP_H */

@ -28,6 +28,7 @@
#include "proxy.h"
#include "prototcp.h"
#include "privsep.h"
#include "pxythrmgr.h"
#include "pxyconn.h"
@ -51,7 +52,6 @@
#include <event2/buffer.h>
#include <event2/thread.h>
/*
* Proxy engine, built around libevent 2.x.
*/
@ -92,8 +92,85 @@ proxy_listener_ctx_free(proxy_listener_ctx_t *ctx)
free(ctx);
}
static protocol_t NONNULL(1)
proxy_setup_proto(pxy_conn_ctx_t *ctx)
{
ctx->protoctx = malloc(sizeof(proto_ctx_t));
if (!ctx->protoctx) {
return PROTO_ERROR;
}
memset(ctx->protoctx, 0, sizeof(proto_ctx_t));
protocol_t proto = prototcp_setup(ctx);
if (proto == PROTO_ERROR) {
free(ctx->protoctx);
}
return proto;
}
static pxy_conn_ctx_t * MALLOC NONNULL(2,3)
proxy_conn_ctx_new(evutil_socket_t fd,
pxy_thrmgr_ctx_t *thrmgr, opts_t *opts)
{
log_finest_main_va("ENTER, fd=%d", fd);
pxy_conn_ctx_t *ctx = malloc(sizeof(pxy_conn_ctx_t));
if (!ctx) {
return NULL;
}
memset(ctx, 0, sizeof(pxy_conn_ctx_t));
ctx->id = thrmgr->conn_count++;
log_finest_main_va("id=%llu, fd=%d", ctx->id, fd);
ctx->fd = fd;
ctx->thrmgr = thrmgr;
ctx->proto = proxy_setup_proto(ctx);
if (ctx->proto == PROTO_ERROR) {
free(ctx);
return NULL;
}
ctx->opts = opts;
ctx->ctime = time(NULL);
ctx->atime = ctx->ctime;
ctx->next = NULL;
return ctx;
}
/*
* Does minimal clean-up, called on error by proxy_listener_acceptcb() only.
* We call this function instead of pxy_conn_ctx_free(), because
* proxy_listener_acceptcb() runs on thrmgr, whereas pxy_conn_ctx_free()
* runs on conn handling thr. This is necessary to prevent multithreading issues.
*/
static void NONNULL(1)
proxy_conn_ctx_free(pxy_conn_ctx_t *ctx)
{
log_finest("ENTER");
if (ctx->ev) {
event_free(ctx->ev);
}
free(ctx->protoctx);
free(ctx);
}
/*
* Callback for accept events on the socket listener bufferevent.
* Called when a new incoming connection has been accepted.
* Initiates the connection to the server. The incoming connection
* from the client is not being activated until we have a successful
* connection to the server, because we need the server's certificate
* in order to set up the SSL session to the client.
* For consistency, plain TCP works the same way, even if we could
* start reading from the client while waiting on the connection to
* the server to connect.
*/
static void
proxy_listener_acceptcb(UNUSED struct evconnlistener *listener,
@ -103,7 +180,36 @@ proxy_listener_acceptcb(UNUSED struct evconnlistener *listener,
{
proxy_listener_ctx_t *lctx = arg;
log_finest_main_va("ENTER, fd=%d", fd);
pxy_conn_setup(fd, peeraddr, peeraddrlen, lctx->thrmgr, lctx->opts);
/* create per connection state */
pxy_conn_ctx_t *ctx = proxy_conn_ctx_new(fd, lctx->thrmgr, lctx->opts);
if (!ctx) {
log_err_level_printf(LOG_CRIT, "Error allocating memory\n");
evutil_closesocket(fd);
return;
}
// Choose the conn handling thr
pxy_thrmgr_attach(ctx);
ctx->srcaddrlen = peeraddrlen;
memcpy(&ctx->srcaddr, peeraddr, ctx->srcaddrlen);
// Switch from thrmgr to connection handling thread, i.e. change the event base, asap
// This prevents possible multithreading issues between thrmgr and conn handling threads
ctx->ev = event_new(ctx->thr->evbase, -1, 0, prototcp_connect, ctx);
if (!ctx->ev) {
log_err_level(LOG_CRIT, "Error creating connect event, aborting connection");
goto out;
}
// The only purpose of this event is to change the event base, so it is a one-shot event
if (event_add(ctx->ev, NULL) == -1)
goto out;
event_active(ctx->ev, 0, 0);
return;
out:
evutil_closesocket(fd);
proxy_conn_ctx_free(ctx);
}
/*

@ -29,8 +29,6 @@
#include "pxyconn.h"
#include "prototcp.h"
#include "privsep.h"
#include "sys.h"
#include "log.h"
@ -44,20 +42,6 @@
#include <event2/listener.h>
#ifdef __linux__
#include <glob.h>
#endif /* __linux__ */
#include <net/if_arp.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/sysctl.h>
#include <net/route.h>
#include <netinet/if_ether.h>
#ifdef __OpenBSD__
#include <net/if_dl.h>
#endif /* __OpenBSD__ */
/*
* Maximum size of data to buffer per connection direction before
* temporarily stopping to read data from the other end.
@ -72,60 +56,6 @@ char *protocol_names[] = {
"TCP", // = 0
};
static protocol_t NONNULL(1)
pxy_setup_proto(pxy_conn_ctx_t *ctx)
{
ctx->protoctx = malloc(sizeof(proto_ctx_t));
if (!ctx->protoctx) {
return PROTO_ERROR;
}
memset(ctx->protoctx, 0, sizeof(proto_ctx_t));
// Default to tcp
protocol_t proto = prototcp_setup(ctx);
if (proto == PROTO_ERROR) {
free(ctx->protoctx);
}
return proto;
}
static pxy_conn_ctx_t * MALLOC NONNULL(2,3)
pxy_conn_ctx_new(evutil_socket_t fd,
pxy_thrmgr_ctx_t *thrmgr, opts_t *opts)
{
log_finest_main_va("ENTER, fd=%d", fd);
pxy_conn_ctx_t *ctx = malloc(sizeof(pxy_conn_ctx_t));
if (!ctx) {
return NULL;
}
memset(ctx, 0, sizeof(pxy_conn_ctx_t));
ctx->id = thrmgr->conn_count++;
log_finest_main_va("id=%llu, fd=%d", ctx->id, fd);
ctx->fd = fd;
ctx->thrmgr = thrmgr;
ctx->proto = pxy_setup_proto(ctx);
if (ctx->proto == PROTO_ERROR) {
free(ctx);
return NULL;
}
ctx->opts = opts;
ctx->ctime = time(NULL);
ctx->atime = ctx->ctime;
ctx->next = NULL;
pxy_thrmgr_attach(ctx);
return ctx;
}
void
pxy_conn_ctx_free(pxy_conn_ctx_t *ctx, int by_requestor)
{
@ -137,7 +67,7 @@ pxy_conn_ctx_free(pxy_conn_ctx_t *ctx, int by_requestor)
}
}
pxy_thrmgr_detach(ctx);
pxy_thr_detach(ctx);
if (ctx->srchost_str) {
free(ctx->srchost_str);
@ -151,9 +81,8 @@ pxy_conn_ctx_free(pxy_conn_ctx_t *ctx, int by_requestor)
if (ctx->dstport_str) {
free(ctx->dstport_str);
}
// If the proto doesn't have special args, proto_free() callback is NULL
if (ctx->protoctx->proto_free) {
ctx->protoctx->proto_free(ctx);
if (ctx->ev) {
event_free(ctx->ev);
}
free(ctx->protoctx);
free(ctx);
@ -400,129 +329,6 @@ pxy_discard_inbuf(struct bufferevent *bev)
evbuffer_drain(inbuf, inbuf_size);
}
#if defined(__APPLE__) || defined(__FreeBSD__)
#define getdtablecount() 0
/*
* Copied from:
* opensmtpd-201801101641p1/openbsd-compat/imsg.c
*
* Copyright (c) 2003, 2004 Henning Brauer <henning@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
static int
available_fds(unsigned int n)
{
unsigned int i;
int ret, fds[256];
if (n > (sizeof(fds)/sizeof(fds[0])))
return -1;
ret = 0;
for (i = 0; i < n; i++) {
fds[i] = -1;
if ((fds[i] = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
ret = -1;
break;
}
}
for (i = 0; i < n && fds[i] >= 0; i++)
close(fds[i]);
return ret;
}
#endif /* __APPLE__ || __FreeBSD__ */
#ifdef __linux__
/*
* Copied from:
* https://github.com/tmux/tmux/blob/master/compat/getdtablecount.c
*
* Copyright (c) 2017 Nicholas Marriott <nicholas.marriott@gmail.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF MIND, USE, DATA OR PROFITS, WHETHER
* IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING
* OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
int
getdtablecount()
{
char path[PATH_MAX];
glob_t g;
int n = 0;
if (snprintf(path, sizeof path, "/proc/%ld/fd/*", (long)getpid()) < 0) {
log_err_level_printf(LOG_CRIT, "snprintf overflow\n");
return 0;
}
if (glob(path, 0, NULL, &g) == 0)
n = g.gl_pathc;
globfree(&g);
return n;
}
#endif /* __linux__ */
/*
* Check if we are out of file descriptors to close the conn, or else libevent will crash us
* @attention We cannot guess the number of children in a connection at conn setup time. So, FD_RESERVE is just a ball park figure.
* But what if a connection passes the check below, but eventually tries to create more children than FD_RESERVE allows for? This will crash us the same.
* Beware, this applies to all current conns, not just the last connection setup.
* For example, 20x conns pass the check below before creating any children, at which point we reach the last FD_RESERVE fds,
* then they all start creating children, which crashes us again.
* So, no matter how large an FD_RESERVE we choose, there will always be a risk of running out of fds, if we check the number of fds during parent conn setup only.
* If we are left with less than FD_RESERVE fds, we should not create more children than FD_RESERVE allows for either.
* Therefore, we check if we are out of fds in pxy_listener_acceptcb_child() and close the conn there too.
* @attention These checks are expected to slow us further down, but it is critical to avoid a crash in case we run out of fds.
*/
static int
check_fd_usage(
#ifdef DEBUG_PROXY
evutil_socket_t fd
#endif /* DEBUG_PROXY */
)
{
int dtable_count = getdtablecount();
log_finer_main_va("descriptor_table_size=%d, dtablecount=%d, reserve=%d, fd=%d", descriptor_table_size, dtable_count, FD_RESERVE, fd);
if (dtable_count + FD_RESERVE >= descriptor_table_size) {
goto out;
}
#if defined(__APPLE__) || defined(__FreeBSD__)
if (available_fds(FD_RESERVE) == -1) {
goto out;
}
#endif /* __APPLE__ || __FreeBSD__ */
return 0;
out:
errno = EMFILE;
log_err_level_printf(LOG_CRIT, "Out of file descriptors\n");
return -1;
}
int
pxy_try_close_conn_end(pxy_conn_desc_t *conn_end, pxy_conn_ctx_t *ctx)
{
@ -679,80 +485,4 @@ pxy_bev_eventcb(struct bufferevent *bev, short events, void *arg)
}
}
/*
* Complete the connection.
*/
void
pxy_conn_connect(pxy_conn_ctx_t *ctx)
{
log_finest("ENTER");
if (ctx->protoctx->connectcb(ctx) == -1) {
// @attention Do not try to close conns or do anything else with conn ctx on the thrmgr thread after setting event callbacks and/or socket connect.
// The return value of -1 from connectcb indicates that there was a fatal error before event callbacks were set, so we can terminate the connection.
// Otherwise, it is up to the event callbacks to terminate the connection. This is necessary to avoid multithreading issues.
if (ctx->term || ctx->enomem) {
pxy_conn_free(ctx, ctx->term ? ctx->term_requestor : 1);
}
}
// @attention Do not do anything with the conn ctx after this point on the thrmgr thread
}
/*
* Callback for accept events on the socket listener bufferevent.
* Called when a new incoming connection has been accepted.
* Initiates the connection to the server. The incoming connection
* from the client is not being activated until we have a successful
* connection to the server, because we need the server's certificate
* in order to set up the SSL session to the client.
* For consistency, plain TCP works the same way, even if we could
* start reading from the client while waiting on the connection to
* the server to connect.
*/
void
pxy_conn_setup(evutil_socket_t fd,
struct sockaddr *peeraddr, int peeraddrlen,
pxy_thrmgr_ctx_t *thrmgr, opts_t *opts)
{
#ifdef DEBUG_PROXY
log_finest_main_va("ENTER, fd=%d", fd);
char *host, *port;
if (sys_sockaddr_str(peeraddr, peeraddrlen, &host, &port) == 0) {
log_finest_main_va("peer addr=[%s]:%s, fd=%d", host, port, fd);
free(host);
free(port);
}
#endif /* DEBUG_PROXY */
if (check_fd_usage(
#ifdef DEBUG_PROXY
fd
#endif /* DEBUG_PROXY */
) == -1) {
evutil_closesocket(fd);
return;
}
/* create per connection state and attach to thread */
pxy_conn_ctx_t *ctx = pxy_conn_ctx_new(fd, thrmgr, opts);
if (!ctx) {
log_err_level_printf(LOG_CRIT, "Error allocating memory\n");
evutil_closesocket(fd);
return;
}
if (sys_sockaddr_str(peeraddr, peeraddrlen, &ctx->srchost_str, &ctx->srcport_str) != 0) {
log_err_level_printf(LOG_CRIT, "Aborting connection setup (out of memory)!\n");
goto out;
}
ctx->protoctx->fd_readcb(ctx->fd, 0, ctx);
return;
out:
evutil_closesocket(fd);
pxy_conn_ctx_free(ctx, 1);
}
/* vim: set noet ft=c: */

@ -49,16 +49,11 @@
#define WANT_CONNECT_LOG(ctx) ((ctx)->opts->connectlog||!(ctx)->opts->detach||(ctx)->opts->statslog)
#define WANT_CONTENT_LOG(ctx) ((ctx)->opts->contentlog)
typedef void (*fd_readcb_func_t)(evutil_socket_t, short, void *);
typedef int (*connect_func_t)(pxy_conn_ctx_t *);
typedef void (*callback_func_t)(struct bufferevent *, void *);
typedef void (*eventcb_func_t)(struct bufferevent *, short, void *);
typedef void (*bev_free_func_t)(struct bufferevent *, pxy_conn_ctx_t *);
typedef void (*proto_free_func_t)(pxy_conn_ctx_t *);
/*
* Proxy connection context state, describes a proxy connection
* with source and destination socket bufferevents, SSL context and
@ -82,18 +77,9 @@ typedef struct proto_ctx proto_ctx_t;
struct proto_ctx {
protocol_t proto;
connect_func_t connectcb;
fd_readcb_func_t fd_readcb;
callback_func_t bev_readcb;
callback_func_t bev_writecb;
eventcb_func_t bev_eventcb;
proto_free_func_t proto_free;
// For protocol specific fields, if any
void *arg;
};
/* connection state consisting of two connection descriptors,
@ -128,13 +114,16 @@ struct pxy_conn_ctx {
unsigned int term_requestor : 1; /* 1 client, 0 server side */
unsigned int seen_sslproxy_line : 1; /* 1 if seen sslproxy line */
/* destination address */
struct event *ev;
/* original source and destination address */
struct sockaddr_storage srcaddr;
socklen_t srcaddrlen;
struct sockaddr_storage dstaddr;
socklen_t dstaddrlen;
// Thread that the conn is attached to
pxy_thr_ctx_t *thr;
unsigned int in_thr_conns : 1; /* 1 to prevent adding twice */
// Unique id of the conn
long long unsigned int id;
@ -142,8 +131,6 @@ struct pxy_conn_ctx {
pxy_thrmgr_ctx_t *thrmgr;
opts_t *opts;
struct event_base *evbase;
evutil_socket_t dst_fd;
// Conn create time
@ -155,6 +142,7 @@ struct pxy_conn_ctx {
// Per-thread conn list, used to determine idle and expired conns, and to close them
pxy_conn_ctx_t *next;
pxy_conn_ctx_t *prev;
};
void pxy_log_connect(pxy_conn_ctx_t *) NONNULL(1);
@ -184,11 +172,6 @@ void pxy_bev_readcb(struct bufferevent *, void *);
void pxy_bev_writecb(struct bufferevent *, void *);
void pxy_bev_eventcb(struct bufferevent *, short, void *);
void pxy_conn_connect(pxy_conn_ctx_t *) NONNULL(1);
void pxy_conn_setup(evutil_socket_t, struct sockaddr *, int,
pxy_thrmgr_ctx_t *, opts_t *)
NONNULL(2,4,5);
#endif /* !PXYCONN_H */
/* vim: set noet ft=c: */

@ -0,0 +1,177 @@
/*-
* SSLsplit - transparent SSL/TLS interception
* https://www.roe.ch/SSLsplit
*
* Copyright (c) 2017-2020, Soner Tari <sonertari@gmail.com>.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "pxythr.h"
#include "log.h"
#include "pxyconn.h"
#include <assert.h>
#include <sys/param.h>
/*
* Detach a connection from a thread by index.
* This function cannot fail.
*/
void
pxy_thr_detach(pxy_conn_ctx_t *ctx)
{
assert(ctx != NULL);
// If this function is called, the thr conns list cannot be empty
assert(ctx->thr->conns != NULL);
log_finest("Removing conn");
ctx->thr->load--;
if (ctx->prev) {
ctx->prev->next = ctx->next;
} else {
ctx->thr->conns = ctx->next;
}
if (ctx->next)
ctx->next->prev = ctx->prev;
}
/*
* Proxy thread manager: manages the connection handling worker threads
* and the per-thread resources (i.e. event bases). The load is shared
* across num_cpu * 2 connection handling threads, using the number of
* currently assigned connections as the sole metric.
*
* The attach and detach functions are thread-safe.
*/
static void
pxy_thr_print_thr_info(pxy_thr_ctx_t *tctx)
{
log_finest_main_va("thr=%d, load=%lu", tctx->thridx, tctx->load);
unsigned int idx = 1;
evutil_socket_t max_fd = 0;
time_t max_atime = 0;
time_t max_ctime = 0;
char *smsg = NULL;
if (tctx->conns) {
time_t now = time(NULL);
pxy_conn_ctx_t *ctx = tctx->conns;
while (ctx) {
time_t atime = now - ctx->atime;
time_t ctime = now - ctx->ctime;
log_finest_main_va("CONN: thr=%d, id=%u, fd=%d, dst=%d, p=%d-%d, at=%lld ct=%lld, src_addr=%s:%s, dst_addr=%s:%s",
tctx->thridx, idx, ctx->fd, ctx->dst_fd, ctx->src.closed, ctx->dst.closed, (long long)atime, (long long)ctime,
STRORDASH(ctx->srchost_str), STRORDASH(ctx->srcport_str), STRORDASH(ctx->dsthost_str), STRORDASH(ctx->dstport_str));
max_fd = MAX(max_fd, MAX(ctx->fd, ctx->dst_fd));
max_atime = MAX(max_atime, atime);
max_ctime = MAX(max_ctime, ctime);
idx++;
ctx = ctx->next;
}
}
log_finest_main_va("STATS: thr=%d, mld=%zu, mfd=%d, mat=%lld, mct=%lld, iib=%llu, iob=%llu, eib=%llu, eob=%llu, swm=%zu, uwm=%zu, err=%zu, si=%u",
tctx->thridx, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes,
tctx->set_watermarks, tctx->unset_watermarks, tctx->errors, tctx->stats_id);
if (asprintf(&smsg, "STATS: thr=%d, mld=%zu, mfd=%d, mat=%lld, mct=%lld, iib=%llu, iob=%llu, eib=%llu, eob=%llu, swm=%zu, uwm=%zu, err=%zu, si=%u\n",
tctx->thridx, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes,
tctx->set_watermarks, tctx->unset_watermarks, tctx->errors, tctx->stats_id) < 0) {
return;
}
if (log_stats(smsg) == -1) {
log_err_level_printf(LOG_WARNING, "Stats logging failed\n");
}
free(smsg);
tctx->stats_id++;
tctx->errors = 0;
tctx->set_watermarks = 0;
tctx->unset_watermarks = 0;
tctx->intif_in_bytes = 0;
tctx->intif_out_bytes = 0;
tctx->extif_in_bytes = 0;
tctx->extif_out_bytes = 0;
// Reset these stats with the current values (do not reset to 0 directly, there may be active conns)
tctx->max_fd = max_fd;
tctx->max_load = tctx->load;
}
/*
* Recurring timer event to prevent the event loops from exiting when
* they run out of events.
*/
static void
pxy_thr_timer_cb(UNUSED evutil_socket_t fd, UNUSED short what, UNUSED void *arg)
{
pxy_thr_ctx_t *ctx = arg;
log_finest_main_va("thr=%d, load=%lu, to=%u", ctx->thridx, ctx->load, ctx->timeout_count);
// @attention Print thread info only if stats logging is enabled, if disabled debug logs are not printed either
if (ctx->thrmgr->opts->statslog) {
ctx->timeout_count++;
if (ctx->timeout_count >= ctx->thrmgr->opts->stats_period) {
ctx->timeout_count = 0;
pxy_thr_print_thr_info(ctx);
}
}
}
/*
* Thread entry point; runs the event loop of the event base.
* Does not exit until the libevent loop is broken explicitly.
*/
void *
pxy_thr(void *arg)
{
pxy_thr_ctx_t *ctx = arg;
struct timeval timer_delay = {10, 0};
struct event *ev;
ev = event_new(ctx->evbase, -1, EV_PERSIST, pxy_thr_timer_cb, ctx);
if (!ev)
return NULL;
evtimer_add(ev, &timer_delay);
ctx->running = 1;
event_base_dispatch(ctx->evbase);
event_free(ev);
return NULL;
}
/* vim: set noet ft=c: */

@ -0,0 +1,76 @@
/*-
* SSLsplit - transparent SSL/TLS interception
* https://www.roe.ch/SSLsplit
*
* Copyright (c) 2017-2020, Soner Tari <sonertari@gmail.com>.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef PXYTHR_H
#define PXYTHR_H
#include "attrib.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <event2/event.h>
#include <event2/dns.h>
#include <pthread.h>
typedef struct pxy_conn_ctx pxy_conn_ctx_t;
typedef struct pxy_thrmgr_ctx pxy_thrmgr_ctx_t;
typedef struct pxy_thr_ctx {
pthread_t thr;
int thridx;
pxy_thrmgr_ctx_t *thrmgr;
size_t load;
struct event_base *evbase;
int running;
// Statistics
evutil_socket_t max_fd;
size_t max_load;
size_t errors;
size_t set_watermarks;
size_t unset_watermarks;
long long unsigned int intif_in_bytes;
long long unsigned int intif_out_bytes;
long long unsigned int extif_in_bytes;
long long unsigned int extif_out_bytes;
// Each stats has an id, incremented on each stats print
unsigned short stats_id;
// Used to print statistics, compared against stats_period
unsigned int timeout_count;
// List of active connections on the thread
pxy_conn_ctx_t *conns;
} pxy_thr_ctx_t;
void pxy_thr_detach(pxy_conn_ctx_t *) NONNULL(1);
void *pxy_thr(void *);
#endif /* !PXYTHR_H */
/* vim: set noet ft=c: */

@ -35,127 +35,6 @@
#include <string.h>
#include <event2/bufferevent.h>
#include <pthread.h>
#include <assert.h>
#include <sys/param.h>
/*
* Proxy thread manager: manages the connection handling worker threads
* and the per-thread resources (i.e. event bases). The load is shared
* across num_cpu * 2 connection handling threads, using the number of
* currently assigned connections as the sole metric.
*
* The attach and detach functions are thread-safe.
*/
static void
pxy_thrmgr_print_thr_info(pxy_thr_ctx_t *tctx)
{
log_finest_main_va("thr=%d, load=%lu", tctx->thridx, tctx->load);
unsigned int idx = 1;
evutil_socket_t max_fd = 0;
time_t max_atime = 0;
time_t max_ctime = 0;
char *smsg = NULL;
if (tctx->conns) {
time_t now = time(NULL);
pxy_conn_ctx_t *ctx = tctx->conns;
while (ctx) {
time_t atime = now - ctx->atime;
time_t ctime = now - ctx->ctime;
log_finest_main_va("CONN: thr=%d, id=%u, fd=%d, dst=%d, p=%d-%d, at=%lld ct=%lld, src_addr=%s:%s, dst_addr=%s:%s",
tctx->thridx, idx, ctx->fd, ctx->dst_fd, ctx->src.closed, ctx->dst.closed, (long long)atime, (long long)ctime,
STRORDASH(ctx->srchost_str), STRORDASH(ctx->srcport_str), STRORDASH(ctx->dsthost_str), STRORDASH(ctx->dstport_str));
max_fd = MAX(max_fd, MAX(ctx->fd, ctx->dst_fd));
max_atime = MAX(max_atime, atime);
max_ctime = MAX(max_ctime, ctime);
idx++;
ctx = ctx->next;
}
}
log_finest_main_va("STATS: thr=%d, mld=%zu, mfd=%d, mat=%lld, mct=%lld, iib=%llu, iob=%llu, eib=%llu, eob=%llu, swm=%zu, uwm=%zu, err=%zu, si=%u",
tctx->thridx, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes,
tctx->set_watermarks, tctx->unset_watermarks, tctx->errors, tctx->stats_id);
if (asprintf(&smsg, "STATS: thr=%d, mld=%zu, mfd=%d, mat=%lld, mct=%lld, iib=%llu, iob=%llu, eib=%llu, eob=%llu, swm=%zu, uwm=%zu, err=%zu, si=%u\n",
tctx->thridx, tctx->max_load, tctx->max_fd, (long long)max_atime, (long long)max_ctime, tctx->intif_in_bytes, tctx->intif_out_bytes, tctx->extif_in_bytes, tctx->extif_out_bytes,
tctx->set_watermarks, tctx->unset_watermarks, tctx->errors, tctx->stats_id) < 0) {
return;
}
if (log_stats(smsg) == -1) {
log_err_level_printf(LOG_WARNING, "Stats logging failed\n");
}
free(smsg);
tctx->stats_id++;
tctx->errors = 0;
tctx->set_watermarks = 0;
tctx->unset_watermarks = 0;
tctx->intif_in_bytes = 0;
tctx->intif_out_bytes = 0;
tctx->extif_in_bytes = 0;
tctx->extif_out_bytes = 0;
// Reset these stats with the current values (do not reset to 0 directly, there may be active conns)
tctx->max_fd = max_fd;
tctx->max_load = tctx->load;
}
/*
* Recurring timer event to prevent the event loops from exiting when
* they run out of events.
*/
static void
pxy_thrmgr_timer_cb(UNUSED evutil_socket_t fd, UNUSED short what, UNUSED void *arg)
{
pxy_thr_ctx_t *ctx = arg;
pthread_mutex_lock(&ctx->mutex);
log_finest_main_va("thr=%d, load=%lu, to=%u", ctx->thridx, ctx->load, ctx->timeout_count);
// @attention Print thread info only if stats logging is enabled, if disabled debug logs are not printed either
if (ctx->thrmgr->opts->statslog) {
ctx->timeout_count++;
if (ctx->timeout_count >= ctx->thrmgr->opts->stats_period) {
ctx->timeout_count = 0;
pxy_thrmgr_print_thr_info(ctx);
}
}
pthread_mutex_unlock(&ctx->mutex);
}
/*
* Thread entry point; runs the event loop of the event base.
* Does not exit until the libevent loop is broken explicitly.
*/
static void *
pxy_thrmgr_thr(void *arg)
{
pxy_thr_ctx_t *ctx = arg;
struct timeval timer_delay = {10, 0};
struct event *ev;
ev = event_new(ctx->evbase, -1, EV_PERSIST, pxy_thrmgr_timer_cb, ctx);
if (!ev)
return NULL;
evtimer_add(ev, &timer_delay);
ctx->running = 1;
event_base_dispatch(ctx->evbase);
event_free(ev);
return NULL;
}
/*
* Create new thread manager but do not start any threads yet.
@ -209,11 +88,6 @@ pxy_thrmgr_run(pxy_thrmgr_ctx_t *ctx)
ctx->thr[idx]->thridx = idx;
ctx->thr[idx]->timeout_count = 0;
ctx->thr[idx]->thrmgr = ctx;
if (pthread_mutex_init(&ctx->thr[idx]->mutex, NULL)) {
log_dbg_printf("Failed to initialize thr mutex\n");
goto leave;
}
}
log_dbg_printf("Initialized %d connection handling threads\n",
@ -221,7 +95,7 @@ pxy_thrmgr_run(pxy_thrmgr_ctx_t *ctx)
for (idx = 0; idx < ctx->num_thr; idx++) {
if (pthread_create(&ctx->thr[idx]->thr, NULL,
pxy_thrmgr_thr, ctx->thr[idx]))
pxy_thr, ctx->thr[idx]))
goto leave_thr;
while (!ctx->thr[idx]->running) {
sched_yield();
@ -248,7 +122,6 @@ leave:
if (ctx->thr[idx]->evbase) {
event_base_free(ctx->thr[idx]->evbase);
}
pthread_mutex_destroy(&ctx->thr[idx]->mutex);
free(ctx->thr[idx]);
}
idx--;
@ -278,7 +151,6 @@ pxy_thrmgr_free(pxy_thrmgr_ctx_t *ctx)
if (ctx->thr[idx]->evbase) {
event_base_free(ctx->thr[idx]->evbase);
}
pthread_mutex_destroy(&ctx->thr[idx]->mutex);
free(ctx->thr[idx]);
}
free(ctx->thr);
@ -286,67 +158,6 @@ pxy_thrmgr_free(pxy_thrmgr_ctx_t *ctx)
free(ctx);
}
void
pxy_thrmgr_add_conn(pxy_conn_ctx_t *ctx)
{
pthread_mutex_lock(&ctx->thr->mutex);
if (!ctx->in_thr_conns) {
log_finest("Adding conn");
ctx->in_thr_conns = 1;
// Always keep thr load and conns list in sync
ctx->thr->load++;
ctx->next = ctx->thr->conns;
ctx->thr->conns = ctx;
} else {
// Do not add conns twice
log_finest("Will not add conn twice");
}
pthread_mutex_unlock(&ctx->thr->mutex);
}
static void NONNULL(1)
pxy_thrmgr_remove_conn_unlocked(pxy_conn_ctx_t *ctx)
{
assert(ctx != NULL);
if (ctx->in_thr_conns) {
log_finest("Removing conn");
// Thr conns list cannot be empty, if the in_thr_conns flag of a conn is set
assert(ctx->thr->conns != NULL);
// Shouldn't need to reset the in_thr_conns flag, because the conn ctx will be freed next, but just in case
ctx->in_thr_conns = 0;
// We increment thr load in pxy_thrmgr_add_conn() only
ctx->thr->load--;
// @attention We may get multiple conns with the same fd combinations, so fds cannot uniquely define a conn; hence the need for unique ids.
if (ctx->id == ctx->thr->conns->id) {
ctx->thr->conns = ctx->thr->conns->next;
return;
} else {
pxy_conn_ctx_t *current = ctx->thr->conns->next;
pxy_conn_ctx_t *previous = ctx->thr->conns;
while (current != NULL && previous != NULL) {
if (ctx->id == current->id) {
previous->next = current->next;
return;
}
previous = current;
current = current->next;
}
// This should never happen
log_err_level_printf(LOG_CRIT, "Cannot find conn in thr conns\n");
log_fine_main_va("Cannot find conn in thr conns, id=%llu, fd=%d", ctx->id, ctx->fd);
assert(0);
}
} else {
// This can happen if we are closing the conn after a fatal error before setting its event callback
log_finest("Conn not in thr conns");
}
}
/*
* Attach a new connection to a thread. Chooses the thread with the fewest
* currently active connections, returns the appropriate event bases.
@ -360,51 +171,32 @@ pxy_thrmgr_attach(pxy_conn_ctx_t *ctx)
{
log_finest("ENTER");
int thridx = 0;
size_t minload;
pxy_thrmgr_ctx_t *tmctx = ctx->thrmgr;
pthread_mutex_lock(&tmctx->thr[0]->mutex);
minload = tmctx->thr[0]->load;
pthread_mutex_unlock(&tmctx->thr[0]->mutex);
size_t minload = tmctx->thr[0]->load;
#ifdef DEBUG_THREAD
log_dbg_printf("===> Proxy connection handler thread status:\n"
"thr[0]: %zu\n", minload);
log_dbg_printf("===> Proxy connection handler thread status:\nthr[0]: %zu\n", minload);
#endif /* DEBUG_THREAD */
int thridx = 0;
for (int idx = 1; idx < tmctx->num_thr; idx++) {
pthread_mutex_lock(&tmctx->thr[idx]->mutex);
#ifdef DEBUG_THREAD
log_dbg_printf("thr[%d]: %zu\n", idx, tmctx->thr[idx]->load);
#endif /* DEBUG_THREAD */
if (minload > tmctx->thr[idx]->load) {
minload = tmctx->thr[idx]->load;
size_t thrload = tmctx->thr[idx]->load;
if (minload > thrload) {
minload = thrload;
thridx = idx;
}
pthread_mutex_unlock(&tmctx->thr[idx]->mutex);
#ifdef DEBUG_THREAD
log_dbg_printf("thr[%d]: %zu\n", idx, thrload);
#endif /* DEBUG_THREAD */
}
// Defer adding the conn to the conn list of its thread until after a successful conn setup while returning from pxy_conn_connect()
// otherwise pxy_thrmgr_timer_cb() may try to access the conn ctx while it is being freed on failure (signal 6 crash)
ctx->thr = tmctx->thr[thridx];
ctx->evbase = ctx->thr->evbase;
#ifdef DEBUG_THREAD
log_dbg_printf("thridx: %d\n", thridx);
#endif /* DEBUG_THREAD */
}
/*
* Detach a connection from a thread by index.
* This function cannot fail.
*/
void
pxy_thrmgr_detach(pxy_conn_ctx_t *ctx)
{
pthread_mutex_lock(&ctx->thr->mutex);
log_finest("ENTER");
pxy_thrmgr_remove_conn_unlocked(ctx);
pthread_mutex_unlock(&ctx->thr->mutex);
}
/* vim: set noet ft=c: */

@ -31,51 +31,11 @@
#include "opts.h"
#include "attrib.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <event2/event.h>
#include <event2/dns.h>
#include <pthread.h>
#include "pxythr.h"
extern int descriptor_table_size;
#define FD_RESERVE 10
typedef struct pxy_conn_ctx pxy_conn_ctx_t;
typedef struct pxy_thrmgr_ctx pxy_thrmgr_ctx_t;
typedef struct pxy_thr_ctx {
pthread_t thr;
int thridx;
pxy_thrmgr_ctx_t *thrmgr;
size_t load;
struct event_base *evbase;
int running;
// Per-thread locking is necessary during connection setup and termination
// to prevent multithreading issues between thrmgr thread and conn handling threads
pthread_mutex_t mutex;
// Statistics
evutil_socket_t max_fd;
size_t max_load;
size_t errors;
size_t set_watermarks;
size_t unset_watermarks;
long long unsigned int intif_in_bytes;
long long unsigned int intif_out_bytes;
long long unsigned int extif_in_bytes;
long long unsigned int extif_out_bytes;
// Each stats has an id, incremented on each stats print
unsigned short stats_id;
// Used to print statistics, compared against stats_period
unsigned int timeout_count;
// List of active connections on the thread
pxy_conn_ctx_t *conns;
} pxy_thr_ctx_t;
struct pxy_thrmgr_ctx {
int num_thr;
opts_t *opts;
@ -89,10 +49,7 @@ pxy_thrmgr_ctx_t * pxy_thrmgr_new(opts_t *) MALLOC;
int pxy_thrmgr_run(pxy_thrmgr_ctx_t *) NONNULL(1) WUNRES;
void pxy_thrmgr_free(pxy_thrmgr_ctx_t *) NONNULL(1);
void pxy_thrmgr_add_conn(pxy_conn_ctx_t *) NONNULL(1);
void pxy_thrmgr_attach(pxy_conn_ctx_t *) NONNULL(1);
void pxy_thrmgr_detach(pxy_conn_ctx_t *) NONNULL(1);
#endif /* !PXYTHRMGR_H */

Loading…
Cancel
Save