Skip to content

Commit

Permalink
chore: improve stable
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxxoo committed Mar 1, 2023
1 parent e927910 commit c9f0c7b
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
@Path("graphs/{graph}/jobs/gremlin")
@Singleton
@Tag(name = "GremlinAPI")
@RedirectFilter.RedirectMasterRole
public class GremlinAPI extends API {

private static final Logger LOG = Log.logger(GremlinAPI.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
@Path("graphs/{graph}/jobs/rebuild")
@Singleton
@Tag(name = "RebuildAPI")
@RedirectFilter.RedirectMasterRole
public class RebuildAPI extends API {

private static final Logger LOG = Log.logger(RebuildAPI.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ public static synchronized CoreOptions instance() {
"server.role.fail_count",
"The role state machine fail count exceeds",
rangeInt(0, Integer.MAX_VALUE),
2
5
);

public static final ConfigOption<String> NODE_EXTERNAL_URL =
Expand All @@ -648,31 +648,31 @@ public static synchronized CoreOptions instance() {
"server.role.random_timeout",
"The role state machine random timeout millisecond time",
rangeInt(0, Integer.MAX_VALUE),
400
1000
);

public static final ConfigOption<Integer> HEARTBEAT_INTERVAL_SECOUND =
new ConfigOption<>(
"server.role.heartbeat_interval",
"The role state machine heartbeat interval second time",
rangeInt(0, Integer.MAX_VALUE),
1
2
);

public static final ConfigOption<Integer> EXCEEDS_WORKER_COUNT =
new ConfigOption<>(
"server.role.worker_count",
"Check the number of times that the master node does not initiate " +
"the heartbeat threshold",
"the heartbeat threshold",
rangeInt(0, Integer.MAX_VALUE),
5
10
);

public static final ConfigOption<Integer> BASE_TIMEOUT_MILLISECOND =
new ConfigOption<>(
"server.role.base_timeout",
"The role state machine candidate state base timeout time",
rangeInt(0, Integer.MAX_VALUE),
100
500
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.locks.LockSupport;

import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

public class StandardRoleElectionStateMachine implements RoleElectionStateMachine {

Expand All @@ -30,6 +32,8 @@ public class StandardRoleElectionStateMachine implements RoleElectionStateMachin
private volatile RoleState state;
private final RoleTypeDataAdapter roleTypeDataAdapter;

private static final Logger LOG = Log.logger(StandardRoleElectionStateMachine.class);

public StandardRoleElectionStateMachine(Config config, RoleTypeDataAdapter adapter) {
this.config = config;
this.roleTypeDataAdapter = adapter;
Expand All @@ -49,7 +53,9 @@ public void apply(StateMachineCallback stateMachineCallback) {
while (!this.shutdown) {
E.checkArgumentNotNull(this.state, "State don't be null");
try {
RoleState pre = this.state;
this.state = state.transform(context);
LOG.trace("server {} epoch {} role state change {} to {}", context.node(), context.epoch(), pre.getClass().getSimpleName(), this.state.getClass().getSimpleName());
Callback runnable = this.state.callback(stateMachineCallback);
runnable.call(context);
failCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,46 +34,55 @@
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.DataType;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.util.Log;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;

public class StandardRoleTypeDataAdapter implements RoleTypeDataAdapter {

private final HugeGraphParams graphParams;
private final Schema schema;

private static final Logger LOG = Log.logger(StandardRoleTypeDataAdapter.class);

private boolean first;

public StandardRoleTypeDataAdapter(HugeGraphParams graph) {
this.graphParams = graph;
this.schema = new Schema(graph);
this.schema.initSchemaIfNeeded();
this.first = true;
}

@Override
public boolean updateIfNodePresent(RoleTypeData stateData) {
// if epoch increase, update and return true
// if epoch equal, ignore different node, return false
try {
Optional<Vertex> oldTypeDataOpt = this.queryVertex();
if (oldTypeDataOpt.isPresent()) {
RoleTypeData oldTypeData = this.from(oldTypeDataOpt.get());
if (stateData.epoch() < oldTypeData.epoch()) {
return false;
}

if (stateData.epoch() == oldTypeData.epoch() &&
!Objects.equals(stateData.node(), oldTypeData.node())) {
return false;
}

this.graphParams.systemTransaction().removeVertex((HugeVertex) oldTypeDataOpt.get());
this.graphParams.systemTransaction().commitOrRollback();
Optional<Vertex> oldTypeDataOpt = this.queryVertex();
if (oldTypeDataOpt.isPresent()) {
RoleTypeData oldTypeData = this.from(oldTypeDataOpt.get());
if (stateData.epoch() < oldTypeData.epoch()) {
return false;
}

if (stateData.epoch() == oldTypeData.epoch() &&
!Objects.equals(stateData.node(), oldTypeData.node())) {
return false;
}
LOG.trace("Server {} epoch {} begin remove data old epoch {}, ", stateData.node(), stateData.epoch(), oldTypeData.epoch());
this.graphParams.systemTransaction().removeVertex((HugeVertex) oldTypeDataOpt.get());
this.graphParams.systemTransaction().commitOrRollback();
LOG.trace("Server {} epoch {} success remove data old epoch {}, ", stateData.node(), stateData.epoch(), oldTypeData.epoch());
}
try {
GraphTransaction tx = this.graphParams.systemTransaction();
tx.doUpdateIfAbsent(this.constructEntry(stateData));
tx.commitOrRollback();
LOG.trace("Server {} epoch {} success update data", stateData.node(), stateData.epoch());
} catch (Throwable ignore){
LOG.trace("Server {} epoch {} fail update data", stateData.node(), stateData.epoch());
return false;
}

Expand Down Expand Up @@ -109,6 +118,16 @@ private BackendEntry constructEntry(RoleTypeData stateData) {
@Override
public Optional<RoleTypeData> query() {
Optional<Vertex> vertex = this.queryVertex();
if (!vertex.isPresent() && !this.first) {
// If query nothing, retry once
try {
Thread.sleep(200);
} catch (InterruptedException ignore) {
}

vertex = this.queryVertex();
}
this.first = false;
return vertex.map(this::from);
}

Expand All @@ -131,7 +150,6 @@ private Optional<Vertex> queryVertex() {
query.showHidden(true);
Iterator<Vertex> vertexIterator = tx.queryVertices(query);
if (vertexIterator.hasNext()) {

return Optional.of(vertexIterator.next());
}

Expand Down Expand Up @@ -171,11 +189,11 @@ public void initSchemaIfNeeded() {
String[] properties = this.initProperties();

VertexLabel label = this.schema().vertexLabel(this.label)
.enableLabelIndex(true)
.usePrimaryKeyId()
.primaryKeys(P.TYPE)
.properties(properties)
.build();
.enableLabelIndex(true)
.usePrimaryKeyId()
.primaryKeys(P.TYPE)
.properties(properties)
.build();
this.graph.schemaTransaction().addVertexLabel(label);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,6 @@ public void onAsRoleWorker(StateMachineContext context) {

@Override
public void onAsRoleCandidate(StateMachineContext context) {
if (isMaster) {
this.taskManager.onAsRoleWorker();
this.initGlobalMasterInfo(context);
LOG.info("Server {} change to worker role", context.config().node());
}

isMaster = false;
}

@Override
Expand Down

0 comments on commit c9f0c7b

Please sign in to comment.