|
|
|
@ -27,19 +27,29 @@ frame_state::process_inbound_queue()
|
|
|
|
|
// TODO: is this right?
|
|
|
|
|
auto &front = q.top();
|
|
|
|
|
// the items are already sorted anyways so this doesn't really do much
|
|
|
|
|
nextMsgID = std::max(nextMsgID, front->msgid);
|
|
|
|
|
auto buffer = front->Buffer();
|
|
|
|
|
if(!Router()->HandleRecvLinkMessage(parent, buffer))
|
|
|
|
|
|
|
|
|
|
if(front->msgid < nextMsgID && nextMsgID - front->msgid > 1)
|
|
|
|
|
{
|
|
|
|
|
llarp::LogWarn("failed to process inbound message ", front->msgid);
|
|
|
|
|
llarp::DumpBuffer< llarp_buffer_t, 128 >(buffer);
|
|
|
|
|
// re queue
|
|
|
|
|
recvqueue.Put(front);
|
|
|
|
|
nextMsgID = front->msgid;
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
auto buffer = front->Buffer();
|
|
|
|
|
if(!Router()->HandleRecvLinkMessage(parent, buffer))
|
|
|
|
|
{
|
|
|
|
|
llarp::LogWarn("failed to process inbound message ", front->msgid);
|
|
|
|
|
llarp::DumpBuffer< llarp_buffer_t, 128 >(buffer);
|
|
|
|
|
}
|
|
|
|
|
else
|
|
|
|
|
{
|
|
|
|
|
nextMsgID = std::max(front->msgid, nextMsgID + 1);
|
|
|
|
|
}
|
|
|
|
|
delete front;
|
|
|
|
|
}
|
|
|
|
|
delete front;
|
|
|
|
|
q.pop();
|
|
|
|
|
increment = true;
|
|
|
|
|
}
|
|
|
|
|
if(increment)
|
|
|
|
|
++nextMsgID;
|
|
|
|
|
// TODO: this isn't right
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
@ -145,13 +155,13 @@ frame_state::got_frag(frame_header hdr, size_t sz)
|
|
|
|
|
auto idItr = rxIDs.find(msgid);
|
|
|
|
|
if(idItr == rxIDs.end())
|
|
|
|
|
{
|
|
|
|
|
llarp::LogWarn("no such RX fragment, msgid=", msgid);
|
|
|
|
|
push_ackfor(msgid, 0);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
auto itr = rx.find(idItr->second);
|
|
|
|
|
if(itr == rx.end())
|
|
|
|
|
{
|
|
|
|
|
llarp::LogWarn("no such RX fragment, msgid=", msgid);
|
|
|
|
|
push_ackfor(msgid, 0);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
auto fragsize = itr->second->msginfo.fragsize();
|
|
|
|
@ -184,11 +194,12 @@ void
|
|
|
|
|
frame_state::push_ackfor(uint64_t id, uint32_t bitmask)
|
|
|
|
|
{
|
|
|
|
|
llarp::LogDebug("ACK for msgid=", id, " mask=", bitmask);
|
|
|
|
|
sendqueue.push(new sendbuf_t(12 + 6));
|
|
|
|
|
auto body_ptr = init_sendbuf(sendqueue.back(), eACKS, 12, txflags);
|
|
|
|
|
auto pkt = new sendbuf_t(12 + 6);
|
|
|
|
|
auto body_ptr = init_sendbuf(pkt, eACKS, 12, txflags);
|
|
|
|
|
// TODO: this assumes big endian
|
|
|
|
|
memcpy(body_ptr, &id, 8);
|
|
|
|
|
memcpy(body_ptr + 8, &bitmask, 4);
|
|
|
|
|
sendqueue.Put(pkt);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool
|
|
|
|
@ -331,6 +342,7 @@ frame_state::process(byte_t *buf, size_t sz)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
bool
|
|
|
|
|
frame_state::next_frame(llarp_buffer_t *buf)
|
|
|
|
|
{
|
|
|
|
@ -354,6 +366,7 @@ frame_state::pop_next_frame()
|
|
|
|
|
delete buf;
|
|
|
|
|
sendqueue.pop();
|
|
|
|
|
}
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
void
|
|
|
|
|
frame_state::queue_tx(uint64_t id, transit_message *msg)
|
|
|
|
|