feat(local-tun): Buffer received from tun make reuseable

This commit is contained in:
zonyitoo
2025-03-26 21:35:26 +08:00
parent 95102db279
commit c77e77ff2a
3 changed files with 103 additions and 22 deletions

View File

@@ -4,6 +4,7 @@
use std::os::unix::io::RawFd;
use std::{
io::{self, ErrorKind},
mem,
net::{IpAddr, SocketAddr},
sync::Arc,
time::Duration,
@@ -41,7 +42,7 @@ cfg_if! {
use crate::local::{context::ServiceContext, loadbalancing::PingBalancer};
use self::{ip_packet::IpPacket, tcp::TcpTun, udp::UdpTun};
use self::{ip_packet::IpPacket, tcp::TcpTun, udp::UdpTun, virt_device::TokenBuffer};
mod ip_packet;
mod tcp;
@@ -191,7 +192,16 @@ impl Tun {
let address_broadcast = address_net.broadcast();
let mut packet_buffer = vec![0u8; 65536].into_boxed_slice();
let create_packet_buffer = || {
const PACKET_BUFFER_SIZE: usize = 65536;
let mut packet_buffer = TokenBuffer::with_capacity(PACKET_BUFFER_SIZE);
unsafe {
packet_buffer.set_len(PACKET_BUFFER_SIZE);
}
packet_buffer
};
let mut packet_buffer = create_packet_buffer();
let mut udp_cleanup_timer = time::interval(self.udp_cleanup_interval);
loop {
@@ -200,10 +210,14 @@ impl Tun {
n = self.device.read(&mut packet_buffer) => {
let n = n?;
let packet = &mut packet_buffer[..n];
trace!("[TUN] received IP packet {:?}", ByteStr::new(packet));
let mut packet_buffer = mem::replace(&mut packet_buffer, create_packet_buffer());
unsafe {
packet_buffer.set_len(n);
}
if let Err(err) = self.handle_tun_frame(&address_broadcast, packet).await {
trace!("[TUN] received IP packet {:?}", ByteStr::new(&packet_buffer));
if let Err(err) = self.handle_tun_frame(&address_broadcast, packet_buffer).await {
error!("[TUN] handle IP frame failed, error: {}", err);
}
}
@@ -254,11 +268,15 @@ impl Tun {
}
}
async fn handle_tun_frame(&mut self, device_broadcast_addr: &IpAddr, frame: &[u8]) -> smoltcp::wire::Result<()> {
let packet = match IpPacket::new_checked(frame)? {
async fn handle_tun_frame(
&mut self,
device_broadcast_addr: &IpAddr,
frame: TokenBuffer,
) -> smoltcp::wire::Result<()> {
let packet = match IpPacket::new_checked(frame.as_ref())? {
Some(packet) => packet,
None => {
warn!("unrecognized IP packet {:?}", ByteStr::new(frame));
warn!("unrecognized IP packet {:?}", ByteStr::new(&frame));
return Ok(());
}
};

View File

@@ -40,7 +40,7 @@ use crate::{
net::utils::to_ipv4_mapped,
};
use super::virt_device::VirtTunDevice;
use super::virt_device::{TokenBuffer, VirtTunDevice};
// NOTE: Default buffer could contain 5 AEAD packets
const DEFAULT_TCP_SEND_BUFFER_SIZE: u32 = (0x3FFFu32 * 5).next_power_of_two();
@@ -243,8 +243,8 @@ pub struct TcpTun {
manager_socket_creation_tx: mpsc::UnboundedSender<TcpSocketCreation>,
manager_running: Arc<AtomicBool>,
balancer: PingBalancer,
iface_rx: mpsc::UnboundedReceiver<Vec<u8>>,
iface_tx: mpsc::UnboundedSender<Vec<u8>>,
iface_rx: mpsc::UnboundedReceiver<TokenBuffer>,
iface_tx: mpsc::UnboundedSender<TokenBuffer>,
iface_tx_avail: Arc<AtomicBool>,
}
@@ -551,8 +551,8 @@ impl TcpTun {
Ok(())
}
pub async fn drive_interface_state(&mut self, frame: &[u8]) {
if self.iface_tx.send(frame.to_vec()).is_err() {
pub async fn drive_interface_state(&mut self, frame: TokenBuffer) {
if self.iface_tx.send(frame).is_err() {
panic!("interface send channel closed unexpectedly");
}
@@ -561,7 +561,7 @@ impl TcpTun {
self.manager_notify.notify();
}
pub async fn recv_packet(&mut self) -> Vec<u8> {
pub async fn recv_packet(&mut self) -> TokenBuffer {
match self.iface_rx.recv().await {
Some(v) => v,
None => unreachable!("channel closed unexpectedly"),

View File

@@ -2,12 +2,16 @@
use std::{
marker::PhantomData,
mem,
ops::{Deref, DerefMut},
sync::{
Arc,
Arc, Mutex,
atomic::{AtomicBool, Ordering},
},
};
use bytes::BytesMut;
use once_cell::sync::Lazy;
use smoltcp::{
phy::{self, Device, DeviceCapabilities},
time::Instant,
@@ -16,8 +20,8 @@ use tokio::sync::mpsc;
pub struct VirtTunDevice {
capabilities: DeviceCapabilities,
in_buf: mpsc::UnboundedReceiver<Vec<u8>>,
out_buf: mpsc::UnboundedSender<Vec<u8>>,
in_buf: mpsc::UnboundedReceiver<TokenBuffer>,
out_buf: mpsc::UnboundedSender<TokenBuffer>,
in_buf_avail: Arc<AtomicBool>,
}
@@ -27,8 +31,8 @@ impl VirtTunDevice {
capabilities: DeviceCapabilities,
) -> (
Self,
mpsc::UnboundedReceiver<Vec<u8>>,
mpsc::UnboundedSender<Vec<u8>>,
mpsc::UnboundedReceiver<TokenBuffer>,
mpsc::UnboundedSender<TokenBuffer>,
Arc<AtomicBool>,
) {
let (iface_tx, iface_output) = mpsc::unbounded_channel();
@@ -64,7 +68,6 @@ impl Device for VirtTunDevice {
buffer,
phantom_device: PhantomData,
};
self.in_buf_avail.store(true, Ordering::Release);
let tx = VirtTxToken(self);
return Some((rx, tx));
}
@@ -82,7 +85,7 @@ impl Device for VirtTunDevice {
}
pub struct VirtRxToken<'a> {
buffer: Vec<u8>,
buffer: TokenBuffer,
phantom_device: PhantomData<&'a VirtTunDevice>,
}
@@ -102,9 +105,69 @@ impl phy::TxToken for VirtTxToken<'_> {
where
F: FnOnce(&mut [u8]) -> R,
{
let mut buffer = vec![0u8; len];
let mut buffer = TokenBuffer::new();
buffer.reserve(len);
unsafe {
buffer.set_len(len);
}
let result = f(&mut buffer);
self.0.out_buf.send(buffer).expect("channel closed unexpectly");
result
}
}
// Maximun number of TokenBuffer cached globally.
//
// Each of them has capacity 65536 (defined in tun/mod.rs), so 64 * 65536 = 4MB.
const TOKEN_BUFFER_LIST_MAX_SIZE: usize = 64;
static TOKEN_BUFFER_LIST: Lazy<Mutex<Vec<BytesMut>>> = Lazy::new(|| Mutex::new(Vec::new()));
pub struct TokenBuffer {
buffer: BytesMut,
}
impl Drop for TokenBuffer {
fn drop(&mut self) {
let mut list = TOKEN_BUFFER_LIST.lock().unwrap();
if list.len() >= TOKEN_BUFFER_LIST_MAX_SIZE {
return;
}
let empty_buffer = BytesMut::new();
let mut buffer = mem::replace(&mut self.buffer, empty_buffer);
buffer.clear();
list.push(buffer);
}
}
impl TokenBuffer {
pub fn new() -> TokenBuffer {
TokenBuffer::with_capacity(0)
}
pub fn with_capacity(cap: usize) -> TokenBuffer {
let mut list = TOKEN_BUFFER_LIST.lock().unwrap();
if let Some(buffer) = list.pop() {
return TokenBuffer { buffer };
}
TokenBuffer {
buffer: BytesMut::with_capacity(cap),
}
}
}
impl Deref for TokenBuffer {
type Target = BytesMut;
fn deref(&self) -> &Self::Target {
&self.buffer
}
}
impl DerefMut for TokenBuffer {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.buffer
}
}