-
Notifications
You must be signed in to change notification settings - Fork 51
101 Reactive Gems (working title) #21
Comments
1) The merge/flatMap identityThe operators Given a nested Publisher<Publisher<T>> sources = ...
merge(sources) == sources.flatMap(source -> source); Given a Publisher<T> source = ...
Function<T, Publisher<R>> mapper = ...
source.flatMap(mapper) == merge(source.map(mapper)); This identity is also true for other mapping operators:
This is a general property of the operation and works for non-reactive but functional API's as well. This can come in handy if some library doesn't offer both methods but only the one. |
2) Compose at subscription timeModern reactive libraries offer you fluent conversion operators: Function<Publisher<T>, Publisher<T>> addSchedulers = o ->
o.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
range(1, 10).compose(addSchedulers).subscribe(System.out::println); The function However, sometimes you want stateful operators with a sequence, for example, a counting Luckily, you can use the composition operators above in conjunction with range(1, 10).compose(o -> defer(() -> {
int[] counter = new int[] { 0 };
return o.map(v -> ++counter[0]);
})).subscribe(System.out::println); |
3) FlatMap as map or filterThe operator The mapper function let's you return any We can utilize this property and implement Function<T, R> mapper = ...
map(mapper) == flatMap(v -> just(mapper.apply(v))); Given the current value from source, we apply the We can apply the same methodology with Predicate<T> predicate = ...
filter(predicate) == flatMap(v -> predicate.test(v) ? just(v) : empty()); Note, however, that using source.flatMap(v -> {
try {
performIO(v);
return just(v);
} catch (IOException ex) {
return error(ex);
}
}) |
4) Continuations with concatSometimes, you want to run through a sequence of values and then resume with another sequence of a completely different type once the first completed. In this case, you'd usually also ignore the first sequence's values. Some libraries already offer operators for it: Publisher<Integer> a = ...
Publisher<String> b = ...
Publisher<String> c = (Publisher<String>)concat(a.ignoreElements(), b); Unfortunately, you'll need some explicit casts and |
5) When subscribeOn and observeOn/publishOn act sometimes the sameOne of the confusions with reactive operators are centered around However, many sequential data sources, such as just(1).subscribeOn(scheduler) == just(1).observeOn(scheduler)
range(1, 5).subscribeOn(scheduler) == range(1, 5).publishOn(scheduler)
fromArray("a", "b", "c").subscribeOn(scheduler) == fromArray("a", "b", "c").observeOn(scheduler)
// etc. The reason for this equivalence is that these constant sources don't really have subscription side-effects and requests to them will trigger emissions on the Most modern libraries already exploit this in respect to In addition, using range(1, 10).subscribeOn(schedulerA).observeOn(schedulerB); |
6) FlatMap as concatMapThe operator However, flatMap(mapper, 1) == concatMap(mapper) This identity helped many times to rule out possible bugs in either Note that in many implementations of |
7) Parallel processingReactive datasources are sequential in nature; they form a processing pipeline that runs in FIFO order, even if source values are available while processing earlier values. Still, sometimes you want to go parallel, for example, execute many service calls at once or distribute some processing over the available CPUs. There are some experiments with such reactive stream parallelization, but the effect itself can be achieved today with common operators. Using
|
8) Executing an action if the source is emptySometimes you want to execute some action, such as logging, when it turns out the source You should be familiar with the Publisher<T> source = ...
source.switchIfEmpty(empty().doOnComplete(() -> System.out.println("Empty source!"))); |
9) Processing elements pairwiseSometimes you want to process subsequent elements of a range(1, 9)
.buffer(2, 1)
.filter(b -> b.size() == 2)
.map(b -> b.get(0) + b.get(1))
.subscribe(System.out::println); Since An alternative way is to use the We can use this range(1, 9)
.publish(o -> zip(o, o.skip(1), (a, b) -> a + b))
.subscribe(System.out::println); There is no need for filtering as there is no list anymore and we get the pairs nicely via lambda parameters. Naturally, it works with triplets as well: range(1, 9)
.publish(o -> zip(o, o.skip(1), o.skip(2), (a, b, c) -> a + b + c))
.subscribe(System.out::println); |
10) Executing a function asynchronouslyYou may often want to execute some Many modern libraries offer the Supplier<Integer> supplier = () -> 1;
fromCallable(supplier::get).subscribeOn(computation);
Runnable run = () -> System.out.println("Hello!");
fromCallable(() -> { run.run(); return null; }).subscribeOn(computation); If for some reason just("whatever")
.subscribeOn(computation)
.map(ignored -> supplier.get()); Of course, you can swap the order of |
11) Caching the last element and clearing it on demandA possible use for Sometimes, the contents of this cache can become outdated and shouldn't be emitted to new We will use this Integer CLEAR = new Integer(0);
BehaviorProcessor<Integer> cache = new BehaviorProcessor<>(CLEAR);
Publisher<Integer> front = cache.filter(v -> v != CLEAR);
front.subscribe(System.out::println);
cache.onNext(10);
cache.onNext(CLEAR);
cache.onNext(20);
front.subscribe(System.out::println); In the example, we create a new (!) Integer instance and use it for reference comparison to determine if the cache is "empty". Sometimes, however, you can't just create such an "empty" instance of the type you are working with. In this case, you have to revert to the lowest common denominator type: Object CLEAR = new Object();
BehaviorProcessor<Object> cache = new BehaviorProcessor<>();
Publisher<String> front = cache.filter(v -> v != CLEAR).cast(String.class);
cache.onNext("abc");
cache.onNext(CLEAR);
cache.onNext("def");
front.subscribe(System.out::println); Of course, directly exposing public final class Cache<T> implements Observer<T>, Publisher<T> {
static final Object CLEAR = new Object();
final BehaviorProcessor<Object> cache = new BehaviorProcessor<>(CLEAR);
@Override
public void subscribe(Subscriber<? super T> subscriber) {
cache.filter(v -> v != CLEAR).subscribe(subscriber);
}
@Override
public void onNext(T t) {
cache.onNext(t);
}
@Override
public void onError(Throwable t) {
cache.onError(t);
}
@Override
public void onComplete() {
cache.onComplete();
}
public void clear() {
cache.onNext(CLEAR);
}
} Remember, |
12) processing elements of list-based valuesSometimes, an API gives you a Let's start out as a common source of Publisher<List<Integer>> source = range(1, 1_000_000).buffer(256); In-place processingYou can just simply run a for loop and manipulate the list in-place if the list is mutable ( source.map(list -> {
for (int i = 0; i < list.size(); i++) {
list.set(i, list.get(i) + 2_000_000);
}
return list;
}); Or as a new list: source.map(list -> {
List<Integer> newList = new ArrayList<>(list.size());
for (Integer v : list) {
newList.add(v + 2_000_000);
}
return newList;
}); Stream processingYou can combine the worlds of reactive and the interactive Java 8 Stream processing: source.map(list ->
list.stream()
.map(v -> v + 2_000_000)
.collect(toList())
); ConcatMapIf you can't mute the list, stuck on Java 7 or before, or just don't want to look "non functional", you can use source.concatMap(list ->
fromIterable(list)
.map(v -> v + 2_000_000)
.toList()
); |
13) Awaiting completion of many sourcesSometimes you have a bunch of You may think of You can simply use Publisher<String> source1 = just("Hello");
Publisher<Integer> source2 = range(1, 3);
Publisher<Long> source3 = timer(1, TimeUnit.MILLISECONDS);
just(source1, source2, source3)
.flatMap(o -> o.ignoreElements())
.doOnComplete(() -> System.out.println("Done")) Some libraries like Reactor also use |
14) Errors as non-terminal eventsBy default, reactive protocols treat Many times, you'd want to treat an error like any other value and keep the sequences running. For this, you have to hide an error from the library by first not calling EmitterProcessor<Signal<Integer>> bus = EmitterProcessor.create();
bus.dematerialize().consume(System.out::println, Throwable::printStackTrace);
bus.subscribe(System.out::println, Throwable::printStackTrace);
bus.onNext(Signal.next(1));
bus.onNext(Signal.next(RuntimeException()));
bus.onNext(Signal.next(2));
bus.onNext(Signal.next(3)); You can use |
15) defer expressed as flatMapThe operator Func0<Observable<Integer>> supplier = () -> range(System.currentTimeMillis() & 1023, 5);
defer(supplier) == just("whatever").flatMap(v -> supplier.call()); In fact you could write it with defer(supplier) == just("whatever").concatMap(v -> supplier.call()); Even though using Function<Integer, Observable<Integer>> function = v -> range(v, 2);
just(-10).flatMap(function) == defer(() -> function.call(-10)); by exctacting the constant value from For this reason, using range(1, 5).nest().concatMap(o -> o.take(3)) == range(1, 5).take(3) |
16) Emit elements of a list periodicallyThe source operator Then comes a requirement of emitting items from a list periodically, that is, with some fixed delay between elements. We can first map the List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
interval(1, TimeUnit.SECONDS)
.takeUntil(t -> t == list.size() - 1)
.map(t -> list.get((int)t))
.subscribe(System.out::println) Given the interval, we take its elements until the current running value is exactly the list's size minus one. The operator |
17) Add a timed gap between elements
range(1, 1_000)
.concatMap(t -> Mono.delay(1000).map( v -> t ))
.consume(System.out::println)
// prints 1, ..... 2, ...... 3 ..... N with ...... =1 second
==
interval(1000)
.map( i )
.consume(System.out::println) You can also choose to relatively shift forward using range(1, 1_000)
.flatMap(t -> Mono.delay(1000).map( v -> t ))
.consume(System.out::println)
// prints .... 1, 2, 3 , [..] C ..... N with ...... = 1 second and C = max concurrency
// (shifting groups of items) |
18) Inner-join with flatMapIn SQL, we are used to write inner joins; the ones that pit one table against another table and forms all sorts of pairs. We can do such joins with Publisher<Integer> main = range(1, 5);
Publisher<String> slave = fromArray("a", "bb", "ccc");
main.flatMap(len -> slave.filter(s -> s.length() == len)); This will, for each main element stream through the Some libraries offer overload that takes a two argument function which will receive these pairs and should produce some values out of them: main.flatMap(v -> slave, (m, s) -> m == s.length()); Of course, one can return a main.flatMap(v -> slave, (m, s) -> m == s.length() ? just(s) : empty())
.flatMap(v -> v) |
19) Eager concatenationSometimes, one would like to merge concurrently running sources and keep their order at the same time. The usual Many reactive libraries have an operator for this: range(1, 10)
.map(v -> {
Flux<T> o = getSource(v);
UnicastProcessor<T> up = UnicastProcessor.create();
o.subscribe(up);
return up;
})
.concatMap(v -> v)
.subscribe(System.out::println, Throwable::printStackTrace); Here, values 1..10 are mapped to a source which is then started by subscribing an In case you don't have an
.map(v -> {
Flux<T> o = getSource(v);
ReplayProcessor<T> rp = ReplayProcessor.create();
o.subscribe(rp);
return rp;
})
.map(v -> {
Flux<T> f = getSource(v);
ConnectableFlux<T> cf = f.replay();
cf.subscribe(e -> { }, Throwable::printStackTrace);
cf.connect();
return cf;
}) That extra
.map(v -> {
Flux<T> f = getSource(v);
Flux<T> cf = f.cache();
cf.subscribe(e -> { }, Throwable::printStackTrace);
return cf;
}) By default, cancellation won't cancel the prestarted sources. You have to manually wire up the end List<Cancellation> cancellations = ...
// ...
.map(v -> {
Flux<T> f = getSource(v);
UnicastProcessor<T> up = UnicastProcessor.create();
cancellations.add(f.subscribe(up));
return us;
})
// ...
.map(v -> {
Flux<T> f = getSource(v);
ConnectableFlux<T> cf = f.replay();
cf.subscribe(e -> { }, Throwable::printStackTrace);
cancellations.add(cf.connect());
return cf;
})
// ...
.map(v -> {
Flux<T> f = getSource(v);
Flux<T> cf = f.replay().autoConnect(1, cancellations::add);
cf.subscribe(e -> { }, Throwable::printStackTrace);
return cf;
}) |
20) defer via usingThe operator Therefore, you can imitate Supplier<Publisher<Integer>> s = () -> range(1, 10);
defer(s) => using(() -> "whatever", v -> s, v -> { }); However, you could also create the source itself as the resource and use identity-mapping on it: using(s, v -> v, v -> { })
using(s::get, v -> v, v -> { }) The first shorter case is applicable if your library's The reverse direction, namely expressing Supplier<R> resource = ...
Function<R, Publisher<T>> source ==
Consumer<R> disposer;
defer(() ->
fromCallable(resource)
.flatMap(r -> {
try {
return source.apply(r)
.doOnTerminate(() -> disposer.accept(r))
.doOnUnsubscribe(() -> disposer.accept(r));
} catch (Throwable ex) {
disposer.accept(r);
return error(ex);
}
})
); In RxJava, a terminal event is followed by an defer(() ->
fromCallable(resource)
.flatMap(r -> {
AtomicBoolean once = new AtomicBoolean();
try {
return source.apply(r)
.doOnTerminate(() -> disposeOnce(once, disposer, r))
.doOnUnsubscribe(() -> disposeOnce(once, disposer, r));
} catch (Throwable ex) {
disposeOnce(once, disposer, r);
return error(ex);
}
})
);
<R> void disposeOnce(AtomicBoolean once, Consumer<R> disposer, R resource) {
if (once.compareAndSet(false, true)) {
disposer.accept(resource);
}
} |
21) Caching and clearingIf we want to execute some code once and then hand out the generated values, such as login tokens or results of a network call, we usually can go for However, sometimes the data gets outdated and there is no way of clearing the structures above. But we can restart the whole process and make sure new subscribers get the fresh data if we cache the cache itself and use defer to get the current caching source: final AtomicReference<Mono<Long>> cache = new AtomicReference<>(getSource());
public Mono<Long> getSource() {
return Mono.fromCallable(System::currentTimeMillis).cache();
}
public Mono<Long> get() {
return Mono.defer(() -> cache.get());
}
public void reset() {
cache.set(getSource());
} |
22) Compute a single value only when requestedBy design, In case you want to compute only when requested, you can use the same jump-start trick with just("irrelevant")
.map(unused -> {
try {
return callable.call();
} catch (Exception ex) {
throw Exceptions.bubble(ex); // or Exceptions.propagate(ex)
}
})
.subscribe(System.out::println, Throwable::printStackTrace); Since
Now the console will first print |
In this issue, we should collect tips and tricks with reactive systems and dataflows.
These are not particularly advanced topics but the markdown support on GitHub makes it easier to write them up.
Once we run out of ideas, we may tidy it up and release it together (maybe a free ebook?).
Please post only gems here and open discussion about them in separate issues. Thanks.
The text was updated successfully, but these errors were encountered: