diff --git a/proto/copyset.proto b/proto/copyset.proto
index b489a9471d..13889efcf8 100755
--- a/proto/copyset.proto
+++ b/proto/copyset.proto
@@ -42,6 +42,7 @@ enum COPYSET_OP_STATUS {
     COPYSET_OP_STATUS_EXIST = 1; // copyset node 已经存在
     COPYSET_OP_STATUS_COPYSET_NOTEXIST = 2;
     COPYSET_OP_STATUS_FAILURE_UNKNOWN = 3;
+    COPYSET_OP_STATUS_COPYSET_IS_HEALTHY = 4;  // copyset is healthy, not deleted
 };
 
 message CopysetResponse {
@@ -95,5 +96,7 @@ service CopysetService {
 
     rpc CreateCopysetNode2 (CopysetRequest2) returns (CopysetResponse2);
 
+    rpc DeleteBrokenCopyset(CopysetRequest) returns (CopysetResponse);
+
     rpc GetCopysetStatus (CopysetStatusRequest) returns (CopysetStatusResponse);
 };
diff --git a/src/chunkserver/copyset_node_manager.cpp b/src/chunkserver/copyset_node_manager.cpp
index dd03d63418..6e86e5e387 100755
--- a/src/chunkserver/copyset_node_manager.cpp
+++ b/src/chunkserver/copyset_node_manager.cpp
@@ -485,6 +485,46 @@ bool CopysetNodeManager::PurgeCopysetNodeData(const LogicPoolID &logicPoolId,
     return ret;
 }
 
+bool CopysetNodeManager::DeleteBrokenCopyset(const LogicPoolID& poolId,
+                                             const CopysetID& copysetId) {
+    // A copyset that is still registered in the manager is treated as
+    // healthy and is never recycled. Ask IsExist() instead of reading
+    // copysetNodeMap_ directly: per its own comment IsExist() takes the
+    // read lock, so the lookup cannot race with a concurrent map update.
+    if (IsExist(poolId, copysetId)) {
+        return false;
+    }
+
+    // trash is a pointer copied from the options and may be unset; fail
+    // cleanly instead of dereferencing a null pointer below.
+    auto trash = copysetNodeOptions_.trash;
+    if (trash == nullptr) {
+        LOG(ERROR) << "Failed to recycle broken copyset "
+                   << ToGroupIdString(poolId, copysetId)
+                   << ", trash is not configured";
+        return false;
+    }
+
+    // Resolve the directory part of the chunk data uri.
+    std::string copysetsDir;
+    auto chunkDataUri = copysetNodeOptions_.chunkDataUri;
+    auto protocol = UriParser::ParseUri(chunkDataUri, &copysetsDir);
+    if (protocol.empty()) {
+        LOG(ERROR) << "Not support chunk data uri's protocol: " << chunkDataUri;
+        return false;
+    }
+
+    // Hand <copysetsDir>/<groupId> over to the trash for recycling.
+    auto groupId = ToGroupId(poolId, copysetId);
+    if (0 != trash->RecycleCopySet(copysetsDir + "/" + groupId)) {
+        LOG(ERROR) << "Failed to recycle broken copyset "
+                   << ToGroupIdString(poolId, copysetId);
+        return false;
+    }
+
+    return true;
+}
+
 bool CopysetNodeManager::IsExist(const LogicPoolID &logicPoolId,
                                  const CopysetID &copysetId) {
     /* 加读锁 */
diff --git a/src/chunkserver/copyset_node_manager.h
b/src/chunkserver/copyset_node_manager.h
index ad41c37dc2..509de831b1 100755
--- a/src/chunkserver/copyset_node_manager.h
+++ b/src/chunkserver/copyset_node_manager.h
@@ -105,6 +105,17 @@ class CopysetNodeManager : public curve::common::Uncopyable {
     bool PurgeCopysetNodeData(const LogicPoolID &logicPoolId,
                               const CopysetID &copysetId);
 
+    /**
+     * @brief Recycle the on-disk data of a broken copyset, i.e. one whose
+     *        node is not registered in this manager
+     * @param[in] poolId logical pool id
+     * @param[in] copysetId copyset id
+     * @return true if the copyset data was recycled; false if the copyset
+     *         is still registered here or its data could not be recycled
+     */
+    bool DeleteBrokenCopyset(const LogicPoolID& poolId,
+                             const CopysetID& copysetId);
+
     /**
      * 判断指定的copyset是否存在
      * @param logicPoolId:逻辑池子id
diff --git a/src/chunkserver/copyset_service.cpp b/src/chunkserver/copyset_service.cpp
index acb552f1b9..e29f8c8ab8 100755
--- a/src/chunkserver/copyset_service.cpp
+++ b/src/chunkserver/copyset_service.cpp
@@ -134,6 +134,40 @@ void CopysetServiceImpl::CreateCopysetNode2(RpcController *controller,
     LOG(INFO) << "Create " << request->copysets().size() << " copysets success";
 }
 
+/**
+ * Handle a DeleteBrokenCopyset rpc. The result is reported through the
+ * status field of the response:
+ *   - COPYSET_OP_STATUS_COPYSET_IS_HEALTHY: the copyset is registered in
+ *     the copyset node manager, nothing is deleted
+ *   - COPYSET_OP_STATUS_FAILURE_UNKNOWN: the manager could not delete it
+ *   - COPYSET_OP_STATUS_SUCCESS: the manager deleted it
+ */
+void CopysetServiceImpl::DeleteBrokenCopyset(RpcController* controller,
+                                             const CopysetRequest* request,
+                                             CopysetResponse* response,
+                                             Closure* done) {
+    LOG(INFO) << "Receive delete broken copyset request";
+
+    // done is run when doneGuard goes out of scope, on every path below
+    brpc::ClosureGuard doneGuard(done);
+
+    auto poolId = request->logicpoolid();
+    auto copysetId = request->copysetid();
+    auto groupId = ToGroupIdString(poolId, copysetId);
+
+    // if copyset node exist in the manager means its data is complete
+    if (copysetNodeManager_->IsExist(poolId, copysetId)) {
+        response->set_status(COPYSET_OP_STATUS_COPYSET_IS_HEALTHY);
+        LOG(WARNING) << "Delete broken copyset, " << groupId << " is healthy";
+    } else if (!copysetNodeManager_->DeleteBrokenCopyset(poolId, copysetId)) {
+        response->set_status(COPYSET_OP_STATUS_FAILURE_UNKNOWN);
+        LOG(ERROR) << "Delete broken copyset " << groupId << " failed";
+    } else {
+        response->set_status(COPYSET_OP_STATUS_SUCCESS);
+        LOG(INFO) << "Delete broken copyset " << groupId << " success";
+    }
+}
+
 void CopysetServiceImpl::GetCopysetStatus(RpcController *controller,
                                           const CopysetStatusRequest *request,
                                           CopysetStatusResponse *response,
diff --git a/src/chunkserver/copyset_service.h b/src/chunkserver/copyset_service.h
index ec3d4c0ec3..fabf6df8fc 100755
--- a/src/chunkserver/copyset_service.h
+++ b/src/chunkserver/copyset_service.h
@@ -58,6 +58,19 @@ class CopysetServiceImpl : public CopysetService {
                             CopysetResponse2 *response,
                             Closure *done);
 
+    /**
+     * @brief Delete a broken copyset, i.e. one that is not registered in
+     *        the copyset node manager
+     * @param[in] controller rpc controller
+     * @param[in] request logic pool id and copyset id of the copyset
+     * @param[out] response carries the resulting COPYSET_OP_STATUS
+     * @param[in] done closure run when the request has been handled
+     */
+    void DeleteBrokenCopyset(RpcController* controller,
+                             const CopysetRequest* request,
+                             CopysetResponse* response,
+                             Closure* done);
+
     void GetCopysetStatus(RpcController *controller,
                           const CopysetStatusRequest *request,
                           CopysetStatusResponse *response,
diff --git a/src/tools/curve_cli.cpp b/src/tools/curve_cli.cpp
index d9788c690f..00dbeea052 100644
--- a/src/tools/curve_cli.cpp
+++ b/src/tools/curve_cli.cpp
@@ -35,6 +35,8 @@ DEFINE_string(peer, "", "Id of the operating peer");
 DEFINE_string(new_conf, "", "new conf to reset peer");
+DEFINE_bool(remove_copyset, false, "Whether need to remove broken copyset "
+                                   "after remove peer (default: false)");
 DEFINE_bool(affirm, true,
             "If true, command line interactive affirmation is required."
@@ -64,43 +66,86 @@ int CurveCli::Init() {
     return mdsClient_->Init(FLAGS_mdsAddr);
 }
 
+butil::Status CurveCli::DeleteBrokenCopyset(braft::PeerId peerId,
+                                            const LogicPoolID& poolId,
+                                            const CopysetID& copysetId) {
+    brpc::Channel channel;
+    brpc::Controller cntl;
+    CopysetRequest request;
+    CopysetResponse response;
+
+    cntl.set_timeout_ms(FLAGS_timeout_ms);
+    cntl.set_max_retry(FLAGS_max_retry);
+    request.set_logicpoolid(poolId);
+    request.set_copysetid(copysetId);
+
+    // the request goes to the chunkserver identified by peerId
+    if (channel.Init(peerId.addr, nullptr) != 0) {
+        return butil::Status(-1, "Fail to init channel to %s",
+                             peerId.to_string().c_str());
+    }
+
+    // a null done closure makes this a synchronous call
+    CopysetService_Stub stub(&channel);
+    stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
+
+    // an rpc failure and a non-success status are both reported as errors
+    if (cntl.Failed()) {
+        return butil::Status(cntl.ErrorCode(), cntl.ErrorText());
+    } else if (response.status() != COPYSET_OP_STATUS_SUCCESS) {
+        return butil::Status(-1, COPYSET_OP_STATUS_Name(response.status()));
+    }
+
+    return butil::Status::OK();
+}
+
 int CurveCli::RemovePeer() {
     CHECK_FLAG(conf);
     CHECK_FLAG(peer);
 
     braft::Configuration conf;
+    braft::PeerId peerId;
+    curve::common::Peer peer;
+    braft::cli::CliOptions opt;
+
+    auto poolId = FLAGS_logicalPoolId;
+    auto copysetId = FLAGS_copysetId;
+    opt.timeout_ms = FLAGS_timeout_ms;
+    opt.max_retry = FLAGS_max_retry;
+
     if (conf.parse_from(FLAGS_conf) != 0) {
         std::cout << "Fail to parse --conf" << std::endl;
         return -1;
-    }
-    braft::PeerId removingPeerId;
-    if (removingPeerId.parse(FLAGS_peer) != 0) {
+    } else if (peerId.parse(FLAGS_peer) != 0) {
         std::cout << "Fail to parse --peer" << std::endl;
         return -1;
+    } else {
+        peer.set_address(peerId.to_string());
     }
-    curve::common::Peer removingPeer;
-    removingPeer.set_address(removingPeerId.to_string());
-    braft::cli::CliOptions opt;
-    opt.timeout_ms = FLAGS_timeout_ms;
-    opt.max_retry = FLAGS_max_retry;
-    butil::Status st = curve::chunkserver::RemovePeer(
-                        FLAGS_logicalPoolId,
-                        FLAGS_copysetId,
-                        conf,
-                        removingPeer,
-                        opt);
-    if (!st.ok()) {
-        std::cout << "Remove peer " << removingPeerId << " from copyset "
-                  << "(" << FLAGS_logicalPoolId << ", "
-                  << FLAGS_copysetId << ")"
-                  << " fail, original conf: " << conf
-                  << ", detail: " << st << std::endl;
-        return -1;
+
+    // STEP 1: remove peer
+    butil::Status status = curve::chunkserver::RemovePeer(
+        poolId, copysetId, conf, peer, opt);
+    auto succ = status.ok();
+    std::cout << "Remove peer " << peerId << " for copyset("
+              << poolId << ", " << copysetId << ") "
+              << (succ ? "success" : "fail") << ", original conf: " << conf
+              << ", status: " << status << std::endl;
+
+    // done unless the peer was removed and copyset removal was requested
+    if (!succ || !FLAGS_remove_copyset) {
+        return succ ? 0 : -1;
     }
-    std::cout << "Remove peer " << removingPeerId << " from copyset "
-              << "(" << FLAGS_logicalPoolId << ", " << FLAGS_copysetId << ")"
-              << " success, original conf: " << conf << std::endl;
-    return 0;
+
+    // STEP 2: delete broken copyset
+    status = DeleteBrokenCopyset(peerId, poolId, copysetId);
+    succ = status.ok();
+    std::cout << "Delete copyset(" << poolId << ", " << copysetId << ")"
+              << " in " << peerId << (succ ? " success" : " fail")
+              << ", original conf: " << conf
+              << ", status: " << status << std::endl;
+
+    return succ ? 0 : -1;
 }
 
 int CurveCli::TransferLeader() {
@@ -279,7 +324,7 @@ void CurveCli::PrintHelp(const std::string &cmd) {
             "-new_conf=127.0.0.1:8080:0 -max_retry=3 -timeout_ms=100" << std::endl;  // NOLINT
     } else if (cmd == kRemovePeerCmd || cmd == kTransferLeaderCmd) {
         std::cout << "curve_ops_tool " << cmd << " -logicalPoolId=1 -copysetId=10001 -peer=127.0.0.1:8080:0 "  // NOLINT
-            "-conf=127.0.0.1:8080:0,127.0.0.1:8081:0,127.0.0.1:8082:0 -max_retry=3 -timeout_ms=100" << std::endl;  // NOLINT
+            "-conf=127.0.0.1:8080:0,127.0.0.1:8081:0,127.0.0.1:8082:0 -max_retry=3 -timeout_ms=100 -remove_copyset=true/false" << std::endl;  // NOLINT
     } else if (cmd == kDoSnapshot) {
         std::cout << "curve_ops_tool " << cmd << " -logicalPoolId=1 -copysetId=10001 -peer=127.0.0.1:8080:0 "  // NOLINT
             "-max_retry=3 -timeout_ms=100" << std::endl;
diff --git a/src/tools/curve_cli.h b/src/tools/curve_cli.h
index 259642b16c..24a4944cee 100644
--- a/src/tools/curve_cli.h
+++ b/src/tools/curve_cli.h
@@ -33,14 +33,25 @@
 #include
 #include
 
+#include "include/chunkserver/chunkserver_common.h"
 #include "src/chunkserver/copyset_node.h"
 #include "src/chunkserver/cli2.h"
 #include "src/tools/curve_tool.h"
 #include "src/tools/curve_tool_define.h"
 #include "src/tools/mds_client.h"
+#include "proto/copyset.pb.h"
 
 namespace curve {
 namespace tool {
+
+using ::curve::chunkserver::LogicPoolID;
+using ::curve::chunkserver::CopysetID;
+using ::curve::chunkserver::CopysetRequest;
+using ::curve::chunkserver::CopysetResponse;
+using ::curve::chunkserver::CopysetService_Stub;
+using ::curve::chunkserver::COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS;
+using ::curve::chunkserver::COPYSET_OP_STATUS::COPYSET_OP_STATUS_FAILURE_UNKNOWN;  // NOLINT
+
 class CurveCli : public CurveTool {
  public:
     explicit CurveCli(std::shared_ptr mdsClient) :
@@ -74,6 +85,17 @@ class CurveCli : public CurveTool {
     static bool SupportCommand(const std::string& command);
 
  private:
+    /**
+     * @brief Delete broken copyset
+     * @param[in] peerId chunkserver peer (ip,
port)
+     * @param[in] poolId logical pool id
+     * @param[in] copysetId copyset id
+     * @return butil::Status (code, err_msg)
+     */
+    butil::Status DeleteBrokenCopyset(braft::PeerId peerId,
+                                      const LogicPoolID& poolId,
+                                      const CopysetID& copysetId);
+
     /**
      * @brief 删除peer
      * @param 无
diff --git a/test/chunkserver/copyset_service_test.cpp b/test/chunkserver/copyset_service_test.cpp
index 080c5c32b7..8f727113f4 100644
--- a/test/chunkserver/copyset_service_test.cpp
+++ b/test/chunkserver/copyset_service_test.cpp
@@ -29,6 +29,7 @@
 
 #include
 
+#include "src/chunkserver/trash.h"
 #include "src/chunkserver/copyset_node.h"
 #include "src/chunkserver/copyset_node_manager.h"
 #include "src/chunkserver/cli.h"
@@ -57,11 +58,20 @@ class CopysetServiceTest : public testing::Test {
  public:
     void SetUp() {
         testDir = "CopysetServiceTestData";
-        rmCmd = "rm -rf CopysetServiceTestData";
+        rmCmd = "rm -rf CopysetServiceTestData trash";
         copysetDir = "local://./CopysetServiceTestData";
         copysetDirPattern = "local://./CopysetServiceTestData/%d";
         Exec(rmCmd.c_str());
+
+        // prepare trash
+        TrashOptions opt;
+        opt.trashPath = "local://./trash";
+        opt.localFileSystem =
+            LocalFsFactory::CreateFs(FileSystemType::EXT4, "");
+        trash_ = std::make_shared<Trash>();
+        trash_->Init(opt);
     }
+
     void TearDown() {
         Exec(rmCmd.c_str());
     }
@@ -71,6 +81,7 @@ class CopysetServiceTest : public testing::Test {
     std::string rmCmd;
     std::string copysetDir;
     std::string copysetDirPattern;
+    std::shared_ptr<Trash> trash_;
 };
 
 butil::AtExitManager atExitManager;
@@ -106,6 +117,7 @@ TEST_F(CopysetServiceTest, basic) {
     copysetNodeOptions.localFileSystem = fs;
     copysetNodeOptions.chunkFilePool =
         std::make_shared(fs);
+    copysetNodeOptions.trash = trash_;
     ASSERT_EQ(0, copysetNodeManager->Init(copysetNodeOptions));
     ASSERT_EQ(0, copysetNodeManager->Run());
 
@@ -175,6 +187,39 @@ TEST_F(CopysetServiceTest, basic) {
         ASSERT_EQ(cntl.ErrorCode(), EINVAL);
     }
 
+    // TEST CASES: remove copyset node
+    {
+        brpc::Controller cntl;
+        CopysetRequest request;
+        CopysetResponse response;
+        cntl.set_timeout_ms(3000);
+
+        // CASE 1: copyset is healthy
+        request.set_logicpoolid(logicPoolId);
+        request.set_copysetid(copysetId);
+        stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
+        ASSERT_FALSE(cntl.Failed());
+        ASSERT_EQ(response.status(), COPYSET_OP_STATUS_COPYSET_IS_HEALTHY);
+
+        // CASE 2: copyset is not exist -> delete failed
+        cntl.Reset();
+        request.set_logicpoolid(logicPoolId);
+        request.set_copysetid(copysetId + 1);
+        stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
+        ASSERT_FALSE(cntl.Failed());
+        ASSERT_EQ(response.status(), COPYSET_OP_STATUS_FAILURE_UNKNOWN);
+
+        // CASE 3: delete broken copyset success
+        ASSERT_TRUE(copysetNodeManager->
+            DeleteCopysetNode(logicPoolId, copysetId));
+        cntl.Reset();
+        request.set_logicpoolid(logicPoolId);
+        request.set_copysetid(copysetId);
+        stub.DeleteBrokenCopyset(&cntl, &request, &response, nullptr);
+        ASSERT_FALSE(cntl.Failed());
+        ASSERT_EQ(response.status(), COPYSET_OP_STATUS_SUCCESS);
+    }
+
     ASSERT_EQ(0, server.Stop(0));
     ASSERT_EQ(0, server.Join());
 }
diff --git a/test/tools/curve_cli_test.cpp b/test/tools/curve_cli_test.cpp
index 51b9d0bf02..133d9de42d 100644
--- a/test/tools/curve_cli_test.cpp
+++ b/test/tools/curve_cli_test.cpp
@@ -27,6 +27,7 @@
 #include
 #include "src/tools/curve_cli.h"
 #include "test/tools/mock/mock_cli_service.h"
+#include "test/tools/mock/mock_copyset_service.h"
 #include "test/tools/mock/mock_mds_client.h"
 
 using ::testing::_;
@@ -43,11 +44,20 @@ DECLARE_string(peer);
 DECLARE_string(new_conf);
 DECLARE_uint32(logic_pool_id);
 DECLARE_uint32(copyset_id);
+DECLARE_bool(remove_copyset);
 DECLARE_bool(affirm);
 
 namespace curve {
 namespace tool {
 
+// Generic mock action: does nothing except run the done closure
+template <typename Req, typename Resp>
+void callback(RpcController* controller,
+              const Req* request,
+              Resp* response,
+              Closure* done) {
+    brpc::ClosureGuard doneGuard(done);
+}
+
 class CurveCliTest : public ::testing::Test {
  protected:
     CurveCliTest() {}
@@ -55,8 +66,11 @@ class CurveCliTest : public ::testing::Test {
         mdsClient_ = std::make_shared();
         server = new brpc::Server();
         mockCliService = new MockCliService();
+        mockCopysetService_ = std::make_shared<MockCopysetService>();
         ASSERT_EQ(0, server->AddService(mockCliService,
                                       brpc::SERVER_DOESNT_OWN_SERVICE));
+        ASSERT_EQ(0, server->AddService(mockCopysetService_.get(),
+                                      brpc::SERVER_DOESNT_OWN_SERVICE));
         ASSERT_EQ(0, server->Start("127.0.0.1:9192", nullptr));
         FLAGS_affirm = false;
     }
@@ -68,8 +82,10 @@ class CurveCliTest : public ::testing::Test {
         delete mockCliService;
         mockCliService = nullptr;
     }
+
     brpc::Server *server;
     MockCliService *mockCliService;
+    std::shared_ptr<MockCopysetService> mockCopysetService_;
     const std::string conf = "127.0.0.1:9192:0";
     const std::string peer = "127.0.0.1:9192:0";
     std::shared_ptr mdsClient_;
@@ -146,6 +162,52 @@ TEST_F(CurveCliTest, RemovePeer) {
                         cntl->SetFailed("test");
                     }));
     ASSERT_EQ(-1, curveCli.RunCommand("remove-peer"));
+
+    // TEST CASES: remove broken copyset after remove peer
+    {
+        auto getLeaderFunc = callback<GetLeaderRequest2, GetLeaderResponse2>;
+        auto removePeerFunc = callback<RemovePeerRequest2, RemovePeerResponse2>;
+        auto removeCopysetFunc = callback<CopysetRequest, CopysetResponse>;
+
+        // GetLeaderResponse2
+        GetLeaderResponse2 getLeaderResp;
+        getLeaderResp.mutable_leader()->set_address(peer);
+
+        // CopysetResponse
+        CopysetResponse copysetSuccResp, copysetFailResp;
+        copysetSuccResp.set_status(COPYSET_OP_STATUS_SUCCESS);
+        copysetFailResp.set_status(COPYSET_OP_STATUS_FAILURE_UNKNOWN);
+
+        EXPECT_CALL(*mockCliService, GetLeader(_, _, _, _))
+            .Times(3)
+            .WillRepeatedly(DoAll(SetArgPointee<2>(getLeaderResp),
+                            Invoke(getLeaderFunc)));
+        EXPECT_CALL(*mockCliService, RemovePeer(_, _, _, _))
+            .Times(3)
+            .WillRepeatedly(Invoke(removePeerFunc));
+        EXPECT_CALL(*mockCopysetService_, DeleteBrokenCopyset(_, _, _, _))
+            .Times(2)
+            .WillOnce(DoAll(SetArgPointee<2>(copysetFailResp),
+                            Invoke(removeCopysetFunc)))
+            .WillOnce(DoAll(SetArgPointee<2>(copysetSuccResp),
+                            Invoke(removeCopysetFunc)));
+
+        // CASE 1: disable remove copyset
+        FLAGS_peer = peer;
+        FLAGS_conf = conf;
+        FLAGS_remove_copyset = false;
+        ASSERT_EQ(0, curveCli.RunCommand("remove-peer"));
+
+        // CASE 2: remove copyset node fail
+        FLAGS_remove_copyset = true;
+        ASSERT_EQ(-1, curveCli.RunCommand("remove-peer"));
+
+        // CASE 3: remove copyset node success
+        ASSERT_EQ(0, curveCli.RunCommand("remove-peer"));
+
+        // restore the default so the flag does not leak into other tests
+        FLAGS_remove_copyset = false;
+    }
 }
 
 TEST_F(CurveCliTest, TransferLeader) {
diff --git a/test/tools/mock/mock_copyset_service.h b/test/tools/mock/mock_copyset_service.h
new file mode 100644
index 0000000000..f49add3aca
--- /dev/null
+++ b/test/tools/mock/mock_copyset_service.h
@@ -0,0 +1,54 @@
+/*
+ *  Copyright (c) 2021 NetEase Inc.
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+/*
+ * Project: Curve
+ * Created Date: Tue June 1 2021
+ * Author: Jingli Chen (Wine93)
+ */
+
+#ifndef TEST_TOOLS_MOCK_MOCK_COPYSET_SERVICE_H_
+#define TEST_TOOLS_MOCK_MOCK_COPYSET_SERVICE_H_
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include "proto/copyset.pb.h"
+
+namespace curve {
+namespace tool {
+
+using ::google::protobuf::RpcController;
+using ::google::protobuf::Closure;
+using ::curve::chunkserver::CopysetService;
+using ::curve::chunkserver::CopysetRequest;
+using ::curve::chunkserver::CopysetResponse;
+
+/**
+ * gmock stand-in for CopysetService: only DeleteBrokenCopyset is mocked,
+ * so a test can script the response it returns
+ */
+class MockCopysetService : public CopysetService {
+ public:
+    MOCK_METHOD4(DeleteBrokenCopyset,
+                 void(RpcController* controller,
+                      const CopysetRequest* request,
+                      CopysetResponse* response,
+                      Closure* done));
+};
+
+}  // namespace tool
+}  // namespace curve
+#endif  // TEST_TOOLS_MOCK_MOCK_COPYSET_SERVICE_H_