enable udp

This commit is contained in:
Y. T. Chung
2015-07-09 23:01:31 +08:00
parent b810441ff5
commit 4b49dd17dd
7 changed files with 187 additions and 78 deletions

View File

@@ -21,6 +21,8 @@ default = [
"cipher-chacha20",
"cipher-salsa20",
"enable-udp",
]
cipher-aes-cfb = []
@@ -37,7 +39,7 @@ cipher-seed-cfb = []
cipher-chacha20 = ["enable-sodium"]
cipher-salsa20 = ["enable-sodium"]
enable-udp = ["lru-cache"]
enable-udp = []
enable-sodium = ["libsodium-sys"]
[[bin]]
@@ -74,8 +76,7 @@ git = "https://github.com/zonyitoo/libsodium-sys.git"
optional = true
[dependencies.lru-cache]
version = "*"
optional = true
git = "https://github.com/zonyitoo/lru-cache.git"
[dependencies.simplesched]
git = "https://github.com/zonyitoo/simplesched.git"

View File

@@ -28,7 +28,6 @@
extern crate rustc_serialize as serialize;
#[macro_use]
extern crate log;
#[cfg(feature = "enable-udp")]
extern crate lru_cache;
extern crate libsodium_sys as libsodium_ffi;

View File

@@ -19,24 +19,26 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
use std::sync::{Arc, Mutex, TaskPool};
use std::sync::Arc;
// use std::sync::atomic::{AtomicOption, SeqCst};
use std::net::lookup_host;
use std::net::IpAddr;
use std::net::{SocketAddr, ToSocketAddrs};
use std::io;
use std::vec::IntoIter;
use collect::LruCache;
use simplesched::Scheduler;
use simplesched::sync::Mutex;
const TASK_POOL_SIZE: usize = 4;
use lru_cache::LruCache;
struct DnsLruCache {
cache: LruCache<String, IpAddr>,
cache: LruCache<String, Vec<SocketAddr>>,
totally_matched: usize,
totally_missed: usize,
}
pub struct CachedDns {
lru_cache: Arc<Mutex<DnsLruCache>>,
pool: TaskPool,
}
impl CachedDns {
@@ -47,49 +49,60 @@ impl CachedDns {
totally_missed: 0,
totally_matched: 0,
})),
pool: TaskPool::new(TASK_POOL_SIZE),
}
}
pub fn resolve(&self, addr: &str) -> Option<IpAddr> {
let addr_string = addr.to_string();
pub fn resolve(&self, addr_str: &str) -> Option<Vec<SocketAddr>> {
{
let mut cache = self.lru_cache.lock().unwrap();
match cache.cache.get(&addr_string).map(|x| x.clone()) {
match cache.cache.get(addr_str).map(|x| x.clone()) {
Some(addrs) => {
cache.totally_matched += 1;
debug!("DNS cache matched!: {}", addr_string);
debug!("DNS cache matched!: {}", addr_str);
debug!("DNS cache matched: {}, missed: {}", cache.totally_matched, cache.totally_missed);
return Some(addrs)
},
None => {
cache.totally_missed += 1;
debug!("DNS cache missed!: {}", addr_string);
debug!("DNS cache missed!: {}", addr_str);
debug!("DNS cache matched: {}, missed: {}", cache.totally_matched, cache.totally_missed);
}
}
}
let mut addrs = match lookup_host(addr) {
let addrs = match lookup_host(addr_str) {
Ok(addrs) => addrs,
Err(err) => {
error!("Failed to resolve {}: {}", addr, err);
error!("Failed to resolve {}: {}", addr_str, err);
return None;
}
};
let cloned_mutex = self.lru_cache.clone();
let addr = match addrs.next() {
Some(Ok(addr)) => addr.ip(),
_ => return None,
};
let cloned_addr = addr.clone();
self.pool.execute(move || {
let mut addr_vec = Vec::new();
let mut last_err: io::Result<()> = Ok(());
for sock_addr in addrs {
match sock_addr {
Ok(addr) => {
addr_vec.push(addr);
},
Err(err) => last_err = Err(err),
}
}
if addr_vec.is_empty() && last_err.is_err() {
error!("Failed to resolve {}: {:?}", addr_str, last_err.unwrap_err());
return None;
}
let cloned_addrs = addr_vec.clone();
let addr_string: String = addr_str.to_owned();
Scheduler::spawn(move || {
let mut cache = cloned_mutex.lock().unwrap();
cache.cache.insert(addr_string, cloned_addr);
cache.cache.insert(addr_string, cloned_addrs);
});
Some(addr)
Some(addr_vec)
}
}

View File

@@ -21,7 +21,7 @@
//! TcpRelay implementation
// mod cached_dns;
mod cached_dns;
pub mod local;
pub mod server;
mod stream;

View File

@@ -21,15 +21,15 @@
//! TcpRelay server that running on the server side
// use std::sync::Arc;
use std::io::{Read, Write, BufReader, ErrorKind};
use std::sync::Arc;
use std::io::{self, Read, Write, BufReader, ErrorKind};
use simplesched::Scheduler;
use simplesched::net::{TcpListener, TcpStream, Shutdown};
use config::{Config, ServerConfig};
use relay::socks5;
// use relay::tcprelay::cached_dns::CachedDns;
use relay::tcprelay::cached_dns::CachedDns;
use relay::tcprelay::stream::{DecryptedReader, EncryptedWriter};
use crypto::cipher;
use crypto::CryptoMode;
@@ -53,9 +53,9 @@ impl TcpRelayServer {
let acceptor = TcpListener::bind(&(&s.addr[..], s.port))
.unwrap_or_else(|err| panic!("Failed to bind: {:?}", err));
info!("Shadowsocks listening on {}", s.addr);
info!("Shadowsocks listening on {}:{}", s.addr, s.port);
// let dnscache_arc = Arc::new(CachedDns::with_capacity(s.dns_cache_capacity));
let dnscache_arc = Arc::new(CachedDns::with_capacity(s.dns_cache_capacity));
let pwd = s.method.bytes_to_key(s.password.as_bytes());
let timeout = s.timeout;
@@ -81,7 +81,7 @@ impl TcpRelayServer {
let pwd = pwd.clone();
let encrypt_method = method;
// let dnscache = dnscache_arc.clone();
let dnscache = dnscache_arc.clone();
Scheduler::spawn(move || {
let remote_iv = {
@@ -143,11 +143,42 @@ impl TcpRelayServer {
};
info!("Connecting to {}", addr);
let remote_stream = match TcpStream::connect(&addr) {
Ok(stream) => stream,
Err(err) => {
error!("Unable to connect {:?}: {}", addr, err);
return;
let remote_stream = match &addr {
&socks5::Address::SocketAddress(ref addr) => {
match TcpStream::connect(&addr) {
Ok(stream) => stream,
Err(err) => {
error!("Unable to connect {:?}: {}", addr, err);
return;
}
}
},
&socks5::Address::DomainNameAddress(ref dname, ref port) => {
let addrs = match dnscache.resolve(&dname) {
Some(addrs) => addrs,
None => return,
};
let processing = || {
let mut last_err: Option<io::Result<TcpStream>> = None;
for addr in addrs.into_iter() {
match TcpStream::connect(&(addr.ip(), *port)) {
Ok(stream) => return Ok(stream),
Err(err) => {
error!("Unable to connect {:?}: {}", addr, err);
last_err = Some(Err(err));
}
}
}
last_err.unwrap()
};
match processing() {
Ok(s) => s,
Err(_) => return
}
}
};

View File

@@ -67,7 +67,6 @@ use lru_cache::LruCache;
use crypto::{cipher, CryptoMode};
use crypto::cipher::Cipher;
use config::{Config, ServerConfig};
use relay::Relay;
use relay::socks5;
use relay::loadbalancing::server::{LoadBalancer, RoundRobin};
use relay::udprelay::UDP_RELAY_LOCAL_LRU_CACHE_CAPACITY;
@@ -85,9 +84,15 @@ impl UdpRelayLocal {
}
}
impl Relay for UdpRelayLocal {
fn run(&self) {
let addr = self.config.local.expect("Local configuration should not be None");
impl UdpRelayLocal {
pub fn run(&self) {
let addr = match self.config.local {
Some(addr) => addr,
None => {
error!("Local configuration should not be None");
return;
}
};
let mut server_load_balancer = RoundRobin::new(self.config.server.clone());
@@ -114,7 +119,13 @@ impl Relay for UdpRelayLocal {
let client_map_arc = Arc::new(Mutex::new(
LruCache::<socks5::Address, SocketAddr>::new(UDP_RELAY_LOCAL_LRU_CACHE_CAPACITY)));
let socket = UdpSocket::bind(&addr).ok().expect("Failed to bind udp socket");
let socket = match UdpSocket::bind(&addr) {
Ok(sk) => sk,
Err(err) => {
error!("Failed to bind udp socket: {:?}", err);
return;
}
};
let mut buf = [0u8; 0xffff];
loop {
@@ -186,7 +197,13 @@ fn handle_request(socket: UdpSocket,
let mut bufr = BufReader::new(request_message);
let request = socks5::UdpAssociateHeader::read_from(&mut bufr).unwrap();
let request = match socks5::UdpAssociateHeader::read_from(&mut bufr) {
Ok(r) => r,
Err(err) => {
error!("Error occurs while reading UdpAssociateHeader: {:?}", err);
return;
}
};
let addr = request.address.clone();
@@ -203,14 +220,29 @@ fn handle_request(socket: UdpSocket,
CryptoMode::Encrypt);
let mut wbuf = Vec::new();
request.write_to(&mut wbuf).unwrap();
io::copy(&mut bufr, &mut wbuf).unwrap();
if let Err(err) = request.write_to(&mut wbuf) {
error!("Error occurs while writing request: {:?}", err);
return;
}
encryptor.update(&wbuf[..], &mut iv).unwrap();
encryptor.finalize(&mut iv).unwrap();
if let Err(err) = io::copy(&mut bufr, &mut wbuf) {
error!("Error occurs while copying from bufr to wbuf: {:?}", err);
return;
}
socket.send_to(&iv[..], &server_addr)
.ok().expect("Error occurs while sending to remote");
if let Err(err) = encryptor.update(&wbuf[..], &mut iv) {
error!("Error occurs while encrypting: {:?}", err);
return;
}
if let Err(err) = encryptor.finalize(&mut iv) {
error!("Error occurs while finalizing: {:?}", err);
return;
}
if let Err(err) = socket.send_to(&iv[..], &server_addr) {
error!("Error occurs while sending to remote: {:?}", err);
}
}
fn handle_response(socket: UdpSocket,
@@ -225,12 +257,25 @@ fn handle_response(socket: UdpSocket,
&response_message[0..config.method.block_size()],
CryptoMode::Decrypt);
let mut decrypted_data = Vec::new();
decryptor.update(&response_message[config.method.block_size()..], &mut decrypted_data).unwrap();
decryptor.finalize(&mut decrypted_data).unwrap();
if let Err(err) = decryptor.update(&response_message[config.method.block_size()..], &mut decrypted_data) {
error!("Error occurs while decrypting data: {:?}", err);
return;
}
if let Err(err) = decryptor.finalize(&mut decrypted_data) {
error!("Error occurs while finalizing decrypt: {:?}", err);
return;
}
let mut bufr = BufReader::new(&decrypted_data[..]);
let addr = socks5::Address::read_from(&mut bufr).unwrap();
let addr = match socks5::Address::read_from(&mut bufr) {
Ok(addr) => addr,
Err(err) => {
error!("Error occurs while reading address: {:?}", err);
return;
}
};
let client_addr = {
let mut cmap = client_map.lock().unwrap();
@@ -243,10 +288,17 @@ fn handle_response(socket: UdpSocket,
debug!("UDP response {} -> {}", from_addr, client_addr);
let mut bufw = Vec::new();
socks5::UdpAssociateHeader::new(0, addr)
.write_to(&mut bufw).unwrap();
io::copy(&mut bufr, &mut bufw).unwrap();
if let Err(err) = socks5::UdpAssociateHeader::new(0, addr).write_to(&mut bufw) {
error!("Error occurs while writing UdpAssociateHeader: {:?}", err);
return;
}
socket.send_to(&bufw[..], &client_addr)
.ok().expect("Error occurs while sending to local");
if let Err(err) = io::copy(&mut bufr, &mut bufw) {
error!("Error occurs while copying from bufr to bufw: {:?}", err);
return;
}
if let Err(err) = socket.send_to(&bufw[..], &client_addr) {
error!("Error occurs while sending to local: {:?}", err);
}
}

View File

@@ -22,12 +22,12 @@
use std::sync::{Arc, Mutex};
use std::net::{UdpSocket, SocketAddr, SocketAddrV4, SocketAddrV6, lookup_host};
use std::io::{BufReader, Read};
use std::thread;
use lru_cache::LruCache;
use simplesched::Scheduler;
use config::{Config, ServerConfig};
use relay::Relay;
use relay::socks5::{Address, self};
use relay::udprelay::{UDP_RELAY_SERVER_LRU_CACHE_CAPACITY};
use crypto::{cipher, CryptoMode};
@@ -62,12 +62,18 @@ impl UdpRelayServer {
let data = buf[..len].to_vec();
let client_map = client_map_arc.clone();
let remote_map = remote_map_arc.clone();
let captured_socket = socket.try_clone().unwrap();
let captured_socket = match socket.try_clone() {
Ok(sk) => sk,
Err(err) => {
error!("Error occurs while cloning socket: {:?}", err);
return;
}
};
let method = svr_config.method;
let password = svr_config.password.clone();
thread::spawn(move || {
Scheduler::spawn(move || {
match remote_map.lock().unwrap().get(&src) {
Some(remote_addr) => {
match client_map.lock().unwrap().get(remote_addr) {
@@ -76,7 +82,11 @@ impl UdpRelayServer {
// Make a header
let mut response_buf = Vec::new();
remote_addr.write_to(&mut response_buf).unwrap();
if let Err(err) = remote_addr.write_to(&mut response_buf) {
error!("Error occurs while writing remote addr: {:?}", err);
return;
}
response_buf.push_all(&data[..]);
let key = method.bytes_to_key(password.as_bytes());
@@ -86,12 +96,21 @@ impl UdpRelayServer {
&key[..],
&iv[..],
CryptoMode::Encrypt);
encryptor.update(&response_buf[..], &mut iv).unwrap();
encryptor.finalize(&mut iv).unwrap();
captured_socket
.send_to(&iv[..], &client_addr)
.unwrap();
if let Err(err) = encryptor.update(&response_buf[..], &mut iv) {
error!("Error occurs while encrypting: {:?}", err);
return;
}
if let Err(err) = encryptor.finalize(&mut iv) {
error!("Error occurs while finalizing: {:?}", err);
return;
}
if let Err(err) = captured_socket.send_to(&iv[..], &client_addr) {
error!("Error occurs while sending data: {:?}", err);
return;
}
},
None => {
// Unknown response, drop it.
@@ -178,17 +197,11 @@ impl UdpRelayServer {
}
}
impl Relay for UdpRelayServer {
fn run(&self) {
let mut threads = Vec::new();
impl UdpRelayServer {
pub fn run(&self) {
for s in self.config.server.iter() {
let s = s.clone();
let fut = thread::spawn(move || UdpRelayServer::accept_loop(s));
threads.push(fut);
}
for fut in threads.into_iter() {
fut.join().unwrap();
Scheduler::spawn(move || UdpRelayServer::accept_loop(s));
}
}
}