optimize code structure

This commit is contained in:
Y. T. Chung
2016-11-01 22:56:09 +08:00
parent 0b9e302ba8
commit 5e5194014b
4 changed files with 330 additions and 265 deletions

View File

@@ -1,5 +1,7 @@
language: rust
rust:
- stable
- beta
- nightly
cache: cargo
@@ -12,5 +14,4 @@ addons:
- libssl-dev
script:
- cargo build -v
- cargo test -v
- cargo test -v --no-fail-fast

View File

@@ -25,9 +25,9 @@
//!
//! ## Usage
//!
//! Build shadowsocks and you will got at least 2 binaries: `sslocal` and `ssserver`
//! Build shadowsocks and you will get at least 2 binaries: `sslocal` and `ssserver`
//!
//! Write your servers in a configuration file, which is defined in
//! Write your servers in a configuration file. Format is defined in
//! [shadowsocks' documentation](https://github.com/shadowsocks/shadowsocks/wiki)
//!
//! For example:

View File

@@ -93,181 +93,12 @@ struct Client {
}
impl Client {
fn handle_once(self) -> BoxIoFuture<Client> {
let Client { assoc, server_picker, servers, dns_resolver, socket } = self;
let fut = recv_from(socket, vec![0u8; MAXIMUM_UDP_PAYLOAD_SIZE])
.and_then(move |(socket, buf, n, src)| {
let buf = &buf[..n];
let cloned_servers = servers.clone();
let cloned_assoc = assoc.clone();
let fut = match servers.borrow_mut().get_mut(&src) {
Some(svr_cfg) => {
// Proxy -> Client
trace!("Got packet from server {}, length {}",
svr_cfg.addr(),
buf.len());
let iv_len = svr_cfg.method().iv_size();
if buf.len() < iv_len {
error!("Invalid ShadowSocks UDP packet, expected IV length {}, packet length {}",
iv_len,
buf.len());
let err = io::Error::new(io::ErrorKind::Other, "early eof");
return Err(err);
}
let iv = &buf[..iv_len];
let mut cipher = cipher::with_type(svr_cfg.method(), svr_cfg.key(), iv, CryptoMode::Decrypt);
let mut payload = Vec::with_capacity(buf.len());
try!(cipher.update(&buf[iv_len..], &mut payload));
try!(cipher.finalize(&mut payload));
let reader = Cursor::new(payload);
let fut = Address::read_from(reader).map_err(From::from).and_then(move |(r, addr)| {
let header_len = r.position() as usize;
let payload = r.into_inner();
let body = &payload[header_len..];
trace!("Got packet from {}, payload length {}", addr, body.len());
// Will always success
let mut reply_body =
UdpAssociateHeader::new(0, addr.clone()).write_to(Vec::new()).wait().unwrap();
reply_body.extend_from_slice(body);
let mut assoc = assoc.borrow_mut();
match assoc.remove(&addr) {
None => {
warn!("Got unassociated packet from server, addr: {:?}", addr);
let err = io::Error::new(io::ErrorKind::Other, "unassociated packet");
boxed_future(futures::failed(err))
}
Some(client_addr) => {
info!("UDP ASSOCIATE {} <- {}, payload length {} bytes",
client_addr,
addr,
body.len());
let fut = send_to(socket, reply_body, client_addr).map(move |(socket, _, _)| {
Client {
assoc: cloned_assoc,
servers: cloned_servers,
dns_resolver: dns_resolver,
server_picker: server_picker,
socket: socket,
}
});
boxed_future(fut)
}
}
});
boxed_future(fut)
}
None => {
// Client -> Proxy
let reader = Cursor::new(buf.to_vec());
let fut = UdpAssociateHeader::read_from(reader)
.map_err(From::from)
.and_then(move |(r, header)| {
if header.frag != 0 {
warn!("Does not support UDP fragment, got header {:?}", header);
let err = io::Error::new(io::ErrorKind::Other, "Not supported UDP fragment");
return boxed_future(futures::failed(err));
}
let header_len = r.position() as usize;
let payload = r.into_inner();
let assoc_addr = header.address;
info!("UDP ASSOCIATE {} -> {}, payload length {} bytes",
src,
assoc_addr,
&payload[header_len..].len());
// If we have recorded address, then it is a return packet from server
// Proxy -> Client
let mut assoc = assoc.borrow_mut();
let svr_cfg = server_picker.borrow_mut().pick_server();
assoc.insert(assoc_addr.clone(), src);
// Client -> Proxy
let fut = futures::lazy(move || {
let iv = svr_cfg.method().gen_init_vec();
let mut cipher = cipher::with_type(svr_cfg.method(),
svr_cfg.key(),
&iv[..],
CryptoMode::Encrypt);
let payload_buf = Vec::with_capacity(payload.len());
assoc_addr.write_to(payload_buf)
.and_then(move |mut payload_buf| {
payload_buf.extend_from_slice(&payload[header_len..]);
Ok(payload_buf)
})
.and_then(move |payload| -> io::Result<_> {
let mut send_payload = Vec::with_capacity(iv.len() + payload.len());
send_payload.extend_from_slice(&iv[..]);
try!(cipher.update(&payload[..], &mut send_payload));
try!(cipher.finalize(&mut send_payload));
Ok((svr_cfg, send_payload))
})
})
.map_err(From::from)
.and_then(move |(svr_cfg, payload)| {
UdpRelayLocal::resolve_server_addr(svr_cfg.clone(), dns_resolver.clone())
.and_then(move |addr| {
// And we have to know the proxy servers' addresses
let servers = cloned_servers.clone();
let mut svrs_ref = cloned_servers.borrow_mut();
svrs_ref.insert(addr.clone(), svr_cfg.clone());
let fut = send_to(socket, payload, addr).map(|(socket, body, len)| {
trace!("Body size: {}, sent packet size: {}", body.len(), len);
Client {
assoc: cloned_assoc,
server_picker: server_picker,
servers: servers,
dns_resolver: dns_resolver,
socket: socket,
}
});
boxed_future(fut)
})
});
boxed_future(fut)
});
boxed_future(fut)
}
};
Ok(fut)
})
.and_then(|fut| fut);
boxed_future(fut)
}
}
/// UDP relay local server
pub struct UdpRelayLocal;
impl UdpRelayLocal {
/// Resolves server address to SocketAddr
fn resolve_server_addr(svr_cfg: Rc<ServerConfig>, dns_resolver: DnsResolver) -> BoxIoFuture<SocketAddr> {
match svr_cfg.addr() {
// Return directly if it is a SocketAddr
&ServerAddr::SocketAddr(ref addr) => boxed_future(futures::finished(addr.clone())),
// Resolve domain name to SocketAddr
&ServerAddr::DomainName(ref dname, port) => {
let fut = dns_resolver.resolve(dname)
.map(move |sockaddr| {
@@ -281,6 +112,210 @@ impl UdpRelayLocal {
}
}
/// Handles relay from proxy to client
///
/// Extract actual body from payload
/// Appends a SOCKS5 UDP Associate header in front of the body, and send it to client
fn handle_s2c(self, svr_cfg: Rc<ServerConfig>, buf: Vec<u8>, n: usize) -> BoxIoFuture<Client> {
let Client { assoc, server_picker, servers, dns_resolver, socket } = self;
let fut = futures::lazy(move || {
let buf = &buf[..n];
trace!("Got packet from server {}, length {}",
svr_cfg.addr(),
buf.len());
let iv_len = svr_cfg.method().iv_size();
if buf.len() < iv_len {
error!("Invalid ShadowSocks UDP packet, expected IV length {}, packet length {}",
iv_len,
buf.len());
let err = io::Error::new(io::ErrorKind::Other, "early eof");
return Err(err);
}
let iv = &buf[..iv_len];
let mut cipher = cipher::with_type(svr_cfg.method(), svr_cfg.key(), iv, CryptoMode::Decrypt);
// Decrypt payload with cipher
let mut payload = Vec::with_capacity(buf.len());
try!(cipher.update(&buf[iv_len..], &mut payload));
try!(cipher.finalize(&mut payload));
Ok(payload)
})
.and_then(move |payload| {
// Get Address from the front of payload (ShadowSocks protocol)
Address::read_from(Cursor::new(payload))
.map_err(From::from)
.and_then(move |(r, addr)| {
let header_len = r.position() as usize;
let payload = r.into_inner();
let body = &payload[header_len..];
trace!("Got packet from {}, payload length {}", addr, body.len());
// Append header in front of the actual body (SOCKS5 protocol)
let mut reply_body =
UdpAssociateHeader::new(0, addr.clone()).write_to(Vec::new()).wait().unwrap();
reply_body.extend_from_slice(body);
// Get associated client's SocketAddr
// We have to know who sent packet to this `addr`
let cloned_assoc = assoc.clone();
let mut assoc = assoc.borrow_mut();
assoc.remove(&addr)
.ok_or_else(|| {
warn!("Got unassociated packet from server, addr: {:?}", addr);
io::Error::new(io::ErrorKind::Other, "unassociated packet")
})
.map(|client_addr| {
info!("UDP ASSOCIATE {} <- {}, payload length {} bytes",
client_addr,
addr,
body.len());
(client_addr, cloned_assoc, reply_body)
})
})
.and_then(|(client_addr, assoc, reply_body)| {
send_to(socket, reply_body, client_addr).map(move |(socket, _, _)| {
Client {
assoc: assoc,
servers: servers,
dns_resolver: dns_resolver,
server_picker: server_picker,
socket: socket,
}
})
})
});
boxed_future(fut)
}
/// Handles relay from client to proxy
///
/// Appends a Address header in front of the packet, and send it to proxy after encryption
fn handle_c2s(self, buf: Vec<u8>, n: usize, src: SocketAddr) -> BoxIoFuture<Client> {
let Client { assoc, server_picker, servers, dns_resolver, socket } = self;
let fut = futures::lazy(move || {
// Extract UDP associate header in the front (SOCKS5 protocol)
let reader = Cursor::new(buf[..n].to_vec());
let (reader, header) = try!(UdpAssociateHeader::read_from(reader).wait());
let header_length = reader.position() as usize;
Ok((reader.into_inner(), header, header_length))
})
.and_then(|(payload, header, header_len)| {
// ShadowSocks does not support UDP fragment
// Drop the packet directly according to SOCKS5's RFC
if header.frag != 0x00 {
warn!("Does not support UDP fragment, got header {:?}", header);
let err = io::Error::new(io::ErrorKind::Other, "Not supported UDP fragment");
Err(err)
} else {
Ok((payload, header, header_len))
}
})
.and_then(move |(payload, header, header_len)| {
let assoc_addr = header.address;
info!("UDP ASSOCIATE {} -> {}, payload length {} bytes",
src,
assoc_addr,
&payload[header_len..].len());
{
// Record association: addr -> SocketAddr (Client)
let mut assoc = assoc.borrow_mut();
assoc.insert(assoc_addr.clone(), src);
}
let svr_cfg = server_picker.borrow_mut().pick_server();
// Client -> Proxy
let iv = svr_cfg.method().gen_init_vec();
let mut cipher = cipher::with_type(svr_cfg.method(),
svr_cfg.key(),
&iv[..],
CryptoMode::Encrypt);
let payload_buf = Vec::with_capacity(payload.len());
// Append Address to the front (ShadowSocks protocol)
assoc_addr.write_to(payload_buf)
.and_then(move |mut payload_buf| {
payload_buf.extend_from_slice(&payload[header_len..]);
Ok(payload_buf)
})
.and_then(move |payload| -> io::Result<_> {
// Encrypt the whole body as payload
let mut send_payload = Vec::with_capacity(iv.len() + payload.len());
send_payload.extend_from_slice(&iv[..]);
try!(cipher.update(&payload[..], &mut send_payload));
try!(cipher.finalize(&mut send_payload));
Ok((svr_cfg, send_payload))
})
.map_err(From::from)
.and_then(move |(svr_cfg, payload)| {
// Select one server
Client::resolve_server_addr(svr_cfg.clone(), dns_resolver.clone()).and_then(move |addr| {
{
// Record server's address in ServerCache, so we can know which packets
// are from proxy servers
let mut svrs_ref = servers.borrow_mut();
svrs_ref.insert(addr.clone(), svr_cfg.clone());
}
send_to(socket, payload, addr).map(|(socket, body, len)| {
trace!("Body size: {}, sent packet size: {}", body.len(), len);
Client {
assoc: assoc,
server_picker: server_picker,
servers: servers,
dns_resolver: dns_resolver,
socket: socket,
}
})
})
})
});
boxed_future(fut)
}
/// Handle Client after `recv_from`
fn handle_once(self) -> BoxIoFuture<Client> {
let Client { assoc, server_picker, servers, dns_resolver, socket } = self;
let fut = recv_from(socket, vec![0u8; MAXIMUM_UDP_PAYLOAD_SIZE]).and_then(move |(socket, buf, n, src)| {
// Reassemble Client
let c = Client {
assoc: assoc,
server_picker: server_picker,
servers: servers.clone(),
dns_resolver: dns_resolver,
socket: socket,
};
let mut servers = servers.borrow_mut();
match servers.get_mut(&src) {
Some(svr_cfg) => c.handle_s2c(svr_cfg.clone(), buf, n),
None => c.handle_c2s(buf, n, src),
}
});
boxed_future(fut)
}
}
/// UDP relay local server
pub struct UdpRelayLocal;
impl UdpRelayLocal {
// Recursive method for handling clients
// Handle one by one
fn handle_client(client: Client) -> BoxIoFuture<()> {
let fut = client.handle_once()
.and_then(|c| UdpRelayLocal::handle_client(c));
@@ -300,6 +335,7 @@ impl UdpRelayLocal {
socket: l,
};
// Starts to handle all connections after initialization
boxed_future(UdpRelayLocal::handle_client(c))
}

View File

@@ -61,109 +61,137 @@ struct Client {
}
impl Client {
fn handle_once(self) -> BoxIoFuture<Client> {
/// Handles Client to Remote
///
/// Extract and send the actual request body and associate remote with client
fn handle_c2s(self, buf: Vec<u8>, n: usize, src: SocketAddr) -> BoxIoFuture<Client> {
let Client { assoc, svr_cfg, dns_resolver, socket } = self;
let fut = recv_from(socket, vec![0u8; MAXIMUM_UDP_PAYLOAD_SIZE])
.and_then(move |(socket, buf, n, src)| {
let cloned_assoc = assoc.clone();
// Client -> Remote
let fut = futures::lazy(move || {
let buf = &buf[..n];
let fut = match assoc.borrow_mut().remove(&src) {
None => {
// Client -> Remote
let iv_len = svr_cfg.method().iv_size();
if buf.len() < iv_len {
error!("Invalid ShadowSocks UDP packet, expected IV length {}, packet length {}",
iv_len,
buf.len());
let err = io::Error::new(io::ErrorKind::Other, "early eof");
return Err(err);
}
let iv_len = svr_cfg.method().iv_size();
if buf.len() < iv_len {
error!("Invalid ShadowSocks UDP packet, expected IV length {}, packet length {}",
iv_len,
buf.len());
let err = io::Error::new(io::ErrorKind::Other, "early eof");
return Err(err);
}
let iv = &buf[..iv_len];
let mut cipher = cipher::with_type(svr_cfg.method(), svr_cfg.key(), iv, CryptoMode::Decrypt);
let iv = &buf[..iv_len];
let mut cipher = cipher::with_type(svr_cfg.method(), svr_cfg.key(), iv, CryptoMode::Decrypt);
let mut payload = Vec::with_capacity(buf.len());
try!(cipher.update(&buf[iv_len..], &mut payload));
try!(cipher.finalize(&mut payload));
// Decrypt payload from Client
let mut payload = Vec::with_capacity(buf.len());
try!(cipher.update(&buf[iv_len..], &mut payload));
try!(cipher.finalize(&mut payload));
let reader = Cursor::new(payload);
let fut = Address::read_from(reader).map_err(From::from).and_then(move |(r, addr)| {
let header_len = r.position() as usize;
let mut payload = r.into_inner();
payload.drain(..header_len);
let body = payload;
Ok((payload, svr_cfg))
})
.and_then(move |(payload, svr_cfg)| {
// Read Address in the front (ShadowSocks protocol)
Address::read_from(Cursor::new(payload)).map_err(From::from).and_then(move |(r, addr)| {
let header_len = r.position() as usize;
let mut payload = r.into_inner();
payload.drain(..header_len);
let body = payload;
info!("UDP ASSOCIATE {} -> {}, payload length {} bytes",
src,
addr,
body.len());
info!("UDP ASSOCIATE {} -> {}, payload length {} bytes",
src,
addr,
body.len());
let assoc = cloned_assoc.clone();
let cloned_addr = addr.clone();
Client::resolve_remote_addr(addr, dns_resolver.clone())
.and_then(move |remote_addr| {
// Record association
let mut assoc = cloned_assoc.borrow_mut();
assoc.insert(remote_addr.clone(),
Associate {
address: cloned_addr,
client_addr: src,
});
let cloned_assoc = assoc.clone();
let cloned_addr = addr.clone();
Client::resolve_remote_addr(addr, dns_resolver.clone())
.and_then(move |remote_addr| {
// Associate client address with remote
let mut assoc = cloned_assoc.borrow_mut();
assoc.insert(remote_addr.clone(),
Associate {
address: cloned_addr,
client_addr: src,
});
send_to(socket, body, remote_addr)
})
.map(move |(socket, body, len)| {
trace!("Sent body {} bytes, actual {} bytes", body.len(), len);
Client {
assoc: assoc,
svr_cfg: svr_cfg,
dns_resolver: dns_resolver,
socket: socket,
}
})
});
send_to(socket, body, remote_addr)
})
.map(move |(socket, body, len)| {
trace!("Sent body {} bytes, actual {} bytes", body.len(), len);
Client {
assoc: assoc,
svr_cfg: svr_cfg,
dns_resolver: dns_resolver,
socket: socket,
}
})
})
});
boxed_future(fut)
}
boxed_future(fut)
}
Some(Associate { address, client_addr }) => {
info!("UDP ASSOCIATE {} <- {}, payload length {} bytes",
client_addr,
address,
buf.len());
/// Handle Remote to Client
///
/// Return packet to Client with encryption
fn handle_s2c(self, Associate { address, client_addr }: Associate, buf: Vec<u8>, n: usize) -> BoxIoFuture<Client> {
let Client { assoc, svr_cfg, dns_resolver, socket } = self;
// Client <- Remote
let mut iv = svr_cfg.method().gen_init_vec();
let mut cipher = cipher::with_type(svr_cfg.method(),
svr_cfg.key(),
&iv[..],
CryptoMode::Encrypt);
let buf_len = buf[..n].len();
info!("UDP ASSOCIATE {} <- {}, payload length {} bytes",
client_addr,
address,
buf_len);
let mut buf = buf.to_vec();
let fut = address.write_to(Vec::new())
.map(move |mut send_buf| {
send_buf.append(&mut buf);
send_buf
})
.and_then(move |send_buf| -> io::Result<_> {
try!(cipher.update(&send_buf[..], &mut iv));
try!(cipher.finalize(&mut iv));
Ok(iv)
})
.and_then(move |final_buf| send_to(socket, final_buf, client_addr))
.map(|(socket, buf, len)| {
trace!("Sent body {} actual {}", buf.len(), len);
Client {
assoc: cloned_assoc,
svr_cfg: svr_cfg,
dns_resolver: dns_resolver,
socket: socket,
}
});
// Client <- Remote
let mut iv = svr_cfg.method().gen_init_vec();
let mut cipher = cipher::with_type(svr_cfg.method(),
svr_cfg.key(),
&iv[..],
CryptoMode::Encrypt);
boxed_future(fut)
}
// Append Address in front of body (ShadowSocks protocol)
let fut = address.write_to(Vec::with_capacity(buf_len))
.map(move |mut send_buf| {
send_buf.extend_from_slice(&buf[..n]);
send_buf
})
.and_then(move |send_buf| -> io::Result<_> {
try!(cipher.update(&send_buf[..], &mut iv));
try!(cipher.finalize(&mut iv));
Ok(iv)
})
.and_then(move |final_buf| send_to(socket, final_buf, client_addr))
.map(|(socket, buf, len)| {
trace!("Sent body {} actual {}", buf.len(), len);
Client {
assoc: assoc,
svr_cfg: svr_cfg,
dns_resolver: dns_resolver,
socket: socket,
}
});
boxed_future(fut)
}
// Handle one packet
fn handle_once(self) -> BoxIoFuture<Client> {
let Client { assoc, svr_cfg, dns_resolver, socket } = self;
let fut = recv_from(socket, vec![0u8; MAXIMUM_UDP_PAYLOAD_SIZE])
.and_then(move |(socket, buf, n, src)| {
let c = Client {
assoc: assoc.clone(),
svr_cfg: svr_cfg,
dns_resolver: dns_resolver,
socket: socket,
};
let mut assoc = assoc.borrow_mut();
let fut = match assoc.remove(&src) {
None => c.handle_c2s(buf, n, src),
Some(cassoc) => c.handle_s2c(cassoc, buf, n),
};
Ok(fut)