Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test/internal/nosql/cassandra test #809

Merged
merged 9 commits into from
Nov 10, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 78 additions & 40 deletions internal/db/nosql/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,61 @@ import (
)

var (
ErrNotFound = gocql.ErrNotFound
ErrUnavailable = gocql.ErrUnavailable
ErrUnsupported = gocql.ErrUnsupported
ErrTooManyStmts = gocql.ErrTooManyStmts
ErrUseStmt = gocql.ErrUseStmt
ErrSessionClosed = gocql.ErrSessionClosed
ErrNoConnections = gocql.ErrNoConnections
ErrNoKeyspace = gocql.ErrNoKeyspace
// ErrNotFound is a alias of gocql.ErrNotFound
ErrNotFound = gocql.ErrNotFound
// ErrUnavailable is a alias of gocql.ErrUnavailable
ErrUnavailable = gocql.ErrUnavailable
// ErrUnsupported is a alias of gocql.ErrUnsupported
ErrUnsupported = gocql.ErrUnsupported
// ErrTooManyStmts is a alias of gocql.ErrTooManyStmts
ErrTooManyStmts = gocql.ErrTooManyStmts
// ErrUseStmt is a alias of gocql.ErrUseStmt
ErrUseStmt = gocql.ErrUseStmt
// ErrSessionClosed is a alias of gocql.ErrSessionClosed
ErrSessionClosed = gocql.ErrSessionClosed
// ErrNoConnections is a alias of gocql.ErrNoConnections
ErrNoConnections = gocql.ErrNoConnections
// ErrNoKeyspace is a alias of gocql.ErrNoKeyspace
ErrNoKeyspace = gocql.ErrNoKeyspace
// ErrKeyspaceDoesNotExist is a alias of gocql.ErrKeyspaceDoesNotExist
ErrKeyspaceDoesNotExist = gocql.ErrKeyspaceDoesNotExist
ErrNoMetadata = gocql.ErrNoMetadata
ErrNoHosts = gocql.ErrNoHosts
// ErrNoMetadata is a alias of gocql.ErrNoMetadata
ErrNoMetadata = gocql.ErrNoMetadata
// ErrNoHosts is a alias of gocql.ErrNoHosts
ErrNoHosts = gocql.ErrNoHosts
// ErrNoConnectionsStarted is a alias of gocql.ErrNoConnectionsStarted
ErrNoConnectionsStarted = gocql.ErrNoConnectionsStarted
ErrHostQueryFailed = gocql.ErrHostQueryFailed
// ErrHostQueryFailed is a alias of gocql.ErrHostQueryFailed
ErrHostQueryFailed = gocql.ErrHostQueryFailed
)

// Cassandra represent an interface to query on cassandra
type Cassandra interface {
Open(ctx context.Context) error
Close(ctx context.Context) error
Query(stmt string, names []string) *Queryx
}

// ClusterConfig represent an interface of cassandra cluster configuation
type ClusterConfig interface {
CreateSession() (*gocql.Session, error)
}

type (
Session = gocql.Session
Cmp = qb.Cmp
BatchBuilder = qb.BatchBuilder
// Session is a alias of gocql.Session
Session = gocql.Session
// Cmp is a alias of qb.Cmp
Cmp = qb.Cmp
// BatchBuilder is a alias of qb.BatchBuilder
BatchBuilder = qb.BatchBuilder
// InsertBuilder is a alias of qb.InsertBuilder
InsertBuilder = qb.InsertBuilder
// DeleteBuilder is a alias of qb.DeleteBuilder
DeleteBuilder = qb.DeleteBuilder
// UpdateBuilder is a alias of qb.UpdateBuilder
UpdateBuilder = qb.UpdateBuilder
Queryx = gocqlx.Queryx
// Queryx is a alias of gocqlx.Queryx
Queryx = gocqlx.Queryx
)

type (
Expand All @@ -83,6 +109,11 @@ type (
dcHost string
whiteList []string
}
events struct {
kevindiu marked this conversation as resolved.
Show resolved Hide resolved
DisableNodeStatusEvents bool
DisableTopologyEvents bool
DisableSchemaEvents bool
}
client struct {
hosts []string
cqlVersion string
Expand Down Expand Up @@ -128,11 +159,12 @@ type (
dialer gocql.Dialer
writeCoalesceWaitTime time.Duration

cluster *gocql.ClusterConfig
cluster ClusterConfig
session *gocql.Session
}
)

// New initialize and return the cassandra client, or any error occurred.
func New(opts ...Option) (Cassandra, error) {
c := new(client)
for _, opt := range append(defaultOpts, opts...) {
Expand Down Expand Up @@ -216,31 +248,29 @@ func New(opts ...Option) (Cassandra, error) {
ReconnectInterval: c.reconnectInterval,
MaxWaitSchemaAgreement: c.maxWaitSchemaAgreement,
HostFilter: func() (hf gocql.HostFilter) {
if c.hostFilter.enable {
if len(c.hostFilter.dcHost) != 0 {
hf = gocql.DataCentreHostFilter(c.poolConfig.dataCenterName)
}
if len(c.hostFilter.whiteList) != 0 {
if hf == nil {
hf = gocql.WhiteListHostFilter(c.hostFilter.whiteList...)
} else {
hf = gocql.HostFilterFunc(func(host *gocql.HostInfo) bool {
return hf.Accept(host) ||
gocql.WhiteListHostFilter(c.hostFilter.whiteList...).Accept(host)
})
}
if !c.hostFilter.enable {
return nil
}

if len(c.hostFilter.dcHost) != 0 {
hf = gocql.DataCentreHostFilter(c.hostFilter.dcHost)
}
if len(c.hostFilter.whiteList) != 0 {
wlhf := gocql.WhiteListHostFilter(c.hostFilter.whiteList...)
if hf == nil {
hf = wlhf
} else {
hf = gocql.HostFilterFunc(func(host *gocql.HostInfo) bool {
return hf.Accept(host) || wlhf.Accept(host)
})
}
}
return hf
}(),
// AddressTranslator
IgnorePeerAddr: c.ignorePeerAddr,
DisableInitialHostLookup: c.disableInitialHostLookup,
Events: struct {
DisableNodeStatusEvents bool
DisableTopologyEvents bool
DisableSchemaEvents bool
}{
Events: events{
DisableNodeStatusEvents: c.disableNodeStatusEvents,
DisableTopologyEvents: c.disableTopologyEvents,
DisableSchemaEvents: c.disableSchemaEvents,
Expand Down Expand Up @@ -269,26 +299,26 @@ func New(opts ...Option) (Cassandra, error) {
return c, nil
}

func (c *client) Open(ctx context.Context) error {
session, err := c.cluster.CreateSession()
if err != nil {
// Open creates a session to cassandra and return any error occurred
func (c *client) Open(ctx context.Context) (err error) {
if c.session, err = c.cluster.CreateSession(); err != nil {
return err
}

c.session = session

return nil
}

// Close closes the session to cassandra
func (c *client) Close(ctx context.Context) error {
c.session.Close()
return nil
}

// Query creates an query that can be executed on cassandra
func (c *client) Query(stmt string, names []string) *Queryx {
return gocqlx.Query(c.session.Query(stmt), names)
}

// Select build and returns the cql string and the named args
func Select(table string, columns []string, cmps ...Cmp) (stmt string, names []string) {
sb := qb.Select(table).Columns(columns...)
for _, cmp := range cmps {
Expand All @@ -297,6 +327,7 @@ func Select(table string, columns []string, cmps ...Cmp) (stmt string, names []s
return sb.ToCql()
}

// Delete returns the delete builder
func Delete(table string, cmps ...Cmp) *DeleteBuilder {
db := qb.Delete(table)
for _, cmp := range cmps {
Expand All @@ -305,30 +336,37 @@ func Delete(table string, cmps ...Cmp) *DeleteBuilder {
return db
}

// Insert returns the insert builder
func Insert(table string, columns ...string) *InsertBuilder {
return qb.Insert(table).Columns(columns...)
}

// Update returns the update builder
func Update(table string) *UpdateBuilder {
return qb.Update(table)
}

// Batch returns the batch builder
func Batch() *BatchBuilder {
return qb.Batch()
}

// Eq returns the equal comparator
func Eq(column string) Cmp {
return qb.Eq(column)
}

// In returns the in comparator
func In(column string) Cmp {
return qb.In(column)
}

// Contains return the contains comparator
func Contains(column string) Cmp {
return qb.Contains(column)
}

// WrapErrorWithKeys wraps the cassandra error to Vald internal error
func WrapErrorWithKeys(err error, keys ...string) error {
switch err {
case ErrNotFound:
Expand Down
11 changes: 11 additions & 0 deletions internal/db/nosql/cassandra/cassandra_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package cassandra

import "github.com/gocql/gocql"

type MockClusterConfig struct {
CreateSessionFunc func() (*gocql.Session, error)
}

func (m *MockClusterConfig) CreateSession() (*gocql.Session, error) {
return m.CreateSessionFunc()
}
Loading