Skip to content

Commit

Permalink
fix: transfer add_peer/remove_peer command to leader (#2112)
Browse files Browse the repository at this point in the history
  • Loading branch information
simon824 authored Feb 20, 2023
1 parent c8e0f0c commit 614d471
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
import org.apache.hugegraph.backend.store.raft.rpc.SetLeaderProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.StoreCommandProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.event.EventHub;
Expand Down Expand Up @@ -409,6 +411,8 @@ private void shutdownRpcServer() {
}

private void registerRpcRequestProcessors() {
this.raftRpcServer.registerProcessor(new AddPeerProcessor(this));
this.raftRpcServer.registerProcessor(new RemovePeerProcessor(this));
this.raftRpcServer.registerProcessor(new StoreCommandProcessor(this));
this.raftRpcServer.registerProcessor(new SetLeaderProcessor(this));
this.raftRpcServer.registerProcessor(new ListPeersProcessor(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.ListPeersResponse;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.SetLeaderRequest;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.SetLeaderResponse;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.AddPeerRequest;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.AddPeerResponse;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.RemovePeerRequest;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.RemovePeerResponse;

import org.apache.hugegraph.util.E;
import com.google.protobuf.Message;

Expand Down Expand Up @@ -118,12 +123,17 @@ public String setLeader(String endpoint) {

@Override
public String addPeer(String endpoint) {
E.checkArgument(this.raftNode.selfIsLeader(),
"Operation add_peer can only be executed on leader");
PeerId peerId = PeerId.parsePeer(endpoint);
RaftClosure<?> future = new RaftClosure<>();
try {
this.raftNode.node().addPeer(peerId, future);
RaftClosure<?> future = new RaftClosure<>();
if (this.raftNode.selfIsLeader()) {
this.raftNode.node().addPeer(peerId, future);
} else {
AddPeerRequest request = AddPeerRequest.newBuilder()
.setEndpoint(endpoint)
.build();
future = this.forwardToLeader(request);
}
future.waitFinished();
} catch (Throwable e) {
throw new BackendException("Failed to add peer '%s'", e, endpoint);
Expand All @@ -133,16 +143,20 @@ public String addPeer(String endpoint) {

@Override
public String removePeer(String endpoint) {
E.checkArgument(this.raftNode.selfIsLeader(),
"Operation add_peer can only be executed on leader");
PeerId peerId = PeerId.parsePeer(endpoint);
RaftClosure<?> future = new RaftClosure<>();
try {
this.raftNode.node().removePeer(peerId, future);
RaftClosure<?> future = new RaftClosure<>();
if (this.raftNode.selfIsLeader()) {
this.raftNode.node().removePeer(peerId, future);
} else {
RemovePeerRequest request = RemovePeerRequest.newBuilder()
.setEndpoint(endpoint)
.build();
future = this.forwardToLeader(request);
}
future.waitFinished();
} catch (Throwable e) {
throw new BackendException("Failed to remove peer '%s'",
e, endpoint);
throw new BackendException("Failed to remove peer '%s'", e, endpoint);
}
return peerId.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.backend.store.raft.rpc;

import com.alipay.sofa.jraft.rpc.RpcRequestClosure;
import com.alipay.sofa.jraft.rpc.RpcRequestProcessor;
import com.google.protobuf.Message;
import org.apache.hugegraph.backend.store.raft.RaftContext;
import org.apache.hugegraph.backend.store.raft.RaftGroupManager;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.CommonResponse;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.AddPeerRequest;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.AddPeerResponse;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

public class AddPeerProcessor
extends RpcRequestProcessor<AddPeerRequest> {

private static final Logger LOG = Log.logger(AddPeerProcessor.class);

private final RaftContext context;

public AddPeerProcessor(RaftContext context) {
super(null, null);
this.context = context;
}

@Override
public Message processRequest(AddPeerRequest request,
RpcRequestClosure done) {
LOG.debug("Processing AddPeerRequest {}", request.getClass());
RaftGroupManager nodeManager = this.context.raftNodeManager();
try {
nodeManager.addPeer(request.getEndpoint());
CommonResponse common = CommonResponse.newBuilder()
.setStatus(true)
.build();
return AddPeerResponse.newBuilder().setCommon(common).build();
} catch (Throwable e) {
CommonResponse common = CommonResponse.newBuilder()
.setStatus(false)
.setMessage(e.toString())
.build();
return AddPeerResponse.newBuilder().setCommon(common).build();
}
}

@Override
public String interest() {
return AddPeerRequest.class.getName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.backend.store.raft.rpc;

import com.alipay.sofa.jraft.rpc.RpcRequestClosure;
import com.alipay.sofa.jraft.rpc.RpcRequestProcessor;
import com.google.protobuf.Message;
import org.apache.hugegraph.backend.store.raft.RaftContext;
import org.apache.hugegraph.backend.store.raft.RaftGroupManager;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.RemovePeerRequest;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.RemovePeerResponse;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.CommonResponse;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

public class RemovePeerProcessor
extends RpcRequestProcessor<RemovePeerRequest> {

private static final Logger LOG = Log.logger(RemovePeerProcessor.class);

private final RaftContext context;

public RemovePeerProcessor(RaftContext context) {
super(null, null);
this.context = context;
}

@Override
public Message processRequest(RemovePeerRequest request,
RpcRequestClosure done) {
LOG.debug("Processing RemovePeerRequest {}", request.getClass());
RaftGroupManager nodeManager = this.context.raftNodeManager();
try {
nodeManager.removePeer(request.getEndpoint());
CommonResponse common = CommonResponse.newBuilder()
.setStatus(true)
.build();
return RemovePeerResponse.newBuilder().setCommon(common).build();
} catch (Throwable e) {
CommonResponse common = CommonResponse.newBuilder()
.setStatus(false)
.setMessage(e.toString())
.build();
return RemovePeerResponse.newBuilder().setCommon(common).build();
}
}

@Override
public String interest() {
return RemovePeerRequest.class.getName();
}
}
16 changes: 16 additions & 0 deletions hugegraph-core/src/main/resources/proto/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,19 @@ message SetLeaderRequest {
message SetLeaderResponse {
required CommonResponse common = 1;
}

message AddPeerRequest {
required string endpoint = 1;
}

message AddPeerResponse {
required CommonResponse common = 1;
}

message RemovePeerRequest {
required string endpoint = 1;
}

message RemovePeerResponse {
required CommonResponse common = 1;
}

0 comments on commit 614d471

Please sign in to comment.