diff --git a/integration-tests/src/main/java/io/druid/testing/clients/EventReceiverFirehoseTestClient.java b/integration-tests/src/main/java/io/druid/testing/clients/EventReceiverFirehoseTestClient.java index 77e27e2254c5..e56e5a080d3d 100644 --- a/integration-tests/src/main/java/io/druid/testing/clients/EventReceiverFirehoseTestClient.java +++ b/integration-tests/src/main/java/io/druid/testing/clients/EventReceiverFirehoseTestClient.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.api.client.util.Charsets; import com.google.common.base.Throwables; import com.metamx.common.ISE; @@ -44,14 +45,22 @@ public class EventReceiverFirehoseTestClient private final ObjectMapper jsonMapper; private final HttpClient httpClient; private final String chatID; + private final ObjectMapper smileMapper; - public EventReceiverFirehoseTestClient(String host, String chatID, ObjectMapper jsonMapper, HttpClient httpClient) + public EventReceiverFirehoseTestClient( + String host, + String chatID, + ObjectMapper jsonMapper, + HttpClient httpClient, + ObjectMapper smileMapper + ) { this.host = host; this.jsonMapper = jsonMapper; this.responseHandler = new StatusResponseHandler(Charsets.UTF_8); this.httpClient = httpClient; this.chatID = chatID; + this.smileMapper = smileMapper; } private String getURL() @@ -70,15 +79,15 @@ private String getURL() * * @return */ - public int postEvents(Collection> events) + public int postEvents(Collection> events, ObjectMapper objectMapper, String mediaType) { try { StatusResponseHolder response = httpClient.go( new Request( HttpMethod.POST, new URL(getURL()) ).setContent( - MediaType.APPLICATION_JSON, - this.jsonMapper.writeValueAsBytes(events) + mediaType, + objectMapper.writeValueAsBytes(events) ), responseHandler ).get(); @@ -91,7 +100,7 @@ HttpMethod.POST, new URL(getURL()) response.getContent() ); } - Map responseData = jsonMapper.readValue( + Map responseData = objectMapper.readValue( response.getContent(), new TypeReference>() { } @@ -103,18 +112,31 @@ HttpMethod.POST, new URL(getURL()) } } + /** + * Reads each events from file and post them to the indexing service. + * Uses both smileMapper and jsonMapper to send events alternately. + * + * @param file location of file to post events from + * + * @return number of events sent to the indexing service + */ public int postEventsFromFile(String file) { - try { - BufferedReader reader = new BufferedReader( - new InputStreamReader( - EventReceiverFirehoseTestClient.class.getResourceAsStream( - file - ) - ) - ); + try ( + BufferedReader reader = new BufferedReader( + new InputStreamReader( + EventReceiverFirehoseTestClient.class.getResourceAsStream( + file + ) + ) + ); + ) { + String s; Collection> events = new ArrayList>(); + // Test sending events using both jsonMapper and smileMapper. + // sends events one by one using both jsonMapper and smileMapper. + int totalEventsPosted = 0; while ((s = reader.readLine()) != null) { events.add( (Map) this.jsonMapper.readValue( @@ -123,12 +145,19 @@ public int postEventsFromFile(String file) } ) ); + ObjectMapper mapper = (totalEventsPosted % 2 == 0) ? jsonMapper : smileMapper; + String mediaType = (totalEventsPosted % 2 == 0) + ? MediaType.APPLICATION_JSON + : SmileMediaTypes.APPLICATION_JACKSON_SMILE; + totalEventsPosted += postEvents(events, mapper, mediaType); + ; + events = new ArrayList<>(); } - int eventsPosted = postEvents(events); - if (eventsPosted != events.size()) { - throw new ISE("All events not posted, expected : %d actual : %d", events.size(), eventsPosted); + + if (totalEventsPosted != events.size()) { + throw new ISE("All events not posted, expected : %d actual : %d", events.size(), totalEventsPosted); } - return eventsPosted; + return totalEventsPosted; } catch (Exception e) { throw Throwables.propagate(e); diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/AbstractIndexerTest.java index 42b846641e85..f5f86b69a22f 100644 --- a/integration-tests/src/test/java/io/druid/tests/indexer/AbstractIndexerTest.java +++ b/integration-tests/src/test/java/io/druid/tests/indexer/AbstractIndexerTest.java @@ -19,6 +19,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; import io.druid.testing.clients.CoordinatorResourceTestClient; import io.druid.testing.clients.OverlordResourceTestClient; import io.druid.testing.utils.FromFileTestQueryHelper; @@ -38,8 +40,11 @@ public abstract class AbstractIndexerTest @Inject protected OverlordResourceTestClient indexer; @Inject + @Json protected ObjectMapper jsonMapper; - + @Inject + @Smile + protected ObjectMapper smileMapper; @Inject protected FromFileTestQueryHelper queryHelper; diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITRealtimeIndexTaskTest.java index db2169a1e6af..6af76cd02d34 100644 --- a/integration-tests/src/test/java/io/druid/tests/indexer/ITRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITRealtimeIndexTaskTest.java @@ -129,7 +129,8 @@ public void postEvents() throws Exception host, EVENT_RECEIVER_SERVICE_NAME, jsonMapper, - httpClient + httpClient, + smileMapper ); client.postEventsFromFile(EVENT_DATA_FILE); } diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITUnionQueryTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITUnionQueryTest.java index ca182e664c10..824f683c4706 100644 --- a/integration-tests/src/test/java/io/druid/tests/indexer/ITUnionQueryTest.java +++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITUnionQueryTest.java @@ -159,7 +159,8 @@ public void postEvents(int id) throws Exception host, EVENT_RECEIVER_SERVICE_PREFIX + id, jsonMapper, - httpClient + httpClient, + smileMapper ); client.postEventsFromFile(UNION_DATA_FILE); } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java index a99406e42538..cb6c947dd03b 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java @@ -20,6 +20,10 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -32,9 +36,15 @@ import io.druid.data.input.Rows; import io.druid.data.input.impl.MapInputRowParser; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; +import java.io.InputStream; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.IOException; @@ -58,12 +68,16 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory chatHandlerProvider; + private final ObjectMapper jsonMapper; + private final ObjectMapper smileMapper; @JsonCreator public EventReceiverFirehoseFactory( @JsonProperty("serviceName") String serviceName, @JsonProperty("bufferSize") Integer bufferSize, - @JacksonInject ChatHandlerProvider chatHandlerProvider + @JacksonInject ChatHandlerProvider chatHandlerProvider, + @JacksonInject @Json ObjectMapper jsonMapper, + @JacksonInject @Smile ObjectMapper smileMapper ) { Preconditions.checkNotNull(serviceName, "serviceName"); @@ -71,6 +85,8 @@ public EventReceiverFirehoseFactory( this.serviceName = serviceName; this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize; this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); + this.jsonMapper = jsonMapper; + this.smileMapper = smileMapper; } @Override @@ -123,9 +139,30 @@ public EventReceiverFirehose(MapInputRowParser parser) @POST @Path("/push-events") - @Produces(MediaType.APPLICATION_JSON) - public Response addAll(Collection> events) + @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + public Response addAll( + InputStream in, + @Context final HttpServletRequest req // used only to get request content-type + ) { + final String reqContentType = req.getContentType(); + final boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(reqContentType); + final String contentType = isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON; + + ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper; + + Collection> events = null; + try { + events = objectMapper.readValue( + in, new TypeReference>>() + { + } + ); + } + catch (IOException e) { + return Response.serverError().entity(ImmutableMap.of("error", e.getMessage())).build(); + } log.debug("Adding %,d events to firehose: %s", events.size(), serviceName); final List rows = Lists.newArrayList(); @@ -146,12 +183,18 @@ public Response addAll(Collection> events) } } - return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build(); + return Response.ok( + objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())), + contentType + ).build(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw Throwables.propagate(e); } + catch (JsonProcessingException e) { + throw Throwables.propagate(e); + } } @Override