Skip to content

Commit

Permalink
fix: Handle null-related test options (#1074)
Browse files Browse the repository at this point in the history
Follow-up for #1072 & #1002
  • Loading branch information
candiduslynx committed Jul 5, 2023
1 parent 8356590 commit 88f08ee
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 61 deletions.
102 changes: 52 additions & 50 deletions plugin/nulls.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,73 +6,75 @@ import (
"github.com/apache/arrow/go/v13/arrow/memory"
)

// TODO(v4): use in v4
//
// nolint:unused
func stripNullsFromLists(records []arrow.Record) {
for i := range records {
cols := records[i].Columns()
for c, col := range cols {
if col.DataType().ID() != arrow.LIST {
func stripNullsFromLists(record arrow.Record) arrow.Record {
cols := record.Columns()
for c, col := range cols {
list, ok := col.(array.ListLike)
if !ok {
continue
}
if _, ok := list.(*array.Map); ok {
// maps also correspond to array.ListLike
continue
}

bldr := array.NewListBuilder(memory.DefaultAllocator, list.DataType().(arrow.ListLikeType).Elem())
for j := 0; j < list.Len(); j++ {
if list.IsNull(j) {
bldr.AppendNull()
continue
}

list := col.(*array.List)
bldr := array.NewListBuilder(memory.DefaultAllocator, list.DataType().(*arrow.ListType).Elem())
for j := 0; j < list.Len(); j++ {
if list.IsNull(j) {
bldr.AppendNull()
bldr.Append(true)
vBldr := bldr.ValueBuilder()
from, to := list.ValueOffsets(j)
slc := array.NewSlice(list.ListValues(), from, to)
for k := 0; k < int(to-from); k++ {
if slc.IsNull(k) {
continue
}
bldr.Append(true)
vBldr := bldr.ValueBuilder()
from, to := list.ValueOffsets(j)
slc := array.NewSlice(list.ListValues(), from, to)
for k := 0; k < int(to-from); k++ {
if slc.IsNull(k) {
continue
}
err := vBldr.AppendValueFromString(slc.ValueStr(k))
if err != nil {
panic(err)
}
err := vBldr.AppendValueFromString(slc.ValueStr(k))
if err != nil {
panic(err)
}
}
cols[c] = bldr.NewArray()
}
records[i] = array.NewRecord(records[i].Schema(), cols, records[i].NumRows())
cols[c] = bldr.NewArray()
}
return array.NewRecord(record.Schema(), cols, record.NumRows())
}

// AllowNullFunc reports whether null values are acceptable for the given
// Arrow data type. replaceNullsByEmpty leaves a column untouched when the
// function returns true for the column's type; otherwise null entries in
// that column are rebuilt as empty values.
type AllowNullFunc func(arrow.DataType) bool

// TODO(v4): use in v4
//
// nolint:unused
func (f AllowNullFunc) replaceNullsByEmpty(records []arrow.Record) {
if f == nil {
return
func (s *WriterTestSuite) replaceNullsByEmpty(record arrow.Record) arrow.Record {
if s.allowNull == nil {
return record
}
for i := range records {
cols := records[i].Columns()
for c, col := range records[i].Columns() {
if col.NullN() == 0 || f(col.DataType()) {

cols := record.Columns()
for c, col := range cols {
if col.NullN() == 0 || s.allowNull(col.DataType()) {
continue
}

builder := array.NewBuilder(memory.DefaultAllocator, col.DataType())
for j := 0; j < col.Len(); j++ {
if col.IsNull(j) {
builder.AppendEmptyValue()
continue
}

builder := array.NewBuilder(memory.DefaultAllocator, records[i].Column(c).DataType())
for j := 0; j < col.Len(); j++ {
if col.IsNull(j) {
builder.AppendEmptyValue()
continue
}

if err := builder.AppendValueFromString(col.ValueStr(j)); err != nil {
panic(err)
}
if err := builder.AppendValueFromString(col.ValueStr(j)); err != nil {
panic(err)
}
cols[c] = builder.NewArray()
}
records[i] = array.NewRecord(records[i].Schema(), cols, records[i].NumRows())
cols[c] = builder.NewArray()
}
return array.NewRecord(record.Schema(), cols, record.NumRows())
}

// handleNulls applies the suite's null-related options to record and returns
// the resulting record.
//
// When s.ignoreNullsInLists is set, the record is first passed through
// stripNullsFromLists. The outcome (or the untouched input when the option is
// off) is then handed to s.replaceNullsByEmpty, whose result is returned.
func (s *WriterTestSuite) handleNulls(record arrow.Record) arrow.Record {
	processed := record
	if s.ignoreNullsInLists {
		processed = stripNullsFromLists(processed)
	}

	processed = s.replaceNullsByEmpty(processed)
	return processed
}
6 changes: 4 additions & 2 deletions plugin/testing_upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (s *WriterTestSuite) testUpsertBasic(ctx context.Context) error {
}); err != nil {
return fmt.Errorf("failed to insert record: %w", err)
}
record = s.handleNulls(record) // we process nulls after writing

records, err := s.plugin.readAll(ctx, table)
if err != nil {
Expand Down Expand Up @@ -79,13 +80,12 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error {

tg := schema.NewTestDataGenerator()
normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1})[0]
nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, NullRows: true})[0]

if err := s.plugin.writeOne(ctx, &message.WriteInsert{
Record: normalRecord,
}); err != nil {
return fmt.Errorf("failed to insert record: %w", err)
}
normalRecord = s.handleNulls(normalRecord) // we process nulls after writing

records, err := s.plugin.readAll(ctx, table)
if err != nil {
Expand All @@ -100,11 +100,13 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error {
return fmt.Errorf("record differs after insert: %s", diff)
}

nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, NullRows: true})[0]
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
Record: nullRecord,
}); err != nil {
return fmt.Errorf("failed to insert record: %w", err)
}
nullRecord = s.handleNulls(nullRecord) // we process nulls after writing

records, err = s.plugin.readAll(ctx, table)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions plugin/testing_write_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func (s *WriterTestSuite) testDeleteStale(ctx context.Context) error {
}); err != nil {
return fmt.Errorf("failed to insert record: %w", err)
}
record = s.handleNulls(record) // we process nulls after writing

records, err := s.plugin.readAll(ctx, table)
if err != nil {
Expand Down
15 changes: 7 additions & 8 deletions plugin/testing_write_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func (s *WriterTestSuite) testInsertBasic(ctx context.Context) error {
}); err != nil {
return fmt.Errorf("failed to insert record: %w", err)
}
record = s.handleNulls(record) // we process nulls after writing

readRecords, err := s.plugin.readAll(ctx, table)
if err != nil {
return fmt.Errorf("failed to sync: %w", err)
Expand Down Expand Up @@ -91,19 +93,14 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error {
return fmt.Errorf("failed to create table: %w", err)
}
tg := schema.NewTestDataGenerator()
normalRecord := tg.Generate(table, schema.GenTestDataOptions{
MaxRows: 1,
})[0]
nullRecord := tg.Generate(table, schema.GenTestDataOptions{
MaxRows: 1,
NullRows: true,
})[0]

normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1})[0]
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
Record: normalRecord,
}); err != nil {
return fmt.Errorf("failed to insert record: %w", err)
}
normalRecord = s.handleNulls(normalRecord) // we process nulls after writing

readRecords, err := s.plugin.readAll(ctx, table)
if err != nil {
return fmt.Errorf("failed to sync: %w", err)
Expand All @@ -114,11 +111,13 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error {
return fmt.Errorf("expected 1 item, got %d", totalItems)
}

nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, NullRows: true})[0]
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
Record: nullRecord,
}); err != nil {
return fmt.Errorf("failed to insert record: %w", err)
}
nullRecord = s.handleNulls(nullRecord) // we process nulls after writing

readRecords, err = s.plugin.readAll(ctx, table)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion plugin/testing_write_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
}
tg := schema.NewTestDataGenerator()
resource1 := tg.Generate(source, opts)[0]

if err := s.plugin.writeOne(ctx, &message.WriteInsert{
Record: resource1,
}); err != nil {
return fmt.Errorf("failed to insert first record: %w", err)
}
resource1 = s.handleNulls(resource1) // we process nulls after writing

records, err := s.plugin.readAll(ctx, source)
if err != nil {
Expand All @@ -70,6 +70,7 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
}); err != nil {
return fmt.Errorf("failed to insert second record: %w", err)
}
resource2 = s.handleNulls(resource2) // we process nulls after writing

records, err = s.plugin.readAll(ctx, target)
if err != nil {
Expand Down

0 comments on commit 88f08ee

Please sign in to comment.