Skip to content

Commit

Permalink
fix(interactive): handle the case when meta or statistics are not rea…
Browse files Browse the repository at this point in the history
…dy in GOpt (#3986)

Co-authored-by: shirly121 <yihe.zxl@alibaba-inc.com>
  • Loading branch information
BingqingLyu and shirly121 authored Jul 2, 2024
1 parent 238db28 commit c2c9651
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class DynamicIrMetaFetcher extends IrMetaFetcher implements AutoCloseable
private static final Logger logger = LoggerFactory.getLogger(DynamicIrMetaFetcher.class);
private final ScheduledExecutorService scheduler;
private volatile IrMetaStats currentState;
// To manage the state changes of statistics resulting from different update operations.
private volatile StatsState statsState;

public DynamicIrMetaFetcher(Configs configs, IrMetaReader dataReader, IrMetaTracker tracker) {
super(dataReader, tracker);
Expand All @@ -66,20 +68,23 @@ public Optional<IrMeta> fetch() {
private synchronized void syncMeta() {
try {
IrMeta meta = this.reader.readMeta();
GraphStatistics curStats;
// if the graph id is changed, we need to update the statistics
GraphStatistics curStats =
(this.currentState == null
|| !this.currentState.getGraphId().equals(meta.getGraphId()))
? null
: this.currentState.getStatistics();
if (this.currentState == null
|| !this.currentState.getGraphId().equals(meta.getGraphId())) {
this.statsState = StatsState.INITIALIZED;
curStats = null;
} else {
curStats = this.currentState.getStatistics();
}
this.currentState =
new IrMetaStats(
meta.getGraphId(),
meta.getSnapshotId(),
meta.getSchema(),
meta.getStoredProcedures(),
curStats);
if (this.currentState.getStatistics() == null) {
if (this.statsState != StatsState.SYNCED) {
syncStats();
}
} catch (Exception e) {
Expand All @@ -101,15 +106,30 @@ private synchronized void syncStats() {
if (tracker != null) {
tracker.onChanged(this.currentState);
}
this.statsState = StatsState.SYNCED;
}
}
} catch (Exception e) {
logger.warn("failed to read graph statistics, error is {}", e);
} finally {
if (this.currentState != null
&& tracker != null
&& this.statsState == StatsState.INITIALIZED) {
tracker.onChanged(this.currentState);
this.statsState = StatsState.MOCKED;
}
}
}

@Override
public void close() throws Exception {
this.scheduler.shutdown();
}

public enum StatsState {
INITIALIZED, // first initialized or graph id changed
MOCKED, // the switch can only occur from the INITIALIZED state. If remote statistics are
// unavailable, a mocked statistics object is created once.
SYNCED // remote stats is synced
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,25 @@ public GlogueSchema(
this.edgeTypeCardinality = edgeTypeCardinality;
}

public GlogueSchema(GraphSchema graphSchema) {
schemaGraph = new DirectedPseudograph<Integer, EdgeTypeId>(EdgeTypeId.class);
vertexTypeCardinality = new HashMap<Integer, Double>();
edgeTypeCardinality = new HashMap<EdgeTypeId, Double>();
for (GraphVertex vertex : graphSchema.getVertexList()) {
schemaGraph.addVertex(vertex.getLabelId());
vertexTypeCardinality.put(vertex.getLabelId(), 1.0);
}
for (GraphEdge edge : graphSchema.getEdgeList()) {
for (EdgeRelation relation : edge.getRelationList()) {
int sourceType = relation.getSource().getLabelId();
int targetType = relation.getTarget().getLabelId();
EdgeTypeId edgeType = new EdgeTypeId(sourceType, targetType, edge.getLabelId());
schemaGraph.addEdge(sourceType, targetType, edgeType);
edgeTypeCardinality.put(edgeType, 1.0);
}
}
}

public GlogueSchema(GraphSchema graphSchema, GraphStatistics statistics) {
schemaGraph = new DirectedPseudograph<Integer, EdgeTypeId>(EdgeTypeId.class);
vertexTypeCardinality = new HashMap<Integer, Double>();
Expand All @@ -62,6 +81,8 @@ public GlogueSchema(GraphSchema graphSchema, GraphStatistics statistics) {
if (vertexTypeCount == null) {
throw new IllegalArgumentException(
"Vertex type count not found for vertex type: " + vertex.getLabelId());
} else if (vertexTypeCount == 0) {
vertexTypeCardinality.put(vertex.getLabelId(), 1.0);
} else {
vertexTypeCardinality.put(vertex.getLabelId(), vertexTypeCount.doubleValue());
}
Expand All @@ -80,6 +101,8 @@ public GlogueSchema(GraphSchema graphSchema, GraphStatistics statistics) {
if (edgeTypeCount == null) {
throw new IllegalArgumentException(
"Edge type count not found for edge type: " + edge.getLabelId());
} else if (edgeTypeCount == 0) {
edgeTypeCardinality.put(edgeType, 1.0);
} else {
edgeTypeCardinality.put(edgeType, edgeTypeCount.doubleValue());
}
Expand All @@ -88,7 +111,13 @@ public GlogueSchema(GraphSchema graphSchema, GraphStatistics statistics) {
}

public static GlogueSchema fromMeta(IrMetaStats irMeta) {
return new GlogueSchema(irMeta.getSchema(), irMeta.getStatistics());
if (irMeta.getStatistics() == null) {
// build a default GlogueSchema by assuming all vertex and edge types have the same
// cardinality 1.0
return new GlogueSchema(irMeta.getSchema());
} else {
return new GlogueSchema(irMeta.getSchema(), irMeta.getStatistics());
}
}

public List<Integer> getVertexTypes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,7 @@ public IrStandardOpProcessor(
// hack implementation here: rpc client is the old way to submit job (gremlin -> traversal
// -> ir_core -> pegasus), we should remove it after replacing it with gremlin-calcite
// stack.
if (FrontendConfig.GREMLIN_SCRIPT_LANGUAGE_NAME
.get(this.configs)
.equals(AntlrGremlinScriptEngineFactory.LANGUAGE_NAME)
&& FrontendConfig.ENGINE_TYPE.get(this.configs).equals("pegasus")) {
if (FrontendConfig.ENGINE_TYPE.get(this.configs).equals("pegasus")) {
this.rpcClient = new RpcClient(fetcher.fetch());
} else {
this.rpcClient = null;
Expand Down Expand Up @@ -163,10 +160,18 @@ protected void evalOpInternal(
}
BigInteger jobId = idGenerator.generateId();
String jobName = idGenerator.generateName(jobId);
String language = FrontendConfig.GREMLIN_SCRIPT_LANGUAGE_NAME.get(configs);
IrMeta irMeta = metaQueryCallback.beforeExec();
// If the current graph schema is empty (as service startup can occur before data loading in
// Groot), we temporarily switch to the original IR core.
// In the future, once schema-free support is implemented, we will replace this temporary
// solution.
if (irMeta.getSchema().getVertexList().isEmpty()
&& irMeta.getSchema().getEdgeList().isEmpty()) {
language = AntlrGremlinScriptEngineFactory.LANGUAGE_NAME;
}
QueryStatusCallback statusCallback = createQueryStatusCallback(script, jobId);
QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout());
String language = FrontendConfig.GREMLIN_SCRIPT_LANGUAGE_NAME.get(configs);
GremlinExecutor.LifeCycle lifeCycle;
switch (language) {
case AntlrGremlinScriptEngineFactory.LANGUAGE_NAME:
Expand Down

0 comments on commit c2c9651

Please sign in to comment.