Skip to content

Commit

Permalink
refactor(BehaviorSubject, ReplaySubject): internally using Sentinel o…
Browse files Browse the repository at this point in the history
…bject instead ValueWrapper (#631)

* rm wrapper

* rm wrapper

* docs

* docs
  • Loading branch information
hoc081098 authored Sep 22, 2021
1 parent ac0f56b commit c69bb00
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 82 deletions.
1 change: 0 additions & 1 deletion lib/rxdart.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ export 'src/streams/connectable_stream.dart';
export 'src/utils/composite_subscription.dart';
export 'src/utils/error_and_stacktrace.dart';
export 'src/utils/notification.dart';
export 'src/utils/value_wrapper.dart';
export 'streams.dart';
export 'subjects.dart';
export 'transformers.dart';
24 changes: 11 additions & 13 deletions lib/src/subjects/behavior_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import 'package:rxdart/src/streams/value_stream.dart';
import 'package:rxdart/src/subjects/subject.dart';
import 'package:rxdart/src/transformers/start_with.dart';
import 'package:rxdart/src/transformers/start_with_error.dart';
import 'package:rxdart/src/utils/empty.dart';
import 'package:rxdart/src/utils/error_and_stacktrace.dart';
import 'package:rxdart/src/utils/value_wrapper.dart';

/// A special StreamController that captures the latest item that has been
/// added to the controller, and emits that as the first item to any new
Expand Down Expand Up @@ -116,9 +116,9 @@ class BehaviorSubject<T> extends Subject<T> implements ValueStream<T> {
}

final value = wrapper.value;
if (value != null && wrapper.isValue) {
if (isNotEmpty(value) && wrapper.isValue) {
return controller.stream
.transform(StartWithStreamTransformer(value.value));
.transform(StartWithStreamTransformer(value as T));
}

return controller.stream;
Expand All @@ -135,19 +135,19 @@ class BehaviorSubject<T> extends Subject<T> implements ValueStream<T> {
ValueStream<T> get stream => this;

@override
bool get hasValue => _wrapper.value != null;
bool get hasValue => isNotEmpty(_wrapper.value);

@override
T get value {
final wrapper = _wrapper.value;
if (wrapper != null) {
return wrapper.value;
final value = _wrapper.value;
if (isNotEmpty(value)) {
return value as T;
}
throw ValueStreamError.hasNoValue();
}

@override
T? get valueOrNull => _wrapper.value?.value;
T? get valueOrNull => unbox(_wrapper.value);

/// Set and emit the new value.
set value(T newValue) => add(newValue);
Expand All @@ -173,18 +173,16 @@ class BehaviorSubject<T> extends Subject<T> implements ValueStream<T> {

class _Wrapper<T> {
bool isValue;
ValueWrapper<T>? value;
var value = EMPTY;
ErrorAndStackTrace? errorAndStackTrace;

/// Non-seeded constructor
_Wrapper() : isValue = false;

_Wrapper.seeded(T value)
: value = ValueWrapper(value),
isValue = true;
_Wrapper.seeded(this.value) : isValue = true;

void setValue(T event) {
value = ValueWrapper(event);
value = event;
isValue = true;
}

Expand Down
56 changes: 28 additions & 28 deletions lib/src/subjects/replay_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import 'package:rxdart/src/streams/replay_stream.dart';
import 'package:rxdart/src/subjects/subject.dart';
import 'package:rxdart/src/transformers/start_with.dart';
import 'package:rxdart/src/transformers/start_with_error.dart';
import 'package:rxdart/src/utils/collection_extensions.dart';
import 'package:rxdart/src/utils/empty.dart';
import 'package:rxdart/src/utils/error_and_stacktrace.dart';
import 'package:rxdart/src/utils/value_wrapper.dart';

/// A special StreamController that captures all of the items that have been
/// added to the controller, and emits those as the first items to any new
Expand Down Expand Up @@ -73,22 +74,24 @@ class ReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
return ReplaySubject<T>._(
controller,
Rx.defer<T>(
() => queue.toList(growable: false).reversed.fold(controller.stream,
(stream, event) {
if (event.isError) {
final errorAndStackTrace = event.errorAndStackTrace!;

return stream.transform(
StartWithErrorStreamTransformer(
errorAndStackTrace.error,
errorAndStackTrace.stackTrace,
),
);
} else {
return stream
.transform(StartWithStreamTransformer(event.data!.value));
}
}),
() => queue.toList(growable: false).reversed.fold(
controller.stream,
(stream, event) {
final errorAndStackTrace = event.errorAndStackTrace;

if (errorAndStackTrace != null) {
return stream.transform(
StartWithErrorStreamTransformer(
errorAndStackTrace.error,
errorAndStackTrace.stackTrace,
),
);
} else {
return stream
.transform(StartWithStreamTransformer(event.data as T));
}
},
),
reusable: true,
),
queue,
Expand Down Expand Up @@ -123,33 +126,30 @@ class ReplaySubject<T> extends Subject<T> implements ReplayStream<T> {

@override
List<T> get values => _queue
.where((event) => !event.isError)
.map((event) => event.data!.value)
.where((event) => event.errorAndStackTrace == null)
.map((event) => event.data as T)
.toList(growable: false);

@override
List<Object> get errors => _queue
.where((event) => event.isError)
.map((event) => event.errorAndStackTrace!.error)
.mapNotNull((event) => event.errorAndStackTrace?.error)
.toList(growable: false);

@override
List<StackTrace?> get stackTraces => _queue
.where((event) => event.isError)
.where((event) => event.errorAndStackTrace != null)
.map((event) => event.errorAndStackTrace!.stackTrace)
.toList(growable: false);
}

class _Event<T> {
final bool isError;
final ValueWrapper<T>? data;
final Object? data;
final ErrorAndStackTrace? errorAndStackTrace;

_Event._({required this.isError, this.data, this.errorAndStackTrace});
_Event._({required this.data, required this.errorAndStackTrace});

factory _Event.data(T data) =>
_Event._(isError: false, data: ValueWrapper(data));
factory _Event.data(T data) => _Event._(data: data, errorAndStackTrace: null);

factory _Event.error(ErrorAndStackTrace e) =>
_Event._(isError: true, errorAndStackTrace: e);
_Event._(errorAndStackTrace: e, data: EMPTY);
}
10 changes: 1 addition & 9 deletions lib/src/transformers/backpressure/backpressure.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:collection';

import 'package:rxdart/src/utils/collection_extensions.dart';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';

Expand Down Expand Up @@ -354,12 +355,3 @@ class BackpressureStreamTransformer<S, T> extends StreamTransformerBase<S, T> {
),
);
}

extension _RemoveFirstNQueueExtension<T> on Queue<T> {
/// Removes the first [count] elements of this queue.
void removeFirstElements(int count) {
for (var i = 0; i < count; i++) {
removeFirst();
}
}
}
30 changes: 30 additions & 0 deletions lib/src/utils/collection_extensions.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import 'dart:collection';

/// Provides [mapNotNull] extension method on [Iterable].
extension MapNotNullIterableExtension<T> on Iterable<T> {
/// The non-`null` results of calling [transform] on the elements of [this].
///
/// Returns a lazy iterable which calls [transform]
/// on the elements of this iterable in iteration order,
/// then emits only the non-`null` values.
///
/// If [transform] throws, the iteration is terminated.
Iterable<R> mapNotNull<R>(R? Function(T) transform) sync* {
for (final e in this) {
final v = transform(e);
if (v != null) {
yield v;
}
}
}
}

/// Provides [removeFirstElements] extension method on [Queue].
extension RemoveFirstElementsQueueExtension<T> on Queue<T> {
/// Removes the first [count] elements of this queue.
void removeFirstElements(int count) {
for (var i = 0; i < count; i++) {
removeFirst();
}
}
}
18 changes: 18 additions & 0 deletions lib/src/utils/empty.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class _Empty {
const _Empty();

@override
String toString() => '<<EMPTY>>';
}

/// @internal
/// Sentinel object used to represent a missing value (distinct from `null`).
const Object? EMPTY = _Empty(); // ignore: constant_identifier_names

/// @internal
/// Returns `null` if [o] is [EMPTY], otherwise returns itself.
T? unbox<T>(Object? o) => identical(o, EMPTY) ? null : o as T;

/// @internal
/// Returns `true` if [o] is not [EMPTY].
bool isNotEmpty(Object? o) => !identical(o, EMPTY);
21 changes: 12 additions & 9 deletions lib/src/utils/notification.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import 'package:rxdart/src/utils/empty.dart';
import 'package:rxdart/src/utils/error_and_stacktrace.dart';
import 'package:rxdart/src/utils/value_wrapper.dart';

/// The type of event used in [Notification]
enum Kind {
Expand All @@ -23,8 +23,8 @@ class Notification<T> {
/// References the [Kind] of this [Notification] event.
final Kind kind;

/// The wrapped value, if applicable
final ValueWrapper<T>? _value;
/// The data value, if applicable
final Object? _value;

/// The wrapped error and stack trace, if applicable
final ErrorAndStackTrace? errorAndStackTrace;
Expand All @@ -36,15 +36,15 @@ class Notification<T> {

/// Constructs a [Notification] with [Kind.onData] and wraps a [value]
factory Notification.onData(T value) =>
Notification<T>(Kind.onData, ValueWrapper(value), null);
Notification<T>(Kind.onData, value, null);

/// Constructs a [Notification] with [Kind.onDone]
factory Notification.onDone() => const Notification(Kind.onDone, null, null);
factory Notification.onDone() => const Notification(Kind.onDone, EMPTY, null);

/// Constructs a [Notification] with [Kind.onError] and wraps an [error] and [stackTrace]
factory Notification.onError(Object error, StackTrace? stackTrace) =>
Notification<T>(
Kind.onError, null, ErrorAndStackTrace(error, stackTrace));
Kind.onError, EMPTY, ErrorAndStackTrace(error, stackTrace));

@override
bool operator ==(Object other) =>
Expand All @@ -61,7 +61,7 @@ class Notification<T> {

@override
String toString() =>
'Notification{kind: $kind, value: ${_value?.value}, errorAndStackTrace: $errorAndStackTrace}';
'Notification{kind: $kind, value: $_value, errorAndStackTrace: $errorAndStackTrace}';

/// A test to determine if this [Notification] wraps an onData event
bool get isOnData => kind == Kind.onData;
Expand All @@ -72,6 +72,9 @@ class Notification<T> {
/// A test to determine if this [Notification] wraps an error event
bool get isOnError => kind == Kind.onError;

/// Returns data if [kind] is [Kind.onData], otherwise throws `"Null check operator used on a null value"` error.
T get requireData => _value!.value;
/// Returns data if [kind] is [Kind.onData], otherwise throws a [StateError] error.
T get requireData => isNotEmpty(_value)
? _value as T
: (throw StateError(
'This notification has no data value, because its kind is $kind'));
}
22 changes: 0 additions & 22 deletions lib/src/utils/value_wrapper.dart

This file was deleted.

0 comments on commit c69bb00

Please sign in to comment.