Some improvements for the raft backend
1. Add an 'onBusy' callback used for adaptive rate limiting
2. Pause the TaskScheduler while the graph is in loading mode
3. Fix a bug in the raft-tools script

Change-Id: I03859dbc9dc2ae71a2a47f3072d65276565ebba0
Linary committed Mar 22, 2021
1 parent 4c4b770 commit ad2cc92
Showing 9 changed files with 75 additions and 51 deletions.
2 changes: 1 addition & 1 deletion hugegraph-core/pom.xml
@@ -19,7 +19,7 @@
<dependency>
<groupId>com.baidu.hugegraph</groupId>
<artifactId>hugegraph-common</artifactId>
- <version>1.8.1</version>
+ <version>1.8.5</version>
</dependency>

<!-- tinkerpop -->
@@ -280,6 +280,11 @@ public GraphMode mode() {
public void mode(GraphMode mode) {
LOG.info("Graph {} will work in {} mode", this, mode);
this.mode = mode;
+ if (mode.loading()) {
+     this.taskManager.pauseScheduledThreadPool();
+ } else {
+     this.taskManager.resumeScheduledThreadPool();
+ }
}

@Override
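A quick usage sketch of the new behavior (the imports and the GraphMode values below are assumptions based on this diff, not an excerpt from the repository): putting a graph into LOADING mode now pauses the task scheduler, and restoring the mode resumes it.

import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.type.define.GraphMode;

final class BulkLoadExample {
    static void bulkLoad(HugeGraph graph) {
        graph.mode(GraphMode.LOADING); // TaskManager pauses its scheduler pool
        try {
            // ... write vertices/edges in large batches ...
        } finally {
            graph.mode(GraphMode.NONE); // scheduler pool resumes
        }
    }
}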
@@ -267,6 +267,7 @@ public RaftNodeStateListener() {
}

@Override
public void onCreated(PeerId peer) {
LOG.info("The node {} replicator has created", peer);
}
@@ -285,6 +286,13 @@ public void onError(PeerId peer, Status status) {
}
}

+ @Override
+ public void onBusy(PeerId peer, Status status) {
+     // NOTE: Jraft itself doesn't have this callback, it's added by us
+     int count = RaftNode.this.busyCounter.incrementAndGet();
+     LOG.info("Increase busy counter: [{}]", count);
+ }

private boolean isWriteBufferOverflow(Status status) {
String expectMsg = "maybe write overflow";
return RaftError.EINTERNAL == status.getRaftError() &&
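The listener above only counts busy events; adaptive rate limiting needs a consumer of that counter. Below is a hedged sketch of one possible submit-side backoff — the class, method names, and constants are illustrative assumptions, not what this commit ships:

import java.util.concurrent.atomic.AtomicInteger;

final class BusyBackoff {
    private final AtomicInteger busyCounter = new AtomicInteger();

    // Invoked from a callback like RaftNodeStateListener.onBusy(...)
    void onBusy() {
        this.busyCounter.incrementAndGet();
    }

    // Invoked before submitting the next store command
    void waitIfBusy() throws InterruptedException {
        int count = this.busyCounter.get();
        if (count <= 0) {
            return;
        }
        // Back off longer the more busy events were reported, with a cap
        Thread.sleep(Math.min(count * 100L, 5000L));
        // Decay the counter so the write rate can recover
        this.busyCounter.decrementAndGet();
    }
}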
@@ -24,9 +24,9 @@
import java.nio.file.Paths;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
- import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
+ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.commons.io.FileUtils;
@@ -352,7 +352,7 @@ private ExecutorService createBackendExecutor(int threads) {
private static ExecutorService newPool(int coreThreads, int maxThreads,
String name,
RejectedExecutionHandler handler) {
- BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
+ BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
return ThreadPoolUtil.newBuilder()
.poolName(name)
.enableMetric(false)
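The queue swap changes the pool's overload behavior: an unbounded LinkedBlockingQueue accepts every task and lets the backlog grow silently, while a SynchronousQueue hands each task directly to a free thread, so once maxThreads are busy the RejectedExecutionHandler fires and the caller gets an immediate busy signal. A self-contained demonstration of the difference:

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
        pool.submit(() -> sleep(1000)); // occupies the only thread
        try {
            pool.submit(() -> sleep(1000)); // no idle thread, no queue slot
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: backpressure reaches the caller");
        }
        pool.shutdown();
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ignored) {
        }
    }
}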
@@ -44,6 +44,7 @@ public class StoreSerializer {
public static byte[] writeMutations(List<BackendMutation> mutations) {
int estimateSize = mutations.size() * MUTATION_SIZE;
// The first two bytes are reserved for StoreType and StoreAction
+ // TODO: pooled BytesBuffer
BytesBuffer buffer = BytesBuffer.allocate(StoreCommand.HEADER_SIZE +
4 + estimateSize);
StoreCommand.writeHeader(buffer);
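The TODO hints at recycling buffers instead of allocating one per batch. A hedged sketch of what such a pool could look like — the pool size is arbitrary, the class is hypothetical, and a real version would also need to reset the buffer's position before reuse:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import com.baidu.hugegraph.backend.serializer.BytesBuffer;

final class BytesBufferPool {
    private final BlockingQueue<BytesBuffer> pool = new ArrayBlockingQueue<>(64);
    private final int capacity;

    BytesBufferPool(int capacity) {
        this.capacity = capacity;
    }

    BytesBuffer borrow() {
        BytesBuffer buffer = this.pool.poll();
        // Fall back to a fresh allocation when the pool is empty
        return buffer != null ? buffer : BytesBuffer.allocate(this.capacity);
    }

    void giveBack(BytesBuffer buffer) {
        // Best effort: if the pool is already full, drop the buffer
        this.pool.offer(buffer);
    }
}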
@@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.List;
+ import java.util.concurrent.Future;

import org.slf4j.Logger;

@@ -112,6 +113,7 @@ public void onApply(Iterator iter) {
LOG.debug("Node role: {}", this.node().selfIsLeader() ?
"leader" : "follower");
StoreClosure closure = null;
+ List<Future<?>> futures = new ArrayList<>();
try {
while (iter.hasNext()) {
closure = (StoreClosure) iter.done();
@@ -133,7 +135,7 @@
byte[] bytes = iter.getData().array();
// The follower seems to have no way to wait on the future
// Let the backend thread do it directly
- this.context.backendExecutor().submit(() -> {
+ futures.add(this.context.backendExecutor().submit(() -> {
BytesBuffer buffer = LZ4Util.decompress(bytes,
RaftSharedContext.BLOCK_SIZE);
buffer.forReadWritten();
@@ -146,10 +148,14 @@
action, e);
throw new BackendException("Backend error", e);
}
- });
+ }));
}
iter.next();
}
+ // Follower waits until all tasks have finished
+ for (Future<?> future : futures) {
+     future.get();
+ }
} catch (Throwable e) {
LOG.error("StateMachine occured critical error", e);
Status status = new Status(RaftError.ESTATEMACHINE,
@@ -159,6 +165,7 @@ public void onApply(Iterator iter) {
closure.failure(status, e);
}
// Will cause current node inactive
+ // TODO: rollback to correct index
iter.setErrorAndRollback(1L, status);
}
}
@@ -262,6 +269,7 @@ public void onConfigurationCommitted(Configuration conf) {

@Override
public void onError(final RaftException e) {
+ // If busy, spin and wait a moment
LOG.error("Raft error: {}", e.getMessage(), e);
}
}
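The follower-side wait matters for correctness: onApply must not return, and the applied index must not advance, before the asynchronous backend writes have landed. Condensed, the pattern added above looks like this (applyMutation is a hypothetical stand-in for the decompress-and-mutate body):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

final class ApplyBatch {
    static void applyAll(ExecutorService backendExecutor,
                         List<byte[]> batch) throws Exception {
        List<Future<?>> futures = new ArrayList<>();
        for (byte[] data : batch) {
            futures.add(backendExecutor.submit(() -> applyMutation(data)));
        }
        for (Future<?> future : futures) {
            future.get(); // backend failures surface as ExecutionException
        }
    }

    static void applyMutation(byte[] data) {
        // hypothetical stand-in for decompress + store.mutate(...)
    }
}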
@@ -24,7 +24,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
- import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -33,6 +32,7 @@

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraphParams;
+ import com.baidu.hugegraph.concurrent.PausableScheduledThreadPool;
import com.baidu.hugegraph.util.Consumers;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.ExecutorUtil;
@@ -59,7 +59,7 @@ public final class TaskManager {
private final ExecutorService taskExecutor;
private final ExecutorService taskDbExecutor;
private final ExecutorService serverInfoDbExecutor;
- private final ScheduledExecutorService schedulerExecutor;
+ private final PausableScheduledThreadPool schedulerExecutor;

public static TaskManager instance() {
return MANAGER;
@@ -76,7 +76,7 @@ private TaskManager(int pool) {
this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool(
1, SERVER_INFO_DB_WORKER);
// For schedule task to run, just one thread is ok
- this.schedulerExecutor = ExecutorUtil.newScheduledThreadPool(
+ this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
1, TASK_SCHEDULER);
// Start after 10s waiting for HugeGraphServer startup
this.schedulerExecutor.scheduleWithFixedDelay(this::scheduleOrExecuteJob,
@@ -155,6 +155,14 @@ private void closeSchedulerTx(HugeGraphParams graph) {
}
}

+ public void pauseScheduledThreadPool() {
+     this.schedulerExecutor.pauseSchedule();
+ }
+
+ public void resumeScheduledThreadPool() {
+     this.schedulerExecutor.resumeSchedule();
+ }

public TaskScheduler getScheduler(HugeGraphParams graph) {
return this.schedulers.get(graph);
}
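TaskManager now relies on PausableScheduledThreadPool from hugegraph-common 1.8.5, which explains the pom bump at the top of this commit. A minimal sketch of how such a pool can be built — the real class may differ; this only illustrates pause/resume gating via beforeExecute:

import java.util.concurrent.ScheduledThreadPoolExecutor;

class PausableScheduledPool extends ScheduledThreadPoolExecutor {
    private boolean paused = false;

    PausableScheduledPool(int coreThreads) {
        super(coreThreads);
    }

    synchronized void pauseSchedule() {
        this.paused = true;
    }

    synchronized void resumeSchedule() {
        this.paused = false;
        this.notifyAll();
    }

    @Override
    protected void beforeExecute(Thread thread, Runnable runnable) {
        // Block worker threads while paused; resumeSchedule() wakes them
        synchronized (this) {
            while (this.paused) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    thread.interrupt();
                }
            }
        }
        super.beforeExecute(thread, runnable);
    }
}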
78 changes: 35 additions & 43 deletions hugegraph-dist/src/assembly/static/bin/raft-tools.sh
@@ -86,46 +86,38 @@ function remove_peer() {
curl -X POST ${url}
}

- while [[ $# -gt 0 ]]; do
- case $1 in
- # help
- --help|-h)
- print_usage
- shift
- ;;
- # list-peers
- --list-peers|-l)
- list_peers $2
- shift 2
- ;;
- # get-leader
- --get-leader|-g)
- get_leader $2
- shift 2
- ;;
- # set-leader
- --set-leader|-s)
- set_leader $2 $3
- shift 3
- ;;
- # transfer-leader
- --transfer-leader|-t)
- transfer_leader $2 $3
- shift 3
- ;;
- # add-peer
- --add-peer|-a)
- add_peer $2 $3
- shift 3
- ;;
- # remove-peer
- --remove-peer|-r)
- remove_peer $2 $3
- shift 3
- ;;
- *)
- print_usage
- exit 0
- ;;
- esac
- done
+ case $1 in
+ # help
+ --help|-h)
+ print_usage
+ ;;
+ # list-peers
+ --list-peers|-l)
+ list_peers $2
+ ;;
+ # get-leader
+ --get-leader|-g)
+ get_leader $2
+ ;;
+ # set-leader
+ --set-leader|-s)
+ set_leader $2 $3
+ ;;
+ # transfer-leader
+ --transfer-leader|-t)
+ transfer_leader $2 $3
+ ;;
+ # add-peer
+ --add-peer|-a)
+ add_peer $2 $3
+ ;;
+ # remove-peer
+ --remove-peer|-r)
+ remove_peer $2 $3
+ ;;
+ *)
+ print_usage
+ exit 0
+ ;;
+ esac
echo ""
@@ -6,6 +6,8 @@ restserver.url=http://127.0.0.1:8080
# graphs list with pair NAME:CONF_PATH
graphs=[hugegraph:conf/hugegraph.properties]

+ batch.max_write_ratio=80

# authentication
#auth.authenticator=
#auth.admin_token=
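If batch.max_write_ratio works like HugeGraph's other ratio options, it caps batch-write threads at a percentage of the REST server's worker threads so a bulk load cannot starve normal requests. A back-of-the-envelope in Java — all names here are assumptions, only the percentage arithmetic is the point:

int maxWorkerThreads = 2 * Runtime.getRuntime().availableProcessors(); // assumed default
int maxWriteRatio = 80; // batch.max_write_ratio from the config above
int maxBatchWriteThreads = maxWorkerThreads * maxWriteRatio / 100; // 80% of workers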
