Skip to content

Commit

Permalink
feat: support task auto manager by server role
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxxoo committed Feb 26, 2023
1 parent 614d471 commit c822b43
Show file tree
Hide file tree
Showing 12 changed files with 480 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Objects;

import org.apache.commons.lang.NotImplementedException;
import org.apache.hugegraph.HugeGraph;
import org.apache.tinkerpop.gremlin.groovy.jsr223.dsl.credential.CredentialGraphTokens;

import org.apache.hugegraph.backend.id.IdGenerator;
Expand Down Expand Up @@ -87,6 +88,12 @@ public AuthManager authManager() {
"AuthManager is unsupported by ConfigAuthenticator");
}

@Override
public HugeGraph graph() {
throw new NotImplementedException(
"AuthManager is unsupported by ConfigAuthenticator");
}

@Override
public void initAdminUser(String password) throws Exception {
String adminToken = this.tokens.get(USER_ADMIN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;

import org.apache.hugegraph.HugeGraph;
import org.apache.tinkerpop.gremlin.groovy.jsr223.dsl.credential.CredentialGraphTokens;
import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
Expand Down Expand Up @@ -66,6 +67,8 @@ public interface HugeAuthenticator extends Authenticator {

AuthManager authManager();

HugeGraph graph();

@Override
default void setup(final Map<String, Object> config) {
E.checkState(config != null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,19 @@ public class StandardAuthenticator implements HugeAuthenticator {

private HugeGraph graph = null;

private HugeGraph graph() {
E.checkState(this.graph != null, "Must setup Authenticator first");
return this.graph;
}

private void initAdminUser() throws Exception {
if (this.requireInitAdminUser()) {
this.initAdminUser(this.inputPassword());
}
this.graph.close();
}

@Override
public HugeGraph graph() {
E.checkState(this.graph != null, "Must setup Authenticator first");
return this.graph;
}

@Override
public void initAdminUser(String password) {
// Not allowed to call by non main thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,47 @@ public static synchronized ServerOptions instance() {
"master"
);

public static final ConfigOption<Integer> EXCEEDS_FAIL_COUNT =
new ConfigOption<>(
"server.role.fail.count",
"The role state machine fail count exceeds",
rangeInt(0, Integer.MAX_VALUE),
2
);

public static final ConfigOption<Integer> RANDOM_TIMEOUT_MILLISECOND =
new ConfigOption<>(
"server.role.random.timeout",
"The role state machine random timeout millisecond time",
rangeInt(0, Integer.MAX_VALUE),
400
);

public static final ConfigOption<Integer> HEARTBEAT_INTERVAL_SECOUND =
new ConfigOption<>(
"server.role.heartbeat.interval",
"The role state machine heartbeat interval second time",
rangeInt(0, Integer.MAX_VALUE),
1
);

public static final ConfigOption<Integer> EXCEEDS_WORKER_COUNT =
new ConfigOption<>(
"server.role.worker.count",
"Check the number of times that the master node does not initiate " +
"the heartbeat threshold",
rangeInt(0, Integer.MAX_VALUE),
5
);

public static final ConfigOption<Integer> BASE_TIMEOUT_MILLISECOND =
new ConfigOption<>(
"server.role.base.timeout",
"The role state machine candidate state base timeout time",
rangeInt(0, Integer.MAX_VALUE),
100
);

public static final ConfigOption<Integer> MAX_WORKER_THREADS =
new ConfigOption<>(
"restserver.max_worker_threads",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,21 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.StandardHugeGraph;
import org.apache.hugegraph.auth.StandardAuthenticator;
import org.apache.hugegraph.election.Config;
import org.apache.hugegraph.election.HugeRoleStateMachineConfig;
import org.apache.hugegraph.election.RoleElectionStateMachine;
import org.apache.hugegraph.election.RoleElectionStateMachineImpl;
import org.apache.hugegraph.election.RoleTypeDataAdapter;
import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException;
import org.apache.tinkerpop.gremlin.server.util.MetricManager;
import org.apache.tinkerpop.gremlin.structure.Graph;
Expand Down Expand Up @@ -82,6 +91,9 @@ public final class GraphManager {
private final RpcClientProvider rpcClient;
private final HugeConfig conf;

private RoleElectionStateMachine roleStateMachine;
private Executor applyThread;

private Id server;
private NodeRole role;

Expand Down Expand Up @@ -265,6 +277,9 @@ public void close() {
}
this.destroyRpcServer();
this.unlistenChanges();
if (this.roleStateMachine != null) {
this.roleStateMachine.shutdown();
}
}

private void startRpcServer() {
Expand Down Expand Up @@ -428,13 +443,43 @@ private void serverStarted(HugeConfig config) {
"The server role can't be null or empty");
this.server = IdGenerator.of(server);
this.role = NodeRole.valueOf(role.toUpperCase());

initRoleStateMachine(config, server);

for (String graph : this.graphs()) {
HugeGraph hugegraph = this.graph(graph);
assert hugegraph != null;
hugegraph.serverStarted(this.server, this.role);
}
}

private void initRoleStateMachine(HugeConfig config, String server) {
try {
if (!(this.authenticator() instanceof StandardAuthenticator)) {
LOG.info("Current not support role state machine");
return;
}
} catch (IllegalStateException e) {
LOG.info("Current not support role state machine");
return;
}

E.checkArgument(this.roleStateMachine == null, "Repetition init");
Config roleStateMachineConfig = new HugeRoleStateMachineConfig(server,
config.get(ServerOptions.EXCEEDS_FAIL_COUNT),
config.get(ServerOptions.RANDOM_TIMEOUT_MILLISECOND),
config.get(ServerOptions.HEARTBEAT_INTERVAL_SECOUND),
config.get(ServerOptions.EXCEEDS_WORKER_COUNT),
config.get(ServerOptions.BASE_TIMEOUT_MILLISECOND));
StandardHugeGraph graph = (StandardHugeGraph) this.authenticator().graph();
RoleTypeDataAdapter adapter = new RoleTypeDataAdapterImpl(graph.hugeGraphParams());
this.roleStateMachine = new RoleElectionStateMachineImpl(roleStateMachineConfig, adapter);
applyThread = Executors.newSingleThreadExecutor();
applyThread.execute(() -> {
this.roleStateMachine.apply(new StateMachineCallbackImpl(TaskManager.instance()));
});
}

private void addMetrics(HugeConfig config) {
final MetricManager metric = MetricManager.INSTANCE;
// Force to add server reporter
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.core;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.auth.SchemaDefine;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.election.RoleTypeData;
import org.apache.hugegraph.election.RoleTypeDataAdapter;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.DataType;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;

public class RoleTypeDataAdapterImpl implements RoleTypeDataAdapter {

private final HugeGraphParams graphParams;
private final Schema schema;

public RoleTypeDataAdapterImpl(HugeGraphParams graphParams) {
this.graphParams = graphParams;
this.schema = new Schema(graphParams);
this.schema.initSchemaIfNeeded();
}

@Override
public boolean updateIfNodePresent(RoleTypeData stateData) {
// if epoch increase, update and return true
// if epoch equal, ignore different node, return false
try {
GraphTransaction tx = this.graphParams.systemTransaction();
tx.doUpdateIfPresent(this.constructEntry(stateData));
tx.commitOrRollback();
} catch (Throwable ignore){
return false;
}

return true;
}

BackendEntry constructEntry(RoleTypeData stateData) {
List<Object> list = new ArrayList<>(8);
list.add(P.LABEL);
list.add(P.ROLE_DATA);

list.add(P.NODE);
list.add(stateData.node());

list.add(P.CLOCK);
list.add(stateData.clock());

list.add(P.EPOCH);
list.add(stateData.epoch());

list.add(P.TYPE);
list.add("default");

HugeVertex vertex = this.graphParams.systemTransaction()
.constructVertex(false, list);

return this.graphParams.serializer().writeVertex(vertex);
}

@Override
public Optional<RoleTypeData> query() {
GraphTransaction tx = this.graphParams.systemTransaction();
ConditionQuery query = new ConditionQuery(HugeType.VERTEX);
VertexLabel vl = this.graphParams.graph().vertexLabel(P.ROLE_DATA);
query.eq(HugeKeys.LABEL, vl.id());
query.eq(HugeKeys.PROPERTY_KEY, "default");
query.showHidden(true);
Iterator<Vertex> vertexIterator = tx.queryVertices(query);
if (vertexIterator.hasNext()) {
Vertex next = vertexIterator.next();
String node = (String) next.property(P.NODE).value();
Long clock = (Long) next.property(P.CLOCK).value();
Integer epoch = (Integer) next.property(P.EPOCH).value();

RoleTypeData roleTypeData = new RoleTypeData(node, epoch, clock);

return Optional.of(roleTypeData);
}

return Optional.empty();
}

public static final class P {

public static final String ROLE_DATA = Graph.Hidden.hide("role_data");

public static final String LABEL = T.label.getAccessor();

public static final String NODE = Graph.Hidden.hide("role_node");

public static final String CLOCK = Graph.Hidden.hide("role_clock");

public static final String EPOCH = Graph.Hidden.hide("role_epoch");

public static final String TYPE = Graph.Hidden.hide("role_type");
}


public static final class Schema extends SchemaDefine {

public Schema(HugeGraphParams graph) {
super(graph, P.ROLE_DATA);
}

@Override
public void initSchemaIfNeeded() {
if (this.existVertexLabel(this.label)) {
return;
}

String[] properties = this.initProperties();

VertexLabel label = this.schema().vertexLabel(this.label)
.enableLabelIndex(true)
.usePrimaryKeyId()
.primaryKeys(P.TYPE)
.properties(properties)
.build();
this.graph.schemaTransaction().addVertexLabel(label);
}

private String[] initProperties() {
List<String> props = new ArrayList<>();

props.add(createPropertyKey(P.NODE, DataType.TEXT));
props.add(createPropertyKey(P.CLOCK, DataType.LONG));
props.add(createPropertyKey(P.EPOCH, DataType.INT));
props.add(createPropertyKey(P.TYPE, DataType.TEXT));

return super.initProperties(props);
}
}
}
Loading

0 comments on commit c822b43

Please sign in to comment.