replaced future::abort with tokio JoinHandle abort

- MUST: tokio >= v1.8.2
This commit is contained in:
zonyitoo
2021-07-20 15:17:49 +08:00
parent d730c0e4ec
commit e4dfd2f9d8
5 changed files with 63 additions and 93 deletions

View File

@@ -2,7 +2,6 @@
use std::{
fmt::{self, Debug, Display},
future::Future,
io,
net::{Ipv4Addr, SocketAddr},
sync::{
@@ -13,7 +12,7 @@ use std::{
};
use byte_string::ByteStr;
use futures::future::{self, AbortHandle};
use futures::future;
use log::{debug, info, trace};
use shadowsocks::{
config::Mode,
@@ -26,6 +25,7 @@ use shadowsocks::{
};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
task::JoinHandle,
time,
};
@@ -72,7 +72,7 @@ impl PingBalancerBuilder {
self.servers.push(Arc::new(server));
}
pub async fn build(self) -> (PingBalancer, impl Future<Output = ()>) {
pub async fn build(self) -> PingBalancer {
assert!(!self.servers.is_empty(), "build PingBalancer without any servers");
let balancer_context = PingBalancerContext {
@@ -87,12 +87,9 @@ impl PingBalancerBuilder {
let shared_context = Arc::new(balancer_context);
let (checker, abortable) = {
let abortable = {
let shared_context = shared_context.clone();
future::abortable(async move { shared_context.checker_task().await })
};
let checker = async move {
let _ = checker.await;
tokio::spawn(async move { shared_context.checker_task().await })
};
let balancer = PingBalancer {
@@ -101,7 +98,7 @@ impl PingBalancerBuilder {
abortable,
}),
};
(balancer, checker)
balancer
}
}
@@ -260,7 +257,7 @@ impl PingBalancerContext {
struct PingBalancerInner {
context: Arc<PingBalancerContext>,
abortable: AbortHandle,
abortable: JoinHandle<()>,
}
impl Drop for PingBalancerInner {

View File

@@ -189,10 +189,7 @@ pub async fn run(mut config: Config) -> io::Result<()> {
for server in config.server {
balancer_builder.add_server(ServerIdent::new(server));
}
let (balancer, checker) = balancer_builder.build().await;
tokio::spawn(checker);
balancer
balancer_builder.build().await
};
#[cfg(feature = "local-flow-stat")]

View File

@@ -9,7 +9,6 @@ use std::{
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::{self, AbortHandle};
use log::{debug, error, trace, warn};
use lru_time_cache::LruCache;
use shadowsocks::{
@@ -24,6 +23,7 @@ use spin::Mutex as SpinMutex;
use tokio::{
net::UdpSocket,
sync::{mpsc, Mutex},
task::JoinHandle,
time,
};
@@ -53,8 +53,8 @@ where
respond_writer: W,
context: Arc<ServiceContext>,
assoc_map: SharedAssociationMap<W>,
cleanup_abortable: AbortHandle,
keepalive_abortable: AbortHandle,
cleanup_abortable: JoinHandle<()>,
keepalive_abortable: JoinHandle<()>,
keepalive_tx: mpsc::Sender<SocketAddr>,
balancer: PingBalancer,
}
@@ -89,29 +89,25 @@ where
let cleanup_abortable = {
let assoc_map = assoc_map.clone();
let (cleanup_task, cleanup_abortable) = future::abortable(async move {
tokio::spawn(async move {
loop {
time::sleep(time_to_live).await;
// cleanup expired associations. iter() will remove expired elements
let _ = assoc_map.lock().await.iter();
}
});
tokio::spawn(cleanup_task);
cleanup_abortable
})
};
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(256);
let keepalive_abortable = {
let assoc_map = assoc_map.clone();
let (keepalive_task, keepalive_abortable) = future::abortable(async move {
tokio::spawn(async move {
while let Some(peer_addr) = keepalive_rx.recv().await {
assoc_map.lock().await.get(&peer_addr);
}
});
tokio::spawn(keepalive_task);
keepalive_abortable
})
};
UdpAssociationManager {
@@ -199,7 +195,7 @@ enum UdpAssociationBypassState {
Empty,
Connected {
socket: Arc<UdpSocket>,
abortable: AbortHandle,
abortable: JoinHandle<io::Result<()>>,
},
Aborted,
}
@@ -217,7 +213,7 @@ impl UdpAssociationBypassState {
UdpAssociationBypassState::Empty
}
fn set_connected(&mut self, socket: Arc<UdpSocket>, abortable: AbortHandle) {
fn set_connected(&mut self, socket: Arc<UdpSocket>, abortable: JoinHandle<io::Result<()>>) {
*self = UdpAssociationBypassState::Connected { socket, abortable };
}
@@ -230,7 +226,7 @@ enum UdpAssociationProxyState {
Empty,
Connected {
socket: Arc<MonProxySocket>,
abortable: AbortHandle,
abortable: JoinHandle<io::Result<()>>,
},
Aborted,
}
@@ -251,7 +247,7 @@ impl UdpAssociationProxyState {
*self = UdpAssociationProxyState::Empty;
}
fn set_connected(&mut self, socket: Arc<MonProxySocket>, abortable: AbortHandle) {
fn set_connected(&mut self, socket: Arc<MonProxySocket>, abortable: JoinHandle<io::Result<()>>) {
self.abort_inner();
*self = UdpAssociationProxyState::Connected { socket, abortable };
}
@@ -394,13 +390,11 @@ where
ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?;
let socket: Arc<UdpSocket> = Arc::new(socket.into());
let (r2l_fut, r2l_abortable) = {
let assoc = self.clone();
future::abortable(assoc.copy_bypassed_r2l(socket.clone()))
};
// CLIENT <- REMOTE
tokio::spawn(r2l_fut);
let r2l_abortable = {
let assoc = self.clone();
tokio::spawn(assoc.copy_bypassed_r2l(socket.clone()))
};
debug!(
"created udp association for {} (bypassed) with {:?}",
self.peer_addr,
@@ -449,13 +443,11 @@ where
ShadowUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?;
let socket: Arc<UdpSocket> = Arc::new(socket.into());
let (r2l_fut, r2l_abortable) = {
let assoc = self.clone();
future::abortable(assoc.copy_bypassed_r2l(socket.clone()))
};
// CLIENT <- REMOTE
tokio::spawn(r2l_fut);
let r2l_abortable = {
let assoc = self.clone();
tokio::spawn(assoc.copy_bypassed_r2l(socket.clone()))
};
debug!(
"created udp association for {} (bypassed) with {:?}",
self.peer_addr,
@@ -515,13 +507,11 @@ where
let socket = MonProxySocket::from_socket(socket, self.context.flow_stat());
let socket = Arc::new(socket);
let (r2l_fut, r2l_abortable) = {
let assoc = self.clone();
future::abortable(assoc.copy_proxied_r2l(socket.clone()))
};
// CLIENT <- REMOTE
tokio::spawn(r2l_fut);
let r2l_abortable = {
let assoc = self.clone();
tokio::spawn(assoc.copy_proxied_r2l(socket.clone()))
};
debug!(
"created udp association for {} <-> {} (proxied) with {:?}",

View File

@@ -3,7 +3,6 @@
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
use bytes::Bytes;
use futures::future::{self, AbortHandle};
use io::ErrorKind;
use log::{debug, error, info, trace, warn};
use lru_time_cache::LruCache;
@@ -20,6 +19,7 @@ use spin::Mutex as SpinMutex;
use tokio::{
net::UdpSocket,
sync::{mpsc, Mutex},
task::JoinHandle,
time,
};
@@ -34,8 +34,8 @@ type SharedAssociationMap = Arc<Mutex<AssociationMap>>;
pub struct UdpTunnel {
context: Arc<ServiceContext>,
assoc_map: SharedAssociationMap,
cleanup_abortable: AbortHandle,
keepalive_abortable: AbortHandle,
cleanup_abortable: JoinHandle<()>,
keepalive_abortable: JoinHandle<()>,
keepalive_tx: mpsc::Sender<SocketAddr>,
}
@@ -56,29 +56,25 @@ impl UdpTunnel {
let cleanup_abortable = {
let assoc_map = assoc_map.clone();
let (cleanup_task, cleanup_abortable) = future::abortable(async move {
tokio::spawn(async move {
loop {
time::sleep(time_to_live).await;
// cleanup expired associations. iter() will remove expired elements
let _ = assoc_map.lock().await.iter();
}
});
tokio::spawn(cleanup_task);
cleanup_abortable
})
};
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(256);
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(64);
let keepalive_abortable = {
let assoc_map = assoc_map.clone();
let (keepalive_task, keepalive_abortable) = future::abortable(async move {
tokio::spawn(async move {
while let Some(peer_addr) = keepalive_rx.recv().await {
assoc_map.lock().await.get(&peer_addr);
}
});
tokio::spawn(keepalive_task);
keepalive_abortable
})
};
UdpTunnel {
@@ -208,7 +204,7 @@ enum UdpAssociationState {
Empty,
Connected {
socket: Arc<MonProxySocket>,
abortable: AbortHandle,
abortable: JoinHandle<io::Result<()>>,
},
Aborted,
}
@@ -229,7 +225,7 @@ impl UdpAssociationState {
*self = UdpAssociationState::Empty;
}
fn set_connected(&mut self, socket: Arc<MonProxySocket>, abortable: AbortHandle) {
fn set_connected(&mut self, socket: Arc<MonProxySocket>, abortable: JoinHandle<io::Result<()>>) {
self.abort_inner();
*self = UdpAssociationState::Connected { socket, abortable };
}
@@ -337,13 +333,11 @@ impl UdpAssociationContext {
let socket = MonProxySocket::from_socket(socket, self.context.flow_stat());
let socket = Arc::new(socket);
let (r2l_fut, r2l_abortable) = {
let assoc = self.clone();
future::abortable(assoc.copy_proxied_r2l(socket.clone()))
};
// CLIENT <- REMOTE
tokio::spawn(r2l_fut);
let r2l_abortable = {
let assoc = self.clone();
tokio::spawn(assoc.copy_proxied_r2l(socket.clone()))
};
debug!(
"created udp association for {} <-> {} (proxied) with {:?}",

View File

@@ -3,7 +3,6 @@
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
use bytes::Bytes;
use futures::future::{self, AbortHandle};
use io::ErrorKind;
use log::{debug, error, info, trace, warn};
use lru_time_cache::LruCache;
@@ -19,6 +18,7 @@ use shadowsocks::{
use spin::Mutex as SpinMutex;
use tokio::{
sync::{mpsc, Mutex},
task::JoinHandle,
time,
};
@@ -32,8 +32,8 @@ type SharedAssociationMap = Arc<Mutex<AssociationMap>>;
pub struct UdpServer {
context: Arc<ServiceContext>,
assoc_map: SharedAssociationMap,
cleanup_abortable: AbortHandle,
keepalive_abortable: AbortHandle,
cleanup_abortable: JoinHandle<()>,
keepalive_abortable: JoinHandle<()>,
keepalive_tx: mpsc::Sender<SocketAddr>,
}
@@ -54,29 +54,25 @@ impl UdpServer {
let cleanup_abortable = {
let assoc_map = assoc_map.clone();
let (cleanup_task, cleanup_abortable) = future::abortable(async move {
tokio::spawn(async move {
loop {
time::sleep(time_to_live).await;
// cleanup expired associations. iter() will remove expired elements
let _ = assoc_map.lock().await.iter();
}
});
tokio::spawn(cleanup_task);
cleanup_abortable
})
};
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(256);
let (keepalive_tx, mut keepalive_rx) = mpsc::channel(64);
let keepalive_abortable = {
let assoc_map = assoc_map.clone();
let (keepalive_task, keepalive_abortable) = future::abortable(async move {
tokio::spawn(async move {
while let Some(peer_addr) = keepalive_rx.recv().await {
assoc_map.lock().await.get(&peer_addr);
}
});
tokio::spawn(keepalive_task);
keepalive_abortable
})
};
UdpServer {
@@ -191,7 +187,7 @@ enum UdpAssociationState {
Empty,
Connected {
socket: Arc<OutboundUdpSocket>,
abortable: AbortHandle,
abortable: JoinHandle<io::Result<()>>,
},
Aborted,
}
@@ -207,7 +203,7 @@ impl UdpAssociationState {
UdpAssociationState::Empty
}
fn set_connected(&mut self, socket: Arc<OutboundUdpSocket>, abortable: AbortHandle) {
fn set_connected(&mut self, socket: Arc<OutboundUdpSocket>, abortable: JoinHandle<io::Result<()>>) {
self.abort_inner();
*self = UdpAssociationState::Connected { socket, abortable };
}
@@ -335,13 +331,11 @@ impl UdpAssociationContext {
OutboundUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?;
let socket: Arc<OutboundUdpSocket> = Arc::new(socket);
let (r2l_fut, r2l_abortable) = {
let assoc = self.clone();
future::abortable(assoc.copy_r2l(socket.clone()))
};
// CLIENT <- REMOTE
tokio::spawn(r2l_fut);
let r2l_abortable = {
let assoc = self.clone();
tokio::spawn(assoc.copy_r2l(socket.clone()))
};
debug!(
"created udp association for {} with {:?}",
self.peer_addr,
@@ -390,13 +384,11 @@ impl UdpAssociationContext {
OutboundUdpSocket::connect_any_with_opts(&target_addr, self.context.connect_opts_ref()).await?;
let socket: Arc<OutboundUdpSocket> = Arc::new(socket);
let (r2l_fut, r2l_abortable) = {
let assoc = self.clone();
future::abortable(assoc.copy_r2l(socket.clone()))
};
// CLIENT <- REMOTE
tokio::spawn(r2l_fut);
let r2l_abortable = {
let assoc = self.clone();
tokio::spawn(assoc.copy_r2l(socket.clone()))
};
debug!(
"created udp association for {} with {:?}",
self.peer_addr,