diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java index 276d3013ed928..a7cb2f5260fde 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/WriteMemoryLimitsIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -42,9 +43,11 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.greaterThan; -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2) +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 1) public class WriteMemoryLimitsIT extends ESIntegTestCase { + public static final String INDEX_NAME = "test"; + @Override protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder() @@ -70,13 +73,12 @@ protected int numberOfShards() { } public void testWriteBytesAreIncremented() throws Exception { - final String index = "test"; - assertAcked(prepareCreate(index, Settings.builder() + assertAcked(prepareCreate(INDEX_NAME, Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))); - ensureGreen(index); + ensureGreen(INDEX_NAME); - IndicesStatsResponse response = client().admin().indices().prepareStats(index).get(); + IndicesStatsResponse response = client().admin().indices().prepareStats(INDEX_NAME).get(); String primaryId = Stream.of(response.getShards()) .map(ShardStats::getShardRouting) .filter(ShardRouting::primary) @@ -89,8 +91,10 @@ public void testWriteBytesAreIncremented() throws Exception { .findAny() .get() .currentNodeId(); - String primaryName = client().admin().cluster().prepareState().get().getState().nodes().get(primaryId).getName(); - String replicaName = client().admin().cluster().prepareState().get().getState().nodes().get(replicaId).getName(); + DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes(); + String primaryName = nodes.get(primaryId).getName(); + String replicaName = nodes.get(replicaId).getName(); + String coordinatingOnlyNode = nodes.getCoordinatingOnlyNodes().iterator().next().value.getName(); final CountDownLatch replicationSendPointReached = new CountDownLatch(1); final CountDownLatch latchBlockingReplicationSend = new CountDownLatch(1); @@ -117,7 +121,7 @@ public void testWriteBytesAreIncremented() throws Exception { final BulkRequest bulkRequest = new BulkRequest(); int totalRequestSize = 0; for (int i = 0; i < 80; ++i) { - IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID()) + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) .source(Collections.singletonMap("key", randomAlphaOfLength(50))); totalRequestSize += request.ramBytesUsed(); assertTrue(request.ramBytesUsed() > request.source().length()); @@ -128,18 +132,19 @@ public void testWriteBytesAreIncremented() throws Exception { final long bulkShardRequestSize = totalRequestSize; try { - final ActionFuture successFuture = client(replicaName).bulk(bulkRequest); + final ActionFuture successFuture = client(coordinatingOnlyNode).bulk(bulkRequest); replicationSendPointReached.await(); WriteMemoryLimits primaryWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, primaryName); WriteMemoryLimits replicaWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, replicaName); + WriteMemoryLimits coordinatingWriteLimits = internalCluster().getInstance(WriteMemoryLimits.class, coordinatingOnlyNode); - assertThat(primaryWriteLimits.getCoordinatingBytes(), greaterThan(bulkShardRequestSize)); - assertThat(primaryWriteLimits.getPrimaryBytes(), greaterThan(bulkShardRequestSize)); - assertEquals(0, primaryWriteLimits.getReplicaBytes()); - assertEquals(bulkRequestSize, replicaWriteLimits.getCoordinatingBytes()); - assertEquals(0, replicaWriteLimits.getPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getReplicaBytes()); + assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); + assertEquals(0, replicaWriteLimits.getWriteBytes()); + assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); + assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); ThreadPool replicaThreadPool = replicaTransportService.getThreadPool(); // Block the replica Write thread pool @@ -162,18 +167,32 @@ public void testWriteBytesAreIncremented() throws Exception { newActionsSendPointReached.await(); latchBlockingReplicationSend.countDown(); - IndexRequest request = new IndexRequest(index).id(UUIDs.base64UUID()) + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) .source(Collections.singletonMap("key", randomAlphaOfLength(50))); final BulkRequest secondBulkRequest = new BulkRequest(); secondBulkRequest.add(request); - ActionFuture secondFuture = client(replicaName).bulk(secondBulkRequest); + // Use the primary or the replica data node as the coordinating node this time + boolean usePrimaryAsCoordinatingNode = randomBoolean(); + final ActionFuture secondFuture; + if (usePrimaryAsCoordinatingNode) { + secondFuture = client(primaryName).bulk(secondBulkRequest); + } else { + secondFuture = client(replicaName).bulk(secondBulkRequest); + } final long secondBulkRequestSize = secondBulkRequest.ramBytesUsed(); final long secondBulkShardRequestSize = request.ramBytesUsed(); - assertEquals(bulkRequestSize + secondBulkRequestSize, replicaWriteLimits.getCoordinatingBytes()); - assertBusy(() -> assertThat(replicaWriteLimits.getReplicaBytes(), + if (usePrimaryAsCoordinatingNode) { + assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkRequestSize)); + assertEquals(0, replicaWriteLimits.getWriteBytes()); + } else { + assertThat(primaryWriteLimits.getWriteBytes(), greaterThan(bulkShardRequestSize)); + assertEquals(secondBulkRequestSize, replicaWriteLimits.getWriteBytes()); + } + assertEquals(bulkRequestSize, coordinatingWriteLimits.getWriteBytes()); + assertBusy(() -> assertThat(replicaWriteLimits.getReplicaWriteBytes(), greaterThan(bulkShardRequestSize + secondBulkShardRequestSize))); latchBlockingReplication.countDown(); @@ -181,12 +200,12 @@ public void testWriteBytesAreIncremented() throws Exception { successFuture.actionGet(); secondFuture.actionGet(); - assertEquals(0, primaryWriteLimits.getCoordinatingBytes()); - assertEquals(0, primaryWriteLimits.getPrimaryBytes()); - assertEquals(0, primaryWriteLimits.getReplicaBytes()); - assertEquals(0, replicaWriteLimits.getCoordinatingBytes()); - assertEquals(0, replicaWriteLimits.getPrimaryBytes()); - assertEquals(0, replicaWriteLimits.getReplicaBytes()); + assertEquals(0, primaryWriteLimits.getWriteBytes()); + assertEquals(0, primaryWriteLimits.getReplicaWriteBytes()); + assertEquals(0, replicaWriteLimits.getWriteBytes()); + assertEquals(0, replicaWriteLimits.getReplicaWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getWriteBytes()); + assertEquals(0, coordinatingWriteLimits.getReplicaWriteBytes()); } finally { if (replicationSendPointReached.getCount() > 0) { replicationSendPointReached.countDown(); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index fc498383a1ef4..edccbf40de191 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -162,7 +162,7 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest docWriteReque @Override protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { long indexingBytes = bulkRequest.ramBytesUsed(); - final Releasable releasable = writeMemoryLimits.markCoordinatingOperationStarted(indexingBytes); + final Releasable releasable = writeMemoryLimits.markWriteOperationStarted(indexingBytes); final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); try { doInternalExecute(task, bulkRequest, releasingListener); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java index 84c702f110622..29371f0795088 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/WriteMemoryLimits.java @@ -25,34 +25,24 @@ public class WriteMemoryLimits { - private final AtomicLong coordinatingBytes = new AtomicLong(0); - private final AtomicLong primaryBytes = new AtomicLong(0); - private final AtomicLong replicaBytes = new AtomicLong(0); + private final AtomicLong writeBytes = new AtomicLong(0); + private final AtomicLong replicaWriteBytes = new AtomicLong(0); - public Releasable markCoordinatingOperationStarted(long bytes) { - coordinatingBytes.addAndGet(bytes); - return () -> coordinatingBytes.getAndAdd(-bytes); + public Releasable markWriteOperationStarted(long bytes) { + writeBytes.addAndGet(bytes); + return () -> writeBytes.getAndAdd(-bytes); } - public long getCoordinatingBytes() { - return coordinatingBytes.get(); + public long getWriteBytes() { + return writeBytes.get(); } - public Releasable markPrimaryOperationStarted(long bytes) { - primaryBytes.addAndGet(bytes); - return () -> primaryBytes.getAndAdd(-bytes); + public Releasable markReplicaWriteStarted(long bytes) { + replicaWriteBytes.getAndAdd(bytes); + return () -> replicaWriteBytes.getAndAdd(-bytes); } - public long getPrimaryBytes() { - return primaryBytes.get(); - } - - public Releasable markReplicaOperationStarted(long bytes) { - replicaBytes.getAndAdd(bytes); - return () -> replicaBytes.getAndAdd(-bytes); - } - - public long getReplicaBytes() { - return replicaBytes.get(); + public long getReplicaWriteBytes() { + return replicaWriteBytes.get(); } } diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 638371f414111..74ddcf54b3212 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -64,6 +64,11 @@ public TransportResyncReplicationAction(Settings settings, TransportService tran writeMemoryLimits); } + @Override + protected void doExecute(Task parentTask, ResyncReplicationRequest request, ActionListener listener) { + assert false : "use TransportResyncReplicationAction#sync"; + } + @Override protected ResyncReplicationResponse newResponseInstance(StreamInput in) throws IOException { return new ResyncReplicationResponse(in); diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 1c05b596d97f0..bce9517e3ef64 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -285,7 +285,7 @@ protected Releasable checkOperationLimits(final Request request) { } protected void handlePrimaryRequest(final ConcreteShardRequest request, final TransportChannel channel, final Task task) { - Releasable releasable = checkPrimaryLimits(request.getRequest()); + Releasable releasable = checkPrimaryLimits(request.getRequest(), request.sentFromLocalReroute()); ActionListener listener = ActionListener.runBefore(new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close); @@ -296,7 +296,7 @@ protected void handlePrimaryRequest(final ConcreteShardRequest request, } } - protected Releasable checkPrimaryLimits(final Request request) { + protected Releasable checkPrimaryLimits(final Request request, boolean rerouteWasLocal) { return () -> {}; } @@ -371,8 +371,7 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId()); transportService.sendRequest(relocatingNode, transportPrimaryAction, new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(), - primaryRequest.getPrimaryTerm()), - transportOptions, + primaryRequest.getPrimaryTerm()), transportOptions, new ActionListenerResponseHandler<>(onCompletionListener, reader) { @Override public void handleResponse(Response response) { @@ -584,7 +583,7 @@ public void onResponse(Releasable releasable) { Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller AsyncReplicaAction.this.onFailure(e); })); - // TODO: Evaludate if we still need to catch this exception + // TODO: Evaluate if we still need to catch this exception } catch (Exception e) { Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller AsyncReplicaAction.this.onFailure(e); @@ -750,7 +749,7 @@ private void performLocalAction(ClusterState state, ShardRouting primary, Discov transportPrimaryAction, request.shardId(), request, state.version(), primary.currentNodeId()); } performAction(node, transportPrimaryAction, true, - new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()))); + new ConcreteShardRequest<>(request, primary.allocationId().getId(), indexMetadata.primaryTerm(primary.id()), true)); } private void performRemoteAction(ClusterState state, ShardRouting primary, DiscoveryNode node) { @@ -1102,19 +1101,27 @@ public static class ConcreteShardRequest extends Tra private final String targetAllocationID; private final long primaryTerm; private final R request; + // Indicates if this primary shard request originated by a reroute on this local node. + private final boolean sentFromLocalReroute; public ConcreteShardRequest(Writeable.Reader requestReader, StreamInput in) throws IOException { targetAllocationID = in.readString(); primaryTerm = in.readVLong(); + sentFromLocalReroute = false; request = requestReader.read(in); } public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm) { + this(request, targetAllocationID, primaryTerm, false); + } + + public ConcreteShardRequest(R request, String targetAllocationID, long primaryTerm, boolean sentFromLocalReroute) { Objects.requireNonNull(request); Objects.requireNonNull(targetAllocationID); this.request = request; this.targetAllocationID = targetAllocationID; this.primaryTerm = primaryTerm; + this.sentFromLocalReroute = sentFromLocalReroute; } @Override @@ -1143,11 +1150,19 @@ public String getDescription() { @Override public void writeTo(StreamOutput out) throws IOException { + // If sentFromLocalReroute is marked true, then this request should just be looped back through + // the local transport. It should never be serialized to be sent over the wire. If it is sent over + // the wire, then it was NOT sent from a local reroute. + assert sentFromLocalReroute == false; out.writeString(targetAllocationID); out.writeVLong(primaryTerm); request.writeTo(out); } + public boolean sentFromLocalReroute() { + return sentFromLocalReroute; + } + public R getRequest() { return request; } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 8407dba506a29..dc5c790b2cf9e 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -80,12 +80,18 @@ protected TransportWriteAction(Settings settings, String actionName, TransportSe @Override protected Releasable checkOperationLimits(Request request) { - return writeMemoryLimits.markCoordinatingOperationStarted(primaryOperationSize(request)); + return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); } @Override - protected Releasable checkPrimaryLimits(Request request) { - return writeMemoryLimits.markPrimaryOperationStarted(primaryOperationSize(request)); + protected Releasable checkPrimaryLimits(Request request, boolean rerouteWasLocal) { + // If this primary request was submitted by a reroute performed on this local node, we have already + // accounted the bytes. + if (rerouteWasLocal) { + return () -> {}; + } else { + return writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); + } } protected long primaryOperationSize(Request request) { @@ -94,7 +100,7 @@ protected long primaryOperationSize(Request request) { @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return writeMemoryLimits.markReplicaOperationStarted(replicaOperationSize(request)); + return writeMemoryLimits.markReplicaWriteStarted(replicaOperationSize(request)); } protected long replicaOperationSize(ReplicaRequest request) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 1fbfd11603ace..f3d8a18c99c8f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1164,19 +1164,14 @@ private void assertAllPendingWriteLimitsReleased() throws Exception { assertBusy(() -> { for (NodeAndClient nodeAndClient : nodes.values()) { WriteMemoryLimits writeMemoryLimits = getInstance(WriteMemoryLimits.class, nodeAndClient.name); - final long coordinatingBytes = writeMemoryLimits.getCoordinatingBytes(); - if (coordinatingBytes > 0) { - throw new AssertionError("pending coordinating write bytes [" + coordinatingBytes + "] bytes on node [" + final long writeBytes = writeMemoryLimits.getWriteBytes(); + if (writeBytes > 0) { + throw new AssertionError("pending write bytes [" + writeBytes + "] bytes on node [" + nodeAndClient.name + "]."); } - final long primaryBytes = writeMemoryLimits.getPrimaryBytes(); - if (primaryBytes > 0) { - throw new AssertionError("pending primary write bytes [" + coordinatingBytes + "] bytes on node [" - + nodeAndClient.name + "]."); - } - final long replicaBytes = writeMemoryLimits.getReplicaBytes(); - if (replicaBytes > 0) { - throw new AssertionError("pending replica write bytes [" + coordinatingBytes + "] bytes on node [" + final long replicaWriteBytes = writeMemoryLimits.getReplicaWriteBytes(); + if (replicaWriteBytes > 0) { + throw new AssertionError("pending replica write bytes [" + writeBytes + "] bytes on node [" + nodeAndClient.name + "]."); } } diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java index 8ff18c4763e24..8b55572d19b8b 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java @@ -6,12 +6,15 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.action.bulk.WriteMemoryLimits; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.CcrSingleNodeTestCase; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; @@ -23,6 +26,8 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.stream.StreamSupport; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -74,6 +79,64 @@ public void testFollowIndex() throws Exception { ensureEmptyWriteBuffers(); } + public void testWriteLimitsIncremented() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, Collections.emptyMap()); + assertAcked(client().admin().indices().prepareCreate("leader").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader"); + + // Use a sufficiently small number of docs to ensure that they are well below the number of docs that + // can be sent in a single TransportBulkShardOperationsAction + final long firstBatchNumDocs = randomIntBetween(10, 20); + long sourceSize = 0; + for (int i = 0; i < firstBatchNumDocs; i++) { + BytesArray source = new BytesArray("{}"); + sourceSize += source.length(); + client().prepareIndex("leader").setSource(source, XContentType.JSON).get(); + } + + ThreadPool nodeThreadPool = getInstanceFromNode(ThreadPool.class); + ThreadPool.Info writeInfo = StreamSupport.stream(nodeThreadPool.info().spliterator(), false) + .filter(i -> i.getName().equals(ThreadPool.Names.WRITE)).findAny().get(); + int numberOfThreads = writeInfo.getMax(); + CountDownLatch threadBlockedLatch = new CountDownLatch(numberOfThreads); + CountDownLatch blocker = new CountDownLatch(1); + + for (int i = 0; i < numberOfThreads; ++i) { + nodeThreadPool.executor(ThreadPool.Names.WRITE).execute(() -> { + try { + threadBlockedLatch.countDown(); + blocker.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + }); + } + threadBlockedLatch.await(); + + try { + final PutFollowAction.Request followRequest = getPutFollowRequest("leader", "follower"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + WriteMemoryLimits memoryLimits = getInstanceFromNode(WriteMemoryLimits.class); + final long finalSourceSize = sourceSize; + assertBusy(() -> { + // The actual write bytes will be greater due to other request fields. However, this test is + // just spot checking that the bytes are incremented at all. + assertTrue(memoryLimits.getWriteBytes() > finalSourceSize); + }); + blocker.countDown(); + assertBusy(() -> { + assertThat(client().prepareSearch("follower").get().getHits().getTotalHits().value, equalTo(firstBatchNumDocs)); + }); + ensureEmptyWriteBuffers(); + } finally { + if (blocker.getCount() > 0) { + blocker.countDown(); + } + } + + } + public void testRemoveRemoteConnection() throws Exception { PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); request.setName("my_pattern"); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 6918c4a96678a..512d300d25b3f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SeqNoStats; @@ -25,6 +26,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.index.engine.AlreadyProcessedFollowingEngineException; @@ -36,6 +38,8 @@ public class TransportBulkShardOperationsAction extends TransportWriteAction { + private final WriteMemoryLimits writeMemoryLimits; + @Inject public TransportBulkShardOperationsAction( final Settings settings, @@ -58,6 +62,19 @@ public TransportBulkShardOperationsAction( BulkShardOperationsRequest::new, BulkShardOperationsRequest::new, ThreadPool.Names.WRITE, false, writeMemoryLimits); + this.writeMemoryLimits = writeMemoryLimits; + } + + @Override + protected void doExecute(Task task, BulkShardOperationsRequest request, ActionListener listener) { + // This is executed on the follower coordinator node and we need to mark the bytes. + Releasable releasable = writeMemoryLimits.markWriteOperationStarted(primaryOperationSize(request)); + ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); + try { + super.doExecute(task, request, releasingListener); + } catch (Exception e) { + releasingListener.onFailure(e); + } } @Override