Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[full-ci] Backport/stable5/nats js kv fixes #10019

Draft
wants to merge 23 commits into
base: stable-5.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/bump-gomicro-plugins.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: Bump go-micro plugins pkg

Bump plugins pkg to include fix for cache delete

https://github.com/owncloud/ocis/pull/9756
9 changes: 9 additions & 0 deletions changelog/unreleased/fix-natsjskv-registry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Bugfix: Repair nats-js-kv registry

The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.

https://github.com/owncloud/ocis/pull/9726
https://github.com/owncloud/ocis/pull/9662
https://github.com/owncloud/ocis/pull/9656
https://github.com/owncloud/ocis/pull/9654
https://github.com/owncloud/ocis/pull/9620
5 changes: 5 additions & 0 deletions changelog/unreleased/natsjskv-registry-encoding.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Encode Registry Keys

Encode the keys of the natsjskv registry as they have always been.

https://github.com/owncloud/ocis/pull/9385
5 changes: 5 additions & 0 deletions changelog/unreleased/use-less-selectors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: use less selectors that watch the registry

The proxy now shares the service selector for all host lookups.

https://github.com/owncloud/ocis/pull/9741
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/coreos/go-oidc/v3 v3.9.0
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781
github.com/cs3org/reva/v2 v2.19.8
github.com/cs3org/reva/v2 v2.19.9
github.com/dhowden/tag v0.0.0-20230630033851-978a0926ee25
github.com/dutchcoders/go-clamd v0.0.0-20170520113014-b970184f4d9e
github.com/egirna/icap-client v0.1.1
Expand Down Expand Up @@ -351,7 +351,7 @@ require (
sigs.k8s.io/yaml v1.4.0 // indirect
)

replace github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20231207143248-4d424e3ae348
replace github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240807130109-f62bb67e8c90

replace github.com/studio-b12/gowebdav => github.com/aduffeck/gowebdav v0.0.0-20231215102054-212d4a4374f6

Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1019,8 +1019,8 @@ github.com/crewjam/saml v0.4.14 h1:g9FBNx62osKusnFzs3QTN5L9CVA/Egfgm+stJShzw/c=
github.com/crewjam/saml v0.4.14/go.mod h1:UVSZCf18jJkk6GpWNVqcyQJMD5HsRugBPf4I1nl2mME=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781 h1:BUdwkIlf8IS2FasrrPg8gGPHQPOrQ18MS1Oew2tmGtY=
github.com/cs3org/go-cs3apis v0.0.0-20231023073225-7748710e0781/go.mod h1:UXha4TguuB52H14EMoSsCqDj7k8a/t7g4gVP+bgY5LY=
github.com/cs3org/reva/v2 v2.19.8 h1:MXmE2W3gh1UK1Dwj/jwQDEcSeYJBZOyDF2/D3BAcvDw=
github.com/cs3org/reva/v2 v2.19.8/go.mod h1:GRUrOp5HbFVwZTgR9bVrMZ/MvVy+Jhxw1PdMmhhKP9E=
github.com/cs3org/reva/v2 v2.19.9 h1:cdjByJRBVxD8gBAZRXcJJ6y5elCbodxuxiItyfjKuSE=
github.com/cs3org/reva/v2 v2.19.9/go.mod h1:9UKEB2IrqPOm2awixItp1G9Ax4VcdB0GSJNipCeVUuc=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/cyphar/filepath-securejoin v0.2.4 h1:Ugdm7cg7i6ZK6x3xDF1oEu1nfkyfH53EtKeQYTC3kyg=
github.com/cyphar/filepath-securejoin v0.2.4/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
Expand Down Expand Up @@ -1593,8 +1593,8 @@ github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20231207143248-4d424e3ae348 h1:Czv6AW9Suj6npWd5BLZjobdD78c2RdzBeKBgkq3jYZk=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20231207143248-4d424e3ae348/go.mod h1:Goi4eJ9SrKkxE6NsAVqBVNxfQFbwb7UbyII6743ldgM=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240807130109-f62bb67e8c90 h1:pfI8Z5yavO6fU6vDGlWhZ4BgDlvj8c6xB7J57HfTPwA=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240807130109-f62bb67e8c90/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY=
github.com/kolo/xmlrpc v0.0.0-20200310150728-e0350524596b/go.mod h1:o03bZfuBwAXHetKXuInt4S7omeXUu62/A845kiycsSQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
8 changes: 4 additions & 4 deletions ocis-pkg/natsjsregistry/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type storeOptionsKey struct{}
type expiryKey struct{}
type defaultTTLKey struct{}

// StoreOptions sets the options for the underlying store
func StoreOptions(opts []store.Option) registry.Option {
Expand All @@ -21,12 +21,12 @@ func StoreOptions(opts []store.Option) registry.Option {
}
}

// ServiceExpiry allows setting an expiry time for service registrations
func ServiceExpiry(t time.Duration) registry.Option {
// DefaultTTL allows setting a default register TTL for services
func DefaultTTL(t time.Duration) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, expiryKey{}, t)
o.Context = context.WithValue(o.Context, defaultTTLKey{}, t)
}
}
115 changes: 71 additions & 44 deletions ocis-pkg/natsjsregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/server"
"go-micro.dev/v4/store"
"go-micro.dev/v4/util/cmd"
)
Expand All @@ -23,6 +24,8 @@ var (
_registryAddressEnv = "MICRO_REGISTRY_ADDRESS"
_registryUsernameEnv = "MICRO_REGISTRY_AUTH_USERNAME"
_registryPasswordEnv = "MICRO_REGISTRY_AUTH_PASSWORD"

_serviceDelimiter = "@"
)

func init() {
Expand All @@ -37,22 +40,22 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
for _, o := range opts {
o(&options)
}
exp, _ := options.Context.Value(expiryKey{}).(time.Duration)
defaultTTL, _ := options.Context.Value(defaultTTLKey{}).(time.Duration)
n := &storeregistry{
opts: options,
typ: _registryName,
expiry: exp,
opts: options,
typ: _registryName,
defaultTTL: defaultTTL,
}
n.store = natsjskv.NewStore(n.storeOptions(options)...)
return n
}

type storeregistry struct {
opts registry.Options
store store.Store
typ string
expiry time.Duration
lock sync.RWMutex
opts registry.Options
store store.Store
typ string
defaultTTL time.Duration
lock sync.RWMutex
}

// Init inits the registry
Expand All @@ -73,90 +76,114 @@ func (n *storeregistry) Options() registry.Options {
}

// Register adds a service to the registry
func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOption) error {
func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()

if s == nil {
return errors.New("wont store nil service")
}

var options registry.RegisterOptions
options.TTL = n.defaultTTL
for _, o := range opts {
o(&options)
}

b, err := json.Marshal(s)
if err != nil {
return err
}
return n.store.Write(&store.Record{
Key: s.Name,
Key: s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version,
Value: b,
Expiry: n.expiry,
Expiry: options.TTL,
})
}

// Deregister removes a service from the registry
// Deregister removes a service from the registry.
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()

return n.store.Delete(s.Name)
return n.store.Delete(s.Name + _serviceDelimiter + server.DefaultId + _serviceDelimiter + s.Version)
}

// GetService gets a specific service from the registry
func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()

recs, err := n.store.Read(s)
if err != nil {
return nil, err
}
svcs := make([]*registry.Service, 0, len(recs))
for _, rec := range recs {
var s registry.Service
if err := json.Unmarshal(rec.Value, &s); err != nil {
return nil, err
}
svcs = append(svcs, &s)
}
return svcs, nil
// avoid listing e.g. `webfinger` when requesting `web` by adding the delimiter to the service name
return n.listServices(store.ListPrefix(s + _serviceDelimiter))
}

// ListServices lists all registered services
func (n *storeregistry) ListServices(...registry.ListOption) ([]*registry.Service, error) {
return n.listServices()
}

// Watch allowes following the changes in the registry if it would be implemented
func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) {
return NewWatcher(n)
}

// String returns the name of the registry
func (n *storeregistry) String() string {
return n.typ
}

func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()

keys, err := n.store.List()
keys, err := n.store.List(opts...)
if err != nil {
return nil, err
}

var svcs []*registry.Service
versions := map[string]*registry.Service{}
for _, k := range keys {
s, err := n.GetService(k)
s, err := n.getNode(k)
if err != nil {
// TODO: continue ?
return nil, err
}
svcs = append(svcs, s...)

if versions[s.Version] == nil {
versions[s.Version] = s
} else {
versions[s.Version].Nodes = append(versions[s.Version].Nodes, s.Nodes...)
}
}
svcs := make([]*registry.Service, 0, len(versions))
for _, s := range versions {
svcs = append(svcs, s)
}
return svcs, nil
}

// Watch allowes following the changes in the registry if it would be implemented
func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) {
return nil, errors.New("watcher not implemented")
}

// String returns the name of the registry
func (n *storeregistry) String() string {
return n.typ
// getNode retrieves a node from the store. It returns a service to also keep track of the version.
func (n *storeregistry) getNode(s string) (*registry.Service, error) {
recs, err := n.store.Read(s)
if err != nil {
return nil, err
}
if len(recs) == 0 {
return nil, registry.ErrNotFound
}
var svc registry.Service
if err := json.Unmarshal(recs[0].Value, &svc); err != nil {
return nil, err
}
return &svc, nil
}

func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
storeoptions := []store.Option{
store.Database("service-registry"),
store.Table("service-registry"),
natsjskv.DefaultMemory(),
natsjskv.EncodeKeys(),
}

if defaultTTL, ok := opts.Context.Value(defaultTTLKey{}).(time.Duration); ok {
storeoptions = append(storeoptions, natsjskv.DefaultTTL(defaultTTL))
}

addr := []string{"127.0.0.1:9233"}
Expand Down
78 changes: 78 additions & 0 deletions ocis-pkg/natsjsregistry/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package natsjsregistry

import (
"encoding/json"
"errors"
"strings"

natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
)

// NatsWatcher is the watcher of the nats interface
type NatsWatcher interface {
WatchAll(bucket string, opts ...nats.WatchOpt) (<-chan *natsjskv.StoreUpdate, func() error, error)
}

// Watcher is used to keep track of changes in the registry
type Watcher struct {
updates <-chan *natsjskv.StoreUpdate
stop func() error
reg *storeregistry
}

// NewWatcher returns a new watcher
func NewWatcher(s *storeregistry) (*Watcher, error) {
w, ok := s.store.(NatsWatcher)
if !ok {
return nil, errors.New("store does not implement watcher interface")
}

watcher, stop, err := w.WatchAll("service-registry")
if err != nil {
return nil, err
}

return &Watcher{
updates: watcher,
stop: stop,
reg: s,
}, nil
}

// Next returns the next result. It is a blocking call
func (w *Watcher) Next() (*registry.Result, error) {
kve := <-w.updates
if kve == nil {
return nil, errors.New("watcher stopped")
}

var svc registry.Service
if kve.Value.Data == nil {
// fake a service
parts := strings.SplitN(kve.Value.Key, _serviceDelimiter, 3)
if len(parts) != 3 {
return nil, errors.New("invalid service key")
}
svc.Name = parts[0]
// ocis registers nodes with a - separator
svc.Nodes = []*registry.Node{{Id: parts[0] + "-" + parts[1]}}
svc.Version = parts[2]
} else {
if err := json.Unmarshal(kve.Value.Data, &svc); err != nil {
_ = w.stop()
return nil, err
}
}

return &registry.Result{
Service: &svc,
Action: kve.Action,
}, nil
}

// Stop stops the watcher
func (w *Watcher) Stop() {
_ = w.stop()
}
Loading