From 061dffff9b20062392d4fc8ac32de5108240debc Mon Sep 17 00:00:00 2001 From: John McBride Date: Fri, 17 Apr 2020 11:29:13 -0600 Subject: [PATCH] Workaround for broken etcd gomod import --- go.mod | 5 +- go.sum | 6 +- vendor/go.etcd.io/etcd/auth/jwt.go | 6 +- vendor/go.etcd.io/etcd/auth/store.go | 7 +- .../balancer/resolver/endpoint/endpoint.go | 23 ++- vendor/go.etcd.io/etcd/clientv3/client.go | 61 +++--- .../etcd/clientv3/credentials/credentials.go | 64 +++--- vendor/go.etcd.io/etcd/clientv3/ctx.go | 64 ++++++ .../etcd/clientv3/retry_interceptor.go | 15 +- vendor/go.etcd.io/etcd/embed/config.go | 4 +- .../go.etcd.io/etcd/embed/config_logging.go | 15 +- vendor/go.etcd.io/etcd/embed/serve.go | 2 +- .../etcd/etcdserver/api/etcdhttp/metrics.go | 7 + .../etcd/etcdserver/api/etcdhttp/peer.go | 15 +- .../etcd/etcdserver/api/membership/cluster.go | 10 +- .../etcd/etcdserver/api/rafthttp/stream.go | 1 + .../etcd/etcdserver/api/snap/snapshotter.go | 21 ++ .../etcd/etcdserver/api/v3rpc/interceptor.go | 24 ++- .../etcd/etcdserver/api/v3rpc/metrics.go | 10 + .../etcd/etcdserver/api/v3rpc/rpctypes/md.go | 2 + vendor/go.etcd.io/etcd/etcdserver/apply.go | 78 ++++--- .../go.etcd.io/etcd/etcdserver/apply_auth.go | 14 +- vendor/go.etcd.io/etcd/etcdserver/corrupt.go | 194 ++++++++++++++---- vendor/go.etcd.io/etcd/etcdserver/metrics.go | 2 +- vendor/go.etcd.io/etcd/etcdserver/server.go | 21 +- .../go.etcd.io/etcd/etcdserver/v3_server.go | 51 ++++- vendor/go.etcd.io/etcd/lease/lessor.go | 9 +- .../go.etcd.io/etcd/mvcc/backend/backend.go | 29 ++- vendor/go.etcd.io/etcd/mvcc/kv.go | 7 +- vendor/go.etcd.io/etcd/mvcc/kv_view.go | 15 +- vendor/go.etcd.io/etcd/mvcc/kvstore.go | 65 +++--- .../etcd/mvcc/kvstore_compaction.go | 1 + vendor/go.etcd.io/etcd/mvcc/kvstore_txn.go | 18 +- vendor/go.etcd.io/etcd/mvcc/metrics.go | 43 ++++ vendor/go.etcd.io/etcd/mvcc/metrics_txn.go | 8 +- .../go.etcd.io/etcd/mvcc/watchable_store.go | 3 +- .../etcd/mvcc/watchable_store_txn.go | 9 +- 
vendor/go.etcd.io/etcd/pkg/fileutil/purge.go | 14 +- .../go.etcd.io/etcd/pkg/ioutil/pagewriter.go | 17 +- vendor/go.etcd.io/etcd/pkg/logutil/zap.go | 10 +- vendor/go.etcd.io/etcd/pkg/traceutil/trace.go | 172 ++++++++++++++++ .../etcd/raft/confchange/confchange.go | 12 +- vendor/go.etcd.io/etcd/raft/log_unstable.go | 6 +- vendor/go.etcd.io/etcd/raft/raft.go | 50 ++++- vendor/go.etcd.io/etcd/raft/util.go | 4 +- vendor/go.etcd.io/etcd/version/version.go | 2 +- vendor/go.etcd.io/etcd/wal/encoder.go | 12 +- vendor/go.etcd.io/etcd/wal/metrics.go | 8 + vendor/go.etcd.io/etcd/wal/repair.go | 4 +- vendor/go.etcd.io/etcd/wal/wal.go | 5 + vendor/modules.txt | 3 +- 51 files changed, 974 insertions(+), 274 deletions(-) create mode 100644 vendor/go.etcd.io/etcd/clientv3/ctx.go create mode 100644 vendor/go.etcd.io/etcd/pkg/traceutil/trace.go diff --git a/go.mod b/go.mod index 55e0f61057f2..5c925cf393e8 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/uber/jaeger-client-go v2.20.1+incompatible github.com/ugorji/go v1.1.7 // indirect github.com/weaveworks/common v0.0.0-20200310113808-2708ba4e60a4 - go.etcd.io/etcd v0.0.0-20190815204525-8f85f0dc2607 // indirect + go.etcd.io/etcd v0.0.0-20200401174654-e694b7bb0875 // indirect golang.org/x/net v0.0.0-20200226121028-0de0cce0169b google.golang.org/grpc v1.25.1 gopkg.in/alecthomas/kingpin.v2 v2.2.6 @@ -65,9 +65,6 @@ replace github.com/hpcloud/tail => github.com/grafana/tail v0.0.0-20191024143944 // Override reference that causes an error from Go proxy - see https://github.com/golang/go/issues/33558 replace k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab -// Override reference causing proxy error. 
Otherwise it attempts to download https://proxy.golang.org/golang.org/x/net/@v/v0.0.0-20190813000000-74dc4d7220e7.info -replace golang.org/x/net v0.0.0-20190813000000-74dc4d7220e7 => golang.org/x/net v0.0.0-20190923162816-aa69164e4478 - replace github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible replace github.com/Azure/go-autorest => github.com/Azure/go-autorest v13.3.0+incompatible diff --git a/go.sum b/go.sum index c389aa28bedb..419cf1e48c19 100644 --- a/go.sum +++ b/go.sum @@ -808,8 +808,8 @@ go.elastic.co/fastjson v1.0.0/go.mod h1:PmeUOMMtLHQr9ZS9J9owrAVg0FkaZDRZJEFTTGHt go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20190709142735-eb7dd97135a5/go.mod h1:N0RPWo9FXJYZQI4BTkDtQylrstIigYHeR18ONnyTufk= -go.etcd.io/etcd v0.0.0-20190815204525-8f85f0dc2607 h1:TA51XPJi/dOGnzp82lfN1wh8ijEz3BZEiKphiurSzLU= -go.etcd.io/etcd v0.0.0-20190815204525-8f85f0dc2607/go.mod h1:tQYIqsNuGzkF9ncfEtoEX0qkoBhzw6ih5N1xcdGnvek= +go.etcd.io/etcd v0.0.0-20200401174654-e694b7bb0875 h1:61WXaq6CI2RsDa1qZEWkW4KruLdtp0EVLwrFRfsd8Qo= +go.etcd.io/etcd v0.0.0-20200401174654-e694b7bb0875/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.0.4/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.0/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= @@ -898,6 +898,7 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= 
+golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191002035440-2ec189313ef0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -1055,6 +1056,7 @@ google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ij google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA= google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/vendor/go.etcd.io/etcd/auth/jwt.go b/vendor/go.etcd.io/etcd/auth/jwt.go index c22ef898a14d..52cafe4aafb1 100644 --- a/vendor/go.etcd.io/etcd/auth/jwt.go +++ b/vendor/go.etcd.io/etcd/auth/jwt.go @@ -105,7 +105,7 @@ func (t *tokenJWT) assign(ctx context.Context, username string, revision uint64) token, err := tk.SignedString(t.key) if err != nil { if t.lg != nil { - t.lg.Warn( + t.lg.Debug( "failed to sign a JWT token", zap.String("user-name", username), zap.Uint64("revision", revision), @@ -118,7 +118,7 @@ func (t *tokenJWT) assign(ctx context.Context, username string, revision uint64) } if t.lg != nil { - t.lg.Info( + t.lg.Debug( "created/assigned a new JWT token", zap.String("user-name", username), zap.Uint64("revision", revision), @@ -136,7 +136,7 @@ func newTokenProviderJWT(lg *zap.Logger, optMap map[string]string) (*tokenJWT, e err = 
opts.ParseWithDefaults(optMap) if err != nil { if lg != nil { - lg.Warn("problem loading JWT options", zap.Error(err)) + lg.Error("problem loading JWT options", zap.Error(err)) } else { plog.Errorf("problem loading JWT options: %s", err) } diff --git a/vendor/go.etcd.io/etcd/auth/store.go b/vendor/go.etcd.io/etcd/auth/store.go index a35e84178a83..3c1536229345 100644 --- a/vendor/go.etcd.io/etcd/auth/store.go +++ b/vendor/go.etcd.io/etcd/auth/store.go @@ -306,7 +306,7 @@ func (as *authStore) Authenticate(ctx context.Context, username, password string return nil, ErrAuthFailed } - if user.Options.NoPassword { + if user.Options != nil && user.Options.NoPassword { return nil, ErrAuthFailed } @@ -344,7 +344,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { return 0, ErrAuthFailed } - if user.Options.NoPassword { + if user.Options != nil && user.Options.NoPassword { return 0, ErrAuthFailed } @@ -388,7 +388,8 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, var hashed []byte var err error - if !r.Options.NoPassword { + noPassword := r.Options != nil && r.Options.NoPassword + if !noPassword { hashed, err = bcrypt.GenerateFromPassword([]byte(r.Password), as.bcryptCost) if err != nil { if as.lg != nil { diff --git a/vendor/go.etcd.io/etcd/clientv3/balancer/resolver/endpoint/endpoint.go b/vendor/go.etcd.io/etcd/clientv3/balancer/resolver/endpoint/endpoint.go index 1f32039e37b2..864b5df6426f 100644 --- a/vendor/go.etcd.io/etcd/clientv3/balancer/resolver/endpoint/endpoint.go +++ b/vendor/go.etcd.io/etcd/clientv3/balancer/resolver/endpoint/endpoint.go @@ -16,7 +16,9 @@ package endpoint import ( + "context" "fmt" + "net" "net/url" "strings" "sync" @@ -228,13 +230,18 @@ func ParseTarget(target string) (string, string, error) { return parts[0], parts[1], nil } -// ParseHostPort splits a ":" string into the host and port parts. -// The port part is optional. 
-func ParseHostPort(hostPort string) (host string, port string) { - parts := strings.SplitN(hostPort, ":", 2) - host = parts[0] - if len(parts) > 1 { - port = parts[1] +// Dialer dials a endpoint using net.Dialer. +// Context cancelation and timeout are supported. +func Dialer(ctx context.Context, dialEp string) (net.Conn, error) { + proto, host, _ := ParseEndpoint(dialEp) + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + dialer := &net.Dialer{} + if deadline, ok := ctx.Deadline(); ok { + dialer.Deadline = deadline } - return host, port + return dialer.DialContext(ctx, proto, host) } diff --git a/vendor/go.etcd.io/etcd/clientv3/client.go b/vendor/go.etcd.io/etcd/clientv3/client.go index bc66a57c4097..a35ec679a029 100644 --- a/vendor/go.etcd.io/etcd/clientv3/client.go +++ b/vendor/go.etcd.io/etcd/clientv3/client.go @@ -37,7 +37,6 @@ import ( "google.golang.org/grpc/codes" grpccredentials "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -129,8 +128,12 @@ func NewFromURLs(urls []string) (*Client, error) { // Close shuts down the client's etcd connections. func (c *Client) Close() error { c.cancel() - c.Watcher.Close() - c.Lease.Close() + if c.Watcher != nil { + c.Watcher.Close() + } + if c.Lease != nil { + c.Lease.Close() + } if c.resolverGroup != nil { c.resolverGroup.Close() } @@ -226,24 +229,17 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts } opts = append(opts, dopts...) - // Provide a net dialer that supports cancelation and timeout. 
- f := func(dialEp string, t time.Duration) (net.Conn, error) { - proto, host, _ := endpoint.ParseEndpoint(dialEp) - select { - case <-c.ctx.Done(): - return nil, c.ctx.Err() - default: - } - dialer := &net.Dialer{Timeout: t} - return dialer.DialContext(c.ctx, proto, host) - } - opts = append(opts, grpc.WithDialer(f)) - + dialer := endpoint.Dialer if creds != nil { opts = append(opts, grpc.WithTransportCredentials(creds)) + // gRPC load balancer workaround. See credentials.transportCredential for details. + if credsDialer, ok := creds.(TransportCredentialsWithDialer); ok { + dialer = credsDialer.Dialer + } } else { opts = append(opts, grpc.WithInsecure()) } + opts = append(opts, grpc.WithContextDialer(dialer)) // Interceptor retry and backoff. // TODO: Replace all of clientv3/retry.go with interceptor based retry, or with @@ -262,7 +258,10 @@ func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts // Dial connects to a single endpoint using the client's config. func (c *Client) Dial(ep string) (*grpc.ClientConn, error) { - creds := c.directDialCreds(ep) + creds, err := c.directDialCreds(ep) + if err != nil { + return nil, err + } // Use the grpc passthrough resolver to directly dial a single endpoint. 
// This resolver passes through the 'unix' and 'unixs' endpoints schemes used // by etcd without modification, allowing us to directly dial endpoints and @@ -365,8 +364,8 @@ func (c *Client) dial(target string, creds grpccredentials.TransportCredentials, return conn, nil } -func (c *Client) directDialCreds(ep string) grpccredentials.TransportCredentials { - _, hostPort, scheme := endpoint.ParseEndpoint(ep) +func (c *Client) directDialCreds(ep string) (grpccredentials.TransportCredentials, error) { + _, host, scheme := endpoint.ParseEndpoint(ep) creds := c.creds if len(scheme) != 0 { creds = c.processCreds(scheme) @@ -375,12 +374,17 @@ func (c *Client) directDialCreds(ep string) grpccredentials.TransportCredentials // Set the server name must to the endpoint hostname without port since grpc // otherwise attempts to check if x509 cert is valid for the full endpoint // including the scheme and port, which fails. - host, _ := endpoint.ParseHostPort(hostPort) - clone.OverrideServerName(host) + overrideServerName, _, err := net.SplitHostPort(host) + if err != nil { + // Either the host didn't have a port or the host could not be parsed. Either way, continue with the + // original host string. + overrideServerName = host + } + clone.OverrideServerName(overrideServerName) creds = clone } } - return creds + return creds, nil } func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCredentials { @@ -392,13 +396,6 @@ func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCrede return creds } -// WithRequireLeader requires client requests to only succeed -// when the cluster has a leader. 
-func WithRequireLeader(ctx context.Context) context.Context { - md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - return metadata.NewOutgoingContext(ctx, md) -} - func newClient(cfg *Config) (*Client, error) { if cfg == nil { cfg = &Config{} @@ -659,3 +656,9 @@ func IsConnCanceled(err error) bool { // <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")' return strings.Contains(err.Error(), "grpc: the client connection is closing") } + +// TransportCredentialsWithDialer is for a gRPC load balancer workaround. See credentials.transportCredential for details. +type TransportCredentialsWithDialer interface { + grpccredentials.TransportCredentials + Dialer(ctx context.Context, dialEp string) (net.Conn, error) +} diff --git a/vendor/go.etcd.io/etcd/clientv3/credentials/credentials.go b/vendor/go.etcd.io/etcd/clientv3/credentials/credentials.go index e6fd75cc3f1d..63389c08bffd 100644 --- a/vendor/go.etcd.io/etcd/clientv3/credentials/credentials.go +++ b/vendor/go.etcd.io/etcd/clientv3/credentials/credentials.go @@ -22,6 +22,7 @@ import ( "net" "sync" + "go.etcd.io/etcd/clientv3/balancer/resolver/endpoint" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" grpccredentials "google.golang.org/grpc/credentials" ) @@ -65,38 +66,37 @@ func (b *bundle) NewWithMode(mode string) (grpccredentials.Bundle, error) { } // transportCredential implements "grpccredentials.TransportCredentials" interface. +// transportCredential wraps TransportCredentials to track which +// addresses are dialed for which endpoints, and then sets the authority when checking the endpoint's cert to the +// hostname or IP of the dialed endpoint. +// This is a workaround of a gRPC load balancer issue. gRPC uses the dialed target's service name as the authority when +// checking all endpoint certs, which does not work for etcd servers using their hostname or IP as the Subject Alternative Name +// in their TLS certs. 
+// To enable, include both WithTransportCredentials(creds) and WithContextDialer(creds.Dialer) +// when dialing. type transportCredential struct { gtc grpccredentials.TransportCredentials + mu sync.Mutex + // addrToEndpoint maps from the connection addresses that are dialed to the hostname or IP of the + // endpoint provided to the dialer when dialing + addrToEndpoint map[string]string } func newTransportCredential(cfg *tls.Config) *transportCredential { return &transportCredential{ - gtc: grpccredentials.NewTLS(cfg), + gtc: grpccredentials.NewTLS(cfg), + addrToEndpoint: map[string]string{}, } } func (tc *transportCredential) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, grpccredentials.AuthInfo, error) { - // Only overwrite when authority is an IP address! - // Let's say, a server runs SRV records on "etcd.local" that resolves - // to "m1.etcd.local", and its SAN field also includes "m1.etcd.local". - // But what if SAN does not include its resolved IP address (e.g. 127.0.0.1)? - // Then, the server should only authenticate using its DNS hostname "m1.etcd.local", - // instead of overwriting it with its IP address. - // And we do not overwrite "localhost" either. Only overwrite IP addresses! - if isIP(authority) { - target := rawConn.RemoteAddr().String() - if authority != target { - // When user dials with "grpc.WithDialer", "grpc.DialContext" "cc.parsedTarget" - // update only happens once. This is problematic, because when TLS is enabled, - // retries happen through "grpc.WithDialer" with static "cc.parsedTarget" from - // the initial dial call. - // If the server authenticates by IP addresses, we want to set a new endpoint as - // a new authority. 
Otherwise - // "transport: authentication handshake failed: x509: certificate is valid for 127.0.0.1, 192.168.121.180, not 192.168.223.156" - // when the new dial target is "192.168.121.180" whose certificate host name is also "192.168.121.180" - // but client tries to authenticate with previously set "cc.parsedTarget" field "192.168.223.156" - authority = target - } + // Set the authority when checking the endpoint's cert to the hostname or IP of the dialed endpoint + tc.mu.Lock() + dialEp, ok := tc.addrToEndpoint[rawConn.RemoteAddr().String()] + tc.mu.Unlock() + if ok { + _, host, _ := endpoint.ParseEndpoint(dialEp) + authority = host } return tc.gtc.ClientHandshake(ctx, authority, rawConn) } @@ -115,8 +115,15 @@ func (tc *transportCredential) Info() grpccredentials.ProtocolInfo { } func (tc *transportCredential) Clone() grpccredentials.TransportCredentials { + copy := map[string]string{} + tc.mu.Lock() + for k, v := range tc.addrToEndpoint { + copy[k] = v + } + tc.mu.Unlock() return &transportCredential{ - gtc: tc.gtc.Clone(), + gtc: tc.gtc.Clone(), + addrToEndpoint: copy, } } @@ -124,6 +131,17 @@ func (tc *transportCredential) OverrideServerName(serverNameOverride string) err return tc.gtc.OverrideServerName(serverNameOverride) } +func (tc *transportCredential) Dialer(ctx context.Context, dialEp string) (net.Conn, error) { + // Keep track of which addresses are dialed for which endpoints + conn, err := endpoint.Dialer(ctx, dialEp) + if conn != nil { + tc.mu.Lock() + tc.addrToEndpoint[conn.RemoteAddr().String()] = dialEp + tc.mu.Unlock() + } + return conn, err +} + // perRPCCredential implements "grpccredentials.PerRPCCredentials" interface. 
type perRPCCredential struct { authToken string diff --git a/vendor/go.etcd.io/etcd/clientv3/ctx.go b/vendor/go.etcd.io/etcd/clientv3/ctx.go new file mode 100644 index 000000000000..542219837bbc --- /dev/null +++ b/vendor/go.etcd.io/etcd/clientv3/ctx.go @@ -0,0 +1,64 @@ +// Copyright 2020 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "context" + "strings" + + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/version" + "google.golang.org/grpc/metadata" +) + +// WithRequireLeader requires client requests to only succeed +// when the cluster has a leader. 
+func WithRequireLeader(ctx context.Context) context.Context { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { // no outgoing metadata ctx key, create one + md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) + return metadata.NewOutgoingContext(ctx, md) + } + copied := md.Copy() // avoid racey updates + // overwrite/add 'hasleader' key/value + metadataSet(copied, rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) + return metadata.NewOutgoingContext(ctx, copied) +} + +// embeds client version +func withVersion(ctx context.Context) context.Context { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { // no outgoing metadata ctx key, create one + md = metadata.Pairs(rpctypes.MetadataClientAPIVersionKey, version.APIVersion) + return metadata.NewOutgoingContext(ctx, md) + } + copied := md.Copy() // avoid racey updates + // overwrite/add version key/value + metadataSet(copied, rpctypes.MetadataClientAPIVersionKey, version.APIVersion) + return metadata.NewOutgoingContext(ctx, copied) +} + +func metadataGet(md metadata.MD, k string) []string { + k = strings.ToLower(k) + return md[k] +} + +func metadataSet(md metadata.MD, k string, vals ...string) { + if len(vals) == 0 { + return + } + k = strings.ToLower(k) + md[k] = vals +} diff --git a/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go b/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go index 080490ae2929..2c266e55bec0 100644 --- a/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go +++ b/vendor/go.etcd.io/etcd/clientv3/retry_interceptor.go @@ -38,6 +38,7 @@ import ( func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor { intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + ctx = withVersion(ctx) grpcOpts, retryOpts := 
filterCallOptions(opts) callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) // short circuit for simplicity, and avoiding allocations. @@ -103,6 +104,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor { intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + ctx = withVersion(ctx) grpcOpts, retryOpts := filterCallOptions(opts) callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) // short circuit for simplicity, and avoiding allocations. @@ -113,10 +115,9 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp return nil, status.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()") } newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...) - logger.Warn("retry stream intercept", zap.Error(err)) if err != nil { - // TODO(mwitkow): Maybe dial and transport errors should be retriable? - return nil, err + logger.Error("streamer failed to create ClientStream", zap.Error(err)) + return nil, err // TODO(mwitkow): Maybe dial and transport errors should be retriable? } retryingStreamer := &serverStreamingRetryingStream{ client: c, @@ -185,6 +186,7 @@ func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error { if !attemptRetry { return lastErr // success or hard failure } + // We start off from attempt 1, because zeroth was already made on normal SendMsg(). 
for attempt := uint(1); attempt < s.callOpts.max; attempt++ { if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil { @@ -192,12 +194,13 @@ func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error { } newStream, err := s.reestablishStreamAndResendBuffer(s.ctx) if err != nil { - // TODO(mwitkow): Maybe dial and transport errors should be retriable? - return err + s.client.lg.Error("failed reestablishStreamAndResendBuffer", zap.Error(err)) + return err // TODO(mwitkow): Maybe dial and transport errors should be retriable? } s.setStream(newStream) + + s.client.lg.Warn("retrying RecvMsg", zap.Error(lastErr)) attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m) - //fmt.Printf("Received message and indicate: %v %v\n", attemptRetry, lastErr) if !attemptRetry { return lastErr } diff --git a/vendor/go.etcd.io/etcd/embed/config.go b/vendor/go.etcd.io/etcd/embed/config.go index 278316b518fb..2f64d927f2af 100644 --- a/vendor/go.etcd.io/etcd/embed/config.go +++ b/vendor/go.etcd.io/etcd/embed/config.go @@ -303,8 +303,8 @@ type Config struct { // It can be multiple when "Logger" is zap. LogOutputs []string `json:"log-outputs"` - // zapLoggerBuilder is used to build the zap logger. - zapLoggerBuilder func(*Config) error + // ZapLoggerBuilder is used to build the zap logger. + ZapLoggerBuilder func(*Config) error // logger logs server-side operations. The default is nil, // and "setupLogging" must be called before starting server. 
diff --git a/vendor/go.etcd.io/etcd/embed/config_logging.go b/vendor/go.etcd.io/etcd/embed/config_logging.go index e617dfe82dc8..e42103cb18c2 100644 --- a/vendor/go.etcd.io/etcd/embed/config_logging.go +++ b/vendor/go.etcd.io/etcd/embed/config_logging.go @@ -170,7 +170,10 @@ func (cfg *Config) setupLogging() error { } if !isJournal { - copied := logutil.AddOutputPaths(logutil.DefaultZapLoggerConfig, outputPaths, errOutputPaths) + copied := logutil.DefaultZapLoggerConfig + copied.OutputPaths = outputPaths + copied.ErrorOutputPaths = errOutputPaths + copied = logutil.MergeOutputPaths(copied) copied.Level = zap.NewAtomicLevelAt(logutil.ConvertToZapLevel(cfg.LogLevel)) if cfg.Debug || cfg.LogLevel == "debug" { // enable tracing even when "--debug --log-level info" @@ -178,8 +181,8 @@ func (cfg *Config) setupLogging() error { // TODO: remove "Debug" check in v3.5 grpc.EnableTracing = true } - if cfg.zapLoggerBuilder == nil { - cfg.zapLoggerBuilder = func(c *Config) error { + if cfg.ZapLoggerBuilder == nil { + cfg.ZapLoggerBuilder = func(c *Config) error { var err error c.logger, err = copied.Build() if err != nil { @@ -232,8 +235,8 @@ func (cfg *Config) setupLogging() error { syncer, lvl, ) - if cfg.zapLoggerBuilder == nil { - cfg.zapLoggerBuilder = func(c *Config) error { + if cfg.ZapLoggerBuilder == nil { + cfg.ZapLoggerBuilder = func(c *Config) error { c.logger = zap.New(cr, zap.AddCaller(), zap.ErrorOutput(syncer)) c.loggerMu.Lock() defer c.loggerMu.Unlock() @@ -249,7 +252,7 @@ func (cfg *Config) setupLogging() error { } } - err := cfg.zapLoggerBuilder(cfg) + err := cfg.ZapLoggerBuilder(cfg) if err != nil { return err } diff --git a/vendor/go.etcd.io/etcd/embed/serve.go b/vendor/go.etcd.io/etcd/embed/serve.go index 62829ee5975a..a3b20c46c38f 100644 --- a/vendor/go.etcd.io/etcd/embed/serve.go +++ b/vendor/go.etcd.io/etcd/embed/serve.go @@ -189,7 +189,7 @@ func (sctx *serveCtx) serve( sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} if sctx.lg != nil { 
sctx.lg.Info( - "serving client traffic insecurely", + "serving client traffic securely", zap.String("address", sctx.l.Addr().String()), ) } else { diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/metrics.go b/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/metrics.go index f455e40a7406..07ec8ec3bd96 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/metrics.go +++ b/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/metrics.go @@ -50,6 +50,7 @@ func NewHealthHandler(hfunc func() Health) http.HandlerFunc { if r.Method != http.MethodGet { w.Header().Set("Allow", http.MethodGet) http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + plog.Warningf("/health error (status code %d)", http.StatusMethodNotAllowed) return } h := hfunc() @@ -97,11 +98,15 @@ func checkHealth(srv etcdserver.ServerV2) Health { as := srv.Alarms() if len(as) > 0 { h.Health = "false" + for _, v := range as { + plog.Warningf("/health error due to an alarm %s", v.String()) + } } if h.Health == "true" { if uint64(srv.Leader()) == raft.None { h.Health = "false" + plog.Warningf("/health error; no leader (status code %d)", http.StatusServiceUnavailable) } } @@ -111,11 +116,13 @@ func checkHealth(srv etcdserver.ServerV2) Health { cancel() if err != nil { h.Health = "false" + plog.Warningf("/health error; QGET failed %v (status code %d)", err, http.StatusServiceUnavailable) } } if h.Health == "true" { healthSuccess.Inc() + plog.Infof("/health OK (status code %d)", http.StatusOK) } else { healthFailed.Inc() } diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/peer.go b/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/peer.go index 6c61bf5d5104..2d13741c68bd 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/peer.go +++ b/vendor/go.etcd.io/etcd/etcdserver/api/etcdhttp/peer.go @@ -37,11 +37,17 @@ const ( ) // NewPeerHandler generates an http.Handler to handle etcd peer requests. 
-func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeer) http.Handler { - return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler()) +func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler { + return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler()) } -func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handler, leaseHandler http.Handler) http.Handler { +func newPeerHandler( + lg *zap.Logger, + s etcdserver.Server, + raftHandler http.Handler, + leaseHandler http.Handler, + hashKVHandler http.Handler, +) http.Handler { peerMembersHandler := newPeerMembersHandler(lg, s.Cluster()) peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s) @@ -55,6 +61,9 @@ func newPeerHandler(lg *zap.Logger, s etcdserver.Server, raftHandler http.Handle mux.Handle(leasehttp.LeasePrefix, leaseHandler) mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler) } + if hashKVHandler != nil { + mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler) + } mux.HandleFunc(versionPath, versionHandler(s.Cluster(), serveVersion)) return mux } diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/membership/cluster.go b/vendor/go.etcd.io/etcd/etcdserver/api/membership/cluster.go index 81f515d2f396..d1cf220dd691 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/api/membership/cluster.go +++ b/vendor/go.etcd.io/etcd/etcdserver/api/membership/cluster.go @@ -565,6 +565,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s plog.Noticef("set the initial cluster version to %v", version.Cluster(ver.String())) } } + oldVer := c.version c.version = ver mustDetectDowngrade(c.lg, c.version) if c.v2store != nil { @@ -573,7 +574,10 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s if c.be != nil { mustSaveClusterVersionToBackend(c.be, ver) } - ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": ver.String()}).Set(1) + if oldVer != nil { + 
ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0) + } + ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(ver.String())}).Set(1) onSet(c.lg, ver) } @@ -653,8 +657,8 @@ func (c *RaftCluster) IsReadyToRemoveVotingMember(id uint64) bool { } func (c *RaftCluster) IsReadyToPromoteMember(id uint64) bool { - nmembers := 1 - nstarted := 0 + nmembers := 1 // We count the learner to be promoted for the future quorum + nstarted := 1 // and we also count it as started. for _, member := range c.VotingMembers() { if member.IsStarted() { diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/stream.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/stream.go index dcb2223ca59a..cf7d8ccf62cf 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/stream.go +++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/stream.go @@ -57,6 +57,7 @@ var ( "3.1.0": {streamTypeMsgAppV2, streamTypeMessage}, "3.2.0": {streamTypeMsgAppV2, streamTypeMessage}, "3.3.0": {streamTypeMsgAppV2, streamTypeMessage}, + "3.4.0": {streamTypeMsgAppV2, streamTypeMessage}, } ) diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/snap/snapshotter.go b/vendor/go.etcd.io/etcd/etcdserver/api/snap/snapshotter.go index 7e7933374c9a..409aed386b6f 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/api/snap/snapshotter.go +++ b/vendor/go.etcd.io/etcd/etcdserver/api/snap/snapshotter.go @@ -226,6 +226,9 @@ func (s *Snapshotter) snapNames() ([]string, error) { if err != nil { return nil, err } + if err = s.cleanupSnapdir(names); err != nil { + return nil, err + } snaps := checkSuffix(s.lg, names) if len(snaps) == 0 { return nil, ErrNoSnapshot @@ -253,3 +256,21 @@ func checkSuffix(lg *zap.Logger, names []string) []string { } return snaps } + +// cleanupSnapdir removes any files that should not be in the snapshot directory: +// - db.tmp prefixed files that can be orphaned by defragmentation +func (s *Snapshotter) cleanupSnapdir(filenames 
[]string) error { + for _, filename := range filenames { + if strings.HasPrefix(filename, "db.tmp") { + if s.lg != nil { + s.lg.Info("found orphaned defragmentation file; deleting", zap.String("path", filename)) + } else { + plog.Infof("found orphaned defragmentation file; deleting: %s", filename) + } + if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) { + return fmt.Errorf("failed to remove orphaned defragmentation file %s: %v", filename, rmErr) + } + } + } + return nil +} diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/interceptor.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/interceptor.go index ce9047e80fdc..0a3b48e86626 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/interceptor.go +++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/interceptor.go @@ -16,17 +16,17 @@ package v3rpc import ( "context" + "strings" "sync" "time" + "github.com/coreos/pkg/capnslog" "go.etcd.io/etcd/etcdserver" "go.etcd.io/etcd/etcdserver/api" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/pkg/types" "go.etcd.io/etcd/raft" - - "github.com/coreos/pkg/capnslog" - pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/metadata" @@ -54,6 +54,12 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { md, ok := metadata.FromIncomingContext(ctx) if ok { + ver, vs := "unknown", metadataGet(md, rpctypes.MetadataClientAPIVersionKey) + if len(vs) > 0 { + ver = vs[0] + } + clientRequests.WithLabelValues("unary", ver).Inc() + if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { if s.Leader() == types.ID(raft.None) { return nil, rpctypes.ErrGRPCNoLeader @@ -200,6 +206,12 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor md, ok := metadata.FromIncomingContext(ss.Context()) if ok { + ver, vs := "unknown", 
metadataGet(md, rpctypes.MetadataClientAPIVersionKey) + if len(vs) > 0 { + ver = vs[0] + } + clientRequests.WithLabelValues("stream", ver).Inc() + if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { if s.Leader() == types.ID(raft.None) { return rpctypes.ErrGRPCNoLeader @@ -218,7 +230,6 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor smap.mu.Unlock() cancel() }() - } } @@ -274,3 +285,8 @@ func monitorLeader(s *etcdserver.EtcdServer) *streamsMap { return smap } + +func metadataGet(md metadata.MD, k string) []string { + k = strings.ToLower(k) + return md[k] +} diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/metrics.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/metrics.go index d633d27c2cb3..a4ee723c52f9 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/metrics.go +++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/metrics.go @@ -39,10 +39,20 @@ var ( }, []string{"Type", "API"}, ) + + clientRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "client_requests_total", + Help: "The total number of client requests per client version.", + }, + []string{"type", "client_api_version"}, + ) ) func init() { prometheus.MustRegister(sentBytes) prometheus.MustRegister(receivedBytes) prometheus.MustRegister(streamFailures) + prometheus.MustRegister(clientRequests) } diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go index 5c590e1aec99..90b8b835b168 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go +++ b/vendor/go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes/md.go @@ -17,4 +17,6 @@ package rpctypes var ( MetadataRequireLeaderKey = "hasleader" MetadataHasLeader = "true" + + MetadataClientAPIVersionKey = "client-api-version" ) diff --git a/vendor/go.etcd.io/etcd/etcdserver/apply.go b/vendor/go.etcd.io/etcd/etcdserver/apply.go 
index 1f06ad0dd673..822b5e32204d 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/apply.go +++ b/vendor/go.etcd.io/etcd/etcdserver/apply.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "github.com/gogo/protobuf/proto" @@ -43,17 +44,18 @@ type applyResult struct { // to being logically reflected by the node. Currently only used for // Compaction requests. physc <-chan struct{} + trace *traceutil.Trace } // applierV3 is the interface for processing V3 raft messages type applierV3 interface { Apply(r *pb.InternalRaftRequest) *applyResult - Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) - Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) + Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) + Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) - Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) + Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) @@ -119,15 +121,15 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { // call into a.s.applyV3.F instead of a.F so upper appliers can check individual calls switch { case r.Range != nil: - ar.resp, ar.err = a.s.applyV3.Range(nil, r.Range) + ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range) case r.Put != nil: - ar.resp, ar.err = a.s.applyV3.Put(nil, r.Put) + ar.resp, ar.trace, ar.err = a.s.applyV3.Put(nil, r.Put) case r.DeleteRange != nil: ar.resp, ar.err = 
a.s.applyV3.DeleteRange(nil, r.DeleteRange) case r.Txn != nil: ar.resp, ar.err = a.s.applyV3.Txn(r.Txn) case r.Compaction != nil: - ar.resp, ar.physc, ar.err = a.s.applyV3.Compaction(r.Compaction) + ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction) case r.LeaseGrant != nil: ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant) case r.LeaseRevoke != nil: @@ -174,32 +176,39 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { return ar } -func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, err error) { +func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { resp = &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} - + trace = traceutil.New("put", + a.s.getLogger(), + traceutil.Field{Key: "key", Value: string(p.Key)}, + traceutil.Field{Key: "req_size", Value: proto.Size(p)}, + ) val, leaseID := p.Value, lease.LeaseID(p.Lease) if txn == nil { if leaseID != lease.NoLease { if l := a.s.lessor.Lookup(leaseID); l == nil { - return nil, lease.ErrLeaseNotFound + return nil, nil, lease.ErrLeaseNotFound } } - txn = a.s.KV().Write() + txn = a.s.KV().Write(trace) defer txn.End() } var rr *mvcc.RangeResult if p.IgnoreValue || p.IgnoreLease || p.PrevKv { + trace.DisableStep() rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{}) if err != nil { - return nil, err + return nil, nil, err } + trace.EnableStep() + trace.Step("get previous kv pair") } if p.IgnoreValue || p.IgnoreLease { if rr == nil || len(rr.KVs) == 0 { // ignore_{lease,value} flag expects previous key-value pair - return nil, ErrKeyNotFound + return nil, nil, ErrKeyNotFound } } if p.IgnoreValue { @@ -215,7 +224,8 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu } resp.Header.Revision = txn.Put(p.Key, val, leaseID) - return resp, nil + trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) + 
return resp, trace, nil } func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { @@ -224,7 +234,7 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ end := mkGteRange(dr.RangeEnd) if txn == nil { - txn = a.s.kv.Write() + txn = a.s.kv.Write(traceutil.TODO()) defer txn.End() } @@ -245,12 +255,14 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ return resp, nil } -func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { + trace := traceutil.Get(ctx) + resp := &pb.RangeResponse{} resp.Header = &pb.ResponseHeader{} if txn == nil { - txn = a.s.kv.Read() + txn = a.s.kv.Read(trace) defer txn.End() } @@ -327,7 +339,7 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang rr.KVs = rr.KVs[:r.Limit] resp.More = true } - + trace.Step("filter and sort the key-value pairs") resp.Header.Revision = rr.Rev resp.Count = int64(rr.Count) resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs)) @@ -337,12 +349,13 @@ func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.Rang } resp.Kvs[i] = &rr.KVs[i] } + trace.Step("assemble the response") return resp, nil } func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { isWrite := !isTxnReadonly(rt) - txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read()) + txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(traceutil.TODO())) txnPath := compareToPath(txn, rt) if isWrite { @@ -364,7 +377,7 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { // be the revision of the write txn. 
if isWrite { txn.End() - txn = a.s.KV().Write() + txn = a.s.KV().Write(traceutil.TODO()) } a.applyTxn(txn, rt, txnPath, txnResp) rev := txn.Rev() @@ -516,7 +529,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat respi := tresp.Responses[i].Response switch tv := req.Request.(type) { case *pb.RequestOp_RequestRange: - resp, err := a.Range(txn, tv.RequestRange) + resp, err := a.Range(context.TODO(), txn, tv.RequestRange) if err != nil { if lg != nil { lg.Panic("unexpected error during txn", zap.Error(err)) @@ -526,7 +539,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat } respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp case *pb.RequestOp_RequestPut: - resp, err := a.Put(txn, tv.RequestPut) + resp, _, err := a.Put(txn, tv.RequestPut) if err != nil { if lg != nil { lg.Panic("unexpected error during txn", zap.Error(err)) @@ -557,17 +570,22 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat return txns } -func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) { +func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { resp := &pb.CompactionResponse{} resp.Header = &pb.ResponseHeader{} - ch, err := a.s.KV().Compact(compaction.Revision) + trace := traceutil.New("compact", + a.s.getLogger(), + traceutil.Field{Key: "revision", Value: compaction.Revision}, + ) + + ch, err := a.s.KV().Compact(trace, compaction.Revision) if err != nil { - return nil, ch, err + return nil, ch, nil, err } // get the current revision. which key to get is not important. 
rr, _ := a.s.KV().Range([]byte("compaction"), nil, mvcc.RangeOptions{}) resp.Header.Revision = rr.Rev - return resp, ch, err + return resp, ch, trace, err } func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { @@ -674,8 +692,8 @@ type applierV3Capped struct { // with Puts so that the number of keys in the store is capped. func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} } -func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) { - return nil, ErrNoSpace +func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { + return nil, nil, ErrNoSpace } func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) { @@ -824,13 +842,13 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 { return &quotaApplierV3{app, NewBackendQuota(s, "v3-applier")} } -func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) { +func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { ok := a.q.Available(p) - resp, err := a.applierV3.Put(txn, p) + resp, trace, err := a.applierV3.Put(txn, p) if err == nil && !ok { err = ErrNoSpace } - return resp, err + return resp, trace, err } func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { diff --git a/vendor/go.etcd.io/etcd/etcdserver/apply_auth.go b/vendor/go.etcd.io/etcd/etcdserver/apply_auth.go index 4b094ad5d8d3..269af4758cd4 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/apply_auth.go +++ b/vendor/go.etcd.io/etcd/etcdserver/apply_auth.go @@ -15,12 +15,14 @@ package etcdserver import ( + "context" "sync" "go.etcd.io/etcd/auth" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc" + "go.etcd.io/etcd/pkg/traceutil" ) type authApplierV3 struct { @@ -61,9 +63,9 @@ func (aa *authApplierV3) Apply(r
*pb.InternalRaftRequest) *applyResult { return ret } -func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, error) { +func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil { - return nil, err + return nil, nil, err } if err := aa.checkLeasePuts(lease.LeaseID(r.Lease)); err != nil { @@ -71,23 +73,23 @@ func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutRespon // be written by this user. It means the user cannot revoke the // lease so attaching the lease to the newly written key should // be forbidden. - return nil, err + return nil, nil, err } if r.PrevKv { err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, nil) if err != nil { - return nil, err + return nil, nil, err } } return aa.applierV3.Put(txn, r) } -func (aa *authApplierV3) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { +func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil { return nil, err } - return aa.applierV3.Range(txn, r) + return aa.applierV3.Range(ctx, txn, r) } func (aa *authApplierV3) DeleteRange(txn mvcc.TxnWrite, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) { diff --git a/vendor/go.etcd.io/etcd/etcdserver/corrupt.go b/vendor/go.etcd.io/etcd/etcdserver/corrupt.go index 32678a7c5129..e243d98ba6d2 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/corrupt.go +++ b/vendor/go.etcd.io/etcd/etcdserver/corrupt.go @@ -15,14 +15,19 @@ package etcdserver import ( + "bytes" "context" + "encoding/json" "fmt" + "io/ioutil" + "net/http" + "strings" "time" - "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" 
"go.uber.org/zap" @@ -229,10 +234,12 @@ func (s *EtcdServer) checkHashKV() error { mismatch(uint64(s.ID())) } + checkedCount := 0 for _, p := range peers { if p.resp == nil { continue } + checkedCount++ id := p.resp.Header.MemberId // leader expects follower's latest revision less than or equal to leader's @@ -297,62 +304,56 @@ func (s *EtcdServer) checkHashKV() error { mismatch(id) } } + if lg != nil { + lg.Info("finished peer corruption check", zap.Int("number-of-peers-checked", checkedCount)) + } else { + plog.Infof("finished peer corruption check") + } + return nil } -type peerHashKVResp struct { +type peerInfo struct { id types.ID eps []string +} - resp *clientv3.HashKVResponse +type peerHashKVResp struct { + peerInfo + resp *pb.HashKVResponse err error } -func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) { +func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp { // TODO: handle the case when "s.cluster.Members" have not // been populated (e.g. no snapshot to load from disk) - mbs := s.cluster.Members() - pss := make([]peerHashKVResp, len(mbs)) - for _, m := range mbs { + members := s.cluster.Members() + peers := make([]peerInfo, 0, len(members)) + for _, m := range members { if m.ID == s.ID() { continue } - pss = append(pss, peerHashKVResp{id: m.ID, eps: m.PeerURLs}) + peers = append(peers, peerInfo{id: m.ID, eps: m.PeerURLs}) } lg := s.getLogger() - for _, p := range pss { + var resps []*peerHashKVResp + for _, p := range peers { if len(p.eps) == 0 { continue } - cli, cerr := clientv3.New(clientv3.Config{ - DialTimeout: s.Cfg.ReqTimeout(), - Endpoints: p.eps, - }) - if cerr != nil { - if lg != nil { - lg.Warn( - "failed to create client to peer URL", - zap.String("local-member-id", s.ID().String()), - zap.String("remote-peer-id", p.id.String()), - zap.Strings("remote-peer-endpoints", p.eps), - zap.Error(cerr), - ) - } else { - plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), p.eps, 
cerr.Error()) - } - continue - } respsLen := len(resps) - for _, c := range cli.Endpoints() { + var lastErr error + for _, ep := range p.eps { ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - var resp *clientv3.HashKVResponse - resp, cerr = cli.HashKV(ctx, c, rev) + + var resp *pb.HashKVResponse + resp, lastErr = s.getPeerHashKVHTTP(ctx, ep, rev) cancel() - if cerr == nil { - resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: resp, err: nil}) + if lastErr == nil { + resps = append(resps, &peerHashKVResp{peerInfo: p, resp: resp, err: nil}) break } if lg != nil { @@ -360,17 +361,17 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) { "failed hash kv request", zap.String("local-member-id", s.ID().String()), zap.Int64("requested-revision", rev), - zap.String("remote-peer-endpoint", c), - zap.Error(cerr), + zap.String("remote-peer-endpoint", ep), + zap.Error(lastErr), ) } else { - plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev) + plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), lastErr.Error(), ep, rev) } } - cli.Close() + // failed to get hashKV from all endpoints of this peer if respsLen == len(resps) { - resps = append(resps, &peerHashKVResp{id: p.id, eps: p.eps, resp: nil, err: cerr}) + resps = append(resps, &peerHashKVResp{peerInfo: p, resp: nil, err: lastErr}) } } return resps @@ -382,11 +383,11 @@ type applierV3Corrupt struct { func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} } -func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, error) { - return nil, ErrCorrupt +func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { + return nil, nil, ErrCorrupt } -func (a *applierV3Corrupt) Range(txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) { +func (a *applierV3Corrupt) Range(ctx 
context.Context, txn mvcc.TxnRead, p *pb.RangeRequest) (*pb.RangeResponse, error) { return nil, ErrCorrupt } @@ -398,8 +399,8 @@ func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { return nil, ErrCorrupt } -func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, error) { - return nil, nil, ErrCorrupt +func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { + return nil, nil, nil, ErrCorrupt } func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { @@ -409,3 +410,112 @@ func (a *applierV3Corrupt) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantR func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { return nil, ErrCorrupt } + +type ServerPeerV2 interface { + ServerPeer + HashKVHandler() http.Handler +} + +const PeerHashKVPath = "/members/hashkv" + +type hashKVHandler struct { + lg *zap.Logger + server *EtcdServer +} + +func (s *EtcdServer) HashKVHandler() http.Handler { + return &hashKVHandler{lg: s.getLogger(), server: s} +} + +func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("Allow", http.MethodGet) + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + if r.URL.Path != PeerHashKVPath { + http.Error(w, "bad path", http.StatusBadRequest) + return + } + + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, "error reading body", http.StatusBadRequest) + return + } + + req := &pb.HashKVRequest{} + if err = json.Unmarshal(b, req); err != nil { + h.lg.Warn("failed to unmarshal request", zap.Error(err)) + http.Error(w, "error unmarshalling request", http.StatusBadRequest) + return + } + hash, rev, compactRev, err := h.server.KV().HashByRev(req.Revision) + if err != nil { + 
h.lg.Warn( + "failed to get hashKV", + zap.Int64("requested-revision", req.Revision), + zap.Error(err), + ) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: hash, CompactRevision: compactRev} + respBytes, err := json.Marshal(resp) + if err != nil { + h.lg.Warn("failed to marshal hashKV response", zap.Error(err)) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("X-Etcd-Cluster-ID", h.server.Cluster().ID().String()) + w.Header().Set("Content-Type", "application/json") + w.Write(respBytes) +} + +// getPeerHashKVHTTP fetch hash of kv store at the given rev via http call to the given url +func (s *EtcdServer) getPeerHashKVHTTP(ctx context.Context, url string, rev int64) (*pb.HashKVResponse, error) { + cc := &http.Client{Transport: s.peerRt} + hashReq := &pb.HashKVRequest{Revision: rev} + hashReqBytes, err := json.Marshal(hashReq) + if err != nil { + return nil, err + } + requestUrl := url + PeerHashKVPath + req, err := http.NewRequest(http.MethodGet, requestUrl, bytes.NewReader(hashReqBytes)) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + req.Header.Set("Content-Type", "application/json") + req.Cancel = ctx.Done() + + resp, err := cc.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode == http.StatusBadRequest { + if strings.Contains(string(b), mvcc.ErrCompacted.Error()) { + return nil, rpctypes.ErrCompacted + } + if strings.Contains(string(b), mvcc.ErrFutureRev.Error()) { + return nil, rpctypes.ErrFutureRev + } + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unknown error: %s", string(b)) + } + + hashResp := &pb.HashKVResponse{} + if err := json.Unmarshal(b, hashResp); err != nil { + return nil, err + } + return hashResp, nil +} diff --git 
a/vendor/go.etcd.io/etcd/etcdserver/metrics.go b/vendor/go.etcd.io/etcd/etcdserver/metrics.go index e7f4a3747b34..e0c0cde85538 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/metrics.go +++ b/vendor/go.etcd.io/etcd/etcdserver/metrics.go @@ -207,7 +207,7 @@ func monitorFileDescriptor(lg *zap.Logger, done <-chan struct{}) { } if used >= limit/5*4 { if lg != nil { - lg.Warn("80%% of file descriptors are used", zap.Uint64("used", used), zap.Uint64("limit", limit)) + lg.Warn("80% of file descriptors are used", zap.Uint64("used", used), zap.Uint64("limit", limit)) } else { plog.Warningf("80%% of the file descriptor limit is used [used = %d, limit = %d]", used, limit) } diff --git a/vendor/go.etcd.io/etcd/etcdserver/server.go b/vendor/go.etcd.io/etcd/etcdserver/server.go index 78daa0ea97bf..4db1998d6fc1 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/server.go +++ b/vendor/go.etcd.io/etcd/etcdserver/server.go @@ -50,6 +50,7 @@ import ( "go.etcd.io/etcd/pkg/pbutil" "go.etcd.io/etcd/pkg/runtime" "go.etcd.io/etcd/pkg/schedule" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/pkg/types" "go.etcd.io/etcd/pkg/wait" "go.etcd.io/etcd/raft" @@ -785,7 +786,7 @@ func (s *EtcdServer) start() { } else { plog.Infof("starting server... 
[version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String())) } - membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": s.ClusterVersion().String()}).Set(1) + membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(s.ClusterVersion().String())}).Set(1) } else { if lg != nil { lg.Info( @@ -806,12 +807,13 @@ func (s *EtcdServer) start() { func (s *EtcdServer) purgeFile() { var dberrc, serrc, werrc <-chan error + var dbdonec, sdonec, wdonec <-chan struct{} if s.Cfg.MaxSnapFiles > 0 { - dberrc = fileutil.PurgeFile(s.getLogger(), s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done) - serrc = fileutil.PurgeFile(s.getLogger(), s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done) + dbdonec, dberrc = fileutil.PurgeFileWithDoneNotify(s.getLogger(), s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping) + sdonec, serrc = fileutil.PurgeFileWithDoneNotify(s.getLogger(), s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping) } if s.Cfg.MaxWALFiles > 0 { - werrc = fileutil.PurgeFile(s.getLogger(), s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.done) + wdonec, werrc = fileutil.PurgeFileWithDoneNotify(s.getLogger(), s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping) } lg := s.getLogger() @@ -835,6 +837,15 @@ func (s *EtcdServer) purgeFile() { plog.Fatalf("failed to purge wal file %v", e) } case <-s.stopping: + if dbdonec != nil { + <-dbdonec + } + if sdonec != nil { + <-sdonec + } + if wdonec != nil { + <-wdonec + } return } } @@ -1178,7 +1189,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { plog.Info("recovering lessor...") } - s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() }) + s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) }) if lg != nil { lg.Info("restored lease store") diff 
--git a/vendor/go.etcd.io/etcd/etcdserver/v3_server.go b/vendor/go.etcd.io/etcd/etcdserver/v3_server.go index b2084618b8a8..70b7177d39e9 100644 --- a/vendor/go.etcd.io/etcd/etcdserver/v3_server.go +++ b/vendor/go.etcd.io/etcd/etcdserver/v3_server.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/lease/leasehttp" "go.etcd.io/etcd/mvcc" + "go.etcd.io/etcd/pkg/traceutil" "go.etcd.io/etcd/raft" "github.com/gogo/protobuf/proto" @@ -38,6 +39,7 @@ const ( // However, if the committed entries are very heavy to apply, the gap might grow. // We should stop accepting new proposals if the gap growing to a certain point. maxGapBetweenApplyAndCommitIndex = 5000 + traceThreshold = 100 * time.Millisecond ) type RaftKV interface { @@ -85,14 +87,29 @@ type Authenticator interface { } func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { + trace := traceutil.New("range", + s.getLogger(), + traceutil.Field{Key: "range_begin", Value: string(r.Key)}, + traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)}, + ) + ctx = context.WithValue(ctx, traceutil.TraceKey, trace) + var resp *pb.RangeResponse var err error defer func(start time.Time) { warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err) + if resp != nil { + trace.AddField( + traceutil.Field{Key: "response_count", Value: len(resp.Kvs)}, + traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}, + ) + } + trace.LogIfLong(traceThreshold) }(time.Now()) if !r.Serializable { err = s.linearizableReadNotify(ctx) + trace.Step("agreement among raft nodes before linearized reading") if err != nil { return nil, err } @@ -101,7 +118,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd) } - get := func() { resp, err = s.applyV3Base.Range(nil, r) } + get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) } if serr := s.doSerialize(ctx, chk, get); 
serr != nil { err = serr return nil, err @@ -110,6 +127,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe } func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) { + ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now()) resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r}) if err != nil { return nil, err @@ -186,7 +204,18 @@ func isTxnReadonly(r *pb.TxnRequest) bool { } func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) { + startTime := time.Now() result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r}) + trace := traceutil.TODO() + if result != nil && result.trace != nil { + trace = result.trace + defer func() { + trace.LogIfLong(traceThreshold) + }() + applyStart := result.trace.GetStartTime() + result.trace.SetStartTime(startTime) + trace.InsertStep(0, applyStart, "process raft request") + } if r.Physical && result != nil && result.physc != nil { <-result.physc // The compaction is done deleting keys; the hash is now settled @@ -195,6 +224,7 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. // if the compaction resumes. Force the finished compaction to // commit so it won't resume following a crash. s.be.ForceCommit() + trace.Step("physically apply compaction") } if err != nil { return nil, err @@ -210,6 +240,7 @@ func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb. 
resp.Header = &pb.ResponseHeader{} } resp.Header.Revision = s.kv.Rev() + trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision}) return resp, nil } @@ -533,20 +564,25 @@ func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftReque if result.err != nil { return nil, result.err } + if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.trace != nil { + applyStart := result.trace.GetStartTime() + // The trace object is created in apply. Here reset the start time to trace + // the raft request time by the difference between the request start time + // and apply start time + result.trace.SetStartTime(startTime) + result.trace.InsertStep(0, applyStart, "process raft request") + result.trace.LogIfLong(traceThreshold) + } return result.resp, nil } func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) { - for { - resp, err := s.raftRequestOnce(ctx, r) - if err != auth.ErrAuthOldRevision { - return resp, err - } - } + return s.raftRequestOnce(ctx, r) } // doSerialize handles the auth logic, with permissions checked by "chk", for a serialized request "get". Returns a non-nil error on authentication failure. 
func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error { + trace := traceutil.Get(ctx) ai, err := s.AuthInfoFromCtx(ctx) if err != nil { return err @@ -558,6 +594,7 @@ func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) e if err = chk(ai); err != nil { return err } + trace.Step("get authentication metadata") // fetch response for serialized request get() // check for stale token revision in case the auth store was updated while diff --git a/vendor/go.etcd.io/etcd/lease/lessor.go b/vendor/go.etcd.io/etcd/lease/lessor.go index b4437bd460ef..b16099fbf1fb 100644 --- a/vendor/go.etcd.io/etcd/lease/lessor.go +++ b/vendor/go.etcd.io/etcd/lease/lessor.go @@ -291,14 +291,14 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { } le.leaseMap[id] = l - item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} - le.leaseExpiredNotifier.RegisterOrUpdate(item) l.persistTo(le.b) leaseTotalTTLs.Observe(float64(l.ttl)) leaseGranted.Inc() if le.isPrimary() { + item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()} + le.leaseExpiredNotifier.RegisterOrUpdate(item) le.scheduleCheckpointIfNeeded(l) } @@ -505,6 +505,7 @@ func (le *lessor) Demote() { } le.clearScheduledLeasesCheckpoints() + le.clearLeaseExpiredNotifier() if le.demotec != nil { close(le.demotec) @@ -648,6 +649,10 @@ func (le *lessor) clearScheduledLeasesCheckpoints() { le.leaseCheckpointHeap = make(LeaseQueue, 0) } +func (le *lessor) clearLeaseExpiredNotifier() { + le.leaseExpiredNotifier = newLeaseExpiredNotifier() +} + // expireExists returns true if expiry items exist. // It pops only when expiry item exists. // "next" is true, to indicate that it may exist in next attempt. 
diff --git a/vendor/go.etcd.io/etcd/mvcc/backend/backend.go b/vendor/go.etcd.io/etcd/mvcc/backend/backend.go index bffd74950b46..26f196fbff3a 100644 --- a/vendor/go.etcd.io/etcd/mvcc/backend/backend.go +++ b/vendor/go.etcd.io/etcd/mvcc/backend/backend.go @@ -369,13 +369,27 @@ func (b *backend) defrag() error { b.batchTx.tx = nil - tmpdb, err := bolt.Open(b.db.Path()+".tmp", 0600, boltOpenOptions) + // Create a temporary file to ensure we start with a clean slate. + // Snapshotter.cleanupSnapdir cleans up any of these that are found during startup. + dir := filepath.Dir(b.db.Path()) + temp, err := ioutil.TempFile(dir, "db.tmp.*") + if err != nil { + return err + } + options := bolt.Options{} + if boltOpenOptions != nil { + options = *boltOpenOptions + } + options.OpenFile = func(path string, i int, mode os.FileMode) (file *os.File, err error) { + return temp, nil + } + tdbp := temp.Name() + tmpdb, err := bolt.Open(tdbp, 0600, &options) if err != nil { return err } dbp := b.db.Path() - tdbp := tmpdb.Path() size1, sizeInUse1 := b.Size(), b.SizeInUse() if b.lg != nil { b.lg.Info( @@ -387,11 +401,17 @@ func (b *backend) defrag() error { zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))), ) } - + // gofail: var defragBeforeCopy struct{} err = defragdb(b.db, tmpdb, defragLimit) if err != nil { tmpdb.Close() - os.RemoveAll(tmpdb.Path()) + if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil { + if b.lg != nil { + b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr)) + } else { + plog.Fatalf("failed to remove db.tmp after defragmentation completed: %v", rmErr) + } + } return err } @@ -411,6 +431,7 @@ func (b *backend) defrag() error { plog.Fatalf("cannot close database (%s)", err) } } + // gofail: var defragBeforeRename struct{} err = os.Rename(tdbp, dbp) if err != nil { if b.lg != nil { diff --git a/vendor/go.etcd.io/etcd/mvcc/kv.go b/vendor/go.etcd.io/etcd/mvcc/kv.go index 8e898a5ad3db..c057f9261183 100644 --- 
a/vendor/go.etcd.io/etcd/mvcc/kv.go +++ b/vendor/go.etcd.io/etcd/mvcc/kv.go @@ -18,6 +18,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" ) type RangeOptions struct { @@ -102,10 +103,10 @@ type KV interface { WriteView // Read creates a read transaction. - Read() TxnRead + Read(trace *traceutil.Trace) TxnRead // Write creates a write transaction. - Write() TxnWrite + Write(trace *traceutil.Trace) TxnWrite // Hash computes the hash of the KV's backend. Hash() (hash uint32, revision int64, err error) @@ -114,7 +115,7 @@ type KV interface { HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) // Compact frees all superseded keys with revisions less than rev. - Compact(rev int64) (<-chan struct{}, error) + Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) // Commit commits outstanding txns into the underlying backend. Commit() diff --git a/vendor/go.etcd.io/etcd/mvcc/kv_view.go b/vendor/go.etcd.io/etcd/mvcc/kv_view.go index bd2e77729ff2..d4f0ca6880ac 100644 --- a/vendor/go.etcd.io/etcd/mvcc/kv_view.go +++ b/vendor/go.etcd.io/etcd/mvcc/kv_view.go @@ -14,24 +14,27 @@ package mvcc -import "go.etcd.io/etcd/lease" +import ( + "go.etcd.io/etcd/lease" + "go.etcd.io/etcd/pkg/traceutil" +) type readView struct{ kv KV } func (rv *readView) FirstRev() int64 { - tr := rv.kv.Read() + tr := rv.kv.Read(traceutil.TODO()) defer tr.End() return tr.FirstRev() } func (rv *readView) Rev() int64 { - tr := rv.kv.Read() + tr := rv.kv.Read(traceutil.TODO()) defer tr.End() return tr.Rev() } func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) { - tr := rv.kv.Read() + tr := rv.kv.Read(traceutil.TODO()) defer tr.End() return tr.Range(key, end, ro) } @@ -39,13 +42,13 @@ func (rv *readView) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err type writeView struct{ kv KV } func (wv *writeView) DeleteRange(key, end []byte) (n, rev 
int64) { - tw := wv.kv.Write() + tw := wv.kv.Write(traceutil.TODO()) defer tw.End() return tw.DeleteRange(key, end) } func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) { - tw := wv.kv.Write() + tw := wv.kv.Write(traceutil.TODO()) defer tw.End() return tw.Put(key, value, lease) } diff --git a/vendor/go.etcd.io/etcd/mvcc/kvstore.go b/vendor/go.etcd.io/etcd/mvcc/kvstore.go index 27c50db09b39..997aaaf562cd 100644 --- a/vendor/go.etcd.io/etcd/mvcc/kvstore.go +++ b/vendor/go.etcd.io/etcd/mvcc/kvstore.go @@ -29,6 +29,7 @@ import ( "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/schedule" + "go.etcd.io/etcd/pkg/traceutil" "github.com/coreos/pkg/capnslog" "go.uber.org/zap" @@ -140,7 +141,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig ConsistentI s.ReadView = &readView{s} s.WriteView = &writeView{s} if s.le != nil { - s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() }) + s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) }) } tx := s.b.BatchTx() @@ -269,15 +270,16 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) { return nil, nil } -func (s *store) compact(rev int64) (<-chan struct{}, error) { - start := time.Now() - keep := s.kvindex.Compact(rev) +func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { ch := make(chan struct{}) var j = func(ctx context.Context) { if ctx.Err() != nil { s.compactBarrier(ctx, ch) return } + start := time.Now() + keep := s.kvindex.Compact(rev) + indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) if !s.scheduleCompaction(rev, keep) { s.compactBarrier(nil, ch) return @@ -286,8 +288,7 @@ func (s *store) compact(rev int64) (<-chan struct{}, error) { } s.fifoSched.Schedule(j) - - indexCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond)) + trace.Step("schedule compaction") return ch, nil } @@ -297,21 +298,21 @@ func (s 
*store) compactLockfree(rev int64) (<-chan struct{}, error) { return ch, err } - return s.compact(rev) + return s.compact(traceutil.TODO(), rev) } -func (s *store) Compact(rev int64) (<-chan struct{}, error) { +func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) { s.mu.Lock() ch, err := s.updateCompactRev(rev) - + trace.Step("check and update compact revision") if err != nil { s.mu.Unlock() return ch, err } s.mu.Unlock() - return s.compact(rev) + return s.compact(trace, rev) } // DefaultIgnores is a map of keys to ignore in hash checking. @@ -355,19 +356,7 @@ func (s *store) Restore(b backend.Backend) error { } func (s *store) restore() error { - b := s.b - reportDbTotalSizeInBytesMu.Lock() - reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) } - reportDbTotalSizeInBytesMu.Unlock() - reportDbTotalSizeInBytesDebugMu.Lock() - reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) } - reportDbTotalSizeInBytesDebugMu.Unlock() - reportDbTotalSizeInUseInBytesMu.Lock() - reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) } - reportDbTotalSizeInUseInBytesMu.Unlock() - reportDbOpenReadTxNMu.Lock() - reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) } - reportDbOpenReadTxNMu.Unlock() + s.setupMetricsReporter() min, max := newRevBytes(), newRevBytes() revToBytes(revision{main: 1}, min) @@ -579,6 +568,36 @@ func (s *store) ConsistentIndex() uint64 { return v } +func (s *store) setupMetricsReporter() { + b := s.b + reportDbTotalSizeInBytesMu.Lock() + reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) } + reportDbTotalSizeInBytesMu.Unlock() + reportDbTotalSizeInBytesDebugMu.Lock() + reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) } + reportDbTotalSizeInBytesDebugMu.Unlock() + reportDbTotalSizeInUseInBytesMu.Lock() + reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) } + 
reportDbTotalSizeInUseInBytesMu.Unlock() + reportDbOpenReadTxNMu.Lock() + reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) } + reportDbOpenReadTxNMu.Unlock() + reportCurrentRevMu.Lock() + reportCurrentRev = func() float64 { + s.revMu.RLock() + defer s.revMu.RUnlock() + return float64(s.currentRev) + } + reportCurrentRevMu.Unlock() + reportCompactRevMu.Lock() + reportCompactRev = func() float64 { + s.revMu.RLock() + defer s.revMu.RUnlock() + return float64(s.compactMainRev) + } + reportCompactRevMu.Unlock() +} + // appendMarkTombstone appends tombstone mark to normal revision bytes. func appendMarkTombstone(lg *zap.Logger, b []byte) []byte { if len(b) != revBytesLen { diff --git a/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go b/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go index 2adb4985437b..4c6b062b433d 100644 --- a/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go +++ b/vendor/go.etcd.io/etcd/mvcc/kvstore_compaction.go @@ -43,6 +43,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc rev = bytesToRev(key) if _, ok := keep[rev]; !ok { tx.UnsafeDelete(keyBucketName, key) + keyCompactions++ } } diff --git a/vendor/go.etcd.io/etcd/mvcc/kvstore_txn.go b/vendor/go.etcd.io/etcd/mvcc/kvstore_txn.go index 9698254644db..716a6d82ff2e 100644 --- a/vendor/go.etcd.io/etcd/mvcc/kvstore_txn.go +++ b/vendor/go.etcd.io/etcd/mvcc/kvstore_txn.go @@ -18,6 +18,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -27,9 +28,11 @@ type storeTxnRead struct { firstRev int64 rev int64 + + trace *traceutil.Trace } -func (s *store) Read() TxnRead { +func (s *store) Read(trace *traceutil.Trace) TxnRead { s.mu.RLock() s.revMu.RLock() // backend holds b.readTx.RLock() only when creating the concurrentReadTx. After @@ -38,7 +41,7 @@ func (s *store) Read() TxnRead { tx.RLock() // RLock is no-op. 
concurrentReadTx does not need to be locked after it is created. firstRev, rev := s.compactMainRev, s.currentRev s.revMu.RUnlock() - return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev}) + return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace}) } func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev } @@ -61,12 +64,12 @@ type storeTxnWrite struct { changes []mvccpb.KeyValue } -func (s *store) Write() TxnWrite { +func (s *store) Write(trace *traceutil.Trace) TxnWrite { s.mu.RLock() tx := s.b.BatchTx() tx.Lock() tw := &storeTxnWrite{ - storeTxnRead: storeTxnRead{s, tx, 0, 0}, + storeTxnRead: storeTxnRead{s, tx, 0, 0, trace}, tx: tx, beginRev: s.currentRev, changes: make([]mvccpb.KeyValue, 0, 4), @@ -124,6 +127,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions } revpairs := tr.s.kvindex.Revisions(key, end, rev) + tr.trace.Step("range keys from in-memory index tree") if len(revpairs) == 0 { return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil } @@ -163,6 +167,7 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions } } } + tr.trace.Step("range keys from bolt db") return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil } @@ -178,7 +183,7 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { c = created.main oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)}) } - + tw.trace.Step("get key's previous created_revision and leaseID") ibytes := newRevBytes() idxRev := revision{main: rev, sub: int64(len(tw.changes))} revToBytes(idxRev, ibytes) @@ -205,9 +210,11 @@ func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) { } } + tw.trace.Step("marshal mvccpb.KeyValue") tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) tw.s.kvindex.Put(key, idxRev) tw.changes = append(tw.changes, kv) + tw.trace.Step("store kv pair into bolt db") if oldLease != lease.NoLease { if tw.s.le == nil { @@ -234,6 +241,7 @@ func (tw *storeTxnWrite) 
put(key, value []byte, leaseID lease.LeaseID) { panic("unexpected error from lease Attach") } } + tw.trace.Step("attach lease to kv pair") } func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 { diff --git a/vendor/go.etcd.io/etcd/mvcc/metrics.go b/vendor/go.etcd.io/etcd/mvcc/metrics.go index 9bcbc8fe3cff..42932c40d348 100644 --- a/vendor/go.etcd.io/etcd/mvcc/metrics.go +++ b/vendor/go.etcd.io/etcd/mvcc/metrics.go @@ -264,6 +264,46 @@ var ( // highest bucket start of 0.01 sec * 2^14 == 163.84 sec Buckets: prometheus.ExponentialBuckets(.01, 2, 15), }) + + currentRev = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "current_revision", + Help: "The current revision of store.", + }, + func() float64 { + reportCurrentRevMu.RLock() + defer reportCurrentRevMu.RUnlock() + return reportCurrentRev() + }, + ) + // overridden by mvcc initialization + reportCurrentRevMu sync.RWMutex + reportCurrentRev = func() float64 { return 0 } + + compactRev = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "compact_revision", + Help: "The revision of the last compaction in store.", + }, + func() float64 { + reportCompactRevMu.RLock() + defer reportCompactRevMu.RUnlock() + return reportCompactRev() + }, + ) + // overridden by mvcc initialization + reportCompactRevMu sync.RWMutex + reportCompactRev = func() float64 { return 0 } + + totalPutSizeGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "etcd_debugging", + Subsystem: "mvcc", + Name: "total_put_size_in_bytes", + Help: "The total size of put kv pairs seen by this member.", + }) ) func init() { @@ -291,6 +331,9 @@ func init() { prometheus.MustRegister(dbOpenReadTxN) prometheus.MustRegister(hashSec) prometheus.MustRegister(hashRevSec) + prometheus.MustRegister(currentRev) + prometheus.MustRegister(compactRev) + prometheus.MustRegister(totalPutSizeGauge) } // ReportEventReceived reports that an event 
is received. diff --git a/vendor/go.etcd.io/etcd/mvcc/metrics_txn.go b/vendor/go.etcd.io/etcd/mvcc/metrics_txn.go index 64b629c785b6..17f1b31caf7d 100644 --- a/vendor/go.etcd.io/etcd/mvcc/metrics_txn.go +++ b/vendor/go.etcd.io/etcd/mvcc/metrics_txn.go @@ -21,14 +21,15 @@ type metricsTxnWrite struct { ranges uint puts uint deletes uint + putSize int64 } func newMetricsTxnRead(tr TxnRead) TxnRead { - return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0} + return &metricsTxnWrite{&txnReadWrite{tr}, 0, 0, 0, 0} } func newMetricsTxnWrite(tw TxnWrite) TxnWrite { - return &metricsTxnWrite{tw, 0, 0, 0} + return &metricsTxnWrite{tw, 0, 0, 0, 0} } func (tw *metricsTxnWrite) Range(key, end []byte, ro RangeOptions) (*RangeResult, error) { @@ -43,6 +44,8 @@ func (tw *metricsTxnWrite) DeleteRange(key, end []byte) (n, rev int64) { func (tw *metricsTxnWrite) Put(key, value []byte, lease lease.LeaseID) (rev int64) { tw.puts++ + size := int64(len(key) + len(value)) + tw.putSize += size return tw.TxnWrite.Put(key, value, lease) } @@ -60,6 +63,7 @@ func (tw *metricsTxnWrite) End() { puts := float64(tw.puts) putCounter.Add(puts) putCounterDebug.Add(puts) // TODO: remove in 3.5 release + totalPutSizeGauge.Add(float64(tw.putSize)) deletes := float64(tw.deletes) deleteCounter.Add(deletes) diff --git a/vendor/go.etcd.io/etcd/mvcc/watchable_store.go b/vendor/go.etcd.io/etcd/mvcc/watchable_store.go index 3cf491d1fdf2..a51e5aa529b3 100644 --- a/vendor/go.etcd.io/etcd/mvcc/watchable_store.go +++ b/vendor/go.etcd.io/etcd/mvcc/watchable_store.go @@ -21,6 +21,7 @@ import ( "go.etcd.io/etcd/lease" "go.etcd.io/etcd/mvcc/backend" "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" "go.uber.org/zap" ) @@ -84,7 +85,7 @@ func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ig Co s.store.WriteView = &writeView{s} if s.le != nil { // use this store as the deleter so revokes trigger watch events - s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() }) + 
s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) }) } s.wg.Add(2) go s.syncWatchersLoop() diff --git a/vendor/go.etcd.io/etcd/mvcc/watchable_store_txn.go b/vendor/go.etcd.io/etcd/mvcc/watchable_store_txn.go index 3bcfa4d7566b..70b12983d970 100644 --- a/vendor/go.etcd.io/etcd/mvcc/watchable_store_txn.go +++ b/vendor/go.etcd.io/etcd/mvcc/watchable_store_txn.go @@ -14,7 +14,10 @@ package mvcc -import "go.etcd.io/etcd/mvcc/mvccpb" +import ( + "go.etcd.io/etcd/mvcc/mvccpb" + "go.etcd.io/etcd/pkg/traceutil" +) func (tw *watchableStoreTxnWrite) End() { changes := tw.Changes() @@ -48,4 +51,6 @@ type watchableStoreTxnWrite struct { s *watchableStore } -func (s *watchableStore) Write() TxnWrite { return &watchableStoreTxnWrite{s.store.Write(), s} } +func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite { + return &watchableStoreTxnWrite{s.store.Write(trace), s} +} diff --git a/vendor/go.etcd.io/etcd/pkg/fileutil/purge.go b/vendor/go.etcd.io/etcd/pkg/fileutil/purge.go index fda96c371143..d116f340b6f4 100644 --- a/vendor/go.etcd.io/etcd/pkg/fileutil/purge.go +++ b/vendor/go.etcd.io/etcd/pkg/fileutil/purge.go @@ -25,13 +25,23 @@ import ( ) func PurgeFile(lg *zap.Logger, dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) <-chan error { - return purgeFile(lg, dirname, suffix, max, interval, stop, nil) + return purgeFile(lg, dirname, suffix, max, interval, stop, nil, nil) +} + +func PurgeFileWithDoneNotify(lg *zap.Logger, dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}) (<-chan struct{}, <-chan error) { + doneC := make(chan struct{}) + errC := purgeFile(lg, dirname, suffix, max, interval, stop, nil, doneC) + return doneC, errC } // purgeFile is the internal implementation for PurgeFile which can post purged files to purgec if non-nil. 
-func purgeFile(lg *zap.Logger, dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}, purgec chan<- string) <-chan error { +// if donec is non-nil, the function closes it to notify its exit. +func purgeFile(lg *zap.Logger, dirname string, suffix string, max uint, interval time.Duration, stop <-chan struct{}, purgec chan<- string, donec chan<- struct{}) <-chan error { errC := make(chan error, 1) go func() { + if donec != nil { + defer close(donec) + } for { fnames, err := ReadDir(dirname) if err != nil { diff --git a/vendor/go.etcd.io/etcd/pkg/ioutil/pagewriter.go b/vendor/go.etcd.io/etcd/pkg/ioutil/pagewriter.go index 72de1593d3ad..cf9a8dc664dc 100644 --- a/vendor/go.etcd.io/etcd/pkg/ioutil/pagewriter.go +++ b/vendor/go.etcd.io/etcd/pkg/ioutil/pagewriter.go @@ -95,12 +95,23 @@ func (pw *PageWriter) Write(p []byte) (n int, err error) { return n, werr } +// Flush flushes buffered data. func (pw *PageWriter) Flush() error { + _, err := pw.flush() + return err +} + +// FlushN flushes buffered data and returns the number of written bytes. +func (pw *PageWriter) FlushN() (int, error) { + return pw.flush() +} + +func (pw *PageWriter) flush() (int, error) { if pw.bufferedBytes == 0 { - return nil + return 0, nil } - _, err := pw.w.Write(pw.buf[:pw.bufferedBytes]) + n, err := pw.w.Write(pw.buf[:pw.bufferedBytes]) pw.pageOffset = (pw.pageOffset + pw.bufferedBytes) % pw.pageBytes pw.bufferedBytes = 0 - return err + return n, err } diff --git a/vendor/go.etcd.io/etcd/pkg/logutil/zap.go b/vendor/go.etcd.io/etcd/pkg/logutil/zap.go index 2f692233aa8c..8fc6e03b77bd 100644 --- a/vendor/go.etcd.io/etcd/pkg/logutil/zap.go +++ b/vendor/go.etcd.io/etcd/pkg/logutil/zap.go @@ -53,15 +53,12 @@ var DefaultZapLoggerConfig = zap.Config{ ErrorOutputPaths: []string{"stderr"}, } -// AddOutputPaths adds output paths to the existing output paths, resolving conflicts. 
-func AddOutputPaths(cfg zap.Config, outputPaths, errorOutputPaths []string) zap.Config { +// MergeOutputPaths merges logging output paths, resolving conflicts. +func MergeOutputPaths(cfg zap.Config) zap.Config { outputs := make(map[string]struct{}) for _, v := range cfg.OutputPaths { outputs[v] = struct{}{} } - for _, v := range outputPaths { - outputs[v] = struct{}{} - } outputSlice := make([]string, 0) if _, ok := outputs["/dev/null"]; ok { // "/dev/null" to discard all @@ -78,9 +75,6 @@ func AddOutputPaths(cfg zap.Config, outputPaths, errorOutputPaths []string) zap. for _, v := range cfg.ErrorOutputPaths { errOutputs[v] = struct{}{} } - for _, v := range errorOutputPaths { - errOutputs[v] = struct{}{} - } errOutputSlice := make([]string, 0) if _, ok := errOutputs["/dev/null"]; ok { // "/dev/null" to discard all diff --git a/vendor/go.etcd.io/etcd/pkg/traceutil/trace.go b/vendor/go.etcd.io/etcd/pkg/traceutil/trace.go new file mode 100644 index 000000000000..2d247dd9accb --- /dev/null +++ b/vendor/go.etcd.io/etcd/pkg/traceutil/trace.go @@ -0,0 +1,172 @@ +// Copyright 2019 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package traceutil implements tracing utilities using "context". +package traceutil + +import ( + "bytes" + "context" + "fmt" + "math/rand" + "time" + + "go.uber.org/zap" +) + +const ( + TraceKey = "trace" + StartTimeKey = "startTime" +) + +// Field is a kv pair to record additional details of the trace. 
+type Field struct { + Key string + Value interface{} +} + +func (f *Field) format() string { + return fmt.Sprintf("%s:%v; ", f.Key, f.Value) +} + +func writeFields(fields []Field) string { + if len(fields) == 0 { + return "" + } + var buf bytes.Buffer + buf.WriteString("{") + for _, f := range fields { + buf.WriteString(f.format()) + } + buf.WriteString("}") + return buf.String() +} + +type Trace struct { + operation string + lg *zap.Logger + fields []Field + startTime time.Time + steps []step + stepDisabled bool +} + +type step struct { + time time.Time + msg string + fields []Field +} + +func New(op string, lg *zap.Logger, fields ...Field) *Trace { + return &Trace{operation: op, lg: lg, startTime: time.Now(), fields: fields} +} + +// TODO returns a non-nil, empty Trace +func TODO() *Trace { + return &Trace{} +} + +func Get(ctx context.Context) *Trace { + if trace, ok := ctx.Value(TraceKey).(*Trace); ok && trace != nil { + return trace + } + return TODO() +} + +func (t *Trace) GetStartTime() time.Time { + return t.startTime +} + +func (t *Trace) SetStartTime(time time.Time) { + t.startTime = time +} + +func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field) { + newStep := step{time, msg, fields} + if at < len(t.steps) { + t.steps = append(t.steps[:at+1], t.steps[at:]...) 
+ t.steps[at] = newStep + } else { + t.steps = append(t.steps, newStep) + } +} + +// Step adds step to trace +func (t *Trace) Step(msg string, fields ...Field) { + if !t.stepDisabled { + t.steps = append(t.steps, step{time: time.Now(), msg: msg, fields: fields}) + } +} + +// DisableStep sets the flag to prevent the trace from adding steps +func (t *Trace) DisableStep() { + t.stepDisabled = true +} + +// EnableStep re-enable the trace to add steps +func (t *Trace) EnableStep() { + t.stepDisabled = false +} + +func (t *Trace) AddField(fields ...Field) { + for _, f := range fields { + t.fields = append(t.fields, f) + } +} + +// Log dumps all steps in the Trace +func (t *Trace) Log() { + t.LogWithStepThreshold(0) +} + +// LogIfLong dumps logs if the duration is longer than threshold +func (t *Trace) LogIfLong(threshold time.Duration) { + if time.Since(t.startTime) > threshold { + stepThreshold := threshold / time.Duration(len(t.steps)+1) + t.LogWithStepThreshold(stepThreshold) + } +} + +// LogWithStepThreshold only dumps step whose duration is longer than step threshold +func (t *Trace) LogWithStepThreshold(threshold time.Duration) { + msg, fs := t.logInfo(threshold) + if t.lg != nil { + t.lg.Info(msg, fs...) 
+ } +} + +func (t *Trace) logInfo(threshold time.Duration) (string, []zap.Field) { + endTime := time.Now() + totalDuration := endTime.Sub(t.startTime) + traceNum := rand.Int31() + msg := fmt.Sprintf("trace[%d] %s", traceNum, t.operation) + + var steps []string + lastStepTime := t.startTime + for _, step := range t.steps { + stepDuration := step.time.Sub(lastStepTime) + if stepDuration > threshold { + steps = append(steps, fmt.Sprintf("trace[%d] '%v' %s (duration: %v)", + traceNum, step.msg, writeFields(step.fields), stepDuration)) + } + lastStepTime = step.time + } + + fs := []zap.Field{zap.String("detail", writeFields(t.fields)), + zap.Duration("duration", totalDuration), + zap.Time("start", t.startTime), + zap.Time("end", endTime), + zap.Strings("steps", steps)} + return msg, fs +} diff --git a/vendor/go.etcd.io/etcd/raft/confchange/confchange.go b/vendor/go.etcd.io/etcd/raft/confchange/confchange.go index 58c74bfb53c8..a0dc486df4f6 100644 --- a/vendor/go.etcd.io/etcd/raft/confchange/confchange.go +++ b/vendor/go.etcd.io/etcd/raft/confchange/confchange.go @@ -257,11 +257,15 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u nilAwareAdd(&cfg.Learners, id) } prs[id] = &tracker.Progress{ - // We initialize Progress.Next with lastIndex+1 so that the peer will be - // probed without an index first. + // Initializing the Progress with the last index means that the follower + // can be probed (with the last index). // - // TODO(tbg): verify that, this is just my best guess. - Next: c.LastIndex + 1, + // TODO(tbg): seems awfully optimistic. Using the first index would be + // better. The general expectation here is that the follower has no log + // at all (and will thus likely need a snapshot), though the app may + // have applied a snapshot out of band before adding the replica (thus + // making the first index the better choice). 
+ Next: c.LastIndex, Match: 0, Inflights: tracker.NewInflights(c.Tracker.MaxInflight), IsLearner: isLearner, diff --git a/vendor/go.etcd.io/etcd/raft/log_unstable.go b/vendor/go.etcd.io/etcd/raft/log_unstable.go index 1005bf65cc5f..1bff5a7bdcb9 100644 --- a/vendor/go.etcd.io/etcd/raft/log_unstable.go +++ b/vendor/go.etcd.io/etcd/raft/log_unstable.go @@ -55,10 +55,7 @@ func (u *unstable) maybeLastIndex() (uint64, bool) { // is any. func (u *unstable) maybeTerm(i uint64) (uint64, bool) { if i < u.offset { - if u.snapshot == nil { - return 0, false - } - if u.snapshot.Metadata.Index == i { + if u.snapshot != nil && u.snapshot.Metadata.Index == i { return u.snapshot.Metadata.Term, true } return 0, false @@ -71,6 +68,7 @@ func (u *unstable) maybeTerm(i uint64) (uint64, bool) { if i > last { return 0, false } + return u.entries[i-u.offset].Term, true } diff --git a/vendor/go.etcd.io/etcd/raft/raft.go b/vendor/go.etcd.io/etcd/raft/raft.go index 83d783eb3387..d3c3f42574b1 100644 --- a/vendor/go.etcd.io/etcd/raft/raft.go +++ b/vendor/go.etcd.io/etcd/raft/raft.go @@ -367,7 +367,7 @@ func newRaft(c *Config) *raft { } assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs)) - if !isHardStateEqual(hs, emptyState) { + if !IsEmptyHardState(hs) { r.loadState(hs) } if c.Applied > 0 { @@ -1036,10 +1036,36 @@ func stepLeader(r *raft, m pb.Message) error { for i := range m.Entries { e := &m.Entries[i] - if e.Type == pb.EntryConfChange || e.Type == pb.EntryConfChangeV2 { - if r.pendingConfIndex > r.raftLog.applied { - r.logger.Infof("%x propose conf %s ignored since pending unapplied configuration [index %d, applied %d]", - r.id, e, r.pendingConfIndex, r.raftLog.applied) + var cc pb.ConfChangeI + if e.Type == pb.EntryConfChange { + var ccc pb.ConfChange + if err := ccc.Unmarshal(e.Data); err != nil { + panic(err) + } + cc = ccc + } else if e.Type == pb.EntryConfChangeV2 { + var ccc pb.ConfChangeV2 + if err := ccc.Unmarshal(e.Data); err != nil { + panic(err) + } + cc = ccc + 
} + if cc != nil { + alreadyPending := r.pendingConfIndex > r.raftLog.applied + alreadyJoint := len(r.prs.Config.Voters[1]) > 0 + wantsLeaveJoint := len(cc.AsV2().Changes) == 0 + + var refused string + if alreadyPending { + refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied) + } else if alreadyJoint && !wantsLeaveJoint { + refused = "must transition out of joint config first" + } else if !alreadyJoint && wantsLeaveJoint { + refused = "not in joint state; refusing empty conf change" + } + + if refused != "" { + r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused) m.Entries[i] = pb.Entry{Type: pb.EntryNormal} } else { r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1 @@ -1073,7 +1099,7 @@ func stepLeader(r *raft, m pb.Message) error { case ReadOnlyLeaseBased: ri := r.raftLog.committed if m.From == None || m.From == r.id { // from local member - r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data}) + r.readStates = append(r.readStates, ReadState{Index: ri, RequestCtx: m.Entries[0].Data}) } else { r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries}) } @@ -1527,10 +1553,18 @@ func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.Co if r.state != StateLeader || len(cs.Voters) == 0 { return cs } + if r.maybeCommit() { - // The quorum size may have been reduced (but not to zero), so see if - // any pending entries can be committed. + // If the configuration change means that more entries are committed now, + // broadcast/append to everyone in the updated config. r.bcastAppend() + } else { + // Otherwise, still probe the newly added replicas; there's no reason to + // let them wait out a heartbeat interval (or the next incoming + // proposal). 
+ r.prs.Visit(func(id uint64, pr *tracker.Progress) { + r.maybeSendAppend(id, false /* sendIfEmpty */) + }) } // If the the leadTransferee was removed, abort the leadership transfer. if _, tOK := r.prs.Progress[r.leadTransferee]; !tOK && r.leadTransferee != 0 { diff --git a/vendor/go.etcd.io/etcd/raft/util.go b/vendor/go.etcd.io/etcd/raft/util.go index 881a6e14e241..785cf735d5db 100644 --- a/vendor/go.etcd.io/etcd/raft/util.go +++ b/vendor/go.etcd.io/etcd/raft/util.go @@ -77,8 +77,8 @@ func DescribeSoftState(ss SoftState) string { func DescribeConfState(state pb.ConfState) string { return fmt.Sprintf( - "Voters:%v VotersOutgoing:%v Learners:%v LearnersNext:%v", - state.Voters, state.VotersOutgoing, state.Learners, state.LearnersNext, + "Voters:%v VotersOutgoing:%v Learners:%v LearnersNext:%v AutoLeave:%v", + state.Voters, state.VotersOutgoing, state.Learners, state.LearnersNext, state.AutoLeave, ) } diff --git a/vendor/go.etcd.io/etcd/version/version.go b/vendor/go.etcd.io/etcd/version/version.go index 9ebf7cc6f5f0..c2960bdd0de4 100644 --- a/vendor/go.etcd.io/etcd/version/version.go +++ b/vendor/go.etcd.io/etcd/version/version.go @@ -26,7 +26,7 @@ import ( var ( // MinClusterVersion is the min cluster version this etcd binary is compatible with. MinClusterVersion = "3.0.0" - Version = "3.4.0-rc.1" + Version = "3.4.7" APIVersion = "unknown" // Git SHA Value will be set during build diff --git a/vendor/go.etcd.io/etcd/wal/encoder.go b/vendor/go.etcd.io/etcd/wal/encoder.go index d3877ed5c4ed..4de853b69a9f 100644 --- a/vendor/go.etcd.io/etcd/wal/encoder.go +++ b/vendor/go.etcd.io/etcd/wal/encoder.go @@ -92,7 +92,8 @@ func (e *encoder) encode(rec *walpb.Record) error { if padBytes != 0 { data = append(data, make([]byte, padBytes)...) 
} - _, err = e.bw.Write(data) + n, err = e.bw.Write(data) + walWriteBytes.Add(float64(n)) return err } @@ -108,13 +109,16 @@ func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) { func (e *encoder) flush() error { e.mu.Lock() - defer e.mu.Unlock() - return e.bw.Flush() + n, err := e.bw.FlushN() + e.mu.Unlock() + walWriteBytes.Add(float64(n)) + return err } func writeUint64(w io.Writer, n uint64, buf []byte) error { // http://golang.org/src/encoding/binary/binary.go binary.LittleEndian.PutUint64(buf, n) - _, err := w.Write(buf) + nv, err := w.Write(buf) + walWriteBytes.Add(float64(nv)) return err } diff --git a/vendor/go.etcd.io/etcd/wal/metrics.go b/vendor/go.etcd.io/etcd/wal/metrics.go index 22cb8003c98b..814d654cdd30 100644 --- a/vendor/go.etcd.io/etcd/wal/metrics.go +++ b/vendor/go.etcd.io/etcd/wal/metrics.go @@ -27,8 +27,16 @@ var ( // highest bucket start of 0.001 sec * 2^13 == 8.192 sec Buckets: prometheus.ExponentialBuckets(0.001, 2, 14), }) + + walWriteBytes = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "disk", + Name: "wal_write_bytes_total", + Help: "Total number of bytes written in WAL.", + }) ) func init() { prometheus.MustRegister(walFsyncSec) + prometheus.MustRegister(walWriteBytes) } diff --git a/vendor/go.etcd.io/etcd/wal/repair.go b/vendor/go.etcd.io/etcd/wal/repair.go index 15afed01744d..5c7c5d1759f7 100644 --- a/vendor/go.etcd.io/etcd/wal/repair.go +++ b/vendor/go.etcd.io/etcd/wal/repair.go @@ -18,10 +18,10 @@ import ( "io" "os" "path/filepath" + "time" "go.etcd.io/etcd/pkg/fileutil" "go.etcd.io/etcd/wal/walpb" - "go.uber.org/zap" ) @@ -105,6 +105,7 @@ func Repair(lg *zap.Logger, dirpath string) bool { return false } + start := time.Now() if err = fileutil.Fsync(f.File); err != nil { if lg != nil { lg.Warn("failed to fsync", zap.String("path", f.Name()), zap.Error(err)) @@ -113,6 +114,7 @@ func Repair(lg *zap.Logger, dirpath string) bool { } return false } + 
walFsyncSec.Observe(time.Since(start).Seconds()) if lg != nil { lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.ErrUnexpectedEOF)) diff --git a/vendor/go.etcd.io/etcd/wal/wal.go b/vendor/go.etcd.io/etcd/wal/wal.go index 5f6f21e3a539..f5aff3f1bc14 100644 --- a/vendor/go.etcd.io/etcd/wal/wal.go +++ b/vendor/go.etcd.io/etcd/wal/wal.go @@ -204,6 +204,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) { } return nil, perr } + start := time.Now() if perr = fileutil.Fsync(pdir); perr != nil { if lg != nil { lg.Warn( @@ -215,6 +216,8 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) { } return nil, perr } + walFsyncSec.Observe(time.Since(start).Seconds()) + if perr = pdir.Close(); perr != nil { if lg != nil { lg.Warn( @@ -667,9 +670,11 @@ func (w *WAL) cut() error { if err = os.Rename(newTail.Name(), fpath); err != nil { return err } + start := time.Now() if err = fileutil.Fsync(w.dirFile); err != nil { return err } + walFsyncSec.Observe(time.Since(start).Seconds()) // reopen newTail with its new path so calls to Name() match the wal filename format newTail.Close() diff --git a/vendor/modules.txt b/vendor/modules.txt index 582cc0f1035b..a94c03b4f948 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -573,7 +573,7 @@ github.com/weaveworks/promrus github.com/xiang90/probing # go.etcd.io/bbolt v1.3.3 go.etcd.io/bbolt -# go.etcd.io/etcd v0.0.0-20190815204525-8f85f0dc2607 +# go.etcd.io/etcd v0.0.0-20200401174654-e694b7bb0875 go.etcd.io/etcd/auth go.etcd.io/etcd/auth/authpb go.etcd.io/etcd/client @@ -638,6 +638,7 @@ go.etcd.io/etcd/pkg/schedule go.etcd.io/etcd/pkg/srv go.etcd.io/etcd/pkg/systemd go.etcd.io/etcd/pkg/tlsutil +go.etcd.io/etcd/pkg/traceutil go.etcd.io/etcd/pkg/transport go.etcd.io/etcd/pkg/types go.etcd.io/etcd/pkg/wait