diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 26e3fd378c14..24fdc85736c4 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -35,6 +35,8 @@ var ( ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err() ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err() + ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err() + ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err() ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err() ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err() diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index c623d3d946df..208c453ffa56 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -21,12 +21,16 @@ import ( "sync" "time" + "google.golang.org/grpc/metadata" + "go.etcd.io/etcd/v3/auth" "go.etcd.io/etcd/v3/etcdserver" "go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" "go.etcd.io/etcd/v3/mvcc" "go.etcd.io/etcd/v3/mvcc/mvccpb" + "go.etcd.io/etcd/v3/pkg/types" + "go.etcd.io/etcd/v3/raft" "go.uber.org/zap" ) @@ -191,9 +195,18 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { case <-stream.Context().Done(): err = stream.Context().Err() - // the only server-side cancellation is noleader for now. if err == context.Canceled { - err = rpctypes.ErrGRPCNoLeader + err = rpctypes.ErrGRPCWatchCanceled + // Try to determine a more specific cancellation error. Use the stream context + // and local leader state to detect leader loss. If the reason is inconclusive, + // assume a client cancellation. + if md, hasMetadata := metadata.FromIncomingContext(stream.Context()); hasMetadata { + if rl := md[rpctypes.MetadataRequireLeaderKey]; len(rl) > 0 && rl[0] == rpctypes.MetadataHasLeader { + if sws.sg.Leader() == types.ID(raft.None) { + err = rpctypes.ErrGRPCNoLeader + } + } + } } } diff --git a/tests/e2e/metrics_test.go b/tests/e2e/metrics_test.go index e20b0f88d5c1..9666f169ebb4 100644 --- a/tests/e2e/metrics_test.go +++ b/tests/e2e/metrics_test.go @@ -49,6 +49,7 @@ func metricsTest(cx ctlCtx) { {"/metrics", fmt.Sprintf("etcd_mvcc_delete_total 3")}, {"/metrics", fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)}, {"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))}, + {"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)}, {"/health", `{"health":"true","reason":""}`}, } { i++ @@ -58,7 +59,9 @@ func metricsTest(cx ctlCtx) { if err := ctlV3Del(cx, []string{fmt.Sprintf("%d", i)}, 1); err != nil { cx.t.Fatal(err) } - + if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil { + cx.t.Fatal(err) + } if err := cURLGet(cx.epc, cURLReq{endpoint: test.endpoint, expected: test.expected, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil { cx.t.Fatalf("failed get with curl (%v)", err) }