Skip to content

Commit

Permalink
Support composite primary keys
Browse files Browse the repository at this point in the history
  • Loading branch information
asdine committed Dec 4, 2021
1 parent a7309a7 commit 8983d68
Show file tree
Hide file tree
Showing 36 changed files with 1,483 additions and 1,648 deletions.
30 changes: 30 additions & 0 deletions document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,33 @@ func (p Path) getValueFromValue(v types.Value) (types.Value, error) {

return nil, ErrFieldNotFound
}

type Paths []Path

func (p Paths) String() string {
var sb strings.Builder

for i, pt := range p {
if i > 0 {
sb.WriteString(", ")
}
sb.WriteString(pt.String())
}

return sb.String()
}

// IsEqual returns whether other is equal to p.
func (p Paths) IsEqual(other Paths) bool {
if len(other) != len(p) {
return false
}

for i := range p {
if !other[i].IsEqual(p[i]) {
return false
}
}

return true
}
9 changes: 2 additions & 7 deletions document/document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,14 @@ import (
"github.com/genjidb/genji/document"
"github.com/genjidb/genji/internal/errors"
"github.com/genjidb/genji/internal/sql/parser"
"github.com/genjidb/genji/internal/testutil"
"github.com/genjidb/genji/internal/testutil/assert"
"github.com/genjidb/genji/types"
"github.com/stretchr/testify/require"
)

var _ types.Document = new(document.FieldBuffer)

func parsePath(t testing.TB, p string) document.Path {
path, err := parser.ParsePath(p)
assert.NoError(t, err)
return path
}

func TestFieldBuffer(t *testing.T) {
var buf document.FieldBuffer
buf.Add("a", types.NewIntegerValue(10))
Expand Down Expand Up @@ -169,7 +164,7 @@ func TestFieldBuffer(t *testing.T) {
err := json.Unmarshal([]byte(test.document), &buf)
assert.NoError(t, err)

path := parsePath(t, test.deletePath)
path := testutil.ParseDocumentPath(t, test.deletePath)

err = buf.Delete(path)
if test.fails {
Expand Down
25 changes: 11 additions & 14 deletions internal/database/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ func (c *Catalog) DropIndex(tx *Transaction, name string) error {
}

// check if the index has been created by a table constraint
if info.Owner.Path != nil {
return stringutil.Errorf("cannot drop index %s because constraint on %s(%s) requires it", info.IndexName, info.TableName, info.Owner.Path)
if len(info.Owner.Paths) > 0 {
return stringutil.Errorf("cannot drop index %s because constraint on %s(%s) requires it", info.IndexName, info.TableName, info.Owner.Paths)
}

_, err = c.Cache.Delete(tx, RelationIndexType, name)
Expand Down Expand Up @@ -697,10 +697,8 @@ func newCatalogStore() *CatalogStore {
TableConstraints: []*TableConstraint{
{
PrimaryKey: true,
Path: document.Path{
document.PathFragment{
FieldName: "name",
},
Paths: []document.Path{
document.NewPath("name"),
},
},
},
Expand Down Expand Up @@ -862,21 +860,20 @@ func sequenceInfoToDocument(seq *SequenceInfo) types.Document {
buf.Add("sql", types.NewTextValue(seq.String()))

if seq.Owner.TableName != "" {
owner := document.NewFieldBuffer().Add("table_name", types.NewTextValue(seq.Owner.TableName))
if seq.Owner.Path != nil {
owner.Add("path", types.NewTextValue(seq.Owner.Path.String()))
}

buf.Add("owner", types.NewDocumentValue(owner))
buf.Add("owner", types.NewDocumentValue(ownerToDocument(&seq.Owner)))
}

return buf
}

func ownerToDocument(owner *Owner) types.Document {
buf := document.NewFieldBuffer().Add("table_name", types.NewTextValue(owner.TableName))
if owner.Path != nil {
buf.Add("path", types.NewTextValue(owner.Path.String()))
if owner.Paths != nil {
vb := document.NewValueBuffer()
for _, p := range owner.Paths {
vb.Append(types.NewTextValue(p.String()))
}
buf.Add("paths", types.NewArrayValue(vb))
}

return buf
Expand Down
10 changes: 5 additions & 5 deletions internal/database/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestCatalogTable(t *testing.T) {
{Path: testutil.ParseDocumentPath(t, "gender"), Type: types.TextValue},
{Path: testutil.ParseDocumentPath(t, "city"), Type: types.TextValue},
}, TableConstraints: []*database.TableConstraint{
{Path: testutil.ParseDocumentPath(t, "age"), PrimaryKey: true},
{Paths: []document.Path{testutil.ParseDocumentPath(t, "age")}, PrimaryKey: true},
}}

updateCatalog(t, db, func(tx *database.Transaction, catalog *database.Catalog) error {
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestCatalogTable(t *testing.T) {
{Path: testutil.ParseDocumentPath(t, "gender"), Type: types.TextValue},
{Path: testutil.ParseDocumentPath(t, "city"), Type: types.TextValue},
}, TableConstraints: []*database.TableConstraint{
{Path: testutil.ParseDocumentPath(t, "age"), PrimaryKey: true},
{Paths: []document.Path{testutil.ParseDocumentPath(t, "age")}, PrimaryKey: true},
}}

updateCatalog(t, db, func(tx *database.Transaction, catalog *database.Catalog) error {
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestCatalogTable(t *testing.T) {

// Adding a second primary key should return an error
tcs = nil
tcs.AddPrimaryKey("foo", testutil.ParseDocumentPath(t, "address"))
tcs.AddPrimaryKey("foo", []document.Path{testutil.ParseDocumentPath(t, "address")})
err = catalog.AddFieldConstraint(tx, "foo", nil, tcs)
assert.Error(t, err)

Expand Down Expand Up @@ -320,7 +320,7 @@ func TestCatalogCreateIndex(t *testing.T) {
updateCatalog(t, db, func(tx *database.Transaction, catalog *database.Catalog) error {
return catalog.CreateTable(tx, "test", &database.TableInfo{
TableConstraints: []*database.TableConstraint{
{Path: testutil.ParseDocumentPath(t, "a"), PrimaryKey: true},
{Paths: []document.Path{testutil.ParseDocumentPath(t, "a")}, PrimaryKey: true},
},
})
})
Expand Down Expand Up @@ -484,7 +484,7 @@ func TestReadOnlyTables(t *testing.T) {
case 2:
testutil.RequireDocJSONEq(t, d, `{"name":"foo", "docid_sequence_name":"foo_seq", "sql":"CREATE TABLE foo (a INTEGER, b[3].c DOUBLE, UNIQUE (b[3].c))", "store_name":"AQ==", "type":"table"}`)
case 3:
testutil.RequireDocJSONEq(t, d, `{"name":"foo_b[3].c_idx", "owner":{"table_name":"foo", "path":"b[3].c"}, "sql":"CREATE UNIQUE INDEX `+"`foo_b[3].c_idx`"+` ON foo (b[3].c)", "store_name":"Ag==", "table_name":"foo", "type":"index"}`)
testutil.RequireDocJSONEq(t, d, `{"name":"foo_b[3].c_idx", "owner":{"table_name":"foo", "paths":["b[3].c"]}, "sql":"CREATE UNIQUE INDEX `+"`foo_b[3].c_idx`"+` ON foo (b[3].c)", "store_name":"Ag==", "table_name":"foo", "type":"index"}`)
case 4:
testutil.RequireDocJSONEq(t, d, `{"name":"foo_seq", "owner":{"table_name":"foo"}, "sql":"CREATE SEQUENCE foo_seq CACHE 64", "type":"sequence"}`)
case 5:
Expand Down
12 changes: 10 additions & 2 deletions internal/database/catalogstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func loadSequences(tx *database.Transaction, c *database.Catalog, info []databas
func loadCatalogStore(tx *database.Transaction, s *database.CatalogStore) (tables []database.TableInfo, indexes []database.IndexInfo, sequences []database.SequenceInfo, err error) {
tb := s.Table(tx)

err = tb.Iterate(nil, false, func(key tree.Key, d types.Document) error {
err = tb.IterateOnRange(nil, false, func(key tree.Key, d types.Document) error {
tp, err := d.GetByField("type")
if err != nil {
return err
Expand Down Expand Up @@ -234,7 +234,15 @@ func ownerFromDocument(d types.Document) (*database.Owner, error) {
return nil, err
}
if err == nil {
owner.Path, err = parser.ParsePath(v.V().(string))
err = v.V().(types.Array).Iterate(func(i int, value types.Value) error {
pp, err := parser.ParsePath(v.V().(string))
if err != nil {
return err
}

owner.Paths = append(owner.Paths, pp)
return nil
})
if err != nil {
return nil, err
}
Expand Down
31 changes: 10 additions & 21 deletions internal/database/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,34 +467,23 @@ func (t *inferredTableExpression) String() string {
// and not necessarily to a single field path.
type TableConstraint struct {
Name string
Path document.Path
Paths document.Paths
Check TableExpression
Unique bool
PrimaryKey bool
}

// IsEqual compares t with other member by member.
func (t *TableConstraint) IsEqual(other *TableConstraint) bool {
if t == nil {
return other == nil
}
if other == nil {
return false
}
return t.Name == other.Name && t.Path.IsEqual(other.Path) && t.Check.IsEqual(other.Check) && t.Unique == other.Unique && t.PrimaryKey == other.PrimaryKey
}

func (t *TableConstraint) String() string {
if t.Check != nil {
return stringutil.Sprintf("CHECK (%s)", t.Check)
}

if t.PrimaryKey {
return stringutil.Sprintf("PRIMARY KEY (%s)", t.Path)
return stringutil.Sprintf("PRIMARY KEY (%s)", t.Paths)
}

if t.Unique {
return stringutil.Sprintf("UNIQUE (%s)", t.Path)
return stringutil.Sprintf("UNIQUE (%s)", t.Paths)
}

return ""
Expand Down Expand Up @@ -553,15 +542,15 @@ func (t *TableConstraints) AddCheck(tableName string, e TableExpression) {
})
}

func (t *TableConstraints) AddPrimaryKey(tableName string, p document.Path) error {
func (t *TableConstraints) AddPrimaryKey(tableName string, p document.Paths) error {
for _, tc := range *t {
if tc.PrimaryKey {
return stringutil.Errorf("multiple primary keys for table %q are not allowed", tableName)
}
}

*t = append(*t, &TableConstraint{
Path: p,
Paths: p,
PrimaryKey: true,
})

Expand All @@ -570,27 +559,27 @@ func (t *TableConstraints) AddPrimaryKey(tableName string, p document.Path) erro

// AddUnique adds a unique constraint to the table.
// If the constraint is already present, it is ignored.
func (t *TableConstraints) AddUnique(p document.Path) {
func (t *TableConstraints) AddUnique(p document.Paths) {
for _, tc := range *t {
if tc.Unique && tc.Path.IsEqual(p) {
if tc.Unique && tc.Paths.IsEqual(p) {
return
}
}

*t = append(*t, &TableConstraint{
Path: p,
Paths: p,
Unique: true,
})
}

func (t *TableConstraints) Merge(other TableConstraints) error {
for _, tc := range other {
if tc.PrimaryKey {
if err := t.AddPrimaryKey(tc.Name, tc.Path); err != nil {
if err := t.AddPrimaryKey(tc.Name, tc.Paths); err != nil {
return err
}
} else if tc.Unique {
t.AddUnique(tc.Path)
t.AddUnique(tc.Paths)
} else if tc.Check != nil {
t.AddCheck(tc.Name, tc.Check)
}
Expand Down
48 changes: 4 additions & 44 deletions internal/database/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ import (
var (
// ErrIndexDuplicateValue is returned when a value is already associated with a key
ErrIndexDuplicateValue = errors.New("duplicate value")

// ErrIndexWrongArity is returned when trying to index more values that what an
// index supports.
ErrIndexWrongArity = errors.New("wrong index arity")
)

// An Index associates encoded values with keys.
Expand All @@ -28,17 +24,15 @@ var (
type Index struct {
// How many values the index is operating on.
// For example, an index created with `CREATE INDEX idx_a_b ON foo (a, b)` has an arity of 2.
Arity int
Unique bool
Tree *tree.Tree
Arity int
Tree *tree.Tree
}

// NewIndex creates an index that associates values with a list of keys.
func NewIndex(tr *tree.Tree, opts IndexInfo) *Index {
return &Index{
Tree: tr,
Arity: len(opts.Paths),
Unique: opts.Unique,
Tree: tr,
Arity: len(opts.Paths),
}
}

Expand Down Expand Up @@ -74,17 +68,6 @@ func (idx *Index) Set(vs []types.Value, key tree.Key) error {
return err
}

// if the index is unique, we need to check if the value is already associated with the key
if idx.Unique {
found, _, err := idx.Exists(vs)
if err != nil {
return err
}
if found {
return errors.Wrap(ErrIndexDuplicateValue)
}
}

return idx.Tree.Put(treeKey, nil)
}

Expand Down Expand Up @@ -160,12 +143,6 @@ func (idx *Index) Delete(vs []types.Value, key tree.Key) error {
return engine.ErrKeyNotFound
}

func (idx *Index) Iterate(pivot tree.Key, reverse bool, fn func(key tree.Key) error) error {
return idx.Tree.Iterate(pivot, reverse, idx.iterateFn(func(itmKey, key tree.Key) error {
return fn(key)
}))
}

// IterateOnRange seeks for the pivot and then goes through all the subsequent key value pairs in increasing or decreasing order and calls the given function for each pair.
// If the given function returns an error, the iteration stops and returns that error.
// If the pivot(s) is/are empty, starts from the beginning.
Expand Down Expand Up @@ -217,23 +194,6 @@ func (idx *Index) iterateOnRange(rng *tree.Range, reverse bool, fn func(itmKey t
})
}

func (idx *Index) iterateFn(fn func(itmKey tree.Key, key tree.Key) error) func(k tree.Key, v types.Value) error {
return func(k tree.Key, v types.Value) error {
// we don't care about the value, we just want to extract the key
// which is the last element of the encoded array
pos := bytes.LastIndex(k, []byte{encoding.ArrayValueDelim})

kv, err := encoding.DecodeValue(k[pos+1:])
if err != nil {
return err
}

pk := tree.Key(kv.V().([]byte))

return fn(k, pk)
}
}

// Truncate deletes all the index data.
func (idx *Index) Truncate() error {
return idx.Tree.Truncate()
Expand Down
Loading

0 comments on commit 8983d68

Please sign in to comment.