diff --git a/crates/shadowsocks-service/src/local/tun/tcp.rs b/crates/shadowsocks-service/src/local/tun/tcp.rs index 06a350a8..fa376b18 100644 --- a/crates/shadowsocks-service/src/local/tun/tcp.rs +++ b/crates/shadowsocks-service/src/local/tun/tcp.rs @@ -295,181 +295,184 @@ impl TcpTun { let manager_handle = { let manager_running = manager_running.clone(); - thread::spawn(move || { - let TcpSocketManager { - ref mut device, - ref mut iface, - ref mut sockets, - ref mut socket_creation_rx, - .. - } = manager; + thread::Builder::new() + .name("smoltcp-poll".to_owned()) + .spawn(move || { + let TcpSocketManager { + ref mut device, + ref mut iface, + ref mut sockets, + ref mut socket_creation_rx, + .. + } = manager; - let mut socket_set = SocketSet::new(vec![]); + let mut socket_set = SocketSet::new(vec![]); - while manager_running.load(Ordering::Relaxed) { - while let Ok(TcpSocketCreation { - control, - socket, - socket_created_tx: socket_create_tx, - }) = socket_creation_rx.try_recv() - { - let handle = socket_set.add(socket); - let _ = socket_create_tx.send(()); - sockets.insert(handle, control); - } - - let before_poll = SmolInstant::now(); - let updated_sockets = iface.poll(before_poll, device, &mut socket_set); - - if updated_sockets { - trace!("VirtDevice::poll costed {}", SmolInstant::now() - before_poll); - } - - // Check all the sockets' status - let mut sockets_to_remove = Vec::new(); - - for (socket_handle, control) in sockets.iter() { - let socket_handle = *socket_handle; - let socket = socket_set.get_mut::(socket_handle); - let mut control = control.lock(); - - // Remove the socket only when it is in the closed state. - if socket.state() == TcpState::Closed { - sockets_to_remove.push(socket_handle); - - control.send_state = TcpSocketState::Closed; - control.recv_state = TcpSocketState::Closed; - - if let Some(waker) = control.send_waker.take() { - waker.wake(); - } - if let Some(waker) = control.recv_waker.take() { - waker.wake(); - } - - trace!("closed TCP connection"); - continue; - } - - // SHUT_WR - if matches!(control.send_state, TcpSocketState::Close) { - trace!("closing TCP Write Half, {:?}", socket.state()); - - // Close the socket. Set to FIN state - socket.close(); - control.send_state = TcpSocketState::Closing; - - // We can still process the pending buffer. - } - - // Check if readable - let mut wake_receiver = false; - while socket.can_recv() && !control.recv_buffer.is_full() { - let result = socket.recv(|buffer| { - let n = control.recv_buffer.enqueue_slice(buffer); - (n, ()) - }); - - match result { - Ok(..) => { - wake_receiver = true; - } - Err(err) => { - error!("socket recv error: {:?}, {:?}", err, socket.state()); - - // Don't know why. Abort the connection. - socket.abort(); - - if matches!(control.recv_state, TcpSocketState::Normal) { - control.recv_state = TcpSocketState::Closed; - } - wake_receiver = true; - - // The socket will be recycled in the next poll. - break; - } - } - } - - // If socket is not in ESTABLISH, FIN-WAIT-1, FIN-WAIT-2, - // the local client have closed our receiver. - if matches!(control.recv_state, TcpSocketState::Normal) - && !socket.may_recv() - && !matches!( - socket.state(), - TcpState::Listen - | TcpState::SynReceived - | TcpState::Established - | TcpState::FinWait1 - | TcpState::FinWait2 - ) + while manager_running.load(Ordering::Relaxed) { + while let Ok(TcpSocketCreation { + control, + socket, + socket_created_tx: socket_create_tx, + }) = socket_creation_rx.try_recv() { - trace!("closed TCP Read Half, {:?}", socket.state()); - - // Let TcpConnection::poll_read returns EOF. - control.recv_state = TcpSocketState::Closed; - wake_receiver = true; + let handle = socket_set.add(socket); + let _ = socket_create_tx.send(()); + sockets.insert(handle, control); } - if wake_receiver && control.recv_waker.is_some() { - if let Some(waker) = control.recv_waker.take() { - waker.wake(); - } + let before_poll = SmolInstant::now(); + let updated_sockets = iface.poll(before_poll, device, &mut socket_set); + + if updated_sockets { + trace!("VirtDevice::poll costed {}", SmolInstant::now() - before_poll); } - // Check if writable - let mut wake_sender = false; - while socket.can_send() && !control.send_buffer.is_empty() { - let result = socket.send(|buffer| { - let n = control.send_buffer.dequeue_slice(buffer); - (n, ()) - }); + // Check all the sockets' status + let mut sockets_to_remove = Vec::new(); - match result { - Ok(..) => { - wake_sender = true; + for (socket_handle, control) in sockets.iter() { + let socket_handle = *socket_handle; + let socket = socket_set.get_mut::(socket_handle); + let mut control = control.lock(); + + // Remove the socket only when it is in the closed state. + if socket.state() == TcpState::Closed { + sockets_to_remove.push(socket_handle); + + control.send_state = TcpSocketState::Closed; + control.recv_state = TcpSocketState::Closed; + + if let Some(waker) = control.send_waker.take() { + waker.wake(); + } + if let Some(waker) = control.recv_waker.take() { + waker.wake(); } - Err(err) => { - error!("socket send error: {:?}, {:?}", err, socket.state()); - // Don't know why. Abort the connection. - socket.abort(); + trace!("closed TCP connection"); + continue; + } - if matches!(control.send_state, TcpSocketState::Normal) { - control.send_state = TcpSocketState::Closed; + // SHUT_WR + if matches!(control.send_state, TcpSocketState::Close) { + trace!("closing TCP Write Half, {:?}", socket.state()); + + // Close the socket. Set to FIN state + socket.close(); + control.send_state = TcpSocketState::Closing; + + // We can still process the pending buffer. + } + + // Check if readable + let mut wake_receiver = false; + while socket.can_recv() && !control.recv_buffer.is_full() { + let result = socket.recv(|buffer| { + let n = control.recv_buffer.enqueue_slice(buffer); + (n, ()) + }); + + match result { + Ok(..) => { + wake_receiver = true; } - wake_sender = true; + Err(err) => { + error!("socket recv error: {:?}, {:?}", err, socket.state()); - // The socket will be recycled in the next poll. - break; + // Don't know why. Abort the connection. + socket.abort(); + + if matches!(control.recv_state, TcpSocketState::Normal) { + control.recv_state = TcpSocketState::Closed; + } + wake_receiver = true; + + // The socket will be recycled in the next poll. + break; + } + } + } + + // If socket is not in ESTABLISH, FIN-WAIT-1, FIN-WAIT-2, + // the local client have closed our receiver. + if matches!(control.recv_state, TcpSocketState::Normal) + && !socket.may_recv() + && !matches!( + socket.state(), + TcpState::Listen + | TcpState::SynReceived + | TcpState::Established + | TcpState::FinWait1 + | TcpState::FinWait2 + ) + { + trace!("closed TCP Read Half, {:?}", socket.state()); + + // Let TcpConnection::poll_read returns EOF. + control.recv_state = TcpSocketState::Closed; + wake_receiver = true; + } + + if wake_receiver && control.recv_waker.is_some() { + if let Some(waker) = control.recv_waker.take() { + waker.wake(); + } + } + + // Check if writable + let mut wake_sender = false; + while socket.can_send() && !control.send_buffer.is_empty() { + let result = socket.send(|buffer| { + let n = control.send_buffer.dequeue_slice(buffer); + (n, ()) + }); + + match result { + Ok(..) => { + wake_sender = true; + } + Err(err) => { + error!("socket send error: {:?}, {:?}", err, socket.state()); + + // Don't know why. Abort the connection. + socket.abort(); + + if matches!(control.send_state, TcpSocketState::Normal) { + control.send_state = TcpSocketState::Closed; + } + wake_sender = true; + + // The socket will be recycled in the next poll. + break; + } + } + } + + if wake_sender && control.send_waker.is_some() { + if let Some(waker) = control.send_waker.take() { + waker.wake(); } } } - if wake_sender && control.send_waker.is_some() { - if let Some(waker) = control.send_waker.take() { - waker.wake(); + for socket_handle in sockets_to_remove { + sockets.remove(&socket_handle); + socket_set.remove(socket_handle); + } + + if !device.recv_available() { + let next_duration = iface + .poll_delay(before_poll, &socket_set) + .unwrap_or(SmolDuration::from_millis(5)); + if next_duration != SmolDuration::ZERO { + thread::park_timeout(Duration::from(next_duration)); } } } - for socket_handle in sockets_to_remove { - sockets.remove(&socket_handle); - socket_set.remove(socket_handle); - } - - if !device.recv_available() { - let next_duration = iface - .poll_delay(before_poll, &socket_set) - .unwrap_or(SmolDuration::from_millis(5)); - if next_duration != SmolDuration::ZERO { - thread::park_timeout(Duration::from(next_duration)); - } - } - } - - trace!("VirtDevice::poll thread exited"); - }) + trace!("VirtDevice::poll thread exited"); + }) + .unwrap() }; let manager_notify = Arc::new(ManagerNotify::new(manager_handle.thread().clone()));