Skip to content

Commit

Permalink
Fix ingestion-sink cross-project inserts
Browse files Browse the repository at this point in the history
Update google cloud library to 1.61.0 to match beam 2.13.0 and pick up googleapis/google-cloud-java#4196

Emit stack traces in ingestion-sink
  • Loading branch information
relud committed Aug 15, 2019
1 parent 7cbd98d commit 8ad5e0a
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 12 deletions.
5 changes: 5 additions & 0 deletions ingestion-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
</properties>

<dependencies>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-json-org</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.logging.Logger;

public class BigQuery {

Expand All @@ -35,6 +36,8 @@ private WriteErrors(List<BigQueryError> errors) {

public static class Write implements Function<Write.TableRow, CompletableFuture<Write.TableRow>> {

private static final Logger logger = Logger.getLogger("BigQuery.Write");

private final com.google.cloud.bigquery.BigQuery bigquery;
private final int maxBytes = 10_000_000; // HTTP request size limit: 10 MB
private final int maxMessages = 10_000; // Maximum rows per request: 10,000
Expand Down Expand Up @@ -137,7 +140,8 @@ private synchronized Optional<CompletableFuture<TableRow>> add(TableRow row) {
int index = newSize - 1;
return Optional.of(result.thenApplyAsync(r -> {
List<BigQueryError> errors = r.getErrorsFor(index);
if (!errors.isEmpty()) {
if (errors != null && !errors.isEmpty()) {
logger.warning(errors.toString());
throw new WriteErrors(errors);
}
return row;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

package com.mozilla.telemetry.ingestion.io;

import static java.util.logging.Level.WARNING;

import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.annotations.VisibleForTesting;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.logging.Logger;

public class Pubsub {

Expand All @@ -18,6 +21,8 @@ private Pubsub() {

public static class Read {

static Logger logger = Logger.getLogger("Pubsub.Read");

@VisibleForTesting
Subscriber subscriber;

Expand All @@ -30,6 +35,7 @@ public Read(String subscriptionName, Function<PubsubMessage, CompletableFuture<?
if (exception == null) {
consumer.ack();
} else {
logger.log(WARNING, "Exception while attempting to deliver message:", exception);
consumer.nack();
}
})))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.google.cloud.bigquery.TableId;
import com.google.pubsub.v1.PubsubMessage;
import com.mozilla.telemetry.ingestion.io.BigQuery.Write.TableRow;
import com.mozilla.telemetry.ingestion.util.Json;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand All @@ -18,6 +20,8 @@ public enum TableRowFormat {
raw, decoded, payload
}

private static final Base64.Encoder base64Encoder = Base64.getEncoder();

private final String tableSpecTemplate;
private final TableRowFormat tableRowFormat;

Expand Down Expand Up @@ -47,15 +51,18 @@ public TableRow apply(PubsubMessage message) {
final TableId tableId = getTableId(message.getAttributesMap());
final Optional<String> documentId = Optional
.ofNullable(message.getAttributesOrDefault("document_id", null));
final Map<String, Object> contents;
switch (tableRowFormat) {
case raw:
return new TableRow(tableId, message.getSerializedSize(), documentId, rawContents(message));
contents = rawContents(message);
break;
case decoded:
case payload:
default:
throw new IllegalArgumentException(
"TableRowFormat not yet implemented: " + tableRowFormat.name());
}
return new TableRow(tableId, Json.asBytes(contents).length, documentId, contents);
}

/**
Expand All @@ -69,7 +76,8 @@ public TableRow apply(PubsubMessage message) {
*/
private Map<String, Object> rawContents(PubsubMessage message) {
Map<String, Object> contents = new HashMap<>(message.getAttributesMap());
contents.put("payload", message.getData().toByteArray());
// bytes must be inserted as base64 encoded strings
contents.put("payload", base64Encoder.encodeToString(message.getData().toByteArray()));
return contents;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

package com.mozilla.telemetry.ingestion.util;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UncheckedIOException;

public class Json {

  /**
   * Shared Jackson {@link ObjectMapper} backing the static helpers in this class.
   */
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  /**
   * Encode {@code data} as JSON and return the resulting bytes.
   *
   * @param data the value to serialize
   * @return the bytes Jackson produces for {@code data}
   * @throws UncheckedIOException wrapping the {@link IOException} raised when {@code data} cannot
   *         be encoded as json
   */
  public static byte[] asBytes(Object data) {
    final byte[] encoded;
    try {
      encoded = OBJECT_MAPPER.writeValueAsBytes(data);
    } catch (IOException cause) {
      // Surface serialization failures as unchecked so callers need no throws clause.
      throw new UncheckedIOException(cause);
    }
    return encoded;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ public void canTransformSingleMessage() {

assertEquals("telemetry_raw", actual.tableId.getDataset());
assertEquals("main_v4", actual.tableId.getTable());
assertEquals(104, actual.byteSize);
assertEquals("test", new String((byte[]) actual.content.remove("payload")));
assertEquals(120, actual.byteSize);
assertEquals(ImmutableMap.of("document_id", "id", "document_namespace", "telemetry",
"document_type", "main", "document_version", "4"), actual.content);
"document_type", "main", "document_version", "4", "payload", "dGVzdA=="), actual.content);
}

@Test
Expand All @@ -41,11 +40,9 @@ public void canHandleEmptyValues() {

assertEquals("_raw", actual.tableId.getDataset());
assertEquals("_v", actual.tableId.getTable());
assertEquals(65, actual.byteSize);
assertEquals("", new String((byte[]) actual.content.remove("payload")));
assertEquals(
ImmutableMap.of("document_namespace", "", "document_type", "", "document_version", ""),
actual.content);
assertEquals(79, actual.byteSize);
assertEquals(ImmutableMap.of("document_namespace", "", "document_type", "", "document_version",
"", "payload", ""), actual.content);
}

@Test(expected = IllegalArgumentException.class)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

<!-- Keep these dependency versions in sync with those pulled in by beam;
check https://mvnrepository.com/artifact/org.apache.beam -->
<google-cloud.version>1.49.0</google-cloud.version>
<google-cloud.version>1.61.0</google-cloud.version>
<jackson.version>2.9.9</jackson.version>
</properties>

Expand Down

0 comments on commit 8ad5e0a

Please sign in to comment.