Skip to content

Commit

Permalink
chore: improve code
Browse files Browse the repository at this point in the history
  • Loading branch information
zyxxoo committed Mar 5, 2023
1 parent ebef96d commit c5db1cf
Show file tree
Hide file tree
Showing 13 changed files with 95 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class RedirectFilter implements ContainerRequestFilter {

@Override
public void filter(ContainerRequestContext context) throws IOException {
ServiceHandle<GraphManager> handle = managerProvider.getHandle();
ServiceHandle<GraphManager> handle = this.managerProvider.getHandle();
E.checkState(handle != null, "Context GraphManager is absent");
GraphManager manager = handle.getService();
E.checkState(manager != null, "Context GraphManager is absent");
Expand All @@ -96,11 +96,10 @@ public void filter(ContainerRequestContext context) throws IOException {
URI redirectUri = null;
try {
URIBuilder redirectURIBuilder = new URIBuilder(context.getUriInfo().getRequestUri());
String[] host = url.split(":");
redirectURIBuilder.setHost(host[0]);
if (host.length == 2 && StringUtils.isNotEmpty(host[1].trim())) {
redirectURIBuilder.setPort(Integer.parseInt(host[1].trim()));
}
URI masterURI = URI.create(url);
redirectURIBuilder.setHost(masterURI.getHost());
redirectURIBuilder.setPort(masterURI.getPort());
redirectURIBuilder.setScheme(masterURI.getScheme());

redirectUri = redirectURIBuilder.build();
} catch (URISyntaxException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void setup(HugeConfig config) {

private void addRoleWorkerConfig(HugeConfig graphConfig, HugeConfig config) {
graphConfig.addProperty(CoreOptions.NODE_EXTERNAL_URL.name(),
config.get(CoreOptions.NODE_EXTERNAL_URL));
config.get(ServerOptions.REST_SERVER_URL));
graphConfig.addProperty(CoreOptions.BASE_TIMEOUT_MILLISECOND.name(),
config.get(CoreOptions.BASE_TIMEOUT_MILLISECOND));
graphConfig.addProperty(CoreOptions.EXCEEDS_FAIL_COUNT.name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ private void loadGraph(String name, String graphConfPath) {

private void addRoleWorkerConfig(HugeConfig config) {
config.addProperty(CoreOptions.NODE_EXTERNAL_URL.name(),
this.conf.get(CoreOptions.NODE_EXTERNAL_URL));
this.conf.get(ServerOptions.REST_SERVER_URL));
config.addProperty(CoreOptions.BASE_TIMEOUT_MILLISECOND.name(),
this.conf.get(CoreOptions.BASE_TIMEOUT_MILLISECOND));
config.addProperty(CoreOptions.EXCEEDS_FAIL_COUNT.name(),
Expand Down Expand Up @@ -499,7 +499,7 @@ private boolean supportRoleStateWorker() {
try {
if (!(this.authenticator() instanceof StandardAuthenticator)) {
LOG.info("{} authenticator does not support role election currently",
this.authenticator().getClass().getSimpleName());
this.authenticator().getClass().getSimpleName());
return false;
}
} catch (IllegalStateException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.election.ClusterRoleStore;
import org.apache.hugegraph.election.Config;
import org.apache.hugegraph.election.HugeRoleStateMachineConfig;
import org.apache.hugegraph.election.RoleElectionStateMachine;
import org.apache.hugegraph.election.RoleTypeDataAdapter;
import org.apache.hugegraph.election.StandardRoleElectionStateMachine;
import org.apache.hugegraph.election.StandardRoleTypeDataAdapter;
import org.apache.hugegraph.election.StandardClusterRoleStore;
import org.apache.hugegraph.io.HugeGraphIoRegistry;
import org.apache.hugegraph.rpc.RpcServiceConfig4Client;
import org.apache.hugegraph.rpc.RpcServiceConfig4Server;
Expand Down Expand Up @@ -294,7 +294,7 @@ private void initRoleStateWorker(Id serverId) {
this.configuration.get(CoreOptions.HEARTBEAT_INTERVAL_SECOUND),
this.configuration.get(CoreOptions.EXCEEDS_WORKER_COUNT),
this.configuration.get(CoreOptions.BASE_TIMEOUT_MILLISECOND));
RoleTypeDataAdapter adapter = new StandardRoleTypeDataAdapter(this.params);
ClusterRoleStore adapter = new StandardClusterRoleStore(this.params);
this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleStateMachineConfig, adapter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ public boolean equals(Object obj) {
if (!(obj instanceof ClusterRole)) {
return false;
}
ClusterRole metaData = (ClusterRole) obj;
return clock == metaData.clock &&
epoch == metaData.epoch &&
Objects.equals(node, metaData.node);
ClusterRole clusterRole = (ClusterRole) obj;
return clock == clusterRole.clock &&
epoch == clusterRole.epoch &&
Objects.equals(node, clusterRole.node);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import java.util.Optional;

public interface RoleTypeDataAdapter {
public interface ClusterRoleStore {

boolean updateIfNodePresent(ClusterRole stateData);
boolean updateIfNodePresent(ClusterRole clusterRole);

Optional<ClusterRole> query();
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,74 +40,74 @@
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;

public class StandardRoleTypeDataAdapter implements RoleTypeDataAdapter {
public class StandardClusterRoleStore implements ClusterRoleStore {

private static final Logger LOG = Log.logger(StandardRoleTypeDataAdapter.class);
private static final Logger LOG = Log.logger(StandardClusterRoleStore.class);
public static final int RETRY_QUERY_TIMEOUT = 200;

private final HugeGraphParams graph;
private final Schema schema;

private boolean firstTime;

public StandardRoleTypeDataAdapter(HugeGraphParams graph) {
public StandardClusterRoleStore(HugeGraphParams graph) {
this.graph = graph;
this.schema = new Schema(graph);
this.schema.initSchemaIfNeeded();
this.firstTime = true;
}

@Override
public boolean updateIfNodePresent(ClusterRole stateData) {
public boolean updateIfNodePresent(ClusterRole clusterRole) {
// if epoch increase, update and return true
// if epoch equal, ignore different node, return false
Optional<Vertex> oldTypeDataOpt = this.queryVertex();
if (oldTypeDataOpt.isPresent()) {
ClusterRole oldTypeData = this.from(oldTypeDataOpt.get());
if (stateData.epoch() < oldTypeData.epoch()) {
Optional<Vertex> oldClusterRoleOpt = this.queryVertex();
if (oldClusterRoleOpt.isPresent()) {
ClusterRole oldClusterRole = this.from(oldClusterRoleOpt.get());
if (clusterRole.epoch() < oldClusterRole.epoch()) {
return false;
}

if (stateData.epoch() == oldTypeData.epoch() &&
!Objects.equals(stateData.node(), oldTypeData.node())) {
if (clusterRole.epoch() == oldClusterRole.epoch() &&
!Objects.equals(clusterRole.node(), oldClusterRole.node())) {
return false;
}
LOG.trace("Server {} epoch {} begin remove data old epoch {}, ",
stateData.node(), stateData.epoch(), oldTypeData.epoch());
this.graph.systemTransaction().removeVertex((HugeVertex) oldTypeDataOpt.get());
clusterRole.node(), clusterRole.epoch(), oldClusterRole.epoch());
this.graph.systemTransaction().removeVertex((HugeVertex) oldClusterRoleOpt.get());
this.graph.systemTransaction().commitOrRollback();
LOG.trace("Server {} epoch {} success remove data old epoch {}, ",
stateData.node(), stateData.epoch(), oldTypeData.epoch());
clusterRole.node(), clusterRole.epoch(), oldClusterRole.epoch());
}
try {
GraphTransaction tx = this.graph.systemTransaction();
tx.doUpdateIfAbsent(this.constructEntry(stateData));
tx.doUpdateIfAbsent(this.constructEntry(clusterRole));
tx.commitOrRollback();
LOG.trace("Server {} epoch {} success update data", stateData.node(), stateData.epoch());
LOG.trace("Server {} epoch {} success update data", clusterRole.node(), clusterRole.epoch());
} catch (Throwable ignore){
LOG.trace("Server {} epoch {} fail update data", stateData.node(), stateData.epoch());
LOG.trace("Server {} epoch {} fail update data", clusterRole.node(), clusterRole.epoch());
return false;
}

return true;
}

private BackendEntry constructEntry(ClusterRole stateData) {
private BackendEntry constructEntry(ClusterRole clusterRole) {
List<Object> list = new ArrayList<>(8);
list.add(T.label);
list.add(P.ROLE_DATA);

list.add(P.NODE);
list.add(stateData.node());
list.add(clusterRole.node());

list.add(P.URL);
list.add(stateData.url());
list.add(clusterRole.url());

list.add(P.CLOCK);
list.add(stateData.clock());
list.add(clusterRole.clock());

list.add(P.EPOCH);
list.add(stateData.epoch());
list.add(clusterRole.epoch());

list.add(P.TYPE);
list.add("default");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ public class StandardRoleElectionStateMachine implements RoleElectionStateMachin
private volatile boolean shutdown;
private final Config config;
private volatile RoleState state;
private final RoleTypeDataAdapter roleTypeDataAdapter;
private final ClusterRoleStore clusterRoleStore;

private static final Logger LOG = Log.logger(StandardRoleElectionStateMachine.class);

public StandardRoleElectionStateMachine(Config config, RoleTypeDataAdapter adapter) {
public StandardRoleElectionStateMachine(Config config, ClusterRoleStore adapter) {
this.config = config;
this.roleTypeDataAdapter = adapter;
this.clusterRoleStore = adapter;
this.state = new UnknownState(null);
this.shutdown = false;
}
Expand Down Expand Up @@ -110,30 +110,30 @@ public UnknownState(Integer epoch) {

@Override
public RoleState transform(StateMachineContext context) {
RoleTypeDataAdapter adapter = context.adapter();
Optional<ClusterRole> roleTypeDataOpt = adapter.query();
if (!roleTypeDataOpt.isPresent()) {
ClusterRoleStore adapter = context.adapter();
Optional<ClusterRole> clusterRoleOpt = adapter.query();
if (!clusterRoleOpt.isPresent()) {
context.reset();
Integer nextEpoch = this.epoch == null ? 1 : this.epoch + 1;
context.epoch(nextEpoch);
return new CandidateState(nextEpoch);
}

ClusterRole roleTypeData = roleTypeDataOpt.get();
if (this.epoch != null && roleTypeData.epoch() < this.epoch) {
ClusterRole clusterRole = clusterRoleOpt.get();
if (this.epoch != null && clusterRole.epoch() < this.epoch) {
context.reset();
Integer nextEpoch = this.epoch + 1;
context.epoch(nextEpoch);
return new CandidateState(nextEpoch);
}

context.epoch(roleTypeData.epoch());
context.epoch(clusterRole.epoch());
context.master(new MasterServerInfoImpl(
roleTypeData.node(), roleTypeData.url()));
if (roleTypeData.isMaster(context.node())) {
return new MasterState(roleTypeData);
clusterRole.node(), clusterRole.url()));
if (clusterRole.isMaster(context.node())) {
return new MasterState(clusterRole);
} else {
return new WorkerState(roleTypeData);
return new WorkerState(clusterRole);
}
}

Expand Down Expand Up @@ -166,22 +166,22 @@ public Callback callback(StateMachineCallback callback) {

private static class MasterState implements RoleState {

private final ClusterRole roleTypeData;
private final ClusterRole clusterRole;

public MasterState(ClusterRole roleTypeData) {
this.roleTypeData = roleTypeData;
public MasterState(ClusterRole clusterRole) {
this.clusterRole = clusterRole;
}

@Override
public RoleState transform(StateMachineContext context) {
this.roleTypeData.increaseClock();
this.clusterRole.increaseClock();
RoleState.heartBeatPark(context);
if (context.adapter().updateIfNodePresent(this.roleTypeData)) {
if (context.adapter().updateIfNodePresent(this.clusterRole)) {
return this;
}
context.reset();
context.epoch(this.roleTypeData.epoch());
return new UnknownState(this.roleTypeData.epoch()).transform(context);
context.epoch(this.clusterRole.epoch());
return new UnknownState(this.clusterRole.epoch()).transform(context);
}

@Override
Expand All @@ -192,22 +192,22 @@ public Callback callback(StateMachineCallback callback) {

private static class WorkerState implements RoleState {

private ClusterRole roleTypeData;
private ClusterRole clusterRole;
private int clock;

public WorkerState(ClusterRole roleTypeData) {
this.roleTypeData = roleTypeData;
public WorkerState(ClusterRole clusterRole) {
this.clusterRole = clusterRole;
this.clock = 0;
}

@Override
public RoleState transform(StateMachineContext context) {
RoleState.heartBeatPark(context);
RoleState nextState = new UnknownState(this.roleTypeData.epoch()).transform(context);
RoleState nextState = new UnknownState(this.clusterRole.epoch()).transform(context);
if (nextState instanceof WorkerState) {
this.merge((WorkerState) nextState);
if (this.clock > context.config().exceedsWorkerCount()) {
return new CandidateState(this.roleTypeData.epoch() + 1);
return new CandidateState(this.clusterRole.epoch() + 1);
} else {
return this;
}
Expand All @@ -222,18 +222,18 @@ public Callback callback(StateMachineCallback callback) {
}

public void merge(WorkerState state) {
if (state.roleTypeData.epoch() > this.roleTypeData.epoch()) {
if (state.clusterRole.epoch() > this.clusterRole.epoch()) {
this.clock = 0;
this.roleTypeData = state.roleTypeData;
} else if (state.roleTypeData.epoch() < this.roleTypeData.epoch()) {
this.clusterRole = state.clusterRole;
} else if (state.clusterRole.epoch() < this.clusterRole.epoch()) {
throw new IllegalStateException("Epoch must increase");
} else if (state.roleTypeData.epoch() == this.roleTypeData.epoch() &&
state.roleTypeData.clock() < this.roleTypeData.clock()) {
} else if (state.clusterRole.epoch() == this.clusterRole.epoch() &&
state.clusterRole.clock() < this.clusterRole.clock()) {
throw new IllegalStateException("Clock must increase");
} else if (state.roleTypeData.epoch() == this.roleTypeData.epoch() &&
state.roleTypeData.clock() > this.roleTypeData.clock()) {
} else if (state.clusterRole.epoch() == this.clusterRole.epoch() &&
state.clusterRole.clock() > this.clusterRole.clock()) {
this.clock = 0;
this.roleTypeData = state.roleTypeData;
this.clusterRole = state.clusterRole;
} else {
this.clock++;
}
Expand All @@ -252,14 +252,14 @@ public CandidateState(Integer epoch) {
public RoleState transform(StateMachineContext context) {
RoleState.randomPark(context);
int epoch = this.epoch == null ? 1 : this.epoch;
ClusterRole roleTypeData = new ClusterRole(context.config().node(),
context.config().url(), epoch);
ClusterRole clusterRole = new ClusterRole(context.config().node(),
context.config().url(), epoch);
// The master failover completed
context.epoch(roleTypeData.epoch());
if (context.adapter().updateIfNodePresent(roleTypeData)) {
context.epoch(clusterRole.epoch());
if (context.adapter().updateIfNodePresent(clusterRole)) {
context.master(new MasterServerInfoImpl(
roleTypeData.node(), roleTypeData.url()));
return new MasterState(roleTypeData);
clusterRole.node(), clusterRole.url()));
return new MasterState(clusterRole);
} else {
return new UnknownState(epoch).transform(context);
}
Expand Down Expand Up @@ -305,7 +305,7 @@ public void epoch(Integer epoch) {
}

@Override
public RoleTypeDataAdapter adapter() {
public ClusterRoleStore adapter() {
return this.machine.adapter();
}

Expand Down Expand Up @@ -351,7 +351,7 @@ public String node() {
}
}

protected RoleTypeDataAdapter adapter() {
return this.roleTypeDataAdapter;
protected ClusterRoleStore adapter() {
return this.clusterRoleStore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface StateMachineContext {

void master(MasterServerInfo info);

RoleTypeDataAdapter adapter();
ClusterRoleStore adapter();

void reset();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#

restserver.url=http://127.0.0.1:8080
server.role.node_external_url=127.0.0.1:8080
gremlinserver.url=http://127.0.0.1:8181
graphs=conf/graphs
auth.authenticator=org.apache.hugegraph.auth.StandardAuthenticator
Expand Down
Loading

0 comments on commit c5db1cf

Please sign in to comment.