Skip to content

Commit

Permalink
tool support copyset snapshot
Browse files Browse the repository at this point in the history
Change-Id: Icf403e4beb5a2fb6532eae8e7de465dd085148cf
  • Loading branch information
bai-charisu authored and ilixiaocui committed Dec 28, 2020
1 parent 050b2ff commit d0360b8
Show file tree
Hide file tree
Showing 21 changed files with 545 additions and 9 deletions.
17 changes: 17 additions & 0 deletions proto/cli2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,21 @@ message ChangePeersResponse2 {
repeated common.Peer newPeers = 2;
}

message SnapshotRequest2 {
required uint32 logicPoolId = 1;
required uint32 copysetId = 2;
optional common.Peer peer = 3;
};

message SnapshotResponse2 {
}

message SnapshotAllRequest {
};

message SnapshotAllResponse {
};

message GetLeaderRequest2 {
required uint32 logicPoolId = 1;
required uint32 copysetId = 2;
Expand Down Expand Up @@ -97,4 +112,6 @@ service CliService2 {
rpc GetLeader(GetLeaderRequest2) returns (GetLeaderResponse2);
rpc TransferLeader(TransferLeaderRequest2) returns (TransferLeaderResponse2);
rpc ResetPeer(ResetPeerRequest2) returns (ResetPeerResponse2);
rpc Snapshot(SnapshotRequest2) returns (SnapshotResponse2);
rpc SnapshotAll(SnapshotAllRequest) returns (SnapshotAllResponse);
};
9 changes: 9 additions & 0 deletions src/chunkserver/braft_cli_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,5 +231,14 @@ void BRaftCliServiceImpl::transfer_leader(
}
}

static void snapshot_returned(brpc::Controller* cntl,
scoped_refptr<braft::NodeImpl> node,
::google::protobuf::Closure* done,
const butil::Status& st) {
brpc::ClosureGuard done_guard(done);
if (!st.ok()) {
cntl->SetFailed(st.error_code(), "%s", st.error_cstr());
}
}
} // namespace chunkserver
} // namespace curve
48 changes: 48 additions & 0 deletions src/chunkserver/braft_cli_service2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,5 +338,53 @@ void BRaftCliServiceImpl2::ResetPeer(RpcController* controller,
}
}

static void snapshot_returned(brpc::Controller* cntl,
scoped_refptr<braft::NodeImpl> node,
::google::protobuf::Closure* done,
const butil::Status& st) {
brpc::ClosureGuard done_guard(done);
if (!st.ok()) {
cntl->SetFailed(st.error_code(), "%s", st.error_cstr());
}
}

void BRaftCliServiceImpl2::Snapshot(RpcController* controller,
const SnapshotRequest2* request,
SnapshotResponse2* response,
Closure* done) {
brpc::Controller* cntl = (brpc::Controller*)controller;
brpc::ClosureGuard done_guard(done);
scoped_refptr<braft::NodeImpl> node;
LogicPoolID logicPoolId = request->logicpoolid();
CopysetID copysetId = request->copysetid();
butil::Status st =
get_node(&node, logicPoolId, copysetId,
request->peer().address());
if (!st.ok()) {
cntl->SetFailed(st.error_code(), "%s", st.error_cstr());
return;
}

braft::Closure* snapshot_done = NewCallback(snapshot_returned, cntl, node,
done_guard.release());
return node->snapshot(snapshot_done);
}

void BRaftCliServiceImpl2::SnapshotAll(RpcController* controller,
const SnapshotAllRequest* request,
SnapshotAllResponse* response,
Closure* done) {
brpc::Controller* cntl = (brpc::Controller*)controller;
brpc::ClosureGuard done_guard(done);
braft::NodeManager *const nm = braft::NodeManager::GetInstance();
std::vector<scoped_refptr<braft::NodeImpl>> nodes;
nm->get_all_nodes(&nodes);
for (const auto& node : nodes) {
braft::Closure* snapshot_done = NewCallback(snapshot_returned, cntl,
node, done_guard.release());
node->snapshot(snapshot_done);
}
}

} // namespace chunkserver
} // namespace curve
12 changes: 12 additions & 0 deletions src/chunkserver/braft_cli_service2.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ class BRaftCliServiceImpl2 : public CliService2 {
ResetPeerResponse2* response,
Closure* done);

// 触发快照
void Snapshot(RpcController* controller,
const SnapshotRequest2* request,
SnapshotResponse2* response,
Closure* done);

// 给当前chunkserver上全部copyset的副本打快照
void SnapshotAll(RpcController* controller,
const SnapshotAllRequest* request,
SnapshotAllResponse* response,
Closure* done);

private:
/**
* @brief: 查询指定的raft node
Expand Down
23 changes: 23 additions & 0 deletions src/chunkserver/cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,5 +202,28 @@ butil::Status TransferLeader(const LogicPoolID &logicPoolId,
return butil::Status::OK();
}

butil::Status Snapshot(const LogicPoolID &logicPoolId,
const CopysetID &copysetId,
const PeerId &peer,
const braft::cli::CliOptions &options) {
brpc::Channel channel;
if (channel.Init(peer.addr, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
peer.to_string().c_str());
}
brpc::Controller cntl;
cntl.set_timeout_ms(options.timeout_ms);
cntl.set_max_retry(options.max_retry);
SnapshotRequest request;
request.set_peer_id(peer.to_string());
SnapshotResponse response;
CliService_Stub stub(&channel);
stub.snapshot(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
return butil::Status(cntl.ErrorCode(), cntl.ErrorText());
}
return butil::Status::OK();
}

} // namespace chunkserver
} // namespace curve
6 changes: 6 additions & 0 deletions src/chunkserver/cli.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ butil::Status TransferLeader(const LogicPoolID &logicPoolId,
const PeerId &peer,
const braft::cli::CliOptions &options);

// 触发快照
butil::Status Snapshot(const LogicPoolID &logicPoolId,
const CopysetID &copysetId,
const PeerId &peer,
const braft::cli::CliOptions &options);

} // namespace chunkserver
} // namespace curve

Expand Down
48 changes: 48 additions & 0 deletions src/chunkserver/cli2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,5 +318,53 @@ butil::Status ResetPeer(const LogicPoolID &logicPoolId,
return butil::Status::OK();
}

butil::Status Snapshot(const LogicPoolID &logicPoolId,
const CopysetID &copysetId,
const Peer& peer,
const braft::cli::CliOptions& options) {
brpc::Channel channel;
PeerId peerId(peer.address());
if (channel.Init(peerId.addr, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
peerId.to_string().c_str());
}
brpc::Controller cntl;
cntl.set_timeout_ms(options.timeout_ms);
cntl.set_max_retry(options.max_retry);
SnapshotRequest2 request;
request.set_logicpoolid(logicPoolId);
request.set_copysetid(copysetId);
Peer *peerPtr = new Peer(peer);
request.set_allocated_peer(peerPtr);
SnapshotResponse2 response;
CliService2_Stub stub(&channel);
stub.Snapshot(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
return butil::Status(cntl.ErrorCode(), cntl.ErrorText());
}
return butil::Status::OK();
}

butil::Status SnapshotAll(const Peer& peer,
const braft::cli::CliOptions& options) {
brpc::Channel channel;
PeerId peerId(peer.address());
if (channel.Init(peerId.addr, NULL) != 0) {
return butil::Status(-1, "Fail to init channel to %s",
peerId.to_string().c_str());
}
brpc::Controller cntl;
cntl.set_timeout_ms(options.timeout_ms);
cntl.set_max_retry(options.max_retry);
SnapshotAllRequest request;
SnapshotAllResponse response;
CliService2_Stub stub(&channel);
stub.SnapshotAll(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
return butil::Status(cntl.ErrorCode(), cntl.ErrorText());
}
return butil::Status::OK();
}

} // namespace chunkserver
} // namespace curve
10 changes: 10 additions & 0 deletions src/chunkserver/cli2.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ butil::Status ResetPeer(const LogicPoolID &logicPoolId,
const Peer& requestPeer,
const braft::cli::CliOptions& options);

// 触发快照
butil::Status Snapshot(const LogicPoolID &logicPoolId,
const CopysetID &copysetId,
const Peer& peer,
const braft::cli::CliOptions& options);

// 给chunkserver上全部copyset副本触发快照
butil::Status SnapshotAll(const Peer& peer,
const braft::cli::CliOptions& options);

} // namespace chunkserver
} // namespace curve

Expand Down
4 changes: 4 additions & 0 deletions src/tools/copyset_check.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ void CopysetCheck::PrintHelp(const std::string& command) {
std::cout << "curve_ops_tool copysets-status [-mdsAddr=127.0.0.1:6666] "
<< "[-margin=1000] [-operatorMaxPeriod=30] [-checkOperator] "
<< "[-confPath=/etc/curve/tools.conf]" << std::endl << std::endl; // NOLINT
} else if (command == kCheckOperatorCmd) {
std::cout << "curve_ops_tool check-operator -opName=" << kTotalOpName
<< "/" << kChangeOpName << "/" << kAddOpName << "/"
<< kRemoveOpName << "/" << kTransferOpName << std::endl;
} else {
std::cout << "Command not supported!" << std::endl;
}
Expand Down
82 changes: 81 additions & 1 deletion src/tools/curve_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Author: wudemiao
*/

#include <vector>
#include "src/tools/curve_cli.h"
#include "src/tools/common.h"

Expand All @@ -38,6 +39,7 @@ DEFINE_string(new_conf,
DEFINE_bool(affirm, true,
"If true, command line interactive affirmation is required."
" Only set false in unit test");
DECLARE_string(mdsAddr);

namespace curve {
namespace tool {
Expand All @@ -53,7 +55,12 @@ namespace tool {

bool CurveCli::SupportCommand(const std::string& command) {
return (command == kResetPeerCmd || command == kRemovePeerCmd
|| command == kTransferLeaderCmd);
|| command == kTransferLeaderCmd
|| command == kDoSnapshot);
}

int CurveCli::Init() {
return mdsClient_->Init(FLAGS_mdsAddr);
}

int CurveCli::RemovePeer() {
Expand Down Expand Up @@ -206,6 +213,64 @@ int CurveCli::ResetPeer() {
return 0;
}

int CurveCli::DoSnapshot() {
CHECK_FLAG(peer);
braft::PeerId requestPeerId;
if (requestPeerId.parse(FLAGS_peer) != 0) {
std::cout << "Fail to parse --peer" << std::endl;
return -1;
}
curve::common::Peer requestPeer;
requestPeer.set_address(requestPeerId.to_string());
return DoSnapshot(FLAGS_logicalPoolId, FLAGS_copysetId, requestPeer);
}

int CurveCli::DoSnapshot(uint32_t lgPoolId, uint32_t copysetId,
const curve::common::Peer& peer) {
braft::cli::CliOptions opt;
opt.timeout_ms = FLAGS_timeout_ms;
opt.max_retry = FLAGS_max_retry;
butil::Status st = curve::chunkserver::Snapshot(
FLAGS_logicalPoolId,
FLAGS_copysetId,
peer,
opt);
if (!st.ok()) {
std::cout << "Do snapshot of copyset "
<< "(" << FLAGS_logicalPoolId << ", "
<< FLAGS_copysetId << ")"
<< " fail, requestPeer: " << peer.address()
<< ", detail: " << st << std::endl;
return -1;
}
return 0;
}

int CurveCli::DoSnapshotAll() {
std::vector<ChunkServerInfo> chunkservers;
int res = mdsClient_->ListChunkServersInCluster(&chunkservers);
if (res != 0) {
std::cout << "ListChunkServersInCluster fail!" << std::endl;
return -1;
}
for (const auto& chunkserver : chunkservers) {
braft::cli::CliOptions opt;
opt.timeout_ms = FLAGS_timeout_ms;
opt.max_retry = FLAGS_max_retry;
std::string csAddr = chunkserver.hostip() + ":" +
std::to_string(chunkserver.port());
curve::common::Peer peer;
peer.set_address(csAddr);
butil::Status st = curve::chunkserver::SnapshotAll(peer, opt);
if (!st.ok()) {
std::cout << "Do all snapshot of chunkserver " << csAddr
<< " fail, error: " << st.error_str() << std::endl;
res = -1;
}
}
return res;
}

void CurveCli::PrintHelp(const std::string &cmd) {
std::cout << "Example " << std::endl;
if (cmd == kResetPeerCmd) {
Expand All @@ -214,12 +279,21 @@ void CurveCli::PrintHelp(const std::string &cmd) {
} else if (cmd == kRemovePeerCmd || cmd == kTransferLeaderCmd) {
std::cout << "curve_ops_tool " << cmd << " -logicalPoolId=1 -copysetId=10001 -peer=127.0.0.1:8080:0 " // NOLINT
"-conf=127.0.0.1:8080:0,127.0.0.1:8081:0,127.0.0.1:8082:0 -max_retry=3 -timeout_ms=100" << std::endl; // NOLINT
} else if (cmd == kDoSnapshot) {
std::cout << "curve_ops_tool " << cmd << " -logicalPoolId=1 -copysetId=10001 -peer=127.0.0.1:8080:0 " // NOLINT
"-max_retry=3 -timeout_ms=100" << std::endl;
} else if (cmd == kDoSnapshotAll) {
std::cout << "curve_ops_tool " << cmd << std::endl;
} else {
std::cout << "Command not supported!" << std::endl;
}
}

int CurveCli::RunCommand(const std::string &cmd) {
if (Init() != 0) {
std::cout << "Init CurveCli tool failed" << std::endl;
return -1;
}
if (cmd == kRemovePeerCmd) {
return RemovePeer();
}
Expand All @@ -229,6 +303,12 @@ int CurveCli::RunCommand(const std::string &cmd) {
if (cmd == kResetPeerCmd) {
return ResetPeer();
}
if (cmd == kDoSnapshot) {
return DoSnapshot();
}
if (cmd == kDoSnapshotAll) {
return DoSnapshotAll();
}
std::cout << "Command not supported!" << std::endl;
return -1;
}
Expand Down
Loading

0 comments on commit d0360b8

Please sign in to comment.