only flush when no other jobs are executing

pull/935/head
Jeff Becker 5 years ago committed by Jason Rhinelander
parent 9e305c5b30
commit d823d6fa70

@ -36,6 +36,8 @@ namespace llarp
{
m_UpstreamGather.enable();
m_DownstreamGather.enable();
m_UpstreamWorkCounter = 0;
m_DownstreamWorkCounter = 0;
}
bool
@ -122,6 +124,18 @@ namespace llarp
void
TransitHop::DownstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{
m_DownstreamWorkCounter++;
auto flushIt = [self = shared_from_this(), r]() {
std::vector< RelayDownstreamMessage > msgs;
do
{
auto maybe = self->m_DownstreamGather.tryPopFront();
if(not maybe.has_value())
break;
msgs.emplace_back(maybe.value());
} while(true);
self->HandleAllDownstream(std::move(msgs), r);
};
for(auto& ev : *msgs)
{
RelayDownstreamMessage msg;
@ -132,27 +146,32 @@ namespace llarp
msg.X = buf;
llarp::LogDebug("relay ", msg.X.size(), " bytes downstream from ",
info.upstream, " to ", info.downstream);
if(m_DownstreamGather.empty() || m_DownstreamGather.full())
if(m_DownstreamGather.full())
{
LogicCall(r->logic(), [self = shared_from_this(), r]() {
std::vector< RelayDownstreamMessage > msgs;
do
{
auto maybe = self->m_DownstreamGather.tryPopFront();
if(not maybe.has_value())
break;
msgs.emplace_back(maybe.value());
} while(true);
self->HandleAllDownstream(std::move(msgs), r);
});
LogicCall(r->logic(), flushIt);
}
m_DownstreamGather.pushBack(msg);
}
m_DownstreamWorkCounter--;
if(m_DownstreamWorkCounter == 0)
flushIt();
}
void
TransitHop::UpstreamWork(TrafficQueue_ptr msgs, AbstractRouter* r)
{
m_UpstreamWorkCounter++;
auto flushIt = [self = shared_from_this(), r]() {
std::vector< RelayUpstreamMessage > msgs;
do
{
auto maybe = self->m_UpstreamGather.tryPopFront();
if(not maybe.has_value())
break;
msgs.emplace_back(maybe.value());
} while(true);
self->HandleAllUpstream(std::move(msgs), r);
};
for(auto& ev : *msgs)
{
const llarp_buffer_t buf(ev.first);
@ -161,22 +180,15 @@ namespace llarp
msg.pathid = info.txID;
msg.Y = ev.second ^ nonceXOR;
msg.X = buf;
if(m_UpstreamGather.empty() || m_UpstreamGather.full())
if(m_UpstreamGather.full())
{
LogicCall(r->logic(), [self = shared_from_this(), r]() {
std::vector< RelayUpstreamMessage > msgs;
do
{
auto maybe = self->m_UpstreamGather.tryPopFront();
if(not maybe.has_value())
break;
msgs.emplace_back(maybe.value());
} while(true);
self->HandleAllUpstream(std::move(msgs), r);
});
LogicCall(r->logic(), flushIt);
}
m_UpstreamGather.pushBack(msg);
}
m_UpstreamWorkCounter--;
if(m_UpstreamWorkCounter == 0)
flushIt();
}
void
@ -235,17 +247,6 @@ namespace llarp
std::move(m_UpstreamQueue), r));
m_UpstreamQueue = nullptr;
std::vector< RelayUpstreamMessage > msgs;
do
{
auto maybe = m_UpstreamGather.tryPopFront();
if(not maybe.has_value())
break;
msgs.emplace_back(maybe.value());
} while(true);
if(msgs.empty())
return;
HandleAllUpstream(std::move(msgs), r);
}
void
@ -256,17 +257,6 @@ namespace llarp
shared_from_this(),
std::move(m_DownstreamQueue), r));
m_DownstreamQueue = nullptr;
std::vector< RelayDownstreamMessage > msgs;
do
{
auto maybe = m_DownstreamGather.tryPopFront();
if(not maybe.has_value())
break;
msgs.emplace_back(maybe.value());
} while(true);
if(msgs.empty())
return;
HandleAllDownstream(std::move(msgs), r);
}
bool

@ -235,6 +235,8 @@ namespace llarp
m_FlushOthers;
thread::Queue< RelayUpstreamMessage > m_UpstreamGather;
thread::Queue< RelayDownstreamMessage > m_DownstreamGather;
std::atomic< uint32_t > m_UpstreamWorkCounter;
std::atomic< uint32_t > m_DownstreamWorkCounter;
};
inline std::ostream&

Loading…
Cancel
Save