Skip to content

Commit

Permalink
v2.1.1 (#6)
Browse files Browse the repository at this point in the history
* add dispatchMany and extensions

* docs

* cancel subscription
  • Loading branch information
hoc081098 committed Oct 30, 2020
1 parent 9048d31 commit 5859efe
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 6 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 2.1.1 - Oct 30, 2020

- Add `RxReduxStore.dispatchMany(Stream<A>)`: Dispatch a `Stream` of actions to store.
- Add extension method `dispatchTo` on `A` and `Stream<A>`, eg: `anAction.dispatchTo(store)`, `streamOfActions.dispatchTo(store)`.

## 2.1.0 - Aug 28, 2020

- State stream returned from `RxReduxStore` will not replay the latest state
Expand Down
48 changes: 43 additions & 5 deletions lib/src/store.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ SideEffect<A, S> _onEachActionSideEffect<A, S>(StreamSink<A> outputSink) {
/// Redux store based on [Stream].
class RxReduxStore<A, S> {
final void Function(A) _dispatch;
final void Function(Stream<A>) _dispatchMany;

final GetState<S> _getState;
final Stream<S> _stateStream;
Expand All @@ -38,6 +39,7 @@ class RxReduxStore<A, S> {

const RxReduxStore._(
this._dispatch,
this._dispatchMany,
this._getState,
this._stateStream,
this._actionStream,
Expand Down Expand Up @@ -80,19 +82,26 @@ class RxReduxStore<A, S> {
.asBroadcastStream(onCancel: (subscription) => subscription.cancel());

var currentState = initialState;
final subscription = stateStream.listen(
(newState) => currentState = newState,
onError: errorHandler,
);
final subscriptions = <StreamSubscription<Object>>[
stateStream.listen(
(newState) => currentState = newState,
onError: errorHandler,
),
];

return RxReduxStore._(
actionController.add,
(actions) => subscriptions.add(actions.listen(actionController.add)),
() => currentState,
stateStream,
actionOutputController.stream,
() async {
if (subscriptions.length == 1) {
await subscriptions[0].cancel();
} else {
await Future.wait(subscriptions.map((s) => s.cancel()));
}
await actionController.close();
await subscription.cancel();
},
);
}
Expand Down Expand Up @@ -169,6 +178,21 @@ class RxReduxStore<A, S> {
/// store.dispatch(SubmitLogin());
void dispatch(A action) => _dispatch(action);

/// Dispatch [Stream] of actions to store.
///
/// The [StreamSubscription] from listening [actionStream]
/// will be cancelled when calling [dispose].
/// Therefore, don't forget to call [dispose] to avoid memory leaks.
///
/// ### Example:
///
/// abstract class Action {}
/// class LoadNextPageAction implements Action {}
///
/// Stream<LoadNextPageAction> loadNextPageActionStream;
/// store.dispatchMany(loadNextPageActionStream);
void dispatchMany(Stream<A> actionStream) => _dispatchMany(actionStream);

/// Dispose all resources.
/// This method is typically called in `dispose` method of Flutter `State` object.
///
Expand All @@ -183,3 +207,17 @@ class RxReduxStore<A, S> {
/// }
Future<void> dispose() => _dispose();
}

/// Dispatch this action to [store].
extension DispatchToExtension<A> on A {
/// Dispatch this action to [store].
/// See [RxReduxStore.dispatch].
void dispatchTo<S>(RxReduxStore<A, S> store) => store.dispatch(this);
}

/// /// Dispatch this actions [Stream] to [store].
extension DispatchToStreamExtension<A> on Stream<A> {
/// Dispatch this actions [Stream] to [store].
/// See [RxReduxStore.dispatchMany].
void dispatchTo<S>(RxReduxStore<A, S> store) => store.dispatchMany(this);
}
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: rx_redux
description: Redux implementation based on Dart Stream, with the power of RxDart. Reactive redux store for Dart & Flutter.
version: 2.1.0
version: 2.1.1
author: Petrus Nguyễn Thái Học <hoc081098@gmail.com>
homepage: https://github.com/hoc081098/rx_redux.git
repository: https://github.com/hoc081098/rx_redux.git
Expand Down
46 changes: 46 additions & 0 deletions test/store_test.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'package:rx_redux/rx_redux.dart';
import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

enum Action {
Expand Down Expand Up @@ -334,5 +335,50 @@ void main() {
..dispatch(1)
..dispatch(2);
});

test('Dispatch many', () async {
{
final store = RxReduxStore<int, int>(
initialState: 0,
sideEffects: [],
reducer: (s, a) => s + a,
);

store.dispatchMany(Rx.range(0, 100));
await delay(200);

expect(store.state, 100 * 101 ~/ 2);
await store.dispose();
}

{
final store = RxReduxStore<int, int>(
initialState: 0,
sideEffects: [],
reducer: (s, a) => s + a,
);

Rx.range(0, 100).dispatchTo(store);
await delay(200);

expect(store.state, 100 * 101 ~/ 2);
await store.dispose();
}
});

test('Action.dispatchTo extension method', () async {
final store = RxReduxStore<int, int>(
initialState: 0,
sideEffects: [],
reducer: (s, a) => s + a,
);

1.dispatchTo(store);
2.dispatchTo(store);
await delay(100);

expect(store.state, 3);
await store.dispose();
});
});
}

0 comments on commit 5859efe

Please sign in to comment.