diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 12a4dd27d02..fdb3b1dea65 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -1,8 +1,8 @@ package main import ( - "bytes" "fmt" + "os" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" cmds "github.com/jbenet/go-ipfs/commands" @@ -51,9 +51,8 @@ the daemon. } func daemonFunc(req cmds.Request, res cmds.Response) { - var out bytes.Buffer - res.SetOutput(&out) - writef(&out, "Initializing daemon...\n") + // let the user know we're going. + fmt.Printf("Initializing daemon...\n") // first, whether user has provided the initialization flag. we may be // running in an uninitialized state. @@ -70,7 +69,7 @@ func daemonFunc(req cmds.Request, res cmds.Response) { // `IsInitialized` where the quality of the signal can be improved over // time, and many call-sites can benefit. if !util.FileExists(req.Context().ConfigRoot) { - err := initWithDefaults(&out, req.Context().ConfigRoot) + err := initWithDefaults(os.Stdout, req.Context().ConfigRoot) if err != nil { res.SetError(debugerror.Wrap(err), cmds.ErrNormal) return @@ -155,8 +154,8 @@ func daemonFunc(req cmds.Request, res cmds.Response) { res.SetError(err, cmds.ErrNormal) return } - writef(&out, "IPFS mounted at: %s\n", fsdir) - writef(&out, "IPNS mounted at: %s\n", nsdir) + fmt.Printf("IPFS mounted at: %s\n", fsdir) + fmt.Printf("IPNS mounted at: %s\n", nsdir) } var rootRedirect corehttp.ServeOption @@ -173,10 +172,6 @@ func daemonFunc(req cmds.Request, res cmds.Response) { writable = cfg.Gateway.Writable } - if writable { - fmt.Printf("IPNS gateway mounted read-write\n") - } - if gatewayMaddr != nil { go func() { var opts = []corehttp.ServeOption{corehttp.GatewayOption(writable)} @@ -184,6 +179,9 @@ func daemonFunc(req cmds.Request, res cmds.Response) { opts = append(opts, rootRedirect) } fmt.Printf("Gateway server listening on %s\n", gatewayMaddr) + if writable { + fmt.Printf("Gateway server is writable\n") + } err := corehttp.ListenAndServe(node, gatewayMaddr.String(), opts...) if err != nil { log.Error(err) diff --git a/routing/dht/notif.go b/routing/dht/notif.go index 82e097753b2..4af2fc97876 100644 --- a/routing/dht/notif.go +++ b/routing/dht/notif.go @@ -18,6 +18,7 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) { select { case <-dht.Closing(): return + default: } dht.Update(dht.Context(), v.RemotePeer()) } @@ -27,6 +28,7 @@ func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) { select { case <-dht.Closing(): return + default: } dht.routingTable.Remove(v.RemotePeer()) } diff --git a/test/sharness/lib/test-lib.sh b/test/sharness/lib/test-lib.sh index 16cde91fd91..25b34cfd575 100644 --- a/test/sharness/lib/test-lib.sh +++ b/test/sharness/lib/test-lib.sh @@ -74,15 +74,16 @@ test_run_repeat_10_sec() { } test_wait_output_n_lines_60_sec() { - echo "$2" >expected_waitn - for i in 1 2 3 4 5 6 7 8 9 10 + for i in 1 2 3 4 5 6 do - cat "$1" | wc -l | tr -d " " >actual_waitn - test_cmp "expected_waitn" "actual_waitn" && return - sleep 2 + for i in 1 2 3 4 5 6 7 8 9 10 + do + test $(cat "$1" | wc -l | tr -d " ") -ge $2 && return + sleep 1 + done done - cat "$1" | wc -l | tr -d " " >actual_waitn - test_cmp "expected_waitn" "actual_waitn" + actual=$(cat "$1" | wc -l | tr -d " ") + fsh "expected $2 lines of output. got $actual" } test_wait_open_tcp_port_10_sec() { @@ -130,6 +131,13 @@ test_config_ipfs_gateway_writable() { test_launch_ipfs_daemon() { + ADDR_API="/ip4/127.0.0.1/tcp/5001" + ADDR_GWAY=`ipfs config Addresses.Gateway` + NLINES="2" + if test "$ADDR_GWAY" != ""; then + NLINES="3" + fi + test_expect_success "'ipfs daemon' succeeds" ' ipfs daemon >actual_daemon 2>daemon_err & ' @@ -138,19 +146,17 @@ test_launch_ipfs_daemon() { # and we make sure there are no errors test_expect_success "'ipfs daemon' is ready" ' IPFS_PID=$! && - test_run_repeat_10_sec "cat actual_daemon | grep \"API server listening on\"" && + test_wait_output_n_lines_60_sec actual_daemon $NLINES && printf "" >empty && test_cmp daemon_err empty || fsh cat actual_daemon || fsh cat daemon_err ' - ADDR_API="/ip4/127.0.0.1/tcp/5001" test_expect_success "'ipfs daemon' output includes API address" ' cat actual_daemon | grep "API server listening on $ADDR_API" || fsh cat actual_daemon || fsh "cat actual_daemon | grep \"API server listening on $ADDR_API\"" ' - ADDR_GWAY=`ipfs config Addresses.Gateway` if test "$ADDR_GWAY" != ""; then test_expect_success "'ipfs daemon' output includes Gateway address" ' cat actual_daemon | grep "Gateway server listening on $ADDR_GWAY" || diff --git a/thirdparty/notifier/notifier.go b/thirdparty/notifier/notifier.go index d493044f63a..c3721138180 100644 --- a/thirdparty/notifier/notifier.go +++ b/thirdparty/notifier/notifier.go @@ -5,6 +5,9 @@ package notifier import ( "sync" + + process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + ratelimit "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" ) // Notifiee is a generic interface. Clients implement @@ -31,6 +34,18 @@ type Notifiee interface{} type Notifier struct { mu sync.RWMutex // guards notifiees nots map[Notifiee]struct{} + lim *ratelimit.RateLimiter +} + +// RateLimited returns a rate limited Notifier. only limit goroutines +// will be spawned. If limit is zero, no rate limiting happens. This +// is the same as `Notifier{}`. +func RateLimited(limit int) Notifier { + n := Notifier{} + if limit > 0 { + n.lim = ratelimit.NewRateLimiter(process.Background(), limit) + } + return n } // Notify signs up Notifiee e for notifications. This function @@ -107,8 +122,15 @@ func (n *Notifier) NotifyAll(notify func(Notifiee)) { n.mu.Lock() if n.nots != nil { // so that zero-value is ready to be used. for notifiee := range n.nots { - go notify(notifiee) - // TODO find a good way to rate limit this without blocking notifier. + + if n.lim == nil { // no rate limit + go notify(notifiee) + } else { + notifiee := notifiee // rebind for data races + n.lim.LimitedGo(func(worker process.Process) { + notify(notifiee) + }) + } } } n.mu.Unlock() diff --git a/thirdparty/notifier/notifier_test.go b/thirdparty/notifier/notifier_test.go index 15741bf3820..9b9692ef13e 100644 --- a/thirdparty/notifier/notifier_test.go +++ b/thirdparty/notifier/notifier_test.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" "testing" + "time" ) // test data structures @@ -205,3 +206,86 @@ func TestThreadsafe(t *testing.T) { t.Error("counts disagree") } } + +type highwatermark struct { + mu sync.Mutex + mark int + limit int + errs chan error +} + +func (m *highwatermark) incr() { + m.mu.Lock() + m.mark++ + // fmt.Println("incr", m.mark) + if m.mark > m.limit { + m.errs <- fmt.Errorf("went over limit: %d/%d", m.mark, m.limit) + } + m.mu.Unlock() +} + +func (m *highwatermark) decr() { + m.mu.Lock() + m.mark-- + // fmt.Println("decr", m.mark) + if m.mark < 0 { + m.errs <- fmt.Errorf("went under zero: %d/%d", m.mark, m.limit) + } + m.mu.Unlock() +} + +func TestLimited(t *testing.T) { + timeout := 10 * time.Second // huge timeout. + limit := 9 + + hwm := highwatermark{limit: limit, errs: make(chan error, 100)} + n := RateLimited(limit) // will stop after 3 rounds + n.Notify(1) + n.Notify(2) + n.Notify(3) + + entr := make(chan struct{}) + exit := make(chan struct{}) + done := make(chan struct{}) + go func() { + for i := 0; i < 10; i++ { + // fmt.Printf("round: %d\n", i) + n.NotifyAll(func(e Notifiee) { + hwm.incr() + entr <- struct{}{} + <-exit // wait + hwm.decr() + }) + } + done <- struct{}{} + }() + + for i := 0; i < 30; { + select { + case <-entr: + continue // let as many enter as possible + case <-time.After(1 * time.Millisecond): + } + + // let one exit + select { + case <-entr: + continue // in case of timing issues. + case exit <- struct{}{}: + case <-time.After(timeout): + t.Error("got stuck") + } + i++ + } + + select { + case <-done: // two parts done + case <-time.After(timeout): + t.Error("did not finish") + } + + close(hwm.errs) + for err := range hwm.errs { + t.Error(err) + } +}