Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Count coordinating and primary bytes as write bytes #58575

Merged
merged 15 commits into from
Jul 1, 2020
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -42,9 +43,11 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2)
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 1)
public class WriteMemoryLimitsIT extends ESIntegTestCase {

public static final String INDEX_NAME = "test";

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
Expand All @@ -70,13 +73,12 @@ protected int numberOfShards() {
}

public void testWriteBytesAreIncremented() throws Exception {
final String index = "test";
assertAcked(prepareCreate(index, Settings.builder()
assertAcked(prepareCreate(INDEX_NAME, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)));
ensureGreen(index);
ensureGreen(INDEX_NAME);

IndicesStatsResponse response = client().admin().indices().prepareStats(index).get();
IndicesStatsResponse response = client().admin().indices().prepareStats(INDEX_NAME).get();
String primaryId = Stream.of(response.getShards())
.map(ShardStats::getShardRouting)
.filter(ShardRouting::primary)
Expand All @@ -89,8 +91,10 @@ public void testWriteBytesAreIncremented() throws Exception {
.findAny()
.get()
.currentNodeId();
String primaryName = client().admin().cluster().prepareState().get().getState().nodes().get(primaryId).getName();
String replicaName = client().admin().cluster().prepareState().get().getState().nodes().get(replicaId).getName();
DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes();
String primaryName = nodes.get(primaryId).getName();
String replicaName = nodes.get(replicaId).getName();
String coordinatingOnlyNode = nodes.getCoordinatingOnlyNodes().iterator().next().value.getName();

final CountDownLatch replicationSendPointReached = new CountDownLatch(1);
final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1);
Expand All @@ -117,7 +121,7 @@ public void testWriteBytesAreIncremented() throws Exception {
final BulkRequest bulkRequest = new BulkRequest();
int totalRequestSize = 0;
for (int i = 0; i < 80; ++i) {
IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
totalRequestSize += request.ramBytesUsed();
assertTrue(request.ramBytesUsed() > request.source().length());
Expand All @@ -128,18 +132,19 @@ public void testWriteBytesAreIncremented() throws Exception {
final long bulkShardRequestSize = totalRequestSize;

try {
final ActionFuture<BulkResponse> successFuture = client(replicaName).bulk(bulkRequest);
final ActionFuture<BulkResponse> successFuture = client(coordinatingOnlyNode).bulk(bulkRequest);
replicationSendPointReached.await();

WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName);
WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName);
WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode);

assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize));
assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize));
assertEquals(0, primaryWriteLimits.getReplicaBytes());
assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getPrimaryBytes());
assertEquals(0, replicaWriteLimits.getReplicaBytes());
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
assertEquals(0, replicaWriteLimits.getWriteBytes());
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());

ThreadPool replicaThreadPool = replicaTransportService.getThreadPool();
// Block the replica Write thread pool
Expand All @@ -162,31 +167,45 @@ public void testWriteBytesAreIncremented() throws Exception {
newActionsSendPointReached.await();
latchBlockingReplicationSend.countDown();

IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID())
IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID())
.source(Collections.singletonMap("key", randomAlphaOfLength(50)));
final BulkRequest secondBulkRequest = new BulkRequest();
secondBulkRequest.add(request);

ActionFuture<BulkResponse> secondFuture = client(replicaName).bulk(secondBulkRequest);
// Use the primary or the replica data node as the coordinating node this time
boolean usePrimaryAsCoordinatingNode = randomBoolean();
final ActionFuture<BulkResponse> secondFuture;
if (usePrimaryAsCoordinatingNode) {
secondFuture = client(primaryName).bulk(secondBulkRequest);
} else {
secondFuture = client(replicaName).bulk(secondBulkRequest);
}

final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed();
final long secondBulkShardRequestSize = request.ramBytesUsed();

assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes());
assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(),
if (usePrimaryAsCoordinatingNode) {
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize));
assertEquals(0, replicaWriteLimits.getWriteBytes());
} else {
assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize));
assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes());
}
assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes());
assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(),
greaterThan(bulkShardRequestSize + secondBulkShardRequestSize)));

latchBlockingReplication.countDown();

successFuture.actionGet();
secondFuture.actionGet();

assertEquals(0, primaryWriteLimits.getCoordinatingBytes());
assertEquals(0, primaryWriteLimits.getPrimaryBytes());
assertEquals(0, primaryWriteLimits.getReplicaBytes());
assertEquals(0, replicaWriteLimits.getCoordinatingBytes());
assertEquals(0, replicaWriteLimits.getPrimaryBytes());
assertEquals(0, replicaWriteLimits.getReplicaBytes());
assertEquals(0, primaryWriteLimits.getWriteBytes());
assertEquals(0, primaryWriteLimits.getReplicaWriteBytes());
assertEquals(0, replicaWriteLimits.getWriteBytes());
assertEquals(0, replicaWriteLimits.getReplicaWriteBytes());
assertEquals(0, coordinatingWriteLimits.getWriteBytes());
assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes());
} finally {
if (replicationSendPointReached.getCount() > 0) {
replicationSendPointReached.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteReque
@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
long indexingBytes = bulkRequest.ramBytesUsed();
final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(indexingBytes);
final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
try {
doInternalExecute(task, bulkRequest, releasingListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,24 @@

public class WriteMemoryLimits {

private final AtomicLong coordinatingBytes = new AtomicLong(0);
private final AtomicLong primaryBytes = new AtomicLong(0);
private final AtomicLong replicaBytes = new AtomicLong(0);
private final AtomicLong writeBytes = new AtomicLong(0);
private final AtomicLong replicaWriteBytes = new AtomicLong(0);

public Releasable markCoordinatingOperationStarted(long bytes) {
coordinatingBytes.addAndGet(bytes);
return () -> coordinatingBytes.getAndAdd(-bytes);
public Releasable markWriteOperationStarted(long bytes) {
writeBytes.addAndGet(bytes);
return () -> writeBytes.getAndAdd(-bytes);
}

public long getCoordinatingBytes() {
return coordinatingBytes.get();
public long getWriteBytes() {
return writeBytes.get();
}

public Releasable markPrimaryOperationStarted(long bytes) {
primaryBytes.addAndGet(bytes);
return () -> primaryBytes.getAndAdd(-bytes);
public Releasable markReplicaWriteStarted(long bytes) {
replicaWriteBytes.getAndAdd(bytes);
return () -> replicaWriteBytes.getAndAdd(-bytes);
}

public long getPrimaryBytes() {
return primaryBytes.get();
}

public Releasable markReplicaOperationStarted(long bytes) {
replicaBytes.getAndAdd(bytes);
return () -> replicaBytes.getAndAdd(-bytes);
}

public long getReplicaBytes() {
return replicaBytes.get();
public long getReplicaWriteBytes() {
return replicaWriteBytes.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran
writeMemoryLimits);
}

@Override
protected void doExecute(Task parentTask, ResyncReplicationRequest request, ActionListener<ResyncReplicationResponse> listener) {
assert false : "use TransportResyncReplicationAction#sync";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this the default implementation for doExecute in TransportWriteAction when rerouteBypassed() is true? Maybe we can then also rename that method to supportsRerouteAction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. This is a little tricky because I have to expose reroute in TransportReplicationAction. But I did that and we can discuss.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh. Actually I holding off this for the moment because it gets kind of messy. We can talk more about this. I did rename the method.

}

@Override
protected ResyncReplicationResponse newResponseInstance(StreamInput in) throws IOException {
return new ResyncReplicationResponse(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionResponse;
Expand Down Expand Up @@ -285,7 +286,7 @@ protected Releasable checkOperationLimits(final Request request) {
}

protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request, final TransportChannel channel, final Task task) {
Releasable releasable = checkPrimaryLimits(request.getRequest());
Releasable releasable = checkPrimaryLimits(request.getRequest(), request.rerouteWasLocal());
ActionListener<Response> listener =
ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close);

Expand All @@ -296,7 +297,7 @@ protected void handlePrimaryRequest(final ConcreteShardRequest<Request> request,
}
}

protected Releasable checkPrimaryLimits(final Request request) {
protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal) {
return () -> {};
}

Expand Down Expand Up @@ -371,8 +372,7 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
transportService.sendRequest(relocatingNode, transportPrimaryAction,
new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
primaryRequest.getPrimaryTerm()),
transportOptions,
primaryRequest.getPrimaryTerm()), transportOptions,
new ActionListenerResponseHandler<>(onCompletionListener, reader) {
@Override
public void handleResponse(Response response) {
Expand Down Expand Up @@ -584,7 +584,7 @@ public void onResponse(Releasable releasable) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
AsyncReplicaAction.this.onFailure(e);
}));
// TODO: Evaludate if we still need to catch this exception
// TODO: Evaluate if we still need to catch this exception
} catch (Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
AsyncReplicaAction.this.onFailure(e);
Expand Down Expand Up @@ -750,7 +750,7 @@ private void performLocalAction(ClusterState state, ShardRouting primary, Discov
transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId());
}
performAction(node, transportPrimaryAction, true,
new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id())));
new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()), true));
}

private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) {
Expand Down Expand Up @@ -1102,19 +1102,31 @@ public static class ConcreteShardRequest<R extends TransportRequest> extends Tra
private final String targetAllocationID;
private final long primaryTerm;
private final R request;
private final boolean rerouteWasLocal;

public ConcreteShardRequest(Writeable.Reader<R> requestReader, StreamInput in) throws IOException {
targetAllocationID = in.readString();
primaryTerm = in.readVLong();
// TODO: Change after backport
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
rerouteWasLocal = in.readBoolean();
} else {
rerouteWasLocal = false;
}
request = requestReader.read(in);
}

public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) {
this(request, targetAllocationID, primaryTerm, false);
}

public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean rerouteWasLocal) {
Objects.requireNonNull(request);
Objects.requireNonNull(targetAllocationID);
this.request = request;
this.targetAllocationID = targetAllocationID;
this.primaryTerm = primaryTerm;
this.rerouteWasLocal = rerouteWasLocal;
}

@Override
Expand Down Expand Up @@ -1145,9 +1157,17 @@ public String getDescription() {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(targetAllocationID);
out.writeVLong(primaryTerm);
// TODO: Change after backport
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(rerouteWasLocal);
}
request.writeTo(out);
}

public boolean rerouteWasLocal() {
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
return rerouteWasLocal;
}

public R getRequest() {
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,18 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe

@Override
protected Releasable checkOperationLimits(Request request) {
return writeMemoryLimits.markCoordinatingOperationStarted(primaryOperationSize(request));
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
}

@Override
protected Releasable checkPrimaryLimits(Request request) {
return writeMemoryLimits.markPrimaryOperationStarted(primaryOperationSize(request));
protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal) {
// If the reroute this primary request was submitted by a reroute on this local node, we have already
// accounted the bytes.
if (rerouteWasLocal) {
return () -> {};
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
} else {
return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request));
}
}

protected long primaryOperationSize(Request request) {
Expand All @@ -94,7 +100,7 @@ protected long primaryOperationSize(Request request) {

@Override
protected Releasable checkReplicaLimits(ReplicaRequest request) {
return writeMemoryLimits.markReplicaOperationStarted(replicaOperationSize(request));
return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request));
}

protected long replicaOperationSize(ReplicaRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1164,19 +1164,14 @@ private void assertAllPendingWriteLimitsReleased() throws Exception {
assertBusy(() -> {
for (NodeAndClient nodeAndClient : nodes.values()) {
WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name);
final long coordinatingBytes = writeMemoryLimits.getCoordinatingBytes();
if (coordinatingBytes > 0) {
throw new AssertionError("pending coordinating write bytes [" + coordinatingBytes + "] bytes on node ["
final long writeBytes = writeMemoryLimits.getWriteBytes();
if (writeBytes > 0) {
throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node ["
+ nodeAndClient.name + "].");
}
final long primaryBytes = writeMemoryLimits.getPrimaryBytes();
if (primaryBytes > 0) {
throw new AssertionError("pending primary write bytes [" + coordinatingBytes + "] bytes on node ["
+ nodeAndClient.name + "].");
}
final long replicaBytes = writeMemoryLimits.getReplicaBytes();
if (replicaBytes > 0) {
throw new AssertionError("pending replica write bytes [" + coordinatingBytes + "] bytes on node ["
final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes();
if (replicaWriteBytes > 0) {
throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node ["
+ nodeAndClient.name + "].");
}
}
Expand Down
Loading