diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 4c84a9b0196..d548fac9172 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -61,10 +61,10 @@ type applierV3Internal interface { type applierV3 interface { Apply(r *pb.InternalRaftRequest) *applyResult - Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) + Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) - Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) + Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) @@ -142,11 +142,11 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { case r.Range != nil: ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range) case r.Put != nil: - ar.resp, ar.trace, ar.err = a.s.applyV3.Put(nil, r.Put) + ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put) case r.DeleteRange != nil: ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange) case r.Txn != nil: - ar.resp, ar.err = a.s.applyV3.Txn(r.Txn) + ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn) case r.Compaction != nil: ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction) case r.LeaseGrant != nil: @@ -201,14 +201,18 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { return ar } -func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) { +func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp 
*pb.PutResponse, trace *traceutil.Trace, err error) { resp = &pb.PutResponse{} resp.Header = &pb.ResponseHeader{} - trace = traceutil.New("put", - a.s.getLogger(), - traceutil.Field{Key: "key", Value: string(p.Key)}, - traceutil.Field{Key: "req_size", Value: proto.Size(p)}, - ) + trace = traceutil.Get(ctx) + // create put tracing if the trace in context is empty + if trace.IsEmpty() { + trace = traceutil.New("put", + a.s.getLogger(), + traceutil.Field{Key: "key", Value: string(p.Key)}, + traceutil.Field{Key: "req_size", Value: proto.Size(p)}, + ) + } val, leaseID := p.Value, lease.LeaseID(p.Lease) if txn == nil { if leaseID != lease.NoLease { @@ -222,13 +226,13 @@ func (a *applierV3backend) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.Pu var rr *mvcc.RangeResult if p.IgnoreValue || p.IgnoreLease || p.PrevKv { - trace.DisableStep() - rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{}) + trace.StepWithFunction(func() { + rr, err = txn.Range(p.Key, nil, mvcc.RangeOptions{}) + }, "get previous kv pair") + if err != nil { return nil, nil, err } - trace.EnableStep() - trace.Step("get previous kv pair") } if p.IgnoreValue || p.IgnoreLease { if rr == nil || len(rr.KVs) == 0 { @@ -378,22 +382,35 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra return resp, nil } -func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { +func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { + trace := traceutil.Get(ctx) + if trace.IsEmpty() { + trace = traceutil.New("transaction", a.s.getLogger()) + ctx = context.WithValue(ctx, traceutil.TraceKey, trace) + } isWrite := !isTxnReadonly(rt) - txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(traceutil.TODO())) + txn := mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(trace)) + + var txnPath []bool + trace.StepWithFunction( + func() { + txnPath = compareToPath(txn, rt) + }, + "compare", + ) - txnPath := compareToPath(txn, rt) if isWrite 
{ + trace.AddField(traceutil.Field{Key: "read_only", Value: false}) if _, err := checkRequests(txn, rt, txnPath, a.checkPut); err != nil { txn.End() - return nil, err + return nil, nil, err } } if _, err := checkRequests(txn, rt, txnPath, a.checkRange); err != nil { txn.End() - return nil, err + return nil, nil, err } - + trace.Step("check requests") txnResp, _ := newTxnResp(rt, txnPath) // When executing mutable txn ops, etcd must hold the txn lock so @@ -402,9 +419,9 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { // be the revision of the write txn. if isWrite { txn.End() - txn = a.s.KV().Write(traceutil.TODO()) + txn = a.s.KV().Write(trace) } - a.applyTxn(txn, rt, txnPath, txnResp) + a.applyTxn(ctx, txn, rt, txnPath, txnResp) rev := txn.Rev() if len(txn.Changes()) != 0 { rev++ @@ -412,7 +429,11 @@ func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { txn.End() txnResp.Header.Revision = rev - return txnResp, nil + trace.AddField( + traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)}, + traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision}, + ) + return txnResp, trace, nil } // newTxnResp allocates a txn response for a txn request given a path. 
@@ -543,7 +564,8 @@ func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool { return true } -func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) { +func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) { + trace := traceutil.Get(ctx) reqs := rt.Success if !txnPath[0] { reqs = rt.Failure @@ -554,17 +576,27 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat respi := tresp.Responses[i].Response switch tv := req.Request.(type) { case *pb.RequestOp_RequestRange: - resp, err := a.Range(context.TODO(), txn, tv.RequestRange) + trace.StartSubTrace( + traceutil.Field{Key: "req_type", Value: "range"}, + traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)}, + traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)}) + resp, err := a.Range(ctx, txn, tv.RequestRange) if err != nil { lg.Panic("unexpected error during txn", zap.Error(err)) } respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp + trace.StopSubTrace() case *pb.RequestOp_RequestPut: - resp, _, err := a.Put(txn, tv.RequestPut) + trace.StartSubTrace( + traceutil.Field{Key: "req_type", Value: "put"}, + traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)}, + traceutil.Field{Key: "req_size", Value: proto.Size(tv.RequestPut)}) + resp, _, err := a.Put(ctx, txn, tv.RequestPut) if err != nil { lg.Panic("unexpected error during txn", zap.Error(err)) } respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp + trace.StopSubTrace() case *pb.RequestOp_RequestDeleteRange: resp, err := a.DeleteRange(txn, tv.RequestDeleteRange) if err != nil { @@ -573,7 +605,7 @@ func (a *applierV3backend) applyTxn(txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPat respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp case *pb.RequestOp_RequestTxn: resp := 
respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn - applyTxns := a.applyTxn(txn, tv.RequestTxn, txnPath[1:], resp) + applyTxns := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp) txns += applyTxns + 1 txnPath = txnPath[applyTxns+1:] default: @@ -689,15 +721,15 @@ type applierV3Capped struct { // with Puts so that the number of keys in the store is capped. func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} } -func (a *applierV3Capped) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { +func (a *applierV3Capped) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { return nil, nil, ErrNoSpace } -func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, error) { +func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { if a.q.Cost(r) > 0 { - return nil, ErrNoSpace + return nil, nil, ErrNoSpace } - return a.applierV3.Txn(r) + return a.applierV3.Txn(ctx, r) } func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { @@ -859,22 +891,22 @@ func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 { return &quotaApplierV3{app, NewBackendQuota(s, "v3-applier")} } -func (a *quotaApplierV3) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { +func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { ok := a.q.Available(p) - resp, trace, err := a.applierV3.Put(txn, p) + resp, trace, err := a.applierV3.Put(ctx, txn, p) if err == nil && !ok { err = ErrNoSpace } return resp, trace, err } -func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { +func (a *quotaApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { ok := a.q.Available(rt) - resp, err := a.applierV3.Txn(rt) + resp, 
trace, err := a.applierV3.Txn(ctx, rt) if err == nil && !ok { err = ErrNoSpace } - return resp, err + return resp, trace, err } func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { diff --git a/etcdserver/apply_auth.go b/etcdserver/apply_auth.go index 4177bfa18f7..5741a9cae1e 100644 --- a/etcdserver/apply_auth.go +++ b/etcdserver/apply_auth.go @@ -63,7 +63,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest) *applyResult { return ret } -func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { +func (aa *authApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil { return nil, nil, err } @@ -82,7 +82,7 @@ func (aa *authApplierV3) Put(txn mvcc.TxnWrite, r *pb.PutRequest) (*pb.PutRespon return nil, nil, err } } - return aa.applierV3.Put(txn, r) + return aa.applierV3.Put(ctx, txn, r) } func (aa *authApplierV3) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) { @@ -161,11 +161,11 @@ func checkTxnAuth(as auth.AuthStore, ai *auth.AuthInfo, rt *pb.TxnRequest) error return checkTxnReqsPermission(as, ai, rt.Failure) } -func (aa *authApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { +func (aa *authApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { if err := checkTxnAuth(aa.as, &aa.authInfo, rt); err != nil { - return nil, err + return nil, nil, err } - return aa.applierV3.Txn(rt) + return aa.applierV3.Txn(ctx, rt) } func (aa *authApplierV3) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index 70d6d21996c..5435757d386 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -309,7 +309,7 @@ type applierV3Corrupt struct { func newApplierV3Corrupt(a 
applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} } -func (a *applierV3Corrupt) Put(txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { +func (a *applierV3Corrupt) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) { return nil, nil, ErrCorrupt } @@ -321,8 +321,8 @@ func (a *applierV3Corrupt) DeleteRange(txn mvcc.TxnWrite, p *pb.DeleteRangeReque return nil, ErrCorrupt } -func (a *applierV3Corrupt) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, error) { - return nil, ErrCorrupt +func (a *applierV3Corrupt) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { + return nil, nil, ErrCorrupt } func (a *applierV3Corrupt) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index df03a1342b6..3f80afb60bf 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -146,8 +146,14 @@ func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { if isTxnReadonly(r) { + trace := traceutil.New("transaction", + s.getLogger(), + traceutil.Field{Key: "read_only", Value: true}, + ) + ctx = context.WithValue(ctx, traceutil.TraceKey, trace) if !isTxnSerializable(r) { err := s.linearizableReadNotify(ctx) + trace.Step("agreement among raft nodes before linearized reading") if err != nil { return nil, err } @@ -160,15 +166,17 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse defer func(start time.Time) { warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), start, r, resp, err) + trace.LogIfLong(traceThreshold) }(time.Now()) - get := func() { resp, err = s.applyV3Base.Txn(r) } + get := func() { resp, _, err = s.applyV3Base.Txn(ctx, r) } if serr := s.doSerialize(ctx, chk, get); serr != nil { return 
nil, serr } return resp, err } + ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now()) resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r}) if err != nil { return nil, err diff --git a/pkg/traceutil/trace.go b/pkg/traceutil/trace.go index 2d247dd9acc..cbe2cfbf0a0 100644 --- a/pkg/traceutil/trace.go +++ b/pkg/traceutil/trace.go @@ -60,12 +60,15 @@ type Trace struct { startTime time.Time steps []step stepDisabled bool + isEmpty bool } type step struct { - time time.Time - msg string - fields []Field + time time.Time + msg string + fields []Field + isSubTraceStart bool + isSubTraceEnd bool } func New(op string, lg *zap.Logger, fields ...Field) *Trace { @@ -74,7 +77,7 @@ func New(op string, lg *zap.Logger, fields ...Field) *Trace { // TODO returns a non-nil, empty Trace func TODO() *Trace { - return &Trace{} + return &Trace{isEmpty: true} } func Get(ctx context.Context) *Trace { @@ -93,7 +96,7 @@ func (t *Trace) SetStartTime(time time.Time) { } func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field) { - newStep := step{time, msg, fields} + newStep := step{time: time, msg: msg, fields: fields} if at < len(t.steps) { t.steps = append(t.steps[:at+1], t.steps[at:]...) 
t.steps[at] = newStep @@ -102,6 +105,18 @@ func (t *Trace) InsertStep(at int, time time.Time, msg string, fields ...Field) } } +// StartSubTrace adds step to trace as a start sign of sublevel trace +// All steps in the subtrace will log out the input fields of this function +func (t *Trace) StartSubTrace(fields ...Field) { + t.steps = append(t.steps, step{fields: fields, isSubTraceStart: true}) +} + +// StopSubTrace adds step to trace as a end sign of sublevel trace +// All steps in the subtrace will log out the input fields of this function +func (t *Trace) StopSubTrace(fields ...Field) { + t.steps = append(t.steps, step{fields: fields, isSubTraceEnd: true}) +} + // Step adds step to trace func (t *Trace) Step(msg string, fields ...Field) { if !t.stepDisabled { @@ -109,22 +124,26 @@ func (t *Trace) Step(msg string, fields ...Field) { } } -// DisableStep sets the flag to prevent the trace from adding steps -func (t *Trace) DisableStep() { - t.stepDisabled = true -} - -// EnableStep re-enable the trace to add steps -func (t *Trace) EnableStep() { - t.stepDisabled = false +// StepWithFunction will measure the input function as a single step +func (t *Trace) StepWithFunction(f func(), msg string, fields ...Field) { + t.disableStep() + f() + t.enableStep() + t.Step(msg, fields...) 
} func (t *Trace) AddField(fields ...Field) { for _, f := range fields { - t.fields = append(t.fields, f) + if !t.updateFieldIfExist(f) { + t.fields = append(t.fields, f) + } } } +func (t *Trace) IsEmpty() bool { + return t.isEmpty +} + // Log dumps all steps in the Trace func (t *Trace) Log() { t.LogWithStepThreshold(0) @@ -154,7 +173,25 @@ func (t *Trace) logInfo(threshold time.Duration) (string, []zap.Field) { var steps []string lastStepTime := t.startTime - for _, step := range t.steps { + for i := 0; i < len(t.steps); i++ { + step := t.steps[i] + // add subtrace common fields which defined at the beginning to each sub-steps + if step.isSubTraceStart { + for j := i + 1; j < len(t.steps) && !t.steps[j].isSubTraceEnd; j++ { + t.steps[j].fields = append(step.fields, t.steps[j].fields...) + } + continue + } + // add subtrace common fields which defined at the end to each sub-steps + if step.isSubTraceEnd { + for j := i - 1; j >= 0 && !t.steps[j].isSubTraceStart; j-- { + t.steps[j].fields = append(step.fields, t.steps[j].fields...) 
+ } + continue + } + } + for i := 0; i < len(t.steps); i++ { + step := t.steps[i] stepDuration := step.time.Sub(lastStepTime) if stepDuration > threshold { steps = append(steps, fmt.Sprintf("trace[%d] '%v' %s (duration: %v)", @@ -167,6 +204,27 @@ func (t *Trace) logInfo(threshold time.Duration) (string, []zap.Field) { zap.Duration("duration", totalDuration), zap.Time("start", t.startTime), zap.Time("end", endTime), - zap.Strings("steps", steps)} + zap.Strings("steps", steps), + zap.Int("step_count", len(steps))} return msg, fs } + +func (t *Trace) updateFieldIfExist(f Field) bool { + for i, v := range t.fields { + if v.Key == f.Key { + t.fields[i].Value = f.Value + return true + } + } + return false +} + +// disableStep sets the flag to prevent the trace from adding steps +func (t *Trace) disableStep() { + t.stepDisabled = true +} + +// enableStep re-enable the trace to add steps +func (t *Trace) enableStep() { + t.stepDisabled = false +} diff --git a/pkg/traceutil/trace_test.go b/pkg/traceutil/trace_test.go index 9b99288764f..d629d0b89c4 100644 --- a/pkg/traceutil/trace_test.go +++ b/pkg/traceutil/trace_test.go @@ -28,7 +28,7 @@ import ( ) func TestGet(t *testing.T) { - traceForTest := &Trace{operation: "test"} + traceForTest := &Trace{operation: "Test"} tests := []struct { name string inputCtx context.Context @@ -151,6 +151,51 @@ func TestLog(t *testing.T) { "msg1", "msg2", "traceKey1:traceValue1", "count:1", "stepKey1:stepValue1", "stepKey2:stepValue2", + "\"step_count\":2", + }, + }, + { + name: "When trace has subtrace", + trace: &Trace{ + operation: "Test", + startTime: time.Now().Add(-100 * time.Millisecond), + steps: []step{ + { + time: time.Now().Add(-80 * time.Millisecond), + msg: "msg1", + fields: []Field{{"stepKey1", "stepValue1"}}, + }, + { + fields: []Field{{"beginSubTrace", "true"}}, + isSubTraceStart: true, + }, + { + time: time.Now().Add(-50 * time.Millisecond), + msg: "submsg", + fields: []Field{{"subStepKey", "subStepValue"}}, + }, + { + fields: 
[]Field{{"endSubTrace", "true"}}, + isSubTraceEnd: true, + }, + { + time: time.Now().Add(-30 * time.Millisecond), + msg: "msg2", + fields: []Field{{"stepKey2", "stepValue2"}}, + }, + }, + }, + fields: []Field{ + {"traceKey1", "traceValue1"}, + {"count", 1}, + }, + expectedMsg: []string{ + "Test", + "msg1", "msg2", "submsg", + "traceKey1:traceValue1", "count:1", + "stepKey1:stepValue1", "stepKey2:stepValue2", "subStepKey:subStepValue", + "beginSubTrace:true", "endSubTrace:true", + "\"step_count\":3", }, }, }