feat(shadowsocks): splits DatagramTransport into DatagramSocket, DatagramSend, DatagramReceive

This commit is contained in:
zonyitoo
2024-09-21 00:20:31 +08:00
parent 60d0576b26
commit 0ce6cb333b
4 changed files with 73 additions and 41 deletions

View File

@@ -5,7 +5,7 @@ use std::{io, net::SocketAddr, sync::Arc};
use shadowsocks::{
relay::{
socks5::Address,
udprelay::{options::UdpSocketControlData, DatagramTransport},
udprelay::{options::UdpSocketControlData, DatagramReceive, DatagramSend},
},
ProxySocket,
};
@@ -39,7 +39,7 @@ impl<S> MonProxySocket<S> {
impl<S> MonProxySocket<S>
where
S: DatagramTransport,
S: DatagramSend,
{
/// Send a UDP packet to addr through proxy
#[inline]
@@ -87,7 +87,12 @@ where
Ok(())
}
}
impl<S> MonProxySocket<S>
where
S: DatagramReceive,
{
/// Receive packet from Shadowsocks' UDP server
///
/// This function will use `recv_buf` to store intermediate data, so it has to be big enough to store the whole shadowsocks' packet

View File

@@ -13,18 +13,24 @@ use tokio::io::ReadBuf;
use crate::net::UdpSocket;
/// a trait for datagram transport that wraps around a tokio `UdpSocket`
pub trait DatagramTransport {
/// A socket I/O object that can transport datagram
pub trait DatagramSocket {
/// Local binded address
fn local_addr(&self) -> io::Result<SocketAddr>;
}
/// A socket I/O object that can receive datagram
pub trait DatagramReceive {
/// `recv` data into `buf`
fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>>;
/// `recv` data into `buf` with source address
fn poll_recv_from(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<SocketAddr>>;
/// Check if the underlying I/O object is ready for `recv`
fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
/// A socket I/O object that can send datagram
pub trait DatagramSend {
/// `send` data with `buf`, returning the sent bytes
fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>;
/// `send` data with `buf` to `target`, returning the sent bytes
@@ -33,7 +39,13 @@ pub trait DatagramTransport {
fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
impl DatagramTransport for UdpSocket {
impl DatagramSocket for UdpSocket {
fn local_addr(&self) -> io::Result<SocketAddr> {
self.deref().local_addr()
}
}
impl DatagramReceive for UdpSocket {
fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
UdpSocket::poll_recv(self, cx, buf)
}
@@ -45,7 +57,9 @@ impl DatagramTransport for UdpSocket {
fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.deref().poll_recv_ready(cx)
}
}
impl DatagramSend for UdpSocket {
fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
UdpSocket::poll_send(self, cx, buf)
}
@@ -57,21 +71,17 @@ impl DatagramTransport for UdpSocket {
fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.deref().poll_send_ready(cx)
}
fn local_addr(&self) -> io::Result<SocketAddr> {
self.deref().local_addr()
}
}
/// Future for `recv`
#[pin_project]
pub struct RecvFut<'a, S: DatagramTransport + ?Sized> {
pub struct RecvFut<'a, S: DatagramReceive + ?Sized> {
#[pin]
io: &'a S,
buf: &'a mut [u8],
}
impl<'a, S: DatagramTransport + ?Sized> Future for RecvFut<'a, S> {
impl<'a, S: DatagramReceive + ?Sized> Future for RecvFut<'a, S> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -85,13 +95,13 @@ impl<'a, S: DatagramTransport + ?Sized> Future for RecvFut<'a, S> {
/// Future for `recv_from`
#[pin_project]
pub struct RecvFromFut<'a, S: DatagramTransport + ?Sized> {
pub struct RecvFromFut<'a, S: DatagramReceive + ?Sized> {
#[pin]
io: &'a S,
buf: &'a mut [u8],
}
impl<'a, S: DatagramTransport + ?Sized> Future for RecvFromFut<'a, S> {
impl<'a, S: DatagramReceive + ?Sized> Future for RecvFromFut<'a, S> {
type Output = io::Result<(usize, SocketAddr)>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -104,11 +114,11 @@ impl<'a, S: DatagramTransport + ?Sized> Future for RecvFromFut<'a, S> {
}
/// Future for `recv_ready`
pub struct RecvReadyFut<'a, S: DatagramTransport + ?Sized> {
pub struct RecvReadyFut<'a, S: DatagramReceive + ?Sized> {
io: &'a S,
}
impl<'a, S: DatagramTransport + ?Sized> Future for RecvReadyFut<'a, S> {
impl<'a, S: DatagramReceive + ?Sized> Future for RecvReadyFut<'a, S> {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -117,12 +127,12 @@ impl<'a, S: DatagramTransport + ?Sized> Future for RecvReadyFut<'a, S> {
}
/// Future for `send`
pub struct SendFut<'a, S: DatagramTransport + ?Sized> {
pub struct SendFut<'a, S: DatagramSend + ?Sized> {
io: &'a S,
buf: &'a [u8],
}
impl<'a, S: DatagramTransport + ?Sized> Future for SendFut<'a, S> {
impl<'a, S: DatagramSend + ?Sized> Future for SendFut<'a, S> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -131,13 +141,13 @@ impl<'a, S: DatagramTransport + ?Sized> Future for SendFut<'a, S> {
}
/// Future for `send_to`
pub struct SendToFut<'a, S: DatagramTransport + ?Sized> {
pub struct SendToFut<'a, S: DatagramSend + ?Sized> {
io: &'a S,
target: SocketAddr,
buf: &'a [u8],
}
impl<'a, S: DatagramTransport + ?Sized> Future for SendToFut<'a, S> {
impl<'a, S: DatagramSend + ?Sized> Future for SendToFut<'a, S> {
type Output = io::Result<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -146,20 +156,20 @@ impl<'a, S: DatagramTransport + ?Sized> Future for SendToFut<'a, S> {
}
/// Future for `recv_ready`
pub struct SendReadyFut<'a, S: DatagramTransport + ?Sized> {
pub struct SendReadyFut<'a, S: DatagramSend + ?Sized> {
io: &'a S,
}
impl<'a, S: DatagramTransport + ?Sized> Future for SendReadyFut<'a, S> {
impl<'a, S: DatagramSend + ?Sized> Future for SendReadyFut<'a, S> {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.io.poll_recv_ready(cx)
self.io.poll_send_ready(cx)
}
}
/// Extension methods for `DatagramTransport`
pub trait DatagramTransportExt: DatagramTransport {
/// Extension methods for `DatagramReceive`
pub trait DatagramReceiveExt: DatagramReceive {
/// Async method for `poll_recv`
fn recv<'a, 'b>(&'a self, buf: &'a mut [u8]) -> RecvFut<'a, Self> {
RecvFut { io: self, buf }
@@ -174,7 +184,12 @@ pub trait DatagramTransportExt: DatagramTransport {
fn recv_ready<'a>(&'a self) -> RecvReadyFut<'a, Self> {
RecvReadyFut { io: self }
}
}
impl<S: DatagramReceive> DatagramReceiveExt for S {}
/// Extension methods for `DatagramSend`
pub trait DatagramSendExt: DatagramSend {
/// Async method for `poll_send`
fn send<'a>(&'a self, buf: &'a [u8]) -> SendFut<'a, Self> {
SendFut { io: self, buf }
@@ -191,4 +206,4 @@ pub trait DatagramTransportExt: DatagramTransport {
}
}
impl<S: DatagramTransport> DatagramTransportExt for S {}
impl<S: DatagramSend> DatagramSendExt for S {}

View File

@@ -50,7 +50,7 @@
use std::time::Duration;
pub use self::proxy_socket::ProxySocket;
pub use compat::{DatagramTransport, DatagramTransportExt};
pub use compat::{DatagramReceive, DatagramReceiveExt, DatagramSend, DatagramSendExt, DatagramSocket};
mod aead;
#[cfg(feature = "aead-cipher-2022")]

View File

@@ -23,7 +23,7 @@ use crate::{
};
use super::{
compat::{DatagramTransport, DatagramTransportExt},
compat::{DatagramReceive, DatagramReceiveExt, DatagramSend, DatagramSendExt, DatagramSocket},
crypto_io::{
decrypt_client_payload, decrypt_server_payload, encrypt_client_payload, encrypt_server_payload, ProtocolError,
ProtocolResult,
@@ -156,10 +156,7 @@ impl ProxySocket<ShadowUdpSocket> {
}
}
impl<S> ProxySocket<S>
where
S: DatagramTransport,
{
impl<S> ProxySocket<S> {
/// Create a `ProxySocket` from a I/O object that impls `DatagramTransport`
pub fn from_socket(
socket_type: UdpSocketType,
@@ -190,6 +187,21 @@ where
}
}
/// Set `send` timeout, `None` will clear timeout
pub fn set_send_timeout(&mut self, t: Option<Duration>) {
self.send_timeout = t;
}
/// Set `recv` timeout, `None` will clear timeout
pub fn set_recv_timeout(&mut self, t: Option<Duration>) {
self.recv_timeout = t;
}
}
impl<S> ProxySocket<S>
where
S: DatagramSend,
{
fn encrypt_send_buffer(
&self,
addr: &Address,
@@ -421,7 +433,12 @@ where
Ok(send_len)
}
}
impl<S> ProxySocket<S>
where
S: DatagramReceive,
{
fn decrypt_recv_buffer(
&self,
recv_buf: &mut [u8],
@@ -587,21 +604,16 @@ where
pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<ProxySocketResult<()>> {
self.io.poll_recv_ready(cx).map_err(|x| x.into())
}
}
impl<S> ProxySocket<S>
where
S: DatagramSocket,
{
/// Get local addr of socket
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.io.local_addr()
}
/// Set `send` timeout, `None` will clear timeout
pub fn set_send_timeout(&mut self, t: Option<Duration>) {
self.send_timeout = t;
}
/// Set `recv` timeout, `None` will clear timeout
pub fn set_recv_timeout(&mut self, t: Option<Duration>) {
self.recv_timeout = t;
}
}
#[cfg(unix)]