Call tokio::run from the outermost, allow test to run tokio runtime in multiple threads

This commit is contained in:
Y. T. Chung
2018-05-18 23:21:30 +08:00
parent 8f4ba32b58
commit 8f69141b33
9 changed files with 87 additions and 50 deletions

View File

@@ -9,8 +9,10 @@ extern crate clap;
extern crate env_logger;
#[macro_use]
extern crate log;
extern crate futures;
extern crate shadowsocks;
extern crate time;
extern crate tokio;
use clap::{App, Arg};
@@ -20,6 +22,7 @@ use std::net::SocketAddr;
use env_logger::fmt::Formatter;
use env_logger::Builder;
use futures::Future;
use log::{LevelFilter, Record};
use shadowsocks::plugin::PluginConfig;
@@ -148,16 +151,18 @@ fn main() {
let mut has_provided_config = false;
let mut config = match matches.value_of("CONFIG") {
Some(cpath) => match Config::load_from_file(cpath, ConfigType::Local) {
Ok(cfg) => {
has_provided_config = true;
cfg
Some(cpath) => {
match Config::load_from_file(cpath, ConfigType::Local) {
Ok(cfg) => {
has_provided_config = true;
cfg
}
Err(err) => {
error!("{:?}", err);
return;
}
}
Err(err) => {
error!("{:?}", err);
return;
}
},
}
None => Config::new(),
};
@@ -229,5 +234,10 @@ fn main() {
debug!("Config: {:?}", config);
run_local(config);
tokio::run(run_local(config).then(|res| -> Result<(), ()> {
match res {
Ok(..) => panic!("Server exited without error"),
Err(err) => panic!("Server exited with error {}", err),
}
}));
}

View File

@@ -11,8 +11,10 @@ extern crate clap;
extern crate env_logger;
#[macro_use]
extern crate log;
extern crate futures;
extern crate shadowsocks;
extern crate time;
extern crate tokio;
use std::env;
use std::io::{self, Write};
@@ -21,6 +23,7 @@ use clap::{App, Arg};
use env_logger::fmt::Formatter;
use env_logger::Builder;
use futures::Future;
use log::{LevelFilter, Record};
use shadowsocks::plugin::PluginConfig;
@@ -192,5 +195,10 @@ fn main() {
debug!("Config: {:?}", config);
run_server(config);
tokio::run(run_server(config).then(|res| -> Result<(), ()> {
match res {
Ok(..) => panic!("Server exited without error"),
Err(err) => panic!("Server exited with error {}", err),
}
}));
}

View File

@@ -6,6 +6,7 @@ extern crate tokio;
#[macro_use]
extern crate log;
extern crate env_logger;
extern crate futures;
extern crate time;
use clap::{App, Arg};
@@ -16,6 +17,7 @@ use std::net::SocketAddr;
use env_logger::fmt::Formatter;
use env_logger::Builder;
use futures::Future;
use log::{LevelFilter, Record};
use shadowsocks::{run_dns, Config, ConfigType, ServerAddr, ServerConfig};
@@ -211,5 +213,10 @@ fn main() {
debug!("Config: {:?}", config);
run_dns(config);
tokio::run(run_dns(config).then(|res| -> Result<(), ()> {
match res {
Ok(..) => panic!("Server exited without error"),
Err(err) => panic!("Server exited with error {}", err),
}
}));
}

View File

@@ -1,18 +1,15 @@
//! DNS relay
use std::io;
use std::sync::Arc;
use futures::Future;
use tokio;
use config::Config;
use relay::udprelay::dns::run as run_udp;
/// DNS Relay server running under local environment.
pub fn run(config: Config) {
pub fn run(config: Config) -> impl Future<Item = (), Error = io::Error> + Send {
let config = Arc::new(config);
tokio::run(run_udp(config).then(|res| match res {
Ok(..) => Ok(()),
Err(err) => panic!("Failed to run server, err: {}", err),
}));
run_udp(config)
}

View File

@@ -1,9 +1,8 @@
//! Local side
use std::io;
use std::sync::Arc;
use tokio;
use futures::stream::futures_unordered;
use futures::{Future, Stream};
@@ -25,9 +24,10 @@ use relay::udprelay::local::run as run_udp;
/// config.server = vec![ServerConfig::basic("127.0.0.1:8388".parse().unwrap(),
/// "server-password".to_string(),
/// CipherType::Aes256Cfb)];
/// run(config);
/// let fut = run(config);
/// tokio::run(fut.map_err(|err| panic!("Server run failed with error {}", err));
/// ```
pub fn run(mut config: Config) {
pub fn run(mut config: Config) -> impl Future<Item = (), Error = io::Error> + Send {
let mut vf = Vec::new();
if config.enable_udp {
@@ -52,12 +52,10 @@ pub fn run(mut config: Config) {
vf.push(boxed_future(mon));
vf.push(boxed_future(tcp_fut));
tokio::run(futures_unordered(vf).into_future().then(|res| -> Result<(), ()> {
match res {
Ok(..) => unreachable!("Server exited without error"),
Err((err, ..)) => {
panic!("Server exited with error {}", err)
}
}
}));
futures_unordered(vf).into_future().then(|res| -> io::Result<()> {
match res {
Ok(..) => Ok(()),
Err((err, ..)) => Err(err),
}
})
}

View File

@@ -1,9 +1,8 @@
//! Server side
use std::io;
use std::sync::Arc;
use tokio;
use futures::stream::futures_unordered;
use futures::{Future, Stream};
@@ -27,7 +26,7 @@ use relay::udprelay::server::run as run_udp;
/// run(config);
/// ```
///
pub fn run(mut config: Config) {
pub fn run(mut config: Config) -> impl Future<Item = (), Error = io::Error> + Send {
let mut vf = Vec::new();
if config.enable_udp {
@@ -53,12 +52,10 @@ pub fn run(mut config: Config) {
vf.push(boxed_future(mon));
vf.push(boxed_future(tcp_fut));
tokio::run(futures_unordered(vf).into_future().then(|res| -> Result<(), ()> {
match res {
Ok(..) => unreachable!("Server exited without error"),
Err((err, ..)) => {
panic!("Server exited with error {}", err)
}
}
}));
futures_unordered(vf).into_future().then(|res| -> io::Result<()> {
match res {
Ok(..) => Ok(()),
Err((err, ..)) => Err(err),
}
})
}

View File

@@ -2,6 +2,7 @@ extern crate dns_parser;
extern crate env_logger;
extern crate rand;
extern crate shadowsocks;
extern crate tokio;
use std::collections::HashSet;
use std::net::{SocketAddr, UdpSocket};
@@ -11,6 +12,7 @@ use std::time::Duration;
use dns_parser::{Builder, Packet, QueryClass, QueryType};
use shadowsocks::config::{Config, ConfigType};
use shadowsocks::{run_dns, run_server};
use tokio::runtime::current_thread::Runtime;
const CONFIG: &'static str = r#"{
"server": "127.0.0.1",
@@ -31,11 +33,13 @@ fn dns_relay() {
let dns_cfg = Config::load_from_str(CONFIG, ConfigType::Local).unwrap();
thread::spawn(move || {
run_server(server_cfg);
let mut runtime = Runtime::new().expect("Failed to create Runtime");
runtime.block_on(run_server(server_cfg)).unwrap();
});
thread::spawn(move || {
run_dns(dns_cfg);
let mut runtime = Runtime::new().expect("Failed to create Runtime");
runtime.block_on(run_dns(dns_cfg)).unwrap();
});
thread::sleep(Duration::from_secs(1));

View File

@@ -10,6 +10,7 @@ use std::thread;
use std::time::Duration;
use futures::Future;
use tokio::runtime::current_thread::Runtime;
use tokio_io::io::{flush, read_to_end, write_all};
use shadowsocks::config::{Config, ServerConfig};
@@ -53,12 +54,16 @@ impl Socks5TestServer {
pub fn run(&self) {
let svr_cfg = self.config.clone();
thread::spawn(move || {
run_server(svr_cfg);
let mut runtime = Runtime::new().expect("Failed to create Runtime");
let fut = run_server(svr_cfg);
runtime.block_on(fut).expect("Failed to run Server");
});
let client_cfg = self.config.clone();
thread::spawn(move || {
run_local(client_cfg);
let mut runtime = Runtime::new().expect("Failed to create Runtime");
let fut = run_local(client_cfg);
runtime.block_on(fut).expect("Failed to run Local");
});
thread::sleep(Duration::from_secs(1));
@@ -89,7 +94,8 @@ fn socks5_relay_stream() {
})
});
tokio::run(fut.map_err(|_| ()));
let mut runtime = Runtime::new().expect("Failed to create Runtime");
runtime.block_on(fut).unwrap();
}
#[test]
@@ -115,5 +121,6 @@ fn socks5_relay_aead() {
})
});
tokio::run(fut.map_err(|_| ()));
let mut runtime = Runtime::new().expect("Failed to create Runtime");
runtime.block_on(fut).unwrap();
}

View File

@@ -15,6 +15,7 @@ use std::time::Duration;
use bytes::{BufMut, BytesMut};
use futures::Future;
use tokio::runtime::current_thread::Runtime;
use tokio_io::io::read_to_end;
use shadowsocks::config::{Config, ServerConfig};
@@ -46,15 +47,21 @@ fn get_client_addr() -> SocketAddr {
fn start_server(bar: Arc<Barrier>) {
thread::spawn(move || {
let mut runtime = Runtime::new().expect("Failed to create Runtime");
let fut = run_server(get_config());
bar.wait();
run_server(get_config());
runtime.block_on(fut).expect("Failed to run Server");
});
}
fn start_local(bar: Arc<Barrier>) {
thread::spawn(move || {
let mut runtime = Runtime::new().expect("Failed to create Runtime");
let fut = run_local(get_config());
bar.wait();
run_local(get_config());
runtime.block_on(fut).expect("Failed to run Local");
});
}
@@ -75,6 +82,8 @@ fn start_udp_echo_server(bar: Arc<Barrier>) {
fn start_udp_request_holder(bar: Arc<Barrier>, addr: Address) {
thread::spawn(move || {
let mut runtime = Runtime::new().expect("Failed to create Runtime");
let c = Socks5Client::udp_associate(addr, get_client_addr());
let fut = c.and_then(|(c, addr)| {
assert_eq!(addr, Address::SocketAddress(LOCAL_ADDR.parse().unwrap()));
@@ -85,7 +94,7 @@ fn start_udp_request_holder(bar: Arc<Barrier>, addr: Address) {
bar.wait();
tokio::run(fut.map_err(|_| ()));
runtime.block_on(fut).expect("Failed to run UDP socks5 client");
});
}