UDP associations manager removes unnecessary blocking calls

- removes Mutex on manager's association map
- avoids send-back keep-alive events piling up in queue
This commit is contained in:
zonyitoo
2022-03-05 00:32:29 +08:00
parent e32b8dc823
commit 445e306aa9
13 changed files with 470 additions and 358 deletions

38
Cargo.lock generated
View File

@@ -301,9 +301,9 @@ dependencies = [
[[package]]
name = "clap"
version = "3.1.2"
version = "3.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5177fac1ab67102d8989464efd043c6ff44191b1557ec1ddd489b4f7e1447e77"
checksum = "ced1892c55c910c1219e98d6fc8d71f6bddba7905866ce740066d8bfea859312"
dependencies = [
"atty",
"bitflags",
@@ -513,9 +513,9 @@ dependencies = [
[[package]]
name = "ed25519"
version = "1.3.0"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74e1069e39f1454367eb2de793ed062fac4c35c2934b76a81d90dd9abcd28816"
checksum = "eed12bbf7b5312f8da1c2722bc06d8c6b12c2d86a7fb35a194c7f3e6fc2bbe39"
dependencies = [
"signature",
]
@@ -596,7 +596,7 @@ checksum = "975ccf83d8d9d0d84682850a38c8169027be83368805971cc4f238c2b245bc98"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.2.10",
"redox_syscall 0.2.11",
"winapi",
]
@@ -1343,9 +1343,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.9.0"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da32515d9f6e6e489d7bc9d84c71b060db7247dc035bbe44eac88cf87486d8d5"
checksum = "87f3e037eac156d1775da914196f0f37741a274155e34a0b7e427c35d2a2ecb9"
[[package]]
name = "opaque-debug"
@@ -1462,7 +1462,7 @@ dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall 0.2.10",
"redox_syscall 0.2.11",
"smallvec",
"winapi",
]
@@ -1475,7 +1475,7 @@ checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.2.10",
"redox_syscall 0.2.11",
"smallvec",
"windows-sys",
]
@@ -1667,9 +1667,9 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]]
name = "redox_syscall"
version = "0.2.10"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c"
dependencies = [
"bitflags",
]
@@ -1681,7 +1681,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64"
dependencies = [
"getrandom",
"redox_syscall 0.2.10",
"redox_syscall 0.2.11",
]
[[package]]
@@ -2066,7 +2066,7 @@ dependencies = [
[[package]]
name = "shadowsocks-rust"
version = "1.13.5"
version = "1.14.0"
dependencies = [
"build-time",
"byte_string",
@@ -2098,7 +2098,7 @@ dependencies = [
[[package]]
name = "shadowsocks-service"
version = "1.13.5"
version = "1.14.0"
dependencies = [
"arc-swap 1.5.0",
"async-trait",
@@ -2295,16 +2295,16 @@ dependencies = [
"cfg-if",
"fastrand",
"libc",
"redox_syscall 0.2.10",
"redox_syscall 0.2.11",
"remove_dir_all",
"winapi",
]
[[package]]
name = "termcolor"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
dependencies = [
"winapi-util",
]
@@ -2321,9 +2321,9 @@ dependencies = [
[[package]]
name = "textwrap"
version = "0.14.2"
version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0066c8d12af8b5acd21e00547c3797fde4e8677254a7ee429176ccebbe93dd80"
checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb"
dependencies = [
"terminal_size",
]

View File

@@ -1,6 +1,6 @@
[package]
name = "shadowsocks-rust"
version = "1.13.5"
version = "1.14.0"
authors = ["Shadowsocks Contributors"]
description = "shadowsocks is a fast tunnel proxy that helps you bypass firewalls."
repository = "https://github.com/shadowsocks/shadowsocks-rust"
@@ -164,7 +164,7 @@ jemallocator = { version = "0.3", optional = true }
snmalloc-rs = { version = "0.2", optional = true }
rpmalloc = { version = "0.2", optional = true }
shadowsocks-service = { version = "1.13.5", path = "./crates/shadowsocks-service" }
shadowsocks-service = { version = "1.14.0", path = "./crates/shadowsocks-service" }
[target.'cfg(unix)'.dependencies]
daemonize = "0.4"

View File

@@ -1,6 +1,6 @@
[package]
name = "shadowsocks-service"
version = "1.13.5"
version = "1.14.0"
authors = ["Shadowsocks Contributors"]
description = "shadowsocks is a fast tunnel proxy that helps you bypass firewalls."
repository = "https://github.com/shadowsocks/shadowsocks-rust"

View File

@@ -2,7 +2,12 @@
pub use self::{
tcp::{auto_proxy_io::AutoProxyIo, auto_proxy_stream::AutoProxyClientStream},
udp::{UdpAssociationManager, UdpInboundWrite},
udp::{
UdpAssociationManager,
UdpInboundWrite,
UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE,
UDP_ASSOCIATION_SEND_CHANNEL_SIZE,
},
};
mod tcp;

View File

@@ -4,7 +4,10 @@ use std::{
io::{self, ErrorKind},
marker::PhantomData,
net::SocketAddr,
sync::Arc,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
@@ -13,11 +16,7 @@ use bytes::Bytes;
use futures::future;
use log::{debug, error, trace, warn};
use lru_time_cache::LruCache;
use tokio::{
sync::{mpsc, Mutex},
task::JoinHandle,
time,
};
use tokio::{sync::mpsc, task::JoinHandle, time};
use shadowsocks::{
lookup_then,
@@ -29,7 +28,11 @@ use shadowsocks::{
};
use crate::{
local::{context::ServiceContext, loadbalancing::PingBalancer},
local::{
context::ServiceContext,
loadbalancing::PingBalancer,
net::udp::{UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE, UDP_ASSOCIATION_SEND_CHANNEL_SIZE},
},
net::MonProxySocket,
};
@@ -44,7 +47,6 @@ pub trait UdpInboundWrite {
}
type AssociationMap<W> = LruCache<SocketAddr, UdpAssociation<W>>;
type SharedAssociationMap<W> = Arc<Mutex<AssociationMap<W>>>;
/// UDP association manager
pub struct UdpAssociationManager<W>
@@ -53,82 +55,51 @@ where
{
respond_writer: W,
context: Arc<ServiceContext>,
assoc_map: SharedAssociationMap<W>,
cleanup_abortable: JoinHandle<()>,
keepalive_abortable: JoinHandle<()>,
assoc_map: AssociationMap<W>,
keepalive_tx: mpsc::Sender<SocketAddr>,
balancer: PingBalancer,
}
impl<W> Drop for UdpAssociationManager<W>
where
W: UdpInboundWrite + Clone + Send + Sync + Unpin + 'static,
{
fn drop(&mut self) {
self.cleanup_abortable.abort();
self.keepalive_abortable.abort();
}
}
impl<W> UdpAssociationManager<W>
where
W: UdpInboundWrite + Clone + Send + Sync + Unpin + 'static,
{
/// Create a new `UdpAssociationManager`
///
/// Returns (`UdpAssociationManager`, Cleanup Interval, Keep-alive Receiver<SocketAddr>)
pub fn new(
context: Arc<ServiceContext>,
respond_writer: W,
time_to_live: Option<Duration>,
capacity: Option<usize>,
balancer: PingBalancer,
) -> UdpAssociationManager<W> {
) -> (UdpAssociationManager<W>, Duration, mpsc::Receiver<SocketAddr>) {
let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION);
let assoc_map = Arc::new(Mutex::new(match capacity {
let assoc_map = match capacity {
Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity),
None => LruCache::with_expiry_duration(time_to_live),
}));
let cleanup_abortable = {
let assoc_map = assoc_map.clone();
tokio::spawn(async move {
loop {
time::sleep(time_to_live).await;
// cleanup expired associations. iter() will remove expired elements
let _ = assoc_map.lock().await.iter();
}
})
};
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(256);
let (keepalive_tx, keepalive_rx) = mpsc::channel(UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE);
let keepalive_abortable = {
let assoc_map = assoc_map.clone();
tokio::spawn(async move {
while let Some(peer_addr) = keepalive_rx.recv().await {
assoc_map.lock().await.get(&peer_addr);
}
})
};
UdpAssociationManager {
respond_writer,
context,
assoc_map,
cleanup_abortable,
keepalive_abortable,
keepalive_tx,
balancer,
}
(
UdpAssociationManager {
respond_writer,
context,
assoc_map,
keepalive_tx,
balancer,
},
time_to_live,
keepalive_rx,
)
}
/// Sends `data` from `peer_addr` to `target_addr`
pub async fn send_to(&self, peer_addr: SocketAddr, target_addr: Address, data: &[u8]) -> io::Result<()> {
pub async fn send_to(&mut self, peer_addr: SocketAddr, target_addr: Address, data: &[u8]) -> io::Result<()> {
// Check or (re)create an association
let mut assoc_map = self.assoc_map.lock().await;
if let Some(assoc) = assoc_map.get(&peer_addr) {
if let Some(assoc) = self.assoc_map.get(&peer_addr) {
return assoc.try_send((target_addr, Bytes::copy_from_slice(data)));
}
@@ -143,10 +114,20 @@ where
debug!("created udp association for {}", peer_addr);
assoc.try_send((target_addr, Bytes::copy_from_slice(data)))?;
assoc_map.insert(peer_addr, assoc);
self.assoc_map.insert(peer_addr, assoc);
Ok(())
}
/// Cleanup expired associations
pub async fn cleanup_expired(&mut self) {
self.assoc_map.iter();
}
/// Keep-alive association
pub async fn keep_alive(&mut self, peer_addr: &SocketAddr) {
self.assoc_map.get(peer_addr);
}
}
struct UdpAssociation<W>
@@ -154,6 +135,7 @@ where
W: UdpInboundWrite + Send + Sync + Unpin + 'static,
{
assoc_handle: JoinHandle<()>,
keepalive_handle: JoinHandle<()>,
sender: mpsc::Sender<(Address, Bytes)>,
writer: PhantomData<W>,
}
@@ -164,6 +146,7 @@ where
{
fn drop(&mut self) {
self.assoc_handle.abort();
self.keepalive_handle.abort();
}
}
@@ -178,10 +161,26 @@ where
balancer: PingBalancer,
respond_writer: W,
) -> UdpAssociation<W> {
let keepalive_flag = Arc::new(AtomicBool::new(false));
let keepalive_handle = {
let keepalive_flag = keepalive_flag.clone();
tokio::spawn(async move {
loop {
time::sleep(Duration::from_secs(1)).await;
if keepalive_flag.load(Ordering::Acquire) {
let _ = keepalive_tx.send(peer_addr).await;
keepalive_flag.store(false, Ordering::Release);
}
}
})
};
let (assoc_handle, sender) =
UdpAssociationContext::create(context, peer_addr, keepalive_tx, balancer, respond_writer);
UdpAssociationContext::create(context, peer_addr, keepalive_flag, balancer, respond_writer);
UdpAssociation {
assoc_handle,
keepalive_handle,
sender,
writer: PhantomData,
}
@@ -205,7 +204,7 @@ where
bypassed_ipv4_socket: Option<ShadowUdpSocket>,
bypassed_ipv6_socket: Option<ShadowUdpSocket>,
proxied_socket: Option<MonProxySocket>,
keepalive_tx: mpsc::Sender<SocketAddr>,
keepalive_flag: Arc<AtomicBool>,
balancer: PingBalancer,
respond_writer: W,
}
@@ -226,14 +225,14 @@ where
fn create(
context: Arc<ServiceContext>,
peer_addr: SocketAddr,
keepalive_tx: mpsc::Sender<SocketAddr>,
keepalive_flag: Arc<AtomicBool>,
balancer: PingBalancer,
respond_writer: W,
) -> (JoinHandle<()>, mpsc::Sender<(Address, Bytes)>) {
// Pending packets 128 for each association should be good enough for a server.
// Pending packets UDP_ASSOCIATION_SEND_CHANNEL_SIZE for each association should be good enough for a server.
// If there are plenty of packets stuck in the channel, dropping excessive packets is a good way to protect the server from
// being OOM.
let (sender, receiver) = mpsc::channel(128);
let (sender, receiver) = mpsc::channel(UDP_ASSOCIATION_SEND_CHANNEL_SIZE);
let mut assoc = UdpAssociationContext {
context,
@@ -241,7 +240,7 @@ where
bypassed_ipv4_socket: None,
bypassed_ipv6_socket: None,
proxied_socket: None,
keepalive_tx,
keepalive_flag,
balancer,
respond_writer,
};
@@ -474,16 +473,15 @@ where
if bypassed { "bypassed" } else { "proxied" },
data.len(),
);
// Keep association alive in map
let _ = self
.keepalive_tx
.send_timeout(self.peer_addr, Duration::from_secs(1))
.await;
self.keepalive_flag.store(true, Ordering::Release);
// Send back to client
if let Err(err) = self.respond_writer.send_to(self.peer_addr, addr, data).await {
warn!(
"udp failed to send back to client {}, from target {} ({}), error: {}",
"udp failed to send back {} bytes to client {}, from target {} ({}), error: {}",
data.len(),
self.peer_addr,
addr,
if bypassed { "bypassed" } else { "proxied" },

View File

@@ -1,3 +1,9 @@
pub use self::association::{UdpAssociationManager, UdpInboundWrite};
pub mod association;
/// Packet size for all UDP associations' send queue
pub static UDP_ASSOCIATION_SEND_CHANNEL_SIZE: usize = 8192;
/// Keep-alive channel size for UDP associations' manager
pub static UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE: usize = 256;

View File

@@ -16,7 +16,7 @@ use shadowsocks::{
relay::{socks5::Address, udprelay::MAXIMUM_UDP_PAYLOAD_SIZE},
ServerAddr,
};
use tokio::{sync::Mutex, task::JoinHandle};
use tokio::{sync::Mutex, task::JoinHandle, time};
use crate::{
config::RedirType,
@@ -197,7 +197,7 @@ impl UdpRedir {
);
#[allow(clippy::needless_update)]
let manager = UdpAssociationManager::new(
let (mut manager, cleanup_interval, mut keepalive_rx) = UdpAssociationManager::new(
self.context.clone(),
UdpRedirInboundWriter::new(self.redir_ty, self.context.connect_opts_ref()),
self.time_to_live,
@@ -206,52 +206,68 @@ impl UdpRedir {
);
let mut pkt_buf = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE];
let mut cleanup_timer = time::interval(cleanup_interval);
loop {
let (recv_len, src, mut dst) = match listener.recv_dest_from(&mut pkt_buf).await {
Ok(o) => o,
Err(err) => {
error!("recv_dest_from failed with err: {}", err);
continue;
tokio::select! {
_ = cleanup_timer.tick() => {
// cleanup expired associations. iter() will remove expired elements
manager.cleanup_expired().await;
}
};
// Packet length is limited by MAXIMUM_UDP_PAYLOAD_SIZE, excess bytes will be discarded.
// Copy bytes, because udp_associate runs in another tokio Task
let pkt = &pkt_buf[..recv_len];
trace!(
"received UDP packet from {}, destination {}, length {} bytes",
src,
dst,
recv_len
);
if recv_len == 0 {
// For windows, it will generate a ICMP Port Unreachable Message
// https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-recvfrom
// Which will result in recv_from return 0.
//
// It cannot be solved here, because `WSAGetLastError` is already set.
//
// See `relay::udprelay::utils::create_socket` for more detail.
continue;
}
// Try to convert IPv4 mapped IPv6 address for dual-stack mode.
if let SocketAddr::V6(ref a) = dst {
if let Some(v4) = to_ipv4_mapped(a.ip()) {
dst = SocketAddr::new(IpAddr::from(v4), a.port());
peer_addr_opt = keepalive_rx.recv() => {
let peer_addr = peer_addr_opt.expect("keep-alive channel closed unexpectly");
manager.keep_alive(&peer_addr).await;
}
}
if let Err(err) = manager.send_to(src, Address::from(dst), pkt).await {
error!(
"udp packet relay {} -> {} with {} bytes failed, error: {}",
src,
dst,
pkt.len(),
err
);
recv_result = listener.recv_dest_from(&mut pkt_buf) => {
let (recv_len, src, mut dst) = match recv_result {
Ok(o) => o,
Err(err) => {
error!("recv_dest_from failed with err: {}", err);
continue;
}
};
// Packet length is limited by MAXIMUM_UDP_PAYLOAD_SIZE, excess bytes will be discarded.
// Copy bytes, because udp_associate runs in another tokio Task
let pkt = &pkt_buf[..recv_len];
trace!(
"received UDP packet from {}, destination {}, length {} bytes",
src,
dst,
recv_len
);
if recv_len == 0 {
// For windows, it will generate a ICMP Port Unreachable Message
// https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-recvfrom
// Which will result in recv_from return 0.
//
// It cannot be solved here, because `WSAGetLastError` is already set.
//
// See `relay::udprelay::utils::create_socket` for more detail.
continue;
}
// Try to convert IPv4 mapped IPv6 address for dual-stack mode.
if let SocketAddr::V6(ref a) = dst {
if let Some(v4) = to_ipv4_mapped(a.ip()) {
dst = SocketAddr::new(IpAddr::from(v4), a.port());
}
}
if let Err(err) = manager.send_to(src, Address::from(dst), pkt).await {
error!(
"udp packet relay {} -> {} with {} bytes failed, error: {}",
src,
dst,
pkt.len(),
err
);
}
}
}
}
}

View File

@@ -84,7 +84,7 @@ impl Socks5UdpServer {
info!("shadowsocks socks5 UDP listening on {}", socket.local_addr()?);
let listener = Arc::new(socket);
let manager = UdpAssociationManager::new(
let (mut manager, cleanup_interval, mut keepalive_rx) = UdpAssociationManager::new(
self.context.clone(),
Socks5UdpInboundWriter {
inbound: listener.clone(),
@@ -95,50 +95,66 @@ impl Socks5UdpServer {
);
let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE];
let mut cleanup_timer = time::interval(cleanup_interval);
loop {
let (n, peer_addr) = match listener.recv_from(&mut buffer).await {
Ok(s) => s,
Err(err) => {
error!("udp server recv_from failed with error: {}", err);
time::sleep(Duration::from_secs(1)).await;
continue;
tokio::select! {
_ = cleanup_timer.tick() => {
// cleanup expired associations. iter() will remove expired elements
manager.cleanup_expired().await;
}
};
let data = &buffer[..n];
// PKT = UdpAssociateHeader + PAYLOAD
let mut cur = Cursor::new(data);
let header = match UdpAssociateHeader::read_from(&mut cur).await {
Ok(h) => h,
Err(..) => {
error!("received invalid UDP associate packet: {:?}", ByteStr::new(data));
continue;
peer_addr_opt = keepalive_rx.recv() => {
let peer_addr = peer_addr_opt.expect("keep-alive channel closed unexpectly");
manager.keep_alive(&peer_addr).await;
}
};
if header.frag != 0 {
error!("received UDP associate with frag != 0, which is not supported by shadowsocks");
continue;
}
recv_result = listener.recv_from(&mut buffer) => {
let (n, peer_addr) = match recv_result {
Ok(s) => s,
Err(err) => {
error!("udp server recv_from failed with error: {}", err);
time::sleep(Duration::from_secs(1)).await;
continue;
}
};
let pos = cur.position() as usize;
let payload = &data[pos..];
let data = &buffer[..n];
trace!(
"UDP ASSOCIATE {} -> {}, {} bytes",
peer_addr,
header.address,
payload.len()
);
// PKT = UdpAssociateHeader + PAYLOAD
let mut cur = Cursor::new(data);
let header = match UdpAssociateHeader::read_from(&mut cur).await {
Ok(h) => h,
Err(..) => {
error!("received invalid UDP associate packet: {:?}", ByteStr::new(data));
continue;
}
};
if let Err(err) = manager.send_to(peer_addr, header.address, payload).await {
error!(
"udp packet from {} relay {} bytes failed, error: {}",
peer_addr,
data.len(),
err
);
if header.frag != 0 {
error!("received UDP associate with frag != 0, which is not supported by shadowsocks");
continue;
}
let pos = cur.position() as usize;
let payload = &data[pos..];
trace!(
"UDP ASSOCIATE {} -> {}, {} bytes",
peer_addr,
header.address,
payload.len()
);
if let Err(err) = manager.send_to(peer_addr, header.address, payload).await {
error!(
"udp packet from {} relay {} bytes failed, error: {}",
peer_addr,
data.len(),
err
);
}
}
}
}
}

View File

@@ -14,7 +14,7 @@ use ipnet::IpNet;
use log::{debug, error, info, trace, warn};
use shadowsocks::config::Mode;
use smoltcp::wire::{IpProtocol, TcpPacket, UdpPacket};
use tokio::io::AsyncReadExt;
use tokio::{io::AsyncReadExt, sync::mpsc, time};
use tun::{AsyncDevice, Configuration as TunConfiguration, Device as TunDevice, Error as TunError, Layer};
use crate::local::{context::ServiceContext, loadbalancing::PingBalancer};
@@ -99,7 +99,7 @@ impl TunBuilder {
Err(err) => return Err(io::Error::new(ErrorKind::Other, err)),
};
let udp = UdpTun::new(
let (udp, udp_cleanup_interval, udp_keepalive_rx) = UdpTun::new(
self.context.clone(),
self.balancer.clone(),
self.udp_expiry_duration,
@@ -116,6 +116,8 @@ impl TunBuilder {
device,
tcp,
udp,
udp_cleanup_interval,
udp_keepalive_rx,
mode: self.mode,
})
}
@@ -125,6 +127,8 @@ pub struct Tun {
device: AsyncDevice,
tcp: TcpTun,
udp: UdpTun,
udp_cleanup_interval: Duration,
udp_keepalive_rx: mpsc::Receiver<SocketAddr>,
mode: Mode,
}
@@ -141,6 +145,7 @@ impl Tun {
);
let mut packet_buffer = vec![0u8; 65536 + IFF_PI_PREFIX_LEN].into_boxed_slice();
let mut udp_cleanup_timer = time::interval(self.udp_cleanup_interval);
loop {
tokio::select! {
@@ -173,6 +178,17 @@ impl Tun {
}
}
// UDP cleanup expired associations
_ = udp_cleanup_timer.tick() => {
self.udp.cleanup_expired().await;
}
// UDP keep-alive associations
peer_addr_opt = self.udp_keepalive_rx.recv() => {
let peer_addr = peer_addr_opt.expect("UDP keep-alive channel closed unexpectly");
self.udp.keep_alive(&peer_addr).await;
}
// TCP channel sent back
packet = self.tcp.recv_packet() => {
if let Err(err) = write_packet_with_pi(&mut self.device, &packet).await {

View File

@@ -30,19 +30,17 @@ impl UdpTun {
balancer: PingBalancer,
time_to_live: Option<Duration>,
capacity: Option<usize>,
) -> UdpTun {
) -> (UdpTun, Duration, mpsc::Receiver<SocketAddr>) {
let (tun_tx, tun_rx) = mpsc::channel(64);
let (manager, cleanup_interval, keepalive_rx) = UdpAssociationManager::new(
context,
UdpTunInboundWriter::new(tun_tx),
time_to_live,
capacity,
balancer,
);
UdpTun {
tun_rx,
manager: UdpAssociationManager::new(
context,
UdpTunInboundWriter::new(tun_tx),
time_to_live,
capacity,
balancer,
),
}
(UdpTun { tun_rx, manager }, cleanup_interval, keepalive_rx)
}
pub async fn handle_packet(
@@ -61,6 +59,16 @@ impl UdpTun {
None => unreachable!("channel closed unexpectedly"),
}
}
#[inline(always)]
pub async fn cleanup_expired(&mut self) {
self.manager.cleanup_expired().await;
}
#[inline(always)]
pub async fn keep_alive(&mut self, peer_addr: &SocketAddr) {
self.manager.keep_alive(peer_addr).await;
}
}
#[derive(Clone)]

View File

@@ -1,10 +1,17 @@
//! UDP Tunnel server
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
use std::{
io::{self, ErrorKind},
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use bytes::Bytes;
use futures::future;
use io::ErrorKind;
use log::{debug, error, info, trace, warn};
use lru_time_cache::LruCache;
use shadowsocks::{
@@ -16,73 +23,43 @@ use shadowsocks::{
},
ServerAddr,
};
use tokio::{
net::UdpSocket,
sync::{mpsc, Mutex},
task::JoinHandle,
time,
};
use tokio::{net::UdpSocket, sync::mpsc, task::JoinHandle, time};
use crate::{
local::{context::ServiceContext, loadbalancing::PingBalancer},
local::{
context::ServiceContext,
loadbalancing::PingBalancer,
net::{UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE, UDP_ASSOCIATION_SEND_CHANNEL_SIZE},
},
net::MonProxySocket,
};
type AssociationMap = LruCache<SocketAddr, UdpAssociation>;
type SharedAssociationMap = Arc<Mutex<AssociationMap>>;
pub struct UdpTunnel {
context: Arc<ServiceContext>,
assoc_map: SharedAssociationMap,
cleanup_abortable: JoinHandle<()>,
keepalive_abortable: JoinHandle<()>,
assoc_map: AssociationMap,
keepalive_tx: mpsc::Sender<SocketAddr>,
}
impl Drop for UdpTunnel {
fn drop(&mut self) {
self.cleanup_abortable.abort();
self.keepalive_abortable.abort();
}
keepalive_rx: mpsc::Receiver<SocketAddr>,
time_to_live: Duration,
}
impl UdpTunnel {
pub fn new(context: Arc<ServiceContext>, time_to_live: Option<Duration>, capacity: Option<usize>) -> UdpTunnel {
let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION);
let assoc_map = Arc::new(Mutex::new(match capacity {
let assoc_map = match capacity {
Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity),
None => LruCache::with_expiry_duration(time_to_live),
}));
let cleanup_abortable = {
let assoc_map = assoc_map.clone();
tokio::spawn(async move {
loop {
time::sleep(time_to_live).await;
// cleanup expired associations. iter() will remove expired elements
let _ = assoc_map.lock().await.iter();
}
})
};
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(64);
let keepalive_abortable = {
let assoc_map = assoc_map.clone();
tokio::spawn(async move {
while let Some(peer_addr) = keepalive_rx.recv().await {
assoc_map.lock().await.get(&peer_addr);
}
})
};
let (keepalive_tx, keepalive_rx) = mpsc::channel(UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE);
UdpTunnel {
context,
assoc_map,
cleanup_abortable,
keepalive_abortable,
keepalive_tx,
keepalive_rx,
time_to_live,
}
}
@@ -110,28 +87,55 @@ impl UdpTunnel {
let listener = Arc::new(socket);
let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE];
loop {
let (n, peer_addr) = match listener.recv_from(&mut buffer).await {
Ok(s) => s,
Err(err) => {
error!("udp server recv_from failed with error: {}", err);
time::sleep(Duration::from_secs(1)).await;
continue;
}
};
let mut cleanup_timer = time::interval(self.time_to_live);
let data = &buffer[..n];
if let Err(err) = self
.send_packet(&listener, peer_addr, &balancer, forward_addr, data)
.await
{
error!(
"udp packet relay {} -> {} with {} bytes failed, error: {}",
peer_addr,
forward_addr,
data.len(),
err
);
loop {
tokio::select! {
_ = cleanup_timer.tick() => {
// cleanup expired associations. iter() will remove expired elements
let _ = self.assoc_map.iter();
}
peer_addr_opt = self.keepalive_rx.recv() => {
let peer_addr = peer_addr_opt.expect("keep-alive channel closed unexpectly");
self.assoc_map.get(&peer_addr);
}
recv_result = listener.recv_from(&mut buffer) => {
let (n, peer_addr) = match recv_result {
Ok(s) => s,
Err(err) => {
error!("udp server recv_from failed with error: {}", err);
time::sleep(Duration::from_secs(1)).await;
continue;
}
};
if n == 0 {
// For windows, it will generate a ICMP Port Unreachable Message
// https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-recvfrom
// Which will result in recv_from return 0.
//
// It cannot be solved here, because `WSAGetLastError` is already set.
//
// See `relay::udprelay::utils::create_socket` for more detail.
continue;
}
let data = &buffer[..n];
if let Err(err) = self
.send_packet(&listener, peer_addr, &balancer, forward_addr, data)
.await
{
error!(
"udp packet relay {} -> {} with {} bytes failed, error: {}",
peer_addr,
forward_addr,
data.len(),
err
);
}
}
}
}
}
@@ -144,9 +148,7 @@ impl UdpTunnel {
forward_addr: &Address,
data: &[u8],
) -> io::Result<()> {
let mut assoc_map = self.assoc_map.lock().await;
if let Some(assoc) = assoc_map.get(&peer_addr) {
if let Some(assoc) = self.assoc_map.get(&peer_addr) {
return assoc.try_send(Bytes::copy_from_slice(data));
}
@@ -162,7 +164,7 @@ impl UdpTunnel {
debug!("created udp association for {}", peer_addr);
assoc.try_send(Bytes::copy_from_slice(data))?;
assoc_map.insert(peer_addr, assoc);
self.assoc_map.insert(peer_addr, assoc);
Ok(())
}
@@ -170,12 +172,14 @@ impl UdpTunnel {
struct UdpAssociation {
assoc_handle: JoinHandle<()>,
keepalive_handle: JoinHandle<()>,
sender: mpsc::Sender<Bytes>,
}
impl Drop for UdpAssociation {
fn drop(&mut self) {
self.assoc_handle.abort();
self.keepalive_handle.abort();
}
}
@@ -188,9 +192,28 @@ impl UdpAssociation {
keepalive_tx: mpsc::Sender<SocketAddr>,
balancer: PingBalancer,
) -> UdpAssociation {
let keepalive_flag = Arc::new(AtomicBool::new(false));
let keepalive_handle = {
let keepalive_flag = keepalive_flag.clone();
tokio::spawn(async move {
loop {
time::sleep(Duration::from_secs(1)).await;
if keepalive_flag.load(Ordering::Acquire) {
let _ = keepalive_tx.send(peer_addr).await;
keepalive_flag.store(false, Ordering::Release);
}
}
})
};
let (assoc_handle, sender) =
UdpAssociationContext::create(context, inbound, peer_addr, forward_addr, keepalive_tx, balancer);
UdpAssociation { assoc_handle, sender }
UdpAssociationContext::create(context, inbound, peer_addr, forward_addr, keepalive_flag, balancer);
UdpAssociation {
assoc_handle,
keepalive_handle,
sender,
}
}
fn try_send(&self, data: Bytes) -> io::Result<()> {
@@ -207,7 +230,7 @@ struct UdpAssociationContext {
peer_addr: SocketAddr,
forward_addr: Address,
proxied_socket: Option<MonProxySocket>,
keepalive_tx: mpsc::Sender<SocketAddr>,
keepalive_flag: Arc<AtomicBool>,
balancer: PingBalancer,
inbound: Arc<UdpSocket>,
}
@@ -224,20 +247,20 @@ impl UdpAssociationContext {
inbound: Arc<UdpSocket>,
peer_addr: SocketAddr,
forward_addr: Address,
keepalive_tx: mpsc::Sender<SocketAddr>,
keepalive_flag: Arc<AtomicBool>,
balancer: PingBalancer,
) -> (JoinHandle<()>, mpsc::Sender<Bytes>) {
// Pending packets 128 for each association should be good enough for a server.
// Pending packets UDP_ASSOCIATION_SEND_CHANNEL_SIZE for each association should be good enough for a server.
// If there are plenty of packets stuck in the channel, dropping excessive packets is a good way to protect the server from
// being OOM.
let (sender, receiver) = mpsc::channel(128);
let (sender, receiver) = mpsc::channel(UDP_ASSOCIATION_SEND_CHANNEL_SIZE);
let mut assoc = UdpAssociationContext {
context,
peer_addr,
forward_addr,
proxied_socket: None,
keepalive_tx,
keepalive_flag,
balancer,
inbound,
};
@@ -353,18 +376,19 @@ impl UdpAssociationContext {
}
async fn send_received_respond_packet(&mut self, addr: &Address, data: &[u8]) {
trace!("udp relay {} <- {} received {} bytes", self.peer_addr, addr, data.len(),);
trace!("udp relay {} <- {} received {} bytes", self.peer_addr, addr, data.len());
// Keep association alive in map
let _ = self
.keepalive_tx
.send_timeout(self.peer_addr, Duration::from_secs(1))
.await;
self.keepalive_flag.store(true, Ordering::Release);
// Send back to client
if let Err(err) = self.inbound.send_to(data, self.peer_addr).await {
warn!(
"udp failed to send back to client {}, from target {}, error: {}",
self.peer_addr, addr, err
"udp failed to send back {} bytes to client {}, from target {}, error: {}",
data.len(),
self.peer_addr,
addr,
err
);
} else {
trace!("udp relay {} <- {} with {} bytes", self.peer_addr, addr, data.len());

View File

@@ -1,10 +1,17 @@
//! Shadowsocks UDP server
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
use std::{
io::{self, ErrorKind},
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use bytes::Bytes;
use futures::future;
use io::ErrorKind;
use log::{debug, error, info, trace, warn};
use lru_time_cache::LruCache;
use shadowsocks::{
@@ -16,35 +23,26 @@ use shadowsocks::{
},
ServerConfig,
};
use tokio::{
sync::{mpsc, Mutex},
task::JoinHandle,
time,
};
use tokio::{sync::mpsc, task::JoinHandle, time};
use crate::net::MonProxySocket;
use crate::{
local::net::{UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE, UDP_ASSOCIATION_SEND_CHANNEL_SIZE},
net::MonProxySocket,
};
use super::context::ServiceContext;
type AssociationMap = LruCache<SocketAddr, UdpAssociation>;
type SharedAssociationMap = Arc<Mutex<AssociationMap>>;
pub struct UdpServer {
context: Arc<ServiceContext>,
assoc_map: SharedAssociationMap,
cleanup_abortable: JoinHandle<()>,
keepalive_abortable: JoinHandle<()>,
assoc_map: AssociationMap,
keepalive_tx: mpsc::Sender<SocketAddr>,
keepalive_rx: mpsc::Receiver<SocketAddr>,
time_to_live: Duration,
accept_opts: AcceptOpts,
}
impl Drop for UdpServer {
fn drop(&mut self) {
self.cleanup_abortable.abort();
self.keepalive_abortable.abort();
}
}
impl UdpServer {
pub fn new(
context: Arc<ServiceContext>,
@@ -53,40 +51,19 @@ impl UdpServer {
accept_opts: AcceptOpts,
) -> UdpServer {
let time_to_live = time_to_live.unwrap_or(crate::DEFAULT_UDP_EXPIRY_DURATION);
let assoc_map = Arc::new(Mutex::new(match capacity {
let assoc_map = match capacity {
Some(capacity) => LruCache::with_expiry_duration_and_capacity(time_to_live, capacity),
None => LruCache::with_expiry_duration(time_to_live),
}));
let cleanup_abortable = {
let assoc_map = assoc_map.clone();
tokio::spawn(async move {
loop {
time::sleep(time_to_live).await;
// cleanup expired associations. iter() will remove expired elements
let _ = assoc_map.lock().await.iter();
}
})
};
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(64);
let keepalive_abortable = {
let assoc_map = assoc_map.clone();
tokio::spawn(async move {
while let Some(peer_addr) = keepalive_rx.recv().await {
assoc_map.lock().await.get(&peer_addr);
}
})
};
let (keepalive_tx, keepalive_rx) = mpsc::channel(UDP_ASSOCIATION_KEEP_ALIVE_CHANNEL_SIZE);
UdpServer {
context,
assoc_map,
cleanup_abortable,
keepalive_abortable,
keepalive_tx,
keepalive_rx,
time_to_live,
accept_opts,
}
}
@@ -103,36 +80,63 @@ impl UdpServer {
let listener = Arc::new(socket);
let mut buffer = [0u8; MAXIMUM_UDP_PAYLOAD_SIZE];
let mut cleanup_timer = time::interval(self.time_to_live);
loop {
let (n, peer_addr, target_addr) = match listener.recv_from(&mut buffer).await {
Ok(s) => s,
Err(err) => {
error!("udp server recv_from failed with error: {}", err);
continue;
tokio::select! {
_ = cleanup_timer.tick() => {
// cleanup expired associations. iter() will remove expired elements
let _ = self.assoc_map.iter();
}
};
if self.context.check_client_blocked(&peer_addr) {
warn!(
"udp client {} outbound {} access denied by ACL rules",
peer_addr, target_addr
);
continue;
}
peer_addr_opt = self.keepalive_rx.recv() => {
let peer_addr = peer_addr_opt.expect("keep-alive channel closed unexpectly");
self.assoc_map.get(&peer_addr);
}
if self.context.check_outbound_blocked(&target_addr).await {
warn!("udp client {} outbound {} blocked by ACL rules", peer_addr, target_addr);
continue;
}
recv_result = listener.recv_from(&mut buffer) => {
let (n, peer_addr, target_addr) = match recv_result {
Ok(s) => s,
Err(err) => {
error!("udp server recv_from failed with error: {}", err);
continue;
}
};
let data = &buffer[..n];
if let Err(err) = self.send_packet(&listener, peer_addr, target_addr, data).await {
error!(
"udp packet relay {} with {} bytes failed, error: {}",
peer_addr,
data.len(),
err
);
if n == 0 {
// For windows, it will generate a ICMP Port Unreachable Message
// https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-recvfrom
// Which will result in recv_from return 0.
//
// It cannot be solved here, because `WSAGetLastError` is already set.
//
// See `relay::udprelay::utils::create_socket` for more detail.
continue;
}
if self.context.check_client_blocked(&peer_addr) {
warn!(
"udp client {} outbound {} access denied by ACL rules",
peer_addr, target_addr
);
continue;
}
if self.context.check_outbound_blocked(&target_addr).await {
warn!("udp client {} outbound {} blocked by ACL rules", peer_addr, target_addr);
continue;
}
let data = &buffer[..n];
if let Err(err) = self.send_packet(&listener, peer_addr, target_addr, data).await {
error!(
"udp packet relay {} with {} bytes failed, error: {}",
peer_addr,
data.len(),
err
);
}
}
}
}
}
@@ -144,9 +148,7 @@ impl UdpServer {
target_addr: Address,
data: &[u8],
) -> io::Result<()> {
let mut assoc_map = self.assoc_map.lock().await;
if let Some(assoc) = assoc_map.get(&peer_addr) {
if let Some(assoc) = self.assoc_map.get(&peer_addr) {
return assoc.try_send((target_addr, Bytes::copy_from_slice(data)));
}
@@ -160,7 +162,7 @@ impl UdpServer {
debug!("created udp association for {}", peer_addr);
assoc.try_send((target_addr, Bytes::copy_from_slice(data)))?;
assoc_map.insert(peer_addr, assoc);
self.assoc_map.insert(peer_addr, assoc);
Ok(())
}
@@ -168,12 +170,14 @@ impl UdpServer {
struct UdpAssociation {
assoc_handle: JoinHandle<()>,
keepalive_handle: JoinHandle<()>,
sender: mpsc::Sender<(Address, Bytes)>,
}
impl Drop for UdpAssociation {
fn drop(&mut self) {
self.assoc_handle.abort();
self.keepalive_handle.abort();
}
}
@@ -184,8 +188,27 @@ impl UdpAssociation {
peer_addr: SocketAddr,
keepalive_tx: mpsc::Sender<SocketAddr>,
) -> UdpAssociation {
let (assoc_handle, sender) = UdpAssociationContext::create(context, inbound, peer_addr, keepalive_tx);
UdpAssociation { assoc_handle, sender }
let keepalive_flag = Arc::new(AtomicBool::new(false));
let keepalive_handle = {
let keepalive_flag = keepalive_flag.clone();
tokio::spawn(async move {
loop {
time::sleep(Duration::from_secs(1)).await;
if keepalive_flag.load(Ordering::Acquire) {
let _ = keepalive_tx.send(peer_addr).await;
keepalive_flag.store(false, Ordering::Release);
}
}
})
};
let (assoc_handle, sender) = UdpAssociationContext::create(context, inbound, peer_addr, keepalive_flag);
UdpAssociation {
assoc_handle,
keepalive_handle,
sender,
}
}
fn try_send(&self, data: (Address, Bytes)) -> io::Result<()> {
@@ -202,7 +225,7 @@ struct UdpAssociationContext {
peer_addr: SocketAddr,
outbound_ipv4_socket: Option<OutboundUdpSocket>,
outbound_ipv6_socket: Option<OutboundUdpSocket>,
keepalive_tx: mpsc::Sender<SocketAddr>,
keepalive_flag: Arc<AtomicBool>,
inbound: Arc<MonProxySocket>,
}
@@ -217,19 +240,19 @@ impl UdpAssociationContext {
context: Arc<ServiceContext>,
inbound: Arc<MonProxySocket>,
peer_addr: SocketAddr,
keepalive_tx: mpsc::Sender<SocketAddr>,
keepalive_flag: Arc<AtomicBool>,
) -> (JoinHandle<()>, mpsc::Sender<(Address, Bytes)>) {
// Pending packets 128 for each association should be good enough for a server.
// Pending packets UDP_ASSOCIATION_SEND_CHANNEL_SIZE for each association should be good enough for a server.
// If there are plenty of packets stuck in the channel, dropping excessive packets is a good way to protect the server from
// being OOM.
let (sender, receiver) = mpsc::channel(128);
let (sender, receiver) = mpsc::channel(UDP_ASSOCIATION_SEND_CHANNEL_SIZE);
let mut assoc = UdpAssociationContext {
context,
peer_addr,
outbound_ipv4_socket: None,
outbound_ipv6_socket: None,
keepalive_tx,
keepalive_flag,
inbound,
};
let handle = tokio::spawn(async move { assoc.dispatch_packet(receiver).await });
@@ -381,16 +404,16 @@ impl UdpAssociationContext {
trace!("udp relay {} <- {} received {} bytes", self.peer_addr, addr, data.len());
// Keep association alive in map
let _ = self
.keepalive_tx
.send_timeout(self.peer_addr, Duration::from_secs(1))
.await;
self.keepalive_flag.store(true, Ordering::Release);
// Send back to client
if let Err(err) = self.inbound.send_to(self.peer_addr, addr, data).await {
warn!(
"udp failed to send back to client {}, from target {}, error: {}",
self.peer_addr, addr, err
"udp failed to send back {} bytes to client {}, from target {}, error: {}",
data.len(),
self.peer_addr,
addr,
err
);
} else {
trace!("udp relay {} <- {} with {} bytes", self.peer_addr, addr, data.len());

View File

@@ -1,7 +1,7 @@
use std::{net::SocketAddr, time::Duration};
use byteorder::{BigEndian, ByteOrder};
use clap::{App, Arg};
use clap::{Arg, Command};
use shadowsocks::{
config::{ServerAddr, ServerType},
context::Context,
@@ -20,7 +20,7 @@ use trust_dns_proto::{
async fn main() {
env_logger::init();
let matches = App::new("dns-pressure")
let matches = Command::new("dns-pressure")
.arg(
Arg::new("OUTBOUND_BIND_INTERFACE")
.long("outbound-bind-interface")