fix(server): exit server instance if any of the sub-tasks exited (#1612)

Replaced FutureUnordered with futures::select_all, and put all sub-tasks
into individual tokio tasks.
This commit is contained in:
zonyitoo
2024-08-14 23:30:38 +08:00
parent d67908f27e
commit 6ffeda5495

View File

@@ -2,12 +2,15 @@
use std::{
collections::HashMap,
future::Future,
io::{self, ErrorKind},
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use futures::{future, ready};
use log::{error, trace};
use shadowsocks::{
config::{ManagerAddr, ServerConfig},
@@ -16,7 +19,7 @@ use shadowsocks::{
plugin::{Plugin, PluginMode},
ManagerClient,
};
use tokio::time;
use tokio::{task::JoinHandle, time};
use crate::{acl::AccessControl, config::SecurityConfig, net::FlowStat};
@@ -159,6 +162,27 @@ impl ServerBuilder {
}
}
struct ServerHandle(JoinHandle<io::Result<()>>);
impl Drop for ServerHandle {
#[inline]
fn drop(&mut self) {
self.0.abort();
}
}
impl Future for ServerHandle {
type Output = io::Result<()>;
#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.0).poll(cx)) {
Ok(res) => res.into(),
Err(err) => Err(io::Error::new(ErrorKind::Other, err)).into(),
}
}
}
/// Shadowsocks Server instance
pub struct Server {
context: Arc<ServiceContext>,
@@ -187,36 +211,33 @@ impl Server {
/// Start serving
pub async fn run(self) -> io::Result<()> {
let vfut = FuturesUnordered::new();
let mut vfut = Vec::new();
if let Some(plugin) = self.plugin {
vfut.push(
async move {
match plugin.join().await {
Ok(status) => {
error!("plugin exited with status: {}", status);
Ok(())
}
Err(err) => {
error!("plugin exited with error: {}", err);
Err(err)
}
vfut.push(ServerHandle(tokio::spawn(async move {
match plugin.join().await {
Ok(status) => {
error!("plugin exited with status: {}", status);
Ok(())
}
Err(err) => {
error!("plugin exited with error: {}", err);
Err(err)
}
}
.boxed(),
);
})));
}
if let Some(tcp_server) = self.tcp_server {
vfut.push(tcp_server.run().boxed());
vfut.push(ServerHandle(tokio::spawn(tcp_server.run())));
}
if let Some(udp_server) = self.udp_server {
vfut.push(udp_server.run().boxed())
vfut.push(ServerHandle(tokio::spawn(udp_server.run())));
}
if let Some(manager_addr) = self.manager_addr {
let manager_fut = async move {
vfut.push(ServerHandle(tokio::spawn(async move {
loop {
match ManagerClient::connect(
self.context.context_ref(),
@@ -251,13 +272,10 @@ impl Server {
// Report every 10 seconds
time::sleep(Duration::from_secs(10)).await;
}
}
.boxed();
vfut.push(manager_fut);
})));
}
let (res, _) = vfut.into_future().await;
if let Some(Err(err)) = res {
if let (Err(err), ..) = future::select_all(vfut).await {
error!("servers exited with error: {}", err);
}