diff --git a/etcdserver/api/v3rpc/grpc.go b/etcdserver/api/v3rpc/grpc.go index fad1f43d6dd..4e9297a71dc 100644 --- a/etcdserver/api/v3rpc/grpc.go +++ b/etcdserver/api/v3rpc/grpc.go @@ -29,7 +29,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config) *grpc.Server { opts = append(opts, grpc.Creds(credentials.NewTLS(tls))) } opts = append(opts, grpc.UnaryInterceptor(newUnaryInterceptor(s))) - opts = append(opts, grpc.StreamInterceptor(metricsStreamInterceptor)) + opts = append(opts, grpc.StreamInterceptor(newStreamInterceptor(s))) grpcServer := grpc.NewServer(opts...) pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s)) diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index 6872bfd1108..7c2e33f0e82 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -16,6 +16,7 @@ package v3rpc import ( "strings" + "sync" "time" "github.com/coreos/etcd/etcdserver" @@ -28,6 +29,15 @@ import ( "google.golang.org/grpc/metadata" ) +const ( + maxNoLeaderCnt = 3 +) + +type streamsMap struct { + mu sync.Mutex + streams map[grpc.ServerStream]struct{} +} + func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { md, ok := metadata.FromContext(ctx) @@ -42,6 +52,37 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { } } +func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor { + smap := monitorLeader(s) + + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + md, ok := metadata.FromContext(ss.Context()) + if ok { + if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { + if s.Leader() == types.ID(raft.None) { + return rpctypes.ErrGRPCNoLeader + } + + cctx, cancel := context.WithCancel(ss.Context()) + ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss} + + smap.mu.Lock() + smap.streams[ss] = struct{}{} + smap.mu.Unlock() + + defer func() { + smap.mu.Lock() + delete(smap.streams, ss) + smap.mu.Unlock() + cancel() + }() + + } + } + return metricsStreamInterceptor(srv, ss, info, handler) + } +} + func metricsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { service, method := splitMethodName(info.FullMethod) receivedCounter.WithLabelValues(service, method).Inc() @@ -75,3 +116,52 @@ func splitMethodName(fullMethodName string) (string, string) { } return "unknown", "unknown" } + +type serverStreamWithCtx struct { + grpc.ServerStream + ctx context.Context + cancel *context.CancelFunc +} + +func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx } + +func monitorLeader(s *etcdserver.EtcdServer) *streamsMap { + smap := &streamsMap{ + streams: make(map[grpc.ServerStream]struct{}), + } + + go func() { + election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond + noLeaderCnt := 0 + + for { + select { + case <-s.StopNotify(): + return + case <-time.After(election): + if s.Leader() == types.ID(raft.None) { + noLeaderCnt++ + } else { + noLeaderCnt = 0 + } + + // We are more conservative on canceling existing streams. Reconnecting streams + // cost much more than just rejecting new requests. So we wait until the member + // cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams. + if noLeaderCnt >= maxNoLeaderCnt { + smap.mu.Lock() + for ss := range smap.streams { + if ssWithCtx, ok := ss.(serverStreamWithCtx); ok { + (*ssWithCtx.cancel)() + <-ss.Context().Done() + } + } + smap.streams = make(map[grpc.ServerStream]struct{}) + smap.mu.Unlock() + } + } + } + }() + + return smap +} diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 418e69a784f..686b1900a89 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -19,7 +19,10 @@ import ( "sync" "time" + "golang.org/x/net/context" + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/mvcc" "github.com/coreos/etcd/mvcc/mvccpb" @@ -105,10 +108,24 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { progress: make(map[mvcc.WatchID]bool), closec: make(chan struct{}), } - defer sws.close() go sws.sendLoop() - return sws.recvLoop() + errc := make(chan error, 1) + go func() { + errc <- sws.recvLoop() + sws.close() + }() + select { + case err := <-errc: + return err + case <-stream.Context().Done(): + err := stream.Context().Err() + // the only server-side cancellation is noleader for now. + if err == context.Canceled { + return rpctypes.ErrGRPCNoLeader + } + return err + } } func (sws *serverWatchStream) recvLoop() error { diff --git a/etcdserver/quota.go b/etcdserver/quota.go index 872bf5ebe81..508ce36c4f7 100644 --- a/etcdserver/quota.go +++ b/etcdserver/quota.go @@ -50,20 +50,20 @@ const ( ) func NewBackendQuota(s *EtcdServer) Quota { - if s.cfg.QuotaBackendBytes < 0 { + if s.Cfg.QuotaBackendBytes < 0 { // disable quotas if negative plog.Warningf("disabling backend quota") return &passthroughQuota{} } - if s.cfg.QuotaBackendBytes == 0 { + if s.Cfg.QuotaBackendBytes == 0 { // use default size if no quota size given return &backendQuota{s, backend.DefaultQuotaBytes} } - if s.cfg.QuotaBackendBytes > backend.MaxQuotaBytes { - plog.Warningf("backend quota %v exceeds maximum quota %v; using maximum", s.cfg.QuotaBackendBytes, backend.MaxQuotaBytes) + if s.Cfg.QuotaBackendBytes > backend.MaxQuotaBytes { + plog.Warningf("backend quota %v exceeds maximum quota %v; using maximum", s.Cfg.QuotaBackendBytes, backend.MaxQuotaBytes) return &backendQuota{s, backend.MaxQuotaBytes} } - return &backendQuota{s, s.cfg.QuotaBackendBytes} + return &backendQuota{s, s.Cfg.QuotaBackendBytes} } func (b *backendQuota) Available(v interface{}) bool { diff --git a/etcdserver/raft.go b/etcdserver/raft.go index b3722dd1791..4ada5303a47 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -134,8 +134,8 @@ func (r *raftNode) start(s *EtcdServer) { r.done = make(chan struct{}) heartbeat := 200 * time.Millisecond - if s.cfg != nil { - heartbeat = time.Duration(s.cfg.TickMs) * time.Millisecond + if s.Cfg != nil { + heartbeat = time.Duration(s.Cfg.TickMs) * time.Millisecond } // set up contention detectors for raft heartbeat message. // expect to send a heartbeat within 2 heartbeat intervals. @@ -173,7 +173,7 @@ func (r *raftNode) start(s *EtcdServer) { // it promotes or demotes instead of modifying server directly. syncC = r.s.SyncTicker if r.s.lessor != nil { - r.s.lessor.Promote(r.s.cfg.electionTimeout()) + r.s.lessor.Promote(r.s.Cfg.electionTimeout()) } // TODO: remove the nil checking // current test utility does not provide the stats @@ -238,7 +238,7 @@ func (r *raftNode) start(s *EtcdServer) { raftDone <- struct{}{} r.Advance() case <-syncC: - r.s.sync(r.s.cfg.ReqTimeout()) + r.s.sync(r.s.Cfg.ReqTimeout()) case <-r.stopped: return } diff --git a/etcdserver/server.go b/etcdserver/server.go index 4b413d392bc..a6aa442d872 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -162,11 +162,11 @@ type EtcdServer struct { // count the number of inflight snapshots. // MUST use atomic operation to access this field. inflightSnapshots int64 + Cfg *ServerConfig readych chan struct{} r raftNode - cfg *ServerConfig snapCount uint64 w wait.Wait @@ -369,7 +369,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { srv = &EtcdServer{ readych: make(chan struct{}), - cfg: cfg, + Cfg: cfg, snapCount: cfg.SnapCount, errorc: make(chan error, 1), store: st, @@ -444,7 +444,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { // It also starts a goroutine to publish its server information. func (s *EtcdServer) Start() { s.start() - go s.publish(s.cfg.ReqTimeout()) + go s.publish(s.Cfg.ReqTimeout()) go s.purgeFile() go monitorFileDescriptor(s.done) go s.monitorVersions() @@ -473,11 +473,11 @@ func (s *EtcdServer) start() { func (s *EtcdServer) purgeFile() { var serrc, werrc <-chan error - if s.cfg.MaxSnapFiles > 0 { - serrc = fileutil.PurgeFile(s.cfg.SnapDir(), "snap", s.cfg.MaxSnapFiles, purgeFileInterval, s.done) + if s.Cfg.MaxSnapFiles > 0 { + serrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done) } - if s.cfg.MaxWALFiles > 0 { - werrc = fileutil.PurgeFile(s.cfg.WALDir(), "wal", s.cfg.MaxWALFiles, purgeFileInterval, s.done) + if s.Cfg.MaxWALFiles > 0 { + werrc = fileutil.PurgeFile(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.done) } select { case e := <-werrc: @@ -623,7 +623,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { plog.Panicf("get database snapshot file path error: %v", err) } - fn := path.Join(s.cfg.SnapDir(), databaseFilename) + fn := path.Join(s.Cfg.SnapDir(), databaseFilename) if err := os.Rename(snapfn, fn); err != nil { plog.Panicf("rename snapshot file error: %v", err) } @@ -764,7 +764,7 @@ func (s *EtcdServer) LeaderStats() []byte { func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() } func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) error { - if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() { + if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToAddNewMember() { // If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd. // In such a case adding a new member is allowed unconditionally return ErrNotEnoughStartedMembers @@ -784,7 +784,7 @@ func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) erro } func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) error { - if s.cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) { + if s.Cfg.StrictReconfigCheck && !s.cluster.IsReadyToRemoveMember(id) { // If s.cfg.StrictReconfigCheck is false, it means the option --strict-reconfig-check isn't passed to etcd. // In such a case removing a member is allowed unconditionally return ErrNotEnoughStartedMembers @@ -823,7 +823,7 @@ func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.r.lead) } func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) } -func (s *EtcdServer) IsPprofEnabled() bool { return s.cfg.EnablePprof } +func (s *EtcdServer) IsPprofEnabled() bool { return s.Cfg.EnablePprof } // configure sends a configuration change through consensus and // then waits for it to be applied to the server. It @@ -939,7 +939,7 @@ func (s *EtcdServer) send(ms []raftpb.Message) { ok, exceed := s.r.td.Observe(ms[i].To) if !ok { // TODO: limit request rate. - plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.cfg.TickMs, exceed) + plog.Warningf("failed to send out heartbeat on time (exceeded the %dms timeout for %v)", s.Cfg.TickMs, exceed) plog.Warningf("server is likely overloaded") } } @@ -1221,7 +1221,7 @@ func (s *EtcdServer) updateClusterVersion(ver string) { Path: membership.StoreClusterVersionKey(), Val: ver, } - ctx, cancel := context.WithTimeout(context.Background(), s.cfg.ReqTimeout()) + ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) _, err := s.Do(ctx, req) cancel() switch err { @@ -1241,7 +1241,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { return ErrCanceled case context.DeadlineExceeded: curLeadElected := s.r.leadElectedTime() - prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond) + prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond) if start.After(prevLeadLost) && start.Before(curLeadElected) { return ErrTimeoutDueToLeaderFail } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 3295917cb87..7a1c09d87cd 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -174,7 +174,7 @@ func TestApplyRepeat(t *testing.T) { storage: mockstorage.NewStorageRecorder(""), transport: rafthttp.NewNopTransporter(), }, - cfg: &ServerConfig{}, + Cfg: &ServerConfig{}, store: st, cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -525,7 +525,7 @@ func TestApplyConfChangeError(t *testing.T) { srv := &EtcdServer{ r: raftNode{Node: n}, cluster: cl, - cfg: &ServerConfig{}, + Cfg: &ServerConfig{}, } _, err := srv.applyConfChange(tt.cc, nil) if err != tt.werr { @@ -629,7 +629,7 @@ func TestDoProposal(t *testing.T) { for i, tt := range tests { st := mockstore.NewRecorder() srv := &EtcdServer{ - cfg: &ServerConfig{TickMs: 1}, + Cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: newNodeCommitter(), storage: mockstorage.NewStorageRecorder(""), @@ -661,7 +661,7 @@ func TestDoProposal(t *testing.T) { func TestDoProposalCancelled(t *testing.T) { wt := mockwait.NewRecorder() srv := &EtcdServer{ - cfg: &ServerConfig{TickMs: 1}, + Cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: newNodeNop()}, w: wt, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -683,7 +683,7 @@ func TestDoProposalCancelled(t *testing.T) { func TestDoProposalTimeout(t *testing.T) { srv := &EtcdServer{ - cfg: &ServerConfig{TickMs: 1}, + Cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: newNodeNop()}, w: mockwait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -699,7 +699,7 @@ func TestDoProposalTimeout(t *testing.T) { func TestDoProposalStopped(t *testing.T) { srv := &EtcdServer{ - cfg: &ServerConfig{TickMs: 1}, + Cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: newNodeNop()}, w: mockwait.NewNop(), reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -789,7 +789,7 @@ func TestSyncTrigger(t *testing.T) { n := newReadyNode() st := make(chan time.Time, 1) srv := &EtcdServer{ - cfg: &ServerConfig{TickMs: 1}, + Cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: n, raftStorage: raft.NewMemoryStorage(), @@ -847,7 +847,7 @@ func TestSnapshot(t *testing.T) { st := mockstore.NewRecorder() p := mockstorage.NewStorageRecorder("") srv := &EtcdServer{ - cfg: &ServerConfig{}, + Cfg: &ServerConfig{}, r: raftNode{ Node: newNodeNop(), raftStorage: s, @@ -889,7 +889,7 @@ func TestTriggerSnap(t *testing.T) { st := mockstore.NewRecorder() p := mockstorage.NewStorageRecorderStream("") srv := &EtcdServer{ - cfg: &ServerConfig{TickMs: 1}, + Cfg: &ServerConfig{TickMs: 1}, snapCount: uint64(snapc), r: raftNode{ Node: newNodeCommitter(), @@ -955,7 +955,7 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { rs := raft.NewMemoryStorage() tr, snapDoneC := rafthttp.NewSnapTransporter(testdir) s := &EtcdServer{ - cfg: &ServerConfig{ + Cfg: &ServerConfig{ DataDir: testdir, }, r: raftNode{ @@ -1045,7 +1045,7 @@ func TestAddMember(t *testing.T) { storage: mockstorage.NewStorageRecorder(""), transport: rafthttp.NewNopTransporter(), }, - cfg: &ServerConfig{}, + Cfg: &ServerConfig{}, store: st, cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -1085,7 +1085,7 @@ func TestRemoveMember(t *testing.T) { storage: mockstorage.NewStorageRecorder(""), transport: rafthttp.NewNopTransporter(), }, - cfg: &ServerConfig{}, + Cfg: &ServerConfig{}, store: st, cluster: cl, reqIDGen: idutil.NewGenerator(0, time.Time{}), @@ -1156,7 +1156,7 @@ func TestPublish(t *testing.T) { w := wait.NewWithResponse(ch) srv := &EtcdServer{ readych: make(chan struct{}), - cfg: &ServerConfig{TickMs: 1}, + Cfg: &ServerConfig{TickMs: 1}, id: 1, r: raftNode{Node: n}, attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, @@ -1197,7 +1197,7 @@ func TestPublish(t *testing.T) { // TestPublishStopped tests that publish will be stopped if server is stopped. func TestPublishStopped(t *testing.T) { srv := &EtcdServer{ - cfg: &ServerConfig{TickMs: 1}, + Cfg: &ServerConfig{TickMs: 1}, r: raftNode{ Node: newNodeNop(), transport: rafthttp.NewNopTransporter(), @@ -1216,7 +1216,7 @@ func TestPublishStopped(t *testing.T) { func TestPublishRetry(t *testing.T) { n := newNodeRecorder() srv := &EtcdServer{ - cfg: &ServerConfig{TickMs: 1}, + Cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: n}, w: mockwait.NewNop(), done: make(chan struct{}), @@ -1241,7 +1241,7 @@ func TestUpdateVersion(t *testing.T) { w := wait.NewWithResponse(ch) srv := &EtcdServer{ id: 1, - cfg: &ServerConfig{TickMs: 1}, + Cfg: &ServerConfig{TickMs: 1}, r: raftNode{Node: n}, attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, cluster: &membership.RaftCluster{}, diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index df4d5244480..199f26a55a9 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -179,7 +179,7 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) { leader := s.cluster.Member(s.Leader()) for i := 0; i < 5 && leader == nil; i++ { // wait an election - dur := time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond + dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond select { case <-time.After(dur): leader = s.cluster.Member(s.Leader()) @@ -193,7 +193,7 @@ func (s *EtcdServer) LeaseRenew(id lease.LeaseID) (int64, error) { for _, url := range leader.PeerURLs { lurl := url + "/leases" - ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.cfg.peerDialTimeout()) + ttl, err = leasehttp.RenewHTTP(id, lurl, s.peerRt, s.Cfg.peerDialTimeout()) if err == nil { break } diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 95f2c24a917..c8d5ab02eee 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -979,6 +979,8 @@ func TestTLSGRPCAcceptSecureAll(t *testing.T) { } func TestGRPCRequireLeader(t *testing.T) { + t.Parallel() + defer testutil.AfterTest(t) cfg := ClusterConfig{Size: 3} @@ -1004,3 +1006,67 @@ func TestGRPCRequireLeader(t *testing.T) { t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader) } } + +func TestGRPCStreamRequireLeader(t *testing.T) { + t.Parallel() + + defer testutil.AfterTest(t) + + cfg := ClusterConfig{Size: 3} + clus := newClusterV3NoClients(t, &cfg) + defer clus.Terminate(t) + + client, err := NewClientV3(clus.Members[0]) + if err != nil { + t.Fatalf("failed to create client (%v)", err) + } + defer client.Close() + + wAPI := toGRPC(client).Watch + md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) + ctx := metadata.NewContext(context.Background(), md) + wStream, err := wAPI.Watch(ctx) + if err != nil { + t.Fatalf("wAPI.Watch error: %v", err) + } + + clus.Members[1].Stop(t) + clus.Members[2].Stop(t) + + // existing stream should be rejected + _, err = wStream.Recv() + if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { + t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader) + } + + // new stream should also be rejected + wStream, err = wAPI.Watch(ctx) + if err != nil { + t.Fatalf("wAPI.Watch error: %v", err) + } + _, err = wStream.Recv() + if grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { + t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader) + } + + clus.Members[1].Restart(t) + clus.Members[2].Restart(t) + + clus.waitLeader(t, clus.Members) + time.Sleep(time.Duration(2*electionTicks) * tickDuration) + + // new stream should also be OK now after we restarted the other members + wStream, err = wAPI.Watch(ctx) + if err != nil { + t.Fatalf("wAPI.Watch error: %v", err) + } + wreq := &pb.WatchRequest{ + RequestUnion: &pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}, + }, + } + err = wStream.Send(wreq) + if err != nil { + t.Errorf("err = %v, want nil", err) + } +}