Skip to content

Commit

Permalink
Merge pull request #9654 from kobergj/FixRegistryTTL
Browse files Browse the repository at this point in the history
Fix issues with natsjskv registry
  • Loading branch information
kobergj authored Jul 19, 2024
2 parents 4f5728d + c421507 commit 7cf939a
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 28 deletions.
1 change: 1 addition & 0 deletions changelog/unreleased/fix-natsjskv-registry.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ Bugfix: Repair nats-js-kv registry

The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.

https://github.com/owncloud/ocis/pull/9654
https://github.com/owncloud/ocis/pull/9620
8 changes: 4 additions & 4 deletions ocis-pkg/natsjsregistry/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

type storeOptionsKey struct{}
type expiryKey struct{}
type defaultTTLKey struct{}

// StoreOptions sets the options for the underlying store
func StoreOptions(opts []store.Option) registry.Option {
Expand All @@ -21,12 +21,12 @@ func StoreOptions(opts []store.Option) registry.Option {
}
}

// ServiceExpiry allows setting an expiry time for service registrations
func ServiceExpiry(t time.Duration) registry.Option {
// DefaultTTL allows setting a default register TTL for services
func DefaultTTL(t time.Duration) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, expiryKey{}, t)
o.Context = context.WithValue(o.Context, defaultTTLKey{}, t)
}
}
32 changes: 21 additions & 11 deletions ocis-pkg/natsjsregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,22 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
for _, o := range opts {
o(&options)
}
exp, _ := options.Context.Value(expiryKey{}).(time.Duration)
defaultTTL, _ := options.Context.Value(defaultTTLKey{}).(time.Duration)
n := &storeregistry{
opts: options,
typ: _registryName,
expiry: exp,
opts: options,
typ: _registryName,
defaultTTL: defaultTTL,
}
n.store = natsjskv.NewStore(n.storeOptions(options)...)
return n
}

type storeregistry struct {
opts registry.Options
store store.Store
typ string
expiry time.Duration
lock sync.RWMutex
opts registry.Options
store store.Store
typ string
defaultTTL time.Duration
lock sync.RWMutex
}

// Init inits the registry
Expand All @@ -76,14 +76,20 @@ func (n *storeregistry) Options() registry.Options {
}

// Register adds a service to the registry
func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOption) error {
func (n *storeregistry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()

if s == nil {
return errors.New("wont store nil service")
}

var options registry.RegisterOptions
options.TTL = n.defaultTTL
for _, o := range opts {
o(&options)
}

unique := uuid.New().String()
if s.Metadata == nil {
s.Metadata = make(map[string]string)
Expand All @@ -97,7 +103,7 @@ func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOpti
return n.store.Write(&store.Record{
Key: s.Name + _serviceDelimiter + unique,
Value: b,
Expiry: n.expiry,
Expiry: options.TTL,
})
}

Expand Down Expand Up @@ -180,6 +186,10 @@ func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
natsjskv.EncodeKeys(),
}

if defaultTTL, ok := opts.Context.Value(defaultTTLKey{}).(time.Duration); ok {
storeoptions = append(storeoptions, natsjskv.DefaultTTL(defaultTTL))
}

addr := []string{"127.0.0.1:9233"}
if len(opts.Addrs) > 0 {
addr = opts.Addrs
Expand Down
33 changes: 33 additions & 0 deletions ocis-pkg/registry/expiry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package registry

import (
"os"
"time"
)

const (
_registryRegisterIntervalEnv = "EXPERIMENTAL_REGISTER_INTERVAL"
_registryRegisterTTLEnv = "EXPERIMENTAL_REGISTER_TTL"

// Note: _defaultRegisterInterval should always be lower than _defaultRegisterTTL
_defaultRegisterInterval = time.Second * 25
_defaultRegisterTTL = time.Second * 30
)

// GetRegisterInterval returns the register interval from the environment.
func GetRegisterInterval() time.Duration {
d, err := time.ParseDuration(os.Getenv(_registryRegisterIntervalEnv))
if err != nil {
return _defaultRegisterInterval
}
return d
}

// GetRegisterTTL returns the register TTL from the environment.
func GetRegisterTTL() time.Duration {
d, err := time.ParseDuration(os.Getenv(_registryRegisterTTLEnv))
if err != nil {
return _defaultRegisterTTL
}
return d
}
4 changes: 2 additions & 2 deletions ocis-pkg/registry/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ func RegisterService(ctx context.Context, service *mRegistry.Service, logger log

logger.Info().Msgf("registering external service %v@%v", node.Id, node.Address)

rOpts := []mRegistry.RegisterOption{mRegistry.RegisterTTL(time.Minute)}
rOpts := []mRegistry.RegisterOption{mRegistry.RegisterTTL(GetRegisterTTL())}
if err := registry.Register(service, rOpts...); err != nil {
logger.Fatal().Err(err).Msgf("Registration error for external service %v", service.Name)
}

t := time.NewTicker(time.Second * 30)
t := time.NewTicker(GetRegisterInterval())

go func() {
for {
Expand Down
14 changes: 9 additions & 5 deletions ocis-pkg/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ var (

// Config is the config for a registry
type Config struct {
Type string `mapstructure:"type"`
Addresses []string `mapstructure:"addresses"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
DisableCache bool `mapstructure:"disable_cache"`
Type string `mapstructure:"type"`
Addresses []string `mapstructure:"addresses"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
DisableCache bool `mapstructure:"disable_cache"`
RegisterTTL time.Duration `mapstructure:"register_ttl"`
}

// Option allows configuring the registry
Expand All @@ -62,6 +63,7 @@ func GetRegistry(opts ...Option) mRegistry.Registry {
case "natsjs", "nats-js", "nats-js-kv": // for backwards compatibility - we will stick with one of those
_reg = natsjsregistry.NewRegistry(
mRegistry.Addrs(cfg.Addresses...),
natsjsregistry.DefaultTTL(cfg.RegisterTTL),
)
case "memory":
_reg = memr.NewRegistry()
Expand Down Expand Up @@ -119,6 +121,8 @@ func getEnvs(opts ...Option) *Config {
cfg.Addresses = s
}

cfg.RegisterTTL = GetRegisterTTL()

for _, o := range opts {
o(cfg)
}
Expand Down
5 changes: 2 additions & 3 deletions ocis-pkg/service/grpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"crypto/tls"
"fmt"
"strings"
"time"

mgrpcs "github.com/go-micro/plugins/v4/server/grpc"
"github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus"
Expand Down Expand Up @@ -59,8 +58,8 @@ func NewServiceWithClient(client client.Client, opts ...Option) (Service, error)
micro.Version(sopts.Version),
micro.Context(sopts.Context),
micro.Registry(registry.GetRegistry()),
micro.RegisterTTL(time.Second * 30),
micro.RegisterInterval(time.Second * 10),
micro.RegisterTTL(registry.GetRegisterTTL()),
micro.RegisterInterval(registry.GetRegisterInterval()),
micro.WrapHandler(prometheus.NewHandlerWrapper()),
micro.WrapClient(mtracer.NewClientWrapper(
mtracer.WithTraceProvider(sopts.TraceProvider),
Expand Down
5 changes: 2 additions & 3 deletions ocis-pkg/service/http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"crypto/tls"
"fmt"
"strings"
"time"

"github.com/owncloud/ocis/v2/ocis-pkg/broker"
"github.com/owncloud/ocis/v2/ocis-pkg/registry"
Expand Down Expand Up @@ -65,8 +64,8 @@ func NewService(opts ...Option) (Service, error) {
micro.Context(sopts.Context),
micro.Flags(sopts.Flags...),
micro.Registry(registry.GetRegistry()),
micro.RegisterTTL(time.Second * 30),
micro.RegisterInterval(time.Second * 10),
micro.RegisterTTL(registry.GetRegisterTTL()),
micro.RegisterInterval(registry.GetRegisterInterval()),
micro.WrapClient(mtracer.NewClientWrapper(
mtracer.WithTraceProvider(sopts.TraceProvider),
)),
Expand Down

0 comments on commit 7cf939a

Please sign in to comment.