memory optimization

This commit is contained in:
Y. T. Chung
2016-11-07 03:19:14 +08:00
parent bf01073a14
commit 34a70a7dc7
5 changed files with 92 additions and 102 deletions

View File

@@ -474,13 +474,12 @@ fn handle_connect(handle: Handle,
.map(|w| (svr_s, w))
})
.and_then(move |(svr_s, w)| {
super::proxy_server_handshake(svr_s, cloned_svr_cfg, addr).and_then(move |(svr_r, svr_w)| {
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| svr_w.write_all_encrypted(remains))
.and_then(move |(svr_w, _)| svr_w.copy_from_encrypted(r));
let (svr_r, svr_w) = super::proxy_server_handshake(svr_s, cloned_svr_cfg, addr);
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| svr_w.write_all_encrypted(remains))
.and_then(move |(svr_w, _)| svr_w.copy_from_encrypted(r));
tunnel(cloned_addr, whalf, rhalf)
})
tunnel(cloned_addr, whalf, rhalf)
});
Box::new(fut)
@@ -535,28 +534,28 @@ fn handle_http_proxy(handle: Handle,
trace!("Proxy server connected");
let cloned_addr = addr.clone();
super::proxy_server_handshake(svr_s, svr_cfg, addr).and_then(move |(svr_r, svr_w)| {
// Just proxy anything to client
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| {
// Send the first request to server
trace!("Going to proxy request: {:?}", req);
trace!("Should keep alive? {}", should_keep_alive);
proxy_request_encrypted((r, svr_w), None, req, remains).and_then(move |(r, svr_w, req_remains)| {
if should_keep_alive {
handle_http_keepalive(r, svr_w, req_remains)
} else {
futures::finished(()).boxed()
}
})
});
let (svr_r, svr_w) = super::proxy_server_handshake(svr_s, svr_cfg, addr);
rhalf.join(whalf)
.then(move |_| {
trace!("Relay to {} is finished", cloned_addr);
Ok(())
})
})
// Just proxy anything to client
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| {
// Send the first request to server
trace!("Going to proxy request: {:?}", req);
trace!("Should keep alive? {}", should_keep_alive);
proxy_request_encrypted((r, svr_w), None, req, remains).and_then(move |(r, svr_w, req_remains)| {
if should_keep_alive {
handle_http_keepalive(r, svr_w, req_remains)
} else {
futures::finished(()).boxed()
}
})
});
rhalf.join(whalf)
.then(move |_| {
trace!("Relay to {} is finished", cloned_addr);
Ok(())
})
});
Box::new(fut)

View File

@@ -36,11 +36,11 @@ use config::{ServerConfig, ServerAddr};
use tokio_core::net::TcpStream;
use tokio_core::reactor::Handle;
use tokio_core::io::{read_exact, write_all, flush};
use tokio_core::io::{read_exact, write_all};
use tokio_core::io::{ReadHalf, WriteHalf};
use tokio_core::io::Io;
use futures::{self, Future, Poll};
use futures::{Future, Poll};
use net2::TcpBuilder;
@@ -76,7 +76,7 @@ pub type EncryptedHalfFut = BoxIoFuture<EncryptedHalf>;
fn connect_proxy_server(handle: &Handle, svr_cfg: Rc<ServerConfig>) -> BoxIoFuture<TcpStream> {
match svr_cfg.addr() {
&ServerAddr::SocketAddr(ref addr) => Box::new(TcpStream::connect(addr, handle)),
&ServerAddr::SocketAddr(ref addr) => TcpStream::connect(addr, handle).boxed(),
&ServerAddr::DomainName(ref domain, port) => {
let handle = handle.clone();
let fut = DnsResolver::get_instance()
@@ -86,7 +86,7 @@ fn connect_proxy_server(handle: &Handle, svr_cfg: Rc<ServerConfig>) -> BoxIoFutu
IpAddr::V4(v4) => SocketAddr::V4(SocketAddrV4::new(v4, port)),
IpAddr::V6(v6) => SocketAddr::V6(SocketAddrV6::new(v6, port, 0, 0)),
};
TcpStream::connect(&sockaddr, &handle).boxed()
TcpStream::connect(&sockaddr, &handle)
});
Box::new(fut)
}
@@ -97,73 +97,66 @@ fn connect_proxy_server(handle: &Handle, svr_cfg: Rc<ServerConfig>) -> BoxIoFutu
pub fn proxy_server_handshake(remote_stream: TcpStream,
svr_cfg: Rc<ServerConfig>,
relay_addr: Address)
-> BoxIoFuture<(DecryptedHalfFut, EncryptedHalfFut)> {
let fut = proxy_handshake(remote_stream, svr_cfg).and_then(|(r_fut, w_fut)| {
let w_fut = w_fut.and_then(move |enc_w| {
trace!("Got encrypt stream and going to send addr: {:?}",
relay_addr);
-> (DecryptedHalfFut, EncryptedHalfFut) {
let (r_fut, w_fut) = proxy_handshake(remote_stream, svr_cfg);
let w_fut = w_fut.and_then(move |enc_w| {
trace!("Got encrypt stream and going to send addr: {:?}",
relay_addr);
// Send relay address to remote
let local_buf = Vec::new();
relay_addr.write_to(local_buf)
.and_then(|buf| enc_w.write_all_encrypted(buf))
.and_then(|(enc_w, _)| flush(enc_w))
});
Ok((r_fut, boxed_future(w_fut)))
// Send relay address to remote
let local_buf = Vec::new();
relay_addr.write_to(local_buf)
.and_then(|buf| enc_w.write_all_encrypted(buf))
.map(|(w, _)| w)
});
Box::new(fut)
(r_fut, boxed_future(w_fut))
}
/// ShadowSocks Client-Server handshake protocol
/// Exchange cipher IV and creates stream wrapper
pub fn proxy_handshake(remote_stream: TcpStream,
svr_cfg: Rc<ServerConfig>)
-> BoxIoFuture<(DecryptedHalfFut, EncryptedHalfFut)> {
let fut = futures::lazy(move || {
let (r, w) = remote_stream.split();
pub fn proxy_handshake(remote_stream: TcpStream, svr_cfg: Rc<ServerConfig>) -> (DecryptedHalfFut, EncryptedHalfFut) {
let (r, w) = remote_stream.split();
let svr_cfg_cloned = svr_cfg.clone();
let svr_cfg_cloned = svr_cfg.clone();
let enc = futures::lazy(move || {
// Encrypt data to remote server
let enc = {
// Encrypt data to remote server
// Send initialize vector to remote and create encryptor
// Send initialize vector to remote and create encryptor
let local_iv = svr_cfg.method().gen_init_vec();
trace!("Going to send initialize vector: {:?}", local_iv);
let local_iv = svr_cfg.method().gen_init_vec();
trace!("Going to send initialize vector: {:?}", local_iv);
write_all(w, local_iv).and_then(move |(w, local_iv)| {
let encryptor = cipher::with_type(svr_cfg.method(),
svr_cfg.key(),
&local_iv[..],
CryptoMode::Encrypt);
write_all(w, local_iv).and_then(move |(w, local_iv)| {
let encryptor = cipher::with_type(svr_cfg.method(),
svr_cfg.key(),
&local_iv[..],
CryptoMode::Encrypt);
Ok(EncryptedWriter::new(w, encryptor))
})
Ok(EncryptedWriter::new(w, encryptor))
})
};
});
let dec = {
let svr_cfg = svr_cfg_cloned;
let dec = futures::lazy(move || {
let svr_cfg = svr_cfg_cloned;
// Decrypt data from remote server
let iv_len = svr_cfg.method().iv_size();
read_exact(r, vec![0u8; iv_len]).and_then(move |(r, remote_iv)| {
trace!("Got initialize vector {:?}", remote_iv);
// Decrypt data from remote server
let iv_len = svr_cfg.method().iv_size();
read_exact(r, vec![0u8; iv_len]).and_then(move |(r, remote_iv)| {
trace!("Got initialize vector {:?}", remote_iv);
let decryptor = cipher::with_type(svr_cfg.method(),
svr_cfg.key(),
&remote_iv[..],
CryptoMode::Decrypt);
let decrypt_stream = DecryptedReader::new(r, decryptor);
let decryptor = cipher::with_type(svr_cfg.method(),
svr_cfg.key(),
&remote_iv[..],
CryptoMode::Decrypt);
let decrypt_stream = DecryptedReader::new(r, decryptor);
Ok(decrypt_stream)
})
};
Ok(decrypt_stream)
})
});
Ok((boxed_future(dec), boxed_future(enc)))
});
Box::new(fut)
(boxed_future(dec), boxed_future(enc))
}
/// Copy exactly N bytes by encryption

View File

@@ -29,7 +29,7 @@ use std::collections::HashSet;
use config::{Config, ServerConfig};
use relay::socks5::Address;
use relay::BoxIoFuture;
use relay::{BoxIoFuture, boxed_future};
use relay::dns_resolver::DnsResolver;
use futures::{self, Future};
@@ -59,19 +59,18 @@ impl TcpRelayClientHandshake {
pub fn handshake(self) -> BoxIoFuture<TcpRelayClientPending> {
let TcpRelayClientHandshake { handle, forbidden_ip, s, svr_cfg } = self;
let fut = proxy_handshake(s, svr_cfg).and_then(|(r_fut, w_fut)| {
r_fut.and_then(|r| Address::read_from(r).map_err(From::from))
.map(move |(r, addr)| {
TcpRelayClientPending {
handle: handle,
r: r,
addr: addr,
w: w_fut,
forbidden_ip: forbidden_ip,
}
})
});
Box::new(fut)
let (r_fut, w_fut) = proxy_handshake(s, svr_cfg);
let fut = r_fut.and_then(|r| Address::read_from(r).map_err(From::from))
.map(move |(r, addr)| {
TcpRelayClientPending {
handle: handle,
r: r,
addr: addr,
w: w_fut,
forbidden_ip: forbidden_ip,
}
});
boxed_future(fut)
}
}

View File

@@ -74,12 +74,11 @@ fn handle_socks5_connect(handle: &Handle,
.map(move |w| (svr_s, w))
})
.and_then(move |(svr_s, w)| {
super::proxy_server_handshake(svr_s, cloned_svr_cfg, addr).and_then(move |(svr_r, svr_w)| {
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| svr_w.copy_from_encrypted(r));
let (svr_r, svr_w) = super::proxy_server_handshake(svr_s, cloned_svr_cfg, addr);
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| svr_w.copy_from_encrypted(r));
tunnel(cloned_addr, whalf, rhalf)
})
tunnel(cloned_addr, whalf, rhalf)
});
Box::new(fut)

View File

@@ -40,7 +40,7 @@ pub struct DecryptedReader<R>
sent_final: bool,
}
const BUFFER_SIZE: usize = 2048;
const BUFFER_SIZE: usize = 4096;
impl<R> DecryptedReader<R>
where R: Read
@@ -275,7 +275,7 @@ impl<R: Read, W: Write> Future for EncryptedCopy<R, W> {
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut local_buf = [0u8; 2048];
let mut local_buf = [0u8; BUFFER_SIZE];
loop {
// If our buffer is empty, then we need to read some data to
// continue.