* loopback test network

* fix various crashes and race conditions

* rename iwp-connect to connect in config

* rename iwp-links to bind in config

* always make a link just for outbound sessions even if no bind section is provided
pull/1/head
Jeff Becker 6 years ago
parent a0f1d548f8
commit 315798a0c4
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

8
.gitignore vendored

@ -26,5 +26,11 @@ callgrind.*
*.signed
*.key
shadow.data
shadow.config.xml
*.log
*.log
testnet_tmp
*.pid

@ -3,6 +3,9 @@ all: debug
SIGN = gpg --sign --detach
REPO := $(shell dirname $(realpath $(lastword $(MAKEFILE_LIST))))
TARGETS = llarpd libllarp.so libllarp-static.a
SIGS = $(TARGETS:=.sig)
@ -11,6 +14,10 @@ SHADOW_BIN=$(SHADOW_ROOT)/bin/shadow
SHADOW_CONFIG=shadow.config.xml
SHADOW_PLUGIN=libshadow-plugin-llarp.so
TESTNET_ROOT=$(REPO)/testnet_tmp
TESTNET_CONF=$(TESTNET_ROOT)/supervisor.conf
TESTNET_LOG=$(TESTNET_ROOT)/testnet.log
clean:
rm -f build.ninja rules.ninja cmake_install.cmake CMakeCache.txt
rm -rf CMakeFiles
@ -47,7 +54,19 @@ shadow-build: shadow-configure
shadow: shadow-build
python3 contrib/shadow/genconf.py $(SHADOW_CONFIG)
bash -c "$(SHADOW_BIN) -w 16 $(SHADOW_CONFIG) &> shadow.log.txt"
bash -c "$(SHADOW_BIN) -w $$(cat /proc/cpuinfo | grep processor | wc -l) $(SHADOW_CONFIG) &> shadow.log.txt"
testnet-configure: clean
cmake -GNinja -DCMAKE_BUILD_TYPE=Debug
testnet-build: testnet-configure
ninja clean
ninja
testnet: testnet-build
mkdir -p $(TESTNET_ROOT)
python3 contrib/testnet/genconf.py --bin=$(REPO)/llarpd --svc=30 --clients=300 --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF)
supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF)
format:
clang-format -i $$(find daemon llarp include | grep -E '\.[h,c](pp)?$$')

@ -0,0 +1,91 @@
#!/usr/bin/env python3
#
# this script generate supervisord configs for running a test network on loopback
#
from argparse import ArgumentParser as AP
from configparser import ConfigParser as CP
import os
svcNodeName = lambda id : 'svc-node-%03d' % id
clientNodeName = lambda id : 'client-node-%03d' % id
def main():
ap = AP()
ap.add_argument('--dir', type=str, default='testnet_tmp')
ap.add_argument('--svc', type=int, default=20, help='number of service nodes')
ap.add_argument('--baseport', type=int, default=19000)
ap.add_argument('--clients', type=int, default=200, help='number of client nodes')
ap.add_argument('--bin', type=str, required=True)
ap.add_argument('--out', type=str, required=True)
ap.add_argument('--connect', type=int, default=5)
args = ap.parse_args()
basedir = os.path.abspath(args.dir)
for nodeid in range(args.svc):
config = CP()
config['bind'] = {
'lo' : str(args.baseport + nodeid)
}
config['netdb'] = {
'dir' : 'netdb'
}
config['connect'] = {}
for otherid in range(args.svc):
if otherid != nodeid:
name = svcNodeName(otherid)
config['connect'][name] = os.path.join(basedir, name, 'rc.signed')
d = os.path.join(args.dir, svcNodeName(nodeid))
if not os.path.exists(d):
os.mkdir(d)
fp = os.path.join(d, 'daemon.ini')
with open(fp, 'w') as f:
config.write(f)
for nodeid in range(args.clients):
config = CP()
config['netdb'] = {
'dir' : 'netdb'
}
config['connect'] = {}
for otherid in range(args.svc):
if otherid % args.connect == 0:
name = svcNodeName(otherid)
config['connect'][name] = os.path.join(basedir, name, 'rc.signed')
d = os.path.join(args.dir, clientNodeName(nodeid))
if not os.path.exists(d):
os.mkdir(d)
fp = os.path.join(d, 'daemon.ini')
with open(fp, 'w') as f:
config.write(f)
with open(args.out, 'w') as f:
f.write('''[program:svc-node]
directory = {}
command = {}
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
process_name = svc-node-%(process_num)03d
numprocs = {}
'''.format(os.path.join(args.dir, 'svc-node-%(process_num)03d'), args.bin, args.svc))
f.write('''[program:client-node]
directory = {}
command = {}
redirect_stderr=true
stdout_logfile=/dev/fd/1
stdout_logfile_maxbytes=0
process_name = client-node-%(process_num)03d
numprocs = {}
'''.format(os.path.join(args.dir, 'client-node-%(process_num)03d'), args.bin, args.clients))
f.write('[supervisord]\ndirectory=.\n')
if __name__ == '__main__':
main()

@ -108,6 +108,8 @@ struct llarp_link_session
void (*close)(struct llarp_link_session *);
/** set session established */
void (*established)(struct llarp_link_session *);
/** get parent link */
struct llarp_link *(*get_parent)(struct llarp_link_session *);
/** get router contact of remote router */
struct llarp_rc *(*get_remote_router)(struct llarp_link_session *);
};

@ -25,9 +25,9 @@ namespace llarp
auto &top = parser.top();
router = find_section(top, "router", section_t{});
network = find_section(top, "network", section_t{});
connect = find_section(top, "iwp-connect", section_t{});
connect = find_section(top, "connect", section_t{});
netdb = find_section(top, "netdb", section_t{});
iwp_links = find_section(top, "iwp-links", section_t{});
iwp_links = find_section(top, "bind", section_t{});
return true;
}
return false;
@ -66,8 +66,8 @@ llarp_config_iter(struct llarp_config *conf, struct llarp_config_iterator *iter)
iter->conf = conf;
std::map< std::string, llarp::Config::section_t & > sections = {
{"network", conf->impl.network},
{"iwp-connect", conf->impl.connect},
{"iwp-links", conf->impl.iwp_links},
{"connect", conf->impl.connect},
{"bind", conf->impl.iwp_links},
{"netdb", conf->impl.netdb}};
for(const auto item : conf->impl.router)

@ -308,7 +308,6 @@ namespace iwp
else // token missmatch
{
session->buf = nullptr;
printf("token miss match\n");
}
}
else // hmac fail

@ -714,6 +714,7 @@ namespace iwp
}
llarp::Zero(&remote_router, sizeof(llarp_rc));
crypto->randbytes(token, 32);
}
~session()
@ -938,11 +939,10 @@ namespace iwp
{
// invalid signature
llarp::Error("introack verify failed from ", link->addr);
link->done();
return;
}
link->EnterState(eIntroAckRecv);
// copy decrypted token
memcpy(link->token, introack->token, 32);
link->session_start();
}
@ -1058,17 +1058,7 @@ namespace iwp
}
static void
handle_verify_intro(iwp_async_intro *intro)
{
session *self = static_cast< session * >(intro->user);
if(!intro->buf)
{
llarp::Error("intro verify failed from ", self->addr);
delete self;
return;
}
self->intro_ack();
}
handle_verify_intro(iwp_async_intro *intro);
static void
handle_introack_generated(iwp_async_introack *i);
@ -1086,9 +1076,8 @@ namespace iwp
// randomize nonce
introack.nonce = introack.buf + 32;
crypto->randbytes(introack.nonce, 32);
// randomize token
// token
introack.token = token;
crypto->randbytes(introack.token, 32);
// keys
introack.remote_pubkey = remote;
@ -1154,6 +1143,9 @@ namespace iwp
iwp_call_async_verify_introack(iwp, &introack);
}
static llarp_link *
get_parent(llarp_link_session *s);
static void
handle_generated_intro(iwp_async_intro *i)
{
@ -1372,6 +1364,7 @@ namespace iwp
s.close = &session::close;
s.get_remote_router = &session::get_remote_router;
s.established = &session::set_established;
s.get_parent = &session::get_parent;
{
lock_t lock(m_sessions_Mutex);
m_sessions[src] = s;
@ -1548,6 +1541,19 @@ namespace iwp
return success;
}
void
session::handle_verify_intro(iwp_async_intro *intro)
{
session *self = static_cast< session * >(intro->user);
if(!intro->buf)
{
llarp::Error("intro verify failed from ", self->addr, " via ",
self->serv->addr);
return;
}
self->intro_ack();
}
void
session::done()
{
@ -1637,6 +1643,13 @@ namespace iwp
return true;
}
llarp_link *
session::get_parent(llarp_link_session *s)
{
session *link = static_cast< session * >(s->impl);
return link->serv->parent;
}
void
session::handle_verify_session_start(iwp_async_session_start *s)
{
@ -1646,7 +1659,6 @@ namespace iwp
// verify fail
// TODO: remove session?
llarp::Warn("session start verify failed from ", self->addr);
self->serv->RemoveSessionByAddr(self->addr);
return;
}
self->send_LIM();
@ -1680,6 +1692,12 @@ namespace iwp
addr->port = link->addr.port();
}
const char *
outboundLink_name()
{
return "OWP";
}
bool
link_configure(struct llarp_link *l, struct llarp_ev_loop *netloop,
const char *ifname, int af, uint16_t port)
@ -1722,6 +1740,8 @@ namespace iwp
return false;
}
}
else
l->name = outboundLink_name;
switch(af)
{
@ -1861,13 +1881,12 @@ namespace iwp
return;
}
link->serv->put_session(link->addr, link);
llarp::Debug("send introack");
llarp::Debug("send introack to ", link->addr, " via ", link->serv->addr);
if(llarp_ev_udp_sendto(link->udp, link->addr, i->buf, i->sz) == -1)
{
llarp::Warn("sendto failed");
return;
}
llarp::Debug("sent");
link->EnterState(eIntroAckSent);
}
else

@ -13,5 +13,5 @@ bool
llarp_link_session_initialized(struct llarp_link_session* s)
{
return s && s->impl && s->sendto && s->timeout && s->close
&& s->get_remote_router && s->established;
&& s->get_remote_router && s->established && s->get_parent;
}

@ -51,16 +51,22 @@ bool
llarp_router::SendToOrQueue(const llarp::RouterID &remote,
std::vector< llarp::ILinkMessage * > msgs)
{
bool has = false;
for(auto &item : links)
llarp_link *chosen = nullptr;
if(!outboundLink->has_session_to(outboundLink, remote))
{
if(!item.second)
continue;
auto link = item.first;
has |= link->has_session_to(link, remote);
for(auto link : inboundLinks)
{
if(link->has_session_to(link, remote))
{
chosen = link;
break;
}
}
}
else
chosen = outboundLink;
if(!has)
if(!chosen)
{
llarp_rc rc;
llarp_rc_clear(&rc);
@ -94,7 +100,7 @@ llarp_router::SendToOrQueue(const llarp::RouterID &remote,
{
outboundMesssageQueue[remote].push(msg);
}
FlushOutboundFor(remote);
FlushOutboundFor(remote, chosen);
return true;
}
@ -152,16 +158,15 @@ llarp_router::EnsureIdentity()
}
void
llarp_router::AddLink(struct llarp_link *link, bool isOutbound)
llarp_router::AddInboundLink(struct llarp_link *link)
{
links.push_back({link, isOutbound});
ready = true;
inboundLinks.push_back(link);
}
bool
llarp_router::Ready()
{
return ready;
return outboundLink != nullptr;
}
bool
@ -194,14 +199,18 @@ llarp_router::SaveRC()
void
llarp_router::Close()
{
for(auto &pair : links)
for(auto link : inboundLinks)
{
auto link = pair.first;
link->stop_link(link);
link->free_impl(link);
delete link;
}
links.clear();
inboundLinks.clear();
outboundLink->stop_link(outboundLink);
outboundLink->free_impl(outboundLink);
delete outboundLink;
outboundLink = nullptr;
}
void
@ -267,7 +276,8 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job)
// this was an outbound establish job
if(ctx->establish_job->session)
{
router->FlushOutboundFor(pubkey);
auto session = ctx->establish_job->session;
router->FlushOutboundFor(pubkey, session->get_parent(session));
}
llarp_rc_free(&job->rc);
delete job;
@ -293,13 +303,7 @@ llarp_router::Tick()
iter.visit = &send_padded_message;
if(sendPadding)
{
for(auto &item : links)
{
if(!item.second)
continue;
auto link = item.first;
link->iter_sessions(link, iter);
}
outboundLink->iter_sessions(outboundLink, iter);
}
}
@ -334,15 +338,15 @@ llarp_router::SendTo(llarp::RouterID remote, llarp::ILinkMessage *msg)
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
bool sent = false;
for(auto &item : links)
bool sent = outboundLink->sendto(outboundLink, remote, buf);
if(!sent)
{
if(!item.second)
continue;
if(!sent)
for(auto link : inboundLinks)
{
auto link = item.first;
sent = link->sendto(link, remote, buf);
if(!sent)
{
sent = link->sendto(link, remote, buf);
}
}
}
}
@ -367,7 +371,8 @@ llarp_router::SessionClosed(const llarp::RouterID &remote)
}
void
llarp_router::FlushOutboundFor(const llarp::RouterID &remote)
llarp_router::FlushOutboundFor(const llarp::RouterID &remote,
llarp_link *chosen)
{
llarp::Debug("Flush outbound for ", remote);
auto itr = outboundMesssageQueue.find(remote);
@ -390,24 +395,10 @@ llarp_router::FlushOutboundFor(const llarp::RouterID &remote)
// set size of message
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
if(!chosen->sendto(chosen, remote, buf))
llarp::Warn("failed to send outboud message to ", remote, " via ",
chosen->name());
llarp::RouterID peer = remote;
bool sent = false;
for(auto &item : links)
{
if(item.second)
{
if(!sent)
{
auto link = item.first;
sent = link->sendto(link, peer, buf);
}
}
}
if(!sent)
{
llarp::Warn("failed to flush outboud message queue for ", remote);
}
delete msg;
itr->second.pop();
}
@ -419,11 +410,8 @@ llarp_router::on_try_connect_result(llarp_link_establish_job *job)
llarp_router *router = static_cast< llarp_router * >(job->user);
if(job->session)
{
delete job;
/*
auto session = job->session;
router->async_verify_RC(session, false, job);
*/
return;
}
llarp::Info("session not established");
@ -456,11 +444,8 @@ llarp_router::Run()
llarp::Zero(&rc, sizeof(llarp_rc));
// fill our address list
rc.addrs = llarp_ai_list_new();
for(auto &item : links)
for(auto link : inboundLinks)
{
if(item.second)
continue;
auto link = item.first;
llarp_ai addr;
link->get_our_address(link, &addr);
llarp_ai_list_pushback(rc.addrs, &addr);
@ -480,24 +465,37 @@ llarp_router::Run()
llarp::Info("our router has public key ", ourPubkey);
llarp_dht_context_start(dht, ourPubkey);
llarp::Debug("starting outbound link");
if(!outboundLink->start_link(outboundLink, logic))
{
llarp::Warn("outbound link failed to start");
}
// start links
for(auto &item : links)
for(auto link : inboundLinks)
{
auto link = item.first;
int result = link->start_link(link, logic);
if(result == -1)
llarp::Warn("Link ", link->name(), " failed to start");
else
if(link->start_link(link, logic))
llarp::Debug("Link ", link->name(), " started");
else
llarp::Warn("Link ", link->name(), " failed to start");
}
for(const auto &itr : connect)
llarp_logic_call_later(logic, {1000, this, &ConnectAll});
ScheduleTicker(500);
}
void
llarp_router::ConnectAll(void *user, uint64_t orig, uint64_t left)
{
if(left)
return;
llarp_router *self = static_cast< llarp_router * >(user);
for(const auto &itr : self->connect)
{
llarp::Info("connecting to node ", itr.first);
try_connect(itr.second);
self->try_connect(itr.second);
}
ScheduleTicker(500);
}
bool
@ -545,6 +543,41 @@ llarp_init_router(struct llarp_threadpool *tp, struct llarp_ev_loop *netloop,
return router;
}
bool
llarp_router::InitOutboundLink()
{
if(outboundLink)
return true;
auto link = new llarp_link;
llarp::Zero(link, sizeof(llarp_link));
llarp_iwp_args args = {
.crypto = &crypto,
.logic = logic,
.cryptoworker = tp,
.router = this,
.keyfile = transport_keyfile.c_str(),
};
auto afs = {AF_INET, AF_INET6};
iwp_link_init(link, args);
if(llarp_link_initialized(link))
{
llarp::Info("outbound link initialized");
for(auto af : afs)
{
if(link->configure(link, netloop, "*", af, 0))
{
outboundLink = link;
llarp::Info("outbound link ready");
return true;
}
}
}
delete link;
llarp::Error("failed to initialize outbound link");
return false;
}
bool
llarp_configure_router(struct llarp_router *router, struct llarp_config *conf)
{
@ -552,6 +585,8 @@ llarp_configure_router(struct llarp_router *router, struct llarp_config *conf)
iter.user = router;
iter.visit = llarp::router_iter_config;
llarp_config_iter(conf, &iter);
if(!router->InitOutboundLink())
return false;
if(!router->Ready())
{
return false;
@ -573,21 +608,16 @@ llarp_router_try_connect(struct llarp_router *router, struct llarp_rc *remote)
llarp_ai addr;
if(llarp_ai_list_index(remote->addrs, 0, &addr))
{
for(auto &item : router->links)
{
if(!item.second)
continue;
auto link = item.first;
llarp_link_establish_job *job = new llarp_link_establish_job;
llarp_ai_copy(&job->ai, &addr);
job->timeout = 10000;
job->result = &llarp_router::on_try_connect_result;
// give router as user pointer
job->user = router;
link->try_establish(link, job);
return true;
}
auto link = router->outboundLink;
llarp_link_establish_job *job = new llarp_link_establish_job;
llarp_ai_copy(&job->ai, &addr);
job->timeout = 10000;
job->result = &llarp_router::on_try_connect_result;
// give router as user pointer
job->user = router;
link->try_establish(link, job);
return true;
}
return false;
}
@ -698,10 +728,10 @@ void
llarp_router_iterate_links(struct llarp_router *router,
struct llarp_router_link_iter i)
{
for(auto item : router->links)
if(item.second)
if(!i.visit(&i, router, item.first))
return;
for(auto link : router->inboundLinks)
if(!i.visit(&i, router, link))
return;
i.visit(&i, router, router->outboundLink);
}
void
@ -742,46 +772,49 @@ namespace llarp
}
struct llarp_link *link = nullptr;
if(StrEq(section, "iwp-links"))
if(StrEq(section, "bind"))
{
link = new llarp_link;
llarp::Zero(link, sizeof(llarp_link));
llarp_iwp_args args = {
.crypto = &self->crypto,
.logic = self->logic,
.cryptoworker = self->tp,
.router = self,
.keyfile = self->transport_keyfile.c_str(),
};
iwp_link_init(link, args);
if(llarp_link_initialized(link))
if(!StrEq(key, "*"))
{
llarp::Info("link ", key, " initialized");
if(link->configure(link, self->netloop, key, af, proto))
link = new llarp_link;
llarp::Zero(link, sizeof(llarp_link));
llarp_iwp_args args = {
.crypto = &self->crypto,
.logic = self->logic,
.cryptoworker = self->tp,
.router = self,
.keyfile = self->transport_keyfile.c_str(),
};
iwp_link_init(link, args);
if(llarp_link_initialized(link))
{
self->AddLink(link, llarp::StrEq(key, "*"));
return;
}
if(af == AF_INET6)
{
// we failed to configure IPv6
// try IPv4
llarp::Info("link ", key, " failed to configure IPv6, trying IPv4");
af = AF_INET;
llarp::Info("link ", key, " initialized");
if(link->configure(link, self->netloop, key, af, proto))
{
llarp_ai ai;
link->get_our_address(link, &ai);
llarp::Addr addr = ai;
self->AddLink(link, llarp::StrEq(key, "*"));
self->AddInboundLink(link);
return;
}
if(af == AF_INET6)
{
// we failed to configure IPv6
// try IPv4
llarp::Info("link ", key, " failed to configure IPv6, trying IPv4");
af = AF_INET;
if(link->configure(link, self->netloop, key, af, proto))
{
llarp_ai ai;
link->get_our_address(link, &ai);
llarp::Addr addr = ai;
self->AddInboundLink(link);
return;
}
}
}
}
llarp::Error("link ", key, " failed to configure");
}
else if(StrEq(section, "iwp-connect"))
else if(StrEq(section, "connect"))
{
self->connect[key] = val;
}

@ -68,7 +68,8 @@ struct llarp_router
llarp::InboundMessageParser inbound_msg_parser;
std::list< std::pair< llarp_link *, bool > > links;
llarp_link *outboundLink = nullptr;
std::list< llarp_link * > inboundLinks;
typedef std::queue< llarp::ILinkMessage * > MessageQueue;
@ -86,7 +87,10 @@ struct llarp_router
HandleRecvLinkMessage(struct llarp_link_session *from, llarp_buffer_t msg);
void
AddLink(struct llarp_link *link, bool isOutbound);
AddInboundLink(struct llarp_link *link);
bool
InitOutboundLink();
void
Close();
@ -97,6 +101,9 @@ struct llarp_router
void
Run();
static void
ConnectAll(void *user, uint64_t orig, uint64_t left);
bool
EnsureIdentity();
@ -125,7 +132,7 @@ struct llarp_router
/// manually flush outbound message queue for just 1 router
void
FlushOutboundFor(const llarp::RouterID &remote);
FlushOutboundFor(const llarp::RouterID &remote, llarp_link *chosen);
/// flush outbound message queue
void

Loading…
Cancel
Save