Removed all global state maps, which will allow starting multiple shadowsocks instances in one process

This commit is contained in:
zonyitoo
2018-12-30 15:58:22 +08:00
parent 1e026296ae
commit 5dec63a044
18 changed files with 234 additions and 186 deletions

1
Cargo.lock generated
View File

@@ -1220,7 +1220,6 @@ dependencies = [
"env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"json5 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.45 (registry+https://github.com/rust-lang/crates.io-index)",
"libsodium-ffi 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@@ -47,7 +47,6 @@ libc = "0.2"
futures = "0.1"
tokio-io = "0.1"
tokio = "0.1"
lazy_static = "1.0"
json5 = "0.2"
base64 = "0.9"
bytes = "0.4"

View File

@@ -730,6 +730,11 @@ impl Config {
}
}
// UDP switch
if let Some(enable) = config.enable_udp {
nconfig.enable_udp = enable;
}
Ok(nconfig)
}

60
src/context.rs Normal file
View File

@@ -0,0 +1,60 @@
//! Shadowsocks Server Context
use std::{
net::SocketAddr,
sync::{Arc, Mutex, MutexGuard},
time::Instant,
};
use lru_cache::LruCache;
use trust_dns_resolver::AsyncResolver;
use config::Config;
use relay::dns_resolver::create_resolver;
type DnsQueryCache = LruCache<u16, (SocketAddr, Instant)>;
#[derive(Clone)]
pub struct Context {
config: Config,
dns_resolver: Arc<AsyncResolver>,
dns_query_cache: Option<Arc<Mutex<DnsQueryCache>>>,
}
pub type SharedContext = Arc<Context>;
impl Context {
pub fn new(config: Config) -> Context {
let resolver = create_resolver(config.get_dns_config());
Context {
config: config,
dns_resolver: Arc::new(resolver),
dns_query_cache: None,
}
}
pub fn new_dns(config: Config) -> Context {
let resolver = create_resolver(config.get_dns_config());
Context {
config: config,
dns_resolver: Arc::new(resolver),
dns_query_cache: Some(Arc::new(Mutex::new(LruCache::new(1024)))),
}
}
pub fn config(&self) -> &Config {
&self.config
}
pub fn config_mut(&mut self) -> &mut Config {
&mut self.config
}
pub fn dns_resolver(&self) -> &AsyncResolver {
&*self.dns_resolver
}
pub fn dns_query_cache<'a>(&'a self) -> MutexGuard<'a, DnsQueryCache> {
self.dns_query_cache.as_ref().unwrap().lock().unwrap()
}
}

View File

@@ -77,8 +77,6 @@ extern crate bytes;
extern crate digest;
#[macro_use]
extern crate futures;
#[macro_use]
extern crate lazy_static;
extern crate libc;
#[cfg(feature = "sodium")]
extern crate libsodium_ffi;
@@ -117,6 +115,7 @@ pub use self::{
};
pub mod config;
mod context;
pub mod crypto;
mod monitor;
pub mod plugin;

View File

@@ -1,19 +1,14 @@
//! DNS relay
use std::{io, sync::Arc};
use std::io;
use futures::Future;
use futures::{self, Future};
use super::dns_resolver::set_dns_config;
use config::Config;
use context::{Context, SharedContext};
use relay::udprelay::dns::run as run_udp;
/// DNS Relay server running under local environment.
pub fn run(config: Config) -> impl Future<Item = (), Error = io::Error> + Send {
if let Some(c) = config.get_dns_config() {
set_dns_config(c);
}
let config = Arc::new(config);
run_udp(config)
futures::lazy(move || run_udp(SharedContext::new(Context::new_dns(config))))
}

View File

@@ -3,41 +3,20 @@
use std::{
io::{self, ErrorKind},
net::SocketAddr,
sync::Arc,
};
use futures::Future;
use spin::Mutex;
use tokio;
use trust_dns_resolver::{config::ResolverConfig, AsyncResolver};
use config::Config;
use context::SharedContext;
// Taken from
// bluejekyll/trust-dns/resolver/examples/global_resolver.rs
lazy_static! {
static ref GLOBAL_DNS_ADDRESS: Mutex<Option<ResolverConfig>> = Mutex::new(None);
// First we need to setup the global Resolver
static ref GLOBAL_DNS_RESOLVER: AsyncResolver = init_resolver();
}
/// Set address for global DNS resolver
/// Must be called before servers are actually run
pub fn set_dns_config(addr: ResolverConfig) {
*GLOBAL_DNS_ADDRESS.lock() = Some(addr);
}
fn get_dns_address() -> Option<ResolverConfig> {
GLOBAL_DNS_ADDRESS.lock().clone()
}
fn init_resolver() -> AsyncResolver {
pub fn create_resolver(dns: Option<ResolverConfig>) -> AsyncResolver {
let (resolver, bg) = {
// To make this independent, if targeting macOS, BSD, Linux, or Windows, we can use the system's configuration:
#[cfg(any(unix, windows))]
{
if let Some(conf) = get_dns_address() {
if let Some(conf) = dns {
use trust_dns_resolver::config::ResolverOpts;
AsyncResolver::new(conf, ResolverOpts::default())
} else {
@@ -54,7 +33,7 @@ fn init_resolver() -> AsyncResolver {
// Directly reference the config types
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
if let Some(conf) = get_dns_address() {
if let Some(conf) = dns {
AsyncResolver::new(conf, ResolverOpts::default())
} else {
// Get a new resolver with the google nameservers as the upstream recursive resolvers
@@ -70,15 +49,17 @@ fn init_resolver() -> AsyncResolver {
}
fn inner_resolve(
config: Arc<Config>,
context: SharedContext,
addr: &str,
port: u16,
check_forbidden: bool,
) -> impl Future<Item = Vec<SocketAddr>, Error = io::Error> + Send {
let owned_addr = addr.to_owned();
let owned_addr2 = owned_addr.clone();
let cloned_context = context.clone();
GLOBAL_DNS_RESOLVER
context
.dns_resolver()
.lookup_ip(addr)
.map_err(move |err| {
error!("Failed to resolve {}, err: {}", owned_addr2, err);
@@ -88,7 +69,7 @@ fn inner_resolve(
let mut vaddr = Vec::new();
for ip in lookup_result.iter() {
if check_forbidden {
let forbidden_ip = &config.forbidden_ip;
let forbidden_ip = &cloned_context.config().forbidden_ip;
if forbidden_ip.contains(&ip) {
debug!("Resolved {} => {}, which is skipped by forbidden_ip", owned_addr, ip);
continue;
@@ -112,10 +93,10 @@ fn inner_resolve(
/// Resolve address to IP
pub fn resolve(
config: Arc<Config>,
context: SharedContext,
addr: &str,
port: u16,
check_forbidden: bool,
) -> impl Future<Item = Vec<SocketAddr>, Error = io::Error> + Send {
inner_resolve(config, addr, port, check_forbidden)
inner_resolve(context, addr, port, check_forbidden)
}

View File

@@ -1,11 +1,11 @@
//! Local side
use std::{io, sync::Arc};
use std::io;
use futures::{stream::futures_unordered, Future, Stream};
use super::dns_resolver::set_dns_config;
use config::Config;
use context::{Context, SharedContext};
use plugin::{launch_plugin, PluginMode};
use relay::{boxed_future, tcprelay::local::run as run_tcp, udprelay::local::run as run_udp};
@@ -33,39 +33,36 @@ use relay::{boxed_future, tcprelay::local::run as run_tcp, udprelay::local::run
/// let fut = run(config);
/// tokio::run(fut.map_err(|err| panic!("Server run failed with error {}", err)));
/// ```
pub fn run(mut config: Config) -> impl Future<Item = (), Error = io::Error> + Send {
if let Some(c) = config.get_dns_config() {
set_dns_config(c);
}
pub fn run(config: Config) -> impl Future<Item = (), Error = io::Error> + Send {
futures::lazy(move || {
let mut vf = Vec::new();
let mut vf = Vec::new();
let mut context = Context::new(config);
if config.enable_udp {
// Clone config here, because the config for TCP relay will be modified
// after plugins started
let udp_config = Arc::new(config.clone());
if context.config().enable_udp {
// Clone config here, because the config for TCP relay will be modified
// after plugins started
let udp_context = SharedContext::new(context.clone());
// Run UDP relay before starting plugins
// Because plugins doesn't support UDP relay
let udp_fut = run_udp(udp_config);
vf.push(boxed_future(udp_fut));
}
// Hold it here, kill all plugins when `tokio::run` is finished
let plugins = launch_plugin(&mut config, PluginMode::Client).expect("Failed to launch plugins");
let mon = ::monitor::monitor_signal(plugins);
// Recreate shared config here
let config = Arc::new(config);
let tcp_fut = run_tcp(config.clone());
vf.push(boxed_future(mon));
vf.push(boxed_future(tcp_fut));
futures_unordered(vf).into_future().then(|res| -> io::Result<()> {
match res {
Ok(..) => Ok(()),
Err((err, ..)) => Err(err),
// Run UDP relay before starting plugins
// Because plugins doesn't support UDP relay
let udp_fut = run_udp(udp_context);
vf.push(boxed_future(udp_fut));
}
// Hold it here, kill all plugins when `tokio::run` is finished
let plugins = launch_plugin(context.config_mut(), PluginMode::Client).expect("Failed to launch plugins");
let mon = ::monitor::monitor_signal(plugins);
let tcp_fut = run_tcp(SharedContext::new(context));
vf.push(boxed_future(mon));
vf.push(boxed_future(tcp_fut));
futures_unordered(vf).into_future().then(|res| -> io::Result<()> {
match res {
Ok(..) => Ok(()),
Err((err, ..)) => Err(err),
}
})
})
}

View File

@@ -3,7 +3,7 @@
use futures::Future;
pub mod dns;
mod dns_resolver;
pub(crate) mod dns_resolver;
mod loadbalancing;
pub mod local;
pub mod server;

View File

@@ -1,11 +1,11 @@
//! Server side
use std::{io, sync::Arc};
use std::io;
use futures::{stream::futures_unordered, Future, Stream};
use super::dns_resolver::set_dns_config;
use config::Config;
use context::{Context, SharedContext};
use plugin::{launch_plugin, PluginMode};
use relay::{boxed_future, tcprelay::server::run as run_tcp, udprelay::server::run as run_udp};
@@ -33,40 +33,37 @@ use relay::{boxed_future, tcprelay::server::run as run_tcp, udprelay::server::ru
/// let fut = run(config);
/// tokio::run(fut.map_err(|err| panic!("Server run failed with error {}", err)));
/// ```
pub fn run(mut config: Config) -> impl Future<Item = (), Error = io::Error> + Send {
if let Some(c) = config.get_dns_config() {
set_dns_config(c);
}
pub fn run(config: Config) -> impl Future<Item = (), Error = io::Error> + Send {
futures::lazy(move || {
let mut context = Context::new(config);
let mut vf = Vec::new();
let mut vf = Vec::new();
if config.enable_udp {
// Clone config here, because the config for TCP relay will be modified
// after plugins started
let udp_config = Arc::new(config.clone());
if context.config().enable_udp {
// Clone config here, because the config for TCP relay will be modified
// after plugins started
let udp_context = SharedContext::new(context.clone());
// Run UDP relay before starting plugins
// Because plugins doesn't support UDP relay
let udp_fut = run_udp(udp_config);
vf.push(boxed_future(udp_fut));
}
// Hold it here, kill all plugins when `tokio::run` is finished
let plugins = launch_plugin(&mut config, PluginMode::Server).expect("Failed to launch plugins");
let mon = ::monitor::monitor_signal(plugins);
// Recreate shared config here
let config = Arc::new(config);
let tcp_fut = run_tcp(config.clone());
vf.push(boxed_future(mon));
vf.push(boxed_future(tcp_fut));
futures_unordered(vf).into_future().then(|res| -> io::Result<()> {
match res {
Ok(..) => Ok(()),
Err((err, ..)) => Err(err),
// Run UDP relay before starting plugins
// Because plugins doesn't support UDP relay
let udp_fut = run_udp(udp_context);
vf.push(boxed_future(udp_fut));
}
// Hold it here, kill all plugins when `tokio::run` is finished
let plugins = launch_plugin(context.config_mut(), PluginMode::Server).expect("Failed to launch plugins");
let mon = ::monitor::monitor_signal(plugins);
let tcp_fut = run_tcp(SharedContext::new(context));
vf.push(boxed_future(mon));
vf.push(boxed_future(tcp_fut));
futures_unordered(vf).into_future().then(|res| -> io::Result<()> {
match res {
Ok(..) => Ok(()),
Err((err, ..)) => Err(err),
}
})
})
}

View File

@@ -1,13 +1,13 @@
//! Relay for TCP server that running on local environment
use std::{io, sync::Arc};
use std::io;
use futures::Future;
use super::socks5_local;
use config::Config;
use context::SharedContext;
/// Starts a TCP local server
pub fn run(config: Arc<Config>) -> impl Future<Item = (), Error = io::Error> + Send {
socks5_local::run(config)
pub fn run(context: SharedContext) -> impl Future<Item = (), Error = io::Error> + Send {
socks5_local::run(context)
}

View File

@@ -9,7 +9,8 @@ use std::{
time::Duration,
};
use config::{Config, ServerAddr, ServerConfig};
use config::{ServerAddr, ServerConfig};
use context::SharedContext;
use crypto::CipherCategory;
use relay::{boxed_future, dns_resolver::resolve, socks5::Address};
@@ -227,7 +228,7 @@ impl<I: Iterator<Item = SocketAddr>> Future for TcpStreamConnect<I> {
}
fn connect_proxy_server(
config: Arc<Config>,
context: SharedContext,
svr_cfg: Arc<ServerConfig>,
) -> impl Future<Item = TcpStream, Error = io::Error> + Send {
let timeout = svr_cfg.timeout();
@@ -239,7 +240,7 @@ fn connect_proxy_server(
}
ServerAddr::DomainName(ref domain, port) => {
let fut = {
try_timeout(resolve(config.clone(), &domain[..], port, false), timeout).and_then(move |vec_ipaddr| {
try_timeout(resolve(context, &domain[..], port, false), timeout).and_then(move |vec_ipaddr| {
let fut = TcpStreamConnect::new(vec_ipaddr.into_iter());
try_timeout(fut, timeout)
})

View File

@@ -7,7 +7,7 @@ use std::{
time::Duration,
};
use config::{Config, ServerConfig};
use config::ServerConfig;
use relay::{
boxed_future,
@@ -16,6 +16,8 @@ use relay::{
tcprelay::crypto_io::{DecryptedRead, EncryptedWrite},
};
use context::SharedContext;
use futures::{
self,
stream::{futures_unordered, Stream},
@@ -37,7 +39,7 @@ use super::{proxy_handshake, try_timeout, tunnel, DecryptedHalf, EncryptedHalf,
pub struct TcpRelayClientHandshake {
s: TcpStream,
svr_cfg: Arc<ServerConfig>,
config: Arc<Config>,
context: SharedContext,
}
impl TcpRelayClientHandshake {
@@ -59,7 +61,7 @@ impl TcpRelayClientHandshake {
Item = TcpRelayClientPending<impl Future<Item = EncryptedHalf, Error = io::Error> + Send + 'static>,
Error = io::Error,
> + Send {
let TcpRelayClientHandshake { s, svr_cfg, config } = self;
let TcpRelayClientHandshake { s, svr_cfg, context } = self;
futures::lazy(move || s.peer_addr().map(|p| (s, p))).and_then(|(s, peer_addr)| {
debug!("Handshaking with peer {}", peer_addr);
@@ -77,7 +79,7 @@ impl TcpRelayClientHandshake {
addr: addr,
w: w_fut,
timeout: timeout,
config: config,
context: context,
})
})
})
@@ -93,13 +95,13 @@ where
addr: Address,
w: E,
timeout: Option<Duration>,
config: Arc<Config>,
context: SharedContext,
}
/// Connect to the remote server
#[inline]
fn connect_remote(
config: Arc<Config>,
context: SharedContext,
addr: Address,
timeout: Option<Duration>,
) -> impl Future<Item = TcpStream, Error = io::Error> + Send {
@@ -107,7 +109,7 @@ fn connect_remote(
match addr {
Address::SocketAddress(saddr) => {
if config.forbidden_ip.contains(&saddr.ip()) {
if context.config().forbidden_ip.contains(&saddr.ip()) {
let err = io::Error::new(
ErrorKind::Other,
format!("{} is forbidden, failed to connect {}", saddr.ip(), saddr),
@@ -121,7 +123,7 @@ fn connect_remote(
}
Address::DomainNameAddress(dname, port) => {
let fut = {
try_timeout(resolve(config, dname.as_str(), port, true), timeout).and_then(move |addrs| {
try_timeout(resolve(context, dname.as_str(), port, true), timeout).and_then(move |addrs| {
let conn = TcpStreamConnect::new(addrs.into_iter());
try_timeout(conn, timeout)
})
@@ -145,7 +147,7 @@ where
let addr = self.addr.clone();
let client_pair = (self.r, self.w);
let timeout = self.timeout;
connect_remote(self.config, self.addr, self.timeout).map(move |stream| TcpRelayClientConnected {
connect_remote(self.context, self.addr, self.timeout).map(move |stream| TcpRelayClientConnected {
server: stream.split(),
client: client_pair,
addr: addr,
@@ -186,7 +188,7 @@ where
fn handle_client(
server_cfg: Arc<ServerConfig>,
config: Arc<Config>,
context: SharedContext,
socket: TcpStream,
) -> impl Future<Item = (), Error = ()> + Send {
if let Err(err) = socket.set_keepalive(server_cfg.timeout()) {
@@ -211,7 +213,7 @@ fn handle_client(
let client = TcpRelayClientHandshake {
s: socket,
svr_cfg: server_cfg,
config: config,
context: context,
};
client
@@ -225,10 +227,10 @@ fn handle_client(
}
/// Runs the server
pub fn run(config: Arc<Config>) -> impl Future<Item = (), Error = io::Error> + Send {
let mut vec_fut = Vec::with_capacity(config.server.len());
pub fn run(context: SharedContext) -> impl Future<Item = (), Error = io::Error> + Send {
let mut vec_fut = Vec::with_capacity(context.config().server.len());
for svr_cfg in &config.server {
for svr_cfg in &context.config().server {
let listener = {
let addr = svr_cfg.addr();
let addr = addr.listen_addr();
@@ -240,13 +242,13 @@ pub fn run(config: Arc<Config>) -> impl Future<Item = (), Error = io::Error> + S
};
let svr_cfg = Arc::new(svr_cfg.clone());
let config = config.clone();
let context = context.clone();
let listening = listener
.incoming()
.for_each(move |socket| {
let server_cfg = svr_cfg.clone();
let config = config.clone();
tokio::spawn(handle_client(server_cfg, config, socket));
let context = context.clone();
tokio::spawn(handle_client(server_cfg, context, socket));
Ok(())
})
.map_err(|err| {

View File

@@ -13,7 +13,8 @@ use tokio_io::{
AsyncRead,
};
use config::{Config, ServerConfig};
use config::ServerConfig;
use context::SharedContext;
use relay::{
boxed_future,
@@ -31,7 +32,7 @@ struct UdpConfig {
}
fn handle_socks5_connect(
config: Arc<Config>,
context: SharedContext,
(r, w): (ReadHalf<TcpStream>, WriteHalf<TcpStream>),
client_addr: SocketAddr,
addr: Address,
@@ -40,7 +41,7 @@ fn handle_socks5_connect(
let cloned_addr = addr.clone();
let cloned_svr_cfg = svr_cfg.clone();
let timeout = svr_cfg.timeout();
super::connect_proxy_server(config.clone(), svr_cfg)
super::connect_proxy_server(context.clone(), svr_cfg)
.then(move |res| {
let (header, r) = match res {
Ok(svr_s) => {
@@ -89,7 +90,7 @@ fn handle_socks5_connect(
}
fn handle_socks5_client(
config: Arc<Config>,
context: SharedContext,
s: TcpStream,
conf: Arc<ServerConfig>,
udp_conf: UdpConfig,
@@ -153,7 +154,7 @@ fn handle_socks5_client(
match header.command {
socks5::Command::TcpConnect => {
debug!("CONNECT {}", addr);
let fut = handle_socks5_connect(config, (r, w), cloned_client_addr, addr, conf);
let fut = handle_socks5_connect(context, (r, w), cloned_client_addr, addr, conf);
boxed_future(fut)
}
socks5::Command::TcpBind => {
@@ -200,25 +201,25 @@ fn handle_socks5_client(
}
/// Starts a TCP local server with Socks5 proxy protocol
pub fn run(config: Arc<Config>) -> impl Future<Item = (), Error = io::Error> + Send {
let local_addr = *config.local.as_ref().expect("Missing local config");
pub fn run(context: SharedContext) -> impl Future<Item = (), Error = io::Error> + Send {
let local_addr = *context.config().local.as_ref().expect("Missing local config");
let listener = TcpListener::bind(&local_addr).unwrap_or_else(|err| panic!("Failed to listen, {}", err));
info!("ShadowSocks TCP Listening on {}", local_addr);
let udp_conf = UdpConfig {
enable_udp: config.enable_udp,
enable_udp: context.config().enable_udp,
client_addr: local_addr,
};
let mut servers = RoundRobin::new(&*config);
let mut servers = RoundRobin::new(context.config());
listener.incoming().for_each(move |socket| {
let server_cfg = servers.pick_server();
trace!("Got connection, addr: {}", socket.peer_addr()?);
trace!("Picked proxy server: {:?}", server_cfg);
handle_socks5_client(config.clone(), socket, server_cfg, udp_conf.clone())
handle_socks5_client(context.clone(), socket, server_cfg, udp_conf.clone())
})
}

View File

@@ -10,7 +10,6 @@ use std::{
use dns_parser::{Packet, RRData};
use futures::{self, future::join_all, stream::futures_unordered, Future, Stream};
use lru_cache::LruCache;
use tokio::{self, net::UdpSocket};
use super::{
@@ -19,7 +18,8 @@ use super::{
SendDgramRc,
SharedUdpSocket,
};
use config::{Config, ServerAddr, ServerConfig};
use config::{ServerAddr, ServerConfig};
use context::SharedContext;
use relay::{boxed_future, dns_resolver::resolve, socks5::Address};
struct PrettyRRData<'a> {
@@ -135,40 +135,40 @@ impl<'a> fmt::Display for PrettyPacket<'a> {
}
/// Starts a UDP DNS server
pub fn run(config: Arc<Config>) -> impl Future<Item = (), Error = io::Error> + Send {
let local_addr = *config.local.as_ref().unwrap();
pub fn run(context: SharedContext) -> impl Future<Item = (), Error = io::Error> + Send {
let local_addr = *context.config().local.as_ref().unwrap();
futures::lazy(move || {
info!("ShadowSocks UDP DNS Listening on {}", local_addr);
UdpSocket::bind(&local_addr)
})
.and_then(move |l| listen(config, l))
.and_then(move |l| listen(context, l))
}
fn listen(config: Arc<Config>, l: UdpSocket) -> impl Future<Item = (), Error = io::Error> + Send {
assert!(!config.server.is_empty());
fn listen(context: SharedContext, l: UdpSocket) -> impl Future<Item = (), Error = io::Error> + Send {
assert!(!context.config().server.is_empty());
let mut svr_fut = Vec::with_capacity(config.server.len());
for svr in &config.server {
let mut svr_fut = Vec::with_capacity(context.config().server.len());
for svr in &context.config().server {
match *svr.addr() {
ServerAddr::SocketAddr(ref addr) => {
svr_fut.push(boxed_future(futures::finished::<_, io::Error>(vec![*addr])));
}
ServerAddr::DomainName(ref dom, ref port) => {
svr_fut.push(boxed_future(resolve(config.clone(), &*dom, *port, false)));
svr_fut.push(boxed_future(resolve(context.clone(), &*dom, *port, false)));
}
}
}
let cloned_config = config.clone();
let cloned_context = context.clone();
join_all(svr_fut)
.and_then(move |svr_addrs| {
let mut u = Vec::with_capacity(svr_addrs.len());
for (idx, svr_addr) in svr_addrs.into_iter().enumerate() {
let local_addr = SocketAddr::new(IpAddr::from(Ipv4Addr::new(0, 0, 0, 0)), 0);
let s = UdpSocket::bind(&local_addr)?;
let svr_cfg = Arc::new(cloned_config.server[idx].clone());
let svr_cfg = Arc::new(cloned_context.config().server[idx].clone());
u.push((Arc::new(Mutex::new(s)), svr_cfg, svr_addr[0]));
}
Ok(u)
@@ -176,9 +176,13 @@ fn listen(config: Arc<Config>, l: UdpSocket) -> impl Future<Item = (), Error = i
.and_then(move |vec_servers| {
let mut f = Vec::with_capacity(1 + vec_servers.len());
let l = Arc::new(Mutex::new(l));
f.push(boxed_future(handle_l2r(config, l.clone(), vec_servers.clone())));
f.push(boxed_future(handle_l2r(
context.clone(),
l.clone(),
vec_servers.clone(),
)));
for (svr, cfg, _) in vec_servers {
f.push(boxed_future(handle_r2l(l.clone(), svr, cfg)))
f.push(boxed_future(handle_r2l(context.clone(), l.clone(), svr, cfg)))
}
futures_unordered(f).into_future().then(|res| match res {
Ok(..) => {
@@ -194,12 +198,8 @@ fn listen(config: Arc<Config>, l: UdpSocket) -> impl Future<Item = (), Error = i
})
}
lazy_static! {
static ref GLOBAL_QUERY_ADDR: Mutex<LruCache<u16, (SocketAddr, Instant)>> = Mutex::new(LruCache::new(1024));
}
fn handle_l2r(
config: Arc<Config>,
context: SharedContext,
l: SharedUdpSocket,
server: Vec<(SharedUdpSocket, Arc<ServerConfig>, SocketAddr)>,
) -> impl Future<Item = (), Error = io::Error> + Send {
@@ -210,7 +210,8 @@ fn handle_l2r(
PacketStream::new(l).for_each(move |(payload, src)| {
let server = server.clone();
let config = config.clone();
let context = context.clone();
let context2 = context.clone();
let pkt_fut = futures::lazy(move || {
let pkt = Packet::parse(&payload[..]).map_err(|err| {
error!("Failed to parse DNS payload, err: {}", err);
@@ -225,7 +226,7 @@ fn handle_l2r(
trace!("DETAIL {} -> {} {:?}", src, svr_cfg.addr(), pkt);
let mut buf = Vec::new();
Address::SocketAddress(config.get_remote_dns()).write_to_buf(&mut buf);
Address::SocketAddress(context.config().get_remote_dns()).write_to_buf(&mut buf);
buf.extend_from_slice(&payload);
@@ -236,7 +237,7 @@ fn handle_l2r(
})
.and_then(move |(socket, svr_addr, send_payload, id)| {
SendDgramRc::new(socket, send_payload, svr_addr).map(move |_| {
GLOBAL_QUERY_ADDR.lock().unwrap().insert(id, (src, Instant::now()));
context2.dns_query_cache().insert(id, (src, Instant::now()));
})
});
tokio::spawn(pkt_fut.then(|res| match res {
@@ -252,6 +253,7 @@ fn handle_l2r(
}
fn handle_r2l(
context: SharedContext,
l: SharedUdpSocket,
r: SharedUdpSocket,
svr_cfg: Arc<ServerConfig>,
@@ -259,6 +261,7 @@ fn handle_r2l(
PacketStream::new(r).for_each(move |(payload, src)| {
let l = l.clone();
let svr_cfg = svr_cfg.clone();
let context = context.clone();
let pkt_fut = futures::lazy(move || decrypt_payload(svr_cfg.method(), svr_cfg.key(), &payload))
.and_then(move |payload| Address::read_from(Cursor::new(payload)).map_err(From::from))
.and_then(move |(cur, ..)| {
@@ -272,7 +275,8 @@ fn handle_r2l(
let payload = payload[pos..].to_vec();
match GLOBAL_QUERY_ADDR.lock().unwrap().remove(&pkt.header.id) {
let mut cache = context.dns_query_cache();
match cache.remove(&pkt.header.id) {
Some((cli_addr, start_time)) => {
debug!(
"DNS {} <- {} {} elapsed: {:?}",

View File

@@ -11,7 +11,8 @@ use futures::{self, Future, Stream};
use tokio::{self, net::UdpSocket, util::FutureExt};
use config::{Config, ServerAddr, ServerConfig};
use config::{ServerAddr, ServerConfig};
use context::SharedContext;
use relay::{
boxed_future,
dns_resolver::resolve,
@@ -28,7 +29,7 @@ use super::{
/// Resolves server address to SocketAddr
fn resolve_server_addr(
config: Arc<Config>,
context: SharedContext,
svr_cfg: Arc<ServerConfig>,
) -> impl Future<Item = SocketAddr, Error = io::Error> + Send {
match *svr_cfg.addr() {
@@ -36,7 +37,7 @@ fn resolve_server_addr(
ServerAddr::SocketAddr(ref addr) => boxed_future(futures::finished(*addr)),
// Resolve domain name to SocketAddr
ServerAddr::DomainName(ref dname, port) => {
let fut = resolve(config, dname, port, false).map(move |vec_ipaddr| {
let fut = resolve(context, dname, port, false).map(move |vec_ipaddr| {
assert!(!vec_ipaddr.is_empty());
vec_ipaddr[0]
});
@@ -45,16 +46,16 @@ fn resolve_server_addr(
}
}
fn listen(config: Arc<Config>, l: UdpSocket) -> impl Future<Item = (), Error = io::Error> + Send {
fn listen(context: SharedContext, l: UdpSocket) -> impl Future<Item = (), Error = io::Error> + Send {
let socket = Arc::new(Mutex::new(l));
let mut balancer = RoundRobin::new(&*config);
let mut balancer = RoundRobin::new(context.config());
PacketStream::new(socket.clone()).for_each(move |(pkt, src)| {
let svr_cfg = balancer.pick_server();
let svr_cfg_cloned = svr_cfg.clone();
let svr_cfg_cloned_cloned = svr_cfg.clone();
let socket = socket.clone();
let config = config.clone();
let context = context.clone();
let timeout = *svr_cfg.udp_timeout();
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
@@ -76,7 +77,7 @@ fn listen(config: Arc<Config>, l: UdpSocket) -> impl Future<Item = (), Error = i
let mut payload = Vec::new();
cur.read_to_end(&mut payload).unwrap();
resolve_server_addr(config, svr_cfg)
resolve_server_addr(context, svr_cfg)
.and_then(|remote_addr| {
let local_addr = SocketAddr::new(IpAddr::from(Ipv4Addr::new(0, 0, 0, 0)), 0);
UdpSocket::bind(&local_addr).map(|remote_udp| (remote_udp, remote_addr))
@@ -167,13 +168,13 @@ fn listen(config: Arc<Config>, l: UdpSocket) -> impl Future<Item = (), Error = i
}
/// Starts a UDP local server
pub fn run(config: Arc<Config>) -> impl Future<Item = (), Error = io::Error> + Send {
let local_addr = *config.local.as_ref().unwrap();
pub fn run(context: SharedContext) -> impl Future<Item = (), Error = io::Error> + Send {
let local_addr = *context.config().local.as_ref().unwrap();
futures::lazy(move || {
info!("ShadowSocks UDP Listening on {}", local_addr);
UdpSocket::bind(&local_addr)
})
.and_then(move |l| listen(config, l))
.and_then(move |l| listen(context, l))
}

View File

@@ -11,7 +11,8 @@ use futures::{self, stream::futures_unordered, Future, Stream};
use tokio::{self, net::UdpSocket, util::FutureExt};
use config::{Config, ServerConfig};
use config::ServerConfig;
use context::SharedContext;
use relay::{boxed_future, dns_resolver::resolve, socks5::Address};
use super::{
@@ -21,10 +22,13 @@ use super::{
MAXIMUM_UDP_PAYLOAD_SIZE,
};
fn resolve_remote_addr(config: Arc<Config>, addr: Address) -> impl Future<Item = SocketAddr, Error = io::Error> + Send {
fn resolve_remote_addr(
context: SharedContext,
addr: Address,
) -> impl Future<Item = SocketAddr, Error = io::Error> + Send {
match addr {
Address::SocketAddress(s) => {
if config.forbidden_ip.contains(&s.ip()) {
if context.config().forbidden_ip.contains(&s.ip()) {
let err = io::Error::new(
ErrorKind::Other,
format!("{} is forbidden, failed to connect {}", s.ip(), s),
@@ -35,7 +39,7 @@ fn resolve_remote_addr(config: Arc<Config>, addr: Address) -> impl Future<Item =
boxed_future(futures::finished(s))
}
Address::DomainNameAddress(dname, port) => {
let fut = resolve(config, &dname, port, true).map(move |vec_ipaddr| {
let fut = resolve(context, &dname, port, true).map(move |vec_ipaddr| {
assert!(!vec_ipaddr.is_empty());
vec_ipaddr[0]
});
@@ -44,7 +48,7 @@ fn resolve_remote_addr(config: Arc<Config>, addr: Address) -> impl Future<Item =
}
}
fn listen(config: Arc<Config>, svr_cfg: Arc<ServerConfig>) -> impl Future<Item = (), Error = io::Error> + Send {
fn listen(context: SharedContext, svr_cfg: Arc<ServerConfig>) -> impl Future<Item = (), Error = io::Error> + Send {
let listen_addr = *svr_cfg.addr().listen_addr();
info!("ShadowSocks UDP listening on {}", listen_addr);
futures::lazy(move || UdpSocket::bind(&listen_addr)).and_then(move |socket| {
@@ -53,7 +57,7 @@ fn listen(config: Arc<Config>, svr_cfg: Arc<ServerConfig>) -> impl Future<Item =
let svr_cfg = svr_cfg.clone();
let svr_cfg_cloned = svr_cfg.clone();
let socket = socket.clone();
let config = config.clone();
let context = context.clone();
let timeout = svr_cfg.timeout();
let rel = futures::lazy(move || decrypt_payload(svr_cfg.method(), svr_cfg.key(), &pkt))
.and_then(move |payload| {
@@ -74,7 +78,7 @@ fn listen(config: Arc<Config>, svr_cfg: Arc<ServerConfig>) -> impl Future<Item =
UdpSocket::bind(&local_addr).map(|remote_udp| (remote_udp, addr, body))
})
.and_then(|(remote_udp, addr, body)| {
resolve_remote_addr(config, addr.clone())
resolve_remote_addr(context, addr.clone())
.and_then(|addr| remote_udp.send_dgram(body, &addr))
.map(|(remote_udp, _)| (remote_udp, addr))
})
@@ -135,13 +139,13 @@ fn listen(config: Arc<Config>, svr_cfg: Arc<ServerConfig>) -> impl Future<Item =
}
/// Starts a UDP relay server
pub fn run(config: Arc<Config>) -> impl Future<Item = (), Error = io::Error> + Send {
pub fn run(context: SharedContext) -> impl Future<Item = (), Error = io::Error> + Send {
let mut vec_fut = Vec::new();
for svr in &config.server {
for svr in &context.config().server {
let svr_cfg = Arc::new(svr.clone());
let svr_fut = listen(config.clone(), svr_cfg);
let svr_fut = listen(context.clone(), svr_cfg);
vec_fut.push(boxed_future(svr_fut));
}

View File

@@ -39,6 +39,9 @@ fn dns_relay() {
let server_cfg = Config::load_from_str(CONFIG, ConfigType::Server).unwrap();
let dns_cfg = Config::load_from_str(CONFIG, ConfigType::Local).unwrap();
trace!("Server: {:?}", server_cfg);
trace!("Client: {:?}", dns_cfg);
thread::spawn(move || {
let mut runtime = Runtime::new().expect("Failed to create Runtime");
runtime.block_on(run_server(server_cfg)).unwrap();