Fixed bug, concatenated buffer kept available before first write successfully

This commit is contained in:
zonyitoo
2021-04-10 09:07:52 +08:00
parent 1746c624fc
commit f805b5d350
8 changed files with 194 additions and 96 deletions

6
Cargo.lock generated
View File

@@ -1470,7 +1470,7 @@ dependencies = [
[[package]]
name = "shadowsocks"
version = "1.10.1"
version = "1.10.2"
dependencies = [
"arc-swap 1.2.0",
"async-trait",
@@ -1514,7 +1514,7 @@ dependencies = [
[[package]]
name = "shadowsocks-rust"
version = "1.10.4"
version = "1.10.5"
dependencies = [
"byte_string",
"byteorder",
@@ -1535,7 +1535,7 @@ dependencies = [
[[package]]
name = "shadowsocks-service"
version = "1.10.3"
version = "1.10.4"
dependencies = [
"async-trait",
"byte_string",

View File

@@ -1,6 +1,6 @@
[package]
name = "shadowsocks-rust"
version = "1.10.4"
version = "1.10.5"
authors = ["Shadowsocks Contributors"]
description = "shadowsocks is a fast tunnel proxy that helps you bypass firewalls."
repository = "https://github.com/shadowsocks/shadowsocks-rust"
@@ -122,7 +122,7 @@ mimalloc = { version = "0.1", optional = true }
tcmalloc = { version = "0.3", optional = true }
jemallocator = { version = "0.3", optional = true }
shadowsocks-service = { version = "1.10.3", path = "./crates/shadowsocks-service" }
shadowsocks-service = { version = "1.10.4", path = "./crates/shadowsocks-service" }
[target.'cfg(unix)'.dependencies]
daemonize = "0.4"

View File

@@ -1,6 +1,6 @@
[package]
name = "shadowsocks-service"
version = "1.10.3"
version = "1.10.4"
authors = ["Shadowsocks Contributors"]
description = "shadowsocks is a fast tunnel proxy that helps you bypass firewalls."
repository = "https://github.com/shadowsocks/shadowsocks-rust"
@@ -105,7 +105,7 @@ regex = "1.4"
serde = { version = "1.0", features = ["derive"] }
json5 = "0.3"
shadowsocks = { version = "1.10.1", path = "../shadowsocks" }
shadowsocks = { version = "1.10.2", path = "../shadowsocks" }
strum = { version = "0.20", optional = true }
strum_macros = { version = "0.20", optional = true }

View File

@@ -1,6 +1,6 @@
[package]
name = "shadowsocks"
version = "1.10.1"
version = "1.10.2"
authors = ["Shadowsocks Contributors"]
description = "shadowsocks is a fast tunnel proxy that helps you bypass firewalls."
repository = "https://github.com/shadowsocks/shadowsocks-rust"

View File

@@ -10,6 +10,7 @@ use bytes::{BufMut, BytesMut};
use futures::ready;
use log::trace;
use once_cell::sync::Lazy;
use pin_project::pin_project;
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::TcpStream,
@@ -26,10 +27,18 @@ use crate::{
},
};
enum ProxyClientStreamWriteState {
Connect(Address),
Connecting(BytesMut),
Connected,
}
/// A stream for sending / receiving data stream from remote server via shadowsocks' proxy server
#[pin_project]
pub struct ProxyClientStream<S> {
#[pin]
stream: CryptoStream<S>,
addr: Option<Address>,
state: ProxyClientStreamWriteState,
context: SharedContext,
}
@@ -140,7 +149,7 @@ where
ProxyClientStream {
stream,
addr: Some(addr),
state: ProxyClientStreamWriteState::Connect(addr),
context,
}
}
@@ -166,9 +175,9 @@ where
S: AsyncRead + AsyncWrite + Unpin,
{
#[inline]
fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
let context = unsafe { &*(self.context.as_ref() as *const _) };
self.stream.poll_read_decrypted(cx, context, buf)
fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
let mut this = self.project();
this.stream.poll_read_decrypted(cx, &this.context, buf)
}
}
@@ -176,45 +185,61 @@ impl<S> AsyncWrite for ProxyClientStream<S>
where
S: AsyncRead + AsyncWrite + Unpin,
{
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
match self.addr {
None => {
// For all subsequence calls, just proxy it to self.stream
return self.stream.poll_write_encrypted(cx, buf);
}
Some(ref addr) => {
let addr_length = addr.serialized_len();
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
let mut this = self.project();
let mut buffer = BytesMut::with_capacity(addr_length + buf.len());
addr.write_to_buf(&mut buffer);
buffer.put_slice(buf);
loop {
match this.state {
ProxyClientStreamWriteState::Connect(ref addr) => {
// Target Address should be sent with the first packet together,
// which would prevent from being detected by connection features.
ready!(self.stream.poll_write_encrypted(cx, &buffer))?;
let addr_length = addr.serialized_len();
// fallthrough. take the self.addr out
let mut buffer = BytesMut::with_capacity(addr_length + buf.len());
addr.write_to_buf(&mut buffer);
buffer.put_slice(buf);
// Save the concatenated buffer before it is written successfully.
// APIs require buffer to be kept alive before Poll::Ready
//
// Proactor APIs like IOCP on Windows, pointers of buffers have to be kept alive
// before IO completion.
*(this.state) = ProxyClientStreamWriteState::Connecting(buffer);
}
ProxyClientStreamWriteState::Connecting(ref buffer) => {
let n = ready!(this.stream.poll_write_encrypted(cx, &buffer))?;
// In general, poll_write_encrypted should perform like write_all.
debug_assert!(n == buffer.len());
*(this.state) = ProxyClientStreamWriteState::Connected;
// NOTE:
// poll_write will return Ok(0) if buf.len() == 0
// But for the first call, this function will eventually send the handshake packet (IV/Salt + ADDR) to the remote address.
//
// https://github.com/shadowsocks/shadowsocks-rust/issues/232
//
// For protocols that requires *Server Hello* message, like FTP, clients won't send anything to the server until server sends handshake messages.
// This could be achieved by calling poll_write with an empty input buffer.
return Ok(buf.len()).into();
}
ProxyClientStreamWriteState::Connected => {
return this.stream.poll_write_encrypted(cx, buf);
}
}
}
let _ = self.addr.take();
// NOTE:
// poll_write will return Ok(0) if buf.len() == 0
// But for the first call, this function will eventually send the handshake packet (IV/Salt + ADDR) to the remote address.
//
// https://github.com/shadowsocks/shadowsocks-rust/issues/232
//
// For protocols that requires *Server Hello* message, like FTP, clients won't send anything to the server until server sends handshake messages.
// This could be achieved by calling poll_write with an empty input buffer.
Ok(buf.len()).into()
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.stream.poll_flush(cx)
#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().stream.poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.stream.poll_shutdown(cx)
#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().stream.poll_shutdown(cx)
}
}
@@ -222,7 +247,13 @@ impl<S> ProxyClientStream<S>
where
S: AsyncRead + AsyncWrite + Unpin,
{
/// Splits into reader and writer halves
pub fn into_split(self) -> (ProxyClientStreamReadHalf<S>, ProxyClientStreamWriteHalf<S>) {
// Cannot split if stream is still pending
assert!(
!matches!(self.state, ProxyClientStreamWriteState::Connecting(..)),
"stream is pending on writing the first packet"
);
let (reader, writer) = self.stream.into_split();
(
ProxyClientStreamReadHalf {
@@ -231,13 +262,16 @@ where
},
ProxyClientStreamWriteHalf {
writer,
addr: self.addr,
state: self.state,
},
)
}
}
/// Owned read half produced by `ProxyClientStream::into_split`
#[pin_project]
pub struct ProxyClientStreamReadHalf<S> {
#[pin]
reader: CryptoStreamReadHalf<S>,
context: SharedContext,
}
@@ -247,53 +281,78 @@ where
S: AsyncRead + Unpin,
{
#[inline]
fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
let context = unsafe { &*(self.context.as_ref() as *const _) };
self.reader.poll_read_decrypted(cx, context, buf)
fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
let mut this = self.project();
this.reader.poll_read_decrypted(cx, &this.context, buf)
}
}
/// Owned write half produced by `ProxyClientStream::into_split`
#[pin_project]
pub struct ProxyClientStreamWriteHalf<S> {
#[pin]
writer: CryptoStreamWriteHalf<S>,
addr: Option<Address>,
state: ProxyClientStreamWriteState,
}
impl<S> AsyncWrite for ProxyClientStreamWriteHalf<S>
where
S: AsyncWrite + Unpin,
{
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
if self.addr.is_none() {
// For all subsequence calls, just proxy it to self.writer
return self.writer.poll_write_encrypted(cx, buf);
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
let mut this = self.project();
loop {
match this.state {
ProxyClientStreamWriteState::Connect(ref addr) => {
// Target Address should be sent with the first packet together,
// which would prevent from being detected by connection features.
let addr_length = addr.serialized_len();
let mut buffer = BytesMut::with_capacity(addr_length + buf.len());
addr.write_to_buf(&mut buffer);
buffer.put_slice(buf);
// Save the concatenated buffer before it is written successfully.
// APIs require buffer to be kept alive before Poll::Ready
//
// Proactor APIs like IOCP on Windows, pointers of buffers have to be kept alive
// before IO completion.
*(this.state) = ProxyClientStreamWriteState::Connecting(buffer);
}
ProxyClientStreamWriteState::Connecting(ref buffer) => {
let n = ready!(this.writer.poll_write_encrypted(cx, &buffer))?;
// In general, poll_write_encrypted should perform like write_all.
debug_assert!(n == buffer.len());
*(this.state) = ProxyClientStreamWriteState::Connected;
// NOTE:
// poll_write will return Ok(0) if buf.len() == 0
// But for the first call, this function will eventually send the handshake packet (IV/Salt + ADDR) to the remote address.
//
// https://github.com/shadowsocks/shadowsocks-rust/issues/232
//
// For protocols that requires *Server Hello* message, like FTP, clients won't send anything to the server until server sends handshake messages.
// This could be achieved by calling poll_write with an empty input buffer.
return Ok(buf.len()).into();
}
ProxyClientStreamWriteState::Connected => {
return this.writer.poll_write_encrypted(cx, buf);
}
}
}
let addr = self.addr.take().unwrap();
let addr_length = addr.serialized_len();
let mut buffer = BytesMut::with_capacity(addr_length + buf.len());
addr.write_to_buf(&mut buffer);
buffer.put_slice(buf);
ready!(self.writer.poll_write_encrypted(cx, &buffer))?;
// NOTE:
// poll_write will return Ok(0) if buf.len() == 0
// But for the first call, this function will eventually send the handshake packet (IV/Salt + ADDR) to the remote address.
//
// https://github.com/shadowsocks/shadowsocks-rust/issues/232
//
// For protocols that requires *Server Hello* message, like FTP, clients won't send anything to the server until server sends handshake messages.
// This could be achieved by calling poll_write with an empty input buffer.
Ok(buf.len()).into()
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.writer.poll_flush(cx)
#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().writer.poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.writer.poll_shutdown(cx)
#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().writer.poll_shutdown(cx)
}
}

View File

@@ -6,6 +6,7 @@ use std::{
task::{self, Poll},
};
use pin_project::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::{
@@ -15,7 +16,9 @@ use crate::{
};
/// A stream for communicating with shadowsocks' proxy client
#[pin_project]
pub struct ProxyServerStream<S> {
#[pin]
stream: CryptoStream<S>,
context: SharedContext,
}
@@ -71,9 +74,10 @@ impl<S> AsyncRead for ProxyServerStream<S>
where
S: AsyncRead + AsyncWrite + Unpin,
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
let context = unsafe { &*(self.context.as_ref() as *const _) };
self.stream.poll_read_decrypted(cx, context, buf)
#[inline]
fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
let mut this = self.project();
this.stream.poll_read_decrypted(cx, &this.context, buf)
}
}
@@ -81,20 +85,26 @@ impl<S> AsyncWrite for ProxyServerStream<S>
where
S: AsyncRead + AsyncWrite + Unpin,
{
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
self.stream.poll_write_encrypted(cx, buf)
#[inline]
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
self.project().stream.poll_write_encrypted(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.stream.poll_flush(cx)
#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().stream.poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.stream.poll_shutdown(cx)
#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().stream.poll_shutdown(cx)
}
}
/// Owned read half produced by `ProxyServerStream::into_split`
#[pin_project]
pub struct ProxyServerStreamReadHalf<S> {
#[pin]
reader: CryptoStreamReadHalf<S>,
context: SharedContext,
}
@@ -103,13 +113,17 @@ impl<S> AsyncRead for ProxyServerStreamReadHalf<S>
where
S: AsyncRead + Unpin,
{
fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
let context = unsafe { &*(self.context.as_ref() as *const _) };
self.reader.poll_read_decrypted(cx, context, buf)
#[inline]
fn poll_read(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
let mut this = self.project();
this.reader.poll_read_decrypted(cx, &this.context, buf)
}
}
/// Owned write half produced by `ProxyServerStream::into_split`
#[pin_project]
pub struct ProxyServerStreamWriteHalf<S> {
#[pin]
writer: CryptoStreamWriteHalf<S>,
}
@@ -117,15 +131,18 @@ impl<S> AsyncWrite for ProxyServerStreamWriteHalf<S>
where
S: AsyncWrite + Unpin,
{
fn poll_write(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
self.writer.poll_write_encrypted(cx, buf)
#[inline]
fn poll_write(self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
self.project().writer.poll_write_encrypted(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.writer.poll_flush(cx)
#[inline]
fn poll_flush(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().writer.poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.writer.poll_shutdown(cx)
#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Result<(), io::Error>> {
self.project().writer.poll_shutdown(cx)
}
}

22
debian/changelog vendored
View File

@@ -1,3 +1,25 @@
shadowsocks-rust (1.10.5) unstable; urgency=medium
## BUG Fixed
- `ProxyClientStream` should keep the concatenated first packet buffer alive before asynchronous `write()` finishes
shadowsocks-rust (1.10.4) unstable; urgency=medium
# Fixed BUG
- `ProxyClientStream::poll_write` may lose the `Address` in the packet to be sent if socket returns `EAGAIN`
# Features
- Support `protocol` in basic configuration format
shadowsocks-rust (1.10.3) unstable; urgency=medium
## BUG Fixed
- #472 Fixed `SO_INCOMING_CPU` when building on some Linux targets. rust-lang/socket2#213
shadowsocks-rust (1.10.2) unstable; urgency=medium
## BUG Fixed

View File

@@ -1,8 +1,8 @@
class ShadowsocksRust < Formula
desc "shadowsocks is a fast tunnel proxy that helps you bypass firewalls"
homepage "https://github.com/shadowsocks/shadowsocks-rust"
url "https://github.com/shadowsocks/shadowsocks-rust/archive/refs/tags/v1.10.3.tar.gz"
version "1.10.3"
url "https://github.com/shadowsocks/shadowsocks-rust/archive/refs/tags/v1.10.5.tar.gz"
version "1.10.5"
sha256 "00fb90b6f80d01c6b40f6cfeb49d70fbec9f659bfa268d6834e79fe1f670d55e"
license "MIT"