diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java index dab6888b83..6423783354 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java @@ -57,6 +57,8 @@ import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType; import org.apache.hugegraph.backend.store.raft.rpc.SetLeaderProcessor; import org.apache.hugegraph.backend.store.raft.rpc.StoreCommandProcessor; +import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor; +import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor; import org.apache.hugegraph.config.CoreOptions; import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.event.EventHub; @@ -409,6 +411,8 @@ private void shutdownRpcServer() { } private void registerRpcRequestProcessors() { + this.raftRpcServer.registerProcessor(new AddPeerProcessor(this)); + this.raftRpcServer.registerProcessor(new RemovePeerProcessor(this)); this.raftRpcServer.registerProcessor(new StoreCommandProcessor(this)); this.raftRpcServer.registerProcessor(new SetLeaderProcessor(this)); this.raftRpcServer.registerProcessor(new ListPeersProcessor(this)); diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftGroupManagerImpl.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftGroupManagerImpl.java index ff0867319c..aa402a8115 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftGroupManagerImpl.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftGroupManagerImpl.java @@ -29,6 +29,11 @@ import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.ListPeersResponse; import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.SetLeaderRequest; import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.SetLeaderResponse; +import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.AddPeerRequest; +import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.AddPeerResponse; +import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.RemovePeerRequest; +import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.RemovePeerResponse; + import org.apache.hugegraph.util.E; import com.google.protobuf.Message; @@ -118,12 +123,17 @@ public String setLeader(String endpoint) { @Override public String addPeer(String endpoint) { - E.checkArgument(this.raftNode.selfIsLeader(), - "Operation add_peer can only be executed on leader"); PeerId peerId = PeerId.parsePeer(endpoint); - RaftClosure future = new RaftClosure<>(); try { - this.raftNode.node().addPeer(peerId, future); + RaftClosure future = new RaftClosure<>(); + if (this.raftNode.selfIsLeader()) { + this.raftNode.node().addPeer(peerId, future); + } else { + AddPeerRequest request = AddPeerRequest.newBuilder() + .setEndpoint(endpoint) + .build(); + future = this.forwardToLeader(request); + } future.waitFinished(); } catch (Throwable e) { throw new BackendException("Failed to add peer '%s'", e, endpoint); @@ -133,16 +143,20 @@ public String addPeer(String endpoint) { @Override public String removePeer(String endpoint) { - E.checkArgument(this.raftNode.selfIsLeader(), - "Operation add_peer can only be executed on leader"); PeerId peerId = PeerId.parsePeer(endpoint); - RaftClosure future = new RaftClosure<>(); try { - this.raftNode.node().removePeer(peerId, future); + RaftClosure future = new RaftClosure<>(); + if (this.raftNode.selfIsLeader()) { + this.raftNode.node().removePeer(peerId, future); + } else { + RemovePeerRequest request = RemovePeerRequest.newBuilder() + .setEndpoint(endpoint) + .build(); + future = this.forwardToLeader(request); + } future.waitFinished(); } catch (Throwable e) { - throw new BackendException("Failed to remove peer '%s'", - e, endpoint); + throw new BackendException("Failed to remove peer '%s'", e, endpoint); } return peerId.toString(); } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/AddPeerProcessor.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/AddPeerProcessor.java new file mode 100644 index 0000000000..0395182142 --- /dev/null +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/AddPeerProcessor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.raft.rpc; + +import com.alipay.sofa.jraft.rpc.RpcRequestClosure; +import com.alipay.sofa.jraft.rpc.RpcRequestProcessor; +import com.google.protobuf.Message; +import org.apache.hugegraph.backend.store.raft.RaftContext; +import org.apache.hugegraph.backend.store.raft.RaftGroupManager; +import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.CommonResponse; +import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.AddPeerRequest; +import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.AddPeerResponse; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class AddPeerProcessor + extends RpcRequestProcessor { + + private static final Logger LOG = Log.logger(AddPeerProcessor.class); + + private final RaftContext context; + + public AddPeerProcessor(RaftContext context) { + super(null, null); + this.context = context; + } + + @Override + public Message processRequest(AddPeerRequest request, + RpcRequestClosure done) { + LOG.debug("Processing AddPeerRequest {}", request.getClass()); + RaftGroupManager nodeManager = this.context.raftNodeManager(); + try { + nodeManager.addPeer(request.getEndpoint()); + CommonResponse common = CommonResponse.newBuilder() + .setStatus(true) + .build(); + return AddPeerResponse.newBuilder().setCommon(common).build(); + } catch (Throwable e) { + CommonResponse common = CommonResponse.newBuilder() + .setStatus(false) + .setMessage(e.toString()) + .build(); + return AddPeerResponse.newBuilder().setCommon(common).build(); + } + } + + @Override + public String interest() { + return AddPeerRequest.class.getName(); + } +} diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RemovePeerProcessor.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RemovePeerProcessor.java new file mode 100644 index 0000000000..f578e60007 --- /dev/null +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RemovePeerProcessor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.backend.store.raft.rpc; + +import com.alipay.sofa.jraft.rpc.RpcRequestClosure; +import com.alipay.sofa.jraft.rpc.RpcRequestProcessor; +import com.google.protobuf.Message; +import org.apache.hugegraph.backend.store.raft.RaftContext; +import org.apache.hugegraph.backend.store.raft.RaftGroupManager; +import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.RemovePeerRequest; +import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.RemovePeerResponse; +import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.CommonResponse; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class RemovePeerProcessor + extends RpcRequestProcessor { + + private static final Logger LOG = Log.logger(RemovePeerProcessor.class); + + private final RaftContext context; + + public RemovePeerProcessor(RaftContext context) { + super(null, null); + this.context = context; + } + + @Override + public Message processRequest(RemovePeerRequest request, + RpcRequestClosure done) { + LOG.debug("Processing RemovePeerRequest {}", request.getClass()); + RaftGroupManager nodeManager = this.context.raftNodeManager(); + try { + nodeManager.removePeer(request.getEndpoint()); + CommonResponse common = CommonResponse.newBuilder() + .setStatus(true) + .build(); + return RemovePeerResponse.newBuilder().setCommon(common).build(); + } catch (Throwable e) { + CommonResponse common = CommonResponse.newBuilder() + .setStatus(false) + .setMessage(e.toString()) + .build(); + return RemovePeerResponse.newBuilder().setCommon(common).build(); + } + } + + @Override + public String interest() { + return RemovePeerRequest.class.getName(); + } +} diff --git a/hugegraph-core/src/main/resources/proto/raft.proto b/hugegraph-core/src/main/resources/proto/raft.proto index d167f86c47..b3591bea2d 100644 --- a/hugegraph-core/src/main/resources/proto/raft.proto +++ b/hugegraph-core/src/main/resources/proto/raft.proto @@ -77,3 +77,19 @@ message SetLeaderRequest { message SetLeaderResponse { required CommonResponse common = 1; } + +message AddPeerRequest { + required string endpoint = 1; +} + +message AddPeerResponse { + required CommonResponse common = 1; +} + +message RemovePeerRequest { + required string endpoint = 1; +} + +message RemovePeerResponse { + required CommonResponse common = 1; +}