Skip to content

Commit

Permalink
fix(interactive): Support for option of using hash(srcId, dstId, edge…
Browse files Browse the repository at this point in the history
…LabelId, edgePks) instead of nanotime as eid (#3357)

Support edge updating scenarios where eid is not known in advance.
User can leverage `enable.hash.generate.eid` to  switch from previous eid generation behavior.
  • Loading branch information
bufapiqi authored Nov 16, 2023
1 parent c7f6b3d commit 41bc5ae
Show file tree
Hide file tree
Showing 17 changed files with 237 additions and 31 deletions.
1 change: 1 addition & 0 deletions charts/graphscope-store-one-pod/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ data:
## Frontend Config
frontend.server.num=1
enable.hash.generate.eid={{ .Values.enableHashGenerateEid }}
## Ingestor Config
ingestor.queue.buffer.size={{ .Values.ingestorQueueBufferSize }}
Expand Down
3 changes: 3 additions & 0 deletions charts/graphscope-store-one-pod/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ snapshotIncreaseIntervalMs: 1000
offsetsPersistIntervalMs: 3000
fileMetaStorePath: "/var/lib/graphscope-store/meta"

## Frontend config
enableHashGenerateEid: false

## Store Config
storeDataPath: "/var/lib/graphscope-store"
storeWriteThreadCount: 1
Expand Down
1 change: 1 addition & 0 deletions charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ data:
frontend.service.port=55556
frontend.server.id=INDEX
frontend.server.num={{ .Values.frontend.replicaCount }}
enable.hash.generate.eid={{ .Values.enableHashGenerateEid }}
## Ingestor Config
ingestor.queue.buffer.size={{ .Values.ingestorQueueBufferSize }}
Expand Down
1 change: 1 addition & 0 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ kafkaTopic: "graphscope"
kafkaProducerCustomConfigs: ""

## Frontend Config
enableHashGenerateEid: false
# gremlinServerPort: 12312

executorWorkerPerProcess: 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ public class FrontendConfig {
Config.intConfig(
"frontend.service.thread.count",
Math.max(Math.min(Runtime.getRuntime().availableProcessors() / 2, 64), 4));

public static final Config<Boolean> ENABLE_HASH_GENERATE_EID =
Config.boolConfig("enable.hash.generate.eid", false);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.graphscope.groot.common.schema.wrapper.TypeDef;
import com.google.common.base.MoreObjects;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

Expand All @@ -29,6 +30,7 @@ public class DefaultGraphEdge implements GraphEdge {
private String label;
private List<GraphProperty> propertyList;
private List<EdgeRelation> relationList;
private List<String> primaryKeyList;

private final int versionId;

Expand All @@ -45,11 +47,35 @@ public DefaultGraphEdge(
String label,
List<GraphProperty> propertyList,
List<EdgeRelation> relationList,
List<String> primaryKeyList) {
this(id, label, propertyList, relationList, primaryKeyList, 0);
}

public DefaultGraphEdge(
int id,
String label,
List<GraphProperty> propertyList,
List<EdgeRelation> relationList,
int versionId) {
this.id = id;
this.label = label;
this.propertyList = propertyList;
this.relationList = relationList;
this.versionId = versionId;
}

public DefaultGraphEdge(
int id,
String label,
List<GraphProperty> propertyList,
List<EdgeRelation> relationList,
List<String> primaryKeyList,
int versionId) {
this.id = id;
this.label = label;
this.propertyList = propertyList;
this.relationList = relationList;
this.primaryKeyList = primaryKeyList;
this.versionId = versionId;
}

Expand All @@ -59,6 +85,7 @@ public DefaultGraphEdge(TypeDef typeDef, List<EdgeRelation> edgeRelations) {
typeDef.getLabel(),
typeDef.getPropertyList(),
edgeRelations,
typeDef.getPrimaryKeyNameList(),
typeDef.getVersionId());
}

Expand Down Expand Up @@ -111,12 +138,18 @@ public int getVersionId() {

@Override
public List<GraphProperty> getPrimaryKeyList() {
return null;
List<GraphProperty> props = new ArrayList<>();
if (this.primaryKeyList != null) {
for (String name : primaryKeyList) {
props.add(getProperty(name));
}
}
return props;
}

@Override
public List<String> getPrimaryKeyNameList() {
return null;
return primaryKeyList;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,22 +187,15 @@ public static GraphSchema buildSchemaFromJson(String schemaJson) {
}

if (type.equalsIgnoreCase("VERTEX")) {
List<String> primaryKeyList = Lists.newArrayList();

JsonNode indexArray = typeObject.get("indexes");
if (indexArray != null) {
for (JsonNode indexObject : indexArray) {
JsonNode priNameList = indexObject.get("propertyNames");
for (JsonNode pri : priNameList) {
primaryKeyList.add(pri.asText());
}
}
}
List<String> primaryKeyList = getPrimaryKeyList(typeObject.get("indexes"));
DefaultGraphVertex graphVertex =
new DefaultGraphVertex(
labelId, label, propertyList, primaryKeyList);
vertexList.put(label, graphVertex);
} else {
// get edge pk name list
List<String> primaryKeyList = getPrimaryKeyList(typeObject.get("indexes"));

List<EdgeRelation> relationList = Lists.newArrayList();
JsonNode relationArray = typeObject.get("rawRelationShips");
if (null != relationArray) {
Expand All @@ -218,7 +211,8 @@ public static GraphSchema buildSchemaFromJson(String schemaJson) {
logger.warn("There's no relation def in edge " + label);
}
DefaultGraphEdge graphEdge =
new DefaultGraphEdge(labelId, label, propertyList, relationList);
new DefaultGraphEdge(
labelId, label, propertyList, relationList, primaryKeyList);
edgeList.put(label, graphEdge);
}
}
Expand All @@ -233,6 +227,19 @@ public static GraphSchema buildSchemaFromJson(String schemaJson) {
}
}

private static List<String> getPrimaryKeyList(JsonNode indexArray) {
List<String> primaryKeyList = Lists.newArrayList();
if (indexArray != null) {
for (JsonNode indexObject : indexArray) {
JsonNode priNameList = indexObject.get("propertyNames");
for (JsonNode pri : priNameList) {
primaryKeyList.add(pri.asText());
}
}
}
return primaryKeyList;
}

public static void main(String[] args) throws JsonProcessingException {
String schemaJson =
"{\"partitionNum\": 2, \"types\": [{\"id\": 0, \"indexes\": [{\"propertyNames\":"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,21 @@
public class EdgeTypeMapper extends SchemaElementMapper {
private List<EdgeRelationMapper> relationShips;

private List<ElementIndexMapper> indexes;

public static SchemaElementMapper parseFromEdgeType(GraphEdge graphEdge) {
EdgeTypeMapper edgeTypeMapper = new EdgeTypeMapper();
edgeTypeMapper.setId(graphEdge.getLabelId());
edgeTypeMapper.setLabel(graphEdge.getLabel());
edgeTypeMapper.setType(TypeEnum.EDGE.toString());

ElementIndexMapper elementIndexMapper = new ElementIndexMapper();
elementIndexMapper.setName("primary_key");
elementIndexMapper.setIndexType("PRIMARY_KEY");
elementIndexMapper.setPropertyNames(graphEdge.getPrimaryKeyNameList());
ArrayList<ElementIndexMapper> elementIndexMapperList = new ArrayList<>();
elementIndexMapperList.add(elementIndexMapper);
edgeTypeMapper.setIndexes(elementIndexMapperList);
List<EdgeRelationMapper> relationMapperList = new ArrayList<>();
for (EdgeRelation edgeRelation : graphEdge.getRelationList()) {
relationMapperList.add(
Expand All @@ -56,6 +65,14 @@ public List<EdgeRelationMapper> getRelationShips() {
return relationShips;
}

public List<ElementIndexMapper> getIndexes() {
return indexes;
}

public void setIndexes(List<ElementIndexMapper> indexes) {
this.indexes = indexes;
}

public void setRelationShips(List<EdgeRelationMapper> relationShips) {
this.relationShips = relationShips;
}
Expand All @@ -73,11 +90,21 @@ public GraphEdge toEdgeType(Map<String, GraphVertex> vertexTypeMap) {
relationList.add(relationMapper.toEdgeRelation(vertexTypeMap));
}
}
List<String> primaryKeyList = new ArrayList<>();
if (indexes != null && indexes.size() > 0) {
if (indexes.size() > 1) {
throw new IllegalArgumentException(
"Only support primary key now for " + this.indexes);
}
primaryKeyList = indexes.get(0).getPropertyNames();
}

return new DefaultGraphEdge(
this.getId(),
this.getLabel(),
graphPropertyList,
relationList,
primaryKeyList,
this.getVersionId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.util.List;
import java.util.stream.Collectors;

public class VertexIndexMapper {
public class ElementIndexMapper {
private String name;
private String indexType;
private List<String> propertyNames;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

@JsonIgnoreProperties(ignoreUnknown = true)
public class VertexTypeMapper extends SchemaElementMapper {
private List<VertexIndexMapper> indexes;
private List<ElementIndexMapper> indexes;
private long tableId;

public static VertexTypeMapper parseFromVertexType(GraphVertex graphVertex) {
Expand All @@ -34,13 +34,13 @@ public static VertexTypeMapper parseFromVertexType(GraphVertex graphVertex) {
vertexTypeMapper.setLabel(graphVertex.getLabel());
vertexTypeMapper.setType(TypeEnum.VERTEX.toString());

VertexIndexMapper vertexIndexMapper = new VertexIndexMapper();
vertexIndexMapper.setName("primary_key");
vertexIndexMapper.setIndexType("PRIMARY_KEY");
vertexIndexMapper.setPropertyNames(graphVertex.getPrimaryKeyNameList());
ArrayList<VertexIndexMapper> vertexIndexMapperList = new ArrayList<>();
vertexIndexMapperList.add(vertexIndexMapper);
vertexTypeMapper.setIndexes(vertexIndexMapperList);
ElementIndexMapper elementIndexMapper = new ElementIndexMapper();
elementIndexMapper.setName("primary_key");
elementIndexMapper.setIndexType("PRIMARY_KEY");
elementIndexMapper.setPropertyNames(graphVertex.getPrimaryKeyNameList());
ArrayList<ElementIndexMapper> elementIndexMapperList = new ArrayList<>();
elementIndexMapperList.add(elementIndexMapper);
vertexTypeMapper.setIndexes(elementIndexMapperList);
List<GraphPropertyMapper> propertyMapperList = new ArrayList<>();
for (GraphProperty graphProperty : graphVertex.getPropertyList()) {
propertyMapperList.add(GraphPropertyMapper.parseFromGraphProperty(graphProperty));
Expand All @@ -51,11 +51,11 @@ public static VertexTypeMapper parseFromVertexType(GraphVertex graphVertex) {
return vertexTypeMapper;
}

public List<VertexIndexMapper> getIndexes() {
public List<ElementIndexMapper> getIndexes() {
return indexes;
}

public void setIndexes(List<VertexIndexMapper> indexes) {
public void setIndexes(List<ElementIndexMapper> indexes) {
this.indexes = indexes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,31 @@ public static long hash(int labelId, List<byte[]> pks) {
return hash64(buffer.array(), buffer.limit());
}

public static long hash(long srcId, long dstId, int labelId, List<byte[]> pks) {
ByteBuffer buffer = THREAD_BUFFER.get();
clear(buffer);
buffer.putLong(srcId);
buffer.putLong(dstId);
buffer.putInt(labelId);
for (byte[] pk : pks) {
buffer.putInt(pk.length);
buffer.put(pk);
}
flip(buffer);
return hash64(buffer.array(), buffer.limit());
}

public static long hash(long srcId, long dstId, int labelId, long nanoTime) {
ByteBuffer buffer = THREAD_BUFFER.get();
clear(buffer);
buffer.putLong(srcId);
buffer.putLong(dstId);
buffer.putInt(labelId);
buffer.putLong(nanoTime);
flip(buffer);
return hash64(buffer.array(), buffer.limit());
}

/**
* Generates 64-bit hash from byte array of the given length and seed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ public BytesRef encodeEdgeKey(
labelPkIds.computeIfAbsent(type, k -> SchemaUtils.getEdgePrimaryKeyList(type));
long eid;
if (edgePkIds != null && edgePkIds.size() > 0) {
eid = getHashId(type.getLabelId(), propertiesMap, edgePkIds);
List<byte[]> pkBytes = getPkBytes(type.getLabelId(), propertiesMap, edgePkIds);
eid = PkHashUtils.hash(srcId, dstId, type.getLabelId(), pkBytes);
} else {
eid = System.nanoTime();
eid = PkHashUtils.hash(srcId, dstId, type.getLabelId(), System.nanoTime());
}

if (outEdge) {
Expand All @@ -102,7 +103,7 @@ public BytesRef encodeProperties(int labelId, Map<Integer, PropertyValue> proper
return new BytesRef(scratch.array(), 0, scratch.limit());
}

private static long getHashId(
private static List<byte[]> getPkBytes(
int labelId, Map<Integer, PropertyValue> operationProperties, List<Integer> pkIds) {
List<byte[]> pks = new ArrayList<>(pkIds.size());
for (int pkId : pkIds) {
Expand All @@ -114,6 +115,12 @@ private static long getHashId(
byte[] valBytes = propertyValue.getValBytes();
pks.add(valBytes);
}
return pks;
}

private static long getHashId(
int labelId, Map<Integer, PropertyValue> operationProperties, List<Integer> pkIds) {
List<byte[]> pks = getPkBytes(labelId, operationProperties, pkIds);
return PkHashUtils.hash(labelId, pks);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.alibaba.graphscope.groot.common.schema.api.GraphSchema;
import com.alibaba.graphscope.groot.common.schema.mapper.GraphSchemaMapper;
import com.alibaba.graphscope.groot.common.schema.wrapper.GraphDef;
import com.alibaba.graphscope.groot.common.util.JSON;
import com.alibaba.graphscope.groot.common.util.UuidUtils;
import com.alibaba.graphscope.groot.dataload.util.OSSFS;
import com.alibaba.graphscope.groot.dataload.util.VolumeFS;
Expand Down Expand Up @@ -72,6 +73,7 @@ public static void main(String[] args) throws IOException {
GraphDefPb graphDefPb = client.prepareDataLoad(targets);
System.out.println("GraphDef: " + graphDefPb);
GraphSchema schema = GraphDef.parseProto(graphDefPb);
System.out.println("GraphSchema: " + JSON.toJson(schema));

// number of reduce task
int partitionNum = client.getPartitionNum();
Expand Down Expand Up @@ -124,6 +126,7 @@ public static void main(String[] args) throws IOException {
}

String schemaJson = GraphSchemaMapper.parseFromSchema(schema).toJsonString();
System.out.println("schemaJson is :" + schemaJson);
Map<String, ColumnMappingInfo> info = Utils.getMappingInfo(odps, schema, mappingConfig);
ObjectMapper mapper = new ObjectMapper();

Expand Down
Loading

0 comments on commit 41bc5ae

Please sign in to comment.