Add DnsResolver, support domain name server address

This commit is contained in:
Y. T. Chung
2016-10-26 01:48:55 +08:00
parent 8a8b5b5907
commit f3be905c20
11 changed files with 420 additions and 334 deletions

View File

@@ -41,8 +41,7 @@ use std::env;
use env_logger::LogBuilder;
use log::{LogRecord, LogLevelFilter};
use shadowsocks::config::{self, Config, ServerConfig};
use shadowsocks::config::DEFAULT_DNS_CACHE_CAPACITY;
use shadowsocks::config::{self, Config, ServerConfig, ServerAddr};
use shadowsocks::relay::RelayLocal;
fn main() {
@@ -183,7 +182,7 @@ fn main() {
.unwrap();
let sc = ServerConfig {
addr: svr_addr.parse::<SocketAddr>().expect("Invalid server addr"),
addr: svr_addr.parse::<ServerAddr>().expect("Invalid server addr"),
password: password.to_owned(),
method: match method.parse() {
Ok(m) => m,
@@ -192,7 +191,6 @@ fn main() {
}
},
timeout: None,
dns_cache_capacity: DEFAULT_DNS_CACHE_CAPACITY,
};
config.server.push(sc);

View File

@@ -36,15 +36,13 @@ extern crate env_logger;
extern crate time;
use std::env;
use std::net::SocketAddr;
use clap::{App, Arg};
use env_logger::LogBuilder;
use log::{LogRecord, LogLevelFilter};
use shadowsocks::config::{self, Config, ServerConfig};
use shadowsocks::config::DEFAULT_DNS_CACHE_CAPACITY;
use shadowsocks::config::{self, Config, ServerConfig, ServerAddr};
use shadowsocks::relay::RelayServer;
fn main() {
@@ -90,11 +88,6 @@ fn main() {
.long("encrypt-method")
.takes_value(true)
.help("Encryption method"))
.arg(Arg::with_name("THREADS")
.short("t")
.long("threads")
.takes_value(true)
.help("Threads in thread pool"))
.get_matches();
let mut log_builder = LogBuilder::new();
@@ -194,7 +187,7 @@ fn main() {
.unwrap();
let sc = ServerConfig {
addr: svr_addr.parse::<SocketAddr>().expect("`server-addr` invalid"),
addr: svr_addr.parse::<ServerAddr>().expect("`server-addr` invalid"),
password: password.to_owned(),
method: match method.parse() {
Ok(m) => m,
@@ -203,7 +196,6 @@ fn main() {
}
},
timeout: None,
dns_cache_capacity: DEFAULT_DNS_CACHE_CAPACITY,
};
config.server.push(sc);
@@ -232,11 +224,5 @@ fn main() {
debug!("Config: {:?}", config);
let threads = matches.value_of("THREADS")
.unwrap_or("1")
.parse::<usize>()
.ok()
.expect("`threads` should be an integer");
RelayServer::run(config, threads).unwrap();
RelayServer::run(config).unwrap();
}

View File

@@ -72,37 +72,116 @@ use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6, Ipv4Addr, Ipv6Addr};
use std::string::ToString;
use std::option::Option;
use std::default::Default;
use std::fmt::{self, Debug, Formatter};
use std::fmt::{self, Display, Debug, Formatter};
use std::path::Path;
use std::collections::HashSet;
use std::time::Duration;
use std::convert::From;
use std::str::FromStr;
use ip::IpAddr;
use crypto::cipher::CipherType;
/// Default DNS cache capacity
pub const DEFAULT_DNS_CACHE_CAPACITY: usize = 65536;
pub const DEFAULT_DNS_CACHE_CAPACITY: usize = 128;
/// Server address
#[derive(Clone, Debug)]
pub enum ServerAddr {
/// IP Address
SocketAddr(SocketAddr),
/// Domain name address, eg. example.com:8080
DomainName(String, u16),
}
impl ServerAddr {
/// Get address for server listener
/// Panic if address is domain name
pub fn listen_addr(&self) -> &SocketAddr {
match self {
&ServerAddr::SocketAddr(ref s) => s,
_ => panic!("Cannot use domain name as server listen address"),
}
}
fn to_json_object_inner(&self, obj: &mut json::Object, addr_key: &str, port_key: &str) {
use serialize::json::Json;
match self {
&ServerAddr::SocketAddr(SocketAddr::V4(ref v4)) => {
obj.insert(addr_key.to_owned(), Json::String(v4.ip().to_string()));
obj.insert(port_key.to_owned(), Json::U64(v4.port() as u64));
}
&ServerAddr::SocketAddr(SocketAddr::V6(ref v6)) => {
obj.insert(addr_key.to_owned(), Json::String(v6.ip().to_string()));
obj.insert(port_key.to_owned(), Json::U64(v6.port() as u64));
}
&ServerAddr::DomainName(ref domain, port) => {
obj.insert(addr_key.to_owned(), Json::String(domain.to_owned()));
obj.insert(port_key.to_owned(), Json::U64(port as u64));
}
}
}
fn to_json_object(&self, obj: &mut json::Object) {
self.to_json_object_inner(obj, "address", "port")
}
fn to_json_object_old(&self, obj: &mut json::Object) {
self.to_json_object_inner(obj, "server", "server_port")
}
}
#[derive(Debug)]
pub struct ServerAddrError;
impl FromStr for ServerAddr {
type Err = ServerAddrError;
fn from_str(s: &str) -> Result<ServerAddr, ServerAddrError> {
match s.parse::<SocketAddr>() {
Ok(addr) => Ok(ServerAddr::SocketAddr(addr)),
Err(..) => {
let mut sp = s.split(':');
match (sp.next(), sp.next()) {
(Some(dn), Some(port)) => {
match port.parse::<u16>() {
Ok(port) => Ok(ServerAddr::DomainName(dn.to_owned(), port)),
Err(..) => Err(ServerAddrError),
}
}
_ => Err(ServerAddrError),
}
}
}
}
}
impl Display for ServerAddr {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
&ServerAddr::SocketAddr(ref a) => write!(f, "{}", a),
&ServerAddr::DomainName(ref d, port) => write!(f, "{}:{}", d, port),
}
}
}
/// Configuration for a server
#[derive(Clone, Debug)]
pub struct ServerConfig {
pub addr: SocketAddr,
pub addr: ServerAddr,
pub password: String,
pub method: CipherType,
pub timeout: Option<Duration>,
pub dns_cache_capacity: usize,
}
impl ServerConfig {
pub fn basic(addr: SocketAddr, password: String, method: CipherType) -> ServerConfig {
ServerConfig {
addr: addr,
addr: ServerAddr::SocketAddr(addr),
password: password,
method: method,
timeout: None,
dns_cache_capacity: DEFAULT_DNS_CACHE_CAPACITY,
}
}
}
@@ -112,24 +191,13 @@ impl json::ToJson for ServerConfig {
use serialize::json::Json;
let mut obj = json::Object::new();
match self.addr {
SocketAddr::V4(ref v4) => {
obj.insert("address".to_owned(), Json::String(v4.ip().to_string()));
obj.insert("port".to_owned(), Json::U64(v4.port() as u64));
}
SocketAddr::V6(ref v6) => {
obj.insert("address".to_owned(), Json::String(v6.ip().to_string()));
obj.insert("port".to_owned(), Json::U64(v6.port() as u64));
}
}
self.addr.to_json_object(&mut obj);
obj.insert("password".to_owned(), Json::String(self.password.clone()));
obj.insert("method".to_owned(), Json::String(self.method.to_string()));
if let Some(t) = self.timeout {
obj.insert("timeout".to_owned(), Json::U64(t.as_secs()));
}
obj.insert("dns_cache_capacity".to_owned(),
Json::U64(self.dns_cache_capacity as u64));
Json::Object(obj)
}
@@ -153,6 +221,7 @@ pub struct Config {
pub enable_udp: bool,
pub timeout: Option<Duration>,
pub forbidden_ip: HashSet<IpAddr>,
pub dns_cache_capacity: usize,
}
impl Default for Config {
@@ -177,8 +246,6 @@ pub struct Error {
pub detail: Option<String>,
}
impl Error {
pub fn new(kind: ErrorKind, desc: &'static str, detail: Option<String>) -> Error {
Error {
@@ -229,6 +296,7 @@ impl Debug for Error {
}
impl Config {
/// Creates an empty configuration
pub fn new() -> Config {
Config {
server: Vec::new(),
@@ -237,6 +305,7 @@ impl Config {
enable_udp: false,
timeout: None,
forbidden_ip: HashSet::new(),
dns_cache_capacity: DEFAULT_DNS_CACHE_CAPACITY,
}
}
@@ -286,21 +355,15 @@ impl Config {
})
.and_then(|addr_str| {
addr_str.parse::<Ipv4Addr>()
.map(|v4| SocketAddr::V4(SocketAddrV4::new(v4, port)))
.map(|v4| ServerAddr::SocketAddr(SocketAddr::V4(SocketAddrV4::new(v4, port))))
.or_else(|_| {
addr_str.parse::<Ipv6Addr>()
.map(|v6| SocketAddr::V6(SocketAddrV6::new(v6, port, 0, 0)))
.map(|v6| ServerAddr::SocketAddr(SocketAddr::V6(SocketAddrV6::new(v6, port, 0, 0))))
})
.map_err(|_| Error::new(ErrorKind::Malformed, "invalid server addr", None))
.or_else(|_| Ok(ServerAddr::DomainName(addr_str.to_string(), port)))
});
let mut addr = try!(addr);
// Merge address and port
match addr {
SocketAddr::V4(ref mut v4) => v4.set_port(port),
SocketAddr::V6(ref mut v6) => v6.set_port(port),
}
let addr = try!(addr);
let password = server.get("password")
.ok_or_else(|| Error::new(ErrorKind::MissingField, "need to specify a password", None))
@@ -321,22 +384,11 @@ impl Config {
None => None,
};
let dns_cache_capacity = match server.get("dns_cache_capacity") {
Some(t) => {
try!(t.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`dns_cache_capacity` should be an integer",
None))) as usize
}
None => DEFAULT_DNS_CACHE_CAPACITY,
};
Ok(ServerConfig {
addr: addr,
password: password,
method: method,
timeout: timeout,
dns_cache_capacity: dns_cache_capacity,
})
}
@@ -477,6 +529,18 @@ impl Config {
}));
}
let dns_cache_capacity = match o.get("dns_cache_capacity") {
Some(t) => {
try!(t.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`dns_cache_capacity` should be an integer",
None))) as usize
}
None => DEFAULT_DNS_CACHE_CAPACITY,
};
config.dns_cache_capacity = dns_cache_capacity;
Ok(config)
}
@@ -515,17 +579,7 @@ impl json::ToJson for Config {
// Official format
let server = &self.server[0];
match server.addr {
SocketAddr::V4(ref v4) => {
obj.insert("server".to_owned(), Json::String(v4.ip().to_string()));
obj.insert("server_port".to_owned(), Json::U64(v4.port() as u64));
}
SocketAddr::V6(ref v6) => {
obj.insert("server".to_owned(), Json::String(v6.ip().to_string()));
obj.insert("server_port".to_owned(), Json::U64(v6.port() as u64));
}
}
server.addr.to_json_object_old(&mut obj);
obj.insert("password".to_owned(),
Json::String(self.server[0].password.clone()));
@@ -550,6 +604,8 @@ impl json::ToJson for Config {
}
obj.insert("enable_udp".to_owned(), Json::Boolean(self.enable_udp));
obj.insert("dns_cache_capacity".to_owned(),
Json::U64(self.dns_cache_capacity as u64));
Json::Object(obj)
}

86
src/relay/dns_resolver.rs Normal file
View File

@@ -0,0 +1,86 @@
// The MIT License (MIT)
// Copyright (c) 2014 Y. T. CHUNG <zonyitoo@gmail.com>
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use std::io;
use relay::BoxIoFuture;
use lru_cache::LruCache;
use futures::{self, Future};
use futures_cpupool::CpuPool;
use ip::IpAddr;
fn socket_addr_to_ip(addr: SocketAddr) -> IpAddr {
match addr {
SocketAddr::V4(v4) => IpAddr::V4(v4.ip().clone()),
SocketAddr::V6(v6) => IpAddr::V6(v6.ip().clone()),
}
}
#[derive(Clone)]
pub struct DnsResolver {
cpu_pool: CpuPool,
dns_cache: Arc<Mutex<LruCache<String, IpAddr>>>,
}
impl DnsResolver {
/// Creates an DNS resolver
pub fn new(cache_size: usize) -> DnsResolver {
DnsResolver {
cpu_pool: CpuPool::new_num_cpus(),
dns_cache: Arc::new(Mutex::new(LruCache::new(cache_size))),
}
}
/// Resolves address asynchronously
pub fn resolve(&self, addr: &str) -> BoxIoFuture<IpAddr> {
let dns_cache = self.dns_cache.clone();
let mut guard = self.dns_cache.lock().unwrap();
match guard.get_mut(addr) {
Some(addr) => futures::finished(addr.clone()).boxed(),
None => {
let addr = addr.to_owned();
self.cpu_pool
.spawn(futures::lazy(move || {
let mixed_addr = format!("{}:0", addr);
match try!(mixed_addr.to_socket_addrs()).next() {
Some(sock_addr) => {
let mut guard = dns_cache.lock().unwrap();
let ipaddr = socket_addr_to_ip(sock_addr);
guard.insert(addr, ipaddr);
Ok(ipaddr)
}
None => {
let err = io::Error::new(io::ErrorKind::Other, "Failed to resolve address");
Err(err)
}
}
}))
.boxed()
}
}
}
}

View File

@@ -29,6 +29,7 @@ use tokio_core::reactor::Core;
use relay::tcprelay::local::TcpRelayLocal;
#[cfg(feature = "enable-udp")]
use relay::udprelay::local::UdpRelayLocal;
use relay::dns_resolver::DnsResolver;
use config::Config;
/// Relay server running under local environment.
@@ -38,19 +39,18 @@ use config::Config;
/// use std::sync::Arc;
///
/// use shadowsocks::relay::RelayLocal;
/// use shadowsocks::config::{Config, ServerConfig};
/// use shadowsocks::config::{Config, ServerConfig, ServerAddr};
/// use shadowsocks::crypto::CipherType;
///
/// let mut config = Config::new();
/// config.local = Some("127.0.0.1:1080".parse().unwrap());
/// config.server = vec![ServerConfig {
/// addr: "127.0.0.1:8388".parse().unwrap(),
/// addr: ServerAddr::SocketAddr("127.0.0.1:8388".parse().unwrap()),
/// password: "server-password".to_string(),
/// method: CipherType::Aes256Cfb,
/// timeout: None,
/// dns_cache_capacity: 1024,
/// }];
/// RelayLocal::new(config).run();
/// RelayLocal::run(config);
/// ```
#[derive(Clone)]
pub struct RelayLocal;
@@ -60,7 +60,8 @@ impl RelayLocal {
let mut lp = try!(Core::new());
let handle = lp.handle();
let config = Arc::new(config);
let tcp_fut = TcpRelayLocal::run(config, handle);
let dns_resolver = DnsResolver::new(config.dns_cache_capacity);
let tcp_fut = TcpRelayLocal::run(config, handle, dns_resolver);
lp.run(tcp_fut)
}
}

View File

@@ -26,7 +26,7 @@ pub use self::server::RelayServer;
use std::io;
use futures::BoxFuture;
use futures::Future;
mod tcprelay;
#[cfg(feature = "enable-udp")]
@@ -34,6 +34,7 @@ mod udprelay;
pub mod local;
pub mod server;
mod loadbalancing;
mod dns_resolver;
pub mod socks5;
pub type BoxIoFuture<T> = BoxFuture<T, io::Error>;
pub type BoxIoFuture<T> = Box<Future<Item = T, Error = io::Error>>;

View File

@@ -38,29 +38,28 @@ use config::Config;
/// use std::sync::Arc;
///
/// use shadowsocks::relay::RelayServer;
/// use shadowsocks::config::{Config, ServerConfig};
/// use shadowsocks::config::{Config, ServerConfig, ServerAddr};
/// use shadowsocks::crypto::CipherType;
///
/// let mut config = Config::new();
/// config.server = vec![ServerConfig {
/// addr: "127.0.0.1:8388".parse().unwrap(),
/// addr: ServerAddr::SocketAddr("127.0.0.1:8388".parse().unwrap()),
/// password: "server-password".to_string(),
/// method: CipherType::Aes256Cfb,
/// timeout: None,
/// dns_cache_capacity: 1024,
/// }];
/// RelayServer::new(config).run(1);
/// RelayServer::run(config);
/// ```
///
#[derive(Clone)]
pub struct RelayServer;
impl RelayServer {
pub fn run(config: Config, threads: usize) -> io::Result<()> {
pub fn run(config: Config) -> io::Result<()> {
let mut lp = try!(Core::new());
let handle = lp.handle();
let config = Arc::new(config);
let tcp_fut = TcpRelayServer::run(config, handle, threads);
let tcp_fut = TcpRelayServer::run(config, handle);
lp.run(tcp_fut)
}
}

View File

@@ -1,100 +0,0 @@
// The MIT License (MIT)
// Copyright (c) 2014 Y. T. CHUNG <zonyitoo@gmail.com>
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
use std::sync::Arc;
// use std::sync::atomic::{AtomicOption, SeqCst};
use std::net::lookup_host;
use std::net::SocketAddr;
use coio::sync::Mutex;
use lru_cache::LruCache;
struct DnsLruCache {
cache: LruCache<String, Vec<SocketAddr>>,
totally_matched: usize,
totally_missed: usize,
}
pub struct CachedDns {
lru_cache: Arc<Mutex<DnsLruCache>>,
}
impl CachedDns {
pub fn with_capacity(cache_capacity: usize) -> CachedDns {
CachedDns {
lru_cache: Arc::new(Mutex::new(DnsLruCache {
cache: LruCache::new(cache_capacity),
totally_missed: 0,
totally_matched: 0,
})),
}
}
pub fn resolve(&self, addr_str: &str) -> Option<Vec<SocketAddr>> {
{
let mut cache = self.lru_cache.lock().unwrap();
match cache.cache.get_mut(addr_str).map(|x| x.clone()) {
Some(addrs) => {
cache.totally_matched += 1;
debug!("DNS cache matched!: {}", addr_str);
debug!("DNS cache matched: {}, missed: {}",
cache.totally_matched,
cache.totally_missed);
return Some(addrs);
}
None => {
cache.totally_missed += 1;
debug!("DNS cache missed!: {}", addr_str);
debug!("DNS cache matched: {}, missed: {}",
cache.totally_matched,
cache.totally_missed);
}
}
}
let addr_vec = match lookup_host(addr_str) {
Ok(addrs) => addrs.collect::<Vec<SocketAddr>>(),
Err(err) => {
error!("Failed to resolve {}: {}", addr_str, err);
return None;
}
};
let cloned_mutex = self.lru_cache.clone();
if addr_vec.is_empty() {
error!("Failed to resolve {}", addr_str);
return None;
}
let cloned_addrs = addr_vec.clone();
let addr_string: String = addr_str.to_owned();
{
let mut cache = cloned_mutex.lock().unwrap();
cache.cache.insert(addr_string, cloned_addrs);
}
Some(addr_vec)
}
}
unsafe impl Send for CachedDns {}
unsafe impl Sync for CachedDns {}

View File

@@ -25,7 +25,7 @@ use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{self, Future, BoxFuture};
use futures::{self, Future};
use futures::stream::Stream;
use tokio_core::net::{TcpStream, TcpListener};
@@ -43,6 +43,7 @@ use relay::socks5::{TcpRequestHeader, TcpResponseHeader};
use relay::loadbalancing::server::RoundRobin;
use relay::loadbalancing::server::LoadBalancer;
use relay::BoxIoFuture;
use relay::dns_resolver::DnsResolver;
use super::http::{self, HttpRequestFut};
use super::tunnel;
@@ -51,11 +52,14 @@ use super::tunnel;
pub struct TcpRelayLocal;
impl TcpRelayLocal {
pub fn run(config: Arc<Config>, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
let tcp_fut = Socks5RelayLocal::run(config.clone(), handle.clone());
pub fn run(config: Arc<Config>,
handle: Handle,
dns_resolver: DnsResolver)
-> Box<Future<Item = (), Error = io::Error>> {
let tcp_fut = Socks5RelayLocal::run(config.clone(), handle.clone(), dns_resolver.clone());
match &config.http_proxy {
&Some(..) => {
let http_fut = HttpRelayServer::run(config, handle);
let http_fut = HttpRelayServer::run(config, handle, dns_resolver);
Box::new(tcp_fut.join(http_fut)
.map(|_| ()))
}
@@ -72,11 +76,12 @@ impl Socks5RelayLocal {
(r, w): (ReadHalf<TcpStream>, WriteHalf<TcpStream>),
client_addr: SocketAddr,
addr: Address,
svr_cfg: Arc<ServerConfig>)
-> BoxFuture<(), io::Error> {
svr_cfg: Arc<ServerConfig>,
dns_resolver: DnsResolver)
-> Box<Future<Item = (), Error = io::Error>> {
let cloned_addr = addr.clone();
let cloned_svr_cfg = svr_cfg.clone();
super::connect_proxy_server(handle, svr_cfg)
let fut = super::connect_proxy_server(handle, svr_cfg, dns_resolver)
.and_then(move |svr_s| {
trace!("Proxy server connected");
@@ -96,11 +101,17 @@ impl Socks5RelayLocal {
tunnel(cloned_addr, whalf, rhalf)
})
})
.boxed()
});
Box::new(fut)
}
fn handle_client(handle: &Handle, s: TcpStream, _: SocketAddr, conf: Arc<ServerConfig>) -> io::Result<()> {
fn handle_client(handle: &Handle,
s: TcpStream,
_: SocketAddr,
conf: Arc<ServerConfig>,
dns_resolver: DnsResolver)
-> io::Result<()> {
let cloned_handle = handle.clone();
let client_addr = try!(s.peer_addr());
let cloned_client_addr = client_addr.clone();
@@ -149,7 +160,12 @@ impl Socks5RelayLocal {
match header.command {
socks5::Command::TcpConnect => {
info!("CONNECT {}", addr);
Socks5RelayLocal::handle_socks5_connect(&cloned_handle, (r, w), cloned_client_addr, addr, conf)
Socks5RelayLocal::handle_socks5_connect(&cloned_handle,
(r, w),
cloned_client_addr,
addr,
conf,
dns_resolver)
}
socks5::Command::TcpBind => {
warn!("BIND is not supported");
@@ -185,7 +201,10 @@ impl Socks5RelayLocal {
}
// Runs TCP relay local server
pub fn run(config: Arc<Config>, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
pub fn run(config: Arc<Config>,
handle: Handle,
dns_resolver: DnsResolver)
-> Box<Future<Item = (), Error = io::Error>> {
let listener = {
let local_addr = config.local.as_ref().unwrap();
let listener = TcpListener::bind(local_addr, &handle).unwrap();
@@ -193,13 +212,16 @@ impl Socks5RelayLocal {
listener
};
let dns_resolver = dns_resolver.clone();
let mut servers = RoundRobin::new(&*config);
let listening = listener.incoming()
.for_each(move |(socket, addr)| {
let server_cfg = servers.pick_server();
trace!("Got connection, addr: {}", addr);
trace!("Picked proxy server: {:?}", server_cfg);
Socks5RelayLocal::handle_client(&handle, socket, addr, server_cfg)
let dns_resolver = dns_resolver.clone();
Socks5RelayLocal::handle_client(&handle, socket, addr, server_cfg, dns_resolver)
});
Box::new(listening.map_err(|err| {
@@ -218,13 +240,14 @@ impl HttpRelayServer {
req: http::HttpRequest,
addr: Address,
remains: Vec<u8>,
svr_cfg: Arc<ServerConfig>)
-> BoxFuture<(), io::Error> {
svr_cfg: Arc<ServerConfig>,
dns_resolver: DnsResolver)
-> Box<Future<Item = (), Error = io::Error>> {
let cloned_addr = addr.clone();
let http_version = req.version;
let cloned_svr_cfg = svr_cfg.clone();
super::connect_proxy_server(&handle, svr_cfg)
let fut = super::connect_proxy_server(&handle, svr_cfg, dns_resolver)
.and_then(move |svr_s| {
trace!("Proxy server connected");
@@ -243,46 +266,46 @@ impl HttpRelayServer {
tunnel(cloned_addr, whalf, rhalf)
})
})
.boxed()
});
Box::new(fut)
}
fn handle_http_keepalive(r: ReadHalf<TcpStream>,
svr_w: super::EncryptedHalf,
req_remains: Vec<u8>)
-> BoxIoFuture<()> {
HttpRequestFut::with_buf(r, req_remains)
.then(|res| {
match res {
Ok((r, req, remains)) => {
let should_keep_alive = http::should_keep_alive(&req);
trace!("Going to proxy request: {:?}", req);
trace!("Should keep alive? {}", should_keep_alive);
let fut = HttpRequestFut::with_buf(r, req_remains).then(|res| {
match res {
Ok((r, req, remains)) => {
let should_keep_alive = http::should_keep_alive(&req);
trace!("Going to proxy request: {:?}", req);
trace!("Should keep alive? {}", should_keep_alive);
http::proxy_request((r, svr_w), None, req, remains)
.and_then(move |(r, svr_w, req_remains)| {
if should_keep_alive {
HttpRelayServer::handle_http_keepalive(r, svr_w, req_remains)
} else {
futures::finished(()).boxed()
}
})
.boxed()
}
Err(err) => {
futures::lazy(|| {
use std::io::ErrorKind;
match err.kind() {
// It is Ok for client to close connection
ErrorKind::UnexpectedEof | ErrorKind::BrokenPipe => Ok(()),
_ => Err(err),
}
})
.boxed()
}
let fut = http::proxy_request((r, svr_w), None, req, remains)
.and_then(move |(r, svr_w, req_remains)| {
if should_keep_alive {
HttpRelayServer::handle_http_keepalive(r, svr_w, req_remains)
} else {
futures::finished(()).boxed()
}
});
Box::new(fut) as BoxIoFuture<()>
}
})
.boxed()
Err(err) => {
let fut = futures::lazy(|| {
use std::io::ErrorKind;
match err.kind() {
// It is Ok for client to close connection
ErrorKind::UnexpectedEof | ErrorKind::BrokenPipe => Ok(()),
_ => Err(err),
}
});
Box::new(fut) as BoxIoFuture<()>
}
}
});
Box::new(fut)
}
fn handle_http_proxy(handle: Handle,
@@ -291,44 +314,49 @@ impl HttpRelayServer {
req: http::HttpRequest,
addr: Address,
remains: Vec<u8>,
svr_cfg: Arc<ServerConfig>)
-> BoxFuture<(), io::Error> {
svr_cfg: Arc<ServerConfig>,
dns_resolver: DnsResolver)
-> Box<Future<Item = (), Error = io::Error>> {
trace!("Using HTTP Proxy for {} -> {}", client_addr, addr);
let should_keep_alive = http::should_keep_alive(&req);
super::connect_proxy_server(&handle, svr_cfg.clone())
.and_then(move |svr_s| {
trace!("Proxy server connected");
let fut = super::connect_proxy_server(&handle, svr_cfg.clone(), dns_resolver).and_then(move |svr_s| {
trace!("Proxy server connected");
let cloned_addr = addr.clone();
super::proxy_server_handshake(svr_s, svr_cfg, addr).and_then(move |(svr_r, svr_w)| {
// Just proxy anything to client
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| {
// Send the first request to server
trace!("Going to proxy request: {:?}", req);
trace!("Should keep alive? {}", should_keep_alive);
http::proxy_request((r, svr_w), None, req, remains)
.and_then(move |(r, svr_w, req_remains)| {
if should_keep_alive {
HttpRelayServer::handle_http_keepalive(r, svr_w, req_remains)
} else {
futures::finished(()).boxed()
}
})
});
let cloned_addr = addr.clone();
super::proxy_server_handshake(svr_s, svr_cfg, addr).and_then(move |(svr_r, svr_w)| {
// Just proxy anything to client
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| {
// Send the first request to server
trace!("Going to proxy request: {:?}", req);
trace!("Should keep alive? {}", should_keep_alive);
http::proxy_request((r, svr_w), None, req, remains).and_then(move |(r, svr_w, req_remains)| {
if should_keep_alive {
HttpRelayServer::handle_http_keepalive(r, svr_w, req_remains)
} else {
futures::finished(()).boxed()
}
})
});
rhalf.join(whalf)
.then(move |_| {
trace!("Relay to {} is finished", cloned_addr);
Ok(())
})
})
rhalf.join(whalf)
.then(move |_| {
trace!("Relay to {} is finished", cloned_addr);
Ok(())
})
})
.boxed()
});
Box::new(fut)
}
fn handle_client(handle: &Handle, socket: TcpStream, _: SocketAddr, svr_cfg: Arc<ServerConfig>) -> io::Result<()> {
fn handle_client(handle: &Handle,
socket: TcpStream,
_: SocketAddr,
svr_cfg: Arc<ServerConfig>,
dns_resolver: DnsResolver)
-> io::Result<()> {
let cloned_handle = handle.clone();
let client_addr = try!(socket.peer_addr());
let fut = futures::lazy(|| Ok(socket.split()))
@@ -362,7 +390,13 @@ impl HttpRelayServer {
match req.method.clone() {
Method::Connect => {
info!("CONNECT (Http) {}", addr);
HttpRelayServer::handle_connect(cloned_handle, (r, w), req, addr, remains, svr_cfg)
HttpRelayServer::handle_connect(cloned_handle,
(r, w),
req,
addr,
remains,
svr_cfg,
dns_resolver)
}
met => {
info!("{} (Http) {}", met, addr);
@@ -372,7 +406,8 @@ impl HttpRelayServer {
req,
addr,
remains,
svr_cfg)
svr_cfg,
dns_resolver)
}
}
});
@@ -393,7 +428,10 @@ impl HttpRelayServer {
Ok(())
}
pub fn run(config: Arc<Config>, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
pub fn run(config: Arc<Config>,
handle: Handle,
dns_resolver: DnsResolver)
-> Box<Future<Item = (), Error = io::Error>> {
let listener = {
let local_addr = config.http_proxy.as_ref().unwrap();
let listener = TcpListener::bind(local_addr, &handle).unwrap();
@@ -407,7 +445,8 @@ impl HttpRelayServer {
let server_cfg = servers.pick_server();
trace!("Got connection, addr: {}", addr);
trace!("Picked proxy server: {:?}", server_cfg);
HttpRelayServer::handle_client(&handle, socket, addr, server_cfg)
let dns_resolver = dns_resolver.clone();
HttpRelayServer::handle_client(&handle, socket, addr, server_cfg, dns_resolver)
});
Box::new(listening.map_err(|err| {

View File

@@ -22,6 +22,7 @@
//! TcpRelay implementation
use std::io::{self, Read, Write};
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
use std::sync::Arc;
use std::mem;
@@ -29,7 +30,8 @@ use crypto::cipher;
use crypto::CryptoMode;
use relay::socks5::Address;
use relay::BoxIoFuture;
use config::ServerConfig;
use relay::dns_resolver::DnsResolver;
use config::{ServerConfig, ServerAddr};
use tokio_core::net::TcpStream;
use tokio_core::reactor::Handle;
@@ -39,9 +41,10 @@ use tokio_core::io::Io;
use futures::{self, Future, BoxFuture, Poll};
use ip::IpAddr;
use self::stream::{EncryptedWriter, DecryptedReader};
// mod cached_dns;
pub mod local;
pub mod server;
mod stream;
@@ -59,8 +62,25 @@ pub type EncryptedHalf = EncryptedWriter<WriteHalf<TcpStream>>;
pub type DecryptedHalfFut = BoxFuture<DecryptedHalf, io::Error>;
pub type EncryptedHalfFut = BoxFuture<EncryptedHalf, io::Error>;
fn connect_proxy_server(handle: &Handle, svr_cfg: Arc<ServerConfig>) -> BoxIoFuture<TcpStream> {
TcpStream::connect(&svr_cfg.addr, handle).boxed()
fn connect_proxy_server(handle: &Handle,
svr_cfg: Arc<ServerConfig>,
dns_resolver: DnsResolver)
-> Box<Future<Item = TcpStream, Error = io::Error>> {
match &svr_cfg.addr {
&ServerAddr::SocketAddr(ref addr) => Box::new(TcpStream::connect(addr, handle)),
&ServerAddr::DomainName(ref domain, port) => {
let handle = handle.clone();
let fut = dns_resolver.resolve(&domain[..])
.and_then(move |sockaddr| {
let sockaddr = match sockaddr {
IpAddr::V4(v4) => SocketAddr::V4(SocketAddrV4::new(v4, port)),
IpAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(v6, port, 0, 0)),
};
TcpStream::connect(&sockaddr, &handle).boxed()
});
Box::new(fut)
}
}
}
/// Handshake logic for ShadowSocks Client
@@ -68,20 +88,19 @@ pub fn proxy_server_handshake(remote_stream: TcpStream,
svr_cfg: Arc<ServerConfig>,
relay_addr: Address)
-> BoxIoFuture<(DecryptedHalfFut, EncryptedHalfFut)> {
proxy_handshake(remote_stream, svr_cfg)
.and_then(|(r_fut, w_fut)| {
let w_fut = w_fut.and_then(move |enc_w| {
trace!("Got encrypt stream and going to send addr: {:?}",
relay_addr);
let fut = proxy_handshake(remote_stream, svr_cfg).and_then(|(r_fut, w_fut)| {
let w_fut = w_fut.and_then(move |enc_w| {
trace!("Got encrypt stream and going to send addr: {:?}",
relay_addr);
// Send relay address to remote
relay_addr.write_to(enc_w).and_then(flush)
})
.boxed();
// Send relay address to remote
relay_addr.write_to(enc_w).and_then(flush)
})
.boxed();
Ok((r_fut, w_fut))
})
.boxed()
Ok((r_fut, w_fut))
});
Box::new(fut)
}
/// ShadowSocks Client-Server handshake protocol

View File

@@ -22,7 +22,7 @@
//! TcpRelay server that running on the server side
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
use std::sync::Arc;
use std::collections::HashSet;
@@ -30,12 +30,11 @@ use config::{Config, ServerConfig};
use relay::socks5::Address;
use relay::BoxIoFuture;
use relay::dns_resolver::DnsResolver;
use futures::{self, Future};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_core::reactor::Handle;
use tokio_core::net::{TcpStream, TcpListener};
use tokio_core::io::Io;
@@ -52,63 +51,63 @@ impl TcpRelayServer {
fn handshake(remote_stream: TcpStream,
svr_cfg: Arc<ServerConfig>)
-> BoxIoFuture<(DecryptedHalf, Address, EncryptedHalfFut)> {
proxy_handshake(remote_stream, svr_cfg)
.and_then(|(r_fut, w_fut)| {
r_fut.and_then(|r| Address::read_from(r).map_err(From::from))
.map(move |(r, addr)| (r, addr, w_fut))
})
.boxed()
let fut = proxy_handshake(remote_stream, svr_cfg).and_then(|(r_fut, w_fut)| {
r_fut.and_then(|r| Address::read_from(r).map_err(From::from))
.map(move |(r, addr)| (r, addr, w_fut))
});
Box::new(fut)
}
fn resolve_address(addr: Address, cpu_pool: CpuPool) -> BoxIoFuture<SocketAddr> {
fn resolve_address(addr: Address, dns_resolver: DnsResolver) -> BoxIoFuture<SocketAddr> {
match addr {
Address::SocketAddress(addr) => futures::finished(addr).boxed(),
Address::SocketAddress(addr) => Box::new(futures::finished(addr)),
Address::DomainNameAddress(dname, port) => {
cpu_pool.spawn(futures::lazy(move || {
let dname = format!("{}:{}", dname, port);
let mut addrs = try!(dname.to_socket_addrs());
addrs.next().ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Failed to resolve domain"))
}))
.boxed()
let fut = dns_resolver.resolve(&dname[..])
.and_then(move |ipaddr| {
Ok(match ipaddr {
IpAddr::V4(v4) => SocketAddr::V4(SocketAddrV4::new(v4, port)),
IpAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(v6, port, 0, 0)),
})
});
Box::new(fut)
}
}
}
fn resolve_remote(cpu_pool: CpuPool,
fn resolve_remote(dns_resolver: DnsResolver,
addr: Address,
forbidden_ip: Arc<HashSet<IpAddr>>)
-> Box<Future<Item = SocketAddr, Error = io::Error>> {
TcpRelayServer::resolve_address(addr, cpu_pool)
.and_then(move |addr| {
trace!("Resolved address as {}", addr);
let ipaddr = match addr.clone() {
SocketAddr::V4(v4) => IpAddr::V4(v4.ip().clone()),
SocketAddr::V6(v6) => IpAddr::V6(v6.ip().clone()),
};
-> BoxIoFuture<SocketAddr> {
let fut = TcpRelayServer::resolve_address(addr, dns_resolver).and_then(move |addr| {
trace!("Resolved address as {}", addr);
let ipaddr = match addr.clone() {
SocketAddr::V4(v4) => IpAddr::V4(v4.ip().clone()),
SocketAddr::V6(v6) => IpAddr::V6(v6.ip().clone()),
};
if forbidden_ip.contains(&ipaddr) {
info!("{} has been forbidden", ipaddr);
let err = io::Error::new(io::ErrorKind::Other, "Forbidden IP");
Err(err)
} else {
Ok(addr)
}
})
.boxed()
if forbidden_ip.contains(&ipaddr) {
info!("{} has been forbidden", ipaddr);
let err = io::Error::new(io::ErrorKind::Other, "Forbidden IP");
Err(err)
} else {
Ok(addr)
}
});
Box::new(fut)
}
fn connect_remote(cpu_pool: CpuPool,
fn connect_remote(dns_resolver: DnsResolver,
handle: Handle,
addr: Address,
forbidden_ip: Arc<HashSet<IpAddr>>)
-> Box<Future<Item = TcpStream, Error = io::Error>> {
-> BoxIoFuture<TcpStream> {
trace!("Connecting to remote {}", addr);
Box::new(TcpRelayServer::resolve_remote(cpu_pool, addr, forbidden_ip)
Box::new(TcpRelayServer::resolve_remote(dns_resolver, addr, forbidden_ip)
.and_then(move |addr| TcpStream::connect(&addr, &handle)))
}
pub fn handle_client(handle: &Handle,
cpu_pool: CpuPool,
dns_resolver: DnsResolver,
s: TcpStream,
svr_cfg: Arc<ServerConfig>,
forbidden_ip: Arc<HashSet<IpAddr>>)
@@ -121,12 +120,13 @@ impl TcpRelayServer {
let fut = TcpRelayServer::handshake(s, svr_cfg).and_then(move |(r, addr, w_fut)| {
info!("Connecting {}", addr);
let cloned_addr = addr.clone();
TcpRelayServer::connect_remote(cpu_pool, cloned_handle.clone(), addr, forbidden_ip).and_then(move |svr_s| {
let (svr_r, svr_w) = svr_s.split();
tunnel(cloned_addr,
copy(r, svr_w),
w_fut.and_then(|w| copy(svr_r, w)))
})
TcpRelayServer::connect_remote(dns_resolver, cloned_handle.clone(), addr, forbidden_ip)
.and_then(move |svr_s| {
let (svr_r, svr_w) = svr_s.split();
tunnel(cloned_addr,
copy(r, svr_w),
w_fut.and_then(|w| copy(svr_r, w)))
})
});
handle.spawn(fut.then(|res| {
@@ -143,8 +143,8 @@ impl TcpRelayServer {
}
/// Runs the server
pub fn run(config: Arc<Config>, handle: Handle, threads: usize) -> Box<Future<Item = (), Error = io::Error>> {
let cpu_pool = CpuPool::new(threads);
pub fn run(config: Arc<Config>, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
let dns_resolver = DnsResolver::new(config.dns_cache_capacity);
let mut fut: Option<Box<Future<Item = (), Error = io::Error>>> = None;
@@ -154,6 +154,7 @@ impl TcpRelayServer {
for svr_cfg in &config.server {
let listener = {
let addr = &svr_cfg.addr;
let addr = addr.listen_addr();
let listener = TcpListener::bind(addr, &handle).unwrap();
trace!("ShadowSocks TCP Listening on {}", addr);
listener
@@ -161,17 +162,17 @@ impl TcpRelayServer {
let svr_cfg = Arc::new(svr_cfg.clone());
let handle = handle.clone();
let cpu_pool = cpu_pool.clone();
let dns_resolver = dns_resolver.clone();
let forbidden_ip = forbidden_ip.clone();
let listening = listener.incoming()
.for_each(move |(socket, addr)| {
let server_cfg = svr_cfg.clone();
let forbidden_ip = forbidden_ip.clone();
let cpu_pool = cpu_pool.clone();
let dns_resolver = dns_resolver.clone();
trace!("Got connection, addr: {}", addr);
trace!("Picked proxy server: {:?}", server_cfg);
TcpRelayServer::handle_client(&handle, cpu_pool, socket, server_cfg, forbidden_ip)
TcpRelayServer::handle_client(&handle, dns_resolver, socket, server_cfg, forbidden_ip)
})
.map_err(|err| {
error!("Server run failed: {}", err);