feat(metadata-jobs): improve consumer logging #10173

Merged 1 commit on Mar 31, 2024

@@ -628,12 +628,20 @@ public List<UpdateAspectResult> ingestAspects(
   public List<UpdateAspectResult> ingestAspects(
       @Nonnull final AspectsBatch aspectsBatch, boolean emitMCL, boolean overwrite) {

+    // Skip DB timer for empty batch
+    if (aspectsBatch.getItems().size() == 0) {
+      return Collections.emptyList();
+    }
+
+    log.info("Ingesting aspects batch to database, items: {}", aspectsBatch.getItems());
     Timer.Context ingestToLocalDBTimer =
         MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time();
     List<UpdateAspectResult> ingestResults = ingestAspectsToLocalDB(aspectsBatch, overwrite);
-    List<UpdateAspectResult> mclResults = emitMCL(ingestResults, emitMCL);
-    ingestToLocalDBTimer.stop();
+    long took = ingestToLocalDBTimer.stop();
+    log.info(
+        "Ingestion of aspects batch to database took {} ms", TimeUnit.NANOSECONDS.toMillis(took));

+    List<UpdateAspectResult> mclResults = emitMCL(ingestResults, emitMCL);
     return mclResults;
   }

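A note on the timing change in the hunk above: in the Dropwizard Metrics API, Timer.Context.stop() returns the elapsed time in nanoseconds, which is why the new code converts with TimeUnit.NANOSECONDS.toMillis before logging. A minimal, self-contained sketch of the pattern (registry and metric names are illustrative, not DataHub's):

import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;

public class TimerExample {
  public static void main(String[] args) throws InterruptedException {
    MetricRegistry registry = new MetricRegistry();
    Timer timer = registry.timer("ingestAspectsToLocalDB");
    Timer.Context ctx = timer.time(); // start the clock
    Thread.sleep(42); // stand-in for the DB write
    long tookNanos = ctx.stop(); // stop() reports elapsed nanoseconds
    System.out.printf("took %d ms%n", TimeUnit.NANOSECONDS.toMillis(tookNanos));
  }
}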
@@ -1505,10 +1513,7 @@ public Optional<Pair<Future<?>, Boolean>> conditionallyProduceMCLAsync(
       AspectSpec aspectSpec) {
     boolean isNoOp = oldAspect == newAspect;
     if (!isNoOp || alwaysEmitChangeLog || shouldAspectEmitChangeLog(aspectSpec)) {
-      log.debug(
-          "Producing MetadataChangeLog for ingested aspect {}, urn {}",
-          aspectSpec.getName(),
-          entityUrn);
+      log.info("Producing MCL for ingested aspect {}, urn {}", aspectSpec.getName(), entityUrn);

       final MetadataChangeLog metadataChangeLog =
           constructMCL(
@@ -1528,8 +1533,8 @@ public Optional<Pair<Future<?>, Boolean>> conditionallyProduceMCLAsync(
           alwaysProduceMCLAsync(entityUrn, aspectSpec, metadataChangeLog);
       return emissionStatus.getFirst() != null ? Optional.of(emissionStatus) : Optional.empty();
     } else {
-      log.debug(
-          "Skipped producing MetadataChangeLog for ingested aspect {}, urn {}. Aspect has not changed.",
+      log.info(
+          "Skipped producing MCL for ingested aspect {}, urn {}. Aspect has not changed.",
           aspectSpec.getName(),
           entityUrn);
       return Optional.empty();
@@ -1636,7 +1641,7 @@ private void ingestSnapshotUnion(
     final List<Pair<String, RecordTemplate>> aspectRecordsToIngest =
         NewModelUtils.getAspectsFromSnapshot(snapshotRecord);

-    log.info("INGEST urn {} with system metadata {}", urn, systemMetadata.toString());
+    log.info("Ingesting entity urn {} with system metadata {}", urn, systemMetadata.toString());

     AspectsBatchImpl aspectsBatch =
         AspectsBatchImpl.builder()
@@ -122,7 +122,8 @@ public void appendRunId(@Nonnull String entityName, @Nonnull Urn urn, @Nullable
       return;
     }
     final String docId = maybeDocId.get();
-    log.debug(String.format("Appending run id for entityName: %s, docId: %s", entityName, docId));
+    log.info(
+        "Appending run id for entity name: {}, doc id: {}, run id: {}", entityName, docId, runId);
     esWriteDAO.applyScriptUpdate(
         entityName,
         docId,
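Beyond the debug-to-info bump, the appendRunId change above also swaps String.format for SLF4J's {} placeholders. String.format builds the message string eagerly even when the target level is disabled, while placeholders defer formatting until the logger knows the line will be emitted. A small sketch contrasting the two styles (the entity name and doc id values are hypothetical):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingStyles {
  private static final Logger log = LoggerFactory.getLogger(LoggingStyles.class);

  public static void main(String[] args) {
    String entityName = "dataset"; // hypothetical values
    String docId = "abc123";

    // Eager: the String is always built, even if DEBUG is disabled.
    log.debug(String.format("Appending run id for entityName: %s, docId: %s", entityName, docId));

    // Lazy: formatting happens only when INFO is enabled.
    log.info("Appending run id for entity name: {}, doc id: {}", entityName, docId);
  }
}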
@@ -47,7 +47,10 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {

     if (response.hasFailures()) {
       log.error(
-          "Failed to feed bulk request. Number of events: "
+          "Failed to feed bulk request "
+              + executionId
+              + "."
+              + " Number of events: "
               + response.getItems().length
               + " Took time ms: "
               + response.getTook().getMillis()
@@ -56,7 +59,10 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
               + response.buildFailureMessage());
     } else {
       log.info(
-          "Successfully fed bulk request. Number of events: "
+          "Successfully fed bulk request "
+              + executionId
+              + "."
+              + " Number of events: "
               + response.getItems().length
               + " Took time ms: "
               + response.getTook().getMillis()
@@ -69,7 +75,8 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
   public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
     // Exception raised outside this method
     log.error(
-        "Error feeding bulk request. No retries left. Request: {}",
+        "Error feeding bulk request {}. No retries left. Request: {}",
+        executionId,
         buildBulkRequestSummary(request),
         failure);
     incrementMetrics(request, failure);
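For context on the executionId now threaded through the BulkListener messages above: Elasticsearch's BulkProcessor assigns each flushed batch an execution id and passes it to every listener callback, so logging it on both the success and failure paths lets all lines for one batch be correlated. A simplified, self-contained listener sketch, assuming the Elasticsearch 7.x client API and not DataHub's actual class:

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CorrelatedBulkListener implements BulkProcessor.Listener {
  private static final Logger log = LoggerFactory.getLogger(CorrelatedBulkListener.class);

  @Override
  public void beforeBulk(long executionId, BulkRequest request) {
    log.info("Sending bulk request {} with {} actions", executionId, request.numberOfActions());
  }

  @Override
  public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    if (response.hasFailures()) {
      log.error("Bulk request {} had failures: {}", executionId, response.buildFailureMessage());
    } else {
      log.info("Bulk request {} succeeded in {} ms", executionId, response.getTook().getMillis());
    }
  }

  @Override
  public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    // Raised when the whole request failed with no response at all.
    log.error("Bulk request {} failed without a response", executionId, failure);
  }
}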
@@ -79,6 +79,11 @@ private ESBulkProcessor(
   public ESBulkProcessor add(DocWriteRequest<?> request) {
     MetricUtils.counter(this.getClass(), ES_WRITES_METRIC).inc();
     bulkProcessor.add(request);
+    log.info(
+        "Added request id: {}, operation type: {}, index: {}",
+        request.id(),
+        request.opType(),
+        request.index());
     return this;
   }

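The new log line in ESBulkProcessor.add above prints three plain accessors from the Elasticsearch DocWriteRequest interface; note it fires once per queued write, so it is verbose at ingestion volume. A runnable sketch of those accessors, again assuming the 7.x client (index name, document id, and source are made-up values):

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

public class DocWriteRequestFields {
  public static void main(String[] args) {
    DocWriteRequest<?> request =
        new IndexRequest("datasetindex_v2") // illustrative index name
            .id("urn:li:dataset:example") // illustrative document id
            .source("{\"removed\":false}", XContentType.JSON);
    // The three accessors used by the new log line; opType() is INDEX here.
    System.out.printf(
        "Added request id: %s, operation type: %s, index: %s%n",
        request.id(), request.opType(), request.index());
  }
}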
@@ -54,7 +54,15 @@ public void consume(final ConsumerRecord<String, String> consumerRecord) {
     try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
       kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
       final String record = consumerRecord.value();
-      log.debug("Got DHUE");
+
+      log.info(
+          "Got DHUE event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+          consumerRecord.key(),
+          consumerRecord.topic(),
+          consumerRecord.partition(),
+          consumerRecord.offset(),
+          consumerRecord.serializedValueSize(),
+          consumerRecord.timestamp());

       Optional<DataHubUsageEventTransformer.TransformedDocument> eventDocument =
           dataHubUsageEventTransformer.transformDataHubUsageEvent(record);
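The same "Got ... event key/topic/partition/offset/value size/timestamp" line is added to each consumer below (MCL, MCE, MCP, and PE). All six fields are plain accessors on the Kafka client's ConsumerRecord; serializedValueSize() reports the raw byte size of the value before deserialization, so the line is cheap relative to the processing that follows. A self-contained sketch of the shared pattern:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerRecordLogging {
  private static final Logger log = LoggerFactory.getLogger(ConsumerRecordLogging.class);

  // Generic over key/value types; eventType is e.g. "MCL" or "PE".
  static <K, V> void logRecordMetadata(String eventType, ConsumerRecord<K, V> rec) {
    log.info(
        "Got {} event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
        eventType,
        rec.key(),
        rec.topic(),
        rec.partition(),
        rec.offset(),
        rec.serializedValueSize(),
        rec.timestamp());
  }
}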
@@ -75,11 +75,14 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
     try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
       kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
       final GenericRecord record = consumerRecord.value();
-      log.debug(
-          "Got Generic MCL on topic: {}, partition: {}, offset: {}",
+      log.info(
+          "Got MCL event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+          consumerRecord.key(),
           consumerRecord.topic(),
           consumerRecord.partition(),
-          consumerRecord.offset());
+          consumerRecord.offset(),
+          consumerRecord.serializedValueSize(),
+          consumerRecord.timestamp());
       MetricUtils.counter(this.getClass(), "received_mcl_count").inc();

       MetadataChangeLog event;
@@ -96,17 +99,23 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
         return;
       }

-      log.debug(
-          "Invoking MCL hooks for urn: {}, key: {}",
+      log.info(
+          "Invoking MCL hooks for urn: {}, aspect name: {}, entity type: {}, change type: {}",
           event.getEntityUrn(),
-          event.getEntityKeyAspect());
+          event.hasAspectName() ? event.getAspectName() : null,
+          event.hasEntityType() ? event.getEntityType() : null,
+          event.hasChangeType() ? event.getChangeType() : null);

       // Here - plug in additional "custom processor hooks"
       for (MetadataChangeLogHook hook : this.hooks) {
         if (!hook.isEnabled()) {
-          log.debug(String.format("Skipping disabled hook %s", hook.getClass()));
+          log.info(String.format("Skipping disabled hook %s", hook.getClass()));
           continue;
         }
+        log.info(
+            "Invoking MCL hook {} for urn: {}",
+            hook.getClass().getSimpleName(),
+            event.getEntityUrn());
         try (Timer.Context ignored =
             MetricUtils.timer(this.getClass(), hook.getClass().getSimpleName() + "_latency")
                 .time()) {
@@ -121,10 +130,7 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
       }
       // TODO: Manually commit kafka offsets after full processing.
       MetricUtils.counter(this.getClass(), "consumed_mcl_count").inc();
-      log.debug(
-          "Successfully completed MCL hooks for urn: {}, key: {}",
-          event.getEntityUrn(),
-          event.getEntityKeyAspect());
+      log.info("Successfully completed MCL hooks for urn: {}", event.getEntityUrn());
     }
   }
 }
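Outside the diff context, the hook-dispatch flow above is easier to read in one piece. A condensed, self-contained sketch follows; Hook and Event are hypothetical stand-ins for MetadataChangeLogHook and MetadataChangeLog, and the per-hook metrics timer is elided:

import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HookDispatcher {
  private static final Logger log = LoggerFactory.getLogger(HookDispatcher.class);

  interface Event {
    String getEntityUrn();
  }

  interface Hook {
    boolean isEnabled();
    void invoke(Event event) throws Exception;
  }

  void dispatch(List<Hook> hooks, Event event) {
    for (Hook hook : hooks) {
      if (!hook.isEnabled()) {
        log.info("Skipping disabled hook {}", hook.getClass());
        continue;
      }
      log.info("Invoking hook {} for urn: {}", hook.getClass().getSimpleName(), event.getEntityUrn());
      try {
        hook.invoke(event);
      } catch (Exception e) {
        // One failing hook is logged but does not stop the remaining hooks,
        // mirroring the error handling in the diff above.
        log.error("Failed to execute hook {}", hook.getClass().getCanonicalName(), e);
      }
    }
    log.info("Successfully completed hooks for urn: {}", event.getEntityUrn());
  }
}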
@@ -72,6 +72,16 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
     try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
       kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
       final GenericRecord record = consumerRecord.value();
+
+      log.info(
+          "Got MCE event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+          consumerRecord.key(),
+          consumerRecord.topic(),
+          consumerRecord.partition(),
+          consumerRecord.offset(),
+          consumerRecord.serializedValueSize(),
+          consumerRecord.timestamp());
+
       log.debug("Record {}", record);

       MetadataChangeEvent event = new MetadataChangeEvent();
@@ -62,14 +62,25 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
     try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) {
       kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
       final GenericRecord record = consumerRecord.value();
+
+      log.info(
+          "Got MCP event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+          consumerRecord.key(),
+          consumerRecord.topic(),
+          consumerRecord.partition(),
+          consumerRecord.offset(),
+          consumerRecord.serializedValueSize(),
+          consumerRecord.timestamp());
+
       log.debug("Record {}", record);

       MetadataChangeProposal event = new MetadataChangeProposal();
       try {
         event = EventUtils.avroToPegasusMCP(record);
         log.debug("MetadataChangeProposal {}", event);
         // TODO: Get this from the event itself.
-        entityClient.ingestProposal(event, false);
+        String urn = entityClient.ingestProposal(event, false);
+        log.info("Successfully processed MCP event urn: {}", urn);
       } catch (Throwable throwable) {
         log.error("MCP Processor Error", throwable);
         log.error("Message: {}", record);
@@ -50,11 +50,14 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)

       kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
       final GenericRecord record = consumerRecord.value();
-      log.debug(
-          "Got Generic PE on topic: {}, partition: {}, offset: {}",
+      log.info(
+          "Got PE event key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
+          consumerRecord.key(),
           consumerRecord.topic(),
           consumerRecord.partition(),
-          consumerRecord.offset());
+          consumerRecord.offset(),
+          consumerRecord.serializedValueSize(),
+          consumerRecord.timestamp());
       MetricUtils.counter(this.getClass(), "received_pe_count").inc();

       PlatformEvent event;
@@ -68,9 +71,13 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
         return;
       }

-      log.debug("Invoking PE hooks for event name {}", event.getName());
+      log.info("Invoking PE hooks for event name {}", event.getName());

       for (PlatformEventHook hook : this.hooks) {
+        log.info(
+            "Invoking PE hook {} for event name {}",
+            hook.getClass().getSimpleName(),
+            event.getName());
         try (Timer.Context ignored =
             MetricUtils.timer(this.getClass(), hook.getClass().getSimpleName() + "_latency")
                 .time()) {
@@ -83,7 +90,7 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
         }
       }
       MetricUtils.counter(this.getClass(), "consumed_pe_count").inc();
-      log.debug("Successfully completed PE hooks for event with name {}", event.getName());
+      log.info("Successfully completed PE hooks for event with name {}", event.getName());
     }
   }
 }
@@ -123,7 +123,7 @@ public void testAsyncDefaultAspects() throws URISyntaxException {
                         .request(req)
                         .build())));
     aspectResource.ingestProposal(mcp, "false");
-    verify(producer, times(10))
+    verify(producer, times(5))
         .produceMetadataChangeLog(eq(urn), any(AspectSpec.class), any(MetadataChangeLog.class));
     verifyNoMoreInteractions(producer);
   }