diff --git a/lib/src/streams/range.dart b/lib/src/streams/range.dart index 77f461d7f..83b4c1eda 100644 --- a/lib/src/streams/range.dart +++ b/lib/src/streams/range.dart @@ -9,6 +9,7 @@ import 'dart:async'; /// /// RangeStream(3, 1).listen((i) => print(i)); // Prints 3, 2, 1 class RangeStream extends Stream { + var _isListened = false; final Stream _stream; /// Constructs a [Stream] which emits all integer values that exist @@ -18,36 +19,23 @@ class RangeStream extends Stream { @override StreamSubscription listen(void Function(int event)? onData, - {Function? onError, void Function()? onDone, bool? cancelOnError}) => - _stream.listen(onData, - onError: onError, onDone: onDone, cancelOnError: cancelOnError); + {Function? onError, void Function()? onDone, bool? cancelOnError}) { + if (_isListened) { + throw StateError('Stream has already been listened to.'); + } + _isListened = true; + + return _stream.listen(onData, + onError: onError, onDone: onDone, cancelOnError: cancelOnError); + } static Stream _buildStream(int startInclusive, int endInclusive) { - final controller = StreamController(sync: true); - StreamSubscription? subscription; - - controller.onListen = () { - final length = (endInclusive - startInclusive).abs() + 1; - int nextValue(int index) => startInclusive > endInclusive - ? startInclusive - index - : startInclusive + index; - - subscription = - Stream.fromIterable(Iterable.generate(length, nextValue)).listen( - controller.add, - onError: controller.addError, - onDone: controller.close, - ); + final length = (endInclusive - startInclusive).abs() + 1; - controller.onPause = subscription!.pause; - controller.onResume = subscription!.resume; - }; - controller.onCancel = () { - final future = subscription?.cancel(); - subscription = null; - return future; - }; + int nextValue(int index) => startInclusive > endInclusive + ? startInclusive - index + : startInclusive + index; - return controller.stream; + return Stream.fromIterable(Iterable.generate(length, nextValue)); } }