From 4b49dd17dda12e902be82115428e5a61ffdbb358 Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Thu, 9 Jul 2015 23:01:31 +0800 Subject: [PATCH] enable udp --- Cargo.toml | 7 +-- src/lib.rs | 1 - src/relay/tcprelay/cached_dns.rs | 59 ++++++++++++-------- src/relay/tcprelay/mod.rs | 2 +- src/relay/tcprelay/server.rs | 53 ++++++++++++++---- src/relay/udprelay/local.rs | 92 +++++++++++++++++++++++++------- src/relay/udprelay/server.rs | 51 +++++++++++------- 7 files changed, 187 insertions(+), 78 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7a83f3bc..54c050b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,8 @@ default = [ "cipher-chacha20", "cipher-salsa20", + + "enable-udp", ] cipher-aes-cfb = [] @@ -37,7 +39,7 @@ cipher-seed-cfb = [] cipher-chacha20 = ["enable-sodium"] cipher-salsa20 = ["enable-sodium"] -enable-udp = ["lru-cache"] +enable-udp = [] enable-sodium = ["libsodium-sys"] [[bin]] @@ -74,8 +76,7 @@ git = "https://github.com/zonyitoo/libsodium-sys.git" optional = true [dependencies.lru-cache] -version = "*" -optional = true +git = "https://github.com/zonyitoo/lru-cache.git" [dependencies.simplesched] git = "https://github.com/zonyitoo/simplesched.git" diff --git a/src/lib.rs b/src/lib.rs index 4070dddf..13beec04 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,6 @@ extern crate rustc_serialize as serialize; #[macro_use] extern crate log; -#[cfg(feature = "enable-udp")] extern crate lru_cache; extern crate libsodium_sys as libsodium_ffi; diff --git a/src/relay/tcprelay/cached_dns.rs b/src/relay/tcprelay/cached_dns.rs index e193867b..d88ee23b 100644 --- a/src/relay/tcprelay/cached_dns.rs +++ b/src/relay/tcprelay/cached_dns.rs @@ -19,24 +19,26 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -use std::sync::{Arc, Mutex, TaskPool}; +use std::sync::Arc; // use std::sync::atomic::{AtomicOption, SeqCst}; use std::net::lookup_host; -use std::net::IpAddr; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::io; +use std::vec::IntoIter; -use collect::LruCache; +use simplesched::Scheduler; +use simplesched::sync::Mutex; -const TASK_POOL_SIZE: usize = 4; +use lru_cache::LruCache; struct DnsLruCache { - cache: LruCache, + cache: LruCache>, totally_matched: usize, totally_missed: usize, } pub struct CachedDns { lru_cache: Arc>, - pool: TaskPool, } impl CachedDns { @@ -47,49 +49,60 @@ impl CachedDns { totally_missed: 0, totally_matched: 0, })), - pool: TaskPool::new(TASK_POOL_SIZE), } } - pub fn resolve(&self, addr: &str) -> Option { - let addr_string = addr.to_string(); - + pub fn resolve(&self, addr_str: &str) -> Option> { { let mut cache = self.lru_cache.lock().unwrap(); - match cache.cache.get(&addr_string).map(|x| x.clone()) { + match cache.cache.get(addr_str).map(|x| x.clone()) { Some(addrs) => { cache.totally_matched += 1; - debug!("DNS cache matched!: {}", addr_string); + debug!("DNS cache matched!: {}", addr_str); debug!("DNS cache matched: {}, missed: {}", cache.totally_matched, cache.totally_missed); return Some(addrs) }, None => { cache.totally_missed += 1; - debug!("DNS cache missed!: {}", addr_string); + debug!("DNS cache missed!: {}", addr_str); debug!("DNS cache matched: {}, missed: {}", cache.totally_matched, cache.totally_missed); } } } - let mut addrs = match lookup_host(addr) { + let addrs = match lookup_host(addr_str) { Ok(addrs) => addrs, Err(err) => { - error!("Failed to resolve {}: {}", addr, err); + error!("Failed to resolve {}: {}", addr_str, err); return None; } }; let cloned_mutex = self.lru_cache.clone(); - let addr = match addrs.next() { - Some(Ok(addr)) => addr.ip(), - _ => return None, - }; - let cloned_addr = addr.clone(); - self.pool.execute(move || { + + let mut addr_vec = Vec::new(); + let mut last_err: io::Result<()> = Ok(()); + for sock_addr in addrs { + match sock_addr { + Ok(addr) => { + addr_vec.push(addr); + }, + Err(err) => last_err = Err(err), + } + } + + if addr_vec.is_empty() && last_err.is_err() { + error!("Failed to resolve {}: {:?}", addr_str, last_err.unwrap_err()); + return None; + } + let cloned_addrs = addr_vec.clone(); + + let addr_string: String = addr_str.to_owned(); + Scheduler::spawn(move || { let mut cache = cloned_mutex.lock().unwrap(); - cache.cache.insert(addr_string, cloned_addr); + cache.cache.insert(addr_string, cloned_addrs); }); - Some(addr) + Some(addr_vec) } } diff --git a/src/relay/tcprelay/mod.rs b/src/relay/tcprelay/mod.rs index 6ebe0843..f979385c 100644 --- a/src/relay/tcprelay/mod.rs +++ b/src/relay/tcprelay/mod.rs @@ -21,7 +21,7 @@ //! TcpRelay implementation -// mod cached_dns; +mod cached_dns; pub mod local; pub mod server; mod stream; diff --git a/src/relay/tcprelay/server.rs b/src/relay/tcprelay/server.rs index 8b1031e9..3d7348d7 100644 --- a/src/relay/tcprelay/server.rs +++ b/src/relay/tcprelay/server.rs @@ -21,15 +21,15 @@ //! TcpRelay server that running on the server side -// use std::sync::Arc; -use std::io::{Read, Write, BufReader, ErrorKind}; +use std::sync::Arc; +use std::io::{self, Read, Write, BufReader, ErrorKind}; use simplesched::Scheduler; use simplesched::net::{TcpListener, TcpStream, Shutdown}; use config::{Config, ServerConfig}; use relay::socks5; -// use relay::tcprelay::cached_dns::CachedDns; +use relay::tcprelay::cached_dns::CachedDns; use relay::tcprelay::stream::{DecryptedReader, EncryptedWriter}; use crypto::cipher; use crypto::CryptoMode; @@ -53,9 +53,9 @@ impl TcpRelayServer { let acceptor = TcpListener::bind(&(&s.addr[..], s.port)) .unwrap_or_else(|err| panic!("Failed to bind: {:?}", err)); - info!("Shadowsocks listening on {}", s.addr); + info!("Shadowsocks listening on {}:{}", s.addr, s.port); - // let dnscache_arc = Arc::new(CachedDns::with_capacity(s.dns_cache_capacity)); + let dnscache_arc = Arc::new(CachedDns::with_capacity(s.dns_cache_capacity)); let pwd = s.method.bytes_to_key(s.password.as_bytes()); let timeout = s.timeout; @@ -81,7 +81,7 @@ impl TcpRelayServer { let pwd = pwd.clone(); let encrypt_method = method; - // let dnscache = dnscache_arc.clone(); + let dnscache = dnscache_arc.clone(); Scheduler::spawn(move || { let remote_iv = { @@ -143,11 +143,42 @@ impl TcpRelayServer { }; info!("Connecting to {}", addr); - let remote_stream = match TcpStream::connect(&addr) { - Ok(stream) => stream, - Err(err) => { - error!("Unable to connect {:?}: {}", addr, err); - return; + + let remote_stream = match &addr { + &socks5::Address::SocketAddress(ref addr) => { + match TcpStream::connect(&addr) { + Ok(stream) => stream, + Err(err) => { + error!("Unable to connect {:?}: {}", addr, err); + return; + } + } + }, + &socks5::Address::DomainNameAddress(ref dname, ref port) => { + let addrs = match dnscache.resolve(&dname) { + Some(addrs) => addrs, + None => return, + }; + + let processing = || { + let mut last_err: Option> = None; + for addr in addrs.into_iter() { + match TcpStream::connect(&(addr.ip(), *port)) { + Ok(stream) => return Ok(stream), + Err(err) => { + error!("Unable to connect {:?}: {}", addr, err); + last_err = Some(Err(err)); + } + } + } + + last_err.unwrap() + }; + + match processing() { + Ok(s) => s, + Err(_) => return + } } }; diff --git a/src/relay/udprelay/local.rs b/src/relay/udprelay/local.rs index 4b705db8..5d773001 100644 --- a/src/relay/udprelay/local.rs +++ b/src/relay/udprelay/local.rs @@ -67,7 +67,6 @@ use lru_cache::LruCache; use crypto::{cipher, CryptoMode}; use crypto::cipher::Cipher; use config::{Config, ServerConfig}; -use relay::Relay; use relay::socks5; use relay::loadbalancing::server::{LoadBalancer, RoundRobin}; use relay::udprelay::UDP_RELAY_LOCAL_LRU_CACHE_CAPACITY; @@ -85,9 +84,15 @@ impl UdpRelayLocal { } } -impl Relay for UdpRelayLocal { - fn run(&self) { - let addr = self.config.local.expect("Local configuration should not be None"); +impl UdpRelayLocal { + pub fn run(&self) { + let addr = match self.config.local { + Some(addr) => addr, + None => { + error!("Local configuration should not be None"); + return; + } + }; let mut server_load_balancer = RoundRobin::new(self.config.server.clone()); @@ -114,7 +119,13 @@ impl Relay for UdpRelayLocal { let client_map_arc = Arc::new(Mutex::new( LruCache::::new(UDP_RELAY_LOCAL_LRU_CACHE_CAPACITY))); - let socket = UdpSocket::bind(&addr).ok().expect("Failed to bind udp socket"); + let socket = match UdpSocket::bind(&addr) { + Ok(sk) => sk, + Err(err) => { + error!("Failed to bind udp socket: {:?}", err); + return; + } + }; let mut buf = [0u8; 0xffff]; loop { @@ -186,7 +197,13 @@ fn handle_request(socket: UdpSocket, let mut bufr = BufReader::new(request_message); - let request = socks5::UdpAssociateHeader::read_from(&mut bufr).unwrap(); + let request = match socks5::UdpAssociateHeader::read_from(&mut bufr) { + Ok(r) => r, + Err(err) => { + error!("Error occurs while reading UdpAssociateHeader: {:?}", err); + return; + } + }; let addr = request.address.clone(); @@ -203,14 +220,29 @@ fn handle_request(socket: UdpSocket, CryptoMode::Encrypt); let mut wbuf = Vec::new(); - request.write_to(&mut wbuf).unwrap(); - io::copy(&mut bufr, &mut wbuf).unwrap(); + if let Err(err) = request.write_to(&mut wbuf) { + error!("Error occurs while writing request: {:?}", err); + return; + } - encryptor.update(&wbuf[..], &mut iv).unwrap(); - encryptor.finalize(&mut iv).unwrap(); + if let Err(err) = io::copy(&mut bufr, &mut wbuf) { + error!("Error occurs while copying from bufr to wbuf: {:?}", err); + return; + } - socket.send_to(&iv[..], &server_addr) - .ok().expect("Error occurs while sending to remote"); + if let Err(err) = encryptor.update(&wbuf[..], &mut iv) { + error!("Error occurs while encrypting: {:?}", err); + return; + } + + if let Err(err) = encryptor.finalize(&mut iv) { + error!("Error occurs while finalizing: {:?}", err); + return; + } + + if let Err(err) = socket.send_to(&iv[..], &server_addr) { + error!("Error occurs while sending to remote: {:?}", err); + } } fn handle_response(socket: UdpSocket, @@ -225,12 +257,25 @@ fn handle_response(socket: UdpSocket, &response_message[0..config.method.block_size()], CryptoMode::Decrypt); let mut decrypted_data = Vec::new(); - decryptor.update(&response_message[config.method.block_size()..], &mut decrypted_data).unwrap(); - decryptor.finalize(&mut decrypted_data).unwrap(); + if let Err(err) = decryptor.update(&response_message[config.method.block_size()..], &mut decrypted_data) { + error!("Error occurs while decrypting data: {:?}", err); + return; + } + + if let Err(err) = decryptor.finalize(&mut decrypted_data) { + error!("Error occurs while finalizing decrypt: {:?}", err); + return; + } let mut bufr = BufReader::new(&decrypted_data[..]); - let addr = socks5::Address::read_from(&mut bufr).unwrap(); + let addr = match socks5::Address::read_from(&mut bufr) { + Ok(addr) => addr, + Err(err) => { + error!("Error occurs while reading address: {:?}", err); + return; + } + }; let client_addr = { let mut cmap = client_map.lock().unwrap(); @@ -243,10 +288,17 @@ fn handle_response(socket: UdpSocket, debug!("UDP response {} -> {}", from_addr, client_addr); let mut bufw = Vec::new(); - socks5::UdpAssociateHeader::new(0, addr) - .write_to(&mut bufw).unwrap(); - io::copy(&mut bufr, &mut bufw).unwrap(); + if let Err(err) = socks5::UdpAssociateHeader::new(0, addr).write_to(&mut bufw) { + error!("Error occurs while writing UdpAssociateHeader: {:?}", err); + return; + } - socket.send_to(&bufw[..], &client_addr) - .ok().expect("Error occurs while sending to local"); + if let Err(err) = io::copy(&mut bufr, &mut bufw) { + error!("Error occurs while copying from bufr to bufw: {:?}", err); + return; + } + + if let Err(err) = socket.send_to(&bufw[..], &client_addr) { + error!("Error occurs while sending to local: {:?}", err); + } } diff --git a/src/relay/udprelay/server.rs b/src/relay/udprelay/server.rs index c20fc093..729c40ea 100644 --- a/src/relay/udprelay/server.rs +++ b/src/relay/udprelay/server.rs @@ -22,12 +22,12 @@ use std::sync::{Arc, Mutex}; use std::net::{UdpSocket, SocketAddr, SocketAddrV4, SocketAddrV6, lookup_host}; use std::io::{BufReader, Read}; -use std::thread; use lru_cache::LruCache; +use simplesched::Scheduler; + use config::{Config, ServerConfig}; -use relay::Relay; use relay::socks5::{Address, self}; use relay::udprelay::{UDP_RELAY_SERVER_LRU_CACHE_CAPACITY}; use crypto::{cipher, CryptoMode}; @@ -62,12 +62,18 @@ impl UdpRelayServer { let data = buf[..len].to_vec(); let client_map = client_map_arc.clone(); let remote_map = remote_map_arc.clone(); - let captured_socket = socket.try_clone().unwrap(); + let captured_socket = match socket.try_clone() { + Ok(sk) => sk, + Err(err) => { + error!("Error occurs while cloning socket: {:?}", err); + return; + } + }; let method = svr_config.method; let password = svr_config.password.clone(); - thread::spawn(move || { + Scheduler::spawn(move || { match remote_map.lock().unwrap().get(&src) { Some(remote_addr) => { match client_map.lock().unwrap().get(remote_addr) { @@ -76,7 +82,11 @@ impl UdpRelayServer { // Make a header let mut response_buf = Vec::new(); - remote_addr.write_to(&mut response_buf).unwrap(); + if let Err(err) = remote_addr.write_to(&mut response_buf) { + error!("Error occurs while writing remote addr: {:?}", err); + return; + } + response_buf.push_all(&data[..]); let key = method.bytes_to_key(password.as_bytes()); @@ -86,12 +96,21 @@ impl UdpRelayServer { &key[..], &iv[..], CryptoMode::Encrypt); - encryptor.update(&response_buf[..], &mut iv).unwrap(); - encryptor.finalize(&mut iv).unwrap(); - captured_socket - .send_to(&iv[..], &client_addr) - .unwrap(); + if let Err(err) = encryptor.update(&response_buf[..], &mut iv) { + error!("Error occurs while encrypting: {:?}", err); + return; + } + + if let Err(err) = encryptor.finalize(&mut iv) { + error!("Error occurs while finalizing: {:?}", err); + return; + } + + if let Err(err) = captured_socket.send_to(&iv[..], &client_addr) { + error!("Error occurs while sending data: {:?}", err); + return; + } }, None => { // Unknown response, drop it. @@ -178,17 +197,11 @@ impl UdpRelayServer { } } -impl Relay for UdpRelayServer { - fn run(&self) { - let mut threads = Vec::new(); +impl UdpRelayServer { + pub fn run(&self) { for s in self.config.server.iter() { let s = s.clone(); - let fut = thread::spawn(move || UdpRelayServer::accept_loop(s)); - threads.push(fut); - } - - for fut in threads.into_iter() { - fut.join().unwrap(); + Scheduler::spawn(move || UdpRelayServer::accept_loop(s)); } } }