Create callback functions for set/unset watermark

Watermarking for underlying bevs is for autossl only. The other
protocols should not waste time with it.
pull/48/head
Soner Tari 2 years ago
parent 013814317c
commit 7143102efa

@ -146,6 +146,50 @@ protoautossl_conn_connect(pxy_conn_ctx_t *ctx)
return 0;
}
static void
protoautossl_try_set_watermark(struct bufferevent *bev, pxy_conn_ctx_t *ctx, struct bufferevent *other)
{
struct bufferevent *ubev_other = bufferevent_get_underlying(other);
if (evbuffer_get_length(bufferevent_get_output(other)) >= OUTBUF_LIMIT ||
(ubev_other && evbuffer_get_length(bufferevent_get_output(ubev_other)) >= OUTBUF_LIMIT)) {
log_fine_va("%s", prototcp_get_event_name(bev, ctx));
/* temporarily disable data source;
* set an appropriate watermark. */
bufferevent_setwatermark(other, EV_WRITE, OUTBUF_LIMIT/2, OUTBUF_LIMIT);
bufferevent_disable(bev, EV_READ);
/* The watermark for ubev_other may be already set, see pxy_try_unset_watermark,
* but getting is equally expensive as setting */
if (ubev_other)
bufferevent_setwatermark(ubev_other, EV_WRITE, OUTBUF_LIMIT/2, OUTBUF_LIMIT);
ctx->thr->set_watermarks++;
}
}
static void
protoautossl_try_unset_watermark(struct bufferevent *bev, pxy_conn_ctx_t *ctx, pxy_conn_desc_t *other)
{
if (other->bev && !(bufferevent_get_enabled(other->bev) & EV_READ)) {
log_fine_va("%s", prototcp_get_event_name(bev, ctx));
/* data source temporarily disabled;
* re-enable and reset watermark to 0. */
bufferevent_setwatermark(bev, EV_WRITE, 0, 0);
bufferevent_enable(other->bev, EV_READ);
/* Do not reset the watermark for ubev without checking its buf len,
* because the current write event may be due to the buf len of bev
* falling below OUTBUF_LIMIT/2, not that of ubev */
struct bufferevent *ubev = bufferevent_get_underlying(bev);
if (ubev && evbuffer_get_length(bufferevent_get_output(ubev)) < OUTBUF_LIMIT/2)
bufferevent_setwatermark(ubev, EV_WRITE, 0, 0);
ctx->thr->unset_watermarks++;
}
}
static void NONNULL(1)
protoautossl_bev_readcb_src(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
{
@ -183,7 +227,7 @@ protoautossl_bev_readcb_src(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
return;
}
pxy_try_set_watermark(bev, ctx, ctx->dst.bev);
ctx->protoctx->set_watermarkcb(bev, ctx, ctx->dst.bev);
}
static void NONNULL(1)
@ -206,7 +250,7 @@ protoautossl_bev_readcb_srvdst(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
}
evbuffer_add_buffer(bufferevent_get_output(ctx->src.bev), bufferevent_get_input(bev));
pxy_try_set_watermark(bev, ctx, ctx->src.bev);
ctx->protoctx->set_watermarkcb(bev, ctx, ctx->src.bev);
}
static void NONNULL(1,2)
@ -497,6 +541,9 @@ protoautossl_setup(pxy_conn_ctx_t *ctx)
ctx->protoctx->classify_usercb = protoautossl_classify_user;
#endif /* !WITHOUT_USERAUTH */
ctx->protoctx->set_watermarkcb = protoautossl_try_set_watermark;
ctx->protoctx->unset_watermarkcb = protoautossl_try_unset_watermark;
ctx->protoctx->arg = malloc(sizeof(protoautossl_ctx_t));
if (!ctx->protoctx->arg) {
return PROTO_ERROR;
@ -523,6 +570,9 @@ protoautossl_setup_child(pxy_conn_child_ctx_t *ctx)
ctx->protoctx->bev_writecb = prototcp_bev_writecb_child;
ctx->protoctx->bev_eventcb = protoautossl_bev_eventcb_child;
ctx->protoctx->set_watermarkcb = protoautossl_try_set_watermark;
ctx->protoctx->unset_watermarkcb = protoautossl_try_unset_watermark;
return PROTO_AUTOSSL;
}

@ -858,7 +858,7 @@ protohttp_bev_readcb_src(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
}
}
pxy_try_set_watermark(bev, ctx, ctx->dst.bev);
ctx->protoctx->set_watermarkcb(bev, ctx, ctx->dst.bev);
}
/*
@ -1001,7 +1001,7 @@ protohttp_bev_readcb_dst(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
log_finest_va("HTTP Response Body, size=%zu", evbuffer_get_length(inbuf));
evbuffer_add_buffer(outbuf, inbuf);
}
pxy_try_set_watermark(bev, ctx, ctx->src.bev);
ctx->protoctx->set_watermarkcb(bev, ctx, ctx->src.bev);
}
static void NONNULL(1)
@ -1034,7 +1034,7 @@ protohttp_bev_readcb_src_child(struct bufferevent *bev, pxy_conn_child_ctx_t *ct
log_finest_va("HTTP Request Body, size=%zu", evbuffer_get_length(inbuf));
evbuffer_add_buffer(outbuf, inbuf);
}
pxy_try_set_watermark(bev, ctx->conn, ctx->dst.bev);
ctx->protoctx->set_watermarkcb(bev, ctx->conn, ctx->dst.bev);
}
static void NONNULL(1)
@ -1062,7 +1062,7 @@ protohttp_bev_readcb_dst_child(struct bufferevent *bev, pxy_conn_child_ctx_t *ct
log_finest_va("HTTP Response Body, size=%zu", evbuffer_get_length(inbuf));
evbuffer_add_buffer(outbuf, inbuf);
}
pxy_try_set_watermark(bev, ctx->conn, ctx->src.bev);
ctx->protoctx->set_watermarkcb(bev, ctx->conn, ctx->src.bev);
}
static void NONNULL(1)
@ -1133,7 +1133,7 @@ protohttp_bev_writecb_src(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
}
return;
}
pxy_try_unset_watermark(bev, ctx, &ctx->dst);
ctx->protoctx->unset_watermarkcb(bev, ctx, &ctx->dst);
}
static void NONNULL(1)

@ -154,7 +154,7 @@ protopassthrough_bev_readcb_src(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
}
evbuffer_add_buffer(bufferevent_get_output(ctx->srvdst.bev), bufferevent_get_input(bev));
pxy_try_set_watermark(bev, ctx, ctx->srvdst.bev);
ctx->protoctx->set_watermarkcb(bev, ctx, ctx->srvdst.bev);
}
static void NONNULL(1)
@ -169,7 +169,7 @@ protopassthrough_bev_readcb_srvdst(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
}
evbuffer_add_buffer(bufferevent_get_output(ctx->src.bev), bufferevent_get_input(bev));
pxy_try_set_watermark(bev, ctx, ctx->src.bev);
ctx->protoctx->set_watermarkcb(bev, ctx, ctx->src.bev);
}
static void NONNULL(1)
@ -191,7 +191,7 @@ protopassthrough_bev_writecb_src(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
}
return;
}
pxy_try_unset_watermark(bev, ctx, &ctx->srvdst);
ctx->protoctx->unset_watermarkcb(bev, ctx, &ctx->srvdst);
}
static void NONNULL(1)
@ -206,7 +206,7 @@ protopassthrough_bev_writecb_srvdst(struct bufferevent *bev, pxy_conn_ctx_t *ctx
}
return;
}
pxy_try_unset_watermark(bev, ctx, &ctx->src);
ctx->protoctx->unset_watermarkcb(bev, ctx, &ctx->src);
}
static void NONNULL(1,2)

@ -209,7 +209,7 @@ protosmtp_bev_readcb_srvdst(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
}
evbuffer_add_buffer(outbuf, inbuf);
pxy_try_set_watermark(bev, ctx, ctx->src.bev);
ctx->protoctx->set_watermarkcb(bev, ctx, ctx->src.bev);
}
static void NONNULL(1,2)

@ -222,6 +222,62 @@ prototcp_init_conn(UNUSED evutil_socket_t fd, UNUSED short what, void *arg)
pxy_conn_connect(ctx);
}
#ifdef DEBUG_PROXY
char *bev_names[] = {
"src",
"dst",
"srvdst",
"NULL",
"UNKWN"
};
char *
prototcp_get_event_name(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
{
if (bev == ctx->src.bev) {
return bev_names[0];
} else if (bev == ctx->dst.bev) {
return bev_names[1];
} else if (bev == ctx->srvdst.bev) {
return bev_names[2];
} else if (bev == NULL) {
log_fine("event_name=NULL");
return bev_names[3];
} else {
log_fine("event_name=UNKWN");
return bev_names[4];
}
}
#endif /* DEBUG_PROXY */
void
prototcp_try_set_watermark(struct bufferevent *bev, pxy_conn_ctx_t *ctx, struct bufferevent *other)
{
if (evbuffer_get_length(bufferevent_get_output(other)) >= OUTBUF_LIMIT) {
log_fine_va("%s", prototcp_get_event_name(bev, ctx));
/* temporarily disable data source;
* set an appropriate watermark. */
bufferevent_setwatermark(other, EV_WRITE, OUTBUF_LIMIT/2, OUTBUF_LIMIT);
bufferevent_disable(bev, EV_READ);
ctx->thr->set_watermarks++;
}
}
void
prototcp_try_unset_watermark(struct bufferevent *bev, pxy_conn_ctx_t *ctx, pxy_conn_desc_t *other)
{
if (other->bev && !(bufferevent_get_enabled(other->bev) & EV_READ)) {
log_fine_va("%s", prototcp_get_event_name(bev, ctx));
/* data source temporarily disabled;
* re-enable and reset watermark to 0. */
bufferevent_setwatermark(bev, EV_WRITE, 0, 0);
bufferevent_enable(other->bev, EV_READ);
ctx->thr->unset_watermarks++;
}
}
#ifndef WITHOUT_USERAUTH
int
prototcp_try_send_userauth_msg(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
@ -297,7 +353,7 @@ prototcp_bev_readcb_src(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
return;
}
pxy_try_set_watermark(bev, ctx, ctx->dst.bev);
ctx->protoctx->set_watermarkcb(bev, ctx, ctx->dst.bev);
}
void
@ -311,7 +367,7 @@ prototcp_bev_readcb_dst(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
}
evbuffer_add_buffer(bufferevent_get_output(ctx->src.bev), bufferevent_get_input(bev));
pxy_try_set_watermark(bev, ctx, ctx->src.bev);
ctx->protoctx->set_watermarkcb(bev, ctx, ctx->src.bev);
}
static void NONNULL(1)
@ -350,7 +406,7 @@ prototcp_bev_readcb_src_child(struct bufferevent *bev, pxy_conn_child_ctx_t *ctx
} else {
evbuffer_add_buffer(outbuf, inbuf);
}
pxy_try_set_watermark(bev, ctx->conn, ctx->dst.bev);
ctx->protoctx->set_watermarkcb(bev, ctx->conn, ctx->dst.bev);
}
static void NONNULL(1)
@ -364,7 +420,7 @@ prototcp_bev_readcb_dst_child(struct bufferevent *bev, pxy_conn_child_ctx_t *ctx
}
evbuffer_add_buffer(bufferevent_get_output(ctx->src.bev), bufferevent_get_input(bev));
pxy_try_set_watermark(bev, ctx->conn, ctx->src.bev);
ctx->protoctx->set_watermarkcb(bev, ctx->conn, ctx->src.bev);
}
#ifndef WITHOUT_USERAUTH
@ -425,7 +481,7 @@ prototcp_bev_writecb_src(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
}
return;
}
pxy_try_unset_watermark(bev, ctx, &ctx->dst);
ctx->protoctx->unset_watermarkcb(bev, ctx, &ctx->dst);
}
void
@ -440,7 +496,7 @@ prototcp_bev_writecb_dst(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
}
return;
}
pxy_try_unset_watermark(bev, ctx, &ctx->src);
ctx->protoctx->unset_watermarkcb(bev, ctx, &ctx->src);
}
static void NONNULL(1)
@ -455,7 +511,7 @@ prototcp_bev_writecb_src_child(struct bufferevent *bev, pxy_conn_child_ctx_t *ct
}
return;
}
pxy_try_unset_watermark(bev, ctx->conn, &ctx->dst);
ctx->protoctx->unset_watermarkcb(bev, ctx->conn, &ctx->dst);
}
static void NONNULL(1)
@ -470,7 +526,7 @@ prototcp_bev_writecb_dst_child(struct bufferevent *bev, pxy_conn_child_ctx_t *ct
}
return;
}
pxy_try_unset_watermark(bev, ctx->conn, &ctx->src);
ctx->protoctx->unset_watermarkcb(bev, ctx->conn, &ctx->src);
}
int
@ -896,6 +952,9 @@ prototcp_setup(pxy_conn_ctx_t *ctx)
ctx->protoctx->classify_usercb = pxy_classify_user;
#endif /* !WITHOUT_USERAUTH */
ctx->protoctx->set_watermarkcb = prototcp_try_set_watermark;
ctx->protoctx->unset_watermarkcb = prototcp_try_unset_watermark;
return PROTO_TCP;
}
@ -909,6 +968,9 @@ prototcp_setup_child(pxy_conn_child_ctx_t *ctx)
ctx->protoctx->bev_writecb = prototcp_bev_writecb_child;
ctx->protoctx->bev_eventcb = prototcp_bev_eventcb_child;
ctx->protoctx->set_watermarkcb = prototcp_try_set_watermark;
ctx->protoctx->unset_watermarkcb = prototcp_try_unset_watermark;
return PROTO_TCP;
}

@ -32,8 +32,20 @@
#include "pxyconn.h"
/*
* Maximum size of data to buffer per connection direction before
* temporarily stopping to read data from the other end.
*/
#define OUTBUF_LIMIT (128*1024)
void prototcp_init_conn(evutil_socket_t, short, void *);
#ifdef DEBUG_PROXY
char *prototcp_get_event_name(struct bufferevent *, pxy_conn_ctx_t *) NONNULL(2);
#endif /* DEBUG_PROXY */
void prototcp_try_set_watermark(struct bufferevent *, pxy_conn_ctx_t *, struct bufferevent *) NONNULL(1,2,3);
void prototcp_try_unset_watermark(struct bufferevent *, pxy_conn_ctx_t *, pxy_conn_desc_t *) NONNULL(1,2,3);
#ifndef WITHOUT_USERAUTH
int prototcp_try_send_userauth_msg(struct bufferevent *, pxy_conn_ctx_t *) NONNULL(1,2);
int prototcp_try_close_unauth_conn(struct bufferevent *, pxy_conn_ctx_t *) NONNULL(1,2);

@ -67,12 +67,6 @@
#include <net/if_dl.h>
#endif /* __OpenBSD__ */
/*
* Maximum size of data to buffer per connection direction before
* temporarily stopping to read data from the other end.
*/
#define OUTBUF_LIMIT (128*1024)
// getdtablecount() returns int, hence we don't use size_t here
int descriptor_table_size = 0;
@ -814,78 +808,6 @@ pxy_malloc_packet(size_t sz, pxy_conn_ctx_t *ctx)
return packet;
}
#ifdef DEBUG_PROXY
char *bev_names[] = {
"src",
"dst",
"srvdst",
"NULL",
"UNKWN"
};
static char *
pxy_get_event_name(struct bufferevent *bev, pxy_conn_ctx_t *ctx)
{
if (bev == ctx->src.bev) {
return bev_names[0];
} else if (bev == ctx->dst.bev) {
return bev_names[1];
} else if (bev == ctx->srvdst.bev) {
return bev_names[2];
} else if (bev == NULL) {
log_fine("event_name=NULL");
return bev_names[3];
} else {
log_fine("event_name=UNKWN");
return bev_names[4];
}
}
#endif /* DEBUG_PROXY */
void
pxy_try_set_watermark(struct bufferevent *bev, pxy_conn_ctx_t *ctx, struct bufferevent *other)
{
struct bufferevent *ubev_other = bufferevent_get_underlying(other);
if (evbuffer_get_length(bufferevent_get_output(other)) >= OUTBUF_LIMIT ||
(ubev_other && evbuffer_get_length(bufferevent_get_output(ubev_other)) >= OUTBUF_LIMIT)) {
log_fine_va("%s", pxy_get_event_name(bev, ctx));
/* temporarily disable data source;
* set an appropriate watermark. */
bufferevent_setwatermark(other, EV_WRITE, OUTBUF_LIMIT/2, OUTBUF_LIMIT);
bufferevent_disable(bev, EV_READ);
/* The watermark for ubev_other may be already set, see pxy_try_unset_watermark,
* but getting is equally expensive as setting */
if (ubev_other)
bufferevent_setwatermark(ubev_other, EV_WRITE, OUTBUF_LIMIT/2, OUTBUF_LIMIT);
ctx->thr->set_watermarks++;
}
}
void
pxy_try_unset_watermark(struct bufferevent *bev, pxy_conn_ctx_t *ctx, pxy_conn_desc_t *other)
{
if (other->bev && !(bufferevent_get_enabled(other->bev) & EV_READ)) {
log_fine_va("%s", pxy_get_event_name(bev, ctx));
/* data source temporarily disabled;
* re-enable and reset watermark to 0. */
bufferevent_setwatermark(bev, EV_WRITE, 0, 0);
bufferevent_enable(other->bev, EV_READ);
/* Do not reset the watermark for ubev without checking its buf len,
* because the current write event may be due to the buf len of bev
* falling below OUTBUF_LIMIT/2, not that of ubev */
struct bufferevent *ubev = bufferevent_get_underlying(bev);
if (ubev && evbuffer_get_length(bufferevent_get_output(ubev)) < OUTBUF_LIMIT/2)
bufferevent_setwatermark(ubev, EV_WRITE, 0, 0);
ctx->thr->unset_watermarks++;
}
}
void
pxy_discard_inbuf(struct bufferevent *bev)
{

@ -64,6 +64,7 @@
#define PROTOERROR_MSG "Connection is terminated due to protocol error\r\n"
#define PROTOERROR_MSG_LEN strlen(PROTOERROR_MSG)
typedef struct pxy_conn_desc pxy_conn_desc_t;
typedef struct pxy_conn_child_ctx pxy_conn_child_ctx_t;
typedef void (*init_conn_func_t)(evutil_socket_t, short, void *);
@ -83,6 +84,9 @@ typedef void (*proto_classify_user_func_t)(pxy_conn_ctx_t *);
typedef void (*child_connect_func_t)(pxy_conn_child_ctx_t *);
typedef void (*child_proto_free_func_t)(pxy_conn_child_ctx_t *);
typedef void (*set_watermark_func_t)(struct bufferevent *, pxy_conn_ctx_t *, struct bufferevent *);
typedef void (*unset_watermark_func_t)(struct bufferevent *, pxy_conn_ctx_t *, pxy_conn_desc_t *);
typedef filter_action_t * (*proto_filter_func_t)(pxy_conn_ctx_t *, filter_list_t *);
/*
@ -93,12 +97,12 @@ typedef filter_action_t * (*proto_filter_func_t)(pxy_conn_ctx_t *, filter_list_t
*/
/* single socket bufferevent descriptor */
typedef struct pxy_conn_desc {
struct pxy_conn_desc {
struct bufferevent *bev;
SSL *ssl;
unsigned int closed : 1;
bev_free_func_t free;
} pxy_conn_desc_t;
};
enum conn_type {
CONN_TYPE_PARENT = 0,
@ -167,6 +171,9 @@ struct proto_ctx {
proto_classify_user_func_t classify_usercb;
#endif /* !WITHOUT_USERAUTH */
set_watermark_func_t set_watermarkcb;
unset_watermark_func_t unset_watermarkcb;
// For protocol specific fields, if any
void *arg;
};
@ -182,6 +189,9 @@ struct proto_child_ctx {
child_proto_free_func_t proto_free;
set_watermark_func_t set_watermarkcb;
unset_watermark_func_t unset_watermarkcb;
// For protocol specific fields, if any
void *arg;
};
@ -410,9 +420,6 @@ unsigned char *pxy_malloc_packet(size_t, pxy_conn_ctx_t *) MALLOC NONNULL(2) WUN
int pxy_try_prepend_sslproxy_header(pxy_conn_ctx_t *ctx, struct evbuffer *, struct evbuffer *) NONNULL(1,2,3);
void pxy_try_remove_sslproxy_header(pxy_conn_child_ctx_t *, unsigned char *, size_t *) NONNULL(1,2,3);
void pxy_try_set_watermark(struct bufferevent *, pxy_conn_ctx_t *, struct bufferevent *) NONNULL(1,2,3);
void pxy_try_unset_watermark(struct bufferevent *, pxy_conn_ctx_t *, pxy_conn_desc_t *) NONNULL(1,2,3);
int pxy_try_close_conn_end(pxy_conn_desc_t *, pxy_conn_ctx_t *) NONNULL(1,2);
void pxy_try_disconnect(pxy_conn_ctx_t *, pxy_conn_desc_t *, pxy_conn_desc_t *, int) NONNULL(1,2,3);

Loading…
Cancel
Save