From 5e5194014bd276e6e0dfb36293c5eb5962713ce3 Mon Sep 17 00:00:00 2001 From: "Y. T. Chung" Date: Tue, 1 Nov 2016 22:56:09 +0800 Subject: [PATCH] optimize code structure --- .travis.yml | 5 +- src/lib.rs | 4 +- src/relay/udprelay/local.rs | 380 +++++++++++++++++++---------------- src/relay/udprelay/server.rs | 206 +++++++++++-------- 4 files changed, 330 insertions(+), 265 deletions(-) diff --git a/.travis.yml b/.travis.yml index 68f21c4a..39e2a2b0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,7 @@ language: rust rust: + - stable + - beta - nightly cache: cargo @@ -12,5 +14,4 @@ addons: - libssl-dev script: - - cargo build -v - - cargo test -v + - cargo test -v --no-fail-fast diff --git a/src/lib.rs b/src/lib.rs index dcd9214d..75522eb3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,9 +25,9 @@ //! //! ## Usage //! -//! Build shadowsocks and you will got at least 2 binaries: `sslocal` and `ssserver` +//! Build shadowsocks and you will get at least 2 binaries: `sslocal` and `ssserver` //! -//! Write your servers in a configuration file, which is defined in +//! Write your servers in a configuration file. Format is defined in //! [shadowsocks' documentation](https://github.com/shadowsocks/shadowsocks/wiki) //! //! For example: diff --git a/src/relay/udprelay/local.rs b/src/relay/udprelay/local.rs index b0e50d91..741c740d 100644 --- a/src/relay/udprelay/local.rs +++ b/src/relay/udprelay/local.rs @@ -93,181 +93,12 @@ struct Client { } impl Client { - fn handle_once(self) -> BoxIoFuture { - let Client { assoc, server_picker, servers, dns_resolver, socket } = self; - - let fut = recv_from(socket, vec![0u8; MAXIMUM_UDP_PAYLOAD_SIZE]) - .and_then(move |(socket, buf, n, src)| { - let buf = &buf[..n]; - - let cloned_servers = servers.clone(); - let cloned_assoc = assoc.clone(); - - let fut = match servers.borrow_mut().get_mut(&src) { - Some(svr_cfg) => { - // Proxy -> Client - trace!("Got packet from server {}, length {}", - svr_cfg.addr(), - buf.len()); - - let iv_len = svr_cfg.method().iv_size(); - if buf.len() < iv_len { - error!("Invalid ShadowSocks UDP packet, expected IV length {}, packet length {}", - iv_len, - buf.len()); - let err = io::Error::new(io::ErrorKind::Other, "early eof"); - return Err(err); - } - - let iv = &buf[..iv_len]; - let mut cipher = cipher::with_type(svr_cfg.method(), svr_cfg.key(), iv, CryptoMode::Decrypt); - - let mut payload = Vec::with_capacity(buf.len()); - try!(cipher.update(&buf[iv_len..], &mut payload)); - try!(cipher.finalize(&mut payload)); - - let reader = Cursor::new(payload); - let fut = Address::read_from(reader).map_err(From::from).and_then(move |(r, addr)| { - - let header_len = r.position() as usize; - let payload = r.into_inner(); - let body = &payload[header_len..]; - - trace!("Got packet from {}, payload length {}", addr, body.len()); - - // Will always success - let mut reply_body = - UdpAssociateHeader::new(0, addr.clone()).write_to(Vec::new()).wait().unwrap(); - reply_body.extend_from_slice(body); - - let mut assoc = assoc.borrow_mut(); - match assoc.remove(&addr) { - None => { - warn!("Got unassociated packet from server, addr: {:?}", addr); - let err = io::Error::new(io::ErrorKind::Other, "unassociated packet"); - boxed_future(futures::failed(err)) - } - Some(client_addr) => { - info!("UDP ASSOCIATE {} <- {}, payload length {} bytes", - client_addr, - addr, - body.len()); - - let fut = send_to(socket, reply_body, client_addr).map(move |(socket, _, _)| { - Client { - assoc: cloned_assoc, - servers: cloned_servers, - dns_resolver: dns_resolver, - server_picker: server_picker, - socket: socket, - } - }); - - boxed_future(fut) - } - } - }); - - boxed_future(fut) - } - - None => { - // Client -> Proxy - - let reader = Cursor::new(buf.to_vec()); - let fut = UdpAssociateHeader::read_from(reader) - .map_err(From::from) - .and_then(move |(r, header)| { - - if header.frag != 0 { - warn!("Does not support UDP fragment, got header {:?}", header); - let err = io::Error::new(io::ErrorKind::Other, "Not supported UDP fragment"); - return boxed_future(futures::failed(err)); - } - - let header_len = r.position() as usize; - let payload = r.into_inner(); - let assoc_addr = header.address; - - info!("UDP ASSOCIATE {} -> {}, payload length {} bytes", - src, - assoc_addr, - &payload[header_len..].len()); - - // If we have recorded address, then it is a return packet from server - // Proxy -> Client - let mut assoc = assoc.borrow_mut(); - - let svr_cfg = server_picker.borrow_mut().pick_server(); - assoc.insert(assoc_addr.clone(), src); - - // Client -> Proxy - let fut = futures::lazy(move || { - let iv = svr_cfg.method().gen_init_vec(); - let mut cipher = cipher::with_type(svr_cfg.method(), - svr_cfg.key(), - &iv[..], - CryptoMode::Encrypt); - - let payload_buf = Vec::with_capacity(payload.len()); - assoc_addr.write_to(payload_buf) - .and_then(move |mut payload_buf| { - payload_buf.extend_from_slice(&payload[header_len..]); - Ok(payload_buf) - }) - .and_then(move |payload| -> io::Result<_> { - let mut send_payload = Vec::with_capacity(iv.len() + payload.len()); - send_payload.extend_from_slice(&iv[..]); - try!(cipher.update(&payload[..], &mut send_payload)); - try!(cipher.finalize(&mut send_payload)); - Ok((svr_cfg, send_payload)) - }) - }) - .map_err(From::from) - .and_then(move |(svr_cfg, payload)| { - UdpRelayLocal::resolve_server_addr(svr_cfg.clone(), dns_resolver.clone()) - .and_then(move |addr| { - // And we have to know the proxy servers' addresses - let servers = cloned_servers.clone(); - let mut svrs_ref = cloned_servers.borrow_mut(); - svrs_ref.insert(addr.clone(), svr_cfg.clone()); - - let fut = send_to(socket, payload, addr).map(|(socket, body, len)| { - trace!("Body size: {}, sent packet size: {}", body.len(), len); - Client { - assoc: cloned_assoc, - server_picker: server_picker, - servers: servers, - dns_resolver: dns_resolver, - socket: socket, - } - }); - - boxed_future(fut) - }) - }); - boxed_future(fut) - }); - - boxed_future(fut) - } - }; - - Ok(fut) - }) - .and_then(|fut| fut); - - boxed_future(fut) - } -} - -/// UDP relay local server -pub struct UdpRelayLocal; - -impl UdpRelayLocal { + /// Resolves server address to SocketAddr fn resolve_server_addr(svr_cfg: Rc, dns_resolver: DnsResolver) -> BoxIoFuture { match svr_cfg.addr() { + // Return directly if it is a SocketAddr &ServerAddr::SocketAddr(ref addr) => boxed_future(futures::finished(addr.clone())), + // Resolve domain name to SocketAddr &ServerAddr::DomainName(ref dname, port) => { let fut = dns_resolver.resolve(dname) .map(move |sockaddr| { @@ -281,6 +112,210 @@ impl UdpRelayLocal { } } + /// Handles relay from proxy to client + /// + /// Extract actual body from payload + /// Appends a SOCKS5 UDP Associate header in front of the body, and send it to client + fn handle_s2c(self, svr_cfg: Rc, buf: Vec, n: usize) -> BoxIoFuture { + let Client { assoc, server_picker, servers, dns_resolver, socket } = self; + + let fut = futures::lazy(move || { + let buf = &buf[..n]; + + trace!("Got packet from server {}, length {}", + svr_cfg.addr(), + buf.len()); + + let iv_len = svr_cfg.method().iv_size(); + if buf.len() < iv_len { + error!("Invalid ShadowSocks UDP packet, expected IV length {}, packet length {}", + iv_len, + buf.len()); + let err = io::Error::new(io::ErrorKind::Other, "early eof"); + return Err(err); + } + + let iv = &buf[..iv_len]; + let mut cipher = cipher::with_type(svr_cfg.method(), svr_cfg.key(), iv, CryptoMode::Decrypt); + + // Decrypt payload with cipher + let mut payload = Vec::with_capacity(buf.len()); + try!(cipher.update(&buf[iv_len..], &mut payload)); + try!(cipher.finalize(&mut payload)); + + Ok(payload) + }) + .and_then(move |payload| { + // Get Address from the front of payload (ShadowSocks protocol) + Address::read_from(Cursor::new(payload)) + .map_err(From::from) + .and_then(move |(r, addr)| { + + let header_len = r.position() as usize; + let payload = r.into_inner(); + let body = &payload[header_len..]; + + trace!("Got packet from {}, payload length {}", addr, body.len()); + + // Append header in front of the actual body (SOCKS5 protocol) + let mut reply_body = + UdpAssociateHeader::new(0, addr.clone()).write_to(Vec::new()).wait().unwrap(); + reply_body.extend_from_slice(body); + + // Get associated client's SocketAddr + // We have to know who sent packet to this `addr` + let cloned_assoc = assoc.clone(); + let mut assoc = assoc.borrow_mut(); + assoc.remove(&addr) + .ok_or_else(|| { + warn!("Got unassociated packet from server, addr: {:?}", addr); + io::Error::new(io::ErrorKind::Other, "unassociated packet") + }) + .map(|client_addr| { + info!("UDP ASSOCIATE {} <- {}, payload length {} bytes", + client_addr, + addr, + body.len()); + (client_addr, cloned_assoc, reply_body) + }) + }) + .and_then(|(client_addr, assoc, reply_body)| { + send_to(socket, reply_body, client_addr).map(move |(socket, _, _)| { + Client { + assoc: assoc, + servers: servers, + dns_resolver: dns_resolver, + server_picker: server_picker, + socket: socket, + } + }) + }) + }); + + boxed_future(fut) + } + + /// Handles relay from client to proxy + /// + /// Appends a Address header in front of the packet, and send it to proxy after encryption + fn handle_c2s(self, buf: Vec, n: usize, src: SocketAddr) -> BoxIoFuture { + let Client { assoc, server_picker, servers, dns_resolver, socket } = self; + + let fut = futures::lazy(move || { + // Extract UDP associate header in the front (SOCKS5 protocol) + let reader = Cursor::new(buf[..n].to_vec()); + let (reader, header) = try!(UdpAssociateHeader::read_from(reader).wait()); + + let header_length = reader.position() as usize; + Ok((reader.into_inner(), header, header_length)) + }) + .and_then(|(payload, header, header_len)| { + // ShadowSocks does not support UDP fragment + // Drop the packet directly according to SOCKS5's RFC + if header.frag != 0x00 { + warn!("Does not support UDP fragment, got header {:?}", header); + let err = io::Error::new(io::ErrorKind::Other, "Not supported UDP fragment"); + Err(err) + } else { + Ok((payload, header, header_len)) + } + }) + .and_then(move |(payload, header, header_len)| { + let assoc_addr = header.address; + + info!("UDP ASSOCIATE {} -> {}, payload length {} bytes", + src, + assoc_addr, + &payload[header_len..].len()); + + { + // Record association: addr -> SocketAddr (Client) + let mut assoc = assoc.borrow_mut(); + assoc.insert(assoc_addr.clone(), src); + } + let svr_cfg = server_picker.borrow_mut().pick_server(); + + // Client -> Proxy + let iv = svr_cfg.method().gen_init_vec(); + let mut cipher = cipher::with_type(svr_cfg.method(), + svr_cfg.key(), + &iv[..], + CryptoMode::Encrypt); + + let payload_buf = Vec::with_capacity(payload.len()); + // Append Address to the front (ShadowSocks protocol) + assoc_addr.write_to(payload_buf) + .and_then(move |mut payload_buf| { + payload_buf.extend_from_slice(&payload[header_len..]); + Ok(payload_buf) + }) + .and_then(move |payload| -> io::Result<_> { + // Encrypt the whole body as payload + let mut send_payload = Vec::with_capacity(iv.len() + payload.len()); + send_payload.extend_from_slice(&iv[..]); + try!(cipher.update(&payload[..], &mut send_payload)); + try!(cipher.finalize(&mut send_payload)); + Ok((svr_cfg, send_payload)) + }) + .map_err(From::from) + .and_then(move |(svr_cfg, payload)| { + // Select one server + Client::resolve_server_addr(svr_cfg.clone(), dns_resolver.clone()).and_then(move |addr| { + { + // Record server's address in ServerCache, so we can know which packets + // are from proxy servers + let mut svrs_ref = servers.borrow_mut(); + svrs_ref.insert(addr.clone(), svr_cfg.clone()); + } + + send_to(socket, payload, addr).map(|(socket, body, len)| { + trace!("Body size: {}, sent packet size: {}", body.len(), len); + Client { + assoc: assoc, + server_picker: server_picker, + servers: servers, + dns_resolver: dns_resolver, + socket: socket, + } + }) + }) + }) + }); + + boxed_future(fut) + } + + /// Handle Client after `recv_from` + fn handle_once(self) -> BoxIoFuture { + let Client { assoc, server_picker, servers, dns_resolver, socket } = self; + + let fut = recv_from(socket, vec![0u8; MAXIMUM_UDP_PAYLOAD_SIZE]).and_then(move |(socket, buf, n, src)| { + // Reassemble Client + let c = Client { + assoc: assoc, + server_picker: server_picker, + servers: servers.clone(), + dns_resolver: dns_resolver, + socket: socket, + }; + + let mut servers = servers.borrow_mut(); + match servers.get_mut(&src) { + Some(svr_cfg) => c.handle_s2c(svr_cfg.clone(), buf, n), + None => c.handle_c2s(buf, n, src), + } + }); + + boxed_future(fut) + } +} + +/// UDP relay local server +pub struct UdpRelayLocal; + +impl UdpRelayLocal { + // Recursive method for handling clients + // Handle one by one fn handle_client(client: Client) -> BoxIoFuture<()> { let fut = client.handle_once() .and_then(|c| UdpRelayLocal::handle_client(c)); @@ -300,6 +335,7 @@ impl UdpRelayLocal { socket: l, }; + // Starts to handle all connections after initialization boxed_future(UdpRelayLocal::handle_client(c)) } diff --git a/src/relay/udprelay/server.rs b/src/relay/udprelay/server.rs index 89497f98..d315a928 100644 --- a/src/relay/udprelay/server.rs +++ b/src/relay/udprelay/server.rs @@ -61,109 +61,137 @@ struct Client { } impl Client { - fn handle_once(self) -> BoxIoFuture { + /// Handles Client to Remote + /// + /// Extract and send the actual request body and associate remote with client + fn handle_c2s(self, buf: Vec, n: usize, src: SocketAddr) -> BoxIoFuture { let Client { assoc, svr_cfg, dns_resolver, socket } = self; - let fut = recv_from(socket, vec![0u8; MAXIMUM_UDP_PAYLOAD_SIZE]) - .and_then(move |(socket, buf, n, src)| { - let cloned_assoc = assoc.clone(); + // Client -> Remote + let fut = futures::lazy(move || { let buf = &buf[..n]; - let fut = match assoc.borrow_mut().remove(&src) { - None => { - // Client -> Remote - let iv_len = svr_cfg.method().iv_size(); - if buf.len() < iv_len { - error!("Invalid ShadowSocks UDP packet, expected IV length {}, packet length {}", - iv_len, - buf.len()); - let err = io::Error::new(io::ErrorKind::Other, "early eof"); - return Err(err); - } + let iv_len = svr_cfg.method().iv_size(); + if buf.len() < iv_len { + error!("Invalid ShadowSocks UDP packet, expected IV length {}, packet length {}", + iv_len, + buf.len()); + let err = io::Error::new(io::ErrorKind::Other, "early eof"); + return Err(err); + } - let iv = &buf[..iv_len]; - let mut cipher = cipher::with_type(svr_cfg.method(), svr_cfg.key(), iv, CryptoMode::Decrypt); + let iv = &buf[..iv_len]; + let mut cipher = cipher::with_type(svr_cfg.method(), svr_cfg.key(), iv, CryptoMode::Decrypt); - let mut payload = Vec::with_capacity(buf.len()); - try!(cipher.update(&buf[iv_len..], &mut payload)); - try!(cipher.finalize(&mut payload)); + // Decrypt payload from Client + let mut payload = Vec::with_capacity(buf.len()); + try!(cipher.update(&buf[iv_len..], &mut payload)); + try!(cipher.finalize(&mut payload)); - let reader = Cursor::new(payload); - let fut = Address::read_from(reader).map_err(From::from).and_then(move |(r, addr)| { - let header_len = r.position() as usize; - let mut payload = r.into_inner(); - payload.drain(..header_len); - let body = payload; + Ok((payload, svr_cfg)) + }) + .and_then(move |(payload, svr_cfg)| { + // Read Address in the front (ShadowSocks protocol) + Address::read_from(Cursor::new(payload)).map_err(From::from).and_then(move |(r, addr)| { + let header_len = r.position() as usize; + let mut payload = r.into_inner(); + payload.drain(..header_len); + let body = payload; - info!("UDP ASSOCIATE {} -> {}, payload length {} bytes", - src, - addr, - body.len()); + info!("UDP ASSOCIATE {} -> {}, payload length {} bytes", + src, + addr, + body.len()); - let assoc = cloned_assoc.clone(); - let cloned_addr = addr.clone(); - Client::resolve_remote_addr(addr, dns_resolver.clone()) - .and_then(move |remote_addr| { - // Record association - let mut assoc = cloned_assoc.borrow_mut(); - assoc.insert(remote_addr.clone(), - Associate { - address: cloned_addr, - client_addr: src, - }); + let cloned_assoc = assoc.clone(); + let cloned_addr = addr.clone(); + Client::resolve_remote_addr(addr, dns_resolver.clone()) + .and_then(move |remote_addr| { + // Associate client address with remote + let mut assoc = cloned_assoc.borrow_mut(); + assoc.insert(remote_addr.clone(), + Associate { + address: cloned_addr, + client_addr: src, + }); - send_to(socket, body, remote_addr) - }) - .map(move |(socket, body, len)| { - trace!("Sent body {} bytes, actual {} bytes", body.len(), len); - Client { - assoc: assoc, - svr_cfg: svr_cfg, - dns_resolver: dns_resolver, - socket: socket, - } - }) - }); + send_to(socket, body, remote_addr) + }) + .map(move |(socket, body, len)| { + trace!("Sent body {} bytes, actual {} bytes", body.len(), len); + Client { + assoc: assoc, + svr_cfg: svr_cfg, + dns_resolver: dns_resolver, + socket: socket, + } + }) + }) + }); + boxed_future(fut) + } - boxed_future(fut) - } - Some(Associate { address, client_addr }) => { - info!("UDP ASSOCIATE {} <- {}, payload length {} bytes", - client_addr, - address, - buf.len()); + /// Handle Remote to Client + /// + /// Return packet to Client with encryption + fn handle_s2c(self, Associate { address, client_addr }: Associate, buf: Vec, n: usize) -> BoxIoFuture { + let Client { assoc, svr_cfg, dns_resolver, socket } = self; - // Client <- Remote - let mut iv = svr_cfg.method().gen_init_vec(); - let mut cipher = cipher::with_type(svr_cfg.method(), - svr_cfg.key(), - &iv[..], - CryptoMode::Encrypt); + let buf_len = buf[..n].len(); + info!("UDP ASSOCIATE {} <- {}, payload length {} bytes", + client_addr, + address, + buf_len); - let mut buf = buf.to_vec(); - let fut = address.write_to(Vec::new()) - .map(move |mut send_buf| { - send_buf.append(&mut buf); - send_buf - }) - .and_then(move |send_buf| -> io::Result<_> { - try!(cipher.update(&send_buf[..], &mut iv)); - try!(cipher.finalize(&mut iv)); - Ok(iv) - }) - .and_then(move |final_buf| send_to(socket, final_buf, client_addr)) - .map(|(socket, buf, len)| { - trace!("Sent body {} actual {}", buf.len(), len); - Client { - assoc: cloned_assoc, - svr_cfg: svr_cfg, - dns_resolver: dns_resolver, - socket: socket, - } - }); + // Client <- Remote + let mut iv = svr_cfg.method().gen_init_vec(); + let mut cipher = cipher::with_type(svr_cfg.method(), + svr_cfg.key(), + &iv[..], + CryptoMode::Encrypt); - boxed_future(fut) - } + // Append Address in front of body (ShadowSocks protocol) + let fut = address.write_to(Vec::with_capacity(buf_len)) + .map(move |mut send_buf| { + send_buf.extend_from_slice(&buf[..n]); + send_buf + }) + .and_then(move |send_buf| -> io::Result<_> { + try!(cipher.update(&send_buf[..], &mut iv)); + try!(cipher.finalize(&mut iv)); + Ok(iv) + }) + .and_then(move |final_buf| send_to(socket, final_buf, client_addr)) + .map(|(socket, buf, len)| { + trace!("Sent body {} actual {}", buf.len(), len); + Client { + assoc: assoc, + svr_cfg: svr_cfg, + dns_resolver: dns_resolver, + socket: socket, + } + }); + + boxed_future(fut) + } + + // Handle one packet + fn handle_once(self) -> BoxIoFuture { + let Client { assoc, svr_cfg, dns_resolver, socket } = self; + + let fut = recv_from(socket, vec![0u8; MAXIMUM_UDP_PAYLOAD_SIZE]) + .and_then(move |(socket, buf, n, src)| { + let c = Client { + assoc: assoc.clone(), + svr_cfg: svr_cfg, + dns_resolver: dns_resolver, + socket: socket, + }; + + let mut assoc = assoc.borrow_mut(); + let fut = match assoc.remove(&src) { + None => c.handle_c2s(buf, n, src), + Some(cassoc) => c.handle_s2c(cassoc, buf, n), }; Ok(fut)