From e452a39a1c8ffc898d022b34eaa6a01a06721c88 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Mon, 17 Oct 2022 14:24:11 +0200 Subject: [PATCH 01/10] add a poll_flush implementation for the tokio bufread module --- src/tokio/bufread/generic/encoder.rs | 22 +++++++++++++++++++++- src/tokio/bufread/macros/encoder.rs | 10 ++++++++++ src/tokio/mod.rs | 8 ++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/src/tokio/bufread/generic/encoder.rs b/src/tokio/bufread/generic/encoder.rs index a80fa122..998eab49 100644 --- a/src/tokio/bufread/generic/encoder.rs +++ b/src/tokio/bufread/generic/encoder.rs @@ -4,7 +4,7 @@ use core::{ }; use std::io::Result; -use crate::{codec::Encode, util::PartialBuffer}; +use crate::{codec::Encode, tokio::AsyncFlush, util::PartialBuffer}; use futures_core::ready; use pin_project_lite::pin_project; use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; @@ -115,3 +115,23 @@ impl AsyncRead for Encoder { } } } + +impl AsyncFlush for Encoder { + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let mut output = PartialBuffer::new(buf.initialize_unfilled()); + let mut this = self.project(); + + match this.encoder.flush(&mut output)? { + true => { + let len = output.written().len(); + buf.advance(len); + Poll::Ready(Ok(true)) + } + false => Poll::Ready(Ok(false)), + } + } +} diff --git a/src/tokio/bufread/macros/encoder.rs b/src/tokio/bufread/macros/encoder.rs index 44c8b595..4a6ee4f0 100644 --- a/src/tokio/bufread/macros/encoder.rs +++ b/src/tokio/bufread/macros/encoder.rs @@ -62,6 +62,16 @@ macro_rules! encoder { } } + impl<$inner: tokio::io::AsyncBufRead> crate::tokio::AsyncFlush for $name<$inner> { + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + self.project().inner.poll_flush(cx, buf) + } + } + const _: () = { fn _assert() { use crate::util::{_assert_send, _assert_sync}; diff --git a/src/tokio/mod.rs b/src/tokio/mod.rs index 8eba9add..a5514941 100644 --- a/src/tokio/mod.rs +++ b/src/tokio/mod.rs @@ -2,3 +2,11 @@ pub mod bufread; pub mod write; + +pub trait AsyncFlush { + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll>; +} From 1f5b586e9f14c367c3b965b57984151cf6ca356f Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Mon, 17 Oct 2022 17:34:43 +0200 Subject: [PATCH 02/10] add a streaming encoder and a test --- Cargo.lock | 26 +++++++++++++++++++- Cargo.toml | 7 +++--- src/tokio/mod.rs | 62 ++++++++++++++++++++++++++++++++++++++++++++++++ tests/gzip.rs | 56 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 147 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 526cda1a..38116d22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,6 +46,7 @@ dependencies = [ "tokio 0.2.25", "tokio 0.3.7", "tokio 1.15.0", + "tokio-stream", "tokio-util 0.3.1", "tokio-util 0.4.0", "tokio-util 0.5.1", @@ -725,7 +726,7 @@ dependencies = [ "memchr", "pin-project-lite 0.1.12", "slab", - "tokio-macros", + "tokio-macros 0.2.6", ] [[package]] @@ -750,6 +751,7 @@ dependencies = [ "bytes 1.1.0", "memchr", "pin-project-lite 0.2.8", + "tokio-macros 1.8.0", ] [[package]] @@ -763,6 +765,28 @@ dependencies = [ "syn 1.0.86", ] +[[package]] +name = "tokio-macros" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +dependencies = [ + "proc-macro2 1.0.36", + "quote 1.0.15", + "syn 1.0.86", +] + +[[package]] +name = "tokio-stream" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" +dependencies = [ + "futures-core", + "pin-project-lite 0.2.8", + "tokio 1.15.0", +] + [[package]] name = "tokio-util" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 8e9b0823..a11b4a4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"] [features] # groups -default = [] +default = ["gzip", "tokio"] all = ["all-implementations", "all-algorithms"] all-implementations = ["futures-io", "stream", "tokio-02", "tokio-03", "tokio"] all-algorithms = ["brotli", "bzip2", "deflate", "gzip", "lzma", "xz", "zlib", "zstd"] @@ -49,7 +49,7 @@ zstd-safe = { version = "5.0.1", optional = true, default-features = false } memchr = "2.2.1" tokio-02 = { package = "tokio", version = "0.2.21", optional = true, default-features = false } tokio-03 = { package = "tokio", version = "0.3.0", optional = true, default-features = false } -tokio = { version = "1.0.0", optional = true, default-features = false } +tokio = { version = "1.0.0", optional = true, default-features = false, features = ["time"] } [dev-dependencies] proptest = "1.0.0" @@ -63,11 +63,12 @@ bytes-06 = { package = "bytes", version = "0.6.0" } bytes = "1.0.0" tokio-02 = { package = "tokio", version = "0.2.21", default-features = false, features = ["io-util", "stream", "macros", "io-std"] } tokio-03 = { package = "tokio", version = "0.3.0", default-features = false, features = ["io-util", "stream"] } -tokio = { version = "1.0.0", default-features = false, features = ["io-util"] } +tokio = { version = "1.0.0", default-features = false, features = ["io-util", "macros", "rt"] } tokio-util-03 = { package = "tokio-util", version = "0.3.0", default-features = false, features = ["codec"] } tokio-util-04 = { package = "tokio-util", version = "0.4.0", default-features = false, features = ["io"] } tokio-util-05 = { package = "tokio-util", version = "0.5.0", default-features = false, features = ["io"] } tokio-util-06 = { package = "tokio-util", version = "0.6.0", default-features = false, features = ["io"] } +tokio-stream = { version = "0.1.11", features = ["time"] } [[test]] name = "brotli" diff --git a/src/tokio/mod.rs b/src/tokio/mod.rs index a5514941..ddf8b02f 100644 --- a/src/tokio/mod.rs +++ b/src/tokio/mod.rs @@ -1,8 +1,70 @@ //! Implementations for IO traits exported by [`tokio` v1.0](::tokio). +use std::{time::Duration, time::Instant}; + +use futures_core::Future; +use pin_project_lite::pin_project; +use tokio::io::{AsyncBufRead, AsyncRead}; + +use crate::codec::Encode; + +use self::bufread::Encoder; + pub mod bufread; pub mod write; +pin_project! { + pub struct StreamingEncoder { + #[pin] + encoder: E, + duration: Duration, + #[pin] + timeout: tokio::time::Sleep, + } +} + +impl StreamingEncoder { + pub fn new(encoder: E, timeout: Duration) -> Self { + let duration = timeout; + Self { + encoder, + duration, + timeout: tokio::time::sleep(timeout), + } + } +} + +impl AsyncRead for StreamingEncoder { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let mut this = self.project(); + + match this.encoder.as_mut().poll_read(cx, buf) { + std::task::Poll::Ready(r) => { + this.timeout + .reset(tokio::time::Instant::now() + *this.duration); + std::task::Poll::Ready(r) + } + std::task::Poll::Pending => match this.timeout.as_mut().poll(cx) { + std::task::Poll::Pending => std::task::Poll::Pending, + std::task::Poll::Ready(()) => { + this.timeout + .reset(tokio::time::Instant::now() + *this.duration); + match this.encoder.poll_flush(cx, buf) { + std::task::Poll::Ready(Ok(true)) => std::task::Poll::Ready(Ok(())), + std::task::Poll::Ready(Ok(false)) => std::task::Poll::Pending, + std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)), + std::task::Poll::Pending => std::task::Poll::Pending, + } + } + }, + } + } +} + pub trait AsyncFlush { fn poll_flush( self: std::pin::Pin<&mut Self>, diff --git a/tests/gzip.rs b/tests/gzip.rs index b17c7b34..f973c041 100644 --- a/tests/gzip.rs +++ b/tests/gzip.rs @@ -165,3 +165,59 @@ fn gzip_bufread_chunks_decompress_with_extra_header() { assert_eq!(output, &[1, 2, 3, 4, 5, 6][..]); } + +use std::time::Duration; + +use async_compression::tokio::{bufread::GzipEncoder, StreamingEncoder}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + time, +}; + +#[tokio::test] +async fn test() { + let (mut client, server) = tokio::io::duplex(1024); + tokio::task::spawn(async move { + loop { + client + .write_all(&std::iter::repeat(b'A').take(256).collect::>()) + .await + .unwrap(); + println!("sent data: 256 bytes"); + time::sleep(Duration::from_millis(100)).await; + } + }); + + let mut encoder = GzipEncoder::new(tokio::io::BufReader::new(server)); + //if this is commented out, the test will fail + let mut encoder = Box::pin(StreamingEncoder::new(encoder, Duration::from_millis(250))); + + let mut buf = std::iter::repeat(0u8).take(1024).collect::>(); + + let start = std::time::Instant::now(); + let mut counter = 0usize; + println!("start"); + loop { + let read = encoder.read(&mut buf); + match time::timeout(Duration::from_secs(5), read).await { + Err(e) => { + panic!("{}ms | timeout: {:?}", start.elapsed().as_millis(), e); + } + + Ok(res) => { + println!( + "{}ms | received data: {:?}", + start.elapsed().as_millis(), + res + ); + + counter += 1; + if counter == 10 { + break; + } + } + } + } + + //panic!(); +} From ef593998c7e4f4e0fcd0d664ae431f8c41654eb9 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Wed, 19 Oct 2022 11:59:22 +0200 Subject: [PATCH 03/10] cleanup --- Cargo.lock | 12 ------------ Cargo.toml | 3 +-- 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 38116d22..e3dbf0f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,7 +46,6 @@ dependencies = [ "tokio 0.2.25", "tokio 0.3.7", "tokio 1.15.0", - "tokio-stream", "tokio-util 0.3.1", "tokio-util 0.4.0", "tokio-util 0.5.1", @@ -776,17 +775,6 @@ dependencies = [ "syn 1.0.86", ] -[[package]] -name = "tokio-stream" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" -dependencies = [ - "futures-core", - "pin-project-lite 0.2.8", - "tokio 1.15.0", -] - [[package]] name = "tokio-util" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index a11b4a4d..bafed6b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"] [features] # groups -default = ["gzip", "tokio"] +default = [] all = ["all-implementations", "all-algorithms"] all-implementations = ["futures-io", "stream", "tokio-02", "tokio-03", "tokio"] all-algorithms = ["brotli", "bzip2", "deflate", "gzip", "lzma", "xz", "zlib", "zstd"] @@ -68,7 +68,6 @@ tokio-util-03 = { package = "tokio-util", version = "0.3.0", default-features = tokio-util-04 = { package = "tokio-util", version = "0.4.0", default-features = false, features = ["io"] } tokio-util-05 = { package = "tokio-util", version = "0.5.0", default-features = false, features = ["io"] } tokio-util-06 = { package = "tokio-util", version = "0.6.0", default-features = false, features = ["io"] } -tokio-stream = { version = "0.1.11", features = ["time"] } [[test]] name = "brotli" From 9bc110dc898efb3e02db332fc95abf34658899c1 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Wed, 19 Oct 2022 12:02:46 +0200 Subject: [PATCH 04/10] move code --- src/tokio/flush.rs | 68 ++++++++++++++++++++++++++++++++++++++++++++ src/tokio/mod.rs | 71 +--------------------------------------------- 2 files changed, 69 insertions(+), 70 deletions(-) create mode 100644 src/tokio/flush.rs diff --git a/src/tokio/flush.rs b/src/tokio/flush.rs new file mode 100644 index 00000000..fb42316e --- /dev/null +++ b/src/tokio/flush.rs @@ -0,0 +1,68 @@ +use std::{time::Duration, time::Instant}; + +use futures_core::Future; +use pin_project_lite::pin_project; +use tokio::io::{AsyncBufRead, AsyncRead}; + +use super::bufread::Encoder; +use crate::codec::Encode; + +pin_project! { + pub struct StreamingEncoder { + #[pin] + encoder: E, + duration: Duration, + #[pin] + timeout: tokio::time::Sleep, + } +} + +impl StreamingEncoder { + pub fn new(encoder: E, timeout: Duration) -> Self { + let duration = timeout; + Self { + encoder, + duration, + timeout: tokio::time::sleep(timeout), + } + } +} + +impl AsyncRead for StreamingEncoder { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let mut this = self.project(); + + match this.encoder.as_mut().poll_read(cx, buf) { + std::task::Poll::Ready(r) => { + this.timeout + .reset(tokio::time::Instant::now() + *this.duration); + std::task::Poll::Ready(r) + } + std::task::Poll::Pending => match this.timeout.as_mut().poll(cx) { + std::task::Poll::Pending => std::task::Poll::Pending, + std::task::Poll::Ready(()) => { + this.timeout + .reset(tokio::time::Instant::now() + *this.duration); + match this.encoder.poll_flush(cx, buf) { + std::task::Poll::Ready(Ok(true)) => std::task::Poll::Ready(Ok(())), + std::task::Poll::Ready(Ok(false)) => std::task::Poll::Pending, + std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)), + std::task::Poll::Pending => std::task::Poll::Pending, + } + } + }, + } + } +} + +pub trait AsyncFlush { + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll>; +} diff --git a/src/tokio/mod.rs b/src/tokio/mod.rs index ddf8b02f..fe2960a3 100644 --- a/src/tokio/mod.rs +++ b/src/tokio/mod.rs @@ -1,74 +1,5 @@ //! Implementations for IO traits exported by [`tokio` v1.0](::tokio). -use std::{time::Duration, time::Instant}; - -use futures_core::Future; -use pin_project_lite::pin_project; -use tokio::io::{AsyncBufRead, AsyncRead}; - -use crate::codec::Encode; - -use self::bufread::Encoder; - pub mod bufread; +pub mod flush; pub mod write; - -pin_project! { - pub struct StreamingEncoder { - #[pin] - encoder: E, - duration: Duration, - #[pin] - timeout: tokio::time::Sleep, - } -} - -impl StreamingEncoder { - pub fn new(encoder: E, timeout: Duration) -> Self { - let duration = timeout; - Self { - encoder, - duration, - timeout: tokio::time::sleep(timeout), - } - } -} - -impl AsyncRead for StreamingEncoder { - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { - let mut this = self.project(); - - match this.encoder.as_mut().poll_read(cx, buf) { - std::task::Poll::Ready(r) => { - this.timeout - .reset(tokio::time::Instant::now() + *this.duration); - std::task::Poll::Ready(r) - } - std::task::Poll::Pending => match this.timeout.as_mut().poll(cx) { - std::task::Poll::Pending => std::task::Poll::Pending, - std::task::Poll::Ready(()) => { - this.timeout - .reset(tokio::time::Instant::now() + *this.duration); - match this.encoder.poll_flush(cx, buf) { - std::task::Poll::Ready(Ok(true)) => std::task::Poll::Ready(Ok(())), - std::task::Poll::Ready(Ok(false)) => std::task::Poll::Pending, - std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)), - std::task::Poll::Pending => std::task::Poll::Pending, - } - } - }, - } - } -} - -pub trait AsyncFlush { - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll>; -} From 1051d4313027c295666484a6af9c4df630779cd8 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Wed, 19 Oct 2022 12:17:36 +0200 Subject: [PATCH 05/10] make the test usable with all algorithms --- src/tokio/bufread/generic/encoder.rs | 2 +- src/tokio/bufread/macros/encoder.rs | 2 +- tests/gzip.rs | 56 ---------------------- tests/utils/test_cases.rs | 72 ++++++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 58 deletions(-) diff --git a/src/tokio/bufread/generic/encoder.rs b/src/tokio/bufread/generic/encoder.rs index 998eab49..a47143ae 100644 --- a/src/tokio/bufread/generic/encoder.rs +++ b/src/tokio/bufread/generic/encoder.rs @@ -4,7 +4,7 @@ use core::{ }; use std::io::Result; -use crate::{codec::Encode, tokio::AsyncFlush, util::PartialBuffer}; +use crate::{codec::Encode, tokio::flush::AsyncFlush, util::PartialBuffer}; use futures_core::ready; use pin_project_lite::pin_project; use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; diff --git a/src/tokio/bufread/macros/encoder.rs b/src/tokio/bufread/macros/encoder.rs index 4a6ee4f0..5e4b444b 100644 --- a/src/tokio/bufread/macros/encoder.rs +++ b/src/tokio/bufread/macros/encoder.rs @@ -62,7 +62,7 @@ macro_rules! encoder { } } - impl<$inner: tokio::io::AsyncBufRead> crate::tokio::AsyncFlush for $name<$inner> { + impl<$inner: tokio::io::AsyncBufRead> crate::tokio::flush::AsyncFlush for $name<$inner> { fn poll_flush( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, diff --git a/tests/gzip.rs b/tests/gzip.rs index f973c041..b17c7b34 100644 --- a/tests/gzip.rs +++ b/tests/gzip.rs @@ -165,59 +165,3 @@ fn gzip_bufread_chunks_decompress_with_extra_header() { assert_eq!(output, &[1, 2, 3, 4, 5, 6][..]); } - -use std::time::Duration; - -use async_compression::tokio::{bufread::GzipEncoder, StreamingEncoder}; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - time, -}; - -#[tokio::test] -async fn test() { - let (mut client, server) = tokio::io::duplex(1024); - tokio::task::spawn(async move { - loop { - client - .write_all(&std::iter::repeat(b'A').take(256).collect::>()) - .await - .unwrap(); - println!("sent data: 256 bytes"); - time::sleep(Duration::from_millis(100)).await; - } - }); - - let mut encoder = GzipEncoder::new(tokio::io::BufReader::new(server)); - //if this is commented out, the test will fail - let mut encoder = Box::pin(StreamingEncoder::new(encoder, Duration::from_millis(250))); - - let mut buf = std::iter::repeat(0u8).take(1024).collect::>(); - - let start = std::time::Instant::now(); - let mut counter = 0usize; - println!("start"); - loop { - let read = encoder.read(&mut buf); - match time::timeout(Duration::from_secs(5), read).await { - Err(e) => { - panic!("{}ms | timeout: {:?}", start.elapsed().as_millis(), e); - } - - Ok(res) => { - println!( - "{}ms | received data: {:?}", - start.elapsed().as_millis(), - res - ); - - counter += 1; - if counter == 10 { - break; - } - } - } - } - - //panic!(); -} diff --git a/tests/utils/test_cases.rs b/tests/utils/test_cases.rs index fc61d78a..84131da7 100644 --- a/tests/utils/test_cases.rs +++ b/tests/utils/test_cases.rs @@ -444,6 +444,76 @@ macro_rules! io_test_cases { }; } +macro_rules! io_flush_test_cases { + ($variant:ident) => { + mod tokio_flush { + mod bufread { + mod compress { + use crate::utils::algos::$variant::tokio::bufread::Encoder; + + use std::time::Duration; + + use async_compression::tokio::flush::StreamingEncoder; + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + time, + }; + + #[tokio::test] + //#[ntest::timeout(1000)] + async fn test() { + let (mut client, server) = tokio::io::duplex(1024); + tokio::task::spawn(async move { + loop { + client + .write_all( + &std::iter::repeat(b'A').take(256).collect::>(), + ) + .await + .unwrap(); + println!("sent data: 256 bytes"); + time::sleep(Duration::from_millis(100)).await; + } + }); + + let mut encoder = Encoder::new(tokio::io::BufReader::new(server)); + //if this is commented out, the test will fail + let mut encoder = + Box::pin(StreamingEncoder::new(encoder, Duration::from_millis(250))); + + let mut buf = std::iter::repeat(0u8).take(1024).collect::>(); + + let start = std::time::Instant::now(); + let mut counter = 0usize; + println!("start"); + loop { + let read = encoder.read(&mut buf); + match time::timeout(Duration::from_secs(5), read).await { + Err(e) => { + panic!("{}ms | timeout: {:?}", start.elapsed().as_millis(), e); + } + + Ok(res) => { + println!( + "{}ms | received data: {:?}", + start.elapsed().as_millis(), + res + ); + + counter += 1; + if counter == 10 { + break; + } + } + } + } + } + } + } + } + }; +} + macro_rules! test_cases { ($variant:ident) => { mod $variant { @@ -702,6 +772,8 @@ macro_rules! test_cases { #[cfg(feature = "tokio")] io_test_cases!(tokio, $variant); + #[cfg(feature = "tokio")] + io_flush_test_cases!($variant); } }; } From 87bf8f1a9c4bf1a6559b440a843346b7edb4483d Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Wed, 19 Oct 2022 15:08:05 +0200 Subject: [PATCH 06/10] test a flushable encoder managed through a channel --- Cargo.lock | 35 ++++++++++++----------- Cargo.toml | 2 ++ src/tokio/flush.rs | 39 ++++++++++++++++++++++++++ tests/utils/test_cases.rs | 59 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 118 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e3dbf0f8..4093b015 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,9 +34,11 @@ dependencies = [ "bzip2", "flate2", "futures", + "futures-channel", "futures-core", "futures-io", "futures-test", + "futures-util", "memchr", "ntest", "pin-project-lite 0.2.8", @@ -216,9 +218,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3dda0b6588335f360afc675d0564c17a77a2bda81ca178a4b6081bd86c7f0b" +checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" dependencies = [ "futures-core", "futures-sink", @@ -226,9 +228,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" +checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" [[package]] name = "futures-executor" @@ -243,15 +245,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f9d34af5a1aac6fb380f735fe510746c38067c5bf16c7fd250280503c971b2" +checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68" [[package]] name = "futures-macro" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c" +checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" dependencies = [ "proc-macro2 1.0.36", "quote 1.0.15", @@ -260,26 +262,25 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3055baccb68d74ff6480350f8d6eb8fcfa3aa11bdc1a1ae3afdd0514617d508" +checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56" [[package]] name = "futures-task" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ee7c6485c30167ce4dfb83ac568a849fe53274c831081476ee13e0dce1aad72" +checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1" [[package]] name = "futures-test" -version = "0.3.19" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e741bc851e1e90ad08901b329389ae77e02d5e9a0ec61955b80834630fbdc2f" +checksum = "e77baeade98824bc928c21b8ad39918b9d8a06745ebdb6e2c93fb7673fb7968d" dependencies = [ "futures-core", "futures-executor", "futures-io", - "futures-macro", "futures-sink", "futures-task", "futures-util", @@ -289,9 +290,9 @@ dependencies = [ [[package]] name = "futures-util" -version = "0.3.19" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164" +checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" dependencies = [ "futures-channel", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index bafed6b8..50c442f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,8 @@ memchr = "2.2.1" tokio-02 = { package = "tokio", version = "0.2.21", optional = true, default-features = false } tokio-03 = { package = "tokio", version = "0.3.0", optional = true, default-features = false } tokio = { version = "1.0.0", optional = true, default-features = false, features = ["time"] } +futures-channel = "0.3.24" +futures-util = "0.3.24" [dev-dependencies] proptest = "1.0.0" diff --git a/src/tokio/flush.rs b/src/tokio/flush.rs index fb42316e..1390d37f 100644 --- a/src/tokio/flush.rs +++ b/src/tokio/flush.rs @@ -1,6 +1,7 @@ use std::{time::Duration, time::Instant}; use futures_core::Future; +use futures_core::Stream; use pin_project_lite::pin_project; use tokio::io::{AsyncBufRead, AsyncRead}; @@ -66,3 +67,41 @@ pub trait AsyncFlush { buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll>; } + +pin_project! { + pub struct FlushableEncoder { + #[pin] + encoder: E, + #[pin] + receiver: futures_channel::mpsc::Receiver<()>, + } +} + +impl FlushableEncoder { + pub fn new(encoder: E, receiver: futures_channel::mpsc::Receiver<()>) -> Self { + Self { encoder, receiver } + } +} + +impl AsyncRead for FlushableEncoder { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let mut this = self.project(); + + match this.encoder.as_mut().poll_read(cx, buf) { + std::task::Poll::Ready(r) => std::task::Poll::Ready(r), + std::task::Poll::Pending => match this.receiver.as_mut().poll_next(cx) { + std::task::Poll::Pending => std::task::Poll::Pending, + std::task::Poll::Ready(_) => match this.encoder.poll_flush(cx, buf) { + std::task::Poll::Ready(Ok(true)) => std::task::Poll::Ready(Ok(())), + std::task::Poll::Ready(Ok(false)) => std::task::Poll::Pending, + std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)), + std::task::Poll::Pending => std::task::Poll::Pending, + }, + }, + } + } +} diff --git a/tests/utils/test_cases.rs b/tests/utils/test_cases.rs index 84131da7..ef06724c 100644 --- a/tests/utils/test_cases.rs +++ b/tests/utils/test_cases.rs @@ -453,7 +453,10 @@ macro_rules! io_flush_test_cases { use std::time::Duration; + use async_compression::tokio::flush::FlushableEncoder; use async_compression::tokio::flush::StreamingEncoder; + + use futures::SinkExt; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, time, @@ -508,6 +511,62 @@ macro_rules! io_flush_test_cases { } } } + + #[tokio::test] + //#[ntest::timeout(1000)] + async fn flushable_encoder() { + let (mut client, server) = tokio::io::duplex(1024); + tokio::task::spawn(async move { + loop { + client + .write_all( + &std::iter::repeat(b'A').take(256).collect::>(), + ) + .await + .unwrap(); + println!("sent data: 256 bytes"); + time::sleep(Duration::from_millis(100)).await; + } + }); + + let (mut tx, rx) = futures_channel::mpsc::channel(1); + let mut encoder = Encoder::new(tokio::io::BufReader::new(server)); + //if this is commented out, the test will fail + let mut encoder = Box::pin(FlushableEncoder::new(encoder, rx)); + + let mut buf = std::iter::repeat(0u8).take(1024).collect::>(); + + tokio::task::spawn(async move { + loop { + tokio::time::sleep(Duration::from_millis(250)).await; + tx.send(()).await; + } + }); + let start = std::time::Instant::now(); + let mut counter = 0usize; + println!("start"); + loop { + let read = encoder.read(&mut buf); + match time::timeout(Duration::from_secs(5), read).await { + Err(e) => { + panic!("{}ms | timeout: {:?}", start.elapsed().as_millis(), e); + } + + Ok(res) => { + println!( + "{}ms | received data: {:?}", + start.elapsed().as_millis(), + res + ); + + counter += 1; + if counter == 10 { + break; + } + } + } + } + } } } } From cc86d79353f94a4e32b8efc3c86896a353cb133c Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Wed, 19 Oct 2022 15:24:39 +0200 Subject: [PATCH 07/10] make a FlushableEncoder for the futures-io version --- Cargo.lock | 1 - Cargo.toml | 1 - src/futures/flush.rs | 54 +++++++++++++++++++++++++++++++++++++++ src/futures/mod.rs | 1 + tests/utils/test_cases.rs | 13 +--------- 5 files changed, 56 insertions(+), 14 deletions(-) create mode 100644 src/futures/flush.rs diff --git a/Cargo.lock b/Cargo.lock index 4093b015..e940b137 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,7 +38,6 @@ dependencies = [ "futures-core", "futures-io", "futures-test", - "futures-util", "memchr", "ntest", "pin-project-lite 0.2.8", diff --git a/Cargo.toml b/Cargo.toml index 50c442f0..5c1c20f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,6 @@ tokio-02 = { package = "tokio", version = "0.2.21", optional = true, default-fea tokio-03 = { package = "tokio", version = "0.3.0", optional = true, default-features = false } tokio = { version = "1.0.0", optional = true, default-features = false, features = ["time"] } futures-channel = "0.3.24" -futures-util = "0.3.24" [dev-dependencies] proptest = "1.0.0" diff --git a/src/futures/flush.rs b/src/futures/flush.rs new file mode 100644 index 00000000..51ed50a1 --- /dev/null +++ b/src/futures/flush.rs @@ -0,0 +1,54 @@ +use std::{time::Duration, time::Instant}; + +use futures_core::Future; +use futures_core::Stream; +use futures_io::AsyncRead; +use pin_project_lite::pin_project; + +use super::bufread::Encoder; +use crate::codec::Encode; + +pub trait AsyncFlush { + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll>; +} + +pin_project! { + pub struct FlushableEncoder { + #[pin] + encoder: E, + #[pin] + receiver: futures_channel::mpsc::Receiver<()>, + } +} + +impl FlushableEncoder { + pub fn new(encoder: E, receiver: futures_channel::mpsc::Receiver<()>) -> Self { + Self { encoder, receiver } + } +} + +impl AsyncRead for FlushableEncoder { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + let mut this = self.project(); + + match this.encoder.as_mut().poll_read(cx, buf) { + std::task::Poll::Ready(r) => std::task::Poll::Ready(r), + std::task::Poll::Pending => match this.receiver.as_mut().poll_next(cx) { + std::task::Poll::Pending => std::task::Poll::Pending, + std::task::Poll::Ready(_) => match this.encoder.poll_flush(cx, buf) { + std::task::Poll::Ready(Ok(sz)) => std::task::Poll::Ready(Ok(sz)), + std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)), + std::task::Poll::Pending => std::task::Poll::Pending, + }, + }, + } + } +} diff --git a/src/futures/mod.rs b/src/futures/mod.rs index be7f7ed6..e2cc3b8a 100644 --- a/src/futures/mod.rs +++ b/src/futures/mod.rs @@ -1,4 +1,5 @@ //! Implementations for IO traits exported by `futures`. pub mod bufread; +pub mod flush; pub mod write; diff --git a/tests/utils/test_cases.rs b/tests/utils/test_cases.rs index ef06724c..46199e58 100644 --- a/tests/utils/test_cases.rs +++ b/tests/utils/test_cases.rs @@ -438,17 +438,7 @@ macro_rules! io_test_cases { assert_eq!(output, bytes); } - } - } - } - }; -} -macro_rules! io_flush_test_cases { - ($variant:ident) => { - mod tokio_flush { - mod bufread { - mod compress { use crate::utils::algos::$variant::tokio::bufread::Encoder; use std::time::Duration; @@ -462,6 +452,7 @@ macro_rules! io_flush_test_cases { time, }; + #[cfg(feature = "tokio")] #[tokio::test] //#[ntest::timeout(1000)] async fn test() { @@ -831,8 +822,6 @@ macro_rules! test_cases { #[cfg(feature = "tokio")] io_test_cases!(tokio, $variant); - #[cfg(feature = "tokio")] - io_flush_test_cases!($variant); } }; } From 30589b796c5ee03b33a414a6dae6d59fa7e7d355 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Mon, 24 Oct 2022 11:52:40 +0200 Subject: [PATCH 08/10] remove the timeout based streaming encoder --- Cargo.toml | 4 +-- src/futures/flush.rs | 2 -- src/tokio/flush.rs | 54 ------------------------------------- tests/utils/test_cases.rs | 57 ++------------------------------------- 4 files changed, 4 insertions(+), 113 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5c1c20f0..d857cd28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ zstd-safe = { version = "5.0.1", optional = true, default-features = false } memchr = "2.2.1" tokio-02 = { package = "tokio", version = "0.2.21", optional = true, default-features = false } tokio-03 = { package = "tokio", version = "0.3.0", optional = true, default-features = false } -tokio = { version = "1.0.0", optional = true, default-features = false, features = ["time"] } +tokio = { version = "1.0.0", optional = true, default-features = false } futures-channel = "0.3.24" [dev-dependencies] @@ -64,7 +64,7 @@ bytes-06 = { package = "bytes", version = "0.6.0" } bytes = "1.0.0" tokio-02 = { package = "tokio", version = "0.2.21", default-features = false, features = ["io-util", "stream", "macros", "io-std"] } tokio-03 = { package = "tokio", version = "0.3.0", default-features = false, features = ["io-util", "stream"] } -tokio = { version = "1.0.0", default-features = false, features = ["io-util", "macros", "rt"] } +tokio = { version = "1.0.0", default-features = false, features = ["io-util", "macros", "rt", "time"] } tokio-util-03 = { package = "tokio-util", version = "0.3.0", default-features = false, features = ["codec"] } tokio-util-04 = { package = "tokio-util", version = "0.4.0", default-features = false, features = ["io"] } tokio-util-05 = { package = "tokio-util", version = "0.5.0", default-features = false, features = ["io"] } diff --git a/src/futures/flush.rs b/src/futures/flush.rs index 51ed50a1..a3ea8a25 100644 --- a/src/futures/flush.rs +++ b/src/futures/flush.rs @@ -1,5 +1,3 @@ -use std::{time::Duration, time::Instant}; - use futures_core::Future; use futures_core::Stream; use futures_io::AsyncRead; diff --git a/src/tokio/flush.rs b/src/tokio/flush.rs index 1390d37f..0a9e7c88 100644 --- a/src/tokio/flush.rs +++ b/src/tokio/flush.rs @@ -1,5 +1,3 @@ -use std::{time::Duration, time::Instant}; - use futures_core::Future; use futures_core::Stream; use pin_project_lite::pin_project; @@ -8,58 +6,6 @@ use tokio::io::{AsyncBufRead, AsyncRead}; use super::bufread::Encoder; use crate::codec::Encode; -pin_project! { - pub struct StreamingEncoder { - #[pin] - encoder: E, - duration: Duration, - #[pin] - timeout: tokio::time::Sleep, - } -} - -impl StreamingEncoder { - pub fn new(encoder: E, timeout: Duration) -> Self { - let duration = timeout; - Self { - encoder, - duration, - timeout: tokio::time::sleep(timeout), - } - } -} - -impl AsyncRead for StreamingEncoder { - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { - let mut this = self.project(); - - match this.encoder.as_mut().poll_read(cx, buf) { - std::task::Poll::Ready(r) => { - this.timeout - .reset(tokio::time::Instant::now() + *this.duration); - std::task::Poll::Ready(r) - } - std::task::Poll::Pending => match this.timeout.as_mut().poll(cx) { - std::task::Poll::Pending => std::task::Poll::Pending, - std::task::Poll::Ready(()) => { - this.timeout - .reset(tokio::time::Instant::now() + *this.duration); - match this.encoder.poll_flush(cx, buf) { - std::task::Poll::Ready(Ok(true)) => std::task::Poll::Ready(Ok(())), - std::task::Poll::Ready(Ok(false)) => std::task::Poll::Pending, - std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)), - std::task::Poll::Pending => std::task::Poll::Pending, - } - } - }, - } - } -} - pub trait AsyncFlush { fn poll_flush( self: std::pin::Pin<&mut Self>, diff --git a/tests/utils/test_cases.rs b/tests/utils/test_cases.rs index 46199e58..6857c079 100644 --- a/tests/utils/test_cases.rs +++ b/tests/utils/test_cases.rs @@ -444,65 +444,12 @@ macro_rules! io_test_cases { use std::time::Duration; use async_compression::tokio::flush::FlushableEncoder; - use async_compression::tokio::flush::StreamingEncoder; - use futures::SinkExt; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, time, }; - #[cfg(feature = "tokio")] - #[tokio::test] - //#[ntest::timeout(1000)] - async fn test() { - let (mut client, server) = tokio::io::duplex(1024); - tokio::task::spawn(async move { - loop { - client - .write_all( - &std::iter::repeat(b'A').take(256).collect::>(), - ) - .await - .unwrap(); - println!("sent data: 256 bytes"); - time::sleep(Duration::from_millis(100)).await; - } - }); - - let mut encoder = Encoder::new(tokio::io::BufReader::new(server)); - //if this is commented out, the test will fail - let mut encoder = - Box::pin(StreamingEncoder::new(encoder, Duration::from_millis(250))); - - let mut buf = std::iter::repeat(0u8).take(1024).collect::>(); - - let start = std::time::Instant::now(); - let mut counter = 0usize; - println!("start"); - loop { - let read = encoder.read(&mut buf); - match time::timeout(Duration::from_secs(5), read).await { - Err(e) => { - panic!("{}ms | timeout: {:?}", start.elapsed().as_millis(), e); - } - - Ok(res) => { - println!( - "{}ms | received data: {:?}", - start.elapsed().as_millis(), - res - ); - - counter += 1; - if counter == 10 { - break; - } - } - } - } - } - #[tokio::test] //#[ntest::timeout(1000)] async fn flushable_encoder() { @@ -521,7 +468,7 @@ macro_rules! io_test_cases { }); let (mut tx, rx) = futures_channel::mpsc::channel(1); - let mut encoder = Encoder::new(tokio::io::BufReader::new(server)); + let encoder = Encoder::new(tokio::io::BufReader::new(server)); //if this is commented out, the test will fail let mut encoder = Box::pin(FlushableEncoder::new(encoder, rx)); @@ -530,7 +477,7 @@ macro_rules! io_test_cases { tokio::task::spawn(async move { loop { tokio::time::sleep(Duration::from_millis(250)).await; - tx.send(()).await; + tx.send(()).await.unwrap(); } }); let start = std::time::Instant::now(); From 111ee2c6be715a71f70d3a95d53f4c0df6075b31 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Mon, 24 Oct 2022 12:09:42 +0200 Subject: [PATCH 09/10] missing docs, API fixes --- src/futures/flush.rs | 27 +++++++++++++++++++++++++-- src/tokio/flush.rs | 27 +++++++++++++++++++++++++-- tests/utils/test_cases.rs | 6 +++--- 3 files changed, 53 insertions(+), 7 deletions(-) diff --git a/src/futures/flush.rs b/src/futures/flush.rs index a3ea8a25..d6ee4cb9 100644 --- a/src/futures/flush.rs +++ b/src/futures/flush.rs @@ -1,3 +1,5 @@ +//! Types related to [`AsyncFlush`](AsyncFlush) to wrap encoders + use futures_core::Future; use futures_core::Stream; use futures_io::AsyncRead; @@ -6,7 +8,21 @@ use pin_project_lite::pin_project; use super::bufread::Encoder; use crate::codec::Encode; +/// Flushes asynchronously +/// +/// `AsyncRead` and `AsyncBufRead` implementations may not have enough information +/// to know when to flush the data they have in store, so they can implement this +/// trait and let the caller decide when data should be flushed pub trait AsyncFlush { + /// Attempts to flush in flight data from the `AsyncFlush` into `buf`. + /// + /// On success, returns `Poll::Ready(Ok(()))` and places data in the + /// unfilled portion of `buf`. If no data was read (`buf.filled().len()` is + /// unchanged), it implies that EOF has been reached. + /// + /// If no data is available for reading, the method returns `Poll::Pending` + /// and arranges for the current task (via `cx.waker()`) to receive a + /// notification when the object becomes readable or is closed. fn poll_flush( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -15,6 +31,8 @@ pub trait AsyncFlush { } pin_project! { + /// This structure wraps an `Encoder` implementing [`AsyncRead`](tokio::io::AsyncRead) to + /// allow the caller to flush its buffers. pub struct FlushableEncoder { #[pin] encoder: E, @@ -24,8 +42,13 @@ pin_project! { } impl FlushableEncoder { - pub fn new(encoder: E, receiver: futures_channel::mpsc::Receiver<()>) -> Self { - Self { encoder, receiver } + /// Creates a new `FlushableEncoder` and a channel sender from an existing `Encoder` + /// + /// Whenever a message is sent on the channel, the encoder will flushes its buffers + /// and compress them. + pub fn new(encoder: E) -> (Self, futures_channel::mpsc::Sender<()>) { + let (sender, receiver) = futures_channel::mpsc::channel(1); + (Self { encoder, receiver }, sender) } } diff --git a/src/tokio/flush.rs b/src/tokio/flush.rs index 0a9e7c88..306dd500 100644 --- a/src/tokio/flush.rs +++ b/src/tokio/flush.rs @@ -1,3 +1,5 @@ +//! Types related to [`AsyncFlush`](AsyncFlush) to wrap encoders + use futures_core::Future; use futures_core::Stream; use pin_project_lite::pin_project; @@ -6,7 +8,21 @@ use tokio::io::{AsyncBufRead, AsyncRead}; use super::bufread::Encoder; use crate::codec::Encode; +/// Flushes asynchronously +/// +/// `AsyncRead` and `AsyncBufRead` implementations may not have enough information +/// to know when to flush the data they have in store, so they can implement this +/// trait and let the caller decide when data should be flushed pub trait AsyncFlush { + /// Attempts to flush in flight data from the `AsyncFlush` into `buf`. + /// + /// On success, returns `Poll::Ready(Ok(()))` and places data in the + /// unfilled portion of `buf`. If no data was read (`buf.filled().len()` is + /// unchanged), it implies that EOF has been reached. + /// + /// If no data is available for reading, the method returns `Poll::Pending` + /// and arranges for the current task (via `cx.waker()`) to receive a + /// notification when the object becomes readable or is closed. fn poll_flush( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -15,6 +31,8 @@ pub trait AsyncFlush { } pin_project! { + /// This structure wraps an `Encoder` implementing [`AsyncRead`](tokio::io::AsyncRead) to + /// allow the caller to flush its buffers. pub struct FlushableEncoder { #[pin] encoder: E, @@ -24,8 +42,13 @@ pin_project! { } impl FlushableEncoder { - pub fn new(encoder: E, receiver: futures_channel::mpsc::Receiver<()>) -> Self { - Self { encoder, receiver } + /// Creates a new `FlushableEncoder` and a channel sender from an existing `Encoder` + /// + /// Whenever a message is sent on the channel, the encoder will flushes its buffers + /// and compress them. + pub fn new(encoder: E) -> (Self, futures_channel::mpsc::Sender<()>) { + let (sender, receiver) = futures_channel::mpsc::channel(1); + (Self { encoder, receiver }, sender) } } diff --git a/tests/utils/test_cases.rs b/tests/utils/test_cases.rs index 6857c079..c12a94e1 100644 --- a/tests/utils/test_cases.rs +++ b/tests/utils/test_cases.rs @@ -467,10 +467,10 @@ macro_rules! io_test_cases { } }); - let (mut tx, rx) = futures_channel::mpsc::channel(1); - let encoder = Encoder::new(tokio::io::BufReader::new(server)); + let (encoder, mut tx) = + FlushableEncoder::new(Encoder::new(tokio::io::BufReader::new(server))); //if this is commented out, the test will fail - let mut encoder = Box::pin(FlushableEncoder::new(encoder, rx)); + let mut encoder = Box::pin(encoder); let mut buf = std::iter::repeat(0u8).take(1024).collect::>(); From fdda40e335cf6859c91ba59dc7b1c43d12312533 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Mon, 24 Oct 2022 17:09:12 +0200 Subject: [PATCH 10/10] remove the dependency on futures-channel The FlushableEncoder now accepts a Stream, which will be more flexible than forcing a channel --- Cargo.toml | 2 +- src/futures/flush.rs | 17 ++++++++--------- src/tokio/flush.rs | 17 ++++++++--------- tests/utils/test_cases.rs | 6 +++--- 4 files changed, 20 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d857cd28..e17897e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,13 +50,13 @@ memchr = "2.2.1" tokio-02 = { package = "tokio", version = "0.2.21", optional = true, default-features = false } tokio-03 = { package = "tokio", version = "0.3.0", optional = true, default-features = false } tokio = { version = "1.0.0", optional = true, default-features = false } -futures-channel = "0.3.24" [dev-dependencies] proptest = "1.0.0" proptest-derive = "0.3.0" rand = "0.8.5" futures = "0.3.5" +futures-channel = "0.3.24" futures-test = "0.3.5" ntest = "0.8.1" bytes-05 = { package = "bytes", version = "0.5.0" } diff --git a/src/futures/flush.rs b/src/futures/flush.rs index d6ee4cb9..93653ec5 100644 --- a/src/futures/flush.rs +++ b/src/futures/flush.rs @@ -33,26 +33,25 @@ pub trait AsyncFlush { pin_project! { /// This structure wraps an `Encoder` implementing [`AsyncRead`](tokio::io::AsyncRead) to /// allow the caller to flush its buffers. - pub struct FlushableEncoder { + pub struct FlushableEncoder> { #[pin] encoder: E, #[pin] - receiver: futures_channel::mpsc::Receiver<()>, + receiver: Rx, } } -impl FlushableEncoder { - /// Creates a new `FlushableEncoder` and a channel sender from an existing `Encoder` +impl> FlushableEncoder { + /// Creates a new `FlushableEncoder` from an existing `Encoder` and a Stream /// - /// Whenever a message is sent on the channel, the encoder will flushes its buffers + /// Whenever a message is received from the stream, the encoder will flushes its buffers /// and compress them. - pub fn new(encoder: E) -> (Self, futures_channel::mpsc::Sender<()>) { - let (sender, receiver) = futures_channel::mpsc::channel(1); - (Self { encoder, receiver }, sender) + pub fn new(encoder: E, receiver: Rx) -> Self { + Self { encoder, receiver } } } -impl AsyncRead for FlushableEncoder { +impl> AsyncRead for FlushableEncoder { fn poll_read( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, diff --git a/src/tokio/flush.rs b/src/tokio/flush.rs index 306dd500..3044e2b1 100644 --- a/src/tokio/flush.rs +++ b/src/tokio/flush.rs @@ -33,26 +33,25 @@ pub trait AsyncFlush { pin_project! { /// This structure wraps an `Encoder` implementing [`AsyncRead`](tokio::io::AsyncRead) to /// allow the caller to flush its buffers. - pub struct FlushableEncoder { + pub struct FlushableEncoder> { #[pin] encoder: E, #[pin] - receiver: futures_channel::mpsc::Receiver<()>, + receiver: Rx, } } -impl FlushableEncoder { - /// Creates a new `FlushableEncoder` and a channel sender from an existing `Encoder` +impl> FlushableEncoder { + /// Creates a new `FlushableEncoder` from an existing `Encoder` and a Stream /// - /// Whenever a message is sent on the channel, the encoder will flushes its buffers + /// Whenever a message is received from the stream, the encoder will flushes its buffers /// and compress them. - pub fn new(encoder: E) -> (Self, futures_channel::mpsc::Sender<()>) { - let (sender, receiver) = futures_channel::mpsc::channel(1); - (Self { encoder, receiver }, sender) + pub fn new(encoder: E, receiver: Rx) -> Self { + Self { encoder, receiver } } } -impl AsyncRead for FlushableEncoder { +impl> AsyncRead for FlushableEncoder { fn poll_read( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, diff --git a/tests/utils/test_cases.rs b/tests/utils/test_cases.rs index c12a94e1..6857c079 100644 --- a/tests/utils/test_cases.rs +++ b/tests/utils/test_cases.rs @@ -467,10 +467,10 @@ macro_rules! io_test_cases { } }); - let (encoder, mut tx) = - FlushableEncoder::new(Encoder::new(tokio::io::BufReader::new(server))); + let (mut tx, rx) = futures_channel::mpsc::channel(1); + let encoder = Encoder::new(tokio::io::BufReader::new(server)); //if this is commented out, the test will fail - let mut encoder = Box::pin(encoder); + let mut encoder = Box::pin(FlushableEncoder::new(encoder, rx)); let mut buf = std::iter::repeat(0u8).take(1024).collect::>();