Skip to content

Commit

Permalink
feat: Add JSON type schema (#1796)
Browse files Browse the repository at this point in the history
#### Summary

Part of cloudquery/cloudquery#2023. ~~Still WIP and depends on cloudquery/cloudquery-api-go#196~~


---
  • Loading branch information
erezrokah committed Jul 24, 2024
1 parent 7955c53 commit dbc534b
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 10 deletions.
1 change: 1 addition & 0 deletions schema/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
MetadataPrimaryKeyComponent = "cq:extension:primary_key_component"
MetadataConstraintName = "cq:extension:constraint_name"
MetadataIncremental = "cq:extension:incremental"
MetadataTypeSchema = "cq:extension:type_schema"

MetadataTrue = "true"
MetadataFalse = "false"
Expand Down
9 changes: 9 additions & 0 deletions schema/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ type Column struct {

// PrimaryKeyComponent is a flag that indicates if the column is used as part of the input to calculate the value of `_cq_id`.
PrimaryKeyComponent bool `json:"primary_key_component"`

// TypeSchema is a JSON string that represents the schema of the JSON object; it is set when the column type is JSON.
TypeSchema string `json:"type_schema,omitempty"`
}

// NewColumnFromArrowField creates a new Column from an arrow.Field
Expand All @@ -70,6 +73,9 @@ func NewColumnFromArrowField(f arrow.Field) Column {
v, ok = f.Metadata.GetValue(MetadataPrimaryKeyComponent)
column.PrimaryKeyComponent = ok && v == MetadataTrue

v, _ = f.Metadata.GetValue(MetadataTypeSchema)
column.TypeSchema = v

return column
}

Expand All @@ -79,6 +85,7 @@ func (c Column) ToArrowField() arrow.Field {
MetadataUnique: MetadataFalse,
MetadataIncremental: MetadataFalse,
MetadataPrimaryKeyComponent: MetadataFalse,
MetadataTypeSchema: c.TypeSchema,
}
if c.PrimaryKey {
mdKV[MetadataPrimaryKey] = MetadataTrue
Expand Down Expand Up @@ -111,6 +118,7 @@ func (c Column) MarshalJSON() ([]byte, error) {
Unique bool `json:"unique"`
IncrementalKey bool `json:"incremental_key"`
PrimaryKeyComponent bool `json:"primary_key_component"`
TypeSchema string `json:"type_schema,omitempty"`
}
var alias Alias
alias.Name = c.Name
Expand All @@ -121,6 +129,7 @@ func (c Column) MarshalJSON() ([]byte, error) {
alias.Unique = c.Unique
alias.IncrementalKey = c.IncrementalKey
alias.PrimaryKeyComponent = c.PrimaryKeyComponent
alias.TypeSchema = c.TypeSchema

return json.Marshal(alias)
}
Expand Down
9 changes: 7 additions & 2 deletions serve/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *PluginServe) writeTablesJSON(ctx context.Context, dir string) error {
}
columns := make([]cloudquery_api.PluginTableColumn, 0, len(table.Columns))
for _, column := range table.Columns {
columns = append(columns, cloudquery_api.PluginTableColumn{
c := cloudquery_api.PluginTableColumn{
Name: column.Name,
Description: column.Description,
Type: column.Type.String(),
Expand All @@ -86,7 +86,12 @@ func (s *PluginServe) writeTablesJSON(ctx context.Context, dir string) error {
// 2. If the column is a `PrimaryKey` and both of the following are true column name is NOT `_cq_id` and there are other columns that are a PrimaryKeyComponent
PrimaryKey: (column.PrimaryKey && !(column.Name == schema.CqIDColumn.Name && len(table.PrimaryKeyComponents()) > 0)) || column.PrimaryKeyComponent,
Unique: column.Unique,
})
}
if column.TypeSchema != "" {
typeSchema := column.TypeSchema
c.TypeSchema = &typeSchema
}
columns = append(columns, c)
}
tablesToEncode = append(tablesToEncode, cloudquery_api.PluginTableCreate{
Description: &table.Description,
Expand Down
120 changes: 112 additions & 8 deletions transformers/struct.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package transformers

import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"slices"
"strings"

"github.com/apache/arrow/go/v17/arrow"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/cloudquery/plugin-sdk/v4/types"
"github.com/thoas/go-funk"
)

const maxJSONTypeSchemaDepth = 5

type structTransformer struct {
table *schema.Table
skipFields []string
Expand Down Expand Up @@ -111,17 +118,11 @@ func (t *structTransformer) addColumnFromField(field reflect.StructField, parent
return nil
}

columnType, err := t.typeTransformer(field)
columnType, err := t.getColumnType(field)
if err != nil {
return fmt.Errorf("failed to transform type for field %s: %w", field.Name, err)
return err
}

if columnType == nil {
columnType, err = DefaultTypeTransformer(field)
if err != nil {
return fmt.Errorf("failed to transform type for field %s: %w", field.Name, err)
}
}
if columnType == nil {
return nil // ignored
}
Expand Down Expand Up @@ -159,6 +160,11 @@ func (t *structTransformer) addColumnFromField(field reflect.StructField, parent
IgnoreInTests: t.ignoreInTestsTransformer(field),
}

// Enrich JSON column with detailed schema
if columnType == types.ExtensionTypes.JSON {
column.TypeSchema = structSchemaToJSON(t.fieldToJSONSchema(field, 0))
}

for _, pk := range t.pkFields {
if pk == path {
// use path to allow the following
Expand Down Expand Up @@ -233,3 +239,101 @@ func TransformWithStruct(st any, opts ...StructTransformerOption) schema.Transfo
return nil
}
}

// getColumnType resolves the Arrow data type for a struct field.
//
// The transformer's configured typeTransformer is consulted first. When it
// yields no type (nil), DefaultTypeTransformer is used as the fallback. An
// error from either step is wrapped together with the field name. The
// returned type may still be nil when neither step produces one.
func (t *structTransformer) getColumnType(field reflect.StructField) (arrow.DataType, error) {
	dataType, err := t.typeTransformer(field)
	if err != nil {
		return nil, fmt.Errorf("failed to transform type for field %s: %w", field.Name, err)
	}
	if dataType != nil {
		// The custom transformer made a decision; no fallback needed.
		return dataType, nil
	}

	dataType, err = DefaultTypeTransformer(field)
	if err != nil {
		return nil, fmt.Errorf("failed to transform type for field %s: %w", field.Name, err)
	}
	return dataType, nil
}

// structSchemaToJSON renders s as JSON with HTML escaping turned off, so
// characters such as '<' and '>' appear verbatim, and returns the text with
// surrounding whitespace (including the newline the encoder appends) removed.
//
// The encoding error is intentionally discarded: the function is best-effort
// and returns whatever ended up in the buffer.
func structSchemaToJSON(s any) string {
	var out bytes.Buffer

	enc := json.NewEncoder(&out)
	enc.SetEscapeHTML(false)
	// Best effort: a value that cannot be encoded simply yields the buffer's
	// current (trimmed) content.
	_ = enc.Encode(s)

	return strings.TrimSpace(out.String())
}

// normalizePointer returns a pointer to a freshly allocated zero value of the
// field's type, after stripping a single level of pointer indirection from
// that type when present. Calling Elem on the result yields the zero value of
// the (at most once) dereferenced type.
func normalizePointer(field reflect.StructField) reflect.Value {
	target := field.Type
	if target.Kind() == reflect.Ptr {
		target = target.Elem()
	}
	return reflect.New(target)
}

// fieldToJSONSchema builds a JSON-marshalable description of the shape of a
// field that is stored in a JSON column.
//
// The result depends on the field's kind once a single level of pointer
// indirection has been removed:
//   - struct: a map from transformed field name to that field's schema;
//   - map: a single-entry map from the key's schema to the value's schema;
//   - slice: a one-element slice holding the element's schema;
//   - anything else: the column type's string form, or "any" when no column
//     type is resolved for the field.
//
// An empty string is returned when no schema can be produced: the column type
// lookup failed, a map key did not resolve to a non-empty string schema, or a
// map value / slice element produced an empty schema. Struct fields whose
// name or type cannot be resolved are silently left out of the result.
//
// depth is the current nesting level. Nested JSON-typed struct fields are
// expanded only while depth is below maxJSONTypeSchemaDepth; past that they
// are reported by their column type name. The limit is enforced for struct
// fields only; map and slice recursion merely increments depth.
func (t *structTransformer) fieldToJSONSchema(field reflect.StructField, depth int) any {
	// Work on the pointer-free type everywhere below. Key and Elem must be
	// looked up on this type rather than on field.Type: for a pointer-to-map
	// field reflect.Type.Key panics, and for a pointer-to-slice field Elem
	// returns the slice type itself instead of its element type.
	fieldType := normalizePointer(field).Elem().Type()

	switch fieldType.Kind() {
	case reflect.Struct:
		fieldsMap := make(map[string]any, fieldType.NumField())
		for i := 0; i < fieldType.NumField(); i++ {
			structField := fieldType.Field(i)
			name, err := t.nameTransformer(structField)
			if err != nil {
				continue
			}
			columnType, err := t.getColumnType(structField)
			if err != nil {
				continue
			}
			if columnType == nil {
				fieldsMap[name] = "any"
				continue
			}
			// Avoid infinite recursion: stop expanding nested JSON fields
			// once the depth limit is reached.
			if columnType == types.ExtensionTypes.JSON && depth < maxJSONTypeSchemaDepth {
				fieldsMap[name] = t.fieldToJSONSchema(structField, depth+1)
				continue
			}
			if asList, ok := columnType.(*arrow.ListType); ok {
				fieldsMap[name] = []any{asList.Elem().String()}
				continue
			}
			fieldsMap[name] = columnType.String()
		}
		return fieldsMap
	case reflect.Map:
		// Only keys that resolve to a plain, non-empty type name can be
		// used as the JSON object key of the schema.
		keySchema, ok := t.fieldToJSONSchema(reflect.StructField{
			Type: fieldType.Key(),
		}, depth+1).(string)
		if keySchema == "" || !ok {
			return ""
		}
		valueSchema := t.fieldToJSONSchema(reflect.StructField{
			Type: fieldType.Elem(),
		}, depth+1)
		if valueSchema == "" {
			return ""
		}
		return map[string]any{
			keySchema: valueSchema,
		}
	case reflect.Slice:
		valueSchema := t.fieldToJSONSchema(reflect.StructField{
			Type: fieldType.Elem(),
		}, depth+1)
		if valueSchema == "" {
			return ""
		}
		return []any{valueSchema}
	}

	// Every other kind: describe the field by its column type.
	columnType, err := t.getColumnType(field)
	if err != nil {
		return ""
	}
	if columnType == nil {
		return "any"
	}
	return columnType.String()
}
146 changes: 146 additions & 0 deletions transformers/struct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,149 @@ func TestTableFromGoStruct(t *testing.T) {
})
}
}

// TestJSONTypeSchema checks the TypeSchema that TransformWithStruct attaches
// to JSON columns. Each case transforms an anonymous struct into a table and
// compares the TypeSchema of the listed columns (keyed by column name) with
// the expected JSON string.
func TestJSONTypeSchema(t *testing.T) {
	tests := []struct {
		name       string
		testStruct any
		// want maps a column name to its expected TypeSchema.
		want map[string]string
	}{
		{
			name: "simple map",
			testStruct: struct {
				Tags map[string]string `json:"tags"`
			}{},
			want: map[string]string{
				"tags": `{"utf8":"utf8"}`,
			},
		},
		{
			name: "simple array",
			testStruct: struct {
				Items []struct {
					Name string `json:"name"`
				} `json:"items"`
			}{},
			want: map[string]string{
				"items": `[{"name":"utf8"}]`,
			},
		},
		{
			name: "simple struct",
			testStruct: struct {
				Item struct {
					Name string `json:"name"`
				} `json:"item"`
			}{},
			want: map[string]string{
				"item": `{"name":"utf8"}`,
			},
		},
		{
			name: "complex struct",
			testStruct: struct {
				Item struct {
					Name         string            `json:"name"`
					Tags         map[string]string `json:"tags"`
					FlatItems    []string          `json:"flat_items"`
					ComplexItems []struct {
						Name string `json:"name"`
					} `json:"complex_items"`
				} `json:"item"`
			}{},
			want: map[string]string{
				"item": `{"complex_items":[{"name":"utf8"}],"flat_items":["utf8"],"name":"utf8","tags":{"utf8":"utf8"}}`,
			},
		},
		{
			name: "multiple json columns",
			testStruct: struct {
				Tags map[string]string `json:"tags"`
				Item struct {
					Name         string            `json:"name"`
					Tags         map[string]string `json:"tags"`
					FlatItems    []string          `json:"flat_items"`
					ComplexItems []struct {
						Name string `json:"name"`
					} `json:"complex_items"`
				} `json:"item"`
			}{},
			want: map[string]string{
				// Same field as in the "simple map" case, so the same schema
				// is expected; asserting it makes this case cover both columns.
				"tags": `{"utf8":"utf8"}`,
				"item": `{"complex_items":[{"name":"utf8"}],"flat_items":["utf8"],"name":"utf8","tags":{"utf8":"utf8"}}`,
			},
		},
		{
			name: "handles any type in struct",
			testStruct: struct {
				Item struct {
					Name   string `json:"name"`
					Object any    `json:"object"`
				} `json:"item"`
			}{},
			want: map[string]string{
				"item": `{"name":"utf8","object":"any"}`,
			},
		},
		{
			name: "handles map from string to any",
			testStruct: struct {
				Tags map[string]any `json:"tags"`
			}{},
			want: map[string]string{
				"tags": `{"utf8":"any"}`,
			},
		},
		{
			name: "handles array of any",
			testStruct: struct {
				Items []any `json:"items"`
			}{},
			want: map[string]string{
				"items": `["any"]`,
			},
		},
		{
			name: "stops at the default depth of 5",
			testStruct: struct {
				Level0 struct {
					Level1 struct {
						Level2 struct {
							Level3 struct {
								Level4 struct {
									Level5 struct {
										Level6 struct {
											Name string `json:"name"`
										} `json:"level6"`
									} `json:"level5"`
								} `json:"level4"`
							} `json:"level3"`
						} `json:"level2"`
					} `json:"level1"`
				} `json:"level0"`
			}{},
			want: map[string]string{
				"level0": `{"level1":{"level2":{"level3":{"level4":{"level5":{"level6":"json"}}}}}}`,
			},
		},
	}

	for _, tt := range tests {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			table := schema.Table{
				Name: "test",
			}
			transformer := TransformWithStruct(tt.testStruct)
			err := transformer(&table)
			if err != nil {
				t.Fatal(err)
			}
			// The loop variable is named wantSchema (not schema) so it does
			// not shadow the imported schema package.
			for col, wantSchema := range tt.want {
				column := table.Column(col)
				// Fail with a clear message instead of a nil dereference when
				// the transformer did not produce the expected column.
				if column == nil {
					t.Fatalf("column %q not found in table", col)
				}
				if diff := cmp.Diff(column.TypeSchema, wantSchema); diff != "" {
					t.Fatalf("table does not match expected. diff (-got, +want): %v", diff)
				}
			}
		})
	}
}

1 comment on commit dbc534b

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⏱️ Benchmark results

  • Glob-8 ns/op: 92.85

Please sign in to comment.