Skip to content

Commit

Permalink
curvefs: fix create copyset when exist already. If peers are all the …
Browse files Browse the repository at this point in the history
…same will success, otherwise return exist.
  • Loading branch information
SeanHai committed Jan 17, 2022
1 parent 34de287 commit 2b293c0
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 13 deletions.
2 changes: 1 addition & 1 deletion curvefs/conf/mds.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ space.rpcTimeoutMs=500
# metaserver options
#
metaserver.addr=127.0.0.1:6701 # __CURVEADM_TEMPLATE__ ${cluster_mds_addr} __CURVEADM_TEMPLATE__ __ANSIBLE_TEMPLATE__ {{ groups.metaserver | join_peer(hostvars, "metaserver_listen_port") }} __ANSIBLE_TEMPLATE__
metaserver.rpcTimeoutMs=500
metaserver.rpcTimeoutMs=5000
metaserver.rpcRertyTimes=3
metaserver.rpcRetryIntervalUs=1000000

Expand Down
6 changes: 6 additions & 0 deletions curvefs/src/mds/metaserverclient/metaserver_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,12 @@ FSStatusCode MetaserverClient::CreateCopySet(

uint32_t maxRetry = options_.rpcRetryTimes;
while (cntl.Failed() && maxRetry > 0) {
LOG(WARNING) << "Create copyset failed"
<< " from " << cntl.remote_side() << " to "
<< cntl.local_side()
<< " errCode = " << cntl.ErrorCode()
<< " errorText = " << cntl.ErrorText()
<< ", then will retry " << maxRetry << " times.";
maxRetry--;
bthread_usleep(options_.rpcRetryIntervalUs);
cntl.Reset();
Expand Down
1 change: 0 additions & 1 deletion curvefs/src/mds/topology/topology_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,6 @@ TopoStatusCode TopologyManager::CreateCopyset() {
CopySetInfo copyset(poolId, id);
copyset.SetCopySetMembers(metaServerIds);
TopoStatusCode ret = topology_->AddCopySet(copyset);
// TODO(wanghai): delete copyset on metaserver
if (TopoStatusCode::TOPO_OK != ret) {
LOG(ERROR) << "Add copyset failed after create copyset."
<< " poolId = " << poolId << ", copysetId = " << id
Expand Down
25 changes: 22 additions & 3 deletions curvefs/src/metaserver/copyset/copyset_node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,29 @@ CopysetNode* CopysetNodeManager::GetCopysetNode(PoolId poolId,
return nullptr;
}

bool CopysetNodeManager::IsCopysetNodeExist(PoolId poolId,
CopysetId copysetId) {
int CopysetNodeManager::IsCopysetNodeExist(
const CreateCopysetRequest::Copyset& copyset) {
ReadLockGuard lock(lock_);
return copysets_.count(ToGroupId(poolId, copysetId)) != 0;
auto iter = copysets_.find(ToGroupId(copyset.poolid(),
copyset.copysetid()));
if (iter == copysets_.end()) {
return 0;
} else {
auto copysetNode = iter->second.get();
std::vector<Peer> peers;
copysetNode->ListPeers(&peers);
for (int i = 0; i < copyset.peers_size(); i++) {
auto cspeer = copyset.peers(i);
auto iter = std::find_if(
peers.begin(), peers.end(),
[&cspeer](const Peer& p) { return
(cspeer.address() == p.address());});
if (iter == peers.end()) {
return -1;
}
}
}
return 1;
}

bool CopysetNodeManager::CreateCopysetNode(PoolId poolId, CopysetId copysetId,
Expand Down
2 changes: 1 addition & 1 deletion curvefs/src/metaserver/copyset/copyset_node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class CopysetNodeManager {

virtual CopysetNode* GetCopysetNode(PoolId poolId, CopysetId copysetId);

bool IsCopysetNodeExist(PoolId poolId, CopysetId copysetId);
int IsCopysetNodeExist(const CreateCopysetRequest::Copyset& copyset);

bool CreateCopysetNode(PoolId poolId, CopysetId copysetId,
const braft::Configuration& conf,
Expand Down
12 changes: 8 additions & 4 deletions curvefs/src/metaserver/copyset/copyset_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,17 @@ void CopysetServiceImpl::GetCopysetsStatus(

COPYSET_OP_STATUS CopysetServiceImpl::CreateOneCopyset(
const CreateCopysetRequest::Copyset& copyset) {
bool exists =
manager_->IsCopysetNodeExist(copyset.poolid(), copyset.copysetid());
if (exists) {
int exists = manager_->IsCopysetNodeExist(copyset);
if (-1 == exists) {
LOG(WARNING) << "Copyset "
<< ToGroupIdString(copyset.poolid(), copyset.copysetid())
<< " already exists";
<< " already exists, but peers not exactly the same.";
return COPYSET_OP_STATUS_EXIST;
} else if (1 == exists) {
LOG(WARNING) << "Copyset "
<< ToGroupIdString(copyset.poolid(), copyset.copysetid())
<< " already exists.";
return COPYSET_OP_STATUS_SUCCESS;
}

braft::Configuration conf;
Expand Down
14 changes: 11 additions & 3 deletions curvefs/test/metaserver/copyset/copyset_node_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,17 @@ TEST_F(CopysetNodeManagerTest, CreateCopysetTest_Common) {
nodeManager_->AddService(&server, butil::EndPoint(ip, kPort)));

EXPECT_TRUE(nodeManager_->CreateCopysetNode(kPoolId, kCopysetId, conf));
EXPECT_TRUE(nodeManager_->IsCopysetNodeExist(kPoolId, kCopysetId));

CreateCopysetRequest::Copyset copyset;
copyset.set_poolid(kPoolId);
copyset.set_copysetid(kCopysetId);
copyset.add_peers()->set_address("127.0.0.1:29920:0");
copyset.add_peers()->set_address("127.0.0.1:29921:0");
copyset.add_peers()->set_address("127.0.0.1:29922:0");
EXPECT_EQ(1, nodeManager_->IsCopysetNodeExist(copyset));

copyset.mutable_peers(0)->set_address("127.0.0.1:29923:0");
EXPECT_EQ(-1, nodeManager_->IsCopysetNodeExist(copyset));

// create same copyset will failed
EXPECT_FALSE(nodeManager_->CreateCopysetNode(kPoolId, kCopysetId, conf));
Expand Down Expand Up @@ -180,7 +190,6 @@ TEST_F(CopysetNodeManagerTest, DeleteCopysetNodeTest_Success) {
nodeManager_->AddService(&server, butil::EndPoint(ip, kPort)));

EXPECT_TRUE(nodeManager_->CreateCopysetNode(kPoolId, kCopysetId, conf));
EXPECT_TRUE(nodeManager_->IsCopysetNodeExist(kPoolId, kCopysetId));

// create same copyset will failed
EXPECT_FALSE(nodeManager_->CreateCopysetNode(kPoolId, kCopysetId, conf));
Expand Down Expand Up @@ -218,7 +227,6 @@ TEST_F(CopysetNodeManagerTest,
nodeManager_->AddService(&server, butil::EndPoint(ip, kPort)));

EXPECT_TRUE(nodeManager_->CreateCopysetNode(kPoolId, kCopysetId, conf));
EXPECT_TRUE(nodeManager_->IsCopysetNodeExist(kPoolId, kCopysetId));

// create same copyset will failed
EXPECT_FALSE(nodeManager_->CreateCopysetNode(kPoolId, kCopysetId, conf));
Expand Down
22 changes: 22 additions & 0 deletions curvefs/test/metaserver/copyset/copyset_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,28 @@ TEST_F(CopysetServiceTest, CreateCopysetTest) {

stub.CreateCopysetNode(&cntl, &request, &response, nullptr);

ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(COPYSET_OP_STATUS::COPYSET_OP_STATUS_SUCCESS,
response.status());
}

// create copyset exist
{
CopysetService_Stub stub(&channel_);
brpc::Controller cntl;

CreateCopysetRequest request;
CreateCopysetResponse response;

auto* copyset = request.add_copysets();
copyset->set_poolid(poolId_);
copyset->set_copysetid(copysetId_);
copyset->add_peers()->set_address("127.0.0.1:29960:0");
copyset->add_peers()->set_address("127.0.0.1:29961:0");
copyset->add_peers()->set_address("127.0.0.1:29963:0");

stub.CreateCopysetNode(&cntl, &request, &response, nullptr);

ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(COPYSET_OP_STATUS::COPYSET_OP_STATUS_EXIST,
response.status());
Expand Down

0 comments on commit 2b293c0

Please sign in to comment.