From f7b2a1777a0f62fcfaac47e3ed102d1497fe8171 Mon Sep 17 00:00:00 2001 From: Frank Denis Date: Sun, 8 Sep 2019 20:08:20 +0200 Subject: [PATCH] up --- Cargo.toml | 3 +- src/main.rs | 116 +++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 90 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d588921..af18385 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,8 +14,9 @@ env_logger = "0.6.2" failure = "0.1.5" futures-preview = { version = "=0.3.0-alpha.18", features = ["compat", "async-await", "nightly", "io-compat", "cfg-target-has-atomic"] } jemallocator = "0.3.2" -libsodium-sys="0.2.3" +libsodium-sys="0.2.4" log = "0.4.8" +parking_lot = "0.9.0" tokio = "=0.2.0-alpha.4" [profile.release] diff --git a/src/main.rs b/src/main.rs index 508ec30..a4c4fe5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -34,59 +34,119 @@ use dnsstamps::{InformalProperty, WithInformalProperty}; use failure::{bail, ensure}; use futures::prelude::*; use futures::{FutureExt, StreamExt}; +use parking_lot::RwLock; +use std::convert::TryFrom; +use std::mem; use std::net::SocketAddr; +use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use std::sync::Arc; -use tokio::net::{TcpListener, UdpSocket}; +use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio::prelude::*; -use tokio::runtime::Runtime; +use tokio::runtime::{current_thread::Handle, Runtime}; const DNSCRYPT_QUERY_MIN_SIZE: usize = 12; const DNSCRYPT_QUERY_MAX_SIZE: usize = 512; +#[derive(Debug)] +struct UdpClientCtx { + udp_socket_fd: RawFd, + packet: Vec, + client_addr: SocketAddr, +} + +#[derive(Debug)] +struct TcpClientCtx { + packet: Vec, + client_connection: TcpStream, +} + +#[derive(Debug)] +enum ClientCtx { + Udp(UdpClientCtx), + Tcp(TcpClientCtx), +} + +async fn respond_to_query(client_ctx: ClientCtx) -> Result<(), Error> { + match client_ctx { + ClientCtx::Udp(client_ctx) => { + let packet = client_ctx.packet; + let udp_socket = unsafe { std::net::UdpSocket::from_raw_fd(client_ctx.udp_socket_fd) }; + let _ = udp_socket.send_to(&packet, client_ctx.client_addr); + mem::forget(udp_socket); + } + ClientCtx::Tcp(client_ctx) => {} + } + Ok(()) +} + +async fn handle_client_query(client_ctx: ClientCtx) -> Result<(), Error> { + // if let Some(synth_packet) = + // serve_certificates(&packet, &globals.provider_name, &globals.dnscrypt_certs)? + // { + // let _ = udp_socket.send_to(&synth_packet, client_addr).await; + // continue; + // } + // truncate(&mut packet); + // let _ = udp_socket.send_to(&packet, client_addr).await; + + dbg!(&client_ctx); + respond_to_query(client_ctx).await +} + async fn tcp_acceptor(globals: Arc, tcp_listener: TcpListener) -> Result<(), Error> { + let runtime = globals.runtime.clone(); let mut tcp_listener = tcp_listener.incoming(); while let Some(client) = tcp_listener.next().await { - let mut client = match client { - Ok(client) => client, - Err(_) => continue, + let mut client_connection: TcpStream = match client { + Ok(client_connection) => client_connection, + Err(e) => bail!(e), }; - let mut binlen = [0u8, 0]; - client.read_exact(&mut binlen).await?; - let packet_len = BigEndian::read_u16(&binlen) as usize; - ensure!( - (DNSCRYPT_QUERY_MIN_SIZE..=DNSCRYPT_QUERY_MAX_SIZE).contains(&packet_len), - "Unexpected query size" + runtime.spawn( + async { + let mut binlen = [0u8, 0]; + client_connection.read_exact(&mut binlen).await?; + let packet_len = BigEndian::read_u16(&binlen) as usize; + ensure!( + (DNSCRYPT_QUERY_MIN_SIZE..=DNSCRYPT_QUERY_MAX_SIZE).contains(&packet_len), + "Unexpected query size" + ); + let mut packet = vec![0u8; packet_len]; + client_connection.read_exact(&mut packet).await?; + let client_ctx = ClientCtx::Tcp(TcpClientCtx { + packet, + client_connection, + }); + let _ = handle_client_query(client_ctx).await; + Ok(()) + } + .map(|_| ()), ); - let mut packet = vec![0u8; packet_len]; - client.read_exact(&mut packet).await?; - dbg!(packet); } Ok(()) } -async fn udp_acceptor(globals: Arc, mut udp_listener: UdpSocket) -> Result<(), Error> { +async fn udp_acceptor(globals: Arc, mut udp_socket: UdpSocket) -> Result<(), Error> { + let runtime = globals.runtime.clone(); loop { let mut packet = vec![0u8; DNSCRYPT_QUERY_MAX_SIZE]; - let (packet_len, client_addr) = udp_listener.recv_from(&mut packet).await?; - dbg!(&packet); - let mut packet = &mut packet[..packet_len]; - if let Some(synth_packet) = - serve_certificates(&packet, &globals.provider_name, &globals.dnscrypt_certs)? - { - let _ = udp_listener.send_to(&synth_packet, client_addr).await; - continue; - } - truncate(&mut packet); - let _ = udp_listener.send_to(&packet, client_addr).await; + let (packet_len, client_addr) = udp_socket.recv_from(&mut packet).await?; + let udp_socket_fd = udp_socket.as_raw_fd(); + packet.truncate(packet_len); + let client_ctx = ClientCtx::Udp(UdpClientCtx { + udp_socket_fd, + packet, + client_addr, + }); + runtime.spawn(async { handle_client_query(client_ctx).await }.map(|_| ())); } } async fn start(globals: Arc, runtime: Arc) -> Result<(), Error> { let socket_addr: SocketAddr = globals.listen_addr; let tcp_listener = TcpListener::bind(&socket_addr).await?; - let udp_listener = UdpSocket::bind(&socket_addr).await?; + let udp_socket = UdpSocket::bind(&socket_addr).await?; runtime.spawn(tcp_acceptor(globals.clone(), tcp_listener).map(|_| {})); - runtime.spawn(udp_acceptor(globals.clone(), udp_listener).map(|_| {})); + runtime.spawn(udp_acceptor(globals.clone(), udp_socket).map(|_| {})); Ok(()) }