should not split stream in different task

This commit is contained in:
Y. T. Chung
2016-11-07 22:35:01 +08:00
parent 34a70a7dc7
commit 7c86bcd281
4 changed files with 96 additions and 84 deletions

View File

@@ -474,12 +474,13 @@ fn handle_connect(handle: Handle,
.map(|w| (svr_s, w))
})
.and_then(move |(svr_s, w)| {
let (svr_r, svr_w) = super::proxy_server_handshake(svr_s, cloned_svr_cfg, addr);
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| svr_w.write_all_encrypted(remains))
.and_then(move |(svr_w, _)| svr_w.copy_from_encrypted(r));
super::proxy_server_handshake(svr_s, cloned_svr_cfg, addr).and_then(|(svr_r, svr_w)| {
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| svr_w.write_all_encrypted(remains))
.and_then(move |(svr_w, _)| svr_w.copy_from_encrypted(r));
tunnel(cloned_addr, whalf, rhalf)
tunnel(cloned_addr, whalf, rhalf)
})
});
Box::new(fut)
@@ -534,28 +535,29 @@ fn handle_http_proxy(handle: Handle,
trace!("Proxy server connected");
let cloned_addr = addr.clone();
let (svr_r, svr_w) = super::proxy_server_handshake(svr_s, svr_cfg, addr);
super::proxy_server_handshake(svr_s, svr_cfg, addr).and_then(move |(svr_r, svr_w)| {
// Just proxy anything to client
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| {
// Send the first request to server
trace!("Going to proxy request: {:?}", req);
trace!("Should keep alive? {}", should_keep_alive);
proxy_request_encrypted((r, svr_w), None, req, remains).and_then(move |(r, svr_w, req_remains)| {
if should_keep_alive {
handle_http_keepalive(r, svr_w, req_remains)
} else {
futures::finished(()).boxed()
}
})
});
// Just proxy anything to client
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| {
// Send the first request to server
trace!("Going to proxy request: {:?}", req);
trace!("Should keep alive? {}", should_keep_alive);
proxy_request_encrypted((r, svr_w), None, req, remains).and_then(move |(r, svr_w, req_remains)| {
if should_keep_alive {
handle_http_keepalive(r, svr_w, req_remains)
} else {
futures::finished(()).boxed()
}
})
});
rhalf.join(whalf)
.then(move |_| {
trace!("Relay to {} is finished", cloned_addr);
Ok(())
})
rhalf.join(whalf)
.then(move |_| {
trace!("Relay to {} is finished", cloned_addr);
Ok(())
})
})
});
Box::new(fut)

View File

@@ -40,7 +40,7 @@ use tokio_core::io::{read_exact, write_all};
use tokio_core::io::{ReadHalf, WriteHalf};
use tokio_core::io::Io;
use futures::{Future, Poll};
use futures::{self, Future, Poll};
use net2::TcpBuilder;
@@ -97,66 +97,74 @@ fn connect_proxy_server(handle: &Handle, svr_cfg: Rc<ServerConfig>) -> BoxIoFutu
pub fn proxy_server_handshake(remote_stream: TcpStream,
svr_cfg: Rc<ServerConfig>,
relay_addr: Address)
-> (DecryptedHalfFut, EncryptedHalfFut) {
let (r_fut, w_fut) = proxy_handshake(remote_stream, svr_cfg);
let w_fut = w_fut.and_then(move |enc_w| {
trace!("Got encrypt stream and going to send addr: {:?}",
relay_addr);
-> BoxIoFuture<(DecryptedHalfFut, EncryptedHalfFut)> {
let fut = proxy_handshake(remote_stream, svr_cfg).and_then(|(r_fut, w_fut)| {;
let w_fut = w_fut.and_then(move |enc_w| {
trace!("Got encrypt stream and going to send addr: {:?}",
relay_addr);
// Send relay address to remote
let local_buf = Vec::new();
relay_addr.write_to(local_buf)
.and_then(|buf| enc_w.write_all_encrypted(buf))
.map(|(w, _)| w)
// Send relay address to remote
let local_buf = Vec::new();
relay_addr.write_to(local_buf)
.and_then(|buf| enc_w.write_all_encrypted(buf))
.map(|(w, _)| w)
});
Ok((r_fut, boxed_future(w_fut)))
});
(r_fut, boxed_future(w_fut))
boxed_future(fut)
}
/// ShadowSocks Client-Server handshake protocol
/// Exchange cipher IV and creates stream wrapper
pub fn proxy_handshake(remote_stream: TcpStream, svr_cfg: Rc<ServerConfig>) -> (DecryptedHalfFut, EncryptedHalfFut) {
let (r, w) = remote_stream.split();
pub fn proxy_handshake(remote_stream: TcpStream,
svr_cfg: Rc<ServerConfig>)
-> BoxIoFuture<(DecryptedHalfFut, EncryptedHalfFut)> {
let fut = futures::lazy(|| Ok(remote_stream.split())).and_then(|(r, w)| {
let svr_cfg_cloned = svr_cfg.clone();
let enc = {
// Encrypt data to remote server
let svr_cfg_cloned = svr_cfg.clone();
// Send initialize vector to remote and create encryptor
let enc = {
// Encrypt data to remote server
let local_iv = svr_cfg.method().gen_init_vec();
trace!("Going to send initialize vector: {:?}", local_iv);
// Send initialize vector to remote and create encryptor
write_all(w, local_iv).and_then(move |(w, local_iv)| {
let encryptor = cipher::with_type(svr_cfg.method(),
svr_cfg.key(),
&local_iv[..],
CryptoMode::Encrypt);
let local_iv = svr_cfg.method().gen_init_vec();
trace!("Going to send initialize vector: {:?}", local_iv);
Ok(EncryptedWriter::new(w, encryptor))
})
};
write_all(w, local_iv).and_then(move |(w, local_iv)| {
let encryptor = cipher::with_type(svr_cfg.method(),
svr_cfg.key(),
&local_iv[..],
CryptoMode::Encrypt);
let dec = {
let svr_cfg = svr_cfg_cloned;
Ok(EncryptedWriter::new(w, encryptor))
})
};
// Decrypt data from remote server
let iv_len = svr_cfg.method().iv_size();
read_exact(r, vec![0u8; iv_len]).and_then(move |(r, remote_iv)| {
trace!("Got initialize vector {:?}", remote_iv);
let dec = {
let svr_cfg = svr_cfg_cloned;
let decryptor = cipher::with_type(svr_cfg.method(),
svr_cfg.key(),
&remote_iv[..],
CryptoMode::Decrypt);
let decrypt_stream = DecryptedReader::new(r, decryptor);
// Decrypt data from remote server
let iv_len = svr_cfg.method().iv_size();
read_exact(r, vec![0u8; iv_len]).and_then(move |(r, remote_iv)| {
trace!("Got initialize vector {:?}", remote_iv);
Ok(decrypt_stream)
})
};
let decryptor = cipher::with_type(svr_cfg.method(),
svr_cfg.key(),
&remote_iv[..],
CryptoMode::Decrypt);
let decrypt_stream = DecryptedReader::new(r, decryptor);
(boxed_future(dec), boxed_future(enc))
Ok(decrypt_stream)
})
};
Ok((boxed_future(dec), boxed_future(enc)))
});
boxed_future(fut)
}
/// Copy exactly N bytes by encryption

View File

@@ -59,17 +59,18 @@ impl TcpRelayClientHandshake {
pub fn handshake(self) -> BoxIoFuture<TcpRelayClientPending> {
let TcpRelayClientHandshake { handle, forbidden_ip, s, svr_cfg } = self;
let (r_fut, w_fut) = proxy_handshake(s, svr_cfg);
let fut = r_fut.and_then(|r| Address::read_from(r).map_err(From::from))
.map(move |(r, addr)| {
TcpRelayClientPending {
handle: handle,
r: r,
addr: addr,
w: w_fut,
forbidden_ip: forbidden_ip,
}
});
let fut = proxy_handshake(s, svr_cfg).and_then(|(r_fut, w_fut)| {
r_fut.and_then(|r| Address::read_from(r).map_err(From::from))
.map(move |(r, addr)| {
TcpRelayClientPending {
handle: handle,
r: r,
addr: addr,
w: w_fut,
forbidden_ip: forbidden_ip,
}
})
});
boxed_future(fut)
}
}

View File

@@ -74,11 +74,12 @@ fn handle_socks5_connect(handle: &Handle,
.map(move |w| (svr_s, w))
})
.and_then(move |(svr_s, w)| {
let (svr_r, svr_w) = super::proxy_server_handshake(svr_s, cloned_svr_cfg, addr);
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| svr_w.copy_from_encrypted(r));
super::proxy_server_handshake(svr_s, cloned_svr_cfg, addr).and_then(|(svr_r, svr_w)| {
let rhalf = svr_r.and_then(move |svr_r| copy(svr_r, w));
let whalf = svr_w.and_then(move |svr_w| svr_w.copy_from_encrypted(r));
tunnel(cloned_addr, whalf, rhalf)
tunnel(cloned_addr, whalf, rhalf)
})
});
Box::new(fut)