diff --git a/BCLOUD b/BCLOUD new file mode 100644 index 000000000..abcf9582e --- /dev/null +++ b/BCLOUD @@ -0,0 +1 @@ +BUILD_SUBMITTER -x -e CENTOS6U3 -m baidu/xbu-data/hugegraph-loader -c "export MAVEN_HOME=/home/scmtools/buildkit/maven/apache-maven-3.3.9/ && export JAVA_HOME=/home/scmtools/buildkit/java/jdk1.8.0_25/ && export PATH=$JAVA_HOME/bin:$MAVEN_HOME/bin:$PATH && cd baidu/xbu-data/hugegraph-loader && sh build.sh && mkdir output && cp BCLOUD ./output/" -u ./ diff --git a/BUILDING.md b/BUILDING.md new file mode 100644 index 000000000..a85b754df --- /dev/null +++ b/BUILDING.md @@ -0,0 +1,25 @@ +Building hugegraph-loader + +-------------- + +Required: + +* Java 8 (0.9 and later) + +* Maven + +To build without executing tests: + +``` + +mvn clean install -DskipTests=true + +``` + +To build with default tests: + +``` + +mvn clean install + +``` \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 000000000..e06d20818 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + diff --git a/README.md b/README.md new file mode 100644 index 000000000..9b55472bd --- /dev/null +++ b/README.md @@ -0,0 +1,18 @@ +# hugegraph-loader + +hugegraph-loader is a customizable command line utility for loading small to medium size graph datasets into the HugeGraph database from files with various input formats. + +## Features + +- Various input formats, such as json, csv, and text with any delimiters. +- Diverse options, with which users can manage the data loading intuitively. +- Detecting schema from data automatically, reduce the complex work of schema management. +- Advanced customized operations with groovy script, users can configure how to construct vertices and edges by themselves. + +## Learn More + +The [project homepage](https://hugegraph.github.io/hugegraph-doc/) contains more information about hugegraph-loader. + +## License + +hugegraph-loader is licensed under Apache 2.0 License. \ No newline at end of file diff --git a/assembly/descriptor/assembly.xml b/assembly/descriptor/assembly.xml new file mode 100644 index 000000000..e1a372891 --- /dev/null +++ b/assembly/descriptor/assembly.xml @@ -0,0 +1,54 @@ + + distribution + false + + + dir + + + + + ${assembly.static.dir}/bin + bin + + * + + 755 + + + ${assembly.static.dir} + / + false + + + ${project.basedir} + / + + README* + LICENSE* + NOTICE* + + + + ${project.build.directory} + lib + + *.jar + + + + + + + + /lib + false + runtime + false + + *:*:jar:* + + + + + \ No newline at end of file diff --git a/assembly/static/bin/hugegraph-loader b/assembly/static/bin/hugegraph-loader new file mode 100755 index 000000000..b4cb52c37 --- /dev/null +++ b/assembly/static/bin/hugegraph-loader @@ -0,0 +1,64 @@ +#!/bin/bash + +abs_path() { + SOURCE="${BASH_SOURCE[0]}" + while [ -h "$SOURCE" ]; do + DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )" + SOURCE="$(readlink "$SOURCE")" + [[ $SOURCE != /* ]] && SOURCE="$DIR/$SOURCE" + done + echo "$( cd -P "$( dirname "$SOURCE" )" && pwd )" +} + +BIN=`abs_path` +TOP="$(cd $BIN/../ && pwd)" +CONF="$TOP/conf" +LIB="$TOP/lib" +LOG="$TOP/logs" + +# Use the unofficial bash strict mode to avoid subtle bugs impossible. +# Don't use -u option for now because LOADER_HOME might not yet defined. 
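+# For reference, the strict-mode flags used just below (standard bash behaviour,
+# not anything specific to this loader):
+#   -e           exit the script as soon as any command returns non-zero
+#   -o pipefail  a pipeline fails when any stage fails, not only the last one
+# -u (treat unset variables as errors) is deliberately left out here because
+# LOADER_HOME may not be defined yet.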
+set -eo pipefail + +export VARS=${@:1} + +# Use JAVA_HOME if set, otherwise look for java in PATH +if [ -n "$JAVA_HOME" ]; then + # Why we can't have nice things: Solaris combines x86 and x86_64 + # installations in the same tree, using an unconventional path for the + # 64bit JVM. Since we prefer 64bit, search the alternate path first, + # (see https://issues.apache.org/jira/browse/CASSANDRA-4638). + for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do + if [ -x "$java" ]; then + JAVA="$java" + break + fi + done +else + JAVA=java +fi + +if [ -z $JAVA ] ; then + echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. > /dev/stderr + exit 1; +fi + +# Add the slf4j-log4j12 binding +CP=$(find -L $LIB -name 'log4j-slf4j-impl*.jar' | sort | tr '\n' ':') +# Add the jars in lib that start with "hugegraph" +CP="$CP":$(find -L $LIB -name 'hugegraph*.jar' | sort | tr '\n' ':') +# Add the remaining jars in lib. +CP="$CP":$(find -L $LIB -name '*.jar' \ + \! -name 'hugegraph*' \ + \! -name 'log4j-slf4j-impl*.jar' | sort | tr '\n' ':') + +export LOADER_CLASSPATH="${CLASSPATH:-}:$CP" + +# Xmx needs to be set so that it is big enough to cache all the vertexes in the run +export JVM_OPTS="$JVM_OPTS -Xmx10g -cp $LOADER_CLASSPATH" + +# Uncomment to enable debugging +#JVM_OPTS="$JVM_OPTS -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=1414" + +exec "$JAVA" -Dname="HugeGraphLoader" -Dlog4j.configurationFile="$CONF/log4j2.xml" \ +$JVM_OPTS com.baidu.hugegraph.loader.HugeGraphLoader $VARS diff --git a/assembly/static/conf/log4j2.xml b/assembly/static/conf/log4j2.xml new file mode 100644 index 000000000..0ffd92504 --- /dev/null +++ b/assembly/static/conf/log4j2.xml @@ -0,0 +1,68 @@ + + + + + UTF-8 + + + + + + + + + + + + + + + ${log_charset} + %m%n + + + + + ${log_charset} + %m%n + + + + + ${log_charset} + %m%n + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/assembly/static/example/edge_created.json b/assembly/static/example/edge_created.json new file mode 100644 index 000000000..c441ee672 --- /dev/null +++ b/assembly/static/example/edge_created.json @@ -0,0 +1,4 @@ +{"aname": "marko", "bname": "lop", "date": "20171210", "weight": 0.4} +{"aname": "josh", "bname": "lop", "date": "20091111", "weight": 0.4} +{"aname": "josh", "bname": "ripple", "date": "20171210", "weight": 1.0} +{"aname": "peter", "bname": "lop", "date": "20170324", "weight": 0.2} \ No newline at end of file diff --git a/assembly/static/example/edge_knows.json b/assembly/static/example/edge_knows.json new file mode 100644 index 000000000..216719278 --- /dev/null +++ b/assembly/static/example/edge_knows.json @@ -0,0 +1,2 @@ +{"source_name": "marko", "target_name": "vadas", "date": "20160110", "weight": 0.5} +{"source_name": "marko", "target_name": "josh", "date": "20130220", "weight": 1.0} \ No newline at end of file diff --git a/assembly/static/example/schema.groovy b/assembly/static/example/schema.groovy new file mode 100644 index 000000000..0fd5cf410 --- /dev/null +++ b/assembly/static/example/schema.groovy @@ -0,0 +1,24 @@ +// Define schema +schema.propertyKey("name").asText().ifNotExist().create(); +schema.propertyKey("age").asInt().ifNotExist().create(); +schema.propertyKey("city").asText().ifNotExist().create(); +schema.propertyKey("weight").asDouble().ifNotExist().create(); +schema.propertyKey("lang").asText().ifNotExist().create(); +schema.propertyKey("date").asText().ifNotExist().create(); 
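+// Note: each ifNotExist().create() call in this file is idempotent; the schema
+// element is created only when it is not already present, so this example
+// schema can be re-applied safely to a graph that already contains it.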
+schema.propertyKey("price").asDouble().ifNotExist().create(); + +schema.vertexLabel("person").properties("name", "age", "city").primaryKeys("name").ifNotExist().create(); +schema.vertexLabel("software").properties("name", "lang", "price").primaryKeys("name").ifNotExist().create(); + +schema.indexLabel("personByName").onV("person").by("name").secondary().ifNotExist().create(); +schema.indexLabel("personByAge").onV("person").by("age").range().ifNotExist().create(); +schema.indexLabel("personByCity").onV("person").by("city").secondary().ifNotExist().create(); +schema.indexLabel("personByAgeAndCity").onV("person").by("age", "city").secondary().ifNotExist().create(); +schema.indexLabel("softwareByPrice").onV("software").by("price").range().ifNotExist().create(); + +schema.edgeLabel("knows").sourceLabel("person").targetLabel("person").properties("date", "weight").ifNotExist().create(); +schema.edgeLabel("created").sourceLabel("person").targetLabel("software").properties("date", "weight").ifNotExist().create(); + +schema.indexLabel("createdByDate").onE("created").by("date").secondary().ifNotExist().create(); +schema.indexLabel("createdByWeight").onE("created").by("weight").range().ifNotExist().create(); +schema.indexLabel("knowsByWeight").onE("knows").by("weight").range().ifNotExist().create(); \ No newline at end of file diff --git a/assembly/static/example/struct.json b/assembly/static/example/struct.json new file mode 100644 index 000000000..63bc7f4a8 --- /dev/null +++ b/assembly/static/example/struct.json @@ -0,0 +1,59 @@ +{ + "vertices": [ + { + "label": "person", + "input": { + "type": "file", + "path": "example/vertex_person.csv", + "format": "CSV", + "header": ["name", "age", "city"], + "charset": "UTF-8" + }, + "mapping": { + "name": "name", + "age": "age", + "city": "city" + } + }, + { + "label": "software", + "input": { + "type": "file", + "path": "example/vertex_software.text", + "format": "TEXT", + "delimiter": "|", + "charset": "GBK" + } + } + ], + "edges": [ + { + "label": "knows", + "source": ["source_name"], + "target": ["target_name"], + "input": { + "type": "file", + "path": "example/edge_knows.json", + "format": "JSON" + }, + "mapping": { + "source_name": "name", + "target_name": "name" + } + }, + { + "label": "created", + "source": ["aname"], + "target": ["bname"], + "input": { + "type": "file", + "path": "example/edge_created.json", + "format": "JSON" + }, + "mapping": { + "aname": "name", + "bname": "name" + } + } + ] +} \ No newline at end of file diff --git a/assembly/static/example/vertex_person.csv b/assembly/static/example/vertex_person.csv new file mode 100644 index 000000000..00a34c0c8 --- /dev/null +++ b/assembly/static/example/vertex_person.csv @@ -0,0 +1,5 @@ +marko,29,Beijing +vadas,27,Hongkong +josh,32,Beijing +peter,35,Shanghai +"li,nary",26,"Wu,han" \ No newline at end of file diff --git a/assembly/static/example/vertex_software.text b/assembly/static/example/vertex_software.text new file mode 100644 index 000000000..20b93ebb0 --- /dev/null +++ b/assembly/static/example/vertex_software.text @@ -0,0 +1,3 @@ +name|lang|price +lop|java|328 +ripple|java|199 \ No newline at end of file diff --git a/build.sh b/build.sh new file mode 100644 index 000000000..0d403d93b --- /dev/null +++ b/build.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +HUGEGRAPH_LOADER_RELEASE_PATH="${PWD}/output/" + +export MAVEN_HOME="/home/scmtools/buildkit/maven/apache-maven-3.3.9/" +export JAVA_HOME="/home/scmtools/buildkit/java/jdk1.8.0_25/" +export PATH="$JAVA_HOME/bin:$MAVEN_HOME/bin:$PATH" + 
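+# Compile only here; a full "mvn clean install -DskipTests=true" (see BUILDING.md)
+# would additionally run the assembly and antrun steps in pom.xml that package
+# the hugegraph-loader distribution tar.gz.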
+mvn clean compile diff --git a/pom.xml b/pom.xml new file mode 100644 index 000000000..7cd550f28 --- /dev/null +++ b/pom.xml @@ -0,0 +1,152 @@ + + + 4.0.0 + + com.baidu.hugegraph + hugegraph-loader + 0.6.1 + + + hugegraph-loader + ${release.name}-${project.version} + ${project.basedir}/assembly + ${assembly.dir}/descriptor + ${assembly.dir}/static + bash + UTF-8 + 1.8 + 1.8 + 2.8.2 + 4.12 + 21.0 + 3.6 + + + + + junit + junit + ${junit.version} + test + + + com.baidu.hugegraph + hugegraph-common + 1.4.4 + + + com.baidu.hugegraph + hugegraph-client + 1.5.6 + + + commons-io + commons-io + 2.6 + + + org.codehaus.groovy + groovy-all + 2.4.6 + + + com.beust + jcommander + 1.72 + + + com.opencsv + opencsv + 4.2 + + + + + + + maven-compiler-plugin + 3.1 + + ${compiler.source} + ${compiler.target} + + 500 + + + -Xlint:unchecked + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 2.4 + + + assembly-hugegraph-loader + package + + single + + + false + false + ${project.basedir} + ${assembly.descriptor.dir}/assembly.xml + ${final.name} + + + + + + maven-antrun-plugin + + + package + + run + + + + + tar -zcvf \ + ${project.basedir}/${final.name}.tar.gz ${final.name} || exit 1 + rm -f ${project.basedir}/dist.sh + echo -n "hugegraph-loader tar.gz available at: " + echo "${project.basedir}/${final.name}.tar.gz" + + + + + + + + + + + maven-clean-plugin + 3.0.0 + + + + ${project.basedir} + + *.tar.gz + ${final.name}/** + + false + + + ${final.name} + + + + + + + + \ No newline at end of file diff --git a/src/main/java/com/baidu/hugegraph/loader/HugeGraphLoader.java b/src/main/java/com/baidu/hugegraph/loader/HugeGraphLoader.java new file mode 100644 index 000000000..9c61e89b6 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/HugeGraphLoader.java @@ -0,0 +1,270 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader; + +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +import com.baidu.hugegraph.driver.HugeClient; +import com.baidu.hugegraph.loader.exception.LoadException; +import com.baidu.hugegraph.loader.exception.ParseException; +import com.baidu.hugegraph.loader.executor.GroovyExecutor; +import com.baidu.hugegraph.loader.executor.HugeClients; +import com.baidu.hugegraph.loader.executor.LoadLogger; +import com.baidu.hugegraph.loader.executor.LoadOptions; +import com.baidu.hugegraph.loader.executor.LoadSummary; +import com.baidu.hugegraph.loader.parser.EdgeParser; +import com.baidu.hugegraph.loader.parser.VertexParser; +import com.baidu.hugegraph.loader.reader.InputReader; +import com.baidu.hugegraph.loader.reader.InputReaderFactory; +import com.baidu.hugegraph.loader.source.EdgeSource; +import com.baidu.hugegraph.loader.source.GraphSource; +import com.baidu.hugegraph.loader.source.VertexSource; +import com.baidu.hugegraph.loader.task.TaskManager; +import com.baidu.hugegraph.structure.graph.Edge; +import com.baidu.hugegraph.structure.graph.Vertex; +import com.baidu.hugegraph.util.E; +import com.baidu.hugegraph.util.Log; +import com.beust.jcommander.JCommander; + +public class HugeGraphLoader { + + private static final Logger LOG = Log.logger(HugeGraphLoader.class); + private static final LoadLogger LOG_PARSE = LoadLogger.logger("parseError"); + + private final JCommander commander; + private final TaskManager taskManager; + private final GraphSource graphSource; + + private long parseFailureNum = 0L; + + public static void main(String[] args) { + HugeGraphLoader loader = new HugeGraphLoader(args); + LoadSummary summary = loader.load(); + summary.print(); + } + + private HugeGraphLoader(String[] args) { + LoadOptions options = LoadOptions.instance(); + this.commander = JCommander.newBuilder().addObject(options).build(); + this.parseAndCheckOptions(args); + this.taskManager = new TaskManager(options); + this.graphSource = GraphSource.of(options.file); + } + + private void parseAndCheckOptions(String[] args) { + this.commander.parse(args); + // Check options + LoadOptions options = LoadOptions.instance(); + // Check option "-f" + E.checkArgument(!StringUtils.isEmpty(options.file), + "Must specified entrance groovy file"); + File scriptFile = new File(options.file); + if (!scriptFile.canRead()) { + LOG.error("Script file must be readable: '{}'", + scriptFile.getAbsolutePath()); + this.exitWithUsage(-1); + } + // Check option "-g" + E.checkArgument(!StringUtils.isEmpty(options.graph), + "Must specified a graph"); + // Check option "-h" + if (!options.host.startsWith("http://")) { + options.host = "http://" + options.host; + } + } + + private LoadSummary load() { + // Create schema + this.createSchema(); + + LoadSummary summary = new LoadSummary(); + // Prepare to load vertices + Instant begTime = Instant.now(); + System.out.print("Vertices has been imported: 0\b\b"); + // Load vertices + this.loadVertices(); + Instant endTime = Instant.now(); + Duration duration = Duration.between(begTime, endTime); + summary.parseFailureVertices(this.parseFailureNum); + summary.insertFailureVertices(this.taskManager.failureNum()); + summary.insertSuccessVertices(this.taskManager.successNum()); + summary.vertexLoadTime(duration); + System.out.println(" " + 
summary.insertSuccessVertices()); + // Reset counters + this.resetCounters(); + + // Prepare to load edges ... + begTime = Instant.now(); + System.out.print("Edges has been imported: 0\b\b"); + // Load edges + this.loadEdges(); + endTime = Instant.now(); + duration = Duration.between(begTime, endTime); + summary.parseFailureEdges(this.parseFailureNum); + summary.insertFailureEdges(this.taskManager.failureNum()); + summary.insertSuccessEdges(this.taskManager.successNum()); + summary.edgeLoadTime(duration); + System.out.println(" " + summary.insertSuccessEdges()); + // Reset counters + this.resetCounters(); + + LoadOptions options = LoadOptions.instance(); + // Shutdown task manager + this.taskManager.shutdown(options.shutdownTimeout); + return summary; + } + + private void resetCounters() { + this.taskManager.cleanup(); + this.parseFailureNum = 0L; + } + + private void createSchema() { + LoadOptions options = LoadOptions.instance(); + File schemaFile = FileUtils.getFile(options.schema); + HugeClient client = HugeClients.get(options); + GroovyExecutor groovyExecutor = new GroovyExecutor(); + groovyExecutor.bind("schema", client.schema()); + String script; + try { + script = FileUtils.readFileToString(schemaFile, "UTF-8"); + } catch (IOException e) { + throw new LoadException("Read schema file '%s' error", + e, options.schema); + } + groovyExecutor.execute(script, client); + } + + private void loadVertices() { + LoadOptions options = LoadOptions.instance(); + List vertexSources = this.graphSource.vertexSources(); + for (VertexSource source : vertexSources) { + InputReader reader = InputReaderFactory.create(source.input()); + VertexParser parser = new VertexParser(source, reader); + this.loadVertex(parser); + try { + parser.close(); + } catch (Exception e) { + LOG.warn("Failed to close parser for vertex source {}", source); + } + } + // Waiting async worker threads finish + this.taskManager.waitFinished(options.timeout); + } + + private void loadVertex(VertexParser parser) { + LoadOptions options = LoadOptions.instance(); + int batchSize = options.batchSize; + List batch = new ArrayList<>(batchSize); + while (parser.hasNext()) { + try { + Vertex vertex = parser.next(); + batch.add(vertex); + } catch (ParseException e) { + if (options.testMode) { + throw e; + } + LOG.error("Vertex parse error", e); + LOG_PARSE.error(e); + if (++this.parseFailureNum >= options.maxParseErrors) { + exitWithInfo("vertices", options.maxParseErrors); + } + continue; + } + if (batch.size() >= batchSize) { + this.taskManager.submitVertexBatch(batch); + batch = new ArrayList<>(batchSize); + } + } + if (batch.size() > 0) { + this.taskManager.submitVertexBatch(batch); + } + } + + private void loadEdges() { + LoadOptions options = LoadOptions.instance(); + List edgeSources = this.graphSource.edgeSources(); + for (EdgeSource source : edgeSources) { + InputReader reader = InputReaderFactory.create(source.input()); + EdgeParser parser = new EdgeParser(source, reader); + this.loadEdge(parser); + try { + parser.close(); + } catch (Exception e) { + LOG.warn("Failed to close parser for edge source {}", source); + } + } + // Waiting async worker threads finish + this.taskManager.waitFinished(options.timeout); + } + + private void loadEdge(EdgeParser parser) { + LoadOptions options = LoadOptions.instance(); + int batchSize = options.batchSize; + List batch = new ArrayList<>(batchSize); + while (parser.hasNext()) { + try { + Edge edge = parser.next(); + batch.add(edge); + } catch (ParseException e) { + if (options.testMode) { + 
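+                    // In test mode a parse error is re-thrown so tests fail fast;
+                    // otherwise it is logged and counted against --max-parse-errors below.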
throw e; + } + LOG.error("Edge parse error", e); + LOG_PARSE.error(e); + if (++this.parseFailureNum >= options.maxParseErrors) { + exitWithInfo("edges", options.maxParseErrors); + } + continue; + } + if (batch.size() >= batchSize) { + this.taskManager.submitEdgeBatch(batch); + batch = new ArrayList<>(batchSize); + } + } + if (batch.size() > 0) { + this.taskManager.submitEdgeBatch(batch); + } + } + + private void exitWithUsage(int status) { + this.commander.usage(); + System.exit(status); + } + + private static void exitWithInfo(String type, int parseErrors) { + LOG.error("Too many {} parse error ... Stopping", type); + // Print an empty line. + System.out.println(); + System.out.println(String.format( + "Error: More than %s %s parsing error ... Stopping", + parseErrors, type)); + System.exit(0); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/exception/InsertException.java b/src/main/java/com/baidu/hugegraph/loader/exception/InsertException.java new file mode 100644 index 000000000..154b6d840 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/exception/InsertException.java @@ -0,0 +1,54 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.exception; + +import com.baidu.hugegraph.structure.GraphElement; + +public class InsertException extends IllegalArgumentException { + + private final GraphElement element; + + public InsertException(GraphElement element, String message) { + super(message); + this.element = element; + } + + public InsertException(GraphElement element, String message, + Throwable cause) { + super(message, cause); + this.element = element; + } + + public InsertException(GraphElement element, String message, + Object... args) { + super(String.format(message, args)); + this.element = element; + } + + public InsertException(GraphElement element, String message, + Throwable cause, Object... args) { + super(String.format(message, args), cause); + this.element = element; + } + + public GraphElement element() { + return this.element; + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/exception/LoadException.java b/src/main/java/com/baidu/hugegraph/loader/exception/LoadException.java new file mode 100644 index 000000000..ed72d5e4f --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/exception/LoadException.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.exception; + +public class LoadException extends RuntimeException { + + public LoadException(String message) { + super(message); + } + + public LoadException(String message, Throwable cause) { + super(message, cause); + } + + public LoadException(String message, Object... args) { + super(String.format(message, args)); + } + + public LoadException(String message, Throwable cause, Object... args) { + super(String.format(message, args), cause); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/exception/ParseException.java b/src/main/java/com/baidu/hugegraph/loader/exception/ParseException.java new file mode 100644 index 000000000..cc37ab709 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/exception/ParseException.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.exception; + +public class ParseException extends IllegalArgumentException { + + private final String line; + + public ParseException(String line, String message) { + super(message); + this.line = line; + } + + public ParseException(String line, String message, Throwable cause) { + super(message, cause); + this.line = line; + } + + public ParseException(String line, String message, Object... args) { + super(String.format(message, args)); + this.line = line; + } + + public ParseException(String line, String message, Throwable cause, + Object... args) { + super(String.format(message, args), cause); + this.line = line; + } + + public String line() { + return this.line; + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/executor/GroovyExecutor.java b/src/main/java/com/baidu/hugegraph/loader/executor/GroovyExecutor.java new file mode 100644 index 000000000..79e047816 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/executor/GroovyExecutor.java @@ -0,0 +1,64 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.executor; + +import org.codehaus.groovy.control.CompilerConfiguration; +import org.codehaus.groovy.control.customizers.ImportCustomizer; +import org.slf4j.Logger; + +import com.baidu.hugegraph.driver.HugeClient; +import com.baidu.hugegraph.driver.SchemaManager; +import com.baidu.hugegraph.util.Log; + +import groovy.lang.Binding; +import groovy.lang.GroovyShell; +import groovy.util.DelegatingScript; + +public class GroovyExecutor { + + private static final Logger LOG = Log.logger(GroovyExecutor.class); + + private final Binding binding; + + public GroovyExecutor() { + this.binding = new Binding(); + } + + public void bind(String name, Object value) { + this.binding.setVariable(name, value); + } + + public void execute(String groovyScript, HugeClient client) { + CompilerConfiguration config = new CompilerConfiguration(); + config.setScriptBaseClass(DelegatingScript.class.getName()); + ImportCustomizer importCustomizer = new ImportCustomizer(); + importCustomizer.addImports(HugeClient.class.getName()); + importCustomizer.addImports(SchemaManager.class.getName()); + config.addCompilationCustomizers(importCustomizer); + + GroovyShell shell = new GroovyShell(getClass().getClassLoader(), + this.binding, config); + + // Groovy invoke java through the delegating script. + DelegatingScript script = (DelegatingScript) shell.parse(groovyScript); + script.setDelegate(client); + script.run(); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/executor/HugeClients.java b/src/main/java/com/baidu/hugegraph/loader/executor/HugeClients.java new file mode 100644 index 000000000..5a2cced39 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/executor/HugeClients.java @@ -0,0 +1,45 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.executor; + +import com.baidu.hugegraph.driver.HugeClient; +import com.baidu.hugegraph.loader.executor.LoadOptions; + +public class HugeClients { + + // TODO: seems no need to use ThreadLocal, reuse HugeClient is ok + private static final ThreadLocal instance = new ThreadLocal<>(); + + public static HugeClient get(LoadOptions options) { + HugeClient client = instance.get(); + if (client == null) { + client = newHugeClient(options); + instance.set(client); + } + return client; + } + + private HugeClients() {} + + private static HugeClient newHugeClient(LoadOptions options) { + String address = options.host + ":" + options.port; + return new HugeClient(address, options.graph, options.timeout); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/executor/LoadLogger.java b/src/main/java/com/baidu/hugegraph/loader/executor/LoadLogger.java new file mode 100644 index 000000000..c3cfc8841 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/executor/LoadLogger.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.executor; + +import org.slf4j.Logger; + +import com.baidu.hugegraph.loader.exception.InsertException; +import com.baidu.hugegraph.loader.exception.ParseException; +import com.baidu.hugegraph.util.JsonUtil; +import com.baidu.hugegraph.util.Log; + +public class LoadLogger { + + private final Logger log; + + public static LoadLogger logger(String name) { + return new LoadLogger(name); + } + + private LoadLogger(String name) { + this.log = Log.logger(name); + } + + public void error(ParseException e) { + this.log.error(">>>> PARSE ERROR: {}", e.getMessage()); + this.log.error("{}", e.line()); + } + + public void error(InsertException e) { + this.log.error(">>>> INSERT ERROR: {}", e.getMessage()); + this.log.error("{}", JsonUtil.toJson(e.element())); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java b/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java new file mode 100644 index 000000000..7e52702f3 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java @@ -0,0 +1,173 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.executor; + +import java.io.File; + +import org.slf4j.Logger; + +import com.baidu.hugegraph.util.Log; +import com.beust.jcommander.IParameterValidator; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; + +public class LoadOptions { + + private Logger LOG = Log.logger(LoadOptions.class); + + private static final LoadOptions instance = new LoadOptions(); + + public static LoadOptions instance() { + return instance; + } + + private LoadOptions() {} + + @Parameter(names = {"-f", "--file"}, required = true, arity = 1, + validateWith = {FileValidator.class}, + description = "The path of the data source description file") + public String file; + + @Parameter(names = {"-g", "--graph"}, required = true, arity = 1, + description = "The namespace of the graph to load into") + public String graph; + + @Parameter(names = {"-s", "--schema"}, required = true, arity = 1, + validateWith = {FileValidator.class}, + description = "The schema file path which to create manually") + public String schema; + + @Parameter(names = {"-h", "--host"}, arity = 1, + validateWith = {UrlValidator.class}, + description = "The host/IP of HugeGraphServer") + public String host = "localhost"; + + @Parameter(names = {"-p", "--port"}, arity = 1, + validateWith = {PositiveValidator.class}, + description = "The port of HugeGraphServer") + public int port = 8080; + + @Parameter(names = {"--num-threads"}, arity = 1, + validateWith = {PositiveValidator.class}, + description = "The number of threads to use") + public int numThreads = Runtime.getRuntime().availableProcessors() * 2 - 1; + + @Parameter(names = {"--batch-size"}, arity = 1, + validateWith = {PositiveValidator.class}, + description = "The number of lines in each submit") + public int batchSize = 500; + + @Parameter(names = {"--shutdown-timeout"}, arity = 1, + validateWith = {PositiveValidator.class}, + description = "The timeout of awaitTermination in seconds") + public int shutdownTimeout = 10; + + @Parameter(names = {"--check-vertex"}, arity = 1, + description = "Check vertices exists while inserting edges") + public boolean checkVertex = false; + + @Parameter(names = {"--max-parse-errors"}, arity = 1, + validateWith = {PositiveValidator.class}, + description = "The maximum number of rows that parse error " + + "before exiting") + public int maxParseErrors = 1; + + @Parameter(names = {"--max-insert-errors"}, arity = 1, + validateWith = {PositiveValidator.class}, + description = "The maximum number of rows that insert error " + + "before exiting") + public int maxInsertErrors = 500; + + @Parameter(names = {"--timeout"}, arity = 1, + validateWith = {PositiveValidator.class}, + description = "The timeout of inserting task in seconds") + public int timeout = 100; + + @Parameter(names = {"--retry-times"}, arity = 1, + validateWith = {PositiveValidator.class}, + description = "Setting the max retry times when loading timeout") + public int retryTimes = 0; + + @Parameter(names = {"--retry-interval"}, arity = 1, + validateWith = {PositiveValidator.class}, + description = "Setting 
the interval time before retrying") + public int retryInterval = 10; + + @Parameter(names = {"--test-mode"}, arity = 1, + description = "Whether the hugegraph-loader work in test mode") + public boolean testMode = false; + + public static class UrlValidator implements IParameterValidator { + + @Override + public void validate(String name, String value) { + String regex = "^((http)?://)" + + "?(([0-9a-z_!~*'().&=+$%-]+: )?[0-9a-z_!~*'().&=+$%-]+@)?" + + "(([0-9]{1,3}\\.){3}[0-9]{1,3}" // IP URL, like: 10.0.0.1 + + "|" // Or domain name + + "([0-9a-z_!~*'()-]+\\.)*" // Third level, like: www. + + "([0-9a-z][0-9a-z-]{0,61})?[0-9a-z]\\." // Second level + + "[a-z]{2,6})"; // First level, like: com or museum + if (!value.matches(regex)) { + throw new ParameterException(String.format( + "Invalid value of argument '%s': '%s'", name, value)); + } + } + } + + public static class DirectoryValidator implements IParameterValidator { + + @Override + public void validate(String name, String value) { + File file = new File(value); + if (!file.exists() || !file.isDirectory()) { + throw new ParameterException(String.format( + "Ensure the directory '%s' exists and is indeed a " + + "directory instead of a file", value)); + } + } + } + + public static class FileValidator implements IParameterValidator { + + @Override + public void validate(String name, String value) { + File file = new File(value); + if (!file.exists() || !file.isFile()) { + throw new ParameterException(String.format( + "Ensure the file '%s' exists and is indeed a file " + + "instead of a directory", value)); + } + } + } + + public static class PositiveValidator implements IParameterValidator { + + @Override + public void validate(String name, String value) { + int retry = Integer.parseInt(value); + if (retry <= 0) { + throw new ParameterException(String.format( + "Parameter '%s' should be positive, but got '%s'", + name, value)); + } + } + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/executor/LoadSummary.java b/src/main/java/com/baidu/hugegraph/loader/executor/LoadSummary.java new file mode 100644 index 000000000..e7b76633e --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/executor/LoadSummary.java @@ -0,0 +1,136 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.executor; + +import java.time.Duration; +import java.util.Formatter; + +public class LoadSummary { + + private static Formatter formatter = new Formatter(System.out); + + private long parseFailureVertices; + private long insertFailureVertices; + private long insertSuccessVertices; + + private long parseFailureEdges; + private long insertFailureEdges; + private long insertSuccessEdges; + + private Duration vertexLoadTime; + private Duration edgeLoadTime; + + public LoadSummary() { + this.vertexLoadTime = Duration.ZERO; + this.edgeLoadTime = Duration.ZERO; + } + + public void print() { + System.out.println("-------------------------------------------------"); + System.out.println("Vertex Results:"); + printInFormat("Parse failure vertices", this.parseFailureVertices()); + printInFormat("Insert failure vertices", this.insertFailureVertices()); + printInFormat("Insert success vertices", this.insertSuccessVertices()); + + System.out.println("-------------------------------------------------"); + System.out.println("Edge Results:"); + printInFormat("Parse failure edges", this.parseFailureEdges()); + printInFormat("Insert failure edges", this.insertFailureEdges()); + printInFormat("Insert success edges", this.insertSuccessEdges()); + + System.out.println("-------------------------------------------------"); + System.out.println("Time Results:"); + printInFormat("Vertex loading time", this.vertexLoadTime().getSeconds()); + printInFormat("Edge loading time", this.edgeLoadTime().getSeconds()); + printInFormat("Total loading time", this.totalTime().getSeconds()); + } + + private static void printInFormat(String desc, long value) { + formatter.format("\t%-25s:\t%-20d%n", desc, value); + } + + private Duration totalTime() { + return edgeLoadTime().plus(vertexLoadTime()); + } + + public Duration vertexLoadTime() { + return this.vertexLoadTime; + } + + public void vertexLoadTime(Duration duration) { + this.vertexLoadTime = duration; + } + + public Duration edgeLoadTime() { + return this.edgeLoadTime; + } + + public void edgeLoadTime(Duration duration) { + this.edgeLoadTime = duration; + } + + public long parseFailureVertices() { + return this.parseFailureVertices; + } + + public void parseFailureVertices(long count) { + this.parseFailureVertices = count; + } + + public long insertFailureVertices() { + return this.insertFailureVertices; + } + + public void insertFailureVertices(long count) { + this.insertFailureVertices = count; + } + + public long insertSuccessVertices() { + return this.insertSuccessVertices; + } + + public void insertSuccessVertices(long count) { + this.insertSuccessVertices = count; + } + + public long parseFailureEdges() { + return this.parseFailureEdges; + } + + public void parseFailureEdges(long count) { + this.parseFailureEdges = count; + } + + public long insertFailureEdges() { + return this.insertFailureEdges; + } + + public void insertFailureEdges(long count) { + this.insertFailureEdges = count; + } + + public long insertSuccessEdges() { + return this.insertSuccessEdges; + } + + public void insertSuccessEdges(long count) { + this.insertSuccessEdges = count; + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/parser/EdgeParser.java b/src/main/java/com/baidu/hugegraph/loader/parser/EdgeParser.java new file mode 100644 index 000000000..92d8defe3 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/parser/EdgeParser.java @@ -0,0 +1,135 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.parser; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import com.baidu.hugegraph.loader.reader.InputReader; +import com.baidu.hugegraph.loader.source.EdgeSource; +import com.baidu.hugegraph.structure.constant.IdStrategy; +import com.baidu.hugegraph.structure.graph.Edge; +import com.baidu.hugegraph.structure.schema.EdgeLabel; +import com.baidu.hugegraph.structure.schema.VertexLabel; +import com.baidu.hugegraph.util.E; + +public class EdgeParser extends ElementParser { + + private final EdgeSource source; + private final EdgeLabel edgeLabel; + private final VertexLabel sourceLabel; + private final VertexLabel targetLabel; + + public EdgeParser(EdgeSource source, InputReader reader) { + super(reader); + this.source = source; + this.edgeLabel = this.getEdgeLabel(source.label()); + this.sourceLabel = this.getVertexLabel(this.edgeLabel.sourceLabel()); + this.targetLabel = this.getVertexLabel(this.edgeLabel.targetLabel()); + // Ensure that the source/target id fileds are matched with id strategy + this.checkIdFields(this.sourceLabel, this.source.sourceFields()); + this.checkIdFields(this.targetLabel, this.source.targetFields()); + } + + @Override + public EdgeSource source() { + return this.source; + } + + @Override + protected Edge parse(Map keyValues) { + Edge edge = new Edge(this.source.label()); + // Must add source/target vertex id + edge.source(this.buildVertexId(this.sourceLabel, + this.source.sourceFields(), keyValues)); + edge.target(this.buildVertexId(this.targetLabel, + this.source.targetFields(), keyValues)); + // Must add source/target vertex label + edge.sourceLabel(this.sourceLabel.name()); + edge.targetLabel(this.targetLabel.name()); + // Add properties + this.addProperties(edge, keyValues); + return edge; + } + + @Override + protected boolean isIdField(String fieldName) { + return this.source.sourceFields().contains(fieldName) || + this.source.targetFields().contains(fieldName); + } + + private Object buildVertexId(VertexLabel vertexLabel, + List fieldNames, + Map keyValues) { + List primaryKeys = vertexLabel.primaryKeys(); + List primaryValues = new ArrayList<>(primaryKeys.size()); + for (String fieldName : fieldNames) { + if (!keyValues.containsKey(fieldName)) { + continue; + } + Object fieldValue = keyValues.get(fieldName); + String key = this.source.mappingField(fieldName); + Object value = this.validatePropertyValue(key, fieldValue); + + IdStrategy idStrategy = vertexLabel.idStrategy(); + if (isCustomize(idStrategy)) { + /* + * Check vertex id length when the id strategy of + * source/target label is CUSTOMIZE_STRING, + * just return when id strategy is CUSTOMIZE_NUMBER + */ + if (idStrategy == IdStrategy.CUSTOMIZE_STRING) { + String id = 
String.valueOf(fieldValue); + this.checkVertexIdLength(id); + return id; + } else { + assert idStrategy == IdStrategy.CUSTOMIZE_NUMBER; + return fieldValue; + } + } else { + // The id strategy of source/target label must be PRIMARY_KEY + if (primaryKeys.contains(key)) { + int index = primaryKeys.indexOf(key); + primaryValues.add(index, value); + } + } + } + + String id = this.spliceVertexId(vertexLabel, primaryValues); + this.checkVertexIdLength(id); + return id; + } + + private void checkIdFields(VertexLabel vertexLabel, List fields) { + if (isCustomize(vertexLabel.idStrategy())) { + E.checkArgument(fields.size() == 1, + "The source/target field can contain only one " + + "column when id strategy is CUSTOMIZE"); + } else if (isPrimaryKey(vertexLabel.idStrategy())) { + E.checkArgument(fields.size() >= 1, + "The source/target field must contain at least one " + + "column when id strategy is PRIMARY_KEY"); + } else { + throw new IllegalArgumentException( + "Unsupported AUTOMATIC id strategy for hugegraph-loader"); + } + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/parser/ElementParser.java b/src/main/java/com/baidu/hugegraph/loader/parser/ElementParser.java new file mode 100644 index 000000000..79066a044 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/parser/ElementParser.java @@ -0,0 +1,206 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.parser; + +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; + +import com.baidu.hugegraph.driver.HugeClient; +import com.baidu.hugegraph.loader.exception.ParseException; +import com.baidu.hugegraph.loader.executor.HugeClients; +import com.baidu.hugegraph.loader.executor.LoadOptions; +import com.baidu.hugegraph.loader.reader.InputReader; +import com.baidu.hugegraph.loader.source.ElementSource; +import com.baidu.hugegraph.loader.util.AutoCloseableIterator; +import com.baidu.hugegraph.loader.util.DataTypeUtil; +import com.baidu.hugegraph.structure.GraphElement; +import com.baidu.hugegraph.structure.SchemaElement; +import com.baidu.hugegraph.structure.constant.HugeType; +import com.baidu.hugegraph.structure.constant.IdStrategy; +import com.baidu.hugegraph.structure.schema.EdgeLabel; +import com.baidu.hugegraph.structure.schema.PropertyKey; +import com.baidu.hugegraph.structure.schema.VertexLabel; +import com.baidu.hugegraph.util.E; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; + +public abstract class ElementParser + implements AutoCloseableIterator { + + private static final int VERTEX_ID_LIMIT = 128; + private static final String ID_CHARSET = "UTF-8"; + + private final InputReader reader; + + private final HugeClient client; + private final Table schemas; + + ElementParser(InputReader reader) { + this.reader = reader; + this.client = HugeClients.get(LoadOptions.instance()); + this.schemas = HashBasedTable.create(); + this.reader.init(); + } + + public abstract ElementSource source(); + + public InputReader reader() { + return this.reader; + } + + @Override + public boolean hasNext() { + return this.reader.hasNext(); + } + + @Override + public GE next() { + String line = this.reader().line(); + try { + return this.parse(this.reader().next()); + } catch (IllegalArgumentException e) { + throw new ParseException(line, e.getMessage()); + } + } + + @Override + public void close() throws Exception { + this.reader.close(); + } + + protected abstract GE parse(Map keyValues); + + protected abstract boolean isIdField(String fieldName); + + protected void addProperties(GE element, Map keyValues) { + for (Map.Entry entry : keyValues.entrySet()) { + String fieldName = entry.getKey(); + Object fieldValue = entry.getValue(); + if (this.isIdField(fieldName)) { + continue; + } + String key = this.source().mappingField(fieldName); + Object value = this.validatePropertyValue(key, fieldValue); + + element.property(key, value); + } + } + + protected PropertyKey getPropertyKey(String name) { + SchemaElement schema = this.schemas.get(HugeType.PROPERTY_KEY, name); + if (schema == null) { + schema = this.client.schema().getPropertyKey(name); + } + if (schema == null) { + throw new IllegalStateException( + String.format("The property key %s doesn't exist", name)); + } else { + this.schemas.put(HugeType.PROPERTY_KEY, name, schema); + } + return (PropertyKey) schema; + } + + protected VertexLabel getVertexLabel(String name) { + SchemaElement schema = this.schemas.get(HugeType.VERTEX_LABEL, name); + if (schema == null) { + schema = this.client.schema().getVertexLabel(name); + } + if (schema == null) { + throw new IllegalStateException( + String.format("The vertex label %s doesn't exist", name)); + } else { + this.schemas.put(HugeType.VERTEX_LABEL, name, schema); + } + return (VertexLabel) schema; + } + + protected EdgeLabel getEdgeLabel(String 
name) { + SchemaElement schema = this.schemas.get(HugeType.EDGE_LABEL, name); + if (schema == null) { + schema = this.client.schema().getEdgeLabel(name); + } + if (schema == null) { + throw new IllegalStateException( + String.format("The edge label %s doesn't exist", name)); + } else { + this.schemas.put(HugeType.EDGE_LABEL, name, schema); + } + return (EdgeLabel) schema; + } + + protected String spliceVertexId(VertexLabel vertexLabel, + List primaryValues) { + E.checkArgument(vertexLabel.primaryKeys().size() == primaryValues.size(), + "Missing some primary key columns, expect %s, " + + "but only got %s for vertex label '%s'", + vertexLabel.primaryKeys(), primaryValues, vertexLabel); + + StringBuilder vertexId = new StringBuilder(); + StringBuilder vertexKeysId = new StringBuilder(); + String[] searchList = new String[]{":", "!"}; + String[] replaceList = new String[]{"`:", "`!"}; + for (Object value : primaryValues) { + String pkValue = String.valueOf(value); + pkValue = StringUtils.replaceEach(pkValue, searchList, replaceList); + vertexKeysId.append(pkValue); + vertexKeysId.append("!"); + } + + vertexId.append(vertexLabel.id()).append(":").append(vertexKeysId); + vertexId.deleteCharAt(vertexId.length() - 1); + return vertexId.toString(); + } + + protected void checkVertexIdLength(String id) { + try { + E.checkArgument(id.getBytes(ID_CHARSET).length <= VERTEX_ID_LIMIT, + "Vertex id length limit is '%s', '%s' exceeds it", + VERTEX_ID_LIMIT, id); + } catch (UnsupportedEncodingException e) { + throw new IllegalStateException(e); + } + } + + protected Object validatePropertyValue(String key, Object rawValue) { + PropertyKey pKey = this.getPropertyKey(key); + Object value = DataTypeUtil.convert(rawValue, pKey); + E.checkArgument(value != null, + "The value '%s' can't convert to class %s " + + "with cardinality %s", + rawValue, pKey.dataType().clazz(), pKey.cardinality()); + return value; + } + + public static boolean isAutomatic(IdStrategy idStrategy) { + return idStrategy == IdStrategy.AUTOMATIC; + } + + public static boolean isCustomize(IdStrategy idStrategy) { + return idStrategy == IdStrategy.CUSTOMIZE_STRING || + idStrategy == IdStrategy.CUSTOMIZE_NUMBER; + } + + public static boolean isPrimaryKey(IdStrategy idStrategy) { + return idStrategy == IdStrategy.PRIMARY_KEY; + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/parser/VertexParser.java b/src/main/java/com/baidu/hugegraph/loader/parser/VertexParser.java new file mode 100644 index 000000000..6ad279493 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/parser/VertexParser.java @@ -0,0 +1,116 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.parser; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import com.baidu.hugegraph.loader.reader.InputReader; +import com.baidu.hugegraph.loader.source.VertexSource; +import com.baidu.hugegraph.structure.constant.IdStrategy; +import com.baidu.hugegraph.structure.graph.Vertex; +import com.baidu.hugegraph.structure.schema.VertexLabel; +import com.baidu.hugegraph.util.E; + +public class VertexParser extends ElementParser { + + private final VertexSource source; + private final VertexLabel vertexLabel; + + public VertexParser(VertexSource source, InputReader reader) { + super(reader); + this.source = source; + this.vertexLabel = this.getVertexLabel(source.label()); + // Ensure the id field is matched with id strategy + this.checkIdField(); + } + + @Override + public VertexSource source() { + return this.source; + } + + @Override + protected Vertex parse(Map keyValues) { + Vertex vertex = new Vertex(this.source.label()); + // Assign or check id if need + this.assignIdIfNeed(vertex, keyValues); + // Add properties + this.addProperties(vertex, keyValues); + return vertex; + } + + @Override + protected boolean isIdField(String fieldName) { + return fieldName.equals(this.source.idField()); + } + + private void assignIdIfNeed(Vertex vertex, Map keyValues) { + // The id strategy must be CUSTOMIZE/PRIMARY_KEY via 'checkIdField()' + if (isCustomize(this.vertexLabel.idStrategy())) { + assert this.source.idField() != null; + Object idValue = keyValues.get(this.source.idField()); + E.checkArgument(idValue != null, + "The value of id field '%s' can't be null", + this.source.idField()); + + String id = String.valueOf(idValue); + if (this.vertexLabel.idStrategy() == IdStrategy.CUSTOMIZE_STRING) { + this.checkVertexIdLength(id); + vertex.id(id); + } + } else { + assert isPrimaryKey(this.vertexLabel.idStrategy()); + List primaryKeys = this.vertexLabel.primaryKeys(); + List primaryValues = new ArrayList<>(primaryKeys.size()); + for (Map.Entry entry : keyValues.entrySet()) { + String fieldName = entry.getKey(); + Object fieldValue = entry.getValue(); + + String key = this.source.mappingField(fieldName); + Object value = this.validatePropertyValue(key, fieldValue); + + if (primaryKeys.contains(key)) { + int index = primaryKeys.indexOf(key); + primaryValues.add(index, value); + } + } + String id = this.spliceVertexId(this.vertexLabel, primaryValues); + this.checkVertexIdLength(id); + } + } + + private void checkIdField() { + if (isCustomize(this.vertexLabel.idStrategy())) { + E.checkState(this.source.idField() != null, + "The id field can't be empty or null " + + "when id strategy is CUSTOMIZE"); + } else if (isPrimaryKey(this.vertexLabel.idStrategy())) { + E.checkState(this.source.idField() == null, + "The id field must be empty or null " + + "when id strategy is PRIMARY_KEY"); + } else { + // The id strategy is automatic + throw new IllegalArgumentException( + "Unsupported AUTOMATIC id strategy for hugegraph-loader"); + } + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/reader/InputReader.java b/src/main/java/com/baidu/hugegraph/loader/reader/InputReader.java new file mode 100644 index 000000000..5f8499336 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/reader/InputReader.java @@ -0,0 +1,32 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.reader; + +import java.util.Map; + +import com.baidu.hugegraph.loader.util.AutoCloseableIterator; + +public interface InputReader + extends AutoCloseableIterator<Map<String, Object>> { + + public void init(); + + public String line(); +} diff --git a/src/main/java/com/baidu/hugegraph/loader/reader/InputReaderFactory.java b/src/main/java/com/baidu/hugegraph/loader/reader/InputReaderFactory.java new file mode 100644 index 000000000..c75c0fbea --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/reader/InputReaderFactory.java @@ -0,0 +1,57 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.reader; + +import com.baidu.hugegraph.loader.reader.file.CsvFileReader; +import com.baidu.hugegraph.loader.reader.file.FileReader; +import com.baidu.hugegraph.loader.reader.file.JsonFileReader; +import com.baidu.hugegraph.loader.reader.file.TextFileReader; +import com.baidu.hugegraph.loader.source.file.FileFormat; +import com.baidu.hugegraph.loader.source.file.FileSource; +import com.baidu.hugegraph.loader.source.InputSource; + +public class InputReaderFactory { + + public static InputReader create(InputSource source) { + switch (source.type()) { + case FILE: + return createFileReader((FileSource) source); + default: + // TODO: Expand more input sources + throw new AssertionError(String.format( + "Unsupported input source '%s'", source.type())); + } + } + + private static FileReader createFileReader(FileSource source) { + FileFormat format = source.format(); + switch (format) { + case CSV: + return new CsvFileReader(source); + case TEXT: + return new TextFileReader(source); + case JSON: + return new JsonFileReader(source); + default: + throw new AssertionError(String.format( + "Unsupported file format '%s'", format)); + } + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/reader/file/CsvFileReader.java b/src/main/java/com/baidu/hugegraph/loader/reader/file/CsvFileReader.java new file mode 100644 index 000000000..14e99902f --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/reader/file/CsvFileReader.java @@ -0,0 +1,54 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.reader.file; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import com.baidu.hugegraph.loader.exception.ParseException; +import com.baidu.hugegraph.loader.source.file.FileSource; +import com.opencsv.CSVParser; +import com.opencsv.CSVParserBuilder; + +public class CsvFileReader extends TextFileReader { + + private static final String DELIMITER = ","; + + private final CSVParser parser; + + public CsvFileReader(FileSource fileSource) { + super(fileSource); + this.delimiter = DELIMITER; + char separator = this.delimiter.charAt(0); + this.parser = new CSVParserBuilder().withSeparator(separator) + .withIgnoreQuotations(false) + .build(); + } + + @Override + protected List split(String line) { + try { + return Arrays.asList(this.parser.parseLine(line)); + } catch (IOException e) { + throw new ParseException(line, "Parse line '%s' error", e, line); + } + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/reader/file/FileReader.java b/src/main/java/com/baidu/hugegraph/loader/reader/file/FileReader.java new file mode 100644 index 000000000..5dcbe7540 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/reader/file/FileReader.java @@ -0,0 +1,136 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.reader.file; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.UnsupportedEncodingException; +import java.util.Map; +import java.util.NoSuchElementException; + +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; + +import com.baidu.hugegraph.loader.exception.LoadException; +import com.baidu.hugegraph.loader.reader.InputReader; +import com.baidu.hugegraph.loader.source.file.FileSource; +import com.baidu.hugegraph.util.Log; + +public abstract class FileReader implements InputReader { + + private Logger LOG = Log.logger(FileReader.class); + + private static final int BUF_SIZE = 5 * 1024 * 1024; + + private final FileSource source; + private final BufferedReader reader; + private String nextLine; + + public FileReader(FileSource source) { + this.source = source; + try { + this.reader = this.open(source); + } catch (IOException e) { + throw new LoadException("Failed to load input file '%s'", + e, source.path()); + } + this.nextLine = null; + } + + public FileSource source() { + return this.source; + } + + public String line() { + return this.nextLine; + } + + @Override + public boolean hasNext() { + if (this.nextLine == null) { + try { + this.nextLine = this.reader.readLine(); + } catch (IOException e) { + throw new LoadException("Read next line error", e); + } + } + return this.nextLine != null; + } + + @Override + public Map next() { + if (!this.hasNext()) { + throw new NoSuchElementException("Reach end of file"); + } + String line = this.nextLine; + this.nextLine = null; + return this.transform(line); + } + + @Override + public void close() throws IOException { + this.reader.close(); + } + + protected abstract Map transform(String line); + + private BufferedReader open(FileSource source) throws IOException { + String path = source.path(); + String charset = source.charset(); + + File file = FileUtils.getFile(path); + checkFile(file); + + InputStream fis = null; + try { + fis = new FileInputStream(file); + Reader isr = new InputStreamReader(fis, charset); + return new BufferedReader(isr, BUF_SIZE); + } catch (FileNotFoundException | UnsupportedEncodingException e) { + if (fis != null) { + try { + fis.close(); + } catch (IOException ignored) { + LOG.warn("Failed to close file {}", path); + } + } + throw e; + } + } + + private static void checkFile(File file) { + if (!file.exists()) { + throw new LoadException("The file %s doesn't exist", file); + } + if (!file.isFile()) { + throw new LoadException( + "The file %s must be file rather than a directory", file); + } + if (!file.canRead()) { + throw new LoadException("The file %s must be readable", file); + } + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/reader/file/JsonFileReader.java b/src/main/java/com/baidu/hugegraph/loader/reader/file/JsonFileReader.java new file mode 100644 index 000000000..d7a6502b8 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/reader/file/JsonFileReader.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.reader.file; + +import java.util.Map; + +import com.baidu.hugegraph.loader.exception.ParseException; +import com.baidu.hugegraph.loader.source.file.FileSource; +import com.baidu.hugegraph.rest.SerializeException; +import com.baidu.hugegraph.util.JsonUtil; + +public class JsonFileReader extends FileReader { + + public JsonFileReader(FileSource fileSource) { + super(fileSource); + } + + @Override + public void init() { + // pass + } + + @Override + @SuppressWarnings("unchecked") + protected Map transform(String line) { + try { + return JsonUtil.fromJson(line, Map.class); + } catch (SerializeException e) { + throw new ParseException(line, "Deserialize line '%s' error", + e, line); + } + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/reader/file/TextFileReader.java b/src/main/java/com/baidu/hugegraph/loader/reader/file/TextFileReader.java new file mode 100644 index 000000000..4f736a385 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/reader/file/TextFileReader.java @@ -0,0 +1,99 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.reader.file; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.baidu.hugegraph.loader.exception.LoadException; +import com.baidu.hugegraph.loader.exception.ParseException; +import com.baidu.hugegraph.loader.source.file.FileSource; +import com.google.common.base.Splitter; + +public class TextFileReader extends FileReader { + + private static final String DEFAULT_DELIMITER = "\t"; + + // Default is "\t" + protected String delimiter; + protected List header; + + public TextFileReader(FileSource source) { + super(source); + this.delimiter = DEFAULT_DELIMITER; + this.header = null; + } + + @Override + public void init() { + /* + * The delimiter must be initialized before header, because init header + * may use it + */ + this.initDelimiter(); + this.initHeader(); + } + + protected void initDelimiter() { + if (this.source().delimiter() != null) { + this.delimiter = this.source().delimiter(); + } + } + + protected void initHeader() { + if (this.source().header() != null) { + this.header = this.source().header(); + } else { + // If doesn't specify header, the first line is considered as header + if (this.hasNext()) { + this.header = this.split(this.line()); + this.next(); + } else { + throw new LoadException("Can't load data from empty file '%s'", + this.source().path()); + } + if (this.header.isEmpty()) { + throw new LoadException("The header is empty", + this.source().path()); + } + } + } + + @Override + public Map transform(String line) { + List columns = this.split(line); + if (columns.size() != this.header.size()) { + throw new ParseException(line, + "The column length '%s' doesn't match with " + + "header length '%s' on: %s", + columns.size(), this.header.size(), line); + } + Map keyValues = new HashMap<>(); + for (int i = 0; i < this.header.size(); i++) { + keyValues.put(this.header.get(i), columns.get(i)); + } + return keyValues; + } + + protected List split(String line) { + return Splitter.on(this.delimiter).splitToList(line); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/serializer/DeserializerException.java b/src/main/java/com/baidu/hugegraph/loader/serializer/DeserializerException.java new file mode 100644 index 000000000..121feafa1 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/serializer/DeserializerException.java @@ -0,0 +1,42 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.serializer; + +import com.baidu.hugegraph.loader.exception.LoadException; + +public class DeserializerException extends LoadException { + + private static final long serialVersionUID = -7837901607110262081L; + + public DeserializerException(String message, Throwable cause) { + super(message, cause); + } + + public DeserializerException(String message, Object... args) { + super(message, args); + } + + public static DeserializerException expectField(String expectField, + Object parentField) { + return new DeserializerException( + "Invalid json, expect '%s' in '%s'", + expectField, parentField); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/serializer/EdgeSourceDeserializer.java b/src/main/java/com/baidu/hugegraph/loader/serializer/EdgeSourceDeserializer.java new file mode 100644 index 000000000..21eaa322d --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/serializer/EdgeSourceDeserializer.java @@ -0,0 +1,66 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.serializer; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.baidu.hugegraph.loader.source.EdgeSource; +import com.baidu.hugegraph.loader.source.InputSource; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; + +public class EdgeSourceDeserializer + extends ElementSourceDeserializer { + + @Override + @SuppressWarnings("unchecked") + public EdgeSource deserialize(JsonParser parser, + DeserializationContext ctxt) + throws IOException { + JsonNode node = parser.getCodec().readTree(parser); + + JsonNode labelNode = getNode(node, "label", JsonNodeType.STRING); + String label = labelNode.asText(); + + JsonNode sourceNode = getNode(node, "source", JsonNodeType.ARRAY); + List source = this.read(sourceNode, List.class); + + JsonNode targetNode = getNode(node, "target", JsonNodeType.ARRAY); + List target = this.read(targetNode, List.class); + + JsonNode inputNode = getNode(node, "input", JsonNodeType.OBJECT); + InputSource input = this.readInputSource(inputNode); + + JsonNode mappingNode = node.get("mapping"); + Map mapping = null; + if (mappingNode != null) { + mapping = this.read(mappingNode, Map.class); + } else { + mapping = new HashMap<>(); + } + + return new EdgeSource(label, input, source, target, mapping); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/serializer/ElementSourceDeserializer.java b/src/main/java/com/baidu/hugegraph/loader/serializer/ElementSourceDeserializer.java new file mode 100644 index 000000000..346e035fc --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/serializer/ElementSourceDeserializer.java @@ -0,0 +1,51 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.serializer; + +import com.baidu.hugegraph.loader.source.file.FileSource; +import com.baidu.hugegraph.loader.source.InputSource; +import com.baidu.hugegraph.loader.util.JsonUtil; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public abstract class ElementSourceDeserializer + extends InputSourceDeserializer { + + private static final String FIELD_TYPE = "type"; + + protected InputSource readInputSource(JsonNode node) { + JsonNode typeNode = getNode(node, FIELD_TYPE, JsonNodeType.STRING); + String type = typeNode.asText().toUpperCase(); + assert node instanceof ObjectNode; + ObjectNode objectNode = (ObjectNode) node; + // The node 'type' doesn't participate in deserialization + objectNode.remove(FIELD_TYPE); + switch (type) { + case "FILE": + return JsonUtil.convert(node, FileSource.class); + default: + // TODO: Expand more input sources + throw new AssertionError(String.format( + "Unsupported input source '%s'", type)); + } + } + +} diff --git a/src/main/java/com/baidu/hugegraph/loader/serializer/FileSourceDeserializer.java b/src/main/java/com/baidu/hugegraph/loader/serializer/FileSourceDeserializer.java new file mode 100644 index 000000000..afd5e7d30 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/serializer/FileSourceDeserializer.java @@ -0,0 +1,47 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.serializer; + +import java.io.IOException; + +import com.baidu.hugegraph.loader.source.file.FileSource; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; + +public class FileSourceDeserializer extends + InputSourceDeserializer { + + private static final String FIELD_FILE_PATH = "path"; + private static final String FIELD_FILE_FORMAT = "format"; + + @Override + public FileSource deserialize(JsonParser parser, + DeserializationContext ctxt) + throws IOException { + JsonNode node = parser.getCodec().readTree(parser); + + // Make sure the node 'path' and 'format' exist + getNode(node, FIELD_FILE_PATH, JsonNodeType.STRING); + getNode(node, FIELD_FILE_FORMAT, JsonNodeType.STRING); + return this.read(node, FileSource.class); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/serializer/InputSourceDeserializer.java b/src/main/java/com/baidu/hugegraph/loader/serializer/InputSourceDeserializer.java new file mode 100644 index 000000000..a10197f12 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/serializer/InputSourceDeserializer.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.serializer; + +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeType; + +public abstract class InputSourceDeserializer extends JsonDeserializer { + + private static final ObjectMapper mapper = new ObjectMapper(); + + protected T read(JsonNode node, Class clazz) { + return this.mapper.convertValue(node, clazz); + } + + protected static JsonNode getNode(JsonNode node, String name, + JsonNodeType nodeType) { + JsonNode subNode = node.get(name); + if (subNode == null || subNode.getNodeType() != nodeType) { + throw DeserializerException.expectField(name, node); + } + return subNode; + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/serializer/VertexSourceDeserializer.java b/src/main/java/com/baidu/hugegraph/loader/serializer/VertexSourceDeserializer.java new file mode 100644 index 000000000..aa4a15458 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/serializer/VertexSourceDeserializer.java @@ -0,0 +1,62 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.serializer; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import com.baidu.hugegraph.loader.source.InputSource; +import com.baidu.hugegraph.loader.source.VertexSource; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; + +public class VertexSourceDeserializer + extends ElementSourceDeserializer { + + @Override + @SuppressWarnings("unchecked") + public VertexSource deserialize(JsonParser parser, + DeserializationContext ctxt) + throws IOException { + JsonNode node = parser.getCodec().readTree(parser); + + JsonNode labelNode = getNode(node, "label", JsonNodeType.STRING); + String label = labelNode.asText(); + + JsonNode idNode = node.get("id"); + String id = idNode != null ? idNode.asText() : null; + + JsonNode inputNode = getNode(node, "input", JsonNodeType.OBJECT); + InputSource input = this.readInputSource(inputNode); + + JsonNode mappingNode = node.get("mapping"); + Map mapping = null; + if (mappingNode != null) { + mapping = this.read(mappingNode, Map.class); + } else { + mapping = new HashMap<>(); + } + + return new VertexSource(label, input, id, mapping); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/source/EdgeSource.java b/src/main/java/com/baidu/hugegraph/loader/source/EdgeSource.java new file mode 100644 index 000000000..507ace915 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/source/EdgeSource.java @@ -0,0 +1,50 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.source; + +import java.util.List; +import java.util.Map; + +public class EdgeSource extends ElementSource { + + private final List sourceFields; + private final List targetFields; + + public EdgeSource(String label, InputSource input, + List sourceFields, List targetFields, + Map mapping) { + super(label, input, mapping); + this.sourceFields = sourceFields; + this.targetFields = targetFields; + } + + public List sourceFields() { + return this.sourceFields; + } + + public List targetFields() { + return this.targetFields; + } + + @Override + public String toString() { + return String.format("edge-source(%s)", this.label()); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/source/ElementSource.java b/src/main/java/com/baidu/hugegraph/loader/source/ElementSource.java new file mode 100644 index 000000000..a394da7d2 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/source/ElementSource.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.source; + +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public abstract class ElementSource { + + @JsonProperty("label") + private String label; + @JsonProperty("input") + private InputSource input; + @JsonProperty("mapping") + private Map mappingFields; + + public ElementSource(String label, InputSource input, + Map mappingFields) { + this.label = label; + this.input = input; + this.mappingFields = mappingFields; + } + + public String label() { + return this.label; + } + + public InputSource input() { + return this.input; + } + + public Map mappingFields() { + return this.mappingFields; + } + + public String mappingField(String fieldName) { + String mappingName = fieldName; + if (this.mappingFields.containsKey(fieldName)) { + mappingName = this.mappingFields.get(fieldName); + } + return mappingName; + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/source/GraphSource.java b/src/main/java/com/baidu/hugegraph/loader/source/GraphSource.java new file mode 100644 index 000000000..9d494f4be --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/source/GraphSource.java @@ -0,0 +1,63 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.source; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FileUtils; + +import com.baidu.hugegraph.loader.exception.LoadException; +import com.baidu.hugegraph.loader.util.JsonUtil; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class GraphSource { + + @JsonProperty("vertices") + private List vertexSources; + @JsonProperty("edges") + private List edgeSources; + + public GraphSource() { + this.vertexSources = new ArrayList<>(); + this.edgeSources = new ArrayList<>(); + } + + public static GraphSource of(String filePath) { + try { + File file = FileUtils.getFile(filePath); + String json = FileUtils.readFileToString(file, "UTF-8"); + return JsonUtil.fromJson(json, GraphSource.class); + } catch (IOException e) { + throw new LoadException("Read data source file '%s' error", + filePath); + } + } + + public List vertexSources() { + return this.vertexSources; + } + + public List edgeSources() { + return this.edgeSources; + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/source/InputSource.java b/src/main/java/com/baidu/hugegraph/loader/source/InputSource.java new file mode 100644 index 000000000..fbc8cd4b0 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/source/InputSource.java @@ -0,0 +1,29 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.source; + +/** + * In addition to file, it will support various types of input sources, + * such as HDFS, database, Kafka, etc. + */ +public interface InputSource { + + public SourceType type(); +} diff --git a/src/main/java/com/baidu/hugegraph/loader/source/SourceType.java b/src/main/java/com/baidu/hugegraph/loader/source/SourceType.java new file mode 100644 index 000000000..927bdf279 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/source/SourceType.java @@ -0,0 +1,25 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.source; + +public enum SourceType { + + FILE; +} diff --git a/src/main/java/com/baidu/hugegraph/loader/source/VertexSource.java b/src/main/java/com/baidu/hugegraph/loader/source/VertexSource.java new file mode 100644 index 000000000..1d9638814 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/source/VertexSource.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.source; + +import java.util.Map; + +public class VertexSource extends ElementSource { + + // Be null when id strategy is primary key + private final String idField; + + public VertexSource(String label, InputSource input, + String idField, Map mapping) { + super(label, input, mapping); + this.idField = idField; + } + + public String idField() { + return this.idField; + } + + @Override + public String toString() { + return String.format("vertex-source(%s)", this.label()); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/source/file/FileFormat.java b/src/main/java/com/baidu/hugegraph/loader/source/file/FileFormat.java new file mode 100644 index 000000000..a346a9c84 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/source/file/FileFormat.java @@ -0,0 +1,29 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.source.file; + +public enum FileFormat { + + CSV, + + TEXT, + + JSON; +} diff --git a/src/main/java/com/baidu/hugegraph/loader/source/file/FileSource.java b/src/main/java/com/baidu/hugegraph/loader/source/file/FileSource.java new file mode 100644 index 000000000..a01fe211d --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/source/file/FileSource.java @@ -0,0 +1,70 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.source.file; + +import java.util.List; + +import com.baidu.hugegraph.loader.source.InputSource; +import com.baidu.hugegraph.loader.source.SourceType; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class FileSource implements InputSource { + + private static final String DEFAULT_CHARSET = "UTF-8"; + + @JsonProperty("path") + private String path; + @JsonProperty("format") + private FileFormat format; + @JsonProperty("header") + private List header; + @JsonProperty("delimiter") + private String delimiter; + @JsonProperty("charset") + private String charset = DEFAULT_CHARSET; + + public FileSource() { + } + + @Override + public SourceType type() { + return SourceType.FILE; + } + + public String path() { + return this.path; + } + + public FileFormat format() { + return this.format; + } + + public List header() { + return this.header; + } + + public String delimiter() { + return this.delimiter; + } + + public String charset() { + return this.charset; + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/task/InsertEdgeTask.java b/src/main/java/com/baidu/hugegraph/loader/task/InsertEdgeTask.java new file mode 100644 index 000000000..be6131eec --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/task/InsertEdgeTask.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.task; + +import java.util.List; + +import com.baidu.hugegraph.loader.executor.LoadOptions; +import com.baidu.hugegraph.structure.graph.Edge; + +public class InsertEdgeTask extends InsertTask { + + InsertEdgeTask(List batch) { + super(batch); + } + + @Override + protected void execute() { + LoadOptions options = LoadOptions.instance(); + this.client().graph().addEdges(this.batch(), options.checkVertex); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/task/InsertTask.java b/src/main/java/com/baidu/hugegraph/loader/task/InsertTask.java new file mode 100644 index 000000000..1da77a1c4 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/task/InsertTask.java @@ -0,0 +1,96 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.task; + +import java.util.List; +import java.util.concurrent.Callable; + +import com.baidu.hugegraph.driver.HugeClient; +import com.baidu.hugegraph.exception.ServerException; +import com.baidu.hugegraph.loader.executor.HugeClients; +import com.baidu.hugegraph.loader.executor.LoadOptions; +import com.baidu.hugegraph.rest.ClientException; +import com.baidu.hugegraph.structure.GraphElement; + +public abstract class InsertTask + implements Callable { + + protected static final String ILLEGAL_ARGUMENT_EXCEPTION = + "class java.lang.IllegalArgumentException"; + + private final List batch; + private final HugeClient client; + + public InsertTask(List batch) { + this.batch = batch; + this.client = HugeClients.get(LoadOptions.instance()); + } + + public List batch() { + return this.batch; + } + + public HugeClient client() { + return this.client; + } + + @Override + public Integer call() { + if (this.batch == null || this.batch.isEmpty()) { + return 0; + } + + LoadOptions options = LoadOptions.instance(); + int retryTimes = options.retryTimes; + int retryCount = 0; + do { + try { + this.execute(); + break; + } catch (ClientException e) { + retryCount = this.waitThenRetry(retryCount, e); + } catch (ServerException e) { + if (ILLEGAL_ARGUMENT_EXCEPTION.equals(e.exception())) { + throw e; + } + retryCount = this.waitThenRetry(retryCount, e); + } + } while (retryCount > 0 && retryCount <= retryTimes); + + return this.batch.size(); + } + + protected int waitThenRetry(int retryCount, RuntimeException e) { + LoadOptions options = LoadOptions.instance(); + try { + int retryInterval = options.retryInterval; + Thread.sleep(retryInterval * 1000); + } catch (InterruptedException ignored) { + // That's fine, just continue. 
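+ // An interrupted sleep only shortens the backoff: retryCount is still incremented below, + // and the original exception is re-thrown once options.retryTimes is exceeded, so a failing + // batch is re-sent at most retryTimes times with retryInterval seconds between attempts.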
+ } + + if (++retryCount > options.retryTimes) { + throw e; + } + return retryCount; + } + + protected abstract void execute(); +} diff --git a/src/main/java/com/baidu/hugegraph/loader/task/InsertVertexTask.java b/src/main/java/com/baidu/hugegraph/loader/task/InsertVertexTask.java new file mode 100644 index 000000000..7b5aeff66 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/task/InsertVertexTask.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.task; + +import java.util.List; + +import com.baidu.hugegraph.structure.graph.Vertex; + +public class InsertVertexTask extends InsertTask { + + InsertVertexTask(List batch) { + super(batch); + } + + @Override + protected void execute() { + this.client().graph().addVertices(this.batch()); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/task/TaskManager.java b/src/main/java/com/baidu/hugegraph/loader/task/TaskManager.java new file mode 100644 index 000000000..101ca1fb2 --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/task/TaskManager.java @@ -0,0 +1,297 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.task; + +import java.util.List; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +import org.slf4j.Logger; + +import com.baidu.hugegraph.driver.GraphManager; +import com.baidu.hugegraph.loader.exception.InsertException; +import com.baidu.hugegraph.loader.executor.HugeClients; +import com.baidu.hugegraph.loader.executor.LoadLogger; +import com.baidu.hugegraph.loader.executor.LoadOptions; +import com.baidu.hugegraph.loader.util.LoaderUtil; +import com.baidu.hugegraph.structure.graph.Edge; +import com.baidu.hugegraph.structure.graph.Vertex; +import com.baidu.hugegraph.util.Log; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +public class TaskManager { + + private static final Logger LOG = Log.logger(TaskManager.class); + private static final LoadLogger LOG_VERTEX_INSERT = + LoadLogger.logger("vertexInsertError"); + private static final LoadLogger LOG_EDGE_INSERT = + LoadLogger.logger("edgeInsertError"); + + private static final long BATCH_PRINT_FREQUENCY = 10_000_000L; + private static final long SINGLE_PRINT_FREQUENCY = 10_000L; + + private final int futureNum; + private final Semaphore available; + private final ListeningExecutorService batchService; + private final ExecutorService singleExecutor; + private final CompletionService singleService; + private final LongAdder singleTasks; + + private final LongAdder successNum; + private final LongAdder failureNum; + + public TaskManager(LoadOptions options) { + this.futureNum = options.numThreads; + this.available = new Semaphore(this.futureNum); + ExecutorService pool = Executors.newFixedThreadPool(this.futureNum); + this.batchService = MoreExecutors.listeningDecorator(pool); + this.singleExecutor = Executors.newFixedThreadPool(1); + this.singleService = new ExecutorCompletionService<>(singleExecutor); + this.singleTasks = new LongAdder(); + this.successNum = new LongAdder(); + this.failureNum = new LongAdder(); + } + + public long successNum() { + return this.successNum.longValue(); + } + + public long failureNum() { + return this.failureNum.longValue(); + } + + public boolean waitFinished(int timeout) { + try { + // Wait batch task finished + this.available.acquire(this.futureNum); + // Wait single task finished + this.tryConsume(timeout); + if (this.singleTasks.longValue() != 0L) { + return false; + } + } catch (InterruptedException e) { + return false; + } finally { + this.available.release(this.futureNum); + } + return true; + } + + public void cleanup() { + this.successNum.reset(); + this.failureNum.reset(); + } + + public void shutdown(int seconds) { + try { + LOG.debug("Attempt to shutdown executor."); + this.batchService.shutdown(); + this.batchService.awaitTermination(seconds, TimeUnit.SECONDS); + this.singleExecutor.shutdown(); + this.singleExecutor.awaitTermination(seconds, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.error("Tasks is interrupted."); + } finally { + if (!this.batchService.isTerminated()) { + LOG.error("Cancel 
unfinished tasks."); + } + this.batchService.shutdownNow(); + LOG.debug("Shutdown is completed."); + } + } + + public void submitVertexBatch(List batch) { + this.ensurePoolAvailable(); + try { + this.available.acquire(); + } catch (InterruptedException ignored) { + } + + InsertVertexTask task = new InsertVertexTask(batch); + ListenableFuture future = this.batchService.submit(task); + + Futures.addCallback(future, new FutureCallback() { + + @Override + public void onSuccess(Integer size) { + available.release(); + successNum.add(size); + printProgress("Vertices", BATCH_PRINT_FREQUENCY, size); + } + + @Override + public void onFailure(Throwable t) { + available.release(); + submitVerticesInSingleMode(batch); + } + }); + } + + public void submitEdgeBatch(List batch) { + this.ensurePoolAvailable(); + try { + this.available.acquire(); + } catch (InterruptedException ignored) { + } + + InsertEdgeTask task = new InsertEdgeTask(batch); + ListenableFuture future = this.batchService.submit(task); + + Futures.addCallback(future, new FutureCallback() { + + @Override + public void onSuccess(Integer size) { + available.release(); + successNum.add(size); + printProgress("Edges", BATCH_PRINT_FREQUENCY, size); + } + + @Override + public void onFailure(Throwable t) { + available.release(); + submitEdgesInSingleMode(batch); + } + }); + } + + private void submitVerticesInSingleMode(List vertices) { + LoadOptions options = LoadOptions.instance(); + int maxInsertErrors = options.maxInsertErrors; + int shutdownTimeout = options.shutdownTimeout; + GraphManager graph = HugeClients.get(options).graph(); + this.singleService.submit(() -> { + for (Vertex vertex : vertices) { + try { + graph.addVertex(vertex); + successNum.add(1); + } catch (Exception e) { + failureNum.add(1); + LOG.error("Vertex insert error", e); + if (options.testMode) { + throw e; + } + LOG_VERTEX_INSERT.error(new InsertException(vertex, + e.getMessage())); + + if (failureNum.longValue() >= maxInsertErrors) { + LOG.error("Too many vertices insert error... Stopping"); + // Print an empty line. + System.out.println(); + System.out.println(String.format( + "Error: More than %s vertices " + + "insert error... Stopping", + maxInsertErrors)); + this.shutdown(shutdownTimeout); + System.exit(-1); + } + } + } + printProgress("Vertices", SINGLE_PRINT_FREQUENCY, vertices.size()); + return null; + }); + this.singleTasks.increment(); + + try { + this.tryConsume(0L); + } catch (InterruptedException ignored) {} + } + + private void submitEdgesInSingleMode(List edges) { + LoadOptions options = LoadOptions.instance(); + int maxInsertErrors = options.maxInsertErrors; + int shutdownTimeout = options.shutdownTimeout; + GraphManager graph = HugeClients.get(options).graph(); + this.singleService.submit(() -> { + for (Edge edge : edges) { + try { + graph.addEdge(edge); + successNum.add(1); + } catch (Exception e) { + failureNum.add(1); + LOG.error("Edge insert error", e); + if (options.testMode) { + throw e; + } + LOG_EDGE_INSERT.error(new InsertException(edge, + e.getMessage())); + + if (failureNum.longValue() >= maxInsertErrors) { + LOG.error("Too many edges insert error... Stopping"); + // Print an empty line. + System.out.println(); + System.out.println(String.format( + "Error: More than %s edges " + + "insert error... 
Stopping", + maxInsertErrors)); + this.shutdown(shutdownTimeout); + System.exit(-1); + } + } + } + printProgress("Edges", SINGLE_PRINT_FREQUENCY, edges.size()); + return null; + }); + this.singleTasks.increment(); + + try { + this.tryConsume(0L); + } catch (InterruptedException ignored) {} + } + + private void tryConsume(long timeout) throws InterruptedException { + long total = this.singleTasks.longValue(); + for (long i = 0; i < total; i++) { + Future future = this.singleService.poll(timeout, TimeUnit.SECONDS); + // The future is null if timeout + if (future != null) { + this.singleTasks.decrement(); + } + } + } + + private void printProgress(String type, long frequency, int batchSize) { + long inserted = this.successNum.longValue(); + System.out.print(String.format(" %d%s", + inserted, LoaderUtil.backward(inserted))); + if (inserted % frequency < batchSize) { + LOG.info("{} has been imported: {}", type, inserted); + } + } + + private void ensurePoolAvailable() { + while (this.batchService.isShutdown()){ + try { + Thread.sleep(100); + } catch (Exception ignored) { + // That's fine, just continue. + } + } + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/util/AutoCloseableIterator.java b/src/main/java/com/baidu/hugegraph/loader/util/AutoCloseableIterator.java new file mode 100644 index 000000000..fb957971a --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/util/AutoCloseableIterator.java @@ -0,0 +1,25 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.util; + +import java.util.Iterator; + +public interface AutoCloseableIterator extends AutoCloseable, Iterator { +} diff --git a/src/main/java/com/baidu/hugegraph/loader/util/DataTypeUtil.java b/src/main/java/com/baidu/hugegraph/loader/util/DataTypeUtil.java new file mode 100644 index 000000000..3f9646aae --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/util/DataTypeUtil.java @@ -0,0 +1,194 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.util; + +import static com.baidu.hugegraph.structure.constant.DataType.BYTE; +import static com.baidu.hugegraph.structure.constant.DataType.DOUBLE; +import static com.baidu.hugegraph.structure.constant.DataType.FLOAT; +import static com.baidu.hugegraph.structure.constant.DataType.INT; +import static com.baidu.hugegraph.structure.constant.DataType.LONG; + +import java.text.ParseException; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import com.baidu.hugegraph.structure.constant.Cardinality; +import com.baidu.hugegraph.structure.constant.DataType; +import com.baidu.hugegraph.structure.schema.PropertyKey; +import com.baidu.hugegraph.util.E; + +public final class DataTypeUtil { + + public static Object convert(Object value, PropertyKey propertyKey) { + if (value == null) { + return null; + } + + DataType dataType = propertyKey.dataType(); + if (propertyKey.cardinality() == Cardinality.SINGLE) { + if (dataType.isNumber()) { + return valueToNumber(value, dataType); + } else if (dataType.isDate()) { + return valueToDate(value, dataType); + } else if (dataType.isUUID()) { + return valueToUUID(value, dataType); + } + } + + if (checkValue(value, propertyKey)) { + return value; + } + + return null; + } + + public static boolean isNumber(DataType dataType) { + return dataType == BYTE || dataType == INT || dataType == LONG || + dataType == FLOAT || dataType == DOUBLE; + } + + public static boolean isDate(DataType dataType) { + return dataType == DataType.DATE; + } + + public static boolean isUUID(DataType dataType) { + return dataType == DataType.UUID; + } + + public static Number valueToNumber(Object value, DataType dataType) { + E.checkState(isNumber(dataType), "The target data type must be number"); + + if (dataType.clazz().isInstance(value)) { + return (Number) value; + } + + Number number = null; + try { + switch (dataType) { + case BYTE: + number = Byte.valueOf(value.toString()); + break; + case INT: + number = Integer.valueOf(value.toString()); + break; + case LONG: + number = Long.valueOf(value.toString()); + break; + case FLOAT: + number = Float.valueOf(value.toString()); + break; + case DOUBLE: + number = Double.valueOf(value.toString()); + break; + default: + throw new AssertionError(String.format( + "Number type only contains Byte, Integer, " + + "Long, Float, Double, but got %s", + dataType.clazz())); + } + } catch (NumberFormatException ignored) { + // Unmatched type found + } + return number; + } + + public static Date valueToDate(Object value, DataType dataType) { + if (value instanceof Date) { + return (Date) value; + } + if (isDate(dataType)) { + if (value instanceof Number) { + return new Date(((Number) value).longValue()); + } else if (value instanceof String) { + try { + return DateUtil.parse((String) value); + } catch (ParseException e) { + E.checkArgument(false, "%s, expect format: %s", + e.getMessage(), DateUtil.toPattern()); + } + } + } + return null; + } + + public static UUID valueToUUID(Object value, DataType dataType) { + if (value instanceof UUID) { + return (UUID) value; + } + if (isUUID(dataType) && value instanceof String) { + return UUID.fromString((String) value); + } + return null; + } + + public static boolean checkValue(Object value, PropertyKey propertyKey) { + boolean valid; + + Cardinality cardinality = propertyKey.cardinality(); + 
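+        // Check the value against the property key's cardinality: a SINGLE value is
+        // checked directly against the data type, while SET/LIST values must be the
+        // matching collection type with every element checked individually.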
DataType dataType = propertyKey.dataType(); + switch (cardinality) { + case SINGLE: + valid = checkDataType(value, dataType); + break; + case SET: + valid = value instanceof Set; + valid = valid && checkCollectionDataType((Set) value, + dataType); + break; + case LIST: + valid = value instanceof List; + valid = valid && checkCollectionDataType((List) value, + dataType); + break; + default: + throw new AssertionError(String.format( + "Unsupported cardinality: '%s'", cardinality)); + } + return valid; + } + + /** + * Check type of the value valid + */ + public static boolean checkDataType(Object value, DataType dataType) { + if (value instanceof Number) { + return valueToNumber(value, dataType) != null; + } + return dataType.clazz().isInstance(value); + } + + /** + * Check type of all the values(may be some of list properties) valid + */ + public static boolean checkCollectionDataType(Collection values, + DataType dataType) { + boolean valid = true; + for (Object value : values) { + if (!checkDataType(value, dataType)) { + valid = false; + break; + } + } + return valid; + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/util/DateUtil.java b/src/main/java/com/baidu/hugegraph/loader/util/DateUtil.java new file mode 100644 index 000000000..bc83f68bd --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/util/DateUtil.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.util; + +import java.text.ParseException; +import java.util.Date; + +import com.baidu.hugegraph.date.SafeDateFormat; + +public final class DateUtil { + + private static final String DF = "yyyy-MM-dd HH:mm:ss.SSS"; + private static SafeDateFormat DATE_FORMAT = new SafeDateFormat(DF); + + public static Date parse(String source) throws ParseException { + return DATE_FORMAT.parse(source); + } + + public static Object toPattern() { + return DATE_FORMAT.toPattern(); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/util/JsonUtil.java b/src/main/java/com/baidu/hugegraph/loader/util/JsonUtil.java new file mode 100644 index 000000000..765589c2e --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/util/JsonUtil.java @@ -0,0 +1,72 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.loader.util; + +import java.io.IOException; + +import com.baidu.hugegraph.loader.serializer.EdgeSourceDeserializer; +import com.baidu.hugegraph.loader.serializer.FileSourceDeserializer; +import com.baidu.hugegraph.loader.serializer.VertexSourceDeserializer; +import com.baidu.hugegraph.loader.source.EdgeSource; +import com.baidu.hugegraph.loader.source.file.FileSource; +import com.baidu.hugegraph.loader.source.VertexSource; +import com.baidu.hugegraph.rest.SerializeException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; + +public final class JsonUtil { + + private static final ObjectMapper mapper = new ObjectMapper(); + + static { + SimpleModule module = new SimpleModule(); + module.addDeserializer(VertexSource.class, new VertexSourceDeserializer()); + module.addDeserializer(EdgeSource.class, new EdgeSourceDeserializer()); + module.addDeserializer(FileSource.class, new FileSourceDeserializer()); + registerModule(module); + } + + public static void registerModule(Module module) { + mapper.registerModule(module); + } + + public static String toJson(Object object) { + try { + return mapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new SerializeException("Failed to serialize objects", e); + } + } + + public static T fromJson(String json, Class clazz) { + try { + return mapper.readValue(json, clazz); + } catch (IOException e) { + throw new SerializeException("Failed to deserialize json", e); + } + } + + public static T convert(JsonNode node, Class clazz) { + return mapper.convertValue(node, clazz); + } +} diff --git a/src/main/java/com/baidu/hugegraph/loader/util/LoaderUtil.java b/src/main/java/com/baidu/hugegraph/loader/util/LoaderUtil.java new file mode 100644 index 000000000..452cc4ede --- /dev/null +++ b/src/main/java/com/baidu/hugegraph/loader/util/LoaderUtil.java @@ -0,0 +1,31 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */
+
+package com.baidu.hugegraph.loader.util;
+
+public final class LoaderUtil {
+
+    public static String backward(long word) {
+        StringBuilder backward = new StringBuilder();
+        for (int i = 0, len = String.valueOf(word).length(); i <= len; i++) {
+            backward.append("\b");
+        }
+        return backward.toString();
+    }
+}
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..0ffd92504
--- /dev/null
+++ b/src/main/resources/log4j2.xml
@@ -0,0 +1,68 @@
+<!-- log4j2 configuration: the XML element markup is missing here; the only
+     recoverable content is a log_charset property set to UTF-8 and three
+     pattern layouts that use ${log_charset} with the pattern "%m%n". -->
\ No newline at end of file
diff --git a/src/test/java/com/baidu/hugegraph/loader/test/functional/FileUtil.java b/src/test/java/com/baidu/hugegraph/loader/test/functional/FileUtil.java
new file mode 100644
index 000000000..e3e68a177
--- /dev/null
+++ b/src/test/java/com/baidu/hugegraph/loader/test/functional/FileUtil.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.test.functional;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.ImmutableList;
+
+public class FileUtil {
+
+    private static final String DEFAULT_CHARSET = "UTF-8";
+
+    public static String newCSVLine(Object...
parts) { + return StringUtils.join(parts, ","); + } + + public static void clear(String fileName) { + File file = org.apache.commons.io.FileUtils.getFile(fileName); + checkFileValid(file, true); + try { + FileUtils.write(file, "", DEFAULT_CHARSET); + } catch (IOException e) { + throw new RuntimeException(String.format( + "Failed to clear file '%s'", fileName), e); + } + } + + public static void append(String fileName, String line) { + append(fileName, line, DEFAULT_CHARSET); + } + + public static void append(String fileName, String line, String charset) { + File file = org.apache.commons.io.FileUtils.getFile(fileName); + checkFileValid(file, true); + try { + FileUtils.writeLines(file, charset, ImmutableList.of(line), true); + } catch (IOException e) { + throw new RuntimeException(String.format( + "Failed to append line '%s' to file '%s'", + line, fileName), e); + } + } + + public static void delete(String fileName) { + try { + FileUtils.forceDelete(FileUtils.getFile(fileName)); + } catch (IOException e) { + throw new RuntimeException(String.format( + "Failed to delete file '%s'", fileName), e); + } + } + + private static void checkFileValid(File file, boolean autoCreate) { + if (!file.exists()) { + if (autoCreate) { + try { + file.createNewFile(); + } catch (IOException e) { + throw new RuntimeException(String.format( + "Failed to create file '%s'", file.getName()), e); + } + } else { + throw new RuntimeException(String.format( + "Please ensure the file '%s' exist", file.getName())); + } + } else { + if (!file.isFile() || !file.canWrite()) { + throw new RuntimeException(String.format( + "Please ensure the file '%s' is writable", + file.getName())); + } + } + } +} diff --git a/src/test/java/com/baidu/hugegraph/loader/test/functional/LoaderTest.java b/src/test/java/com/baidu/hugegraph/loader/test/functional/LoaderTest.java new file mode 100644 index 000000000..ab6299130 --- /dev/null +++ b/src/test/java/com/baidu/hugegraph/loader/test/functional/LoaderTest.java @@ -0,0 +1,447 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.test.functional; + +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.baidu.hugegraph.driver.GraphManager; +import com.baidu.hugegraph.driver.HugeClient; +import com.baidu.hugegraph.driver.SchemaManager; +import com.baidu.hugegraph.loader.HugeGraphLoader; +import com.baidu.hugegraph.loader.exception.ParseException; +import com.baidu.hugegraph.loader.test.util.Assert; +import com.baidu.hugegraph.structure.constant.DataType; +import com.baidu.hugegraph.structure.graph.Edge; +import com.baidu.hugegraph.structure.graph.Vertex; +import com.baidu.hugegraph.structure.schema.PropertyKey; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +public class LoaderTest { + + private static final String PATH_PREFIX = "src/test/resources"; + private static final String url = "http://127.0.0.1:8080"; + private static final String graph = "hugegraph"; + private static final HugeClient client = new HugeClient(url, graph); + + @BeforeClass + public static void setUp() { + clearFileData(); + clearServerData(); + } + + @Before + public void init() { + FileUtil.append(path("vertex_person.csv"), "name,age,city"); + FileUtil.append(path("vertex_software.csv"), "name,lang,price", "GBK"); + FileUtil.append(path("edge_knows.csv"), "aname,bname,date,weight"); + FileUtil.append(path("edge_created.csv"), "aname,bname,date,weight"); + } + + @After + public void clear() { + clearFileData(); + clearServerData(); + } + + @AfterClass + public static void tearDown() { + FileUtil.delete(path("vertex_person.csv")); + FileUtil.delete(path("vertex_software.csv")); + FileUtil.delete(path("edge_knows.csv")); + FileUtil.delete(path("edge_created.csv")); + } + + private static void clearFileData() { + FileUtil.clear(path("vertex_person.csv")); + FileUtil.clear(path("vertex_software.csv")); + FileUtil.clear(path("edge_knows.csv")); + FileUtil.clear(path("edge_created.csv")); + } + + private static void clearServerData() { + SchemaManager schema = client.schema(); + GraphManager graph = client.graph(); + // Clear edge + graph.listEdges().forEach(e -> graph.removeEdge(e.id())); + // Clear vertex + graph.listVertices().forEach(v -> graph.removeVertex(v.id())); + + // Clear schema + schema.getIndexLabels().forEach(il -> { + schema.removeIndexLabel(il.name()); + }); + schema.getEdgeLabels().forEach(el -> { + schema.removeEdgeLabel(el.name()); + }); + schema.getVertexLabels().forEach(vl -> { + schema.removeVertexLabel(vl.name()); + }); + schema.getPropertyKeys().forEach(pk -> { + schema.removePropertyKey(pk.name()); + }); + } + + /** + * NOTE: Unsupport auto create schema + */ + //@Test + public void testLoadWithAutoCreateSchema() { + String[] args = new String[]{"-f", "example/struct.json", + "-g", "hugegraph", + "--num-threads", "2"}; + try { + HugeGraphLoader.main(args); + } catch (Exception e) { + Assert.fail("Should not throw exception, but throw " + e); + } + + List propertyKeys = client.schema().getPropertyKeys(); + propertyKeys.forEach(pkey -> { + Assert.assertEquals(DataType.TEXT, pkey.dataType()); + }); + + List vertices = client.graph().listVertices(); + List edges = client.graph().listEdges(); + + Assert.assertEquals(7, vertices.size()); + Assert.assertEquals(6, edges.size()); + + boolean interestedVertex = false; + 
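+        // Look for the vertex named "li,nary" and verify its properties: with the
+        // auto-created schema every value is loaded as text, so age should be the
+        // string "26" and city the comma-containing string "Wu,han".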
for (Vertex vertex : vertices) { + Assert.assertEquals(String.class, vertex.id().getClass()); + if (((String) vertex.id()).contains("li,nary")) { + interestedVertex = true; + Assert.assertEquals("26", vertex.property("age")); + Assert.assertEquals("Wu,han", vertex.property("city")); + } + } + Assert.assertTrue(interestedVertex); + + boolean interestedEdge = false; + for (Edge edge : edges) { + Assert.assertEquals(String.class, edge.source().getClass()); + Assert.assertEquals(String.class, edge.target().getClass()); + if (((String) edge.source()).contains("marko") && + ((String) edge.target()).contains("vadas")) { + interestedEdge = true; + Assert.assertEquals("20160110", edge.property("date")); + Assert.assertEquals("0.5", edge.property("weight")); + } + } + Assert.assertTrue(interestedEdge); + } + + @Test + public void testLoadWithCustomizedSchema() { + String[] args = new String[]{"-f", "example/struct.json", + "-s", "example/schema.groovy", + "-g", "hugegraph", + "--num-threads", "2", + "--test-mode", "true"}; + try { + HugeGraphLoader.main(args); + } catch (Exception e) { + Assert.fail("Should not throw exception, but throw " + e); + } + + List vertices = client.graph().listVertices(); + List edges = client.graph().listEdges(); + + Assert.assertEquals(7, vertices.size()); + Assert.assertEquals(6, edges.size()); + + boolean interestedVertex = false; + for (Vertex vertex : vertices) { + Assert.assertEquals(String.class, vertex.id().getClass()); + if (((String) vertex.id()).contains("li,nary")) { + interestedVertex = true; + Assert.assertEquals(26, vertex.property("age")); + Assert.assertEquals("Wu,han", vertex.property("city")); + } + } + Assert.assertTrue(interestedVertex); + + boolean interestedEdge = false; + for (Edge edge : edges) { + Assert.assertEquals(String.class, edge.source().getClass()); + Assert.assertEquals(String.class, edge.target().getClass()); + if (((String) edge.source()).contains("marko") && + ((String) edge.target()).contains("vadas")) { + interestedEdge = true; + Assert.assertEquals("20160110", edge.property("date")); + Assert.assertEquals(0.5, edge.property("weight")); + } + } + Assert.assertTrue(interestedEdge); + } + + @Test + public void testVertexIdExceedLimit() { + Integer[] array = new Integer[129]; + Arrays.fill(array, 1); + String tooLongId = StringUtils.join(array); + String line = FileUtil.newCSVLine(tooLongId, 29, "Beijing"); + FileUtil.append(path("vertex_person.csv"), line); + + String[] args = new String[]{"-f", path("struct.json"), + "-s", path("schema.groovy"), + "-g", "hugegraph", + "--num-threads", "2", + "--test-mode", "true"}; + + Assert.assertThrows(ParseException.class, () -> { + HugeGraphLoader.main(args); + }); + } + + @Test + public void testLoadWithIdExceedLimitLengthInBytes() { + String pk = "ecommerce__color__极光银翻盖上盖+" + + "琥珀啡翻盖下盖+咖啡金翻盖上盖装饰片+" + + "香槟金主镜片+深咖啡色副镜片+琥珀>" + + "啡前壳+极光银后壳+浅灰电池扣+极光银电池组件+深灰天线"; + assert pk.length() < 128; + String line = FileUtil.newCSVLine(pk, "中文", 328); + FileUtil.append(path("vertex_software.csv"), line, "GBK"); + + String[] args = new String[]{"-f", path("struct.json"), + "-s", path("schema.groovy"), + "-g", "hugegraph", + "--num-threads", "2", + "--test-mode", "true"}; + // Bytes encoded in utf-8 exceed 128 + Assert.assertThrows(ParseException.class, () -> { + HugeGraphLoader.main(args); + }); + } + + @Test + public void testVertexTooManyColumns() { + String line = FileUtil.newCSVLine("marko", 29, "Beijing", "Extra"); + FileUtil.append(path("vertex_person.csv"), line); + + String[] args = new 
String[]{"-f", path("struct.json"), + "-s", path("schema.groovy"), + "-g", "hugegraph", + "--num-threads", "2", + "--test-mode", "true"}; + + Assert.assertThrows(ParseException.class, () -> { + HugeGraphLoader.main(args); + }); + } + + @Test + public void testVertexTooFewColumns() { + String line = FileUtil.newCSVLine("marko", 29); + FileUtil.append(path("vertex_person.csv"), line); + + String[] args = new String[]{"-f", path("struct.json"), + "-s", path("schema.groovy"), + "-g", "hugegraph", + "--num-threads", "2", + "--test-mode", "true"}; + + Assert.assertThrows(ParseException.class, () -> { + HugeGraphLoader.main(args); + }); + } + + @Test + public void testUnmatchedPropertyDataType() { + String line = FileUtil.newCSVLine("marko", "Should be number", + "Beijing"); + FileUtil.append(path("vertex_person.csv"), line); + + String[] args = new String[]{"-f", path("struct.json"), + "-s", path("schema.groovy"), + "-g", "hugegraph", + "--num-threads", "2", + "--test-mode", "true"}; + + Assert.assertThrows(ParseException.class, () -> { + HugeGraphLoader.main(args); + }); + } + + @Test + public void testVertexPkContainsSpecicalSymbol() { + String line = FileUtil.newCSVLine("mar:ko!", 29, "Beijing"); + FileUtil.append(path("vertex_person.csv"), line); + + String[] args = new String[]{"-f", path("struct.json"), + "-s", path("schema.groovy"), + "-g", "hugegraph", + "--num-threads", "2", + "--test-mode", "true"}; + + try { + HugeGraphLoader.main(args); + } catch (Exception e) { + Assert.fail("Should not throw exception, but throw " + e); + } + + List vertices = client.graph().listVertices(); + Assert.assertEquals(1, vertices.size()); + Vertex vertex = vertices.get(0); + Assert.assertEquals(String.class, vertex.id().getClass()); + Assert.assertTrue(((String) vertex.id()).contains(":mar`:ko`!")); + Assert.assertEquals(29, vertex.property("age")); + Assert.assertEquals("Beijing", vertex.property("city")); + } + + @Test + public void testLoadWithUnmatchedEncodingCharset() { + String line = FileUtil.newCSVLine("lop", "中文", 328); + FileUtil.append(path("vertex_software.csv"), line, "GBK"); + + String[] args = new String[]{"-f", path("struct.json"), + "-g", "hugegraph", + "-s", path("schema.groovy"), + "--num-threads", "2", + "--test-mode", "true"}; + try { + HugeGraphLoader.main(args); + } catch (Exception e) { + Assert.fail("Should not throw exception, but throw " + e); + } + + List vertices = client.graph().listVertices(); + Assert.assertEquals(1, vertices.size()); + Vertex vertex = vertices.get(0); + Assert.assertEquals("lop", vertex.property("name")); + Assert.assertNotEquals("中文", vertex.property("lang")); + Assert.assertEquals(328.0, vertex.property("price")); + } + + @Test + public void testLoadWithMatchedEncodingCharset() { + String line = FileUtil.newCSVLine("lop", "中文", 328); + FileUtil.append(path("vertex_software.csv"), line, "GBK"); + + String[] args = new String[]{"-f", path("struct_gbk.json"), + "-g", "hugegraph", + "-s", path("schema.groovy"), + "--num-threads", "2", + "--test-mode", "true"}; + + try { + HugeGraphLoader.main(args); + } catch (Exception e) { + Assert.fail("Should not throw exception, but throw " + e); + } + + List vertices = client.graph().listVertices(); + Assert.assertEquals(1, vertices.size()); + Vertex vertex = vertices.get(0); + Assert.assertEquals("lop", vertex.property("name")); + Assert.assertEquals("中文", vertex.property("lang")); + Assert.assertEquals(328.0, vertex.property("price")); + } + + @Test + public void testLoadWithValueListPorpertyInJsonFile() { + String 
line = FileUtil.newCSVLine("marko", 29, "Beijing"); + FileUtil.append(path("vertex_person.csv"), line); + + line = FileUtil.newCSVLine("lop", "中文", 328); + FileUtil.append(path("vertex_software.csv"), line, "GBK"); + + line = "{\"person_name\": \"marko\", \"software_name\": \"lop\", " + + "\"feel\": [\"so so\", \"good\", \"good\"]}"; + FileUtil.append(path("edge_use.json"), line); + + String[] args = new String[]{"-f", path("struct_edge_use.json"), + "-g", "hugegraph", + "-s", path("schema.groovy"), + "--num-threads", "2", + "--test-mode", "true"}; + + try { + HugeGraphLoader.main(args); + } catch (Exception e) { + FileUtil.delete(path("edge_use.json")); + Assert.fail("Should not throw exception, but throw " + e); + } + + List edges = client.graph().listEdges(); + Assert.assertEquals(1, edges.size()); + Edge edge = edges.get(0); + + Assert.assertEquals("person", edge.sourceLabel()); + Assert.assertEquals("software", edge.targetLabel()); + Assert.assertEquals(ImmutableList.of("so so", "good", "good"), + edge.property("feel")); + + FileUtil.delete(path("edge_use.json")); + } + + // TODO: Support cardinality set property + // @Test + public void testLoadWithValueSetPorpertyInJsonFile() { + String line = FileUtil.newCSVLine("marko", 29, "Beijing"); + FileUtil.append(path("vertex_person.csv"), line); + + line = FileUtil.newCSVLine("lop", "中文", 328); + FileUtil.append(path("vertex_software.csv"), line, "GBK"); + + line = "{\"person_name\": \"marko\", \"software_name\": \"lop\", " + + "\"time\": [\"20171210\", \"20180101\"]}"; + FileUtil.append(path("edge_use.json"), line); + + String[] args = new String[]{"-f", path("struct_edge_use.json"), + "-g", "hugegraph", + "-s", path("schema.groovy"), + "--num-threads", "2", + "--test-mode", "true"}; + + try { + HugeGraphLoader.main(args); + } catch (Exception e) { + FileUtil.delete(path("edge_use.json")); + Assert.fail("Should not throw exception, but throw " + e); + } + + List edges = client.graph().listEdges(); + Assert.assertEquals(1, edges.size()); + Edge edge = edges.get(0); + + Assert.assertEquals("person", edge.sourceLabel()); + Assert.assertEquals("software", edge.targetLabel()); + Assert.assertEquals(ImmutableSet.of("20171210", "20180101"), + edge.property("time")); + + FileUtil.delete(path("edge_use.json")); + } + + private static String path(String fileName) { + return Paths.get(PATH_PREFIX, fileName).toString(); + } +} diff --git a/src/test/java/com/baidu/hugegraph/loader/test/util/Assert.java b/src/test/java/com/baidu/hugegraph/loader/test/util/Assert.java new file mode 100644 index 000000000..e10fabc63 --- /dev/null +++ b/src/test/java/com/baidu/hugegraph/loader/test/util/Assert.java @@ -0,0 +1,56 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.baidu.hugegraph.loader.test.util; + +import java.util.function.Consumer; + +public class Assert extends org.junit.Assert { + @FunctionalInterface + public interface ThrowableRunnable { + void run() throws Throwable; + } + public static void assertThrows(Class throwable, + ThrowableRunnable runnable) { + assertThrows(throwable, runnable, e -> { + System.err.println(e); + }); + } + public static void assertThrows(Class throwable, + ThrowableRunnable runnable, + Consumer exceptionConsumer) { + boolean fail = false; + try { + runnable.run(); + fail = true; + } catch (Throwable e) { + exceptionConsumer.accept(e); + if (!throwable.isInstance(e)) { + Assert.fail(String.format( + "Bad exception type %s(expect %s)", + e.getClass(), throwable)); + } + } + if (fail) { + Assert.fail(String.format( + "No exception was thrown(expect %s)", + throwable)); + } + } +} diff --git a/src/test/resources/schema.groovy b/src/test/resources/schema.groovy new file mode 100644 index 000000000..40b0ff3c3 --- /dev/null +++ b/src/test/resources/schema.groovy @@ -0,0 +1,27 @@ +// Define schema +schema.propertyKey("name").asText().ifNotExist().create(); +schema.propertyKey("age").asInt().ifNotExist().create(); +schema.propertyKey("city").asText().ifNotExist().create(); +schema.propertyKey("weight").asDouble().ifNotExist().create(); +schema.propertyKey("lang").asText().ifNotExist().create(); +schema.propertyKey("date").asText().ifNotExist().create(); +schema.propertyKey("price").asDouble().ifNotExist().create(); +schema.propertyKey("feel").asText().valueList().ifNotExist().create(); +schema.propertyKey("time").asText().valueSet().ifNotExist().create(); + +schema.vertexLabel("person").properties("name", "age", "city").primaryKeys("name").ifNotExist().create(); +schema.vertexLabel("software").properties("name", "lang", "price").primaryKeys("name").ifNotExist().create(); + +schema.indexLabel("personByName").onV("person").by("name").secondary().ifNotExist().create(); +schema.indexLabel("personByAge").onV("person").by("age").range().ifNotExist().create(); +schema.indexLabel("personByCity").onV("person").by("city").secondary().ifNotExist().create(); +schema.indexLabel("personByAgeAndCity").onV("person").by("age", "city").secondary().ifNotExist().create(); +schema.indexLabel("softwareByPrice").onV("software").by("price").range().ifNotExist().create(); + +schema.edgeLabel("knows").sourceLabel("person").targetLabel("person").properties("date", "weight").ifNotExist().create(); +schema.edgeLabel("created").sourceLabel("person").targetLabel("software").properties("date", "weight").ifNotExist().create(); +schema.edgeLabel("use").sourceLabel("person").targetLabel("software").properties("feel", "time").nullableKeys("feel", "time").ifNotExist().create(); + +schema.indexLabel("createdByDate").onE("created").by("date").secondary().ifNotExist().create(); +schema.indexLabel("createdByWeight").onE("created").by("weight").range().ifNotExist().create(); +schema.indexLabel("knowsByWeight").onE("knows").by("weight").range().ifNotExist().create(); \ No newline at end of file diff --git a/src/test/resources/struct.json b/src/test/resources/struct.json new file mode 100644 index 000000000..6d6713873 --- /dev/null +++ b/src/test/resources/struct.json @@ -0,0 +1,56 @@ +{ + "vertices": [ + { + "label": "person", + "input": { + "type": "file", + "path": "src/test/resources/vertex_person.csv", + "format": "CSV", + "charset": "UTF-8" + }, + "mapping": { + "name": "name", + "age": "age", + "city": "city" + } + }, + { + "label": 
"software", + "input": { + "type": "file", + "path": "src/test/resources/vertex_software.csv", + "format": "CSV" + } + } + ], + "edges": [ + { + "label": "knows", + "source": ["source_name"], + "target": ["target_name"], + "input": { + "type": "file", + "path": "src/test/resources/edge_knows.csv", + "format": "CSV" + }, + "mapping": { + "source_name": "name", + "target_name": "name" + } + }, + { + "label": "created", + "source": ["aname"], + "target": ["bname"], + "input": { + "type": "file", + "path": "src/test/resources/edge_created.csv", + "format": "CSV" + }, + "mapping": { + "aname": "name", + "bname": "name" + } + } + ] +} \ No newline at end of file diff --git a/src/test/resources/struct_edge_use.json b/src/test/resources/struct_edge_use.json new file mode 100644 index 000000000..2843e90ac --- /dev/null +++ b/src/test/resources/struct_edge_use.json @@ -0,0 +1,42 @@ +{ + "vertices": [ + { + "label": "person", + "input": { + "type": "file", + "path": "src/test/resources/vertex_person.csv", + "format": "CSV", + "charset": "UTF-8" + }, + "mapping": { + "name": "name", + "age": "age", + "city": "city" + } + }, + { + "label": "software", + "input": { + "type": "file", + "path": "src/test/resources/vertex_software.csv", + "format": "CSV" + } + } + ], + "edges": [ + { + "label": "use", + "source": ["person_name"], + "target": ["software_name"], + "input": { + "type": "file", + "path": "src/test/resources/edge_use.json", + "format": "JSON" + }, + "mapping": { + "person_name": "name", + "software_name": "name" + } + } + ] +} \ No newline at end of file diff --git a/src/test/resources/struct_gbk.json b/src/test/resources/struct_gbk.json new file mode 100644 index 000000000..daa0f2ee8 --- /dev/null +++ b/src/test/resources/struct_gbk.json @@ -0,0 +1,59 @@ +{ + "vertices": [ + { + "label": "person", + "input": { + "type": "file", + "path": "src/test/resources/vertex_person.csv", + "format": "CSV", + "charset": "GBK" + }, + "mapping": { + "name": "name", + "age": "age", + "city": "city" + } + }, + { + "label": "software", + "input": { + "type": "file", + "path": "src/test/resources/vertex_software.csv", + "format": "CSV", + "charset": "GBK" + } + } + ], + "edges": [ + { + "label": "knows", + "source": ["source_name"], + "target": ["target_name"], + "input": { + "type": "file", + "path": "src/test/resources/edge_knows.csv", + "format": "CSV", + "charset": "GBK" + }, + "mapping": { + "source_name": "name", + "target_name": "name" + } + }, + { + "label": "created", + "source": ["aname"], + "target": ["bname"], + "input": { + "type": "file", + "path": "src/test/resources/edge_created.csv", + "format": "CSV", + "charset": "GBK" + }, + "mapping": { + "aname": "name", + "bname": "name" + } + } + ] +} \ No newline at end of file