move codel off of pointer types

pull/15/head
Jeff Becker 6 years ago
parent 4e693a2414
commit 8bae1a4735
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -34,36 +34,6 @@ namespace llarp
{
}
};
template < typename T, typename GetTime >
struct CoDelCompareTime
{
bool
operator()(const T& left, const T& right) const
{
return GetTime()(left) < GetTime()(right);
}
};
template < typename T >
struct CoDelComparePriority
{
bool
operator()(const T& left, const T& right) const
{
return left < right;
}
};
template < typename Compare, typename T >
struct CoDelCompare
{
bool
operator()(const std::unique_ptr< T >& left,
const std::unique_ptr< T >& right) const
{
return Compare()(left.get(), right.get());
}
};
template < typename T, typename GetTime, typename PutTime, typename Compare,
typename Mutex_t = util::Mutex, typename Lock_t = util::Lock,
@ -83,20 +53,19 @@ namespace llarp
template < typename... Args >
bool
EmplaceIf(std::function< bool(T*) > pred, Args&&... args)
EmplaceIf(std::function< bool(T&) > pred, Args&&... args)
{
T* ptr = new T(std::forward< Args >(args)...);
if(!pred(ptr))
T t(std::forward< Args >(args)...);
if(!pred(t))
{
delete ptr;
return false;
}
PutTime()(ptr);
PutTime()(t);
{
Lock_t lock(m_QueueMutex);
if(firstPut == 0)
firstPut = GetTime()(ptr);
m_Queue.push(std::unique_ptr< T >(ptr));
firstPut = GetTime()(t);
m_Queue.push(t);
}
return true;
}
@ -105,27 +74,28 @@ namespace llarp
void
Emplace(Args&&... args)
{
T* ptr = new T(std::forward< Args >(args)...);
PutTime()(ptr);
T t(std::forward< Args >(args)...);
PutTime()(t);
{
Lock_t lock(m_QueueMutex);
if(firstPut == 0)
firstPut = GetTime()(ptr);
m_Queue.push(std::unique_ptr< T >(ptr));
firstPut = GetTime()(t);
m_Queue.push(t);
}
}
void
Put(T* ptr)
Put(T& t)
{
PutTime()(ptr);
PutTime()(t);
{
Lock_t lock(m_QueueMutex);
if(firstPut == 0)
firstPut = GetTime()(ptr);
m_Queue.push(std::unique_ptr< T >(ptr));
firstPut = GetTime()(t);
m_Queue.push(t);
}
}
template < typename Visit >
void
Process(Visit visitor)
@ -138,8 +108,8 @@ namespace llarp
while(m_Queue.size())
{
llarp::LogDebug("CoDelQueue::Process - queue has ", m_Queue.size());
const auto& item = m_Queue.top();
auto dlt = start - GetTime()(item.get());
auto& item = m_Queue.front();
auto dlt = start - GetTime()(item);
// llarp::LogInfo("CoDelQueue::Process - dlt ", dlt);
lowest = std::min(dlt, lowest);
if(m_Queue.size() == 1)
@ -160,7 +130,7 @@ namespace llarp
}
}
// llarp::LogInfo("CoDelQueue::Process - passing");
visitor(item.get());
visitor(item);
m_Queue.pop();
}
firstPut = 0;
@ -170,10 +140,7 @@ namespace llarp
size_t dropNum = 0;
llarp_time_t nextTickInterval = initialIntervalMs;
Mutex_t m_QueueMutex;
typedef std::priority_queue< std::unique_ptr< T >,
std::vector< std::unique_ptr< T > >,
CoDelCompare< Compare, T > >
Queue_t;
typedef std::queue< T > Queue_t;
Queue_t m_Queue;
std::string m_name;
};

@ -184,26 +184,26 @@ struct iwp_async_frame
struct FramePutTime
{
void
operator()(iwp_async_frame *frame) const
operator()(iwp_async_frame &frame) const
{
frame->created = llarp_time_now_ms();
frame.created = llarp_time_now_ms();
}
};
struct FrameGetTime
{
llarp_time_t
operator()(const iwp_async_frame *frame) const
operator()(const iwp_async_frame &frame) const
{
return frame->created;
return frame.created;
}
};
struct FrameCompareTime
{
bool
operator()(const iwp_async_frame *left, const iwp_async_frame *right) const
operator()(const iwp_async_frame &left, const iwp_async_frame &right) const
{
return left->created < right->created;
return left.created < right.created;
}
};
#endif

@ -73,27 +73,27 @@ namespace llarp
struct GetTime
{
llarp_time_t
operator()(const IPv4Packet* pkt) const
operator()(const IPv4Packet& pkt) const
{
return pkt->timestamp;
return pkt.timestamp;
}
};
struct PutTime
{
void
operator()(IPv4Packet* pkt) const
operator()(IPv4Packet& pkt) const
{
pkt->timestamp = llarp_time_now_ms();
pkt.timestamp = llarp_time_now_ms();
}
};
struct CompareOrder
{
bool
operator()(const IPv4Packet* left, const IPv4Packet* right)
operator()(const IPv4Packet& left, const IPv4Packet& right)
{
return left->timestamp < right->timestamp;
return left.timestamp < right.timestamp;
}
};

@ -47,27 +47,27 @@ struct InboundMessage
struct GetTime
{
llarp_time_t
operator()(const InboundMessage *msg)
operator()(const InboundMessage &msg)
{
return msg->queued;
return msg.queued;
}
};
struct OrderCompare
{
bool
operator()(const InboundMessage *left, const InboundMessage *right) const
operator()(const InboundMessage &left, const InboundMessage &right) const
{
return left->msgid < right->msgid;
return left < right;
}
};
struct PutTime
{
void
operator()(InboundMessage *msg)
operator()(InboundMessage &msg)
{
msg->queued = llarp_time_now_ms();
msg.queued = llarp_time_now_ms();
}
};
};

@ -12,13 +12,10 @@ struct sendbuf_t
_sz = 0;
}
sendbuf_t(sendbuf_t &&other)
sendbuf_t(const sendbuf_t &other)
{
if(other._sz > sizeof(_buf))
throw std::logic_error("sendbuf too big");
memcpy(_buf, other._buf, other._sz);
_sz = other._sz;
other._sz = 0;
_sz = std::min(other._sz, sizeof(_buf));
memcpy(_buf, other._buf, _sz);
}
sendbuf_t(size_t s)
@ -57,27 +54,27 @@ struct sendbuf_t
struct GetTime
{
llarp_time_t
operator()(const sendbuf_t *buf) const
operator()(const sendbuf_t &buf) const
{
return buf->timestamp;
return buf.timestamp;
}
};
struct PutTime
{
void
operator()(sendbuf_t *buf) const
operator()(sendbuf_t &buf) const
{
buf->timestamp = llarp_time_now_ms();
buf.timestamp = llarp_time_now_ms();
}
};
struct Compare
{
bool
operator()(const sendbuf_t *left, const sendbuf_t *right) const
operator()(const sendbuf_t &left, const sendbuf_t &right) const
{
return left->timestamp < right->timestamp;
return left.timestamp < right.timestamp;
}
};

@ -48,8 +48,8 @@ struct llarp_link_session
llarp_link *
get_parent();
const llarp::RouterContact &
get_remote_router() const;
llarp::RouterContact *
get_remote_router();
bool
CheckRCValid();
@ -168,7 +168,7 @@ struct llarp_link_session
void
EncryptOutboundFrames();
iwp_async_frame *
iwp_async_frame
alloc_frame(const void *buf, size_t sz);
void
decrypt_frame(const void *buf, size_t sz);

@ -5,17 +5,13 @@ namespace llarp
{
struct LinkIntroMessage : public ILinkMessage
{
LinkIntroMessage(const RouterContact& rc) : ILinkMessage(), RC(rc)
LinkIntroMessage(RouterContact* rc) : ILinkMessage(), RC(rc)
{
hasRC = true;
}
LinkIntroMessage();
~LinkIntroMessage();
bool hasRC = false;
RouterContact RC;
RouterContact* RC;
bool
DecodeKey(llarp_buffer_t key, llarp_buffer_t* buf);

@ -130,7 +130,7 @@ struct llarp_async_load_rc
void
llarp_nodedb_async_load_rc(struct llarp_async_load_rc *job);
void
bool
llarp_nodedb_select_random_hop(struct llarp_nodedb *n,
const llarp::RouterContact &prev,
llarp::RouterContact &result, size_t N);

@ -40,9 +40,9 @@ namespace llarp
queue_write(const void* data, size_t sz)
{
return m_writeq.EmplaceIf(
[&](WriteBuffer* pkt) -> bool {
[&](const WriteBuffer& pkt) -> bool {
return m_writeq.Size() < MAX_WRITE_QUEUE_SIZE
&& sz <= sizeof(pkt->buf);
&& sz <= sizeof(pkt.buf);
},
data, sz);
}
@ -53,10 +53,10 @@ namespace llarp
virtual void
flush_write()
{
m_writeq.Process([&](WriteBuffer* buffer) {
m_writeq.Process([&](WriteBuffer& buffer) {
// todo: wtf???
#ifndef _WIN32
write(fd, buffer->buf, buffer->bufsz);
write(fd, buffer.buf, buffer.bufsz);
// if we would block we save the entries for later
// discard entry
@ -89,27 +89,27 @@ namespace llarp
struct GetTime
{
llarp_time_t
operator()(const WriteBuffer* w) const
operator()(const WriteBuffer& w) const
{
return w->timestamp;
return w.timestamp;
}
};
struct PutTime
{
void
operator()(WriteBuffer* w) const
operator()(WriteBuffer& w) const
{
w->timestamp = llarp_time_now_ms();
w.timestamp = llarp_time_now_ms();
}
};
struct Compare
{
bool
operator()(const WriteBuffer* left, const WriteBuffer* right) const
operator()(const WriteBuffer& left, const WriteBuffer& right) const
{
return left->timestamp < right->timestamp;
return left.timestamp < right.timestamp;
}
};
};

@ -179,17 +179,17 @@ namespace llarp
void
TunEndpoint::FlushSend()
{
m_UserToNetworkPktQueue.Process([&](net::IPv4Packet *pkt) {
auto itr = m_IPToAddr.find(pkt->dst());
m_UserToNetworkPktQueue.Process([&](net::IPv4Packet &pkt) {
auto itr = m_IPToAddr.find(pkt.dst());
if(itr == m_IPToAddr.end())
{
in_addr a;
a.s_addr = pkt->dst();
a.s_addr = pkt.dst();
llarp::LogWarn("drop packet to ", inet_ntoa(a));
llarp::DumpBuffer(pkt->Buffer());
llarp::DumpBuffer(pkt.Buffer());
return true;
}
return SendToOrQueue(itr->second, pkt->Buffer(),
return SendToOrQueue(itr->second, pkt.Buffer(),
service::eProtocolTraffic);
});
}
@ -207,13 +207,13 @@ namespace llarp
uint32_t usIP = m_OurIP;
auto buf = llarp::Buffer(msg->payload);
if(!m_NetworkToUserPktQueue.EmplaceIf(
[buf, themIP, usIP](net::IPv4Packet *pkt) -> bool {
[buf, themIP, usIP](net::IPv4Packet &pkt) -> bool {
// do packet info rewrite here
// TODO: don't truncate packet here
memcpy(pkt->buf, buf.base, std::min(buf.sz, sizeof(pkt->buf)));
pkt->src(themIP);
pkt->dst(usIP);
pkt->UpdateChecksum();
memcpy(pkt.buf, buf.base, std::min(buf.sz, sizeof(pkt.buf)));
pkt.src(themIP);
pkt.dst(usIP);
pkt.UpdateChecksum();
return true;
}))
{
@ -307,8 +307,8 @@ namespace llarp
{
// called in the isolated network thread
TunEndpoint *self = static_cast< TunEndpoint * >(tun->user);
self->m_NetworkToUserPktQueue.Process([self, tun](net::IPv4Packet *pkt) {
if(!llarp_ev_tun_async_write(tun, pkt->buf, pkt->sz))
self->m_NetworkToUserPktQueue.Process([self, tun](net::IPv4Packet &pkt) {
if(!llarp_ev_tun_async_write(tun, pkt.buf, pkt.sz))
llarp::LogWarn("packet dropped");
});
if(self->m_UserToNetworkPktQueue.Size())
@ -329,9 +329,9 @@ namespace llarp
TunEndpoint *self = static_cast< TunEndpoint * >(tun->user);
llarp::LogDebug("got pkt ", sz, " bytes");
if(!self->m_UserToNetworkPktQueue.EmplaceIf(
[self, buf, sz](net::IPv4Packet *pkt) -> bool {
return pkt->Load(llarp::InitBuffer(buf, sz))
&& pkt->Header()->ip_version == 4;
[self, buf, sz](net::IPv4Packet &pkt) -> bool {
return pkt.Load(llarp::InitBuffer(buf, sz))
&& pkt.Header()->ip_version == 4;
}))
llarp::LogError("Failed to parse ipv4 packet");
}

@ -23,16 +23,16 @@ bool
frame_state::process_inbound_queue()
{
uint64_t last = 0;
recvqueue.Process([&](const InboundMessage *msg) {
if(last != msg->msgid)
recvqueue.Process([&](InboundMessage &msg) {
if(last != msg.msgid)
{
auto buffer = msg->Buffer();
auto buffer = msg.Buffer();
if(!Router()->HandleRecvLinkMessage(parent, buffer))
{
llarp::LogWarn("failed to process inbound message ", msg->msgid);
llarp::LogWarn("failed to process inbound message ", msg.msgid);
llarp::DumpBuffer< llarp_buffer_t, 128 >(buffer);
}
last = msg->msgid;
last = msg.msgid;
}
else
{
@ -194,8 +194,8 @@ void
frame_state::push_ackfor(uint64_t id, uint32_t bitmask)
{
llarp::LogDebug("ACK for msgid=", id, " mask=", bitmask);
auto pkt = new sendbuf_t(12 + 6);
auto body_ptr = init_sendbuf(pkt, eACKS, 12, txflags);
sendbuf_t pkt(12 + 6);
auto body_ptr = init_sendbuf(&pkt, eACKS, 12, txflags);
htobe64buf(body_ptr, id);
htobe32buf(body_ptr + 8, bitmask);
sendqueue.Put(pkt);

@ -111,7 +111,7 @@ llarp_link::TickSessions()
{
if(itr->second->Tick(now))
{
m_Connected.erase(itr->second->get_remote_router().pubkey);
m_Connected.erase(itr->second->get_remote_router()->pubkey);
itr = m_sessions.erase(itr);
}
else
@ -160,7 +160,6 @@ llarp_link::pending_session_active(const llarp::Addr& addr)
if(itr == m_PendingSessions.end())
return;
itr->second->our_router = &router->rc;
m_sessions.insert(std::make_pair(addr, std::move(itr->second)));
m_PendingSessions.erase(itr);
}

@ -127,10 +127,10 @@ llarp_link_session::session_established()
llarp_logic_cancel_call(serv->logic, establish_job_id);
}
const llarp::RouterContact &
llarp_link_session::get_remote_router() const
llarp::RouterContact *
llarp_link_session::get_remote_router()
{
return remote_router;
return &remote_router;
}
void
@ -179,7 +179,7 @@ llarp_link_session::send_LIM()
byte_t tmp[MAX_RC_SIZE + 64];
auto buf = llarp::StackBuffer< decltype(tmp) >(tmp);
// return a llarp_buffer_t of encoded link message
if(llarp::EncodeLIM(&buf, our_router))
if(llarp::EncodeLIM(&buf, &serv->router->rc))
{
// rewind message buffer
buf.sz = buf.cur - buf.base;
@ -383,7 +383,8 @@ llarp_link_session::get_parent()
void
llarp_link_session::TickLogic(llarp_time_t now)
{
decryptedFrames.Process([=](iwp_async_frame *f) { handle_frame_decrypt(f); });
decryptedFrames.Process(
[=](iwp_async_frame &f) { handle_frame_decrypt(&f); });
frame.process_inbound_queue();
frame.retransmit(now);
pump();
@ -425,9 +426,9 @@ llarp_link_session::keepalive()
void
llarp_link_session::EncryptOutboundFrames()
{
outboundFrames.Process([&](iwp_async_frame *frame) {
if(iwp_encrypt_frame(frame))
if(llarp_ev_udp_sendto(udp, addr, frame->buf, frame->sz) == -1)
outboundFrames.Process([&](iwp_async_frame &frame) {
if(iwp_encrypt_frame(&frame))
if(llarp_ev_udp_sendto(udp, addr, frame.buf, frame.sz) == -1)
llarp::LogError("sendto ", addr, " failed");
});
}
@ -572,7 +573,7 @@ llarp_link_session::decrypt_frame(const void *buf, size_t sz)
// inboundFrames.Put(frame);
auto f = alloc_frame(buf, sz);
if(iwp_decrypt_frame(f))
if(iwp_decrypt_frame(&f))
{
decryptedFrames.Put(f);
}
@ -722,23 +723,17 @@ llarp_link_session::recv(const void *buf, size_t sz)
}
}
iwp_async_frame *
iwp_async_frame
llarp_link_session::alloc_frame(const void *buf, size_t sz)
{
// TODO don't hard code 1500
if(sz > 1500)
{
llarp::LogWarn("alloc frame - frame too big, >1500");
return nullptr;
}
iwp_async_frame *frame = new iwp_async_frame;
iwp_async_frame frame;
sz = std::min(sz, sizeof(frame.buf));
if(buf)
memcpy(frame->buf, buf, sz);
frame->iwp = iwp;
frame->sz = sz;
frame->user = this;
frame->sessionkey = sessionkey;
memcpy(frame.buf, buf, sz);
frame.iwp = iwp;
frame.sz = sz;
frame.user = this;
frame.sessionkey = sessionkey;
/// TODO: this could be rather slow
// frame->created = now;
// llarp::LogInfo("alloc_frame putting into q");
@ -751,14 +746,12 @@ llarp_link_session::encrypt_frame_async_send(const void *buf, size_t sz)
{
// 64 bytes frame overhead for nonce and hmac
auto frame = alloc_frame(nullptr, sz + 64);
if(frame == nullptr)
return;
memcpy(frame->buf + 64, buf, sz);
memcpy(frame.buf + 64, buf, sz);
// maybe add upto 128 random bytes to the packet
auto padding = llarp_randint() % MAX_PAD;
if(padding)
crypto->randbytes(frame->buf + 64 + sz, padding);
frame->sz += padding;
crypto->randbytes(frame.buf + 64 + sz, padding);
frame.sz += padding;
// frame is modified, so now we can push it to queue
outboundFrames.Put(frame);
}
@ -767,8 +760,8 @@ void
llarp_link_session::pump()
{
bool flush = false;
frame.sendqueue.Process([&, this](sendbuf_t *msg) {
encrypt_frame_async_send(msg->data(), msg->size());
frame.sendqueue.Process([&, this](sendbuf_t &msg) {
encrypt_frame_async_send(msg.data(), msg.size());
flush = true;
});
if(flush)

@ -108,9 +108,9 @@ transit_message::completed() const
void
transit_message::generate_xmit(sendqueue_t &queue, byte_t flags)
{
uint16_t sz = lastfrag.size() + sizeof(msginfo.buffer);
auto pkt = new sendbuf_t(sz + 6);
auto body_ptr = init_sendbuf(pkt, eXMIT, sz, flags);
uint16_t sz = lastfrag.size() + sizeof(msginfo.buffer);
sendbuf_t pkt(sz + 6);
auto body_ptr = init_sendbuf(&pkt, eXMIT, sz, flags);
memcpy(body_ptr, msginfo.buffer, sizeof(msginfo.buffer));
body_ptr += sizeof(msginfo.buffer);
memcpy(body_ptr, lastfrag.data(), lastfrag.size());
@ -127,9 +127,9 @@ transit_message::retransmit_frags(sendqueue_t &queue, byte_t flags)
{
if(status.test(frag.first))
continue;
uint16_t sz = 9 + fragsize;
auto pkt = new sendbuf_t(sz + 6);
auto body_ptr = init_sendbuf(pkt, eFRAG, sz, flags);
uint16_t sz = 9 + fragsize;
sendbuf_t pkt(sz + 6);
auto body_ptr = init_sendbuf(&pkt, eFRAG, sz, flags);
htobe64buf(body_ptr, msgid);
body_ptr[8] = frag.first;
memcpy(body_ptr + 9, frag.second.data(), fragsize);

@ -15,10 +15,7 @@ namespace llarp
{
if(llarp_buffer_eq(key, "r"))
{
if(!RC.BDecode(buf))
return false;
hasRC = true;
return true;
return RC->BDecode(buf);
}
else if(llarp_buffer_eq(key, "v"))
{
@ -51,11 +48,12 @@ namespace llarp
if(!bencode_write_bytestring(buf, "i", 1))
return false;
if(hasRC)
if(RC)
{
if(!bencode_write_bytestring(buf, "r", 1))
return false;
return RC.BEncode(buf);
if(!RC->BEncode(buf))
return false;
}
if(!bencode_write_version_entry(buf))
@ -67,7 +65,8 @@ namespace llarp
bool
LinkIntroMessage::HandleMessage(llarp_router* router) const
{
router->async_verify_RC(RC, !RC.IsPublicRouter());
RouterContact contact = *RC;
router->async_verify_RC(contact, !contact.IsPublicRouter());
return true;
}
} // namespace llarp

@ -88,7 +88,7 @@ namespace llarp
RouterID
InboundMessageParser::GetCurrentFrom()
{
return from->get_remote_router().pubkey;
return from->get_remote_router()->pubkey;
}
bool

@ -373,7 +373,7 @@ llarp_nodedb_num_loaded(struct llarp_nodedb *n)
return n->entries.size();
}
void
bool
llarp_nodedb_select_random_hop(struct llarp_nodedb *n,
const llarp::RouterContact &prev,
llarp::RouterContact &result, size_t N)
@ -381,7 +381,8 @@ llarp_nodedb_select_random_hop(struct llarp_nodedb *n,
/// checking for "guard" status for N = 0 is done by caller inside of
/// pathbuilder's scope
auto sz = n->entries.size();
if(sz == 0)
return false;
if(N)
{
do
@ -398,7 +399,7 @@ llarp_nodedb_select_random_hop(struct llarp_nodedb *n,
if(itr->second.addrs.size())
{
result = itr->second;
return;
return true;
}
} while(true);
}
@ -412,5 +413,6 @@ llarp_nodedb_select_random_hop(struct llarp_nodedb *n,
std::advance(itr, idx - 1);
}
result = itr->second;
return true;
}
}

@ -175,14 +175,9 @@ namespace llarp
if(router->NumberOfConnectedRouters())
return router->GetRandomConnectedRouter(cur);
else
{
llarp_nodedb_select_random_hop(db, prev, cur, 0);
return true;
}
return llarp_nodedb_select_random_hop(db, prev, cur, 0);
}
else
llarp_nodedb_select_random_hop(db, prev, cur, hop);
return true;
return llarp_nodedb_select_random_hop(db, prev, cur, hop);
}
const byte_t*
@ -202,9 +197,10 @@ namespace llarp
Builder::BuildOne()
{
// select hops
std::vector< RouterContact > hops(numHops);
std::vector< RouterContact > hops;
for(size_t i = 0; i < numHops; ++i)
hops.emplace_back();
size_t idx = 0;
RouterContact prev;
while(idx < numHops)
{
if(idx == 0)

Loading…
Cancel
Save