Skip to content

Commit

Permalink
[rowexec] custom rowexec
Browse files Browse the repository at this point in the history
  • Loading branch information
max-hoffman committed Jun 25, 2024
1 parent 12f2a8d commit c531bce
Show file tree
Hide file tree
Showing 12 changed files with 593 additions and 23 deletions.
3 changes: 2 additions & 1 deletion go/cmd/dolt/commands/engine/sqlengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package engine
import (
"context"
"fmt"
rowexec2 "github.com/dolthub/dolt/go/libraries/doltcore/sqle/rowexec"
"os"
"runtime"
"strconv"
Expand Down Expand Up @@ -194,7 +195,7 @@ func NewSqlEngine(
statsPro := statspro.NewProvider(pro, statsnoms.NewNomsStatsFactory(mrEnv.RemoteDialProvider()))
engine.Analyzer.Catalog.StatsProvider = statsPro

engine.Analyzer.ExecBuilder = rowexec.DefaultBuilder
engine.Analyzer.ExecBuilder = rowexec.NewOverrideBuilder(rowexec2.Builder{})
sessFactory := doltSessionFactory(pro, statsPro, mrEnv.Config(), bcController, config.Autocommit)
sqlEngine.provider = pro
sqlEngine.contextFactory = sqlContextFactory()
Expand Down
2 changes: 1 addition & 1 deletion go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
github.com/cespare/xxhash v1.1.0
github.com/creasty/defaults v1.6.0
github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2
github.com/dolthub/go-mysql-server v0.18.2-0.20240621214638-7bec4655f469
github.com/dolthub/go-mysql-server v0.18.2-0.20240625211941-08961bf342e1
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63
github.com/dolthub/swiss v0.1.0
github.com/goccy/go-json v0.10.2
Expand Down
4 changes: 2 additions & 2 deletions go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e h1:kPsT4a47cw1+y/N5SSCkma7FhAPw7KeGmD6c9PBZW9Y=
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e/go.mod h1:KPUcpx070QOfJK1gNe0zx4pA5sicIK1GMikIGLKC168=
github.com/dolthub/go-mysql-server v0.18.2-0.20240621214638-7bec4655f469 h1:V2gkE6tUU49uR5U8+RqNjQNvNw333xKcDWbwIixnBNQ=
github.com/dolthub/go-mysql-server v0.18.2-0.20240621214638-7bec4655f469/go.mod h1:XdiHsd2TX3OOhjwY6tPcw1ztT2BdBiP6Wp0m/7OYHn4=
github.com/dolthub/go-mysql-server v0.18.2-0.20240625211941-08961bf342e1 h1:vmY5IV0CKAwTaK8oGMfm1n0Hc0i2+7JWz5zMrVZX/Rk=
github.com/dolthub/go-mysql-server v0.18.2-0.20240625211941-08961bf342e1/go.mod h1:XdiHsd2TX3OOhjwY6tPcw1ztT2BdBiP6Wp0m/7OYHn4=
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63 h1:OAsXLAPL4du6tfbBgK0xXHZkOlos63RdKYS3Sgw/dfI=
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63/go.mod h1:lV7lUeuDhH5thVGDCKXbatwKy2KW80L4rMT46n+Y2/Q=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514=
Expand Down
14 changes: 3 additions & 11 deletions go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestQueries(t *testing.T) {
}

func TestSingleQuery(t *testing.T) {
t.Skip()
//t.Skip()

harness := newDoltHarness(t)
harness.Setup(setup.SimpleSetup...)
Expand All @@ -80,17 +80,9 @@ func TestSingleQuery(t *testing.T) {

var test queries.QueryTest
test = queries.QueryTest{
Query: `show create table mytable`,
Query: `select /*+ LOOKUP_JOIN(m1,m2) */ * from mytable m1 join mytable m2 on m1.i = m2.i`,
Expected: []sql.Row{
{"mytable",
"CREATE TABLE `mytable` (\n" +
" `i` bigint NOT NULL,\n" +
" `s` varchar(20) NOT NULL COMMENT 'column s',\n" +
" PRIMARY KEY (`i`),\n" +
" KEY `idx_si` (`s`,`i`),\n" +
" KEY `mytable_i_s` (`i`,`s`),\n" +
" UNIQUE KEY `mytable_s` (`s`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin"},
{},
},
}

Expand Down
5 changes: 3 additions & 2 deletions go/libraries/doltcore/sqle/enginetest/dolt_harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package enginetest
import (
"context"
"fmt"
rowexec2 "github.com/dolthub/dolt/go/libraries/doltcore/sqle/rowexec"
"github.com/dolthub/go-mysql-server/sql/rowexec"
"runtime"
"strings"
"testing"
Expand All @@ -27,7 +29,6 @@ import (
"github.com/dolthub/go-mysql-server/memory"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/mysql_db"
"github.com/dolthub/go-mysql-server/sql/rowexec"
"github.com/stretchr/testify/require"

"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
Expand Down Expand Up @@ -251,7 +252,7 @@ func (d *DoltHarness) NewEngine(t *testing.T) (enginetest.QueryEngine, error) {
if err != nil {
return nil, err
}
e.Analyzer.ExecBuilder = rowexec.DefaultBuilder
e.Analyzer.ExecBuilder = rowexec.NewOverrideBuilder(rowexec2.Builder{})
d.engine = e

ctx := enginetest.NewContext(d)
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/sqle/index/index_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func NewLookupBuilder(
func newCoveringLookupBuilder(b *baseLookupBuilder) *coveringLookupBuilder {
var keyMap, valMap, ordMap val.OrdinalMapping
if b.idx.IsPrimaryKey() {
keyMap, valMap, ordMap = primaryIndexMapping(b.idx, b.sch, b.projections)
keyMap, valMap, ordMap = primaryIndexMapping(b.idx, b.projections)
} else {
keyMap, ordMap = coveringIndexMapping(b.idx, b.projections)
}
Expand Down
6 changes: 3 additions & 3 deletions go/libraries/doltcore/sqle/index/prolly_index_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func newProllyCoveringIndexIter(

var keyMap, valMap, ordMap val.OrdinalMapping
if idx.IsPrimaryKey() {
keyMap, valMap, ordMap = primaryIndexMapping(idx, pkSch, projections)
keyMap, valMap, ordMap = primaryIndexMapping(idx, projections)
} else {
keyMap, ordMap = coveringIndexMapping(idx, projections)
}
Expand Down Expand Up @@ -265,8 +265,8 @@ func coveringIndexMapping(d DoltIndex, projections []uint64) (keyMap, ordMap val
return
}

func primaryIndexMapping(idx DoltIndex, sqlSch sql.PrimaryKeySchema, projections []uint64) (keyProj, valProj, ordProj val.OrdinalMapping) {
return projectionMappingsForIndex(idx.Schema(), projections)
func primaryIndexMapping(idx DoltIndex, projections []uint64) (keyProj, valProj, ordProj val.OrdinalMapping) {
return ProjectionMappingsForIndex(idx.Schema(), projections)
}

type prollyKeylessIndexIter struct {
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/sqle/index/prolly_row_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewKeylessProllyRowIter(
// projectionMappings returns data structures that specify 1) which fields we read
// from key and value tuples, and 2) the position of those fields in the output row.
func projectionMappings(sch schema.Schema, projections []uint64) (keyMap, valMap, ordMap val.OrdinalMapping) {
keyMap, valMap, ordMap = projectionMappingsForIndex(sch, projections)
keyMap, valMap, ordMap = ProjectionMappingsForIndex(sch, projections)
adjustOffsetsForKeylessTable(sch, keyMap, valMap)
return keyMap, valMap, ordMap
}
Expand All @@ -126,7 +126,7 @@ func adjustOffsetsForKeylessTable(sch schema.Schema, keyMap val.OrdinalMapping,
}
}

func projectionMappingsForIndex(sch schema.Schema, projections []uint64) (keyMap, valMap, ordMap val.OrdinalMapping) {
func ProjectionMappingsForIndex(sch schema.Schema, projections []uint64) (keyMap, valMap, ordMap val.OrdinalMapping) {
pks := sch.GetPKCols()
nonPks := sch.GetNonPKCols()

Expand Down
4 changes: 4 additions & 0 deletions go/libraries/doltcore/sqle/indexed_dolt_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func NewIndexedDoltTable(t *DoltTable, idx index.DoltIndex) *IndexedDoltTable {
var _ sql.IndexedTable = (*IndexedDoltTable)(nil)
var _ sql.CommentedTable = (*IndexedDoltTable)(nil)

// Index returns the index.DoltIndex stored on this table (the idx field).
func (idt *IndexedDoltTable) Index() index.DoltIndex {
return idt.idx
}

func (idt *IndexedDoltTable) LookupPartitions(ctx *sql.Context, lookup sql.IndexLookup) (sql.PartitionIter, error) {
return index.NewRangePartitionIter(ctx, idt.DoltTable, lookup, idt.isDoltFormat)
}
Expand Down
229 changes: 229 additions & 0 deletions go/libraries/doltcore/sqle/rowexec/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package rowexec

import (
"context"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/val"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/plan"

"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/store/prolly"
)

// Builder is a stateless node exec builder. Its Build method only produces an
// iterator for lookup joins and returns (nil, nil) for every other node.
type Builder struct{}

// Compile-time assertion that *Builder implements sql.NodeExecBuilder.
var _ sql.NodeExecBuilder = (*Builder)(nil)

// Build returns a custom row iterator for lookup joins whose two sides can
// both be resolved to Dolt primary-index maps. For every other node, and for
// lookup joins it cannot handle, it returns (nil, nil).
//
// A (nil, nil) result presumably tells the wrapping override builder to fall
// back to the default execution path -- TODO confirm against
// rowexec.NewOverrideBuilder.
//
// Errors from resolving either side are deliberately not propagated: the
// node is simply reported as unhandled, which preserves the original
// best-effort behavior.
func (b Builder) Build(ctx *sql.Context, n sql.Node, r sql.Row) (sql.RowIter, error) {
	join, ok := n.(*plan.JoinNode)
	if !ok || !join.Op.IsLookup() {
		return nil, nil
	}

	// The right side must bottom out in an indexed table access; its
	// expressions supply the lookup keys.
	ita, ok := getIta(join.Right())
	if !ok {
		return nil, nil
	}

	dstMap, dstSchema, dstTags, dstFilter, err := getSourceKvIter(ctx, join.Right())
	if err != nil || dstSchema == nil {
		return nil, nil
	}

	// getSourceKvIter reports an unsupported table as a nil schema with a nil
	// error, so the source schema has to be checked here too. The previous
	// code re-checked dstSchema, letting a nil srcSchema through.
	srcMap, srcSchema, srcTags, srcFilter, err := getSourceKvIter(ctx, join.Left())
	if err != nil || srcSchema == nil {
		return nil, nil
	}

	return rowIterTableLookupJoin(ctx, srcMap, dstMap, srcSchema, dstSchema, srcTags, dstTags, ita.Expressions(), srcFilter, dstFilter)
}

// getIta unwraps any chain of *plan.TableAlias and *plan.Filter nodes and
// returns the *plan.IndexedTableAccess found underneath. The boolean is false
// when the chain ends in any other node type.
func getIta(n sql.Node) (*plan.IndexedTableAccess, bool) {
	for {
		switch cur := n.(type) {
		case *plan.IndexedTableAccess:
			return cur, true
		case *plan.TableAlias:
			n = cur.Child
		case *plan.Filter:
			n = cur.Child
		default:
			return nil, false
		}
	}
}

// sqlRowJoiner holds the precomputed mappings buildRow uses to turn a source
// key/value tuple pair and a target key/value tuple pair into one sql.Row.
type sqlRowJoiner struct {
// srcSplit is the offset into ordMappings at which the target table's
// entries begin; the entries before it belong to the source table.
srcSplit int
// ns is the node store handed to tree.GetField when decoding fields.
// NOTE(review): getPrimaryLookupRowJoiner does not populate this field;
// confirm the caller sets it before buildRow is used.
ns tree.NodeStore

// Tuple descriptors used to decode fields from the source (src) and
// target (tgt) key and value tuples.
srcKd val.TupleDesc
srcVd val.TupleDesc
tgtKd val.TupleDesc
tgtVd val.TupleDesc

// ordMappings gives the output row position for every mapped field, laid
// out in the order: source key, source value, target key, target value.
ordMappings []int
// The four slices below hold the tuple field indexes to read from the
// source key, source value, target key and target value respectively.
srcKeyMappings []int
srcValMappings []int
tgtKeyMappings []int
tgtValMappings []int
}

// buildRow decodes the mapped fields from the source and target key/value
// tuples and places each one at its position in a new sql.Row of length
// len(m.ordMappings). The first decoding error aborts and is returned with a
// nil row.
func (m *sqlRowJoiner) buildRow(ctx context.Context, srcKey, srcVal, tgtKey, tgtVal val.Tuple) (sql.Row, error) {
	var err error
	out := make(sql.Row, len(m.ordMappings))

	// ordMappings is segmented as | src key | src val | tgt key | tgt val |;
	// these are the starting offsets of the last three segments.
	srcValBase := len(m.srcKeyMappings)
	tgtKeyBase := m.srcSplit
	tgtValBase := m.srcSplit + len(m.tgtKeyMappings)

	for pos, field := range m.srcKeyMappings {
		if out[m.ordMappings[pos]], err = tree.GetField(ctx, m.srcKd, field, srcKey, m.ns); err != nil {
			return nil, err
		}
	}
	for pos, field := range m.srcValMappings {
		if out[m.ordMappings[srcValBase+pos]], err = tree.GetField(ctx, m.srcVd, field, srcVal, m.ns); err != nil {
			return nil, err
		}
	}
	for pos, field := range m.tgtKeyMappings {
		if out[m.ordMappings[tgtKeyBase+pos]], err = tree.GetField(ctx, m.tgtKd, field, tgtKey, m.ns); err != nil {
			return nil, err
		}
	}
	for pos, field := range m.tgtValMappings {
		if out[m.ordMappings[tgtValBase+pos]], err = tree.GetField(ctx, m.tgtVd, field, tgtVal, m.ns); err != nil {
			return nil, err
		}
	}
	return out, nil
}

// getPrimaryLookupRowJoiner builds the sqlRowJoiner that combines tuples from
// |src| and |tgt| into output rows.
//
// |projections| is a list of column tags. Entries at positions before
// |srcSplit| are resolved against |src|'s key/value columns, and entries from
// |srcSplit| onward are resolved against |tgt|'s. A tag that matches no
// non-virtual key or value column of the active schema is skipped.
//
// The returned joiner's ns field is left unset.
func getPrimaryLookupRowJoiner(src, tgt schema.Schema, srcSplit int, projections []uint64) *sqlRowJoiner {
// By default every projection is treated as a stored (physical) column.
numPhysicalColumns := len(projections)
// presumably schema.IsVirtual reports whether src has virtual columns --
// TODO confirm. In that case, recount only projections that resolve to a
// non-virtual column.
// NOTE(review): only src is tested here; a tgt with virtual columns and a
// src without them skips this recount -- confirm that is intended.
if schema.IsVirtual(src) {
numPhysicalColumns = 0
for i, t := range projections {
if idx, ok := src.GetAllCols().TagToIdx[t]; ok && !src.GetAllCols().GetByIndex(idx).Virtual {
numPhysicalColumns++
// NOTE(review): this overwrites the caller's srcSplit with the index of
// the last matching source projection, not a count (i+1) -- confirm.
srcSplit = i
}
// NOTE(review): a tag present in both schemas (e.g. a self join) is
// counted by both branches -- confirm.
if idx, ok := tgt.GetAllCols().TagToIdx[t]; ok && !tgt.GetAllCols().GetByIndex(idx).Virtual {
numPhysicalColumns++
}
}
}

// One backing array serves all five mappings. The first numPhysicalColumns
// slots hold tuple field indexes; the second numPhysicalColumns slots hold
// the matching output-row ordinals (the projection position):
allMap := make([]int, 2*numPhysicalColumns)
// | srcKey | srcVal | tgtKey | tgtVal | ords |

// Within a table's segment, key columns are filled from the front forward
// (keyIdx) and value columns from the back backward (nonKeyIdx).
keyIdx := 0
nonKeyIdx := srcSplit - 1
keyCols := src.GetPKCols()
valCols := src.GetNonPKCols()
// firstPkSplit records where the source key columns end. It stays 0 if the
// loop below never reaches projNum == srcSplit.
var firstPkSplit int
for projNum, tag := range projections {
if projNum == srcSplit {
// Crossing from source projections to target projections: remember the
// end of the source keys and restart both cursors for the target segment.
firstPkSplit = keyIdx
keyIdx = srcSplit
// NOTE(review): uses len(projections), not numPhysicalColumns; the two
// differ after the virtual-column recount above -- confirm.
nonKeyIdx = len(projections) - 1
keyCols = tgt.GetPKCols()
valCols = tgt.GetNonPKCols()
}
if idx, ok := keyCols.StoredIndexByTag(tag); ok && !keyCols.GetByStoredIndex(idx).Virtual {
allMap[keyIdx] = idx
allMap[numPhysicalColumns+keyIdx] = projNum
keyIdx++
} else if idx, ok := valCols.StoredIndexByTag(tag); ok && !valCols.GetByStoredIndex(idx).Virtual {
allMap[nonKeyIdx] = idx
allMap[numPhysicalColumns+nonKeyIdx] = projNum
nonKeyIdx--
}
}

// Carve the backing array into the per-tuple mappings. After the loop,
// keyIdx marks the end of the target key columns.
return &sqlRowJoiner{
srcSplit: srcSplit,
srcKeyMappings: allMap[:firstPkSplit],
srcValMappings: allMap[firstPkSplit:srcSplit],
tgtKeyMappings: allMap[srcSplit:keyIdx],
tgtValMappings: allMap[keyIdx:numPhysicalColumns],
ordMappings: allMap[numPhysicalColumns:],
srcKd: src.GetKeyDescriptor(),
srcVd: src.GetValueDescriptor(),
tgtKd: tgt.GetKeyDescriptor(),
tgtVd: tgt.GetValueDescriptor(),
}
}

// getMap loads the row data of |dt| as a prolly.Map together with the table's
// schema. Any failure while resolving the table, its row data, or its schema
// is returned with an empty map and a nil schema.
func getMap(ctx *sql.Context, dt *sqle.DoltTable) (prolly.Map, schema.Schema, error) {
	var empty prolly.Map

	tbl, err := dt.DoltTable(ctx)
	if err != nil {
		return empty, nil, err
	}

	rowData, err := tbl.GetRowData(ctx)
	if err != nil {
		return empty, nil, err
	}

	tblSch, err := tbl.GetSchema(ctx)
	if err != nil {
		return empty, nil, err
	}

	return durable.ProllyMapFromIndex(rowData), tblSch, nil
}

// getSourceKvIter resolves |n| to the row-data prolly.Map of its underlying
// Dolt table, and returns it with the table schema, the table's projected
// column tags, and the filter expression wrapping the table (nil if none).
//
// *plan.TableAlias and *plan.Filter wrappers are unwrapped recursively. Any
// other node, or an underlying table that is not one of the recognized Dolt
// table types, yields a nil schema and a nil error, which callers must treat
// as "not supported".
//
// NOTE(review): when Filter nodes are nested, only the outermost expression
// is returned; the filter reported by the recursive call is discarded.
func getSourceKvIter(ctx *sql.Context, n sql.Node) (prolly.Map, schema.Schema, []uint64, sql.Expression, error) {
	// First peel the plan node down to something that exposes a table.
	var tableNode sql.TableNode
	switch n := n.(type) {
	case *plan.TableAlias:
		return getSourceKvIter(ctx, n.Child)
	case *plan.Filter:
		childMap, childSch, childTags, _, err := getSourceKvIter(ctx, n.Child)
		if err != nil {
			return prolly.Map{}, nil, nil, nil, err
		}
		return childMap, childSch, childTags, n.Expression, nil
	case *plan.ResolvedTable, *plan.IndexedTableAccess:
		tableNode = n.(sql.TableNode)
	default:
		return prolly.Map{}, nil, nil, nil, nil
	}

	// Then pull the projected tags and the doltdb table out of whichever
	// Dolt table wrapper is in use.
	var (
		table *doltdb.Table
		tags  []uint64
		err   error
	)
	switch dt := tableNode.UnderlyingTable().(type) {
	case *sqle.WritableDoltTable:
		tags = dt.ProjectedTags()
		table, err = dt.DoltTable.DoltTable(ctx)
	case *sqle.WritableIndexedDoltTable:
		tags = dt.ProjectedTags()
		table, err = dt.DoltTable.DoltTable(ctx)
	case *sqle.IndexedDoltTable:
		tags = dt.ProjectedTags()
		table, err = dt.DoltTable.DoltTable(ctx)
	case *sqle.AlterableDoltTable:
		tags = dt.ProjectedTags()
		table, err = dt.DoltTable.DoltTable(ctx)
	case *sqle.DoltTable:
		tags = dt.ProjectedTags()
		table, err = dt.DoltTable(ctx)
	default:
		return prolly.Map{}, nil, nil, nil, nil
	}
	if err != nil {
		return prolly.Map{}, nil, nil, nil, err
	}

	rowData, err := table.GetRowData(ctx)
	if err != nil {
		return prolly.Map{}, nil, nil, nil, err
	}
	rows := durable.ProllyMapFromIndex(rowData)

	tableSch, err := table.GetSchema(ctx)
	if err != nil {
		return prolly.Map{}, nil, nil, nil, err
	}

	return rows, tableSch, tags, nil, nil
}
Loading

0 comments on commit c531bce

Please sign in to comment.