initial impls HTTP tunnel

This commit is contained in:
Y. T. Chung
2016-10-19 00:44:36 +08:00
parent a5ac010315
commit 2e7e403293
8 changed files with 610 additions and 132 deletions

View File

@@ -61,6 +61,7 @@ ip = "1.0.0"
openssl = "^0.7.1"
lru-cache = "0.0.7"
libc = "^0.2.7"
hyper = "0.9"
[dependencies.coio]
git = "https://github.com/zonyitoo/coio-rs.git"

View File

@@ -19,14 +19,18 @@ const BLACK: &'static str = "\x1b[40m \x1b[0m";
const WHITE: &'static str = "\x1b[47m \x1b[0m";
fn encode_url(svr: &ServerConfig) -> String {
let url = format!("{}:{}@{}:{}", svr.method.to_string(), svr.password, svr.addr, svr.port);
let url = format!("{}:{}@{}:{}",
svr.method.to_string(),
svr.password,
svr.addr,
svr.port);
format!("ss://{}", url.as_bytes().to_base64(URL_SAFE))
}
fn print_qrcode(encoded: &str) {
let qrcode = QrCode::new(encoded.as_bytes()).unwrap();
for _ in 0..qrcode.width()+2 {
for _ in 0..qrcode.width() + 2 {
print!("{}", WHITE);
}
println!("");
@@ -34,18 +38,14 @@ fn print_qrcode(encoded: &str) {
for y in 0..qrcode.width() {
print!("{}", WHITE);
for x in 0..qrcode.width() {
let color = if qrcode[(x, y)] {
BLACK
} else {
WHITE
};
let color = if qrcode[(x, y)] { BLACK } else { WHITE };
print!("{}", color);
}
println!("{}", WHITE);
}
for _ in 0..qrcode.width()+2 {
for _ in 0..qrcode.width() + 2 {
print!("{}", WHITE);
}
println!("");
@@ -75,9 +75,7 @@ fn decode(encoded: &str, need_qrcode: bool) {
let mut sp1 = decoded.split('@');
let (account, addr) = match (sp1.next(), sp1.next()) {
(Some(account), Some(addr)) => {
(account, addr)
},
(Some(account), Some(addr)) => (account, addr),
_ => panic!("Malformed input"),
};
@@ -93,12 +91,15 @@ fn decode(encoded: &str, need_qrcode: bool) {
_ => panic!("Malformed input"),
};
let svrconfig = ServerConfig::basic(addr.to_owned(), port.parse().unwrap(),
pwd.to_owned(), method.parse().unwrap());
let svrconfig = ServerConfig::basic(addr.to_owned(),
port.parse().unwrap(),
pwd.to_owned(),
method.parse().unwrap());
let config = Config {
server: vec![svrconfig],
local: None,
http_proxy: None,
enable_udp: false,
timeout: None,
forbidden_ip: HashSet::new(),
@@ -114,17 +115,23 @@ fn decode(encoded: &str, need_qrcode: bool) {
fn main() {
let app = App::new("ssurl")
.author("Y. T. Chung <zonyitoo@gmail.com>")
.about("Encode and decode ShadowSocks URL")
.arg(Arg::with_name("ENCODE").short("e").long("encode")
.takes_value(true)
.help("Encode the server configuration in the provided JSON file"))
.arg(Arg::with_name("DECODE").short("d").long("decode")
.takes_value(true)
.help("Decode the server configuration from the provide ShadowSocks URL"))
.arg(Arg::with_name("QRCODE").short("c").long("qrcode")
.takes_value(false)
.help("Generate the QRCode with the provided configuration"));
.author("Y. T. Chung <zonyitoo@gmail.com>")
.about("Encode and decode ShadowSocks URL")
.arg(Arg::with_name("ENCODE")
.short("e")
.long("encode")
.takes_value(true)
.help("Encode the server configuration in the provided JSON file"))
.arg(Arg::with_name("DECODE")
.short("d")
.long("decode")
.takes_value(true)
.help("Decode the server configuration from the provide ShadowSocks URL"))
.arg(Arg::with_name("QRCODE")
.short("c")
.long("qrcode")
.takes_value(false)
.help("Generate the QRCode with the provided configuration"));
let matches = app.get_matches();
let need_qrcode = matches.is_present("QRCODE");

View File

@@ -140,6 +140,7 @@ pub enum ConfigType {
pub struct Config {
pub server: Vec<ServerConfig>,
pub local: Option<ClientConfig>,
pub http_proxy: Option<ClientConfig>,
pub enable_udp: bool,
pub timeout: Option<Duration>,
pub forbidden_ip: HashSet<IpAddr>,
@@ -191,6 +192,7 @@ impl Config {
Config {
server: Vec::new(),
local: None,
http_proxy: None,
enable_udp: false,
timeout: None,
forbidden_ip: HashSet::new(),
@@ -203,9 +205,7 @@ impl Config {
config.timeout = match o.get("timeout") {
Some(t_str) => {
let val = try!(t_str.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`timeout` should be an integer",
None)));
.ok_or(Error::new(ErrorKind::Malformed, "`timeout` should be an integer", None)));
Some(Duration::from_secs(val))
}
None => None,
@@ -213,21 +213,15 @@ impl Config {
if o.contains_key("servers") {
let server_list = try!(o.get("servers")
.unwrap()
.as_array()
.ok_or(Error::new(ErrorKind::Malformed,
"`servers` should be a list",
None)));
.unwrap()
.as_array()
.ok_or(Error::new(ErrorKind::Malformed, "`servers` should be a list", None)));
for server in server_list.iter() {
let method_o = try!(server.find("method")
.ok_or(Error::new(ErrorKind::MissingField,
"need to specify a method",
None)));
let method_str = try!(method_o.as_string().ok_or(Error::new(ErrorKind::Malformed,
"`method` should \
be a string",
None)));
.ok_or(Error::new(ErrorKind::MissingField, "need to specify a method", None)));
let method_str = try!(method_o.as_string()
.ok_or(Error::new(ErrorKind::Malformed, "`method` should be a string", None)));
let method = try!(method_str.parse::<CipherType>().map_err(|_| {
Error::new(ErrorKind::Invalid,
"not supported method",
@@ -235,40 +229,33 @@ impl Config {
}));
let addr_o = try!(server.find("address")
.ok_or(Error::new(ErrorKind::MissingField,
"need to specify a server address",
None)));
.ok_or(Error::new(ErrorKind::MissingField,
"need to specify a server address",
None)));
let addr_str = try!(addr_o.as_string()
.ok_or(Error::new(ErrorKind::Malformed,
"`address` should be a string",
None)));
.ok_or(Error::new(ErrorKind::Malformed, "`address` should be a string", None)));
let cfg = ServerConfig {
addr: addr_str.to_string(),
port: try!(try!(server.find("port")
.ok_or(Error::new(ErrorKind::MissingField,
"need to specify a server port",
None)))
.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`port` should be an integer",
None))) as u16,
.ok_or(Error::new(ErrorKind::MissingField,
"need to specify a server port",
None)))
.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`port` should be an \
integer",
None))) as u16,
password: try!(try!(server.find("password")
.ok_or(Error::new(ErrorKind::MissingField,
"need to specify a password",
None)))
.as_string()
.ok_or(Error::new(ErrorKind::Malformed,
"`password` should be a string",
None)))
.to_string(),
.ok_or(Error::new(ErrorKind::MissingField, "need to specify a password", None)))
.as_string()
.ok_or(Error::new(ErrorKind::Malformed, "`password` should be a string", None)))
.to_string(),
method: method,
timeout: match server.find("timeout") {
Some(t) => {
let val = try!(t.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`timeout` should be an integer",
None)));
.ok_or(Error::new(ErrorKind::Malformed, "`timeout` should be an integer", None)));
Some(Duration::from_secs(val))
}
None => None,
@@ -276,10 +263,9 @@ impl Config {
dns_cache_capacity: match server.find("dns_cache_capacity") {
Some(t) => {
try!(t.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`dns_cache_capacity` should be an \
integer",
None))) as usize
.ok_or(Error::new(ErrorKind::Malformed,
"`dns_cache_capacity` should be an integer",
None))) as usize
}
None => DEFAULT_DNS_CACHE_CAPACITY,
},
@@ -288,60 +274,47 @@ impl Config {
config.server.push(cfg);
}
} else if o.contains_key("server") && o.contains_key("server_port") &&
o.contains_key("password") && o.contains_key("method") {
} else if o.contains_key("server") && o.contains_key("server_port") && o.contains_key("password") &&
o.contains_key("method") {
// Traditional configuration file
let method_o = try!(o.get("method")
.ok_or(Error::new(ErrorKind::MissingField,
"need to specify method",
None)));
.ok_or(Error::new(ErrorKind::MissingField, "need to specify method", None)));
let method_str = try!(method_o.as_string()
.ok_or(Error::new(ErrorKind::Malformed,
"`method` should be a string",
None)));
.ok_or(Error::new(ErrorKind::Malformed, "`method` should be a string", None)));
let method = try!(method_str.parse::<CipherType>()
.map_err(|_| {
Error::new(ErrorKind::Invalid,
"not supported method",
Some(format!("`{}` is not a supported \
method",
method_str)))
}));
.map_err(|_| {
Error::new(ErrorKind::Invalid,
"not supported method",
Some(format!("`{}` is not a supported method", method_str)))
}));
let addr_o = try!(o.get("server")
.ok_or(Error::new(ErrorKind::MissingField,
"need to specify server address",
None)));
.ok_or(Error::new(ErrorKind::MissingField,
"need to specify server address",
None)));
let addr_str = try!(addr_o.as_string()
.ok_or(Error::new(ErrorKind::Malformed,
"`server` should be a string",
None)));
.ok_or(Error::new(ErrorKind::Malformed, "`server` should be a string", None)));
let single_server = ServerConfig {
addr: addr_str.to_string(),
port: try!(try!(o.get("server_port")
.ok_or(Error::new(ErrorKind::MissingField,
"need to specify a server port",
None)))
.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`port` should be an integer",
None))) as u16,
.ok_or(Error::new(ErrorKind::MissingField,
"need to specify a server port",
None)))
.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`port` should be an \
integer",
None))) as u16,
password: try!(try!(o.get("password")
.ok_or(Error::new(ErrorKind::MissingField,
"need to specify a password",
None)))
.as_string()
.ok_or(Error::new(ErrorKind::Malformed,
"`password` should be a string",
None)))
.to_string(),
.ok_or(Error::new(ErrorKind::MissingField, "need to specify a password", None)))
.as_string()
.ok_or(Error::new(ErrorKind::Malformed, "`password` should be a string", None)))
.to_string(),
method: method,
timeout: match o.get("timeout") {
Some(t) => {
let val = try!(t.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`timeout` should be an integer",
None)));
.ok_or(Error::new(ErrorKind::Malformed, "`timeout` should be an integer", None)));
Some(Duration::from_secs(val))
}
None => None,
@@ -349,9 +322,9 @@ impl Config {
dns_cache_capacity: match o.get("dns_cache_capacity") {
Some(t) => {
try!(t.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`dns_cache_capacity` should be an integer",
None))) as usize
.ok_or(Error::new(ErrorKind::Malformed,
"`dns_cache_capacity` should be an integer",
None))) as usize
}
None => DEFAULT_DNS_CACHE_CAPACITY,
},
@@ -368,26 +341,22 @@ impl Config {
config.local = match o.get("local_address") {
Some(local_addr) => {
let addr_str = try!(local_addr.as_string()
.ok_or(Error::new(ErrorKind::Malformed,
"`local_address` \
should be a string",
None)));
.ok_or(Error::new(ErrorKind::Malformed,
"`local_address` should be a string",
None)));
let port =
try!(o.get("local_port")
.unwrap()
.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`local_port` should be an integer",
None))) as u16;
let port = try!(o.get("local_port")
.unwrap()
.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`local_port` should be an integer",
None))) as u16;
match addr_str.parse::<Ipv4Addr>() {
Ok(ip) => Some(SocketAddr::V4(SocketAddrV4::new(ip, port))),
Err(..) => {
match addr_str.parse::<Ipv6Addr>() {
Ok(ip) => {
Some(SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0)))
}
Ok(ip) => Some(SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0))),
Err(..) => {
return Err(Error::new(ErrorKind::Malformed,
"`local_address` is not a valid IP \
@@ -403,14 +372,52 @@ impl Config {
} else if has_local_address ^ has_local_port {
panic!("You have to provide `local_address` and `local_port` together");
}
let has_proxy_addr = o.contains_key("local_http_address");
let has_proxy_port = o.contains_key("local_http_port");
if has_proxy_addr && has_proxy_port {
config.http_proxy = match o.get("local_http_address") {
Some(local_addr) => {
let addr_str = try!(local_addr.as_string()
.ok_or(Error::new(ErrorKind::Malformed,
"`local_http_address` should be a string",
None)));
let port = try!(o.get("local_http_port")
.unwrap()
.as_u64()
.ok_or(Error::new(ErrorKind::Malformed,
"`local_http_port` should be an integer",
None))) as u16;
match addr_str.parse::<Ipv4Addr>() {
Ok(ip) => Some(SocketAddr::V4(SocketAddrV4::new(ip, port))),
Err(..) => {
match addr_str.parse::<Ipv6Addr>() {
Ok(ip) => Some(SocketAddr::V6(SocketAddrV6::new(ip, port, 0, 0))),
Err(..) => {
return Err(Error::new(ErrorKind::Malformed,
"`local_http_address` is not a valid IP \
address",
None))
}
}
}
}
}
None => None,
};
} else if has_proxy_addr ^ has_proxy_port {
panic!("You have to provide `local_http_address` and `local_http_port` together");
}
}
if let Some(forbidden_ip_conf) = o.get("forbidden_ip") {
let forbidden_ip_arr = try!(forbidden_ip_conf.as_array()
.ok_or(Error::new(ErrorKind::Malformed,
"`forbidden_ip` \
should be a list",
None)));
.ok_or(Error::new(ErrorKind::Malformed,
"`forbidden_ip` should be a list",
None)));
config.forbidden_ip.extend(forbidden_ip_arr.into_iter().filter_map(|x| {
let x = match x.as_string() {
Some(x) => x,

View File

@@ -37,6 +37,7 @@ extern crate coio;
extern crate crypto as rust_crypto;
extern crate ip;
extern crate openssl;
extern crate hyper;
extern crate libc;

View File

@@ -24,6 +24,7 @@
use coio::Scheduler;
use relay::Relay;
use relay::tcprelay::local::HttpRelayLocal;
use relay::tcprelay::local::TcpRelayLocal;
#[cfg(feature = "enable-udp")]
use relay::udprelay::local::UdpRelayLocal;
@@ -57,6 +58,7 @@ use config::Config;
pub struct RelayLocal {
enable_udp: bool,
tcprelay: TcpRelayLocal,
httprelay: Option<HttpRelayLocal>,
#[cfg(feature = "enable-udp")]
udprelay: UdpRelayLocal,
}
@@ -65,9 +67,15 @@ impl RelayLocal {
#[cfg(feature = "enable-udp")]
pub fn new(config: Config) -> RelayLocal {
let tcprelay = TcpRelayLocal::new(config.clone());
let httprelay = if config.http_proxy.is_some() {
Some(HttpRelayLocal::new(config.clone()))
} else {
None
};
let udprelay = UdpRelayLocal::new(config.clone());
RelayLocal {
tcprelay: tcprelay,
httprelay: httprelay,
udprelay: udprelay,
enable_udp: config.enable_udp,
}
@@ -76,8 +84,14 @@ impl RelayLocal {
#[cfg(not(feature = "enable-udp"))]
pub fn new(config: Config) -> RelayLocal {
let tcprelay = TcpRelayLocal::new(config.clone());
let httprelay = if config.http_proxy.is_some() {
Some(HttpRelayLocal::new(config.clone()))
} else {
None
};
RelayLocal {
tcprelay: tcprelay,
httprelay: httprelay,
enable_udp: config.enable_udp,
}
}
@@ -89,14 +103,25 @@ impl Relay for RelayLocal {
if self.enable_udp {
warn!("UDP relay feature is disabled, recompile with feature=\"enable-udp\" to enable this feature");
}
let tcprelay = self.tcprelay.clone();
let fut = Scheduler::spawn(move || tcprelay.run());
info!("Enabled TCP relay");
let mut futs = Vec::new();
fut.join().unwrap();
let tcprelay = self.tcprelay.clone();
let tcp_fut = Scheduler::spawn(move || tcprelay.run());
info!("Enabled TCP relay");
futs.push(tcp_fut);
if let Some(ref httprelay) = self.httprelay {
let httprelay = httprelay.clone();
let http_fut = Scheduler::spawn(move || httprelay.run());
info!("Enabled Http relay");
futs.push(http_fut);
}
for fut in futs {
fut.join().unwrap();
}
}
#[cfg(feature = "enable-udp")]
fn run(&self) {
let mut futs = Vec::new();
@@ -112,6 +137,13 @@ impl Relay for RelayLocal {
futs.push(udp_fut);
}
if let Some(ref httprelay) = self.httprelay {
let httprelay = httprelay.clone();
let http_fut = Scheduler::spawn(move || httprelay.run());
info!("Enabled Http relay");
futs.push(http_fut);
}
for fut in futs {
fut.join().unwrap();
}

146
src/relay/tcprelay/http.rs Normal file
View File

@@ -0,0 +1,146 @@
// The MIT License (MIT)
// Copyright (c) 2014 Y. T. CHUNG <zonyitoo@gmail.com>
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
/// Http Proxy
use std::io::{self, Read, Write};
use std::net::{SocketAddr, Shutdown};
use std::time::Duration;
use coio::net::TcpStream;
use hyper::net::NetworkStream;
use hyper::server::request::Request;
use hyper::server::response::Response;
use hyper::method::Method;
use hyper::uri::RequestUri;
use hyper::buffer::BufReader;
use hyper::header::Headers;
use hyper::status::StatusCode;
use relay::socks5::Address;
#[derive(Debug)]
pub struct HttpStream(pub TcpStream);
impl Read for HttpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.read(buf)
}
}
impl Write for HttpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.0.flush()
}
}
impl NetworkStream for HttpStream {
fn peer_addr(&mut self) -> io::Result<SocketAddr> {
self.0.peer_addr()
}
fn set_read_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.0.set_read_timeout(dur)
}
fn set_write_timeout(&self, dur: Option<Duration>) -> io::Result<()> {
self.0.set_write_timeout(dur)
}
fn close(&mut self, how: Shutdown) -> io::Result<()> {
self.0.shutdown(how)
}
}
fn do_handshake(stream: &mut HttpStream, addr: SocketAddr) -> Result<Address, StatusCode> {
let mut reader = BufReader::new(stream as &mut NetworkStream);
let request = match Request::new(&mut reader, addr) {
Ok(r) => r,
Err(err) => {
error!("Failed to create Request: {:?}", err);
return Err(StatusCode::BadRequest);
}
};
match request.method {
Method::Connect => {}
_ => {
error!("Does not support {:?}", request.method);
return Err(StatusCode::MethodNotAllowed);
}
}
match &request.uri {
&RequestUri::Authority(ref s) => {
match s.parse::<SocketAddr>() {
Ok(addr) => Ok(Address::SocketAddress(addr)),
Err(_) => {
let mut sp = s.splitn(2, ':');
match (sp.next(), sp.next()) {
(Some(host), Some(port)) => {
let port = match port.parse::<u16>() {
Ok(port) => port,
Err(err) => {
error!("Failed to parse Url, {}", err);
return Err(StatusCode::BadRequest);
}
};
Ok(Address::DomainNameAddress(host.to_owned(), port))
}
(host, port) => {
error!("Failed to parse Url, {:?}:{:?}", host, port);
return Err(StatusCode::BadRequest);
}
}
}
}
}
u => {
error!("Invalid Uri {:?}", u);
Err(StatusCode::BadRequest)
}
}
}
pub fn handshake(stream: &mut HttpStream, addr: SocketAddr) -> io::Result<Address> {
match do_handshake(stream, addr) {
Ok(r) => {
stream.write_all(b"HTTP/1.1 200 Connection Established\r\n\r\n")
.map(|_| r)
}
Err(status) => {
let mut headers = Headers::new();
let mut resp = Response::new(stream, &mut headers);
*resp.status_mut() = status;
try!(resp.start().and_then(|r| r.end()));
let err = io::Error::new(io::ErrorKind::Other, "Handshake error");
Err(err)
}
}
}

View File

@@ -425,3 +425,286 @@ impl TcpRelayLocal {
}
}
}
#[derive(Clone)]
pub struct HttpRelayLocal {
config: Arc<Config>,
}
impl HttpRelayLocal {
pub fn new(c: Config) -> HttpRelayLocal {
if c.server.is_empty() || c.http_proxy.is_none() {
panic!("You have to provide configuration for server and http_proxy");
}
HttpRelayLocal { config: Arc::new(c) }
}
fn handle_client(stream: TcpStream, server_addr: SocketAddr, password: Vec<u8>, encrypt_method: CipherType) {
use super::http::handshake;
use super::http::HttpStream;
let sockname = match stream.peer_addr() {
Ok(sockname) => sockname,
Err(err) => {
error!("Failed to get peer addr: {}", err);
return;
}
};
let stream_writer = match stream.try_clone() {
Ok(s) => s,
Err(err) => {
error!("Failed to clone local stream: {}", err);
return;
}
};
let mut http_stream = HttpStream(stream);
let addr = match handshake(&mut http_stream, sockname) {
Ok(addr) => addr,
Err(err) => {
error!("Error occurs while doing handshake: {}", err);
return;
}
};
// NOTE: We have already sent Connection Established while doing handshake
let HttpStream(stream) = http_stream; // Extract
let mut local_reader = BufReader::new(stream);
let local_writer = BufWriter::new(stream_writer);
info!("CONNECT (Http) {}", addr);
let mut remote_stream = match TcpStream::connect(&server_addr) {
Err(err) => {
error!("Failed to connect remote server: {}", err);
return;
}
Ok(s) => s,
};
// Send initialize vector to remote and create encryptor
let mut encrypt_stream = {
let local_iv = encrypt_method.gen_init_vec();
let encryptor = cipher::with_type(encrypt_method,
&password[..],
&local_iv[..],
CryptoMode::Encrypt);
if let Err(err) = remote_stream.write_all(&local_iv[..]) {
error!("Error occurs while writing initialize vector: {}", err);
return;
}
let remote_writer = match remote_stream.try_clone() {
Ok(s) => s,
Err(err) => {
error!("Error occurs while cloning remote stream: {}", err);
return;
}
};
EncryptedWriter::new(remote_writer, encryptor)
};
// Send relay address to remote
let mut addr_buf = Vec::new();
addr.write_to(&mut addr_buf).unwrap();
// if let Err(err) = addr.write_to(&mut encrypt_stream) {
if let Err(err) = encrypt_stream.write_all(&addr_buf) {
error!("Error occurs while writing address: {}", err);
return;
}
let addr_cloned = addr.clone();
Scheduler::spawn(move || {
loop {
match ::relay::copy_once(&mut local_reader, &mut encrypt_stream) {
Ok(0) => {
trace!("{} local -> remote: EOF", addr_cloned);
break;
}
Ok(n) => {
trace!("{} local -> remote: relayed {} bytes", addr_cloned, n);
}
Err(err) => {
error!("SYSTEM Connect {} local -> remote: {}", addr_cloned, err);
break;
}
}
}
debug!("SYSTEM Connect {} local -> remote is closing", addr_cloned);
let _ = encrypt_stream.get_ref().shutdown(Shutdown::Both);
let _ = local_reader.get_ref().shutdown(Shutdown::Both);
});
Scheduler::spawn(move || {
let remote_iv = {
let mut iv = Vec::with_capacity(encrypt_method.block_size());
unsafe {
iv.set_len(encrypt_method.block_size());
}
let mut total_len = 0;
while total_len < encrypt_method.block_size() {
match remote_stream.read(&mut iv[total_len..]) {
Ok(0) => {
error!("Unexpected EOF while reading initialize vector");
debug!("Already read: {:?}", &iv[..total_len]);
let _ = local_writer.get_ref().shutdown(Shutdown::Both);
return;
}
Ok(n) => total_len += n,
Err(err) => {
error!("Error while reading initialize vector: {}", err);
let _ = local_writer.get_ref().shutdown(Shutdown::Both);
return;
}
}
}
iv
};
trace!("Got initialize vector {:?}", remote_iv);
let decryptor = cipher::with_type(encrypt_method,
&password[..],
&remote_iv[..],
CryptoMode::Decrypt);
let mut decrypt_stream = DecryptedReader::new(remote_stream, decryptor);
let mut local_writer = match local_writer.into_inner() {
Ok(writer) => writer,
Err(err) => {
error!("Error occurs while taking out local writer: {}", err);
return;
}
};
loop {
match ::relay::copy_once(&mut decrypt_stream, &mut local_writer) {
Ok(0) => {
trace!("{} local <- remote: EOF", addr);
break;
}
Ok(n) => {
trace!("{} local <- remote: relayed {} bytes", addr, n);
}
Err(err) => {
error!("SYSTEM Connect {} local <- remote: {}", addr, err);
break;
}
}
}
let _ = local_writer.flush();
debug!("SYSTEM Connect {} local <- remote is closing", addr);
let _ = decrypt_stream.get_mut().shutdown(Shutdown::Both);
let _ = local_writer.shutdown(Shutdown::Both);
});
}
}
impl HttpRelayLocal {
pub fn run(&self) {
let mut server_load_balancer = RoundRobin::new(self.config.server.clone());
let local_conf = self.config.http_proxy.expect("need http_proxy configuration");
let acceptor = match TcpListener::bind(&local_conf) {
Ok(acpt) => acpt,
Err(e) => {
panic!("Error occurs while listening local address: {}",
e.to_string());
}
};
info!("Shadowsocks listening on {} (HTTP tunnel)", local_conf);
let mut cached_proxy: BTreeMap<String, SocketAddr> = BTreeMap::new();
for s in acceptor.incoming() {
let stream = match s {
Ok((s, addr)) => {
debug!("Got connection from client {:?}", addr);
s
}
Err(err) => {
panic!("Error occurs while accepting: {:?}", err);
}
};
if let Err(err) = stream.set_read_timeout(self.config.timeout) {
error!("Failed to set read timeout: {:?}", err);
continue;
}
if let Err(err) = stream.set_nodelay(true) {
error!("Failed to set no delay: {:?}", err);
continue;
}
let mut succeed = false;
for _ in 0..server_load_balancer.total() {
let ref server_cfg = server_load_balancer.pick_server();
let addr = {
match cached_proxy.get(&server_cfg.addr[..]).map(|x| x.clone()) {
Some(addr) => addr,
None => {
match lookup_host(&server_cfg.addr[..]) {
Ok(mut addr_itr) => {
match addr_itr.next() {
None => {
error!("cannot resolve proxy server `{}`", server_cfg.addr);
continue;
}
Some(addr) => {
let addr = addr.clone();
cached_proxy.insert(server_cfg.addr.clone(), addr.clone());
addr
}
}
}
Err(err) => {
error!("cannot resolve proxy server `{}`: {}", server_cfg.addr, err);
continue;
}
}
}
}
};
let server_addr = match addr {
SocketAddr::V4(addr) => SocketAddr::V4(SocketAddrV4::new(addr.ip().clone(), server_cfg.port)),
SocketAddr::V6(addr) => {
SocketAddr::V6(SocketAddrV6::new(addr.ip().clone(),
server_cfg.port,
addr.flowinfo(),
addr.scope_id()))
}
};
if self.config.forbidden_ip.contains(&::relay::take_ip_addr(&server_addr)) {
info!("{} is in `forbidden_ip` list, skipping", server_addr);
continue;
}
debug!("Using proxy `{}:{}` (`{}`)",
server_cfg.addr,
server_cfg.port,
server_addr);
let encrypt_method = server_cfg.method.clone();
let pwd = encrypt_method.bytes_to_key(server_cfg.password.as_bytes());
Scheduler::spawn(move || HttpRelayLocal::handle_client(stream, server_addr, pwd, encrypt_method));
succeed = true;
break;
}
if !succeed {
panic!("All proxy servers are failed!");
}
}
}
}

View File

@@ -25,3 +25,4 @@ mod cached_dns;
pub mod local;
pub mod server;
mod stream;
mod http;