From 8f69141b333937ddb53686b795aef4f0dbd8ce84 Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Fri, 18 May 2018 23:21:30 +0800 Subject: [PATCH] Call tokio::run from the outermost, allow test to run tokio runtime in multiple threads --- src/bin/local.rs | 30 ++++++++++++++++++++---------- src/bin/server.rs | 10 +++++++++- src/bin/ssdns.rs | 9 ++++++++- src/relay/dns.rs | 9 +++------ src/relay/local.rs | 22 ++++++++++------------ src/relay/server.rs | 19 ++++++++----------- tests/dns.rs | 8 ++++++-- tests/socks5.rs | 15 +++++++++++---- tests/udp.rs | 15 ++++++++++++--- 9 files changed, 87 insertions(+), 50 deletions(-) diff --git a/src/bin/local.rs b/src/bin/local.rs index 5e89d9af..06295fbb 100644 --- a/src/bin/local.rs +++ b/src/bin/local.rs @@ -9,8 +9,10 @@ extern crate clap; extern crate env_logger; #[macro_use] extern crate log; +extern crate futures; extern crate shadowsocks; extern crate time; +extern crate tokio; use clap::{App, Arg}; @@ -20,6 +22,7 @@ use std::net::SocketAddr; use env_logger::fmt::Formatter; use env_logger::Builder; +use futures::Future; use log::{LevelFilter, Record}; use shadowsocks::plugin::PluginConfig; @@ -148,16 +151,18 @@ fn main() { let mut has_provided_config = false; let mut config = match matches.value_of("CONFIG") { - Some(cpath) => match Config::load_from_file(cpath, ConfigType::Local) { - Ok(cfg) => { - has_provided_config = true; - cfg + Some(cpath) => { + match Config::load_from_file(cpath, ConfigType::Local) { + Ok(cfg) => { + has_provided_config = true; + cfg + } + Err(err) => { + error!("{:?}", err); + return; + } } - Err(err) => { - error!("{:?}", err); - return; - } - }, + } None => Config::new(), }; @@ -229,5 +234,10 @@ fn main() { debug!("Config: {:?}", config); - run_local(config); + tokio::run(run_local(config).then(|res| -> Result<(), ()> { + match res { + Ok(..) => panic!("Server exited without error"), + Err(err) => panic!("Server exited with error {}", err), + } + })); } diff --git a/src/bin/server.rs b/src/bin/server.rs index 5aeaaf21..2d207731 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -11,8 +11,10 @@ extern crate clap; extern crate env_logger; #[macro_use] extern crate log; +extern crate futures; extern crate shadowsocks; extern crate time; +extern crate tokio; use std::env; use std::io::{self, Write}; @@ -21,6 +23,7 @@ use clap::{App, Arg}; use env_logger::fmt::Formatter; use env_logger::Builder; +use futures::Future; use log::{LevelFilter, Record}; use shadowsocks::plugin::PluginConfig; @@ -192,5 +195,10 @@ fn main() { debug!("Config: {:?}", config); - run_server(config); + tokio::run(run_server(config).then(|res| -> Result<(), ()> { + match res { + Ok(..) => panic!("Server exited without error"), + Err(err) => panic!("Server exited with error {}", err), + } + })); } diff --git a/src/bin/ssdns.rs b/src/bin/ssdns.rs index 72d3a116..91fcdd06 100644 --- a/src/bin/ssdns.rs +++ b/src/bin/ssdns.rs @@ -6,6 +6,7 @@ extern crate tokio; #[macro_use] extern crate log; extern crate env_logger; +extern crate futures; extern crate time; use clap::{App, Arg}; @@ -16,6 +17,7 @@ use std::net::SocketAddr; use env_logger::fmt::Formatter; use env_logger::Builder; +use futures::Future; use log::{LevelFilter, Record}; use shadowsocks::{run_dns, Config, ConfigType, ServerAddr, ServerConfig}; @@ -211,5 +213,10 @@ fn main() { debug!("Config: {:?}", config); - run_dns(config); + tokio::run(run_dns(config).then(|res| -> Result<(), ()> { + match res { + Ok(..) => panic!("Server exited without error"), + Err(err) => panic!("Server exited with error {}", err), + } + })); } diff --git a/src/relay/dns.rs b/src/relay/dns.rs index b176e990..b0765096 100644 --- a/src/relay/dns.rs +++ b/src/relay/dns.rs @@ -1,18 +1,15 @@ //! DNS relay +use std::io; use std::sync::Arc; use futures::Future; -use tokio; use config::Config; use relay::udprelay::dns::run as run_udp; /// DNS Relay server running under local environment. -pub fn run(config: Config) { +pub fn run(config: Config) -> impl Future + Send { let config = Arc::new(config); - tokio::run(run_udp(config).then(|res| match res { - Ok(..) => Ok(()), - Err(err) => panic!("Failed to run server, err: {}", err), - })); + run_udp(config) } diff --git a/src/relay/local.rs b/src/relay/local.rs index 2db3c88a..3463392c 100644 --- a/src/relay/local.rs +++ b/src/relay/local.rs @@ -1,9 +1,8 @@ //! Local side +use std::io; use std::sync::Arc; -use tokio; - use futures::stream::futures_unordered; use futures::{Future, Stream}; @@ -25,9 +24,10 @@ use relay::udprelay::local::run as run_udp; /// config.server = vec![ServerConfig::basic("127.0.0.1:8388".parse().unwrap(), /// "server-password".to_string(), /// CipherType::Aes256Cfb)]; -/// run(config); +/// let fut = run(config); +/// tokio::run(fut.map_err(|err| panic!("Server run failed with error {}", err)); /// ``` -pub fn run(mut config: Config) { +pub fn run(mut config: Config) -> impl Future + Send { let mut vf = Vec::new(); if config.enable_udp { @@ -52,12 +52,10 @@ pub fn run(mut config: Config) { vf.push(boxed_future(mon)); vf.push(boxed_future(tcp_fut)); - tokio::run(futures_unordered(vf).into_future().then(|res| -> Result<(), ()> { - match res { - Ok(..) => unreachable!("Server exited without error"), - Err((err, ..)) => { - panic!("Server exited with error {}", err) - } - } - })); + futures_unordered(vf).into_future().then(|res| -> io::Result<()> { + match res { + Ok(..) => Ok(()), + Err((err, ..)) => Err(err), + } + }) } diff --git a/src/relay/server.rs b/src/relay/server.rs index 8efb2073..42a1c68f 100644 --- a/src/relay/server.rs +++ b/src/relay/server.rs @@ -1,9 +1,8 @@ //! Server side +use std::io; use std::sync::Arc; -use tokio; - use futures::stream::futures_unordered; use futures::{Future, Stream}; @@ -27,7 +26,7 @@ use relay::udprelay::server::run as run_udp; /// run(config); /// ``` /// -pub fn run(mut config: Config) { +pub fn run(mut config: Config) -> impl Future + Send { let mut vf = Vec::new(); if config.enable_udp { @@ -53,12 +52,10 @@ pub fn run(mut config: Config) { vf.push(boxed_future(mon)); vf.push(boxed_future(tcp_fut)); - tokio::run(futures_unordered(vf).into_future().then(|res| -> Result<(), ()> { - match res { - Ok(..) => unreachable!("Server exited without error"), - Err((err, ..)) => { - panic!("Server exited with error {}", err) - } - } - })); + futures_unordered(vf).into_future().then(|res| -> io::Result<()> { + match res { + Ok(..) => Ok(()), + Err((err, ..)) => Err(err), + } + }) } diff --git a/tests/dns.rs b/tests/dns.rs index dc6cdea8..d73126a1 100644 --- a/tests/dns.rs +++ b/tests/dns.rs @@ -2,6 +2,7 @@ extern crate dns_parser; extern crate env_logger; extern crate rand; extern crate shadowsocks; +extern crate tokio; use std::collections::HashSet; use std::net::{SocketAddr, UdpSocket}; @@ -11,6 +12,7 @@ use std::time::Duration; use dns_parser::{Builder, Packet, QueryClass, QueryType}; use shadowsocks::config::{Config, ConfigType}; use shadowsocks::{run_dns, run_server}; +use tokio::runtime::current_thread::Runtime; const CONFIG: &'static str = r#"{ "server": "127.0.0.1", @@ -31,11 +33,13 @@ fn dns_relay() { let dns_cfg = Config::load_from_str(CONFIG, ConfigType::Local).unwrap(); thread::spawn(move || { - run_server(server_cfg); + let mut runtime = Runtime::new().expect("Failed to create Runtime"); + runtime.block_on(run_server(server_cfg)).unwrap(); }); thread::spawn(move || { - run_dns(dns_cfg); + let mut runtime = Runtime::new().expect("Failed to create Runtime"); + runtime.block_on(run_dns(dns_cfg)).unwrap(); }); thread::sleep(Duration::from_secs(1)); diff --git a/tests/socks5.rs b/tests/socks5.rs index e297682d..c6c9273f 100644 --- a/tests/socks5.rs +++ b/tests/socks5.rs @@ -10,6 +10,7 @@ use std::thread; use std::time::Duration; use futures::Future; +use tokio::runtime::current_thread::Runtime; use tokio_io::io::{flush, read_to_end, write_all}; use shadowsocks::config::{Config, ServerConfig}; @@ -53,12 +54,16 @@ impl Socks5TestServer { pub fn run(&self) { let svr_cfg = self.config.clone(); thread::spawn(move || { - run_server(svr_cfg); + let mut runtime = Runtime::new().expect("Failed to create Runtime"); + let fut = run_server(svr_cfg); + runtime.block_on(fut).expect("Failed to run Server"); }); let client_cfg = self.config.clone(); thread::spawn(move || { - run_local(client_cfg); + let mut runtime = Runtime::new().expect("Failed to create Runtime"); + let fut = run_local(client_cfg); + runtime.block_on(fut).expect("Failed to run Local"); }); thread::sleep(Duration::from_secs(1)); @@ -89,7 +94,8 @@ fn socks5_relay_stream() { }) }); - tokio::run(fut.map_err(|_| ())); + let mut runtime = Runtime::new().expect("Failed to create Runtime"); + runtime.block_on(fut).unwrap(); } #[test] @@ -115,5 +121,6 @@ fn socks5_relay_aead() { }) }); - tokio::run(fut.map_err(|_| ())); + let mut runtime = Runtime::new().expect("Failed to create Runtime"); + runtime.block_on(fut).unwrap(); } diff --git a/tests/udp.rs b/tests/udp.rs index e59cb714..750bdee6 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -15,6 +15,7 @@ use std::time::Duration; use bytes::{BufMut, BytesMut}; use futures::Future; +use tokio::runtime::current_thread::Runtime; use tokio_io::io::read_to_end; use shadowsocks::config::{Config, ServerConfig}; @@ -46,15 +47,21 @@ fn get_client_addr() -> SocketAddr { fn start_server(bar: Arc) { thread::spawn(move || { + let mut runtime = Runtime::new().expect("Failed to create Runtime"); + + let fut = run_server(get_config()); bar.wait(); - run_server(get_config()); + runtime.block_on(fut).expect("Failed to run Server"); }); } fn start_local(bar: Arc) { thread::spawn(move || { + let mut runtime = Runtime::new().expect("Failed to create Runtime"); + + let fut = run_local(get_config()); bar.wait(); - run_local(get_config()); + runtime.block_on(fut).expect("Failed to run Local"); }); } @@ -75,6 +82,8 @@ fn start_udp_echo_server(bar: Arc) { fn start_udp_request_holder(bar: Arc, addr: Address) { thread::spawn(move || { + let mut runtime = Runtime::new().expect("Failed to create Runtime"); + let c = Socks5Client::udp_associate(addr, get_client_addr()); let fut = c.and_then(|(c, addr)| { assert_eq!(addr, Address::SocketAddress(LOCAL_ADDR.parse().unwrap())); @@ -85,7 +94,7 @@ fn start_udp_request_holder(bar: Arc, addr: Address) { bar.wait(); - tokio::run(fut.map_err(|_| ())); + runtime.block_on(fut).expect("Failed to run UDP socks5 client"); }); }