-
Notifications
You must be signed in to change notification settings - Fork 0
/
future.go
87 lines (75 loc) · 1.6 KB
/
future.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package future
import (
"context"
"sync"
)
// SetResultFunc is function to set result of the Future.
type SetResultFunc[T any] func(T, error)
// Future holds the value of Future.
type Future[T any] struct {
val T
err error
ready chan struct{}
mu sync.Mutex
callbacks []SetResultFunc[T]
}
// New constructs new Future.
func New[T any]() (*Future[T], SetResultFunc[T]) {
f := &Future[T]{ready: make(chan struct{})}
return f, f.setResult
}
// Get returns value when it's ready. Will return error when the ctx signal a cancelation.
func (f *Future[T]) Get(ctx context.Context) (T, error) {
select {
case <-f.ready:
return f.val, f.err
default:
}
select {
case <-f.ready:
return f.val, f.err
case <-ctx.Done():
var zero T
return zero, ctx.Err()
}
}
// Ready indicates whether result ready or not.
func (f *Future[T]) Ready() <-chan struct{} {
return f.ready
}
func (f *Future[T]) setResult(v T, err error) {
select {
case <-f.ready:
default:
f.val, f.err = v, err
close(f.ready)
f.notifyCallbacks()
}
}
// Listen for the result.
func (f *Future[T]) Listen(callback SetResultFunc[T]) {
select {
case <-f.ready:
callback(f.val, f.err)
default:
f.mu.Lock()
defer f.mu.Unlock()
f.callbacks = append(f.callbacks, callback)
}
}
func (f *Future[T]) notifyCallbacks() {
f.mu.Lock()
defer f.mu.Unlock()
for _, callback := range f.callbacks {
callback(f.val, f.err)
}
}
// Call will converts the sync function call as async call.
func Call[T any](f func() (T, error)) *Future[T] {
fut, setDone := New[T]()
go func() {
res, err := f()
setDone(res, err)
}()
return fut
}