From c822b439a157200bf1241b9647d53346bbb928f2 Mon Sep 17 00:00:00 2001 From: vaughn Date: Sun, 26 Feb 2023 19:24:44 +0800 Subject: [PATCH] feat: support task auto manager by server role --- .../hugegraph/auth/ConfigAuthenticator.java | 7 + .../hugegraph/auth/HugeAuthenticator.java | 3 + .../hugegraph/auth/StandardAuthenticator.java | 11 +- .../hugegraph/config/ServerOptions.java | 41 +++++ .../apache/hugegraph/core/GraphManager.java | 45 +++++ .../core/RoleTypeDataAdapterImpl.java | 163 ++++++++++++++++++ .../core/StateMachineCallbackImpl.java | 91 ++++++++++ .../apache/hugegraph/StandardHugeGraph.java | 4 + .../election/HugeRoleStateMachineConfig.java | 70 ++++++++ .../hugegraph/task/ServerInfoManager.java | 16 +- .../hugegraph/task/StandardTaskScheduler.java | 4 + .../apache/hugegraph/task/TaskManager.java | 33 +++- 12 files changed, 480 insertions(+), 8 deletions(-) create mode 100644 hugegraph-api/src/main/java/org/apache/hugegraph/core/RoleTypeDataAdapterImpl.java create mode 100644 hugegraph-api/src/main/java/org/apache/hugegraph/core/StateMachineCallbackImpl.java create mode 100644 hugegraph-core/src/main/java/org/apache/hugegraph/election/HugeRoleStateMachineConfig.java diff --git a/hugegraph-api/src/main/java/org/apache/hugegraph/auth/ConfigAuthenticator.java b/hugegraph-api/src/main/java/org/apache/hugegraph/auth/ConfigAuthenticator.java index 0f914cff4c..834004d2f4 100644 --- a/hugegraph-api/src/main/java/org/apache/hugegraph/auth/ConfigAuthenticator.java +++ b/hugegraph-api/src/main/java/org/apache/hugegraph/auth/ConfigAuthenticator.java @@ -23,6 +23,7 @@ import java.util.Objects; import org.apache.commons.lang.NotImplementedException; +import org.apache.hugegraph.HugeGraph; import org.apache.tinkerpop.gremlin.groovy.jsr223.dsl.credential.CredentialGraphTokens; import org.apache.hugegraph.backend.id.IdGenerator; @@ -87,6 +88,12 @@ public AuthManager authManager() { "AuthManager is unsupported by ConfigAuthenticator"); } + @Override + public HugeGraph graph() { + throw new NotImplementedException( + "AuthManager is unsupported by ConfigAuthenticator"); + } + @Override public void initAdminUser(String password) throws Exception { String adminToken = this.tokens.get(USER_ADMIN); diff --git a/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeAuthenticator.java b/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeAuthenticator.java index 82ac59d593..186f2605cd 100644 --- a/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeAuthenticator.java +++ b/hugegraph-api/src/main/java/org/apache/hugegraph/auth/HugeAuthenticator.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; +import org.apache.hugegraph.HugeGraph; import org.apache.tinkerpop.gremlin.groovy.jsr223.dsl.credential.CredentialGraphTokens; import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser; import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException; @@ -66,6 +67,8 @@ public interface HugeAuthenticator extends Authenticator { AuthManager authManager(); + HugeGraph graph(); + @Override default void setup(final Map config) { E.checkState(config != null, diff --git a/hugegraph-api/src/main/java/org/apache/hugegraph/auth/StandardAuthenticator.java b/hugegraph-api/src/main/java/org/apache/hugegraph/auth/StandardAuthenticator.java index 2cda2333e4..9a7a8dc134 100644 --- a/hugegraph-api/src/main/java/org/apache/hugegraph/auth/StandardAuthenticator.java +++ b/hugegraph-api/src/main/java/org/apache/hugegraph/auth/StandardAuthenticator.java @@ -41,11 +41,6 @@ public class StandardAuthenticator implements HugeAuthenticator { private HugeGraph graph = null; - private HugeGraph graph() { - E.checkState(this.graph != null, "Must setup Authenticator first"); - return this.graph; - } - private void initAdminUser() throws Exception { if (this.requireInitAdminUser()) { this.initAdminUser(this.inputPassword()); @@ -53,6 +48,12 @@ private void initAdminUser() throws Exception { this.graph.close(); } + @Override + public HugeGraph graph() { + E.checkState(this.graph != null, "Must setup Authenticator first"); + return this.graph; + } + @Override public void initAdminUser(String password) { // Not allowed to call by non main thread diff --git a/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java b/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java index 95a53faa39..0f164182ca 100644 --- a/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java +++ b/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java @@ -64,6 +64,47 @@ public static synchronized ServerOptions instance() { "master" ); + public static final ConfigOption EXCEEDS_FAIL_COUNT = + new ConfigOption<>( + "server.role.fail.count", + "The role state machine fail count exceeds", + rangeInt(0, Integer.MAX_VALUE), + 2 + ); + + public static final ConfigOption RANDOM_TIMEOUT_MILLISECOND = + new ConfigOption<>( + "server.role.random.timeout", + "The role state machine random timeout millisecond time", + rangeInt(0, Integer.MAX_VALUE), + 400 + ); + + public static final ConfigOption HEARTBEAT_INTERVAL_SECOUND = + new ConfigOption<>( + "server.role.heartbeat.interval", + "The role state machine heartbeat interval second time", + rangeInt(0, Integer.MAX_VALUE), + 1 + ); + + public static final ConfigOption EXCEEDS_WORKER_COUNT = + new ConfigOption<>( + "server.role.worker.count", + "Check the number of times that the master node does not initiate " + + "the heartbeat threshold", + rangeInt(0, Integer.MAX_VALUE), + 5 + ); + + public static final ConfigOption BASE_TIMEOUT_MILLISECOND = + new ConfigOption<>( + "server.role.base.timeout", + "The role state machine candidate state base timeout time", + rangeInt(0, Integer.MAX_VALUE), + 100 + ); + public static final ConfigOption MAX_WORKER_THREADS = new ConfigOption<>( "restserver.max_worker_threads", diff --git a/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java b/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java index 57a99b7326..fdf3443ae4 100644 --- a/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java +++ b/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java @@ -22,12 +22,21 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.commons.lang3.StringUtils; +import org.apache.hugegraph.StandardHugeGraph; +import org.apache.hugegraph.auth.StandardAuthenticator; +import org.apache.hugegraph.election.Config; +import org.apache.hugegraph.election.HugeRoleStateMachineConfig; +import org.apache.hugegraph.election.RoleElectionStateMachine; +import org.apache.hugegraph.election.RoleElectionStateMachineImpl; +import org.apache.hugegraph.election.RoleTypeDataAdapter; import org.apache.tinkerpop.gremlin.server.auth.AuthenticationException; import org.apache.tinkerpop.gremlin.server.util.MetricManager; import org.apache.tinkerpop.gremlin.structure.Graph; @@ -82,6 +91,9 @@ public final class GraphManager { private final RpcClientProvider rpcClient; private final HugeConfig conf; + private RoleElectionStateMachine roleStateMachine; + private Executor applyThread; + private Id server; private NodeRole role; @@ -265,6 +277,9 @@ public void close() { } this.destroyRpcServer(); this.unlistenChanges(); + if (this.roleStateMachine != null) { + this.roleStateMachine.shutdown(); + } } private void startRpcServer() { @@ -428,6 +443,9 @@ private void serverStarted(HugeConfig config) { "The server role can't be null or empty"); this.server = IdGenerator.of(server); this.role = NodeRole.valueOf(role.toUpperCase()); + + initRoleStateMachine(config, server); + for (String graph : this.graphs()) { HugeGraph hugegraph = this.graph(graph); assert hugegraph != null; @@ -435,6 +453,33 @@ private void serverStarted(HugeConfig config) { } } + private void initRoleStateMachine(HugeConfig config, String server) { + try { + if (!(this.authenticator() instanceof StandardAuthenticator)) { + LOG.info("Current not support role state machine"); + return; + } + } catch (IllegalStateException e) { + LOG.info("Current not support role state machine"); + return; + } + + E.checkArgument(this.roleStateMachine == null, "Repetition init"); + Config roleStateMachineConfig = new HugeRoleStateMachineConfig(server, + config.get(ServerOptions.EXCEEDS_FAIL_COUNT), + config.get(ServerOptions.RANDOM_TIMEOUT_MILLISECOND), + config.get(ServerOptions.HEARTBEAT_INTERVAL_SECOUND), + config.get(ServerOptions.EXCEEDS_WORKER_COUNT), + config.get(ServerOptions.BASE_TIMEOUT_MILLISECOND)); + StandardHugeGraph graph = (StandardHugeGraph) this.authenticator().graph(); + RoleTypeDataAdapter adapter = new RoleTypeDataAdapterImpl(graph.hugeGraphParams()); + this.roleStateMachine = new RoleElectionStateMachineImpl(roleStateMachineConfig, adapter); + applyThread = Executors.newSingleThreadExecutor(); + applyThread.execute(() -> { + this.roleStateMachine.apply(new StateMachineCallbackImpl(TaskManager.instance())); + }); + } + private void addMetrics(HugeConfig config) { final MetricManager metric = MetricManager.INSTANCE; // Force to add server reporter diff --git a/hugegraph-api/src/main/java/org/apache/hugegraph/core/RoleTypeDataAdapterImpl.java b/hugegraph-api/src/main/java/org/apache/hugegraph/core/RoleTypeDataAdapterImpl.java new file mode 100644 index 0000000000..cbb0d7058e --- /dev/null +++ b/hugegraph-api/src/main/java/org/apache/hugegraph/core/RoleTypeDataAdapterImpl.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.core; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; + +import org.apache.hugegraph.HugeGraphParams; +import org.apache.hugegraph.auth.SchemaDefine; +import org.apache.hugegraph.backend.query.ConditionQuery; +import org.apache.hugegraph.backend.store.BackendEntry; +import org.apache.hugegraph.backend.tx.GraphTransaction; +import org.apache.hugegraph.election.RoleTypeData; +import org.apache.hugegraph.election.RoleTypeDataAdapter; +import org.apache.hugegraph.schema.VertexLabel; +import org.apache.hugegraph.structure.HugeVertex; +import org.apache.hugegraph.type.HugeType; +import org.apache.hugegraph.type.define.DataType; +import org.apache.hugegraph.type.define.HugeKeys; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.T; +import org.apache.tinkerpop.gremlin.structure.Vertex; + +public class RoleTypeDataAdapterImpl implements RoleTypeDataAdapter { + + private final HugeGraphParams graphParams; + private final Schema schema; + + public RoleTypeDataAdapterImpl(HugeGraphParams graphParams) { + this.graphParams = graphParams; + this.schema = new Schema(graphParams); + this.schema.initSchemaIfNeeded(); + } + + @Override + public boolean updateIfNodePresent(RoleTypeData stateData) { + // if epoch increase, update and return true + // if epoch equal, ignore different node, return false + try { + GraphTransaction tx = this.graphParams.systemTransaction(); + tx.doUpdateIfPresent(this.constructEntry(stateData)); + tx.commitOrRollback(); + } catch (Throwable ignore){ + return false; + } + + return true; + } + + BackendEntry constructEntry(RoleTypeData stateData) { + List list = new ArrayList<>(8); + list.add(P.LABEL); + list.add(P.ROLE_DATA); + + list.add(P.NODE); + list.add(stateData.node()); + + list.add(P.CLOCK); + list.add(stateData.clock()); + + list.add(P.EPOCH); + list.add(stateData.epoch()); + + list.add(P.TYPE); + list.add("default"); + + HugeVertex vertex = this.graphParams.systemTransaction() + .constructVertex(false, list); + + return this.graphParams.serializer().writeVertex(vertex); + } + + @Override + public Optional query() { + GraphTransaction tx = this.graphParams.systemTransaction(); + ConditionQuery query = new ConditionQuery(HugeType.VERTEX); + VertexLabel vl = this.graphParams.graph().vertexLabel(P.ROLE_DATA); + query.eq(HugeKeys.LABEL, vl.id()); + query.eq(HugeKeys.PROPERTY_KEY, "default"); + query.showHidden(true); + Iterator vertexIterator = tx.queryVertices(query); + if (vertexIterator.hasNext()) { + Vertex next = vertexIterator.next(); + String node = (String) next.property(P.NODE).value(); + Long clock = (Long) next.property(P.CLOCK).value(); + Integer epoch = (Integer) next.property(P.EPOCH).value(); + + RoleTypeData roleTypeData = new RoleTypeData(node, epoch, clock); + + return Optional.of(roleTypeData); + } + + return Optional.empty(); + } + + public static final class P { + + public static final String ROLE_DATA = Graph.Hidden.hide("role_data"); + + public static final String LABEL = T.label.getAccessor(); + + public static final String NODE = Graph.Hidden.hide("role_node"); + + public static final String CLOCK = Graph.Hidden.hide("role_clock"); + + public static final String EPOCH = Graph.Hidden.hide("role_epoch"); + + public static final String TYPE = Graph.Hidden.hide("role_type"); + } + + + public static final class Schema extends SchemaDefine { + + public Schema(HugeGraphParams graph) { + super(graph, P.ROLE_DATA); + } + + @Override + public void initSchemaIfNeeded() { + if (this.existVertexLabel(this.label)) { + return; + } + + String[] properties = this.initProperties(); + + VertexLabel label = this.schema().vertexLabel(this.label) + .enableLabelIndex(true) + .usePrimaryKeyId() + .primaryKeys(P.TYPE) + .properties(properties) + .build(); + this.graph.schemaTransaction().addVertexLabel(label); + } + + private String[] initProperties() { + List props = new ArrayList<>(); + + props.add(createPropertyKey(P.NODE, DataType.TEXT)); + props.add(createPropertyKey(P.CLOCK, DataType.LONG)); + props.add(createPropertyKey(P.EPOCH, DataType.INT)); + props.add(createPropertyKey(P.TYPE, DataType.TEXT)); + + return super.initProperties(props); + } + } +} diff --git a/hugegraph-api/src/main/java/org/apache/hugegraph/core/StateMachineCallbackImpl.java b/hugegraph-api/src/main/java/org/apache/hugegraph/core/StateMachineCallbackImpl.java new file mode 100644 index 0000000000..60df6adf4e --- /dev/null +++ b/hugegraph-api/src/main/java/org/apache/hugegraph/core/StateMachineCallbackImpl.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.core; + +import org.apache.hugegraph.election.StateMachineCallback; +import org.apache.hugegraph.election.StateMachineContext; +import org.apache.hugegraph.task.TaskManager; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class StateMachineCallbackImpl implements StateMachineCallback { + + private static final Logger LOG = Log.logger(StateMachineCallbackImpl.class); + + private final TaskManager taskManager; + + boolean isMaster = false; + + public StateMachineCallbackImpl(TaskManager taskManager) { + this.taskManager = taskManager; + this.taskManager.useRoleStateMachine(true); + } + @Override + public void master(StateMachineContext context) { + if (!isMaster) { + this.taskManager.onMaster(); + LOG.info("Server {} change to master role", context.config().node()); + } + this.isMaster = true; + } + + @Override + public void worker(StateMachineContext context) { + if (isMaster) { + this.taskManager.onWorker(); + LOG.info("Server {} change to worker role", context.config().node()); + } + + this.isMaster = false; + } + + @Override + public void candidate(StateMachineContext context) { + if (isMaster) { + this.taskManager.onWorker(); + LOG.info("Server {} change to worker role", context.config().node()); + } + + isMaster = false; + } + + @Override + public void unknown(StateMachineContext context) { + if (isMaster) { + this.taskManager.onWorker(); + LOG.info("Server {} change to worker role", context.config().node()); + } + + isMaster = false; + } + + @Override + public void abdication(StateMachineContext context) { + if (isMaster) { + this.taskManager.onWorker(); + LOG.info("Server {} change to worker role", context.config().node()); + } + + isMaster = false; + } + + @Override + public void error(StateMachineContext context, Throwable e) { + LOG.error("Server {} exception occurred", context.config().node(), e); + } +} diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index 4542fb88c6..5b74c8a1a5 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -1063,6 +1063,10 @@ public String toString() { return StringFactory.graphString(this, this.name()); } + public HugeGraphParams hugeGraphParams() { + return this.params; + } + @Override public final void proxy(HugeGraph graph) { this.params.graph(graph); diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/election/HugeRoleStateMachineConfig.java b/hugegraph-core/src/main/java/org/apache/hugegraph/election/HugeRoleStateMachineConfig.java new file mode 100644 index 0000000000..9c141f5ac9 --- /dev/null +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/election/HugeRoleStateMachineConfig.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.election; + +public class HugeRoleStateMachineConfig implements Config { + + String node; + int exceedsFailCount; + long randomTimeoutMillisecond; + long heartBeatIntervalSecond; + int exceedsWorkerCount; + long baseTimeoutMillisecond; + + public HugeRoleStateMachineConfig(String node, int exceedsFailCount, + long randomTimeoutMillisecond, long heartBeatIntervalSecond, + int exceedsWorkerCount, long baseTimeoutMillisecond) { + this.node = node; + this.exceedsFailCount = exceedsFailCount; + this.randomTimeoutMillisecond = randomTimeoutMillisecond; + this.heartBeatIntervalSecond = heartBeatIntervalSecond; + this.exceedsWorkerCount = exceedsWorkerCount; + this.baseTimeoutMillisecond = baseTimeoutMillisecond; + } + + + @Override + public String node() { + return this.node; + } + + @Override + public int exceedsFailCount() { + return this.exceedsFailCount; + } + + @Override + public long randomTimeoutMillisecond() { + return this.randomTimeoutMillisecond; + } + + @Override + public long heartBeatIntervalSecond() { + return this.heartBeatIntervalSecond; + } + + @Override + public int exceedsWorkerCount() { + return this.exceedsWorkerCount; + } + + @Override + public long baseTimeoutMillisecond() { + return this.baseTimeoutMillisecond; + } +} diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java index 830b6260cd..e4ca67883c 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java @@ -29,7 +29,6 @@ import org.apache.hugegraph.backend.page.PageInfo; import org.apache.hugegraph.backend.query.Condition; import org.apache.hugegraph.backend.query.ConditionQuery; -import org.apache.hugegraph.backend.query.Query; import org.apache.hugegraph.backend.query.QueryResults; import org.apache.hugegraph.backend.tx.GraphTransaction; import org.apache.hugegraph.schema.PropertyKey; @@ -52,7 +51,7 @@ import org.apache.hugegraph.util.Log; import com.google.common.collect.ImmutableMap; -public class ServerInfoManager { +public class ServerInfoManager{ private static final Logger LOG = Log.logger(ServerInfoManager.class); @@ -104,6 +103,19 @@ public boolean close() { return true; } + public synchronized void forceInitServerInfo(Id server, NodeRole role) { + if (this.closed) { + return; + } + + E.checkArgument(server != null && role != null, + "The server id or role can't be null"); + this.selfServerId = server; + this.selfServerRole = role; + + this.saveServerInfo(this.selfServerId, this.selfServerRole); + } + public synchronized void initServerInfo(Id server, NodeRole role) { E.checkArgument(server != null && role != null, "The server id or role can't be null"); diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java index b5b95256f3..a8aa673570 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java @@ -318,6 +318,10 @@ protected synchronized void scheduleTasks() { continue; } + if (!this.serverManager.master()) { + return; + } + HugeServerInfo server = this.serverManager().pickWorkerNode( scheduleInfos, task); if (server == null) { diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java index b013181549..5c3a6caa28 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.hugegraph.type.define.NodeRole; import org.apache.hugegraph.util.*; import org.apache.hugegraph.util.Consumers; import org.apache.hugegraph.util.LockUtil; @@ -58,6 +59,8 @@ public final class TaskManager { private final ExecutorService serverInfoDbExecutor; private final PausableScheduledThreadPool schedulerExecutor; + private boolean useRoleStateMachine = false; + public static TaskManager instance() { return MANAGER; } @@ -254,6 +257,34 @@ protected void notifyNewTask(HugeTask task) { } } + public void onMaster() { + try { + for (TaskScheduler entry : this.schedulers.values()) { + StandardTaskScheduler scheduler = (StandardTaskScheduler) entry; + ServerInfoManager serverInfoManager = scheduler.serverManager(); + serverInfoManager.forceInitServerInfo(serverInfoManager.selfServerId(), NodeRole.MASTER); + } + } catch (Throwable e) { + LOG.error("Exception occurred when change to master role", e); + } + } + + public void onWorker() { + try { + for (TaskScheduler entry : this.schedulers.values()) { + StandardTaskScheduler scheduler = (StandardTaskScheduler) entry; + ServerInfoManager serverInfoManager = scheduler.serverManager(); + serverInfoManager.forceInitServerInfo(serverInfoManager.selfServerId(), NodeRole.WORKER); + } + } catch (Throwable e) { + LOG.error("Exception occurred when change to worker role", e); + } + } + + public void useRoleStateMachine(boolean useRoleStateMachine) { + this.useRoleStateMachine = useRoleStateMachine; + } + private void scheduleOrExecuteJob() { // Called by scheduler timer try { @@ -303,7 +334,7 @@ private void scheduleOrExecuteJobForGraph(StandardTaskScheduler scheduler) { */ if (serverManager.master()) { scheduler.scheduleTasks(); - if (!serverManager.onlySingleNode()) { + if (!this.useRoleStateMachine && !serverManager.onlySingleNode()) { return; } }