From 2e7e4032935bf2b700aa592b3262f0ad54ab4bb6 Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Wed, 19 Oct 2016 00:44:36 +0800 Subject: [PATCH] initial impls HTTP tunnel --- Cargo.toml | 1 + src/bin/ssurl.rs | 55 ++++--- src/config.rs | 213 ++++++++++++++------------- src/lib.rs | 1 + src/relay/local.rs | 42 +++++- src/relay/tcprelay/http.rs | 146 +++++++++++++++++++ src/relay/tcprelay/local.rs | 283 ++++++++++++++++++++++++++++++++++++ src/relay/tcprelay/mod.rs | 1 + 8 files changed, 610 insertions(+), 132 deletions(-) create mode 100644 src/relay/tcprelay/http.rs diff --git a/Cargo.toml b/Cargo.toml index 108cf15d..68f8bcaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ ip = "1.0.0" openssl = "^0.7.1" lru-cache = "0.0.7" libc = "^0.2.7" +hyper = "0.9" [dependencies.coio] git = "https://github.com/zonyitoo/coio-rs.git" diff --git a/src/bin/ssurl.rs b/src/bin/ssurl.rs index 7f920870..9aa91997 100644 --- a/src/bin/ssurl.rs +++ b/src/bin/ssurl.rs @@ -19,14 +19,18 @@ const BLACK: &'static str = "\x1b[40m \x1b[0m"; const WHITE: &'static str = "\x1b[47m \x1b[0m"; fn encode_url(svr: &ServerConfig) -> String { - let url = format!("{}:{}@{}:{}", svr.method.to_string(), svr.password, svr.addr, svr.port); + let url = format!("{}:{}@{}:{}", + svr.method.to_string(), + svr.password, + svr.addr, + svr.port); format!("ss://{}", url.as_bytes().to_base64(URL_SAFE)) } fn print_qrcode(encoded: &str) { let qrcode = QrCode::new(encoded.as_bytes()).unwrap(); - for _ in 0..qrcode.width()+2 { + for _ in 0..qrcode.width() + 2 { print!("{}", WHITE); } println!(""); @@ -34,18 +38,14 @@ fn print_qrcode(encoded: &str) { for y in 0..qrcode.width() { print!("{}", WHITE); for x in 0..qrcode.width() { - let color = if qrcode[(x, y)] { - BLACK - } else { - WHITE - }; + let color = if qrcode[(x, y)] { BLACK } else { WHITE }; print!("{}", color); } println!("{}", WHITE); } - for _ in 0..qrcode.width()+2 { + for _ in 0..qrcode.width() + 2 { print!("{}", WHITE); } println!(""); @@ -75,9 +75,7 @@ fn decode(encoded: &str, need_qrcode: bool) { let mut sp1 = decoded.split('@'); let (account, addr) = match (sp1.next(), sp1.next()) { - (Some(account), Some(addr)) => { - (account, addr) - }, + (Some(account), Some(addr)) => (account, addr), _ => panic!("Malformed input"), }; @@ -93,12 +91,15 @@ fn decode(encoded: &str, need_qrcode: bool) { _ => panic!("Malformed input"), }; - let svrconfig = ServerConfig::basic(addr.to_owned(), port.parse().unwrap(), - pwd.to_owned(), method.parse().unwrap()); + let svrconfig = ServerConfig::basic(addr.to_owned(), + port.parse().unwrap(), + pwd.to_owned(), + method.parse().unwrap()); let config = Config { server: vec![svrconfig], local: None, + http_proxy: None, enable_udp: false, timeout: None, forbidden_ip: HashSet::new(), @@ -114,17 +115,23 @@ fn decode(encoded: &str, need_qrcode: bool) { fn main() { let app = App::new("ssurl") - .author("Y. T. Chung ") - .about("Encode and decode ShadowSocks URL") - .arg(Arg::with_name("ENCODE").short("e").long("encode") - .takes_value(true) - .help("Encode the server configuration in the provided JSON file")) - .arg(Arg::with_name("DECODE").short("d").long("decode") - .takes_value(true) - .help("Decode the server configuration from the provide ShadowSocks URL")) - .arg(Arg::with_name("QRCODE").short("c").long("qrcode") - .takes_value(false) - .help("Generate the QRCode with the provided configuration")); + .author("Y. T. Chung ") + .about("Encode and decode ShadowSocks URL") + .arg(Arg::with_name("ENCODE") + .short("e") + .long("encode") + .takes_value(true) + .help("Encode the server configuration in the provided JSON file")) + .arg(Arg::with_name("DECODE") + .short("d") + .long("decode") + .takes_value(true) + .help("Decode the server configuration from the provide ShadowSocks URL")) + .arg(Arg::with_name("QRCODE") + .short("c") + .long("qrcode") + .takes_value(false) + .help("Generate the QRCode with the provided configuration")); let matches = app.get_matches(); let need_qrcode = matches.is_present("QRCODE"); diff --git a/src/config.rs b/src/config.rs index 4a870048..81601a9f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -140,6 +140,7 @@ pub enum ConfigType { pub struct Config { pub server: Vec, pub local: Option, + pub http_proxy: Option, pub enable_udp: bool, pub timeout: Option, pub forbidden_ip: HashSet, @@ -191,6 +192,7 @@ impl Config { Config { server: Vec::new(), local: None, + http_proxy: None, enable_udp: false, timeout: None, forbidden_ip: HashSet::new(), @@ -203,9 +205,7 @@ impl Config { config.timeout = match o.get("timeout") { Some(t_str) => { let val = try!(t_str.as_u64() - .ok_or(Error::new(ErrorKind::Malformed, - "`timeout` should be an integer", - None))); + .ok_or(Error::new(ErrorKind::Malformed, "`timeout` should be an integer", None))); Some(Duration::from_secs(val)) } None => None, @@ -213,21 +213,15 @@ impl Config { if o.contains_key("servers") { let server_list = try!(o.get("servers") - .unwrap() - .as_array() - .ok_or(Error::new(ErrorKind::Malformed, - "`servers` should be a list", - None))); + .unwrap() + .as_array() + .ok_or(Error::new(ErrorKind::Malformed, "`servers` should be a list", None))); for server in server_list.iter() { let method_o = try!(server.find("method") - .ok_or(Error::new(ErrorKind::MissingField, - "need to specify a method", - None))); - let method_str = try!(method_o.as_string().ok_or(Error::new(ErrorKind::Malformed, - "`method` should \ - be a string", - None))); + .ok_or(Error::new(ErrorKind::MissingField, "need to specify a method", None))); + let method_str = try!(method_o.as_string() + .ok_or(Error::new(ErrorKind::Malformed, "`method` should be a string", None))); let method = try!(method_str.parse::().map_err(|_| { Error::new(ErrorKind::Invalid, "not supported method", @@ -235,40 +229,33 @@ impl Config { })); let addr_o = try!(server.find("address") - .ok_or(Error::new(ErrorKind::MissingField, - "need to specify a server address", - None))); + .ok_or(Error::new(ErrorKind::MissingField, + "need to specify a server address", + None))); let addr_str = try!(addr_o.as_string() - .ok_or(Error::new(ErrorKind::Malformed, - "`address` should be a string", - None))); + .ok_or(Error::new(ErrorKind::Malformed, "`address` should be a string", None))); let cfg = ServerConfig { addr: addr_str.to_string(), port: try!(try!(server.find("port") - .ok_or(Error::new(ErrorKind::MissingField, - "need to specify a server port", - None))) - .as_u64() - .ok_or(Error::new(ErrorKind::Malformed, - "`port` should be an integer", - None))) as u16, + .ok_or(Error::new(ErrorKind::MissingField, + "need to specify a server port", + None))) + .as_u64() + .ok_or(Error::new(ErrorKind::Malformed, + "`port` should be an \ + integer", + None))) as u16, password: try!(try!(server.find("password") - .ok_or(Error::new(ErrorKind::MissingField, - "need to specify a password", - None))) - .as_string() - .ok_or(Error::new(ErrorKind::Malformed, - "`password` should be a string", - None))) - .to_string(), + .ok_or(Error::new(ErrorKind::MissingField, "need to specify a password", None))) + .as_string() + .ok_or(Error::new(ErrorKind::Malformed, "`password` should be a string", None))) + .to_string(), method: method, timeout: match server.find("timeout") { Some(t) => { let val = try!(t.as_u64() - .ok_or(Error::new(ErrorKind::Malformed, - "`timeout` should be an integer", - None))); + .ok_or(Error::new(ErrorKind::Malformed, "`timeout` should be an integer", None))); Some(Duration::from_secs(val)) } None => None, @@ -276,10 +263,9 @@ impl Config { dns_cache_capacity: match server.find("dns_cache_capacity") { Some(t) => { try!(t.as_u64() - .ok_or(Error::new(ErrorKind::Malformed, - "`dns_cache_capacity` should be an \ - integer", - None))) as usize + .ok_or(Error::new(ErrorKind::Malformed, + "`dns_cache_capacity` should be an integer", + None))) as usize } None => DEFAULT_DNS_CACHE_CAPACITY, }, @@ -288,60 +274,47 @@ impl Config { config.server.push(cfg); } - } else if o.contains_key("server") && o.contains_key("server_port") && - o.contains_key("password") && o.contains_key("method") { + } else if o.contains_key("server") && o.contains_key("server_port") && o.contains_key("password") && + o.contains_key("method") { // Traditional configuration file let method_o = try!(o.get("method") - .ok_or(Error::new(ErrorKind::MissingField, - "need to specify method", - None))); + .ok_or(Error::new(ErrorKind::MissingField, "need to specify method", None))); let method_str = try!(method_o.as_string() - .ok_or(Error::new(ErrorKind::Malformed, - "`method` should be a string", - None))); + .ok_or(Error::new(ErrorKind::Malformed, "`method` should be a string", None))); let method = try!(method_str.parse::() - .map_err(|_| { - Error::new(ErrorKind::Invalid, - "not supported method", - Some(format!("`{}` is not a supported \ - method", - method_str))) - })); + .map_err(|_| { + Error::new(ErrorKind::Invalid, + "not supported method", + Some(format!("`{}` is not a supported method", method_str))) + })); let addr_o = try!(o.get("server") - .ok_or(Error::new(ErrorKind::MissingField, - "need to specify server address", - None))); + .ok_or(Error::new(ErrorKind::MissingField, + "need to specify server address", + None))); let addr_str = try!(addr_o.as_string() - .ok_or(Error::new(ErrorKind::Malformed, - "`server` should be a string", - None))); + .ok_or(Error::new(ErrorKind::Malformed, "`server` should be a string", None))); let single_server = ServerConfig { addr: addr_str.to_string(), port: try!(try!(o.get("server_port") - .ok_or(Error::new(ErrorKind::MissingField, - "need to specify a server port", - None))) - .as_u64() - .ok_or(Error::new(ErrorKind::Malformed, - "`port` should be an integer", - None))) as u16, + .ok_or(Error::new(ErrorKind::MissingField, + "need to specify a server port", + None))) + .as_u64() + .ok_or(Error::new(ErrorKind::Malformed, + "`port` should be an \ + integer", + None))) as u16, password: try!(try!(o.get("password") - .ok_or(Error::new(ErrorKind::MissingField, - "need to specify a password", - None))) - .as_string() - .ok_or(Error::new(ErrorKind::Malformed, - "`password` should be a string", - None))) - .to_string(), + .ok_or(Error::new(ErrorKind::MissingField, "need to specify a password", None))) + .as_string() + .ok_or(Error::new(ErrorKind::Malformed, "`password` should be a string", None))) + .to_string(), method: method, timeout: match o.get("timeout") { Some(t) => { let val = try!(t.as_u64() - .ok_or(Error::new(ErrorKind::Malformed, - "`timeout` should be an integer", - None))); + .ok_or(Error::new(ErrorKind::Malformed, "`timeout` should be an integer", None))); Some(Duration::from_secs(val)) } None => None, @@ -349,9 +322,9 @@ impl Config { dns_cache_capacity: match o.get("dns_cache_capacity") { Some(t) => { try!(t.as_u64() - .ok_or(Error::new(ErrorKind::Malformed, - "`dns_cache_capacity` should be an integer", - None))) as usize + .ok_or(Error::new(ErrorKind::Malformed, + "`dns_cache_capacity` should be an integer", + None))) as usize } None => DEFAULT_DNS_CACHE_CAPACITY, }, @@ -368,26 +341,22 @@ impl Config { config.local = match o.get("local_address") { Some(local_addr) => { let addr_str = try!(local_addr.as_string() - .ok_or(Error::new(ErrorKind::Malformed, - "`local_address` \ - should be a string", - None))); + .ok_or(Error::new(ErrorKind::Malformed, + "`local_address` should be a string", + None))); - let port = - try!(o.get("local_port") - .unwrap() - .as_u64() - .ok_or(Error::new(ErrorKind::Malformed, - "`local_port` should be an integer", - None))) as u16; + let port = try!(o.get("local_port") + .unwrap() + .as_u64() + .ok_or(Error::new(ErrorKind::Malformed, + "`local_port` should be an integer", + None))) as u16; match addr_str.parse::() { Ok(ip) => Some(SocketAddr::V4(SocketAddrV4::new(ip, port))), Err(..) => { match addr_str.parse::() { - Ok(ip) => { - Some(SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0))) - } + Ok(ip) => Some(SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0))), Err(..) => { return Err(Error::new(ErrorKind::Malformed, "`local_address` is not a valid IP \ @@ -403,14 +372,52 @@ impl Config { } else if has_local_address ^ has_local_port { panic!("You have to provide `local_address` and `local_port` together"); } + + let has_proxy_addr = o.contains_key("local_http_address"); + let has_proxy_port = o.contains_key("local_http_port"); + + if has_proxy_addr && has_proxy_port { + config.http_proxy = match o.get("local_http_address") { + Some(local_addr) => { + let addr_str = try!(local_addr.as_string() + .ok_or(Error::new(ErrorKind::Malformed, + "`local_http_address` should be a string", + None))); + + let port = try!(o.get("local_http_port") + .unwrap() + .as_u64() + .ok_or(Error::new(ErrorKind::Malformed, + "`local_http_port` should be an integer", + None))) as u16; + + match addr_str.parse::() { + Ok(ip) => Some(SocketAddr::V4(SocketAddrV4::new(ip, port))), + Err(..) => { + match addr_str.parse::() { + Ok(ip) => Some(SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0))), + Err(..) => { + return Err(Error::new(ErrorKind::Malformed, + "`local_http_address` is not a valid IP \ + address", + None)) + } + } + } + } + } + None => None, + }; + } else if has_proxy_addr ^ has_proxy_port { + panic!("You have to provide `local_http_address` and `local_http_port` together"); + } } if let Some(forbidden_ip_conf) = o.get("forbidden_ip") { let forbidden_ip_arr = try!(forbidden_ip_conf.as_array() - .ok_or(Error::new(ErrorKind::Malformed, - "`forbidden_ip` \ - should be a list", - None))); + .ok_or(Error::new(ErrorKind::Malformed, + "`forbidden_ip` should be a list", + None))); config.forbidden_ip.extend(forbidden_ip_arr.into_iter().filter_map(|x| { let x = match x.as_string() { Some(x) => x, diff --git a/src/lib.rs b/src/lib.rs index ff8e8a23..1ac05b8c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,7 @@ extern crate coio; extern crate crypto as rust_crypto; extern crate ip; extern crate openssl; +extern crate hyper; extern crate libc; diff --git a/src/relay/local.rs b/src/relay/local.rs index c8929a63..b0b4e92f 100644 --- a/src/relay/local.rs +++ b/src/relay/local.rs @@ -24,6 +24,7 @@ use coio::Scheduler; use relay::Relay; +use relay::tcprelay::local::HttpRelayLocal; use relay::tcprelay::local::TcpRelayLocal; #[cfg(feature = "enable-udp")] use relay::udprelay::local::UdpRelayLocal; @@ -57,6 +58,7 @@ use config::Config; pub struct RelayLocal { enable_udp: bool, tcprelay: TcpRelayLocal, + httprelay: Option, #[cfg(feature = "enable-udp")] udprelay: UdpRelayLocal, } @@ -65,9 +67,15 @@ impl RelayLocal { #[cfg(feature = "enable-udp")] pub fn new(config: Config) -> RelayLocal { let tcprelay = TcpRelayLocal::new(config.clone()); + let httprelay = if config.http_proxy.is_some() { + Some(HttpRelayLocal::new(config.clone())) + } else { + None + }; let udprelay = UdpRelayLocal::new(config.clone()); RelayLocal { tcprelay: tcprelay, + httprelay: httprelay, udprelay: udprelay, enable_udp: config.enable_udp, } @@ -76,8 +84,14 @@ impl RelayLocal { #[cfg(not(feature = "enable-udp"))] pub fn new(config: Config) -> RelayLocal { let tcprelay = TcpRelayLocal::new(config.clone()); + let httprelay = if config.http_proxy.is_some() { + Some(HttpRelayLocal::new(config.clone())) + } else { + None + }; RelayLocal { tcprelay: tcprelay, + httprelay: httprelay, enable_udp: config.enable_udp, } } @@ -89,14 +103,25 @@ impl Relay for RelayLocal { if self.enable_udp { warn!("UDP relay feature is disabled, recompile with feature=\"enable-udp\" to enable this feature"); } - let tcprelay = self.tcprelay.clone(); - let fut = Scheduler::spawn(move || tcprelay.run()); - info!("Enabled TCP relay"); + let mut futs = Vec::new(); - fut.join().unwrap(); + let tcprelay = self.tcprelay.clone(); + let tcp_fut = Scheduler::spawn(move || tcprelay.run()); + info!("Enabled TCP relay"); + futs.push(tcp_fut); + + if let Some(ref httprelay) = self.httprelay { + let httprelay = httprelay.clone(); + let http_fut = Scheduler::spawn(move || httprelay.run()); + info!("Enabled Http relay"); + futs.push(http_fut); + } + + for fut in futs { + fut.join().unwrap(); + } } - #[cfg(feature = "enable-udp")] fn run(&self) { let mut futs = Vec::new(); @@ -112,6 +137,13 @@ impl Relay for RelayLocal { futs.push(udp_fut); } + if let Some(ref httprelay) = self.httprelay { + let httprelay = httprelay.clone(); + let http_fut = Scheduler::spawn(move || httprelay.run()); + info!("Enabled Http relay"); + futs.push(http_fut); + } + for fut in futs { fut.join().unwrap(); } diff --git a/src/relay/tcprelay/http.rs b/src/relay/tcprelay/http.rs new file mode 100644 index 00000000..9f8a6519 --- /dev/null +++ b/src/relay/tcprelay/http.rs @@ -0,0 +1,146 @@ +// The MIT License (MIT) + +// Copyright (c) 2014 Y. T. CHUNG + +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +/// Http Proxy + +use std::io::{self, Read, Write}; +use std::net::{SocketAddr, Shutdown}; +use std::time::Duration; + +use coio::net::TcpStream; + +use hyper::net::NetworkStream; +use hyper::server::request::Request; +use hyper::server::response::Response; +use hyper::method::Method; +use hyper::uri::RequestUri; +use hyper::buffer::BufReader; +use hyper::header::Headers; +use hyper::status::StatusCode; + +use relay::socks5::Address; + +#[derive(Debug)] +pub struct HttpStream(pub TcpStream); + +impl Read for HttpStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0.read(buf) + } +} + +impl Write for HttpStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.0.flush() + } +} + +impl NetworkStream for HttpStream { + fn peer_addr(&mut self) -> io::Result { + self.0.peer_addr() + } + + fn set_read_timeout(&self, dur: Option) -> io::Result<()> { + self.0.set_read_timeout(dur) + } + + fn set_write_timeout(&self, dur: Option) -> io::Result<()> { + self.0.set_write_timeout(dur) + } + + fn close(&mut self, how: Shutdown) -> io::Result<()> { + self.0.shutdown(how) + } +} + +fn do_handshake(stream: &mut HttpStream, addr: SocketAddr) -> Result { + let mut reader = BufReader::new(stream as &mut NetworkStream); + let request = match Request::new(&mut reader, addr) { + Ok(r) => r, + Err(err) => { + error!("Failed to create Request: {:?}", err); + return Err(StatusCode::BadRequest); + } + }; + + match request.method { + Method::Connect => {} + _ => { + error!("Does not support {:?}", request.method); + return Err(StatusCode::MethodNotAllowed); + } + } + + match &request.uri { + &RequestUri::Authority(ref s) => { + match s.parse::() { + Ok(addr) => Ok(Address::SocketAddress(addr)), + Err(_) => { + let mut sp = s.splitn(2, ':'); + match (sp.next(), sp.next()) { + (Some(host), Some(port)) => { + let port = match port.parse::() { + Ok(port) => port, + Err(err) => { + error!("Failed to parse Url, {}", err); + return Err(StatusCode::BadRequest); + } + }; + + Ok(Address::DomainNameAddress(host.to_owned(), port)) + } + (host, port) => { + error!("Failed to parse Url, {:?}:{:?}", host, port); + return Err(StatusCode::BadRequest); + } + } + } + } + } + u => { + error!("Invalid Uri {:?}", u); + Err(StatusCode::BadRequest) + } + } +} + +pub fn handshake(stream: &mut HttpStream, addr: SocketAddr) -> io::Result
{ + match do_handshake(stream, addr) { + Ok(r) => { + stream.write_all(b"HTTP/1.1 200 Connection Established\r\n\r\n") + .map(|_| r) + } + Err(status) => { + let mut headers = Headers::new(); + let mut resp = Response::new(stream, &mut headers); + *resp.status_mut() = status; + try!(resp.start().and_then(|r| r.end())); + + let err = io::Error::new(io::ErrorKind::Other, "Handshake error"); + Err(err) + } + } + +} diff --git a/src/relay/tcprelay/local.rs b/src/relay/tcprelay/local.rs index 6a375a5f..9a582704 100644 --- a/src/relay/tcprelay/local.rs +++ b/src/relay/tcprelay/local.rs @@ -425,3 +425,286 @@ impl TcpRelayLocal { } } } + +#[derive(Clone)] +pub struct HttpRelayLocal { + config: Arc, +} + +impl HttpRelayLocal { + pub fn new(c: Config) -> HttpRelayLocal { + if c.server.is_empty() || c.http_proxy.is_none() { + panic!("You have to provide configuration for server and http_proxy"); + } + + HttpRelayLocal { config: Arc::new(c) } + } + + fn handle_client(stream: TcpStream, server_addr: SocketAddr, password: Vec, encrypt_method: CipherType) { + use super::http::handshake; + use super::http::HttpStream; + + let sockname = match stream.peer_addr() { + Ok(sockname) => sockname, + Err(err) => { + error!("Failed to get peer addr: {}", err); + return; + } + }; + + let stream_writer = match stream.try_clone() { + Ok(s) => s, + Err(err) => { + error!("Failed to clone local stream: {}", err); + return; + } + }; + + let mut http_stream = HttpStream(stream); + let addr = match handshake(&mut http_stream, sockname) { + Ok(addr) => addr, + Err(err) => { + error!("Error occurs while doing handshake: {}", err); + return; + } + }; + + // NOTE: We have already sent Connection Established while doing handshake + + let HttpStream(stream) = http_stream; // Extract + + let mut local_reader = BufReader::new(stream); + let local_writer = BufWriter::new(stream_writer); + + info!("CONNECT (Http) {}", addr); + + let mut remote_stream = match TcpStream::connect(&server_addr) { + Err(err) => { + error!("Failed to connect remote server: {}", err); + return; + } + Ok(s) => s, + }; + + // Send initialize vector to remote and create encryptor + let mut encrypt_stream = { + let local_iv = encrypt_method.gen_init_vec(); + let encryptor = cipher::with_type(encrypt_method, + &password[..], + &local_iv[..], + CryptoMode::Encrypt); + if let Err(err) = remote_stream.write_all(&local_iv[..]) { + error!("Error occurs while writing initialize vector: {}", err); + return; + } + + let remote_writer = match remote_stream.try_clone() { + Ok(s) => s, + Err(err) => { + error!("Error occurs while cloning remote stream: {}", err); + return; + } + }; + EncryptedWriter::new(remote_writer, encryptor) + }; + + // Send relay address to remote + let mut addr_buf = Vec::new(); + addr.write_to(&mut addr_buf).unwrap(); + // if let Err(err) = addr.write_to(&mut encrypt_stream) { + if let Err(err) = encrypt_stream.write_all(&addr_buf) { + error!("Error occurs while writing address: {}", err); + return; + } + + let addr_cloned = addr.clone(); + + Scheduler::spawn(move || { + loop { + match ::relay::copy_once(&mut local_reader, &mut encrypt_stream) { + Ok(0) => { + trace!("{} local -> remote: EOF", addr_cloned); + break; + } + Ok(n) => { + trace!("{} local -> remote: relayed {} bytes", addr_cloned, n); + } + Err(err) => { + error!("SYSTEM Connect {} local -> remote: {}", addr_cloned, err); + break; + } + } + } + + debug!("SYSTEM Connect {} local -> remote is closing", addr_cloned); + + let _ = encrypt_stream.get_ref().shutdown(Shutdown::Both); + let _ = local_reader.get_ref().shutdown(Shutdown::Both); + }); + + Scheduler::spawn(move || { + let remote_iv = { + let mut iv = Vec::with_capacity(encrypt_method.block_size()); + unsafe { + iv.set_len(encrypt_method.block_size()); + } + + let mut total_len = 0; + while total_len < encrypt_method.block_size() { + match remote_stream.read(&mut iv[total_len..]) { + Ok(0) => { + error!("Unexpected EOF while reading initialize vector"); + debug!("Already read: {:?}", &iv[..total_len]); + let _ = local_writer.get_ref().shutdown(Shutdown::Both); + return; + } + Ok(n) => total_len += n, + Err(err) => { + error!("Error while reading initialize vector: {}", err); + let _ = local_writer.get_ref().shutdown(Shutdown::Both); + return; + } + } + } + iv + }; + trace!("Got initialize vector {:?}", remote_iv); + let decryptor = cipher::with_type(encrypt_method, + &password[..], + &remote_iv[..], + CryptoMode::Decrypt); + let mut decrypt_stream = DecryptedReader::new(remote_stream, decryptor); + let mut local_writer = match local_writer.into_inner() { + Ok(writer) => writer, + Err(err) => { + error!("Error occurs while taking out local writer: {}", err); + return; + } + }; + + loop { + match ::relay::copy_once(&mut decrypt_stream, &mut local_writer) { + Ok(0) => { + trace!("{} local <- remote: EOF", addr); + break; + } + Ok(n) => { + trace!("{} local <- remote: relayed {} bytes", addr, n); + } + Err(err) => { + error!("SYSTEM Connect {} local <- remote: {}", addr, err); + break; + } + } + } + + let _ = local_writer.flush(); + + debug!("SYSTEM Connect {} local <- remote is closing", addr); + + let _ = decrypt_stream.get_mut().shutdown(Shutdown::Both); + let _ = local_writer.shutdown(Shutdown::Both); + }); + } +} + +impl HttpRelayLocal { + pub fn run(&self) { + let mut server_load_balancer = RoundRobin::new(self.config.server.clone()); + + let local_conf = self.config.http_proxy.expect("need http_proxy configuration"); + + let acceptor = match TcpListener::bind(&local_conf) { + Ok(acpt) => acpt, + Err(e) => { + panic!("Error occurs while listening local address: {}", + e.to_string()); + } + }; + + info!("Shadowsocks listening on {} (HTTP tunnel)", local_conf); + + let mut cached_proxy: BTreeMap = BTreeMap::new(); + + for s in acceptor.incoming() { + let stream = match s { + Ok((s, addr)) => { + debug!("Got connection from client {:?}", addr); + s + } + Err(err) => { + panic!("Error occurs while accepting: {:?}", err); + } + }; + + if let Err(err) = stream.set_read_timeout(self.config.timeout) { + error!("Failed to set read timeout: {:?}", err); + continue; + } + + if let Err(err) = stream.set_nodelay(true) { + error!("Failed to set no delay: {:?}", err); + continue; + } + + let mut succeed = false; + for _ in 0..server_load_balancer.total() { + let ref server_cfg = server_load_balancer.pick_server(); + let addr = { + match cached_proxy.get(&server_cfg.addr[..]).map(|x| x.clone()) { + Some(addr) => addr, + None => { + match lookup_host(&server_cfg.addr[..]) { + Ok(mut addr_itr) => { + match addr_itr.next() { + None => { + error!("cannot resolve proxy server `{}`", server_cfg.addr); + continue; + } + Some(addr) => { + let addr = addr.clone(); + cached_proxy.insert(server_cfg.addr.clone(), addr.clone()); + addr + } + } + } + Err(err) => { + error!("cannot resolve proxy server `{}`: {}", server_cfg.addr, err); + continue; + } + } + } + } + }; + + let server_addr = match addr { + SocketAddr::V4(addr) => SocketAddr::V4(SocketAddrV4::new(addr.ip().clone(), server_cfg.port)), + SocketAddr::V6(addr) => { + SocketAddr::V6(SocketAddrV6::new(addr.ip().clone(), + server_cfg.port, + addr.flowinfo(), + addr.scope_id())) + } + }; + + if self.config.forbidden_ip.contains(&::relay::take_ip_addr(&server_addr)) { + info!("{} is in `forbidden_ip` list, skipping", server_addr); + continue; + } + + debug!("Using proxy `{}:{}` (`{}`)", + server_cfg.addr, + server_cfg.port, + server_addr); + let encrypt_method = server_cfg.method.clone(); + let pwd = encrypt_method.bytes_to_key(server_cfg.password.as_bytes()); + Scheduler::spawn(move || HttpRelayLocal::handle_client(stream, server_addr, pwd, encrypt_method)); + succeed = true; + break; + } + if !succeed { + panic!("All proxy servers are failed!"); + } + } + } +} diff --git a/src/relay/tcprelay/mod.rs b/src/relay/tcprelay/mod.rs index f979385c..bdcc76f4 100644 --- a/src/relay/tcprelay/mod.rs +++ b/src/relay/tcprelay/mod.rs @@ -25,3 +25,4 @@ mod cached_dns; pub mod local; pub mod server; mod stream; +mod http;