fix up exit tun, fix up codel to actually do its job

pull/66/head
Jeff Becker 6 years ago
parent 9495f556e0
commit 658210b9d1
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -29,14 +29,23 @@ namespace llarp
}
};
struct GetNowSyscall
{
llarp_time_t
operator()() const
{
return llarp_time_now_ms();
}
};
template < typename T, typename GetTime, typename PutTime, typename Compare,
typename Mutex_t = util::Mutex, typename Lock_t = util::Lock,
llarp_time_t dropMs = 5, llarp_time_t initialIntervalMs = 100,
size_t MaxSize = 1024 >
typename GetNow = GetNowSyscall, typename Mutex_t = util::Mutex,
typename Lock_t = util::Lock, llarp_time_t dropMs = 5,
llarp_time_t initialIntervalMs = 100, size_t MaxSize = 1024 >
struct CoDelQueue
{
CoDelQueue(const std::string& name, const PutTime& put)
: m_name(name), _putTime(put)
CoDelQueue(const std::string& name, const PutTime& put, const GetNow& now)
: m_name(name), _putTime(put), _getNow(now)
{
}
@ -92,15 +101,24 @@ namespace llarp
return Process(v, [](T&) -> bool { return false; });
}
template < typename Visit >
void
ProcessN(size_t N, Visit v)
{
Process(v, [](T&) -> bool { return false; }, N);
}
template < typename Visit, typename Filter >
void
Process(Visit visitor, Filter f)
Process(Visit visitor, Filter f, size_t N = MaxSize)
{
llarp_time_t lowest = 0xFFFFFFFFFFFFFFFFUL;
// auto start = llarp_time_now_ms();
llarp_time_t lowest = std::numeric_limits< llarp_time_t >::max();
if(_getNow() < nextTickAt)
return;
// llarp::LogInfo("CoDelQueue::Process - start at ", start);
Lock_t lock(m_QueueMutex);
auto start = firstPut;
if(m_QueueIdx == 1)
{
visitor(m_Queue[0]);
@ -113,6 +131,8 @@ namespace llarp
size_t idx = 0;
while(m_QueueIdx)
{
--N;
llarp::LogDebug(m_name, " - queue has ", m_QueueIdx);
T* item = &m_Queue[idx++];
if(f(*item))
@ -121,7 +141,7 @@ namespace llarp
auto dlt = start - _getTime(*item);
// llarp::LogInfo("CoDelQueue::Process - dlt ", dlt);
lowest = std::min(dlt, lowest);
if(m_QueueIdx == 0)
if(m_QueueIdx == 0 || N == 0)
{
// llarp::LogInfo("CoDelQueue::Process - single item: lowest ",
// lowest, " dropMs: ", dropMs);
@ -129,7 +149,8 @@ namespace llarp
{
item->~T();
nextTickInterval += initialIntervalMs / std::sqrt(++dropNum);
firstPut = 0;
firstPut = 0;
nextTickAt = start + nextTickInterval;
return;
}
else
@ -141,18 +162,21 @@ namespace llarp
visitor(*item);
item->~T();
}
firstPut = 0;
firstPut = 0;
nextTickAt = start + nextTickInterval;
}
llarp_time_t firstPut = 0;
size_t dropNum = 0;
llarp_time_t nextTickInterval = initialIntervalMs;
llarp_time_t nextTickAt = 0;
Mutex_t m_QueueMutex;
size_t m_QueueIdx = 0;
T m_Queue[MaxSize];
std::string m_name;
GetTime _getTime;
PutTime _putTime;
GetNow _getNow;
}; // namespace util
} // namespace util
} // namespace llarp

@ -19,7 +19,7 @@ namespace llarp
struct Endpoint
{
Endpoint(const llarp::PubKey& remoteIdent,
const llarp::PathID_t& beginPath, bool rewriteIP,
const llarp::PathID_t& beginPath, bool rewriteDst, huint32_t ip,
llarp::handlers::ExitEndpoint* parent);
~Endpoint();

@ -9,24 +9,19 @@ namespace llarp
{
namespace handlers
{
struct ExitEndpoint final : public TunEndpoint
struct ExitEndpoint
{
ExitEndpoint(const std::string& name, llarp_router* r);
~ExitEndpoint();
void
Tick(llarp_time_t now) override;
Tick(llarp_time_t now);
bool
SetOption(const std::string& k, const std::string& v) override;
SetOption(const std::string& k, const std::string& v);
virtual std::string
Name() const override;
bool ShouldBuildMore(llarp_time_t) const
{
return false;
}
Name() const;
bool
AllocateNewExit(const llarp::PubKey& pk, const llarp::PathID_t& path,
@ -35,10 +30,23 @@ namespace llarp
llarp::exit::Endpoint*
FindEndpointByPath(const llarp::PathID_t& path);
llarp::exit::Endpoint*
FindEndpointByIP(huint32_t ip);
bool
UpdateEndpointPath(const llarp::PubKey& remote,
const llarp::PathID_t& next);
/// handle ip packet from outside
void
OnInetPacket(llarp_buffer_t buf);
llarp_router*
Router();
llarp_crypto*
Crypto();
template < typename Stats >
void
CalculateTrafficStats(Stats& stats)
@ -52,20 +60,42 @@ namespace llarp
}
}
/// DO NOT CALL ME IF YOU DONT KNOW WHAT THIS DOES
/// DO NOT CALL ME
void
DelEndpointInfo(const llarp::PathID_t& path, const huint32_t& ip,
const llarp::PubKey& pk);
/// DO NOT CALL ME IF YOU DONT KNOW WHAT THIS DOES
/// DO NOT CALL ME
void
RemoveExit(const llarp::exit::Endpoint* ep);
protected:
bool
QueueOutboundTraffic(llarp_buffer_t buf);
/// sets up networking and starts traffic
bool
Start();
huint32_t
GetIfAddr() const;
void
FlushSend() override;
FlushInbound();
private:
huint32_t
GetIPForIdent(const llarp::PubKey& pk);
huint32_t
AllocateNewAddress();
void
MarkIPActive(llarp::huint32_t ip);
void
KickIdentOffExit(const llarp::PubKey& pk);
llarp_router* m_Router;
std::string m_Name;
bool m_PermitExit;
std::unordered_map< llarp::PathID_t, llarp::PubKey,
@ -74,6 +104,33 @@ namespace llarp
std::unordered_multimap< llarp::PubKey, llarp::exit::Endpoint,
llarp::PubKey::Hash >
m_ActiveExits;
std::unordered_map< llarp::PubKey, llarp::huint32_t, llarp::PubKey::Hash >
m_KeyToIP;
std::unordered_map< llarp::huint32_t, llarp::PubKey,
llarp::huint32_t::Hash >
m_IPToKey;
huint32_t m_IfAddr;
huint32_t m_HigestAddr;
huint32_t m_NextAddr;
std::unordered_map< llarp::huint32_t, llarp_time_t,
llarp::huint32_t::Hash >
m_IPActivity;
llarp_tun_io m_Tun;
using Pkt_t = llarp::net::IPv4Packet;
using PacketQueue_t =
llarp::util::CoDelQueue< Pkt_t, Pkt_t::GetTime, Pkt_t::PutTime,
Pkt_t::CompareOrder, Pkt_t::GetNow,
llarp::util::DummyMutex,
llarp::util::DummyLock, 5, 100, 1024 >;
/// internet to llarp packet queue
PacketQueue_t m_InetToNetwork;
};
} // namespace handlers
} // namespace llarp

@ -121,7 +121,7 @@ namespace llarp
protected:
typedef llarp::util::CoDelQueue<
net::IPv4Packet, net::IPv4Packet::GetTime, net::IPv4Packet::PutTime,
net::IPv4Packet::CompareOrder >
net::IPv4Packet::CompareOrder, net::IPv4Packet::GetNow >
PacketQueue_t;
/// queue for sending packets over the network from us
PacketQueue_t m_UserToNetworkPktQueue;

@ -113,6 +113,19 @@ namespace llarp
}
};
struct GetNow
{
llarp_ev_loop* loop;
GetNow(llarp_ev_loop* evloop) : loop(evloop)
{
}
llarp_time_t
operator()() const
{
return llarp_ev_loop_time_now_ms(loop);
}
};
struct CompareOrder
{
bool

@ -141,7 +141,7 @@ namespace llarp
virtual ssize_t
do_write(void* data, size_t sz)
{
//DWORD w;
// DWORD w;
if(std::holds_alternative< HANDLE >(fd))
WriteFile(std::get< HANDLE >(fd), data, sz, nullptr, &portfd[1]);
else
@ -282,6 +282,20 @@ namespace llarp
}
};
struct GetNow
{
llarp_ev_loop* loop;
GetNow(llarp_ev_loop* l) : loop(l)
{
}
llarp_time_t
operator()() const
{
return llarp_ev_loop_time_now_ms(loop);
}
};
struct PutTime
{
llarp_ev_loop* loop;
@ -305,10 +319,10 @@ namespace llarp
};
};
typedef llarp::util::CoDelQueue< WriteBuffer, WriteBuffer::GetTime,
WriteBuffer::PutTime, WriteBuffer::Compare,
llarp::util::NullMutex,
llarp::util::NullLock, 5, 100, 128 >
typedef llarp::util::CoDelQueue<
WriteBuffer, WriteBuffer::GetTime, WriteBuffer::PutTime,
WriteBuffer::Compare, WriteBuffer::GetNow, llarp::util::NullMutex,
llarp::util::NullLock, 5, 100, 1024 >
LossyWriteQueue_t;
typedef std::deque< WriteBuffer > LosslessWriteQueue_t;

@ -177,7 +177,7 @@ namespace llarp
llarp_tun_io* t;
device* tunif;
tun(llarp_tun_io* tio, llarp_ev_loop* l)
: ev_io(-1, new LossyWriteQueue_t("tun_write_queue", l))
: ev_io(-1, new LossyWriteQueue_t("tun_write_queue", l, l))
, t(tio)
, tunif(tuntap_init())

@ -7,11 +7,11 @@ namespace llarp
{
Endpoint::Endpoint(const llarp::PubKey& remoteIdent,
const llarp::PathID_t& beginPath, bool rewriteIP,
llarp::handlers::ExitEndpoint* parent)
huint32_t ip, llarp::handlers::ExitEndpoint* parent)
: m_Parent(parent)
, m_remoteSignKey(remoteIdent)
, m_CurrentPath(beginPath)
, m_IP(parent->ObtainIPForAddr(remoteIdent))
, m_IP(ip)
, m_RewriteSource(rewriteIP)
{
}
@ -77,7 +77,7 @@ namespace llarp
else
dst = pkt.dst();
pkt.UpdateIPv4PacketOnDst(m_IP, dst);
if(!m_Parent->QueueOutboundTraffic(std::move(pkt)))
if(!m_Parent->QueueOutboundTraffic(pkt.Buffer()))
{
llarp::LogError("failed to queue outbound traffic");
return false;
@ -92,8 +92,19 @@ namespace llarp
auto path = GetCurrentPath();
if(path)
{
llarp::net::IPv4Packet pkt;
if(!pkt.Load(buf))
return false;
huint32_t src;
if(m_RewriteSource)
src = m_Parent->GetIfAddr();
else
src = pkt.src();
pkt.UpdateIPv4PacketOnDst(src, m_IP);
llarp::routing::TransferTrafficMessage msg;
if(!msg.PutBuffer(buf))
if(!msg.PutBuffer(pkt.Buffer()))
return false;
msg.S = path->NextSeqNo();
if(!msg.Sign(m_Parent->Crypto(), m_Parent->Router()->identity))

@ -1,19 +1,182 @@
#include <llarp/handlers/exit.hpp>
#include "../str.hpp"
#include "../router.hpp"
namespace llarp
{
namespace handlers
{
static void
ExitHandlerRecvPkt(llarp_tun_io *tun, const void *pkt, ssize_t sz)
{
static_cast< ExitEndpoint * >(tun->user)->OnInetPacket(
llarp::InitBuffer(pkt, sz));
}
static void
ExitHandlerFlushInbound(llarp_tun_io *tun)
{
static_cast< ExitEndpoint * >(tun->user)->FlushInbound();
}
ExitEndpoint::ExitEndpoint(const std::string &name, llarp_router *r)
: TunEndpoint(name, r), m_Name(name)
: m_Router(r)
, m_Name(name)
, m_Tun{{0}, 0, {0}, 0, 0, 0, 0, 0, 0}
, m_InetToNetwork(name + "_exit_rx", r->netloop, r->netloop)
{
m_Tun.user = this;
m_Tun.recvpkt = &ExitHandlerRecvPkt;
m_Tun.tick = &ExitHandlerFlushInbound;
}
ExitEndpoint::~ExitEndpoint()
{
}
void
ExitEndpoint::FlushInbound()
{
auto now = Router()->Now();
m_InetToNetwork.ProcessN(256, [&](Pkt_t &pkt) {
llarp::PubKey pk;
{
auto itr = m_IPToKey.find(pkt.dst());
if(itr == m_IPToKey.end())
{
// drop
llarp::LogWarn(Name(), " dropping packet, has no session at ",
pkt.dst());
return;
}
pk = itr->second;
}
llarp::exit::Endpoint *ep = nullptr;
auto range = m_ActiveExits.equal_range(pk);
auto itr = range.first;
uint64_t min = std::numeric_limits< uint64_t >::max();
/// pick path with lowest rx rate
while(itr != range.second)
{
if(ep == nullptr)
ep = &itr->second;
else if(itr->second.RxRate() < min && !itr->second.ExpiresSoon(now))
{
min = ep->RxRate();
ep = &itr->second;
}
++itr;
}
if(!ep->SendInboundTraffic(pkt.Buffer()))
{
llarp::LogWarn(Name(), " dropped inbound traffic for session ", pk);
}
});
}
bool
ExitEndpoint::Start()
{
return llarp_ev_add_tun(Router()->netloop, &m_Tun);
}
llarp_router *
ExitEndpoint::Router()
{
return m_Router;
}
llarp_crypto *
ExitEndpoint::Crypto()
{
return &m_Router->crypto;
}
huint32_t
ExitEndpoint::GetIfAddr() const
{
return m_IfAddr;
}
huint32_t
ExitEndpoint::GetIPForIdent(const llarp::PubKey &pk)
{
huint32_t found = {0};
const auto itr = m_KeyToIP.find(pk);
if(itr == m_KeyToIP.end())
{
// allocate and map
found = AllocateNewAddress();
m_KeyToIP.insert(std::make_pair(pk, found));
m_IPToKey.insert(std::make_pair(found, pk));
}
else
found = itr->second;
MarkIPActive(found);
return found;
}
huint32_t
ExitEndpoint::AllocateNewAddress()
{
if(m_NextAddr < m_HigestAddr)
return ++m_NextAddr;
// find oldest activity ip address
huint32_t found = {0};
llarp_time_t min = std::numeric_limits< llarp_time_t >::max();
auto itr = m_IPActivity.begin();
while(itr != m_IPActivity.end())
{
if(itr->second < min)
{
found = itr->first;
min = itr->second;
}
++itr;
}
// kick old ident off exit
// TODO: DoS
llarp::PubKey pk = m_IPToKey[found];
KickIdentOffExit(pk);
return found;
}
bool
ExitEndpoint::QueueOutboundTraffic(llarp_buffer_t buf)
{
return llarp_ev_tun_async_write(&m_Tun, buf.base, buf.sz);
}
void
ExitEndpoint::KickIdentOffExit(const llarp::PubKey &pk)
{
llarp::LogInfo(Name(), " kicking ", pk, " off exit");
huint32_t ip = m_KeyToIP[pk];
m_KeyToIP.erase(pk);
m_IPToKey.erase(ip);
auto range = m_ActiveExits.equal_range(pk);
auto exit_itr = range.first;
while(exit_itr != range.second)
exit_itr = m_ActiveExits.erase(exit_itr);
}
void
ExitEndpoint::MarkIPActive(llarp::huint32_t ip)
{
m_IPActivity[ip] = Router()->Now();
}
void
ExitEndpoint::OnInetPacket(llarp_buffer_t buf)
{
m_InetToNetwork.EmplaceIf(
[buf](Pkt_t &pkt) -> bool { return pkt.Load(buf); });
}
llarp::exit::Endpoint *
ExitEndpoint::FindEndpointByPath(const llarp::PathID_t &path)
{
@ -54,9 +217,28 @@ namespace llarp
if(k == "exit")
{
m_PermitExit = IsTrueValue(v.c_str());
// TODO: implement me
return true;
}
if(k == "ifaddr")
{
auto pos = v.find("/");
if(pos == std::string::npos)
{
llarp::LogError(Name(), " ifaddr is not a cidr: ", v);
return false;
}
std::string nmask_str = v.substr(1 + pos);
std::string host_str = v.substr(0, pos);
strncpy(m_Tun.ifaddr, host_str.c_str(), sizeof(m_Tun.ifaddr));
m_Tun.netmask = std::atoi(nmask_str.c_str());
llarp::LogInfo(Name(), " set ifaddr range to ", m_Tun.ifaddr, "/",
m_Tun.netmask);
}
if(k == "ifname")
{
strncpy(m_Tun.ifname, v.c_str(), sizeof(m_Tun.ifname));
llarp::LogInfo(Name(), " set ifname to ", m_Tun.ifname);
}
if(k == "exit-whitelist")
{
// add exit policy whitelist rule
@ -69,53 +251,44 @@ namespace llarp
// TODO: implement me
return true;
}
return TunEndpoint::SetOption(k, v);
return true;
}
bool
ExitEndpoint::AllocateNewExit(const llarp::PubKey &pk,
const llarp::PathID_t &path,
bool permitInternet)
bool wantInternet)
{
if(wantInternet && !m_PermitExit)
return false;
huint32_t ip = GetIPForIdent(pk);
m_ActiveExits.insert(std::make_pair(
pk, llarp::exit::Endpoint(pk, path, !permitInternet, this)));
pk, llarp::exit::Endpoint(pk, path, !wantInternet, ip, this)));
return true;
}
void
ExitEndpoint::FlushSend()
{
auto now = Now();
m_UserToNetworkPktQueue.Process([&](net::IPv4Packet &pkt) {
// find pubkey for addr
if(!HasLocalIP(pkt.dst()))
{
llarp::LogWarn(Name(), " has no endpoint for ", pkt.dst());
return true;
}
llarp::PubKey pk = ObtainAddrForIP< llarp::PubKey >(pkt.dst());
pkt.UpdateIPv4PacketOnDst(pkt.src(), {0});
llarp::exit::Endpoint *ep = nullptr;
auto range = m_ActiveExits.equal_range(pk);
auto itr = range.first;
uint64_t min = std::numeric_limits< uint64_t >::max();
/// pick path with lowest rx rate
while(itr != range.second)
/*
void
ExitEndpoint::FlushSend()
{
if(ep == nullptr)
ep = &itr->second;
else if(itr->second.RxRate() < min && !itr->second.ExpiresSoon(now))
{
min = ep->RxRate();
ep = &itr->second;
}
++itr;
auto now = Now();
m_UserToNetworkPktQueue.Process([&](net::IPv4Packet &pkt) {
// find pubkey for addr
if(!HasLocalIP(pkt.dst()))
{
llarp::LogWarn(Name(), " has no endpoint for ", pkt.dst());
return true;
}
llarp::PubKey pk = ObtainAddrForIP< llarp::PubKey >(pkt.dst());
pkt.UpdateIPv4PacketOnDst(pkt.src(), {0});
if(!ep->SendInboundTraffic(pkt.Buffer()))
llarp::LogWarn(Name(), " dropped traffic to ", pk);
return true;
});
}
if(!ep->SendInboundTraffic(pkt.Buffer()))
llarp::LogWarn(Name(), " dropped traffic to ", pk);
return true;
});
}
*/
std::string
ExitEndpoint::Name() const
@ -128,8 +301,8 @@ namespace llarp
const huint32_t &ip, const llarp::PubKey &pk)
{
m_Paths.erase(path);
m_IPToAddr.erase(ip);
m_AddrToIP.erase(pk);
m_IPToKey.erase(ip);
m_KeyToIP.erase(pk);
}
void
@ -157,7 +330,6 @@ namespace llarp
{
if(itr->second.IsExpired(now))
{
llarp::LogInfo("Exit expired for ", itr->first);
itr = m_ActiveExits.erase(itr);
}
else
@ -166,8 +338,6 @@ namespace llarp
++itr;
}
}
// call parent
TunEndpoint::Tick(now);
}
} // namespace handlers
} // namespace llarp

@ -19,8 +19,8 @@ namespace llarp
{
TunEndpoint::TunEndpoint(const std::string &nickname, llarp_router *r)
: service::Endpoint(nickname, r)
, m_UserToNetworkPktQueue(nickname + "_sendq", r->netloop)
, m_NetworkToUserPktQueue(nickname + "_recvq", r->netloop)
, m_UserToNetworkPktQueue(nickname + "_sendq", r->netloop, r->netloop)
, m_NetworkToUserPktQueue(nickname + "_recvq", r->netloop, r->netloop)
{
tunif.user = this;
tunif.netmask = DefaultTunNetmask;
@ -291,9 +291,7 @@ namespace llarp
m_OurIP = lAddr.xtohl();
m_NextIP = m_OurIP;
auto xmask = netmask_ipv4_bits(tunif.netmask);
auto baseaddr = m_OurIP & xmask;
m_MaxIP = baseaddr | ~xmask;
m_MaxIP = m_OurIP ^ (~xmask);
llarp::LogInfo(Name(), " set ", tunif.ifname, " to have address ", lAddr);
llarp::LogInfo(Name(), " allocated up to ", m_MaxIP);

Loading…
Cancel
Save