diff --git a/src/bin/local.rs b/src/bin/local.rs index 3dbab134..c82b5b11 100644 --- a/src/bin/local.rs +++ b/src/bin/local.rs @@ -41,8 +41,7 @@ use std::env; use env_logger::LogBuilder; use log::{LogRecord, LogLevelFilter}; -use shadowsocks::config::{self, Config, ServerConfig}; -use shadowsocks::config::DEFAULT_DNS_CACHE_CAPACITY; +use shadowsocks::config::{self, Config, ServerConfig, ServerAddr}; use shadowsocks::relay::RelayLocal; fn main() { @@ -183,7 +182,7 @@ fn main() { .unwrap(); let sc = ServerConfig { - addr: svr_addr.parse::().expect("Invalid server addr"), + addr: svr_addr.parse::().expect("Invalid server addr"), password: password.to_owned(), method: match method.parse() { Ok(m) => m, @@ -192,7 +191,6 @@ fn main() { } }, timeout: None, - dns_cache_capacity: DEFAULT_DNS_CACHE_CAPACITY, }; config.server.push(sc); diff --git a/src/bin/server.rs b/src/bin/server.rs index f2ad4761..896fa60c 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -36,15 +36,13 @@ extern crate env_logger; extern crate time; use std::env; -use std::net::SocketAddr; use clap::{App, Arg}; use env_logger::LogBuilder; use log::{LogRecord, LogLevelFilter}; -use shadowsocks::config::{self, Config, ServerConfig}; -use shadowsocks::config::DEFAULT_DNS_CACHE_CAPACITY; +use shadowsocks::config::{self, Config, ServerConfig, ServerAddr}; use shadowsocks::relay::RelayServer; fn main() { @@ -90,11 +88,6 @@ fn main() { .long("encrypt-method") .takes_value(true) .help("Encryption method")) - .arg(Arg::with_name("THREADS") - .short("t") - .long("threads") - .takes_value(true) - .help("Threads in thread pool")) .get_matches(); let mut log_builder = LogBuilder::new(); @@ -194,7 +187,7 @@ fn main() { .unwrap(); let sc = ServerConfig { - addr: svr_addr.parse::().expect("`server-addr` invalid"), + addr: svr_addr.parse::().expect("`server-addr` invalid"), password: password.to_owned(), method: match method.parse() { Ok(m) => m, @@ -203,7 +196,6 @@ fn main() { } }, timeout: None, - dns_cache_capacity: DEFAULT_DNS_CACHE_CAPACITY, }; config.server.push(sc); @@ -232,11 +224,5 @@ fn main() { debug!("Config: {:?}", config); - let threads = matches.value_of("THREADS") - .unwrap_or("1") - .parse::() - .ok() - .expect("`threads` should be an integer"); - - RelayServer::run(config, threads).unwrap(); + RelayServer::run(config).unwrap(); } diff --git a/src/config.rs b/src/config.rs index a49d837d..bca5749a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -72,37 +72,116 @@ use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr}; use std::string::ToString; use std::option::Option; use std::default::Default; -use std::fmt::{self, Debug, Formatter}; +use std::fmt::{self, Display, Debug, Formatter}; use std::path::Path; use std::collections::HashSet; use std::time::Duration; use std::convert::From; +use std::str::FromStr; use ip::IpAddr; use crypto::cipher::CipherType; /// Default DNS cache capacity -pub const DEFAULT_DNS_CACHE_CAPACITY: usize = 65536; +pub const DEFAULT_DNS_CACHE_CAPACITY: usize = 128; + +/// Server address +#[derive(Clone, Debug)] +pub enum ServerAddr { + /// IP Address + SocketAddr(SocketAddr), + /// Domain name address, eg. example.com:8080 + DomainName(String, u16), +} + +impl ServerAddr { + /// Get address for server listener + /// Panic if address is domain name + pub fn listen_addr(&self) -> &SocketAddr { + match self { + &ServerAddr::SocketAddr(ref s) => s, + _ => panic!("Cannot use domain name as server listen address"), + } + } + + fn to_json_object_inner(&self, obj: &mut json::Object, addr_key: &str, port_key: &str) { + use serialize::json::Json; + + match self { + &ServerAddr::SocketAddr(SocketAddr::V4(ref v4)) => { + obj.insert(addr_key.to_owned(), Json::String(v4.ip().to_string())); + obj.insert(port_key.to_owned(), Json::U64(v4.port() as u64)); + } + &ServerAddr::SocketAddr(SocketAddr::V6(ref v6)) => { + obj.insert(addr_key.to_owned(), Json::String(v6.ip().to_string())); + obj.insert(port_key.to_owned(), Json::U64(v6.port() as u64)); + } + &ServerAddr::DomainName(ref domain, port) => { + obj.insert(addr_key.to_owned(), Json::String(domain.to_owned())); + obj.insert(port_key.to_owned(), Json::U64(port as u64)); + } + } + } + + fn to_json_object(&self, obj: &mut json::Object) { + self.to_json_object_inner(obj, "address", "port") + } + + fn to_json_object_old(&self, obj: &mut json::Object) { + self.to_json_object_inner(obj, "server", "server_port") + } +} + +#[derive(Debug)] +pub struct ServerAddrError; + +impl FromStr for ServerAddr { + type Err = ServerAddrError; + fn from_str(s: &str) -> Result { + match s.parse::() { + Ok(addr) => Ok(ServerAddr::SocketAddr(addr)), + Err(..) => { + let mut sp = s.split(':'); + match (sp.next(), sp.next()) { + (Some(dn), Some(port)) => { + match port.parse::() { + Ok(port) => Ok(ServerAddr::DomainName(dn.to_owned(), port)), + Err(..) => Err(ServerAddrError), + } + } + _ => Err(ServerAddrError), + } + } + } + } +} + +impl Display for ServerAddr { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match self { + &ServerAddr::SocketAddr(ref a) => write!(f, "{}", a), + &ServerAddr::DomainName(ref d, port) => write!(f, "{}:{}", d, port), + } + } +} /// Configuration for a server #[derive(Clone, Debug)] pub struct ServerConfig { - pub addr: SocketAddr, + pub addr: ServerAddr, pub password: String, pub method: CipherType, pub timeout: Option, - pub dns_cache_capacity: usize, } impl ServerConfig { pub fn basic(addr: SocketAddr, password: String, method: CipherType) -> ServerConfig { ServerConfig { - addr: addr, + addr: ServerAddr::SocketAddr(addr), password: password, method: method, timeout: None, - dns_cache_capacity: DEFAULT_DNS_CACHE_CAPACITY, } } } @@ -112,24 +191,13 @@ impl json::ToJson for ServerConfig { use serialize::json::Json; let mut obj = json::Object::new(); - match self.addr { - SocketAddr::V4(ref v4) => { - obj.insert("address".to_owned(), Json::String(v4.ip().to_string())); - obj.insert("port".to_owned(), Json::U64(v4.port() as u64)); - } - SocketAddr::V6(ref v6) => { - obj.insert("address".to_owned(), Json::String(v6.ip().to_string())); - obj.insert("port".to_owned(), Json::U64(v6.port() as u64)); - } - } + self.addr.to_json_object(&mut obj); obj.insert("password".to_owned(), Json::String(self.password.clone())); obj.insert("method".to_owned(), Json::String(self.method.to_string())); if let Some(t) = self.timeout { obj.insert("timeout".to_owned(), Json::U64(t.as_secs())); } - obj.insert("dns_cache_capacity".to_owned(), - Json::U64(self.dns_cache_capacity as u64)); Json::Object(obj) } @@ -153,6 +221,7 @@ pub struct Config { pub enable_udp: bool, pub timeout: Option, pub forbidden_ip: HashSet, + pub dns_cache_capacity: usize, } impl Default for Config { @@ -177,8 +246,6 @@ pub struct Error { pub detail: Option, } - - impl Error { pub fn new(kind: ErrorKind, desc: &'static str, detail: Option) -> Error { Error { @@ -229,6 +296,7 @@ impl Debug for Error { } impl Config { + /// Creates an empty configuration pub fn new() -> Config { Config { server: Vec::new(), @@ -237,6 +305,7 @@ impl Config { enable_udp: false, timeout: None, forbidden_ip: HashSet::new(), + dns_cache_capacity: DEFAULT_DNS_CACHE_CAPACITY, } } @@ -286,21 +355,15 @@ impl Config { }) .and_then(|addr_str| { addr_str.parse::() - .map(|v4| SocketAddr::V4(SocketAddrV4::new(v4, port))) + .map(|v4| ServerAddr::SocketAddr(SocketAddr::V4(SocketAddrV4::new(v4, port)))) .or_else(|_| { addr_str.parse::() - .map(|v6| SocketAddr::V6(SocketAddrV6::new(v6, port, 0, 0))) + .map(|v6| ServerAddr::SocketAddr(SocketAddr::V6(SocketAddrV6::new(v6, port, 0, 0)))) }) - .map_err(|_| Error::new(ErrorKind::Malformed, "invalid server addr", None)) + .or_else(|_| Ok(ServerAddr::DomainName(addr_str.to_string(), port))) }); - let mut addr = try!(addr); - - // Merge address and port - match addr { - SocketAddr::V4(ref mut v4) => v4.set_port(port), - SocketAddr::V6(ref mut v6) => v6.set_port(port), - } + let addr = try!(addr); let password = server.get("password") .ok_or_else(|| Error::new(ErrorKind::MissingField, "need to specify a password", None)) @@ -321,22 +384,11 @@ impl Config { None => None, }; - let dns_cache_capacity = match server.get("dns_cache_capacity") { - Some(t) => { - try!(t.as_u64() - .ok_or(Error::new(ErrorKind::Malformed, - "`dns_cache_capacity` should be an integer", - None))) as usize - } - None => DEFAULT_DNS_CACHE_CAPACITY, - }; - Ok(ServerConfig { addr: addr, password: password, method: method, timeout: timeout, - dns_cache_capacity: dns_cache_capacity, }) } @@ -477,6 +529,18 @@ impl Config { })); } + let dns_cache_capacity = match o.get("dns_cache_capacity") { + Some(t) => { + try!(t.as_u64() + .ok_or(Error::new(ErrorKind::Malformed, + "`dns_cache_capacity` should be an integer", + None))) as usize + } + None => DEFAULT_DNS_CACHE_CAPACITY, + }; + + config.dns_cache_capacity = dns_cache_capacity; + Ok(config) } @@ -515,17 +579,7 @@ impl json::ToJson for Config { // Official format let server = &self.server[0]; - - match server.addr { - SocketAddr::V4(ref v4) => { - obj.insert("server".to_owned(), Json::String(v4.ip().to_string())); - obj.insert("server_port".to_owned(), Json::U64(v4.port() as u64)); - } - SocketAddr::V6(ref v6) => { - obj.insert("server".to_owned(), Json::String(v6.ip().to_string())); - obj.insert("server_port".to_owned(), Json::U64(v6.port() as u64)); - } - } + server.addr.to_json_object_old(&mut obj); obj.insert("password".to_owned(), Json::String(self.server[0].password.clone())); @@ -550,6 +604,8 @@ impl json::ToJson for Config { } obj.insert("enable_udp".to_owned(), Json::Boolean(self.enable_udp)); + obj.insert("dns_cache_capacity".to_owned(), + Json::U64(self.dns_cache_capacity as u64)); Json::Object(obj) } diff --git a/src/relay/dns_resolver.rs b/src/relay/dns_resolver.rs new file mode 100644 index 00000000..72506eb7 --- /dev/null +++ b/src/relay/dns_resolver.rs @@ -0,0 +1,86 @@ +// The MIT License (MIT) + +// Copyright (c) 2014 Y. T. CHUNG + +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +use std::net::{SocketAddr, ToSocketAddrs}; +use std::sync::{Arc, Mutex}; +use std::io; + +use relay::BoxIoFuture; + +use lru_cache::LruCache; + +use futures::{self, Future}; +use futures_cpupool::CpuPool; + +use ip::IpAddr; + +fn socket_addr_to_ip(addr: SocketAddr) -> IpAddr { + match addr { + SocketAddr::V4(v4) => IpAddr::V4(v4.ip().clone()), + SocketAddr::V6(v6) => IpAddr::V6(v6.ip().clone()), + } +} + +#[derive(Clone)] +pub struct DnsResolver { + cpu_pool: CpuPool, + dns_cache: Arc>>, +} + +impl DnsResolver { + /// Creates an DNS resolver + pub fn new(cache_size: usize) -> DnsResolver { + DnsResolver { + cpu_pool: CpuPool::new_num_cpus(), + dns_cache: Arc::new(Mutex::new(LruCache::new(cache_size))), + } + } + + /// Resolves address asynchronously + pub fn resolve(&self, addr: &str) -> BoxIoFuture { + let dns_cache = self.dns_cache.clone(); + + let mut guard = self.dns_cache.lock().unwrap(); + match guard.get_mut(addr) { + Some(addr) => futures::finished(addr.clone()).boxed(), + None => { + let addr = addr.to_owned(); + self.cpu_pool + .spawn(futures::lazy(move || { + let mixed_addr = format!("{}:0", addr); + match try!(mixed_addr.to_socket_addrs()).next() { + Some(sock_addr) => { + let mut guard = dns_cache.lock().unwrap(); + let ipaddr = socket_addr_to_ip(sock_addr); + guard.insert(addr, ipaddr); + Ok(ipaddr) + } + None => { + let err = io::Error::new(io::ErrorKind::Other, "Failed to resolve address"); + Err(err) + } + } + })) + .boxed() + } + } + } +} diff --git a/src/relay/local.rs b/src/relay/local.rs index 8ad82aa2..afdce568 100644 --- a/src/relay/local.rs +++ b/src/relay/local.rs @@ -29,6 +29,7 @@ use tokio_core::reactor::Core; use relay::tcprelay::local::TcpRelayLocal; #[cfg(feature = "enable-udp")] use relay::udprelay::local::UdpRelayLocal; +use relay::dns_resolver::DnsResolver; use config::Config; /// Relay server running under local environment. @@ -38,19 +39,18 @@ use config::Config; /// use std::sync::Arc; /// /// use shadowsocks::relay::RelayLocal; -/// use shadowsocks::config::{Config, ServerConfig}; +/// use shadowsocks::config::{Config, ServerConfig, ServerAddr}; /// use shadowsocks::crypto::CipherType; /// /// let mut config = Config::new(); /// config.local = Some("127.0.0.1:1080".parse().unwrap()); /// config.server = vec![ServerConfig { -/// addr: "127.0.0.1:8388".parse().unwrap(), +/// addr: ServerAddr::SocketAddr("127.0.0.1:8388".parse().unwrap()), /// password: "server-password".to_string(), /// method: CipherType::Aes256Cfb, /// timeout: None, -/// dns_cache_capacity: 1024, /// }]; -/// RelayLocal::new(config).run(); +/// RelayLocal::run(config); /// ``` #[derive(Clone)] pub struct RelayLocal; @@ -60,7 +60,8 @@ impl RelayLocal { let mut lp = try!(Core::new()); let handle = lp.handle(); let config = Arc::new(config); - let tcp_fut = TcpRelayLocal::run(config, handle); + let dns_resolver = DnsResolver::new(config.dns_cache_capacity); + let tcp_fut = TcpRelayLocal::run(config, handle, dns_resolver); lp.run(tcp_fut) } } diff --git a/src/relay/mod.rs b/src/relay/mod.rs index 5021c501..51a2024d 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -26,7 +26,7 @@ pub use self::server::RelayServer; use std::io; -use futures::BoxFuture; +use futures::Future; mod tcprelay; #[cfg(feature = "enable-udp")] @@ -34,6 +34,7 @@ mod udprelay; pub mod local; pub mod server; mod loadbalancing; +mod dns_resolver; pub mod socks5; -pub type BoxIoFuture = BoxFuture; \ No newline at end of file +pub type BoxIoFuture = Box>; \ No newline at end of file diff --git a/src/relay/server.rs b/src/relay/server.rs index 47b8d69d..90c34cd6 100644 --- a/src/relay/server.rs +++ b/src/relay/server.rs @@ -38,29 +38,28 @@ use config::Config; /// use std::sync::Arc; /// /// use shadowsocks::relay::RelayServer; -/// use shadowsocks::config::{Config, ServerConfig}; +/// use shadowsocks::config::{Config, ServerConfig, ServerAddr}; /// use shadowsocks::crypto::CipherType; /// /// let mut config = Config::new(); /// config.server = vec![ServerConfig { -/// addr: "127.0.0.1:8388".parse().unwrap(), +/// addr: ServerAddr::SocketAddr("127.0.0.1:8388".parse().unwrap()), /// password: "server-password".to_string(), /// method: CipherType::Aes256Cfb, /// timeout: None, -/// dns_cache_capacity: 1024, /// }]; -/// RelayServer::new(config).run(1); +/// RelayServer::run(config); /// ``` /// #[derive(Clone)] pub struct RelayServer; impl RelayServer { - pub fn run(config: Config, threads: usize) -> io::Result<()> { + pub fn run(config: Config) -> io::Result<()> { let mut lp = try!(Core::new()); let handle = lp.handle(); let config = Arc::new(config); - let tcp_fut = TcpRelayServer::run(config, handle, threads); + let tcp_fut = TcpRelayServer::run(config, handle); lp.run(tcp_fut) } } diff --git a/src/relay/tcprelay/cached_dns.rs b/src/relay/tcprelay/cached_dns.rs deleted file mode 100644 index 547b20ca..00000000 --- a/src/relay/tcprelay/cached_dns.rs +++ /dev/null @@ -1,100 +0,0 @@ -// The MIT License (MIT) - -// Copyright (c) 2014 Y. T. CHUNG - -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: - -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. - -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -use std::sync::Arc; -// use std::sync::atomic::{AtomicOption, SeqCst}; -use std::net::lookup_host; -use std::net::SocketAddr; - -use coio::sync::Mutex; - -use lru_cache::LruCache; - -struct DnsLruCache { - cache: LruCache>, - totally_matched: usize, - totally_missed: usize, -} - -pub struct CachedDns { - lru_cache: Arc>, -} - -impl CachedDns { - pub fn with_capacity(cache_capacity: usize) -> CachedDns { - CachedDns { - lru_cache: Arc::new(Mutex::new(DnsLruCache { - cache: LruCache::new(cache_capacity), - totally_missed: 0, - totally_matched: 0, - })), - } - } - - pub fn resolve(&self, addr_str: &str) -> Option> { - { - let mut cache = self.lru_cache.lock().unwrap(); - match cache.cache.get_mut(addr_str).map(|x| x.clone()) { - Some(addrs) => { - cache.totally_matched += 1; - debug!("DNS cache matched!: {}", addr_str); - debug!("DNS cache matched: {}, missed: {}", - cache.totally_matched, - cache.totally_missed); - return Some(addrs); - } - None => { - cache.totally_missed += 1; - debug!("DNS cache missed!: {}", addr_str); - debug!("DNS cache matched: {}, missed: {}", - cache.totally_matched, - cache.totally_missed); - } - } - } - - let addr_vec = match lookup_host(addr_str) { - Ok(addrs) => addrs.collect::>(), - Err(err) => { - error!("Failed to resolve {}: {}", addr_str, err); - return None; - } - }; - - let cloned_mutex = self.lru_cache.clone(); - - if addr_vec.is_empty() { - error!("Failed to resolve {}", addr_str); - return None; - } - let cloned_addrs = addr_vec.clone(); - - let addr_string: String = addr_str.to_owned(); - { - let mut cache = cloned_mutex.lock().unwrap(); - cache.cache.insert(addr_string, cloned_addrs); - } - Some(addr_vec) - } -} - -unsafe impl Send for CachedDns {} -unsafe impl Sync for CachedDns {} diff --git a/src/relay/tcprelay/local.rs b/src/relay/tcprelay/local.rs index f66fe7c6..e89be321 100644 --- a/src/relay/tcprelay/local.rs +++ b/src/relay/tcprelay/local.rs @@ -25,7 +25,7 @@ use std::io; use std::net::SocketAddr; use std::sync::Arc; -use futures::{self, Future, BoxFuture}; +use futures::{self, Future}; use futures::stream::Stream; use tokio_core::net::{TcpStream, TcpListener}; @@ -43,6 +43,7 @@ use relay::socks5::{TcpRequestHeader, TcpResponseHeader}; use relay::loadbalancing::server::RoundRobin; use relay::loadbalancing::server::LoadBalancer; use relay::BoxIoFuture; +use relay::dns_resolver::DnsResolver; use super::http::{self, HttpRequestFut}; use super::tunnel; @@ -51,11 +52,14 @@ use super::tunnel; pub struct TcpRelayLocal; impl TcpRelayLocal { - pub fn run(config: Arc, handle: Handle) -> Box> { - let tcp_fut = Socks5RelayLocal::run(config.clone(), handle.clone()); + pub fn run(config: Arc, + handle: Handle, + dns_resolver: DnsResolver) + -> Box> { + let tcp_fut = Socks5RelayLocal::run(config.clone(), handle.clone(), dns_resolver.clone()); match &config.http_proxy { &Some(..) => { - let http_fut = HttpRelayServer::run(config, handle); + let http_fut = HttpRelayServer::run(config, handle, dns_resolver); Box::new(tcp_fut.join(http_fut) .map(|_| ())) } @@ -72,11 +76,12 @@ impl Socks5RelayLocal { (r, w): (ReadHalf, WriteHalf), client_addr: SocketAddr, addr: Address, - svr_cfg: Arc) - -> BoxFuture<(), io::Error> { + svr_cfg: Arc, + dns_resolver: DnsResolver) + -> Box> { let cloned_addr = addr.clone(); let cloned_svr_cfg = svr_cfg.clone(); - super::connect_proxy_server(handle, svr_cfg) + let fut = super::connect_proxy_server(handle, svr_cfg, dns_resolver) .and_then(move |svr_s| { trace!("Proxy server connected"); @@ -96,11 +101,17 @@ impl Socks5RelayLocal { tunnel(cloned_addr, whalf, rhalf) }) - }) - .boxed() + }); + + Box::new(fut) } - fn handle_client(handle: &Handle, s: TcpStream, _: SocketAddr, conf: Arc) -> io::Result<()> { + fn handle_client(handle: &Handle, + s: TcpStream, + _: SocketAddr, + conf: Arc, + dns_resolver: DnsResolver) + -> io::Result<()> { let cloned_handle = handle.clone(); let client_addr = try!(s.peer_addr()); let cloned_client_addr = client_addr.clone(); @@ -149,7 +160,12 @@ impl Socks5RelayLocal { match header.command { socks5::Command::TcpConnect => { info!("CONNECT {}", addr); - Socks5RelayLocal::handle_socks5_connect(&cloned_handle, (r, w), cloned_client_addr, addr, conf) + Socks5RelayLocal::handle_socks5_connect(&cloned_handle, + (r, w), + cloned_client_addr, + addr, + conf, + dns_resolver) } socks5::Command::TcpBind => { warn!("BIND is not supported"); @@ -185,7 +201,10 @@ impl Socks5RelayLocal { } // Runs TCP relay local server - pub fn run(config: Arc, handle: Handle) -> Box> { + pub fn run(config: Arc, + handle: Handle, + dns_resolver: DnsResolver) + -> Box> { let listener = { let local_addr = config.local.as_ref().unwrap(); let listener = TcpListener::bind(local_addr, &handle).unwrap(); @@ -193,13 +212,16 @@ impl Socks5RelayLocal { listener }; + let dns_resolver = dns_resolver.clone(); + let mut servers = RoundRobin::new(&*config); let listening = listener.incoming() .for_each(move |(socket, addr)| { let server_cfg = servers.pick_server(); trace!("Got connection, addr: {}", addr); trace!("Picked proxy server: {:?}", server_cfg); - Socks5RelayLocal::handle_client(&handle, socket, addr, server_cfg) + let dns_resolver = dns_resolver.clone(); + Socks5RelayLocal::handle_client(&handle, socket, addr, server_cfg, dns_resolver) }); Box::new(listening.map_err(|err| { @@ -218,13 +240,14 @@ impl HttpRelayServer { req: http::HttpRequest, addr: Address, remains: Vec, - svr_cfg: Arc) - -> BoxFuture<(), io::Error> { + svr_cfg: Arc, + dns_resolver: DnsResolver) + -> Box> { let cloned_addr = addr.clone(); let http_version = req.version; let cloned_svr_cfg = svr_cfg.clone(); - super::connect_proxy_server(&handle, svr_cfg) + let fut = super::connect_proxy_server(&handle, svr_cfg, dns_resolver) .and_then(move |svr_s| { trace!("Proxy server connected"); @@ -243,46 +266,46 @@ impl HttpRelayServer { tunnel(cloned_addr, whalf, rhalf) }) - }) - .boxed() + }); + + Box::new(fut) } fn handle_http_keepalive(r: ReadHalf, svr_w: super::EncryptedHalf, req_remains: Vec) -> BoxIoFuture<()> { - HttpRequestFut::with_buf(r, req_remains) - .then(|res| { - match res { - Ok((r, req, remains)) => { - let should_keep_alive = http::should_keep_alive(&req); - trace!("Going to proxy request: {:?}", req); - trace!("Should keep alive? {}", should_keep_alive); + let fut = HttpRequestFut::with_buf(r, req_remains).then(|res| { + match res { + Ok((r, req, remains)) => { + let should_keep_alive = http::should_keep_alive(&req); + trace!("Going to proxy request: {:?}", req); + trace!("Should keep alive? {}", should_keep_alive); - http::proxy_request((r, svr_w), None, req, remains) - .and_then(move |(r, svr_w, req_remains)| { - if should_keep_alive { - HttpRelayServer::handle_http_keepalive(r, svr_w, req_remains) - } else { - futures::finished(()).boxed() - } - }) - .boxed() - } - Err(err) => { - futures::lazy(|| { - use std::io::ErrorKind; - match err.kind() { - // It is Ok for client to close connection - ErrorKind::UnexpectedEof | ErrorKind::BrokenPipe => Ok(()), - _ => Err(err), - } - }) - .boxed() - } + let fut = http::proxy_request((r, svr_w), None, req, remains) + .and_then(move |(r, svr_w, req_remains)| { + if should_keep_alive { + HttpRelayServer::handle_http_keepalive(r, svr_w, req_remains) + } else { + futures::finished(()).boxed() + } + }); + Box::new(fut) as BoxIoFuture<()> } - }) - .boxed() + Err(err) => { + let fut = futures::lazy(|| { + use std::io::ErrorKind; + match err.kind() { + // It is Ok for client to close connection + ErrorKind::UnexpectedEof | ErrorKind::BrokenPipe => Ok(()), + _ => Err(err), + } + }); + Box::new(fut) as BoxIoFuture<()> + } + } + }); + Box::new(fut) } fn handle_http_proxy(handle: Handle, @@ -291,44 +314,49 @@ impl HttpRelayServer { req: http::HttpRequest, addr: Address, remains: Vec, - svr_cfg: Arc) - -> BoxFuture<(), io::Error> { + svr_cfg: Arc, + dns_resolver: DnsResolver) + -> Box> { trace!("Using HTTP Proxy for {} -> {}", client_addr, addr); let should_keep_alive = http::should_keep_alive(&req); - super::connect_proxy_server(&handle, svr_cfg.clone()) - .and_then(move |svr_s| { - trace!("Proxy server connected"); + let fut = super::connect_proxy_server(&handle, svr_cfg.clone(), dns_resolver).and_then(move |svr_s| { + trace!("Proxy server connected"); - let cloned_addr = addr.clone(); - super::proxy_server_handshake(svr_s, svr_cfg, addr).and_then(move |(svr_r, svr_w)| { - // Just proxy anything to client - let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w)); - let whalf = svr_w.and_then(move |svr_w| { - // Send the first request to server - trace!("Going to proxy request: {:?}", req); - trace!("Should keep alive? {}", should_keep_alive); - http::proxy_request((r, svr_w), None, req, remains) - .and_then(move |(r, svr_w, req_remains)| { - if should_keep_alive { - HttpRelayServer::handle_http_keepalive(r, svr_w, req_remains) - } else { - futures::finished(()).boxed() - } - }) - }); + let cloned_addr = addr.clone(); + super::proxy_server_handshake(svr_s, svr_cfg, addr).and_then(move |(svr_r, svr_w)| { + // Just proxy anything to client + let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w)); + let whalf = svr_w.and_then(move |svr_w| { + // Send the first request to server + trace!("Going to proxy request: {:?}", req); + trace!("Should keep alive? {}", should_keep_alive); + http::proxy_request((r, svr_w), None, req, remains).and_then(move |(r, svr_w, req_remains)| { + if should_keep_alive { + HttpRelayServer::handle_http_keepalive(r, svr_w, req_remains) + } else { + futures::finished(()).boxed() + } + }) + }); - rhalf.join(whalf) - .then(move |_| { - trace!("Relay to {} is finished", cloned_addr); - Ok(()) - }) - }) + rhalf.join(whalf) + .then(move |_| { + trace!("Relay to {} is finished", cloned_addr); + Ok(()) + }) }) - .boxed() + }); + + Box::new(fut) } - fn handle_client(handle: &Handle, socket: TcpStream, _: SocketAddr, svr_cfg: Arc) -> io::Result<()> { + fn handle_client(handle: &Handle, + socket: TcpStream, + _: SocketAddr, + svr_cfg: Arc, + dns_resolver: DnsResolver) + -> io::Result<()> { let cloned_handle = handle.clone(); let client_addr = try!(socket.peer_addr()); let fut = futures::lazy(|| Ok(socket.split())) @@ -362,7 +390,13 @@ impl HttpRelayServer { match req.method.clone() { Method::Connect => { info!("CONNECT (Http) {}", addr); - HttpRelayServer::handle_connect(cloned_handle, (r, w), req, addr, remains, svr_cfg) + HttpRelayServer::handle_connect(cloned_handle, + (r, w), + req, + addr, + remains, + svr_cfg, + dns_resolver) } met => { info!("{} (Http) {}", met, addr); @@ -372,7 +406,8 @@ impl HttpRelayServer { req, addr, remains, - svr_cfg) + svr_cfg, + dns_resolver) } } }); @@ -393,7 +428,10 @@ impl HttpRelayServer { Ok(()) } - pub fn run(config: Arc, handle: Handle) -> Box> { + pub fn run(config: Arc, + handle: Handle, + dns_resolver: DnsResolver) + -> Box> { let listener = { let local_addr = config.http_proxy.as_ref().unwrap(); let listener = TcpListener::bind(local_addr, &handle).unwrap(); @@ -407,7 +445,8 @@ impl HttpRelayServer { let server_cfg = servers.pick_server(); trace!("Got connection, addr: {}", addr); trace!("Picked proxy server: {:?}", server_cfg); - HttpRelayServer::handle_client(&handle, socket, addr, server_cfg) + let dns_resolver = dns_resolver.clone(); + HttpRelayServer::handle_client(&handle, socket, addr, server_cfg, dns_resolver) }); Box::new(listening.map_err(|err| { diff --git a/src/relay/tcprelay/mod.rs b/src/relay/tcprelay/mod.rs index a2602f57..7a037994 100644 --- a/src/relay/tcprelay/mod.rs +++ b/src/relay/tcprelay/mod.rs @@ -22,6 +22,7 @@ //! TcpRelay implementation use std::io::{self, Read, Write}; +use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; use std::sync::Arc; use std::mem; @@ -29,7 +30,8 @@ use crypto::cipher; use crypto::CryptoMode; use relay::socks5::Address; use relay::BoxIoFuture; -use config::ServerConfig; +use relay::dns_resolver::DnsResolver; +use config::{ServerConfig, ServerAddr}; use tokio_core::net::TcpStream; use tokio_core::reactor::Handle; @@ -39,9 +41,10 @@ use tokio_core::io::Io; use futures::{self, Future, BoxFuture, Poll}; +use ip::IpAddr; + use self::stream::{EncryptedWriter, DecryptedReader}; -// mod cached_dns; pub mod local; pub mod server; mod stream; @@ -59,8 +62,25 @@ pub type EncryptedHalf = EncryptedWriter>; pub type DecryptedHalfFut = BoxFuture; pub type EncryptedHalfFut = BoxFuture; -fn connect_proxy_server(handle: &Handle, svr_cfg: Arc) -> BoxIoFuture { - TcpStream::connect(&svr_cfg.addr, handle).boxed() +fn connect_proxy_server(handle: &Handle, + svr_cfg: Arc, + dns_resolver: DnsResolver) + -> Box> { + match &svr_cfg.addr { + &ServerAddr::SocketAddr(ref addr) => Box::new(TcpStream::connect(addr, handle)), + &ServerAddr::DomainName(ref domain, port) => { + let handle = handle.clone(); + let fut = dns_resolver.resolve(&domain[..]) + .and_then(move |sockaddr| { + let sockaddr = match sockaddr { + IpAddr::V4(v4) => SocketAddr::V4(SocketAddrV4::new(v4, port)), + IpAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(v6, port, 0, 0)), + }; + TcpStream::connect(&sockaddr, &handle).boxed() + }); + Box::new(fut) + } + } } /// Handshake logic for ShadowSocks Client @@ -68,20 +88,19 @@ pub fn proxy_server_handshake(remote_stream: TcpStream, svr_cfg: Arc, relay_addr: Address) -> BoxIoFuture<(DecryptedHalfFut, EncryptedHalfFut)> { - proxy_handshake(remote_stream, svr_cfg) - .and_then(|(r_fut, w_fut)| { - let w_fut = w_fut.and_then(move |enc_w| { - trace!("Got encrypt stream and going to send addr: {:?}", - relay_addr); + let fut = proxy_handshake(remote_stream, svr_cfg).and_then(|(r_fut, w_fut)| { + let w_fut = w_fut.and_then(move |enc_w| { + trace!("Got encrypt stream and going to send addr: {:?}", + relay_addr); - // Send relay address to remote - relay_addr.write_to(enc_w).and_then(flush) - }) - .boxed(); + // Send relay address to remote + relay_addr.write_to(enc_w).and_then(flush) + }) + .boxed(); - Ok((r_fut, w_fut)) - }) - .boxed() + Ok((r_fut, w_fut)) + }); + Box::new(fut) } /// ShadowSocks Client-Server handshake protocol diff --git a/src/relay/tcprelay/server.rs b/src/relay/tcprelay/server.rs index 212d9742..33cf0da1 100644 --- a/src/relay/tcprelay/server.rs +++ b/src/relay/tcprelay/server.rs @@ -22,7 +22,7 @@ //! TcpRelay server that running on the server side use std::io; -use std::net::{SocketAddr, ToSocketAddrs}; +use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; use std::sync::Arc; use std::collections::HashSet; @@ -30,12 +30,11 @@ use config::{Config, ServerConfig}; use relay::socks5::Address; use relay::BoxIoFuture; +use relay::dns_resolver::DnsResolver; use futures::{self, Future}; use futures::stream::Stream; -use futures_cpupool::CpuPool; - use tokio_core::reactor::Handle; use tokio_core::net::{TcpStream, TcpListener}; use tokio_core::io::Io; @@ -52,63 +51,63 @@ impl TcpRelayServer { fn handshake(remote_stream: TcpStream, svr_cfg: Arc) -> BoxIoFuture<(DecryptedHalf, Address, EncryptedHalfFut)> { - proxy_handshake(remote_stream, svr_cfg) - .and_then(|(r_fut, w_fut)| { - r_fut.and_then(|r| Address::read_from(r).map_err(From::from)) - .map(move |(r, addr)| (r, addr, w_fut)) - }) - .boxed() + let fut = proxy_handshake(remote_stream, svr_cfg).and_then(|(r_fut, w_fut)| { + r_fut.and_then(|r| Address::read_from(r).map_err(From::from)) + .map(move |(r, addr)| (r, addr, w_fut)) + }); + Box::new(fut) } - fn resolve_address(addr: Address, cpu_pool: CpuPool) -> BoxIoFuture { + fn resolve_address(addr: Address, dns_resolver: DnsResolver) -> BoxIoFuture { match addr { - Address::SocketAddress(addr) => futures::finished(addr).boxed(), + Address::SocketAddress(addr) => Box::new(futures::finished(addr)), Address::DomainNameAddress(dname, port) => { - cpu_pool.spawn(futures::lazy(move || { - let dname = format!("{}:{}", dname, port); - let mut addrs = try!(dname.to_socket_addrs()); - addrs.next().ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Failed to resolve domain")) - })) - .boxed() + let fut = dns_resolver.resolve(&dname[..]) + .and_then(move |ipaddr| { + Ok(match ipaddr { + IpAddr::V4(v4) => SocketAddr::V4(SocketAddrV4::new(v4, port)), + IpAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(v6, port, 0, 0)), + }) + }); + Box::new(fut) } } } - fn resolve_remote(cpu_pool: CpuPool, + fn resolve_remote(dns_resolver: DnsResolver, addr: Address, forbidden_ip: Arc>) - -> Box> { - TcpRelayServer::resolve_address(addr, cpu_pool) - .and_then(move |addr| { - trace!("Resolved address as {}", addr); - let ipaddr = match addr.clone() { - SocketAddr::V4(v4) => IpAddr::V4(v4.ip().clone()), - SocketAddr::V6(v6) => IpAddr::V6(v6.ip().clone()), - }; + -> BoxIoFuture { + let fut = TcpRelayServer::resolve_address(addr, dns_resolver).and_then(move |addr| { + trace!("Resolved address as {}", addr); + let ipaddr = match addr.clone() { + SocketAddr::V4(v4) => IpAddr::V4(v4.ip().clone()), + SocketAddr::V6(v6) => IpAddr::V6(v6.ip().clone()), + }; - if forbidden_ip.contains(&ipaddr) { - info!("{} has been forbidden", ipaddr); - let err = io::Error::new(io::ErrorKind::Other, "Forbidden IP"); - Err(err) - } else { - Ok(addr) - } - }) - .boxed() + if forbidden_ip.contains(&ipaddr) { + info!("{} has been forbidden", ipaddr); + let err = io::Error::new(io::ErrorKind::Other, "Forbidden IP"); + Err(err) + } else { + Ok(addr) + } + }); + Box::new(fut) } - fn connect_remote(cpu_pool: CpuPool, + fn connect_remote(dns_resolver: DnsResolver, handle: Handle, addr: Address, forbidden_ip: Arc>) - -> Box> { + -> BoxIoFuture { trace!("Connecting to remote {}", addr); - Box::new(TcpRelayServer::resolve_remote(cpu_pool, addr, forbidden_ip) + Box::new(TcpRelayServer::resolve_remote(dns_resolver, addr, forbidden_ip) .and_then(move |addr| TcpStream::connect(&addr, &handle))) } pub fn handle_client(handle: &Handle, - cpu_pool: CpuPool, + dns_resolver: DnsResolver, s: TcpStream, svr_cfg: Arc, forbidden_ip: Arc>) @@ -121,12 +120,13 @@ impl TcpRelayServer { let fut = TcpRelayServer::handshake(s, svr_cfg).and_then(move |(r, addr, w_fut)| { info!("Connecting {}", addr); let cloned_addr = addr.clone(); - TcpRelayServer::connect_remote(cpu_pool, cloned_handle.clone(), addr, forbidden_ip).and_then(move |svr_s| { - let (svr_r, svr_w) = svr_s.split(); - tunnel(cloned_addr, - copy(r, svr_w), - w_fut.and_then(|w| copy(svr_r, w))) - }) + TcpRelayServer::connect_remote(dns_resolver, cloned_handle.clone(), addr, forbidden_ip) + .and_then(move |svr_s| { + let (svr_r, svr_w) = svr_s.split(); + tunnel(cloned_addr, + copy(r, svr_w), + w_fut.and_then(|w| copy(svr_r, w))) + }) }); handle.spawn(fut.then(|res| { @@ -143,8 +143,8 @@ impl TcpRelayServer { } /// Runs the server - pub fn run(config: Arc, handle: Handle, threads: usize) -> Box> { - let cpu_pool = CpuPool::new(threads); + pub fn run(config: Arc, handle: Handle) -> Box> { + let dns_resolver = DnsResolver::new(config.dns_cache_capacity); let mut fut: Option>> = None; @@ -154,6 +154,7 @@ impl TcpRelayServer { for svr_cfg in &config.server { let listener = { let addr = &svr_cfg.addr; + let addr = addr.listen_addr(); let listener = TcpListener::bind(addr, &handle).unwrap(); trace!("ShadowSocks TCP Listening on {}", addr); listener @@ -161,17 +162,17 @@ impl TcpRelayServer { let svr_cfg = Arc::new(svr_cfg.clone()); let handle = handle.clone(); - let cpu_pool = cpu_pool.clone(); + let dns_resolver = dns_resolver.clone(); let forbidden_ip = forbidden_ip.clone(); let listening = listener.incoming() .for_each(move |(socket, addr)| { let server_cfg = svr_cfg.clone(); let forbidden_ip = forbidden_ip.clone(); - let cpu_pool = cpu_pool.clone(); + let dns_resolver = dns_resolver.clone(); trace!("Got connection, addr: {}", addr); trace!("Picked proxy server: {:?}", server_cfg); - TcpRelayServer::handle_client(&handle, cpu_pool, socket, server_cfg, forbidden_ip) + TcpRelayServer::handle_client(&handle, dns_resolver, socket, server_cfg, forbidden_ip) }) .map_err(|err| { error!("Server run failed: {}", err);