tun tcp send/recv buffer lock-free

This commit is contained in:
zonyitoo
2022-02-23 18:05:25 +08:00
parent 66e192e894
commit 36916c7830
3 changed files with 148 additions and 86 deletions

16
Cargo.lock generated
View File

@@ -222,6 +222,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cache-padded"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c"
[[package]]
name = "cc"
version = "1.0.72"
@@ -1719,6 +1725,15 @@ dependencies = [
"zeroize",
]
[[package]]
name = "ringbuf"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c60f3923939c33e6c543ddbff14d0ee6a407fcd186d560be37282559616adf3"
dependencies = [
"cache-padded",
]
[[package]]
name = "rpassword"
version = "5.0.1"
@@ -2105,6 +2120,7 @@ dependencies = [
"qrcode",
"rand",
"regex",
"ringbuf",
"rustls-native-certs",
"serde",
"shadowsocks",

View File

@@ -56,7 +56,7 @@ local-tunnel = ["local"]
# Enable socks4 protocol for sslocal
local-socks4 = ["local"]
# Enable Tun interface protocol for sslocal
local-tun = ["local", "etherparse", "tun", "rand", "smoltcp"]
local-tun = ["local", "etherparse", "tun", "rand", "smoltcp", "ringbuf"]
# Enable Stream Cipher Protocol
# WARN: Stream Cipher Protocol is proved to be insecure
@@ -95,6 +95,7 @@ bytes = "1.0"
byte_string = "1.0"
byteorder = "1.3"
rand = { version = "0.8", optional = true }
ringbuf = { version = "0.2.6", optional = true }
futures = "0.3"
tokio = { version = "1.5", features = ["io-util", "macros", "net", "parking_lot", "rt", "sync", "time"] }

View File

@@ -14,12 +14,12 @@ use std::{
};
use log::{error, trace};
use ringbuf::{Consumer as RingBufferConsumer, Producer as RingBufferProducer, RingBuffer};
use shadowsocks::{net::TcpSocketOpts, relay::socks5::Address};
use smoltcp::{
iface::{Interface, InterfaceBuilder, Routes, SocketHandle},
phy::{DeviceCapabilities, Medium},
socket::{TcpSocket, TcpSocketBuffer, TcpState},
storage::RingBuffer,
time::{Duration as SmolDuration, Instant as SmolInstant},
wire::{IpAddress, IpCidr, Ipv4Address, Ipv6Address, TcpPacket},
};
@@ -43,11 +43,9 @@ const DEFAULT_TCP_SEND_BUFFER_SIZE: u32 = 0x3FFF * 20;
const DEFAULT_TCP_RECV_BUFFER_SIZE: u32 = 0x3FFF * 20;
struct TcpSocketControl {
send_buffer: RingBuffer<'static, u8>,
send_waker: Option<Waker>,
recv_buffer: RingBuffer<'static, u8>,
recv_waker: Option<Waker>,
is_closed: bool,
send_waker: SpinMutex<Option<Waker>>,
recv_waker: SpinMutex<Option<Waker>>,
is_closed: AtomicBool,
}
struct ManagerNotify {
@@ -66,26 +64,33 @@ impl ManagerNotify {
struct TcpSocketManager {
iface: Interface<'static, VirtTunDevice>,
sockets: HashMap<SocketHandle, SharedTcpConnectionControl>,
sockets: HashMap<SocketHandle, TcpSocketControlBlock>,
socket_creation_rx: mpsc::UnboundedReceiver<TcpSocketCreation>,
}
type SharedTcpConnectionControl = Arc<SpinMutex<TcpSocketControl>>;
type SharedTcpConnectionControl = Arc<TcpSocketControl>;
struct TcpSocketControlBlock {
control: SharedTcpConnectionControl,
send_buffer_consumer: RingBufferConsumer<u8>,
recv_buffer_producer: RingBufferProducer<u8>,
}
struct TcpSocketCreation {
control: SharedTcpConnectionControl,
control_block: TcpSocketControlBlock,
socket: TcpSocket<'static>,
}
struct TcpConnection {
control: SharedTcpConnectionControl,
manager_notify: Arc<ManagerNotify>,
send_buffer_producer: RingBufferProducer<u8>,
recv_buffer_consumer: RingBufferConsumer<u8>,
}
impl Drop for TcpConnection {
fn drop(&mut self) {
let mut control = self.control.lock();
control.is_closed = true;
self.control.is_closed.store(true, Ordering::Release);
}
}
@@ -99,82 +104,123 @@ impl TcpConnection {
let send_buffer_size = tcp_opts.send_buffer_size.unwrap_or(DEFAULT_TCP_SEND_BUFFER_SIZE);
let recv_buffer_size = tcp_opts.recv_buffer_size.unwrap_or(DEFAULT_TCP_RECV_BUFFER_SIZE);
let control = Arc::new(SpinMutex::new(TcpSocketControl {
send_buffer: RingBuffer::new(vec![0u8; send_buffer_size as usize]),
send_waker: None,
recv_buffer: RingBuffer::new(vec![0u8; recv_buffer_size as usize]),
recv_waker: None,
is_closed: false,
}));
let send_buffer = RingBuffer::new(send_buffer_size as usize);
let recv_buffer = RingBuffer::new(recv_buffer_size as usize);
let _ = socket_creation_tx.send(TcpSocketCreation {
control: control.clone(),
socket,
let (send_buffer_producer, send_buffer_consumer) = send_buffer.split();
let (recv_buffer_producer, recv_buffer_consumer) = recv_buffer.split();
let control = Arc::new(TcpSocketControl {
send_waker: SpinMutex::new(None),
recv_waker: SpinMutex::new(None),
is_closed: AtomicBool::new(false),
});
let control_block = TcpSocketControlBlock {
control: control.clone(),
send_buffer_consumer,
recv_buffer_producer,
};
let _ = socket_creation_tx.send(TcpSocketCreation { control_block, socket });
TcpConnection {
control,
manager_notify,
send_buffer_producer,
recv_buffer_consumer,
}
}
}
impl AsyncRead for TcpConnection {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
let mut control = self.control.lock();
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
let TcpConnection {
ref control,
ref manager_notify,
ref mut recv_buffer_consumer,
..
} = *self;
// If socket is already closed, just return EOF directly.
if control.is_closed {
if control.is_closed.load(Ordering::Acquire) {
return Ok(()).into();
}
// Read from buffer
if control.recv_buffer.is_empty() {
let recv_buf = unsafe { mem::transmute::<_, &mut [u8]>(buf.unfilled_mut()) };
let mut n = recv_buffer_consumer.pop_slice(recv_buf);
if n == 0 {
// Nothing could be read. Wait for notify.
if let Some(old_waker) = control.recv_waker.replace(cx.waker().clone()) {
if !old_waker.will_wake(cx.waker()) {
old_waker.wake();
}
let mut recv_waker = control.recv_waker.lock();
// Double check.
if control.is_closed.load(Ordering::Acquire) {
return Ok(()).into();
}
return Poll::Pending;
n = recv_buffer_consumer.pop_slice(recv_buf);
if n == 0 {
if let Some(old_waker) = recv_waker.replace(cx.waker().clone()) {
if !old_waker.will_wake(cx.waker()) {
old_waker.wake();
}
}
return Poll::Pending;
}
}
let recv_buf = unsafe { mem::transmute::<_, &mut [u8]>(buf.unfilled_mut()) };
let n = control.recv_buffer.dequeue_slice(recv_buf);
buf.advance(n);
if control.recv_buffer.is_empty() {
self.manager_notify.notify();
if recv_buffer_consumer.is_empty() {
manager_notify.notify();
}
Ok(()).into()
}
}
impl AsyncWrite for TcpConnection {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
let mut control = self.control.lock();
if control.is_closed {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
let TcpConnection {
ref control,
ref manager_notify,
ref mut send_buffer_producer,
..
} = *self;
// If socket is closed. EPIPE.
if control.is_closed.load(Ordering::Acquire) {
return Err(io::ErrorKind::BrokenPipe.into()).into();
}
// Write to buffer
if control.send_buffer.is_full() {
if let Some(old_waker) = control.send_waker.replace(cx.waker().clone()) {
if !old_waker.will_wake(cx.waker()) {
old_waker.wake();
}
let mut n = send_buffer_producer.push_slice(buf);
if n == 0 {
// Nothing could be write. Wait for notify.
let mut send_waker = control.send_waker.lock();
// Double check.
if control.is_closed.load(Ordering::Acquire) {
return Err(io::ErrorKind::BrokenPipe.into()).into();
}
return Poll::Pending;
n = send_buffer_producer.push_slice(buf);
if n == 0 {
if let Some(old_waker) = send_waker.replace(cx.waker().clone()) {
if !old_waker.will_wake(cx.waker()) {
old_waker.wake();
}
}
return Poll::Pending;
}
}
let n = control.send_buffer.enqueue_slice(buf);
if control.send_buffer.is_full() {
self.manager_notify.notify();
if send_buffer_producer.is_full() {
manager_notify.notify();
}
Ok(n).into()
}
@@ -183,21 +229,13 @@ impl AsyncWrite for TcpConnection {
Ok(()).into()
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mut control = self.control.lock();
if control.is_closed {
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if self.control.is_closed.load(Ordering::Acquire) {
return Ok(()).into();
}
control.is_closed = true;
if let Some(old_waker) = control.send_waker.replace(cx.waker().clone()) {
if !old_waker.will_wake(cx.waker()) {
old_waker.wake();
}
}
Poll::Pending
self.control.is_closed.store(true, Ordering::Release);
Ok(()).into()
}
}
@@ -266,9 +304,9 @@ impl TcpTun {
} = manager;
while manager_running.load(Ordering::Relaxed) {
while let Ok(TcpSocketCreation { control, socket }) = socket_creation_rx.try_recv() {
while let Ok(TcpSocketCreation { control_block, socket }) = socket_creation_rx.try_recv() {
let handle = iface.add_socket(socket);
sockets.insert(handle, control);
sockets.insert(handle, control_block);
}
let before_poll = SmolInstant::now();
@@ -287,29 +325,34 @@ impl TcpTun {
// Check all the sockets' status
let mut sockets_to_remove = Vec::new();
for (socket_handle, control) in sockets.iter() {
for (socket_handle, control_block) in sockets.iter_mut() {
let socket_handle = socket_handle.clone();
let socket = iface.get_socket::<TcpSocket>(socket_handle);
let mut control = control.lock();
let TcpSocketControlBlock {
control,
send_buffer_consumer,
recv_buffer_producer,
} = control_block;
#[inline]
fn close_socket_control(control: &mut TcpSocketControl) {
control.is_closed = true;
if let Some(waker) = control.send_waker.take() {
fn close_socket_control(control: &TcpSocketControl) {
control.is_closed.store(true, Ordering::Release);
if let Some(waker) = control.send_waker.lock().take() {
waker.wake();
}
if let Some(waker) = control.recv_waker.take() {
if let Some(waker) = control.recv_waker.lock().take() {
waker.wake();
}
}
if !socket.is_open() || socket.state() == TcpState::Closed {
sockets_to_remove.push(socket_handle);
close_socket_control(&mut *control);
close_socket_control(control);
continue;
}
if control.is_closed {
if control.is_closed.load(Ordering::Acquire) {
// Close the socket.
socket.close();
// sockets_to_remove.push(socket_handle);
@@ -319,54 +362,56 @@ impl TcpTun {
// Check if readable
let mut has_received = false;
while socket.can_recv() && !control.recv_buffer.is_full() {
while socket.can_recv() && !recv_buffer_producer.is_full() {
let result = socket.recv(|buffer| {
let n = control.recv_buffer.enqueue_slice(buffer);
(n, ())
let n = recv_buffer_producer.push_slice(buffer);
(n, n)
});
match result {
Ok(..) => {
has_received = true;
Ok(n) => {
has_received = n != 0;
}
Err(err) => {
error!("socket recv error: {}", err);
sockets_to_remove.push(socket_handle);
close_socket_control(&mut *control);
close_socket_control(control);
break;
}
}
}
if has_received && control.recv_waker.is_some() {
if let Some(waker) = control.recv_waker.take() {
if has_received {
let mut recv_waker = control.recv_waker.lock();
if let Some(waker) = recv_waker.take() {
waker.wake();
}
}
// Check if writable
let mut has_sent = false;
while socket.can_send() && !control.send_buffer.is_empty() {
while socket.can_send() && !send_buffer_consumer.is_empty() {
let result = socket.send(|buffer| {
let n = control.send_buffer.dequeue_slice(buffer);
(n, ())
let n = send_buffer_consumer.pop_slice(buffer);
(n, n)
});
match result {
Ok(..) => {
has_sent = true;
Ok(n) => {
has_sent = n != 0;
}
Err(err) => {
error!("socket send error: {}", err);
sockets_to_remove.push(socket_handle);
close_socket_control(&mut *control);
close_socket_control(control);
break;
}
}
}
if has_sent && control.send_waker.is_some() {
if let Some(waker) = control.send_waker.take() {
if has_sent {
let mut send_waker = control.send_waker.lock();
if let Some(waker) = send_waker.take() {
waker.wake();
}
}