-
Notifications
You must be signed in to change notification settings - Fork 0
/
chancaster.go
191 lines (169 loc) · 7.31 KB
/
chancaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package bigbuff
import (
"math"
"sync"
"sync/atomic"
)
// ChanCaster supports broadcasting messages to multiple receivers, via a
// single channel, without callbacks.
//
// See also [NewChanCaster] (optional).
type ChanCaster[C chan V, V any] struct {
// C is the channel that will be used to broadcast messages.
//
// See [ChanCaster.Add] and [ChanCaster.Send] for usage details.
C C
// mutex synchronises [ChanCaster.Send] and [ChanCaster.Add] calls with
// positive deltas. Ensuring sends are mutually exclusive is fairly
// important for consistency, but not strictly necessary. The primary
// purpose of this mutex is to avoid incrementing the number of receivers
// while any send is in progress, to facilitate removing receivers while
// send is in progress.
mutex sync.RWMutex
// state synchronises the number of receivers, between [ChanCaster.Send]
// calls, and [ChanCaster.Add] calls with negative deltas.
// The high 32 bits are number of receivers (modeled as int32), while the
// low 32 bits are either identical, or [math.MaxInt32] + the number of
// receivers being (the value of the high 32 bits), if sending is in
// progress. Using an atomic value for synchronisation requires having the
// "is sending" information embedded in said value, and having two copies
// of the number of receivers makes invariant and overflow checks easier.
state atomic.Uint64
}
// NewChanCaster is a factory for [ChanCaster] that exists solely for the
// convenience of type inference.
func NewChanCaster[C chan V, V any](channel C) *ChanCaster[C, V] {
return &ChanCaster[C, V]{C: channel}
}
// Send broadcasts a message, to all registered receivers, via
// [ChanCaster.C], then returns the number of sends performed, less any
// removed mid-flight by [ChanCaster.Add].
//
// If the contract of [ChanCaster] is obeyed, this call will never block for
// any significant time. This function may panic if state invariants have been
// violated, e.g. by misuse of [ChanCaster.Add], or if unregistered receives
// are detected.
//
// See [ChanCaster.Add] for usage details.
func (x *ChanCaster[C, V]) Send(value V) int {
if x.state.Load() == 0 {
return 0 // no receivers (fast path)
}
// prevent receivers being added while sending + order values by send call
x.mutex.Lock()
defer x.mutex.Unlock()
// load our state, guard no receivers (early exit), and set tracker to the
// value of `maxInt32 + receivers`, using CAS to sync with negative adds
var (
state uint64
receivers, tracker uint32
)
for {
state = x.state.Load()
if state == 0 {
return 0 // no receivers (slow path)
}
receivers = uint32(state >> 32) // initialize from hi
tracker = uint32(state) // initialize from lo
if tracker != receivers || receivers > math.MaxInt32 {
panic(`bigbuff: chancaster: send: state invariant violation`)
}
// attempt to set tracker to `maxInt32 + receivers`, with CAS used to
// synchronise with decrements of receivers
tracker += math.MaxInt32
if x.state.CompareAndSwap(state, uint64(receivers)<<32|uint64(tracker)) {
break
}
}
// broadcast involves sending to all receivers - with the total actually
// received being in range [0, receivers], due to potential decrements
for range receivers {
x.C <- value // may end up received by negative Add calls
}
// now, we can retrieve, validate, then reset the state (to 0 - all broadcast + we locked so none added)
// note: it should be stable - if it isn't, invariants were violated
state = x.state.Load()
tracker = uint32(state >> 32) // actually the final receivers (used as scratch)
if tracker > receivers || // receivers should be unchanged or decreased (and also lower than math.MaxInt32)
uint32(state) != tracker+math.MaxInt32 || // lo still exactly math.MaxInt32 more than hi
!x.state.CompareAndSwap(state, 0) { // failing this indicates one or more unregistered receivers
panic(`bigbuff: chancaster: send: state invariant violation`)
}
// returns number of sends less any received by negative Add calls
return int(tracker)
}
// Add increments or decrements the number of receivers, for [ChanCaster.C],
// then returns the number of receivers, after the operation. Calling with a
// delta of 0 is allowed, and will load and validate then return the number
// of receivers, without changing it.
//
// Each added receiver represents the intent to receive exactly one value, from
// [ChanCaster.C]. Each receiver removed (via a negative delta) represents
// exactly one added receiver, which has not yet been removed, and has not and
// will not receive a value. The valid range for the number of receivers is
// [0, math.MaxInt32], and any add which results in a number of receivers
// outside this range will cause a panic.
//
// The typical usage pattern is to call [ChanCaster.Add] with a delta of 1,
// then immediately receive, or attempt to receive, e.g. within a `select`
// statement. If the next-received value (e.g. within said `select` statement)
// is not from [ChanCaster.C], and receive won't be promptly re-attempted, then
// [ChanCaster.Add] should be called again, with the inverse of the previous
// delta.
//
// Using a delta greater than 1 indicates multiple separate receivers, which
// will all receive the same value, from the same [ChanCaster.Send] call. These
// receivers should independently decrement the number of receivers, if
// necessary, as described above.
func (x *ChanCaster[C, V]) Add(delta int) int {
const maxReceivers = math.MaxInt32
switch {
case delta >= 0:
var state uint64
if delta == 0 {
state = x.state.Load() // no change
} else if delta > maxReceivers {
panic(`bigbuff: chancaster: add: positive delta out of bounds`)
} else {
// increasing num receivers not allowed concurrently with sending
x.mutex.RLock()
defer x.mutex.RUnlock()
// add delta to both hi and lo
state = x.state.Add(uint64(delta)<<32 | uint64(uint32(delta)))
}
// validate to ensure we did not overflow + sanity check invariants
// note: if this was an add (and we acquired the lock), the lo value
// should never be the +maxReceivers case
if receivers, tracker := uint32(state>>32), uint32(state); receivers <= maxReceivers &&
receivers >= uint32(delta) &&
(receivers == tracker || (delta == 0 && receivers+maxReceivers == tracker)) {
return int(receivers)
}
case delta < -maxReceivers:
panic(`bigbuff: chancaster: add: negative delta out of bounds`)
default:
// flip delta to positive, from negative
delta = -delta
// note: same delta calc as above, subtracted using two's complement rules
state := x.state.Add(^(uint64(delta)<<32 | uint64(uint32(delta)) - 1))
// validate, and, if necessary, receive any channel sends that would
// otherwise never be received (to avoid Send hanging)
if receivers := uint32(state >> 32); receivers <= maxReceivers &&
maxReceivers-receivers >= uint32(delta) {
switch uint32(state) { // lo
case receivers:
// not sending
return int(receivers) // note: already subtracted delta
case maxReceivers + receivers:
// sending - perform the requisite number of receives
// note: receivers = tracker-maxReceivers (per the above)
for range delta {
<-x.C
}
return int(receivers) // note: already subtracted delta
}
}
}
// invariant violation, e.g. due to OoB add, or previous violation
panic(`bigbuff: chancaster: add: state invariant violation`)
}