diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index 29aef2914a5..3ea9fad8481 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -24,10 +24,13 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/pkg/capnslog" prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" ) const ( @@ -40,7 +43,7 @@ type streamsMap struct { } func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { if !api.IsCapabilityEnabled(api.V3rpcCapability) { return nil, rpctypes.ErrGRPCNotCapable } @@ -54,10 +57,116 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { } } - return prometheus.UnaryServerInterceptor(ctx, req, info, handler) + return logUnaryInterceptor(ctx, req, info, handler) + // interceptors are chained manually during backporting, for better readability refer to PR #9990 } } +// logUnaryInterceptor is a gRPC server-side interceptor that prints info on incoming requests for debugging purpose +func logUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + startTime := time.Now() + resp, err := prometheus.UnaryServerInterceptor(ctx, req, info, handler) + // interceptors are chained manually during backporting, for better readability refer to PR #9990 + if plog.LevelAt(capnslog.DEBUG) { + defer logUnaryRequestStats(ctx, info, startTime, req, resp) + } + return resp, err +} + +func logUnaryRequestStats(ctx context.Context, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) { + duration := time.Since(startTime) + remote := "No remote client info." + peerInfo, ok := peer.FromContext(ctx) + if ok { + remote = peerInfo.Addr.String() + } + var responseType string = info.FullMethod + var reqCount, respCount int64 + var reqSize, respSize int + var reqContent string + switch _resp := resp.(type) { + case *pb.RangeResponse: + _req, ok := req.(*pb.RangeRequest) + if ok { + reqCount = 0 + reqSize = _req.Size() + reqContent = _req.String() + } + if _resp != nil { + respCount = _resp.Count + respSize = _resp.Size() + } + case *pb.PutResponse: + _req, ok := req.(*pb.PutRequest) + if ok { + reqCount = 1 + reqSize = _req.Size() + reqContent = pb.NewLoggablePutRequest(_req).String() + // redact value field from request content, see PR #9821 + } + if _resp != nil { + respCount = 0 + respSize = _resp.Size() + } + case *pb.DeleteRangeResponse: + _req, ok := req.(*pb.DeleteRangeRequest) + if ok { + reqCount = 0 + reqSize = _req.Size() + reqContent = _req.String() + } + if _resp != nil { + respCount = _resp.Deleted + respSize = _resp.Size() + } + case *pb.TxnResponse: + _req, ok := req.(*pb.TxnRequest) + if ok && _resp != nil { + if _resp.Succeeded { // determine the 'actual' count and size of request based on success or failure + reqCount = int64(len(_req.GetSuccess())) + reqSize = 0 + for _, r := range _req.GetSuccess() { + reqSize += r.Size() + } + } else { + reqCount = int64(len(_req.GetFailure())) + reqSize = 0 + for _, r := range _req.GetFailure() { + reqSize += r.Size() + } + } + reqContent = pb.NewLoggableTxnRequest(_req).String() + // redact value field from request content, see PR #9821 + } + if _resp != nil { + respCount = 0 + respSize = _resp.Size() + } + default: + reqCount = -1 + reqSize = -1 + respCount = -1 + respSize = -1 + } + + logGenericRequestStats(startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent) +} + +func logGenericRequestStats(startTime time.Time, duration time.Duration, remote string, responseType string, + reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) { + plog.Debugf("start time = %v, "+ + "time spent = %v, "+ + "remote = %s, "+ + "response type = %s, "+ + "request count = %d, "+ + "request size = %d, "+ + "response count = %d, "+ + "response size = %d, "+ + "request content = %s", + startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent, + ) +} + func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor { smap := monitorLeader(s) diff --git a/etcdserver/etcdserverpb/raft_internal_stringer.go b/etcdserver/etcdserverpb/raft_internal_stringer.go index 066a5a0118e..07800a186ba 100644 --- a/etcdserver/etcdserverpb/raft_internal_stringer.go +++ b/etcdserver/etcdserverpb/raft_internal_stringer.go @@ -59,7 +59,7 @@ func (as *InternalRaftStringer) String() string { case as.Request.Put != nil: return fmt.Sprintf("header:<%s> put:<%s>", as.Request.Header.String(), - newLoggablePutRequest(as.Request.Put).String(), + NewLoggablePutRequest(as.Request.Put).String(), ) case as.Request.Txn != nil: return fmt.Sprintf("header:<%s> txn:<%s>", @@ -121,7 +121,7 @@ func newLoggableRequestOp(op *RequestOp) *requestOpStringer { func (as *requestOpStringer) String() string { switch op := as.Op.Request.(type) { case *RequestOp_RequestPut: - return fmt.Sprintf("request_put:<%s>", newLoggablePutRequest(op.RequestPut).String()) + return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String()) default: // nothing to redact } @@ -161,7 +161,7 @@ type loggablePutRequest struct { PrevKv bool `protobuf:"varint,4,opt,name=prev_kv,proto3"` } -func newLoggablePutRequest(request *PutRequest) *loggablePutRequest { +func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest { return &loggablePutRequest{ request.Key, len(request.Value),