Skip to content

Commit

Permalink
Merge pull request #15033 from chaochn47/snapshotCatchupEntries
Browse files Browse the repository at this point in the history
externalize snapshot catchup entries to etcd flag
  • Loading branch information
ahrtr committed Jan 5, 2023
2 parents e73f55d + 2c46b2b commit 6200b22
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG/CHANGELOG-3.6.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
- Add [Protection on maintenance request when auth is enabled](https://github.com/etcd-io/etcd/pull/14663).
- Graduated [`--experimental-warning-unary-request-duration` to `--warning-unary-request-duration`](https://github.com/etcd-io/etcd/pull/14414). Note the experimental flag is deprecated and will be decommissioned in v3.7.
- Add [field `hash_revision` into `HashKVResponse`](https://github.com/etcd-io/etcd/pull/14537).
- Add [`etcd --experimental-snapshot-catch-up-entries`](https://github.com/etcd-io/etcd/pull/15033) flag to configure number of entries for a slow follower to catch up after compacting the the raft storage entries and defaults to 5k.

### etcd grpc-proxy

Expand Down
1 change: 0 additions & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ type ServerConfig struct {
// We expect the follower has a millisecond level latency with the leader.
// The max throughput is around 10K. Keep a 5K entries is enough for helping
// follower to catch up.
// WARNING: only change this for tests. Always use "DefaultSnapshotCatchUpEntries"
SnapshotCatchUpEntries uint64

MaxSnapFiles uint
Expand Down
4 changes: 1 addition & 3 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ type Config struct {
// We expect the follower has a millisecond level latency with the leader.
// The max throughput is around 10K. Keep a 5K entries is enough for helping
// follower to catch up.
// WARNING: only change this for tests.
// Always use "DefaultSnapshotCatchUpEntries"
SnapshotCatchUpEntries uint64
SnapshotCatchUpEntries uint64 `json:"experimental-snapshot-catch-up-entries"`

MaxSnapFiles uint `json:"max-snapshots"`
MaxWalFiles uint `json:"max-wals"`
Expand Down
1 change: 1 addition & 0 deletions server/etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ func newConfig() *config {
fs.UintVar(&cfg.ec.ExperimentalBootstrapDefragThresholdMegabytes, "experimental-bootstrap-defrag-threshold-megabytes", 0, "Enable the defrag during etcd server bootstrap on condition that it will free at least the provided threshold of disk space. Needs to be set to non-zero value to take effect.")
fs.IntVar(&cfg.ec.ExperimentalMaxLearners, "experimental-max-learners", membership.DefaultMaxLearners, "Sets the maximum number of learners that can be available in the cluster membership.")
fs.DurationVar(&cfg.ec.ExperimentalWaitClusterReadyTimeout, "experimental-wait-cluster-ready-timeout", cfg.ec.ExperimentalWaitClusterReadyTimeout, "Maximum duration to wait for the cluster to be ready.")
fs.Uint64Var(&cfg.ec.SnapshotCatchUpEntries, "experimental-snapshot-catchup-entries", cfg.ec.SnapshotCatchUpEntries, "Number of entries for a slow follower to catch up after compacting the the raft storage entries.")

// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
Expand Down
37 changes: 22 additions & 15 deletions server/etcdmain/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestConfigParsingMemberFlags(t *testing.T) {
"-max-wals=10",
"-max-snapshots=10",
"-snapshot-count=10",
"-experimental-snapshot-catchup-entries=1000",
"-listen-peer-urls=http://localhost:8000,https://localhost:8001",
"-listen-client-urls=http://localhost:7000,https://localhost:7001",
// it should be set if -listen-client-urls is set
Expand All @@ -51,20 +52,22 @@ func TestConfigParsingMemberFlags(t *testing.T) {

func TestConfigFileMemberFields(t *testing.T) {
yc := struct {
Dir string `json:"data-dir"`
MaxSnapFiles uint `json:"max-snapshots"`
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapshotCount uint64 `json:"snapshot-count"`
LPUrls string `json:"listen-peer-urls"`
LCUrls string `json:"listen-client-urls"`
AcurlsCfgFile string `json:"advertise-client-urls"`
Dir string `json:"data-dir"`
MaxSnapFiles uint `json:"max-snapshots"`
MaxWalFiles uint `json:"max-wals"`
Name string `json:"name"`
SnapshotCount uint64 `json:"snapshot-count"`
SnapshotCatchUpEntries uint64 `json:"experimental-snapshot-catch-up-entries"`
LPUrls string `json:"listen-peer-urls"`
LCUrls string `json:"listen-client-urls"`
AcurlsCfgFile string `json:"advertise-client-urls"`
}{
"testdir",
10,
10,
"testname",
10,
1000,
"http://localhost:8000,https://localhost:8001",
"http://localhost:7000,https://localhost:7001",
"http://localhost:7000,https://localhost:7001",
Expand Down Expand Up @@ -392,13 +395,14 @@ func mustCreateCfgFile(t *testing.T, b []byte) *os.File {

func validateMemberFlags(t *testing.T, cfg *config) {
wcfg := &embed.Config{
Dir: "testdir",
LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
MaxSnapFiles: 10,
MaxWalFiles: 10,
Name: "testname",
SnapshotCount: 10,
Dir: "testdir",
LPUrls: []url.URL{{Scheme: "http", Host: "localhost:8000"}, {Scheme: "https", Host: "localhost:8001"}},
LCUrls: []url.URL{{Scheme: "http", Host: "localhost:7000"}, {Scheme: "https", Host: "localhost:7001"}},
MaxSnapFiles: 10,
MaxWalFiles: 10,
Name: "testname",
SnapshotCount: 10,
SnapshotCatchUpEntries: 1000,
}

if cfg.ec.Dir != wcfg.Dir {
Expand All @@ -416,6 +420,9 @@ func validateMemberFlags(t *testing.T, cfg *config) {
if cfg.ec.SnapshotCount != wcfg.SnapshotCount {
t.Errorf("snapcount = %v, want %v", cfg.ec.SnapshotCount, wcfg.SnapshotCount)
}
if cfg.ec.SnapshotCatchUpEntries != wcfg.SnapshotCatchUpEntries {
t.Errorf("snapshot catch up entries = %v, want %v", cfg.ec.SnapshotCatchUpEntries, wcfg.SnapshotCatchUpEntries)
}
if !reflect.DeepEqual(cfg.ec.LPUrls, wcfg.LPUrls) {
t.Errorf("listen-peer-urls = %v, want %v", cfg.ec.LPUrls, wcfg.LPUrls)
}
Expand Down
2 changes: 2 additions & 0 deletions server/etcdmain/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ Experimental feature:
Set the max number of learner members allowed in the cluster membership.
--experimental-wait-cluster-ready-timeout '5s'
Set the maximum time duration to wait for the cluster to be ready.
--experimental-snapshot-catch-up-entries '5000'
Number of entries for a slow follower to catch up after compacting the the raft storage entries.
Unsafe feature:
--force-new-cluster 'false'
Expand Down
64 changes: 64 additions & 0 deletions tests/e2e/etcd_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"golang.org/x/sync/errgroup"

"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)

Expand Down Expand Up @@ -359,4 +363,64 @@ func TestBootstrapDefragFlag(t *testing.T) {
if err = proc.Stop(); err != nil {
t.Fatal(err)
}

// wait for the process to exit, otherwise test will have leaked goroutine
if err := proc.Close(); err != nil {
t.Logf("etcd process closed with error %v", err)
}
}

func TestSnapshotCatchupEntriesFlag(t *testing.T) {
e2e.SkipInShortMode(t)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

proc, err := e2e.SpawnCmd([]string{e2e.BinPath.Etcd, "--experimental-snapshot-catchup-entries", "1000"}, nil)
require.NoError(t, err)
require.NoError(t, e2e.WaitReadyExpectProc(ctx, proc, []string{"\"snapshot-catchup-entries\":1000"}))
require.NoError(t, e2e.WaitReadyExpectProc(ctx, proc, []string{"serving client traffic"}))
require.NoError(t, proc.Stop())

// wait for the process to exit, otherwise test will have leaked goroutine
if err := proc.Close(); err != nil {
t.Logf("etcd process closed with error %v", err)
}
}

// TestEtcdHealthyWithTinySnapshotCatchupEntries ensures multi-node etcd cluster remains healthy with 1 snapshot catch up entry
func TestEtcdHealthyWithTinySnapshotCatchupEntries(t *testing.T) {
e2e.BeforeTest(t)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithClusterSize(3),
e2e.WithSnapshotCount(1),
e2e.WithSnapshotCatchUpEntries(1),
)
require.NoErrorf(t, err, "could not start etcd process cluster (%v)", err)
t.Cleanup(func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
})

// simulate 10 clients keep writing to etcd in parallel with no error
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
g, ctx := errgroup.WithContext(ctx)
for i := 0; i < 10; i++ {
clientId := i
g.Go(func() error {
cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsV3())
if err != nil {
return err
}
for j := 0; j < 100; j++ {
if err = cc.Put(ctx, "foo", fmt.Sprintf("bar%d", clientId), config.PutOptions{}); err != nil {
return err
}
}
return nil
})
}
require.NoError(t, g.Wait())
}
10 changes: 9 additions & 1 deletion tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ type EtcdProcessClusterConfig struct {

MetricsURLScheme string

SnapshotCount int // default is 10000
SnapshotCount int // default is 100000
SnapshotCatchUpEntries int // default is 5000

Client ClientConfig
IsPeerTLS bool
Expand Down Expand Up @@ -223,6 +224,10 @@ func WithSnapshotCount(count int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.SnapshotCount = count }
}

func WithSnapshotCatchUpEntries(count int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.SnapshotCatchUpEntries = count }
}

func WithClusterSize(size int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.ClusterSize = size }
}
Expand Down Expand Up @@ -548,6 +553,9 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.ExperimentalWarningUnaryRequestDuration != 0 {
args = append(args, "--experimental-warning-unary-request-duration", cfg.ExperimentalWarningUnaryRequestDuration.String())
}
if cfg.SnapshotCatchUpEntries > 0 {
args = append(args, "--experimental-snapshot-catchup-entries", fmt.Sprintf("%d", cfg.SnapshotCatchUpEntries))
}
envVars := map[string]string{}
for key, value := range cfg.EnvVars {
envVars[key] = value
Expand Down

0 comments on commit 6200b22

Please sign in to comment.