diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java index 3476495cc928a..3468a49423dff 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExportBulk.java @@ -6,12 +6,9 @@ package org.elasticsearch.xpack.monitoring.exporter.http; import org.apache.http.entity.ContentType; -import org.apache.http.nio.entity.NByteArrayEntity; +import org.apache.http.entity.InputStreamEntity; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; @@ -19,11 +16,12 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils; @@ -60,7 +58,7 @@ class HttpExportBulk extends ExportBulk { /** * The bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}. */ - private byte[] payload = null; + private BytesReference payload = null; HttpExportBulk(final String name, final RestClient client, final Map parameters, final DateFormatter dateTimeFormatter, final ThreadContext threadContext) { @@ -77,12 +75,11 @@ public void doAdd(Collection docs) throws ExportException { if (docs != null && docs.isEmpty() == false) { try (BytesStreamOutput payload = new BytesStreamOutput()) { for (MonitoringDoc monitoringDoc : docs) { - // any failure caused by an individual doc will be written as an empty byte[], thus not impacting the rest - payload.write(toBulkBytes(monitoringDoc)); + writeDocument(monitoringDoc, payload); } // store the payload until we flush - this.payload = BytesReference.toBytes(payload.bytes()); + this.payload = payload.bytes(); } } } catch (Exception e) { @@ -94,12 +91,19 @@ public void doAdd(Collection docs) throws ExportException { public void doFlush(ActionListener listener) throws ExportException { if (payload == null) { listener.onFailure(new ExportException("unable to send documents because none were loaded for export bulk [{}]", name)); - } else if (payload.length != 0) { + } else if (payload.length() != 0) { final Request request = new Request("POST", "/_bulk"); for (Map.Entry param : params.entrySet()) { request.addParameter(param.getKey(), param.getValue()); } - request.setEntity(new NByteArrayEntity(payload, ContentType.APPLICATION_JSON)); + try { + request.setEntity(new InputStreamEntity(payload.streamInput(), payload.length(), ContentType.APPLICATION_JSON)); + } catch (IOException e) { + listener.onFailure(e); + return; + } + // null out serialized docs to make things easier on the GC + payload = null; client.performRequestAsync(request, new ResponseListener() { @Override @@ -123,51 +127,43 @@ public void onFailure(Exception exception) { } } - private byte[] toBulkBytes(final MonitoringDoc doc) throws IOException { + private void writeDocument(MonitoringDoc doc, StreamOutput out) throws IOException { final XContentType xContentType = XContentType.JSON; final XContent xContent = xContentType.xContent(); final String index = MonitoringTemplateUtils.indexName(formatter, doc.getSystem(), doc.getTimestamp()); final String id = doc.getId(); - try (BytesStreamOutput out = new BytesStreamOutput()) { - try (XContentBuilder builder = new XContentBuilder(xContent, out)) { - // Builds the bulk action metadata line - builder.startObject(); + try (XContentBuilder builder = new XContentBuilder(xContent, out)) { + // Builds the bulk action metadata line + builder.startObject(); + { + builder.startObject("index"); { - builder.startObject("index"); - { - builder.field("_index", index); - if (id != null) { - builder.field("_id", id); - } + builder.field("_index", index); + if (id != null) { + builder.field("_id", id); } - builder.endObject(); } builder.endObject(); } + builder.endObject(); + } - // Adds action metadata line bulk separator - out.write(xContent.streamSeparator()); - - // Adds the source of the monitoring document - final BytesRef source = XContentHelper.toXContent(doc, xContentType, false).toBytesRef(); - out.write(source.bytes, source.offset, source.length); - - // Adds final bulk separator - out.write(xContent.streamSeparator()); + // Adds action metadata line bulk separator + out.write(xContent.streamSeparator()); - logger.trace( - "http exporter [{}] - added index request [index={}, id={}, monitoring data type={}]", - name, index, id, doc.getType() - ); + // Adds the source of the monitoring document + try (XContentBuilder builder = new XContentBuilder(xContent, out)) { + doc.toXContent(builder, ToXContent.EMPTY_PARAMS); + } - return BytesReference.toBytes(out.bytes()); - } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("failed to render document [{}], skipping it [{}]", doc, name), e); + // Adds final bulk separator + out.write(xContent.streamSeparator()); - return BytesRef.EMPTY_BYTES; - } + logger.trace( + "http exporter [{}] - added index request [index={}, id={}, monitoring data type={}]", + name, index, id, doc.getType() + ); } - }