From 07f820ed4d0ef6dce342ea7847315d8ca7ea5608 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 30 Mar 2024 07:28:28 +0100 Subject: [PATCH] event stream fixes --- rpc/eventstream/eventstream.go | 57 +++++++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 14 deletions(-) diff --git a/rpc/eventstream/eventstream.go b/rpc/eventstream/eventstream.go index 1ebbb864..1fd2342d 100644 --- a/rpc/eventstream/eventstream.go +++ b/rpc/eventstream/eventstream.go @@ -18,7 +18,7 @@ import ( type Stream struct { c *http.Client req *http.Request - lastEventId string + lastEventID string retry time.Duration // Events emits the events received by the stream Events chan StreamEvent @@ -59,27 +59,28 @@ func (e SubscriptionError) Error() string { // Subscribe to the Events emitted from the specified url. // If lastEventId is non-empty it will be sent to the server in case it can replay missed events. -func Subscribe(url, lastEventId string) (*Stream, error) { - req, err := http.NewRequest("GET", url, nil) +func Subscribe(url, lastEventID string) (*Stream, error) { + req, err := http.NewRequest("GET", url, http.NoBody) if err != nil { return nil, err } - return SubscribeWithRequest(lastEventId, req) + + return SubscribeWithRequest(lastEventID, req) } // SubscribeWithRequest will take an http.Request to setup the stream, allowing custom headers // to be specified, authentication to be configured, etc. -func SubscribeWithRequest(lastEventId string, request *http.Request) (*Stream, error) { - return SubscribeWith(lastEventId, http.DefaultClient, request) +func SubscribeWithRequest(lastEventID string, request *http.Request) (*Stream, error) { + return SubscribeWith(lastEventID, http.DefaultClient, request) } // SubscribeWith takes a http client and request providing customization over both headers and // control over the http client settings (timeouts, tls, etc) -func SubscribeWith(lastEventId string, client *http.Client, request *http.Request) (*Stream, error) { +func SubscribeWith(lastEventID string, client *http.Client, request *http.Request) (*Stream, error) { stream := &Stream{ c: client, req: request, - lastEventId: lastEventId, + lastEventID: lastEventID, retry: time.Millisecond * 3000, Events: make(chan StreamEvent), Errors: make(chan error, 10), @@ -91,7 +92,9 @@ func SubscribeWith(lastEventId string, client *http.Client, request *http.Reques if err != nil { return nil, err } + go stream.stream(r) + return stream, nil } @@ -100,6 +103,7 @@ func (stream *Stream) Close() { go func() { stream.closeMutex.Lock() defer stream.closeMutex.Unlock() + if stream.isClosed { return } @@ -116,24 +120,30 @@ func checkRedirect(req *http.Request, via []*http.Request) error { if len(via) >= 10 { return errors.New("stopped after 10 redirects") } + for k, vv := range via[0].Header { for _, v := range vv { req.Header.Add(k, v) } } + return nil } func (stream *Stream) connect() (r io.ReadCloser, err error) { var resp *http.Response + stream.req.Header.Set("Cache-Control", "no-cache") stream.req.Header.Set("Accept", "text/event-stream") - if len(stream.lastEventId) > 0 { - stream.req.Header.Set("Last-Event-ID", stream.lastEventId) + + if stream.lastEventID != "" { + stream.req.Header.Set("Last-Event-ID", stream.lastEventID) } + if resp, err = stream.c.Do(stream.req); err != nil { return } + if resp.StatusCode != 200 { message, _ := io.ReadAll(resp.Body) err = SubscriptionError{ @@ -141,7 +151,9 @@ func (stream *Stream) connect() (r io.ReadCloser, err error) { Message: string(message), } } + r = resp.Body + return } @@ -162,39 +174,53 @@ func (stream *Stream) receiveEvents(r io.ReadCloser) { for { ev, err := dec.Decode() + stream.closeMutex.Lock() if stream.isClosed { stream.closeMutex.Unlock() return } + if err != nil { stream.Errors <- err stream.closeMutex.Unlock() + return } - stream.closeMutex.Unlock() - pub := ev.(StreamEvent) + pub, ok := ev.(StreamEvent) + if !ok { + stream.closeMutex.Unlock() + continue + } + if pub.Retry() > 0 { stream.retry = time.Duration(pub.Retry()) * time.Millisecond } - if len(pub.Id()) > 0 { - stream.lastEventId = pub.Id() + + if pub.Id() != "" { + stream.lastEventID = pub.Id() } + stream.Events <- pub + stream.closeMutex.Unlock() } } func (stream *Stream) retryRestartStream() { backoff := stream.retry + for { if stream.Logger != nil { stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds()) } + time.Sleep(backoff) + if stream.isClosed { return } + // NOTE: because of the defer we're opening the new connection // before closing the old one. Shouldn't be a problem in practice, // but something to be aware of. @@ -203,10 +229,13 @@ func (stream *Stream) retryRestartStream() { go stream.stream(r) return } + if stream.isClosed { return } + stream.Errors <- err + backoff = 10 * time.Second } }