diff --git a/ingestion-sink/pom.xml b/ingestion-sink/pom.xml index e86224ecad..a1de91b96e 100644 --- a/ingestion-sink/pom.xml +++ b/ingestion-sink/pom.xml @@ -20,6 +20,11 @@ + + com.fasterxml.jackson.datatype + jackson-datatype-json-org + ${jackson.version} + com.google.cloud google-cloud-pubsub diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/io/BigQuery.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/io/BigQuery.java index a21891584b..d62f578115 100644 --- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/io/BigQuery.java +++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/io/BigQuery.java @@ -17,6 +17,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.logging.Logger; public class BigQuery { @@ -35,6 +36,8 @@ private WriteErrors(List errors) { public static class Write implements Function> { + private static final Logger logger = Logger.getLogger("BigQuery.Write"); + private final com.google.cloud.bigquery.BigQuery bigquery; private final int maxBytes = 10_000_000; // HTTP request size limit: 10 MB private final int maxMessages = 10_000; // Maximum rows per request: 10,000 @@ -137,7 +140,8 @@ private synchronized Optional> add(TableRow row) { int index = newSize - 1; return Optional.of(result.thenApplyAsync(r -> { List errors = r.getErrorsFor(index); - if (!errors.isEmpty()) { + if (errors != null && !errors.isEmpty()) { + logger.warning(errors.toString()); throw new WriteErrors(errors); } return row; diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/io/Pubsub.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/io/Pubsub.java index 4d2383602f..b574ef19cb 100644 --- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/io/Pubsub.java +++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/io/Pubsub.java @@ -4,12 +4,15 @@ package 
com.mozilla.telemetry.ingestion.io; +import static java.util.logging.Level.WARNING; + import com.google.cloud.pubsub.v1.Subscriber; import com.google.common.annotations.VisibleForTesting; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import java.util.logging.Logger; public class Pubsub { @@ -18,6 +21,8 @@ private Pubsub() { public static class Read { + static Logger logger = Logger.getLogger("Pubsub.Read"); + @VisibleForTesting Subscriber subscriber; @@ -30,6 +35,7 @@ public Read(String subscriptionName, Function documentId = Optional .ofNullable(message.getAttributesOrDefault("document_id", null)); + final Map contents; switch (tableRowFormat) { case raw: - return new TableRow(tableId, message.getSerializedSize(), documentId, rawContents(message)); + contents = rawContents(message); + break; case decoded: case payload: default: throw new IllegalArgumentException( "TableRowFormat not yet implemented: " + tableRowFormat.name()); } + return new TableRow(tableId, Json.asBytes(contents).length, documentId, contents); } /** @@ -69,7 +76,8 @@ public TableRow apply(PubsubMessage message) { */ private Map rawContents(PubsubMessage message) { Map contents = new HashMap<>(message.getAttributesMap()); - contents.put("payload", message.getData().toByteArray()); + // bytes must be inserted as base64 encoded strings + contents.put("payload", base64Encoder.encodeToString(message.getData().toByteArray())); return contents; } } diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/util/Json.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/util/Json.java new file mode 100644 index 0000000000..56cae32850 --- /dev/null +++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/util/Json.java @@ -0,0 +1,30 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. 
If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package com.mozilla.telemetry.ingestion.util; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.UncheckedIOException; + +public class Json { + + /** + * A Jackson ObjectMapper pre-configured for use in com.mozilla.telemetry. + */ + private static final ObjectMapper MAPPER = new ObjectMapper(); + + /** + * Serialize {@code data} as a {@code byte[]}. + * + * @exception UncheckedIOException if data cannot be encoded as json. + */ + public static byte[] asBytes(Object data) { + try { + return MAPPER.writeValueAsBytes(data); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/transform/PubsubMessageToTableRowTest.java b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/transform/PubsubMessageToTableRowTest.java index 4578c588f3..a89c64adbe 100644 --- a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/transform/PubsubMessageToTableRowTest.java +++ b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/transform/PubsubMessageToTableRowTest.java @@ -27,10 +27,9 @@ public void canTransformSingleMessage() { assertEquals("telemetry_raw", actual.tableId.getDataset()); assertEquals("main_v4", actual.tableId.getTable()); - assertEquals(104, actual.byteSize); - assertEquals("test", new String((byte[]) actual.content.remove("payload"))); + assertEquals(120, actual.byteSize); assertEquals(ImmutableMap.of("document_id", "id", "document_namespace", "telemetry", - "document_type", "main", "document_version", "4"), actual.content); + "document_type", "main", "document_version", "4", "payload", "dGVzdA=="), actual.content); } @Test @@ -41,11 +40,9 @@ public void canHandleEmptyValues() { assertEquals("_raw", actual.tableId.getDataset()); assertEquals("_v", actual.tableId.getTable()); - 
assertEquals(65, actual.byteSize); - assertEquals("", new String((byte[]) actual.content.remove("payload"))); - assertEquals( - ImmutableMap.of("document_namespace", "", "document_type", "", "document_version", ""), - actual.content); + assertEquals(79, actual.byteSize); + assertEquals(ImmutableMap.of("document_namespace", "", "document_type", "", "document_version", + "", "payload", ""), actual.content); } @Test(expected = IllegalArgumentException.class) diff --git a/pom.xml b/pom.xml index 70783cc67d..38e86394d8 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ - 1.49.0 + 1.61.0 2.9.9