From e4dfd2f9d8d6bf98f5576de07bbb43b2d286cd7e Mon Sep 17 00:00:00 2001 From: zonyitoo Date: Tue, 20 Jul 2021 15:17:49 +0800 Subject: [PATCH] replaced future::abort with tokio JoinHandle abort - MUST: tokio >= v1.8.2 --- .../src/local/loadbalancing/ping_balancer.rs | 17 +++--- crates/shadowsocks-service/src/local/mod.rs | 5 +- .../src/local/net/udp/association.rs | 56 ++++++++----------- .../src/local/tunnel/udprelay.rs | 34 +++++------ .../src/server/udprelay.rs | 44 ++++++--------- 5 files changed, 63 insertions(+), 93 deletions(-) diff --git a/crates/shadowsocks-service/src/local/loadbalancing/ping_balancer.rs b/crates/shadowsocks-service/src/local/loadbalancing/ping_balancer.rs index 1572c42a..6e045e7b 100644 --- a/crates/shadowsocks-service/src/local/loadbalancing/ping_balancer.rs +++ b/crates/shadowsocks-service/src/local/loadbalancing/ping_balancer.rs @@ -2,7 +2,6 @@ use std::{ fmt::{self, Debug, Display}, - future::Future, io, net::{Ipv4Addr, SocketAddr}, sync::{ @@ -13,7 +12,7 @@ use std::{ }; use byte_string::ByteStr; -use futures::future::{self, AbortHandle}; +use futures::future; use log::{debug, info, trace}; use shadowsocks::{ config::Mode, @@ -26,6 +25,7 @@ use shadowsocks::{ }; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + task::JoinHandle, time, }; @@ -72,7 +72,7 @@ impl PingBalancerBuilder { self.servers.push(Arc::new(server)); } - pub async fn build(self) -> (PingBalancer, impl Future) { + pub async fn build(self) -> PingBalancer { assert!(!self.servers.is_empty(), "build PingBalancer without any servers"); let balancer_context = PingBalancerContext { @@ -87,12 +87,9 @@ impl PingBalancerBuilder { let shared_context = Arc::new(balancer_context); - let (checker, abortable) = { + let abortable = { let shared_context = shared_context.clone(); - future::abortable(async move { shared_context.checker_task().await }) - }; - let checker = async move { - let _ = checker.await; + tokio::spawn(async move { shared_context.checker_task().await }) }; let balancer = PingBalancer { @@ -101,7 +98,7 @@ impl PingBalancerBuilder { abortable, }), }; - (balancer, checker) + balancer } } @@ -260,7 +257,7 @@ impl PingBalancerContext { struct PingBalancerInner { context: Arc, - abortable: AbortHandle, + abortable: JoinHandle<()>, } impl Drop for PingBalancerInner { diff --git a/crates/shadowsocks-service/src/local/mod.rs b/crates/shadowsocks-service/src/local/mod.rs index f622481d..8ce844dc 100644 --- a/crates/shadowsocks-service/src/local/mod.rs +++ b/crates/shadowsocks-service/src/local/mod.rs @@ -189,10 +189,7 @@ pub async fn run(mut config: Config) -> io::Result<()> { for server in config.server { balancer_builder.add_server(ServerIdent::new(server)); } - let (balancer, checker) = balancer_builder.build().await; - tokio::spawn(checker); - - balancer + balancer_builder.build().await }; #[cfg(feature = "local-flow-stat")] diff --git a/crates/shadowsocks-service/src/local/net/udp/association.rs b/crates/shadowsocks-service/src/local/net/udp/association.rs index e7c0f544..50584dff 100644 --- a/crates/shadowsocks-service/src/local/net/udp/association.rs +++ b/crates/shadowsocks-service/src/local/net/udp/association.rs @@ -9,7 +9,6 @@ use std::{ use async_trait::async_trait; use bytes::Bytes; -use futures::future::{self, AbortHandle}; use log::{debug, error, trace, warn}; use lru_time_cache::LruCache; use shadowsocks::{ @@ -24,6 +23,7 @@ use spin::Mutex as SpinMutex; use tokio::{ net::UdpSocket, sync::{mpsc, Mutex}, + task::JoinHandle, time, }; @@ -53,8 +53,8 @@ where respond_writer: W, context: Arc, assoc_map: SharedAssociationMap, - cleanup_abortable: AbortHandle, - keepalive_abortable: AbortHandle, + cleanup_abortable: JoinHandle<()>, + keepalive_abortable: JoinHandle<()>, keepalive_tx: mpsc::Sender, balancer: PingBalancer, } @@ -89,29 +89,25 @@ where let cleanup_abortable = { let assoc_map = assoc_map.clone(); - let (cleanup_task, cleanup_abortable) = future::abortable(async move { + tokio::spawn(async move { loop { time::sleep(time_to_live).await; // cleanup expired associations. iter() will remove expired elements let _ = assoc_map.lock().await.iter(); } - }); - tokio::spawn(cleanup_task); - cleanup_abortable + }) }; let (keepalive_tx, mut keepalive_rx) = mpsc::channel(256); let keepalive_abortable = { let assoc_map = assoc_map.clone(); - let (keepalive_task, keepalive_abortable) = future::abortable(async move { + tokio::spawn(async move { while let Some(peer_addr) = keepalive_rx.recv().await { assoc_map.lock().await.get(&peer_addr); } - }); - tokio::spawn(keepalive_task); - keepalive_abortable + }) }; UdpAssociationManager { @@ -199,7 +195,7 @@ enum UdpAssociationBypassState { Empty, Connected { socket: Arc, - abortable: AbortHandle, + abortable: JoinHandle>, }, Aborted, } @@ -217,7 +213,7 @@ impl UdpAssociationBypassState { UdpAssociationBypassState::Empty } - fn set_connected(&mut self, socket: Arc, abortable: AbortHandle) { + fn set_connected(&mut self, socket: Arc, abortable: JoinHandle>) { *self = UdpAssociationBypassState::Connected { socket, abortable }; } @@ -230,7 +226,7 @@ enum UdpAssociationProxyState { Empty, Connected { socket: Arc, - abortable: AbortHandle, + abortable: JoinHandle>, }, Aborted, } @@ -251,7 +247,7 @@ impl UdpAssociationProxyState { *self = UdpAssociationProxyState::Empty; } - fn set_connected(&mut self, socket: Arc, abortable: AbortHandle) { + fn set_connected(&mut self, socket: Arc, abortable: JoinHandle>) { self.abort_inner(); *self = UdpAssociationProxyState::Connected { socket, abortable }; } @@ -394,13 +390,11 @@ where ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?; let socket: Arc = Arc::new(socket.into()); - let (r2l_fut, r2l_abortable) = { - let assoc = self.clone(); - future::abortable(assoc.copy_bypassed_r2l(socket.clone())) - }; - // CLIENT <- REMOTE - tokio::spawn(r2l_fut); + let r2l_abortable = { + let assoc = self.clone(); + tokio::spawn(assoc.copy_bypassed_r2l(socket.clone())) + }; debug!( "created udp association for {} (bypassed) with {:?}", self.peer_addr, @@ -449,13 +443,11 @@ where ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?; let socket: Arc = Arc::new(socket.into()); - let (r2l_fut, r2l_abortable) = { - let assoc = self.clone(); - future::abortable(assoc.copy_bypassed_r2l(socket.clone())) - }; - // CLIENT <- REMOTE - tokio::spawn(r2l_fut); + let r2l_abortable = { + let assoc = self.clone(); + tokio::spawn(assoc.copy_bypassed_r2l(socket.clone())) + }; debug!( "created udp association for {} (bypassed) with {:?}", self.peer_addr, @@ -515,13 +507,11 @@ where let socket = MonProxySocket::from_socket(socket, self.context.flow_stat()); let socket = Arc::new(socket); - let (r2l_fut, r2l_abortable) = { - let assoc = self.clone(); - future::abortable(assoc.copy_proxied_r2l(socket.clone())) - }; - // CLIENT <- REMOTE - tokio::spawn(r2l_fut); + let r2l_abortable = { + let assoc = self.clone(); + tokio::spawn(assoc.copy_proxied_r2l(socket.clone())) + }; debug!( "created udp association for {} <-> {} (proxied) with {:?}", diff --git a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs index 5b70eeb2..5407f778 100644 --- a/crates/shadowsocks-service/src/local/tunnel/udprelay.rs +++ b/crates/shadowsocks-service/src/local/tunnel/udprelay.rs @@ -3,7 +3,6 @@ use std::{io, net::SocketAddr, sync::Arc, time::Duration}; use bytes::Bytes; -use futures::future::{self, AbortHandle}; use io::ErrorKind; use log::{debug, error, info, trace, warn}; use lru_time_cache::LruCache; @@ -20,6 +19,7 @@ use spin::Mutex as SpinMutex; use tokio::{ net::UdpSocket, sync::{mpsc, Mutex}, + task::JoinHandle, time, }; @@ -34,8 +34,8 @@ type SharedAssociationMap = Arc>; pub struct UdpTunnel { context: Arc, assoc_map: SharedAssociationMap, - cleanup_abortable: AbortHandle, - keepalive_abortable: AbortHandle, + cleanup_abortable: JoinHandle<()>, + keepalive_abortable: JoinHandle<()>, keepalive_tx: mpsc::Sender, } @@ -56,29 +56,25 @@ impl UdpTunnel { let cleanup_abortable = { let assoc_map = assoc_map.clone(); - let (cleanup_task, cleanup_abortable) = future::abortable(async move { + tokio::spawn(async move { loop { time::sleep(time_to_live).await; // cleanup expired associations. iter() will remove expired elements let _ = assoc_map.lock().await.iter(); } - }); - tokio::spawn(cleanup_task); - cleanup_abortable + }) }; - let (keepalive_tx, mut keepalive_rx) = mpsc::channel(256); + let (keepalive_tx, mut keepalive_rx) = mpsc::channel(64); let keepalive_abortable = { let assoc_map = assoc_map.clone(); - let (keepalive_task, keepalive_abortable) = future::abortable(async move { + tokio::spawn(async move { while let Some(peer_addr) = keepalive_rx.recv().await { assoc_map.lock().await.get(&peer_addr); } - }); - tokio::spawn(keepalive_task); - keepalive_abortable + }) }; UdpTunnel { @@ -208,7 +204,7 @@ enum UdpAssociationState { Empty, Connected { socket: Arc, - abortable: AbortHandle, + abortable: JoinHandle>, }, Aborted, } @@ -229,7 +225,7 @@ impl UdpAssociationState { *self = UdpAssociationState::Empty; } - fn set_connected(&mut self, socket: Arc, abortable: AbortHandle) { + fn set_connected(&mut self, socket: Arc, abortable: JoinHandle>) { self.abort_inner(); *self = UdpAssociationState::Connected { socket, abortable }; } @@ -337,13 +333,11 @@ impl UdpAssociationContext { let socket = MonProxySocket::from_socket(socket, self.context.flow_stat()); let socket = Arc::new(socket); - let (r2l_fut, r2l_abortable) = { - let assoc = self.clone(); - future::abortable(assoc.copy_proxied_r2l(socket.clone())) - }; - // CLIENT <- REMOTE - tokio::spawn(r2l_fut); + let r2l_abortable = { + let assoc = self.clone(); + tokio::spawn(assoc.copy_proxied_r2l(socket.clone())) + }; debug!( "created udp association for {} <-> {} (proxied) with {:?}", diff --git a/crates/shadowsocks-service/src/server/udprelay.rs b/crates/shadowsocks-service/src/server/udprelay.rs index 0c60f99a..3c23e920 100644 --- a/crates/shadowsocks-service/src/server/udprelay.rs +++ b/crates/shadowsocks-service/src/server/udprelay.rs @@ -3,7 +3,6 @@ use std::{io, net::SocketAddr, sync::Arc, time::Duration}; use bytes::Bytes; -use futures::future::{self, AbortHandle}; use io::ErrorKind; use log::{debug, error, info, trace, warn}; use lru_time_cache::LruCache; @@ -19,6 +18,7 @@ use shadowsocks::{ use spin::Mutex as SpinMutex; use tokio::{ sync::{mpsc, Mutex}, + task::JoinHandle, time, }; @@ -32,8 +32,8 @@ type SharedAssociationMap = Arc>; pub struct UdpServer { context: Arc, assoc_map: SharedAssociationMap, - cleanup_abortable: AbortHandle, - keepalive_abortable: AbortHandle, + cleanup_abortable: JoinHandle<()>, + keepalive_abortable: JoinHandle<()>, keepalive_tx: mpsc::Sender, } @@ -54,29 +54,25 @@ impl UdpServer { let cleanup_abortable = { let assoc_map = assoc_map.clone(); - let (cleanup_task, cleanup_abortable) = future::abortable(async move { + tokio::spawn(async move { loop { time::sleep(time_to_live).await; // cleanup expired associations. iter() will remove expired elements let _ = assoc_map.lock().await.iter(); } - }); - tokio::spawn(cleanup_task); - cleanup_abortable + }) }; - let (keepalive_tx, mut keepalive_rx) = mpsc::channel(256); + let (keepalive_tx, mut keepalive_rx) = mpsc::channel(64); let keepalive_abortable = { let assoc_map = assoc_map.clone(); - let (keepalive_task, keepalive_abortable) = future::abortable(async move { + tokio::spawn(async move { while let Some(peer_addr) = keepalive_rx.recv().await { assoc_map.lock().await.get(&peer_addr); } - }); - tokio::spawn(keepalive_task); - keepalive_abortable + }) }; UdpServer { @@ -191,7 +187,7 @@ enum UdpAssociationState { Empty, Connected { socket: Arc, - abortable: AbortHandle, + abortable: JoinHandle>, }, Aborted, } @@ -207,7 +203,7 @@ impl UdpAssociationState { UdpAssociationState::Empty } - fn set_connected(&mut self, socket: Arc, abortable: AbortHandle) { + fn set_connected(&mut self, socket: Arc, abortable: JoinHandle>) { self.abort_inner(); *self = UdpAssociationState::Connected { socket, abortable }; } @@ -335,13 +331,11 @@ impl UdpAssociationContext { OutboundUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?; let socket: Arc = Arc::new(socket); - let (r2l_fut, r2l_abortable) = { - let assoc = self.clone(); - future::abortable(assoc.copy_r2l(socket.clone())) - }; - // CLIENT <- REMOTE - tokio::spawn(r2l_fut); + let r2l_abortable = { + let assoc = self.clone(); + tokio::spawn(assoc.copy_r2l(socket.clone())) + }; debug!( "created udp association for {} with {:?}", self.peer_addr, @@ -390,13 +384,11 @@ impl UdpAssociationContext { OutboundUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?; let socket: Arc = Arc::new(socket); - let (r2l_fut, r2l_abortable) = { - let assoc = self.clone(); - future::abortable(assoc.copy_r2l(socket.clone())) - }; - // CLIENT <- REMOTE - tokio::spawn(r2l_fut); + let r2l_abortable = { + let assoc = self.clone(); + tokio::spawn(assoc.copy_r2l(socket.clone())) + }; debug!( "created udp association for {} with {:?}", self.peer_addr,