Raft follower apply back-pressure (reviewed copy of the patch).

* RaftNode: add an onBusy replicator callback that increments busyCounter.
* RaftSharedContext.newPool: hand tasks off through a SynchronousQueue
  instead of an unbounded LinkedBlockingQueue.
* StoreStateMachine.onApply: collect the futures of the tasks submitted to
  the backend executor and wait for them before returning; the progress
  message is written through LOG.debug rather than System.out.
* raft-tools.sh: handle exactly one command per invocation.

NOTE(review): line breaks were re-derived from the hunk line counts. The
leading indentation of context lines and the generic type arguments
(List<Future<?>>, BlockingQueue<Runnable>, Future<?>) are reconstructed;
confirm them against the target tree before applying.

diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java
index 9a7b3ee978..a9edc8297a 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftNode.java
@@ -285,6 +285,13 @@ public void onError(PeerId peer, Status status) {
             }
         }
 
+        @Override
+        public void onBusy(PeerId peer, Status status) {
+            // NOTE: Jraft itself doesn't have this callback, it's added by us
+            int count = RaftNode.this.busyCounter.incrementAndGet();
+            LOG.info("Increase busy counter: [{}]", count);
+        }
+
         private boolean isWriteBufferOverflow(Status status) {
             String expectMsg = "maybe write overflow";
             return RaftError.EINTERNAL == status.getRaftError() &&
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
index e4c7f44385..56ee6c0548 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
@@ -25,9 +25,9 @@
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.io.FileUtils;
@@ -362,7 +362,7 @@ private ExecutorService createBackendExecutor(int threads) {
     private static ExecutorService newPool(int coreThreads, int maxThreads,
                                            String name,
                                            RejectedExecutionHandler handler) {
-        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
+        BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
         return ThreadPoolUtil.newBuilder()
                              .poolName(name)
                              .enableMetric(false)
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
index 2b8981015c..2689b713f4 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
@@ -21,6 +21,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Future;
 
 import org.slf4j.Logger;
 
@@ -103,8 +104,11 @@ public void onApply(Iterator iter) {
         LOG.debug("Node role: {}", this.node().selfIsLeader() ?
                                    "leader" : "follower");
         StoreClosure closure = null;
+        List<Future<?>> futures = new ArrayList<>();
+        int count = 0;
         try {
             while (iter.hasNext()) {
+                count++;
                 closure = (StoreClosure) iter.done();
                 if (closure != null) {
                     // Leader just take it out from the closure
@@ -124,7 +128,7 @@ public void onApply(Iterator iter) {
                     byte[] bytes = iter.getData().array();
                     // Follower seems no way to wait future
                     // Let the backend thread do it directly
-                    this.context.backendExecutor().submit(() -> {
+                    futures.add(this.context.backendExecutor().submit(() -> {
                         BytesBuffer buffer = LZ4Util.decompress(bytes,
                                              RaftSharedContext.BLOCK_SIZE);
                         buffer.forReadWritten();
@@ -137,10 +141,15 @@ public void onApply(Iterator iter) {
                                       action, e);
                             throw new BackendException("Backend error", e);
                         }
-                    });
+                    }));
                 }
                 iter.next();
             }
+            LOG.debug("Applied {} log entries, waiting for {} follower tasks", count, futures.size());
+            // Follower wait tasks finished
+            for (Future<?> future : futures) {
+                future.get();
+            }
         } catch (Throwable e) {
             LOG.error("StateMachine occured critical error", e);
             Status status = new Status(RaftError.ESTATEMACHINE,
@@ -150,6 +159,7 @@ public void onApply(Iterator iter) {
                 closure.failure(status, e);
             }
             // Will cause current node inactive
+            // TODO: rollback to correct index
             iter.setErrorAndRollback(1L, status);
         }
     }
@@ -253,6 +263,7 @@ public void onConfigurationCommitted(Configuration conf) {
 
     @Override
     public void onError(final RaftException e) {
+        // If busy, spin and wait a moment
         LOG.error("Raft error: {}", e.getMessage(), e);
     }
 }
diff --git a/hugegraph-dist/src/assembly/static/bin/raft-tools.sh b/hugegraph-dist/src/assembly/static/bin/raft-tools.sh
index aa5faa2e59..f386571e80 100755
--- a/hugegraph-dist/src/assembly/static/bin/raft-tools.sh
+++ b/hugegraph-dist/src/assembly/static/bin/raft-tools.sh
@@ -86,46 +86,38 @@ function remove_peer() {
     curl -X POST ${url}
 }
 
-while [[ $# -gt 0 ]]; do
-    case $1 in
-        # help
-        --help|-h)
-        print_usage
-        shift
-        ;;
-        # list-peers
-        --list-peers|-l)
-        list_peers $2
-        shift 2
-        ;;
-        # get-leader
-        --get-leader|-g)
-        get_leader $2
-        shift 2
-        ;;
-        # set-leader
-        --set-leader|-s)
-        set_leader $2 $3
-        shift 3
-        ;;
-        # transfer-leader
-        --transfer-leader|-t)
-        transfer_leader $2 $3
-        shift 3
-        ;;
-        # add-peer
-        --add-peer|-a)
-        add_peer $2 $3
-        shift 3
-        ;;
-        # remove-peer
-        --remove-peer|-r)
-        remove_peer $2 $3
-        shift 3
-        ;;
-        *)
-        print_usage
-        exit 0
-        ;;
-    esac
-done
+case $1 in
+    # help
+    --help|-h)
+    print_usage
+    ;;
+    # list-peers
+    --list-peers|-l)
+    list_peers $2
+    ;;
+    # get-leader
+    --get-leader|-g)
+    get_leader $2
+    ;;
+    # set-leader
+    --set-leader|-s)
+    set_leader $2 $3
+    ;;
+    # transfer-leader
+    --transfer-leader|-t)
+    transfer_leader $2 $3
+    ;;
+    # add-peer
+    --add-peer|-a)
+    add_peer $2 $3
+    ;;
+    # remove-peer
+    --remove-peer|-r)
+    remove_peer $2 $3
+    ;;
+    *)
+    print_usage
+    exit 0
+    ;;
+esac
+echo ""