Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy: remove capnslog #11614

Merged
merged 2 commits into from
Feb 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and
- Deprecated `etcd_debugging_mvcc_txn_total` Prometheus metric. Use `etcd_mvcc_txn_total` instead.
- Deprecated `etcd_debugging_mvcc_range_total` Prometheus metric. Use `etcd_mvcc_range_total` instead.
- Master branch `/version` outputs `3.5.0-pre`, instead of `3.4.0+git`.
- Changed `proxy` package function signature to [support structured logger](https://github.com/etcd-io/etcd/pull/11614).
- Previously, `NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{})`, now `NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{})`.
- Previously, `Register(c *clientv3.Client, prefix string, addr string, ttl int)`, now `Register(lg *zap.Logger, c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{}`.
- Previously, `NewHandler(t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler`, now `NewHandler(lg *zap.Logger, t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler`.

### Metrics, Monitoring

Expand Down
2 changes: 1 addition & 1 deletion etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func startProxy(cfg *config) error {

return clientURLs
}
ph := httpproxy.NewHandler(pt, uf, time.Duration(cfg.cp.ProxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.cp.ProxyRefreshIntervalMs)*time.Millisecond)
ph := httpproxy.NewHandler(lg, pt, uf, time.Duration(cfg.cp.ProxyFailureWaitMs)*time.Millisecond, time.Duration(cfg.cp.ProxyRefreshIntervalMs)*time.Millisecond)
ph = embed.WrapCORS(cfg.ec.CORS, ph)

if cfg.isReadonlyProxy() {
Expand Down
4 changes: 2 additions & 2 deletions etcdmain/grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,9 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
kvp, _ := grpcproxy.NewKvProxy(client)
watchp, _ := grpcproxy.NewWatchProxy(client)
if grpcProxyResolverPrefix != "" {
grpcproxy.Register(client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
grpcproxy.Register(lg, client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
}
clusterp, _ := grpcproxy.NewClusterProxy(client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
clusterp, _ := grpcproxy.NewClusterProxy(lg, client, grpcProxyAdvertiseClientURL, grpcProxyResolverPrefix)
leasep, _ := grpcproxy.NewLeaseProxy(client)
mainp := grpcproxy.NewMaintenanceProxy(client)
authp := grpcproxy.NewAuthProxy(client)
Expand Down
4 changes: 3 additions & 1 deletion integration/cluster_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"go.etcd.io/etcd/clientv3/namespace"
"go.etcd.io/etcd/proxy/grpcproxy"
"go.etcd.io/etcd/proxy/grpcproxy/adapter"

"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -56,7 +58,7 @@ func toGRPC(c *clientv3.Client) grpcAPI {
wp, wpch := grpcproxy.NewWatchProxy(c)
lp, lpch := grpcproxy.NewLeaseProxy(c)
mp := grpcproxy.NewMaintenanceProxy(c)
clp, _ := grpcproxy.NewClusterProxy(c, "", "") // without registering proxy URLs
clp, _ := grpcproxy.NewClusterProxy(zap.NewExample(), c, "", "") // without registering proxy URLs
authp := grpcproxy.NewAuthProxy(c)
lockp := grpcproxy.NewLockProxy(c)
electp := grpcproxy.NewElectionProxy(c)
Expand Down
12 changes: 9 additions & 3 deletions proxy/grpcproxy/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"

"go.uber.org/zap"
"golang.org/x/time/rate"
gnaming "google.golang.org/grpc/naming"
)
Expand All @@ -34,6 +35,7 @@ import (
const resolveRetryRate = 1

type clusterProxy struct {
lg *zap.Logger
clus clientv3.Cluster
ctx context.Context
gr *naming.GRPCResolver
Expand All @@ -49,8 +51,12 @@ type clusterProxy struct {
// NewClusterProxy takes optional prefix to fetch grpc-proxy member endpoints.
// The returned channel is closed when there is grpc-proxy endpoint registered
// and the client's context is canceled so the 'register' loop returns.
func NewClusterProxy(c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
func NewClusterProxy(lg *zap.Logger, c *clientv3.Client, advaddr string, prefix string) (pb.ClusterServer, <-chan struct{}) {
if lg == nil {
lg = zap.NewNop()
}
cp := &clusterProxy{
lg: lg,
clus: c.Cluster,
ctx: c.Ctx(),
gr: &naming.GRPCResolver{Client: c},
Expand Down Expand Up @@ -78,7 +84,7 @@ func (cp *clusterProxy) resolve(prefix string) {
for rm.Wait(cp.ctx) == nil {
wa, err := cp.gr.Resolve(prefix)
if err != nil {
plog.Warningf("failed to resolve %q (%v)", prefix, err)
cp.lg.Warn("failed to resolve prefix", zap.String("prefix", prefix), zap.Error(err))
continue
}
cp.monitor(wa)
Expand All @@ -89,7 +95,7 @@ func (cp *clusterProxy) monitor(wa gnaming.Watcher) {
for cp.ctx.Err() == nil {
ups, err := wa.Next()
if err != nil {
plog.Warningf("clusterProxy watcher error (%v)", err)
cp.lg.Warn("clusterProxy watcher error", zap.Error(err))
if rpctypes.ErrorDesc(err) == naming.ErrWatcherClosed.Error() {
return
}
Expand Down
9 changes: 5 additions & 4 deletions proxy/grpcproxy/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"

"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand All @@ -34,7 +35,7 @@ func TestClusterProxyMemberList(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

cts := newClusterProxyServer([]string{clus.Members[0].GRPCAddr()}, t)
cts := newClusterProxyServer(zap.NewExample(), []string{clus.Members[0].GRPCAddr()}, t)
defer cts.close(t)

cfg := clientv3.Config{
Expand Down Expand Up @@ -88,7 +89,7 @@ func (cts *clusterproxyTestServer) close(t *testing.T) {
}
}

func newClusterProxyServer(endpoints []string, t *testing.T) *clusterproxyTestServer {
func newClusterProxyServer(lg *zap.Logger, endpoints []string, t *testing.T) *clusterproxyTestServer {
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
Expand All @@ -113,8 +114,8 @@ func newClusterProxyServer(endpoints []string, t *testing.T) *clusterproxyTestSe
cts.server.Serve(cts.l)
}()

Register(client, "test-prefix", cts.l.Addr().String(), 7)
cts.cp, cts.donec = NewClusterProxy(client, cts.l.Addr().String(), "test-prefix")
Register(lg, client, "test-prefix", cts.l.Addr().String(), 7)
cts.cp, cts.donec = NewClusterProxy(lg, client, cts.l.Addr().String(), "test-prefix")
cts.caddr = cts.l.Addr().String()
pb.RegisterClusterServer(cts.server, cts.cp)
close(servec)
Expand Down
19 changes: 0 additions & 19 deletions proxy/grpcproxy/logger.go

This file was deleted.

19 changes: 12 additions & 7 deletions proxy/grpcproxy/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/clientv3/naming"

"go.uber.org/zap"
"golang.org/x/time/rate"
gnaming "google.golang.org/grpc/naming"
)
Expand All @@ -32,17 +33,17 @@ const registerRetryRate = 1
// Register registers itself as a grpc-proxy server by writing prefixed-key
// with session of specified TTL (in seconds). The returned channel is closed
// when the client's context is canceled.
func Register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} {
func Register(lg *zap.Logger, c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} {
rm := rate.NewLimiter(rate.Limit(registerRetryRate), registerRetryRate)

donec := make(chan struct{})
go func() {
defer close(donec)

for rm.Wait(c.Ctx()) == nil {
ss, err := registerSession(c, prefix, addr, ttl)
ss, err := registerSession(lg, c, prefix, addr, ttl)
if err != nil {
plog.Warningf("failed to create a session %v", err)
lg.Warn("failed to create a session", zap.Error(err))
continue
}
select {
Expand All @@ -51,8 +52,8 @@ func Register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan st
return

case <-ss.Done():
plog.Warning("session expired; possible network partition or server restart")
plog.Warning("creating a new session to rejoin")
lg.Warn("session expired; possible network partition or server restart")
lg.Warn("creating a new session to rejoin")
continue
}
}
Expand All @@ -61,7 +62,7 @@ func Register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan st
return donec
}

func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (*concurrency.Session, error) {
func registerSession(lg *zap.Logger, c *clientv3.Client, prefix string, addr string, ttl int) (*concurrency.Session, error) {
ss, err := concurrency.NewSession(c, concurrency.WithTTL(ttl))
if err != nil {
return nil, err
Expand All @@ -72,7 +73,11 @@ func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (*
return nil, err
}

plog.Infof("registered %q with %d-second lease", addr, ttl)
lg.Info(
"registered session with lease",
zap.String("addr", addr),
zap.Int("lease-ttl", ttl),
)
return ss, nil
}

Expand Down
3 changes: 2 additions & 1 deletion proxy/grpcproxy/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"

"go.uber.org/zap"
gnaming "google.golang.org/grpc/naming"
)

Expand All @@ -44,7 +45,7 @@ func TestRegister(t *testing.T) {
t.Fatalf("len(ups) expected 0, got %d (%v)", len(ups), ups)
}

donec := Register(cli, testPrefix, paddr, 5)
donec := Register(zap.NewExample(), cli, testPrefix, paddr, 5)

ups, err = wa.Next()
if err != nil {
Expand Down
37 changes: 29 additions & 8 deletions proxy/httpproxy/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"net/url"
"sync"
"time"

"go.uber.org/zap"
)

// defaultRefreshInterval is the default proxyRefreshIntervalMs value
Expand All @@ -31,8 +33,12 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
if lg == nil {
lg = zap.NewNop()
}
d := &director{
lg: lg,
uf: urlsFunc,
failureWait: failureWait,
}
Expand All @@ -56,7 +62,7 @@ func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterv
for _, e := range es {
sl = append(sl, e.URL.String())
}
plog.Infof("endpoints found %q", sl)
lg.Info("endpoints found", zap.Strings("endpoints", sl))
})
}
time.Sleep(ri)
Expand All @@ -68,6 +74,7 @@ func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterv

type director struct {
sync.Mutex
lg *zap.Logger
ep []*endpoint
uf GetProxyURLs
failureWait time.Duration
Expand All @@ -81,10 +88,10 @@ func (d *director) refresh() {
for _, u := range urls {
uu, err := url.Parse(u)
if err != nil {
plog.Printf("upstream URL invalid: %v", err)
d.lg.Info("upstream URL invalid", zap.Error(err))
continue
}
endpoints = append(endpoints, newEndpoint(*uu, d.failureWait))
endpoints = append(endpoints, newEndpoint(d.lg, *uu, d.failureWait))
}

// shuffle array to avoid connections being "stuck" to a single endpoint
Expand All @@ -109,8 +116,9 @@ func (d *director) endpoints() []*endpoint {
return filtered
}

func newEndpoint(u url.URL, failureWait time.Duration) *endpoint {
func newEndpoint(lg *zap.Logger, u url.URL, failureWait time.Duration) *endpoint {
ep := endpoint{
lg: lg,
URL: u,
Available: true,
failFunc: timedUnavailabilityFunc(failureWait),
Expand All @@ -122,6 +130,7 @@ func newEndpoint(u url.URL, failureWait time.Duration) *endpoint {
type endpoint struct {
sync.Mutex

lg *zap.Logger
URL url.URL
Available bool

Expand All @@ -138,10 +147,17 @@ func (ep *endpoint) Failed() {
ep.Available = false
ep.Unlock()

plog.Printf("marked endpoint %s unavailable", ep.URL.String())
if ep.lg != nil {
ep.lg.Info("marked endpoint unavailable", zap.String("endpoint", ep.URL.String()))
}

if ep.failFunc == nil {
plog.Printf("no failFunc defined, endpoint %s will be unavailable forever.", ep.URL.String())
if ep.lg != nil {
ep.lg.Info(
"no failFunc defined, endpoint will be unavailable forever",
zap.String("endpoint", ep.URL.String()),
)
}
return
}

Expand All @@ -152,7 +168,12 @@ func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) {
return func(ep *endpoint) {
time.AfterFunc(wait, func() {
ep.Available = true
plog.Printf("marked endpoint %s available, to retest connectivity", ep.URL.String())
if ep.lg != nil {
ep.lg.Info(
"marked endpoint available, to retest connectivity",
zap.String("endpoint", ep.URL.String()),
)
}
})
}
}
4 changes: 3 additions & 1 deletion proxy/httpproxy/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"sort"
"testing"
"time"

"go.uber.org/zap"
)

func TestNewDirectorScheme(t *testing.T) {
Expand Down Expand Up @@ -53,7 +55,7 @@ func TestNewDirectorScheme(t *testing.T) {
uf := func() []string {
return tt.urls
}
got := newDirector(uf, time.Minute, time.Minute)
got := newDirector(zap.NewExample(), uf, time.Minute, time.Minute)

var gep []string
for _, ep := range got.ep {
Expand Down
11 changes: 8 additions & 3 deletions proxy/httpproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strings"
"time"

"go.uber.org/zap"
"golang.org/x/net/http2"
)

Expand All @@ -43,17 +44,21 @@ type GetProxyURLs func() []string
// NewHandler creates a new HTTP handler, listening on the given transport,
// which will proxy requests to an etcd cluster.
// The handler will periodically update its view of the cluster.
func NewHandler(t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler {
func NewHandler(lg *zap.Logger, t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler {
if lg == nil {
lg = zap.NewNop()
}
if t.TLSClientConfig != nil {
// Enable http2, see Issue 5033.
err := http2.ConfigureTransport(t)
if err != nil {
plog.Infof("Error enabling Transport HTTP/2 support: %v", err)
lg.Info("Error enabling Transport HTTP/2 support", zap.Error(err))
}
}

p := &reverseProxy{
director: newDirector(urlsFunc, failureWait, refreshInterval),
lg: lg,
director: newDirector(lg, urlsFunc, failureWait, refreshInterval),
transport: t,
}

Expand Down
Loading