feat(shadowsocks-service): HTTP client auto retry (#2026)

This commit is contained in:
zonyitoo
2025-10-11 00:37:51 +08:00
parent 839861138c
commit 0321037f62

View File

@@ -16,11 +16,11 @@ use http::{HeaderValue, Method as HttpMethod, Uri, Version as HttpVersion, heade
use hyper::{
Request, Response,
body::{self, Body},
client::conn::{http1, http2},
client::conn::{TrySendError, http1, http2},
http::uri::Scheme,
rt::{Sleep, Timer},
};
use log::{error, trace};
use log::{debug, error, trace};
use lru_time_cache::LruCache;
use pin_project::pin_project;
use shadowsocks::relay::Address;
@@ -34,7 +34,7 @@ use super::{
utils::{check_keep_alive, connect_host, host_addr},
};
const CONNECTION_EXPIRE_DURATION: Duration = Duration::from_secs(5);
const CONNECTION_EXPIRE_DURATION: Duration = Duration::from_secs(20);
/// HTTPClient API request errors
#[derive(thiserror::Error, Debug)]
@@ -53,6 +53,15 @@ pub enum HttpClientError {
InvalidHeaderValue(#[from] InvalidHeaderValue),
}
#[derive(thiserror::Error, Debug)]
enum SendRequestError<B> {
#[error("{0}")]
Http(#[from] http::Error),
#[error("{0}")]
TrySend(#[from] TrySendError<Request<B>>),
}
#[derive(Clone, Debug)]
pub struct TokioTimer;
@@ -167,14 +176,40 @@ where
headers.insert("Host", host_value);
}
}
let req = Request::from_parts(req_parts, req_body);
let mut req = Request::from_parts(req_parts, req_body);
// 1. Check if there is an available client
//
// FIXME: If the cached connection is closed unexpectedly, this request will fail immediately.
if let Some(c) = self.get_cached_connection(&host).await {
trace!("HTTP client for host: {} taken from cache", host);
return self.send_request_conn(host, c, req).await;
match self.send_request_conn(host.clone(), c, req).await {
Ok(response) => return Ok(response),
Err(SendRequestError::TrySend(mut err)) => {
if let Some(inner_req) = err.take_message() {
req = inner_req;
// If TrySendError, the connection is probably broken, we should make a new connection
debug!(
"failed to send request via cached connection to host: {}, error: {}. retry with a new connection",
host,
err.error()
);
} else {
error!(
"failed to send request via cached connection to host: {}, error: {}. no request to retry",
host,
err.error()
);
return Err(err.into_error().into());
}
}
Err(SendRequestError::Http(err)) => {
error!(
"failed to send request via cached connection to host: {}, error: {}",
host, err
);
return Err(err.into());
}
}
}
// 2. If no. Make a new connection
@@ -196,7 +231,11 @@ where
}
};
self.send_request_conn(host, c, req).await
match self.send_request_conn(host, c, req).await {
Ok(response) => Ok(response),
Err(SendRequestError::TrySend(err)) => Err(err.into_error().into()),
Err(SendRequestError::Http(err)) => Err(err.into()),
}
}
async fn get_cached_connection(&self, host: &Address) -> Option<HttpConnection<B>> {
@@ -220,7 +259,7 @@ where
host: Address,
mut c: HttpConnection<B>,
req: Request<B>,
) -> Result<Response<body::Incoming>, HttpClientError> {
) -> Result<Response<body::Incoming>, SendRequestError<B>> {
trace!("HTTP making request to host: {}, request: {:?}", host, req);
let response = c.send_request(req).await?;
trace!("HTTP received response from host: {}, response: {:?}", host, response);
@@ -357,7 +396,7 @@ where
}
#[inline]
pub async fn send_request(&mut self, mut req: Request<B>) -> Result<Response<body::Incoming>, HttpClientError> {
pub async fn send_request(&mut self, mut req: Request<B>) -> Result<Response<body::Incoming>, SendRequestError<B>> {
match self {
Self::Http1(r) => {
if !matches!(
@@ -388,7 +427,7 @@ where
*(req.uri_mut()) = builder.build()?;
}
r.send_request(req).await.map_err(Into::into)
r.try_send_request(req).await.map_err(Into::into)
}
Self::Http2(r) => {
if !matches!(req.version(), HttpVersion::HTTP_2) {
@@ -397,7 +436,7 @@ where
*req.version_mut() = HttpVersion::HTTP_2;
}
r.send_request(req).await.map_err(Into::into)
r.try_send_request(req).await.map_err(Into::into)
}
}
}