Skip to content

Commit

Permalink
Add support for transient databases
Browse files Browse the repository at this point in the history
This adds an internal API that allows creating
temporary databases on demand for creating indexes
on demand and deleting them.
  • Loading branch information
asdine committed Aug 16, 2021
1 parent 4f74b22 commit a45d2c1
Show file tree
Hide file tree
Showing 19 changed files with 495 additions and 43 deletions.
32 changes: 32 additions & 0 deletions engine/badgerengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,15 @@ package badgerengine
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"os"
"path/filepath"
"time"

"github.com/dgraph-io/badger/v3"
"github.com/dgraph-io/badger/v3/options"
"github.com/genjidb/genji/engine"
)

Expand All @@ -18,6 +25,8 @@ const (
// Engine represents a Badger engine.
type Engine struct {
DB *badger.DB

transient bool
}

// NewEngine creates a Badger engine. It takes the same argument as Badger's Open function.
Expand Down Expand Up @@ -50,6 +59,29 @@ func (e *Engine) Begin(ctx context.Context, opts engine.TxOptions) (engine.Trans
}, nil
}

func (e *Engine) NewTransientEngine(ctx context.Context) (engine.Engine, error) {
// build engine with fast options
opt := badger.DefaultOptions(filepath.Join(os.TempDir(), fmt.Sprintf(".genji-transient-%d", time.Now().Unix()+rand.Int63())))
opt.Compression = options.None

ng, err := NewEngine(opt)
if err != nil {
return nil, err
}
ng.transient = true
return ng, nil
}

func (e *Engine) Drop(ctx context.Context) error {
if !e.transient {
return errors.New("cannot drop persistent engine")
}

_ = e.Close()

return os.RemoveAll(e.DB.Opts().Dir)
}

// Close the engine and underlying Badger database.
func (e *Engine) Close() error {
return e.DB.Close()
Expand Down
21 changes: 21 additions & 0 deletions engine/badgerengine/engine_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package badgerengine_test

import (
"context"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -29,6 +30,26 @@ func TestBadgerEngine(t *testing.T) {
enginetest.TestSuite(t, builder(t))
}

func TestTransient(t *testing.T) {
var ng badgerengine.Engine

tng, err := ng.NewTransientEngine(context.Background())
require.NoError(t, err)

dir := tng.(*badgerengine.Engine).DB.Opts().Dir

tx, err := tng.Begin(context.Background(), engine.TxOptions{Writable: true})
require.NoError(t, err)
err = tx.Rollback()
require.NoError(t, err)

err = tng.Drop(context.Background())
require.NoError(t, err)

_, err = os.Stat(dir)
require.True(t, os.IsNotExist(err))
}

func BenchmarkBadgerEngineStorePut(b *testing.B) {
enginetest.BenchmarkStorePut(b, builder(b))
}
Expand Down
34 changes: 34 additions & 0 deletions engine/boltengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ package boltengine
import (
"context"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"os"
"path/filepath"
"time"

"github.com/genjidb/genji/engine"
bolt "go.etcd.io/bbolt"
Expand All @@ -18,6 +23,8 @@ const (
// Engine represents a BoltDB engine. Each store is stored in a dedicated bucket.
type Engine struct {
DB *bolt.DB

transient bool
}

// NewEngine creates a BoltDB engine. It takes the same argument as Bolt's Open function.
Expand Down Expand Up @@ -52,6 +59,33 @@ func (e *Engine) Begin(ctx context.Context, opts engine.TxOptions) (engine.Trans
}, nil
}

func (e *Engine) NewTransientEngine(ctx context.Context) (engine.Engine, error) {
// build engine with fast options
ng, err := NewEngine(filepath.Join(os.TempDir(), fmt.Sprintf(".genji-transient-%d.db", time.Now().UnixNano()+rand.Int63())), 0600, &bolt.Options{
NoFreelistSync: true,
FreelistType: bolt.FreelistMapType,
NoSync: true,
})
if err != nil {
return nil, err
}

ng.transient = true
return ng, nil
}

func (e *Engine) Drop(ctx context.Context) error {
if !e.transient {
return errors.New("cannot drop persistent engine")
}

p := e.DB.Path()

_ = e.Close()

return os.Remove(p)
}

// Close the engine and underlying Bolt database.
func (e *Engine) Close() error {
return e.DB.Close()
Expand Down
21 changes: 21 additions & 0 deletions engine/boltengine/engine_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package boltengine_test

import (
"context"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -41,3 +42,23 @@ func tempDir(t require.TestingT) (string, func()) {
os.RemoveAll(dir)
}
}

func TestTransient(t *testing.T) {
var ng boltengine.Engine

tng, err := ng.NewTransientEngine(context.Background())
require.NoError(t, err)

path := tng.(*boltengine.Engine).DB.Path()

tx, err := tng.Begin(context.Background(), engine.TxOptions{Writable: true})
require.NoError(t, err)
err = tx.Rollback()
require.NoError(t, err)

err = tng.Drop(context.Background())
require.NoError(t, err)

_, err = os.Stat(path)
require.True(t, os.IsNotExist(err))
}
14 changes: 14 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ type Engine interface {
// or true, respectively.
// The behaviour of opening a transaction when another one is already opened depends on the implementation.
Begin(ctx context.Context, opts TxOptions) (Transaction, error)
// A transient engine is a database
// used to create temporary indices.
// It should ideally be optimized for writes,
// and not reside solely in memory as it will be
// used to index entire tables.
// This database is not expected to be crash safe
// or support any recovery mechanism, as the Commit
// method will never be used.
// However, it might be reused across multiple transactions.
NewTransientEngine(ctx context.Context) (Engine, error)
// Drop releases any resource (files, memory, etc.) used by a transient database.
// It must return an error if the engine has not been created
// with NewTransientEngine.
Drop(ctx context.Context) error
// Close the engine after ensuring all the transactions have completed.
Close() error
}
Expand Down
16 changes: 16 additions & 0 deletions engine/enginetest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ func TestEngine(t *testing.T, builder Builder) {

require.NoError(t, ng.Close())
})

t.Run("Drop", func(t *testing.T) {
ng, cleanup := builder()
defer cleanup()

// drop should fail if the engine is not transient
err := ng.Drop(context.Background())
require.Error(t, err)
require.NoError(t, ng.Close())

tng, err := ng.NewTransientEngine(context.Background())
require.NoError(t, err)

err = tng.Drop(context.Background())
require.NoError(t, err)
})
}

// TestTransactionCommitRollback runs a list of tests to verify Commit and Rollback
Expand Down
19 changes: 17 additions & 2 deletions engine/memoryengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ const btreeDegree = 12
// Engine is a simple memory engine implementation that stores data in
// an in-memory Btree. It is not thread safe.
type Engine struct {
Closed bool
stores map[string]*btree.BTree
Closed bool
stores map[string]*btree.BTree
transient bool
}

// NewEngine creates an in-memory engine.
Expand All @@ -43,6 +44,20 @@ func (ng *Engine) Begin(ctx context.Context, opts engine.TxOptions) (engine.Tran
return &transaction{ctx: ctx, ng: ng, writable: opts.Writable}, nil
}

func (ng *Engine) NewTransientEngine(ctx context.Context) (engine.Engine, error) {
e := NewEngine()
e.transient = true
return e, nil
}

func (ng *Engine) Drop(ctx context.Context) error {
if !ng.transient {
return errors.New("cannot drop persistent engine")
}

return nil
}

// Close the engine.
func (ng *Engine) Close() error {
if ng.Closed {
Expand Down
6 changes: 3 additions & 3 deletions internal/database/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ OUTER:
return err
}

err = c.buildIndex(tx, idx, tb)
err = c.BuildIndex(tx, idx, tb)
return err
}

Expand Down Expand Up @@ -432,10 +432,10 @@ func (c *Catalog) ReIndex(tx *Transaction, indexName string) error {
return err
}

return c.buildIndex(tx, idx, tb)
return c.BuildIndex(tx, idx, tb)
}

func (c *Catalog) buildIndex(tx *Transaction, idx *Index, table *Table) error {
func (c *Catalog) BuildIndex(tx *Transaction, idx *Index, table *Table) error {
return table.Iterate(func(d types.Document) error {
var err error
values := make([]types.Value, len(idx.Info.Paths))
Expand Down
26 changes: 26 additions & 0 deletions internal/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Database struct {

// This controls concurrency on read-only and read/write transactions.
txmu *sync.RWMutex

// Pool of reusable transient engines to use for temporary indices.
TransientEnginePool *TransientEnginePool
}

type Options struct {
Expand Down Expand Up @@ -57,6 +60,9 @@ func New(ctx context.Context, ng engine.Engine, opts Options) (*Database, error)
Codec: opts.Codec,
Catalog: NewCatalog(),
txmu: &sync.RWMutex{},
TransientEnginePool: &TransientEnginePool{
ng: ng,
},
}

tx, err := db.Begin(true)
Expand All @@ -78,6 +84,26 @@ func New(ctx context.Context, ng engine.Engine, opts Options) (*Database, error)
return &db, nil
}

// NewTransientDB creates a temporary database to be used for creating temporary indices.
func (db *Database) NewTransientDB(ctx context.Context) (*Database, func() error, error) {
ng, err := db.TransientEnginePool.Get(context.Background())
if err != nil {
return nil, nil, err
}

tdb, err := New(ctx, ng, Options{Codec: db.Codec})
if err != nil {
_ = ng.Close()
_ = db.TransientEnginePool.Release(context.Background(), ng)
return nil, nil, err
}

return tdb, func() error {
_ = tdb.Close()
return db.TransientEnginePool.Release(context.Background(), ng)
}, nil
}

// Close the database.
func (db *Database) Close() error {
// If there is an attached transaction
Expand Down
47 changes: 47 additions & 0 deletions internal/database/transient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package database

import (
"context"
"sync"

"github.com/genjidb/genji/engine"
)

const maxTransientPoolSize = 3

// TransientEnginePool manages a pool of transient engines.
// It keeps a pool of maxTransientPoolSize engines.
type TransientEnginePool struct {
ng engine.Engine

mu sync.Mutex
pool []engine.Engine
}

// Get returns a free engine from the pool, if any. Otherwise it creates a new engine
// and returns it.
func (t *TransientEnginePool) Get(ctx context.Context) (engine.Engine, error) {
t.mu.Lock()
defer t.mu.Unlock()

if len(t.pool) > 0 {
ng := t.pool[len(t.pool)-1]
t.pool = t.pool[:len(t.pool)-1]
return ng, nil
}

return t.ng.NewTransientEngine(ctx)
}

// Release sets the engine for reuse. If the pool is full, it drops the given engine.
func (t *TransientEnginePool) Release(ctx context.Context, ng engine.Engine) error {
t.mu.Lock()
defer t.mu.Unlock()

if len(t.pool) >= maxTransientPoolSize {
return ng.Drop(ctx)
}

t.pool = append(t.pool, ng)
return nil
}
Loading

0 comments on commit a45d2c1

Please sign in to comment.