Remove support of multithread

This commit is contained in:
Y. T. Chung
2017-05-30 23:36:58 +08:00
parent b44c0c778e
commit 964d2d4f33
12 changed files with 219 additions and 382 deletions

3
Cargo.lock generated
View File

@@ -1,6 +1,6 @@
[root]
name = "shadowsocks-rust"
version = "1.3.3"
version = "1.4.0"
dependencies = [
"argon2rs 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)",
"base64 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -15,7 +15,6 @@ dependencies = [
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"lru_time_cache 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"net2 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)",
"num_cpus 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"openssl 0.9.12 (registry+https://github.com/rust-lang/crates.io-index)",
"qrcode 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@@ -1,6 +1,6 @@
[package]
name = "shadowsocks-rust"
version = "1.3.3"
version = "1.4.0"
authors = ["Y. T. CHUNG <zonyitoo@gmail.com>"]
description = "shadowsocks is a fast tunnel proxy that helps you bypass firewalls."
repository = "https://github.com/zonyitoo/shadowsocks-rust"
@@ -43,7 +43,6 @@ libc = "0.2"
futures = "0.1"
tokio-core = "0.1"
tokio-io = "0.1"
net2 = "0.2"
num_cpus = "1.1"
lazy_static = "0.2"
serde_json = "0.9"

View File

@@ -1 +1 @@
max_width = 200
max_width = 120

View File

@@ -17,7 +17,6 @@ use clap::{App, Arg};
use std::net::SocketAddr;
use std::env;
use std::thread;
use env_logger::LogBuilder;
use log::{LogRecord, LogLevelFilter};
@@ -30,43 +29,38 @@ fn main() {
.author("Y. T. Chung <zonyitoo@gmail.com>")
.about("A fast tunnel proxy that helps you bypass firewalls.")
.arg(Arg::with_name("VERBOSE")
.short("v")
.multiple(true)
.help("Set the level of debug"))
.short("v")
.multiple(true)
.help("Set the level of debug"))
.arg(Arg::with_name("ENABLE_UDP")
.short("u")
.long("enable-udp")
.help("Enable UDP relay"))
.short("u")
.long("enable-udp")
.help("Enable UDP relay"))
.arg(Arg::with_name("CONFIG")
.short("c")
.long("config")
.takes_value(true)
.help("Specify config file"))
.short("c")
.long("config")
.takes_value(true)
.help("Specify config file"))
.arg(Arg::with_name("SERVER_ADDR")
.short("s")
.long("server-addr")
.takes_value(true)
.help("Server address"))
.short("s")
.long("server-addr")
.takes_value(true)
.help("Server address"))
.arg(Arg::with_name("LOCAL_ADDR")
.short("b")
.long("local-addr")
.takes_value(true)
.help("Local address, listen only to this address if specified"))
.short("b")
.long("local-addr")
.takes_value(true)
.help("Local address, listen only to this address if specified"))
.arg(Arg::with_name("PASSWORD")
.short("k")
.long("password")
.takes_value(true)
.help("Password"))
.short("k")
.long("password")
.takes_value(true)
.help("Password"))
.arg(Arg::with_name("ENCRYPT_METHOD")
.short("m")
.long("encrypt-method")
.takes_value(true)
.help("Encryption method"))
.arg(Arg::with_name("THREADS")
.short("t")
.long("threads")
.takes_value(true)
.help("Number of worker threads (defaults to number of CPUs)"))
.short("m")
.long("encrypt-method")
.takes_value(true)
.help("Encryption method"))
.get_matches();
let mut log_builder = LogBuilder::new();
@@ -77,52 +71,54 @@ fn main() {
0 => {
// Default filter
log_builder.format(|record: &LogRecord| {
format!("[{}][{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.args())
});
format!("[{}][{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.args())
});
}
1 => {
let mut log_builder = log_builder.format(|record: &LogRecord| {
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
log_builder.filter(Some("sslocal"), LogLevelFilter::Debug);
}
2 => {
let mut log_builder = log_builder.format(|record: &LogRecord| {
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
log_builder.filter(Some("sslocal"), LogLevelFilter::Debug)
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
log_builder
.filter(Some("sslocal"), LogLevelFilter::Debug)
.filter(Some("shadowsocks"), LogLevelFilter::Debug);
}
3 => {
let mut log_builder = log_builder.format(|record: &LogRecord| {
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
log_builder.filter(Some("sslocal"), LogLevelFilter::Trace)
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
log_builder
.filter(Some("sslocal"), LogLevelFilter::Trace)
.filter(Some("shadowsocks"), LogLevelFilter::Trace);
}
_ => {
let mut log_builder = log_builder.format(|record: &LogRecord| {
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
log_builder.filter(None, LogLevelFilter::Trace);
}
}
@@ -153,17 +149,15 @@ fn main() {
let mut has_provided_server_config = false;
if matches.value_of("SERVER_ADDR").is_some() && matches.value_of("PASSWORD").is_some() &&
matches.value_of("ENCRYPT_METHOD").is_some() {
let (svr_addr, password, method) = matches.value_of("SERVER_ADDR")
.and_then(|svr_addr| {
matches.value_of("PASSWORD")
.map(|pwd| (svr_addr, pwd))
})
if matches.value_of("SERVER_ADDR").is_some() && matches.value_of("PASSWORD").is_some() && matches.value_of("ENCRYPT_METHOD").is_some() {
let (svr_addr, password, method) = matches
.value_of("SERVER_ADDR")
.and_then(|svr_addr| matches.value_of("PASSWORD").map(|pwd| (svr_addr, pwd)))
.and_then(|(svr_addr, pwd)| {
matches.value_of("ENCRYPT_METHOD")
.map(|m| (svr_addr, pwd, m))
})
matches
.value_of("ENCRYPT_METHOD")
.map(|m| (svr_addr, pwd, m))
})
.unwrap();
let method = match method.parse() {
@@ -180,18 +174,17 @@ fn main() {
config.server.push(sc);
has_provided_server_config = true;
} else if matches.value_of("SERVER_ADDR").is_none() && matches.value_of("PASSWORD").is_none() &&
matches.value_of("ENCRYPT_METHOD").is_none() {
} else if matches.value_of("SERVER_ADDR").is_none() && matches.value_of("PASSWORD").is_none() && matches.value_of("ENCRYPT_METHOD").is_none() {
// Does not provide server config
} else {
panic!("`server-addr`, `method` and `password` should be provided together");
}
let has_provided_local_config = if matches.value_of("LOCAL_ADDR").is_some() {
let local_addr = matches.value_of("LOCAL_ADDR")
.unwrap();
let local_addr = matches.value_of("LOCAL_ADDR").unwrap();
let local_addr: SocketAddr = local_addr.parse()
let local_addr: SocketAddr = local_addr
.parse()
.expect("`local-addr` is not a valid IP address");
config.local = Some(local_addr);
@@ -212,13 +205,5 @@ fn main() {
debug!("Config: {:?}", config);
let threads = matches.value_of("THREADS").map(|m| m.parse::<usize>().unwrap()).unwrap_or_else(num_cpus::get);
debug!("Threads: {}", threads);
for _ in 1..threads {
let cloned_config = config.clone();
thread::spawn(move || run_local(cloned_config).unwrap());
}
run_local(config).unwrap();
}

View File

@@ -16,7 +16,6 @@ extern crate time;
extern crate num_cpus;
use std::env;
use std::thread;
use clap::{App, Arg};
@@ -31,48 +30,43 @@ fn main() {
.author("Y. T. Chung <zonyitoo@gmail.com>")
.about("A fast tunnel proxy that helps you bypass firewalls.")
.arg(Arg::with_name("VERBOSE")
.short("v")
.multiple(true)
.help("Set the level of debug"))
.short("v")
.multiple(true)
.help("Set the level of debug"))
.arg(Arg::with_name("ENABLE_UDP")
.short("u")
.long("enable-udp")
.help("Enable UDP relay"))
.short("u")
.long("enable-udp")
.help("Enable UDP relay"))
.arg(Arg::with_name("CONFIG")
.short("c")
.long("config")
.takes_value(true)
.help("Specify config file"))
.short("c")
.long("config")
.takes_value(true)
.help("Specify config file"))
.arg(Arg::with_name("SERVER_ADDR")
.short("s")
.long("server-addr")
.takes_value(true)
.help("Server address"))
.short("s")
.long("server-addr")
.takes_value(true)
.help("Server address"))
.arg(Arg::with_name("LOCAL_ADDR")
.short("b")
.long("local-addr")
.takes_value(true)
.help("Local address, listen only to this address if specified"))
.short("b")
.long("local-addr")
.takes_value(true)
.help("Local address, listen only to this address if specified"))
.arg(Arg::with_name("LOCAL_PORT")
.short("l")
.long("local-port")
.takes_value(true)
.help("Local port"))
.short("l")
.long("local-port")
.takes_value(true)
.help("Local port"))
.arg(Arg::with_name("PASSWORD")
.short("k")
.long("password")
.takes_value(true)
.help("Password"))
.short("k")
.long("password")
.takes_value(true)
.help("Password"))
.arg(Arg::with_name("ENCRYPT_METHOD")
.short("m")
.long("encrypt-method")
.takes_value(true)
.help("Encryption method"))
.arg(Arg::with_name("THREADS")
.short("t")
.long("threads")
.takes_value(true)
.help("Number of worker threads (defaults to number of CPUs)"))
.short("m")
.long("encrypt-method")
.takes_value(true)
.help("Encryption method"))
.get_matches();
let mut log_builder = LogBuilder::new();
@@ -83,52 +77,54 @@ fn main() {
0 => {
// Default filter
log_builder.format(|record: &LogRecord| {
format!("[{}][{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.args())
});
format!("[{}][{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.args())
});
}
1 => {
let mut log_builder = log_builder.format(|record: &LogRecord| {
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
log_builder.filter(Some("ssserver"), LogLevelFilter::Debug);
}
2 => {
let mut log_builder = log_builder.format(|record: &LogRecord| {
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
log_builder.filter(Some("ssserver"), LogLevelFilter::Debug)
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
log_builder
.filter(Some("ssserver"), LogLevelFilter::Debug)
.filter(Some("shadowsocks"), LogLevelFilter::Debug);
}
3 => {
let mut log_builder = log_builder.format(|record: &LogRecord| {
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
log_builder.filter(Some("ssserver"), LogLevelFilter::Trace)
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
log_builder
.filter(Some("ssserver"), LogLevelFilter::Trace)
.filter(Some("shadowsocks"), LogLevelFilter::Trace);
}
_ => {
let mut log_builder = log_builder.format(|record: &LogRecord| {
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
format!("[{}][{}] [{}] {}",
time::now().strftime("%Y-%m-%d][%H:%M:%S").unwrap(),
record.level(),
record.location().module_path(),
record.args())
});
log_builder.filter(None, LogLevelFilter::Trace);
}
}
@@ -158,17 +154,15 @@ fn main() {
let mut has_provided_server_config = false;
if matches.value_of("SERVER_ADDR").is_some() && matches.value_of("PASSWORD").is_some() &&
matches.value_of("ENCRYPT_METHOD").is_some() {
let (svr_addr, password, method) = matches.value_of("SERVER_ADDR")
.and_then(|svr_addr| {
matches.value_of("PASSWORD")
.map(|pwd| (svr_addr, pwd))
})
if matches.value_of("SERVER_ADDR").is_some() && matches.value_of("PASSWORD").is_some() && matches.value_of("ENCRYPT_METHOD").is_some() {
let (svr_addr, password, method) = matches
.value_of("SERVER_ADDR")
.and_then(|svr_addr| matches.value_of("PASSWORD").map(|pwd| (svr_addr, pwd)))
.and_then(|(svr_addr, pwd)| {
matches.value_of("ENCRYPT_METHOD")
.map(|m| (svr_addr, pwd, m))
})
matches
.value_of("ENCRYPT_METHOD")
.map(|m| (svr_addr, pwd, m))
})
.unwrap();
let method = match method.parse() {
@@ -185,8 +179,7 @@ fn main() {
config.server.push(sc);
has_provided_server_config = true;
} else if matches.value_of("SERVER_ADDR").is_none() && matches.value_of("PASSWORD").is_none() &&
matches.value_of("ENCRYPT_METHOD").is_none() {
} else if matches.value_of("SERVER_ADDR").is_none() && matches.value_of("PASSWORD").is_none() && matches.value_of("ENCRYPT_METHOD").is_none() {
// Does not provide server config
} else {
panic!("`server-addr`, `method` and `password` should be provided together");
@@ -204,13 +197,5 @@ fn main() {
debug!("Config: {:?}", config);
let threads = matches.value_of("THREADS").map(|m| m.parse::<usize>().unwrap()).unwrap_or_else(num_cpus::get);
debug!("Threads: {}", threads);
for _ in 1..threads {
let cloned_config = config.clone();
thread::spawn(move || run_server(cloned_config).unwrap());
}
run_server(config).unwrap();
}

View File

@@ -89,7 +89,6 @@ extern crate futures;
extern crate tokio_core;
#[macro_use]
extern crate tokio_io;
extern crate net2;
extern crate domain;
extern crate bytes;

View File

@@ -24,8 +24,6 @@ use futures::{self, Future, Poll};
use bytes::{BufMut, BytesMut};
use net2::TcpBuilder;
pub use self::crypto_io::{DecryptedRead, EncryptedWrite};
use self::stream::{DecryptedReader as StreamDecryptedReader, EncryptedWriter as StreamEncryptedWriter};
@@ -194,10 +192,7 @@ fn connect_proxy_server(svr_cfg: Rc<ServerConfig>) -> BoxIoFuture<TcpStream> {
}
/// Handshake logic for ShadowSocks Client
pub fn proxy_server_handshake(remote_stream: TcpStream,
svr_cfg: Rc<ServerConfig>,
relay_addr: Address)
-> BoxIoFuture<(DecryptedHalfFut, EncryptedHalfFut)> {
pub fn proxy_server_handshake(remote_stream: TcpStream, svr_cfg: Rc<ServerConfig>, relay_addr: Address) -> BoxIoFuture<(DecryptedHalfFut, EncryptedHalfFut)> {
let timeout = *svr_cfg.timeout();
let fut = proxy_handshake(remote_stream, svr_cfg).and_then(move |(r_fut, w_fut)| {;
let w_fut = w_fut.and_then(move |enc_w| {
@@ -218,9 +213,7 @@ pub fn proxy_server_handshake(remote_stream: TcpStream,
/// ShadowSocks Client-Server handshake protocol
/// Exchange cipher IV and creates stream wrapper
pub fn proxy_handshake(remote_stream: TcpStream,
svr_cfg: Rc<ServerConfig>)
-> BoxIoFuture<(DecryptedHalfFut, EncryptedHalfFut)> {
pub fn proxy_handshake(remote_stream: TcpStream, svr_cfg: Rc<ServerConfig>) -> BoxIoFuture<(DecryptedHalfFut, EncryptedHalfFut)> {
let fut = futures::lazy(|| Ok(remote_stream.split())).and_then(move |(r, w)| {
let timeout = svr_cfg.timeout().clone();
@@ -247,22 +240,17 @@ pub fn proxy_handshake(remote_stream: TcpStream,
};
Context::with(|ctx| {
try_timeout(write_all(w, prev_buf), timeout, ctx.handle())
.and_then(move |(w, prev_buf)| match svr_cfg.method().category() {
CipherCategory::Stream => {
let local_iv = prev_buf;
Ok(From::from(StreamEncryptedWriter::new(w,
svr_cfg.method(),
svr_cfg.key(),
&local_iv)))
}
CipherCategory::Aead => {
let local_salt = prev_buf;
let wtr =
AeadEncryptedWriter::new(w, svr_cfg.method(), svr_cfg.key(), &local_salt[..]);
Ok(From::from(wtr))
}
})
try_timeout(write_all(w, prev_buf), timeout, ctx.handle()).and_then(move |(w, prev_buf)| match svr_cfg.method().category() {
CipherCategory::Stream => {
let local_iv = prev_buf;
Ok(From::from(StreamEncryptedWriter::new(w, svr_cfg.method(), svr_cfg.key(), &local_iv)))
}
CipherCategory::Aead => {
let local_salt = prev_buf;
let wtr = AeadEncryptedWriter::new(w, svr_cfg.method(), svr_cfg.key(), &local_salt[..]);
Ok(From::from(wtr))
}
})
})
};
@@ -278,20 +266,18 @@ pub fn proxy_handshake(remote_stream: TcpStream,
};
Context::with(|ctx| {
try_timeout(read_exact(r, vec![0u8; prev_len]), timeout, ctx.handle())
.and_then(move |(r, remote_iv)| match svr_cfg.method().category() {
CipherCategory::Stream => {
trace!("Got initialize vector {:?}", remote_iv);
let decrypt_stream =
StreamDecryptedReader::new(r, svr_cfg.method(), svr_cfg.key(), &remote_iv);
Ok(From::from(decrypt_stream))
}
CipherCategory::Aead => {
trace!("Got salt {:?}", remote_iv);
let dr = AeadDecryptedReader::new(r, svr_cfg.method(), svr_cfg.key(), &remote_iv);
Ok(From::from(dr))
}
})
try_timeout(read_exact(r, vec![0u8; prev_len]), timeout, ctx.handle()).and_then(move |(r, remote_iv)| match svr_cfg.method().category() {
CipherCategory::Stream => {
trace!("Got initialize vector {:?}", remote_iv);
let decrypt_stream = StreamDecryptedReader::new(r, svr_cfg.method(), svr_cfg.key(), &remote_iv);
Ok(From::from(decrypt_stream))
}
CipherCategory::Aead => {
trace!("Got salt {:?}", remote_iv);
let dr = AeadDecryptedReader::new(r, svr_cfg.method(), svr_cfg.key(), &remote_iv);
Ok(From::from(dr))
}
})
})
};
@@ -398,17 +384,6 @@ pub fn ignore_until_end<R: Read>(r: R) -> IgnoreUntilEnd<R> {
IgnoreUntilEnd::Pending { r: r, amt: 0 }
}
#[cfg(unix)]
fn reuse_port(builder: &TcpBuilder) -> io::Result<&TcpBuilder> {
use net2::unix::UnixTcpBuilderExt;
builder.reuse_port(true)
}
#[cfg(windows)]
fn reuse_port(builder: &TcpBuilder) -> io::Result<&TcpBuilder> {
Ok(builder)
}
fn try_timeout<T, F>(fut: F, dur: Option<Duration>, handle: &Handle) -> BoxIoFuture<T>
where F: Future<Item = T, Error = io::Error> + 'static,
T: 'static

View File

@@ -21,8 +21,6 @@ use tokio_core::net::{TcpStream, TcpListener};
use tokio_io::io::{ReadHalf, WriteHalf};
use tokio_io::AsyncRead;
use net2::TcpBuilder;
use super::{tunnel, proxy_handshake, DecryptedHalf, EncryptedHalfFut, try_timeout};
/// Context for doing handshake with client
@@ -174,21 +172,7 @@ pub fn run() -> Box<Future<Item = (), Error = io::Error>> {
let addr = svr_cfg.addr();
let addr = addr.listen_addr();
let tcp_builder = match *addr {
SocketAddr::V4(..) => TcpBuilder::new_v4(),
SocketAddr::V6(..) => TcpBuilder::new_v6(),
}
.unwrap_or_else(|err| panic!("Failed to create listener, {}", err));
super::reuse_port(&tcp_builder)
.and_then(|builder| builder.reuse_address(true))
.and_then(|builder| builder.bind(addr))
.unwrap_or_else(|err| panic!("Failed to bind {}, {}", addr, err));
let listener = tcp_builder
.listen(1024)
.and_then(|l| TcpListener::from_listener(l, addr, ctx.handle()))
.unwrap_or_else(|err| panic!("Failed to listen, {}", err));
let listener = TcpListener::bind(&addr, ctx.handle()).unwrap_or_else(|err| panic!("Failed to listen, {}", err));
info!("ShadowSocks TCP Listening on {}", addr);
listener

View File

@@ -12,8 +12,6 @@ use tokio_io::AsyncRead;
use tokio_io::io::{ReadHalf, WriteHalf};
use tokio_io::io::flush;
use net2::TcpBuilder;
use config::ServerConfig;
use relay::socks5::{self, HandshakeRequest, HandshakeResponse, Address};
@@ -106,12 +104,11 @@ fn handle_socks5_client(s: TcpStream, conf: Rc<ServerConfig>, udp_conf: UdpConfi
if !req.methods.contains(&socks5::SOCKS5_AUTH_METHOD_NONE) {
let resp = HandshakeResponse::new(socks5::SOCKS5_AUTH_METHOD_NOT_ACCEPTABLE);
let fut = resp.write_to(w)
.then(|_| {
warn!("Currently shadowsocks-rust does not support authentication");
Err(io::Error::new(io::ErrorKind::Other,
"Currently shadowsocks-rust does not support authentication"))
});
let fut = resp.write_to(w).then(|_| {
warn!("Currently shadowsocks-rust does not support authentication");
Err(io::Error::new(io::ErrorKind::Other,
"Currently shadowsocks-rust does not support authentication"))
});
boxed_future(fut)
} else {
// Reply to client
@@ -193,53 +190,16 @@ fn handle_socks5_client(s: TcpStream, conf: Rc<ServerConfig>, udp_conf: UdpConfi
/// Starts a TCP local server with Socks5 proxy protocol
pub fn run() -> Box<Future<Item = (), Error = io::Error>> {
// let (listener, local_addr) = {
// let local_addr = config.local.as_ref().unwrap();
// let tcp_builder = match *local_addr {
// SocketAddr::V4(..) => TcpBuilder::new_v4(),
// SocketAddr::V6(..) => TcpBuilder::new_v6(),
// }
// .unwrap_or_else(|err| panic!("Failed to create listener, {}", err));
// super::reuse_port(&tcp_builder)
// .and_then(|builder| builder.reuse_address(true))
// .and_then(|builder| builder.bind(local_addr))
// .unwrap_or_else(|err| panic!("Failed to bind {}, {}", local_addr, err));
// let listener = tcp_builder
// .listen(1024)
// .and_then(|l| TcpListener::from_listener(l, local_addr, &handle))
// .unwrap_or_else(|err| panic!("Failed to listen, {}", err));
// info!("ShadowSocks TCP Listening on {}", local_addr);
// (listener, *local_addr)
// };
let (listener, local_addr) = Context::with(|ctx| {
let config = &ctx.config;
let handle = &ctx.handle;
let local_addr = config.local.as_ref().unwrap();
let tcp_builder = match *local_addr {
SocketAddr::V4(..) => TcpBuilder::new_v4(),
SocketAddr::V6(..) => TcpBuilder::new_v6(),
}
.unwrap_or_else(|err| panic!("Failed to create listener, {}", err));
super::reuse_port(&tcp_builder)
.and_then(|builder| builder.reuse_address(true))
.and_then(|builder| builder.bind(local_addr))
.unwrap_or_else(|err| panic!("Failed to bind {}, {}", local_addr, err));
let listener = tcp_builder
.listen(1024)
.and_then(|l| TcpListener::from_listener(l, local_addr, &handle))
.unwrap_or_else(|err| panic!("Failed to listen, {}", err));
let l = TcpListener::bind(&local_addr, &handle).unwrap_or_else(|err| panic!("Failed to listen, {}", err));
info!("ShadowSocks TCP Listening on {}", local_addr);
(listener, *local_addr)
(l, *local_addr)
});
let udp_conf = UdpConfig {
@@ -248,14 +208,12 @@ pub fn run() -> Box<Future<Item = (), Error = io::Error>> {
};
let mut servers = Context::with(|ctx| RoundRobin::new(ctx.config()));
let listening = listener
.incoming()
.for_each(move |(socket, addr)| {
let server_cfg = servers.pick_server();
trace!("Got connection, addr: {}", addr);
trace!("Picked proxy server: {:?}", server_cfg);
handle_socks5_client(socket, server_cfg, udp_conf.clone())
});
let listening = listener.incoming().for_each(move |(socket, addr)| {
let server_cfg = servers.pick_server();
trace!("Got connection, addr: {}", addr);
trace!("Picked proxy server: {:?}", server_cfg);
handle_socks5_client(socket, server_cfg, udp_conf.clone())
});
Box::new(listening.map_err(|err| {
error!("Socks5 server run failed: {}", err);

View File

@@ -10,8 +10,6 @@ use futures::{self, Future};
use tokio_core::net::UdpSocket;
use net2::UdpBuilder;
use lru_cache::LruCache;
use config::{ServerConfig, ServerAddr};
@@ -186,7 +184,8 @@ impl Client {
})
.and_then(move |payload| -> io::Result<_> {
// Encrypt the whole body as payload
encrypt_payload(svr_cfg.method(), svr_cfg.key(), &payload).map(move |b| (svr_cfg, b))
encrypt_payload(svr_cfg.method(), svr_cfg.key(), &payload)
.map(move |b| (svr_cfg, b))
})
.map_err(From::from)
.and_then(move |(svr_cfg, payload)| {
@@ -199,17 +198,15 @@ impl Client {
svrs_ref.insert(addr, svr_cfg.clone());
}
socket
.send_dgram(payload, addr)
.map(|(socket, body)| {
trace!("Sent body, size: {}", body.len());
Client {
assoc: assoc,
server_picker: server_picker,
servers: servers,
socket: socket,
}
})
socket.send_dgram(payload, addr).map(|(socket, body)| {
trace!("Sent body, size: {}", body.len());
Client {
assoc: assoc,
server_picker: server_picker,
servers: servers,
socket: socket,
}
})
})
})
});
@@ -258,12 +255,13 @@ fn handle_client(client: Client) -> BoxIoFuture<()> {
fn listen(l: UdpSocket) -> BoxIoFuture<()> {
let assoc = Rc::new(RefCell::new(AssociateMap::new(MAXIMUM_ASSOCIATE_MAP_SIZE)));
let (server_picker, servers) = Context::with(|ctx| {
let config = ctx.config();
let server_picker = Rc::new(RefCell::new(RoundRobin::new(&*config)));
let servers = Rc::new(RefCell::new(ServerCache::new(config.server.len())));
(server_picker, servers)
});
let (server_picker, servers) =
Context::with(|ctx| {
let config = ctx.config();
let server_picker = Rc::new(RefCell::new(RoundRobin::new(&*config)));
let servers = Rc::new(RefCell::new(ServerCache::new(config.server.len())));
(server_picker, servers)
});
let c = Client {
@@ -280,25 +278,13 @@ fn listen(l: UdpSocket) -> BoxIoFuture<()> {
/// Starts a UDP local server
pub fn run() -> BoxIoFuture<()> {
let fut = futures::lazy(|| {
Context::with(|ctx| {
let local_addr = ctx.config().local.as_ref().unwrap();
let udp_builder = match *local_addr {
SocketAddr::V4(..) => UdpBuilder::new_v4(),
SocketAddr::V6(..) => UdpBuilder::new_v6(),
}
.unwrap_or_else(|err| panic!("Failed to create socket, {}", err));
Context::with(|ctx| {
let local_addr = ctx.config().local.as_ref().unwrap();
info!("ShadowSocks UDP Listening on {}", local_addr);
super::reuse_port(&udp_builder)
.and_then(|b| b.reuse_address(true))
.unwrap_or_else(|err| panic!("Failed to set reuse {}, {}", local_addr, err));
info!("ShadowSocks UDP Listening on {}", local_addr);
udp_builder
.bind(local_addr)
.and_then(|s| UdpSocket::from_socket(s, ctx.handle()))
})
})
UdpSocket::bind(local_addr, ctx.handle())
})
})
.and_then(|l| listen(l));

View File

@@ -37,10 +37,6 @@
//! | Fixed | Variable |
//! +-------+--------------+
use std::io;
use net2::UdpBuilder;
pub mod local;
pub mod server;
@@ -55,14 +51,3 @@ pub const MAXIMUM_UDP_PAYLOAD_SIZE: usize = 65536;
/// Maximum associations to maintain
pub const MAXIMUM_ASSOCIATE_MAP_SIZE: usize = 65536;
#[cfg(unix)]
fn reuse_port(builder: &UdpBuilder) -> io::Result<&UdpBuilder> {
use net2::unix::UnixUdpBuilderExt;
builder.reuse_port(true)
}
#[cfg(windows)]
fn reuse_port(builder: &UdpBuilder) -> io::Result<&UdpBuilder> {
Ok(builder)
}

View File

@@ -10,8 +10,6 @@ use tokio_core::net::UdpSocket;
use lru_cache::LruCache;
use net2::UdpBuilder;
use config::ServerConfig;
use relay::{BoxIoFuture, boxed_future};
use relay::dns_resolver::resolve;
@@ -104,7 +102,7 @@ impl ConnectionContext {
}: Associate,
buf: Vec<u8>,
n: usize)
-> BoxIoFuture<ConnectionContext> {
-> BoxIoFuture<ConnectionContext>{
let ConnectionContext {
assoc,
svr_cfg,
@@ -197,31 +195,15 @@ fn handle_client(c: ConnectionContext) -> BoxIoFuture<()> {
fn listen(svr_cfg: Rc<ServerConfig>) -> BoxIoFuture<()> {
let listen_addr = *svr_cfg.addr().listen_addr();
info!("ShadowSocks UDP listening on {}", listen_addr);
let fut = futures::lazy(move || {
let udp_builder = match listen_addr {
SocketAddr::V4(..) => UdpBuilder::new_v4(),
SocketAddr::V6(..) => UdpBuilder::new_v6(),
}
.unwrap_or_else(|err| panic!("Failed to create socket, {}", err));
super::reuse_port(&udp_builder)
.and_then(|b| b.reuse_address(true))
.unwrap_or_else(|err| panic!("Failed to set reuse {}, {}", listen_addr, err));
Context::with(|ctx| {
udp_builder
.bind(listen_addr)
.and_then(|s| UdpSocket::from_socket(s, ctx.handle()))
})
})
.and_then(|socket| {
let c = ConnectionContext {
assoc: Rc::new(RefCell::new(AssociateMap::new(MAXIMUM_ASSOCIATE_MAP_SIZE))),
svr_cfg: svr_cfg,
socket: socket,
};
handle_client(c)
});
let fut = futures::lazy(move || Context::with(|ctx| UdpSocket::bind(&listen_addr, ctx.handle())))
.and_then(|socket| {
let c = ConnectionContext {
assoc: Rc::new(RefCell::new(AssociateMap::new(MAXIMUM_ASSOCIATE_MAP_SIZE))),
svr_cfg: svr_cfg,
socket: socket,
};
handle_client(c)
});
boxed_future(fut)
}