Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[nbs] background thread writes index records #7885

Closed
wants to merge 15 commits into from
121 changes: 100 additions & 21 deletions go/store/nbs/journal_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
// journalMaybeSyncThreshold determines how much un-syncd written data
// can be outstanding to the journal before we will sync it.
journalMaybeSyncThreshold = 64 * 1024 * 1024

indexChanSize = 256
)

var (
Expand Down Expand Up @@ -94,11 +96,21 @@ func openJournalWriter(ctx context.Context, path string) (wr *journalWriter, exi
return nil, true, err
}

return &journalWriter{
recvEg, ctx := errgroup.WithContext(context.Background())
indexCh := make(chan any, indexChanSize)
wr = &journalWriter{
buf: make([]byte, 0, journalWriterBuffSize),
journal: f,
path: path,
}, true, nil
indexEg: recvEg,
indexCh: indexCh,
done: make(chan struct{}),
}
recvEg.Go(func() error {
return wr.recvIndexRecords(context.Background(), indexCh)
})

return wr, true, nil
}

func createJournalWriter(ctx context.Context, path string) (wr *journalWriter, err error) {
Expand Down Expand Up @@ -133,11 +145,56 @@ func createJournalWriter(ctx context.Context, path string) (wr *journalWriter, e
return nil, fmt.Errorf("expected file journalOffset 0, got %d", o)
}

return &journalWriter{
recvEg, ctx := errgroup.WithContext(context.Background())
indexCh := make(chan any, indexChanSize)
wr = &journalWriter{
buf: make([]byte, 0, journalWriterBuffSize),
journal: f,
path: path,
}, nil
indexEg: recvEg,
indexCh: indexCh,
done: make(chan struct{}),
}
recvEg.Go(func() error {
return wr.recvIndexRecords(context.Background(), indexCh)
})

return wr, nil
}

func (wr *journalWriter) recvIndexRecords(ctx context.Context, c chan any) error {
var batchCrc uint32
for {
select {
case <-ctx.Done():
return context.Cause(ctx)
case obj, ok := <-c:
if !ok {
return nil
}
var err error
switch l := obj.(type) {
case lookup:
batchCrc = crc32.Update(batchCrc, crcTable, l.a[:])
err = writeIndexLookup(wr.indexWriter, l)
case lookupMeta:
l.checkSum = batchCrc
batchCrc = 0
err = writeJournalIndexMeta(wr.indexWriter, l.latestHash, l.batchStart, l.batchEnd, l.checkSum)
default:
err = fmt.Errorf("invalid index channel object")
}
if err != nil {
// In the past, an index write error would synchronously block the
// originating chunk write. Several layers of abstraction now
// separate a specific chunk write from a write error. An error
// now disables journal indexing for the remainder of the engine
// process. The error is reported by |recvEg| on Close().
close(wr.done)
return err
}
}
}
}

func deleteJournalAndIndexFiles(ctx context.Context, path string) (err error) {
Expand All @@ -164,8 +221,13 @@ type journalWriter struct {
ranges rangeIndex
index *os.File
indexWriter *bufio.Writer
batchCrc uint32
maxNovel int
// indexCh linearizes index record writes
indexCh chan any
// done is used to communicate error routine
done chan struct{}
// indexEg holds the index writer thread
indexEg *errgroup.Group

lock sync.RWMutex
}
Expand Down Expand Up @@ -305,10 +367,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context, reflogRingBuffer
wr.uncmpSz += r.uncompressedPayloadSize()

a := toAddr16(r.address)
if err := writeIndexLookup(wr.indexWriter, lookup{a: a, r: rng}); err != nil {
return err
}
wr.batchCrc = crc32.Update(wr.batchCrc, crcTable, a[:])
wr.sendIndexRecord(lookup{a: a, r: rng})

case rootHashJournalRecKind:
lastOffset = o
Expand Down Expand Up @@ -427,10 +486,7 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error {
wr.ranges.put(cc.H, rng)

a := toAddr16(cc.H)
if err := writeIndexLookup(wr.indexWriter, lookup{a: a, r: rng}); err != nil {
return err
}
wr.batchCrc = crc32.Update(wr.batchCrc, crcTable, a[:])
wr.sendIndexRecord(lookup{a: a, r: rng})

// To fulfill our durability guarantees, we technically only need to
// file.Sync() the journal when we commit a new root chunk. However,
Expand All @@ -454,6 +510,18 @@ func (wr *journalWriter) writeCompressedChunk(cc CompressedChunk) error {
return nil
}

// sendIndexRecord launches a goroutine that will enqueue
// a record to the indexCh channel.
func (wr *journalWriter) sendIndexRecord(r interface{}) {
if wr.indexWriter != nil {
// block on send, unlikely to bottleneck compared to chunk writes
select {
case wr.indexCh <- r:
case <-wr.done:
}
}
}

// commitRootHash commits |root| to the journal and syncs the file to disk.
func (wr *journalWriter) commitRootHash(root hash.Hash) error {
wr.lock.Lock()
Expand Down Expand Up @@ -489,10 +557,11 @@ func (wr *journalWriter) commitRootHashUnlocked(root hash.Hash) error {
// out-of-band journal index file. Index records accelerate journal
// bootstrapping by reducing the amount of the journal that must be processed.
func (wr *journalWriter) flushIndexRecord(root hash.Hash, end int64) (err error) {
if err := writeJournalIndexMeta(wr.indexWriter, root, wr.indexed, end, wr.batchCrc); err != nil {
return err
}
wr.batchCrc = 0
wr.sendIndexRecord(lookupMeta{
batchStart: wr.indexed,
batchEnd: end,
latestHash: root,
})
wr.ranges = wr.ranges.flatten()
// set a new high-water-mark for the indexed portion of the journal
wr.indexed = end
Expand Down Expand Up @@ -629,14 +698,24 @@ func (wr *journalWriter) Close() (err error) {
return err
}
if wr.index != nil {
_ = wr.indexWriter.Flush()
_ = wr.index.Close()
close(wr.indexCh)
// wait for writer to drain |indexChan|
if werr := wr.indexEg.Wait(); werr != nil {
err = errors.Join(err, werr)
}
// ensure writes make it to disk, close file
if ferr := wr.indexWriter.Flush(); ferr != nil {
err = errors.Join(err, ferr)
}
if cerr := wr.index.Close(); cerr != nil {
err = errors.Join(err, cerr)
}
}
if cerr := wr.journal.Sync(); cerr != nil {
err = cerr
err = errors.Join(err, cerr)
}
if cerr := wr.journal.Close(); cerr != nil {
err = cerr
err = errors.Join(err, cerr)
} else {
// Nil out the journal after the file has been closed, so that it's obvious it's been closed
wr.journal = nil
Expand Down
Loading