Skip to content

Commit

Permalink
event stream fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Mar 30, 2024
1 parent 5541810 commit 07f820e
Showing 1 changed file with 43 additions and 14 deletions.
57 changes: 43 additions & 14 deletions rpc/eventstream/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type Stream struct {
c *http.Client
req *http.Request
lastEventId string
lastEventID string
retry time.Duration
// Events emits the events received by the stream
Events chan StreamEvent
Expand Down Expand Up @@ -59,27 +59,28 @@ func (e SubscriptionError) Error() string {

// Subscribe to the Events emitted from the specified url.
// If lastEventId is non-empty it will be sent to the server in case it can replay missed events.
func Subscribe(url, lastEventId string) (*Stream, error) {
req, err := http.NewRequest("GET", url, nil)
func Subscribe(url, lastEventID string) (*Stream, error) {
req, err := http.NewRequest("GET", url, http.NoBody)
if err != nil {
return nil, err
}
return SubscribeWithRequest(lastEventId, req)

return SubscribeWithRequest(lastEventID, req)
}

// SubscribeWithRequest will take an http.Request to setup the stream, allowing custom headers
// to be specified, authentication to be configured, etc.
func SubscribeWithRequest(lastEventId string, request *http.Request) (*Stream, error) {
return SubscribeWith(lastEventId, http.DefaultClient, request)
func SubscribeWithRequest(lastEventID string, request *http.Request) (*Stream, error) {
return SubscribeWith(lastEventID, http.DefaultClient, request)
}

// SubscribeWith takes a http client and request providing customization over both headers and
// control over the http client settings (timeouts, tls, etc)
func SubscribeWith(lastEventId string, client *http.Client, request *http.Request) (*Stream, error) {
func SubscribeWith(lastEventID string, client *http.Client, request *http.Request) (*Stream, error) {
stream := &Stream{
c: client,
req: request,
lastEventId: lastEventId,
lastEventID: lastEventID,
retry: time.Millisecond * 3000,
Events: make(chan StreamEvent),
Errors: make(chan error, 10),
Expand All @@ -91,7 +92,9 @@ func SubscribeWith(lastEventId string, client *http.Client, request *http.Reques
if err != nil {
return nil, err
}

go stream.stream(r)

return stream, nil
}

Expand All @@ -100,6 +103,7 @@ func (stream *Stream) Close() {
go func() {
stream.closeMutex.Lock()
defer stream.closeMutex.Unlock()

if stream.isClosed {
return
}
Expand All @@ -116,32 +120,40 @@ func checkRedirect(req *http.Request, via []*http.Request) error {
if len(via) >= 10 {
return errors.New("stopped after 10 redirects")
}

for k, vv := range via[0].Header {
for _, v := range vv {
req.Header.Add(k, v)
}
}

return nil
}

func (stream *Stream) connect() (r io.ReadCloser, err error) {
var resp *http.Response

stream.req.Header.Set("Cache-Control", "no-cache")
stream.req.Header.Set("Accept", "text/event-stream")
if len(stream.lastEventId) > 0 {
stream.req.Header.Set("Last-Event-ID", stream.lastEventId)

if stream.lastEventID != "" {
stream.req.Header.Set("Last-Event-ID", stream.lastEventID)
}

if resp, err = stream.c.Do(stream.req); err != nil {
return
}

if resp.StatusCode != 200 {
message, _ := io.ReadAll(resp.Body)
err = SubscriptionError{
Code: resp.StatusCode,
Message: string(message),
}
}

r = resp.Body

return
}

Expand All @@ -162,39 +174,53 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) {

for {
ev, err := dec.Decode()

stream.closeMutex.Lock()
if stream.isClosed {
stream.closeMutex.Unlock()
return
}

if err != nil {
stream.Errors <- err
stream.closeMutex.Unlock()

return
}
stream.closeMutex.Unlock()

pub := ev.(StreamEvent)
pub, ok := ev.(StreamEvent)
if !ok {
stream.closeMutex.Unlock()
continue
}

if pub.Retry() > 0 {
stream.retry = time.Duration(pub.Retry()) * time.Millisecond
}
if len(pub.Id()) > 0 {
stream.lastEventId = pub.Id()

if pub.Id() != "" {
stream.lastEventID = pub.Id()
}

stream.Events <- pub
stream.closeMutex.Unlock()
}
}

func (stream *Stream) retryRestartStream() {
backoff := stream.retry

for {
if stream.Logger != nil {
stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds())
}

time.Sleep(backoff)

if stream.isClosed {
return
}

// NOTE: because of the defer we're opening the new connection
// before closing the old one. Shouldn't be a problem in practice,
// but something to be aware of.
Expand All @@ -203,10 +229,13 @@ func (stream *Stream) retryRestartStream() {
go stream.stream(r)
return
}

if stream.isClosed {
return
}

stream.Errors <- err

backoff = 10 * time.Second
}
}

0 comments on commit 07f820e

Please sign in to comment.