From 09843c99787c3d7b9f57f9e8a936f31d06472e3a Mon Sep 17 00:00:00 2001 From: lintanghui Date: Tue, 18 Jan 2022 11:48:51 +0800 Subject: [PATCH 01/19] add raft_sync_per_bytes flag --- src/braft/log.cpp | 11 ++++++++--- src/braft/log.h | 5 +++-- src/braft/storage.cpp | 3 +++ src/braft/storage.h | 1 + 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/braft/log.cpp b/src/braft/log.cpp index 27519ae1..6aed1730 100644 --- a/src/braft/log.cpp +++ b/src/braft/log.cpp @@ -425,6 +425,7 @@ int Segment::append(const LogEntry* entry) { _offset_and_term.push_back(std::make_pair(_bytes, entry->id.term)); _last_index.fetch_add(1, butil::memory_order_relaxed); _bytes += to_write; + _unsynced_bytes += to_write; return 0; } @@ -432,11 +433,15 @@ int Segment::append(const LogEntry* entry) { int Segment::sync(bool will_sync) { if (_last_index > _first_index) { //CHECK(_is_open); - if (FLAGS_raft_sync && will_sync) { + if (will_sync) { + if (FLAGS_raft_sync) { return raft_fsync(_fd); - } else { - return 0; + } else if (FLAGS_raft_sync_per_bytes < _unsynced_bytes) { + _unsynced_bytes = 0; + return raft_fsync(_fd); + } } + return 0; } else { return 0; } diff --git a/src/braft/log.h b/src/braft/log.h index 364d40fa..7cd504c8 100644 --- a/src/braft/log.h +++ b/src/braft/log.h @@ -35,14 +35,14 @@ class BAIDU_CACHELINE_ALIGNMENT Segment : public butil::RefCountedThreadSafe { public: Segment(const std::string& path, const int64_t first_index, int checksum_type) - : _path(path), _bytes(0), + : _path(path), _bytes(0),_unsynced_bytes(0), _fd(-1), _is_open(true), _first_index(first_index), _last_index(first_index - 1), _checksum_type(checksum_type) {} Segment(const std::string& path, const int64_t first_index, const int64_t last_index, int checksum_type) - : _path(path), _bytes(0), + : _path(path), _bytes(0),_unsynced_bytes(0), _fd(-1), _is_open(false), _first_index(first_index), _last_index(last_index), _checksum_type(checksum_type) @@ -119,6 +119,7 @@ friend class 
butil::RefCountedThreadSafe; std::string _path; int64_t _bytes; + int64_t _unsynced_bytes; mutable raft_mutex_t _mutex; int _fd; bool _is_open; diff --git a/src/braft/storage.cpp b/src/braft/storage.cpp index ce0a09f4..f95bb28f 100644 --- a/src/braft/storage.cpp +++ b/src/braft/storage.cpp @@ -30,11 +30,14 @@ namespace braft { DEFINE_bool(raft_sync, true, "call fsync when need"); BRPC_VALIDATE_GFLAG(raft_sync, ::brpc::PassValidate); +DEFINE_int32(raft_sync_per_bytes, INT32_MAX, + "sync raft log per bytes when raft_sync set to false"); DEFINE_bool(raft_create_parent_directories, true, "Create parent directories of the path in local storage if true"); DEFINE_bool(raft_sync_meta, false, "sync log meta, snapshot meta and raft meta"); BRPC_VALIDATE_GFLAG(raft_sync_meta, ::brpc::PassValidate); +BRPC_VALIDATE_GFLAG(raft_sync_per_bytes, ::brpc::NonNegativeInteger); LogStorage* LogStorage::create(const std::string& uri) { butil::StringPiece copied_uri(uri); diff --git a/src/braft/storage.h b/src/braft/storage.h index c5c53546..0a22c8a5 100644 --- a/src/braft/storage.h +++ b/src/braft/storage.h @@ -38,6 +38,7 @@ namespace braft { DECLARE_bool(raft_sync); DECLARE_bool(raft_sync_meta); +DECLARE_int32(raft_sync_per_bytes); DECLARE_bool(raft_create_parent_directories); struct LogEntry; From 343d82f2397befcca43a5aa610ffc5b1e98364b4 Mon Sep 17 00:00:00 2001 From: lintanghui Date: Tue, 18 Jan 2022 14:18:48 +0800 Subject: [PATCH 02/19] fix format --- src/braft/log.cpp | 12 ++++++------ src/braft/log.h | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/braft/log.cpp b/src/braft/log.cpp index 6aed1730..034d13d2 100644 --- a/src/braft/log.cpp +++ b/src/braft/log.cpp @@ -434,12 +434,12 @@ int Segment::sync(bool will_sync) { if (_last_index > _first_index) { //CHECK(_is_open); if (will_sync) { - if (FLAGS_raft_sync) { - return raft_fsync(_fd); - } else if (FLAGS_raft_sync_per_bytes < _unsynced_bytes) { - _unsynced_bytes = 0; - return raft_fsync(_fd); - } + if 
(FLAGS_raft_sync) { + return raft_fsync(_fd); + } else if (FLAGS_raft_sync_per_bytes < _unsynced_bytes) { + _unsynced_bytes = 0; + return raft_fsync(_fd); + } } return 0; } else { diff --git a/src/braft/log.h b/src/braft/log.h index 7cd504c8..a74017d6 100644 --- a/src/braft/log.h +++ b/src/braft/log.h @@ -35,14 +35,14 @@ class BAIDU_CACHELINE_ALIGNMENT Segment : public butil::RefCountedThreadSafe { public: Segment(const std::string& path, const int64_t first_index, int checksum_type) - : _path(path), _bytes(0),_unsynced_bytes(0), + : _path(path), _bytes(0), _unsynced_bytes(0), _fd(-1), _is_open(true), _first_index(first_index), _last_index(first_index - 1), _checksum_type(checksum_type) {} Segment(const std::string& path, const int64_t first_index, const int64_t last_index, int checksum_type) - : _path(path), _bytes(0),_unsynced_bytes(0), + : _path(path), _bytes(0), _unsynced_bytes(0), _fd(-1), _is_open(false), _first_index(first_index), _last_index(last_index), _checksum_type(checksum_type) From cc0947783327d01afbf29ff0f1a9f6a4f5c76c3a Mon Sep 17 00:00:00 2001 From: lintanghui Date: Fri, 21 Jan 2022 19:02:23 +0800 Subject: [PATCH 03/19] add raft sync policy --- src/braft/log.cpp | 30 ++++++++++++++++++------------ src/braft/storage.cpp | 2 +- src/braft/storage.h | 1 + 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/braft/log.cpp b/src/braft/log.cpp index 034d13d2..7e79fffc 100644 --- a/src/braft/log.cpp +++ b/src/braft/log.cpp @@ -69,6 +69,11 @@ enum CheckSumType { CHECKSUM_CRC32 = 1, }; +enum RaftSyncPolicy { + RAFT_SYNC_NOHTING = 0, + RAFT_SYNC_BY_BYTES = 1, +}; + // Format of Header, all fields are in network order // | -------------------- term (64bits) ------------------------- | // | entry-type (8bits) | checksum_type (8bits) | reserved(16bits) | @@ -431,20 +436,21 @@ int Segment::append(const LogEntry* entry) { } int Segment::sync(bool will_sync) { - if (_last_index > _first_index) { - //CHECK(_is_open); - if (will_sync) { - if 
(FLAGS_raft_sync) { - return raft_fsync(_fd); - } else if (FLAGS_raft_sync_per_bytes < _unsynced_bytes) { - _unsynced_bytes = 0; - return raft_fsync(_fd); - } - } - return 0; - } else { + if (_last_index < _first_index) { return 0; } + //CHECK(_is_open); + if (will_sync) { + if (FLAGS_raft_sync) { + return raft_fsync(_fd); + } + if (FLAGS_raft_sync_policy == RaftSyncPolicy::RAFT_SYNC_BY_BYTES + && FLAGS_raft_sync_per_bytes < _unsynced_bytes) { + _unsynced_bytes = 0; + return raft_fsync(_fd); + } + } + return 0; } LogEntry* Segment::get(const int64_t index) const { diff --git a/src/braft/storage.cpp b/src/braft/storage.cpp index f95bb28f..cb969e27 100644 --- a/src/braft/storage.cpp +++ b/src/braft/storage.cpp @@ -34,7 +34,7 @@ DEFINE_int32(raft_sync_per_bytes, INT32_MAX, "sync raft log per bytes when raft_sync set to false"); DEFINE_bool(raft_create_parent_directories, true, "Create parent directories of the path in local storage if true"); - +DEFINE_int32(raft_sync_policy, 0, "raft sync policy when raft_sync set to false"); DEFINE_bool(raft_sync_meta, false, "sync log meta, snapshot meta and raft meta"); BRPC_VALIDATE_GFLAG(raft_sync_meta, ::brpc::PassValidate); BRPC_VALIDATE_GFLAG(raft_sync_per_bytes, ::brpc::NonNegativeInteger); diff --git a/src/braft/storage.h b/src/braft/storage.h index 0a22c8a5..21b5e0d3 100644 --- a/src/braft/storage.h +++ b/src/braft/storage.h @@ -39,6 +39,7 @@ namespace braft { DECLARE_bool(raft_sync); DECLARE_bool(raft_sync_meta); DECLARE_int32(raft_sync_per_bytes); +DECLARE_int32(raft_sync_policy); DECLARE_bool(raft_create_parent_directories); struct LogEntry; From b20114774dbe873048d9e7fc793c2029d8d990a4 Mon Sep 17 00:00:00 2001 From: lintanghui Date: Mon, 24 Jan 2022 11:09:01 +0800 Subject: [PATCH 04/19] fix param and add doc --- docs/cn/server.md | 2 ++ src/braft/log.cpp | 3 ++- src/braft/storage.cpp | 4 +++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/docs/cn/server.md b/docs/cn/server.md index f98a1dc9..380b61b4 
100644 --- a/docs/cn/server.md +++ b/docs/cn/server.md @@ -551,3 +551,5 @@ raft中有很多flags配置项,运行中可以通过http://endpoint/flags 查 | raft_max_byte_count_per_rpc | snapshot每次rpc下载大小 | | raft_apply_batch | apply的时候最大batch数量 | | raft_election_heartbeat_factor | election超时与heartbeat超时的比例 | +| raft_sync_policy | raft_sync为false时的sync策略,0表示永远不主动进行sync,1表示每写入bytes进行sync | +| raft_sync_per_bytes | raft_sync_policy 为1 时生效,表示每写bytes进行sync | diff --git a/src/braft/log.cpp b/src/braft/log.cpp index 7e79fffc..4f811f1b 100644 --- a/src/braft/log.cpp +++ b/src/braft/log.cpp @@ -70,10 +70,11 @@ enum CheckSumType { }; enum RaftSyncPolicy { - RAFT_SYNC_NOHTING = 0, + RAFT_SYNC_NEVER = 0, RAFT_SYNC_BY_BYTES = 1, }; + // Format of Header, all fields are in network order // | -------------------- term (64bits) ------------------------- | // | entry-type (8bits) | checksum_type (8bits) | reserved(16bits) | diff --git a/src/braft/storage.cpp b/src/braft/storage.cpp index cb969e27..9f80fafe 100644 --- a/src/braft/storage.cpp +++ b/src/braft/storage.cpp @@ -34,7 +34,9 @@ DEFINE_int32(raft_sync_per_bytes, INT32_MAX, "sync raft log per bytes when raft_sync set to false"); DEFINE_bool(raft_create_parent_directories, true, "Create parent directories of the path in local storage if true"); -DEFINE_int32(raft_sync_policy, 0, "raft sync policy when raft_sync set to false"); +DEFINE_int32(raft_sync_policy, 0, + "raft sync policy when raft_sync set to false, 0 mean never sync, 1 mean sync by " + "writed bytes"); DEFINE_bool(raft_sync_meta, false, "sync log meta, snapshot meta and raft meta"); BRPC_VALIDATE_GFLAG(raft_sync_meta, ::brpc::PassValidate); BRPC_VALIDATE_GFLAG(raft_sync_per_bytes, ::brpc::NonNegativeInteger); From dc58466b552772c27c9cd7381f6ef5880ae65e3f Mon Sep 17 00:00:00 2001 From: lintanghui Date: Wed, 23 Feb 2022 20:32:36 +0800 Subject: [PATCH 05/19] check sync policy when raft_sync is true --- docs/cn/server.md | 2 +- src/braft/log.cpp | 11 ++++++----- src/braft/storage.cpp | 2 +- 3 files 
changed, 8 insertions(+), 7 deletions(-) diff --git a/docs/cn/server.md b/docs/cn/server.md index 380b61b4..a5af00ce 100644 --- a/docs/cn/server.md +++ b/docs/cn/server.md @@ -551,5 +551,5 @@ raft中有很多flags配置项,运行中可以通过http://endpoint/flags 查 | raft_max_byte_count_per_rpc | snapshot每次rpc下载大小 | | raft_apply_batch | apply的时候最大batch数量 | | raft_election_heartbeat_factor | election超时与heartbeat超时的比例 | -| raft_sync_policy | raft_sync为false时的sync策略,0表示永远不主动进行sync,1表示每写入bytes进行sync | +| raft_sync_policy | raft_sync为true时的细化策略,0表示直接进行sync,1表示每写入bytes才进行sync | | raft_sync_per_bytes | raft_sync_policy 为1 时生效,表示每写bytes进行sync | diff --git a/src/braft/log.cpp b/src/braft/log.cpp index 4f811f1b..bbb44d2e 100644 --- a/src/braft/log.cpp +++ b/src/braft/log.cpp @@ -442,14 +442,15 @@ int Segment::sync(bool will_sync) { } //CHECK(_is_open); if (will_sync) { - if (FLAGS_raft_sync) { - return raft_fsync(_fd); + if (!FLAGS_raft_sync) { + return 0; } if (FLAGS_raft_sync_policy == RaftSyncPolicy::RAFT_SYNC_BY_BYTES - && FLAGS_raft_sync_per_bytes < _unsynced_bytes) { - _unsynced_bytes = 0; - return raft_fsync(_fd); + && FLAGS_raft_sync_per_bytes > _unsynced_bytes) { + return 0; } + _unsynced_bytes = 0; + return raft_fsync(_fd); } return 0; } diff --git a/src/braft/storage.cpp b/src/braft/storage.cpp index 9f80fafe..1a23bdee 100644 --- a/src/braft/storage.cpp +++ b/src/braft/storage.cpp @@ -35,7 +35,7 @@ DEFINE_int32(raft_sync_per_bytes, INT32_MAX, DEFINE_bool(raft_create_parent_directories, true, "Create parent directories of the path in local storage if true"); DEFINE_int32(raft_sync_policy, 0, - "raft sync policy when raft_sync set to false, 0 mean never sync, 1 mean sync by " + "raft sync policy when raft_sync set to true, 0 mean sync immediately, 1 mean sync by " "writed bytes"); DEFINE_bool(raft_sync_meta, false, "sync log meta, snapshot meta and raft meta"); BRPC_VALIDATE_GFLAG(raft_sync_meta, ::brpc::PassValidate); From 9ef7c9d3437c1a48092c34dc452b1c989c71af87 Mon Sep 17 00:00:00 2001 
From: lintanghui Date: Thu, 24 Feb 2022 20:20:13 +0800 Subject: [PATCH 06/19] fix sync policy desc --- docs/cn/server.md | 2 +- src/braft/log.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/cn/server.md b/docs/cn/server.md index a5af00ce..35a63967 100644 --- a/docs/cn/server.md +++ b/docs/cn/server.md @@ -551,5 +551,5 @@ raft中有很多flags配置项,运行中可以通过http://endpoint/flags 查 | raft_max_byte_count_per_rpc | snapshot每次rpc下载大小 | | raft_apply_batch | apply的时候最大batch数量 | | raft_election_heartbeat_factor | election超时与heartbeat超时的比例 | -| raft_sync_policy | raft_sync为true时的细化策略,0表示直接进行sync,1表示每写入bytes才进行sync | +| raft_sync_policy | raft_sync为true时的细化策略,0表示每次写都立即sync,1表示每写入多少bytes才进行一次sync | | raft_sync_per_bytes | raft_sync_policy 为1 时生效,表示每写bytes进行sync | diff --git a/src/braft/log.cpp b/src/braft/log.cpp index bbb44d2e..b58b04a9 100644 --- a/src/braft/log.cpp +++ b/src/braft/log.cpp @@ -70,7 +70,7 @@ enum CheckSumType { }; enum RaftSyncPolicy { - RAFT_SYNC_NEVER = 0, + RAFT_SYNC_IMMEDIATELY = 0, RAFT_SYNC_BY_BYTES = 1, }; From 6e87761d45f7ca0324e194b23d9816d818696099 Mon Sep 17 00:00:00 2001 From: lintanghui Date: Fri, 25 Feb 2022 09:19:45 +0800 Subject: [PATCH 07/19] fix sync_per_byte define desc --- src/braft/storage.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/braft/storage.cpp b/src/braft/storage.cpp index 1a23bdee..8630eb38 100644 --- a/src/braft/storage.cpp +++ b/src/braft/storage.cpp @@ -31,7 +31,8 @@ namespace braft { DEFINE_bool(raft_sync, true, "call fsync when need"); BRPC_VALIDATE_GFLAG(raft_sync, ::brpc::PassValidate); DEFINE_int32(raft_sync_per_bytes, INT32_MAX, - "sync raft log per bytes when raft_sync set to false"); + "sync raft log per bytes when raft_sync set to true"); +BRPC_VALIDATE_GFLAG(raft_sync_per_bytes, ::brpc::NonNegativeInteger); DEFINE_bool(raft_create_parent_directories, true, "Create parent directories of the path in local storage if true"); DEFINE_int32(raft_sync_policy, 0, @@ -39,7 
+40,6 @@ DEFINE_int32(raft_sync_policy, 0, "writed bytes"); DEFINE_bool(raft_sync_meta, false, "sync log meta, snapshot meta and raft meta"); BRPC_VALIDATE_GFLAG(raft_sync_meta, ::brpc::PassValidate); -BRPC_VALIDATE_GFLAG(raft_sync_per_bytes, ::brpc::NonNegativeInteger); LogStorage* LogStorage::create(const std::string& uri) { butil::StringPiece copied_uri(uri); From e9c5a1dde2de61d2209048a3ccde5978162add22 Mon Sep 17 00:00:00 2001 From: lintanghui Date: Sun, 23 Apr 2023 17:55:54 +0800 Subject: [PATCH 08/19] init witness --- src/braft/cli.proto | 1 + src/braft/cli_service.cpp | 5 +- src/braft/configuration.h | 1 + src/braft/node.cpp | 38 +++++++-- src/braft/raft.h | 5 ++ src/braft/replicator.cpp | 7 +- src/braft/replicator.h | 139 +++++++++++++++----------------- src/braft/snapshot.cpp | 14 +++- src/braft/snapshot.h | 4 + src/braft/snapshot_executor.cpp | 3 + src/braft/snapshot_executor.h | 1 + 11 files changed, 132 insertions(+), 86 deletions(-) diff --git a/src/braft/cli.proto b/src/braft/cli.proto index fb123a9a..1b3dbc89 100644 --- a/src/braft/cli.proto +++ b/src/braft/cli.proto @@ -7,6 +7,7 @@ message AddPeerRequest { required string group_id = 1; required string leader_id = 2; required string peer_id = 3; + optional bool is_witness = 4; } message AddPeerResponse { diff --git a/src/braft/cli_service.cpp b/src/braft/cli_service.cpp index 5f84079d..1731605b 100644 --- a/src/braft/cli_service.cpp +++ b/src/braft/cli_service.cpp @@ -71,9 +71,12 @@ void CliServiceImpl::add_peer(::google::protobuf::RpcController* controller, request->peer_id().c_str()); return; } + if (request->is_witness()) { + adding_peer.witness = true; + } LOG(WARNING) << "Receive AddPeerRequest to " << node->node_id() << " from " << cntl->remote_side() - << ", adding " << request->peer_id(); + << ", adding " << request->peer_id() << " is_witness:" << request->is_witness(); Closure* add_peer_done = NewCallback( add_peer_returned, cntl, request, response, peers, node, done_guard.release()); diff 
--git a/src/braft/configuration.h b/src/braft/configuration.h index 21dae30d..eb331760 100644 --- a/src/braft/configuration.h +++ b/src/braft/configuration.h @@ -38,6 +38,7 @@ typedef std::string VersionedGroupId; struct PeerId { butil::EndPoint addr; // ip+port. int idx; // idx in same addr, default 0 + bool witness; PeerId() : idx(0) {} explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0) {} diff --git a/src/braft/node.cpp b/src/braft/node.cpp index f5802f7f..294ab2a6 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -67,6 +67,9 @@ BRPC_VALIDATE_GFLAG(raft_rpc_channel_connect_timeout_ms, brpc::PositiveInteger); DECLARE_bool(raft_enable_leader_lease); +DEFINE_bool(raft_enable_witness_to_leader, false, +"enabel witness temporarily to become leader when leader down accidently"); + #ifndef UNIT_TEST static bvar::Adder g_num_nodes("raft_node_count"); #else @@ -253,6 +256,10 @@ int NodeImpl::init_snapshot_storage() { opt.init_term = _current_term; opt.filter_before_copy_remote = _options.filter_before_copy_remote; opt.usercode_in_pthread = _options.usercode_in_pthread; + // not need to copy data file when it is witness. 
+ if (_options.is_witness) { + opt.copy_file = false; + } if (_options.snapshot_file_system_adaptor) { opt.file_system_adaptor = *_options.snapshot_file_system_adaptor; } @@ -498,9 +505,17 @@ int NodeImpl::init(const NodeOptions& options) { << ", did you forget to call braft::add_service()?"; return -1; } - - CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms + options.max_clock_drift_ms)); - CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms)); + if (options.is_witness) { + // if node is witness, set timer twice as data replication node, + // which guarantees data replica will be elected as leader priority。 + if (FLAGS_raft_enable_witness_to_leader) { + CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms * 2)); + CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms * 2 + options.max_clock_drift_ms)); + } + }else { + CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms)); + CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms + options.max_clock_drift_ms)); + } CHECK_EQ(0, _stepdown_timer.init(this, options.election_timeout_ms)); CHECK_EQ(0, _snapshot_timer.init(this, options.snapshot_interval_s * 1000)); @@ -524,7 +539,11 @@ int NodeImpl::init(const NodeOptions& options) { _fsm_caller = new FSMCaller(); _leader_lease.init(options.election_timeout_ms); - _follower_lease.init(options.election_timeout_ms, options.max_clock_drift_ms); + if (options.is_witness) { + _follower_lease.init(options.election_timeout_ms * 2, options.max_clock_drift_ms); + } else { + _follower_lease.init(options.election_timeout_ms, options.max_clock_drift_ms); + } // log storage and log manager init if (init_log_storage() != 0) { @@ -1302,9 +1321,14 @@ void NodeImpl::unsafe_reset_election_timeout_ms(int election_timeout_ms, _replicator_group.reset_heartbeat_interval( heartbeat_timeout(_options.election_timeout_ms)); _replicator_group.reset_election_timeout_interval(_options.election_timeout_ms); - 
_election_timer.reset(election_timeout_ms); - _leader_lease.reset_election_timeout_ms(election_timeout_ms); - _follower_lease.reset_election_timeout_ms(election_timeout_ms, _options.max_clock_drift_ms); + if (_options.is_witness && FLAGS_raft_enable_witness_to_leader) { + _election_timer.reset(election_timeout_ms * 2); + _follower_lease.reset_election_timeout_ms(election_timeout_ms * 2, _options.max_clock_drift_ms); + } else { + _election_timer.reset(election_timeout_ms); + _leader_lease.reset_election_timeout_ms(election_timeout_ms); + _follower_lease.reset_election_timeout_ms(election_timeout_ms, _options.max_clock_drift_ms); + } } void NodeImpl::on_error(const Error& e) { diff --git a/src/braft/raft.h b/src/braft/raft.h index ef9cead8..6c04c199 100644 --- a/src/braft/raft.h +++ b/src/braft/raft.h @@ -588,6 +588,11 @@ struct NodeOptions { // Default: false bool disable_cli; + // if true, this node is a witness node, when raft_enable_witness_to_leader is false, + // witness will never to be leader. so not need to init _vote_timer and _election_timer. + // if raft_enable_witness_to_leader is true, witness can be leader, but should transfer leader as soon as possible. 
+ // Default: false + bool is_witness; // Construct a default instance NodeOptions(); diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp index d64d8385..ceef58bc 100644 --- a/src/braft/replicator.cpp +++ b/src/braft/replicator.cpp @@ -47,6 +47,7 @@ DEFINE_int32(raft_retry_replicate_interval_ms, 1000, BRPC_VALIDATE_GFLAG(raft_retry_replicate_interval_ms, brpc::PositiveInteger); +DECLARE_bool(raft_enable_witness_to_leader); DECLARE_int64(raft_append_entry_high_lat_us); DECLARE_bool(raft_trace_append_entry_latency); @@ -621,8 +622,10 @@ int Replicator::_prepare_entry(int offset, EntryMeta* em, butil::IOBuf *data) { } else { CHECK(entry->type != ENTRY_TYPE_CONFIGURATION) << "log_index=" << log_index; } - em->set_data_len(entry->data.length()); - data->append(entry->data); + if (!is_witness() || FLAGS_raft_enable_witness_to_leader) { + em->set_data_len(entry->data.length()); + data->append(entry->data); + } entry->Release(); return 0; } diff --git a/src/braft/replicator.h b/src/braft/replicator.h index 7223900f..d31e5cd1 100644 --- a/src/braft/replicator.h +++ b/src/braft/replicator.h @@ -1,11 +1,11 @@ // Copyright (c) 2015 Baidu.com, Inc. All Rights Reserved -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -16,17 +16,17 @@ // Wang,Yao(wangyao02@baidu.com) // Xiong,Kai(xiongkai@baidu.com) -#ifndef BRAFT_REPLICATOR_H -#define BRAFT_REPLICATOR_H +#ifndef BRAFT_REPLICATOR_H +#define BRAFT_REPLICATOR_H -#include // bthread_id -#include // brpc::Channel +#include // brpc::Channel +#include // bthread_id -#include "braft/storage.h" // SnapshotStorage -#include "braft/raft.h" // Closure -#include "braft/configuration.h" // Configuration -#include "braft/raft.pb.h" // AppendEntriesRequest -#include "braft/log_manager.h" // LogManager +#include "braft/configuration.h" // Configuration +#include "braft/log_manager.h" // LogManager +#include "braft/raft.h" // Closure +#include "braft/raft.pb.h" // AppendEntriesRequest +#include "braft/storage.h" // SnapshotStorage namespace braft { @@ -40,7 +40,8 @@ class SnapshotThrottle; struct ReplicatorStatus : public butil::RefCountedThreadSafe { butil::atomic last_rpc_send_timestamp; - ReplicatorStatus() : last_rpc_send_timestamp(0) {} + ReplicatorStatus() : last_rpc_send_timestamp(0) { + } }; struct ReplicatorOptions { @@ -52,7 +53,7 @@ struct ReplicatorOptions { PeerId peer_id; LogManager* log_manager; BallotBox* ballot_box; - NodeImpl *node; + NodeImpl* node; int64_t term; SnapshotStorage* snapshot_storage; SnapshotThrottle* snapshot_throttle; @@ -64,14 +65,13 @@ typedef uint64_t ReplicatorId; class CatchupClosure : public Closure { public: virtual void Run() = 0; + protected: - CatchupClosure() - : _max_margin(0) - , _has_timer(false) - , _error_was_set(false) - {} + CatchupClosure() : _max_margin(0), _has_timer(false), _error_was_set(false) { + } + private: -friend class Replicator; + friend class Replicator; int64_t _max_margin; bthread_timer_t _timer; bool _has_timer; @@ -92,10 +92,9 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { static int join(ReplicatorId); // Wait until the margin between |last_log_index| from leader and the peer - // is less than |max_margin| or error occurs. + // is less than |max_margin| or error occurs. 
// |done| can't be NULL and it is called after waiting fnishies. - static void wait_for_caught_up(ReplicatorId, int64_t max_margin, - const timespec* due_time, + static void wait_for_caught_up(ReplicatorId, int64_t max_margin, const timespec* due_time, CatchupClosure* done); // Tranfer leadership to the very peer if the replicated logs are over @@ -127,7 +126,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { // Check if a replicator is readonly static bool readonly(ReplicatorId id); - + private: enum St { IDLE, @@ -160,8 +159,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { void _block(long start_time_us, int error_code); void _install_snapshot(); void _start_heartbeat_timer(long start_time_us); - void _send_timeout_now(bool unlock_id, bool old_leader_stepped_down, - int timeout_ms = -1); + void _send_timeout_now(bool unlock_id, bool old_leader_stepped_down, int timeout_ms = -1); int _transfer_leadership(int64_t log_index); void _cancel_append_entries_rpcs(); void _reset_next_index(); @@ -170,23 +168,17 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { } int _change_readonly_config(bool readonly); - static void _on_rpc_returned( - ReplicatorId id, brpc::Controller* cntl, - AppendEntriesRequest* request, - AppendEntriesResponse* response, - int64_t); + static void _on_rpc_returned(ReplicatorId id, brpc::Controller* cntl, + AppendEntriesRequest* request, AppendEntriesResponse* response, + int64_t); - static void _on_heartbeat_returned( - ReplicatorId id, brpc::Controller* cntl, - AppendEntriesRequest* request, - AppendEntriesResponse* response, - int64_t); + static void _on_heartbeat_returned(ReplicatorId id, brpc::Controller* cntl, + AppendEntriesRequest* request, + AppendEntriesResponse* response, int64_t); - static void _on_timeout_now_returned( - ReplicatorId id, brpc::Controller* cntl, - TimeoutNowRequest* request, - TimeoutNowResponse* response, - bool old_leader_stepped_down); + static void _on_timeout_now_returned(ReplicatorId id, brpc::Controller* cntl, + 
TimeoutNowRequest* request, TimeoutNowResponse* response, + bool old_leader_stepped_down); static void _on_timedout(void* arg); static void* _send_heartbeat(void* arg); @@ -195,12 +187,11 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { static int _continue_sending(void* arg, int error_code); static void* _run_on_caught_up(void*); static void _on_catch_up_timedout(void*); - static void _on_block_timedout(void *arg); - static void* _on_block_timedout_in_new_thread(void *arg); - static void _on_install_snapshot_returned( - ReplicatorId id, brpc::Controller* cntl, - InstallSnapshotRequest* request, - InstallSnapshotResponse* response); + static void _on_block_timedout(void* arg); + static void* _on_block_timedout_in_new_thread(void* arg); + static void _on_install_snapshot_returned(ReplicatorId id, brpc::Controller* cntl, + InstallSnapshotRequest* request, + InstallSnapshotResponse* response); void _destroy(); void _describe(std::ostream& os, bool use_html); void _get_status(PeerStatus* status); @@ -210,20 +201,23 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { if (_next_index < _options.log_manager->first_log_index()) { return false; } - if (_min_flying_index() - 1 + max_margin - < _options.log_manager->last_log_index()) { + if (_min_flying_index() - 1 + max_margin < _options.log_manager->last_log_index()) { return false; } return true; } + bool is_witness() const { + return _options.peer_id.witness; + } void _close_reader(); int64_t _last_rpc_send_timestamp() { - return _options.replicator_status->last_rpc_send_timestamp.load(butil::memory_order_relaxed); + return _options.replicator_status->last_rpc_send_timestamp.load( + butil::memory_order_relaxed); } void _update_last_rpc_send_timestamp(int64_t new_timestamp) { if (new_timestamp > _last_rpc_send_timestamp()) { - _options.replicator_status->last_rpc_send_timestamp - .store(new_timestamp, butil::memory_order_relaxed); + _options.replicator_status->last_rpc_send_timestamp.store(new_timestamp, + 
butil::memory_order_relaxed); } } @@ -232,10 +226,13 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { int64_t log_index; int entries_size; brpc::CallId call_id; - FlyingAppendEntriesRpc(int64_t index, int size, brpc::CallId id) - : log_index(index), entries_size(size), call_id(id) {} + FlyingAppendEntriesRpc(int64_t index, int size, brpc::CallId id) : + log_index(index), + entries_size(size), + call_id(id) { + } }; - + brpc::Channel _sending_channel; int64_t _next_index; int64_t _flying_append_entries_size; @@ -257,7 +254,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { ReplicatorOptions _options; bthread_timer_t _heartbeat_timer; SnapshotReader* _reader; - CatchupClosure *_catchup_closure; + CatchupClosure* _catchup_closure; }; struct ReplicatorGroupOptions { @@ -284,7 +281,7 @@ class ReplicatorGroup { ReplicatorGroup(); ~ReplicatorGroup(); int init(const NodeId& node_id, const ReplicatorGroupOptions&); - + // Add a replicator attached with |peer| // will be a notification when the replicator catches up according to the // arguments. @@ -292,17 +289,17 @@ class ReplicatorGroup { // immediately, annd might call node->step_down which might have race with // the caller, you should deal with this situation. int add_replicator(const PeerId& peer); - + // wait the very peer catchup - int wait_caughtup(const PeerId& peer, int64_t max_margin, - const timespec* due_time, CatchupClosure* done); + int wait_caughtup(const PeerId& peer, int64_t max_margin, const timespec* due_time, + CatchupClosure* done); int64_t last_rpc_send_timestamp(const PeerId& peer); // Stop all the replicators int stop_all(); - int stop_replicator(const PeerId &peer); + int stop_replicator(const PeerId& peer); // Reset the term of all to-add replicators. 
// This method is supposed to be called when the very candidate becomes the @@ -315,8 +312,8 @@ class ReplicatorGroup { // leader, use new heartbeat_interval, maybe call vote() reset election_timeout // Return 0 on success, -1 otherwise int reset_heartbeat_interval(int new_interval_ms); - - // Reset the interval of election_timeout for replicator, + + // Reset the interval of election_timeout for replicator, // used in rpc's set_timeout_ms int reset_election_timeout_interval(int new_interval_ms); @@ -331,26 +328,25 @@ class ReplicatorGroup { // Stop all the replicators except for the one that we think can be the // candidate of the next leader, which has the largest `last_log_id' among - // peers in |current_conf|. + // peers in |current_conf|. // |candidate| would be assigned to a valid ReplicatorId if we found one and // the caller is responsible for stopping it, or an invalid value if we // found none. // Returns 0 on success and -1 otherwise. int stop_all_and_find_the_next_candidate(ReplicatorId* candidate, const ConfigurationEntry& conf); - + // Find the follower with the most log entries in this group, which is // likely becomes the leader according to the election algorithm of raft. // Returns 0 on success and |peer_id| is assigned with the very peer. // -1 otherwise. 
- int find_the_next_candidate(PeerId* peer_id, - const ConfigurationEntry& conf); + int find_the_next_candidate(PeerId* peer_id, const ConfigurationEntry& conf); // List all the existing replicators void list_replicators(std::vector* out) const; // List all the existing replicators with PeerId - void list_replicators(std::vector >* out) const; + void list_replicators(std::vector>* out) const; // Change the readonly config for a peer int change_readonly_config(const PeerId& peer, bool readonly); @@ -359,8 +355,7 @@ class ReplicatorGroup { bool readonly(const PeerId& peer) const; private: - - int _add_replicator(const PeerId& peer, ReplicatorId *rid); + int _add_replicator(const PeerId& peer, ReplicatorId* rid); struct ReplicatorIdAndStatus { ReplicatorId id; @@ -373,6 +368,6 @@ class ReplicatorGroup { int _election_timeout_ms; }; -} // namespace braft +} // namespace braft -#endif //BRAFT_REPLICATOR_H +#endif // BRAFT_REPLICATOR_H diff --git a/src/braft/snapshot.cpp b/src/braft/snapshot.cpp index 8bf7d173..dc8a2a6a 100644 --- a/src/braft/snapshot.cpp +++ b/src/braft/snapshot.cpp @@ -575,7 +575,7 @@ SnapshotWriter* LocalSnapshotStorage::create(bool from_empty) { } SnapshotCopier* LocalSnapshotStorage::start_to_copy_from(const std::string& uri) { - LocalSnapshotCopier* copier = new LocalSnapshotCopier(); + LocalSnapshotCopier* copier = new LocalSnapshotCopier(_copy_file); copier->_storage = this; copier->_filter_before_copy_remote = _filter_before_copy_remote; copier->_fs = _fs.get(); @@ -738,7 +738,10 @@ butil::Status LocalSnapshotStorage::gc_instance(const std::string& uri) const { // LocalSnapshotCopier LocalSnapshotCopier::LocalSnapshotCopier() - : _tid(INVALID_BTHREAD) + : LocalSnapshotCopier(true){} + +LocalSnapshotCopier::LocalSnapshotCopier(bool copy_file): + _tid(INVALID_BTHREAD) , _cancelled(false) , _filter_before_copy_remote(false) , _fs(NULL) @@ -746,8 +749,8 @@ LocalSnapshotCopier::LocalSnapshotCopier() , _writer(NULL) , _storage(NULL) , _reader(NULL) - 
, _cur_session(NULL) -{} + , _copy_file(copy_file) + , _cur_session(NULL){} LocalSnapshotCopier::~LocalSnapshotCopier() { CHECK(!_writer); @@ -769,6 +772,9 @@ void LocalSnapshotCopier::copy() { if (!ok()) { break; } + if (!_copy_file) { + break; + } std::vector files; _remote_snapshot.list_files(&files); for (size_t i = 0; i < files.size() && ok(); ++i) { diff --git a/src/braft/snapshot.h b/src/braft/snapshot.h index 448ff4cd..8d617d92 100644 --- a/src/braft/snapshot.h +++ b/src/braft/snapshot.h @@ -147,6 +147,7 @@ class LocalSnapshotCopier : public SnapshotCopier { friend class LocalSnapshotStorage; public: LocalSnapshotCopier(); + LocalSnapshotCopier(bool copy_file); ~LocalSnapshotCopier(); virtual void cancel(); virtual void join(); @@ -166,6 +167,7 @@ friend class LocalSnapshotStorage; bthread_t _tid; bool _cancelled; bool _filter_before_copy_remote; + bool _copy_file = true; FileSystemAdaptor* _fs; SnapshotThrottle* _throttle; LocalSnapshotWriter* _writer; @@ -204,6 +206,7 @@ friend class LocalSnapshotCopier; void set_server_addr(butil::EndPoint server_addr) { _addr = server_addr; } bool has_server_addr() { return _addr != butil::EndPoint(); } + void set_copy_file(bool copy_file) { _copy_file = copy_file; } private: SnapshotWriter* create(bool from_empty) WARN_UNUSED_RESULT; int destroy_snapshot(const std::string& path); @@ -217,6 +220,7 @@ friend class LocalSnapshotCopier; int64_t _last_snapshot_index; std::map _ref_map; butil::EndPoint _addr; + bool _copy_file = true; scoped_refptr _fs; scoped_refptr _snapshot_throttle; }; diff --git a/src/braft/snapshot_executor.cpp b/src/braft/snapshot_executor.cpp index 64403013..e33ece86 100644 --- a/src/braft/snapshot_executor.cpp +++ b/src/braft/snapshot_executor.cpp @@ -373,6 +373,9 @@ int SnapshotExecutor::init(const SnapshotExecutorOptions& options) { if (tmp != NULL && !tmp->has_server_addr()) { tmp->set_server_addr(options.addr); } + if (!options.copy_file) { + tmp->set_copy_file(false); + } SnapshotReader* reader 
= _snapshot_storage->open(); if (reader == NULL) { return 0; diff --git a/src/braft/snapshot_executor.h b/src/braft/snapshot_executor.h index 648cfa63..d5879505 100644 --- a/src/braft/snapshot_executor.h +++ b/src/braft/snapshot_executor.h @@ -44,6 +44,7 @@ struct SnapshotExecutorOptions { butil::EndPoint addr; bool filter_before_copy_remote; bool usercode_in_pthread; + bool copy_file = true; scoped_refptr file_system_adaptor; scoped_refptr snapshot_throttle; }; From 5d676fe6c50fbe32bae63b3d092bf48d6b11853f Mon Sep 17 00:00:00 2001 From: lintanghui Date: Sun, 23 Apr 2023 21:29:27 +0800 Subject: [PATCH 09/19] init bazel test --- BUILD | 1 + test/BUILD.bazel | 243 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 244 insertions(+) create mode 100644 test/BUILD.bazel diff --git a/BUILD b/BUILD index 64d1faf2..f38b9231 100644 --- a/BUILD +++ b/BUILD @@ -1,3 +1,4 @@ +load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library") licenses(["notice"]) exports_files(["LICENSE"]) diff --git a/test/BUILD.bazel b/test/BUILD.bazel new file mode 100644 index 00000000..2b8cecc2 --- /dev/null +++ b/test/BUILD.bazel @@ -0,0 +1,243 @@ +load("@rules_cc//cc:defs.bzl", "cc_library", "cc_test") +COPTS = [ + "-std=c++11", + "-DGFLAGS_NS=google", + "-DUNIT_TEST", + "-Wall", + "-g", + "-ggdb", + "-fno-omit-frame-pointer", + "-Dprivate=public", + "-Dprotected=public", + "-include test/sstream_workaround.h", +] +cc_library( + name = "braft_test_library", + srcs = glob(["test_util.cpp"]), + hdrs = glob(["*.h"]), + copts = COPTS, + deps = [ + "//:braft", + ], +) +cc_test( + name = "test_ballot_box", + srcs = ["test_ballot_box.cpp"], + + deps = [ + "//:braft", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "test_ballot", + srcs = ["test_ballot.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_checksum", + srcs = ["test_checksum.cpp"], + deps = [ + "//:braft", 
+ "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_cli", + srcs = ["test_cli.cpp"], + copts = COPTS, + deps = [ + "@com_github_gflags_gflags//:gflags", + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_configuration", + srcs = ["test_configuration.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_file_service", + srcs = ["test_file_service.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_file_system_adaptor", + srcs = ["test_file_system_adaptor.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_fsm_caller", + srcs = ["test_fsm_caller.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_fsync", + srcs = ["test_fsync.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_leader_lease", + srcs = ["test_leader_lease.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_log_entry", + srcs = ["test_log_entry.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_log_manager", + srcs = ["test_log_manager.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_log", + srcs = ["test_log.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + 
"@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_memory_storage", + srcs = ["test_memory_storage.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_meta", + srcs = ["test_configuration.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_node", + srcs = ["test_node.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_protobuf_file", + srcs = ["test_protobuf_file.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_repeated_timer_task", + srcs = ["test_repeated_timer_task.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "test_snapshot_executor", + srcs = ["test_snapshot_executor.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_snapshot", + srcs = ["test_snapshot.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) +cc_test( + name = "test_storage", + srcs = ["test_storage.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "test_throttle", + srcs = ["test_throttle.cpp"], + copts = COPTS, + deps = [ + "//:braft", + "//test:braft_test_library", + "@com_google_googletest//:gtest_main", + ], +) \ No newline at end of file From ed36465c255e244c217f1b1953b1ec0444223ace Mon Sep 17 00:00:00 2001 From: lintanghui Date: Thu, 27 Apr 2023 14:03:56 +0800 Subject: [PATCH 10/19] transfer 
leader when witness temporarily be leader --- src/braft/cli.proto | 1 - src/braft/cli_service.cpp | 5 +- src/braft/configuration.h | 19 ++- src/braft/node.cpp | 22 +++- src/braft/node.h | 4 +- src/braft/raft.h | 2 +- src/braft/replicator.cpp | 7 ++ src/braft/replicator.h | 2 +- src/braft/snapshot.cpp | 2 +- test/BUILD.bazel | 243 ------------------------------------ test/test_configuration.cpp | 12 ++ 11 files changed, 57 insertions(+), 262 deletions(-) delete mode 100644 test/BUILD.bazel diff --git a/src/braft/cli.proto b/src/braft/cli.proto index 1b3dbc89..fb123a9a 100644 --- a/src/braft/cli.proto +++ b/src/braft/cli.proto @@ -7,7 +7,6 @@ message AddPeerRequest { required string group_id = 1; required string leader_id = 2; required string peer_id = 3; - optional bool is_witness = 4; } message AddPeerResponse { diff --git a/src/braft/cli_service.cpp b/src/braft/cli_service.cpp index 1731605b..5f84079d 100644 --- a/src/braft/cli_service.cpp +++ b/src/braft/cli_service.cpp @@ -71,12 +71,9 @@ void CliServiceImpl::add_peer(::google::protobuf::RpcController* controller, request->peer_id().c_str()); return; } - if (request->is_witness()) { - adding_peer.witness = true; - } LOG(WARNING) << "Receive AddPeerRequest to " << node->node_id() << " from " << cntl->remote_side() - << ", adding " << request->peer_id() << " is_witness:" << request->is_witness(); + << ", adding " << request->peer_id(); Closure* add_peer_done = NewCallback( add_peer_returned, cntl, request, response, peers, node, done_guard.release()); diff --git a/src/braft/configuration.h b/src/braft/configuration.h index eb331760..83d4ba30 100644 --- a/src/braft/configuration.h +++ b/src/braft/configuration.h @@ -34,11 +34,15 @@ typedef std::string GroupId; // GroupId with version, format: {group_id}_{index} typedef std::string VersionedGroupId; +enum Role { + WITNESS = 1, +}; + // Represent a participant in a replicating group. struct PeerId { butil::EndPoint addr; // ip+port. 
int idx; // idx in same addr, default 0 - bool witness; + Role role; PeerId() : idx(0) {} explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0) {} @@ -56,14 +60,21 @@ struct PeerId { bool is_empty() const { return (addr.ip == butil::IP_ANY && addr.port == 0 && idx == 0); } - + bool is_witness() const { + return role == WITNESS; + } int parse(const std::string& str) { reset(); char ip_str[64]; - if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d", ip_str, &addr.port, &idx)) { + int value = 0; + if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", ip_str, &addr.port, &idx, &value)) { reset(); return -1; } + role = (Role)value; + if (role > WITNESS) { + return -1; + } if (0 != butil::str2ip(ip_str, &addr.ip)) { reset(); return -1; @@ -73,7 +84,7 @@ struct PeerId { std::string to_string() const { char str[128]; - snprintf(str, sizeof(str), "%s:%d", butil::endpoint2str(addr).c_str(), idx); + snprintf(str, sizeof(str), "%s:%d:%d", butil::endpoint2str(addr).c_str(), idx, int(role)); return std::string(str); } diff --git a/src/braft/node.cpp b/src/braft/node.cpp index 294ab2a6..70e488eb 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -257,7 +257,7 @@ int NodeImpl::init_snapshot_storage() { opt.filter_before_copy_remote = _options.filter_before_copy_remote; opt.usercode_in_pthread = _options.usercode_in_pthread; // not need to copy data file when it is witness. 
- if (_options.is_witness) { + if (_options.witness) { opt.copy_file = false; } if (_options.snapshot_file_system_adaptor) { @@ -505,7 +505,7 @@ int NodeImpl::init(const NodeOptions& options) { << ", did you forget to call braft::add_service()?"; return -1; } - if (options.is_witness) { + if (options.witness) { // if node is witness, set timer twice as data replication node, // which guarantees data replica will be elected as leader priority。 if (FLAGS_raft_enable_witness_to_leader) { @@ -539,7 +539,7 @@ int NodeImpl::init(const NodeOptions& options) { _fsm_caller = new FSMCaller(); _leader_lease.init(options.election_timeout_ms); - if (options.is_witness) { + if (options.witness) { _follower_lease.init(options.election_timeout_ms * 2, options.max_clock_drift_ms); } else { _follower_lease.init(options.election_timeout_ms, options.max_clock_drift_ms); @@ -831,7 +831,7 @@ void NodeImpl::handle_stepdown_timeout() { << " state is " << state2str(_state); return; } - + check_witness(_conf.conf); int64_t now = butil::monotonic_time_ms(); check_dead_nodes(_conf.conf, now); if (!_conf.old_conf.empty()) { @@ -839,6 +839,18 @@ void NodeImpl::handle_stepdown_timeout() { } } +void NodeImpl::check_witness(const Configuration& conf) { + if (is_witness()) { + LOG(WARNING) << "node " << node_id() + << " term " << _current_term + << " steps down as it's a witness but become leader temporarily" + << " conf: " << conf; + butil::Status status; + status.set_error(ETRANSFERLEADERSHIP, "Witness becomes leader temporarily"); + step_down(_current_term, true, status); + } +} + void NodeImpl::unsafe_register_conf_change(const Configuration& old_conf, const Configuration& new_conf, Closure* done) { @@ -1321,7 +1333,7 @@ void NodeImpl::unsafe_reset_election_timeout_ms(int election_timeout_ms, _replicator_group.reset_heartbeat_interval( heartbeat_timeout(_options.election_timeout_ms)); _replicator_group.reset_election_timeout_interval(_options.election_timeout_ms); - if (_options.is_witness && 
FLAGS_raft_enable_witness_to_leader) { + if (_options.witness && FLAGS_raft_enable_witness_to_leader) { _election_timer.reset(election_timeout_ms * 2); _follower_lease.reset_election_timeout_ms(election_timeout_ms * 2, _options.max_clock_drift_ms); } else { diff --git a/src/braft/node.h b/src/braft/node.h index d8565f39..b9dd3e82 100644 --- a/src/braft/node.h +++ b/src/braft/node.h @@ -240,7 +240,7 @@ friend class VoteBallotCtx; int bootstrap(const BootstrapOptions& options); bool disable_cli() const { return _options.disable_cli; } - + bool is_witness() const { return _options.witness; } private: friend class butil::RefCountedThreadSafe; @@ -305,7 +305,7 @@ friend class butil::RefCountedThreadSafe; void* meta, bthread::TaskIterator& iter); void apply(LogEntryAndClosure tasks[], size_t size); void check_dead_nodes(const Configuration& conf, int64_t now_ms); - + void check_witness(const Configuration& conf); bool handle_out_of_order_append_entries(brpc::Controller* cntl, const AppendEntriesRequest* request, AppendEntriesResponse* response, diff --git a/src/braft/raft.h b/src/braft/raft.h index 6c04c199..69fb3bb7 100644 --- a/src/braft/raft.h +++ b/src/braft/raft.h @@ -592,7 +592,7 @@ struct NodeOptions { // witness will never to be leader. so not need to init _vote_timer and _election_timer. // if raft_enable_witness_to_leader is true, witness can be leader, but should transfer leader as soon as possible. // Default: false - bool is_witness; + bool witness = false; // Construct a default instance NodeOptions(); diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp index ceef58bc..fb3acf88 100644 --- a/src/braft/replicator.cpp +++ b/src/braft/replicator.cpp @@ -1528,6 +1528,13 @@ int ReplicatorGroup::find_the_next_candidate( *peer_id = iter->first; } } + // transfer leadership to the non witness peer priority. 
+ if (consecutive_error_times == 0 && next_index == max_index) { + if (peer_id && peer_id->is_witness()) { + *peer_id = iter->first; + } + } + } if (max_index == 0) { return -1; diff --git a/src/braft/replicator.h b/src/braft/replicator.h index d31e5cd1..c3cf2e66 100644 --- a/src/braft/replicator.h +++ b/src/braft/replicator.h @@ -207,7 +207,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { return true; } bool is_witness() const { - return _options.peer_id.witness; + return _options.peer_id.is_witness(); } void _close_reader(); int64_t _last_rpc_send_timestamp() { diff --git a/src/braft/snapshot.cpp b/src/braft/snapshot.cpp index dc8a2a6a..7a97132a 100644 --- a/src/braft/snapshot.cpp +++ b/src/braft/snapshot.cpp @@ -744,12 +744,12 @@ LocalSnapshotCopier::LocalSnapshotCopier(bool copy_file): _tid(INVALID_BTHREAD) , _cancelled(false) , _filter_before_copy_remote(false) + , _copy_file(copy_file) , _fs(NULL) , _throttle(NULL) , _writer(NULL) , _storage(NULL) , _reader(NULL) - , _copy_file(copy_file) , _cur_session(NULL){} LocalSnapshotCopier::~LocalSnapshotCopier() { diff --git a/test/BUILD.bazel b/test/BUILD.bazel deleted file mode 100644 index 2b8cecc2..00000000 --- a/test/BUILD.bazel +++ /dev/null @@ -1,243 +0,0 @@ -load("@rules_cc//cc:defs.bzl", "cc_library", "cc_test") -COPTS = [ - "-std=c++11", - "-DGFLAGS_NS=google", - "-DUNIT_TEST", - "-Wall", - "-g", - "-ggdb", - "-fno-omit-frame-pointer", - "-Dprivate=public", - "-Dprotected=public", - "-include test/sstream_workaround.h", -] -cc_library( - name = "braft_test_library", - srcs = glob(["test_util.cpp"]), - hdrs = glob(["*.h"]), - copts = COPTS, - deps = [ - "//:braft", - ], -) -cc_test( - name = "test_ballot_box", - srcs = ["test_ballot_box.cpp"], - - deps = [ - "//:braft", - "@com_google_googletest//:gtest_main", - ], -) - -cc_test( - name = "test_ballot", - srcs = ["test_ballot.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) 
-cc_test( - name = "test_checksum", - srcs = ["test_checksum.cpp"], - deps = [ - "//:braft", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_cli", - srcs = ["test_cli.cpp"], - copts = COPTS, - deps = [ - "@com_github_gflags_gflags//:gflags", - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_configuration", - srcs = ["test_configuration.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_file_service", - srcs = ["test_file_service.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_file_system_adaptor", - srcs = ["test_file_system_adaptor.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_fsm_caller", - srcs = ["test_fsm_caller.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_fsync", - srcs = ["test_fsync.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_leader_lease", - srcs = ["test_leader_lease.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_log_entry", - srcs = ["test_log_entry.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_log_manager", - srcs = ["test_log_manager.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_log", - srcs = ["test_log.cpp"], - 
copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_memory_storage", - srcs = ["test_memory_storage.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_meta", - srcs = ["test_configuration.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_node", - srcs = ["test_node.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_protobuf_file", - srcs = ["test_protobuf_file.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_repeated_timer_task", - srcs = ["test_repeated_timer_task.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) - -cc_test( - name = "test_snapshot_executor", - srcs = ["test_snapshot_executor.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_snapshot", - srcs = ["test_snapshot.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) -cc_test( - name = "test_storage", - srcs = ["test_storage.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) - -cc_test( - name = "test_throttle", - srcs = ["test_throttle.cpp"], - copts = COPTS, - deps = [ - "//:braft", - "//test:braft_test_library", - "@com_google_googletest//:gtest_main", - ], -) \ No newline at end of file diff --git a/test/test_configuration.cpp b/test/test_configuration.cpp index 
aa8b17c9..9461128d 100644 --- a/test/test_configuration.cpp +++ b/test/test_configuration.cpp @@ -42,6 +42,18 @@ TEST_F(TestUsageSuits, PeerId) { LOG(INFO) << "id:" << id1.to_string(); LOG(INFO) << "id:" << id1; + ASSERT_EQ(0, id1.parse("1.1.1.1:1000:0:0")); + LOG(INFO) << "id:" << id1.to_string(); + LOG(INFO) << "id:" << id1; + ASSERT_FALSE(id1.is_witness()); + + ASSERT_EQ(0, id1.parse("1.1.1.1:1000:0:1")); + LOG(INFO) << "id:" << id1.to_string(); + LOG(INFO) << "id:" << id1; + ASSERT_TRUE(id1.is_witness()); + + ASSERT_EQ(-1, id1.parse("1.1.1.1:1000:0:2")); + ASSERT_EQ(0, id1.parse("1.1.1.1:1000")); LOG(INFO) << "id:" << id1.to_string(); LOG(INFO) << "id:" << id1; From e7c14e509c8199c1f098a08672188cccc2196c7b Mon Sep 17 00:00:00 2001 From: lintanghui Date: Thu, 27 Apr 2023 14:32:23 +0800 Subject: [PATCH 11/19] fix format --- BUILD | 1 - src/braft/replicator.h | 136 ++++++++++++++++++++++------------------- 2 files changed, 72 insertions(+), 65 deletions(-) diff --git a/BUILD b/BUILD index f38b9231..64d1faf2 100644 --- a/BUILD +++ b/BUILD @@ -1,4 +1,3 @@ -load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library") licenses(["notice"]) exports_files(["LICENSE"]) diff --git a/src/braft/replicator.h b/src/braft/replicator.h index c3cf2e66..6ffd212a 100644 --- a/src/braft/replicator.h +++ b/src/braft/replicator.h @@ -1,11 +1,11 @@ // Copyright (c) 2015 Baidu.com, Inc. All Rights Reserved -// +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at -// +// // http://www.apache.org/licenses/LICENSE-2.0 -// +// // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -16,17 +16,17 @@ // Wang,Yao(wangyao02@baidu.com) // Xiong,Kai(xiongkai@baidu.com) -#ifndef BRAFT_REPLICATOR_H -#define BRAFT_REPLICATOR_H +#ifndef BRAFT_REPLICATOR_H +#define BRAFT_REPLICATOR_H -#include // brpc::Channel -#include // bthread_id +#include // bthread_id +#include // brpc::Channel -#include "braft/configuration.h" // Configuration -#include "braft/log_manager.h" // LogManager -#include "braft/raft.h" // Closure -#include "braft/raft.pb.h" // AppendEntriesRequest -#include "braft/storage.h" // SnapshotStorage +#include "braft/storage.h" // SnapshotStorage +#include "braft/raft.h" // Closure +#include "braft/configuration.h" // Configuration +#include "braft/raft.pb.h" // AppendEntriesRequest +#include "braft/log_manager.h" // LogManager namespace braft { @@ -40,8 +40,7 @@ class SnapshotThrottle; struct ReplicatorStatus : public butil::RefCountedThreadSafe { butil::atomic last_rpc_send_timestamp; - ReplicatorStatus() : last_rpc_send_timestamp(0) { - } + ReplicatorStatus() : last_rpc_send_timestamp(0) {} }; struct ReplicatorOptions { @@ -53,7 +52,7 @@ struct ReplicatorOptions { PeerId peer_id; LogManager* log_manager; BallotBox* ballot_box; - NodeImpl* node; + NodeImpl *node; int64_t term; SnapshotStorage* snapshot_storage; SnapshotThrottle* snapshot_throttle; @@ -65,13 +64,14 @@ typedef uint64_t ReplicatorId; class CatchupClosure : public Closure { public: virtual void Run() = 0; - protected: - CatchupClosure() : _max_margin(0), _has_timer(false), _error_was_set(false) { - } - + CatchupClosure() + : _max_margin(0) + , _has_timer(false) + , _error_was_set(false) + {} private: - friend class Replicator; +friend class Replicator; int64_t _max_margin; bthread_timer_t _timer; bool _has_timer; @@ -92,9 +92,10 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { static int join(ReplicatorId); // Wait until the margin between |last_log_index| from leader and the peer - // is less than |max_margin| or error occurs. + // is less than |max_margin| or error occurs. 
// |done| can't be NULL and it is called after waiting fnishies. - static void wait_for_caught_up(ReplicatorId, int64_t max_margin, const timespec* due_time, + static void wait_for_caught_up(ReplicatorId, int64_t max_margin, + const timespec* due_time, CatchupClosure* done); // Tranfer leadership to the very peer if the replicated logs are over @@ -126,7 +127,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { // Check if a replicator is readonly static bool readonly(ReplicatorId id); - + private: enum St { IDLE, @@ -159,7 +160,8 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { void _block(long start_time_us, int error_code); void _install_snapshot(); void _start_heartbeat_timer(long start_time_us); - void _send_timeout_now(bool unlock_id, bool old_leader_stepped_down, int timeout_ms = -1); + void _send_timeout_now(bool unlock_id, bool old_leader_stepped_down, + int timeout_ms = -1); int _transfer_leadership(int64_t log_index); void _cancel_append_entries_rpcs(); void _reset_next_index(); @@ -168,17 +170,23 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { } int _change_readonly_config(bool readonly); - static void _on_rpc_returned(ReplicatorId id, brpc::Controller* cntl, - AppendEntriesRequest* request, AppendEntriesResponse* response, - int64_t); + static void _on_rpc_returned( + ReplicatorId id, brpc::Controller* cntl, + AppendEntriesRequest* request, + AppendEntriesResponse* response, + int64_t); - static void _on_heartbeat_returned(ReplicatorId id, brpc::Controller* cntl, - AppendEntriesRequest* request, - AppendEntriesResponse* response, int64_t); + static void _on_heartbeat_returned( + ReplicatorId id, brpc::Controller* cntl, + AppendEntriesRequest* request, + AppendEntriesResponse* response, + int64_t); - static void _on_timeout_now_returned(ReplicatorId id, brpc::Controller* cntl, - TimeoutNowRequest* request, TimeoutNowResponse* response, - bool old_leader_stepped_down); + static void _on_timeout_now_returned( + ReplicatorId id, brpc::Controller* cntl, + 
TimeoutNowRequest* request, + TimeoutNowResponse* response, + bool old_leader_stepped_down); static void _on_timedout(void* arg); static void* _send_heartbeat(void* arg); @@ -187,11 +195,12 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { static int _continue_sending(void* arg, int error_code); static void* _run_on_caught_up(void*); static void _on_catch_up_timedout(void*); - static void _on_block_timedout(void* arg); - static void* _on_block_timedout_in_new_thread(void* arg); - static void _on_install_snapshot_returned(ReplicatorId id, brpc::Controller* cntl, - InstallSnapshotRequest* request, - InstallSnapshotResponse* response); + static void _on_block_timedout(void *arg); + static void* _on_block_timedout_in_new_thread(void *arg); + static void _on_install_snapshot_returned( + ReplicatorId id, brpc::Controller* cntl, + InstallSnapshotRequest* request, + InstallSnapshotResponse* response); void _destroy(); void _describe(std::ostream& os, bool use_html); void _get_status(PeerStatus* status); @@ -201,7 +210,8 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { if (_next_index < _options.log_manager->first_log_index()) { return false; } - if (_min_flying_index() - 1 + max_margin < _options.log_manager->last_log_index()) { + if (_min_flying_index() - 1 + max_margin + < _options.log_manager->last_log_index()) { return false; } return true; @@ -211,13 +221,12 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { } void _close_reader(); int64_t _last_rpc_send_timestamp() { - return _options.replicator_status->last_rpc_send_timestamp.load( - butil::memory_order_relaxed); + return _options.replicator_status->last_rpc_send_timestamp.load(butil::memory_order_relaxed); } void _update_last_rpc_send_timestamp(int64_t new_timestamp) { if (new_timestamp > _last_rpc_send_timestamp()) { - _options.replicator_status->last_rpc_send_timestamp.store(new_timestamp, - butil::memory_order_relaxed); + _options.replicator_status->last_rpc_send_timestamp + .store(new_timestamp, 
butil::memory_order_relaxed); } } @@ -226,13 +235,10 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { int64_t log_index; int entries_size; brpc::CallId call_id; - FlyingAppendEntriesRpc(int64_t index, int size, brpc::CallId id) : - log_index(index), - entries_size(size), - call_id(id) { - } + FlyingAppendEntriesRpc(int64_t index, int size, brpc::CallId id) + : log_index(index), entries_size(size), call_id(id) {} }; - + brpc::Channel _sending_channel; int64_t _next_index; int64_t _flying_append_entries_size; @@ -254,7 +260,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { ReplicatorOptions _options; bthread_timer_t _heartbeat_timer; SnapshotReader* _reader; - CatchupClosure* _catchup_closure; + CatchupClosure *_catchup_closure; }; struct ReplicatorGroupOptions { @@ -281,7 +287,7 @@ class ReplicatorGroup { ReplicatorGroup(); ~ReplicatorGroup(); int init(const NodeId& node_id, const ReplicatorGroupOptions&); - + // Add a replicator attached with |peer| // will be a notification when the replicator catches up according to the // arguments. @@ -289,17 +295,17 @@ class ReplicatorGroup { // immediately, annd might call node->step_down which might have race with // the caller, you should deal with this situation. int add_replicator(const PeerId& peer); - + // wait the very peer catchup - int wait_caughtup(const PeerId& peer, int64_t max_margin, const timespec* due_time, - CatchupClosure* done); + int wait_caughtup(const PeerId& peer, int64_t max_margin, + const timespec* due_time, CatchupClosure* done); int64_t last_rpc_send_timestamp(const PeerId& peer); // Stop all the replicators int stop_all(); - int stop_replicator(const PeerId& peer); + int stop_replicator(const PeerId &peer); // Reset the term of all to-add replicators. 
// This method is supposed to be called when the very candidate becomes the @@ -312,8 +318,8 @@ class ReplicatorGroup { // leader, use new heartbeat_interval, maybe call vote() reset election_timeout // Return 0 on success, -1 otherwise int reset_heartbeat_interval(int new_interval_ms); - - // Reset the interval of election_timeout for replicator, + + // Reset the interval of election_timeout for replicator, // used in rpc's set_timeout_ms int reset_election_timeout_interval(int new_interval_ms); @@ -328,25 +334,26 @@ class ReplicatorGroup { // Stop all the replicators except for the one that we think can be the // candidate of the next leader, which has the largest `last_log_id' among - // peers in |current_conf|. + // peers in |current_conf|. // |candidate| would be assigned to a valid ReplicatorId if we found one and // the caller is responsible for stopping it, or an invalid value if we // found none. // Returns 0 on success and -1 otherwise. int stop_all_and_find_the_next_candidate(ReplicatorId* candidate, const ConfigurationEntry& conf); - + // Find the follower with the most log entries in this group, which is // likely becomes the leader according to the election algorithm of raft. // Returns 0 on success and |peer_id| is assigned with the very peer. // -1 otherwise. 
- int find_the_next_candidate(PeerId* peer_id, const ConfigurationEntry& conf); + int find_the_next_candidate(PeerId* peer_id, + const ConfigurationEntry& conf); // List all the existing replicators void list_replicators(std::vector* out) const; // List all the existing replicators with PeerId - void list_replicators(std::vector>* out) const; + void list_replicators(std::vector >* out) const; // Change the readonly config for a peer int change_readonly_config(const PeerId& peer, bool readonly); @@ -355,7 +362,8 @@ class ReplicatorGroup { bool readonly(const PeerId& peer) const; private: - int _add_replicator(const PeerId& peer, ReplicatorId* rid); + + int _add_replicator(const PeerId& peer, ReplicatorId *rid); struct ReplicatorIdAndStatus { ReplicatorId id; @@ -368,6 +376,6 @@ class ReplicatorGroup { int _election_timeout_ms; }; -} // namespace braft +} // namespace braft -#endif // BRAFT_REPLICATOR_H +#endif //BRAFT_REPLICATOR_H From 0366072cd14bba034cf0fb94539e9daf5622f901 Mon Sep 17 00:00:00 2001 From: lintanghui Date: Mon, 8 May 2023 11:35:57 +0800 Subject: [PATCH 12/19] forbit install snapshot from wintess --- src/braft/node.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/braft/node.cpp b/src/braft/node.cpp index 70e488eb..d661263f 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -2605,7 +2605,10 @@ void NodeImpl::handle_install_snapshot_request(brpc::Controller* cntl, InstallSnapshotResponse* response, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); - + if (is_witness()){ + cntl->SetFailed(EINVAL, "Can't not install snapshot from witness node"); + return; + } if (_snapshot_executor == NULL) { cntl->SetFailed(EINVAL, "Not support snapshot"); return; From a9836fad98427cf9c84159c93c41f041672eda45 Mon Sep 17 00:00:00 2001 From: lintanghui Date: Wed, 10 May 2023 15:03:39 +0800 Subject: [PATCH 13/19] fix peerid << rewrite --- src/braft/configuration.h | 13 ++++++++----- 1 file changed, 8 
insertions(+), 5 deletions(-) diff --git a/src/braft/configuration.h b/src/braft/configuration.h index 83d4ba30..36d1257f 100644 --- a/src/braft/configuration.h +++ b/src/braft/configuration.h @@ -35,6 +35,7 @@ typedef std::string GroupId; typedef std::string VersionedGroupId; enum Role { + REPLICA = 0, WITNESS = 1, }; @@ -44,17 +45,18 @@ struct PeerId { int idx; // idx in same addr, default 0 Role role; - PeerId() : idx(0) {} - explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0) {} - PeerId(butil::EndPoint addr_, int idx_) : addr(addr_), idx(idx_) {} + PeerId() : idx(0), role(REPLICA) {} + explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0), role(REPLICA) {} + PeerId(butil::EndPoint addr_, int idx_) : addr(addr_), idx(idx_), role(REPLICA) {} /*intended implicit*/PeerId(const std::string& str) { CHECK_EQ(0, parse(str)); } - PeerId(const PeerId& id) : addr(id.addr), idx(id.idx) {} + PeerId(const PeerId& id) : addr(id.addr), idx(id.idx), role(id.role) {} void reset() { addr.ip = butil::IP_ANY; addr.port = 0; idx = 0; + role = REPLICA; } bool is_empty() const { @@ -73,6 +75,7 @@ struct PeerId { } role = (Role)value; if (role > WITNESS) { + reset(); return -1; } if (0 != butil::str2ip(ip_str, &addr.ip)) { @@ -108,7 +111,7 @@ inline bool operator!=(const PeerId& id1, const PeerId& id2) { } inline std::ostream& operator << (std::ostream& os, const PeerId& id) { - return os << id.addr << ':' << id.idx; + return os << id.addr << ':' << id.idx << ':' << int(id.role); } struct NodeId { From 6e579d72c51adad388ef4bb2f0f0517c691224b7 Mon Sep 17 00:00:00 2001 From: lintanghui Date: Fri, 19 May 2023 15:43:11 +0800 Subject: [PATCH 14/19] add witness test --- src/braft/node.cpp | 4 -- src/braft/replicator.cpp | 12 ++-- test/test_node.cpp | 119 ++++++++++++++++++++++++++++++++++++++- test/util.h | 37 ++++++++++-- 4 files changed, 154 insertions(+), 18 deletions(-) diff --git a/src/braft/node.cpp b/src/braft/node.cpp index d661263f..a2e0f783 100644 --- 
a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -2605,10 +2605,6 @@ void NodeImpl::handle_install_snapshot_request(brpc::Controller* cntl, InstallSnapshotResponse* response, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); - if (is_witness()){ - cntl->SetFailed(EINVAL, "Can't not install snapshot from witness node"); - return; - } if (_snapshot_executor == NULL) { cntl->SetFailed(EINVAL, "Not support snapshot"); return; diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp index 3a15eb22..3542e061 100644 --- a/src/braft/replicator.cpp +++ b/src/braft/replicator.cpp @@ -761,6 +761,10 @@ void Replicator::_wait_more_entries() { } void Replicator::_install_snapshot() { + NodeImpl *node_impl = _options.node; + if (node_impl->is_witness()) { + return _block(butil::gettimeofday_us(), EBUSY); + } if (_reader) { // follower's readonly mode change may cause two install_snapshot // one possible case is: @@ -1523,18 +1527,12 @@ int ReplicatorGroup::find_the_next_candidate( } const int64_t next_index = Replicator::get_next_index(iter->second.id); const int consecutive_error_times = Replicator::get_consecutive_error_times(iter->second.id); - if (consecutive_error_times == 0 && next_index > max_index) { + if (consecutive_error_times == 0 && next_index > max_index && !iter->first.is_witness()) { max_index = next_index; if (peer_id) { *peer_id = iter->first; } } - // transfer leadership to the non witness peer priority. 
- if (consecutive_error_times == 0 && next_index == max_index) { - if (peer_id && peer_id->is_witness()) { - *peer_id = iter->first; - } - } } if (max_index == 0) { diff --git a/test/test_node.cpp b/test/test_node.cpp index fc5f14bb..b4858dd2 100644 --- a/test/test_node.cpp +++ b/test/test_node.cpp @@ -39,7 +39,7 @@ class NodeTest : public testing::TestWithParam { void SetUp() { g_dont_print_apply_log = false; //logging::FLAGS_v = 90; - GFLAGS_NS::SetCommandLineOption("minloglevel", "1"); + // GFLAGS_NS::SetCommandLineOption("minloglevel", "1"); GFLAGS_NS::SetCommandLineOption("crash_on_fatal_log", "true"); if (GetParam() == std::string("NoReplication")) { braft::FLAGS_raft_max_parallel_append_entries_rpc_num = 1; @@ -412,6 +412,122 @@ TEST_P(NodeTest, LeaderFail) { cluster.stop_all(); } +TEST_P(NodeTest, LeaderFailWithWitness) { + std::vector peers; + for (int i = 0; i < 3; i++) { + braft::PeerId peer; + peer.addr.ip = butil::my_ip(); + peer.addr.port = 5006 + i; + peer.idx = 0; + if (i == 0) { + peer.role = braft::Role::WITNESS; + } + peers.push_back(peer); + } + + // start cluster + Cluster cluster("unittest", peers); + for (size_t i = 0; i < peers.size(); i++) { + ASSERT_EQ(0, cluster.start(peers[i].addr, false, 30, nullptr, peers[i].is_witness())); + } + + // elect leader + cluster.wait_leader(); + braft::Node* leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + LOG(WARNING) << "leader is " << leader->node_id(); + + // apply something + bthread::CountdownEvent cond(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + // stop leader + butil::EndPoint old_leader = leader->node_id().peer_id.addr; + LOG(WARNING) << "stop leader " << leader->node_id(); + cluster.stop(leader->node_id().peer_id.addr); + + // apply 
something when follower + std::vector nodes; + cluster.followers(&nodes); + cond.reset(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "follower apply: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, -1); + // node 0 is witness; + nodes[1]->apply(task); + } + cond.wait(); + + // elect new leader + cluster.wait_leader(); + leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + LOG(WARNING) << "elect new leader " << leader->node_id(); + + // apply something + cond.reset(10); + for (int i = 10; i < 20; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + // old leader restart + ASSERT_EQ(0, cluster.start(old_leader)); + LOG(WARNING) << "restart old leader " << old_leader; + + // apply something + cond.reset(10); + for (int i = 20; i < 30; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + // stop and clean old leader + LOG(WARNING) << "stop old leader " << old_leader; + cluster.stop(old_leader); + LOG(WARNING) << "clean old leader data " << old_leader; + cluster.clean(old_leader); + + sleep(2); + // restart old leader + ASSERT_EQ(0, cluster.start(old_leader)); + LOG(WARNING) << "restart old leader " << old_leader; + + cluster.ensure_same(); + + cluster.stop_all(); +} + TEST_P(NodeTest, JoinNode) { std::vector peers; braft::PeerId peer0; @@ -3381,6 +3497,7 @@ INSTANTIATE_TEST_CASE_P(NodeTestWithPipelineReplication, int main(int argc, char* argv[]) { ::testing::AddGlobalTestEnvironment(new TestEnvironment()); 
::testing::InitGoogleTest(&argc, argv); + GFLAGS_NS::SetCommandLineOption("minloglevel", "1"); GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); return RUN_ALL_TESTS(); } diff --git a/test/util.h b/test/util.h index e6984920..aa992f43 100644 --- a/test/util.h +++ b/test/util.h @@ -17,6 +17,7 @@ #ifndef PUBLIC_RAFT_TEST_UTIL_H #define PUBLIC_RAFT_TEST_UTIL_H +#include #include "braft/node.h" #include "braft/enum.pb.h" #include "braft/errno.pb.h" @@ -25,20 +26,27 @@ using namespace braft; bool g_dont_print_apply_log = false; - +namespace braft { +DECLARE_bool(raft_enable_witness_to_leader); +} class MockFSM : public braft::StateMachine { public: - MockFSM(const butil::EndPoint& address_) + MockFSM(const butil::EndPoint& address_): + MockFSM(address_,false) { + } + MockFSM(const butil::EndPoint& address_, bool witness) : address(address_) , applied_index(0) , snapshot_index(0) , _on_start_following_times(0) , _on_stop_following_times(0) + , _witness(witness) , _leader_term(-1) , _on_leader_start_closure(NULL) { pthread_mutex_init(&mutex, NULL); } + virtual ~MockFSM() { pthread_mutex_destroy(&mutex); } @@ -50,6 +58,7 @@ class MockFSM : public braft::StateMachine { int64_t snapshot_index; int64_t _on_start_following_times; int64_t _on_stop_following_times; + bool _witness = false; volatile int64_t _leader_term; braft::Closure* _on_leader_start_closure; @@ -82,6 +91,10 @@ class MockFSM : public braft::StateMachine { virtual void on_apply(braft::Iterator& iter) { for (; iter.valid(); iter.next()) { + if (_witness && !FLAGS_raft_enable_witness_to_leader) { + LOG(INFO) << "addr " << address << " skip witness apply " << iter.index(); + continue; + } LOG_IF(INFO, !g_dont_print_apply_log) << "addr " << address << " apply " << iter.index() << " data_size " << iter.data().size(); @@ -233,7 +246,7 @@ class Cluster { int start(const butil::EndPoint& listen_addr, bool empty_peers = false, int snapshot_interval_s = 30, - braft::Closure* leader_start_closure = NULL) { + 
braft::Closure* leader_start_closure = NULL, bool witness = false) { if (_server_map[listen_addr] == NULL) { brpc::Server* server = new brpc::Server(); if (braft::add_service(server, listen_addr) != 0 @@ -246,13 +259,14 @@ class Cluster { } braft::NodeOptions options; + options.witness = witness; options.election_timeout_ms = _election_timeout_ms; options.max_clock_drift_ms = _max_clock_drift_ms; options.snapshot_interval_s = snapshot_interval_s; if (!empty_peers) { options.initial_conf = braft::Configuration(_peers); } - MockFSM* fsm = new MockFSM(listen_addr); + MockFSM* fsm = new MockFSM(listen_addr, witness); if (leader_start_closure) { fsm->set_on_leader_start_closure(leader_start_closure); } @@ -424,11 +438,22 @@ class Cluster { LOG(INFO) << "_fsms.size()=" << _fsms.size(); int nround = 0; - MockFSM* first = _fsms[0]; + MockFSM* first = nullptr; + // get first normal fsm when raft_enable_witness_to_leader false + for (size_t i = 1; i < _fsms.size(); i++) { + if (_fsms[i]->_witness && !FLAGS_raft_enable_witness_to_leader) { + continue; + } + first = _fsms[i]; + break; + } CHECK: first->lock(); - for (size_t i = 1; i < _fsms.size(); i++) { + for (size_t i = 0; i < _fsms.size(); i++) { MockFSM* fsm = _fsms[i]; + if ((fsm->_witness && !FLAGS_raft_enable_witness_to_leader) || fsm->address == first->address) { + continue; + } fsm->lock(); if (first->logs.size() != fsm->logs.size()) { From 835ff9e244ea9fd19ca90a41e73260d1f876f0e1 Mon Sep 17 00:00:00 2001 From: lintanghui Date: Fri, 19 May 2023 17:37:05 +0800 Subject: [PATCH 15/19] add test of witness to be leader --- src/braft/configuration.h | 8 +++- test/test_node.cpp | 80 +++++++++++++++++++++++++++++++++++++++ test/util.h | 4 +- 3 files changed, 89 insertions(+), 3 deletions(-) diff --git a/src/braft/configuration.h b/src/braft/configuration.h index 36d1257f..49987a27 100644 --- a/src/braft/configuration.h +++ b/src/braft/configuration.h @@ -43,11 +43,17 @@ enum Role { struct PeerId { butil::EndPoint addr; // 
ip+port. int idx; // idx in same addr, default 0 - Role role; + Role role = REPLICA; PeerId() : idx(0), role(REPLICA) {} explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0), role(REPLICA) {} PeerId(butil::EndPoint addr_, int idx_) : addr(addr_), idx(idx_), role(REPLICA) {} + PeerId(butil::EndPoint addr_, int idx_, bool witness) : addr(addr_), idx(idx_) { + if (witness) { + this->role = WITNESS; + } + } + /*intended implicit*/PeerId(const std::string& str) { CHECK_EQ(0, parse(str)); } PeerId(const PeerId& id) : addr(id.addr), idx(id.idx), role(id.role) {} diff --git a/test/test_node.cpp b/test/test_node.cpp index b4858dd2..407e16af 100644 --- a/test/test_node.cpp +++ b/test/test_node.cpp @@ -23,6 +23,8 @@ extern bvar::Adder g_num_nodes; DECLARE_int32(raft_max_parallel_append_entries_rpc_num); DECLARE_bool(raft_enable_append_entries_cache); DECLARE_int32(raft_max_append_entries_cache_size); +DECLARE_bool(raft_enable_witness_to_leader); + } using braft::raft_mutex_t; @@ -1923,6 +1925,84 @@ TEST_P(NodeTest, leader_transfer) { ASSERT_TRUE(cluster.ensure_same(5)); cluster.stop_all(); } +TEST_P(NodeTest, leader_witness_temporary_be_leader) { + FLAGS_raft_enable_witness_to_leader = true; + std::vector peers; + for (int i = 0; i < 3; i++) { + braft::PeerId peer; + peer.addr.ip = butil::my_ip(); + peer.addr.port = 5006 + i; + peer.idx = 0; + if (i == 0) { + peer.role = braft::Role::WITNESS; + } + peers.push_back(peer); + } + // start cluster + Cluster cluster("unittest", peers, 5000); + for (size_t i = 0; i < peers.size(); i++) { + ASSERT_EQ(0, cluster.start(peers[i].addr, false, 30, nullptr, peers[i].is_witness())); + } + + // elect leader + cluster.wait_leader(); + braft::Node* leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + LOG(WARNING) << "leader is " << leader->node_id(); + std::vector nodes; + cluster.followers(&nodes); + + // stop follower so witness would had more entry logs than follower + braft::Node* follower_node = nodes[1]; + braft::PeerId 
follower = follower_node->node_id().peer_id; + cluster.stop(follower.addr); + // apply something + bthread::CountdownEvent cond(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + // stop leader + butil::EndPoint old_leader = leader->node_id().peer_id.addr; + LOG(WARNING) << "stop leader " << leader->node_id(); + cluster.stop(leader->node_id().peer_id.addr); + + // old follower restart + ASSERT_EQ(0, cluster.start(follower.addr)); + LOG(WARNING) << "restart old follower " << follower.addr; + + // elect leader + cluster.wait_leader(); + leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + LOG(WARNING) << "leader is " << leader->node_id(); + // wait witness auto step_down and transfer leader. + while (true) { + if (leader->is_leader()) { + usleep(1000* 1000); + continue; + } + break; + } + cluster.wait_leader(); + leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + LOG(WARNING) << "leader is " << leader->node_id(); + + cluster.start(old_leader); + LOG(WARNING) << "restart old leader " << old_leader; + cluster.ensure_same(); + + cluster.stop_all(); +} TEST_P(NodeTest, leader_transfer_before_log_is_compleleted) { std::vector peers; diff --git a/test/util.h b/test/util.h index aa992f43..5cbcf241 100644 --- a/test/util.h +++ b/test/util.h @@ -284,14 +284,14 @@ class Cluster { options.catchup_margin = 2; - braft::Node* node = new braft::Node(_name, braft::PeerId(listen_addr, 0)); + braft::Node* node = new braft::Node(_name, braft::PeerId(listen_addr, 0, witness)); int ret = node->init(options); if (ret != 0) { LOG(WARNING) << "init_node failed, server: " << listen_addr; delete node; return ret; } else { - LOG(INFO) << "init node " << listen_addr; + LOG(INFO) << "init node " << listen_addr << " witness " << witness;; } 
{ From de0604b2087d559f4cdaff4e5a67e41fafee23e0 Mon Sep 17 00:00:00 2001 From: lintanghui Date: Tue, 6 Jun 2023 18:11:44 +0800 Subject: [PATCH 16/19] add witness doc --- docs/cn/witness.md | 27 +++++++++++++++++++++++++++ src/braft/configuration.h | 2 +- src/braft/node.cpp | 7 ++++--- src/braft/raft.h | 7 ++++--- 4 files changed, 36 insertions(+), 7 deletions(-) create mode 100644 docs/cn/witness.md diff --git a/docs/cn/witness.md b/docs/cn/witness.md new file mode 100644 index 00000000..889a8412 --- /dev/null +++ b/docs/cn/witness.md @@ -0,0 +1,27 @@ +witness 副本只作为仲裁者进行投票,不保存实际的业务数据。 +## 实现方案 +对于witness的实现,需要考虑部署方式。 对于2+1部署,如果不允许witness当选为主,那么当主节点异常宕机的时候,如果witness拥有比另外一个副本更新的entry,那么会导致选主失败,为了提高可用性,需要考虑允许witness短时间内当选为主,witness成为主以后再主动transfer leader给另一个副本。**通过允许witness临时成为主可以提高系统的可用性** 。 + +对于4+1 的部署方式,实现相对简单,只需要让witness不能当选为主即可,因为即便主节点故障,依然至少有一个副本拥有最新的entry从而可以当选为主。由于witness不能当选为主,因此在同步raft log的时候也可以不需要同步log data给witness。当4+1部署的时候,如果不允许witness当选为主,那么最多只能容忍一个节点故障,如果允许witness临时当选为主,那么可以容忍两个节点故障。允许witness当选为主时,实现 +则与2+1部署一致。 + +## 详细实现 +### witness不允许当选为主 +当witness不允许当选为主时,只需要在初始化Node的时候禁止election_timeout timer进行初始化即可,同时可以无需进行data复制。 + +### witness允许临时当选为主 +允许witness当选为主可以提升服务的可用性。具体实现为: +* 设置raft_enable_witness_to_leader flag为true,允许witness临时选举为主 +* election_timeout设置为正常节点的两倍,在主节点异常宕机的时候,允许witness发起选主,同时由于election_timeout比数据副本大,可以保证数据副本优先被选为主,只有数据副本选主失败时,witness才会主动发起选主。 +* witness当选为主时,禁止安装快照请求,避免从节点获取到空快照覆盖原有的业务数据 +* 新增witness副本时, witness向leader发送install snapshot请求,如果replicator本身是witness,则无需进行data文件的复制,只需复制最新的entry即可。 + +## witness 使用注意事项 +* 如果不允许witness当选为主时,相比原有raft方式部署,服务可用性会明显降低 +* 当允许witness临时当选为主时,极端情况下,可能导致从节点无法获取到最新的log entry从而导致数据丢失。 +例如: +``` +2+1的时候,日志为 [1, 8],某一时刻 replica1(leader) [1, 8] replica2 [1, 5] witness[4,8]。witness snapshot save,truncate 数据到 [7,8]。replica1(leader) 挂了,replica2 [1, 5] 和 witness 的数据接不上了, 此时会导致日志丢失。 +``` +用户在使用witness的时候,需要评估witness带来的可用性降低以及可能丢失部分最新数据的风险。 +如果业务无法接受数据丢失,可以自定义实现LogStorage, 只有半数以上副本拥有entry时,witness才能truncate
该entry之前的log。 diff --git a/src/braft/configuration.h b/src/braft/configuration.h index 49987a27..310539ae 100644 --- a/src/braft/configuration.h +++ b/src/braft/configuration.h @@ -74,7 +74,7 @@ struct PeerId { int parse(const std::string& str) { reset(); char ip_str[64]; - int value = 0; + int value = REPLICA; if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", ip_str, &addr.port, &idx, &value)) { reset(); return -1; diff --git a/src/braft/node.cpp b/src/braft/node.cpp index a2e0f783..2426fe25 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -68,7 +68,7 @@ BRPC_VALIDATE_GFLAG(raft_rpc_channel_connect_timeout_ms, brpc::PositiveInteger); DECLARE_bool(raft_enable_leader_lease); DEFINE_bool(raft_enable_witness_to_leader, false, -"enabel witness temporarily to become leader when leader down accidently"); + "enable witness temporarily to become leader when leader down accidently"); #ifndef UNIT_TEST static bvar::Adder g_num_nodes("raft_node_count"); @@ -506,8 +506,9 @@ int NodeImpl::init(const NodeOptions& options) { return -1; } if (options.witness) { - // if node is witness, set timer twice as data replication node, - // which guarantees data replica will be elected as leader priority。 + // When this node is a witness, set the election_timeout to be twice + // of the normal replica to ensure that the normal replica has a higher + // priority and is selected as the master if (FLAGS_raft_enable_witness_to_leader) { CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms * 2)); CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms * 2 + options.max_clock_drift_ms)); diff --git a/src/braft/raft.h b/src/braft/raft.h index eb3c10a3..90a6e02a 100644 --- a/src/braft/raft.h +++ b/src/braft/raft.h @@ -589,9 +589,10 @@ struct NodeOptions { // Default: false bool disable_cli; - // if true, this node is a witness node, when raft_enable_witness_to_leader is false, - // witness will never to be leader. 
so not need to init _vote_timer and _election_timer. - // if raft_enable_witness_to_leader is true, witness can be leader, but should transfer leader as soon as possible. + // if true, this node is a witness, when FLAGS_raft_enable_witness_to_leader is false, + // it will never be elected as leader. so we don't need to init _vote_timer and _election_timer. + // if FLAGS_raft_enable_witness_to_leader is true, it can be electd as leader, + // but should transfer leader to normal replica as soon as possible. // Default: false bool witness = false; // Construct a default instance From 1e3e6f7b8dd8b94b4bcda0be1050f98032fca5f8 Mon Sep 17 00:00:00 2001 From: lintanghui Date: Thu, 8 Jun 2023 11:35:13 +0800 Subject: [PATCH 17/19] fix uiint test --- test/test_meta.cpp | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/test/test_meta.cpp b/test/test_meta.cpp index 47433b74..903a645a 100644 --- a/test/test_meta.cpp +++ b/test/test_meta.cpp @@ -269,7 +269,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_upgrade) { st = storage->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("2.2.2.2:2000:0", peer_bak.to_string()); + ASSERT_EQ("2.2.2.2:2000:0:0", peer_bak.to_string()); } { // _merged_impl already catch up data after Mixed first load @@ -278,7 +278,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_upgrade) { st = tmp->_merged_impl->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("2.2.2.2:2000:0", peer_bak.to_string()); + ASSERT_EQ("2.2.2.2:2000:0:0", peer_bak.to_string()); } // test double write @@ -294,14 +294,14 @@ TEST_F(TestUsageSuits, mixed_stable_storage_upgrade) { st = tmp->_single_impl->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("3.3.3.3:3000:3", peer_bak.to_string()); + ASSERT_EQ("3.3.3.3:3000:3:0", 
peer_bak.to_string()); term_bak = 0; peer_bak.reset(); st = tmp->_merged_impl->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("3.3.3.3:3000:3", peer_bak.to_string()); + ASSERT_EQ("3.3.3.3:3000:3:0", peer_bak.to_string()); } delete storage; @@ -325,7 +325,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_upgrade) { st = storage->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("3.3.3.3:3000:3", peer_bak.to_string()); + ASSERT_EQ("3.3.3.3:3000:3:0", peer_bak.to_string()); } // test merged stable storage alone { @@ -340,7 +340,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_upgrade) { st = storage->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("4.4.4.4:4000:4", peer_bak.to_string()); + ASSERT_EQ("4.4.4.4:4000:4:0", peer_bak.to_string()); } delete storage; } @@ -454,7 +454,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_downgrade) { st = storage->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("2.2.2.2:2000:0", peer_bak.to_string()); + ASSERT_EQ("2.2.2.2:2000:0:0", peer_bak.to_string()); } { // _single_impl already catch up data after Mixed first load @@ -463,7 +463,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_downgrade) { st = tmp->_single_impl->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("2.2.2.2:2000:0", peer_bak.to_string()); + ASSERT_EQ("2.2.2.2:2000:0:0", peer_bak.to_string()); } // test double write @@ -479,14 +479,14 @@ TEST_F(TestUsageSuits, mixed_stable_storage_downgrade) { st = tmp->_single_impl->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("3.3.3.3:3000:3", peer_bak.to_string()); + ASSERT_EQ("3.3.3.3:3000:3:0", 
peer_bak.to_string()); term_bak = 0; peer_bak.reset(); st = tmp->_merged_impl->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("3.3.3.3:3000:3", peer_bak.to_string()); + ASSERT_EQ("3.3.3.3:3000:3:0", peer_bak.to_string()); } delete storage; @@ -510,7 +510,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_downgrade) { st = storage->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("3.3.3.3:3000:3", peer_bak.to_string()); + ASSERT_EQ("3.3.3.3:3000:3:0", peer_bak.to_string()); } // test single stable storage alone { @@ -525,7 +525,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_downgrade) { st = storage->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("4.4.4.4:4000:4", peer_bak.to_string()); + ASSERT_EQ("4.4.4.4:4000:4:0", peer_bak.to_string()); } delete storage; } From feb5ac4be26a5e27c8bce21fa2378f0d3cac4a6a Mon Sep 17 00:00:00 2001 From: lintanghui Date: Mon, 17 Jul 2023 13:42:07 +0800 Subject: [PATCH 18/19] fix format --- src/braft/node.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/braft/node.cpp b/src/braft/node.cpp index 2426fe25..b5ba39eb 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -513,7 +513,7 @@ int NodeImpl::init(const NodeOptions& options) { CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms * 2)); CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms * 2 + options.max_clock_drift_ms)); } - }else { + } else { CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms)); CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms + options.max_clock_drift_ms)); } From d260d29da8430eb674a6113603802f3bfbbb3991 Mon Sep 17 00:00:00 2001 From: lintanghui Date: Thu, 20 Jul 2023 10:58:45 +0800 Subject: [PATCH 19/19] add warning info when use witness --- src/braft/raft.h | 16 
++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/braft/raft.h b/src/braft/raft.h index 90a6e02a..8a7f5d8e 100644 --- a/src/braft/raft.h +++ b/src/braft/raft.h @@ -589,10 +589,18 @@ struct NodeOptions { // Default: false bool disable_cli; - // if true, this node is a witness, when FLAGS_raft_enable_witness_to_leader is false, - // it will never be elected as leader. so we don't need to init _vote_timer and _election_timer. - // if FLAGS_raft_enable_witness_to_leader is true, it can be electd as leader, - // but should transfer leader to normal replica as soon as possible. + // If true, this node is a witness. + // 1. FLAGS_raft_enable_witness_to_leader = false + // It will never be elected as leader. So we don't need to init _vote_timer and _election_timer. + // 2. FLAGS_raft_enable_witness_to_leader = true + // It can be elected as leader, but should transfer leader to normal replica as soon as possible. + // + // Warning: + // 1. FLAGS_raft_enable_witness_to_leader = false + // When the leader is down and the witness has newer log entries, leader election may fail. + // 2. FLAGS_raft_enable_witness_to_leader = true + // When the leader shuts down and the witness is elected as leader, if a follower lags behind by more than one snapshot, + // it may cause data loss because the witness has truncated the log entries before the snapshot. // Default: false bool witness = false; // Construct a default instance