remove unnecessary struct instance

This commit is contained in:
Y. T. Chung
2016-10-26 00:01:37 +08:00
parent de4dc07f9b
commit 8a8b5b5907
10 changed files with 51 additions and 324 deletions

View File

@@ -230,5 +230,5 @@ fn main() {
debug!("Config: {:?}", config);
RelayLocal::new(config).run().unwrap();
RelayLocal::run(config).unwrap();
}

View File

@@ -238,5 +238,5 @@ fn main() {
.ok()
.expect("`threads` should be an integer");
RelayServer::new(config).run(threads).unwrap();
RelayServer::run(config, threads).unwrap();
}

View File

@@ -31,7 +31,7 @@ pub struct RoundRobin {
}
impl RoundRobin {
pub fn new(config: Arc<Config>) -> RoundRobin {
pub fn new(config: &Config) -> RoundRobin {
RoundRobin {
servers: config.server.iter().map(|s| Arc::new(s.clone())).collect(),
index: 0usize,

View File

@@ -53,20 +53,14 @@ use config::Config;
/// RelayLocal::new(config).run();
/// ```
#[derive(Clone)]
pub struct RelayLocal {
config: Config,
}
pub struct RelayLocal;
impl RelayLocal {
pub fn new(config: Config) -> RelayLocal {
RelayLocal { config: config }
}
pub fn run(self) -> io::Result<()> {
pub fn run(config: Config) -> io::Result<()> {
let mut lp = try!(Core::new());
let handle = lp.handle();
let config = Arc::new(self.config);
let tcp_fut = TcpRelayLocal::new(config).run(handle.clone());
let config = Arc::new(config);
let tcp_fut = TcpRelayLocal::run(config, handle);
lp.run(tcp_fut)
}
}

View File

@@ -24,6 +24,10 @@
pub use self::local::RelayLocal;
pub use self::server::RelayServer;
use std::io;
use futures::BoxFuture;
mod tcprelay;
#[cfg(feature = "enable-udp")]
mod udprelay;
@@ -31,3 +35,5 @@ pub mod local;
pub mod server;
mod loadbalancing;
pub mod socks5;
pub type BoxIoFuture<T> = BoxFuture<T, io::Error>;

View File

@@ -53,20 +53,14 @@ use config::Config;
/// ```
///
#[derive(Clone)]
pub struct RelayServer {
config: Config,
}
pub struct RelayServer;
impl RelayServer {
pub fn new(config: Config) -> RelayServer {
RelayServer { config: config }
}
pub fn run(self, threads: usize) -> io::Result<()> {
pub fn run(config: Config, threads: usize) -> io::Result<()> {
let mut lp = try!(Core::new());
let handle = lp.handle();
let config = Arc::new(self.config);
let tcp_fut = TcpRelayServer::new(config, threads).run(handle.clone());
let config = Arc::new(config);
let tcp_fut = TcpRelayServer::run(config, handle, threads);
lp.run(tcp_fut)
}
}

View File

@@ -42,25 +42,20 @@ use relay::socks5::{self, HandshakeRequest, HandshakeResponse, Address};
use relay::socks5::{TcpRequestHeader, TcpResponseHeader};
use relay::loadbalancing::server::RoundRobin;
use relay::loadbalancing::server::LoadBalancer;
use relay::BoxIoFuture;
use super::http::{self, HttpRequestFut};
use super::{BoxIoFuture, tunnel};
use super::tunnel;
/// TCP relay local server
pub struct TcpRelayLocal {
config: Arc<Config>,
}
pub struct TcpRelayLocal;
impl TcpRelayLocal {
pub fn new(config: Arc<Config>) -> TcpRelayLocal {
TcpRelayLocal { config: config }
}
pub fn run(self, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
let tcp_fut = Socks5RelayLocal::new(self.config.clone()).run(handle.clone());
match &self.config.http_proxy {
pub fn run(config: Arc<Config>, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
let tcp_fut = Socks5RelayLocal::run(config.clone(), handle.clone());
match &config.http_proxy {
&Some(..) => {
let http_fut = HttpRelayServer::new(self.config.clone()).run(handle);
let http_fut = HttpRelayServer::run(config, handle);
Box::new(tcp_fut.join(http_fut)
.map(|_| ()))
}
@@ -70,15 +65,9 @@ impl TcpRelayLocal {
}
/// Socks5 local server
pub struct Socks5RelayLocal {
config: Arc<Config>,
}
pub struct Socks5RelayLocal;
impl Socks5RelayLocal {
pub fn new(config: Arc<Config>) -> Socks5RelayLocal {
Socks5RelayLocal { config: config }
}
fn handle_socks5_connect(handle: &Handle,
(r, w): (ReadHalf<TcpStream>, WriteHalf<TcpStream>),
client_addr: SocketAddr,
@@ -196,15 +185,15 @@ impl Socks5RelayLocal {
}
// Runs TCP relay local server
pub fn run(self, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
pub fn run(config: Arc<Config>, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
let listener = {
let local_addr = self.config.local.as_ref().unwrap();
let local_addr = config.local.as_ref().unwrap();
let listener = TcpListener::bind(local_addr, &handle).unwrap();
info!("ShadowSocks TCP Listening on {}", local_addr);
listener
};
let mut servers = RoundRobin::new(self.config);
let mut servers = RoundRobin::new(&*config);
let listening = listener.incoming()
.for_each(move |(socket, addr)| {
let server_cfg = servers.pick_server();
@@ -221,15 +210,9 @@ impl Socks5RelayLocal {
}
/// HTTP local server
pub struct HttpRelayServer {
config: Arc<Config>,
}
pub struct HttpRelayServer;
impl HttpRelayServer {
pub fn new(config: Arc<Config>) -> HttpRelayServer {
HttpRelayServer { config: config }
}
fn handle_connect(handle: Handle,
(r, w): (ReadHalf<TcpStream>, WriteHalf<TcpStream>),
req: http::HttpRequest,
@@ -410,15 +393,15 @@ impl HttpRelayServer {
Ok(())
}
pub fn run(self, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
pub fn run(config: Arc<Config>, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
let listener = {
let local_addr = self.config.http_proxy.as_ref().unwrap();
let local_addr = config.http_proxy.as_ref().unwrap();
let listener = TcpListener::bind(local_addr, &handle).unwrap();
info!("ShadowSocks HTTP Listening on {}", local_addr);
listener
};
let mut servers = RoundRobin::new(self.config);
let mut servers = RoundRobin::new(&*config);
let listening = listener.incoming()
.for_each(move |(socket, addr)| {
let server_cfg = servers.pick_server();

View File

@@ -28,6 +28,7 @@ use std::mem;
use crypto::cipher;
use crypto::CryptoMode;
use relay::socks5::Address;
use relay::BoxIoFuture;
use config::ServerConfig;
use tokio_core::net::TcpStream;
@@ -40,10 +41,6 @@ use futures::{self, Future, BoxFuture, Poll};
use self::stream::{EncryptedWriter, DecryptedReader};
// use coio::net::TcpStream;
// use self::stream::{DecryptedReader, EncryptedWriter};
// mod cached_dns;
pub mod local;
pub mod server;
@@ -62,8 +59,6 @@ pub type EncryptedHalf = EncryptedWriter<WriteHalf<TcpStream>>;
pub type DecryptedHalfFut = BoxFuture<DecryptedHalf, io::Error>;
pub type EncryptedHalfFut = BoxFuture<EncryptedHalf, io::Error>;
pub type BoxIoFuture<T> = BoxFuture<T, io::Error>;
fn connect_proxy_server(handle: &Handle, svr_cfg: Arc<ServerConfig>) -> BoxIoFuture<TcpStream> {
TcpStream::connect(&svr_cfg.addr, handle).boxed()
}

View File

@@ -29,8 +29,9 @@ use std::collections::HashSet;
use config::{Config, ServerConfig};
use relay::socks5::Address;
use relay::BoxIoFuture;
use futures::{self, Future, BoxFuture};
use futures::{self, Future};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
@@ -45,22 +46,9 @@ use ip::IpAddr;
use super::{tunnel, proxy_handshake, DecryptedHalf, EncryptedHalfFut};
/// TCP Relay backend
pub struct TcpRelayServer {
config: Arc<Config>,
cpu_pool: CpuPool,
}
type BoxIoFuture<T> = BoxFuture<T, io::Error>;
pub struct TcpRelayServer;
impl TcpRelayServer {
/// Creates an instance
pub fn new(config: Arc<Config>, threads: usize) -> TcpRelayServer {
TcpRelayServer {
config: config,
cpu_pool: CpuPool::new(threads),
}
}
fn handshake(remote_stream: TcpStream,
svr_cfg: Arc<ServerConfig>)
-> BoxIoFuture<(DecryptedHalf, Address, EncryptedHalfFut)> {
@@ -155,13 +143,15 @@ impl TcpRelayServer {
}
/// Runs the server
pub fn run(self, handle: Handle) -> Box<Future<Item = (), Error = io::Error>> {
pub fn run(config: Arc<Config>, handle: Handle, threads: usize) -> Box<Future<Item = (), Error = io::Error>> {
let cpu_pool = CpuPool::new(threads);
let mut fut: Option<Box<Future<Item = (), Error = io::Error>>> = None;
let ref forbidden_ip = self.config.forbidden_ip;
let ref forbidden_ip = config.forbidden_ip;
let forbidden_ip = Arc::new(forbidden_ip.clone());
for svr_cfg in &self.config.server {
for svr_cfg in &config.server {
let listener = {
let addr = &svr_cfg.addr;
let listener = TcpListener::bind(addr, &handle).unwrap();
@@ -171,7 +161,7 @@ impl TcpRelayServer {
let svr_cfg = Arc::new(svr_cfg.clone());
let handle = handle.clone();
let cpu_pool = self.cpu_pool.clone();
let cpu_pool = cpu_pool.clone();
let forbidden_ip = forbidden_ip.clone();
let listening = listener.incoming()
.for_each(move |(socket, addr)| {

View File

@@ -59,9 +59,7 @@ use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6, lookup_host};
use std::collections::HashMap;
use std::io::{self, BufReader};
use coio::Scheduler;
use coio::net::UdpSocket;
use futures::Future;
use lru_cache::LruCache;
use crypto::{cipher, CryptoMode};
@@ -70,250 +68,17 @@ use config::{Config, ServerConfig};
use relay::socks5;
use relay::loadbalancing::server::{LoadBalancer, RoundRobin};
use relay::udprelay::UDP_RELAY_LOCAL_LRU_CACHE_CAPACITY;
use relay::BoxIoFuture;
#[derive(Clone)]
pub struct UdpRelayLocal {
config: Config,
}
pub struct UdpRelayLocal;
impl UdpRelayLocal {
pub fn new(config: Config) -> UdpRelayLocal {
UdpRelayLocal { config: config }
}
}
impl UdpRelayLocal {
pub fn run(&self) {
let addr = match self.config.local {
Some(addr) => addr,
None => {
error!("Local configuration should not be None");
return;
}
};
let mut server_load_balancer = RoundRobin::new(self.config.server.clone());
let (server_set, server_addr) = {
let mut server_set = HashMap::new();
let mut server_addr = HashMap::new();
for s in &self.config.server {
let addrs = match lookup_host(&s.addr[..]) {
Ok(addr) => addr,
Err(..) => continue,
};
for addr in addrs {
let addr = match addr {
SocketAddr::V4(v4) => SocketAddr::V4(SocketAddrV4::new(*v4.ip(), s.port)),
SocketAddr::V6(v6) => {
SocketAddr::V6(SocketAddrV6::new(*v6.ip(),
s.port,
v6.flowinfo(),
v6.scope_id()))
}
};
if self.config.forbidden_ip.contains(&::relay::take_ip_addr(&addr)) {
info!("{} is in `forbidden_ip` list, skipping", addr);
continue;
}
server_set.insert(addr.clone(), s.clone());
server_addr.insert(s.addr.clone(), addr);
}
}
(server_set, server_addr)
};
let client_map_arc =
Arc::new(Mutex::new(LruCache::<socks5::Address,
SocketAddr>::new(UDP_RELAY_LOCAL_LRU_CACHE_CAPACITY)));
let socket = match UdpSocket::bind(&addr) {
Ok(sk) => sk,
Err(err) => {
error!("Failed to bind udp socket: {:?}", err);
return;
}
};
let mut buf = [0u8; 0xffff];
loop {
match socket.recv_from(&mut buf) {
Ok((len, source_addr)) => {
if len < 4 {
error!("UDP request is too short");
continue;
}
let request_message = buf[..len].to_vec();
let move_socket = socket.try_clone().unwrap();
let client_map = client_map_arc.clone();
debug!("Received UDP packet, source {:?}", source_addr);
match server_set.get(&source_addr) {
Some(sref) => {
let s = sref.clone();
Scheduler::spawn(move || {
handle_response(move_socket,
&request_message[..],
source_addr,
&s,
client_map)
});
}
None => {
let s = server_load_balancer.pick_server().clone();
debug!("UDP associate picked a server: {:?}", s);
match server_addr.get(&s.addr) {
Some(saddr) => {
let saddr = saddr.clone();
Scheduler::spawn(move || {
handle_request(move_socket,
&request_message[..],
source_addr,
saddr,
&s,
client_map)
});
}
None => {}
}
}
}
}
Err(err) => {
error!("Failed in UDP recv_from: {}", err);
break;
}
}
}
}
}
fn handle_request(socket: UdpSocket,
request_message: &[u8],
from_addr: SocketAddr,
server_addr: SocketAddr,
config: &ServerConfig,
client_map: Arc<Mutex<LruCache<socks5::Address, SocketAddr>>>) {
// According to RFC 1928
//
// Implementation of fragmentation is optional; an implementation that
// does not support fragmentation MUST drop any datagram whose FRAG
// field is other than X'00'.
if request_message[2] != 0x00u8 {
// Drop it
warn!("Does not support fragmentation");
return;
}
let mut bufr = BufReader::new(request_message);
let request = match socks5::UdpAssociateHeader::read_from(&mut bufr) {
Ok(r) => r,
Err(err) => {
error!("Error occurs while reading UdpAssociateHeader: {:?}", err);
return;
}
};
let addr = request.address.clone();
info!("UDP ASSOCIATE {}", addr);
debug!("UDP associate {} <-> {}", addr, from_addr);
client_map.lock().unwrap().insert(addr, from_addr);
let key = config.method.bytes_to_key(config.password.as_bytes());
let mut iv = config.method.gen_init_vec();
let mut encryptor = cipher::with_type(config.method, &key[..], &iv[..], CryptoMode::Encrypt);
let mut wbuf = Vec::new();
if let Err(err) = request.write_to(&mut wbuf) {
error!("Error occurs while writing request: {:?}", err);
return;
}
if let Err(err) = io::copy(&mut bufr, &mut wbuf) {
error!("Error occurs while copying from bufr to wbuf: {:?}", err);
return;
}
if let Err(err) = encryptor.update(&wbuf[..], &mut iv) {
error!("Error occurs while encrypting: {:?}", err);
return;
}
if let Err(err) = encryptor.finalize(&mut iv) {
error!("Error occurs while finalizing: {:?}", err);
return;
}
if let Err(err) = socket.send_to(&iv[..], &server_addr) {
error!("Error occurs while sending to remote: {:?}", err);
}
debug!("Sent packet:{:?} to {:?}", iv, server_addr);
}
fn handle_response(socket: UdpSocket,
response_message: &[u8],
from_addr: SocketAddr,
config: &ServerConfig,
client_map: Arc<Mutex<LruCache<socks5::Address, SocketAddr>>>) {
let key = config.method.bytes_to_key(config.password.as_bytes());
let mut decryptor = cipher::with_type(config.method,
&key[..],
&response_message[0..config.method.block_size()],
CryptoMode::Decrypt);
let mut decrypted_data = Vec::new();
if let Err(err) = decryptor.update(&response_message[config.method.block_size()..],
&mut decrypted_data) {
error!("Error occurs while decrypting data: {:?}", err);
return;
}
if let Err(err) = decryptor.finalize(&mut decrypted_data) {
error!("Error occurs while finalizing decrypt: {:?}", err);
return;
}
let mut bufr = BufReader::new(&decrypted_data[..]);
let addr = match socks5::Address::read_from(&mut bufr) {
Ok(addr) => addr,
Err(err) => {
error!("Error occurs while reading address: {:?}", err);
return;
}
};
let client_addr = {
let mut cmap = client_map.lock().unwrap();
match cmap.get_mut(&addr) {
Some(a) => a.clone(),
None => return,
}
};
debug!("UDP response {} -> {}", from_addr, client_addr);
let mut bufw = Vec::new();
if let Err(err) = socks5::UdpAssociateHeader::new(0, addr).write_to(&mut bufw) {
error!("Error occurs while writing UdpAssociateHeader: {:?}", err);
return;
}
if let Err(err) = io::copy(&mut bufr, &mut bufw) {
error!("Error occurs while copying from bufr to bufw: {:?}", err);
return;
}
if let Err(err) = socket.send_to(&bufw[..], &client_addr) {
error!("Error occurs while sending to local: {:?}", err);
pub fn run(config: Config) -> BoxIoFuture<()> {
let config = Arc::new(config);
let local_addr = config.local.as_ref().unwrap();
let mut servers = RoundRobin::new(&*config);
}
}