From 8ac4151b739de70b7d1d8e2bbedbc53f5541470a Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 30 Oct 2015 15:56:29 -0700 Subject: [PATCH 1/3] better implementation of mirrorwriter --- option.go | 2 +- writer.go | 199 ++++++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 163 insertions(+), 38 deletions(-) diff --git a/option.go b/option.go index 8c5a59c191d..4772f9881f4 100644 --- a/option.go +++ b/option.go @@ -7,7 +7,7 @@ import ( ) // Global writer group for logs to output to -var WriterGroup = new(MirrorWriter) +var WriterGroup = NewMirrorWriter() type Option func() diff --git a/writer.go b/writer.go index 6f38bbe6068..34d726e2a25 100644 --- a/writer.go +++ b/writer.go @@ -3,64 +3,189 @@ package log import ( "io" "sync" - "time" ) +var MaxWriterBuffer = 16 * 1024 * 1024 + +var log = Logger("eventlog") + type MirrorWriter struct { - writers []io.WriteCloser - lk sync.Mutex + active bool + activelk sync.Mutex + + // channel for incoming writers + writerAdd chan io.WriteCloser + + // slices of writer/sync-channel pairs + writers []*writerSync + + // synchronization channel for incoming writes + msgSync chan []byte + + // channel for dead writers to notify the MirrorWriter + deadWriter chan io.WriteCloser +} + +type writerSync struct { + w io.WriteCloser + br chan []byte +} + +func NewMirrorWriter() *MirrorWriter { + mw := &MirrorWriter{ + msgSync: make(chan []byte, 64), // sufficiently large buffer to avoid callers waiting + writerAdd: make(chan io.WriteCloser), + deadWriter: make(chan io.WriteCloser), + } + + go mw.logRoutine() + + return mw } func (mw *MirrorWriter) Write(b []byte) (int, error) { - mw.lk.Lock() - // write to all writers, and nil out the broken ones. - var dropped bool - done := make(chan error, 1) - for i, w := range mw.writers { - go func(out chan error) { + mw.msgSync <- b + return len(b), nil +} + +func (mw *MirrorWriter) handleWriter(w io.WriteCloser, msgs <-chan []byte) { + bufsize := 0 + bufBase := make([][]byte, 0, 16) // some initial memory + buffered := bufBase + nextCh := make(chan []byte) + + var nextMsg []byte + var send chan []byte + + go func() { + for b := range nextCh { _, err := w.Write(b) - out <- err - }(done) - select { - case err := <-done: if err != nil { - mw.writers[i].Close() - mw.writers[i] = nil - dropped = true + log.Info("eventlog write error: %s", err) + return + } + } + }() + + // collect and buffer messages + for { + select { + case b := <-msgs: + if len(buffered) == 0 { + nextMsg = b + send = nextCh + } else { + bufsize += len(b) + buffered = append(buffered, b) + if bufsize > MaxWriterBuffer { + // if we have too many messages buffered, kill the writer + w.Close() + mw.deadWriter <- w + close(nextCh) + return + } + } + case send <- nextMsg: + if len(buffered) > 0 { + nextMsg = buffered[0] + buffered = buffered[1:] + bufsize -= len(nextMsg) } - case <-time.After(time.Millisecond * 500): - mw.writers[i].Close() - mw.writers[i] = nil - dropped = true - // clear channel out - done = make(chan error, 1) + if len(buffered) == 0 { + // reset slice position + buffered = bufBase[:0] + + nextMsg = nil + send = nil + } } } +} + +func (mw *MirrorWriter) logRoutine() { + for { + select { + case b := <-mw.msgSync: + // write to all writers + dropped := mw.broadcastMessage(b) - // consolidate the slice - if dropped { - writers := mw.writers - mw.writers = nil - for _, w := range writers { - if w != nil { - mw.writers = append(mw.writers, w) + // consolidate the slice + if dropped { + mw.clearDeadWriters() } + case w := <-mw.writerAdd: + brchan := make(chan []byte, 1) // buffered for absent-handoffs to not cause delays + mw.writers = append(mw.writers, &writerSync{ + w: w, + br: brchan, + }) + go mw.handleWriter(w, brchan) + + mw.activelk.Lock() + mw.active = true + mw.activelk.Unlock() } } - mw.lk.Unlock() - return len(b), nil +} + +// broadcastMessage sends the given message to every writer +// if any writer is killed during the send, 'true' is returned +func (mw *MirrorWriter) broadcastMessage(b []byte) bool { + var dropped bool + for i, w := range mw.writers { + if w == nil { + // if the next writer was killed before we got + // to it move on + continue + } + + for sending := true; sending; { + // loop until we send the message, or the current writer is killed + select { + case w.br <- b: + // success! + sending = false + + case dw := <-mw.deadWriter: + // some writer was killed while waiting here + for j, w := range mw.writers { + if w.w == dw { + mw.writers[j] = nil + if i == j { + sending = false + } + } + } + dropped = true + } + } + } + return dropped +} + +func (mw *MirrorWriter) clearDeadWriters() { + writers := mw.writers + mw.writers = nil + for _, w := range writers { + if w != nil { + mw.writers = append(mw.writers, w) + } + } + if len(mw.writers) == 0 { + mw.activelk.Lock() + mw.active = false + mw.activelk.Unlock() + } } func (mw *MirrorWriter) AddWriter(w io.WriteCloser) { - mw.lk.Lock() - mw.writers = append(mw.writers, w) - mw.lk.Unlock() + mw.writerAdd <- w } func (mw *MirrorWriter) Active() (active bool) { - mw.lk.Lock() - active = len(mw.writers) > 0 - mw.lk.Unlock() + mw.activelk.Lock() + active = mw.active + mw.activelk.Unlock() return } From 3c6cfa597ed9898451daa8219d98e513d5654a7c Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 30 Oct 2015 17:50:45 -0700 Subject: [PATCH 2/3] split out bufwriter --- writer.go | 225 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 131 insertions(+), 94 deletions(-) diff --git a/writer.go b/writer.go index 34d726e2a25..5d3649b8ca1 100644 --- a/writer.go +++ b/writer.go @@ -1,11 +1,12 @@ package log import ( + "fmt" "io" "sync" ) -var MaxWriterBuffer = 16 * 1024 * 1024 +var MaxWriterBuffer = 512 * 1024 var log = Logger("eventlog") @@ -17,13 +18,10 @@ type MirrorWriter struct { writerAdd chan io.WriteCloser // slices of writer/sync-channel pairs - writers []*writerSync + writers []*bufWriter // synchronization channel for incoming writes msgSync chan []byte - - // channel for dead writers to notify the MirrorWriter - deadWriter chan io.WriteCloser } type writerSync struct { @@ -33,9 +31,8 @@ type writerSync struct { func NewMirrorWriter() *MirrorWriter { mw := &MirrorWriter{ - msgSync: make(chan []byte, 64), // sufficiently large buffer to avoid callers waiting - writerAdd: make(chan io.WriteCloser), - deadWriter: make(chan io.WriteCloser), + msgSync: make(chan []byte, 64), // sufficiently large buffer to avoid callers waiting + writerAdd: make(chan io.WriteCloser), } go mw.logRoutine() @@ -48,65 +45,24 @@ func (mw *MirrorWriter) Write(b []byte) (int, error) { return len(b), nil } -func (mw *MirrorWriter) handleWriter(w io.WriteCloser, msgs <-chan []byte) { - bufsize := 0 - bufBase := make([][]byte, 0, 16) // some initial memory - buffered := bufBase - nextCh := make(chan []byte) - - var nextMsg []byte - var send chan []byte - - go func() { - for b := range nextCh { - _, err := w.Write(b) - if err != nil { - log.Info("eventlog write error: %s", err) - return - } - } - }() - - // collect and buffer messages - for { - select { - case b := <-msgs: - if len(buffered) == 0 { - nextMsg = b - send = nextCh - } else { - bufsize += len(b) - buffered = append(buffered, b) - if bufsize > MaxWriterBuffer { - // if we have too many messages buffered, kill the writer - w.Close() - mw.deadWriter <- w - close(nextCh) - return - } - } - case send <- nextMsg: - if len(buffered) > 0 { - nextMsg = buffered[0] - buffered = buffered[1:] - bufsize -= len(nextMsg) - } - - if len(buffered) == 0 { - // reset slice position - buffered = bufBase[:0] - - nextMsg = nil - send = nil - } - } - } +func (mw *MirrorWriter) Close() error { + // it is up to the caller to ensure that write is not called during or + // after close is called. + close(mw.msgSync) + return nil } func (mw *MirrorWriter) logRoutine() { + // rebind to avoid races on nilling out struct fields + msgSync := mw.msgSync + writerAdd := mw.writerAdd + for { select { - case b := <-mw.msgSync: + case b, ok := <-msgSync: + if !ok { + return + } // write to all writers dropped := mw.broadcastMessage(b) @@ -114,13 +70,8 @@ func (mw *MirrorWriter) logRoutine() { if dropped { mw.clearDeadWriters() } - case w := <-mw.writerAdd: - brchan := make(chan []byte, 1) // buffered for absent-handoffs to not cause delays - mw.writers = append(mw.writers, &writerSync{ - w: w, - br: brchan, - }) - go mw.handleWriter(w, brchan) + case w := <-writerAdd: + mw.writers = append(mw.writers, newBufWriter(w)) mw.activelk.Lock() mw.active = true @@ -134,31 +85,10 @@ func (mw *MirrorWriter) logRoutine() { func (mw *MirrorWriter) broadcastMessage(b []byte) bool { var dropped bool for i, w := range mw.writers { - if w == nil { - // if the next writer was killed before we got - // to it move on - continue - } - - for sending := true; sending; { - // loop until we send the message, or the current writer is killed - select { - case w.br <- b: - // success! - sending = false - - case dw := <-mw.deadWriter: - // some writer was killed while waiting here - for j, w := range mw.writers { - if w.w == dw { - mw.writers[j] = nil - if i == j { - sending = false - } - } - } - dropped = true - } + _, err := w.Write(b) + if err != nil { + mw.writers[i] = nil + dropped = true } } return dropped @@ -189,3 +119,110 @@ func (mw *MirrorWriter) Active() (active bool) { mw.activelk.Unlock() return } + +func newBufWriter(w io.WriteCloser) *bufWriter { + bw := &bufWriter{ + writer: w, + incoming: make(chan []byte, 1), + } + + go bw.loop() + return bw +} + +type bufWriter struct { + writer io.WriteCloser + + incoming chan []byte + + deathLock sync.Mutex + dead bool +} + +var errDeadWriter = fmt.Errorf("writer is dead") + +func (bw *bufWriter) Write(b []byte) (int, error) { + bw.deathLock.Lock() + dead := bw.dead + bw.deathLock.Unlock() + if dead { + if bw.incoming != nil { + close(bw.incoming) + bw.incoming = nil + } + return 0, errDeadWriter + } + + bw.incoming <- b + return len(b), nil +} + +func (bw *bufWriter) die() { + bw.deathLock.Lock() + bw.dead = true + bw.writer.Close() + bw.deathLock.Unlock() +} + +func (bw *bufWriter) loop() { + bufsize := 0 + bufBase := make([][]byte, 0, 16) // some initial memory + buffered := bufBase + nextCh := make(chan []byte) + + var nextMsg []byte + var send chan []byte + + go func() { + for b := range nextCh { + _, err := bw.writer.Write(b) + if err != nil { + log.Info("eventlog write error: %s", err) + bw.die() + return + } + } + }() + + // collect and buffer messages + incoming := bw.incoming + for { + select { + case b, ok := <-incoming: + if !ok { + return + } + if len(buffered) == 0 { + nextMsg = b + send = nextCh + } else { + bufsize += len(b) + buffered = append(buffered, b) + if bufsize > MaxWriterBuffer { + // if we have too many messages buffered, kill the writer + bw.die() + close(nextCh) + // explicity keep going here to drain incoming + } + } + case send <- nextMsg: + // if 'send' is equal to nil, this case will never trigger. + // Taking advantage of that, when we have sent all of our buffered + // messages, we nil out the channel until more arrive. This way we + // can 'turn off' the writer until there is more to be written. + if len(buffered) > 0 { + nextMsg = buffered[0] + buffered = buffered[1:] + bufsize -= len(nextMsg) + } + + if len(buffered) == 0 { + // reset slice position + buffered = bufBase[:0] + + nextMsg = nil + send = nil + } + } + } +} From 90a77d26d33b1d98d861b59734485ff09a675984 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 2 Nov 2015 13:33:20 -0800 Subject: [PATCH 3/3] add tests and simplify some channel logic --- writer.go | 56 ++++++++++------- writer_test.go | 160 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 194 insertions(+), 22 deletions(-) create mode 100644 writer_test.go diff --git a/writer.go b/writer.go index 5d3649b8ca1..b5f1b9a3c49 100644 --- a/writer.go +++ b/writer.go @@ -41,7 +41,9 @@ func NewMirrorWriter() *MirrorWriter { } func (mw *MirrorWriter) Write(b []byte) (int, error) { - mw.msgSync <- b + mycopy := make([]byte, len(b)) + copy(mycopy, b) + mw.msgSync <- mycopy return len(b), nil } @@ -52,17 +54,26 @@ func (mw *MirrorWriter) Close() error { return nil } +func (mw *MirrorWriter) doClose() { + for _, w := range mw.writers { + w.writer.Close() + } +} + func (mw *MirrorWriter) logRoutine() { // rebind to avoid races on nilling out struct fields msgSync := mw.msgSync writerAdd := mw.writerAdd + defer mw.doClose() + for { select { case b, ok := <-msgSync: if !ok { return } + // write to all writers dropped := mw.broadcastMessage(b) @@ -171,7 +182,6 @@ func (bw *bufWriter) loop() { nextCh := make(chan []byte) var nextMsg []byte - var send chan []byte go func() { for b := range nextCh { @@ -187,29 +197,34 @@ func (bw *bufWriter) loop() { // collect and buffer messages incoming := bw.incoming for { + if nextMsg == nil || nextCh == nil { + // nextCh == nil implies we are 'dead' and draining the incoming channel + // until the caller notices and closes it for us + select { + case b, ok := <-incoming: + if !ok { + return + } + nextMsg = b + } + } + select { case b, ok := <-incoming: if !ok { return } - if len(buffered) == 0 { - nextMsg = b - send = nextCh - } else { - bufsize += len(b) - buffered = append(buffered, b) - if bufsize > MaxWriterBuffer { - // if we have too many messages buffered, kill the writer - bw.die() - close(nextCh) - // explicity keep going here to drain incoming - } + bufsize += len(b) + buffered = append(buffered, b) + if bufsize > MaxWriterBuffer { + // if we have too many messages buffered, kill the writer + bw.die() + close(nextCh) + nextCh = nil + // explicity keep going here to drain incoming } - case send <- nextMsg: - // if 'send' is equal to nil, this case will never trigger. - // Taking advantage of that, when we have sent all of our buffered - // messages, we nil out the channel until more arrive. This way we - // can 'turn off' the writer until there is more to be written. + case nextCh <- nextMsg: + nextMsg = nil if len(buffered) > 0 { nextMsg = buffered[0] buffered = buffered[1:] @@ -219,9 +234,6 @@ func (bw *bufWriter) loop() { if len(buffered) == 0 { // reset slice position buffered = bufBase[:0] - - nextMsg = nil - send = nil } } } diff --git a/writer_test.go b/writer_test.go new file mode 100644 index 00000000000..55466abc05d --- /dev/null +++ b/writer_test.go @@ -0,0 +1,160 @@ +package log + +import ( + "fmt" + "hash/fnv" + "io" + "sync" + "testing" + "time" + + randbo "github.com/dustin/randbo" +) + +type hangwriter struct { + c chan struct{} +} + +func newHangWriter() *hangwriter { + return &hangwriter{make(chan struct{})} +} + +func (hw *hangwriter) Write([]byte) (int, error) { + <-make(chan struct{}) + return 0, fmt.Errorf("write on closed writer") +} + +func (hw *hangwriter) Close() error { + close(hw.c) + return nil +} + +func TestMirrorWriterHang(t *testing.T) { + mw := NewMirrorWriter() + + hw := newHangWriter() + pr, pw := io.Pipe() + + mw.AddWriter(hw) + mw.AddWriter(pw) + + msg := "Hello!" + mw.Write([]byte(msg)) + + // make sure writes through can happen even with one writer hanging + done := make(chan struct{}) + go func() { + buf := make([]byte, 10) + n, err := pr.Read(buf) + if err != nil { + t.Fatal(err) + } + + if n != len(msg) { + t.Fatal("read wrong amount") + } + + if string(buf[:n]) != msg { + t.Fatal("didnt read right content") + } + + done <- struct{}{} + }() + + select { + case <-time.After(time.Second * 5): + t.Fatal("write to mirrorwriter hung") + case <-done: + } + + if !mw.Active() { + t.Fatal("writer should still be active") + } + + pw.Close() + + if !mw.Active() { + t.Fatal("writer should still be active") + } + + // now we just have the hangwriter + + // write a bunch to it + buf := make([]byte, 8192) + for i := 0; i < 128; i++ { + mw.Write(buf) + } + + // wait for goroutines to sync up + time.Sleep(time.Millisecond * 500) + + // the hangwriter should have been killed, causing the mirrorwriter to be inactive now + if mw.Active() { + t.Fatal("should be inactive now") + } +} + +func TestStress(t *testing.T) { + mw := NewMirrorWriter() + + nreaders := 20 + + var readers []io.Reader + for i := 0; i < nreaders; i++ { + pr, pw := io.Pipe() + mw.AddWriter(pw) + readers = append(readers, pr) + } + + hashout := make(chan []byte) + + numwriters := 20 + writesize := 1024 + writecount := 300 + + f := func(r io.Reader) { + h := fnv.New64a() + sum, err := io.Copy(h, r) + if err != nil { + t.Fatal(err) + } + + if sum != int64(numwriters*writesize*writecount) { + t.Fatal("read wrong number of bytes") + } + + hashout <- h.Sum(nil) + } + + for _, r := range readers { + go f(r) + } + + work := sync.WaitGroup{} + for i := 0; i < numwriters; i++ { + work.Add(1) + go func() { + defer work.Done() + r := randbo.New() + buf := make([]byte, writesize) + for j := 0; j < writecount; j++ { + r.Read(buf) + mw.Write(buf) + time.Sleep(time.Millisecond * 5) + } + }() + } + + work.Wait() + mw.Close() + + check := make(map[string]bool) + for i := 0; i < nreaders; i++ { + h := <-hashout + check[string(h)] = true + } + + if len(check) > 1 { + t.Fatal("writers received different data!") + } +}