Prometheus metrics

pull/5/head
Frank Denis 5 years ago
parent f77a5aed47
commit 27e6097dc9

@ -21,7 +21,7 @@ derivative = "1.0.3"
dnsstamps = "0.1.1" dnsstamps = "0.1.1"
env_logger = { version="0.7.0", default-features = false, features = ["humantime"]} env_logger = { version="0.7.0", default-features = false, features = ["humantime"]}
failure = "0.1.5" failure = "0.1.5"
futures-preview = { version = "=0.3.0-alpha.18", features = ["async-await", "nightly", "cfg-target-has-atomic"] } futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await", "unstable", "cfg-target-has-atomic"] }
jemallocator = "0.3.2" jemallocator = "0.3.2"
libsodium-sys-stable="1.18.1" libsodium-sys-stable="1.18.1"
log = { version = "0.4.8", features = ["std", "release_max_level_debug"] } log = { version = "0.4.8", features = ["std", "release_max_level_debug"] }
@ -33,13 +33,13 @@ serde = "1.0.101"
serde_derive = "1.0.101" serde_derive = "1.0.101"
serde-big-array = "0.1.5" serde-big-array = "0.1.5"
siphasher = "0.3.1" siphasher = "0.3.1"
tokio = "=0.2.0-alpha.5" tokio = "=0.2.0-alpha.6"
tokio-net = "=0.2.0-alpha.5" tokio-net = "=0.2.0-alpha.6"
toml = "0.5.3" toml = "0.5.3"
[dependencies.hyper] [dependencies.hyper]
optional = true optional = true
version = "0.13.0-alpha.2" version = "0.13.0-alpha.3"
default_features = false default_features = false
[dependencies.prometheus] [dependencies.prometheus]

@ -166,3 +166,17 @@ key_cache_capacity = 10000
[filtering] [filtering]
# domain_blacklist = "/etc/domain_blacklist.txt" # domain_blacklist = "/etc/domain_blacklist.txt"
#######################################
# Server-side filtering #
#######################################
[metrics]
type = "prometheus"
listen_addr = "0.0.0.0:9100"
path = "/metrics"

@ -9,7 +9,15 @@ use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use tokio::prelude::*; use tokio::prelude::*;
#[derive(Serialize, Deserialize, Debug)] #[cfg(feature = "metrics")]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MetricsConfig {
pub r#type: String,
pub listen_addr: SocketAddr,
pub path: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DNSCryptConfig { pub struct DNSCryptConfig {
pub provider_name: String, pub provider_name: String,
pub key_cache_capacity: usize, pub key_cache_capacity: usize,
@ -18,23 +26,23 @@ pub struct DNSCryptConfig {
pub no_logs: bool, pub no_logs: bool,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TLSConfig { pub struct TLSConfig {
pub upstream_addr: Option<SocketAddr>, pub upstream_addr: Option<SocketAddr>,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ListenAddrConfig { pub struct ListenAddrConfig {
pub local: SocketAddr, pub local: SocketAddr,
pub external: SocketAddr, pub external: SocketAddr,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FilteringConfig { pub struct FilteringConfig {
pub domain_blacklist: Option<PathBuf>, pub domain_blacklist: Option<PathBuf>,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Config { pub struct Config {
pub listen_addrs: Vec<ListenAddrConfig>, pub listen_addrs: Vec<ListenAddrConfig>,
pub external_addr: IpAddr, pub external_addr: IpAddr,
@ -57,6 +65,8 @@ pub struct Config {
pub daemonize: bool, pub daemonize: bool,
pub pid_file: Option<PathBuf>, pub pid_file: Option<PathBuf>,
pub log_file: Option<PathBuf>, pub log_file: Option<PathBuf>,
#[cfg(feature = "metrics")]
pub metrics: Option<MetricsConfig>,
} }
impl Config { impl Config {

@ -2,6 +2,8 @@ use crate::blacklist::*;
use crate::cache::*; use crate::cache::*;
use crate::crypto::*; use crate::crypto::*;
use crate::dnscrypt_certs::*; use crate::dnscrypt_certs::*;
#[cfg(feature = "metrics")]
use crate::varz::*;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use siphasher::sip128::SipHasher13; use siphasher::sip128::SipHasher13;
@ -14,7 +16,8 @@ use std::time::Duration;
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use tokio::sync::oneshot; use tokio::sync::oneshot;
#[derive(Debug, Clone)] #[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct Globals { pub struct Globals {
pub runtime: Arc<Runtime>, pub runtime: Arc<Runtime>,
pub state_file: PathBuf, pub state_file: PathBuf,
@ -37,4 +40,7 @@ pub struct Globals {
pub hasher: SipHasher13, pub hasher: SipHasher13,
pub cache: Cache, pub cache: Cache,
pub blacklist: Option<BlackList>, pub blacklist: Option<BlackList>,
#[cfg(feature = "metrics")]
#[derivative(Debug = "ignore")]
pub varz: Varz,
} }

@ -46,6 +46,8 @@ use dnscrypt::*;
use dnscrypt_certs::*; use dnscrypt_certs::*;
use errors::*; use errors::*;
use globals::*; use globals::*;
#[cfg(feature = "metrics")]
use varz::*;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use clap::Arg; use clap::Arg;
@ -566,11 +568,30 @@ fn main() -> Result<(), Error> {
hasher, hasher,
cache, cache,
blacklist, blacklist,
#[cfg(feature = "metrics")]
varz: Varz::default(),
}); });
let updater = DNSCryptEncryptionParamsUpdater::new(globals.clone()); let updater = DNSCryptEncryptionParamsUpdater::new(globals.clone());
if !state_is_new { if !state_is_new {
updater.update(); updater.update();
} }
#[cfg(feature = "metrics")]
{
if let Some(metrics_config) = config.metrics {
runtime.spawn(
metrics::prometheus_service(
globals.varz.clone(),
metrics_config.clone(),
runtime.clone(),
)
.map_err(|e| {
error!("Unable to start the metrics service: [{}]", e);
std::process::exit(1);
})
.map(|_| ()),
);
}
}
runtime.spawn( runtime.spawn(
start(globals, runtime.clone(), listeners) start(globals, runtime.clone(), listeners)
.map_err(|e| { .map_err(|e| {
@ -579,8 +600,6 @@ fn main() -> Result<(), Error> {
}) })
.map(|_| ()), .map(|_| ()),
); );
#[cfg(feature = "metrics")]
runtime.spawn(metrics::prometheus_service(runtime.clone()).map(|_| ()));
runtime.block_on(updater.run()); runtime.block_on(updater.run());
Ok(()) Ok(())
} }

@ -1,23 +1,56 @@
use crate::config::*;
use crate::errors::*; use crate::errors::*;
use crate::varz::*;
use futures::FutureExt; use futures::prelude::*;
use hyper::header::CONTENT_TYPE;
use hyper::server::conn::Http; use hyper::server::conn::Http;
use hyper::service::service_fn; use hyper::service::service_fn;
use hyper::{Body, Request, Response}; use hyper::{Body, Request, Response, StatusCode};
use prometheus::{self, Encoder, TextEncoder};
use std::sync::Arc; use std::sync::Arc;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
async fn handle_client_connection(_req: Request<Body>) -> Result<Response<Body>, Error> { async fn handle_client_connection(
let res = Response::new(Body::from("OK\n")); req: Request<Body>,
Ok(res) varz: Varz,
path: Arc<String>,
) -> Result<Response<Body>, Error> {
let mut buffer = vec![];
if req.uri().path() != path.as_str() {
let response = Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())?;
return Ok(response);
}
let StartInstant(start_instant) = varz.start_instant;
let uptime = start_instant.elapsed().as_secs();
varz.uptime.set(uptime as f64);
let client_queries = varz.client_queries_udp.get() + varz.client_queries_tcp.get();
varz.client_queries.set(client_queries);
let metric_families = prometheus::gather();
let encoder = TextEncoder::new();
encoder.encode(&metric_families, &mut buffer)?;
let response = Response::builder()
.header(CONTENT_TYPE, encoder.format_type())
.body(buffer.into())?;
Ok(response)
} }
pub async fn prometheus_service(runtime: Arc<Runtime>) -> Result<(), Error> { pub async fn prometheus_service(
let mut stream = TcpListener::bind("0.0.0.0:8000").await?; varz: Varz,
metrics_config: MetricsConfig,
runtime: Arc<Runtime>,
) -> Result<(), Error> {
let path = Arc::new(metrics_config.path);
let mut stream = TcpListener::bind(metrics_config.listen_addr).await?;
loop { loop {
let (client, _client_addr) = stream.accept().await?; let (client, _client_addr) = stream.accept().await?;
let service = service_fn(handle_client_connection); let path = path.clone();
let varz = varz.clone();
let service =
service_fn(move |req| handle_client_connection(req, varz.clone(), path.clone()));
let connection = Http::new().serve_connection(client, service); let connection = Http::new().serve_connection(client, service);
runtime.spawn(connection.map(|_| {})); runtime.spawn(connection.map(|_| {}));
} }

Loading…
Cancel
Save