diff --git a/Cargo.lock b/Cargo.lock index e655d991..47c8d53b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -750,10 +750,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] -name = "libc" -version = "0.2.93" +name = "lfu_cache" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41" +checksum = "29d2e56f95e5fda80586d85e2e98bb6dba8f71f4406161ce90698fa38ff16486" + +[[package]] +name = "libc" +version = "0.2.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18794a8ad5b29321f790b55d93dfba91e125cb1a9edbd4f8e3150acc771c1a5e" [[package]] name = "libmimalloc-sys" @@ -831,12 +837,6 @@ dependencies = [ "linked-hash-map", ] -[[package]] -name = "lru_time_cache" -version = "0.11.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "991058cf0e7f161b37a4a610ddbada9d2f051d0db76369b973de9f3466f1cba3" - [[package]] name = "maplit" version = "1.0.2" @@ -1549,10 +1549,10 @@ dependencies = [ "ipnet", "iprange", "json5", + "lfu_cache", "libc", "log", "log4rs", - "lru_time_cache", "mio", "native-tls", "nix", diff --git a/bin/sslocal.rs b/bin/sslocal.rs index 33513d62..f927ca71 100644 --- a/bin/sslocal.rs +++ b/bin/sslocal.rs @@ -357,7 +357,7 @@ fn main() { #[cfg(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "ios"))] if let Some(iface) = matches.value_of("OUTBOUND_BIND_INTERFACE") { - config.outbound_bind_interface = Some(From::from(iface.to_owned())); + config.outbound_bind_interface = Some(iface.to_owned()); } #[cfg(all(unix, not(target_os = "android")))] diff --git a/bin/ssmanager.rs b/bin/ssmanager.rs index 2476057f..f42ff72d 100644 --- a/bin/ssmanager.rs +++ b/bin/ssmanager.rs @@ -163,9 +163,9 @@ fn main() { config.outbound_fwmark = Some(mark.parse::().expect("an unsigned integer for `outbound-fwmark`")); } - #[cfg(any(target_os = "linux", target_os = "android"))] + #[cfg(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "ios"))] if let Some(iface) = matches.value_of("OUTBOUND_BIND_INTERFACE") { - config.outbound_bind_interface = Some(From::from(iface.to_owned())); + config.outbound_bind_interface = Some(iface.to_owned()); } if let Some(m) = matches.value_of("MANAGER_ADDRESS") { diff --git a/bin/ssserver.rs b/bin/ssserver.rs index 144d6f2a..9aa987d1 100644 --- a/bin/ssserver.rs +++ b/bin/ssserver.rs @@ -204,9 +204,9 @@ fn main() { config.outbound_fwmark = Some(mark.parse::().expect("an unsigned integer for `outbound-fwmark`")); } - #[cfg(any(target_os = "linux", target_os = "android"))] + #[cfg(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "ios"))] if let Some(iface) = matches.value_of("OUTBOUND_BIND_INTERFACE") { - config.outbound_bind_interface = Some(From::from(iface.to_owned())); + config.outbound_bind_interface = Some(iface.to_owned()); } if let Some(m) = matches.value_of("MANAGER_ADDRESS") { diff --git a/crates/shadowsocks-service/Cargo.toml b/crates/shadowsocks-service/Cargo.toml index d15c20d3..3a8534e9 100644 --- a/crates/shadowsocks-service/Cargo.toml +++ b/crates/shadowsocks-service/Cargo.toml @@ -77,7 +77,7 @@ once_cell = "1.7" thiserror = "1.0" spin = { version = "0.9", features = ["std"] } -lru_time_cache = "0.11" +lfu_cache = "1.2.1" bytes = "1.0" byte_string = "1.0" byteorder = "1.3" diff --git a/crates/shadowsocks-service/src/local/context.rs b/crates/shadowsocks-service/src/local/context.rs index fde296c8..c5189313 100644 --- a/crates/shadowsocks-service/src/local/context.rs +++ b/crates/shadowsocks-service/src/local/context.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use std::{net::IpAddr, time::Duration}; #[cfg(feature = "local-dns")] -use lru_time_cache::LruCache; +use lfu_cache::TimedLfuCache; use shadowsocks::{ config::ServerType, context::{Context, SharedContext}, @@ -32,7 +32,7 @@ pub struct ServiceContext { // For DNS relay's ACL domain name reverse lookup -- whether the IP shall be forwarded #[cfg(feature = "local-dns")] - reverse_lookup_cache: Mutex>, + reverse_lookup_cache: Mutex>, } impl Default for ServiceContext { @@ -51,7 +51,10 @@ impl ServiceContext { acl: None, flow_stat: Arc::new(FlowStat::new()), #[cfg(feature = "local-dns")] - reverse_lookup_cache: Mutex::new(LruCache::with_expiry_duration(Duration::from_secs(3 * 24 * 60 * 60))), + reverse_lookup_cache: Mutex::new(TimedLfuCache::with_capacity_and_expiration( + 10240, // XXX: It should be enough for a normal user. + Duration::from_secs(3 * 24 * 60 * 60), + )), } } diff --git a/crates/shadowsocks-service/src/local/http/client_cache.rs b/crates/shadowsocks-service/src/local/http/client_cache.rs index ddf8dfb7..21d41d0a 100644 --- a/crates/shadowsocks-service/src/local/http/client_cache.rs +++ b/crates/shadowsocks-service/src/local/http/client_cache.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration}; use hyper::{Body, Client}; -use lru_time_cache::LruCache; +use lfu_cache::TimedLfuCache; use shadowsocks::config::ServerAddr; use tokio::sync::Mutex; @@ -14,14 +14,14 @@ use super::{connector::ProxyConnector, http_client::ProxyHttpClient}; /// Cached HTTP client for remote servers pub struct ProxyClientCache { context: Arc, - cache: Mutex>, + cache: Mutex>, } impl ProxyClientCache { pub fn new(context: Arc) -> ProxyClientCache { ProxyClientCache { context, - cache: Mutex::new(LruCache::with_expiry_duration_and_capacity(Duration::from_secs(60), 5)), + cache: Mutex::new(TimedLfuCache::with_capacity_and_expiration(5, Duration::from_secs(60))), } } diff --git a/crates/shadowsocks-service/src/local/net/tcp/auto_proxy_stream.rs b/crates/shadowsocks-service/src/local/net/tcp/auto_proxy_stream.rs index 00d08782..dfcb4b34 100644 --- a/crates/shadowsocks-service/src/local/net/tcp/auto_proxy_stream.rs +++ b/crates/shadowsocks-service/src/local/net/tcp/auto_proxy_stream.rs @@ -59,7 +59,7 @@ impl AutoProxyClientStream { let addr = addr.into(); let stream = TcpStream::connect_remote_with_opts(context.context_ref(), &addr, context.connect_opts_ref()).await?; - Ok(AutoProxyClientStream::Bypassed(stream.into())) + Ok(AutoProxyClientStream::Bypassed(stream)) } /// Connect to target `addr` via shadowsocks' server configured by `svr_cfg` diff --git a/crates/shadowsocks-service/src/local/net/udp/association.rs b/crates/shadowsocks-service/src/local/net/udp/association.rs index 3b38c7ac..967426c2 100644 --- a/crates/shadowsocks-service/src/local/net/udp/association.rs +++ b/crates/shadowsocks-service/src/local/net/udp/association.rs @@ -10,8 +10,8 @@ use std::{ use async_trait::async_trait; use bytes::Bytes; use futures::future::{self, AbortHandle}; +use lfu_cache::TimedLfuCache; use log::{debug, error, trace, warn}; -use lru_time_cache::{Entry, LruCache}; use shadowsocks::{ lookup_then, net::UdpSocket as ShadowUdpSocket, @@ -42,6 +42,9 @@ pub trait UdpInboundWrite { async fn send_to(&self, peer_addr: SocketAddr, remote_addr: &Address, data: &[u8]) -> io::Result<()>; } +type AssociationMap = TimedLfuCache>; +type SharedAssociationMap = Arc>>; + /// UDP association manager pub struct UdpAssociationManager where @@ -49,7 +52,7 @@ where { respond_writer: W, context: Arc, - assoc_map: Arc>>>, + assoc_map: SharedAssociationMap, cleanup_abortable: AbortHandle, balancer: PingBalancer, } @@ -77,8 +80,8 @@ where ) -> UdpAssociationManager { let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION); let assoc_map = Arc::new(Mutex::new(match capacity { - Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity), - None => LruCache::with_expiry_duration(time_to_live), + Some(capacity) => TimedLfuCache::with_capacity_and_expiration(capacity, time_to_live), + None => TimedLfuCache::with_expiration(time_to_live), })); let cleanup_abortable = { @@ -87,8 +90,8 @@ where loop { time::sleep(time_to_live).await; - // iter() will trigger a cleanup of expired associations - let _ = assoc_map.lock().await.iter(); + // cleanup expired associations + let _ = assoc_map.lock().await.evict_expired(); } }); tokio::spawn(cleanup_task); @@ -107,23 +110,27 @@ where /// Sends `data` from `peer_addr` to `target_addr` pub async fn send_to(&self, peer_addr: SocketAddr, target_addr: Address, data: &[u8]) -> io::Result<()> { // Check or (re)create an association - match self.assoc_map.lock().await.entry(peer_addr) { - Entry::Occupied(occ) => { - let assoc = occ.into_mut(); - assoc.try_send((target_addr, Bytes::copy_from_slice(data))) - } - Entry::Vacant(vac) => { - let assoc = vac.insert(UdpAssociation::new( - self.context.clone(), - peer_addr, - self.assoc_map.clone(), - self.balancer.clone(), - self.respond_writer.clone(), - )); - trace!("created udp association for {}", peer_addr); - assoc.try_send((target_addr, Bytes::copy_from_slice(data))) - } + + let mut assoc_map = self.assoc_map.lock().await; + + if let Some(assoc) = assoc_map.get(&peer_addr) { + return assoc.try_send((target_addr, Bytes::copy_from_slice(data))); } + + let assoc = UdpAssociation::new( + self.context.clone(), + peer_addr, + self.assoc_map.clone(), + self.balancer.clone(), + self.respond_writer.clone(), + ); + + trace!("created udp association for {}", peer_addr); + + assoc.try_send((target_addr, Bytes::copy_from_slice(data)))?; + assoc_map.insert(peer_addr, assoc); + + Ok(()) } } @@ -153,7 +160,7 @@ where fn new( context: Arc, peer_addr: SocketAddr, - assoc_map: Arc>>>, + assoc_map: SharedAssociationMap, balancer: PingBalancer, respond_writer: W, ) -> UdpAssociation { @@ -245,7 +252,7 @@ where bypassed_ipv4_socket: SpinMutex, bypassed_ipv6_socket: SpinMutex, proxied_socket: SpinMutex, - assoc_map: Arc>>>, + assoc_map: SharedAssociationMap, balancer: PingBalancer, respond_writer: W, } @@ -266,7 +273,7 @@ where fn new( context: Arc, peer_addr: SocketAddr, - assoc_map: Arc>>>, + assoc_map: SharedAssociationMap, balancer: PingBalancer, respond_writer: W, ) -> (Arc>, mpsc::Sender<(Address, Bytes)>) { diff --git a/crates/shadowsocks-service/src/local/redir/udprelay/mod.rs b/crates/shadowsocks-service/src/local/redir/udprelay/mod.rs index 44f45282..5ff97364 100644 --- a/crates/shadowsocks-service/src/local/redir/udprelay/mod.rs +++ b/crates/shadowsocks-service/src/local/redir/udprelay/mod.rs @@ -8,12 +8,16 @@ use std::{ }; use async_trait::async_trait; +use futures::future::{self, AbortHandle}; +use lfu_cache::TimedLfuCache; use log::{error, info, trace, warn}; use shadowsocks::{ lookup_then, + net::ConnectOpts, relay::{socks5::Address, udprelay::MAXIMUM_UDP_PAYLOAD_SIZE}, ServerAddr, }; +use tokio::sync::Mutex; use crate::{ config::RedirType, @@ -32,10 +36,63 @@ use self::sys::UdpRedirSocket; mod sys; +const INBOUND_SOCKET_CACHE_EXPIRATION: Duration = Duration::from_secs(60); +const INBOUND_SOCKET_CACHE_CAPACITY: usize = 256; + +struct UdpRedirInboundCache { + cache: Arc>>>, + watcher: AbortHandle, +} + +impl Drop for UdpRedirInboundCache { + fn drop(&mut self) { + self.watcher.abort(); + } +} + +impl UdpRedirInboundCache { + fn new() -> UdpRedirInboundCache { + let cache = Arc::new(Mutex::new(TimedLfuCache::with_capacity_and_expiration( + INBOUND_SOCKET_CACHE_CAPACITY, + INBOUND_SOCKET_CACHE_EXPIRATION, + ))); + + let (cleanup_fut, watcher) = { + let cache = cache.clone(); + future::abortable(async move { + loop { + tokio::time::sleep(INBOUND_SOCKET_CACHE_EXPIRATION).await; + cache.lock().await.evict_expired(); + } + }) + }; + tokio::spawn(cleanup_fut); + + UdpRedirInboundCache { cache, watcher } + } +} + #[derive(Clone)] struct UdpRedirInboundWriter { redir_ty: RedirType, socket_opts: RedirSocketOpts, + inbound_cache: Arc, +} + +impl UdpRedirInboundWriter { + #[allow(unused_variables, clippy::needless_update)] + fn new(redir_ty: RedirType, opts: &ConnectOpts) -> UdpRedirInboundWriter { + UdpRedirInboundWriter { + redir_ty, + socket_opts: RedirSocketOpts { + #[cfg(any(target_os = "linux", target_os = "android"))] + fwmark: opts.fwmark, + + ..Default::default() + }, + inbound_cache: Arc::new(UdpRedirInboundCache::new()), + } + } } #[async_trait] @@ -61,12 +118,25 @@ impl UdpInboundWrite for UdpRedirInboundWriter { } }; - // Create a socket binds to destination addr - // This only works for systems that supports binding to non-local addresses - // - // This socket has to set SO_REUSEADDR and SO_REUSEPORT. - // Outbound addresses could be connected from different source addresses. - let inbound = UdpRedirSocket::bind_nonlocal(self.redir_ty, addr, &self.socket_opts)?; + let inbound = { + let mut cache = self.inbound_cache.cache.lock().await; + if let Some(socket) = cache.get(&addr) { + socket.clone() + } else { + // Create a socket binds to destination addr + // This only works for systems that supports binding to non-local addresses + // + // This socket has to set SO_REUSEADDR and SO_REUSEPORT. + // Outbound addresses could be connected from different source addresses. + let inbound = UdpRedirSocket::bind_nonlocal(self.redir_ty, addr, &self.socket_opts)?; + + // UDP socket could be shared between threads and is safe to be manipulated by multiple threads + let inbound = Arc::new(inbound); + cache.insert(addr, inbound.clone()); + + inbound + } + }; // Send back to client inbound.send_to(data, peer_addr).await.map(|n| { @@ -133,15 +203,7 @@ impl UdpRedir { #[allow(clippy::needless_update)] let manager = UdpAssociationManager::new( self.context.clone(), - UdpRedirInboundWriter { - redir_ty: self.redir_ty, - socket_opts: RedirSocketOpts { - #[cfg(any(target_os = "linux", target_os = "android"))] - fwmark: self.context.connect_opts_ref().fwmark, - - ..Default::default() - }, - }, + UdpRedirInboundWriter::new(self.redir_ty, self.context.connect_opts_ref()), self.time_to_live, self.capacity, balancer, diff --git a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs index 638b3292..5d50c18a 100644 --- a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs +++ b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs @@ -5,8 +5,8 @@ use std::{io, net::SocketAddr, sync::Arc, time::Duration}; use bytes::Bytes; use futures::future::{self, AbortHandle}; use io::ErrorKind; +use lfu_cache::TimedLfuCache; use log::{debug, error, info, trace, warn}; -use lru_time_cache::{Entry, LruCache}; use shadowsocks::{ lookup_then, net::UdpSocket as ShadowUdpSocket, @@ -28,9 +28,12 @@ use crate::{ net::MonProxySocket, }; +type AssociationMap = TimedLfuCache; +type SharedAssociationMap = Arc>; + pub struct UdpTunnel { context: Arc, - assoc_map: Arc>>, + assoc_map: SharedAssociationMap, cleanup_abortable: AbortHandle, } @@ -44,8 +47,8 @@ impl UdpTunnel { pub fn new(context: Arc, time_to_live: Option, capacity: Option) -> UdpTunnel { let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION); let assoc_map = Arc::new(Mutex::new(match capacity { - Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity), - None => LruCache::with_expiry_duration(time_to_live), + Some(capacity) => TimedLfuCache::with_capacity_and_expiration(capacity, time_to_live), + None => TimedLfuCache::with_expiration(time_to_live), })); let cleanup_abortable = { @@ -54,8 +57,8 @@ impl UdpTunnel { loop { time::sleep(time_to_live).await; - // iter() will trigger a cleanup of expired associations - let _ = assoc_map.lock().await.iter(); + // cleanup expired associations + let _ = assoc_map.lock().await.evict_expired(); } }); tokio::spawn(cleanup_task); @@ -126,24 +129,26 @@ impl UdpTunnel { data: &[u8], ) -> io::Result<()> { let mut assoc_map = self.assoc_map.lock().await; - match assoc_map.entry(peer_addr) { - Entry::Occupied(occ) => { - let assoc = occ.into_mut(); - assoc.try_send(Bytes::copy_from_slice(data)) - } - Entry::Vacant(vac) => { - let assoc = vac.insert(UdpAssociation::new( - self.context.clone(), - listener.clone(), - peer_addr, - forward_addr.clone(), - self.assoc_map.clone(), - balancer.clone(), - )); - trace!("created udp association for {}", peer_addr); - assoc.try_send(Bytes::copy_from_slice(data)) - } + + if let Some(assoc) = assoc_map.get(&peer_addr) { + return assoc.try_send(Bytes::copy_from_slice(data)); } + + let assoc = UdpAssociation::new( + self.context.clone(), + listener.clone(), + peer_addr, + forward_addr.clone(), + self.assoc_map.clone(), + balancer.clone(), + ); + + trace!("created udp association for {}", peer_addr); + + assoc.try_send(Bytes::copy_from_slice(data))?; + assoc_map.insert(peer_addr, assoc); + + Ok(()) } } @@ -164,7 +169,7 @@ impl UdpAssociation { inbound: Arc, peer_addr: SocketAddr, forward_addr: Address, - assoc_map: Arc>>, + assoc_map: SharedAssociationMap, balancer: PingBalancer, ) -> UdpAssociation { let (assoc, sender) = @@ -222,7 +227,7 @@ struct UdpAssociationContext { peer_addr: SocketAddr, forward_addr: Address, proxied_socket: SpinMutex, - assoc_map: Arc>>, + assoc_map: SharedAssociationMap, balancer: PingBalancer, } @@ -238,7 +243,7 @@ impl UdpAssociationContext { inbound: Arc, peer_addr: SocketAddr, forward_addr: Address, - assoc_map: Arc>>, + assoc_map: SharedAssociationMap, balancer: PingBalancer, ) -> (Arc, mpsc::Sender) { // Pending packets 1024 should be good enough for a server. diff --git a/crates/shadowsocks-service/src/server/udprelay.rs b/crates/shadowsocks-service/src/server/udprelay.rs index badc292e..0664956f 100644 --- a/crates/shadowsocks-service/src/server/udprelay.rs +++ b/crates/shadowsocks-service/src/server/udprelay.rs @@ -5,8 +5,8 @@ use std::{io, net::SocketAddr, sync::Arc, time::Duration}; use bytes::Bytes; use futures::future::{self, AbortHandle}; use io::ErrorKind; +use lfu_cache::TimedLfuCache; use log::{debug, error, info, trace, warn}; -use lru_time_cache::{Entry, LruCache}; use shadowsocks::{ lookup_then, net::UdpSocket as OutboundUdpSocket, @@ -26,9 +26,12 @@ use crate::net::MonProxySocket; use super::context::ServiceContext; +type AssociationMap = TimedLfuCache; +type SharedAssociationMap = Arc>; + pub struct UdpServer { context: Arc, - assoc_map: Arc>>, + assoc_map: SharedAssociationMap, cleanup_abortable: AbortHandle, } @@ -42,8 +45,8 @@ impl UdpServer { pub fn new(context: Arc, time_to_live: Option, capacity: Option) -> UdpServer { let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION); let assoc_map = Arc::new(Mutex::new(match capacity { - Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity), - None => LruCache::with_expiry_duration(time_to_live), + Some(capacity) => TimedLfuCache::with_capacity_and_expiration(capacity, time_to_live), + None => TimedLfuCache::with_expiration(time_to_live), })); let cleanup_abortable = { @@ -52,8 +55,8 @@ impl UdpServer { loop { time::sleep(time_to_live).await; - // iter() will trigger a cleanup of expired associations - let _ = assoc_map.lock().await.iter(); + // cleanup expired associations + assoc_map.lock().await.evict_expired(); } }); tokio::spawn(cleanup_task); @@ -112,22 +115,25 @@ impl UdpServer { target_addr: Address, data: &[u8], ) -> io::Result<()> { - match self.assoc_map.lock().await.entry(peer_addr) { - Entry::Occupied(occ) => { - let assoc = occ.into_mut(); - assoc.try_send((target_addr, Bytes::copy_from_slice(data))) - } - Entry::Vacant(vac) => { - let assoc = vac.insert(UdpAssociation::new( - self.context.clone(), - listener.clone(), - peer_addr, - self.assoc_map.clone(), - )); - trace!("created udp association for {}", peer_addr); - assoc.try_send((target_addr, Bytes::copy_from_slice(data))) - } + let mut assoc_map = self.assoc_map.lock().await; + + if let Some(assoc) = assoc_map.get(&peer_addr) { + return assoc.try_send((target_addr, Bytes::copy_from_slice(data))); } + + let assoc = UdpAssociation::new( + self.context.clone(), + listener.clone(), + peer_addr, + self.assoc_map.clone(), + ); + + trace!("created udp association for {}", peer_addr); + + assoc.try_send((target_addr, Bytes::copy_from_slice(data)))?; + assoc_map.insert(peer_addr, assoc); + + Ok(()) } } @@ -148,7 +154,7 @@ impl UdpAssociation { context: Arc, inbound: Arc, peer_addr: SocketAddr, - assoc_map: Arc>>, + assoc_map: SharedAssociationMap, ) -> UdpAssociation { let (assoc, sender) = UdpAssociationContext::new(context, inbound, peer_addr, assoc_map); UdpAssociation { assoc, sender } @@ -200,8 +206,8 @@ struct UdpAssociationContext { peer_addr: SocketAddr, outbound_ipv4_socket: SpinMutex, outbound_ipv6_socket: SpinMutex, - assoc_map: Arc>>, - target_cache: Mutex>, + assoc_map: SharedAssociationMap, + target_cache: Mutex>, } impl Drop for UdpAssociationContext { @@ -215,7 +221,7 @@ impl UdpAssociationContext { context: Arc, inbound: Arc, peer_addr: SocketAddr, - assoc_map: Arc>>, + assoc_map: SharedAssociationMap, ) -> (Arc, mpsc::Sender<(Address, Bytes)>) { // Pending packets 1024 should be good enough for a server. // If there are plenty of packets stuck in the channel, dropping exccess packets is a good way to protect the server from @@ -232,8 +238,12 @@ impl UdpAssociationContext { // Cache for remembering the original Address of target, // when recv_from a SocketAddr, we have to know whch Address that client was originally requested. // - // XXX: 64 target addresses should be enough for __one__ client. - target_cache: Mutex::new(LruCache::with_capacity(64)), + // XXX: 128 target addresses should be enough for __one__ client. + // 1 hours should be enough for caching the address mapping. Most of the DNS records' TTL won't last that long. + target_cache: Mutex::new(TimedLfuCache::with_capacity_and_expiration( + 128, + Duration::from_secs(1 * 60 * 60), + )), }); let l2r_task = { diff --git a/crates/shadowsocks/Cargo.toml b/crates/shadowsocks/Cargo.toml index 0f0d23de..1321d319 100644 --- a/crates/shadowsocks/Cargo.toml +++ b/crates/shadowsocks/Cargo.toml @@ -34,7 +34,7 @@ aead-cipher-extra = ["shadowsocks-crypto/v1-aead-extra"] [dependencies] log = "0.4" -libc = "0.2" +libc = "0.2.94" bytes = "1.0" cfg-if = "1" byte_string = "1.0" diff --git a/tests/tunnel.rs b/tests/tunnel.rs index 7cb2761e..8a57510b 100644 --- a/tests/tunnel.rs +++ b/tests/tunnel.rs @@ -3,6 +3,7 @@ use std::str; use byte_string::ByteStr; +use log::debug; use tokio::{ self, io::{AsyncReadExt, AsyncWriteExt}, @@ -74,11 +75,24 @@ async fn tcp_tunnel() { #[tokio::test] async fn udp_tunnel() { - // Query firefox.com, TransactionID: 0x1234 - static DNS_QUERY: &[u8] = b"\x12\x34\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07firefox\x03com\x00\x00\x01\x00\x01"; - let _ = env_logger::try_init(); + // A UDP echo server + tokio::spawn(async { + let socket = UdpSocket::bind("127.0.0.1:9230").await.unwrap(); + + debug!("UDP echo server listening on 127.0.0.1:9230"); + + let mut buffer = [0u8; 65536]; + loop { + let (n, peer_addr) = socket.recv_from(&mut buffer).await.unwrap(); + debug!("UDP echo server received {} bytes from {}, echoing", n, peer_addr); + socket.send_to(&buffer[..n], peer_addr).await.unwrap(); + } + }); + + time::sleep(Duration::from_secs(1)).await; + let local_config = Config::load_from_str( r#"{ "locals": [ @@ -86,8 +100,8 @@ async fn udp_tunnel() { "local_port": 9210, "local_address": "127.0.0.1", "protocol": "tunnel", - "forward_address": "8.8.8.8", - "forward_port": 53 + "forward_address": "127.0.0.1", + "forward_port": 9230 } ], "server": "127.0.0.1", @@ -117,17 +131,16 @@ async fn udp_tunnel() { time::sleep(Duration::from_secs(1)).await; + const MESSAGE: &[u8] = b"hello shadowsocks"; + let socket = UdpSocket::bind("0.0.0.0:0").await.unwrap(); - socket.send_to(DNS_QUERY, "127.0.0.1:9210").await.unwrap(); + socket.send_to(MESSAGE, "127.0.0.1:9210").await.unwrap(); let mut buf = vec![0u8; 65536]; let n = socket.recv(&mut buf).await.unwrap(); - // DNS response have at least 12 bytes - assert!(n >= 12); - let recv_payload = &buf[..n]; println!("Got reply from server: {:?}", ByteStr::new(&recv_payload)); - assert_eq!(b"\x12\x34", &recv_payload[0..2]); + assert_eq!(MESSAGE, recv_payload); }