Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Count coordinating and primary bytes as write bytes #58575

Merged
merged 15 commits into from
Jul 1, 2020
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,10 @@ public void testWriteBytesAreIncremented() throws Exception {
WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);

assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize));
assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertEquals(0, primaryWriteLimits.getReplicaBytes());
assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getPrimaryBytes());
assertEquals(0, replicaWriteLimits.getReplicaBytes());
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
assertEquals(bulkRequestSize, replicaWriteLimits.getWriteBytes());
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());

ThreadPool replicaThreadPool = replicaTransportService.getThreadPool();
// Block the replica Write thread pool
Expand Down Expand Up @@ -172,21 +170,20 @@ public void testWriteBytesAreIncremented() throws Exception {
final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed();
final long secondBulkShardRequestSize = request.ramBytesUsed();

assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(),
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getWriteBytes());
assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(),
greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));

latchBlockingReplication.countDown();

successFuture.actionGet();
secondFuture.actionGet();

assertEquals(0, primaryWriteLimits.getCoordinatingBytes());
assertEquals(0, primaryWriteLimits.getPrimaryBytes());
assertEquals(0, primaryWriteLimits.getReplicaBytes());
assertEquals(0, replicaWriteLimits.getCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getPrimaryBytes());
assertEquals(0, replicaWriteLimits.getReplicaBytes());
assertEquals(0, primaryWriteLimits.getWriteBytes());
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
assertEquals(0, replicaWriteLimits.getWriteBytes());
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
} finally {
if (replicationSendPointReached.getCount() > 0) {
replicationSendPointReached.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque
@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
long indexingBytes = bulkRequest.ramBytesUsed();
final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(indexingBytes);
final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
try {
doInternalExecute(task, bulkRequest, releasingListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,24 @@

public class WriteMemoryLimits {

private final AtomicLong coordinatingBytes = new AtomicLong(0);
private final AtomicLong primaryBytes = new AtomicLong(0);
private final AtomicLong replicaBytes = new AtomicLong(0);
private final AtomicLong writeBytes = new AtomicLong(0);
private final AtomicLong replicaWriteBytes = new AtomicLong(0);

public Releasable markCoordinatingOperationStarted(long bytes) {
coordinatingBytes.addAndGet(bytes);
return () -> coordinatingBytes.getAndAdd(-bytes);
public Releasable markWriteOperationStarted(long bytes) {
writeBytes.addAndGet(bytes);
return () -> writeBytes.getAndAdd(-bytes);
}

public long getCoordinatingBytes() {
return coordinatingBytes.get();
public long getWriteBytes() {
return writeBytes.get();
}

public Releasable markPrimaryOperationStarted(long bytes) {
primaryBytes.addAndGet(bytes);
return () -> primaryBytes.getAndAdd(-bytes);
public Releasable markReplicaWriteStarted(long bytes) {
replicaWriteBytes.getAndAdd(bytes);
return () -> replicaWriteBytes.getAndAdd(-bytes);
}

public long getPrimaryBytes() {
return primaryBytes.get();
}

public Releasable markReplicaOperationStarted(long bytes) {
replicaBytes.getAndAdd(bytes);
return () -> replicaBytes.getAndAdd(-bytes);
}

public long getReplicaBytes() {
return replicaBytes.get();
public long getReplicaWriteBytes() {
return replicaWriteBytes.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
writeMemoryLimits);
}

@Override
protected void doExecute(Task parentTask, ResyncReplicationRequest request, ActionListener<ResyncReplicationResponse> listener) {
assert false : "use TransportResyncReplicationAction#sync";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this the default implementation for doExecute in TransportWriteAction when rerouteBypassed() is true? Maybe we can then also rename that method to supportsRerouteAction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. This is a little tricky because I have to expose reroute in TransportReplicationAction. But I did that and we can discuss.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh. Actually I holding off this for the moment because it gets kind of messy. We can talk more about this. I did rename the method.

}

@Override
protected ResyncReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ResyncReplicationResponse(in);
Expand All @@ -86,6 +91,11 @@ public ClusterBlockLevel indexBlockLevel() {
return null;
}

@Override
protected boolean rerouteBypassed() {
return true;
}

@Override
protected void dispatchedShardOperationOnPrimary(ResyncReplicationRequest request, IndexShard primary,
ActionListener<PrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse>> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,28 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe

@Override
protected Releasable checkOperationLimits(Request request) {
return writeMemoryLimits.markCoordinatingOperationStarted(primaryOperationSize(request));
if (rerouteBypassed() == false) {
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
} else {
return () -> {};
}
}

@Override
protected Releasable checkPrimaryLimits(Request request) {
return writeMemoryLimits.markPrimaryOperationStarted(primaryOperationSize(request));
if (rerouteBypassed()) {
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
} else {
return () -> {};
}
}

/**
* Some actions bypass the reroute phase and directly call the primary action. If this is the case, we
* need to mark the WRITE bytes when the primary request is received.
*/
protected boolean rerouteBypassed() {
return false;
}

protected long primaryOperationSize(Request request) {
Expand All @@ -94,7 +110,7 @@ protected long primaryOperationSize(Request request) {

@Override
protected Releasable checkReplicaLimits(ReplicaRequest request) {
return writeMemoryLimits.markReplicaOperationStarted(replicaOperationSize(request));
return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request));
}

protected long replicaOperationSize(ReplicaRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ public void handleException(TransportException e) {
}
}

@Override
protected boolean rerouteBypassed() {
return true;
}

@Override
protected void dispatchedShardOperationOnPrimary(Request request, IndexShard primary,
ActionListener<PrimaryResult<Request, Response>> listener) {
Expand Down