@ -24,7 +24,7 @@ namespace llarp
std : : shared_ptr < link : : Connection >
Endpoint : : get_conn ( const RemoteRC & rc ) const
{
if ( auto itr = conns. find ( rc . router_id ( ) ) ; itr ! = conns. end ( ) )
if ( auto itr = active_ conns. find ( rc . router_id ( ) ) ; itr ! = active_ conns. end ( ) )
return itr - > second ;
return nullptr ;
@ -33,7 +33,7 @@ namespace llarp
std : : shared_ptr < link : : Connection >
Endpoint : : get_conn ( const RouterID & rid ) const
{
if ( auto itr = conns. find ( rid ) ; itr ! = conns. end ( ) )
if ( auto itr = active_ conns. find ( rid ) ; itr ! = active_ conns. end ( ) )
return itr - > second ;
return nullptr ;
@ -42,7 +42,7 @@ namespace llarp
bool
Endpoint : : have_conn ( const RouterID & remote , bool client_only ) const
{
if ( auto itr = conns. find ( remote ) ; itr ! = conns. end ( ) )
if ( auto itr = active_ conns. find ( remote ) ; itr ! = active_ conns. end ( ) )
{
if ( not ( itr - > second - > remote_is_relay and client_only ) )
return true ;
@ -56,7 +56,7 @@ namespace llarp
{
size_t count = 0 ;
for ( const auto & c : conns)
for ( const auto & c : active_ conns)
{
if ( not ( c . second - > remote_is_relay and clients_only ) )
count + = 1 ;
@ -68,9 +68,9 @@ namespace llarp
bool
Endpoint : : get_random_connection ( RemoteRC & router ) const
{
if ( const auto size = conns. size ( ) ; size )
if ( const auto size = active_ conns. size ( ) ; size )
{
auto itr = conns. begin ( ) ;
auto itr = active_ conns. begin ( ) ;
std : : advance ( itr , randint ( ) % size ) ;
RouterID rid { itr - > second - > conn - > remote_key ( ) } ;
@ -91,7 +91,7 @@ namespace llarp
void
Endpoint : : for_each_connection ( std : : function < void ( link : : Connection & ) > func )
{
for ( const auto & [ rid , conn ] : conns)
for ( const auto & [ rid , conn ] : active_ conns)
func ( * conn ) ;
}
@ -99,14 +99,14 @@ namespace llarp
Endpoint : : close_connection ( RouterID _rid )
{
assert ( link_manager . _router . loop ( ) - > inEventLoop ( ) ) ;
auto itr = conns. find ( _rid ) ;
if ( itr ! = conns. end ( ) )
auto itr = active_ conns. find ( _rid ) ;
if ( itr ! = active_ conns. end ( ) )
return ;
auto & conn = * itr - > second - > conn ;
conn . close_connection ( ) ;
connid_map . erase ( conn . scid ( ) ) ;
conns. erase ( itr ) ;
active_ conns. erase ( itr ) ;
}
} // namespace link
@ -125,8 +125,22 @@ namespace llarp
void
LinkManager : : register_commands ( std : : shared_ptr < oxen : : quic : : BTRequestStream > & s )
{
assert ( ep . connid_map . count ( s - > conn_id ( ) ) ) ;
const RouterID & router_id = ep . connid_map [ s - > conn_id ( ) ] ;
log : : critical ( logcat , " {} called " , __PRETTY_FUNCTION__ ) ;
const RouterID & router_id { s - > conn . remote_key ( ) } ;
log : : critical ( logcat , " Registering commands (RID:{}) " , router_id ) ;
s - > register_command ( " bfetch_rcs " s , [ this ] ( oxen : : quic : : message m ) {
_router . loop ( ) - > call (
[ this , msg = std : : move ( m ) ] ( ) mutable { handle_fetch_bootstrap_rcs ( std : : move ( msg ) ) ; } ) ;
} ) ;
// s->register_command("bfetch_rcs"s, [this](oxen::quic::message m) {
// _router.loop()->call(
// [this, msg = std::move(m)]() mutable { handle_fetch_bootstrap_rcs(std::move(msg)); });
// });
log : : critical ( logcat , " Registered `bfetch_rcs` (RID:{}) " , router_id ) ;
s - > register_command ( " path_build " s , [ this , rid = router_id ] ( oxen : : quic : : message m ) {
_router . loop ( ) - > call (
@ -143,11 +157,6 @@ namespace llarp
[ this , msg = std : : move ( m ) ] ( ) mutable { handle_gossip_rc ( std : : move ( msg ) ) ; } ) ;
} ) ;
s - > register_command ( " bfetch_rcs " s , [ this ] ( oxen : : quic : : message m ) {
_router . loop ( ) - > call (
[ this , msg = std : : move ( m ) ] ( ) mutable { handle_fetch_bootstrap_rcs ( std : : move ( msg ) ) ; } ) ;
} ) ;
for ( auto & method : direct_requests )
{
s - > register_command (
@ -162,6 +171,24 @@ namespace llarp
} ) ;
} ) ;
}
log : : critical ( logcat , " Registered all commands! (RID:{}) " , router_id ) ;
}
LinkManager : : LinkManager ( Router & r )
: _router { r }
, quic { std : : make_unique < oxen : : quic : : Network > ( ) }
, tls_creds { oxen : : quic : : GNUTLSCreds : : make_from_ed_keys (
{ reinterpret_cast < const char * > ( _router . identity ( ) . data ( ) ) , size_t { 32 } } ,
{ reinterpret_cast < const char * > ( _router . identity ( ) . toPublic ( ) . data ( ) ) , size_t { 32 } } ) }
, ep { startup_endpoint ( ) , * this }
{ }
std : : unique_ptr < LinkManager >
LinkManager : : make ( Router & r )
{
std : : unique_ptr < LinkManager > p { new LinkManager ( r ) } ;
return p ;
}
std : : shared_ptr < oxen : : quic : : Endpoint >
@ -184,25 +211,26 @@ namespace llarp
} ,
[ this ] ( oxen : : quic : : dgram_interface & di , bstring dgram ) { recv_data_message ( di , dgram ) ; } ) ;
tls_creds - > set_key_verify_callback ( [ this ] ( const ustring_view & key , const ustring_view & ) {
bool result = fals e;
bool result = tru e;
RouterID other { key . data ( ) } ;
if ( _router . is_bootstrap_seed ( ) )
{
if ( node_db - > whitelist ( ) . count ( other ) )
// FIXME: remove "|| true", this is just for local testing!
if ( node_db - > whitelist ( ) . count ( other ) | | true )
{
log : : critical ( logcat , " Saving bootstrap seed requester... " ) ;
auto [ it , b ] = node_db - > seeds ( ) . emplace ( other ) ;
result & = b ;
result | = b ;
}
log : : critical (
logcat ,
" Bootstrap seed node was {} to confirm fetch requester is white-listed; saving RID" ,
result ? " able " : " unable " );
" Bootstrap seed node was {} to confirm fetch requester is white-listed; {}successfully saved RID" ,
result ? " able " : " unable " , result ? " " : " un " );
return result ;
}
if ( node_db - > has_rc ( other ) )
result = true ;
result = node_db - > has_rc ( other ) ;
log : : critical (
logcat , " {}uccessfully verified connection to {}! " , result ? " S " : " Uns " , other ) ;
@ -215,8 +243,10 @@ namespace llarp
[ & ] ( oxen : : quic : : Connection & c ,
oxen : : quic : : Endpoint & e ,
std : : optional < int64_t > id ) - > std : : shared_ptr < oxen : : quic : : Stream > {
if ( id & & id = = 0 )
if ( id & & * id = = 0 )
{
log : : critical ( logcat , " Stream constructor constructing BTStream (ID:{}) " , id ) ;
auto s = e . make_shared < oxen : : quic : : BTRequestStream > (
c , e , [ ] ( oxen : : quic : : Stream & s , uint64_t error_code ) {
log : : warning (
@ -229,28 +259,133 @@ namespace llarp
return s ;
}
log : : critical ( logcat , " Stream constructor constructing Stream (ID:{})! " , id ) ;
return e . make_shared < oxen : : quic : : Stream > ( c , e ) ;
} ) ;
}
return ep ;
}
LinkManager : : LinkManager ( Router & r )
: _router { r }
, quic { std : : make_unique < oxen : : quic : : Network > ( ) }
, tls_creds { oxen : : quic : : GNUTLSCreds : : make_from_ed_keys (
{ reinterpret_cast < const char * > ( _router . identity ( ) . data ( ) ) , size_t { 32 } } ,
{ reinterpret_cast < const char * > ( _router . identity ( ) . toPublic ( ) . data ( ) ) , size_t { 32 } } ) }
, ep { startup_endpoint ( ) , * this }
{ }
void
LinkManager : : on_inbound_conn ( oxen : : quic : : connection_interface & ci )
{
const auto & scid = ci . scid ( ) ;
RouterID rid { ci . remote_key ( ) } ;
ep . connid_map . emplace ( scid , rid ) ;
auto [ itr , b ] = ep . active_conns . emplace ( rid , nullptr ) ;
std : : unique_ptr < LinkManager >
LinkManager : : make ( Router & r )
log : : critical ( logcat , " Queueing BTStream to be opened... " ) ;
auto control_stream = ci . queue_stream < oxen : : quic : : BTRequestStream > ( [ ] ( oxen : : quic : : Stream & s ,
uint64_t error_code ) {
log : : warning (
logcat , " BTRequestStream closed unexpectedly (ec:{}); closing connection... " , error_code ) ;
s . conn . close_connection ( error_code ) ;
} ) ;
log : : critical ( logcat , " Queued BTStream to be opened ID:{} " , control_stream - > stream_id ( ) ) ;
assert ( control_stream - > stream_id ( ) = = 0 ) ;
// register_commands(control_stream);
itr - > second = std : : make_shared < link : : Connection > ( ci . shared_from_this ( ) , control_stream ) ;
log : : critical ( logcat , " Successfully configured inbound connection fom {}... " , rid ) ;
}
// TODO: should we add routes here now that Router::SessionOpen is gone?
void
LinkManager : : on_conn_open ( oxen : : quic : : connection_interface & ci )
{
std : : unique_ptr < LinkManager > p { new LinkManager ( r ) } ;
return p ;
_router . loop ( ) - > call ( [ this , & conn_interface = ci ] ( ) {
const auto rid = RouterID { conn_interface . remote_key ( ) } ;
const auto & remote = conn_interface . remote ( ) ;
const auto & scid = conn_interface . scid ( ) ;
if ( conn_interface . is_inbound ( ) )
{
log : : critical ( logcat , " Inbound connection fom {} (remote:{}) " , rid , remote ) ;
on_inbound_conn ( conn_interface ) ;
}
else
{
if ( auto itr = ep . pending_conns . find ( rid ) ; itr ! = ep . pending_conns . end ( ) )
{
ep . connid_map . emplace ( scid , rid ) ;
auto [ it , b ] = ep . active_conns . emplace ( rid , nullptr ) ;
it - > second = std : : move ( itr - > second ) ;
log : : critical ( logcat , " Connection to RID:{} moved from pending to active conns! " , rid ) ;
}
else
throw std : : runtime_error { " Could not find newly established connection in pending conns! " } ;
}
log : : critical (
logcat ,
" SERVICE NODE (RID:{}) ESTABLISHED CONNECTION TO RID:{} " ,
_router . local_rid ( ) ,
rid ) ;
// check to see if this connection was established while we were attempting to queue
// messages to the remote
if ( auto itr = pending_conn_msg_queue . find ( rid ) ; itr ! = pending_conn_msg_queue . end ( ) )
{
log : : critical ( logcat , " Clearing pending queue for RID:{} " , rid ) ;
auto & que = itr - > second ;
while ( not que . empty ( ) )
{
auto & msg = que . front ( ) ;
if ( msg . is_control )
{
log : : critical ( logcat , " Dispatching {} request! " , * msg . endpoint ) ;
ep . active_conns [ rid ] - > control_stream - > command (
std : : move ( * msg . endpoint ) , std : : move ( msg . body ) , std : : move ( msg . func ) ) ;
}
else
{
conn_interface . send_datagram ( std : : move ( msg . body ) ) ;
}
que . pop_front ( ) ;
}
return ;
}
log : : warning ( logcat , " No pending queue to clear for RID:{} " , rid ) ;
} ) ;
} ;
void
LinkManager : : on_conn_closed ( oxen : : quic : : connection_interface & ci , uint64_t ec )
{
_router . loop ( ) - > call ( [ this , & conn_interface = ci , error_code = ec ] ( ) {
const auto & scid = conn_interface . scid ( ) ;
log : : critical ( quic_cat , " Purging quic connection CID:{} (ec: {}) " , scid , error_code ) ;
if ( const auto & c_itr = ep . connid_map . find ( scid ) ; c_itr ! = ep . connid_map . end ( ) )
{
const auto & rid = c_itr - > second ;
// if (auto maybe = rids_pending_verification.find(rid);
// maybe != rids_pending_verification.end())
// rids_pending_verification.erase(maybe);
// in case this didn't clear earlier, do it now
if ( auto p_itr = pending_conn_msg_queue . find ( rid ) ; p_itr ! = pending_conn_msg_queue . end ( ) )
pending_conn_msg_queue . erase ( p_itr ) ;
if ( auto m_itr = ep . active_conns . find ( rid ) ; m_itr ! = ep . active_conns . end ( ) )
ep . active_conns . erase ( m_itr ) ;
ep . connid_map . erase ( c_itr ) ;
log : : critical ( quic_cat , " Quic connection CID:{} purged successfully " , scid ) ;
}
} ) ;
}
bool
LinkManager : : send_control_message (
const RouterID & remote ,
@ -293,11 +428,23 @@ namespace llarp
body = std : : move ( body ) ,
f = std : : move ( func ) ] ( ) {
auto pending = PendingMessage ( std : : move ( body ) , std : : move ( endpoint ) , std : : move ( f ) ) ;
if ( auto it1 = ep . pending_conns . find ( remote ) ; it1 ! = ep . pending_conns . end ( ) )
{
if ( auto it2 = pending_conn_msg_queue . find ( remote ) ; it2 ! = pending_conn_msg_queue . end ( ) )
{
it2 - > second . push_back ( std : : move ( pending ) ) ;
log : : critical ( logcat , " Connection (RID:{}) is pending; message appended to send queue! " , remote ) ;
}
}
else
{
log : : critical ( logcat , " Connection (RID:{}) not found in pending conns; creating send queue! " , remote ) ;
auto [ itr , b ] = pending_conn_msg_queue . emplace ( remote , MessageQueue ( ) ) ;
itr - > second . push_back ( std : : move ( pending ) ) ;
connect_to ( remote ) ;
}
auto [ itr , b ] = pending_conn_msg_queue . emplace ( remote , MessageQueue ( ) ) ;
itr - > second . push_back ( std : : move ( pending ) ) ;
connect_to ( remote ) ;
} ) ;
return false ;
@ -386,106 +533,6 @@ namespace llarp
log : : warning ( quic_cat , " Failed to begin establishing connection to {} " , remote_addr ) ;
}
void
LinkManager : : on_inbound_conn ( oxen : : quic : : connection_interface & ci )
{
const auto & scid = ci . scid ( ) ;
RouterID rid { ci . remote_key ( ) } ;
ep . connid_map . emplace ( scid , rid ) ;
auto [ itr , b ] = ep . conns . emplace ( rid , nullptr ) ;
auto control_stream = ci . queue_stream < oxen : : quic : : BTRequestStream > ( [ ] ( oxen : : quic : : Stream & s ,
uint64_t error_code ) {
log : : warning (
logcat , " BTRequestStream closed unexpectedly (ec:{}); closing connection... " , error_code ) ;
s . conn . close_connection ( error_code ) ;
} ) ;
log : : critical ( logcat , " Opened BTStream ID:{} " , control_stream - > stream_id ( ) ) ;
itr - > second = std : : make_shared < link : : Connection > ( ci . shared_from_this ( ) , control_stream ) ;
log : : critical ( logcat , " Successfully configured inbound connection fom {}... " , rid ) ;
}
// TODO: should we add routes here now that Router::SessionOpen is gone?
void
LinkManager : : on_conn_open ( oxen : : quic : : connection_interface & ci )
{
_router . loop ( ) - > call ( [ this , & conn_interface = ci ] ( ) {
const auto rid = RouterID { conn_interface . remote_key ( ) } ;
const auto & remote = conn_interface . remote ( ) ;
if ( conn_interface . is_inbound ( ) )
{
log : : critical ( logcat , " Inbound connection fom {} (remote:{}) " , rid , remote ) ;
on_inbound_conn ( conn_interface ) ;
}
log : : critical (
logcat ,
" SERVICE NODE (RID:{}) ESTABLISHED CONNECTION TO RID:{} " ,
_router . local_rid ( ) ,
rid ) ;
// check to see if this connection was established while we were attempting to queue
// messages to the remote
if ( auto itr = pending_conn_msg_queue . find ( rid ) ; itr ! = pending_conn_msg_queue . end ( ) )
{
log : : critical ( logcat , " Clearing pending queue for RID:{} " , rid ) ;
auto & que = itr - > second ;
while ( not que . empty ( ) )
{
auto & msg = que . front ( ) ;
if ( msg . is_control )
{
log : : critical ( logcat , " Dispatching {} request! " , * msg . endpoint ) ;
ep . conns [ rid ] - > control_stream - > command (
std : : move ( * msg . endpoint ) , std : : move ( msg . body ) , std : : move ( msg . func ) ) ;
}
else
{
conn_interface . send_datagram ( std : : move ( msg . body ) ) ;
}
que . pop_front ( ) ;
}
return ;
}
log : : warning ( logcat , " No pending queue to clear for RID:{} " , rid ) ;
} ) ;
} ;
void
LinkManager : : on_conn_closed ( oxen : : quic : : connection_interface & ci , uint64_t ec )
{
_router . loop ( ) - > call ( [ this , & conn_interface = ci , error_code = ec ] ( ) {
const auto & scid = conn_interface . scid ( ) ;
log : : critical ( quic_cat , " Purging quic connection CID:{} (ec: {}) " , scid , error_code ) ;
if ( const auto & c_itr = ep . connid_map . find ( scid ) ; c_itr ! = ep . connid_map . end ( ) )
{
const auto & rid = c_itr - > second ;
// if (auto maybe = rids_pending_verification.find(rid);
// maybe != rids_pending_verification.end())
// rids_pending_verification.erase(maybe);
// in case this didn't clear earlier, do it now
if ( auto p_itr = pending_conn_msg_queue . find ( rid ) ; p_itr ! = pending_conn_msg_queue . end ( ) )
pending_conn_msg_queue . erase ( p_itr ) ;
if ( auto m_itr = ep . conns . find ( rid ) ; m_itr ! = ep . conns . end ( ) )
ep . conns . erase ( m_itr ) ;
ep . connid_map . erase ( c_itr ) ;
log : : critical ( quic_cat , " Quic connection CID:{} purged successfully " , scid ) ;
}
} ) ;
}
bool
LinkManager : : have_connection_to ( const RouterID & remote , bool client_only ) const
{
@ -598,7 +645,7 @@ namespace llarp
void
LinkManager : : gossip_rc ( const RouterID & rc_rid , std : : string serialized_rc )
{
for ( auto & [ rid , conn ] : ep . conns)
for ( auto & [ rid , conn ] : ep . active_ conns)
{
// don't send back to the owner...
if ( rid = = rc_rid )
@ -607,7 +654,9 @@ namespace llarp
if ( not conn - > remote_is_relay )
continue ;
send_control_message ( rid , " gossip_rc " , serialized_rc ) ;
send_control_message ( rid , " gossip_rc " , serialized_rc , [ ] ( oxen : : quic : : message ) mutable {
log : : critical ( logcat , " PLACEHOLDER FOR GOSSIP RC RESPONSE HANDLER " ) ;
} ) ;
}
}
@ -702,12 +751,12 @@ namespace llarp
}
}
auto & src = is_seed ? node_db - > bootstrap_seeds ( ) : node_db- > get_known_rcs ( ) ;
auto & src = node_db- > get_known_rcs ( ) ;
auto count = src . size ( ) ;
if ( count = = 0 )
{
log : : error ( logcat , " No {} locally to send!" , is_seed ? " bootstrap seeds " : " known RCs " ) ;
log : : error ( logcat , " No known RCs locally to send! " ) ;
m . respond ( messages : : ERROR_RESPONSE , true ) ;
return ;
}
@ -899,9 +948,9 @@ namespace llarp
void
LinkManager : : handle_find_name_response ( oxen : : quic : : message m )
{
if ( m. timed_out )
if ( not m )
{
log : : info ( link_cat , " FindNameMessage timed out !" ) ;
log : : info ( link_cat , " FindNameMessage failed !" ) ;
return ;
}
@ -1035,7 +1084,7 @@ namespace llarp
" publish_intro " ,
PublishIntroMessage : : serialize ( introset , relay_order , is_relayed ) ,
[ respond = std : : move ( respond ) ] ( oxen : : quic : : message m ) {
if ( m. timed_out )
if ( not m )
return ; // drop if timed out; requester will have timed out as well
respond ( m . body_str ( ) ) ;
} ) ;
@ -1073,7 +1122,7 @@ namespace llarp
void
LinkManager : : handle_publish_intro_response ( oxen : : quic : : message m )
{
if ( m. timed_out )
if ( not m )
{
log : : info ( link_cat , " PublishIntroMessage timed out! " ) ;
return ;
@ -1176,7 +1225,7 @@ namespace llarp
link_cat ,
" Relayed FindIntroMessage returned successful response; transmitting to initial "
" requester " ) ;
else if ( relay_response. timed_out )
else if ( not relay_response)
log : : critical (
link_cat , " Relayed FindIntroMessage timed out! Notifying initial requester " ) ;
else
@ -1203,7 +1252,7 @@ namespace llarp
void
LinkManager : : handle_find_intro_response ( oxen : : quic : : message m )
{
if ( m. timed_out )
if ( not m )
{
log : : info ( link_cat , " FindIntroMessage timed out! " ) ;
return ;
@ -1396,7 +1445,7 @@ namespace llarp
" then relaying response " ) ;
_router . path_context ( ) . put_transit_hop ( hop ) ;
}
if ( m. timed_out )
if ( not m )
log : : info ( link_cat , " Upstream timed out on path build; relaying timeout " ) ;
else
log : : info ( link_cat , " Upstream returned path build failure; relaying response " ) ;
@ -1513,7 +1562,7 @@ namespace llarp
void
LinkManager : : handle_obtain_exit_response ( oxen : : quic : : message m )
{
if ( m. timed_out )
if ( not m )
{
log : : info ( link_cat , " ObtainExitMessage timed out! " ) ;
return ;
@ -1591,7 +1640,7 @@ namespace llarp
void
LinkManager : : handle_update_exit_response ( oxen : : quic : : message m )
{
if ( m. timed_out )
if ( not m )
{
log : : info ( link_cat , " UpdateExitMessage timed out! " ) ;
return ;
@ -1676,7 +1725,7 @@ namespace llarp
void
LinkManager : : handle_close_exit_response ( oxen : : quic : : message m )
{
if ( m. timed_out )
if ( not m )
{
log : : info ( link_cat , " CloseExitMessage timed out! " ) ;
return ;
@ -1828,7 +1877,7 @@ namespace llarp
void
LinkManager : : handle_convo_intro ( oxen : : quic : : message m )
{
if ( m. timed_out )
if ( not m )
{
log : : info ( link_cat , " Path control message timed out! " ) ;
return ;