Skip to content

Commit

Permalink
feat(mapNotNull, whereNotNull): add mapNotNull and whereNotNull (#548)
Browse files Browse the repository at this point in the history
* added mapNotNull

* add where_not_null.dart

* docs

* docs

* exports

* README.md

* tests

* tests

* where_not_null_test.dart

* rename

* fix timer_test.dart

* fix timer_test.dart

* deps: update some dev deps to stable null safety release.

* update tests

* fix

* sort imports

* update impl
  • Loading branch information
hoc081098 authored May 27, 2022
1 parent fc0083a commit a8acf0d
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 3 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ Stream.fromIterable([1, 2, 3])
- [flatMapIterable](https://pub.dev/documentation/rxdart/latest/rx/FlatMapExtension/flatMapIterable.html)
- [groupBy](https://pub.dev/documentation/rxdart/latest/rx/GroupByExtension/groupBy.html)
- [interval](https://pub.dev/documentation/rxdart/latest/rx/IntervalExtension/interval.html)
- [mapNotNull](https://pub.dev/documentation/rxdart/latest/rx/MapNotNullExtension/mapNotNull.html)
- [mapTo](https://pub.dev/documentation/rxdart/latest/rx/MapToExtension/mapTo.html)
- [materialize](https://pub.dev/documentation/rxdart/latest/rx/MaterializeExtension/materialize.html)
- [max](https://pub.dev/documentation/rxdart/latest/rx/MaxExtension/max.html)
Expand All @@ -175,6 +176,7 @@ Stream.fromIterable([1, 2, 3])
- [throttleTime](https://pub.dev/documentation/rxdart/latest/rx/ThrottleExtensions/throttleTime.html)
- [timeInterval](https://pub.dev/documentation/rxdart/latest/rx/TimeIntervalExtension/timeInterval.html)
- [timestamp](https://pub.dev/documentation/rxdart/latest/rx/TimeStampExtension/timestamp.html)
- [whereNotNull](https://pub.dev/documentation/rxdart/latest/rx/WhereNotNullExtension/whereNotNull.html)
- [whereType](https://pub.dev/documentation/rxdart/latest/rx/WhereTypeExtension/whereType.html)
- [window](https://pub.dev/documentation/rxdart/latest/rx/WindowExtensions/window.html)
- [windowCount](https://pub.dev/documentation/rxdart/latest/rx/WindowExtensions/windowCount.html)
Expand Down
4 changes: 1 addition & 3 deletions lib/rxdart.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
library rx;

export 'src/rx.dart';
export 'src/utils/composite_subscription.dart';
export 'src/utils/error_and_stacktrace.dart';
export 'src/utils/notification.dart';
export 'streams.dart';
export 'subjects.dart';
export 'transformers.dart';
export 'utils.dart';
75 changes: 75 additions & 0 deletions lib/src/transformers/map_not_null.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import 'dart:async';

class _MapNotNullSink<T, R extends Object> implements EventSink<T> {
final R? Function(T) _transform;
final EventSink<R> _outputSink;

_MapNotNullSink(this._outputSink, this._transform);

@override
void add(T event) {
final value = _transform(event);
if (value != null) {
_outputSink.add(value);
}
}

@override
void addError(Object error, [StackTrace? stackTrace]) =>
_outputSink.addError(error, stackTrace);

@override
void close() => _outputSink.close();
}

/// Create a Stream containing only the non-`null` results
/// of applying the given [transform] function to each element of the Stream.
///
/// ### Example
///
/// Stream.fromIterable(['1', 'two', '3', 'four'])
/// .transform(MapNotNullStreamTransformer(int.tryParse))
/// .listen(print); // prints 1, 3
///
/// // equivalent to:
///
/// Stream.fromIterable(['1', 'two', '3', 'four'])
/// .map(int.tryParse)
/// .transform(WhereTypeStreamTransformer<int?, int>())
/// .listen(print); // prints 1, 3
class MapNotNullStreamTransformer<T, R extends Object>
extends StreamTransformerBase<T, R> {
/// A function that transforms each elements of the Stream.
final R? Function(T) transform;

/// Constructs a [StreamTransformer] which emits non-`null` elements
/// of applying the given [transform] function to each element of the Stream.
const MapNotNullStreamTransformer(this.transform);

@override
Stream<R> bind(Stream<T> stream) => Stream<R>.eventTransformed(
stream, (sink) => _MapNotNullSink<T, R>(sink, transform));
}

/// Extends the Stream class with the ability to convert the source Stream
/// to a Stream containing only the non-`null` results
/// of applying the given [transform] function to each element of this Stream.
extension MapNotNullExtension<T> on Stream<T> {
/// Returns a Stream containing only the non-`null` results
/// of applying the given [transform] function to each element of this Stream.
///
/// ### Example
///
/// Stream.fromIterable(['1', 'two', '3', 'four'])
/// .mapNotNull(int.tryParse)
/// .listen(print); // prints 1, 3
///
/// // equivalent to:
///
/// Stream.fromIterable(['1', 'two', '3', 'four'])
/// .map(int.tryParse)
/// .whereType<int>()
/// .listen(print); // prints 1, 3
Stream<R> mapNotNull<R extends Object>(R? Function(T) transform) =>
MapNotNullStreamTransformer<T, R>(transform).bind(this);
}
65 changes: 65 additions & 0 deletions lib/src/transformers/where_not_null.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import 'dart:async';

class _WhereNotNullStreamSink<T extends Object> implements EventSink<T?> {
final EventSink<T> _outputSink;

_WhereNotNullStreamSink(this._outputSink);

@override
void add(T? event) {
if (event != null) {
_outputSink.add(event);
}
}

@override
void addError(Object error, [StackTrace? stackTrace]) =>
_outputSink.addError(error, stackTrace);

@override
void close() => _outputSink.close();
}

/// Create a Stream which emits all the non-`null` elements of the Stream,
/// in their original emission order.
///
/// ### Example
///
/// Stream.fromIterable(<int?>[1, 2, 3, null, 4, null])
/// .transform(WhereNotNullStreamTransformer())
/// .listen(print); // prints 1, 2, 3, 4
///
/// // equivalent to:
///
/// Stream.fromIterable(<int?>[1, 2, 3, null, 4, null])
/// .transform(WhereTypeStreamTransformer<int?, int>())
/// .listen(print); // prints 1, 2, 3, 4
class WhereNotNullStreamTransformer<T extends Object>
extends StreamTransformerBase<T?, T> {
@override
Stream<T> bind(Stream<T?> stream) => Stream<T>.eventTransformed(
stream, (sink) => _WhereNotNullStreamSink<T>(sink));
}

/// Extends the Stream class with the ability to convert the source Stream
/// to a Stream which emits all the non-`null` elements
/// of this Stream, in their original emission order.
extension WhereNotNullExtension<T extends Object> on Stream<T?> {
/// Returns a Stream which emits all the non-`null` elements
/// of this Stream, in their original emission order.
///
/// For a `Stream<T?>`, this method is equivalent to `.whereType<T>()`.
///
/// ### Example
///
/// Stream.fromIterable(<int?>[1, 2, 3, null, 4, null])
/// .whereNotNull()
/// .listen(print); // prints 1, 2, 3, 4
///
/// // equivalent to:
///
/// Stream.fromIterable(<int?>[1, 2, 3, null, 4, null])
/// .whereType<int>()
/// .listen(print); // prints 1, 2, 3, 4
Stream<T> whereNotNull() => WhereNotNullStreamTransformer<T>().bind(this);
}
2 changes: 2 additions & 0 deletions lib/transformers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export 'src/transformers/flat_map.dart';
export 'src/transformers/group_by.dart';
export 'src/transformers/ignore_elements.dart';
export 'src/transformers/interval.dart';
export 'src/transformers/map_not_null.dart';
export 'src/transformers/map_to.dart';
export 'src/transformers/materialize.dart';
export 'src/transformers/max.dart';
Expand All @@ -36,5 +37,6 @@ export 'src/transformers/take_until.dart';
export 'src/transformers/take_while_inclusive.dart';
export 'src/transformers/time_interval.dart';
export 'src/transformers/timestamp.dart';
export 'src/transformers/where_not_null.dart';
export 'src/transformers/where_type.dart';
export 'src/transformers/with_latest_from.dart';
5 changes: 5 additions & 0 deletions lib/utils.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
library rx_utils;

export 'src/utils/composite_subscription.dart';
export 'src/utils/error_and_stacktrace.dart';
export 'src/utils/notification.dart';
4 changes: 4 additions & 0 deletions test/rxdart_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import 'transformers/group_by_test.dart' as group_by_test;
import 'transformers/ignore_elements_test.dart' as ignore_elements_test;
import 'transformers/interval_test.dart' as interval_test;
import 'transformers/join_test.dart' as join_test;
import 'transformers/map_not_null_test.dart' as map_not_null_test;
import 'transformers/map_to_test.dart' as map_to_test;
import 'transformers/materialize_test.dart' as materialize_test;
import 'transformers/merge_with_test.dart' as merge_with_test;
Expand All @@ -84,6 +85,7 @@ import 'transformers/take_while_inclusive_test.dart'
import 'transformers/time_interval_test.dart' as time_interval_test;
import 'transformers/timeout_test.dart' as timeout_test;
import 'transformers/timestamp_test.dart' as timestamp_test;
import 'transformers/where_not_null_test.dart' as where_not_null_test;
import 'transformers/where_type_test.dart' as where_type_test;
import 'transformers/with_latest_from_test.dart' as with_latest_from_test;
import 'transformers/zip_with_test.dart' as zip_with_test;
Expand Down Expand Up @@ -126,6 +128,7 @@ void main() {
ignore_elements_test.main();
interval_test.main();
join_test.main();
map_not_null_test.main();
map_to_test.main();
materialize_test.main();
merge_with_test.main();
Expand All @@ -146,6 +149,7 @@ void main() {
timeout_test.main();
timestamp_test.main();
timer_test.main();
where_not_null_test.main();
where_type_test.main();
with_latest_from_test.main();
zip_with_test.main();
Expand Down
92 changes: 92 additions & 0 deletions test/transformers/map_not_null_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

void main() {
test('Rx.mapNotNull', () {
expect(
Stream.fromIterable(['1', '2', 'invalid_num', '3', 'invalid_num', '4'])
.mapNotNull(int.tryParse),
emitsInOrder(<int>[1, 2, 3, 4]));

// 0-----1-----2-----3-----...-----8-----9-----|
// 1-----null--3-----null--...-----9-----null--|
// 1--3--5--7--9--|
final stream = Stream.periodic(const Duration(milliseconds: 10), (i) => i)
.take(10)
.transform(MapNotNullStreamTransformer((i) => i.isOdd ? null : i + 1));
expect(stream, emitsInOrder(<Object>[1, 3, 5, 7, 9, emitsDone]));
});

test('Rx.mapNotNull.shouldThrowA', () {
expect(
Stream<bool>.error(Exception()).mapNotNull((_) => true),
emitsError(isA<Exception>()),
);

expect(
Rx.concat<int>([
Stream.fromIterable([1, 2]),
Stream.error(Exception()),
Stream.value(3),
]).mapNotNull((i) => i.isEven ? i + 1 : null),
emitsInOrder(<dynamic>[
3,
emitsError(isException),
emitsDone,
]),
);
});

test('Rx.mapNotNull.shouldThrowB', () {
expect(
Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).mapNotNull((i) {
if (i == 4) throw Exception();
return i.isEven ? i + 1 : null;
}),
emitsInOrder(<dynamic>[
3,
emitsError(isException),
7,
9,
11,
emitsDone,
]),
);
});

test('Rx.mapNotNull.asBroadcastStream', () {
final stream = Stream.fromIterable([2, 3, 4, 5, 6])
.mapNotNull<int>((i) => null)
.asBroadcastStream();

// listen twice on same stream
stream.listen(null);
stream.listen(null);

// code should reach here
expect(true, true);
});

test('Rx.mapNotNull.singleSubscription', () {
final stream = StreamController<int>().stream.mapNotNull((i) => i);

expect(stream.isBroadcast, isFalse);
stream.listen(null);
expect(() => stream.listen(null), throwsStateError);
});

test('Rx.mapNotNull.pause.resume', () async {
final subscription =
Stream.fromIterable([2, 3, 4, 5, 6]).mapNotNull((i) => i).listen(null);

subscription
..pause()
..onData(expectAsync1((data) {
expect(data, 2);
subscription.cancel();
}))
..resume();
});
}
79 changes: 79 additions & 0 deletions test/transformers/where_not_null_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

void main() {
test('Rx.whereNotNull', () {
{
final notNull = Stream.fromIterable([1, 2, 3, 4]).whereNotNull();

expect(notNull, isA<Stream<int>>());
expect(notNull, emitsInOrder(<int>[1, 2, 3, 4]));
}

{
final notNull = Stream.fromIterable([1, 2, null, 3, 4, null])
.transform(WhereNotNullStreamTransformer());

expect(notNull, isA<Stream<int>>());
expect(notNull, emitsInOrder(<int>[1, 2, 3, 4]));
}
});

test('Rx.whereNotNull.shouldThrow', () {
expect(
Stream<bool>.error(Exception()).whereNotNull(),
emitsError(isA<Exception>()),
);

expect(
Rx.concat<int?>([
Stream.fromIterable([1, 2, null]),
Stream.error(Exception()),
Stream.value(3),
]).whereNotNull(),
emitsInOrder(<dynamic>[
1,
2,
emitsError(isException),
3,
emitsDone,
]),
);
});

test('Rx.whereNotNull.asBroadcastStream', () {
final stream =
Stream.fromIterable([1, 2, null]).whereNotNull().asBroadcastStream();

// listen twice on same stream
stream.listen(null);
stream.listen(null);

// code should reach here
expect(true, true);
});

test('Rx.whereNotNull.singleSubscription', () {
final stream = StreamController<int?>().stream.whereNotNull();

expect(stream.isBroadcast, isFalse);
stream.listen(null);
expect(() => stream.listen(null), throwsStateError);
});

test('Rx.whereNotNull.pause.resume', () async {
final subscription = Stream.fromIterable([null, 2, 3, null, 4, 5, 6])
.whereNotNull()
.listen(null);

subscription
..pause()
..onData(expectAsync1((data) {
expect(data, 2);
subscription.cancel();
}))
..resume();
});
}

0 comments on commit a8acf0d

Please sign in to comment.