Skip to content

Commit

Permalink
fix task result with path/tree can't be serialized (#1351)
Browse files Browse the repository at this point in the history
Change-Id: I5f740632f49b1305785a118a27236049cc71af41
  • Loading branch information
javeme authored Feb 4, 2021
1 parent cd54d7e commit 7e480e6
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@
import java.util.Date;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.tinkerpop.gremlin.process.traversal.Path;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.Tree;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONIo;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens;
import org.apache.tinkerpop.gremlin.structure.io.graphson.TinkerPopJacksonModule;
import org.apache.tinkerpop.shaded.jackson.core.JsonGenerator;
import org.apache.tinkerpop.shaded.jackson.core.JsonParser;
Expand Down Expand Up @@ -70,10 +75,10 @@ public class HugeGraphSONModule extends TinkerPopJacksonModule {

private static final long serialVersionUID = 6480426922914059122L;

public static boolean OPTIMIZE_SERIALIZE = true;

private static final String TYPE_NAMESPACE = "hugegraph";

private static boolean OPTIMIZE_SERIALIZE = true;

@SuppressWarnings("rawtypes")
private static final Map<Class, String> TYPE_DEFINITIONS;

Expand Down Expand Up @@ -198,6 +203,9 @@ public static void registerGraphSerializers(SimpleModule module) {
*/
module.addSerializer(HugeVertex.class, new HugeVertexSerializer());
module.addSerializer(HugeEdge.class, new HugeEdgeSerializer());

module.addSerializer(Path.class, new PathSerializer());
module.addSerializer(Tree.class, new TreeSerializer());
}

@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -485,6 +493,49 @@ public void serializeWithType(HugeEdge value, JsonGenerator generator,
}
}

private static class PathSerializer extends StdSerializer<Path> {

public PathSerializer() {
super(Path.class);
}

@Override
public void serialize(Path path, JsonGenerator jsonGenerator,
SerializerProvider provider) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeObjectField(GraphSONTokens.LABELS,
path.labels());
jsonGenerator.writeObjectField(GraphSONTokens.OBJECTS,
path.objects());
jsonGenerator.writeEndObject();
}
}

@SuppressWarnings("rawtypes") // Tree<T>
private static class TreeSerializer extends StdSerializer<Tree> {

public TreeSerializer() {
super(Tree.class);
}

@Override
public void serialize(Tree tree, JsonGenerator jsonGenerator,
SerializerProvider provider) throws IOException {
jsonGenerator.writeStartArray();
@SuppressWarnings("unchecked")
Set<Map.Entry<Element, Tree>> set = tree.entrySet();
for (Map.Entry<Element, Tree> entry : set) {
jsonGenerator.writeStartObject();
jsonGenerator.writeObjectField(GraphSONTokens.KEY,
entry.getKey());
jsonGenerator.writeObjectField(GraphSONTokens.VALUE,
entry.getValue());
jsonGenerator.writeEndObject();
}
jsonGenerator.writeEndArray();
}
}

private static class ShardSerializer extends StdSerializer<Shard> {

public ShardSerializer() {
Expand All @@ -493,8 +544,7 @@ public ShardSerializer() {

@Override
public void serialize(Shard shard, JsonGenerator jsonGenerator,
SerializerProvider provider)
throws IOException {
SerializerProvider provider) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField("start", shard.start());
jsonGenerator.writeStringField("end", shard.end());
Expand All @@ -511,8 +561,7 @@ public FileSerializer() {

@Override
public void serialize(File file, JsonGenerator jsonGenerator,
SerializerProvider provider)
throws IOException {
SerializerProvider provider) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField("file", file.getName());
jsonGenerator.writeEndObject();
Expand All @@ -527,8 +576,7 @@ public BlobSerializer() {

@Override
public void serialize(Blob blob, JsonGenerator jsonGenerator,
SerializerProvider provider)
throws IOException {
SerializerProvider provider) throws IOException {
jsonGenerator.writeBinary(blob.bytes());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ public void testGremlinJobWithScript() throws TimeoutException {
+ "schema.propertyKey('lang').asText().ifNotExist().create();"
+ "schema.propertyKey('date').asDate().ifNotExist().create();"
+ "schema.propertyKey('price').asInt().ifNotExist().create();"
+ "person1=schema.vertexLabel('person1').properties('name','age').ifNotExist().create();"
+ "person2=schema.vertexLabel('person2').properties('name','age').ifNotExist().create();"
+ "knows=schema.edgeLabel('knows').sourceLabel('person1').targetLabel('person2').properties('date').ifNotExist().create();"
+ "schema.vertexLabel('person1').properties('name','age').ifNotExist().create();"
+ "schema.vertexLabel('person2').properties('name','age').ifNotExist().create();"
+ "schema.edgeLabel('knows').sourceLabel('person1').targetLabel('person2').properties('date').ifNotExist().create();"
+ "for(int i = 0; i < 1000; i++) {"
+ " p1=graph.addVertex(T.label,'person1','name','p1-'+i,'age',29);"
+ " p2=graph.addVertex(T.label,'person2','name','p2-'+i,'age',27);"
Expand Down Expand Up @@ -302,6 +302,89 @@ public void testGremlinJobWithScript() throws TimeoutException {
Assert.assertEquals("[1000]", task.result());
}

@Test
public void testGremlinJobWithSerializedResults() throws TimeoutException {
HugeGraph graph = graph();
TaskScheduler scheduler = graph.taskScheduler();

String script = "schema=graph.schema();"
+ "schema.propertyKey('name').asText().ifNotExist().create();"
+ "schema.vertexLabel('char').useCustomizeNumberId().properties('name').ifNotExist().create();"
+ "schema.edgeLabel('next').sourceLabel('char').targetLabel('char').properties('name').ifNotExist().create();"
+ "g.addV('char').property(id,1).property('name','A').as('a')"
+ " .addV('char').property(id,2).property('name','B').as('b')"
+ " .addV('char').property(id,3).property('name','C').as('c')"
+ " .addV('char').property(id,4).property('name','D').as('d')"
+ " .addV('char').property(id,5).property('name','E').as('e')"
+ " .addV('char').property(id,6).property('name','F').as('f')"
+ " .addE('next').from('a').to('b').property('name','ab')"
+ " .addE('next').from('b').to('c').property('name','bc')"
+ " .addE('next').from('b').to('d').property('name','bd')"
+ " .addE('next').from('c').to('d').property('name','cd')"
+ " .addE('next').from('c').to('e').property('name','ce')"
+ " .addE('next').from('d').to('e').property('name','de')"
+ " .addE('next').from('e').to('f').property('name','ef')"
+ " .addE('next').from('f').to('d').property('name','fd')"
+ " .iterate();"
+ "g.tx().commit(); g.E().count();";

HugeTask<Object> task = runGremlinJob(script);
task = scheduler.waitUntilTaskCompleted(task.id(), 10);
Assert.assertEquals("test-gremlin-job", task.name());
Assert.assertEquals("gremlin", task.type());
Assert.assertEquals(TaskStatus.SUCCESS, task.status());
Assert.assertEquals("[8]", task.result());

Id edgeLabelId = graph.schema().getEdgeLabel("next").id();

script = "g.V(1).outE().inV().path()";
task = runGremlinJob(script);
task = scheduler.waitUntilTaskCompleted(task.id(), 10);
Assert.assertEquals(TaskStatus.SUCCESS, task.status());
String expected = String.format("[{\"labels\":[[],[],[]],\"objects\":["
+ "{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+ "{\"id\":\"L1>%s>>L2\",\"label\":\"next\",\"type\":\"edge\",\"outV\":1,\"outVLabel\":\"char\",\"inV\":2,\"inVLabel\":\"char\",\"properties\":{\"name\":\"ab\"}},"
+ "{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}}"
+ "]}]", edgeLabelId);
Assert.assertEquals(expected, task.result());

script = "g.V(1).out().out().path()";
task = runGremlinJob(script);
task = scheduler.waitUntilTaskCompleted(task.id(), 10);
Assert.assertEquals(TaskStatus.SUCCESS, task.status());
expected = "[{\"labels\":[[],[],[]],\"objects\":["
+ "{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+ "{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}},"
+ "{\"id\":3,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"C\"}}]},"
+ "{\"labels\":[[],[],[]],\"objects\":["
+ "{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+ "{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}},"
+ "{\"id\":4,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"D\"}}]}]";
Assert.assertEquals(expected, task.result());

script = "g.V(1).outE().inV().tree()";
task = runGremlinJob(script);
task = scheduler.waitUntilTaskCompleted(task.id(), 10);
Assert.assertEquals(TaskStatus.SUCCESS, task.status());
expected = String.format("[[{\"key\":{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+ "\"value\":["
+ "{\"key\":{\"id\":\"L1>%s>>L2\",\"label\":\"next\",\"type\":\"edge\",\"outV\":1,\"outVLabel\":\"char\",\"inV\":2,\"inVLabel\":\"char\",\"properties\":{\"name\":\"ab\"}},"
+ "\"value\":[{\"key\":{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}},\"value\":[]}]}]}]]",
edgeLabelId);
Assert.assertEquals(expected, task.result());

script = "g.V(1).out().out().tree()";
task = runGremlinJob(script);
task = scheduler.waitUntilTaskCompleted(task.id(), 10);
Assert.assertEquals(TaskStatus.SUCCESS, task.status());
expected = "[[{\"key\":{\"id\":1,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"A\"}},"
+ "\"value\":[{\"key\":{\"id\":2,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"B\"}},"
+ "\"value\":["
+ "{\"key\":{\"id\":3,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"C\"}},\"value\":[]},"
+ "{\"key\":{\"id\":4,\"label\":\"char\",\"type\":\"vertex\",\"properties\":{\"name\":\"D\"}},\"value\":[]}]}]}]]";
Assert.assertEquals(expected, task.result());
}

@Test
public void testGremlinJobWithFailure() throws TimeoutException {
HugeGraph graph = graph();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.baidu.hugegraph.schema.PropertyKey;
import com.baidu.hugegraph.schema.SchemaManager;
import com.baidu.hugegraph.task.TaskScheduler;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.type.define.IdStrategy;
import com.baidu.hugegraph.type.define.NodeRole;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -251,7 +252,8 @@ public GraphComputer compute() throws IllegalArgumentException {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public <I extends Io> I io(final Io.Builder<I> builder) {
HugeGraphSONModule.OPTIMIZE_SERIALIZE = false;
Whitebox.setInternalState(HugeGraphSONModule.class,
"OPTIMIZE_SERIALIZE", false);
return (I) builder.graph(this).onMapper(mapper ->
mapper.addRegistry(HugeGraphIoRegistry.instance())
).create();
Expand Down

0 comments on commit 7e480e6

Please sign in to comment.