From 06a2cb73696413eaf9324a9699b37598bb83bd46 Mon Sep 17 00:00:00 2001 From: Clement Date: Tue, 24 Sep 2024 22:52:07 +0800 Subject: [PATCH] add compacti field to etcdProgress; add a test case Signed-off-by: Clement --- server/etcdserver/server.go | 12 +++- tests/integration/raft_log_test.go | 95 ++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 1 deletion(-) create mode 100644 tests/integration/raft_log_test.go diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 900a1a8e03f..19bfc3a8bd9 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -745,6 +745,7 @@ type etcdProgress struct { snapi uint64 appliedt uint64 appliedi uint64 + compacti uint64 } // raftReadyHandler contains a set of EtcdServer operations to be called by raftNode, @@ -764,6 +765,10 @@ func (s *EtcdServer) run() { if err != nil { lg.Panic("failed to get snapshot from Raft storage", zap.Error(err)) } + fi, err := s.r.raftStorage.FirstIndex() + if err != nil { + lg.Panic("failed to get first index from Raft storage", zap.Error(err)) + } // asynchronously accept toApply packets, dispatch progress in-order sched := schedule.NewFIFOScheduler(lg) @@ -813,6 +818,7 @@ func (s *EtcdServer) run() { snapi: sn.Metadata.Index, appliedt: sn.Metadata.Term, appliedi: sn.Metadata.Index, + compacti: fi - 1, } defer func() { @@ -2177,11 +2183,14 @@ func (s *EtcdServer) maybeCompactRaftLog(ep *etcdProgress) { // Retain all log entries up to the latest snapshot index to ensure any member can recover from that snapshot. // Beyond the snapshot index, preserve the most recent s.Cfg.SnapshotCatchUpEntries entries in memory. // This allows slow followers to catch up by synchronizing entries instead of requiring a full snapshot transfer. - if ep.snapi <= s.Cfg.SnapshotCatchUpEntries { + if ep.snapi <= s.Cfg.SnapshotCatchUpEntries { return } compacti := ep.snapi - s.Cfg.SnapshotCatchUpEntries + if compacti <= ep.compacti { + return + } lg := s.Logger() @@ -2196,6 +2205,7 @@ func (s *EtcdServer) maybeCompactRaftLog(ep *etcdProgress) { } err := s.r.raftStorage.Compact(compacti) + ep.compacti = compacti if err != nil { // the compaction was done asynchronously with the progress of raft. // raft log might already been compact. diff --git a/tests/integration/raft_log_test.go b/tests/integration/raft_log_test.go new file mode 100644 index 00000000000..680f3049749 --- /dev/null +++ b/tests/integration/raft_log_test.go @@ -0,0 +1,95 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "context" + "errors" + "testing" + "time" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/tests/v3/framework/integration" +) + +// TestRaftLogCompaction tests whether raft log snapshot and compaction work correctly. +func TestRaftLogCompaction(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewCluster(t, &integration.ClusterConfig{ + Size: 1, + SnapshotCount: 10, + SnapshotCatchUpEntries: 5, + }) + defer clus.Terminate(t) + + mem := clus.Members[0] + kvc := integration.ToGRPC(mem.Client).KV + + // When starting a new cluster with 1 member, the member will have an index of 4. + // TODO: Can someone explain this? + // Currently, if `ep.appliedi-ep.snapi > s.Cfg.SnapshotCount`, + // a raft log snapshot is created, and raft log entries are compacted. + // In this case, it triggers when the index is a multiple of 11. + appliedi := 4 + for ; appliedi <= 10; appliedi++ { + _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) + if err != nil { + t.Errorf("#%d: couldn't put key (%v)", appliedi, err) + } + } + // The first snapshot and compaction shouldn't happen because the index is less than 11 + expectMemberLogTimeout(t, mem, 5*time.Second, "saved snapshot", 1) + expectMemberLogTimeout(t, mem, time.Second, "compacted Raft logs", 1) + + for ; appliedi <= 11; appliedi++ { + _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) + if err != nil { + t.Errorf("#%d: couldn't put key (%v)", appliedi, err) + } + } + // The first snapshot and compaction should happen because the index is 11 + expectMemberLog(t, mem, 5*time.Second, "saved snapshot", 1) + expectMemberLog(t, mem, time.Second, "compacted Raft logs", 1) + expectMemberLog(t, mem, time.Second, "\"compact-index\": 6", 1) + + for ; appliedi <= 1100; appliedi++ { + _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) + if err != nil { + t.Errorf("#%d: couldn't put key (%v)", appliedi, err) + } + } + // With the index at 1100, snapshot and compaction should happen 100 times. + expectMemberLog(t, mem, 5*time.Second, "saved snapshot", 100) + expectMemberLog(t, mem, time.Second, "compacted Raft logs", 100) + expectMemberLog(t, mem, time.Second, "\"compact-index\": 1095", 1) + + // No more snapshot and compaction should happen. + expectMemberLogTimeout(t, mem, 5*time.Second, "saved snapshot", 101) + expectMemberLogTimeout(t, mem, time.Second, "compacted Raft logs", 101) +} + +// expectMemberLogTimeout ensures that the log has fewer than `count` occurrences of `s` before timing out +func expectMemberLogTimeout(t *testing.T, m *integration.Member, timeout time.Duration, s string, count int) { + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + _, err := m.LogObserver.Expect(ctx, s, count) + if !errors.Is(err, context.DeadlineExceeded) { + if err != nil { + t.Fatalf("failed to expect (log:%s, count:%v): %v", s, count, err) + } + } +}