Skip to content

Commit

Permalink
NIFI-12855: Add previous event IDs to provenance events to facilitate…
Browse files Browse the repository at this point in the history
… full graph traversal
  • Loading branch information
mattyb149 committed Aug 11, 2024
1 parent 920edfc commit 713f0cf
Show file tree
Hide file tree
Showing 31 changed files with 576 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@
import java.util.List;
import java.util.Set;

import static java.util.Collections.EMPTY_SET;
import static java.util.Collections.emptyList;

/**
* Implementation of {@link ProvenanceRepository} that does not
* store events.
*
*/
public class NoOpProvenanceRepository implements ProvenanceRepository {
public class NoOpProvenanceRepository extends AbstractProvenanceRepository {

@Override
public void initialize(EventReporter eventReporter, Authorizer authorizer,
Expand Down Expand Up @@ -134,7 +133,7 @@ public List<SearchableField> getSearchableAttributes() {

@Override
public Set<String> getContainerNames() {
return EMPTY_SET;
return Set.of();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
Expand Down Expand Up @@ -273,7 +275,7 @@ public interface ProvenanceEventBuilder {
ProvenanceEventBuilder setDetails(String details);

/**
* Sets the to which the FlowFile was routed for
* Sets the relationship to which the FlowFile was routed for
* {@link ProvenanceEventType#ROUTE} events. This is valid only for
* {@link ProvenanceEventType#ROUTE} events and will be ignored for any
* other event types.
Expand All @@ -283,6 +285,13 @@ public interface ProvenanceEventBuilder {
*/
ProvenanceEventBuilder setRelationship(Relationship relationship);

/**
* Sets the IDs for the event that happened previously to this event for the given FlowFile
* @param previousEventIds The previous event IDs (usually one except for JOIN events and such)
* @return the builder
*/
ProvenanceEventBuilder setPreviousEventIds(Set<Long> previousEventIds);

/**
* Populates the builder with as much information as it can from the given
* FlowFile
Expand All @@ -297,7 +306,7 @@ public interface ProvenanceEventBuilder {
* {@link ProvenanceEventRecord#getEventId()} on the
* {@link ProvenanceEventRecord} that is returned will yield
* <code>-1</code>. This is because the implementation of the Event may
* depend on the {@link ProvevenanceEventRepository} to generate the unique
* depend on the {@link ProvenanceEventRepository} to generate the unique
* identifier.
*
* @return the event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Describes an event that happened to a FlowFile.
Expand All @@ -34,6 +35,12 @@ public interface ProvenanceEventRecord {
*/
long getEventId();

/**
* @return a unique ID for the "parent" Provenance Event, namely the one that came directly before this event
* for the given FlowFile. For source events such as CREATE, this value should be set to -1
*/
Set<Long> getPreviousEventIds();

/**
* @return the time at which this Provenance Event was created, as the
* number of milliseconds since epoch
Expand Down Expand Up @@ -149,14 +156,14 @@ default boolean isRemotePortType() {
/**
* @return the UUID's of all Parent FlowFiles. This is applicable only when
* the {@link ProvenanceEventType} is of type
* {@link ProvenanceEventType#SPAWN SPAWN}
* {@link ProvenanceEventType#JOIN JOIN}
*/
List<String> getParentUuids();

/**
* @return the UUID's of all Child FlowFiles. This is applicable only when
* the {@link ProvenanceEventType} is of type
* {@link ProvenanceEventType#SPAWN SPAWN}
* {@link ProvenanceEventType#FORK FORK}
*/
List<String> getChildUuids();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.util.List;
import java.util.Set;

/**
* This Repository houses Provenance Events. The repository is responsible for
Expand Down Expand Up @@ -93,5 +94,18 @@ public interface ProvenanceEventRepository {
*/
void close() throws IOException;

/**
* Returns the previous provenance event IDs for the given FlowFile
* @param flowFileUUID the UUID of the FlowFile
* @return the previous event IDs for the given FlowFile
*/
Set<Long> getPreviousEventIds(String flowFileUUID);

/**
* Updates the previous provenance event IDs for the given event
*
* @param record The record for which to update the previous event IDs
* @param previousIds the list of previous event IDs to set for the record, or null to remove
*/
void updatePreviousEventIds(ProvenanceEventRecord record, Set<Long> previousIds);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance;

import java.util.Set;

public interface UpdateableProvenanceEventRecord extends ProvenanceEventRecord {

void setEventId(final long eventId);

void setPreviousEventIds(Set<Long> previousEventIds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,23 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* A Provenance Event that is used to replace another Provenance Event when authorizations
* are not granted for the original Provenance Event
*/
public class PlaceholderProvenanceEvent implements ProvenanceEventRecord {
public class PlaceholderProvenanceEvent implements UpdateableProvenanceEventRecord {
private final String componentId;
private final long eventId;
private long eventId;
private Set<Long> previousEventIds;
private final long eventTime;
private final String flowFileUuid;

public PlaceholderProvenanceEvent(final ProvenanceEventRecord original) {
this.componentId = original.getComponentId();
this.eventId = original.getEventId();
this.previousEventIds = original.getPreviousEventIds();
this.eventTime = original.getEventTime();
this.flowFileUuid = original.getFlowFileUuid();
}
Expand All @@ -43,6 +46,20 @@ public long getEventId() {
return eventId;
}

@Override
public void setEventId(long eventId) {
this.eventId = eventId;
}

@Override
public Set<Long> getPreviousEventIds() {
return previousEventIds;
}

@Override
public void setPreviousEventIds(Set<Long> previousEventIds) {
this.previousEventIds = previousEventIds;
}
@Override
public long getEventTime() {
return eventTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,17 @@ public class SearchableFields {
public static final SearchableField SourceQueueIdentifier
= new NamedSearchableField("SourceQueueIdentifier", "sourceQueueIdentifier", "Source Queue Identifier", false, SearchableFieldType.STRING);

public static final SearchableField PreviousEventIdentifiers
= new NamedSearchableField("PreviousEventIdentifiers", "previousEventIdentifiers", "Previous Event Identifiers", false, SearchableFieldType.LONG);

private static final Map<String, SearchableField> standardFields;

static {
final SearchableField[] searchableFields = new SearchableField[]{
EventTime, FlowFileUUID, Filename, EventType, TransitURI,
ComponentID, AlternateIdentifierURI, FileSize, Relationship, Details,
LineageStartDate, LineageIdentifier, ContentClaimSection, ContentClaimContainer, ContentClaimIdentifier,
ContentClaimOffset, SourceQueueIdentifier};
ContentClaimOffset, SourceQueueIdentifier, PreviousEventIdentifiers};

final Map<String, SearchableField> fields = new HashMap<>();
for (final SearchableField field : searchableFields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Holder for provenance relevant information
*/
public class StandardProvenanceEventRecord implements ProvenanceEventRecord {
public class StandardProvenanceEventRecord implements UpdateableProvenanceEventRecord {

private final long eventTime;
private final long entryDate;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class StandardProvenanceEventRecord implements ProvenanceEventRecord {
private final Map<String, String> updatedAttributes;

private volatile long eventId = -1L;
private volatile Set<Long> previousEventIds;

StandardProvenanceEventRecord(final Builder builder) {
this.eventTime = builder.eventTime;
Expand Down Expand Up @@ -108,6 +110,7 @@ public class StandardProvenanceEventRecord implements ProvenanceEventRecord {
if (builder.eventId != null) {
eventId = builder.eventId;
}
previousEventIds = builder.previousEventIds;
}

public static StandardProvenanceEventRecord copy(StandardProvenanceEventRecord other) {
Expand All @@ -123,7 +126,8 @@ public long getStorageByteOffset() {
return storageByteOffset;
}

void setEventId(final long eventId) {
@Override
public void setEventId(final long eventId) {
this.eventId = eventId;
}

Expand All @@ -132,6 +136,14 @@ public long getEventId() {
return eventId;
}

@Override
public Set<Long> getPreviousEventIds() {
return previousEventIds;
}

public void setPreviousEventIds(Set<Long> previousEventIds) {
this.previousEventIds = previousEventIds;
}
@Override
public long getEventTime() {
return eventTime;
Expand Down Expand Up @@ -316,11 +328,10 @@ public boolean equals(final Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof StandardProvenanceEventRecord)) {
if (!(obj instanceof StandardProvenanceEventRecord other)) {
return false;
}

final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj;
// If event ID's are populated and not equal, return false. If they have not yet been populated, do not
// use them in the comparison.
if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) {
Expand All @@ -338,11 +349,7 @@ public boolean equals(final Object obj) {
return false;
}

if (different(childrenUuids, other.childrenUuids)) {
return false;
}

// SPAWN had issues indicating which should be the event's FlowFileUUID in the case that there is 1 parent and 1 child.
// JOIN had issues indicating which should be the event's FlowFileUUID in the case that there is 1 parent and 1 child.
if (!uuid.equals(other.uuid)) {
return false;
}
Expand Down Expand Up @@ -374,15 +381,15 @@ private boolean different(final List<String> a, final List<String> b) {
return false;
}

if (a == null && b != null && !b.isEmpty()) {
if (a == null && !b.isEmpty()) {
return true;
}

if (a == null && b.isEmpty()) {
if (a == null) {
return false;
}

if (a != null && !a.isEmpty() && b == null) {
if (!a.isEmpty() && b == null) {
return true;
}

Expand Down Expand Up @@ -413,11 +420,13 @@ private boolean different(final List<String> a, final List<String> b) {
public String toString() {
return "ProvenanceEventRecord ["
+ "eventId=" + eventId
+ ", previousEventIds=" + previousEventIds
+ ", eventType=" + eventType
+ ", eventTime=" + new Date(eventTime)
+ ", uuid=" + uuid
+ ", fileSize=" + contentSize
+ ", componentId=" + componentId
+ ", componentType=" + componentType
+ ", transitUri=" + transitUri
+ ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier
+ ", parentUuids=" + parentUuids
Expand Down Expand Up @@ -461,6 +470,7 @@ public static class Builder implements ProvenanceEventBuilder {
private long eventDuration = -1L;
private String storageFilename;
private Long eventId;
private Set<Long> previousEventIds;

private String contentClaimSection;
private String contentClaimContainer;
Expand Down Expand Up @@ -519,6 +529,7 @@ public Builder fromEvent(final ProvenanceEventRecord event) {
storageFilename = standardProvEvent.storageFilename;
}

previousEventIds = event.getPreviousEventIds();
return this;
}

Expand Down Expand Up @@ -572,6 +583,10 @@ public ProvenanceEventBuilder copy() {
copy.storageByteOffset = storageByteOffset;
copy.storageFilename = storageFilename;

if (previousEventIds != null) {
copy.previousEventIds = previousEventIds;
}

return copy;
}

Expand Down Expand Up @@ -740,6 +755,11 @@ public Builder setDetails(String details) {
return this;
}

@Override
public ProvenanceEventBuilder setPreviousEventIds(Set<Long> previousEventIds) {
this.previousEventIds = previousEventIds;
return this;
}
@Override
public Builder setRelationship(Relationship relationship) {
this.relationship = relationship.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ private Object normalizeValue(final Object value) throws SQLException {
return null;
}

if (value instanceof List) {
return ((List) value).toArray();
if (value instanceof Set) {
return ((Set) value).toArray();
}

if (value instanceof Array) {
return ((Array) value).getArray();
if (value instanceof List) {
return ((List) value).toArray();
}

return value;
Expand Down
Loading

0 comments on commit 713f0cf

Please sign in to comment.