Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Enhance compatibility with different schema versions #244

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ To make the API use go idioms, the following mappings occur:
1. OVSDB Map = Map
1. OVSDB Scalar Type = Equivalent scalar Go type

A Open vSwitch Database is modeled using a DBModel which is a created by assigning table names to pointers to these structs:
A Open vSwitch Database is modeled using a DatabaseModelRequest which is a created by assigning table names to pointers to these structs:

dbModel, _ := model.NewDBModel("OVN_Northbound", map[string]model.Model{
dbModelReq, _ := model.NewDatabaseModelRequest("OVN_Northbound", map[string]model.Model{
"Logical_Switch": &MyLogicalSwitch{},
})


Finally, a client object can be created:

ovs, _ := client.Connect(context.Background(), dbModel, client.WithEndpoint("tcp:172.18.0.4:6641"))
ovs, _ := client.Connect(context.Background(), dbModelReq, client.WithEndpoint("tcp:172.18.0.4:6641"))
client.MonitorAll(nil) // Only needed if you want to use the built-in cache


Expand Down Expand Up @@ -219,7 +219,7 @@ It can be used as follows:
Package name (default "ovsmodel")

The result will be the definition of a Model per table defined in the ovsdb schema file.
Additionally, a function called `FullDatabaseModel()` that returns the `DBModel` is created for convenience.
Additionally, a function called `FullDatabaseModel()` that returns the `DatabaseModelRequest` is created for convenience.

Example:

Expand All @@ -237,7 +237,7 @@ Run `go generate`
go generate ./...


In your application, load the DBModel, connect to the server and start interacting with the database:
In your application, load the DatabaseModelRequest, connect to the server and start interacting with the database:

import (
"fmt"
Expand All @@ -247,8 +247,8 @@ In your application, load the DBModel, connect to the server and start interacti
)

func main() {
dbModel, _ := generated.FullDatabaseModel()
ovs, _ := client.Connect(context.Background(), dbModel, client.WithEndpoint("tcp:localhost:6641"))
dbModelReq, _ := generated.FullDatabaseModel()
ovs, _ := client.Connect(context.Background(), dbModelReq, client.WithEndpoint("tcp:localhost:6641"))
ovs.MonitorAll()

// Create a *LogicalRouter, as a pointer to a Model is required by the API
Expand Down
88 changes: 43 additions & 45 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func newIndex(columns ...string) index {
// RowCache is a collections of Models hashed by UUID
type RowCache struct {
name string
schema ovsdb.TableSchema
dbModel *model.DatabaseModel
dataType reflect.Type
cache map[string]model.Model
indexes columnToValue
Expand All @@ -90,7 +90,7 @@ func (r *RowCache) RowByModel(m model.Model) model.Model {
if reflect.TypeOf(m) != r.dataType {
return nil
}
info, _ := mapper.NewInfo(&r.schema, m)
info, _ := r.dbModel.NewModelInfo(m)
uuid, err := info.FieldByColumn("_uuid")
if err != nil {
return nil
Expand Down Expand Up @@ -120,11 +120,11 @@ func (r *RowCache) Create(uuid string, m model.Model, checkIndexes bool) error {
if reflect.TypeOf(m) != r.dataType {
return fmt.Errorf("expected data of type %s, but got %s", r.dataType.String(), reflect.TypeOf(m).String())
}
info, err := mapper.NewInfo(&r.schema, m)
info, err := r.dbModel.NewModelInfo(m)
if err != nil {
return err
}
newIndexes := newColumnToValue(r.schema.Indexes)
newIndexes := newColumnToValue(r.dbModel.Schema().Table(r.name).Indexes)
for index := range r.indexes {
val, err := valueFromIndex(info, index)
if err != nil {
Expand Down Expand Up @@ -156,16 +156,17 @@ func (r *RowCache) Update(uuid string, m model.Model, checkIndexes bool) error {
return fmt.Errorf("row %s does not exist", uuid)
}
oldRow := model.Clone(r.cache[uuid])
oldInfo, err := mapper.NewInfo(&r.schema, oldRow)
oldInfo, err := r.dbModel.NewModelInfo(oldRow)
if err != nil {
return err
}
newInfo, err := mapper.NewInfo(&r.schema, m)
newInfo, err := r.dbModel.NewModelInfo(m)
if err != nil {
return err
}
newIndexes := newColumnToValue(r.schema.Indexes)
oldIndexes := newColumnToValue(r.schema.Indexes)
indexes := r.dbModel.Schema().Table(r.name).Indexes
newIndexes := newColumnToValue(indexes)
oldIndexes := newColumnToValue(indexes)
var errs []error
for index := range r.indexes {
var err error
Expand Down Expand Up @@ -218,7 +219,7 @@ func (r *RowCache) Update(uuid string, m model.Model, checkIndexes bool) error {
}

func (r *RowCache) IndexExists(row model.Model) error {
info, err := mapper.NewInfo(&r.schema, row)
info, err := r.dbModel.NewModelInfo(row)
if err != nil {
return err
}
Expand Down Expand Up @@ -252,7 +253,7 @@ func (r *RowCache) Delete(uuid string) error {
return fmt.Errorf("row %s does not exist", uuid)
}
oldRow := r.cache[uuid]
oldInfo, err := mapper.NewInfo(&r.schema, oldRow)
oldInfo, err := r.dbModel.NewModelInfo(oldRow)
if err != nil {
return err
}
Expand Down Expand Up @@ -280,6 +281,7 @@ func (r *RowCache) Rows() []string {

func (r *RowCache) RowsByCondition(conditions []ovsdb.Condition) ([]model.Model, error) {
var results []model.Model
schema := r.dbModel.Schema().Table(r.name)
if len(conditions) == 0 {
uuids := r.Rows()
for _, uuid := range uuids {
Expand Down Expand Up @@ -308,7 +310,7 @@ func (r *RowCache) RowsByCondition(conditions []ovsdb.Condition) ([]model.Model,
}
} else if index, err := r.Index(condition.Column); err != nil {
for k, v := range index {
tSchema := r.schema.Columns[condition.Column]
tSchema := schema.Columns[condition.Column]
nativeValue, err := ovsdb.OvsToNative(tSchema, condition.Value)
if err != nil {
return nil, err
Expand All @@ -325,7 +327,7 @@ func (r *RowCache) RowsByCondition(conditions []ovsdb.Condition) ([]model.Model,
} else {
for _, uuid := range r.Rows() {
row := r.Row(uuid)
info, err := mapper.NewInfo(&r.schema, row)
info, err := r.dbModel.NewModelInfo(row)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -406,9 +408,7 @@ func (e *EventHandlerFuncs) OnDelete(table string, row model.Model) {
type TableCache struct {
cache map[string]*RowCache
eventProcessor *eventProcessor
mapper *mapper.Mapper
dbModel *model.DBModel
schema *ovsdb.DatabaseSchema
dbModel *model.DatabaseModel
ovsdb.NotificationHandler
mutex sync.RWMutex
}
Expand All @@ -417,18 +417,18 @@ type TableCache struct {
type Data map[string]map[string]model.Model

// NewTableCache creates a new TableCache
func NewTableCache(schema *ovsdb.DatabaseSchema, dbModel *model.DBModel, data Data) (*TableCache, error) {
if schema == nil || dbModel == nil {
return nil, fmt.Errorf("tablecache without databasemodel cannot be populated")
func NewTableCache(dbModel *model.DatabaseModel, data Data) (*TableCache, error) {
if !dbModel.Valid() {
return nil, fmt.Errorf("tablecache without valid databasemodel cannot be populated")
}
eventProcessor := newEventProcessor(bufferSize)
cache := make(map[string]*RowCache)
tableTypes := dbModel.Types()
for name, tableSchema := range schema.Tables {
cache[name] = newRowCache(name, tableSchema, tableTypes[name])
for name := range dbModel.Schema().Tables {
cache[name] = newRowCache(name, dbModel, tableTypes[name])
}
for table, rowData := range data {
if _, ok := schema.Tables[table]; !ok {
if _, ok := dbModel.Schema().Tables[table]; !ok {
return nil, fmt.Errorf("table %s is not in schema", table)
}
for uuid, row := range rowData {
Expand All @@ -439,21 +439,19 @@ func NewTableCache(schema *ovsdb.DatabaseSchema, dbModel *model.DBModel, data Da
}
return &TableCache{
cache: cache,
schema: schema,
eventProcessor: eventProcessor,
mapper: mapper.NewMapper(schema),
dbModel: dbModel,
mutex: sync.RWMutex{},
}, nil
}

// Mapper returns the mapper
func (t *TableCache) Mapper() *mapper.Mapper {
return t.mapper
return t.dbModel.Mapper()
}

// DBModel returns the DBModel
func (t *TableCache) DBModel() *model.DBModel {
// DatabaseModelRequest returns the DatabaseModelRequest
func (t *TableCache) DatabaseModel() *model.DatabaseModel {
return t.dbModel
}

Expand Down Expand Up @@ -624,13 +622,14 @@ func (t *TableCache) Populate2(tableUpdates ovsdb.TableUpdates2) {
}

// Purge drops all data in the cache and reinitializes it using the
// provided schema
func (t *TableCache) Purge(schema *ovsdb.DatabaseSchema) {
// provided database model
func (t *TableCache) Purge(dbModel *model.DatabaseModel) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.dbModel = dbModel
tableTypes := t.dbModel.Types()
for name, tableSchema := range t.schema.Tables {
t.cache[name] = newRowCache(name, tableSchema, tableTypes[name])
for name := range t.dbModel.Schema().Tables {
t.cache[name] = newRowCache(name, t.dbModel, tableTypes[name])
}
}

Expand All @@ -646,11 +645,11 @@ func (t *TableCache) Run(stopCh <-chan struct{}) {

// newRowCache creates a new row cache with the provided data
// if the data is nil, and empty RowCache will be created
func newRowCache(name string, schema ovsdb.TableSchema, dataType reflect.Type) *RowCache {
func newRowCache(name string, dbModel *model.DatabaseModel, dataType reflect.Type) *RowCache {
r := &RowCache{
name: name,
schema: schema,
indexes: newColumnToValue(schema.Indexes),
dbModel: dbModel,
indexes: newColumnToValue(dbModel.Schema().Table(name).Indexes),
dataType: dataType,
cache: make(map[string]model.Model),
mutex: sync.RWMutex{},
Expand Down Expand Up @@ -756,26 +755,25 @@ func (e *eventProcessor) Run(stopCh <-chan struct{}) {

// CreateModel creates a new Model instance based on the Row information
func (t *TableCache) CreateModel(tableName string, row *ovsdb.Row, uuid string) (model.Model, error) {
table := t.mapper.Schema.Table(tableName)
table := t.dbModel.Schema().Table(tableName)
if table == nil {
return nil, fmt.Errorf("table %s not found", tableName)
}
model, err := t.dbModel.NewModel(tableName)
if err != nil {
return nil, err
}

err = t.mapper.GetRowData(tableName, row, model)
info, err := t.dbModel.NewModelInfo(model)
if err != nil {
return nil, err
}
err = t.dbModel.Mapper().GetRowData(row, info)
if err != nil {
return nil, err
}

if uuid != "" {
mapperInfo, err := mapper.NewInfo(table, model)
if err != nil {
return nil, err
}
if err := mapperInfo.SetField("_uuid", uuid); err != nil {
if err := info.SetField("_uuid", uuid); err != nil {
return nil, err
}
}
Expand All @@ -786,20 +784,20 @@ func (t *TableCache) CreateModel(tableName string, row *ovsdb.Row, uuid string)
// ApplyModifications applies the contents of a RowUpdate2.Modify to a model
// nolint: gocyclo
func (t *TableCache) ApplyModifications(tableName string, base model.Model, update ovsdb.Row) error {
table := t.mapper.Schema.Table(tableName)
table := t.dbModel.Schema().Table(tableName)
if table == nil {
return fmt.Errorf("table %s not found", tableName)
}
schema := t.schema.Table(tableName)
schema := t.dbModel.Schema().Table(tableName)
if schema == nil {
return fmt.Errorf("no schema for table %s", tableName)
}
info, err := mapper.NewInfo(schema, base)
info, err := t.dbModel.NewModelInfo(base)
if err != nil {
return err
}
for k, v := range update {
if k == "_uuid" {
if k == "_uuid" || !t.dbModel.HasColumn(tableName, k) {
continue
}

Expand Down
Loading