feat: Introduce HugeGraphFlinkCDCLoader #291

Merged: 11 commits, merged Jun 21, 2022
Changes from 2 commits
Constants.java
@@ -77,8 +77,6 @@ public final class Constants {
public static final String[] SEARCH_LIST = new String[]{":", "!"};
public static final String[] TARGET_LIST = new String[]{"`:", "`!"};

public static final String HOST_PORT_REGEX = ".+://(.+):(\\d+)";
public static final String CDC_DATA = "data";
public static final String CDC_OP = "op";

}
HugeGraphDeserialization.java
@@ -62,7 +62,8 @@ public void deserialize(SourceRecord sourceRecord,
data = value.getStruct("after");
break;
default:
- throw new RuntimeException("The type of `op` should be 'c' 'r' 'u' 'd' only");
+ throw new IllegalArgumentException(
+ "The type of `op` should be 'c' 'r' 'u' 'd' only");
}
ObjectNode rootNode = mapper.createObjectNode();
if (data != null) {
@@ -78,7 +79,6 @@ public void deserialize(SourceRecord sourceRecord,
result.put(Constants.CDC_OP, op);
LOG.debug("Loaded data: {}", result.toString());
collector.collect(result.toString());
-
}

@Override
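Note: deserialize() flattens each Debezium change event into a single JSON string keyed by Constants.CDC_DATA and Constants.CDC_OP, which HugeGraphOutputFormat later unpacks. A sketch of the collected output for a CREATE ("c") event, with made-up field values:

    {"data": {"id": "1", "name": "marko"}, "op": "c"}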
HugeGraphFlinkCDCLoader.java
@@ -28,6 +28,7 @@
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;

+ import com.baidu.hugegraph.loader.exception.LoadException;
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.loader.mapping.InputStruct;
import com.baidu.hugegraph.loader.mapping.LoadMapping;
@@ -65,30 +66,8 @@ public void load() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

for (InputStruct struct : structs) {
- JDBCSource input = (JDBCSource) struct.input();
- String url = input.url();
- String host;
- int port;
- try {
- URIBuilder uriBuilder = new URIBuilder(url.substring(JDBC_PREFIX.length()));
- host = uriBuilder.getHost();
- port = uriBuilder.getPort();
- } catch (URISyntaxException e) {
- throw new RuntimeException(
- String.format("Failed to parse Url(%s) to get hostName and port",
- url), e);
- }
-
- MySqlSource<String> mysqlSource = MySqlSource.<String>builder()
- .hostname(host)
- .port(port)
- .databaseList(input.database())
- .tableList(input.database() + "." + input.table())
- .username(input.username())
- .password(input.password())
- .deserializer(new HugeGraphDeserialization())
- .build();
-
+ MySqlSource<String> mysqlSource = buildMysqlSource(struct);
DataStreamSource<String> source =
env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");

@@ -99,7 +78,31 @@ public void load() {
try {
env.execute("flink-cdc-hugegraph");
} catch (Exception e) {
- Printer.printError("Failed to execute flink.", e);
+ throw new LoadException("Failed to execute flink", e);
}
}

+ private MySqlSource<String> buildMysqlSource(InputStruct struct) {
+ JDBCSource input = (JDBCSource) struct.input();
+ String url = input.url();
+ String host;
+ int port;
+ try {
+ URIBuilder uriBuilder = new URIBuilder(url.substring(JDBC_PREFIX.length()));
+ host = uriBuilder.getHost();
+ port = uriBuilder.getPort();
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(
+ String.format("Failed to parse Url(%s) to get hostName and port", url), e);
+ }
+ return MySqlSource.<String>builder()
+ .hostname(host)
+ .port(port)
+ .databaseList(input.database())
+ .tableList(input.database() + "." + input.table())
+ .username(input.username())
+ .password(input.password())
+ .deserializer(new HugeGraphDeserialization())
+ .build();
+ }
}
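A note on the extracted buildMysqlSource(): stripping the JDBC prefix lets Apache HttpClient's URIBuilder parse the remainder as an ordinary URI. A minimal standalone sketch of that parsing step, assuming JDBC_PREFIX is "jdbc:" and using a hypothetical URL:

    import java.net.URISyntaxException;
    import org.apache.http.client.utils.URIBuilder;

    public class UrlParseSketch {
        public static void main(String[] args) throws URISyntaxException {
            String url = "jdbc:mysql://127.0.0.1:3306/test";   // hypothetical input.url()
            // Drop the leading "jdbc:" so the rest parses as mysql://host:port/db
            URIBuilder builder = new URIBuilder(url.substring("jdbc:".length()));
            System.out.println(builder.getHost());             // 127.0.0.1
            System.out.println(builder.getPort());             // 3306
        }
    }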
HugeGraphOutputFormat.java
@@ -29,7 +29,9 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

+ import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.io.RichOutputFormat;
+ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -77,14 +79,14 @@ public HugeGraphOutputFormat(InputStruct struct, String[] args) {
}

private Map<ElementBuilder, List<String>> initBuilders() {
- LoadContext loadContext = new LoadContext(loadOptions);
+ LoadContext loadContext = new LoadContext(this.loadOptions);
Map<ElementBuilder, List<String>> builders = new HashMap<>();
- for (VertexMapping vertexMapping : struct.vertices()) {
- builders.put(new VertexBuilder(loadContext, struct, vertexMapping),
+ for (VertexMapping vertexMapping : this.struct.vertices()) {
+ builders.put(new VertexBuilder(loadContext, this.struct, vertexMapping),
new ArrayList<>());
}
- for (EdgeMapping edgeMapping : struct.edges()) {
- builders.put(new EdgeBuilder(loadContext, struct, edgeMapping),
+ for (EdgeMapping edgeMapping : this.struct.edges()) {
+ builders.put(new EdgeBuilder(loadContext, this.struct, edgeMapping),
new ArrayList<>());
}
return builders;
@@ -102,26 +104,26 @@ public void open(int taskNumber, int numTasks) {
int flushIntervalMs = this.loadOptions.flushIntervalMs;
if (flushIntervalMs > 0) {
this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
"hugegraph-streamload-outputformat"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
synchronized (HugeGraphOutputFormat.this) {
if (!closed) {
try {
flushAll();
} catch (Exception e) {
LOG.error("Failed to flush all data.", e);
}
}
}
}, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
"hugegraph-streamload-outputformat"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
this::flushAll, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
}
}

private void flushAll() {
- for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
- List<String> graphElements = builder.getValue();
- if (graphElements.size() > 0) {
- flush(builder);
+ synchronized (HugeGraphOutputFormat.this) {
+ if (!this.closed) {
+ try {
+ for (Map.Entry<ElementBuilder, List<String>> builder :
+ this.builders.entrySet()) {
+ List<String> graphElements = builder.getValue();
+ if (graphElements.size() > 0) {
+ flush(builder.getKey(), graphElements);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to flush all data.", e);
+ }
+ }
+ }
}
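A reading of the flushAll() rework above: the lock and closed-check moved from the scheduler lambda into the method itself, so the periodic task reduces to a method reference while the synchronized writeRecord()/close() paths stay serialized against it. One behavioral difference worth flagging: the old lambda only logged flush failures, whereas a RuntimeException thrown from a scheduleWithFixedDelay task suppresses all subsequent runs. Distilled into a standalone sketch (class and field names are illustrative, not from this PR):

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    class PeriodicFlushSketch {
        private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        private boolean closed = false;

        void start(long flushIntervalMs) {
            // The task takes the lock and re-checks closed itself,
            // so a bare method reference is enough here
            scheduler.scheduleWithFixedDelay(this::flushAll, flushIntervalMs,
                                             flushIntervalMs, TimeUnit.MILLISECONDS);
        }

        private synchronized void flushAll() {
            if (!closed) {
                // flush buffered vertex/edge batches here
            }
        }
    }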
@@ -139,35 +141,37 @@ public synchronized void writeRecord(T row) throws IOException {
List<String> graphElements = builder.getValue();
graphElements.add(row.toString());
if (graphElements.size() > elementMapping.batchSize()) {
- flush(builder);
+ flush(builder.getKey(), builder.getValue());
}
}
}

- private void flush(Map.Entry<ElementBuilder, List<String>> builder) {
- GraphManager g = loadContext.client().graph();
- ElementBuilder elementBuilder = builder.getKey();
- ElementMapping elementMapping = elementBuilder.mapping();
- for (String row : builder.getValue()) {
- JsonNode node;
- try {
- node = new ObjectMapper().readTree(row);
- } catch (JsonProcessingException e) {
- LOG.error("Failed to parse json {}", row, e);
- continue;
- }
-
- JsonNode data = node.get(Constants.CDC_DATA);
- String op = node.get(Constants.CDC_OP).asText();
+ private Tuple2<String, List<GraphElement>> buildGraphData(ElementBuilder elementBuilder,
+ String row) {
+ JsonNode node;
+ try {
+ node = new ObjectMapper().readTree(row);
+ } catch (JsonProcessingException e) {
+ throw new ParseException(row, e);
+ }
+ JsonNode data = node.get(Constants.CDC_DATA);
+ String op = node.get(Constants.CDC_OP).asText();
+ String[] fields = this.struct.input().header();
+ String[] values = new String[data.size()];
+ for (int i = 0; i < fields.length; i++) {
+ values[i] = data.get(fields[i]).asText();
+ }
+ return Tuple2.of(op, elementBuilder.build(fields, values));
+ }

- String[] fields = struct.input().header();
- String[] values = new String[data.size()];
- for (int i = 0; i < fields.length; i++) {
- values[i] = data.get(fields[i]).asText();
- }
- List<GraphElement> graphElements = builder.getKey().build(fields, values);
+ private void flush(ElementBuilder elementBuilder, List<String> rows) {
+ GraphManager g = this.loadContext.client().graph();
+ ElementMapping elementMapping = elementBuilder.mapping();
+ for (String row : rows) {
+ Tuple2<String, List<GraphElement>> graphData = buildGraphData(elementBuilder, row);
+ List<GraphElement> graphElements = graphData.f1;
boolean isVertex = elementBuilder.mapping().type().isVertex();
- switch (Envelope.Operation.forCode(op)) {
+ switch (Envelope.Operation.forCode(graphData.f0)) {
case READ:
case CREATE:
if (isVertex) {
@@ -180,8 +184,7 @@ private void flush(Map.Entry<ElementBuilder, List<String>> builder) {
Map<String, UpdateStrategy> updateStrategyMap =
elementMapping.updateStrategies();
if (isVertex) {
- BatchVertexRequest.Builder req =
- new BatchVertexRequest.Builder();
+ BatchVertexRequest.Builder req = new BatchVertexRequest.Builder();
req.vertices((List<Vertex>) (Object) graphElements)
.updatingStrategies(updateStrategyMap)
.createIfNotExist(true);
@@ -204,18 +207,19 @@
}
break;
default:
- throw new RuntimeException("The type of `op` should be 'c' 'r' 'u' 'd' only");
+ throw new IllegalArgumentException(
+ "The type of `op` should be 'c' 'r' 'u' 'd' only");
}
}
- builder.getValue().clear();
+ rows.clear();
}
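For reference, the single-letter codes in the switch above come from Debezium's envelope format; Envelope.Operation.forCode(...) resolves them to the enum constants handled here. A small sketch, assuming the standard io.debezium.data.Envelope API:

    import io.debezium.data.Envelope;

    class OpCodeSketch {
        static void demo() {
            // Debezium's single-letter CDC codes map to operations:
            assert Envelope.Operation.forCode("c") == Envelope.Operation.CREATE;
            assert Envelope.Operation.forCode("r") == Envelope.Operation.READ;   // snapshot rows
            assert Envelope.Operation.forCode("u") == Envelope.Operation.UPDATE;
            assert Envelope.Operation.forCode("d") == Envelope.Operation.DELETE;
        }
    }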

@Override
public synchronized void close() {
- if (!closed) {
- closed = true;
+ if (!this.closed) {
+ this.closed = true;
if (this.scheduledFuture != null) {
- scheduledFuture.cancel(false);
+ this.scheduledFuture.cancel(false);
this.scheduler.shutdown();
}
}
HugeGraphSinkFunction.java
@@ -43,13 +43,13 @@ public HugeGraphSinkFunction(@Nonnull HugeGraphOutputFormat<Object> outputFormat
public void open(Configuration parameters) throws Exception {
super.open(parameters);
RuntimeContext ctx = getRuntimeContext();
- outputFormat.setRuntimeContext(ctx);
- outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+ this.outputFormat.setRuntimeContext(ctx);
+ this.outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}

@Override
public void invoke(T value, Context context) throws Exception {
- outputFormat.writeRecord(value);
+ this.outputFormat.writeRecord(value);
}

@Override
@@ -64,7 +64,7 @@ public void snapshotState(FunctionSnapshotContext context) {

@Override
public void close() throws Exception {
- outputFormat.close();
+ this.outputFormat.close();
super.close();
}
}
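Taken together, a rough sketch of how the pieces in this PR fit inside HugeGraphFlinkCDCLoader.load(): the loader builds one MySqlSource per InputStruct, and the resulting stream is presumably attached to a HugeGraphSinkFunction wrapping a HugeGraphOutputFormat. The exact sink wiring is hidden by the collapsed lines, so treat the addSink call as an assumption:

    // Sketch only; struct and args are assumed to be in scope
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    MySqlSource<String> mysqlSource = buildMysqlSource(struct);
    env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
       .addSink(new HugeGraphSinkFunction<>(new HugeGraphOutputFormat<>(struct, args)));
    env.execute("flink-cdc-hugegraph");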