Make picking server complexity O1

This commit is contained in:
zonyitoo
2019-04-26 08:59:23 +08:00
parent d048d25120
commit 3d3e5feb18

View File

@@ -1,7 +1,7 @@
use std::{
io,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
@@ -34,15 +34,25 @@ impl Server {
available: AtomicBool::new(true),
}
}
fn is_available(&self) -> bool {
self.available.load(Ordering::Acquire)
}
fn delay(&self) -> u64 {
self.elapsed.load(Ordering::Acquire)
}
}
#[derive(Clone)]
pub struct PingBalancer {
const DEFAULT_CHECK_INTERVAL_SEC: u64 = 10;
struct Inner {
servers: Vec<Arc<Server>>,
best_idx: AtomicUsize,
}
impl PingBalancer {
pub fn new(context: SharedContext) -> PingBalancer {
impl Inner {
fn new(context: SharedContext) -> Inner {
let config = context.config();
if config.server.is_empty() {
@@ -65,33 +75,39 @@ impl PingBalancer {
tokio::spawn(
// Check every 10 seconds
Interval::new(Instant::now() + Duration::from_secs(1), Duration::from_secs(10))
.for_each(move |_| {
let sc = sc.clone();
Interval::new(
Instant::now() + Duration::from_secs(1),
Duration::from_secs(DEFAULT_CHECK_INTERVAL_SEC),
)
.for_each(move |_| {
let sc = sc.clone();
let fut1 = PingBalancer::check_delay(sc.clone(), context.clone(), addr.clone());
let fut2 = PingBalancer::check_delay(sc.clone(), context.clone(), addr.clone());
let fut3 = PingBalancer::check_delay(sc.clone(), context.clone(), addr.clone());
let fut1 = Inner::check_delay(sc.clone(), context.clone(), addr.clone());
let fut2 = Inner::check_delay(sc.clone(), context.clone(), addr.clone());
let fut3 = Inner::check_delay(sc.clone(), context.clone(), addr.clone());
fut1.join3(fut2, fut3).then(move |res| {
match res {
Ok((d1, d2, d3)) => {
sc.available.store(true, Ordering::Release);
sc.elapsed.store((d1 + d2 + d3) / 3, Ordering::Release);
}
Err(..) => {
sc.available.store(false, Ordering::Release);
}
fut1.join3(fut2, fut3).then(move |res| {
match res {
Ok((d1, d2, d3)) => {
sc.available.store(true, Ordering::Release);
sc.elapsed.store((d1 + d2 + d3) / 3, Ordering::Release);
}
Err(..) => {
sc.available.store(false, Ordering::Release);
}
}
Ok(())
})
Ok(())
})
.map_err(|_| ()),
})
.map_err(|_| ()),
);
}
PingBalancer { servers }
Inner {
servers,
best_idx: AtomicUsize::new(0),
}
}
fn check_delay(
@@ -136,51 +152,76 @@ impl PingBalancer {
}
}
#[derive(Clone)]
pub struct PingBalancer {
inner: Arc<Inner>,
}
impl PingBalancer {
pub fn new(context: SharedContext) -> PingBalancer {
let inner = Arc::new(Inner::new(context));
let inner_cloned = inner.clone();
tokio::spawn(
Interval::new(
Instant::now() + Duration::from_secs(2),
Duration::from_secs(DEFAULT_CHECK_INTERVAL_SEC),
)
.for_each(move |_| {
let inner = inner_cloned.clone();
if inner.servers.is_empty() {
panic!("No server");
}
// Choose the best one
let mut svr_idx = 0;
for (idx, svr) in inner.servers.iter().enumerate() {
let choosen_svr = &inner.servers[svr_idx];
if svr.is_available() && (!choosen_svr.is_available() || svr.delay() < choosen_svr.delay()) {
svr_idx = idx;
}
}
let choosen_svr = &inner.servers[svr_idx];
if svr_idx == 0 && !choosen_svr.is_available() {
// Cannot find any usable servers, use the first one (svr_idx = 0)
error!(
"cannot find any usable servers, picked {} delay {} ms",
choosen_svr.config.addr(),
choosen_svr.delay()
);
} else {
debug!(
"chosen the best server {} delay {} ms",
choosen_svr.config.addr(),
choosen_svr.delay()
);
}
inner.best_idx.store(svr_idx, Ordering::Release);
Ok(())
})
.map_err(|_| ()),
);
PingBalancer { inner }
}
}
impl LoadBalancer for PingBalancer {
fn pick_server(&mut self) -> Arc<ServerConfig> {
if self.servers.is_empty() {
panic!("No server");
if self.inner.servers.is_empty() {
panic!("no server in configuration");
}
// Choose the best one
let mut choosen_svr = &self.servers[0];
let mut found_one = false;
for svr in &self.servers {
if svr.available.load(Ordering::Acquire)
&& (!choosen_svr.available.load(Ordering::Acquire)
|| svr.elapsed.load(Ordering::Acquire) < choosen_svr.elapsed.load(Ordering::Acquire))
{
found_one = true;
choosen_svr = svr;
}
}
if !found_one && !choosen_svr.available.load(Ordering::Acquire) {
// Just choose one available
for svr in &self.servers {
if svr.available.load(Ordering::Acquire) {
choosen_svr = svr;
}
}
debug!(
"cannot find any available servers, picked {} delay {} ms",
choosen_svr.config.addr(),
choosen_svr.elapsed.load(Ordering::Acquire)
);
} else {
debug!(
"choosen the best server {} delay {} ms",
choosen_svr.config.addr(),
choosen_svr.elapsed.load(Ordering::Acquire)
);
}
choosen_svr.config.clone()
let idx = self.inner.best_idx.load(Ordering::Acquire);
self.inner.servers[idx].config.clone()
}
fn total(&self) -> usize {
self.servers.len()
self.inner.servers.len()
}
}