diff --git a/CHANGELOG/CHANGELOG-3.6.md b/CHANGELOG/CHANGELOG-3.6.md index f3c5635bfc9..0d8924bf245 100644 --- a/CHANGELOG/CHANGELOG-3.6.md +++ b/CHANGELOG/CHANGELOG-3.6.md @@ -67,6 +67,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0). - Add [Protection on maintenance request when auth is enabled](https://github.com/etcd-io/etcd/pull/14663). - Graduated [`--experimental-warning-unary-request-duration` to `--warning-unary-request-duration`](https://github.com/etcd-io/etcd/pull/14414). Note the experimental flag is deprecated and will be decommissioned in v3.7. - Add [field `hash_revision` into `HashKVResponse`](https://github.com/etcd-io/etcd/pull/14537). +- Add [`etcd --experimental-snapshot-catch-up-entries`](https://github.com/etcd-io/etcd/pull/15033) flag to configure number of entries for a slow follower to catch up after compacting the the raft storage entries and defaults to 5k. ### etcd grpc-proxy diff --git a/server/config/config.go b/server/config/config.go index 1570f1e7ec9..48de650b8a1 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -56,7 +56,6 @@ type ServerConfig struct { // We expect the follower has a millisecond level latency with the leader. // The max throughput is around 10K. Keep a 5K entries is enough for helping // follower to catch up. - // WARNING: only change this for tests. Always use "DefaultSnapshotCatchUpEntries" SnapshotCatchUpEntries uint64 MaxSnapFiles uint diff --git a/server/embed/config.go b/server/embed/config.go index a0824fcf5a1..2c563774292 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -156,9 +156,7 @@ type Config struct { // We expect the follower has a millisecond level latency with the leader. // The max throughput is around 10K. Keep a 5K entries is enough for helping // follower to catch up. - // WARNING: only change this for tests. - // Always use "DefaultSnapshotCatchUpEntries" - SnapshotCatchUpEntries uint64 + SnapshotCatchUpEntries uint64 `json:"experimental-snapshot-catch-up-entries"` MaxSnapFiles uint `json:"max-snapshots"` MaxWalFiles uint `json:"max-wals"` diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 8adc7521f21..951452e1222 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -279,6 +279,7 @@ func newConfig() *config { fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.") fs.IntVar(&cfg.ec.ExperimentalMaxLearners, "experimental-max-learners", membership.DefaultMaxLearners, "Sets the maximum number of learners that can be available in the cluster membership.") fs.DurationVar(&cfg.ec.ExperimentalWaitClusterReadyTimeout, "experimental-wait-cluster-ready-timeout", cfg.ec.ExperimentalWaitClusterReadyTimeout, "Maximum duration to wait for the cluster to be ready.") + fs.Uint64Var(&cfg.ec.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.ec.SnapshotCatchUpEntries, "Number of entries for a slow follower to catch up after compacting the the raft storage entries.") // unsafe fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.") diff --git a/server/etcdmain/config_test.go b/server/etcdmain/config_test.go index f0ede206ead..1352a9ea5d5 100644 --- a/server/etcdmain/config_test.go +++ b/server/etcdmain/config_test.go @@ -34,6 +34,7 @@ func TestConfigParsingMemberFlags(t *testing.T) { "-max-wals=10", "-max-snapshots=10", "-snapshot-count=10", + "-experimental-snapshot-catchup-entries=1000", "-listen-peer-urls=http://localhost:8000,https://localhost:8001", "-listen-client-urls=http://localhost:7000,https://localhost:7001", // it should be set if -listen-client-urls is set @@ -51,20 +52,22 @@ func TestConfigParsingMemberFlags(t *testing.T) { func TestConfigFileMemberFields(t *testing.T) { yc := struct { - Dir string `json:"data-dir"` - MaxSnapFiles uint `json:"max-snapshots"` - MaxWalFiles uint `json:"max-wals"` - Name string `json:"name"` - SnapshotCount uint64 `json:"snapshot-count"` - LPUrls string `json:"listen-peer-urls"` - LCUrls string `json:"listen-client-urls"` - AcurlsCfgFile string `json:"advertise-client-urls"` + Dir string `json:"data-dir"` + MaxSnapFiles uint `json:"max-snapshots"` + MaxWalFiles uint `json:"max-wals"` + Name string `json:"name"` + SnapshotCount uint64 `json:"snapshot-count"` + SnapshotCatchUpEntries uint64 `json:"experimental-snapshot-catch-up-entries"` + LPUrls string `json:"listen-peer-urls"` + LCUrls string `json:"listen-client-urls"` + AcurlsCfgFile string `json:"advertise-client-urls"` }{ "testdir", 10, 10, "testname", 10, + 1000, "http://localhost:8000,https://localhost:8001", "http://localhost:7000,https://localhost:7001", "http://localhost:7000,https://localhost:7001", @@ -392,13 +395,14 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File { func validateMemberFlags(t *testing.T, cfg *config) { wcfg := &embed.Config{ - Dir: "testdir", - LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, - LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, - MaxSnapFiles: 10, - MaxWalFiles: 10, - Name: "testname", - SnapshotCount: 10, + Dir: "testdir", + LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}}, + LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}}, + MaxSnapFiles: 10, + MaxWalFiles: 10, + Name: "testname", + SnapshotCount: 10, + SnapshotCatchUpEntries: 1000, } if cfg.ec.Dir != wcfg.Dir { @@ -416,6 +420,9 @@ func validateMemberFlags(t *testing.T, cfg *config) { if cfg.ec.SnapshotCount != wcfg.SnapshotCount { t.Errorf("snapcount = %v, want %v", cfg.ec.SnapshotCount, wcfg.SnapshotCount) } + if cfg.ec.SnapshotCatchUpEntries != wcfg.SnapshotCatchUpEntries { + t.Errorf("snapshot catch up entries = %v, want %v", cfg.ec.SnapshotCatchUpEntries, wcfg.SnapshotCatchUpEntries) + } if !reflect.DeepEqual(cfg.ec.LPUrls, wcfg.LPUrls) { t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.LPUrls, wcfg.LPUrls) } diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index baef0095154..61e8d278c5b 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -269,6 +269,8 @@ Experimental feature: Set the max number of learner members allowed in the cluster membership. --experimental-wait-cluster-ready-timeout '5s' Set the maximum time duration to wait for the cluster to be ready. + --experimental-snapshot-catch-up-entries '5000' + Number of entries for a slow follower to catch up after compacting the the raft storage entries. Unsafe feature: --force-new-cluster 'false' diff --git a/tests/e2e/etcd_config_test.go b/tests/e2e/etcd_config_test.go index 2f6cc846fd5..c76d819e3de 100644 --- a/tests/e2e/etcd_config_test.go +++ b/tests/e2e/etcd_config_test.go @@ -20,10 +20,14 @@ import ( "os" "strings" "testing" + "time" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "go.etcd.io/etcd/pkg/v3/expect" + "go.etcd.io/etcd/tests/v3/framework/config" "go.etcd.io/etcd/tests/v3/framework/e2e" ) @@ -359,4 +363,64 @@ func TestBootstrapDefragFlag(t *testing.T) { if err = proc.Stop(); err != nil { t.Fatal(err) } + + // wait for the process to exit, otherwise test will have leaked goroutine + if err := proc.Close(); err != nil { + t.Logf("etcd process closed with error %v", err) + } +} + +func TestSnapshotCatchupEntriesFlag(t *testing.T) { + e2e.SkipInShortMode(t) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--experimental-snapshot-catchup-entries", "1000"}, nil) + require.NoError(t, err) + require.NoError(t, e2e.WaitReadyExpectProc(ctx, proc, []string{"\"snapshot-catchup-entries\":1000"})) + require.NoError(t, e2e.WaitReadyExpectProc(ctx, proc, []string{"serving client traffic"})) + require.NoError(t, proc.Stop()) + + // wait for the process to exit, otherwise test will have leaked goroutine + if err := proc.Close(); err != nil { + t.Logf("etcd process closed with error %v", err) + } +} + +// TestEtcdHealthyWithTinySnapshotCatchupEntries ensures multi-node etcd cluster remains healthy with 1 snapshot catch up entry +func TestEtcdHealthyWithTinySnapshotCatchupEntries(t *testing.T) { + e2e.BeforeTest(t) + epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, + e2e.WithClusterSize(3), + e2e.WithSnapshotCount(1), + e2e.WithSnapshotCatchUpEntries(1), + ) + require.NoErrorf(t, err, "could not start etcd process cluster (%v)", err) + t.Cleanup(func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + // simulate 10 clients keep writing to etcd in parallel with no error + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + g, ctx := errgroup.WithContext(ctx) + for i := 0; i < 10; i++ { + clientId := i + g.Go(func() error { + cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsV3()) + if err != nil { + return err + } + for j := 0; j < 100; j++ { + if err = cc.Put(ctx, "foo", fmt.Sprintf("bar%d", clientId), config.PutOptions{}); err != nil { + return err + } + } + return nil + }) + } + require.NoError(t, g.Wait()) } diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index f8fe27e2e55..870dc86def9 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -147,7 +147,8 @@ type EtcdProcessClusterConfig struct { MetricsURLScheme string - SnapshotCount int // default is 10000 + SnapshotCount int // default is 100000 + SnapshotCatchUpEntries int // default is 5000 Client ClientConfig IsPeerTLS bool @@ -223,6 +224,10 @@ func WithSnapshotCount(count int) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.SnapshotCount = count } } +func WithSnapshotCatchUpEntries(count int) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.SnapshotCatchUpEntries = count } +} + func WithClusterSize(size int) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.ClusterSize = size } } @@ -548,6 +553,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in if cfg.ExperimentalWarningUnaryRequestDuration != 0 { args = append(args, "--experimental-warning-unary-request-duration", cfg.ExperimentalWarningUnaryRequestDuration.String()) } + if cfg.SnapshotCatchUpEntries > 0 { + args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.SnapshotCatchUpEntries)) + } envVars := map[string]string{} for key, value := range cfg.EnvVars { envVars[key] = value