From c8f3539bc11e57843745c68ee60ca5276248f9f9 Mon Sep 17 00:00:00 2001 From: sharpened-nacho <166434451+sharpened-nacho@users.noreply.github.com> Date: Tue, 2 Jul 2024 20:49:34 +0000 Subject: [PATCH] stream: make stream adapters public (#6658) --- tokio-stream/src/lib.rs | 16 ++++++++++++++- tokio-stream/src/stream_ext.rs | 32 +++++++++++++++--------------- tokio-stream/tests/stream_chain.rs | 11 +++++++++- 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/tokio-stream/src/lib.rs b/tokio-stream/src/lib.rs index 11ccd8c6aee..21f3fc92943 100644 --- a/tokio-stream/src/lib.rs +++ b/tokio-stream/src/lib.rs @@ -81,8 +81,22 @@ pub mod wrappers; mod stream_ext; pub use stream_ext::{collect::FromStream, StreamExt}; +/// Adapters for [`Stream`]s created by methods in [`StreamExt`]. +pub mod adapters { + pub use crate::stream_ext::{ + Chain, Filter, FilterMap, Fuse, Map, MapWhile, Merge, Peekable, Skip, SkipWhile, Take, + TakeWhile, Then, + }; + cfg_time! { + pub use crate::stream_ext::{ChunksTimeout, Timeout, TimeoutRepeating}; + } +} + cfg_time! { - pub use stream_ext::timeout::{Elapsed, Timeout}; + #[deprecated = "Import those symbols from adapters instead"] + #[doc(hidden)] + pub use stream_ext::timeout::Timeout; + pub use stream_ext::timeout::Elapsed; } mod empty; diff --git a/tokio-stream/src/stream_ext.rs b/tokio-stream/src/stream_ext.rs index cb640603dd2..cdbada30bc5 100644 --- a/tokio-stream/src/stream_ext.rs +++ b/tokio-stream/src/stream_ext.rs @@ -8,66 +8,66 @@ mod any; use any::AnyFuture; mod chain; -use chain::Chain; +pub use chain::Chain; pub(crate) mod collect; use collect::{Collect, FromStream}; mod filter; -use filter::Filter; +pub use filter::Filter; mod filter_map; -use filter_map::FilterMap; +pub use filter_map::FilterMap; mod fold; use fold::FoldFuture; mod fuse; -use fuse::Fuse; +pub use fuse::Fuse; mod map; -use map::Map; +pub use map::Map; mod map_while; -use map_while::MapWhile; +pub use map_while::MapWhile; mod merge; -use merge::Merge; +pub use merge::Merge; mod next; use next::Next; mod skip; -use skip::Skip; +pub use skip::Skip; mod skip_while; -use skip_while::SkipWhile; +pub use skip_while::SkipWhile; mod take; -use take::Take; +pub use take::Take; mod take_while; -use take_while::TakeWhile; +pub use take_while::TakeWhile; mod then; -use then::Then; +pub use then::Then; mod try_next; use try_next::TryNext; mod peekable; -use peekable::Peekable; +pub use peekable::Peekable; cfg_time! { pub(crate) mod timeout; pub(crate) mod timeout_repeating; - use timeout::Timeout; - use timeout_repeating::TimeoutRepeating; + pub use timeout::Timeout; + pub use timeout_repeating::TimeoutRepeating; use tokio::time::{Duration, Interval}; mod throttle; use throttle::{throttle, Throttle}; mod chunks_timeout; - use chunks_timeout::ChunksTimeout; + pub use chunks_timeout::ChunksTimeout; } /// An extension trait for the [`Stream`] trait that provides a variety of diff --git a/tokio-stream/tests/stream_chain.rs b/tokio-stream/tests/stream_chain.rs index f3b7edb16a6..4b47ce7a905 100644 --- a/tokio-stream/tests/stream_chain.rs +++ b/tokio-stream/tests/stream_chain.rs @@ -6,13 +6,14 @@ mod support { } use support::mpsc; +use tokio_stream::adapters::Chain; #[tokio::test] async fn basic_usage() { let one = stream::iter(vec![1, 2, 3]); let two = stream::iter(vec![4, 5, 6]); - let mut stream = one.chain(two); + let mut stream = visibility_test(one, two); assert_eq!(stream.size_hint(), (6, Some(6))); assert_eq!(stream.next().await, Some(1)); @@ -39,6 +40,14 @@ async fn basic_usage() { assert_eq!(stream.next().await, None); } +fn visibility_test(s1: S1, s2: S2) -> Chain +where + S1: Stream, + S2: Stream, +{ + s1.chain(s2) +} + #[tokio::test] async fn pending_first() { let (tx1, rx1) = mpsc::unbounded_channel_stream();