Serialize Monitoring Bulk Request Compressed
Even with the changes from elastic#48854 we're still seeing significant (as in tens and hundreds of MB)
buffer usage for bulk exports in some cases, which destabilizes master nodes.
Since we need to know the serialized length of the bulk body, we can't do the serialization
in a streaming manner (it's also not easily doable with the HTTP client API we're using).
=> Let's at least serialize on heap in compressed form and decompress as we stream to the
HTTP connection. For small requests this adds negligible overhead, but for large requests it reduces
the size of the payload field by about an order of magnitude (empirically determined), which is a massive
reduction when considering O(100MB) bulk requests.
original-brownbear committed May 8, 2020
1 parent e1dbe26 commit f0b09f3
Showing 1 changed file with 55 additions and 8 deletions.
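
To make the approach concrete before the diff below, here is a small, self-contained sketch of the pattern (not the Elasticsearch code itself): documents are serialized through a DEFLATE stream into an on-heap buffer while the uncompressed byte count is tracked, and the buffer is inflated again only when it is read back out. It uses plain java.util.zip as a stand-in for the internal CompressorFactory compressor used in the change, and the sample documents are made up for illustration.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

public final class CompressedBulkBufferSketch {

    public static void main(String[] args) throws IOException {
        // Stand-ins for the serialized monitoring documents (NDJSON bulk body).
        List<String> docLines = List.of(
            "{\"index\":{\"_index\":\"monitoring\"}}",
            "{\"field\":\"value\"}"
        );

        // Write path (doAdd): serialize into a compressed on-heap buffer while
        // counting the uncompressed bytes as they pass through.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        long uncompressedLength = 0;
        try (DeflaterOutputStream deflate = new DeflaterOutputStream(compressed)) {
            for (String line : docLines) {
                byte[] bytes = (line + "\n").getBytes(StandardCharsets.UTF_8);
                deflate.write(bytes);
                uncompressedLength += bytes.length;
            }
        }

        // Read path (doFlush): inflate lazily while streaming the body out; the
        // HTTP layer still gets the uncompressed length recorded above.
        byte[] body;
        try (InputStream decompressing = new InflaterInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
            body = decompressing.readAllBytes();
        }
        System.out.println("held on heap: " + compressed.size() + " bytes (compressed), "
            + "streamed to the wire: " + body.length + " of " + uncompressedLength + " uncompressed bytes");
    }
}

Only the compressed buffer and the byte counter stay resident between add and flush, which is where the heap savings for large bulk requests come from.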
@@ -15,6 +15,7 @@
 import org.elasticsearch.client.ResponseListener;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.time.DateFormatter;
@@ -28,7 +29,9 @@
 import org.elasticsearch.xpack.monitoring.exporter.ExportBulk;
 import org.elasticsearch.xpack.monitoring.exporter.ExportException;
 
+import java.io.FilterOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.time.format.DateTimeFormatter;
 import java.util.Collection;
 import java.util.Map;
@@ -56,10 +59,15 @@ class HttpExportBulk extends ExportBulk {
     private final DateFormatter formatter;
 
     /**
-     * The bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
+     * The compressed bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
      */
     private BytesReference payload = null;
 
+    /**
+     * Uncompressed length of {@link #payload} contents.
+     */
+    private long payloadLength = -1L;
+
     HttpExportBulk(final String name, final RestClient client, final Map<String, String> parameters,
                    final DateFormatter dateTimeFormatter, final ThreadContext threadContext) {
         super(name, threadContext);
@@ -73,14 +81,17 @@ class HttpExportBulk extends ExportBulk {
     public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
         try {
             if (docs != null && docs.isEmpty() == false) {
-                try (BytesStreamOutput payload = new BytesStreamOutput()) {
+                final BytesStreamOutput scratch = new BytesStreamOutput();
+                final CountingOutputStream countingStream;
+                try (StreamOutput payload = CompressorFactory.COMPRESSOR.streamOutput(scratch)) {
+                    countingStream = new CountingOutputStream(payload);
                     for (MonitoringDoc monitoringDoc : docs) {
-                        writeDocument(monitoringDoc, payload);
+                        writeDocument(monitoringDoc, countingStream);
                     }
-
-                    // store the payload until we flush
-                    this.payload = payload.bytes();
                 }
+                payloadLength = countingStream.bytesWritten;
+                // store the payload until we flush
+                this.payload = scratch.bytes();
             }
         } catch (Exception e) {
             throw new ExportException("failed to add documents to export bulk [{}]", e, name);
Expand All @@ -97,7 +108,8 @@ public void doFlush(ActionListener<Void> listener) throws ExportException {
request.addParameter(param.getKey(), param.getValue());
}
try {
request.setEntity(new InputStreamEntity(payload.streamInput(), payload.length(), ContentType.APPLICATION_JSON));
request.setEntity(new InputStreamEntity(
CompressorFactory.COMPRESSOR.streamInput(payload.streamInput()), payloadLength, ContentType.APPLICATION_JSON));
} catch (IOException e) {
listener.onFailure(e);
return;
@@ -127,7 +139,7 @@ public void onFailure(Exception exception) {
         }
     }
 
-    private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOException {
+    private void writeDocument(MonitoringDoc doc, OutputStream out) throws IOException {
         final XContentType xContentType = XContentType.JSON;
         final XContent xContent = xContentType.xContent();
 
@@ -166,4 +178,39 @@ private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOException {
             name, index, id, doc.getType()
         );
     }
+
+    // Counting output stream used to record the uncompressed size of the bulk payload when writing it to a compressed stream
+    private static final class CountingOutputStream extends FilterOutputStream {
+        private long bytesWritten = 0;
+
+        CountingOutputStream(final OutputStream out) {
+            super(out);
+        }
+
+        @Override
+        public void write(final int b) throws IOException {
+            out.write(b);
+            count(1);
+        }
+        @Override
+        public void write(final byte[] b) throws IOException {
+            write(b, 0, b.length);
+        }
+        @Override
+        public void write(final byte[] b, final int off, final int len) throws IOException {
+            out.write(b, off, len);
+            count(len);
+        }
+
+        @Override
+        public void close() {
+            // don't close nested stream
+        }
+
+        protected void count(final long written) {
+            if (written != -1) {
+                bytesWritten += written;
+            }
+        }
+    }
 }
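
For the flush side, a hedged sketch of how the recorded uncompressed length can feed the Apache HTTP entity, mirroring the doFlush() change above: the entity reads through a decompressing stream, while the explicit length lets the request advertise a Content-Length instead of falling back to chunked transfer encoding. InflaterInputStream stands in for CompressorFactory.COMPRESSOR.streamInput(...), and the class and method names here are made up for illustration.

import org.apache.http.entity.ContentType;
import org.apache.http.entity.InputStreamEntity;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.zip.InflaterInputStream;

final class DecompressingEntitySketch {

    // Wraps the compressed on-heap payload in a stream that inflates lazily as the
    // HTTP connection consumes it; uncompressedLength must be the byte count recorded
    // while the payload was serialized (cf. payloadLength in the diff above).
    static InputStreamEntity entityFor(final byte[] compressedPayload, final long uncompressedLength) {
        final InputStream decompressing = new InflaterInputStream(new ByteArrayInputStream(compressedPayload));
        return new InputStreamEntity(decompressing, uncompressedLength, ContentType.APPLICATION_JSON);
    }
}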
