configurable balancer max_server_rtt, check_interval

This commit is contained in:
zonyitoo
2021-10-04 00:37:15 +08:00
parent 665e717531
commit e431722561
7 changed files with 137 additions and 38 deletions

24
Cargo.lock generated
View File

@@ -525,9 +525,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.3.4"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7f3675cfef6a30c8031cf9e6493ebdc3bb3272a3fea3923c4210d1830e6a472"
checksum = "6c06815895acec637cd6ed6e9662c935b866d20a106f8361892893a7d9234964"
dependencies = [
"bytes",
"fnv",
@@ -969,9 +969,9 @@ dependencies = [
[[package]]
name = "nix"
version = "0.22.1"
version = "0.22.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7555d6c7164cc913be1ce7f95cbecdabda61eb2ccd89008524af306fb7f5031"
checksum = "d3bb9a13fa32bc5aeb64150cd3f32d6cf4c748f8f8a417cce5d2eb976a8370ba"
dependencies = [
"bitflags",
"cc",
@@ -1750,9 +1750,9 @@ checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "syn"
version = "1.0.77"
version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5239bc68e0fef57495900cfea4e8dc75596d9a319d7e16b1e0a440d24e6fe0a0"
checksum = "a4eac2e6c19f5c3abc0c229bea31ff0b9b091c7b14990e8924b92902a303a0c0"
dependencies = [
"proc-macro2",
"quote",
@@ -1905,9 +1905,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "1.3.0"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110"
checksum = "154794c8f499c2619acd19e839294703e9e32e7630ef5f46ea80d4ef0fbee5eb"
dependencies = [
"proc-macro2",
"quote",
@@ -1989,9 +1989,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.16"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98863d0dd09fa59a1b79c6750ad80dbda6b75f4e71c437a6a1a8cb91a8bcbd77"
checksum = "c4f915eb6abf914599c200260efced9203504c4c37380af10cdf3b7d36970650"
dependencies = [
"proc-macro2",
"quote",
@@ -2000,9 +2000,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.20"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46125608c26121c81b0c6d693eab5a420e416da7e43c426d2e8f7df8da8a3acf"
checksum = "1f4ed65637b8390770814083d20756f87bfa2c21bf2f110babdc5438351746e4"
dependencies = [
"lazy_static",
]

View File

@@ -558,7 +558,16 @@ Example configuration:
"nofile": 10240,
// Try to resolve domain name to IPv6 (AAAA) addresses first
"ipv6_first": false
"ipv6_first": false,
// Balancer customization
"balancer": {
// MAX Round-Trip-Time (RTT) of servers
// The timeout seconds of each individual checks
"max_server_rtt": 5,
// Interval seconds between each check
"check_interval": 10,
}
}
```

View File

@@ -95,6 +95,14 @@ struct SSSecurityReplayAttackConfig {
policy: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
struct SSBalancerConfig {
#[serde(skip_serializing_if = "Option::is_none")]
max_server_rtt: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
check_interval: Option<u64>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
struct SSConfig {
#[serde(skip_serializing_if = "Option::is_none")]
@@ -148,6 +156,8 @@ struct SSConfig {
fast_open: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
security: Option<SSSecurityConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
balancer: Option<SSBalancerConfig>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
@@ -837,6 +847,15 @@ pub struct SecurityReplayAttackConfig {
pub policy: ReplayAttackPolicy,
}
/// Balancer Config
#[derive(Clone, Debug, Default)]
pub struct BalancerConfig {
/// MAX rtt of servers, which is the timeout duration of each check requests
pub max_server_rtt: Option<Duration>,
/// Interval between each checking
pub check_interval: Option<Duration>,
}
/// Configuration
#[derive(Clone, Debug)]
pub struct Config {
@@ -916,6 +935,9 @@ pub struct Config {
/// Replay attack policy
pub security: SecurityConfig,
/// Balancer config of local server
pub balancer: BalancerConfig,
}
/// Configuration parsing error kind
@@ -1020,6 +1042,8 @@ impl Config {
stat_path: None,
security: SecurityConfig::default(),
balancer: BalancerConfig::default(),
}
}
@@ -1550,6 +1574,13 @@ impl Config {
}
}
if let Some(balancer) = config.balancer {
nconfig.balancer = BalancerConfig {
max_server_rtt: balancer.max_server_rtt.map(Duration::from_secs),
check_interval: balancer.check_interval.map(Duration::from_secs),
};
}
Ok(nconfig)
}
@@ -1729,6 +1760,21 @@ impl Config {
);
return Err(err);
}
// Balancer related checks
if let Some(rtt) = self.balancer.max_server_rtt {
if rtt.as_secs() == 0 {
let err = Error::new(ErrorKind::Invalid, "balancer.max_server_rtt must be > 0", None);
return Err(err);
}
}
if let Some(intv) = self.balancer.check_interval {
if intv.as_secs() == 0 {
let err = Error::new(ErrorKind::Invalid, "balancer.check_interval must be > 0", None);
return Err(err);
}
}
}
if self.config_type.is_server() && self.server.is_empty() {
@@ -2081,6 +2127,14 @@ impl fmt::Display for Config {
});
}
// Balancer
if self.balancer.max_server_rtt.is_some() || self.balancer.check_interval.is_some() {
jconf.balancer = Some(SSBalancerConfig {
max_server_rtt: self.balancer.max_server_rtt.as_ref().map(Duration::as_secs),
check_interval: self.balancer.check_interval.as_ref().map(Duration::as_secs),
});
}
write!(f, "{}", json5::to_string(&jconf).unwrap())
}
}

View File

@@ -58,6 +58,8 @@ pub struct PingBalancerBuilder {
servers: Vec<Arc<ServerIdent>>,
context: Arc<ServiceContext>,
mode: Mode,
max_server_rtt: Duration,
check_interval: Duration,
}
impl PingBalancerBuilder {
@@ -66,11 +68,22 @@ impl PingBalancerBuilder {
servers: Vec::new(),
context,
mode,
max_server_rtt: Duration::from_secs(DEFAULT_CHECK_TIMEOUT_SEC),
check_interval: Duration::from_secs(DEFAULT_CHECK_INTERVAL_SEC),
}
}
pub fn add_server(&mut self, server: ServerIdent) {
self.servers.push(Arc::new(server));
pub fn add_server(&mut self, server: ServerConfig) {
let ident = ServerIdent::new(server, self.max_server_rtt);
self.servers.push(Arc::new(ident));
}
pub fn max_server_rtt(&mut self, rtt: Duration) {
self.max_server_rtt = rtt;
}
pub fn check_interval(&mut self, intv: Duration) {
self.check_interval = intv;
}
pub async fn build(self) -> PingBalancer {
@@ -131,6 +144,8 @@ impl PingBalancerBuilder {
best_udp_idx: AtomicUsize::new(best_udp_idx),
context: self.context,
mode: self.mode,
max_server_rtt: self.max_server_rtt,
check_interval: self.check_interval,
};
balancer_context.init_score().await;
@@ -157,6 +172,8 @@ struct PingBalancerContext {
best_udp_idx: AtomicUsize,
context: Arc<ServiceContext>,
mode: Mode,
max_server_rtt: Duration,
check_interval: Duration,
}
impl PingBalancerContext {
@@ -231,6 +248,7 @@ impl PingBalancerContext {
server: server.clone(),
server_type: ServerType::Tcp,
context: self.context.clone(),
max_server_rtt: self.max_server_rtt,
};
vfut_tcp.push(checker.check_update_score());
}
@@ -240,6 +258,7 @@ impl PingBalancerContext {
server: server.clone(),
server_type: ServerType::Udp,
context: self.context.clone(),
max_server_rtt: self.max_server_rtt,
};
vfut_udp.push(checker.check_update_score());
}
@@ -336,7 +355,7 @@ impl PingBalancerContext {
async fn checker_task_real(&self) {
loop {
time::sleep(Duration::from_secs(DEFAULT_CHECK_INTERVAL_SEC)).await;
time::sleep(self.check_interval).await;
// Sleep before check.
// PingBalancer already checked once when constructing
@@ -404,6 +423,7 @@ struct PingChecker {
server: Arc<ServerIdent>,
server_type: ServerType,
context: Arc<ServiceContext>,
max_server_rtt: Duration,
}
impl PingChecker {
@@ -563,8 +583,7 @@ impl PingChecker {
let start = Instant::now();
// Send HTTP GET and read the first byte
let timeout = Duration::from_secs(DEFAULT_CHECK_TIMEOUT_SEC);
let res = time::timeout(timeout, self.check_request()).await;
let res = time::timeout(self.max_server_rtt, self.check_request()).await;
let elapsed = Instant::now() - start;
let elapsed = elapsed.as_secs() as u32 * 1000 + elapsed.subsec_millis(); // Converted to ms

View File

@@ -3,6 +3,7 @@
use std::{
fmt::{self, Debug},
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};
use shadowsocks::ServerConfig;
@@ -18,9 +19,12 @@ pub struct ServerScore {
impl ServerScore {
/// Create a `ServerScore`
pub fn new(user_weight: f32) -> ServerScore {
pub fn new(user_weight: f32, max_server_rtt: Duration) -> ServerScore {
let max_server_rtt = max_server_rtt.as_millis() as u32;
assert!(max_server_rtt > 0);
ServerScore {
stat_data: Mutex::new(ServerStat::new(user_weight)),
stat_data: Mutex::new(ServerStat::new(user_weight, max_server_rtt)),
score: AtomicU32::new(u32::MAX),
}
}
@@ -61,11 +65,11 @@ pub struct ServerIdent {
}
impl ServerIdent {
/// Create a ServerIdent`
pub fn new(svr_cfg: ServerConfig) -> ServerIdent {
/// Create a `ServerIdent`
pub fn new(svr_cfg: ServerConfig, max_server_rtt: Duration) -> ServerIdent {
ServerIdent {
tcp_score: ServerScore::new(svr_cfg.weight().tcp_weight()),
udp_score: ServerScore::new(svr_cfg.weight().udp_weight()),
tcp_score: ServerScore::new(svr_cfg.weight().tcp_weight(), max_server_rtt),
udp_score: ServerScore::new(svr_cfg.weight().udp_weight(), max_server_rtt),
svr_cfg,
}
}

View File

@@ -7,8 +7,7 @@ pub const DEFAULT_CHECK_INTERVAL_SEC: u64 = 10;
/// Timeout of each check
pub const DEFAULT_CHECK_TIMEOUT_SEC: u64 = 5; // A common connection timeout of 5 seconds.
const MAX_SERVER_RTT: u32 = DEFAULT_CHECK_TIMEOUT_SEC as u32 * 1000;
const MAX_LATENCY_QUEUE_SIZE: usize = 59; // Account for the last 10 minutes.
const MAX_LATENCY_QUEUE_SIZE: usize = 67;
/// Statistic score
#[derive(Debug, Copy, Clone)]
@@ -27,20 +26,24 @@ pub struct ServerStat {
/// Use median instead of average time,
/// because probing result may have some really bad cases
rtt: u32,
/// MAX server's RTT, normally the check timeout milliseconds
max_server_rtt: u32,
/// Total_Fail / Total_Probe
fail_rate: f64,
/// Recently probe data
latency_queue: VecDeque<Score>,
/// Score's standard deviation
latency_stdev: f64,
/// Score's standard deviation MAX
max_latency_stdev: f64,
/// Score's average
latency_mean: f64,
/// User's customized weight
user_weight: f32,
}
fn max_latency_stdev() -> f64 {
let mrtt = MAX_SERVER_RTT as f64;
fn max_latency_stdev(max_server_rtt: u32) -> f64 {
let mrtt = max_server_rtt as f64;
let avg = (0.0 + mrtt) / 2.0;
let diff1 = (0.0 - avg) * (0.0 - avg);
let diff2 = (mrtt - avg) * (mrtt - avg);
@@ -49,14 +52,16 @@ fn max_latency_stdev() -> f64 {
}
impl ServerStat {
pub fn new(user_weight: f32) -> ServerStat {
pub fn new(user_weight: f32, max_server_rtt: u32) -> ServerStat {
assert!((0.0..=1.0).contains(&user_weight));
ServerStat {
rtt: MAX_SERVER_RTT,
rtt: max_server_rtt,
max_server_rtt,
fail_rate: 1.0,
latency_queue: VecDeque::new(),
latency_stdev: 0.0,
max_latency_stdev: max_latency_stdev(max_server_rtt),
latency_mean: 0.0,
user_weight,
}
@@ -64,10 +69,10 @@ impl ServerStat {
fn score(&self) -> u32 {
// Normalize rtt
let nrtt = self.rtt as f64 / MAX_SERVER_RTT as f64;
let nrtt = self.rtt as f64 / self.max_server_rtt as f64;
// Normalize stdev
let nstdev = self.latency_stdev / max_latency_stdev();
let nstdev = self.latency_stdev / self.max_latency_stdev;
const SCORE_RTT_WEIGHT: f64 = 1.0;
const SCORE_FAIL_WEIGHT: f64 = 3.0;

View File

@@ -27,10 +27,7 @@ use crate::{
dns::build_dns_resolver,
};
use self::{
context::ServiceContext,
loadbalancing::{PingBalancerBuilder, ServerIdent},
};
use self::{context::ServiceContext, loadbalancing::PingBalancerBuilder};
pub mod context;
#[cfg(feature = "local-dns")]
@@ -202,9 +199,20 @@ pub async fn run(mut config: Config) -> io::Result<()> {
}
let mut balancer_builder = PingBalancerBuilder::new(context.clone(), mode);
for server in config.server {
balancer_builder.add_server(ServerIdent::new(server));
// max_server_rtt have to be set before add_server
if let Some(rtt) = config.balancer.max_server_rtt {
balancer_builder.max_server_rtt(rtt);
}
if let Some(intv) = config.balancer.check_interval {
balancer_builder.check_interval(intv);
}
for server in config.server {
balancer_builder.add_server(server);
}
balancer_builder.build().await
};