Skip to content

Commit

Permalink
[feat][store] Executor add leader_id info
Browse files Browse the repository at this point in the history
  • Loading branch information
visualYJD authored and ketor committed Sep 12, 2024
1 parent 563c4aa commit dcd0cfe
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 2 deletions.
2 changes: 2 additions & 0 deletions src/client_v2/pretty.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,7 @@ void Pretty::Show(dingodb::pb::coordinator::GetExecutorMapResponse& response) {
ftxui::paragraph("CreateTime"),
ftxui::paragraph("LastSeenTime"),
ftxui::paragraph("ClusterName"),
ftxui::paragraph("LeaderId"),
}};
const auto& executor_map = response.executormap().executors();
if (response.executormap().executors_size() == 0) {
Expand All @@ -1394,6 +1395,7 @@ void Pretty::Show(dingodb::pb::coordinator::GetExecutorMapResponse& response) {
ftxui::paragraph(dingodb::Helper::FormatMsTime(executor.create_timestamp())),
ftxui::paragraph(dingodb::Helper::FormatMsTime(executor.last_seen_timestamp())),
ftxui::paragraph(fmt::format("{}", executor.cluster_name())),
ftxui::paragraph(fmt::format("{}", executor.leader_id())),
};
rows.push_back(row);
}
Expand Down
4 changes: 3 additions & 1 deletion src/coordinator/coordinator_control_coor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4049,12 +4049,13 @@ int64_t CoordinatorControl::UpdateExecutorMap(const pb::common::Executor& execut
// raft_location & state

// only update server_location & raft_location & state &
// last_seen_timestamp
// last_seen_timestamp & cluster_name & leaeder_id
*(executor_increment_executor->mutable_server_location()) = executor.server_location();
executor_increment_executor->set_state(pb::common::ExecutorState::EXECUTOR_NORMAL);
executor_increment_executor->set_last_seen_timestamp(butil::gettimeofday_ms());
executor_increment_executor->set_create_timestamp(butil::gettimeofday_ms());
executor_increment_executor->set_cluster_name(executor.cluster_name());
executor_increment_executor->set_leader_id(executor.leader_id());
} else {
// this is normall heartbeat,
// so only need to update state & last_seen_timestamp, no need to
Expand All @@ -4078,6 +4079,7 @@ int64_t CoordinatorControl::UpdateExecutorMap(const pb::common::Executor& execut
executor_to_update.set_state(pb::common::ExecutorState::EXECUTOR_NORMAL);
executor_to_update.set_last_seen_timestamp(butil::gettimeofday_ms());
executor_to_update.set_cluster_name(executor.cluster_name());
executor_to_update.set_leader_id(executor.leader_id());
executor_map_.Put(executor.id(), executor_to_update);
}
} else {
Expand Down
3 changes: 3 additions & 0 deletions src/server/cluster_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ void ClusterStatImpl::PrintExecutors(std::ostream& os, bool use_html) {
table_header.push_back("CREATE_TIME");
table_header.push_back("UPDATE_TIME");
table_header.push_back("CLUSTER_NAME");
table_header.push_back("LEADER_ID");

std::vector<int32_t> min_widths;

Expand All @@ -164,6 +165,7 @@ void ClusterStatImpl::PrintExecutors(std::ostream& os, bool use_html) {
min_widths.push_back(20); // CREATE_TIME
min_widths.push_back(30); // UPDATE_TIME
min_widths.push_back(30); // CLUSTER_NAME
min_widths.push_back(10); // LEADER_ID
std::vector<std::vector<std::string>> table_contents;
std::vector<std::vector<std::string>> table_urls;

Expand All @@ -187,6 +189,7 @@ void ClusterStatImpl::PrintExecutors(std::ostream& os, bool use_html) {
line.push_back(Helper::FormatMsTime(executor.create_timestamp(), "%Y-%m-%d %H:%M:%S"));
line.push_back(Helper::FormatMsTime(executor.last_seen_timestamp(), "%Y-%m-%d %H:%M:%S"));
line.push_back(executor.cluster_name());
line.push_back(executor.leader_id());
table_contents.push_back(line);
}

Expand Down
3 changes: 2 additions & 1 deletion src/store/heartbeat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ void CoordinatorUpdateStateTask::CoordinatorUpdateState(std::shared_ptr<Coordina

int64_t cluster_id = 1;
pb::coordinator_internal::MetaIncrement meta_increment;

coordinator_control->GetExecutorMap(executor_map_temp, cluster_name);
for (const auto& it : executor_map_temp.executors()) {
if (it.state() == pb::common::ExecutorState::EXECUTOR_NORMAL) {
Expand All @@ -411,12 +412,12 @@ void CoordinatorUpdateStateTask::CoordinatorUpdateState(std::shared_ptr<Coordina
if (it.state() == pb::common::ExecutorState::EXECUTOR_OFFLINE) {
if (it.last_seen_timestamp() + (FLAGS_executor_delete_after_heartbeat_timeout * 1000) <
butil::gettimeofday_ms()) {
DINGO_LOG(INFO) << "CoordinatorUpdateState... delete executor " << it.id();
auto status = coordinator_control->DeleteExecutor(cluster_id, it, meta_increment);
if (!status.ok()) {
DINGO_LOG(WARNING) << "CoordinatorUpdateState... delete executor " << it.id()
<< " failed, error_msg:" << status.error_str();
}
DINGO_LOG(INFO) << "CoordinatorUpdateState... delete executor " << it.id();
continue;
}
}
Expand Down

0 comments on commit dcd0cfe

Please sign in to comment.