mirror of
https://github.com/shadowsocks/shadowsocks-rust.git
synced 2026-02-09 01:59:16 +08:00
Manage UDP associations with LFU strategy (#506)
* manage UDP associations with LFU - DNS associations could be evicted firstly then others like HTTP/3 * UDP redir caches inbound non-local socket - Optimization: prevent creating new sockets for the same remotes * updated libc to v0.2.94 for unified TCP options * make clippy happy * Pin lfu-cache to v1.2.1 for edward-shen/lfu-cache#2 * UDP target should cache with expiration 1hrs * UDP tunnel test with an echo server - CI crashes occasionally because of 8.8.8.8:53 doesn't respond
This commit is contained in:
20
Cargo.lock
generated
20
Cargo.lock
generated
@@ -750,10 +750,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.93"
|
||||
name = "lfu_cache"
|
||||
version = "1.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41"
|
||||
checksum = "29d2e56f95e5fda80586d85e2e98bb6dba8f71f4406161ce90698fa38ff16486"
|
||||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.94"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "18794a8ad5b29321f790b55d93dfba91e125cb1a9edbd4f8e3150acc771c1a5e"
|
||||
|
||||
[[package]]
|
||||
name = "libmimalloc-sys"
|
||||
@@ -831,12 +837,6 @@ dependencies = [
|
||||
"linked-hash-map",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lru_time_cache"
|
||||
version = "0.11.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "991058cf0e7f161b37a4a610ddbada9d2f051d0db76369b973de9f3466f1cba3"
|
||||
|
||||
[[package]]
|
||||
name = "maplit"
|
||||
version = "1.0.2"
|
||||
@@ -1549,10 +1549,10 @@ dependencies = [
|
||||
"ipnet",
|
||||
"iprange",
|
||||
"json5",
|
||||
"lfu_cache",
|
||||
"libc",
|
||||
"log",
|
||||
"log4rs",
|
||||
"lru_time_cache",
|
||||
"mio",
|
||||
"native-tls",
|
||||
"nix",
|
||||
|
||||
@@ -357,7 +357,7 @@ fn main() {
|
||||
|
||||
#[cfg(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "ios"))]
|
||||
if let Some(iface) = matches.value_of("OUTBOUND_BIND_INTERFACE") {
|
||||
config.outbound_bind_interface = Some(From::from(iface.to_owned()));
|
||||
config.outbound_bind_interface = Some(iface.to_owned());
|
||||
}
|
||||
|
||||
#[cfg(all(unix, not(target_os = "android")))]
|
||||
|
||||
@@ -163,9 +163,9 @@ fn main() {
|
||||
config.outbound_fwmark = Some(mark.parse::<u32>().expect("an unsigned integer for `outbound-fwmark`"));
|
||||
}
|
||||
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
#[cfg(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "ios"))]
|
||||
if let Some(iface) = matches.value_of("OUTBOUND_BIND_INTERFACE") {
|
||||
config.outbound_bind_interface = Some(From::from(iface.to_owned()));
|
||||
config.outbound_bind_interface = Some(iface.to_owned());
|
||||
}
|
||||
|
||||
if let Some(m) = matches.value_of("MANAGER_ADDRESS") {
|
||||
|
||||
@@ -204,9 +204,9 @@ fn main() {
|
||||
config.outbound_fwmark = Some(mark.parse::<u32>().expect("an unsigned integer for `outbound-fwmark`"));
|
||||
}
|
||||
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
#[cfg(any(target_os = "linux", target_os = "android", target_os = "macos", target_os = "ios"))]
|
||||
if let Some(iface) = matches.value_of("OUTBOUND_BIND_INTERFACE") {
|
||||
config.outbound_bind_interface = Some(From::from(iface.to_owned()));
|
||||
config.outbound_bind_interface = Some(iface.to_owned());
|
||||
}
|
||||
|
||||
if let Some(m) = matches.value_of("MANAGER_ADDRESS") {
|
||||
|
||||
@@ -77,7 +77,7 @@ once_cell = "1.7"
|
||||
thiserror = "1.0"
|
||||
|
||||
spin = { version = "0.9", features = ["std"] }
|
||||
lru_time_cache = "0.11"
|
||||
lfu_cache = "1.2.1"
|
||||
bytes = "1.0"
|
||||
byte_string = "1.0"
|
||||
byteorder = "1.3"
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::sync::Arc;
|
||||
use std::{net::IpAddr, time::Duration};
|
||||
|
||||
#[cfg(feature = "local-dns")]
|
||||
use lru_time_cache::LruCache;
|
||||
use lfu_cache::TimedLfuCache;
|
||||
use shadowsocks::{
|
||||
config::ServerType,
|
||||
context::{Context, SharedContext},
|
||||
@@ -32,7 +32,7 @@ pub struct ServiceContext {
|
||||
|
||||
// For DNS relay's ACL domain name reverse lookup -- whether the IP shall be forwarded
|
||||
#[cfg(feature = "local-dns")]
|
||||
reverse_lookup_cache: Mutex<LruCache<IpAddr, bool>>,
|
||||
reverse_lookup_cache: Mutex<TimedLfuCache<IpAddr, bool>>,
|
||||
}
|
||||
|
||||
impl Default for ServiceContext {
|
||||
@@ -51,7 +51,10 @@ impl ServiceContext {
|
||||
acl: None,
|
||||
flow_stat: Arc::new(FlowStat::new()),
|
||||
#[cfg(feature = "local-dns")]
|
||||
reverse_lookup_cache: Mutex::new(LruCache::with_expiry_duration(Duration::from_secs(3 * 24 * 60 * 60))),
|
||||
reverse_lookup_cache: Mutex::new(TimedLfuCache::with_capacity_and_expiration(
|
||||
10240, // XXX: It should be enough for a normal user.
|
||||
Duration::from_secs(3 * 24 * 60 * 60),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use hyper::{Body, Client};
|
||||
use lru_time_cache::LruCache;
|
||||
use lfu_cache::TimedLfuCache;
|
||||
use shadowsocks::config::ServerAddr;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
@@ -14,14 +14,14 @@ use super::{connector::ProxyConnector, http_client::ProxyHttpClient};
|
||||
/// Cached HTTP client for remote servers
|
||||
pub struct ProxyClientCache {
|
||||
context: Arc<ServiceContext>,
|
||||
cache: Mutex<LruCache<ServerAddr, ProxyHttpClient>>,
|
||||
cache: Mutex<TimedLfuCache<ServerAddr, ProxyHttpClient>>,
|
||||
}
|
||||
|
||||
impl ProxyClientCache {
|
||||
pub fn new(context: Arc<ServiceContext>) -> ProxyClientCache {
|
||||
ProxyClientCache {
|
||||
context,
|
||||
cache: Mutex::new(LruCache::with_expiry_duration_and_capacity(Duration::from_secs(60), 5)),
|
||||
cache: Mutex::new(TimedLfuCache::with_capacity_and_expiration(5, Duration::from_secs(60))),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ impl AutoProxyClientStream {
|
||||
let addr = addr.into();
|
||||
let stream =
|
||||
TcpStream::connect_remote_with_opts(context.context_ref(), &addr, context.connect_opts_ref()).await?;
|
||||
Ok(AutoProxyClientStream::Bypassed(stream.into()))
|
||||
Ok(AutoProxyClientStream::Bypassed(stream))
|
||||
}
|
||||
|
||||
/// Connect to target `addr` via shadowsocks' server configured by `svr_cfg`
|
||||
|
||||
@@ -10,8 +10,8 @@ use std::{
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures::future::{self, AbortHandle};
|
||||
use lfu_cache::TimedLfuCache;
|
||||
use log::{debug, error, trace, warn};
|
||||
use lru_time_cache::{Entry, LruCache};
|
||||
use shadowsocks::{
|
||||
lookup_then,
|
||||
net::UdpSocket as ShadowUdpSocket,
|
||||
@@ -42,6 +42,9 @@ pub trait UdpInboundWrite {
|
||||
async fn send_to(&self, peer_addr: SocketAddr, remote_addr: &Address, data: &[u8]) -> io::Result<()>;
|
||||
}
|
||||
|
||||
type AssociationMap<W> = TimedLfuCache<SocketAddr, UdpAssociation<W>>;
|
||||
type SharedAssociationMap<W> = Arc<Mutex<AssociationMap<W>>>;
|
||||
|
||||
/// UDP association manager
|
||||
pub struct UdpAssociationManager<W>
|
||||
where
|
||||
@@ -49,7 +52,7 @@ where
|
||||
{
|
||||
respond_writer: W,
|
||||
context: Arc<ServiceContext>,
|
||||
assoc_map: Arc<Mutex<LruCache<SocketAddr, UdpAssociation<W>>>>,
|
||||
assoc_map: SharedAssociationMap<W>,
|
||||
cleanup_abortable: AbortHandle,
|
||||
balancer: PingBalancer,
|
||||
}
|
||||
@@ -77,8 +80,8 @@ where
|
||||
) -> UdpAssociationManager<W> {
|
||||
let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION);
|
||||
let assoc_map = Arc::new(Mutex::new(match capacity {
|
||||
Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity),
|
||||
None => LruCache::with_expiry_duration(time_to_live),
|
||||
Some(capacity) => TimedLfuCache::with_capacity_and_expiration(capacity, time_to_live),
|
||||
None => TimedLfuCache::with_expiration(time_to_live),
|
||||
}));
|
||||
|
||||
let cleanup_abortable = {
|
||||
@@ -87,8 +90,8 @@ where
|
||||
loop {
|
||||
time::sleep(time_to_live).await;
|
||||
|
||||
// iter() will trigger a cleanup of expired associations
|
||||
let _ = assoc_map.lock().await.iter();
|
||||
// cleanup expired associations
|
||||
let _ = assoc_map.lock().await.evict_expired();
|
||||
}
|
||||
});
|
||||
tokio::spawn(cleanup_task);
|
||||
@@ -107,23 +110,27 @@ where
|
||||
/// Sends `data` from `peer_addr` to `target_addr`
|
||||
pub async fn send_to(&self, peer_addr: SocketAddr, target_addr: Address, data: &[u8]) -> io::Result<()> {
|
||||
// Check or (re)create an association
|
||||
match self.assoc_map.lock().await.entry(peer_addr) {
|
||||
Entry::Occupied(occ) => {
|
||||
let assoc = occ.into_mut();
|
||||
assoc.try_send((target_addr, Bytes::copy_from_slice(data)))
|
||||
}
|
||||
Entry::Vacant(vac) => {
|
||||
let assoc = vac.insert(UdpAssociation::new(
|
||||
self.context.clone(),
|
||||
peer_addr,
|
||||
self.assoc_map.clone(),
|
||||
self.balancer.clone(),
|
||||
self.respond_writer.clone(),
|
||||
));
|
||||
trace!("created udp association for {}", peer_addr);
|
||||
assoc.try_send((target_addr, Bytes::copy_from_slice(data)))
|
||||
}
|
||||
|
||||
let mut assoc_map = self.assoc_map.lock().await;
|
||||
|
||||
if let Some(assoc) = assoc_map.get(&peer_addr) {
|
||||
return assoc.try_send((target_addr, Bytes::copy_from_slice(data)));
|
||||
}
|
||||
|
||||
let assoc = UdpAssociation::new(
|
||||
self.context.clone(),
|
||||
peer_addr,
|
||||
self.assoc_map.clone(),
|
||||
self.balancer.clone(),
|
||||
self.respond_writer.clone(),
|
||||
);
|
||||
|
||||
trace!("created udp association for {}", peer_addr);
|
||||
|
||||
assoc.try_send((target_addr, Bytes::copy_from_slice(data)))?;
|
||||
assoc_map.insert(peer_addr, assoc);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,7 +160,7 @@ where
|
||||
fn new(
|
||||
context: Arc<ServiceContext>,
|
||||
peer_addr: SocketAddr,
|
||||
assoc_map: Arc<Mutex<LruCache<SocketAddr, UdpAssociation<W>>>>,
|
||||
assoc_map: SharedAssociationMap<W>,
|
||||
balancer: PingBalancer,
|
||||
respond_writer: W,
|
||||
) -> UdpAssociation<W> {
|
||||
@@ -245,7 +252,7 @@ where
|
||||
bypassed_ipv4_socket: SpinMutex<UdpAssociationBypassState>,
|
||||
bypassed_ipv6_socket: SpinMutex<UdpAssociationBypassState>,
|
||||
proxied_socket: SpinMutex<UdpAssociationProxyState>,
|
||||
assoc_map: Arc<Mutex<LruCache<SocketAddr, UdpAssociation<W>>>>,
|
||||
assoc_map: SharedAssociationMap<W>,
|
||||
balancer: PingBalancer,
|
||||
respond_writer: W,
|
||||
}
|
||||
@@ -266,7 +273,7 @@ where
|
||||
fn new(
|
||||
context: Arc<ServiceContext>,
|
||||
peer_addr: SocketAddr,
|
||||
assoc_map: Arc<Mutex<LruCache<SocketAddr, UdpAssociation<W>>>>,
|
||||
assoc_map: SharedAssociationMap<W>,
|
||||
balancer: PingBalancer,
|
||||
respond_writer: W,
|
||||
) -> (Arc<UdpAssociationContext<W>>, mpsc::Sender<(Address, Bytes)>) {
|
||||
|
||||
@@ -8,12 +8,16 @@ use std::{
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::future::{self, AbortHandle};
|
||||
use lfu_cache::TimedLfuCache;
|
||||
use log::{error, info, trace, warn};
|
||||
use shadowsocks::{
|
||||
lookup_then,
|
||||
net::ConnectOpts,
|
||||
relay::{socks5::Address, udprelay::MAXIMUM_UDP_PAYLOAD_SIZE},
|
||||
ServerAddr,
|
||||
};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::{
|
||||
config::RedirType,
|
||||
@@ -32,10 +36,63 @@ use self::sys::UdpRedirSocket;
|
||||
|
||||
mod sys;
|
||||
|
||||
const INBOUND_SOCKET_CACHE_EXPIRATION: Duration = Duration::from_secs(60);
|
||||
const INBOUND_SOCKET_CACHE_CAPACITY: usize = 256;
|
||||
|
||||
struct UdpRedirInboundCache {
|
||||
cache: Arc<Mutex<TimedLfuCache<SocketAddr, Arc<UdpRedirSocket>>>>,
|
||||
watcher: AbortHandle,
|
||||
}
|
||||
|
||||
impl Drop for UdpRedirInboundCache {
|
||||
fn drop(&mut self) {
|
||||
self.watcher.abort();
|
||||
}
|
||||
}
|
||||
|
||||
impl UdpRedirInboundCache {
|
||||
fn new() -> UdpRedirInboundCache {
|
||||
let cache = Arc::new(Mutex::new(TimedLfuCache::with_capacity_and_expiration(
|
||||
INBOUND_SOCKET_CACHE_CAPACITY,
|
||||
INBOUND_SOCKET_CACHE_EXPIRATION,
|
||||
)));
|
||||
|
||||
let (cleanup_fut, watcher) = {
|
||||
let cache = cache.clone();
|
||||
future::abortable(async move {
|
||||
loop {
|
||||
tokio::time::sleep(INBOUND_SOCKET_CACHE_EXPIRATION).await;
|
||||
cache.lock().await.evict_expired();
|
||||
}
|
||||
})
|
||||
};
|
||||
tokio::spawn(cleanup_fut);
|
||||
|
||||
UdpRedirInboundCache { cache, watcher }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct UdpRedirInboundWriter {
|
||||
redir_ty: RedirType,
|
||||
socket_opts: RedirSocketOpts,
|
||||
inbound_cache: Arc<UdpRedirInboundCache>,
|
||||
}
|
||||
|
||||
impl UdpRedirInboundWriter {
|
||||
#[allow(unused_variables, clippy::needless_update)]
|
||||
fn new(redir_ty: RedirType, opts: &ConnectOpts) -> UdpRedirInboundWriter {
|
||||
UdpRedirInboundWriter {
|
||||
redir_ty,
|
||||
socket_opts: RedirSocketOpts {
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
fwmark: opts.fwmark,
|
||||
|
||||
..Default::default()
|
||||
},
|
||||
inbound_cache: Arc::new(UdpRedirInboundCache::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -61,12 +118,25 @@ impl UdpInboundWrite for UdpRedirInboundWriter {
|
||||
}
|
||||
};
|
||||
|
||||
// Create a socket binds to destination addr
|
||||
// This only works for systems that supports binding to non-local addresses
|
||||
//
|
||||
// This socket has to set SO_REUSEADDR and SO_REUSEPORT.
|
||||
// Outbound addresses could be connected from different source addresses.
|
||||
let inbound = UdpRedirSocket::bind_nonlocal(self.redir_ty, addr, &self.socket_opts)?;
|
||||
let inbound = {
|
||||
let mut cache = self.inbound_cache.cache.lock().await;
|
||||
if let Some(socket) = cache.get(&addr) {
|
||||
socket.clone()
|
||||
} else {
|
||||
// Create a socket binds to destination addr
|
||||
// This only works for systems that supports binding to non-local addresses
|
||||
//
|
||||
// This socket has to set SO_REUSEADDR and SO_REUSEPORT.
|
||||
// Outbound addresses could be connected from different source addresses.
|
||||
let inbound = UdpRedirSocket::bind_nonlocal(self.redir_ty, addr, &self.socket_opts)?;
|
||||
|
||||
// UDP socket could be shared between threads and is safe to be manipulated by multiple threads
|
||||
let inbound = Arc::new(inbound);
|
||||
cache.insert(addr, inbound.clone());
|
||||
|
||||
inbound
|
||||
}
|
||||
};
|
||||
|
||||
// Send back to client
|
||||
inbound.send_to(data, peer_addr).await.map(|n| {
|
||||
@@ -133,15 +203,7 @@ impl UdpRedir {
|
||||
#[allow(clippy::needless_update)]
|
||||
let manager = UdpAssociationManager::new(
|
||||
self.context.clone(),
|
||||
UdpRedirInboundWriter {
|
||||
redir_ty: self.redir_ty,
|
||||
socket_opts: RedirSocketOpts {
|
||||
#[cfg(any(target_os = "linux", target_os = "android"))]
|
||||
fwmark: self.context.connect_opts_ref().fwmark,
|
||||
|
||||
..Default::default()
|
||||
},
|
||||
},
|
||||
UdpRedirInboundWriter::new(self.redir_ty, self.context.connect_opts_ref()),
|
||||
self.time_to_live,
|
||||
self.capacity,
|
||||
balancer,
|
||||
|
||||
@@ -5,8 +5,8 @@ use std::{io, net::SocketAddr, sync::Arc, time::Duration};
|
||||
use bytes::Bytes;
|
||||
use futures::future::{self, AbortHandle};
|
||||
use io::ErrorKind;
|
||||
use lfu_cache::TimedLfuCache;
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use lru_time_cache::{Entry, LruCache};
|
||||
use shadowsocks::{
|
||||
lookup_then,
|
||||
net::UdpSocket as ShadowUdpSocket,
|
||||
@@ -28,9 +28,12 @@ use crate::{
|
||||
net::MonProxySocket,
|
||||
};
|
||||
|
||||
type AssociationMap = TimedLfuCache<SocketAddr, UdpAssociation>;
|
||||
type SharedAssociationMap = Arc<Mutex<AssociationMap>>;
|
||||
|
||||
pub struct UdpTunnel {
|
||||
context: Arc<ServiceContext>,
|
||||
assoc_map: Arc<Mutex<LruCache<SocketAddr, UdpAssociation>>>,
|
||||
assoc_map: SharedAssociationMap,
|
||||
cleanup_abortable: AbortHandle,
|
||||
}
|
||||
|
||||
@@ -44,8 +47,8 @@ impl UdpTunnel {
|
||||
pub fn new(context: Arc<ServiceContext>, time_to_live: Option<Duration>, capacity: Option<usize>) -> UdpTunnel {
|
||||
let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION);
|
||||
let assoc_map = Arc::new(Mutex::new(match capacity {
|
||||
Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity),
|
||||
None => LruCache::with_expiry_duration(time_to_live),
|
||||
Some(capacity) => TimedLfuCache::with_capacity_and_expiration(capacity, time_to_live),
|
||||
None => TimedLfuCache::with_expiration(time_to_live),
|
||||
}));
|
||||
|
||||
let cleanup_abortable = {
|
||||
@@ -54,8 +57,8 @@ impl UdpTunnel {
|
||||
loop {
|
||||
time::sleep(time_to_live).await;
|
||||
|
||||
// iter() will trigger a cleanup of expired associations
|
||||
let _ = assoc_map.lock().await.iter();
|
||||
// cleanup expired associations
|
||||
let _ = assoc_map.lock().await.evict_expired();
|
||||
}
|
||||
});
|
||||
tokio::spawn(cleanup_task);
|
||||
@@ -126,24 +129,26 @@ impl UdpTunnel {
|
||||
data: &[u8],
|
||||
) -> io::Result<()> {
|
||||
let mut assoc_map = self.assoc_map.lock().await;
|
||||
match assoc_map.entry(peer_addr) {
|
||||
Entry::Occupied(occ) => {
|
||||
let assoc = occ.into_mut();
|
||||
assoc.try_send(Bytes::copy_from_slice(data))
|
||||
}
|
||||
Entry::Vacant(vac) => {
|
||||
let assoc = vac.insert(UdpAssociation::new(
|
||||
self.context.clone(),
|
||||
listener.clone(),
|
||||
peer_addr,
|
||||
forward_addr.clone(),
|
||||
self.assoc_map.clone(),
|
||||
balancer.clone(),
|
||||
));
|
||||
trace!("created udp association for {}", peer_addr);
|
||||
assoc.try_send(Bytes::copy_from_slice(data))
|
||||
}
|
||||
|
||||
if let Some(assoc) = assoc_map.get(&peer_addr) {
|
||||
return assoc.try_send(Bytes::copy_from_slice(data));
|
||||
}
|
||||
|
||||
let assoc = UdpAssociation::new(
|
||||
self.context.clone(),
|
||||
listener.clone(),
|
||||
peer_addr,
|
||||
forward_addr.clone(),
|
||||
self.assoc_map.clone(),
|
||||
balancer.clone(),
|
||||
);
|
||||
|
||||
trace!("created udp association for {}", peer_addr);
|
||||
|
||||
assoc.try_send(Bytes::copy_from_slice(data))?;
|
||||
assoc_map.insert(peer_addr, assoc);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -164,7 +169,7 @@ impl UdpAssociation {
|
||||
inbound: Arc<UdpSocket>,
|
||||
peer_addr: SocketAddr,
|
||||
forward_addr: Address,
|
||||
assoc_map: Arc<Mutex<LruCache<SocketAddr, UdpAssociation>>>,
|
||||
assoc_map: SharedAssociationMap,
|
||||
balancer: PingBalancer,
|
||||
) -> UdpAssociation {
|
||||
let (assoc, sender) =
|
||||
@@ -222,7 +227,7 @@ struct UdpAssociationContext {
|
||||
peer_addr: SocketAddr,
|
||||
forward_addr: Address,
|
||||
proxied_socket: SpinMutex<UdpAssociationState>,
|
||||
assoc_map: Arc<Mutex<LruCache<SocketAddr, UdpAssociation>>>,
|
||||
assoc_map: SharedAssociationMap,
|
||||
balancer: PingBalancer,
|
||||
}
|
||||
|
||||
@@ -238,7 +243,7 @@ impl UdpAssociationContext {
|
||||
inbound: Arc<UdpSocket>,
|
||||
peer_addr: SocketAddr,
|
||||
forward_addr: Address,
|
||||
assoc_map: Arc<Mutex<LruCache<SocketAddr, UdpAssociation>>>,
|
||||
assoc_map: SharedAssociationMap,
|
||||
balancer: PingBalancer,
|
||||
) -> (Arc<UdpAssociationContext>, mpsc::Sender<Bytes>) {
|
||||
// Pending packets 1024 should be good enough for a server.
|
||||
|
||||
@@ -5,8 +5,8 @@ use std::{io, net::SocketAddr, sync::Arc, time::Duration};
|
||||
use bytes::Bytes;
|
||||
use futures::future::{self, AbortHandle};
|
||||
use io::ErrorKind;
|
||||
use lfu_cache::TimedLfuCache;
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use lru_time_cache::{Entry, LruCache};
|
||||
use shadowsocks::{
|
||||
lookup_then,
|
||||
net::UdpSocket as OutboundUdpSocket,
|
||||
@@ -26,9 +26,12 @@ use crate::net::MonProxySocket;
|
||||
|
||||
use super::context::ServiceContext;
|
||||
|
||||
type AssociationMap = TimedLfuCache<SocketAddr, UdpAssociation>;
|
||||
type SharedAssociationMap = Arc<Mutex<AssociationMap>>;
|
||||
|
||||
pub struct UdpServer {
|
||||
context: Arc<ServiceContext>,
|
||||
assoc_map: Arc<Mutex<LruCache<SocketAddr, UdpAssociation>>>,
|
||||
assoc_map: SharedAssociationMap,
|
||||
cleanup_abortable: AbortHandle,
|
||||
}
|
||||
|
||||
@@ -42,8 +45,8 @@ impl UdpServer {
|
||||
pub fn new(context: Arc<ServiceContext>, time_to_live: Option<Duration>, capacity: Option<usize>) -> UdpServer {
|
||||
let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION);
|
||||
let assoc_map = Arc::new(Mutex::new(match capacity {
|
||||
Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity),
|
||||
None => LruCache::with_expiry_duration(time_to_live),
|
||||
Some(capacity) => TimedLfuCache::with_capacity_and_expiration(capacity, time_to_live),
|
||||
None => TimedLfuCache::with_expiration(time_to_live),
|
||||
}));
|
||||
|
||||
let cleanup_abortable = {
|
||||
@@ -52,8 +55,8 @@ impl UdpServer {
|
||||
loop {
|
||||
time::sleep(time_to_live).await;
|
||||
|
||||
// iter() will trigger a cleanup of expired associations
|
||||
let _ = assoc_map.lock().await.iter();
|
||||
// cleanup expired associations
|
||||
assoc_map.lock().await.evict_expired();
|
||||
}
|
||||
});
|
||||
tokio::spawn(cleanup_task);
|
||||
@@ -112,22 +115,25 @@ impl UdpServer {
|
||||
target_addr: Address,
|
||||
data: &[u8],
|
||||
) -> io::Result<()> {
|
||||
match self.assoc_map.lock().await.entry(peer_addr) {
|
||||
Entry::Occupied(occ) => {
|
||||
let assoc = occ.into_mut();
|
||||
assoc.try_send((target_addr, Bytes::copy_from_slice(data)))
|
||||
}
|
||||
Entry::Vacant(vac) => {
|
||||
let assoc = vac.insert(UdpAssociation::new(
|
||||
self.context.clone(),
|
||||
listener.clone(),
|
||||
peer_addr,
|
||||
self.assoc_map.clone(),
|
||||
));
|
||||
trace!("created udp association for {}", peer_addr);
|
||||
assoc.try_send((target_addr, Bytes::copy_from_slice(data)))
|
||||
}
|
||||
let mut assoc_map = self.assoc_map.lock().await;
|
||||
|
||||
if let Some(assoc) = assoc_map.get(&peer_addr) {
|
||||
return assoc.try_send((target_addr, Bytes::copy_from_slice(data)));
|
||||
}
|
||||
|
||||
let assoc = UdpAssociation::new(
|
||||
self.context.clone(),
|
||||
listener.clone(),
|
||||
peer_addr,
|
||||
self.assoc_map.clone(),
|
||||
);
|
||||
|
||||
trace!("created udp association for {}", peer_addr);
|
||||
|
||||
assoc.try_send((target_addr, Bytes::copy_from_slice(data)))?;
|
||||
assoc_map.insert(peer_addr, assoc);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,7 +154,7 @@ impl UdpAssociation {
|
||||
context: Arc<ServiceContext>,
|
||||
inbound: Arc<MonProxySocket>,
|
||||
peer_addr: SocketAddr,
|
||||
assoc_map: Arc<Mutex<LruCache<SocketAddr, UdpAssociation>>>,
|
||||
assoc_map: SharedAssociationMap,
|
||||
) -> UdpAssociation {
|
||||
let (assoc, sender) = UdpAssociationContext::new(context, inbound, peer_addr, assoc_map);
|
||||
UdpAssociation { assoc, sender }
|
||||
@@ -200,8 +206,8 @@ struct UdpAssociationContext {
|
||||
peer_addr: SocketAddr,
|
||||
outbound_ipv4_socket: SpinMutex<UdpAssociationState>,
|
||||
outbound_ipv6_socket: SpinMutex<UdpAssociationState>,
|
||||
assoc_map: Arc<Mutex<LruCache<SocketAddr, UdpAssociation>>>,
|
||||
target_cache: Mutex<LruCache<SocketAddr, Address>>,
|
||||
assoc_map: SharedAssociationMap,
|
||||
target_cache: Mutex<TimedLfuCache<SocketAddr, Address>>,
|
||||
}
|
||||
|
||||
impl Drop for UdpAssociationContext {
|
||||
@@ -215,7 +221,7 @@ impl UdpAssociationContext {
|
||||
context: Arc<ServiceContext>,
|
||||
inbound: Arc<MonProxySocket>,
|
||||
peer_addr: SocketAddr,
|
||||
assoc_map: Arc<Mutex<LruCache<SocketAddr, UdpAssociation>>>,
|
||||
assoc_map: SharedAssociationMap,
|
||||
) -> (Arc<UdpAssociationContext>, mpsc::Sender<(Address, Bytes)>) {
|
||||
// Pending packets 1024 should be good enough for a server.
|
||||
// If there are plenty of packets stuck in the channel, dropping exccess packets is a good way to protect the server from
|
||||
@@ -232,8 +238,12 @@ impl UdpAssociationContext {
|
||||
// Cache for remembering the original Address of target,
|
||||
// when recv_from a SocketAddr, we have to know whch Address that client was originally requested.
|
||||
//
|
||||
// XXX: 64 target addresses should be enough for __one__ client.
|
||||
target_cache: Mutex::new(LruCache::with_capacity(64)),
|
||||
// XXX: 128 target addresses should be enough for __one__ client.
|
||||
// 1 hours should be enough for caching the address mapping. Most of the DNS records' TTL won't last that long.
|
||||
target_cache: Mutex::new(TimedLfuCache::with_capacity_and_expiration(
|
||||
128,
|
||||
Duration::from_secs(1 * 60 * 60),
|
||||
)),
|
||||
});
|
||||
|
||||
let l2r_task = {
|
||||
|
||||
@@ -34,7 +34,7 @@ aead-cipher-extra = ["shadowsocks-crypto/v1-aead-extra"]
|
||||
[dependencies]
|
||||
log = "0.4"
|
||||
|
||||
libc = "0.2"
|
||||
libc = "0.2.94"
|
||||
bytes = "1.0"
|
||||
cfg-if = "1"
|
||||
byte_string = "1.0"
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
use std::str;
|
||||
|
||||
use byte_string::ByteStr;
|
||||
use log::debug;
|
||||
use tokio::{
|
||||
self,
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
@@ -74,11 +75,24 @@ async fn tcp_tunnel() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn udp_tunnel() {
|
||||
// Query firefox.com, TransactionID: 0x1234
|
||||
static DNS_QUERY: &[u8] = b"\x12\x34\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07firefox\x03com\x00\x00\x01\x00\x01";
|
||||
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
// A UDP echo server
|
||||
tokio::spawn(async {
|
||||
let socket = UdpSocket::bind("127.0.0.1:9230").await.unwrap();
|
||||
|
||||
debug!("UDP echo server listening on 127.0.0.1:9230");
|
||||
|
||||
let mut buffer = [0u8; 65536];
|
||||
loop {
|
||||
let (n, peer_addr) = socket.recv_from(&mut buffer).await.unwrap();
|
||||
debug!("UDP echo server received {} bytes from {}, echoing", n, peer_addr);
|
||||
socket.send_to(&buffer[..n], peer_addr).await.unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
let local_config = Config::load_from_str(
|
||||
r#"{
|
||||
"locals": [
|
||||
@@ -86,8 +100,8 @@ async fn udp_tunnel() {
|
||||
"local_port": 9210,
|
||||
"local_address": "127.0.0.1",
|
||||
"protocol": "tunnel",
|
||||
"forward_address": "8.8.8.8",
|
||||
"forward_port": 53
|
||||
"forward_address": "127.0.0.1",
|
||||
"forward_port": 9230
|
||||
}
|
||||
],
|
||||
"server": "127.0.0.1",
|
||||
@@ -117,17 +131,16 @@ async fn udp_tunnel() {
|
||||
|
||||
time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
const MESSAGE: &[u8] = b"hello shadowsocks";
|
||||
|
||||
let socket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
|
||||
socket.send_to(DNS_QUERY, "127.0.0.1:9210").await.unwrap();
|
||||
socket.send_to(MESSAGE, "127.0.0.1:9210").await.unwrap();
|
||||
|
||||
let mut buf = vec![0u8; 65536];
|
||||
let n = socket.recv(&mut buf).await.unwrap();
|
||||
|
||||
// DNS response have at least 12 bytes
|
||||
assert!(n >= 12);
|
||||
|
||||
let recv_payload = &buf[..n];
|
||||
println!("Got reply from server: {:?}", ByteStr::new(&recv_payload));
|
||||
|
||||
assert_eq!(b"\x12\x34", &recv_payload[0..2]);
|
||||
assert_eq!(MESSAGE, recv_payload);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user