initial wack at 0.7.0 dht fixes

pull/1075/head
Jeff Becker 4 years ago
parent f6813717b5
commit 9efd796145
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -0,0 +1,94 @@
llarp's dht is a recusrive kademlia dht with optional request proxying via paths for requester anonymization.
dht is separated into 2 different networks, one for router contacts, one for introsets.
format for consesus propagation messages:
keys: A, H, K, N, O, T, U, V
concensus request messages
requester requests current table hash, H,N,O is set to zeros if not known
C -> S
{
A: "C",
H: "<32 byte last hash of consensus table>",
N: uint64_number_of_entries_to_request,
O: uint64_offset_in_table,
T: uint64_txid,
V: []
}
when H or N is set to zero from the requester, they are requesting the current consensus table's hash
consensus response message is as follows for a zero H or N value
S -> C
{
A: "C",
H: "<32 byte hash of current consensus table>",
N: uint64_number_of_entries_in_table,
T: uint64_txid,
U: uint64_ms_next_update_required,
V: [proto, major, minor, patch]
}
requester requests a part of the current table for hash H
N must be less than or equal to 512
C -> S
{
A: "C",
H: "<32 bytes current consensus table hash>",
N: 256,
O: 512,
T: uint64_txid,
V: []
}
consensus response message for routers 512 to 512 + 256
S -> C
{
A: "C",
H: "<32 bytes current concensus table hash>",
K: [list, of, N, pubkeys, from, request, starting, at, position, O],
T: uint64_txid,
V: [proto, major, minor, patch]
}
consensus table is a concatination of all public keys in lexigraphical order.
the hash function in use is 256 bit blake2
gossip RC message
broadcast style RC publish message. sent to all peers infrequently.
it is really an unwarrented GRCM, propagate to all peers.
{
A: "S",
R: [RC],
T: 0,
V: proto
}
replays are dropped using a decaying hashset or decaying bloom filter.
the introset dht has 3 message: GetIntroSet Message (GIM), PutIntroSet Message (PIM), FoundIntroSet Message (FIM)

@ -132,7 +132,12 @@ set(DNSLIB_SRC
dns/string.cpp
)
set(CONSENSUS_SRC
consensus/table.cpp
)
set(LIB_SRC
${CONSENSUS_SRC}
${DNSLIB_SRC}
bootstrap.cpp
context.cpp

@ -0,0 +1,17 @@
#include <consensus/table.hpp>
#include <crypto/crypto.hpp>
namespace llarp
{
namespace consensus
{
ShortHash
Table::CalculateHash() const
{
ShortHash h;
const llarp_buffer_t buf(begin()->data(), size());
CryptoManager::instance()->shorthash(h, buf);
return h;
}
} // namespace consensus
} // namespace llarp

@ -0,0 +1,20 @@
#ifndef LLARP_CONSENSUS_TABLE_HPP
#define LLARP_CONSENSUS_TABLE_HPP
#include <crypto/types.hpp>
#include <vector>
namespace llarp
{
namespace consensus
{
/// consensus table
struct Table : public std::vector< RouterID >
{
ShortHash
CalculateHash() const;
};
} // namespace consensus
} // namespace llarp
#endif

@ -12,6 +12,7 @@
namespace llarp
{
const std::array<uint16_t, 3> VERSION{{LLARP_VERSION_MAJ, LLARP_VERSION_MIN, LLARP_VERSION_PATCH}};
const std::array<uint64_t, 4> ROUTER_VERSION{{LLARP_PROTO_VERSION, LLARP_VERSION_MAJ, LLARP_VERSION_MIN, LLARP_VERSION_PATCH}};
const char* const VERSION_STR = LLARP_VERSION_STR;
const char* const VERSION_TAG = "@VERSIONTAG@";
const char* const VERSION_FULL = LLARP_NAME "-" LLARP_VERSION_STR "-@VERSIONTAG@";

@ -7,8 +7,8 @@
#define LLARP_NAME "lokinet"
#define LLARP_VERSION_MAJ 0
#define LLARP_VERSION_MIN 6
#define LLARP_VERSION_PATCH 4
#define LLARP_VERSION_MIN 7
#define LLARP_VERSION_PATCH 0
#define LLARP_DEFAULT_NETID "lokinet"

@ -6,10 +6,11 @@
namespace llarp
{
// Given a full lokinet version of: lokinet-1.2.3-abc these are:
extern const std::array< uint16_t, 3 > VERSION; // [1, 2, 3]
extern const char* const VERSION_STR; // "1.2.3"
extern const char* const VERSION_TAG; // "abc"
extern const char* const VERSION_FULL; // "lokinet-1.2.3-abc"
extern const std::array< uint16_t, 3 > VERSION; // [1, 2, 3]
extern const std::array< uint64_t, 4 > ROUTER_VERSION; // [proto, 1, 2, 3]
extern const char* const VERSION_STR; // "1.2.3"
extern const char* const VERSION_TAG; // "abc"
extern const char* const VERSION_FULL; // "lokinet-1.2.3-abc"
extern const char* const RELEASE_MOTTO;
extern const char* const DEFAULT_NETID;

@ -208,6 +208,16 @@ namespace llarp
}
}
template < typename Visit_t >
void
ForEachNode(Visit_t visit)
{
for(const auto& item : nodes)
{
visit(item.second);
}
}
void
Clear()
{

@ -21,6 +21,7 @@
#include <nodedb.hpp>
#include <profiling.hpp>
#include <router/i_rc_lookup_handler.hpp>
#include <util/decaying_hashset.hpp>
#include <vector>
namespace llarp
@ -228,11 +229,24 @@ namespace llarp
return router->nodedb()->Get(k.as_array(), rc);
}
void
FloodRCLater(const dht::Key_t from, const RouterContact rc) override;
PendingIntrosetLookups _pendingIntrosetLookups;
PendingTagLookups _pendingTagLookups;
PendingRouterLookups _pendingRouterLookups;
PendingExploreLookups _pendingExploreLookups;
using RCGossipReplayFilter_t = util::DecayingHashSet< RouterContact >;
RCGossipReplayFilter_t m_GossipReplayFilter;
using RCGossipList_t = std::unordered_multimap< RouterContact, RouterID,
RouterContact::Hash >;
/// list of RCs that we want to publish with who gave us the publish
RCGossipList_t m_GossipList;
PendingIntrosetLookups&
pendingIntrosetLookups() override
{
@ -305,7 +319,9 @@ namespace llarp
Key_t ourKey;
};
Context::Context()
static constexpr auto GossipFilterDecayInterval = std::chrono::minutes(30);
Context::Context() : m_GossipReplayFilter(GossipFilterDecayInterval)
{
randombytes((byte_t*)&ids, sizeof(uint64_t));
}
@ -355,11 +371,45 @@ namespace llarp
{
// clean up transactions
CleanupTX();
const llarp_time_t now = Now();
// flush pending floods
if(router->IsServiceNode())
{
std::unordered_set< RouterContact, RouterContact::Hash > keys;
for(const auto& item : m_GossipList)
{
// filter hit don't publish it at all
if(not m_GossipReplayFilter.Insert(item.first))
continue;
// skip if duplicate RC
if(not keys.emplace(item.first).second)
continue;
const auto& rc = item.first;
// build set of routers to not send to for this RC
std::set< RouterID > exclude = {rc.pubkey};
const auto range = m_GossipList.equal_range(rc);
auto itr = range.first;
while(itr != range.second)
{
exclude.emplace(itr->second);
++itr;
}
Nodes()->ForEachNode([self = this, rc, &exclude](const auto& node) {
const RouterID K(node.rc.pubkey);
if(exclude.find(K) == exclude.end())
self->DHTSendTo(K, new GotRouterMessage(rc));
});
}
}
// clear gossip list
m_GossipList.clear();
// decay gossip filter
m_GossipReplayFilter.Decay(now);
if(_services)
{
// expire intro sets
auto now = Now();
auto& nodes = _services->nodes;
auto itr = nodes.begin();
while(itr != nodes.end())
@ -564,7 +614,7 @@ namespace llarp
llarp::routing::DHTMessage reply;
if(!msg.HandleMessage(router->dht(), reply.M))
return false;
if(!reply.M.empty())
if(not reply.M.empty())
{
auto path = router->pathContext().GetByUpstream(router->pubkey(), id);
return path && path->SendRoutingMessage(reply, router);
@ -613,6 +663,16 @@ namespace llarp
((R + 1) * 2000));
}
void
Context::FloodRCLater(const dht::Key_t from, const RouterContact rc)
{
// check valid rc
if(not router->rcLookupHandler().CheckRC(rc))
return;
m_GossipList.emplace(rc, from.as_array());
// TODO: don't publish our rc in next interval (based of whitelist size)
}
void
Context::LookupIntroSetIterative(const service::Address& addr,
const Key_t& whoasked, uint64_t txid,

@ -185,6 +185,10 @@ namespace llarp
virtual void
StoreRC(const RouterContact rc) const = 0;
/// flood rc to all peers later in a batch
virtual void
FloodRCLater(const dht::Key_t from, const RouterContact rc) = 0;
};
std::unique_ptr< AbstractContext >

@ -0,0 +1,28 @@
#ifndef LLARP_DHT_MESSAGES_CONSENSUS_HPP
#define LLARP_DHT_MESSAGES_CONSENSUS_HPP
#include <dht/message.hpp>
#include <router_version.hpp>
namespace llarp
{
namespace dht
{
struct ConsensusMessage
{
/// H
ShortHash m_Hash;
/// K
std::vector< RouterID > m_Keys;
/// N
uint64_t m_NumberOfEntries;
/// O
uint64_t m_EntryOffset;
/// T
uint64_t m_TxID;
/// U
llarp_time_t m_NextUpdateRequired;
/// V
RouterVersion m_RotuerVersion;
};
} // namespace dht
} // namespace llarp

@ -124,6 +124,11 @@ namespace llarp
{
if(not dht.GetRouter()->rcLookupHandler().CheckRC(rc))
return false;
if(txid == 0 and R.size() == 1)
{
// flood as needed
dht.FloodRCLater(From, rc);
}
}
return true;
}

@ -1,6 +1,6 @@
#ifndef LLARP_DHT_MESSAGES_GOT_ROUTER_HPP
#define LLARP_DHT_MESSAGES_GOT_ROUTER_HPP
#include <constants/proto.hpp>
#include <dht/message.hpp>
#include <router_contact.hpp>
#include <util/copy_or_nullptr.hpp>
@ -42,6 +42,12 @@ namespace llarp
{
}
/// gossip message
GotRouterMessage(const RouterContact rc) : IMessage({}), R({rc}), txid(0)
{
version = LLARP_PROTO_VERSION;
}
GotRouterMessage(const GotRouterMessage& other)
: IMessage(other.From)
, foundRCs(other.foundRCs)

@ -123,6 +123,12 @@ namespace llarp
util::StatusObject
ExtractStatus() const override;
bool
IsInbound() const override
{
return m_Inbound;
}
private:
enum class State
{

@ -187,6 +187,8 @@ namespace llarp
}
for(const auto& pending : closedPending)
{
if(pending->IsInbound())
continue;
HandleTimeout(pending.get());
}
}

@ -82,6 +82,10 @@ namespace llarp
virtual PubKey
GetPubKey() const = 0;
/// is an inbound session or not
virtual bool
IsInbound() const = 0;
/// get remote address
virtual Addr
GetRemoteEndpoint() const = 0;

@ -24,6 +24,8 @@ namespace llarp
{
if(key == "c")
{
/// so we dont put it into the shitty queue
pathid.Fill('c');
return BEncodeReadArray(frames, buf);
}
bool read = false;
@ -248,9 +250,10 @@ namespace llarp
if(self->context->HasTransitHop(self->hop->info))
{
llarp::LogError("duplicate transit hop ", self->hop->info);
OnForwardLRCMResult(self->context->Router(), self->hop->info.rxID,
self->hop->info.downstream, self->hop->pathKey,
SendStatus::Congestion);
LR_StatusMessage::CreateAndSend(
self->context->Router(), self->hop->info.rxID,
self->hop->info.downstream, self->hop->pathKey,
LR_StatusRecord::FAIL_DUPLICATE_HOP);
self->hop = nullptr;
return;
}

@ -91,6 +91,7 @@ namespace llarp
{
std::for_each(frames.begin(), frames.end(), [](auto& f) { f.Clear(); });
version = 0;
status = 0;
}
bool
@ -134,7 +135,8 @@ namespace llarp
if(!path)
{
llarp::LogWarn(
"unhandled LR_Status message: no associated IHopHandler found");
"unhandled LR_Status message: no associated path found pathid=",
pathid);
return false;
}
@ -161,7 +163,7 @@ namespace llarp
{
auto message = std::make_shared< LR_StatusMessage >();
message->status = status & LR_StatusRecord::SUCCESS;
message->status = status;
message->pathid = pathid;
message->SetDummyFrames();
@ -230,13 +232,6 @@ namespace llarp
std::shared_ptr< LR_StatusMessage > msg)
{
llarp::LogDebug("Attempting to send LR_Status message to (", nextHop, ")");
if(not router->HasSessionTo(nextHop))
{
llarp::LogError(
"Sending LR_Status message, but no connection to previous hop (",
nextHop, ")");
return;
}
if(not router->SendToOrQueue(nextHop, msg.get()))
{
llarp::LogError("Sending LR_Status message, SendToOrQueue to ", nextHop,

@ -71,11 +71,12 @@ namespace llarp
LR_StatusMessage::QueueSendMessage(r, info.downstream, msg);
if((status & LR_StatusRecord::SUCCESS) == 0)
if((status & LR_StatusRecord::SUCCESS) != LR_StatusRecord::SUCCESS)
{
LogDebug(
LogWarn(
"TransitHop received non-successful LR_StatusMessage, queueing "
"self-destruct");
"self-destruct status=",
status);
QueueDestroySelf(r);
}

@ -221,7 +221,10 @@ namespace llarp
if(status == ILinkSession::DeliveryStatus::eDeliverySuccess)
DoCallback(callback, SendStatus::Success);
else
{
LogWarn("Send outbound message handler dropped message");
DoCallback(callback, SendStatus::Congestion);
}
});
}
@ -252,6 +255,10 @@ namespace llarp
!= llarp::thread::QueueReturn::Success)
{
m_queueStats.dropped++;
LogWarn(
"QueueOutboundMessage outbound message handler dropped message on "
"pathid=",
pathid);
DoCallback(callback_copy, SendStatus::Congestion);
}
else
@ -290,6 +297,10 @@ namespace llarp
}
else
{
LogWarn(
"ProcessOutboundQueue outbound message handler dropped message on "
"pathid=",
entry.pathid);
DoCallback(entry.message.second, SendStatus::Congestion);
m_queueStats.dropped++;
}

@ -57,9 +57,8 @@ namespace llarp
OutboundSessionMaker::OnConnectTimeout(ILinkSession *session)
{
// TODO: retry/num attempts
LogWarn("Session establish attempt to ", RouterID(session->GetPubKey()),
" timed out.");
" timed out.", session->GetRemoteEndpoint());
FinalizeRequest(session->GetPubKey(), SessionResult::Timeout);
}

@ -339,14 +339,18 @@ namespace llarp
if(!nextRC.Verify(time_now_ms(), false))
return false;
_rc = std::move(nextRC);
// propagate RC by renegotiating sessions
ForEachPeer([](ILinkSession *s) {
if(s->RenegotiateSession())
LogInfo("renegotiated session");
else
LogWarn("failed to renegotiate session");
});
if(rotateKeys)
{
// propagate RC by renegotiating sessions
ForEachPeer([](ILinkSession *s) {
if(s->RenegotiateSession())
LogInfo("renegotiated session");
else
LogWarn("failed to renegotiate session");
});
}
/// flood our rc
_dht->impl->FloodRCLater(dht::Key_t(pubkey()), _rc);
return SaveRC();
}

@ -3,6 +3,7 @@
#include <array>
#include <util/bencode.hpp>
<<<<<<< HEAD
#include <constants/version.hpp>
#include <constants/proto.hpp>
@ -64,8 +65,7 @@ namespace llarp
uint64_t m_ProtoVersion = LLARP_PROTO_VERSION;
};
inline std::ostream&
operator<<(std::ostream& out, const RouterVersion& rv)
inline std::ostream& operator<<(std::ostream& out, const RouterVersion& rv)
{
return out << rv.ToString();
}

@ -10,10 +10,10 @@ namespace llarp
bool
DHTMessage::DecodeKey(const llarp_buffer_t& key, llarp_buffer_t* val)
{
llarp::dht::Key_t fromKey;
fromKey.Zero();
if(key == "M")
{
llarp::dht::Key_t fromKey;
fromKey.Zero();
return llarp::dht::DecodeMesssageList(fromKey, val, M, true);
}
if(key == "S")
@ -49,7 +49,7 @@ namespace llarp
DHTMessage::HandleMessage(IMessageHandler* h, AbstractRouter* r) const
{
// set source as us
llarp::dht::Key_t us{r->pubkey()};
const llarp::dht::Key_t us(r->pubkey());
for(const auto& msg : M)
{
msg->From = us;

@ -11,6 +11,10 @@ namespace llarp
template < typename Val_t, typename Hash_t = typename Val_t::Hash >
struct DecayingHashSet
{
DecayingHashSet(std::chrono::milliseconds cacheInterval)
: DecayingHashSet(cacheInterval.count())
{
}
DecayingHashSet(llarp_time_t cacheInterval = 5000)
: m_CacheInterval(cacheInterval)
{
@ -30,7 +34,7 @@ namespace llarp
{
if(now == 0)
now = llarp::time_now_ms();
return m_Values.emplace(v, now + m_CacheInterval).second;
return m_Values.emplace(v, now).second;
}
/// decay hashset entries
@ -42,7 +46,7 @@ namespace llarp
auto itr = m_Values.begin();
while(itr != m_Values.end())
{
if(itr->second <= now)
if((m_CacheInterval + itr->second) <= now)
itr = m_Values.erase(itr);
else
++itr;
@ -55,8 +59,14 @@ namespace llarp
return m_CacheInterval;
}
void
DecayInterval(llarp_time_t interval)
{
m_CacheInterval = interval;
}
private:
const llarp_time_t m_CacheInterval;
llarp_time_t m_CacheInterval;
std::unordered_map< Val_t, llarp_time_t, Hash_t > m_Values;
};
} // namespace util

@ -85,7 +85,7 @@ namespace llarp
LogContext();
LogLevel curLevel = eLogInfo;
LogLevel startupLevel = eLogInfo;
LogLevel runtimeLevel = eLogWarn;
LogLevel runtimeLevel = eLogInfo;
ILogStream_ptr logStream;
std::string nodeName = "lokinet";
@ -106,13 +106,12 @@ namespace llarp
/** internal */
template < typename... TArgs >
void
inline static void
_Log(LogLevel lvl, const char* fname, int lineno, TArgs&&... args) noexcept
{
auto& log = LogContext::Instance();
if(log.curLevel > lvl)
return;
std::stringstream ss;
LogAppend(ss, std::forward< TArgs >(args)...);
log.logStream->AppendLog(lvl, fname, lineno, log.nodeName, ss.str());

@ -2,6 +2,7 @@
#define LLARP_TIME_HPP
#include <util/types.hpp>
#include <chrono>
namespace llarp
{

@ -118,6 +118,8 @@ namespace llarp
MOCK_CONST_METHOD0(Nodes, dht::Bucket< dht::RCNode >*());
MOCK_METHOD1(PutRCNodeAsync, void(const dht::RCNode& val));
MOCK_METHOD1(DelRCNodeAsync, void(const dht::Key_t& val));
MOCK_METHOD2(FloodRCLater, void(const dht::Key_t, const RouterContact));
};
} // namespace test

Loading…
Cancel
Save