diff --git a/crates/shadowsocks-service/src/local/http/http_client.rs b/crates/shadowsocks-service/src/local/http/http_client.rs index da7d95e2..493c52f6 100644 --- a/crates/shadowsocks-service/src/local/http/http_client.rs +++ b/crates/shadowsocks-service/src/local/http/http_client.rs @@ -16,11 +16,11 @@ use http::{HeaderValue, Method as HttpMethod, Uri, Version as HttpVersion, heade use hyper::{ Request, Response, body::{self, Body}, - client::conn::{http1, http2}, + client::conn::{TrySendError, http1, http2}, http::uri::Scheme, rt::{Sleep, Timer}, }; -use log::{error, trace}; +use log::{debug, error, trace}; use lru_time_cache::LruCache; use pin_project::pin_project; use shadowsocks::relay::Address; @@ -34,7 +34,7 @@ use super::{ utils::{check_keep_alive, connect_host, host_addr}, }; -const CONNECTION_EXPIRE_DURATION: Duration = Duration::from_secs(5); +const CONNECTION_EXPIRE_DURATION: Duration = Duration::from_secs(20); /// HTTPClient API request errors #[derive(thiserror::Error, Debug)] @@ -53,6 +53,15 @@ pub enum HttpClientError { InvalidHeaderValue(#[from] InvalidHeaderValue), } +#[derive(thiserror::Error, Debug)] +enum SendRequestError { + #[error("{0}")] + Http(#[from] http::Error), + + #[error("{0}")] + TrySend(#[from] TrySendError>), +} + #[derive(Clone, Debug)] pub struct TokioTimer; @@ -167,14 +176,40 @@ where headers.insert("Host", host_value); } } - let req = Request::from_parts(req_parts, req_body); + let mut req = Request::from_parts(req_parts, req_body); // 1. Check if there is an available client - // - // FIXME: If the cached connection is closed unexpectedly, this request will fail immediately. if let Some(c) = self.get_cached_connection(&host).await { trace!("HTTP client for host: {} taken from cache", host); - return self.send_request_conn(host, c, req).await; + match self.send_request_conn(host.clone(), c, req).await { + Ok(response) => return Ok(response), + Err(SendRequestError::TrySend(mut err)) => { + if let Some(inner_req) = err.take_message() { + req = inner_req; + + // If TrySendError, the connection is probably broken, we should make a new connection + debug!( + "failed to send request via cached connection to host: {}, error: {}. retry with a new connection", + host, + err.error() + ); + } else { + error!( + "failed to send request via cached connection to host: {}, error: {}. no request to retry", + host, + err.error() + ); + return Err(err.into_error().into()); + } + } + Err(SendRequestError::Http(err)) => { + error!( + "failed to send request via cached connection to host: {}, error: {}", + host, err + ); + return Err(err.into()); + } + } } // 2. If no. Make a new connection @@ -196,7 +231,11 @@ where } }; - self.send_request_conn(host, c, req).await + match self.send_request_conn(host, c, req).await { + Ok(response) => Ok(response), + Err(SendRequestError::TrySend(err)) => Err(err.into_error().into()), + Err(SendRequestError::Http(err)) => Err(err.into()), + } } async fn get_cached_connection(&self, host: &Address) -> Option> { @@ -220,7 +259,7 @@ where host: Address, mut c: HttpConnection, req: Request, - ) -> Result, HttpClientError> { + ) -> Result, SendRequestError> { trace!("HTTP making request to host: {}, request: {:?}", host, req); let response = c.send_request(req).await?; trace!("HTTP received response from host: {}, response: {:?}", host, response); @@ -357,7 +396,7 @@ where } #[inline] - pub async fn send_request(&mut self, mut req: Request) -> Result, HttpClientError> { + pub async fn send_request(&mut self, mut req: Request) -> Result, SendRequestError> { match self { Self::Http1(r) => { if !matches!( @@ -388,7 +427,7 @@ where *(req.uri_mut()) = builder.build()?; } - r.send_request(req).await.map_err(Into::into) + r.try_send_request(req).await.map_err(Into::into) } Self::Http2(r) => { if !matches!(req.version(), HttpVersion::HTTP_2) { @@ -397,7 +436,7 @@ where *req.version_mut() = HttpVersion::HTTP_2; } - r.send_request(req).await.map_err(Into::into) + r.try_send_request(req).await.map_err(Into::into) } } }