diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index dec1a7211..087e02a35 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -6,8 +6,8 @@ import ( "net" "sync" - log "github.com/golang/glog" "github.com/Workiva/go-datastructures/queue" + log "github.com/golang/glog" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -118,19 +118,18 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { } var dc sdc.Client - if target == "OTHERS" { - dc, err = sdc.NewNonDbClient(paths, prefix) - } else if isTargetDb(target) == true { - dc, err = sdc.NewDbClient(paths, prefix) - } else { - /* For any other target or no target create new Transl Client. */ - dc, err = sdc.NewTranslClient(prefix, paths) - } - - if err != nil { - return grpc.Errorf(codes.NotFound, "%v", err) - } + if target == "OTHERS" { + dc, err = sdc.NewNonDbClient(paths, prefix) + } else if isTargetDb(target) == true { + dc, err = sdc.NewDbClient(paths, prefix) + } else { + /* For any other target or no target create new Transl Client. */ + dc, err = sdc.NewTranslClient(prefix, paths) + } + if err != nil { + return grpc.Errorf(codes.NotFound, "%v", err) + } switch mode := c.subscribe.GetMode(); mode { case gnmipb.SubscriptionList_STREAM: @@ -198,6 +197,15 @@ func (c *Client) recv(stream gnmipb.GNMI_SubscribeServer) { return case io.EOF: log.V(1).Infof("Client %s received io.EOF", c) + if c.subscribe.Mode == gnmipb.SubscriptionList_STREAM { + // The client->server could be closed after the sending the subscription list. + // EOF is not a indication of client is not listening. + // Instead stream.Context() which is signaled once the underlying connection is terminated. + log.V(1).Infof("Waiting for client '%s'", c) + // This context is done when the client connection is terminated. + <-stream.Context().Done() + log.V(1).Infof("Client is done '%s'", c) + } return case nil: }