diff --git a/Cargo.toml b/Cargo.toml index 73a272b..0e5402c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,20 +12,20 @@ categories = ["asynchronous", "network-programming","command-line-utilities"] readme = "README.md" [dependencies] -anyhow = "1.0.33" +anyhow = "1.0.36" byteorder = "1.3.4" clap = { version = "2.33.3", default-features = false, features = ["wrap_help"] } -clockpro-cache = "0.1.8" -coarsetime = "0.1.16" +clockpro-cache = "0.1.9" +coarsetime = "0.1.18" daemonize-simple = "0.1.4" derivative = "2.1.1" dnsstamps = "0.1.4" env_logger = { version = "0.8.1", default-features = false, features = ["humantime"] } -futures = { version = "0.3.6", features = ["async-await"] } -hyper = { version = "0.13.8", default_features = false, optional = true } +futures = { version = "0.3.8", features = ["async-await"] } +hyper = { version = "0.14.0", default_features = false, features = ["server", "http1"], optional = true } ipext = "0.1.0" jemallocator = "0.3.2" -libsodium-sys-stable= "1.19.11" +libsodium-sys-stable= "1.19.12" log = { version = "0.4.11", features = ["std", "release_max_level_debug"] } socket2 = "0.3" parking_lot = "0.11" @@ -36,7 +36,7 @@ serde = "1.0" serde_derive = "1.0" serde-big-array = "0.3.0" siphasher = "0.3" -tokio = { version = "0.2.22", features = ["fs", "rt-threaded", "time", "tcp", "udp", "stream", "parking_lot"] } +tokio = { version = "1", features = ["net", "io-std", "io-util", "fs", "time", "rt-multi-thread", "parking_lot"] } toml = "0.5" [dependencies.prometheus] diff --git a/src/anonymized_dns.rs b/src/anonymized_dns.rs index 4d854e8..ab37660 100644 --- a/src/anonymized_dns.rs +++ b/src/anonymized_dns.rs @@ -78,7 +78,7 @@ pub async fn handle_anonymized_dns( != ANONYMIZED_DNSCRYPT_QUERY_MAGIC, "Loop detected" ); - let mut ext_socket = match globals.external_addr { + let ext_socket = match globals.external_addr { Some(x) => UdpSocket::bind(x).await?, None => match upstream_address { SocketAddr::V4(_) => { diff --git a/src/config.rs b/src/config.rs index d92e462..9aad07f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,7 +6,7 @@ use std::fs; use std::mem; use std::net::{IpAddr, SocketAddr}; use std::path::{Path, PathBuf}; -use tokio::prelude::*; +use tokio::io::AsyncWriteExt; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct AccessControlConfig { diff --git a/src/main.rs b/src/main.rs index 9f787f6..c1d7ea0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -69,8 +69,8 @@ use std::path::Path; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::net::{TcpListener, TcpStream, UdpSocket}; -use tokio::prelude::*; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket}; use tokio::runtime::Handle; use tokio::sync::oneshot; @@ -127,7 +127,7 @@ pub async fn respond_to_query(client_ctx: ClientCtx, response: Vec) -> Resul BigEndian::write_u16(&mut binlen[..], response_len as u16); client_connection.write_all(&binlen).await?; client_connection.write_all(&response).await?; - client_connection.flush(); + client_connection.flush().await?; } } Ok(()) @@ -249,41 +249,25 @@ async fn tls_proxy( None => return Ok(()), Some(tls_upstream_addr) => tls_upstream_addr, }; - let std_socket = match globals.external_addr { + let socket = match globals.external_addr { Some(x @ SocketAddr::V4(_)) => { - let kindy = socket2::Socket::new( - socket2::Domain::ipv4(), - socket2::Type::stream(), - Some(socket2::Protocol::tcp()), - )?; - kindy.bind(&x.into())?; - kindy.into_tcp_stream() + let socket = TcpSocket::new_v4()?; + socket.set_reuseaddr(true).ok(); + socket.bind(x)?; + socket } Some(x @ SocketAddr::V6(_)) => { - let kindy = socket2::Socket::new( - socket2::Domain::ipv6(), - socket2::Type::stream(), - Some(socket2::Protocol::tcp()), - )?; - kindy.bind(&x.into())?; - kindy.into_tcp_stream() + let socket = TcpSocket::new_v6()?; + socket.set_reuseaddr(true).ok(); + socket.bind(x)?; + socket } None => match tls_upstream_addr { - SocketAddr::V4(_) => socket2::Socket::new( - socket2::Domain::ipv4(), - socket2::Type::stream(), - Some(socket2::Protocol::tcp()), - )? - .into_tcp_stream(), - SocketAddr::V6(_) => socket2::Socket::new( - socket2::Domain::ipv6(), - socket2::Type::stream(), - Some(socket2::Protocol::tcp()), - )? - .into_tcp_stream(), + SocketAddr::V4(_) => TcpSocket::new_v4()?, + SocketAddr::V6(_) => TcpSocket::new_v6()?, }, }; - let mut ext_socket = TcpStream::connect_std(std_socket, tls_upstream_addr).await?; + let mut ext_socket = socket.connect(*tls_upstream_addr).await?; let (mut erh, mut ewh) = ext_socket.split(); let (mut rh, mut wh) = client_connection.split(); ewh.write_all(&binlen).await?; @@ -295,20 +279,12 @@ async fn tls_proxy( } } -async fn tcp_acceptor(globals: Arc, mut tcp_listener: TcpListener) -> Result<(), Error> { +async fn tcp_acceptor(globals: Arc, tcp_listener: TcpListener) -> Result<(), Error> { let runtime_handle = globals.runtime_handle.clone(); - let mut tcp_listener = tcp_listener.incoming(); let timeout = globals.tcp_timeout; let concurrent_connections = globals.tcp_concurrent_connections.clone(); let active_connections = globals.tcp_active_connections.clone(); - while let Some(client) = tcp_listener.next().await { - let mut client_connection: TcpStream = match client { - Ok(client_connection) => client_connection, - Err(e) => { - debug!("{}", e); - continue; - } - }; + while let Ok((mut client_connection, _client_addr)) = tcp_listener.accept().await { let (tx, rx) = oneshot::channel::<()>(); { let mut active_connections = active_connections.lock(); @@ -363,7 +339,7 @@ async fn udp_acceptor( net_udp_socket: std::net::UdpSocket, ) -> Result<(), Error> { let runtime_handle = globals.runtime_handle.clone(); - let mut tokio_udp_socket = UdpSocket::try_from(net_udp_socket.try_clone()?)?; + let tokio_udp_socket = UdpSocket::try_from(net_udp_socket.try_clone()?)?; let timeout = globals.udp_timeout; let concurrent_connections = globals.udp_concurrent_connections.clone(); let active_connections = globals.udp_active_connections.clone(); @@ -456,6 +432,7 @@ fn bind_listeners( kindy.into_tcp_listener() } }; + tcp_listener.set_nonblocking(true)?; let udp_socket = match listen_addr { SocketAddr::V4(_) => { let kindy = socket2::Socket::new( @@ -479,6 +456,7 @@ fn bind_listeners( kindy.into_udp_socket() } }; + udp_socket.set_nonblocking(true)?; sockets.push((tcp_listener, udp_socket)) } Ok(sockets) @@ -565,11 +543,10 @@ fn main() -> Result<(), Error> { }; let external_addr = config.external_addr.map(|addr| SocketAddr::new(addr, 0)); - let mut runtime_builder = tokio::runtime::Builder::new(); + let mut runtime_builder = tokio::runtime::Builder::new_multi_thread(); runtime_builder.enable_all(); - runtime_builder.threaded_scheduler(); runtime_builder.thread_name("encrypted-dns-"); - let mut runtime = runtime_builder.build()?; + let runtime = runtime_builder.build()?; let listen_addrs: Vec<_> = config.listen_addrs.iter().map(|x| x.local).collect(); let listeners = bind_listeners(&listen_addrs) diff --git a/src/metrics.rs b/src/metrics.rs index 1d56da3..3b33b94 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -77,7 +77,8 @@ pub async fn prometheus_service( kindy.into_tcp_listener() } }; - let mut stream = TcpListener::from_std(std_socket)?; + std_socket.set_nonblocking(true)?; + let stream = TcpListener::from_std(std_socket)?; let concurrent_connections = Arc::new(AtomicU32::new(0)); loop { let (client, _client_addr) = stream.accept().await?; diff --git a/src/resolver.rs b/src/resolver.rs index 5ebb944..f6e3f97 100644 --- a/src/resolver.rs +++ b/src/resolver.rs @@ -10,8 +10,8 @@ use siphasher::sip128::Hasher128; use std::cmp; use std::hash::Hasher; use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; -use tokio::net::{TcpStream, UdpSocket}; -use tokio::prelude::*; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpSocket, UdpSocket}; pub async fn resolve_udp( globals: &Globals, @@ -67,7 +67,8 @@ pub async fn resolve_udp( } }, }; - let mut ext_socket = UdpSocket::from_std(std_socket)?; + std_socket.set_nonblocking(true)?; + let ext_socket = UdpSocket::from_std(std_socket)?; ext_socket.connect(&globals.upstream_addr).await?; dns::set_edns_max_payload_size(&mut packet, DNS_MAX_PACKET_SIZE as u16)?; let mut response; @@ -110,47 +111,31 @@ pub async fn resolve_tcp( packet_qname: &[u8], tid: u16, ) -> Result, Error> { - let std_socket = match globals.external_addr { + let socket = match globals.external_addr { Some(x @ SocketAddr::V4(_)) => { - let kindy = socket2::Socket::new( - socket2::Domain::ipv4(), - socket2::Type::stream(), - Some(socket2::Protocol::tcp()), - )?; - kindy.bind(&x.into())?; - kindy.into_tcp_stream() + let socket = TcpSocket::new_v4()?; + socket.set_reuseaddr(true).ok(); + socket.bind(x)?; + socket } Some(x @ SocketAddr::V6(_)) => { - let kindy = socket2::Socket::new( - socket2::Domain::ipv6(), - socket2::Type::stream(), - Some(socket2::Protocol::tcp()), - )?; - kindy.bind(&x.into())?; - kindy.into_tcp_stream() + let socket = TcpSocket::new_v6()?; + socket.set_reuseaddr(true).ok(); + socket.bind(x)?; + socket } None => match globals.upstream_addr { - SocketAddr::V4(_) => socket2::Socket::new( - socket2::Domain::ipv4(), - socket2::Type::stream(), - Some(socket2::Protocol::tcp()), - )? - .into_tcp_stream(), - SocketAddr::V6(_) => socket2::Socket::new( - socket2::Domain::ipv6(), - socket2::Type::stream(), - Some(socket2::Protocol::tcp()), - )? - .into_tcp_stream(), + SocketAddr::V4(_) => TcpSocket::new_v4()?, + SocketAddr::V6(_) => TcpSocket::new_v6()?, }, }; - let mut ext_socket = TcpStream::connect_std(std_socket, &globals.upstream_addr).await?; + let mut ext_socket = socket.connect(globals.upstream_addr).await?; ext_socket.set_nodelay(true)?; let mut binlen = [0u8, 0]; BigEndian::write_u16(&mut binlen[..], packet.len() as u16); ext_socket.write_all(&binlen).await?; ext_socket.write_all(&packet).await?; - ext_socket.flush(); + ext_socket.flush().await?; ext_socket.read_exact(&mut binlen).await?; let response_len = BigEndian::read_u16(&binlen) as usize; ensure!(