Skip to content

Commit

Permalink
supports linearly consistent reading based on LeaseBased and CheckQuorum
Browse files Browse the repository at this point in the history
--story=115077370
  • Loading branch information
libaszhang committed Jan 2, 2024
1 parent f4d8f56 commit fd9c554
Show file tree
Hide file tree
Showing 19 changed files with 1,548 additions and 17 deletions.
16 changes: 11 additions & 5 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,17 @@ http_archive(
url = "https://github.com/google/leveldb/archive/a53934a3ae1244679f812d998a4f16f2c7f309a6.tar.gz"
)

http_archive(
name = "com_github_brpc_brpc",
sha256 = "c0168d22205724bfa1939c9ad79bd9f74a98e0bd05be9e8f5cc504ef44c676a1",
strip_prefix = "incubator-brpc-1.0.0-rc02",
url = "https://github.com/apache/incubator-brpc/archive/refs/tags/1.0.0-rc02.tar.gz"
#http_archive(
# name = "com_github_brpc_brpc",
# sha256 = "c0168d22205724bfa1939c9ad79bd9f74a98e0bd05be9e8f5cc504ef44c676a1",
# strip_prefix = "incubator-brpc-1.0.0-rc02",
# url = "https://github.com/apache/incubator-brpc/archive/refs/tags/1.0.0-rc02.tar.gz"
#)

git_repository(
name = "com_github_brpc_brpc",
remote = "https://git.woa.com/elasticfaiss/brpc.git",
branch = "vectordb",
)

bind(
Expand Down
11 changes: 11 additions & 0 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,17 @@ class Configuration {
}
}

bool is_singleton() const {
int voter_num = 0;
std::set<PeerId>::iterator it;
for (it = _peers.begin(); it != _peers.end(); ++it) {
if (!it->is_learner()) {
voter_num++;
}
}
return voter_num == 1;
}

void append_peers(std::set<PeerId>* peers) {
peers->insert(_peers.begin(), _peers.end());
}
Expand Down
2 changes: 2 additions & 0 deletions src/braft/configuration_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ struct ConfigurationEntry {
}
bool contains(const PeerId& peer) const
{ return conf.contains(peer) || old_conf.contains(peer); }
bool is_singleton() const
{ return old_conf.empty() && conf.is_singleton(); }
};

// Manager the history of configuration changing
Expand Down
129 changes: 127 additions & 2 deletions src/braft/fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,32 @@ FSMCaller::FSMCaller()
, _cur_task(IDLE)
, _applying_index(0)
, _queue_started(false)
, _next_wait_id(0)
{
}

FSMCaller::~FSMCaller() {
CHECK(_after_shutdown == NULL);
}

int FSMCaller::notify_apply(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
//FSMCaller* caller = (FSMCaller*)meta;
//if (iter.is_queue_stopped()) {
// return 0;
//}
//for (; iter; ++iter) {
// switch (iter->type) {
// //case APPLIED:
// // caller->wakeup_waiters(iter->applied_index);
// // break;
// default:
// CHECK(false) << "Can't reach here";
// break;
// };
//}
return 0;
}

int FSMCaller::run(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
FSMCaller* caller = (FSMCaller*)meta;
if (iter.is_queue_stopped()) {
Expand Down Expand Up @@ -124,6 +143,7 @@ int FSMCaller::run(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
caller->_cur_task = ERROR;
caller->do_on_error((OnErrorClousre*)iter->done);
break;
//case APPLIED:
case IDLE:
CHECK(false) << "Can't reach here";
break;
Expand Down Expand Up @@ -170,11 +190,24 @@ int FSMCaller::init(const FSMCallerOptions &options) {
if (_node) {
_node->AddRef();
}


//if (_id_and_closure_wait_map.init(16) != 0) {
// LOG(ERROR) << "Fail to init _id_and_closure_wait_map";
// return -1;
//}

bthread::ExecutionQueueOptions execq_opt;
execq_opt.bthread_attr = options.usercode_in_pthread
? BTHREAD_ATTR_PTHREAD
: BTHREAD_ATTR_NORMAL;
//if (bthread::execution_queue_start(&_apply_queue_id,
// &execq_opt,
// FSMCaller::notify_apply,
// this) != 0) {
// LOG(ERROR) << "fsm fail to start execution_queue for apply";
// return -1;
//}

if (bthread::execution_queue_start(&_queue_id,
&execq_opt,
FSMCaller::run,
Expand All @@ -187,8 +220,18 @@ int FSMCaller::init(const FSMCallerOptions &options) {
}

int FSMCaller::shutdown() {
int res = 0;
if (_queue_started) {
return bthread::execution_queue_stop(_queue_id);
//res = bthread::execution_queue_stop(_apply_queue_id);
//if (res) {
// LOG(ERROR) << "fsm fail to stop apply queue";
// return res;
//}
res = bthread::execution_queue_stop(_queue_id);
if (res) {
LOG(ERROR) << "fsm fail to stop noraml queue";
return res;
}
}
return 0;
}
Expand All @@ -215,6 +258,14 @@ int FSMCaller::on_committed(int64_t committed_index) {
return bthread::execution_queue_execute(_queue_id, t);
}

int FSMCaller::on_applied(const int64_t applied_index) {
//ApplyTask t;
//t.type = APPLIED;
//t.applied_index = applied_index;
return 0;
//return bthread::execution_queue_execute(_apply_queue_id, t);
}

class OnErrorClousre : public Closure {
public:
OnErrorClousre(const Error& e) : _e(e) {
Expand Down Expand Up @@ -316,6 +367,7 @@ void FSMCaller::do_committed(int64_t committed_index) {
_last_applied_index.store(committed_index, butil::memory_order_release);
_last_applied_term = last_term;
_log_manager->set_applied_id(last_applied_id);
//on_applied(committed_index);
}

int FSMCaller::on_snapshot_save(SaveSnapshotClosure* done) {
Expand Down Expand Up @@ -545,11 +597,84 @@ int64_t FSMCaller::applying_index() const {

void FSMCaller::join() {
if (_queue_started) {
//bthread::execution_queue_join(_apply_queue_id);
bthread::execution_queue_join(_queue_id);
_queue_started = false;
}
}

FSMCaller::WaitId FSMCaller::wait_on_apply(LocalReadIndexClosure* done) {
WaitId wait_id = -1;
//int64_t last_applied_index = _last_applied_index.load(
// butil::memory_order_relaxed);
//if (last_applied_index < done->index()) {
// BAIDU_SCOPED_LOCK(_wait_mutex);
// if (_next_wait_id == 0) {
// ++_next_wait_id;
// }
// wait_id = _next_wait_id++;
// WaitIdAndClosure wac;
// wac.wait_id = wait_id;
// wac.done = done;

// _id_and_closure_wait_map[wait_id] = wac;
// _index_and_closure_wait_map[done->index()].push_back(wac);
//} else {
// run_closure_in_bthread(done);
//}
return wait_id;
}

int FSMCaller::remove_waiter(WaitId id) {
return 0;
//WaitIdAndClosure wac;
//{
// BAIDU_SCOPED_LOCK(_wait_mutex);
// WaitIdAndClosure* pwac = _id_and_closure_wait_map.seek(id);
// if (pwac) {
// wac = *pwac;
// _id_and_closure_wait_map.erase(id);
// IndexAndClosureListMap::iterator iter = _index_and_closure_wait_map.find(wac.done->index());
// if (iter == _index_and_closure_wait_map.end()) {
// LOG(FATAL) << "unexpected wait map";
// }
// iter->second.remove_if([&wac] (const WaitIdAndClosure& item) {
// return item.wait_id == wac.wait_id;
// });
// }
//}
//if (wac.done) {
// wac.done->status().set_error(EIDRM, "The waiter is removed");
// run_closure_in_bthread(wac.done);
//}
//return wac.done != NULL ? 0 : -1;
}

void FSMCaller::wakeup_waiters(const int64_t apply_index) {
//static int64_t total_cost = 0;
//int64_t start = butil::cpuwide_time_us();

//WaitIdAndClosureList ready_closures;
//{
// BAIDU_SCOPED_LOCK(_wait_mutex);
// for (IndexAndClosureListMap::iterator iter = _index_and_closure_wait_map.begin();
// iter != _index_and_closure_wait_map.end();) {
// if (iter->first > apply_index) {
// break;
// }
// for (WaitIdAndClosureList::const_iterator list_iter = iter->second.begin();
// list_iter != iter->second.end(); ++list_iter) {
// _id_and_closure_wait_map.erase(list_iter->wait_id);
// }
// ready_closures.splice(ready_closures.end(), iter->second);
// iter = _index_and_closure_wait_map.erase(iter);
// }
//}
//for (WaitIdAndClosureList::iterator iter = ready_closures.begin(); iter != ready_closures.end(); ++iter) {
// run_closure_in_bthread(iter->done);
//}
}

IteratorImpl::IteratorImpl(StateMachine* sm, LogManager* lm,
std::vector<Closure*> *closure,
int64_t first_closure_index,
Expand Down
32 changes: 31 additions & 1 deletion src/braft/fsm_caller.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#ifndef BRAFT_FSM_CALLER_H
#define BRAFT_FSM_CALLER_H

#include <list>

#include <butil/macros.h> // BAIDU_CACHELINE_ALIGNMENT
#include <bthread/bthread.h>
#include <bthread/execution_queue.h>
Expand All @@ -27,6 +29,7 @@
#include "braft/macros.h"
#include "braft/log_entry.h"
#include "braft/lease.h"
#include "braft/read_only.h"

namespace braft {

Expand Down Expand Up @@ -105,6 +108,8 @@ class LoadSnapshotClosure : public Closure {

class BAIDU_CACHELINE_ALIGNMENT FSMCaller {
public:
typedef int64_t WaitId;

FSMCaller();
BRAFT_MOCK ~FSMCaller();
int init(const FSMCallerOptions& options);
Expand All @@ -123,13 +128,17 @@ class BAIDU_CACHELINE_ALIGNMENT FSMCaller {
int64_t applying_index() const;
void describe(std::ostream& os, bool use_html);
void join();

WaitId wait_on_apply(LocalReadIndexClosure* done);
int remove_waiter(WaitId id);

private:

friend class IteratorImpl;

enum TaskType {
IDLE,
COMMITTED,
//APPLIED,
SNAPSHOT_SAVE,
SNAPSHOT_LOAD,
LEADER_STOP,
Expand All @@ -153,6 +162,9 @@ friend class IteratorImpl;
union {
// For applying log entry (including configuration change)
int64_t committed_index;

//// For notification
//int64_t applied_index;

// For on_leader_start
LeaderStartContext* leader_start_context;
Expand All @@ -169,7 +181,9 @@ friend class IteratorImpl;
};

static double get_cumulated_cpu_time(void* arg);
static int notify_apply(void* meta, bthread::TaskIterator<ApplyTask>& iter);
static int run(void* meta, bthread::TaskIterator<ApplyTask>& iter);
int on_applied(const int64_t applied_index);
void do_shutdown(); //Closure* done);
void do_committed(int64_t committed_index);
void do_cleared(int64_t log_index, Closure* done, int error_code);
Expand All @@ -182,6 +196,7 @@ friend class IteratorImpl;
void do_stop_following(const LeaderChangeContext& stop_following_context);
void set_error(const Error& e);
bool pass_by_status(Closure* done);
void wakeup_waiters(const int64_t apply_index);

bthread::ExecutionQueueId<ApplyTask> _queue_id;
LogManager *_log_manager;
Expand All @@ -195,6 +210,21 @@ friend class IteratorImpl;
butil::atomic<int64_t> _applying_index;
Error _error;
bool _queue_started;

struct WaitIdAndClosure {
WaitIdAndClosure() : wait_id(-1), done(NULL) {}

WaitId wait_id;
LocalReadIndexClosure* done;
};
typedef std::list<WaitIdAndClosure> WaitIdAndClosureList;
typedef std::map<int64_t, WaitIdAndClosureList> IndexAndClosureListMap;

//bthread::ExecutionQueueId<ApplyTask> _apply_queue_id;
raft_mutex_t _wait_mutex;
butil::FlatMap<WaitId, WaitIdAndClosure> _id_and_closure_wait_map;
IndexAndClosureListMap _index_and_closure_wait_map;
WaitId _next_wait_id;
};

};
Expand Down
Loading

0 comments on commit fd9c554

Please sign in to comment.