feat(server): tokio v1.38 stablized RuntimeMetrics::num_workers

This commit is contained in:
zonyitoo
2024-06-13 00:00:05 +08:00
parent a5130caa0f
commit 6345c0dcf1
10 changed files with 4 additions and 59 deletions

1
Cargo.lock generated
View File

@@ -3184,7 +3184,6 @@ dependencies = [
"log4rs",
"mimalloc",
"mime",
"num_cpus",
"qrcode",
"rand",
"reqwest",

View File

@@ -239,7 +239,6 @@ rand = "0.8"
futures = "0.3"
tokio = { version = "1", features = ["rt", "signal"] }
num_cpus = "1.15"
ipnet = { version = "2.9", optional = true }

View File

@@ -144,7 +144,7 @@ rand = { version = "0.8", features = ["small_rng"] }
sled = { version = "0.34.7", optional = true }
futures = "0.3"
tokio = { version = "1.5", features = [
tokio = { version = "1.38", features = [
"io-util",
"macros",
"net",

View File

@@ -1359,11 +1359,6 @@ pub struct Config {
/// This is normally for auto-reloading if implementation supports.
pub config_path: Option<PathBuf>,
#[doc(hidden)]
/// Workers in runtime
/// It should be replaced with metrics APIs: https://github.com/tokio-rs/tokio/issues/4073
pub worker_count: usize,
/// OnlineConfiguration (SIP008)
/// https://shadowsocks.org/doc/sip008.html
#[cfg(feature = "local-online-config")]
@@ -1488,8 +1483,6 @@ impl Config {
config_path: None,
worker_count: 1,
#[cfg(feature = "local-online-config")]
online_config: None,
}

View File

@@ -85,7 +85,6 @@ pub struct ManagerBuilder {
acl: Option<Arc<AccessControl>>,
ipv6_first: bool,
security: SecurityConfig,
worker_count: usize,
}
impl ManagerBuilder {
@@ -106,7 +105,6 @@ impl ManagerBuilder {
acl: None,
ipv6_first: false,
security: SecurityConfig::default(),
worker_count: 1,
}
}
@@ -156,14 +154,6 @@ impl ManagerBuilder {
self.security = security;
}
/// Set runtime worker count
///
/// Should be replaced with tokio's metric API when it is stablized.
/// https://github.com/tokio-rs/tokio/issues/4073
pub fn set_worker_count(&mut self, worker_count: usize) {
self.worker_count = worker_count;
}
/// Build the manager server instance
pub async fn build(self) -> io::Result<Manager> {
let listener = ManagerListener::bind(&self.context, &self.svr_cfg.addr).await?;
@@ -178,7 +168,6 @@ impl ManagerBuilder {
acl: self.acl,
ipv6_first: self.ipv6_first,
security: self.security,
worker_count: self.worker_count,
listener,
})
}
@@ -196,7 +185,6 @@ pub struct Manager {
acl: Option<Arc<AccessControl>>,
ipv6_first: bool,
security: SecurityConfig,
worker_count: usize,
listener: ManagerListener,
}
@@ -293,8 +281,6 @@ impl Manager {
server_builder.set_security_config(&self.security);
server_builder.set_worker_count(self.worker_count);
let server_port = server_builder.server_config().addr().port();
let mut servers = self.servers.lock().await;

View File

@@ -150,10 +150,6 @@ pub async fn run(config: Config) -> io::Result<()> {
server_builder.set_ipv6_first(config.ipv6_first);
}
if config.worker_count >= 1 {
server_builder.set_worker_count(config.worker_count);
}
server_builder.set_security_config(&config.security);
let server = server_builder.build().await?;

View File

@@ -30,7 +30,6 @@ pub struct ServerBuilder {
udp_capacity: Option<usize>,
manager_addr: Option<ManagerAddr>,
accept_opts: AcceptOpts,
worker_count: usize,
}
impl ServerBuilder {
@@ -48,7 +47,6 @@ impl ServerBuilder {
udp_capacity: None,
manager_addr: None,
accept_opts: AcceptOpts::default(),
worker_count: 1,
}
}
@@ -83,14 +81,6 @@ impl ServerBuilder {
self.manager_addr = Some(manager_addr);
}
/// Set runtime worker count
///
/// Should be replaced with tokio's metric API when it is stablized.
/// https://github.com/tokio-rs/tokio/issues/4073
pub fn set_worker_count(&mut self, worker_count: usize) {
self.worker_count = worker_count;
}
/// Get server's configuration
pub fn server_config(&self) -> &ServerConfig {
&self.svr_cfg
@@ -147,7 +137,7 @@ impl ServerBuilder {
let mut udp_server = None;
if self.svr_cfg.mode().enable_udp() {
let mut server = UdpServer::new(
let server = UdpServer::new(
self.context.clone(),
self.svr_cfg.clone(),
self.udp_expiry_duration,
@@ -155,7 +145,6 @@ impl ServerBuilder {
self.accept_opts.clone(),
)
.await?;
server.set_worker_count(self.worker_count);
udp_server = Some(server);
}

View File

@@ -27,7 +27,7 @@ use shadowsocks::{
},
ServerConfig,
};
use tokio::{sync::mpsc, task::JoinHandle, time};
use tokio::{runtime::Handle, sync::mpsc, task::JoinHandle, time};
#[cfg(windows)]
use windows_sys::Win32::Networking::WinSock::WSAEAFNOSUPPORT;
@@ -93,7 +93,6 @@ pub struct UdpServer {
keepalive_tx: mpsc::Sender<NatKey>,
keepalive_rx: mpsc::Receiver<NatKey>,
time_to_live: Duration,
worker_count: usize,
listener: Arc<MonProxySocket>,
svr_cfg: ServerConfig,
}
@@ -140,17 +139,11 @@ impl UdpServer {
keepalive_tx,
keepalive_rx,
time_to_live,
worker_count: 1,
listener,
svr_cfg,
})
}
#[inline]
pub(crate) fn set_worker_count(&mut self, worker_count: usize) {
self.worker_count = worker_count;
}
/// Server's configuration
pub fn server_config(&self) -> &ServerConfig {
&self.svr_cfg
@@ -173,7 +166,7 @@ impl UdpServer {
let mut orx_opt = None;
let cpus = self.worker_count;
let cpus = Handle::current().metrics().num_workers();
let mut other_receivers = Vec::new();
if cpus > 1 {
let (otx, orx) = mpsc::channel((cpus - 1) * 16);

View File

@@ -500,23 +500,18 @@ pub fn create(matches: &ArgMatches) -> Result<(Runtime, impl Future<Output = Exi
info!("shadowsocks manager {} build {}", crate::VERSION, crate::BUILD_TIME);
let mut worker_count = 1;
let mut builder = match service_config.runtime.mode {
RuntimeMode::SingleThread => Builder::new_current_thread(),
#[cfg(feature = "multi-threaded")]
RuntimeMode::MultiThread => {
let mut builder = Builder::new_multi_thread();
if let Some(worker_threads) = service_config.runtime.worker_count {
worker_count = worker_threads;
builder.worker_threads(worker_threads);
} else {
worker_count = num_cpus::get();
}
builder
}
};
config.worker_count = worker_count;
let runtime = builder.enable_all().build().expect("create tokio Runtime");

View File

@@ -520,23 +520,18 @@ pub fn create(matches: &ArgMatches) -> Result<(Runtime, impl Future<Output = Exi
info!("shadowsocks server {} build {}", crate::VERSION, crate::BUILD_TIME);
let mut worker_count = 1;
let mut builder = match service_config.runtime.mode {
RuntimeMode::SingleThread => Builder::new_current_thread(),
#[cfg(feature = "multi-threaded")]
RuntimeMode::MultiThread => {
let mut builder = Builder::new_multi_thread();
if let Some(worker_threads) = service_config.runtime.worker_count {
worker_count = worker_threads;
builder.worker_threads(worker_threads);
} else {
worker_count = num_cpus::get();
}
builder
}
};
config.worker_count = worker_count;
let runtime = builder.enable_all().build().expect("create tokio Runtime");