Skip to content

Commit

Permalink
Merge pull request #12448 from agargi/introduce_config_parameter
Browse files Browse the repository at this point in the history
server: Added config parameter experimental-apply-warning-duration
  • Loading branch information
jingyih committed Nov 18, 2020
2 parents 06e48f0 + c1c681a commit c11ddc6
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 17 deletions.
9 changes: 7 additions & 2 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
DefaultMaxSnapshots = 5
DefaultMaxWALs = 5
DefaultMaxTxnOps = uint(128)
DefaultWarningApplyDuration = 100 * time.Millisecond
DefaultMaxRequestBytes = 1.5 * 1024 * 1024
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
Expand Down Expand Up @@ -289,6 +290,9 @@ type Config struct {
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
// takes more time than this value.
ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`

// ForceNewCluster starts a new cluster even if previously started; unsafe.
ForceNewCluster bool `json:"force-new-cluster"`
Expand Down Expand Up @@ -382,8 +386,9 @@ func NewConfig() *Config {
SnapshotCount: etcdserver.DefaultSnapshotCount,
SnapshotCatchUpEntries: etcdserver.DefaultSnapshotCatchUpEntries,

MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
MaxTxnOps: DefaultMaxTxnOps,
MaxRequestBytes: DefaultMaxRequestBytes,
ExperimentalWarningApplyDuration: DefaultWarningApplyDuration,

GRPCKeepAliveMinTime: DefaultGRPCKeepAliveMinTime,
GRPCKeepAliveInterval: DefaultGRPCKeepAliveInterval,
Expand Down
1 change: 1 addition & 0 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,
}
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
Expand Down
1 change: 1 addition & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func newConfig() *config {
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")
fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")

// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ Experimental feature:
Skip verification of SAN field in client certificate for peer connections.
--experimental-watch-progress-notify-interval '10m'
Duration of periodical watch progress notification.
--experimental-warning-apply-duration '100ms'
Warning is generated if requests take more than this duration.
Unsafe feature:
--force-new-cluster 'false'
Expand Down
5 changes: 2 additions & 3 deletions server/etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ import (
)

const (
warnApplyDuration = 100 * time.Millisecond
v3Version = "v3"
v3Version = "v3"
)

type applyResult struct {
Expand Down Expand Up @@ -137,7 +136,7 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
defer func(start time.Time) {
success := ar.err == nil || ar.err == mvcc.ErrCompacted
applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
warnOfExpensiveRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
warnOfExpensiveRequest(a.s.getLogger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
if !success {
warnOfFailedRequest(a.s.getLogger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
}
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (s *EtcdServer) applyV2Request(r *RequestV2) (resp Response) {
defer func(start time.Time) {
success := resp.Err == nil
applySec.WithLabelValues(v2Version, r.Method, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
warnOfExpensiveRequest(s.getLogger(), start, stringer, nil, nil)
warnOfExpensiveRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
}(time.Now())

switch r.Method {
Expand Down
2 changes: 2 additions & 0 deletions server/etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ type ServerConfig struct {
// MaxRequestBytes is the maximum request size to send over raft.
MaxRequestBytes uint

WarningApplyDuration time.Duration

StrictReconfigCheck bool

// ClientCertAuthEnabled is true when cert has been signed by the client CA.
Expand Down
19 changes: 10 additions & 9 deletions server/etcdserver/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ func (nc *notifier) notify(err error) {
close(nc.c)
}

func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
func warnOfExpensiveRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
var resp string
if !isNil(respMsg) {
resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
}
warnOfExpensiveGenericRequest(lg, now, reqStringer, "", resp, err)
warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "", resp, err)
}

func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
Expand All @@ -126,7 +126,7 @@ func warnOfFailedRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer
)
}

func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
reqStringer := pb.NewLoggableTxnRequest(r)
var resp string
if !isNil(txnResponse) {
Expand All @@ -141,24 +141,25 @@ func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnR
}
resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
}
warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only txn ", resp, err)
warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "read-only txn ", resp, err)
}

func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
var resp string
if !isNil(rangeResponse) {
resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
}
warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err)
warnOfExpensiveGenericRequest(lg, warningApplyDuration, now, reqStringer, "read-only range ", resp, err)
}

func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
func warnOfExpensiveGenericRequest(lg *zap.Logger, warningApplyDuration time.Duration, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
d := time.Since(now)
if d > warnApplyDuration {

if d > warningApplyDuration {
lg.Warn(
"apply request took too long",
zap.Duration("took", d),
zap.Duration("expected-duration", warnApplyDuration),
zap.Duration("expected-duration", warningApplyDuration),
zap.String("prefix", prefix),
zap.String("request", reqStringer.String()),
zap.String("response", resp),
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
var resp *pb.RangeResponse
var err error
defer func(start time.Time) {
warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), start, r, resp, err)
warnOfExpensiveReadOnlyRangeRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
if resp != nil {
trace.AddField(
traceutil.Field{Key: "response_count", Value: len(resp.Kvs)},
Expand Down Expand Up @@ -169,7 +169,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
}

defer func(start time.Time) {
warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), start, r, resp, err)
warnOfExpensiveReadOnlyTxnRequest(s.getLogger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
trace.LogIfLong(traceThreshold)
}(time.Now())

Expand Down

0 comments on commit c11ddc6

Please sign in to comment.