mirror of
https://github.com/shadowsocks/shadowsocks-rust.git
synced 2026-02-09 01:59:16 +08:00
feat: tun smoltcp thread given a name
This commit is contained in:
@@ -295,181 +295,184 @@ impl TcpTun {
|
||||
let manager_handle = {
|
||||
let manager_running = manager_running.clone();
|
||||
|
||||
thread::spawn(move || {
|
||||
let TcpSocketManager {
|
||||
ref mut device,
|
||||
ref mut iface,
|
||||
ref mut sockets,
|
||||
ref mut socket_creation_rx,
|
||||
..
|
||||
} = manager;
|
||||
thread::Builder::new()
|
||||
.name("smoltcp-poll".to_owned())
|
||||
.spawn(move || {
|
||||
let TcpSocketManager {
|
||||
ref mut device,
|
||||
ref mut iface,
|
||||
ref mut sockets,
|
||||
ref mut socket_creation_rx,
|
||||
..
|
||||
} = manager;
|
||||
|
||||
let mut socket_set = SocketSet::new(vec![]);
|
||||
let mut socket_set = SocketSet::new(vec![]);
|
||||
|
||||
while manager_running.load(Ordering::Relaxed) {
|
||||
while let Ok(TcpSocketCreation {
|
||||
control,
|
||||
socket,
|
||||
socket_created_tx: socket_create_tx,
|
||||
}) = socket_creation_rx.try_recv()
|
||||
{
|
||||
let handle = socket_set.add(socket);
|
||||
let _ = socket_create_tx.send(());
|
||||
sockets.insert(handle, control);
|
||||
}
|
||||
|
||||
let before_poll = SmolInstant::now();
|
||||
let updated_sockets = iface.poll(before_poll, device, &mut socket_set);
|
||||
|
||||
if updated_sockets {
|
||||
trace!("VirtDevice::poll costed {}", SmolInstant::now() - before_poll);
|
||||
}
|
||||
|
||||
// Check all the sockets' status
|
||||
let mut sockets_to_remove = Vec::new();
|
||||
|
||||
for (socket_handle, control) in sockets.iter() {
|
||||
let socket_handle = *socket_handle;
|
||||
let socket = socket_set.get_mut::<TcpSocket>(socket_handle);
|
||||
let mut control = control.lock();
|
||||
|
||||
// Remove the socket only when it is in the closed state.
|
||||
if socket.state() == TcpState::Closed {
|
||||
sockets_to_remove.push(socket_handle);
|
||||
|
||||
control.send_state = TcpSocketState::Closed;
|
||||
control.recv_state = TcpSocketState::Closed;
|
||||
|
||||
if let Some(waker) = control.send_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
if let Some(waker) = control.recv_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
|
||||
trace!("closed TCP connection");
|
||||
continue;
|
||||
}
|
||||
|
||||
// SHUT_WR
|
||||
if matches!(control.send_state, TcpSocketState::Close) {
|
||||
trace!("closing TCP Write Half, {:?}", socket.state());
|
||||
|
||||
// Close the socket. Set to FIN state
|
||||
socket.close();
|
||||
control.send_state = TcpSocketState::Closing;
|
||||
|
||||
// We can still process the pending buffer.
|
||||
}
|
||||
|
||||
// Check if readable
|
||||
let mut wake_receiver = false;
|
||||
while socket.can_recv() && !control.recv_buffer.is_full() {
|
||||
let result = socket.recv(|buffer| {
|
||||
let n = control.recv_buffer.enqueue_slice(buffer);
|
||||
(n, ())
|
||||
});
|
||||
|
||||
match result {
|
||||
Ok(..) => {
|
||||
wake_receiver = true;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("socket recv error: {:?}, {:?}", err, socket.state());
|
||||
|
||||
// Don't know why. Abort the connection.
|
||||
socket.abort();
|
||||
|
||||
if matches!(control.recv_state, TcpSocketState::Normal) {
|
||||
control.recv_state = TcpSocketState::Closed;
|
||||
}
|
||||
wake_receiver = true;
|
||||
|
||||
// The socket will be recycled in the next poll.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If socket is not in ESTABLISH, FIN-WAIT-1, FIN-WAIT-2,
|
||||
// the local client have closed our receiver.
|
||||
if matches!(control.recv_state, TcpSocketState::Normal)
|
||||
&& !socket.may_recv()
|
||||
&& !matches!(
|
||||
socket.state(),
|
||||
TcpState::Listen
|
||||
| TcpState::SynReceived
|
||||
| TcpState::Established
|
||||
| TcpState::FinWait1
|
||||
| TcpState::FinWait2
|
||||
)
|
||||
while manager_running.load(Ordering::Relaxed) {
|
||||
while let Ok(TcpSocketCreation {
|
||||
control,
|
||||
socket,
|
||||
socket_created_tx: socket_create_tx,
|
||||
}) = socket_creation_rx.try_recv()
|
||||
{
|
||||
trace!("closed TCP Read Half, {:?}", socket.state());
|
||||
|
||||
// Let TcpConnection::poll_read returns EOF.
|
||||
control.recv_state = TcpSocketState::Closed;
|
||||
wake_receiver = true;
|
||||
let handle = socket_set.add(socket);
|
||||
let _ = socket_create_tx.send(());
|
||||
sockets.insert(handle, control);
|
||||
}
|
||||
|
||||
if wake_receiver && control.recv_waker.is_some() {
|
||||
if let Some(waker) = control.recv_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
let before_poll = SmolInstant::now();
|
||||
let updated_sockets = iface.poll(before_poll, device, &mut socket_set);
|
||||
|
||||
if updated_sockets {
|
||||
trace!("VirtDevice::poll costed {}", SmolInstant::now() - before_poll);
|
||||
}
|
||||
|
||||
// Check if writable
|
||||
let mut wake_sender = false;
|
||||
while socket.can_send() && !control.send_buffer.is_empty() {
|
||||
let result = socket.send(|buffer| {
|
||||
let n = control.send_buffer.dequeue_slice(buffer);
|
||||
(n, ())
|
||||
});
|
||||
// Check all the sockets' status
|
||||
let mut sockets_to_remove = Vec::new();
|
||||
|
||||
match result {
|
||||
Ok(..) => {
|
||||
wake_sender = true;
|
||||
for (socket_handle, control) in sockets.iter() {
|
||||
let socket_handle = *socket_handle;
|
||||
let socket = socket_set.get_mut::<TcpSocket>(socket_handle);
|
||||
let mut control = control.lock();
|
||||
|
||||
// Remove the socket only when it is in the closed state.
|
||||
if socket.state() == TcpState::Closed {
|
||||
sockets_to_remove.push(socket_handle);
|
||||
|
||||
control.send_state = TcpSocketState::Closed;
|
||||
control.recv_state = TcpSocketState::Closed;
|
||||
|
||||
if let Some(waker) = control.send_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
if let Some(waker) = control.recv_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
Err(err) => {
|
||||
error!("socket send error: {:?}, {:?}", err, socket.state());
|
||||
|
||||
// Don't know why. Abort the connection.
|
||||
socket.abort();
|
||||
trace!("closed TCP connection");
|
||||
continue;
|
||||
}
|
||||
|
||||
if matches!(control.send_state, TcpSocketState::Normal) {
|
||||
control.send_state = TcpSocketState::Closed;
|
||||
// SHUT_WR
|
||||
if matches!(control.send_state, TcpSocketState::Close) {
|
||||
trace!("closing TCP Write Half, {:?}", socket.state());
|
||||
|
||||
// Close the socket. Set to FIN state
|
||||
socket.close();
|
||||
control.send_state = TcpSocketState::Closing;
|
||||
|
||||
// We can still process the pending buffer.
|
||||
}
|
||||
|
||||
// Check if readable
|
||||
let mut wake_receiver = false;
|
||||
while socket.can_recv() && !control.recv_buffer.is_full() {
|
||||
let result = socket.recv(|buffer| {
|
||||
let n = control.recv_buffer.enqueue_slice(buffer);
|
||||
(n, ())
|
||||
});
|
||||
|
||||
match result {
|
||||
Ok(..) => {
|
||||
wake_receiver = true;
|
||||
}
|
||||
wake_sender = true;
|
||||
Err(err) => {
|
||||
error!("socket recv error: {:?}, {:?}", err, socket.state());
|
||||
|
||||
// The socket will be recycled in the next poll.
|
||||
break;
|
||||
// Don't know why. Abort the connection.
|
||||
socket.abort();
|
||||
|
||||
if matches!(control.recv_state, TcpSocketState::Normal) {
|
||||
control.recv_state = TcpSocketState::Closed;
|
||||
}
|
||||
wake_receiver = true;
|
||||
|
||||
// The socket will be recycled in the next poll.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If socket is not in ESTABLISH, FIN-WAIT-1, FIN-WAIT-2,
|
||||
// the local client have closed our receiver.
|
||||
if matches!(control.recv_state, TcpSocketState::Normal)
|
||||
&& !socket.may_recv()
|
||||
&& !matches!(
|
||||
socket.state(),
|
||||
TcpState::Listen
|
||||
| TcpState::SynReceived
|
||||
| TcpState::Established
|
||||
| TcpState::FinWait1
|
||||
| TcpState::FinWait2
|
||||
)
|
||||
{
|
||||
trace!("closed TCP Read Half, {:?}", socket.state());
|
||||
|
||||
// Let TcpConnection::poll_read returns EOF.
|
||||
control.recv_state = TcpSocketState::Closed;
|
||||
wake_receiver = true;
|
||||
}
|
||||
|
||||
if wake_receiver && control.recv_waker.is_some() {
|
||||
if let Some(waker) = control.recv_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
// Check if writable
|
||||
let mut wake_sender = false;
|
||||
while socket.can_send() && !control.send_buffer.is_empty() {
|
||||
let result = socket.send(|buffer| {
|
||||
let n = control.send_buffer.dequeue_slice(buffer);
|
||||
(n, ())
|
||||
});
|
||||
|
||||
match result {
|
||||
Ok(..) => {
|
||||
wake_sender = true;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("socket send error: {:?}, {:?}", err, socket.state());
|
||||
|
||||
// Don't know why. Abort the connection.
|
||||
socket.abort();
|
||||
|
||||
if matches!(control.send_state, TcpSocketState::Normal) {
|
||||
control.send_state = TcpSocketState::Closed;
|
||||
}
|
||||
wake_sender = true;
|
||||
|
||||
// The socket will be recycled in the next poll.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if wake_sender && control.send_waker.is_some() {
|
||||
if let Some(waker) = control.send_waker.take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if wake_sender && control.send_waker.is_some() {
|
||||
if let Some(waker) = control.send_waker.take() {
|
||||
waker.wake();
|
||||
for socket_handle in sockets_to_remove {
|
||||
sockets.remove(&socket_handle);
|
||||
socket_set.remove(socket_handle);
|
||||
}
|
||||
|
||||
if !device.recv_available() {
|
||||
let next_duration = iface
|
||||
.poll_delay(before_poll, &socket_set)
|
||||
.unwrap_or(SmolDuration::from_millis(5));
|
||||
if next_duration != SmolDuration::ZERO {
|
||||
thread::park_timeout(Duration::from(next_duration));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for socket_handle in sockets_to_remove {
|
||||
sockets.remove(&socket_handle);
|
||||
socket_set.remove(socket_handle);
|
||||
}
|
||||
|
||||
if !device.recv_available() {
|
||||
let next_duration = iface
|
||||
.poll_delay(before_poll, &socket_set)
|
||||
.unwrap_or(SmolDuration::from_millis(5));
|
||||
if next_duration != SmolDuration::ZERO {
|
||||
thread::park_timeout(Duration::from(next_duration));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trace!("VirtDevice::poll thread exited");
|
||||
})
|
||||
trace!("VirtDevice::poll thread exited");
|
||||
})
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
let manager_notify = Arc::new(ManagerNotify::new(manager_handle.thread().clone()));
|
||||
|
||||
Reference in New Issue
Block a user