Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: fixed remove copyset after remove peer #458

Merged
merged 1 commit into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions proto/copyset.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum COPYSET_OP_STATUS {
COPYSET_OP_STATUS_EXIST = 1; // copyset node 已经存在
COPYSET_OP_STATUS_COPYSET_NOTEXIST = 2;
COPYSET_OP_STATUS_FAILURE_UNKNOWN = 3;
COPYSET_OP_STATUS_COPYSET_IS_HEALTHY = 4;
};

message CopysetResponse {
Expand Down Expand Up @@ -95,5 +96,7 @@ service CopysetService {

rpc CreateCopysetNode2 (CopysetRequest2) returns (CopysetResponse2);

rpc DeleteBrokenCopyset(CopysetRequest) returns (CopysetResponse);

rpc GetCopysetStatus (CopysetStatusRequest) returns (CopysetStatusResponse);
};
24 changes: 24 additions & 0 deletions src/chunkserver/copyset_node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,30 @@ bool CopysetNodeManager::PurgeCopysetNodeData(const LogicPoolID &logicPoolId,
return ret;
}

bool CopysetNodeManager::DeleteBrokenCopyset(const LogicPoolID& poolId,
const CopysetID& copysetId) {
auto groupId = ToGroupId(poolId, copysetId);
// if copyset node exist in the manager means its data is complete
if (copysetNodeMap_.find(groupId) != copysetNodeMap_.end()) {
return false;
}

std::string copysetsDir;
auto trash = copysetNodeOptions_.trash;
auto chunkDataUri = copysetNodeOptions_.chunkDataUri;
auto protocol = UriParser::ParseUri(chunkDataUri, &copysetsDir);
if (protocol.empty()) {
LOG(ERROR) << "Not support chunk data uri's protocol: " << chunkDataUri;
return false;
} else if (0 != trash->RecycleCopySet(copysetsDir + "/" + groupId)) {
LOG(ERROR) << "Failed to recycle broken copyset "
<< ToGroupIdString(poolId, copysetId);
return false;
}

return true;
}

bool CopysetNodeManager::IsExist(const LogicPoolID &logicPoolId,
const CopysetID &copysetId) {
/* 加读锁 */
Expand Down
9 changes: 9 additions & 0 deletions src/chunkserver/copyset_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ class CopysetNodeManager : public curve::common::Uncopyable {
bool PurgeCopysetNodeData(const LogicPoolID &logicPoolId,
const CopysetID &copysetId);

/**
* @brief Delete broken copyset
* @param[in] poolId logical pool id
* @param[in] copysetId copyset id
* @return true if delete success, else return false
*/
bool DeleteBrokenCopyset(const LogicPoolID& poolId,
const CopysetID& copysetId);

/**
* 判断指定的copyset是否存在
* @param logicPoolId:逻辑池子id
Expand Down
25 changes: 25 additions & 0 deletions src/chunkserver/copyset_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,31 @@ void CopysetServiceImpl::CreateCopysetNode2(RpcController *controller,
LOG(INFO) << "Create " << request->copysets().size() << " copysets success";
}

/**
 * RPC handler: recycle the data of a broken copyset on this chunkserver.
 *
 * The response status is set to:
 *  - COPYSET_OP_STATUS_COPYSET_IS_HEALTHY when the copyset is still managed
 *    by the copyset node manager (nothing is deleted),
 *  - COPYSET_OP_STATUS_FAILURE_UNKNOWN when the manager fails to delete it,
 *  - COPYSET_OP_STATUS_SUCCESS otherwise.
 */
void CopysetServiceImpl::DeleteBrokenCopyset(RpcController* controller,
                                             const CopysetRequest* request,
                                             CopysetResponse* response,
                                             Closure* done) {
    LOG(INFO) << "Receive delete broken copyset request";

    // Hands `done` to a guard so every return path below completes the RPC.
    brpc::ClosureGuard doneGuard(done);

    auto logicPool = request->logicpoolid();
    auto copyset = request->copysetid();
    auto groupName = ToGroupIdString(logicPool, copyset);

    // A copyset that the manager still knows about is considered intact,
    // so refuse to touch it.
    if (copysetNodeManager_->IsExist(logicPool, copyset)) {
        response->set_status(COPYSET_OP_STATUS_COPYSET_IS_HEALTHY);
        LOG(WARNING) << "Delete broken copyset, " << groupName << " is healthy";
        return;
    }

    bool deleted = copysetNodeManager_->DeleteBrokenCopyset(logicPool, copyset);
    if (!deleted) {
        response->set_status(COPYSET_OP_STATUS_FAILURE_UNKNOWN);
        LOG(ERROR) << "Delete broken copyset " << groupName << " failed";
        return;
    }

    response->set_status(COPYSET_OP_STATUS_SUCCESS);
    LOG(INFO) << "Delete broken copyset " << groupName << " success";
}

void CopysetServiceImpl::GetCopysetStatus(RpcController *controller,
const CopysetStatusRequest *request,
CopysetStatusResponse *response,
Expand Down
8 changes: 8 additions & 0 deletions src/chunkserver/copyset_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ class CopysetServiceImpl : public CopysetService {
CopysetResponse2 *response,
Closure *done);

/**
* @brief Delete broken copyset
*/
void DeleteBrokenCopyset(RpcController* controller,
const CopysetRequest* request,
CopysetResponse* response,
Closure* done);

void GetCopysetStatus(RpcController *controller,
const CopysetStatusRequest *request,
CopysetStatusResponse *response,
Expand Down
93 changes: 67 additions & 26 deletions src/tools/curve_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ DEFINE_string(peer,
"", "Id of the operating peer");
DEFINE_string(new_conf,
"", "new conf to reset peer");
DEFINE_bool(remove_copyset, false, "Whether need to remove broken copyset "
"after remove peer (default: false)");

DEFINE_bool(affirm, true,
"If true, command line interactive affirmation is required."
Expand Down Expand Up @@ -64,43 +66,82 @@ int CurveCli::Init() {
return mdsClient_->Init(FLAGS_mdsAddr);
}

/**
 * Ask the chunkserver at `peerId` to delete its broken copy of the given
 * copyset through the CopysetService::DeleteBrokenCopyset RPC.
 *
 * Returns OK only when the RPC itself succeeds and the response status is
 * COPYSET_OP_STATUS_SUCCESS. Otherwise the status carries either the RPC
 * error code and text, or -1 with the name of the returned op status.
 */
butil::Status CurveCli::DeleteBrokenCopyset(braft::PeerId peerId,
                                            const LogicPoolID& poolId,
                                            const CopysetID& copysetId) {
    // Connect to the chunkserver that hosts the peer.
    brpc::Channel peerChannel;
    if (0 != peerChannel.Init(peerId.addr, nullptr)) {
        return butil::Status(-1, "Fail to init channel to %s",
                             peerId.to_string().c_str());
    }

    // Identify the copyset to delete.
    CopysetRequest req;
    req.set_logicpoolid(poolId);
    req.set_copysetid(copysetId);

    // Timeout and retry policy come from the command line flags.
    brpc::Controller rpcCntl;
    rpcCntl.set_timeout_ms(FLAGS_timeout_ms);
    rpcCntl.set_max_retry(FLAGS_max_retry);

    // No completion closure is passed; the result is inspected right after
    // the call returns.
    CopysetResponse resp;
    CopysetService_Stub copysetStub(&peerChannel);
    copysetStub.DeleteBrokenCopyset(&rpcCntl, &req, &resp, nullptr);

    if (rpcCntl.Failed()) {
        return butil::Status(rpcCntl.ErrorCode(), rpcCntl.ErrorText());
    }
    if (resp.status() != COPYSET_OP_STATUS_SUCCESS) {
        return butil::Status(-1, COPYSET_OP_STATUS_Name(resp.status()));
    }

    return butil::Status::OK();
}

/**
 * Remove the peer given by --peer from the copyset (--logicalPoolId,
 * --copysetId) whose current configuration is --conf. When --remove_copyset
 * is set and the removal succeeds, additionally ask that peer's chunkserver
 * to delete its broken copy of the copyset.
 *
 * @return 0 on success; -1 if a flag fails to parse, the peer removal fails,
 *         or the requested copyset deletion fails
 */
int CurveCli::RemovePeer() {
    CHECK_FLAG(conf);
    CHECK_FLAG(peer);

    braft::Configuration conf;
    braft::PeerId peerId;
    curve::common::Peer peer;
    braft::cli::CliOptions opt;

    auto poolId = FLAGS_logicalPoolId;
    auto copysetId = FLAGS_copysetId;
    opt.timeout_ms = FLAGS_timeout_ms;
    opt.max_retry = FLAGS_max_retry;

    if (conf.parse_from(FLAGS_conf) != 0) {
        std::cout << "Fail to parse --conf" << std::endl;
        return -1;
    } else if (peerId.parse(FLAGS_peer) != 0) {
        std::cout << "Fail to parse --peer" << std::endl;
        return -1;
    } else {
        peer.set_address(peerId.to_string());
    }

    // STEP 1: remove peer
    butil::Status status = curve::chunkserver::RemovePeer(
        poolId, copysetId, conf, peer, opt);
    auto succ = status.ok();
    std::cout << "Remove peer " << peerId << " for copyset("
              << poolId << ", " << copysetId << ") "
              << (succ ? "success" : "fail") << ", original conf: " << conf
              << ", status: " << status << std::endl;

    // Stop here when the removal failed or no copyset cleanup was requested.
    if (!succ || !FLAGS_remove_copyset) {
        return succ ? 0 : -1;
    }

    // STEP 2: delete broken copyset
    status = DeleteBrokenCopyset(peerId, poolId, copysetId);
    succ = status.ok();
    // The leading space keeps the result word from running into the peer id.
    std::cout << "Delete copyset(" << poolId << ", " << copysetId << ")"
              << " in " << peerId << (succ ? " success" : " fail")
              << ", original conf: " << conf
              << ", status: " << status << std::endl;

    return succ ? 0 : -1;
}

int CurveCli::TransferLeader() {
Expand Down Expand Up @@ -279,7 +320,7 @@ void CurveCli::PrintHelp(const std::string &cmd) {
"-new_conf=127.0.0.1:8080:0 -max_retry=3 -timeout_ms=100" << std::endl; // NOLINT
} else if (cmd == kRemovePeerCmd || cmd == kTransferLeaderCmd) {
std::cout << "curve_ops_tool " << cmd << " -logicalPoolId=1 -copysetId=10001 -peer=127.0.0.1:8080:0 " // NOLINT
"-conf=127.0.0.1:8080:0,127.0.0.1:8081:0,127.0.0.1:8082:0 -max_retry=3 -timeout_ms=100" << std::endl; // NOLINT
"-conf=127.0.0.1:8080:0,127.0.0.1:8081:0,127.0.0.1:8082:0 -max_retry=3 -timeout_ms=100 -remove_copyset=true/false" << std::endl; // NOLINT
} else if (cmd == kDoSnapshot) {
std::cout << "curve_ops_tool " << cmd << " -logicalPoolId=1 -copysetId=10001 -peer=127.0.0.1:8080:0 " // NOLINT
"-max_retry=3 -timeout_ms=100" << std::endl;
Expand Down
22 changes: 22 additions & 0 deletions src/tools/curve_cli.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,25 @@
#include <iostream>
#include <memory>

#include "include/chunkserver/chunkserver_common.h"
#include "src/chunkserver/copyset_node.h"
#include "src/chunkserver/cli2.h"
#include "src/tools/curve_tool.h"
#include "src/tools/curve_tool_define.h"
#include "src/tools/mds_client.h"
#include "proto/copyset.pb.h"

namespace curve {
namespace tool {

using ::curve::chunkserver::LogicPoolID;
using ::curve::chunkserver::CopysetID;
using ::curve::chunkserver::CopysetRequest;
using ::curve::chunkserver::CopysetResponse;
using ::curve::chunkserver::CopysetService_Stub;
using ::curve::chunkserver::COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS;
using ::curve::chunkserver::COPYSET_OP_STATUS::COPYSET_OP_STATUS_FAILURE_UNKNOWN; // NOLINT

class CurveCli : public CurveTool {
public:
explicit CurveCli(std::shared_ptr<MDSClient> mdsClient) :
Expand Down Expand Up @@ -74,6 +85,17 @@ class CurveCli : public CurveTool {
static bool SupportCommand(const std::string& command);

private:
/**
* @brief Delete broken copyset
* @param[in] peerId chunkserver peer (ip, port)
* @param[in] poolId logical pool id
* @param[in] copysetId copyset id
* @return butil::Status (code, err_msg)
*/
butil::Status DeleteBrokenCopyset(braft::PeerId peerId,
const LogicPoolID& poolId,
const CopysetID& copysetId);

/**
* @brief 删除peer
* @param 无
Expand Down
49 changes: 48 additions & 1 deletion test/chunkserver/copyset_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include <cstdint>

#include "src/chunkserver/trash.h"
#include "src/chunkserver/copyset_node.h"
#include "src/chunkserver/copyset_node_manager.h"
#include "src/chunkserver/cli.h"
Expand Down Expand Up @@ -57,11 +58,20 @@ class CopysetServiceTest : public testing::Test {
public:
// Per-test setup: records the test data locations, removes leftovers from
// earlier runs, and builds the Trash instance stored in trash_.
void SetUp() {
testDir = "CopysetServiceTestData";
rmCmd = "rm -rf CopysetServiceTestData";
// This second assignment overrides the one above, so the cleanup command
// also removes the "trash" directory used by the Trash configured below.
rmCmd = "rm -rf CopysetServiceTestData trash";
copysetDir = "local://./CopysetServiceTestData";
copysetDirPattern = "local://./CopysetServiceTestData/%d";
// Start from a clean slate before the test creates any data.
Exec(rmCmd.c_str());

// prepare trash
TrashOptions opt;
opt.trashPath = "local://./trash";
opt.localFileSystem =
LocalFsFactory::CreateFs(FileSystemType::EXT4, "");
trash_ = std::make_shared<Trash>();
// NOTE(review): the result of Init() is not checked here; confirm whether
// a failed trash initialization should abort the test.
trash_->Init(opt);
}

void TearDown() {
Exec(rmCmd.c_str());
}
Expand All @@ -71,6 +81,7 @@ class CopysetServiceTest : public testing::Test {
std::string rmCmd;
std::string copysetDir;
std::string copysetDirPattern;
std::shared_ptr<Trash> trash_;
};

butil::AtExitManager atExitManager;
Expand Down Expand Up @@ -106,6 +117,7 @@ TEST_F(CopysetServiceTest, basic) {
copysetNodeOptions.localFileSystem = fs;
copysetNodeOptions.chunkFilePool =
std::make_shared<FilePool>(fs);
copysetNodeOptions.trash = trash_;
ASSERT_EQ(0, copysetNodeManager->Init(copysetNodeOptions));
ASSERT_EQ(0, copysetNodeManager->Run());

Expand Down Expand Up @@ -175,6 +187,41 @@ TEST_F(CopysetServiceTest, basic) {
ASSERT_EQ(cntl.ErrorCode(), EINVAL);
}

// TEST CASES: remove copyset node
{
brpc::Controller cntl;
CopysetRequest request;
CopysetResponse response;
CopysetStatusRequest statusReq;
CopysetStatusResponse statusResp;
cntl.set_timeout_ms(3000);

// CASE 1: copyset is healthy
request.set_logicpoolid(logicPoolId);
request.set_copysetid(copysetId);
stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(response.status(), COPYSET_OP_STATUS_COPYSET_IS_HEALTHY);

// CASE 2: copyset is not exist -> delete failed
cntl.Reset();
request.set_logicpoolid(logicPoolId);
request.set_copysetid(copysetId + 1);
stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(response.status(), COPYSET_OP_STATUS_FAILURE_UNKNOWN);

// CASE 3: delete broken copyset success
ASSERT_TRUE(copysetNodeManager->
DeleteCopysetNode(logicPoolId, copysetId));
cntl.Reset();
request.set_logicpoolid(logicPoolId);
request.set_copysetid(copysetId);
stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(response.status(), COPYSET_OP_STATUS_SUCCESS);
}

ASSERT_EQ(0, server.Stop(0));
ASSERT_EQ(0, server.Join());
}
Expand Down
Loading