Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(analytical): Support using customized writable as vdata,edata for Giraph apps #3873

Merged
merged 16 commits into from
Jun 5, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.example.giraph;

import com.alibaba.graphscope.example.giraph.writable.MultipleLongWritable;

import org.apache.giraph.conf.LongConfOption;
import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.LongWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Objects;

/**
* Only send msg.
*/
public class MessageAppWithUserWritable
extends BasicComputation<
LongWritable, MultipleLongWritable, MultipleLongWritable, MultipleLongWritable> {

public static LongConfOption MAX_SUPER_STEP;
private static Logger logger = LoggerFactory.getLogger(MessageAppWithUserWritable.class);

static {
String maxSuperStep = System.getenv("MAX_SUPER_STEP");
if (Objects.isNull(maxSuperStep) || maxSuperStep.isEmpty()) {
MAX_SUPER_STEP = new LongConfOption("maxSuperStep", 3, "max super step");
} else {
MAX_SUPER_STEP =
new LongConfOption(
"maxSuperStep", Long.valueOf(maxSuperStep), "max super step");
}
}

/**
* Must be defined by user to do computation on a single Vertex.
*
* @param vertex Vertex
* @param messages Messages that were sent to this vertex in the previous superstep. Each
* message is only guaranteed to have
*/
@Override
public void compute(
Vertex<LongWritable, MultipleLongWritable, MultipleLongWritable> vertex,
Iterable<MultipleLongWritable> messages)
throws IOException {
if (getSuperstep() == 0) {
// logger.info("There should be no messages in step0, " + vertex.getId());
boolean flag = false;
for (MultipleLongWritable message : messages) {
flag = true;
}
if (flag) {
throw new IllegalStateException(
"Expect no msg received in step 1, but actually received");
}
MultipleLongWritable msg = new MultipleLongWritable(vertex.getId().get());
sendMessageToAllEdges(vertex, msg);
} else if (getSuperstep() < MAX_SUPER_STEP.get(getConf())) {
if (vertex.getId().get() < 20) {
logger.info("step [{}] Checking received msg", getSuperstep());
}
int msgCnt = 0;
for (MultipleLongWritable message : messages) {
msgCnt += 1;
}
vertex.setValue(new MultipleLongWritable(msgCnt));
} else if (getSuperstep() == MAX_SUPER_STEP.get(getConf())) {
vertex.voteToHalt();
} else {
logger.info("Impossible: " + getSuperstep());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.example.giraph.format;

import com.alibaba.graphscope.example.giraph.writable.MultipleLongWritable;

import org.apache.giraph.io.EdgeReader;
import org.apache.giraph.io.formats.TextEdgeInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class P2PEdgeMultipleLongInputFormat
extends TextEdgeInputFormat<LongWritable, MultipleLongWritable> {

/**
* Create an edge reader for a given split. The framework will call {@link
* EdgeReader#initialize(InputSplit, TaskAttemptContext)} before the split is used.
*
* @param split the split to be read
* @param context the information about the task
* @return a new record reader
* @throws IOException
*/
@Override
public EdgeReader<LongWritable, MultipleLongWritable> createEdgeReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new P2PEdgeReader();
}

public class P2PEdgeReader extends TextEdgeReaderFromEachLineProcessed<String[]> {

String SEPARATOR = " ";
/**
* Cached vertex id for the current line
*/
private LongWritable srcId;

private LongWritable dstId;
private MultipleLongWritable edgeValue;

/**
* Preprocess the line so other methods can easily read necessary information for creating
* edge
*
* @param line the current line to be read
* @return the preprocessed object
* @throws IOException exception that can be thrown while reading
*/
@Override
protected String[] preprocessLine(Text line) throws IOException {
// logger.debug("line: " + line.toString());
String[] tokens = line.toString().split(SEPARATOR);
if (tokens.length != 3) {
throw new IllegalStateException("expect 3 ele in edge line");
}
// logger.debug(String.join(",", tokens));
srcId = new LongWritable(Long.parseLong(tokens[0]));
dstId = new LongWritable(Long.parseLong(tokens[1]));
edgeValue = new MultipleLongWritable(Long.parseLong(tokens[2]));
return tokens;
}

/**
* Reads target vertex id from the preprocessed line.
*
* @param line the object obtained by preprocessing the line
* @return the target vertex id
* @throws IOException exception that can be thrown while reading
*/
@Override
protected LongWritable getTargetVertexId(String[] line) throws IOException {
return dstId;
}

/**
* Reads source vertex id from the preprocessed line.
*
* @param line the object obtained by preprocessing the line
* @return the source vertex id
* @throws IOException exception that can be thrown while reading
*/
@Override
protected LongWritable getSourceVertexId(String[] line) throws IOException {
return srcId;
}

/**
* Reads edge value from the preprocessed line.
*
* @param line the object obtained by preprocessing the line
* @return the edge value
* @throws IOException exception that can be thrown while reading
*/
@Override
protected MultipleLongWritable getValue(String[] line) throws IOException {
return edgeValue;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.example.giraph.format;

import com.alibaba.graphscope.example.giraph.writable.MultipleLongWritable;
import com.google.common.collect.Lists;

import org.apache.giraph.edge.Edge;
import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.util.List;

public class P2PVertexMultipleLongInputFormat
extends TextVertexInputFormat<LongWritable, MultipleLongWritable, MultipleLongWritable> {

/**
* The factory method which produces the {@link TextVertexReader} used by this input format.
*
* @param split the split to be read
* @param context the information about the task
* @return the text vertex reader to be used
*/
@Override
public TextVertexInputFormat<LongWritable, MultipleLongWritable, MultipleLongWritable>
.TextVertexReader
createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException {
return new P2PVertexReader();
}

public class P2PVertexReader extends TextVertexReaderFromEachLineProcessed<String[]> {

String SEPARATOR = " ";

/**
* Cached vertex id for the current line
*/
private LongWritable id;

private MultipleLongWritable value;

@Override
protected String[] preprocessLine(Text line) throws IOException {
// logger.debug("line: " + line.toString());
String[] tokens = line.toString().split(SEPARATOR);
// logger.debug(String.join(",", tokens));
id = new LongWritable(Long.parseLong(tokens[0]));
value = new MultipleLongWritable(Long.parseLong(tokens[1]));
return tokens;
}

@Override
protected LongWritable getId(String[] tokens) throws IOException {
return id;
}

@Override
protected MultipleLongWritable getValue(String[] tokens) throws IOException {
return value;
}

@Override
protected Iterable<Edge<LongWritable, MultipleLongWritable>> getEdges(String[] tokens)
throws IOException {
List<Edge<LongWritable, MultipleLongWritable>> edges =
Lists.newArrayListWithCapacity(0);
return edges;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.fastjson.JSONObject;
import com.alibaba.graphscope.communication.Communicator;
import com.alibaba.graphscope.ds.GSVertexArray;
import com.alibaba.graphscope.ds.StringView;
import com.alibaba.graphscope.factory.GiraphComputationFactory;
import com.alibaba.graphscope.fragment.IFragment;
import com.alibaba.graphscope.graph.AggregatorManager;
Expand All @@ -39,6 +40,7 @@
import com.alibaba.graphscope.serialization.FFIByteVectorInputStream;
import com.alibaba.graphscope.serialization.FFIByteVectorOutputStream;
import com.alibaba.graphscope.stdcxx.FFIByteVector;
import com.alibaba.graphscope.stdcxx.StdString;
import com.alibaba.graphscope.utils.ConfigurationUtils;
import com.alibaba.graphscope.utils.FFITypeFactoryhelper;

Expand Down Expand Up @@ -152,102 +154,118 @@
logger.info("set class loader search path {}", urlsToString(classLoader.getURLs()));
}

public void writeBackVertexData() {
ImmutableClassesGiraphConfiguration conf = userComputation.getConf();
logger.info("Writing back vertex data of type back to c++ context ");
if (Objects.isNull(vertexDataManager)) {
throw new IllegalStateException("expect a non null vertex data manager");
}
// GSVertexArray<VDATA_T> contextVdata = data();
if (Objects.isNull(vertexArray) || vertexArray.getAddress() == 0) {
throw new IllegalStateException("GS vertex array empty");
}
// Generate a byte stream contains all vertex data.
FFIByteVectorOutputStream outputStream = new FFIByteVectorOutputStream();
long[] offsets = new long[(int) innerVerticesNum];
long maxOffset = 0;
{
long previous = 0;
try {
if (conf.getGrapeVdataClass().equals(String.class)) {
for (long lid = 0; lid < innerVerticesNum; ++lid) {
vertexDataManager.getVertexData(lid).write(outputStream);
long cur = outputStream.bytesWriten();
offsets[(int) lid] = cur - previous;
maxOffset = Math.max(offsets[(int) lid], maxOffset);
previous = cur;
}
} else {
for (long lid = 0; lid < innerVerticesNum; ++lid) {
vertexDataManager.getVertexData(lid).write(outputStream);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
outputStream.finishSetting();
FFIByteVector vector = outputStream.getVector();
FFIByteVectorInputStream inputStream = new FFIByteVectorInputStream(vector);

com.alibaba.graphscope.ds.Vertex<VID_T> grapeVertex =
(com.alibaba.graphscope.ds.Vertex<VID_T>)
FFITypeFactoryhelper.newVertex(conf.getGrapeVidClass());
try {
if (conf.getGrapeVdataClass().equals(Long.class)) {
for (long lid = 0; lid < innerVerticesNum; ++lid) {
grapeVertex.setValue((VID_T) (Long) lid);
if (inputStream.longAvailable() <= 0) {
throw new IllegalStateException(
"Input stream too short for " + innerVerticesNum + " vertices");
}
long value = inputStream.readLong();
vertexArray.setValue(grapeVertex, value);
}
} else if (conf.getGrapeVdataClass().equals(Integer.class)) {
for (long lid = 0; lid < innerVerticesNum; ++lid) {
grapeVertex.setValue((VID_T) (Long) lid);
if (inputStream.longAvailable() <= 0) {
throw new IllegalStateException(
"Input stream too short for " + innerVerticesNum + " vertices");
}
int value = inputStream.readInt();
vertexArray.setValue(grapeVertex, value);
}
} else if (conf.getGrapeVdataClass().equals(Double.class)) {
for (long lid = 0; lid < innerVerticesNum; ++lid) {
grapeVertex.setValue((VID_T) (Long) lid);
if (inputStream.longAvailable() <= 0) {
throw new IllegalStateException(
"Input stream too short for " + innerVerticesNum + " vertices");
}
double value = inputStream.readDouble();
vertexArray.setValue(grapeVertex, value);
}
} else if (conf.getGrapeVdataClass().equals(Float.class)) {
for (long lid = 0; lid < innerVerticesNum; ++lid) {
grapeVertex.setValue((VID_T) (Long) lid);
if (inputStream.longAvailable() <= 0) {
throw new IllegalStateException(
"Input stream too short for " + innerVerticesNum + " vertices");
}
float value = inputStream.readFloat();
vertexArray.setValue(grapeVertex, value);
}
} else if (conf.getGrapeVdataClass().equals(String.class)) {
byte[] bytes = new byte[(int) maxOffset];
for (long lid = 0; lid < innerVerticesNum; ++lid) {
grapeVertex.setValue((VID_T) (Long) lid);
if (inputStream.longAvailable() <= 0) {
throw new IllegalStateException(
"Input stream too short for " + innerVerticesNum + " vertices");
}
if (inputStream.read(bytes, 0, (int) offsets[(int) lid]) == -1) {
throw new IllegalStateException("read input stream failed");
}
// This string is not readable.
vertexArray.setValue(grapeVertex, new String(bytes));
}
} else if (conf.getGrapeVdataClass().equals(StringView.class)) {
byte[] bytes = new byte[(int) maxOffset];
for (long lid = 0; lid < innerVerticesNum; ++lid) {
grapeVertex.setValue((VID_T) (Long) lid);
if (inputStream.longAvailable() <= 0) {
throw new IllegalStateException(
"Input stream too short for " + innerVerticesNum + " vertices");
}
if (inputStream.read(bytes, 0, (int) offsets[(int) lid]) == -1) {
throw new IllegalStateException("read input stream failed");
}
// This string is not readable.
StdString value = (StdString) vertexArray.get(grapeVertex);
// TODO: can be optimized without creating a java string

Check warning on line 266 in analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/context/GiraphComputationAdaptorContext.java

View check run for this annotation

codefactor.io / CodeFactor

analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/context/GiraphComputationAdaptorContext.java#L266

Resolve unexpected comment. (com.puppycrawl.tools.checkstyle.checks.TodoCommentCheck)
value.fromJavaString(new String(bytes));
}
} else {
throw new IllegalStateException(
"Unrecognized vdata class:" + conf.getGrapeVdataClass().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private FFIByteVectorInputStream generateVertexIdStream() {
}
outputStream.finishSetting();
logger.info(
"Vertex data stream size: "
"Vertex id stream size: "
+ outputStream.bytesWriten()
+ ", vertices: "
+ vertexNum);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.alibaba.graphscope.graph.impl;

import com.alibaba.graphscope.ds.StringView;
import com.alibaba.graphscope.ds.Vertex;
import com.alibaba.graphscope.fragment.IFragment;
import com.alibaba.graphscope.graph.VertexDataManager;
Expand Down Expand Up @@ -106,55 +107,60 @@
return new FFIByteVectorInputStream(outputStream.getVector());
}

private void readVertexDataFromIFragment(FFIByteVectorOutputStream outputStream) {
try {
// We need to form all vdata as a stream, so java writables can read from this stream.
Iterable<Vertex<GRAPE_VID_T>> iterable;
if (conf.getGrapeVidClass().equals(Long.class)) {
iterable = (Iterable<Vertex<GRAPE_VID_T>>) fragment.vertices().longIterable();
} else if (conf.getGrapeVidClass().equals(Integer.class)) {
iterable = (Iterable<Vertex<GRAPE_VID_T>>) fragment.vertices().intIterable();
} else {
throw new IllegalStateException(
"No recognizable vid" + conf.getGrapeVidClass().getName());
}
if (conf.getGrapeVdataClass().equals(Long.class)) {
for (Vertex<GRAPE_VID_T> vertex : iterable) {
Long value = (Long) fragment.getData(vertex);
outputStream.writeLong(value);
}
} else if (conf.getGrapeVdataClass().equals(Integer.class)) {
for (Vertex<GRAPE_VID_T> vertex : iterable) {
Integer value = (Integer) fragment.getData(vertex);
outputStream.writeInt(value);
}
} else if (conf.getGrapeVdataClass().equals(Double.class)) {
for (Vertex<GRAPE_VID_T> vertex : iterable) {
Double value = (Double) fragment.getData(vertex);
outputStream.writeDouble(value);
}
} else if (conf.getGrapeVdataClass().equals(Float.class)) {
for (Vertex<GRAPE_VID_T> vertex : iterable) {
Float value = (Float) fragment.getData(vertex);
outputStream.writeFloat(value);
}
} else if (conf.getGrapeVdataClass().equals(String.class)) {
for (Vertex<GRAPE_VID_T> vertex : iterable) {
String value = (String) fragment.getData(vertex);
outputStream.writeBytes(value);
}
} else if (conf.getGrapeVdataClass().equals(StringView.class)) {
for (Vertex<GRAPE_VID_T> vertex : iterable) {
StringView value = (StringView) fragment.getData(vertex);
outputStream.writeBytes(value);
}
} else {
logger.error("Unsupported oid class: " + conf.getGrapeOidClass().getName());
logger.error("Unsupported vdata class: " + conf.getGrapeVdataClass().getName());
}
// else if (conf.getGrapeVdataClass().equals the userDefined class...
outputStream.finishSetting();
logger.info(
"Vertex data stream size: "
+ outputStream.bytesWriten()
+ ", vertices: "
+ vertexNum);
} catch (IOException e) {
e.printStackTrace();
}
}
}

Check notice on line 166 in analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/graph/impl/VertexDataManagerImpl.java

View check run for this annotation

codefactor.io / CodeFactor

analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/graph/impl/VertexDataManagerImpl.java#L110-L166

Complex Method
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.graphscope.ds.StringView;
import com.alibaba.graphscope.fragment.IFragment;

import org.apache.giraph.combiner.MessageCombiner;
Expand Down Expand Up @@ -117,152 +118,152 @@
* @param giraphConfiguration configuration to set.
* @param jsonObject input json params
*/
public static void parseArgs(
final GiraphConfiguration giraphConfiguration, JSONObject jsonObject)
throws ClassNotFoundException {
// String user_jar_path = System.getenv("USER_JAR_PATH");
// if (Objects.nonNull(user_jar_path) && !user_jar_path.isEmpty()){
// logger.info("user jar path: " + user_jar_path);
// giraphConfiguration.setClassLoader(new
// URLClassLoader(classPath2URLArray(user_jar_path)));
// }
ClassLoader loader = giraphConfiguration.getClassLoader();

if (jsonObject.containsKey(WORKER_CONTEXT_CLASS_STR)
&& !jsonObject.getString(WORKER_CONTEXT_CLASS_STR).isEmpty()) {
giraphConfiguration.setWorkerContextClass(
(Class<? extends WorkerContext>)
loader.loadClass(jsonObject.getString(WORKER_CONTEXT_CLASS_STR)));
logger.info(
"Setting worker context class: "
+ jsonObject.getString(WORKER_CONTEXT_CLASS_STR));
} else {
// set the default worker context class
giraphConfiguration.setWorkerContextClass(DefaultWorkerContext.class);
logger.info("Setting worker context class: " + DefaultWorkerContext.class.getName());
}

if (jsonObject.containsKey(APP_CLASS_STR)
&& !jsonObject.getString(APP_CLASS_STR).isEmpty()) {
giraphConfiguration.setComputationClass(
(Class<? extends AbstractComputation>)
loader.loadClass(jsonObject.getString(APP_CLASS_STR)));
logger.info("Setting app class: " + jsonObject.getString(APP_CLASS_STR));
} else {
logger.info("No computation class defined");
}

if (jsonObject.containsKey(VERTEX_INPUT_FORMAT_CLASS_STR)
&& !jsonObject.getString(VERTEX_INPUT_FORMAT_CLASS_STR).isEmpty()) {
giraphConfiguration.setVertexInputFormatClass(
(Class<? extends VertexInputFormat>)
loader.loadClass(
trim(jsonObject.getString(VERTEX_INPUT_FORMAT_CLASS_STR))));
logger.info(
"Setting vertex input format class: "
+ jsonObject.getString(VERTEX_INPUT_FORMAT_CLASS_STR));
} else {
logger.error("No vertex input format class found");
}

if (jsonObject.containsKey(EDGE_INPUT_FORMAT_CLASS_STR)
&& !jsonObject.getString(EDGE_INPUT_FORMAT_CLASS_STR).isEmpty()) {
giraphConfiguration.setEdgeInputFormatClass(
(Class<? extends EdgeInputFormat>)
loader.loadClass(
trim(jsonObject.getString(EDGE_INPUT_FORMAT_CLASS_STR))));
logger.info(
"Setting edge input format class: "
+ jsonObject.getString(EDGE_INPUT_FORMAT_CLASS_STR));
} else {
logger.info("No edge input format class found");
}

// Vertex output format
if (jsonObject.containsKey(VERTEX_OUTPUT_FORMAT_CLASS_STR)
&& !jsonObject.getString(VERTEX_OUTPUT_FORMAT_CLASS_STR).isEmpty()) {
giraphConfiguration.setVertexOutputFormatClass(
(Class<? extends VertexOutputFormat>)
loader.loadClass(
trim(jsonObject.getString(VERTEX_OUTPUT_FORMAT_CLASS_STR))));
logger.info(
"Setting vertex input format class: "
+ jsonObject.getString(VERTEX_OUTPUT_FORMAT_CLASS_STR));
} else {
logger.info("No vertex output format class found, using default one.");
}

// Vertex subdir
if (jsonObject.containsKey(VERTEX_OUTPUT_FORMAT_SUBDIR_STR)
&& !jsonObject.getString(VERTEX_OUTPUT_FORMAT_SUBDIR_STR).isEmpty()) {
giraphConfiguration.setVertexOutputFormatSubdir(
jsonObject.getString(VERTEX_OUTPUT_FORMAT_SUBDIR_STR));
logger.info(
"Setting vertex output format subdir to "
+ jsonObject.getString(VERTEX_OUTPUT_FORMAT_SUBDIR_STR));
} else {
logger.info("No vertex output format subdir specified, output to current dir");
}
// Vertex output path
if (jsonObject.containsKey(VERTEX_OUTPUT_PATH_STR)
&& !jsonObject.getString(VERTEX_OUTPUT_PATH_STR).isEmpty()) {
giraphConfiguration.setVertexOutputPath(jsonObject.getString(VERTEX_OUTPUT_PATH_STR));
logger.info("Setting output path to: " + jsonObject.getString(VERTEX_OUTPUT_PATH_STR));
}

// For fast speed, we may just use the minimum combiner.
if (jsonObject.containsKey(MESSAGE_COMBINER_CLASS_STR)
&& !jsonObject.getString(MESSAGE_COMBINER_CLASS_STR).isEmpty()) {
giraphConfiguration.setMessageCombinerClass(
(Class<? extends MessageCombiner>)
loader.loadClass(jsonObject.getString(MESSAGE_COMBINER_CLASS_STR)));
logger.info(
"Setting message combiner class to : "
+ jsonObject.getString(MESSAGE_COMBINER_CLASS_STR));
}

// master compute class
if (jsonObject.containsKey(MASTER_COMPUTE_CLASS_STR)
&& !jsonObject.getString(MASTER_COMPUTE_CLASS_STR).isEmpty()) {
giraphConfiguration.setMasterComputeClass(
(Class<? extends MasterCompute>)
loader.loadClass(jsonObject.getString(MASTER_COMPUTE_CLASS_STR)));
logger.info(
"Setting master compute class: "
+ jsonObject.getString(MASTER_COMPUTE_CLASS_STR));
}

// Parse edge manager type
if (jsonObject.containsKey(EDGE_MANAGER_STR)
&& !jsonObject.getString(EDGE_MANAGER_STR).isEmpty()) {
String edgeManagerType = jsonObject.getString(EDGE_MANAGER_STR);
giraphConfiguration.setEdgeManager(edgeManagerType);
logger.info("Using edge manager of type [{}]", edgeManagerType);
} else {
logger.info("Using default message manager type", EDGE_MANAGER.getDefaultValue());
}

// for other user-defined keys, we put then in configuration.
Set<String> keysSet = jsonObject.keySet();
for (String str : keysSet) {
if (!preservedKeysSet.contains(str)) {
logger.info("Found user defined params: {}: {}", str, jsonObject.getString(str));
giraphConfiguration.set(str, jsonObject.getString(str));
}
}
}

/**
* Get a class which is parameterized by the graph types defined by user. The types holder is
* actually an interface that any class which holds all of Giraph types can implement. It is
* used with reflection to infer the Giraph types.
*
* <p>The current order of type holders we try are: 1) The {@link TypesHolder} class directly.
* 2) The {@link org.apache.giraph.graph.Computation} class, as that holds all the types.
*
* @param conf Configuration
* @return {@link TypesHolder} or null if could not find one.
*/

Check notice on line 266 in analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/utils/ConfigurationUtils.java

View check run for this annotation

codefactor.io / CodeFactor

analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/utils/ConfigurationUtils.java#L121-L266

Complex Method
public static Class<? extends TypesHolder> getTypesHolderClass(Configuration conf) {
Class<? extends TypesHolder> klass = TYPES_HOLDER_CLASS.get(conf);
if (klass != null) {
Expand Down Expand Up @@ -295,6 +296,9 @@
if (grapeTypeClass.equals(Float.class)) {
return giraphTypeClass.equals(FloatWritable.class);
}
if (grapeTypeClass.equals(String.class) || grapeTypeClass.equals(StringView.class)) {
return true;
}
logger.error(
"Unsupported grape type and giraph type: "
+ grapeTypeClass.getName()
Expand Down
Loading
Loading