Merge remote-tracking branch 'origin/master' into ipv6-tun

pull/686/head
Jeff Becker 5 years ago
commit dcefcd7879
No known key found for this signature in database
GPG Key ID: F357B3B42F6F9B05

@ -67,7 +67,8 @@ set(LIB_PLATFORM_SRC
ev/ev.cpp
ev/pipe.cpp
metrics/metrictank_publisher.cpp
metrics/publishers.cpp
metrics/json_publisher.cpp
metrics/stream_publisher.cpp
net/ip.cpp
net/net.cpp
net/net_addr.cpp

@ -8,7 +8,8 @@
#include <dnsd.hpp>
#include <ev/ev.hpp>
#include <metrics/metrictank_publisher.hpp>
#include <metrics/publishers.hpp>
#include <metrics/json_publisher.hpp>
#include <metrics/stream_publisher.hpp>
#include <nodedb.hpp>
#include <router/router.hpp>
#include <util/logger.h>

@ -101,4 +101,4 @@ namespace libuv
} // namespace libuv
#endif
#endif

@ -0,0 +1,177 @@
#include <metrics/json_publisher.hpp>
#include <fstream>
#include <iomanip>
#include <iostream>
namespace llarp
{
namespace metrics
{
namespace
{
nlohmann::json
tagsToJson(const Tags &tags)
{
nlohmann::json result;
std::for_each(tags.begin(), tags.end(), [&](const auto &tag) {
absl::visit([&](const auto &t) { result[tag.first] = t; },
tag.second);
});
return result;
}
template < typename Value >
nlohmann::json
formatValue(const Record< Value > &record, const Tags &tags,
double elapsedTime, Publication::Type publicationType)
{
switch(publicationType)
{
case Publication::Type::Unspecified:
{
assert(false && "Invalid publication type");
}
break;
case Publication::Type::Total:
{
return {{"tags", tagsToJson(tags)}, {"total", record.total()}};
}
break;
case Publication::Type::Count:
{
return {{"tags", tagsToJson(tags)}, {"count", record.count()}};
}
break;
case Publication::Type::Min:
{
return {{"tags", tagsToJson(tags)}, {"min", record.min()}};
}
break;
case Publication::Type::Max:
{
return {{"tags", tagsToJson(tags)}, {"max", record.max()}};
}
break;
case Publication::Type::Avg:
{
return {{"tags", tagsToJson(tags)},
{"avg", record.total() / record.count()}};
}
break;
case Publication::Type::Rate:
{
return {{"tags", tagsToJson(tags)},
{"rate", record.total() / elapsedTime}};
}
break;
case Publication::Type::RateCount:
{
return {{"tags", tagsToJson(tags)},
{"rateCount", record.count() / elapsedTime}};
}
break;
}
}
template < typename Value >
nlohmann::json
recordToJson(const TaggedRecords< Value > &taggedRecord,
double elapsedTime)
{
nlohmann::json result;
result["id"] = taggedRecord.id.toString();
auto publicationType = taggedRecord.id.description()->type();
for(const auto &rec : taggedRecord.data)
{
const auto &record = rec.second;
if(publicationType != Publication::Type::Unspecified)
{
result["publicationType"] = Publication::repr(publicationType);
result["metrics"].push_back(
formatValue(record, rec.first, elapsedTime, publicationType));
}
else
{
nlohmann::json tmp;
tmp["tags"] = tagsToJson(rec.first);
tmp["count"] = record.count();
tmp["total"] = record.total();
if(Record< Value >::DEFAULT_MIN() != record.min())
{
tmp["min"] = record.min();
}
if(Record< Value >::DEFAULT_MAX() == record.max())
{
tmp["max"] = record.max();
}
result["metrics"].push_back(tmp);
}
}
return result;
}
} // namespace
void
JsonPublisher::publish(const Sample &values)
{
if(values.recordCount() == 0)
{
// nothing to publish
return;
}
nlohmann::json result;
result["sampleTime"] = absl::UnparseFlag(values.sampleTime());
result["recordCount"] = values.recordCount();
auto gIt = values.begin();
auto prev = values.begin();
for(; gIt != values.end(); ++gIt)
{
const double elapsedTime = absl::ToDoubleSeconds(samplePeriod(*gIt));
if(gIt == prev || samplePeriod(*gIt) != samplePeriod(*prev))
{
result["elapsedTime"] = elapsedTime;
}
absl::visit(
[&](const auto &x) -> void {
for(const auto &record : x)
{
result["record"].emplace_back(
recordToJson(record, elapsedTime));
}
},
*gIt);
prev = gIt;
}
m_publish(result);
}
void
JsonPublisher::directoryPublisher(const nlohmann::json &result,
const fs::path &path)
{
std::ofstream fstream(path.string(), std::ios_base::app);
if(!fstream)
{
std::cerr << "Skipping metrics publish, " << path << " is not a file\n";
return;
}
fstream << std::setw(0) << result << '\n';
fstream.close();
}
} // namespace metrics
} // namespace llarp

@ -1,34 +1,18 @@
#ifndef LLARP_METRICS_PUBLISHERS_HPP
#define LLARP_METRICS_PUBLISHERS_HPP
#ifndef LLARP_METRICS_JSON_PUBLISHER_HPP
#define LLARP_METRICS_JSON_PUBLISHER_HPP
#include <util/fs.hpp>
#include <util/metrics_core.hpp>
#include <util/fs.hpp>
#include <nlohmann/json.hpp>
#include <functional>
#include <iosfwd>
#include <nlohmann/json.hpp>
namespace llarp
{
namespace metrics
{
class StreamPublisher final : public Publisher
{
std::ostream& m_stream;
public:
StreamPublisher(std::ostream& stream) : m_stream(stream)
{
}
~StreamPublisher()
{
}
void
publish(const Sample& values) override;
};
class JsonPublisher final : public Publisher
{
public:
@ -53,7 +37,5 @@ namespace llarp
directoryPublisher(const nlohmann::json& result, const fs::path& path);
};
} // namespace metrics
} // namespace llarp
#endif

@ -1,4 +1,4 @@
#include <metrics/publishers.hpp>
#include <metrics/stream_publisher.hpp>
#include <fstream>
#include <iostream>
@ -155,115 +155,6 @@ namespace llarp
}
stream << "\n\t\t]\n";
}
nlohmann::json
tagsToJson(const Tags &tags)
{
nlohmann::json result;
std::for_each(tags.begin(), tags.end(), [&](const auto &tag) {
absl::visit([&](const auto &t) { result[tag.first] = t; },
tag.second);
});
return result;
}
template < typename Value >
nlohmann::json
formatValue(const Record< Value > &record, const Tags &tags,
double elapsedTime, Publication::Type publicationType)
{
switch(publicationType)
{
case Publication::Type::Unspecified:
{
assert(false && "Invalid publication type");
}
break;
case Publication::Type::Total:
{
return {{"tags", tagsToJson(tags)}, {"total", record.total()}};
}
break;
case Publication::Type::Count:
{
return {{"tags", tagsToJson(tags)}, {"count", record.count()}};
}
break;
case Publication::Type::Min:
{
return {{"tags", tagsToJson(tags)}, {"min", record.min()}};
}
break;
case Publication::Type::Max:
{
return {{"tags", tagsToJson(tags)}, {"max", record.max()}};
}
break;
case Publication::Type::Avg:
{
return {{"tags", tagsToJson(tags)},
{"avg", record.total() / record.count()}};
}
break;
case Publication::Type::Rate:
{
return {{"tags", tagsToJson(tags)},
{"rate", record.total() / elapsedTime}};
}
break;
case Publication::Type::RateCount:
{
return {{"tags", tagsToJson(tags)},
{"rateCount", record.count() / elapsedTime}};
}
break;
}
}
template < typename Value >
nlohmann::json
recordToJson(const TaggedRecords< Value > &taggedRecord,
double elapsedTime)
{
nlohmann::json result;
result["id"] = taggedRecord.id.toString();
auto publicationType = taggedRecord.id.description()->type();
for(const auto &rec : taggedRecord.data)
{
const auto &record = rec.second;
if(publicationType != Publication::Type::Unspecified)
{
result["publicationType"] = Publication::repr(publicationType);
result["metrics"].push_back(
formatValue(record, rec.first, elapsedTime, publicationType));
}
else
{
nlohmann::json tmp;
tmp["tags"] = tagsToJson(rec.first);
tmp["count"] = record.count();
tmp["total"] = record.total();
if(Record< Value >::DEFAULT_MIN() != record.min())
{
tmp["min"] = record.min();
}
if(Record< Value >::DEFAULT_MAX() == record.max())
{
tmp["max"] = record.max();
}
result["metrics"].push_back(tmp);
}
}
return result;
}
} // namespace
void
@ -301,59 +192,5 @@ namespace llarp
prev = gIt;
}
}
void
JsonPublisher::publish(const Sample &values)
{
if(values.recordCount() == 0)
{
// nothing to publish
return;
}
nlohmann::json result;
result["sampleTime"] = absl::UnparseFlag(values.sampleTime());
result["recordCount"] = values.recordCount();
auto gIt = values.begin();
auto prev = values.begin();
for(; gIt != values.end(); ++gIt)
{
const double elapsedTime = absl::ToDoubleSeconds(samplePeriod(*gIt));
if(gIt == prev || samplePeriod(*gIt) != samplePeriod(*prev))
{
result["elapsedTime"] = elapsedTime;
}
absl::visit(
[&](const auto &x) -> void {
for(const auto &record : x)
{
result["record"].emplace_back(
recordToJson(record, elapsedTime));
}
},
*gIt);
prev = gIt;
}
m_publish(result);
}
void
JsonPublisher::directoryPublisher(const nlohmann::json &result,
const fs::path &path)
{
std::ofstream fstream(path.string(), std::ios_base::app);
if(!fstream)
{
std::cerr << "Skipping metrics publish, " << path << " is not a file\n";
return;
}
fstream << std::setw(0) << result << '\n';
fstream.close();
}
} // namespace metrics
} // namespace llarp

@ -0,0 +1,30 @@
#ifndef LLARP_METRICS_STREAM_PUBLISHER_HPP
#define LLARP_METRICS_STREAM_PUBLISHER_HPP
#include <util/metrics_core.hpp>
#include <iosfwd>
namespace llarp
{
namespace metrics
{
class StreamPublisher final : public Publisher
{
std::ostream& m_stream;
public:
StreamPublisher(std::ostream& stream) : m_stream(stream)
{
}
~StreamPublisher() = default;
void
publish(const Sample& values) override;
};
} // namespace metrics
} // namespace llarp
#endif

@ -13,7 +13,7 @@ using TestPipeReadFunc = std::function< bool(const llarp_buffer_t) >;
struct EventLoopTest : public ::testing::Test
{
llarp_ev_loop_ptr loop;
std::shared_ptr<llarp::Logic> _logic;
std::shared_ptr< llarp::Logic > _logic;
llarp::sodium::CryptoLibSodium crypto;
static void
@ -27,8 +27,8 @@ struct EventLoopTest : public ::testing::Test
void
SetUp()
{
loop = llarp_make_ev_loop();
_logic = std::make_shared<llarp::Logic>();
loop = llarp_make_ev_loop();
_logic = std::make_shared< llarp::Logic >();
_logic->call_later({10000, this, &OnTimeout});
}
@ -62,6 +62,7 @@ struct EventLoopTest : public ::testing::Test
TEST_F(EventLoopTest, PipeWriteOne)
{
#ifdef _WIN32
llarp::AlignedBuffer< 32 > data;
data.Randomize();
llarp_buffer_t other(data);
@ -89,10 +90,14 @@ TEST_F(EventLoopTest, PipeWriteOne)
ASSERT_TRUE(loop->add_ev(testpipe, false));
ASSERT_TRUE(testpipe->Write(other));
RunLoop();
#else
SUCCEED();
#endif
}
TEST_F(EventLoopTest, PipeWrite1K)
{
#ifdef _WIN32
struct TestPipe : public llarp_ev_pkt_pipe
{
using Data_t = std::vector< llarp::AlignedBuffer< 1500 > >;
@ -133,4 +138,7 @@ TEST_F(EventLoopTest, PipeWrite1K)
ASSERT_TRUE(loop->add_ev(testpipe, false));
testpipe->PumpIt();
RunLoop();
#else
SUCCEED();
#endif
}

@ -1,4 +1,4 @@
#include <metrics/publishers.hpp>
#include <metrics/stream_publisher.hpp>
#include <gtest/gtest.h>
#include <gmock/gmock.h>

Loading…
Cancel
Save