Serialize Monitoring Bulk Request Compressed #56410

Merged
@@ -15,6 +15,7 @@
 import org.elasticsearch.client.ResponseListener;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.time.DateFormatter;
Expand All @@ -28,7 +29,9 @@
import org.elasticsearch.xpack.monitoring.exporter.ExportBulk;
import org.elasticsearch.xpack.monitoring.exporter.ExportException;

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Map;
@@ -56,10 +59,15 @@ class HttpExportBulk extends ExportBulk {
     private final DateFormatter formatter;
 
     /**
-     * The bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
+     * The compressed bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
      */
     private BytesReference payload = null;
 
+    /**
+     * Uncompressed length of {@link #payload} contents.
+     */
+    private long payloadLength = -1L;
+
     HttpExportBulk(final String name, final RestClient client, final Map<String, String> parameters,
                    final DateFormatter dateTimeFormatter, final ThreadContext threadContext) {
         super(name, threadContext);
@@ -73,14 +81,17 @@ class HttpExportBulk extends ExportBulk {
     public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
         try {
             if (docs != null && docs.isEmpty() == false) {
-                try (BytesStreamOutput payload = new BytesStreamOutput()) {
+                final BytesStreamOutput scratch = new BytesStreamOutput();
+                final CountingOutputStream countingStream;
+                try (StreamOutput payload = CompressorFactory.COMPRESSOR.streamOutput(scratch)) {
+                    countingStream = new CountingOutputStream(payload);
                     for (MonitoringDoc monitoringDoc : docs) {
-                        writeDocument(monitoringDoc, payload);
+                        writeDocument(monitoringDoc, countingStream);
                     }
-
-                    // store the payload until we flush
-                    this.payload = payload.bytes();
                 }
+                payloadLength = countingStream.bytesWritten;
+                // store the payload until we flush
+                this.payload = scratch.bytes();
             }
         } catch (Exception e) {
             throw new ExportException("failed to add documents to export bulk [{}]", e, name);
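
The doAdd() change above buffers the bulk body compressed: documents are written through a CountingOutputStream into the compressor stream, so payloadLength records the uncompressed size while scratch holds only compressed bytes. Below is a minimal, self-contained sketch of the same buffering pattern using only JDK classes; ByteArrayOutputStream and DeflaterOutputStream stand in for Elasticsearch's BytesStreamOutput and CompressorFactory.COMPRESSOR.streamOutput(), and all names in the sketch are illustrative rather than part of this PR.

    import java.io.ByteArrayOutputStream;
    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.zip.DeflaterOutputStream;

    public class CompressedBufferSketch {

        // Counts every byte passing through it before handing it to the wrapped (compressing) stream.
        static final class CountingOutputStream extends FilterOutputStream {
            long bytesWritten = 0;

            CountingOutputStream(OutputStream out) {
                super(out);
            }

            @Override
            public void write(int b) throws IOException {
                out.write(b);
                bytesWritten++;
            }

            @Override
            public void write(byte[] b, int off, int len) throws IOException {
                out.write(b, off, len);
                bytesWritten += len;
            }
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream scratch = new ByteArrayOutputStream();                 // stand-in for BytesStreamOutput
            CountingOutputStream counting;
            try (DeflaterOutputStream compressed = new DeflaterOutputStream(scratch)) { // stand-in for the compressor stream
                counting = new CountingOutputStream(compressed);
                for (String doc : List.of("{\"index\":{}}\n", "{\"field\":\"value\"}\n")) {
                    counting.write(doc.getBytes(StandardCharsets.UTF_8));               // stand-in for writeDocument(...)
                }
            }
            long payloadLength = counting.bytesWritten;   // uncompressed size, as recorded by the PR
            byte[] payload = scratch.toByteArray();       // compressed bytes kept until flush
            System.out.println("uncompressed=" + payloadLength + ", buffered=" + payload.length);
        }
    }
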
@@ -97,7 +108,8 @@ public void doFlush(ActionListener<Void> listener) throws ExportException {
             request.addParameter(param.getKey(), param.getValue());
         }
         try {
-            request.setEntity(new InputStreamEntity(payload.streamInput(), payload.length(), ContentType.APPLICATION_JSON));
+            request.setEntity(new InputStreamEntity(
+                CompressorFactory.COMPRESSOR.streamInput(payload.streamInput()), payloadLength, ContentType.APPLICATION_JSON));
         } catch (IOException e) {
             listener.onFailure(e);
             return;
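
On the flush side the stored payload stays compressed: CompressorFactory.COMPRESSOR.streamInput() inflates it lazily as the HTTP entity is consumed, and the payloadLength recorded in doAdd() is passed as the entity's (uncompressed) content length, so the request body on the wire is still plain JSON. A rough JDK-only round-trip of that idea, again with illustrative names only (InflaterInputStream stands in for the compressor's streamInput):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.DeflaterOutputStream;
    import java.util.zip.InflaterInputStream;

    public class FlushSideSketch {
        public static void main(String[] args) throws IOException {
            // Buffer a small bulk body compressed and remember its uncompressed length (what doAdd() now does).
            byte[] bulkBody = "{\"index\":{}}\n{\"field\":\"value\"}\n".getBytes(StandardCharsets.UTF_8);
            long payloadLength = bulkBody.length;
            ByteArrayOutputStream scratch = new ByteArrayOutputStream();
            try (DeflaterOutputStream compressor = new DeflaterOutputStream(scratch)) {
                compressor.write(bulkBody);
            }
            byte[] payload = scratch.toByteArray();

            // What doFlush() now does conceptually: hand the HTTP layer a stream that inflates on the fly,
            // advertising the recorded uncompressed length as the entity's content length.
            InputStream entityStream = new InflaterInputStream(new ByteArrayInputStream(payload));
            long streamed = 0;
            byte[] buffer = new byte[1024];
            int read;
            while ((read = entityStream.read(buffer)) != -1) {
                streamed += read;
            }
            System.out.println("recorded=" + payloadLength + ", streamed=" + streamed); // the two match
        }
    }
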
@@ -127,7 +139,7 @@ public void onFailure(Exception exception) {
         }
     }
 
-    private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOException {
+    private void writeDocument(MonitoringDoc doc, OutputStream out) throws IOException {
         final XContentType xContentType = XContentType.JSON;
         final XContent xContent = xContentType.xContent();
 
@@ -166,4 +178,39 @@ private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOException {
             name, index, id, doc.getType()
         );
     }
+
+    // Counting output stream used to record the uncompressed size of the bulk payload when writing it to a compressed stream
+    private static final class CountingOutputStream extends FilterOutputStream {
+        private long bytesWritten = 0;
+
+        CountingOutputStream(final OutputStream out) {
+            super(out);
+        }
+
+        @Override
+        public void write(final int b) throws IOException {
+            out.write(b);
+            count(1);
+        }
+        @Override
+        public void write(final byte[] b) throws IOException {
+            write(b, 0, b.length);
+        }
+        @Override
+        public void write(final byte[] b, final int off, final int len) throws IOException {
+            out.write(b, off, len);
+            count(len);
+        }
+
+        @Override
+        public void close() {
+            // don't close nested stream
+        }
+
+        protected void count(final long written) {
+            if (written != -1) {
+                bytesWritten += written;
+            }
+        }
+    }
 }