diff --git a/Cargo.lock b/Cargo.lock index d650370d..52c2bc21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1220,7 +1220,6 @@ dependencies = [ "env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "json5 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)", "libsodium-ffi 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index e8b6e9cb..8c5ace58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,6 @@ libc = "0.2" futures = "0.1" tokio-io = "0.1" tokio = "0.1" -lazy_static = "1.0" json5 = "0.2" base64 = "0.9" bytes = "0.4" diff --git a/src/config.rs b/src/config.rs index ec94e8c2..b9ccfada 100644 --- a/src/config.rs +++ b/src/config.rs @@ -730,6 +730,11 @@ impl Config { } } + // UDP switch + if let Some(enable) = config.enable_udp { + nconfig.enable_udp = enable; + } + Ok(nconfig) } diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 00000000..08061217 --- /dev/null +++ b/src/context.rs @@ -0,0 +1,60 @@ +//! Shadowsocks Server Context + +use std::{ + net::SocketAddr, + sync::{Arc, Mutex, MutexGuard}, + time::Instant, +}; + +use lru_cache::LruCache; +use trust_dns_resolver::AsyncResolver; + +use config::Config; +use relay::dns_resolver::create_resolver; + +type DnsQueryCache = LruCache; + +#[derive(Clone)] +pub struct Context { + config: Config, + dns_resolver: Arc, + dns_query_cache: Option>>, +} + +pub type SharedContext = Arc; + +impl Context { + pub fn new(config: Config) -> Context { + let resolver = create_resolver(config.get_dns_config()); + Context { + config: config, + dns_resolver: Arc::new(resolver), + dns_query_cache: None, + } + } + + pub fn new_dns(config: Config) -> Context { + let resolver = create_resolver(config.get_dns_config()); + Context { + config: config, + dns_resolver: Arc::new(resolver), + dns_query_cache: Some(Arc::new(Mutex::new(LruCache::new(1024)))), + } + } + + pub fn config(&self) -> &Config { + &self.config + } + + pub fn config_mut(&mut self) -> &mut Config { + &mut self.config + } + + pub fn dns_resolver(&self) -> &AsyncResolver { + &*self.dns_resolver + } + + pub fn dns_query_cache<'a>(&'a self) -> MutexGuard<'a, DnsQueryCache> { + self.dns_query_cache.as_ref().unwrap().lock().unwrap() + } +} diff --git a/src/lib.rs b/src/lib.rs index c3a00f8c..4f5988e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -77,8 +77,6 @@ extern crate bytes; extern crate digest; #[macro_use] extern crate futures; -#[macro_use] -extern crate lazy_static; extern crate libc; #[cfg(feature = "sodium")] extern crate libsodium_ffi; @@ -117,6 +115,7 @@ pub use self::{ }; pub mod config; +mod context; pub mod crypto; mod monitor; pub mod plugin; diff --git a/src/relay/dns.rs b/src/relay/dns.rs index 4bcd84c1..c8e2b2ef 100644 --- a/src/relay/dns.rs +++ b/src/relay/dns.rs @@ -1,19 +1,14 @@ //! DNS relay -use std::{io, sync::Arc}; +use std::io; -use futures::Future; +use futures::{self, Future}; -use super::dns_resolver::set_dns_config; use config::Config; +use context::{Context, SharedContext}; use relay::udprelay::dns::run as run_udp; /// DNS Relay server running under local environment. pub fn run(config: Config) -> impl Future + Send { - if let Some(c) = config.get_dns_config() { - set_dns_config(c); - } - - let config = Arc::new(config); - run_udp(config) + futures::lazy(move || run_udp(SharedContext::new(Context::new_dns(config)))) } diff --git a/src/relay/dns_resolver.rs b/src/relay/dns_resolver.rs index 78458060..c5618193 100644 --- a/src/relay/dns_resolver.rs +++ b/src/relay/dns_resolver.rs @@ -3,41 +3,20 @@ use std::{ io::{self, ErrorKind}, net::SocketAddr, - sync::Arc, }; use futures::Future; -use spin::Mutex; use tokio; use trust_dns_resolver::{config::ResolverConfig, AsyncResolver}; -use config::Config; +use context::SharedContext; -// Taken from -// bluejekyll/trust-dns/resolver/examples/global_resolver.rs -lazy_static! { - static ref GLOBAL_DNS_ADDRESS: Mutex> = Mutex::new(None); - - // First we need to setup the global Resolver - static ref GLOBAL_DNS_RESOLVER: AsyncResolver = init_resolver(); -} - -/// Set address for global DNS resolver -/// Must be called before servers are actually run -pub fn set_dns_config(addr: ResolverConfig) { - *GLOBAL_DNS_ADDRESS.lock() = Some(addr); -} - -fn get_dns_address() -> Option { - GLOBAL_DNS_ADDRESS.lock().clone() -} - -fn init_resolver() -> AsyncResolver { +pub fn create_resolver(dns: Option) -> AsyncResolver { let (resolver, bg) = { // To make this independent, if targeting macOS, BSD, Linux, or Windows, we can use the system's configuration: #[cfg(any(unix, windows))] { - if let Some(conf) = get_dns_address() { + if let Some(conf) = dns { use trust_dns_resolver::config::ResolverOpts; AsyncResolver::new(conf, ResolverOpts::default()) } else { @@ -54,7 +33,7 @@ fn init_resolver() -> AsyncResolver { // Directly reference the config types use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; - if let Some(conf) = get_dns_address() { + if let Some(conf) = dns { AsyncResolver::new(conf, ResolverOpts::default()) } else { // Get a new resolver with the google nameservers as the upstream recursive resolvers @@ -70,15 +49,17 @@ fn init_resolver() -> AsyncResolver { } fn inner_resolve( - config: Arc, + context: SharedContext, addr: &str, port: u16, check_forbidden: bool, ) -> impl Future, Error = io::Error> + Send { let owned_addr = addr.to_owned(); let owned_addr2 = owned_addr.clone(); + let cloned_context = context.clone(); - GLOBAL_DNS_RESOLVER + context + .dns_resolver() .lookup_ip(addr) .map_err(move |err| { error!("Failed to resolve {}, err: {}", owned_addr2, err); @@ -88,7 +69,7 @@ fn inner_resolve( let mut vaddr = Vec::new(); for ip in lookup_result.iter() { if check_forbidden { - let forbidden_ip = &config.forbidden_ip; + let forbidden_ip = &cloned_context.config().forbidden_ip; if forbidden_ip.contains(&ip) { debug!("Resolved {} => {}, which is skipped by forbidden_ip", owned_addr, ip); continue; @@ -112,10 +93,10 @@ fn inner_resolve( /// Resolve address to IP pub fn resolve( - config: Arc, + context: SharedContext, addr: &str, port: u16, check_forbidden: bool, ) -> impl Future, Error = io::Error> + Send { - inner_resolve(config, addr, port, check_forbidden) + inner_resolve(context, addr, port, check_forbidden) } diff --git a/src/relay/local.rs b/src/relay/local.rs index 0fea9574..c577782f 100644 --- a/src/relay/local.rs +++ b/src/relay/local.rs @@ -1,11 +1,11 @@ //! Local side -use std::{io, sync::Arc}; +use std::io; use futures::{stream::futures_unordered, Future, Stream}; -use super::dns_resolver::set_dns_config; use config::Config; +use context::{Context, SharedContext}; use plugin::{launch_plugin, PluginMode}; use relay::{boxed_future, tcprelay::local::run as run_tcp, udprelay::local::run as run_udp}; @@ -33,39 +33,36 @@ use relay::{boxed_future, tcprelay::local::run as run_tcp, udprelay::local::run /// let fut = run(config); /// tokio::run(fut.map_err(|err| panic!("Server run failed with error {}", err))); /// ``` -pub fn run(mut config: Config) -> impl Future + Send { - if let Some(c) = config.get_dns_config() { - set_dns_config(c); - } +pub fn run(config: Config) -> impl Future + Send { + futures::lazy(move || { + let mut vf = Vec::new(); - let mut vf = Vec::new(); + let mut context = Context::new(config); - if config.enable_udp { - // Clone config here, because the config for TCP relay will be modified - // after plugins started - let udp_config = Arc::new(config.clone()); + if context.config().enable_udp { + // Clone config here, because the config for TCP relay will be modified + // after plugins started + let udp_context = SharedContext::new(context.clone()); - // Run UDP relay before starting plugins - // Because plugins doesn't support UDP relay - let udp_fut = run_udp(udp_config); - vf.push(boxed_future(udp_fut)); - } - - // Hold it here, kill all plugins when `tokio::run` is finished - let plugins = launch_plugin(&mut config, PluginMode::Client).expect("Failed to launch plugins"); - let mon = ::monitor::monitor_signal(plugins); - - // Recreate shared config here - let config = Arc::new(config); - - let tcp_fut = run_tcp(config.clone()); - - vf.push(boxed_future(mon)); - vf.push(boxed_future(tcp_fut)); - futures_unordered(vf).into_future().then(|res| -> io::Result<()> { - match res { - Ok(..) => Ok(()), - Err((err, ..)) => Err(err), + // Run UDP relay before starting plugins + // Because plugins doesn't support UDP relay + let udp_fut = run_udp(udp_context); + vf.push(boxed_future(udp_fut)); } + + // Hold it here, kill all plugins when `tokio::run` is finished + let plugins = launch_plugin(context.config_mut(), PluginMode::Client).expect("Failed to launch plugins"); + let mon = ::monitor::monitor_signal(plugins); + + let tcp_fut = run_tcp(SharedContext::new(context)); + + vf.push(boxed_future(mon)); + vf.push(boxed_future(tcp_fut)); + futures_unordered(vf).into_future().then(|res| -> io::Result<()> { + match res { + Ok(..) => Ok(()), + Err((err, ..)) => Err(err), + } + }) }) } diff --git a/src/relay/mod.rs b/src/relay/mod.rs index c96aa7af..9901b34a 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -3,7 +3,7 @@ use futures::Future; pub mod dns; -mod dns_resolver; +pub(crate) mod dns_resolver; mod loadbalancing; pub mod local; pub mod server; diff --git a/src/relay/server.rs b/src/relay/server.rs index 149fa4e3..654e070d 100644 --- a/src/relay/server.rs +++ b/src/relay/server.rs @@ -1,11 +1,11 @@ //! Server side -use std::{io, sync::Arc}; +use std::io; use futures::{stream::futures_unordered, Future, Stream}; -use super::dns_resolver::set_dns_config; use config::Config; +use context::{Context, SharedContext}; use plugin::{launch_plugin, PluginMode}; use relay::{boxed_future, tcprelay::server::run as run_tcp, udprelay::server::run as run_udp}; @@ -33,40 +33,37 @@ use relay::{boxed_future, tcprelay::server::run as run_tcp, udprelay::server::ru /// let fut = run(config); /// tokio::run(fut.map_err(|err| panic!("Server run failed with error {}", err))); /// ``` -pub fn run(mut config: Config) -> impl Future + Send { - if let Some(c) = config.get_dns_config() { - set_dns_config(c); - } +pub fn run(config: Config) -> impl Future + Send { + futures::lazy(move || { + let mut context = Context::new(config); - let mut vf = Vec::new(); + let mut vf = Vec::new(); - if config.enable_udp { - // Clone config here, because the config for TCP relay will be modified - // after plugins started - let udp_config = Arc::new(config.clone()); + if context.config().enable_udp { + // Clone config here, because the config for TCP relay will be modified + // after plugins started + let udp_context = SharedContext::new(context.clone()); - // Run UDP relay before starting plugins - // Because plugins doesn't support UDP relay - let udp_fut = run_udp(udp_config); - vf.push(boxed_future(udp_fut)); - } - - // Hold it here, kill all plugins when `tokio::run` is finished - let plugins = launch_plugin(&mut config, PluginMode::Server).expect("Failed to launch plugins"); - let mon = ::monitor::monitor_signal(plugins); - - // Recreate shared config here - let config = Arc::new(config); - - let tcp_fut = run_tcp(config.clone()); - - vf.push(boxed_future(mon)); - vf.push(boxed_future(tcp_fut)); - - futures_unordered(vf).into_future().then(|res| -> io::Result<()> { - match res { - Ok(..) => Ok(()), - Err((err, ..)) => Err(err), + // Run UDP relay before starting plugins + // Because plugins doesn't support UDP relay + let udp_fut = run_udp(udp_context); + vf.push(boxed_future(udp_fut)); } + + // Hold it here, kill all plugins when `tokio::run` is finished + let plugins = launch_plugin(context.config_mut(), PluginMode::Server).expect("Failed to launch plugins"); + let mon = ::monitor::monitor_signal(plugins); + + let tcp_fut = run_tcp(SharedContext::new(context)); + + vf.push(boxed_future(mon)); + vf.push(boxed_future(tcp_fut)); + + futures_unordered(vf).into_future().then(|res| -> io::Result<()> { + match res { + Ok(..) => Ok(()), + Err((err, ..)) => Err(err), + } + }) }) } diff --git a/src/relay/tcprelay/local.rs b/src/relay/tcprelay/local.rs index 78765936..a05fa2f8 100644 --- a/src/relay/tcprelay/local.rs +++ b/src/relay/tcprelay/local.rs @@ -1,13 +1,13 @@ //! Relay for TCP server that running on local environment -use std::{io, sync::Arc}; +use std::io; use futures::Future; use super::socks5_local; -use config::Config; +use context::SharedContext; /// Starts a TCP local server -pub fn run(config: Arc) -> impl Future + Send { - socks5_local::run(config) +pub fn run(context: SharedContext) -> impl Future + Send { + socks5_local::run(context) } diff --git a/src/relay/tcprelay/mod.rs b/src/relay/tcprelay/mod.rs index 30af78bd..c178a183 100644 --- a/src/relay/tcprelay/mod.rs +++ b/src/relay/tcprelay/mod.rs @@ -9,7 +9,8 @@ use std::{ time::Duration, }; -use config::{Config, ServerAddr, ServerConfig}; +use config::{ServerAddr, ServerConfig}; +use context::SharedContext; use crypto::CipherCategory; use relay::{boxed_future, dns_resolver::resolve, socks5::Address}; @@ -227,7 +228,7 @@ impl> Future for TcpStreamConnect { } fn connect_proxy_server( - config: Arc, + context: SharedContext, svr_cfg: Arc, ) -> impl Future + Send { let timeout = svr_cfg.timeout(); @@ -239,7 +240,7 @@ fn connect_proxy_server( } ServerAddr::DomainName(ref domain, port) => { let fut = { - try_timeout(resolve(config.clone(), &domain[..], port, false), timeout).and_then(move |vec_ipaddr| { + try_timeout(resolve(context, &domain[..], port, false), timeout).and_then(move |vec_ipaddr| { let fut = TcpStreamConnect::new(vec_ipaddr.into_iter()); try_timeout(fut, timeout) }) diff --git a/src/relay/tcprelay/server.rs b/src/relay/tcprelay/server.rs index e713982c..88e97e52 100644 --- a/src/relay/tcprelay/server.rs +++ b/src/relay/tcprelay/server.rs @@ -7,7 +7,7 @@ use std::{ time::Duration, }; -use config::{Config, ServerConfig}; +use config::ServerConfig; use relay::{ boxed_future, @@ -16,6 +16,8 @@ use relay::{ tcprelay::crypto_io::{DecryptedRead, EncryptedWrite}, }; +use context::SharedContext; + use futures::{ self, stream::{futures_unordered, Stream}, @@ -37,7 +39,7 @@ use super::{proxy_handshake, try_timeout, tunnel, DecryptedHalf, EncryptedHalf, pub struct TcpRelayClientHandshake { s: TcpStream, svr_cfg: Arc, - config: Arc, + context: SharedContext, } impl TcpRelayClientHandshake { @@ -59,7 +61,7 @@ impl TcpRelayClientHandshake { Item = TcpRelayClientPending + Send + 'static>, Error = io::Error, > + Send { - let TcpRelayClientHandshake { s, svr_cfg, config } = self; + let TcpRelayClientHandshake { s, svr_cfg, context } = self; futures::lazy(move || s.peer_addr().map(|p| (s, p))).and_then(|(s, peer_addr)| { debug!("Handshaking with peer {}", peer_addr); @@ -77,7 +79,7 @@ impl TcpRelayClientHandshake { addr: addr, w: w_fut, timeout: timeout, - config: config, + context: context, }) }) }) @@ -93,13 +95,13 @@ where addr: Address, w: E, timeout: Option, - config: Arc, + context: SharedContext, } /// Connect to the remote server #[inline] fn connect_remote( - config: Arc, + context: SharedContext, addr: Address, timeout: Option, ) -> impl Future + Send { @@ -107,7 +109,7 @@ fn connect_remote( match addr { Address::SocketAddress(saddr) => { - if config.forbidden_ip.contains(&saddr.ip()) { + if context.config().forbidden_ip.contains(&saddr.ip()) { let err = io::Error::new( ErrorKind::Other, format!("{} is forbidden, failed to connect {}", saddr.ip(), saddr), @@ -121,7 +123,7 @@ fn connect_remote( } Address::DomainNameAddress(dname, port) => { let fut = { - try_timeout(resolve(config, dname.as_str(), port, true), timeout).and_then(move |addrs| { + try_timeout(resolve(context, dname.as_str(), port, true), timeout).and_then(move |addrs| { let conn = TcpStreamConnect::new(addrs.into_iter()); try_timeout(conn, timeout) }) @@ -145,7 +147,7 @@ where let addr = self.addr.clone(); let client_pair = (self.r, self.w); let timeout = self.timeout; - connect_remote(self.config, self.addr, self.timeout).map(move |stream| TcpRelayClientConnected { + connect_remote(self.context, self.addr, self.timeout).map(move |stream| TcpRelayClientConnected { server: stream.split(), client: client_pair, addr: addr, @@ -186,7 +188,7 @@ where fn handle_client( server_cfg: Arc, - config: Arc, + context: SharedContext, socket: TcpStream, ) -> impl Future + Send { if let Err(err) = socket.set_keepalive(server_cfg.timeout()) { @@ -211,7 +213,7 @@ fn handle_client( let client = TcpRelayClientHandshake { s: socket, svr_cfg: server_cfg, - config: config, + context: context, }; client @@ -225,10 +227,10 @@ fn handle_client( } /// Runs the server -pub fn run(config: Arc) -> impl Future + Send { - let mut vec_fut = Vec::with_capacity(config.server.len()); +pub fn run(context: SharedContext) -> impl Future + Send { + let mut vec_fut = Vec::with_capacity(context.config().server.len()); - for svr_cfg in &config.server { + for svr_cfg in &context.config().server { let listener = { let addr = svr_cfg.addr(); let addr = addr.listen_addr(); @@ -240,13 +242,13 @@ pub fn run(config: Arc) -> impl Future + S }; let svr_cfg = Arc::new(svr_cfg.clone()); - let config = config.clone(); + let context = context.clone(); let listening = listener .incoming() .for_each(move |socket| { let server_cfg = svr_cfg.clone(); - let config = config.clone(); - tokio::spawn(handle_client(server_cfg, config, socket)); + let context = context.clone(); + tokio::spawn(handle_client(server_cfg, context, socket)); Ok(()) }) .map_err(|err| { diff --git a/src/relay/tcprelay/socks5_local.rs b/src/relay/tcprelay/socks5_local.rs index f2427555..07d5e995 100644 --- a/src/relay/tcprelay/socks5_local.rs +++ b/src/relay/tcprelay/socks5_local.rs @@ -13,7 +13,8 @@ use tokio_io::{ AsyncRead, }; -use config::{Config, ServerConfig}; +use config::ServerConfig; +use context::SharedContext; use relay::{ boxed_future, @@ -31,7 +32,7 @@ struct UdpConfig { } fn handle_socks5_connect( - config: Arc, + context: SharedContext, (r, w): (ReadHalf, WriteHalf), client_addr: SocketAddr, addr: Address, @@ -40,7 +41,7 @@ fn handle_socks5_connect( let cloned_addr = addr.clone(); let cloned_svr_cfg = svr_cfg.clone(); let timeout = svr_cfg.timeout(); - super::connect_proxy_server(config.clone(), svr_cfg) + super::connect_proxy_server(context.clone(), svr_cfg) .then(move |res| { let (header, r) = match res { Ok(svr_s) => { @@ -89,7 +90,7 @@ fn handle_socks5_connect( } fn handle_socks5_client( - config: Arc, + context: SharedContext, s: TcpStream, conf: Arc, udp_conf: UdpConfig, @@ -153,7 +154,7 @@ fn handle_socks5_client( match header.command { socks5::Command::TcpConnect => { debug!("CONNECT {}", addr); - let fut = handle_socks5_connect(config, (r, w), cloned_client_addr, addr, conf); + let fut = handle_socks5_connect(context, (r, w), cloned_client_addr, addr, conf); boxed_future(fut) } socks5::Command::TcpBind => { @@ -200,25 +201,25 @@ fn handle_socks5_client( } /// Starts a TCP local server with Socks5 proxy protocol -pub fn run(config: Arc) -> impl Future + Send { - let local_addr = *config.local.as_ref().expect("Missing local config"); +pub fn run(context: SharedContext) -> impl Future + Send { + let local_addr = *context.config().local.as_ref().expect("Missing local config"); let listener = TcpListener::bind(&local_addr).unwrap_or_else(|err| panic!("Failed to listen, {}", err)); info!("ShadowSocks TCP Listening on {}", local_addr); let udp_conf = UdpConfig { - enable_udp: config.enable_udp, + enable_udp: context.config().enable_udp, client_addr: local_addr, }; - let mut servers = RoundRobin::new(&*config); + let mut servers = RoundRobin::new(context.config()); listener.incoming().for_each(move |socket| { let server_cfg = servers.pick_server(); trace!("Got connection, addr: {}", socket.peer_addr()?); trace!("Picked proxy server: {:?}", server_cfg); - handle_socks5_client(config.clone(), socket, server_cfg, udp_conf.clone()) + handle_socks5_client(context.clone(), socket, server_cfg, udp_conf.clone()) }) } diff --git a/src/relay/udprelay/dns.rs b/src/relay/udprelay/dns.rs index 9e6268ca..312b7af1 100644 --- a/src/relay/udprelay/dns.rs +++ b/src/relay/udprelay/dns.rs @@ -10,7 +10,6 @@ use std::{ use dns_parser::{Packet, RRData}; use futures::{self, future::join_all, stream::futures_unordered, Future, Stream}; -use lru_cache::LruCache; use tokio::{self, net::UdpSocket}; use super::{ @@ -19,7 +18,8 @@ use super::{ SendDgramRc, SharedUdpSocket, }; -use config::{Config, ServerAddr, ServerConfig}; +use config::{ServerAddr, ServerConfig}; +use context::SharedContext; use relay::{boxed_future, dns_resolver::resolve, socks5::Address}; struct PrettyRRData<'a> { @@ -135,40 +135,40 @@ impl<'a> fmt::Display for PrettyPacket<'a> { } /// Starts a UDP DNS server -pub fn run(config: Arc) -> impl Future + Send { - let local_addr = *config.local.as_ref().unwrap(); +pub fn run(context: SharedContext) -> impl Future + Send { + let local_addr = *context.config().local.as_ref().unwrap(); futures::lazy(move || { info!("ShadowSocks UDP DNS Listening on {}", local_addr); UdpSocket::bind(&local_addr) }) - .and_then(move |l| listen(config, l)) + .and_then(move |l| listen(context, l)) } -fn listen(config: Arc, l: UdpSocket) -> impl Future + Send { - assert!(!config.server.is_empty()); +fn listen(context: SharedContext, l: UdpSocket) -> impl Future + Send { + assert!(!context.config().server.is_empty()); - let mut svr_fut = Vec::with_capacity(config.server.len()); - for svr in &config.server { + let mut svr_fut = Vec::with_capacity(context.config().server.len()); + for svr in &context.config().server { match *svr.addr() { ServerAddr::SocketAddr(ref addr) => { svr_fut.push(boxed_future(futures::finished::<_, io::Error>(vec![*addr]))); } ServerAddr::DomainName(ref dom, ref port) => { - svr_fut.push(boxed_future(resolve(config.clone(), &*dom, *port, false))); + svr_fut.push(boxed_future(resolve(context.clone(), &*dom, *port, false))); } } } - let cloned_config = config.clone(); + let cloned_context = context.clone(); join_all(svr_fut) .and_then(move |svr_addrs| { let mut u = Vec::with_capacity(svr_addrs.len()); for (idx, svr_addr) in svr_addrs.into_iter().enumerate() { let local_addr = SocketAddr::new(IpAddr::from(Ipv4Addr::new(0, 0, 0, 0)), 0); let s = UdpSocket::bind(&local_addr)?; - let svr_cfg = Arc::new(cloned_config.server[idx].clone()); + let svr_cfg = Arc::new(cloned_context.config().server[idx].clone()); u.push((Arc::new(Mutex::new(s)), svr_cfg, svr_addr[0])); } Ok(u) @@ -176,9 +176,13 @@ fn listen(config: Arc, l: UdpSocket) -> impl Future { @@ -194,12 +198,8 @@ fn listen(config: Arc, l: UdpSocket) -> impl Future> = Mutex::new(LruCache::new(1024)); -} - fn handle_l2r( - config: Arc, + context: SharedContext, l: SharedUdpSocket, server: Vec<(SharedUdpSocket, Arc, SocketAddr)>, ) -> impl Future + Send { @@ -210,7 +210,8 @@ fn handle_l2r( PacketStream::new(l).for_each(move |(payload, src)| { let server = server.clone(); - let config = config.clone(); + let context = context.clone(); + let context2 = context.clone(); let pkt_fut = futures::lazy(move || { let pkt = Packet::parse(&payload[..]).map_err(|err| { error!("Failed to parse DNS payload, err: {}", err); @@ -225,7 +226,7 @@ fn handle_l2r( trace!("DETAIL {} -> {} {:?}", src, svr_cfg.addr(), pkt); let mut buf = Vec::new(); - Address::SocketAddress(config.get_remote_dns()).write_to_buf(&mut buf); + Address::SocketAddress(context.config().get_remote_dns()).write_to_buf(&mut buf); buf.extend_from_slice(&payload); @@ -236,7 +237,7 @@ fn handle_l2r( }) .and_then(move |(socket, svr_addr, send_payload, id)| { SendDgramRc::new(socket, send_payload, svr_addr).map(move |_| { - GLOBAL_QUERY_ADDR.lock().unwrap().insert(id, (src, Instant::now())); + context2.dns_query_cache().insert(id, (src, Instant::now())); }) }); tokio::spawn(pkt_fut.then(|res| match res { @@ -252,6 +253,7 @@ fn handle_l2r( } fn handle_r2l( + context: SharedContext, l: SharedUdpSocket, r: SharedUdpSocket, svr_cfg: Arc, @@ -259,6 +261,7 @@ fn handle_r2l( PacketStream::new(r).for_each(move |(payload, src)| { let l = l.clone(); let svr_cfg = svr_cfg.clone(); + let context = context.clone(); let pkt_fut = futures::lazy(move || decrypt_payload(svr_cfg.method(), svr_cfg.key(), &payload)) .and_then(move |payload| Address::read_from(Cursor::new(payload)).map_err(From::from)) .and_then(move |(cur, ..)| { @@ -272,7 +275,8 @@ fn handle_r2l( let payload = payload[pos..].to_vec(); - match GLOBAL_QUERY_ADDR.lock().unwrap().remove(&pkt.header.id) { + let mut cache = context.dns_query_cache(); + match cache.remove(&pkt.header.id) { Some((cli_addr, start_time)) => { debug!( "DNS {} <- {} {} elapsed: {:?}", diff --git a/src/relay/udprelay/local.rs b/src/relay/udprelay/local.rs index 5d1770c9..644845a1 100644 --- a/src/relay/udprelay/local.rs +++ b/src/relay/udprelay/local.rs @@ -11,7 +11,8 @@ use futures::{self, Future, Stream}; use tokio::{self, net::UdpSocket, util::FutureExt}; -use config::{Config, ServerAddr, ServerConfig}; +use config::{ServerAddr, ServerConfig}; +use context::SharedContext; use relay::{ boxed_future, dns_resolver::resolve, @@ -28,7 +29,7 @@ use super::{ /// Resolves server address to SocketAddr fn resolve_server_addr( - config: Arc, + context: SharedContext, svr_cfg: Arc, ) -> impl Future + Send { match *svr_cfg.addr() { @@ -36,7 +37,7 @@ fn resolve_server_addr( ServerAddr::SocketAddr(ref addr) => boxed_future(futures::finished(*addr)), // Resolve domain name to SocketAddr ServerAddr::DomainName(ref dname, port) => { - let fut = resolve(config, dname, port, false).map(move |vec_ipaddr| { + let fut = resolve(context, dname, port, false).map(move |vec_ipaddr| { assert!(!vec_ipaddr.is_empty()); vec_ipaddr[0] }); @@ -45,16 +46,16 @@ fn resolve_server_addr( } } -fn listen(config: Arc, l: UdpSocket) -> impl Future + Send { +fn listen(context: SharedContext, l: UdpSocket) -> impl Future + Send { let socket = Arc::new(Mutex::new(l)); - let mut balancer = RoundRobin::new(&*config); + let mut balancer = RoundRobin::new(context.config()); PacketStream::new(socket.clone()).for_each(move |(pkt, src)| { let svr_cfg = balancer.pick_server(); let svr_cfg_cloned = svr_cfg.clone(); let svr_cfg_cloned_cloned = svr_cfg.clone(); let socket = socket.clone(); - let config = config.clone(); + let context = context.clone(); let timeout = *svr_cfg.udp_timeout(); const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); @@ -76,7 +77,7 @@ fn listen(config: Arc, l: UdpSocket) -> impl Future, l: UdpSocket) -> impl Future) -> impl Future + Send { - let local_addr = *config.local.as_ref().unwrap(); +pub fn run(context: SharedContext) -> impl Future + Send { + let local_addr = *context.config().local.as_ref().unwrap(); futures::lazy(move || { info!("ShadowSocks UDP Listening on {}", local_addr); UdpSocket::bind(&local_addr) }) - .and_then(move |l| listen(config, l)) + .and_then(move |l| listen(context, l)) } diff --git a/src/relay/udprelay/server.rs b/src/relay/udprelay/server.rs index f53b445a..96dcaa8c 100644 --- a/src/relay/udprelay/server.rs +++ b/src/relay/udprelay/server.rs @@ -11,7 +11,8 @@ use futures::{self, stream::futures_unordered, Future, Stream}; use tokio::{self, net::UdpSocket, util::FutureExt}; -use config::{Config, ServerConfig}; +use config::ServerConfig; +use context::SharedContext; use relay::{boxed_future, dns_resolver::resolve, socks5::Address}; use super::{ @@ -21,10 +22,13 @@ use super::{ MAXIMUM_UDP_PAYLOAD_SIZE, }; -fn resolve_remote_addr(config: Arc, addr: Address) -> impl Future + Send { +fn resolve_remote_addr( + context: SharedContext, + addr: Address, +) -> impl Future + Send { match addr { Address::SocketAddress(s) => { - if config.forbidden_ip.contains(&s.ip()) { + if context.config().forbidden_ip.contains(&s.ip()) { let err = io::Error::new( ErrorKind::Other, format!("{} is forbidden, failed to connect {}", s.ip(), s), @@ -35,7 +39,7 @@ fn resolve_remote_addr(config: Arc, addr: Address) -> impl Future { - let fut = resolve(config, &dname, port, true).map(move |vec_ipaddr| { + let fut = resolve(context, &dname, port, true).map(move |vec_ipaddr| { assert!(!vec_ipaddr.is_empty()); vec_ipaddr[0] }); @@ -44,7 +48,7 @@ fn resolve_remote_addr(config: Arc, addr: Address) -> impl Future, svr_cfg: Arc) -> impl Future + Send { +fn listen(context: SharedContext, svr_cfg: Arc) -> impl Future + Send { let listen_addr = *svr_cfg.addr().listen_addr(); info!("ShadowSocks UDP listening on {}", listen_addr); futures::lazy(move || UdpSocket::bind(&listen_addr)).and_then(move |socket| { @@ -53,7 +57,7 @@ fn listen(config: Arc, svr_cfg: Arc) -> impl Future, svr_cfg: Arc) -> impl Future, svr_cfg: Arc) -> impl Future) -> impl Future + Send { +pub fn run(context: SharedContext) -> impl Future + Send { let mut vec_fut = Vec::new(); - for svr in &config.server { + for svr in &context.config().server { let svr_cfg = Arc::new(svr.clone()); - let svr_fut = listen(config.clone(), svr_cfg); + let svr_fut = listen(context.clone(), svr_cfg); vec_fut.push(boxed_future(svr_fut)); } diff --git a/tests/dns.rs b/tests/dns.rs index 1a86ff39..211c7928 100644 --- a/tests/dns.rs +++ b/tests/dns.rs @@ -39,6 +39,9 @@ fn dns_relay() { let server_cfg = Config::load_from_str(CONFIG, ConfigType::Server).unwrap(); let dns_cfg = Config::load_from_str(CONFIG, ConfigType::Local).unwrap(); + trace!("Server: {:?}", server_cfg); + trace!("Client: {:?}", dns_cfg); + thread::spawn(move || { let mut runtime = Runtime::new().expect("Failed to create Runtime"); runtime.block_on(run_server(server_cfg)).unwrap();