support ssmanager server standalone mode

- fixes #648
- allow restarting ssmanager without killing all running ssservers
This commit is contained in:
zonyitoo
2021-10-15 19:02:18 +08:00
parent 9cba0ba85b
commit a9e2f372c4
5 changed files with 341 additions and 15 deletions

View File

@@ -498,7 +498,7 @@ fn main() {
}
#[cfg(unix)]
if matches.is_present("DAEMONIZE") {
if matches.is_present("DAEMONIZE") || matches.is_present("DAEMONIZE_PID_PATH") {
use self::common::daemonize;
daemonize::daemonize(matches.value_of("DAEMONIZE_PID_PATH"));
}

View File

@@ -89,6 +89,9 @@ fn main() {
app = clap_app!(@app (app)
(@arg DAEMONIZE: -d --("daemonize") "Daemonize")
(@arg DAEMONIZE_PID_PATH: --("daemonize-pid") +takes_value "File path to store daemonized process's PID")
(@arg MANAGER_SERVER_MODE: --("manager-server-mode") +takes_value possible_values(&["builtin", "standalone"]) "Servers that running in builtin or standalone mode")
(@arg MANAGER_SERVER_WORKING_DIRECTORY: --("manager-server-working-directory") +takes_value "Folder for putting servers' configuration and pid files, default is current directory")
);
}
@@ -210,6 +213,18 @@ fn main() {
plugin_args: Vec::new(),
});
}
#[cfg(unix)]
if let Some(server_mode) = matches.value_of("MANAGER_SERVER_MODE") {
manager_config.server_mode = server_mode.parse().expect("manager-server-mode");
}
#[cfg(unix)]
if let Some(server_working_directory) = matches.value_of("MANAGER_SERVER_WORKING_DIRECTORY") {
manager_config.server_working_directory = server_working_directory
.parse()
.expect("manager-server-working-directory");
}
}
// Overrides
@@ -288,7 +303,7 @@ fn main() {
}
#[cfg(unix)]
if matches.is_present("DAEMONIZE") {
if matches.is_present("DAEMONIZE") || matches.is_present("DAEMONIZE_PID_PATH") {
use self::common::daemonize;
daemonize::daemonize(matches.value_of("DAEMONIZE_PID_PATH"));
}

View File

@@ -86,7 +86,7 @@ fn main() {
{
app = clap_app!(@app (app)
(@arg DAEMONIZE: -d --("daemonize") "Daemonize")
(@arg DAEMONIZE_PID_PATH: --("daemonize-pid") +takes_value "File path to store daemonized process's PID")
(@arg DAEMONIZE_PID_PATH: -f --("daemonize-pid") +takes_value "File path to store daemonized process's PID")
);
}
@@ -287,7 +287,7 @@ fn main() {
}
#[cfg(unix)]
if matches.is_present("DAEMONIZE") {
if matches.is_present("DAEMONIZE") || matches.is_present("DAEMONIZE_PID_PATH") {
use self::common::daemonize;
daemonize::daemonize(matches.value_of("DAEMONIZE_PID_PATH"));
}

View File

@@ -541,6 +541,50 @@ impl FromStr for ManagerServerHost {
}
}
/// Mode of Manager's server
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum ManagerServerMode {
/// Run shadowsocks server in the same process of manager
Builtin,
/// Run shadowsocks server in standalone (process) mode
#[cfg(unix)]
Standalone,
}
impl Default for ManagerServerMode {
fn default() -> ManagerServerMode {
ManagerServerMode::Builtin
}
}
/// Parsing ManagerServerMode error
#[derive(Debug, Clone, Copy)]
pub struct ManagerServerModeError;
impl FromStr for ManagerServerMode {
type Err = ManagerServerModeError;
fn from_str(s: &str) -> Result<ManagerServerMode, Self::Err> {
match s {
"builtin" => Ok(ManagerServerMode::Builtin),
#[cfg(unix)]
"standalone" => Ok(ManagerServerMode::Standalone),
_ => Err(ManagerServerModeError),
}
}
}
impl Display for ManagerServerMode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ManagerServerMode::Builtin => f.write_str("builtin"),
#[cfg(unix)]
ManagerServerMode::Standalone => f.write_str("standalone"),
}
}
}
/// Configuration for Manager
#[derive(Clone, Debug)]
pub struct ManagerConfig {
@@ -558,6 +602,14 @@ pub struct ManagerConfig {
pub server_host: ManagerServerHost,
/// Server's mode
pub mode: Mode,
/// Server's running mode
pub server_mode: ManagerServerMode,
/// Server's command if running in Standalone mode
#[cfg(unix)]
pub server_program: String,
/// Server's working directory if running in Standalone mode
#[cfg(unix)]
pub server_working_directory: PathBuf,
}
impl ManagerConfig {
@@ -570,6 +622,14 @@ impl ManagerConfig {
timeout: None,
server_host: ManagerServerHost::default(),
mode: Mode::TcpOnly,
server_mode: ManagerServerMode::Builtin,
#[cfg(unix)]
server_program: "ssserver".to_owned(),
#[cfg(unix)]
server_working_directory: match std::env::current_dir() {
Ok(d) => d,
Err(..) => "/tmp/shadowsocks-manager".into(),
},
}
}
}

View File

@@ -1,6 +1,14 @@
//! Shadowsocks Manager server
use std::{collections::HashMap, io, net::SocketAddr, sync::Arc, time::Duration};
#[cfg(unix)]
use std::path::PathBuf;
use std::{
collections::{hash_map::Entry, HashMap},
io,
net::SocketAddr,
sync::Arc,
time::Duration,
};
use log::{error, info, trace};
use shadowsocks::{
@@ -29,20 +37,41 @@ use tokio::{sync::Mutex, task::JoinHandle};
use crate::{
acl::AccessControl,
config::{ManagerConfig, ManagerServerHost, SecurityConfig},
config::{ManagerConfig, ManagerServerHost, ManagerServerMode, SecurityConfig},
net::FlowStat,
server::Server,
};
enum ServerInstanceMode {
Builtin {
flow_stat: Arc<FlowStat>,
abortable: JoinHandle<io::Result<()>>,
},
#[cfg(unix)]
Standalone { flow_stat: u64 },
}
struct ServerInstance {
flow_stat: Arc<FlowStat>,
abortable: JoinHandle<io::Result<()>>,
mode: ServerInstanceMode,
svr_cfg: ServerConfig,
}
impl Drop for ServerInstance {
fn drop(&mut self) {
self.abortable.abort();
if let ServerInstanceMode::Builtin { ref abortable, .. } = self.mode {
abortable.abort();
}
}
}
impl ServerInstance {
fn flow_stat(&self) -> u64 {
match self.mode {
ServerInstanceMode::Builtin { ref flow_stat, .. } => flow_stat.tx() + flow_stat.rx(),
#[cfg(unix)]
ServerInstanceMode::Standalone { flow_stat } => flow_stat,
}
}
}
@@ -175,6 +204,14 @@ impl Manager {
}
pub async fn add_server(&self, svr_cfg: ServerConfig) {
match self.svr_cfg.server_mode {
ManagerServerMode::Builtin => self.add_server_builtin(svr_cfg).await,
#[cfg(unix)]
ManagerServerMode::Standalone => self.add_server_standalone(svr_cfg).await,
}
}
async fn add_server_builtin(&self, svr_cfg: ServerConfig) {
// Each server should use a separate Context, but shares
//
// * AccessControlList
@@ -222,8 +259,142 @@ impl Manager {
servers.insert(
server_port,
ServerInstance {
flow_stat,
abortable,
mode: ServerInstanceMode::Builtin { flow_stat, abortable },
svr_cfg,
},
);
}
#[cfg(unix)]
fn server_pid_path(&self, port: u16) -> PathBuf {
let pid_file_name = format!("shadowsocks-server-{}.pid", port);
let mut pid_path = self.svr_cfg.server_working_directory.clone();
pid_path.push(&pid_file_name);
pid_path
}
#[cfg(unix)]
fn server_config_path(&self, port: u16) -> PathBuf {
let config_file_name = format!("shadowsocks-server-{}.json", port);
let mut config_file_path = self.svr_cfg.server_working_directory.clone();
config_file_path.push(&config_file_name);
config_file_path
}
#[cfg(unix)]
fn kill_standalone_server(&self, port: u16) {
use log::{debug, warn};
use std::{
fs::{self, File},
io::Read,
};
let pid_path = self.server_pid_path(port);
if pid_path.exists() {
if let Ok(mut pid_file) = File::open(&pid_path) {
let mut pid_content = String::new();
if let Ok(..) = pid_file.read_to_string(&mut pid_content) {
let pid_content = pid_content.trim();
match pid_content.parse::<libc::pid_t>() {
Ok(pid) => {
let _ = unsafe { libc::kill(pid, libc::SIGTERM) };
debug!("killed standalone server port {}, pid: {}", port, pid);
}
Err(..) => {
warn!("failed to read pid from {}", pid_path.display());
}
}
}
}
}
let server_config_path = self.server_config_path(port);
let _ = fs::remove_file(&pid_path);
let _ = fs::remove_file(&server_config_path);
}
#[cfg(unix)]
async fn add_server_standalone(&self, svr_cfg: ServerConfig) {
use std::{
fs::{self, OpenOptions},
io::Write,
};
use tokio::process::Command;
use crate::config::{Config, ConfigType};
// Lock the map first incase there are multiple requests to create one server instance
let mut servers = self.servers.lock().await;
// Check if working_directory exists
if !self.svr_cfg.server_working_directory.exists() {
fs::create_dir_all(&self.svr_cfg.server_working_directory).expect("create working_directory");
}
let port = svr_cfg.addr().port();
// Check if there is already a running process
self.kill_standalone_server(port);
// Create configuration file for server
let config_file_path = self.server_config_path(port);
let pid_path = self.server_pid_path(port);
let mut config = Config::new(ConfigType::Server);
config.server.push(svr_cfg.clone());
trace!("created standalone server with config {:?}", config);
let config_file_content = format!("{}", config);
match OpenOptions::new().write(true).create(true).open(&config_file_path) {
Err(err) => {
error!(
"failed to open {} for writing, error: {}",
config_file_path.display(),
err
);
return;
}
Ok(mut file) => {
if let Err(err) = file.write_all(config_file_content.as_bytes()) {
error!("failed to write {}, error: {}", config_file_path.display(), err);
return;
}
let _ = file.sync_data();
}
}
let manager_addr = self.svr_cfg.addr.to_string();
// Start server process
let child_result = Command::new(&self.svr_cfg.server_program)
.arg("-c")
.arg(&config_file_path)
.arg("--daemonize")
.arg("--daemonize-pid")
.arg(&pid_path)
.arg("--manager-addr")
.arg(&manager_addr)
.kill_on_drop(false)
.spawn();
if let Err(err) = child_result {
error!(
"failed to spawn process of {}, error: {}",
self.svr_cfg.server_program, err
);
return;
}
// Greate. Record into the map
servers.insert(
port,
ServerInstance {
mode: ServerInstanceMode::Standalone { flow_stat: 0 },
svr_cfg,
},
);
@@ -284,6 +455,12 @@ impl Manager {
async fn handle_remove(&self, req: &RemoveRequest) -> RemoveResponse {
let mut servers = self.servers.lock().await;
servers.remove(&req.server_port);
#[cfg(unix)]
if self.svr_cfg.server_mode == ManagerServerMode::Standalone {
self.kill_standalone_server(req.server_port);
}
RemoveResponse("ok".to_owned())
}
@@ -315,14 +492,88 @@ impl Manager {
let mut stat = HashMap::new();
for (port, server) in instances.iter() {
let flow_stat = &server.flow_stat;
stat.insert(*port, flow_stat.tx() + flow_stat.rx());
stat.insert(*port, server.flow_stat());
}
PingResponse { stat }
}
async fn handle_stat(&self, _stat: &StatRequest) {
// `stat` is not supported, because all servers are running in the same process of the manager
#[cfg(not(unix))]
async fn handle_stat(&self, _: &StatRequest) {}
#[cfg(unix)]
async fn handle_stat(&self, stat: &StatRequest) {
use log::warn;
use crate::config::{Config, ConfigType};
// `stat` is only supported for Standalone mode
if self.svr_cfg.server_mode != ManagerServerMode::Standalone {
return;
}
let mut instances = self.servers.lock().await;
// Get or create a new instance then record the data statistic numbers
for (port, flow) in stat.stat.iter() {
match instances.entry(*port) {
Entry::Occupied(mut occ) => match occ.get_mut().mode {
ServerInstanceMode::Builtin { .. } => {
error!("received `stat` for port {} that is running a builtin server", *port)
}
ServerInstanceMode::Standalone { ref mut flow_stat } => *flow_stat = *flow,
},
Entry::Vacant(vac) => {
// Read config from file
let server_config_path = self.server_config_path(*port);
if !server_config_path.exists() {
warn!(
"received `stat` for port {} but file {} doesn't exist",
*port,
server_config_path.display()
);
continue;
}
match Config::load_from_file(&server_config_path, ConfigType::Server) {
Err(err) => {
error!(
"failed to load {} for server port {}, error: {}",
server_config_path.display(),
*port,
err
);
continue;
}
Ok(config) => {
trace!(
"loaded {} for server port {}, {:?}",
server_config_path.display(),
*port,
config
);
if config.server.len() != 1 {
error!(
"invalid config {} for server port {}, containing {} servers",
server_config_path.display(),
*port,
config.server.len()
);
continue;
}
let svr_cfg = config.server[0].clone();
vac.insert(ServerInstance {
mode: ServerInstanceMode::Standalone { flow_stat: *flow },
svr_cfg,
});
}
}
}
}
}
}
}