Skip to content

Commit

Permalink
Let TaskScheduler pause when under loading mode
Browse files Browse the repository at this point in the history
Change-Id: I593ee46a2a14af18f9105d84249da68592556dde
  • Loading branch information
Linary committed Apr 15, 2021
1 parent ec974d0 commit e9383a7
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public static synchronized ServerOptions instance() {
"batch.max_write_threads",
"The maximum threads for batch writing, " +
"if the value is 0, the actual value will be set to " +
"batch.max_write_ratio * total-rest-threads.",
"batch.max_write_ratio * restserver.max_worker_threads.",
nonNegativeInt(),
0);

Expand Down
2 changes: 1 addition & 1 deletion hugegraph-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-common</artifactId>
<version>1.8.4</version>
<version>1.8.5</version>
</dependency>

<!-- tinkerpop -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,15 @@ public GraphMode mode() {
public void mode(GraphMode mode) {
LOG.info("Graph {} will work in {} mode", this, mode);
this.mode = mode;
if (mode.loading()) {
/*
* NOTE: This may block tasks submit and lead the queue to be full,
* so don't submit gremlin job when loading data
*/
this.taskManager.pauseScheduledThreadPool();
} else {
this.taskManager.resumeScheduledThreadPool();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -33,6 +32,7 @@

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraphParams;
import com.baidu.hugegraph.concurrent.PausableScheduledThreadPool;
import com.baidu.hugegraph.util.Consumers;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.ExecutorUtil;
Expand All @@ -59,7 +59,7 @@ public final class TaskManager {
private final ExecutorService taskExecutor;
private final ExecutorService taskDbExecutor;
private final ExecutorService serverInfoDbExecutor;
private final ScheduledExecutorService schedulerExecutor;
private final PausableScheduledThreadPool schedulerExecutor;

public static TaskManager instance() {
return MANAGER;
Expand All @@ -76,7 +76,7 @@ private TaskManager(int pool) {
this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool(
1, SERVER_INFO_DB_WORKER);
// For schedule task to run, just one thread is ok
this.schedulerExecutor = ExecutorUtil.newScheduledThreadPool(
this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
1, TASK_SCHEDULER);
// Start after 10s waiting for HugeGraphServer startup
this.schedulerExecutor.scheduleWithFixedDelay(this::scheduleOrExecuteJob,
Expand Down Expand Up @@ -155,6 +155,14 @@ private void closeSchedulerTx(HugeGraphParams graph) {
}
}

public void pauseScheduledThreadPool() {
this.schedulerExecutor.pauseSchedule();
}

public void resumeScheduledThreadPool() {
this.schedulerExecutor.resumeSchedule();
}

public TaskScheduler getScheduler(HugeGraphParams graph) {
return this.schedulers.get(graph);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ restserver.url=http://127.0.0.1:8080
# graphs list with pair NAME:CONF_PATH
graphs=[hugegraph:conf/hugegraph.properties]

# The maximum thread ratio for batch writing, only take effect if the batch.max_write_threads is 0
batch.max_write_ratio=80
batch.max_write_threads=0

# authentication
#auth.authenticator=
#auth.admin_token=
Expand Down

0 comments on commit e9383a7

Please sign in to comment.