diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index 45ea5d734fdd..77c7c1fd5975 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -25,11 +25,12 @@ import ( "go.etcd.io/etcd/v3/pkg/types" "go.etcd.io/etcd/v3/raft" - pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" + + pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" ) const ( @@ -231,8 +232,13 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor return rpctypes.ErrGRPCNoLeader } - cctx, cancel := context.WithCancel(ss.Context()) - ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss} + cancelCtx, cancelFn := context.WithCancel(ss.Context()) + monitorCtx := &leaderMonitoringContext{ + Context: cancelCtx, + cancel: cancelFn, + } + cancelForLeaderLoss := context.CancelFunc(monitorCtx.CancelForLeaderLoss) + ss = serverStreamWithCtx{ctx: monitorCtx, cancel: &cancelForLeaderLoss, ServerStream: ss} smap.mu.Lock() smap.streams[ss] = struct{}{} @@ -242,7 +248,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor smap.mu.Lock() delete(smap.streams, ss) smap.mu.Unlock() - cancel() + monitorCtx.Cancel() }() } } @@ -251,6 +257,40 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor } } +// leaderMonitoringContext wraps a context and provides a custom error when +// the CancelForLeaderLoss() method is used to cancel the context. This is +// so downstream context users can disambiguate the reason for the cancellation +// which could be from the client (for example) or from this interceptor code. +type leaderMonitoringContext struct { + context.Context + + lock sync.Mutex + cancel context.CancelFunc + cancelReason error +} + +func (c *leaderMonitoringContext) Cancel() { + c.lock.Lock() + defer c.lock.Unlock() + c.cancel() +} + +func (c *leaderMonitoringContext) CancelForLeaderLoss() { + c.lock.Lock() + defer c.lock.Unlock() + c.cancelReason = rpctypes.ErrGRPCNoLeader + c.cancel() +} + +func (c *leaderMonitoringContext) Err() error { + c.lock.Lock() + defer c.lock.Unlock() + if c.cancelReason != nil { + return c.cancelReason + } + return c.Context.Err() +} + type serverStreamWithCtx struct { grpc.ServerStream ctx context.Context diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 26e3fd378c14..24fdc85736c4 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -35,6 +35,8 @@ var ( ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err() ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err() + ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err() + ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err() ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err() ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err() diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 2144779fc4da..504b33a95d95 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -21,14 +21,14 @@ import ( "sync" "time" + "go.uber.org/zap" + "go.etcd.io/etcd/v3/auth" "go.etcd.io/etcd/v3/etcdserver" "go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" "go.etcd.io/etcd/v3/mvcc" "go.etcd.io/etcd/v3/mvcc/mvccpb" - - "go.uber.org/zap" ) const minWatchProgressInterval = 100 * time.Millisecond @@ -199,13 +199,14 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { select { case err = <-errc: + if err == context.Canceled { + err = rpctypes.ErrGRPCWatchCanceled + } close(sws.ctrlStream) - case <-stream.Context().Done(): err = stream.Context().Err() - // the only server-side cancellation is noleader for now. if err == context.Canceled { - err = rpctypes.ErrGRPCNoLeader + err = rpctypes.ErrGRPCWatchCanceled } } diff --git a/tests/e2e/metrics_test.go b/tests/e2e/metrics_test.go index e20b0f88d5c1..9666f169ebb4 100644 --- a/tests/e2e/metrics_test.go +++ b/tests/e2e/metrics_test.go @@ -49,6 +49,7 @@ func metricsTest(cx ctlCtx) { {"/metrics", fmt.Sprintf("etcd_mvcc_delete_total 3")}, {"/metrics", fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)}, {"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))}, + {"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)}, {"/health", `{"health":"true","reason":""}`}, } { i++ @@ -58,7 +59,9 @@ func metricsTest(cx ctlCtx) { if err := ctlV3Del(cx, []string{fmt.Sprintf("%d", i)}, 1); err != nil { cx.t.Fatal(err) } - + if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil { + cx.t.Fatal(err) + } if err := cURLGet(cx.epc, cURLReq{endpoint: test.endpoint, expected: test.expected, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil { cx.t.Fatalf("failed get with curl (%v)", err) }