Skip to content

Commit

Permalink
fix gremlin job result size gt cassandra limit and lt hugegraph limit (
Browse files Browse the repository at this point in the history
…#1334)

Change-Id: Ic2a246facffca683052bb56bf76ad21b5413d94a
  • Loading branch information
zhoney authored Jan 14, 2021
1 parent 59a680c commit 36c58b8
Showing 1 changed file with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.baidu.hugegraph.task;

import java.util.Date;
import java.util.Set;
import java.util.concurrent.Callable;

import org.apache.tinkerpop.gremlin.structure.Transaction;
Expand All @@ -30,13 +31,29 @@
import com.baidu.hugegraph.HugeGraphParams;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import com.google.common.collect.ImmutableSet;

public abstract class TaskCallable<V> implements Callable<V> {

private static final Logger LOG = Log.logger(HugeTask.class);

private static final String ERROR_MAX_LEN = "Failed to commit changes: " +
"The max length of bytes is";
private static final String ERROR_COMMIT = "Failed to commit changes: ";
private static final Set<String> ERROR_MESSAGES = ImmutableSet.of(
/*
* "The max length of bytes is" exception message occurs when
* task input size exceeds TASK_INPUT_SIZE_LIMIT or task result size
* exceeds TASK_RESULT_SIZE_LIMIT
*/
"The max length of bytes is",
/*
* "Batch too large" exception message occurs when using
* cassandra store and task input size is in
* [batch_size_fail_threshold_in_kb, TASK_INPUT_SIZE_LIMIT) or
* task result size is in
* [batch_size_fail_threshold_in_kb, TASK_RESULT_SIZE_LIMIT)
*/
"Batch too large"
);

private HugeTask<V> task = null;
private HugeGraph graph = null;
Expand Down Expand Up @@ -98,7 +115,8 @@ protected void save() {
*/
LOG.error("Failed to save task with error \"{}\": {}",
e, task.asMap(false));
if (e.getMessage().contains(ERROR_MAX_LEN)) {
String message = e.getMessage();
if (message.contains(ERROR_COMMIT) && needSaveWithEx(message)) {
task.failSave(e);
this.graph().taskScheduler().save(task);
return;
Expand Down Expand Up @@ -138,6 +156,15 @@ public static <V> TaskCallable<V> fromClass(String className) {
}
}

private static boolean needSaveWithEx(String message) {
for (String error : ERROR_MESSAGES) {
if (message.contains(error)) {
return true;
}
}
return false;
}

public static <V> TaskCallable<V> empty(Exception e) {
return new TaskCallable<V>() {
@Override
Expand Down

0 comments on commit 36c58b8

Please sign in to comment.