-
Notifications
You must be signed in to change notification settings - Fork 3k
/
shareReplay.ts
50 lines (45 loc) · 1.5 KB
/
shareReplay.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import { Observable } from '../Observable';
import { ReplaySubject } from '../ReplaySubject';
import { Subscription } from '../Subscription';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { Subscriber } from '../Subscriber';
/**
 * Configuration object accepted by {@link shareReplay} as an alternative to
 * its positional arguments.
 */
export interface ShareReplayConfig {
  /** Passed as the first argument to the `ReplaySubject` constructor. */
  bufferSize?: number;
  /** Passed as the second argument to the `ReplaySubject` constructor. */
  windowTime?: number;
  /**
   * When `true`, the subscription to the source is released as soon as the
   * subscriber count drops to zero while the source has not completed; the
   * next subscriber then starts over with a fresh `ReplaySubject` and a new
   * source subscription. Defaults to `false`, which keeps the source
   * subscription (and the replay buffer) alive after the last subscriber
   * leaves.
   */
  refCount?: boolean;
  /** Passed as the third argument to the `ReplaySubject` constructor. */
  scheduler?: SchedulerLike;
}

/**
 * Shares a single subscription to the source among all subscribers by routing
 * it through one `ReplaySubject`, so late subscribers receive whatever that
 * subject replays.
 *
 * Can be called either with positional arguments or with a single
 * {@link ShareReplayConfig} object. The positional form never enables
 * `refCount`.
 *
 * @param configOrBufferSize Either a configuration object, or the buffer size
 * handed to the `ReplaySubject` constructor.
 * @param windowTime Handed to the `ReplaySubject` constructor (positional form only).
 * @param scheduler Handed to the `ReplaySubject` constructor (positional form only).
 * @returns A function that lifts the source Observable with the sharing operator.
 * @method shareReplay
 * @owner Observable
 */
export function shareReplay<T>(config: ShareReplayConfig): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export function shareReplay<T>(
  configOrBufferSize?: ShareReplayConfig | number,
  windowTime?: number,
  scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T> {
  // Normalise both call shapes into one config; positional calls keep the
  // historical behaviour, i.e. no ref-counted teardown of the source.
  const config: ShareReplayConfig =
    typeof configOrBufferSize === 'object' && configOrBufferSize !== null
      ? configOrBufferSize
      : { bufferSize: configOrBufferSize, windowTime, scheduler, refCount: false };
  return (source: Observable<T>) => source.lift(shareReplayOperator(config));
}

/**
 * Builds the function handed to `lift`. `lift` is expected to invoke it through
 * `.call(subscriber, source)`, which for a plain function is
 * `Function.prototype.call` and therefore binds `this` to the subscriber.
 *
 * All state below lives in the closure and is shared by every subscription
 * made through the same lifted Observable.
 */
function shareReplayOperator<T>({ bufferSize, windowTime, scheduler, refCount: useRefCount = false }: ShareReplayConfig) {
  let subject: ReplaySubject<T> | undefined;
  let refCount = 0;
  let subscription: Subscription | undefined;
  let hasError = false;
  let isComplete = false;

  return function shareReplayOperation(this: Subscriber<T>, source: Observable<T>) {
    refCount++;
    // Connect to the source on the first subscription, after a ref-counted
    // reset, or again after the previous attempt ended with an error.
    if (!subject || hasError) {
      hasError = false;
      // The callbacks capture this specific subject so that a source
      // subscription can only ever feed the subject it was created for.
      const connected = new ReplaySubject<T>(bufferSize, windowTime, scheduler);
      subject = connected;
      subscription = source.subscribe({
        next(value) { connected.next(value); },
        error(err) {
          hasError = true;
          connected.error(err);
        },
        complete() {
          isComplete = true;
          connected.complete();
        },
      });
    }

    const innerSub = subject.subscribe(this);

    // Teardown for this one subscriber.
    return () => {
      refCount--;
      innerSub.unsubscribe();
      if (!subscription || refCount !== 0) {
        return;
      }
      if (isComplete) {
        // Historical behaviour, kept as is: only a source that has already
        // completed is unsubscribed here; the subject stays so that later
        // subscribers still get its replay.
        subscription.unsubscribe();
      } else if (useRefCount) {
        // Opt-in: nobody is listening any more, so release the source and
        // forget the subject; the next subscriber reconnects from scratch.
        subscription.unsubscribe();
        subscription = undefined;
        subject = undefined;
      }
    };
  };
}