Skip to content

Commit

Permalink
Add a hearthbeat mechanism, and timeouts support
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas committed Mar 5, 2019
1 parent 1c56f0c commit d7db60f
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 9 deletions.
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ LOG_FORMAT=JSON
PUBLISH_ALLOWED_ORIGINS=http://localhost:3000,http://localhost:8000
PUBLISHER_JWT_KEY=
SUBSCRIBER_JWT_KEY=
HEARTHBEAT_INTERVAL=10s
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,14 @@ Be sure to update the value of `ACME_HOSTS` to match your domain name(s), a Let'
* `DB_PATH`: the path of the [bbolt](https://github.com/etcd-io/bbolt) database (default to `updates.db` in the current directory)
* `DEBUG`: set to `1` to enable the debug mode (prints recovery stack traces)
* `DEMO`: set to `1` to enable the demo mode (automatically enabled when `DEBUG=1`)
* `HEARTHBEAT_INTERVAL`: interval between heartbeats (useful with some proxies, and old browsers), set to `0s` to disable (default), example `15s`
* `JWT_KEY`: the JWT key to use for both publishers and subscribers
* `LOG_FORMAT`: the log format, can be `JSON`, `FLUENTD` or `TEXT` (default)
* `PUBLISH_ALLOWED_ORIGINS`: a comma separated list of origins allowed to publish (only applicable when using cookie-based auth)
* `PUBLISHER_JWT_KEY`: must contain the secret key to valid publishers' JWT, can be omited if `JWT_KEY` is set
* `READ_TIMEOUT`: maximum duration for reading the entire request, including the body, set to `0` to disable (default), example: `2m`
* `SUBSCRIBER_JWT_KEY`: must contain the secret key to valid subscribers' JWT, can be omited if `JWT_KEY` is set
* `WRITE_TIMEOUT`: maximum duration before timing out writes of the response, set to `0` to disable (default), example: `2m`

If `ACME_HOSTS` or both `CERT_FILE` and `CERT_KEY` are provided, an HTTPS server supporting HTTP/2 connection will be started.
If not, an HTTP server will be started (**not secure**).
Expand Down
36 changes: 36 additions & 0 deletions hub/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"strings"
"time"
)

// Options stores the hub's options
Expand All @@ -20,6 +21,9 @@ type Options struct {
AcmeCertDir string
CertFile string
KeyFile string
HearthbeatInterval time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
Demo bool
}

Expand All @@ -40,6 +44,21 @@ func NewOptionsFromEnv() (*Options, error) {
dbPath = "updates.db"
}

hearthbeatInterval, err := parseDurationFromEnvVar("HEARTHBEAT_INTERVAL")
if err != nil {
return nil, err
}

readTimeout, err := parseDurationFromEnvVar("READ_TIMEOUT")
if err != nil {
return nil, err
}

writeTimeout, err := parseDurationFromEnvVar("WRITE_TIMEOUT")
if err != nil {
return nil, err
}

options := &Options{
os.Getenv("DEBUG") == "1",
dbPath,
Expand All @@ -53,6 +72,9 @@ func NewOptionsFromEnv() (*Options, error) {
os.Getenv("ACME_CERT_DIR"),
os.Getenv("CERT_FILE"),
os.Getenv("KEY_FILE"),
hearthbeatInterval,
readTimeout,
writeTimeout,
os.Getenv("DEMO") == "1" || os.Getenv("DEBUG") == "1",
}

Expand Down Expand Up @@ -84,3 +106,17 @@ func splitVar(v string) []string {

return strings.Split(v, ",")
}

func parseDurationFromEnvVar(k string) (time.Duration, error) {
v := os.Getenv(k)
if v == "" {
return time.Duration(0), nil
}

dur, err := time.ParseDuration(v)
if err == nil {
return dur, nil
}

return time.Duration(0), fmt.Errorf("%s: %s", k, err)
}
19 changes: 19 additions & 0 deletions hub/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hub
import (
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -22,6 +23,9 @@ func TestNewOptionsFormNew(t *testing.T) {
"PUBLISHER_JWT_KEY": "foo",
"PUBLISH_ALLOWED_ORIGINS": "http://127.0.0.1:8080",
"SUBSCRIBER_JWT_KEY": "bar",
"HEARTHBEAT_INTERVAL": "30s",
"READ_TIMEOUT": "1m",
"WRITE_TIMEOUT": "40s",
}
for k, v := range testEnv {
os.Setenv(k, v)
Expand All @@ -42,6 +46,9 @@ func TestNewOptionsFormNew(t *testing.T) {
"/tmp",
"foo",
"bar",
30 * time.Second,
time.Minute,
40 * time.Second,
true,
}, opts)
assert.Nil(t, err)
Expand All @@ -67,3 +74,15 @@ func TestMissingCertFile(t *testing.T) {
_, err := NewOptionsFromEnv()
assert.EqualError(t, err, "The following environment variable must be defined: [PUBLISHER_JWT_KEY SUBSCRIBER_JWT_KEY CERT_FILE]")
}

func TestInvalidDuration(t *testing.T) {
vars := [3]string{"HEARTHBEAT_INTERVAL", "READ_TIMEOUT", "WRITE_TIMEOUT"}
for _, elem := range vars {
os.Setenv(elem, "1 MN (invalid)")
defer os.Unsetenv(elem)
_, err := NewOptionsFromEnv()
assert.EqualError(t, err, elem+": time: unknown unit MN (invalid) in duration 1 MN (invalid)")

os.Unsetenv(elem)
}
}
6 changes: 4 additions & 2 deletions hub/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
// Serve starts the HTTP server
func (h *Hub) Serve() {
h.server = &http.Server{
Addr: h.options.Addr,
Handler: h.chainHandlers(),
Addr: h.options.Addr,
Handler: h.chainHandlers(),
ReadTimeout: h.options.ReadTimeout,
WriteTimeout: h.options.WriteTimeout,
}
h.server.RegisterOnShutdown(func() {
h.Stop()
Expand Down
36 changes: 29 additions & 7 deletions hub/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"regexp"
"time"

log "github.com/sirupsen/logrus"
"github.com/yosida95/uritemplate"
Expand Down Expand Up @@ -63,15 +64,10 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
log.WithFields(log.Fields{"remote_addr": r.RemoteAddr}).Info("Subscriber disconnected")
}()

for {
serializedUpdate, open := <-updateChan
if !open {
break
}

publish := func(serializedUpdate *serializedUpdate) {
// Check authorization
if !subscriber.CanReceive(serializedUpdate.Update) {
continue
return
}

fmt.Fprint(w, serializedUpdate.event)
Expand All @@ -81,6 +77,32 @@ func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request) {
}).Info("Event sent")
f.Flush()
}

for {
if h.options.HearthbeatInterval == time.Duration(0) {
// No hearthbeat defined, just block
serializedUpdate, open := <-updateChan
if !open {
return
}
publish(serializedUpdate)

continue
}

select {
case serializedUpdate, open := <-updateChan:
if !open {
return
}
publish(serializedUpdate)

case <-time.After(h.options.HearthbeatInterval):
// Send a SSE comment as a heartbeat, to prevent issues with some proxies and old browsers
fmt.Fprint(w, ":\n")
f.Flush()
}
}
}

// sendHeaders send correct HTTP headers to create a keep-alive connection
Expand Down
36 changes: 36 additions & 0 deletions hub/subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
bolt "go.etcd.io/bbolt"
Expand Down Expand Up @@ -335,6 +336,41 @@ func TestSendMissedEvents(t *testing.T) {
wg.Wait()
}

func TestSubscribeHearthbeat(t *testing.T) {
hub := createAnonymousDummy()
hub.options.HearthbeatInterval = 5 * time.Millisecond
hub.Start()

go func() {
for {
hub.subscribers.RLock()
empty := len(hub.subscribers.m) == 0
hub.subscribers.RUnlock()

if empty {
continue
}

hub.updates <- newSerializedUpdate(&Update{
Topics: []string{"http://example.com/books/1"},
Event: Event{Data: "Hello World", ID: "b"},
})

time.Sleep(8 * time.Millisecond)
hub.Stop()
return
}
}()

req := httptest.NewRequest("GET", "http://example.com/hub?topic=http://example.com/books/1&topic=http://example.com/reviews/{id}", nil)
w := newCloseNotifyingRecorder()
hub.SubscribeHandler(w, req)

resp := w.Result()
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "id: b\ndata: Hello World\n\n:\n", w.Body.String())
}

// From https://github.com/go-martini/martini/blob/master/response_writer_test.go
type closeNotifyingRecorder struct {
*httptest.ResponseRecorder
Expand Down

0 comments on commit d7db60f

Please sign in to comment.