Significantly Lower Monitoring HttpExport Memory Footprint (elastic#48854)

The `HttpExportBulk` exporter uses a lot more memory than it needs to because it allocates intermediate buffers for serialization and IO (a minimal sketch of the new pattern follows this list):

* Remove the copying of all bytes when flushing; instead, stream the payload into the request entity through a wrapper over the existing buffer
* Remove the copying step that turned the `BytesStreamOutput` into a `byte[]`
  * This also avoids the allocation of a single huge `byte[]` and instead makes use of the internal paging logic of the `BytesStreamOutput`
* Don't allocate a new `BytesStreamOutput` for every document; keep appending to a single one
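A minimal sketch of that pattern, assuming the Elasticsearch core and Apache HttpCore dependencies are on the classpath. The `BulkPayloadSketch` class, its `buildEntity` method, and the `ndjsonDocs` parameter are hypothetical stand-ins for the exporter's real serialization, not code from this commit:

    import org.apache.http.HttpEntity;
    import org.apache.http.entity.ContentType;
    import org.apache.http.entity.InputStreamEntity;
    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.common.io.stream.BytesStreamOutput;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    // Hypothetical helper that only demonstrates the allocation pattern.
    public class BulkPayloadSketch {

        public static HttpEntity buildEntity(Iterable<String> ndjsonDocs) throws IOException {
            // One BytesStreamOutput for the whole bulk body; it grows in
            // fixed-size pages internally rather than as one contiguous byte[].
            BytesStreamOutput out = new BytesStreamOutput();
            for (String doc : ndjsonDocs) {
                out.write(doc.getBytes(StandardCharsets.UTF_8));
                out.write('\n');
            }
            // bytes() exposes the pages as a BytesReference without flattening
            // them into a byte[] (the old code called BytesReference.toBytes).
            BytesReference payload = out.bytes();
            // Stream the pages straight into the HTTP request body instead of
            // first flattening everything into a byte[] for an NByteArrayEntity.
            return new InputStreamEntity(payload.streamInput(), payload.length(), ContentType.APPLICATION_JSON);
        }
    }

The entity is consumed as the client writes the request, so peak memory stays close to a single paged copy of the serialized documents rather than that copy plus a flat `byte[]` of the same size.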
original-brownbear committed Nov 11, 2019
1 parent acae071 commit f201a49
Showing 1 changed file with 38 additions and 42 deletions.
HttpExportBulk.java
@@ -6,24 +6,22 @@
 package org.elasticsearch.xpack.monitoring.exporter.http;
 
 import org.apache.http.entity.ContentType;
-import org.apache.http.nio.entity.NByteArrayEntity;
+import org.apache.http.entity.InputStreamEntity;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.logging.log4j.util.Supplier;
-import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseListener;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.xcontent.ToXContent;
 import org.elasticsearch.common.xcontent.XContent;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
 import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils;
@@ -60,7 +58,7 @@ class HttpExportBulk extends ExportBulk {
     /**
      * The bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
      */
-    private byte[] payload = null;
+    private BytesReference payload = null;
 
     HttpExportBulk(final String name, final RestClient client, final Map<String, String> parameters,
                    final DateFormatter dateTimeFormatter, final ThreadContext threadContext) {
@@ -77,12 +75,11 @@ public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
             if (docs != null && docs.isEmpty() == false) {
                 try (BytesStreamOutput payload = new BytesStreamOutput()) {
                     for (MonitoringDoc monitoringDoc : docs) {
-                        // any failure caused by an individual doc will be written as an empty byte[], thus not impacting the rest
-                        payload.write(toBulkBytes(monitoringDoc));
+                        writeDocument(monitoringDoc, payload);
                     }
 
                     // store the payload until we flush
-                    this.payload = BytesReference.toBytes(payload.bytes());
+                    this.payload = payload.bytes();
                 }
             }
         } catch (Exception e) {
@@ -94,12 +91,19 @@ public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
     public void doFlush(ActionListener<Void> listener) throws ExportException {
         if (payload == null) {
             listener.onFailure(new ExportException("unable to send documents because none were loaded for export bulk [{}]", name));
-        } else if (payload.length != 0) {
+        } else if (payload.length() != 0) {
             final Request request = new Request("POST", "/_bulk");
             for (Map.Entry<String, String> param : params.entrySet()) {
                 request.addParameter(param.getKey(), param.getValue());
             }
-            request.setEntity(new NByteArrayEntity(payload, ContentType.APPLICATION_JSON));
+            try {
+                request.setEntity(new InputStreamEntity(payload.streamInput(), payload.length(), ContentType.APPLICATION_JSON));
+            } catch (IOException e) {
+                listener.onFailure(e);
+                return;
+            }
+            // null out serialized docs to make things easier on the GC
+            payload = null;
 
             client.performRequestAsync(request, new ResponseListener() {
                 @Override
@@ -123,51 +127,43 @@ public void onFailure(Exception exception) {
         }
     }
 
-    private byte[] toBulkBytes(final MonitoringDoc doc) throws IOException {
+    private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOException {
         final XContentType xContentType = XContentType.JSON;
         final XContent xContent = xContentType.xContent();
 
         final String index = MonitoringTemplateUtils.indexName(formatter, doc.getSystem(), doc.getTimestamp());
         final String id = doc.getId();
 
-        try (BytesStreamOutput out = new BytesStreamOutput()) {
-            try (XContentBuilder builder = new XContentBuilder(xContent, out)) {
-                // Builds the bulk action metadata line
-                builder.startObject();
-                {
-                    builder.startObject("index");
-                    {
-                        builder.field("_index", index);
-                        if (id != null) {
-                            builder.field("_id", id);
-                        }
-                    }
-                    builder.endObject();
-                }
-                builder.endObject();
-            }
-
-            // Adds action metadata line bulk separator
-            out.write(xContent.streamSeparator());
-
-            // Adds the source of the monitoring document
-            final BytesRef source = XContentHelper.toXContent(doc, xContentType, false).toBytesRef();
-            out.write(source.bytes, source.offset, source.length);
-
-            // Adds final bulk separator
-            out.write(xContent.streamSeparator());
-
-            logger.trace(
-                "http exporter [{}] - added index request [index={}, id={}, monitoring data type={}]",
-                name, index, id, doc.getType()
-            );
-
-            return BytesReference.toBytes(out.bytes());
-        } catch (Exception e) {
-            logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to render document [{}], skipping it [{}]", doc, name), e);
-
-            return BytesRef.EMPTY_BYTES;
-        }
+        try (XContentBuilder builder = new XContentBuilder(xContent, out)) {
+            // Builds the bulk action metadata line
+            builder.startObject();
+            {
+                builder.startObject("index");
+                {
+                    builder.field("_index", index);
+                    if (id != null) {
+                        builder.field("_id", id);
+                    }
+                }
+                builder.endObject();
+            }
+            builder.endObject();
+        }
+
+        // Adds action metadata line bulk separator
+        out.write(xContent.streamSeparator());
+
+        // Adds the source of the monitoring document
+        try (XContentBuilder builder = new XContentBuilder(xContent, out)) {
+            doc.toXContent(builder, ToXContent.EMPTY_PARAMS);
+        }
+
+        // Adds final bulk separator
+        out.write(xContent.streamSeparator());
+        logger.trace(
+            "http exporter [{}] - added index request [index={}, id={}, monitoring data type={}]",
+            name, index, id, doc.getType()
+        );
     }
 
 }
