diff --git a/Cargo.lock b/Cargo.lock index b70c8098..7d0d0c84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -222,6 +222,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cache-padded" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" + [[package]] name = "cc" version = "1.0.72" @@ -1719,6 +1725,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "ringbuf" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c60f3923939c33e6c543ddbff14d0ee6a407fcd186d560be37282559616adf3" +dependencies = [ + "cache-padded", +] + [[package]] name = "rpassword" version = "5.0.1" @@ -2105,6 +2120,7 @@ dependencies = [ "qrcode", "rand", "regex", + "ringbuf", "rustls-native-certs", "serde", "shadowsocks", diff --git a/crates/shadowsocks-service/Cargo.toml b/crates/shadowsocks-service/Cargo.toml index 7b688c72..19f24127 100644 --- a/crates/shadowsocks-service/Cargo.toml +++ b/crates/shadowsocks-service/Cargo.toml @@ -56,7 +56,7 @@ local-tunnel = ["local"] # Enable socks4 protocol for sslocal local-socks4 = ["local"] # Enable Tun interface protocol for sslocal -local-tun = ["local", "etherparse", "tun", "rand", "smoltcp"] +local-tun = ["local", "etherparse", "tun", "rand", "smoltcp", "ringbuf"] # Enable Stream Cipher Protocol # WARN: Stream Cipher Protocol is proved to be insecure @@ -95,6 +95,7 @@ bytes = "1.0" byte_string = "1.0" byteorder = "1.3" rand = { version = "0.8", optional = true } +ringbuf = { version = "0.2.6", optional = true } futures = "0.3" tokio = { version = "1.5", features = ["io-util", "macros", "net", "parking_lot", "rt", "sync", "time"] } diff --git a/crates/shadowsocks-service/src/local/tun/tcp.rs b/crates/shadowsocks-service/src/local/tun/tcp.rs index 864ffb05..a707a79d 100644 --- a/crates/shadowsocks-service/src/local/tun/tcp.rs +++ b/crates/shadowsocks-service/src/local/tun/tcp.rs @@ -14,12 +14,12 @@ use std::{ }; use log::{error, trace}; +use ringbuf::{Consumer as RingBufferConsumer, Producer as RingBufferProducer, RingBuffer}; use shadowsocks::{net::TcpSocketOpts, relay::socks5::Address}; use smoltcp::{ iface::{Interface, InterfaceBuilder, Routes, SocketHandle}, phy::{DeviceCapabilities, Medium}, socket::{TcpSocket, TcpSocketBuffer, TcpState}, - storage::RingBuffer, time::{Duration as SmolDuration, Instant as SmolInstant}, wire::{IpAddress, IpCidr, Ipv4Address, Ipv6Address, TcpPacket}, }; @@ -43,11 +43,9 @@ const DEFAULT_TCP_SEND_BUFFER_SIZE: u32 = 0x3FFF * 20; const DEFAULT_TCP_RECV_BUFFER_SIZE: u32 = 0x3FFF * 20; struct TcpSocketControl { - send_buffer: RingBuffer<'static, u8>, - send_waker: Option, - recv_buffer: RingBuffer<'static, u8>, - recv_waker: Option, - is_closed: bool, + send_waker: SpinMutex>, + recv_waker: SpinMutex>, + is_closed: AtomicBool, } struct ManagerNotify { @@ -66,26 +64,33 @@ impl ManagerNotify { struct TcpSocketManager { iface: Interface<'static, VirtTunDevice>, - sockets: HashMap, + sockets: HashMap, socket_creation_rx: mpsc::UnboundedReceiver, } -type SharedTcpConnectionControl = Arc>; +type SharedTcpConnectionControl = Arc; + +struct TcpSocketControlBlock { + control: SharedTcpConnectionControl, + send_buffer_consumer: RingBufferConsumer, + recv_buffer_producer: RingBufferProducer, +} struct TcpSocketCreation { - control: SharedTcpConnectionControl, + control_block: TcpSocketControlBlock, socket: TcpSocket<'static>, } struct TcpConnection { control: SharedTcpConnectionControl, manager_notify: Arc, + send_buffer_producer: RingBufferProducer, + recv_buffer_consumer: RingBufferConsumer, } impl Drop for TcpConnection { fn drop(&mut self) { - let mut control = self.control.lock(); - control.is_closed = true; + self.control.is_closed.store(true, Ordering::Release); } } @@ -99,82 +104,123 @@ impl TcpConnection { let send_buffer_size = tcp_opts.send_buffer_size.unwrap_or(DEFAULT_TCP_SEND_BUFFER_SIZE); let recv_buffer_size = tcp_opts.recv_buffer_size.unwrap_or(DEFAULT_TCP_RECV_BUFFER_SIZE); - let control = Arc::new(SpinMutex::new(TcpSocketControl { - send_buffer: RingBuffer::new(vec![0u8; send_buffer_size as usize]), - send_waker: None, - recv_buffer: RingBuffer::new(vec![0u8; recv_buffer_size as usize]), - recv_waker: None, - is_closed: false, - })); + let send_buffer = RingBuffer::new(send_buffer_size as usize); + let recv_buffer = RingBuffer::new(recv_buffer_size as usize); - let _ = socket_creation_tx.send(TcpSocketCreation { - control: control.clone(), - socket, + let (send_buffer_producer, send_buffer_consumer) = send_buffer.split(); + let (recv_buffer_producer, recv_buffer_consumer) = recv_buffer.split(); + + let control = Arc::new(TcpSocketControl { + send_waker: SpinMutex::new(None), + recv_waker: SpinMutex::new(None), + is_closed: AtomicBool::new(false), }); + let control_block = TcpSocketControlBlock { + control: control.clone(), + send_buffer_consumer, + recv_buffer_producer, + }; + + let _ = socket_creation_tx.send(TcpSocketCreation { control_block, socket }); + TcpConnection { control, manager_notify, + send_buffer_producer, + recv_buffer_consumer, } } } impl AsyncRead for TcpConnection { - fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - let mut control = self.control.lock(); + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let TcpConnection { + ref control, + ref manager_notify, + ref mut recv_buffer_consumer, + .. + } = *self; // If socket is already closed, just return EOF directly. - if control.is_closed { + if control.is_closed.load(Ordering::Acquire) { return Ok(()).into(); } // Read from buffer - if control.recv_buffer.is_empty() { + let recv_buf = unsafe { mem::transmute::<_, &mut [u8]>(buf.unfilled_mut()) }; + let mut n = recv_buffer_consumer.pop_slice(recv_buf); + if n == 0 { // Nothing could be read. Wait for notify. - if let Some(old_waker) = control.recv_waker.replace(cx.waker().clone()) { - if !old_waker.will_wake(cx.waker()) { - old_waker.wake(); - } + let mut recv_waker = control.recv_waker.lock(); + + // Double check. + if control.is_closed.load(Ordering::Acquire) { + return Ok(()).into(); } - return Poll::Pending; + n = recv_buffer_consumer.pop_slice(recv_buf); + if n == 0 { + if let Some(old_waker) = recv_waker.replace(cx.waker().clone()) { + if !old_waker.will_wake(cx.waker()) { + old_waker.wake(); + } + } + + return Poll::Pending; + } } - let recv_buf = unsafe { mem::transmute::<_, &mut [u8]>(buf.unfilled_mut()) }; - let n = control.recv_buffer.dequeue_slice(recv_buf); buf.advance(n); - if control.recv_buffer.is_empty() { - self.manager_notify.notify(); + if recv_buffer_consumer.is_empty() { + manager_notify.notify(); } Ok(()).into() } } impl AsyncWrite for TcpConnection { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - let mut control = self.control.lock(); - if control.is_closed { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + let TcpConnection { + ref control, + ref manager_notify, + ref mut send_buffer_producer, + .. + } = *self; + + // If socket is closed. EPIPE. + if control.is_closed.load(Ordering::Acquire) { return Err(io::ErrorKind::BrokenPipe.into()).into(); } // Write to buffer - if control.send_buffer.is_full() { - if let Some(old_waker) = control.send_waker.replace(cx.waker().clone()) { - if !old_waker.will_wake(cx.waker()) { - old_waker.wake(); - } + let mut n = send_buffer_producer.push_slice(buf); + if n == 0 { + // Nothing could be write. Wait for notify. + let mut send_waker = control.send_waker.lock(); + + // Double check. + if control.is_closed.load(Ordering::Acquire) { + return Err(io::ErrorKind::BrokenPipe.into()).into(); } - return Poll::Pending; + n = send_buffer_producer.push_slice(buf); + if n == 0 { + if let Some(old_waker) = send_waker.replace(cx.waker().clone()) { + if !old_waker.will_wake(cx.waker()) { + old_waker.wake(); + } + } + + return Poll::Pending; + } } - let n = control.send_buffer.enqueue_slice(buf); - - if control.send_buffer.is_full() { - self.manager_notify.notify(); + if send_buffer_producer.is_full() { + manager_notify.notify(); } Ok(n).into() } @@ -183,21 +229,13 @@ impl AsyncWrite for TcpConnection { Ok(()).into() } - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut control = self.control.lock(); - - if control.is_closed { + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if self.control.is_closed.load(Ordering::Acquire) { return Ok(()).into(); } - control.is_closed = true; - if let Some(old_waker) = control.send_waker.replace(cx.waker().clone()) { - if !old_waker.will_wake(cx.waker()) { - old_waker.wake(); - } - } - - Poll::Pending + self.control.is_closed.store(true, Ordering::Release); + Ok(()).into() } } @@ -266,9 +304,9 @@ impl TcpTun { } = manager; while manager_running.load(Ordering::Relaxed) { - while let Ok(TcpSocketCreation { control, socket }) = socket_creation_rx.try_recv() { + while let Ok(TcpSocketCreation { control_block, socket }) = socket_creation_rx.try_recv() { let handle = iface.add_socket(socket); - sockets.insert(handle, control); + sockets.insert(handle, control_block); } let before_poll = SmolInstant::now(); @@ -287,29 +325,34 @@ impl TcpTun { // Check all the sockets' status let mut sockets_to_remove = Vec::new(); - for (socket_handle, control) in sockets.iter() { + for (socket_handle, control_block) in sockets.iter_mut() { let socket_handle = socket_handle.clone(); let socket = iface.get_socket::(socket_handle); - let mut control = control.lock(); + + let TcpSocketControlBlock { + control, + send_buffer_consumer, + recv_buffer_producer, + } = control_block; #[inline] - fn close_socket_control(control: &mut TcpSocketControl) { - control.is_closed = true; - if let Some(waker) = control.send_waker.take() { + fn close_socket_control(control: &TcpSocketControl) { + control.is_closed.store(true, Ordering::Release); + if let Some(waker) = control.send_waker.lock().take() { waker.wake(); } - if let Some(waker) = control.recv_waker.take() { + if let Some(waker) = control.recv_waker.lock().take() { waker.wake(); } } if !socket.is_open() || socket.state() == TcpState::Closed { sockets_to_remove.push(socket_handle); - close_socket_control(&mut *control); + close_socket_control(control); continue; } - if control.is_closed { + if control.is_closed.load(Ordering::Acquire) { // Close the socket. socket.close(); // sockets_to_remove.push(socket_handle); @@ -319,54 +362,56 @@ impl TcpTun { // Check if readable let mut has_received = false; - while socket.can_recv() && !control.recv_buffer.is_full() { + while socket.can_recv() && !recv_buffer_producer.is_full() { let result = socket.recv(|buffer| { - let n = control.recv_buffer.enqueue_slice(buffer); - (n, ()) + let n = recv_buffer_producer.push_slice(buffer); + (n, n) }); match result { - Ok(..) => { - has_received = true; + Ok(n) => { + has_received = n != 0; } Err(err) => { error!("socket recv error: {}", err); sockets_to_remove.push(socket_handle); - close_socket_control(&mut *control); + close_socket_control(control); break; } } } - if has_received && control.recv_waker.is_some() { - if let Some(waker) = control.recv_waker.take() { + if has_received { + let mut recv_waker = control.recv_waker.lock(); + if let Some(waker) = recv_waker.take() { waker.wake(); } } // Check if writable let mut has_sent = false; - while socket.can_send() && !control.send_buffer.is_empty() { + while socket.can_send() && !send_buffer_consumer.is_empty() { let result = socket.send(|buffer| { - let n = control.send_buffer.dequeue_slice(buffer); - (n, ()) + let n = send_buffer_consumer.pop_slice(buffer); + (n, n) }); match result { - Ok(..) => { - has_sent = true; + Ok(n) => { + has_sent = n != 0; } Err(err) => { error!("socket send error: {}", err); sockets_to_remove.push(socket_handle); - close_socket_control(&mut *control); + close_socket_control(control); break; } } } - if has_sent && control.send_waker.is_some() { - if let Some(waker) = control.send_waker.take() { + if has_sent { + let mut send_waker = control.send_waker.lock(); + if let Some(waker) = send_waker.take() { waker.wake(); } }