Skip to content

Commit

Permalink
Fix didn't reload namespaces from DB after the full sync
Browse files Browse the repository at this point in the history
Namespaces will store inside DB if the repl-namespace-enabled was set to
yes. And then replicas will intercept and parse the increment replication log
to see if it needs to reload namespaces. But it won't have the namespace
update log when doing the full sync, so we need to forcely reload from
DB once the full replication was done.
  • Loading branch information
git-hulk committed Sep 8, 2024
1 parent 2836bf0 commit 1dd0df0
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 4 deletions.
7 changes: 7 additions & 0 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,13 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was finish";
post_fullsync_cb_();

// It needs to reload namespaces from DB after the full sync is done,
// or namespaces are not visible in the replica.
s = srv_->GetNamespace()->LoadAndRewrite();
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to load and rewrite namespace: " << s.Msg();
}

// Switch to psync state machine again
psync_steps_.Start();
return CBState::QUIT;
Expand Down
3 changes: 2 additions & 1 deletion src/server/namespace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ bool Namespace::IsAllowModify() const {
Status Namespace::loadFromDB(std::map<std::string, std::string>* db_tokens) const {
std::string value;
engine::Context ctx(storage_);
auto s = storage_->Get(ctx, ctx.GetReadOptions(), cf_, kNamespaceDBKey, &value);
auto cf = storage_->GetCFHandle(ColumnFamilyID::Propagate);
auto s = storage_->Get(ctx, ctx.GetReadOptions(), cf, kNamespaceDBKey, &value);
if (!s.ok()) {
if (s.IsNotFound()) return Status::OK();
return {Status::NotOK, s.ToString()};
Expand Down
4 changes: 1 addition & 3 deletions src/server/namespace.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ constexpr const char *kNamespaceDBKey = "__namespace_keys__";

class Namespace {
public:
explicit Namespace(engine::Storage *storage)
: storage_(storage), cf_(storage_->GetCFHandle(ColumnFamilyID::Propagate)) {}
explicit Namespace(engine::Storage *storage) : storage_(storage) {}

~Namespace() = default;
Namespace(const Namespace &) = delete;
Expand All @@ -45,7 +44,6 @@ class Namespace {

private:
engine::Storage *storage_;
rocksdb::ColumnFamilyHandle *cf_ = nullptr;

std::shared_mutex tokens_mu_;
// mapping from token to namespace name
Expand Down
39 changes: 39 additions & 0 deletions tests/gocase/unit/namespace/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package namespace

import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -252,6 +254,43 @@ func TestNamespaceReplicate(t *testing.T) {
})
}

func TestNamespaceReplicateWithFullSync(t *testing.T) {
config := map[string]string{
"rocksdb.write_buffer_size": "4",
"rocksdb.target_file_size_base": "16",
"rocksdb.max_write_buffer_number": "1",
"rocksdb.wal_ttl_seconds": "0",
"rocksdb.wal_size_limit_mb": "0",
"repl-namespace-enabled": "yes",
"requirepass": "123",
"masterauth": "123",
}
master := util.StartServer(t, config)
defer master.Close()
masterClient := master.NewClientWithOption(&redis.Options{Password: "123"})
defer func() { require.NoError(t, masterClient.Close()) }()

slave := util.StartServer(t, config)
defer slave.Close()
slaveClient := slave.NewClientWithOption(&redis.Options{Password: "123"})
defer func() { require.NoError(t, slaveClient.Close()) }()

ctx := context.Background()
value := strings.Repeat("a", 128*1024)
for i := 0; i < 1024; i++ {
require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), value, 0).Err())
}
require.NoError(t, masterClient.Do(ctx, "NAMESPACE", "ADD", "foo", "bar").Err())

util.SlaveOf(t, slaveClient, master)
util.WaitForOffsetSync(t, masterClient, slaveClient, 60*time.Second)

// Namespaces should be replicated after the full sync
token, err := slaveClient.Do(ctx, "NAMESPACE", "GET", "foo").Result()
require.NoError(t, err)
require.EqualValues(t, "bar", token)
}

func TestNamespaceRewrite(t *testing.T) {
password := "pwd"
srv := util.StartServerWithCLIOptions(t, false, map[string]string{
Expand Down

0 comments on commit 1dd0df0

Please sign in to comment.