Skip to content

Commit

Permalink
Add a callback 'onBusy' used to adaptive rate limit
Browse files Browse the repository at this point in the history
* Also fix bug of raft-tools script

Change-Id: I5e043cefd903efc01bf443d495aec3f34632c0fa
  • Loading branch information
Linary committed Apr 2, 2021
1 parent 39b9474 commit 35cd7de
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,13 @@ public void onError(PeerId peer, Status status) {
}
}

@Override
public void onBusy(PeerId peer, Status status) {
// NOTE: Jraft itself doesn't have this callback, it's added by us
int count = RaftNode.this.busyCounter.incrementAndGet();
LOG.info("Increase busy counter: [{}]", count);
}

private boolean isWriteBufferOverflow(Status status) {
String expectMsg = "maybe write overflow";
return RaftError.EINTERNAL == status.getRaftError() &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -362,7 +362,7 @@ private ExecutorService createBackendExecutor(int threads) {
private static ExecutorService newPool(int coreThreads, int maxThreads,
String name,
RejectedExecutionHandler handler) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();
return ThreadPoolUtil.newBuilder()
.poolName(name)
.enableMetric(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.slf4j.Logger;

Expand Down Expand Up @@ -103,8 +104,11 @@ public void onApply(Iterator iter) {
LOG.debug("Node role: {}", this.node().selfIsLeader() ?
"leader" : "follower");
StoreClosure closure = null;
List<Future<?>> futures = new ArrayList<>();
int count = 0;
try {
while (iter.hasNext()) {
count++;
closure = (StoreClosure) iter.done();
if (closure != null) {
// Leader just take it out from the closure
Expand All @@ -124,7 +128,7 @@ public void onApply(Iterator iter) {
byte[] bytes = iter.getData().array();
// Follower seems no way to wait future
// Let the backend thread do it directly
this.context.backendExecutor().submit(() -> {
futures.add(this.context.backendExecutor().submit(() -> {
BytesBuffer buffer = LZ4Util.decompress(bytes,
RaftSharedContext.BLOCK_SIZE);
buffer.forReadWritten();
Expand All @@ -137,10 +141,15 @@ public void onApply(Iterator iter) {
action, e);
throw new BackendException("Backend error", e);
}
});
}));
}
iter.next();
}
System.out.println("futures size: " + count);
// Follower wait tasks finished
for (Future<?> future : futures) {
future.get();
}
} catch (Throwable e) {
LOG.error("StateMachine occured critical error", e);
Status status = new Status(RaftError.ESTATEMACHINE,
Expand All @@ -150,6 +159,7 @@ public void onApply(Iterator iter) {
closure.failure(status, e);
}
// Will cause current node inactive
// TODO: rollback to correct index
iter.setErrorAndRollback(1L, status);
}
}
Expand Down Expand Up @@ -253,6 +263,7 @@ public void onConfigurationCommitted(Configuration conf) {

@Override
public void onError(final RaftException e) {
// If busy, spin and wait a moment
LOG.error("Raft error: {}", e.getMessage(), e);
}
}
78 changes: 35 additions & 43 deletions hugegraph-dist/src/assembly/static/bin/raft-tools.sh
Original file line number Diff line number Diff line change
Expand Up @@ -86,46 +86,38 @@ function remove_peer() {
curl -X POST ${url}
}

while [[ $# -gt 0 ]]; do
case $1 in
# help
--help|-h)
print_usage
shift
;;
# list-peers
--list-peers|-l)
list_peers $2
shift 2
;;
# get-leader
--get-leader|-g)
get_leader $2
shift 2
;;
# set-leader
--set-leader|-s)
set_leader $2 $3
shift 3
;;
# transfer-leader
--transfer-leader|-t)
transfer_leader $2 $3
shift 3
;;
# add-peer
--add-peer|-a)
add_peer $2 $3
shift 3
;;
# remove-peer
--remove-peer|-r)
remove_peer $2 $3
shift 3
;;
*)
print_usage
exit 0
;;
esac
done
case $1 in
# help
--help|-h)
print_usage
;;
# list-peers
--list-peers|-l)
list_peers $2
;;
# get-leader
--get-leader|-g)
get_leader $2
;;
# set-leader
--set-leader|-s)
set_leader $2 $3
;;
# transfer-leader
--transfer-leader|-t)
transfer_leader $2 $3
;;
# add-peer
--add-peer|-a)
add_peer $2 $3
;;
# remove-peer
--remove-peer|-r)
remove_peer $2 $3
;;
*)
print_usage
exit 0
;;
esac
echo ""

0 comments on commit 35cd7de

Please sign in to comment.