diff --git a/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java b/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java index 2327cf9978931..f181e626b7f15 100644 --- a/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java +++ b/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Set; -import static java.util.Collections.EMPTY_SET; import static java.util.Collections.emptyList; /** @@ -36,7 +35,7 @@ * store events. * */ -public class NoOpProvenanceRepository implements ProvenanceRepository { +public class NoOpProvenanceRepository extends AbstractProvenanceRepository { @Override public void initialize(EventReporter eventReporter, Authorizer authorizer, @@ -134,7 +133,7 @@ public List getSearchableAttributes() { @Override public Set getContainerNames() { - return EMPTY_SET; + return Set.of(); } @Override diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventBuilder.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventBuilder.java index be4fd5e372464..30aa7adbc79e0 100644 --- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventBuilder.java +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventBuilder.java @@ -18,6 +18,8 @@ import java.util.List; import java.util.Map; +import java.util.Set; + import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; @@ -273,7 +275,7 @@ public interface ProvenanceEventBuilder { ProvenanceEventBuilder setDetails(String details); /** - * Sets the to which the FlowFile was routed for + * Sets the relationship to which the FlowFile was routed for * {@link ProvenanceEventType#ROUTE} events. This is valid only for * {@link ProvenanceEventType#ROUTE} events and will be ignored for any * other event types. @@ -283,6 +285,13 @@ public interface ProvenanceEventBuilder { */ ProvenanceEventBuilder setRelationship(Relationship relationship); + /** + * Sets the IDs for the event that happened previously to this event for the given FlowFile + * @param previousEventIds The previous event IDs (usually one except for JOIN events and such) + * @return the builder + */ + ProvenanceEventBuilder setPreviousEventIds(Set previousEventIds); + /** * Populates the builder with as much information as it can from the given * FlowFile @@ -297,7 +306,7 @@ public interface ProvenanceEventBuilder { * {@link ProvenanceEventRecord#getEventId()} on the * {@link ProvenanceEventRecord} that is returned will yield * -1. This is because the implementation of the Event may - * depend on the {@link ProvevenanceEventRepository} to generate the unique + * depend on the {@link ProvenanceEventRepository} to generate the unique * identifier. * * @return the event diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java index 51bb76f659e97..371bb171f41fb 100644 --- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRecord.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Map; +import java.util.Set; /** * Describes an event that happened to a FlowFile. @@ -34,6 +35,12 @@ public interface ProvenanceEventRecord { */ long getEventId(); + /** + * @return a unique ID for the "parent" Provenance Event, namely the one that came directly before this event + * for the given FlowFile. For source events such as CREATE, this value should be set to -1 + */ + Set getPreviousEventIds(); + /** * @return the time at which this Provenance Event was created, as the * number of milliseconds since epoch @@ -149,14 +156,14 @@ default boolean isRemotePortType() { /** * @return the UUID's of all Parent FlowFiles. This is applicable only when * the {@link ProvenanceEventType} is of type - * {@link ProvenanceEventType#SPAWN SPAWN} + * {@link ProvenanceEventType#JOIN JOIN} */ List getParentUuids(); /** * @return the UUID's of all Child FlowFiles. This is applicable only when * the {@link ProvenanceEventType} is of type - * {@link ProvenanceEventType#SPAWN SPAWN} + * {@link ProvenanceEventType#FORK FORK} */ List getChildUuids(); diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java index e9d3b14e072fd..5b228d09e1c34 100644 --- a/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/ProvenanceEventRepository.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.util.List; +import java.util.Set; /** * This Repository houses Provenance Events. The repository is responsible for @@ -93,5 +94,18 @@ public interface ProvenanceEventRepository { */ void close() throws IOException; + /** + * Returns the previous provenance event IDs for the given FlowFile + * @param flowFileUUID the UUID of the FlowFile + * @return the previous event IDs for the given FlowFile + */ + Set getPreviousEventIds(String flowFileUUID); + /** + * Updates the previous provenance event IDs for the given event + * + * @param record The record for which to update the previous event IDs + * @param previousIds the list of previous event IDs to set for the record, or null to remove + */ + void updatePreviousEventIds(ProvenanceEventRecord record, Set previousIds); } diff --git a/nifi-api/src/main/java/org/apache/nifi/provenance/UpdateableProvenanceEventRecord.java b/nifi-api/src/main/java/org/apache/nifi/provenance/UpdateableProvenanceEventRecord.java new file mode 100644 index 0000000000000..6b51d48893fbe --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/provenance/UpdateableProvenanceEventRecord.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.util.Set; + +public interface UpdateableProvenanceEventRecord extends ProvenanceEventRecord { + + void setEventId(final long eventId); + + void setPreviousEventIds(Set previousEventIds); +} \ No newline at end of file diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/PlaceholderProvenanceEvent.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/PlaceholderProvenanceEvent.java index 5644355b22855..9db0a7c762bd0 100644 --- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/PlaceholderProvenanceEvent.java +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/PlaceholderProvenanceEvent.java @@ -20,20 +20,23 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; /** * A Provenance Event that is used to replace another Provenance Event when authorizations * are not granted for the original Provenance Event */ -public class PlaceholderProvenanceEvent implements ProvenanceEventRecord { +public class PlaceholderProvenanceEvent implements UpdateableProvenanceEventRecord { private final String componentId; - private final long eventId; + private long eventId; + private Set previousEventIds; private final long eventTime; private final String flowFileUuid; public PlaceholderProvenanceEvent(final ProvenanceEventRecord original) { this.componentId = original.getComponentId(); this.eventId = original.getEventId(); + this.previousEventIds = original.getPreviousEventIds(); this.eventTime = original.getEventTime(); this.flowFileUuid = original.getFlowFileUuid(); } @@ -43,6 +46,20 @@ public long getEventId() { return eventId; } + @Override + public void setEventId(long eventId) { + this.eventId = eventId; + } + + @Override + public Set getPreviousEventIds() { + return previousEventIds; + } + + @Override + public void setPreviousEventIds(Set previousEventIds) { + this.previousEventIds = previousEventIds; + } @Override public long getEventTime() { return eventTime; diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFields.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFields.java index de62bca8132da..fbd60d17582d0 100644 --- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFields.java +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/SearchableFields.java @@ -57,6 +57,9 @@ public class SearchableFields { public static final SearchableField SourceQueueIdentifier = new NamedSearchableField("SourceQueueIdentifier", "sourceQueueIdentifier", "Source Queue Identifier", false, SearchableFieldType.STRING); + public static final SearchableField PreviousEventIdentifiers + = new NamedSearchableField("PreviousEventIdentifiers", "previousEventIdentifiers", "Previous Event Identifiers", false, SearchableFieldType.LONG); + private static final Map standardFields; static { @@ -64,7 +67,7 @@ public class SearchableFields { EventTime, FlowFileUUID, Filename, EventType, TransitURI, ComponentID, AlternateIdentifierURI, FileSize, Relationship, Details, LineageStartDate, LineageIdentifier, ContentClaimSection, ContentClaimContainer, ContentClaimIdentifier, - ContentClaimOffset, SourceQueueIdentifier}; + ContentClaimOffset, SourceQueueIdentifier, PreviousEventIdentifiers}; final Map fields = new HashMap<>(); for (final SearchableField field : searchableFields) { diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java index cc3306fca8e0b..c619653fe0261 100644 --- a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java @@ -26,11 +26,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; /** * Holder for provenance relevant information */ -public class StandardProvenanceEventRecord implements ProvenanceEventRecord { +public class StandardProvenanceEventRecord implements UpdateableProvenanceEventRecord { private final long eventTime; private final long entryDate; @@ -68,6 +69,7 @@ public class StandardProvenanceEventRecord implements ProvenanceEventRecord { private final Map updatedAttributes; private volatile long eventId = -1L; + private volatile Set previousEventIds; StandardProvenanceEventRecord(final Builder builder) { this.eventTime = builder.eventTime; @@ -108,6 +110,7 @@ public class StandardProvenanceEventRecord implements ProvenanceEventRecord { if (builder.eventId != null) { eventId = builder.eventId; } + previousEventIds = builder.previousEventIds; } public static StandardProvenanceEventRecord copy(StandardProvenanceEventRecord other) { @@ -123,7 +126,8 @@ public long getStorageByteOffset() { return storageByteOffset; } - void setEventId(final long eventId) { + @Override + public void setEventId(final long eventId) { this.eventId = eventId; } @@ -132,6 +136,14 @@ public long getEventId() { return eventId; } + @Override + public Set getPreviousEventIds() { + return previousEventIds; + } + + public void setPreviousEventIds(Set previousEventIds) { + this.previousEventIds = previousEventIds; + } @Override public long getEventTime() { return eventTime; @@ -316,11 +328,10 @@ public boolean equals(final Object obj) { if (obj == this) { return true; } - if (!(obj instanceof StandardProvenanceEventRecord)) { + if (!(obj instanceof StandardProvenanceEventRecord other)) { return false; } - final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj; // If event ID's are populated and not equal, return false. If they have not yet been populated, do not // use them in the comparison. if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) { @@ -338,11 +349,7 @@ public boolean equals(final Object obj) { return false; } - if (different(childrenUuids, other.childrenUuids)) { - return false; - } - - // SPAWN had issues indicating which should be the event's FlowFileUUID in the case that there is 1 parent and 1 child. + // JOIN had issues indicating which should be the event's FlowFileUUID in the case that there is 1 parent and 1 child. if (!uuid.equals(other.uuid)) { return false; } @@ -374,15 +381,15 @@ private boolean different(final List a, final List b) { return false; } - if (a == null && b != null && !b.isEmpty()) { + if (a == null && !b.isEmpty()) { return true; } - if (a == null && b.isEmpty()) { + if (a == null) { return false; } - if (a != null && !a.isEmpty() && b == null) { + if (!a.isEmpty() && b == null) { return true; } @@ -413,11 +420,13 @@ private boolean different(final List a, final List b) { public String toString() { return "ProvenanceEventRecord [" + "eventId=" + eventId + + ", previousEventIds=" + previousEventIds + ", eventType=" + eventType + ", eventTime=" + new Date(eventTime) + ", uuid=" + uuid + ", fileSize=" + contentSize + ", componentId=" + componentId + + ", componentType=" + componentType + ", transitUri=" + transitUri + ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier + ", parentUuids=" + parentUuids @@ -461,6 +470,7 @@ public static class Builder implements ProvenanceEventBuilder { private long eventDuration = -1L; private String storageFilename; private Long eventId; + private Set previousEventIds; private String contentClaimSection; private String contentClaimContainer; @@ -519,6 +529,7 @@ public Builder fromEvent(final ProvenanceEventRecord event) { storageFilename = standardProvEvent.storageFilename; } + previousEventIds = event.getPreviousEventIds(); return this; } @@ -572,6 +583,10 @@ public ProvenanceEventBuilder copy() { copy.storageByteOffset = storageByteOffset; copy.storageFilename = storageFilename; + if (previousEventIds != null) { + copy.previousEventIds = previousEventIds; + } + return copy; } @@ -740,6 +755,11 @@ public Builder setDetails(String details) { return this; } + @Override + public ProvenanceEventBuilder setPreviousEventIds(Set previousEventIds) { + this.previousEventIds = previousEventIds; + return this; + } @Override public Builder setRelationship(Relationship relationship) { this.relationship = relationship.getName(); diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java index 3357b06a7f345..15762affc566e 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java @@ -157,12 +157,12 @@ private Object normalizeValue(final Object value) throws SQLException { return null; } - if (value instanceof List) { - return ((List) value).toArray(); + if (value instanceof Set) { + return ((Set) value).toArray(); } - if (value instanceof Array) { - return ((Array) value).getArray(); + if (value instanceof List) { + return ((List) value).toArray(); } return value; diff --git a/nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java b/nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java index 36d6a152c7a90..fbbe0be3f3bd2 100644 --- a/nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java +++ b/nifi-extension-bundles/nifi-extension-utils/nifi-database-utils/src/main/java/org/apache/nifi/util/db/JdbcCommon.java @@ -122,6 +122,8 @@ public class JdbcCommon { public static final Pattern LONG_PATTERN = Pattern.compile("^-?\\d{1,19}$"); public static final Pattern SQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("sql\\.args\\.(\\d+)\\.type"); + + public static final Pattern JDBC_ARRAY_ELEMENT_CLASS_TYPE_PATTERN = Pattern.compile("JavaType\\(class (.*)\\) ARRAY"); public static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); public static final String MIME_TYPE_AVRO_BINARY = "application/avro-binary"; @@ -663,11 +665,23 @@ public static Schema createSchema(final ResultSet rs, AvroConversionOptions opti case BINARY: case VARBINARY: case LONGVARBINARY: - case ARRAY: case BLOB: builder.name(columnName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault(); break; + case ARRAY: + // Parse array element type from column type name + String columnTypeName = meta.getColumnTypeName(i); + Matcher matcher = JDBC_ARRAY_ELEMENT_CLASS_TYPE_PATTERN.matcher(columnTypeName); + if (matcher.find()) { + String elementTypeName = matcher.group(1); + addArrayElementType(builder, columnName, elementTypeName); + } else { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and() + .array().items().unionOf().nullBuilder().endNull().and().bytesType().endUnion().endUnion().noDefault(); + } + break; + case -150: // SQLServer may return -150 from the driver even though it's really -156 (sql_variant), treat as a union since we don't know what the values will actually be case -156: builder.name(columnName).type().unionOf().nullBuilder().endNull().and().stringType().and().intType().and().longType().and().booleanType().and().bytesType().and() @@ -683,6 +697,42 @@ public static Schema createSchema(final ResultSet rs, AvroConversionOptions opti return builder.endRecord(); } + private static void addArrayElementType(final FieldAssembler builder, final String columnName, final String elementTypeName) { + // Use Java class name to add an array with element type to the builder + if ("java.lang.String".equals(elementTypeName)) { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and() + .array().items().unionOf().nullBuilder().endNull().and().stringType().endUnion() + .endUnion().noDefault(); + } else if ("java.lang.Integer".equals(elementTypeName)) { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and() + .array().items().unionOf().nullBuilder().endNull().and().intType().endUnion() + .endUnion().noDefault(); + } else if ("java.lang.Long".equals(elementTypeName)) { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and() + .array().items().unionOf().nullBuilder().endNull().and().longType().endUnion() + .endUnion().noDefault(); + } else if ("java.lang.Boolean".equals(elementTypeName)) { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and() + .array().items().unionOf().nullBuilder().endNull().and().booleanType().endUnion() + .endUnion().noDefault(); + } else if ("java.lang.Float".equals(elementTypeName)) { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and() + .array().items().unionOf().nullBuilder().endNull().and().floatType().endUnion() + .endUnion().noDefault(); + } else if ("java.lang.Double".equals(elementTypeName)) { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and() + .array().items().unionOf().nullBuilder().endNull().and().doubleType().endUnion() + .endUnion().noDefault(); + } else if ("java.lang.Byte".equals(elementTypeName)) { + builder.name(columnName).type().unionOf().nullBuilder().endNull().and() + .array().items().unionOf().nullBuilder().endNull().and().bytesType().endUnion() + .endUnion().noDefault(); + } else { + // default to bytes as before + builder.name(columnName).type().unionOf().nullBuilder().endNull().and().bytesType().endUnion().noDefault(); + } + } + public static String normalizeNameForAvro(String inputName) { String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_"); if (Character.isDigit(normalizedName.charAt(0))) { diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml index 81cb83ef8b858..ce910ba0be9aa 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/pom.xml @@ -51,5 +51,11 @@ lucene-backward-codecs runtime + + org.apache.nifi + nifi-framework-core-api + 2.0.0.4.0.0.0-5 + compile + diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java index a60dcf4b0ce68..a4b8a3918e038 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordWriter.java @@ -34,10 +34,13 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -61,6 +64,7 @@ public class EventIdFirstSchemaRecordWriter extends CompressableRecordWriter { private static final List eventTypeNames; private long firstEventId; + private final Map> previousEventIdsMap = new HashMap<>(); private long systemTimeOffset; static { @@ -98,6 +102,11 @@ public Map writeRecords(final Iterable previousEventIds = previousEventIdsMap.get(flowFileUUID); + switch (event.getEventType()) { + case CREATE: + case RECEIVE: + event.setPreviousEventIds(Collections.singleton(-1L)); + previousEventIdsMap.put(flowFileUUID, Collections.singleton(recordIdentifier)); + break; + case DROP: + case EXPIRE: + event.setPreviousEventIds(previousEventIds); + previousEventIdsMap.remove(flowFileUUID); + break; + case FORK: + case CLONE: + case REPLAY: + event.setPreviousEventIds(previousEventIds); + for (final String childUUID : event.getChildUuids()) { + // Add the child FlowFiles to the previous event ID map with this event's entry in the map + previousEventIdsMap.put(childUUID, Collections.singleton(recordIdentifier)); + } + break; + case JOIN: + List parents = event.getParentUuids(); + Set parentEventIds = new HashSet<>(parents.size()); + for (final String parentUUID : parents) { + parentEventIds.addAll(previousEventIdsMap.get(parentUUID)); + } + event.setPreviousEventIds(parentEventIds); + previousEventIdsMap.put(flowFileUUID, Collections.singleton(recordIdentifier)); + break; + + default: + event.setPreviousEventIds(previousEventIds); + previousEventIdsMap.put(flowFileUUID, Collections.singleton(recordIdentifier)); + break; + } + } } diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java index 583d84d64cd54..2d9d4d23baf9a 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java @@ -87,7 +87,7 @@ * across disks for far greater performance. *

*/ -public class WriteAheadProvenanceRepository implements ProvenanceRepository { +public class WriteAheadProvenanceRepository extends AbstractProvenanceRepository { private static final Logger logger = LoggerFactory.getLogger(WriteAheadProvenanceRepository.class); static final int BLOCK_SIZE = 1024 * 32; public static final String EVENT_CATEGORY = "Provenance Repository"; diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/ConvertEventToLuceneDocument.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/ConvertEventToLuceneDocument.java index 85e78f4575be3..26b635a1b2c48 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/ConvertEventToLuceneDocument.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/ConvertEventToLuceneDocument.java @@ -94,6 +94,15 @@ public Document convert(final ProvenanceEventRecord record, final long eventId) // be stored so that we know how to lookup the event in the store. doc.add(new UnIndexedLongField(SearchableFields.Identifier.getSearchableFieldName(), eventId)); + final Set previousEventIDs = record.getPreviousEventIds(); + if (previousEventIDs != null) { + for (Long previousEventID : previousEventIDs) { + doc.add(new StringField(SearchableFields.PreviousEventIdentifiers.getSearchableFieldName(), String.valueOf(previousEventID), Store.YES)); + } + } else { + doc.add(new StringField(SearchableFields.PreviousEventIdentifiers.getSearchableFieldName(), "-1", Store.YES)); + } + // If it's event is a FORK, or JOIN, add the FlowFileUUID for all child/parent UUIDs. final ProvenanceEventType eventType = record.getEventType(); if (eventType == ProvenanceEventType.FORK || eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.REPLAY) { @@ -141,7 +150,7 @@ private static class UnIndexedLongField extends Field { public UnIndexedLongField(String name, long value) { super(name, TYPE); - fieldsData = Long.valueOf(value); + fieldsData = value; } } } diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java index d6f50dda2e4eb..ba5fe2f4d1c0a 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventFieldNames.java @@ -19,6 +19,7 @@ public class EventFieldNames { public static final String EVENT_IDENTIFIER = "Event ID"; + public static final String PREVIOUS_EVENT_IDENTIFIERS = "Previous Event IDs"; public static final String EVENT_TYPE = "Event Type"; public static final String EVENT_TIME = "Event Time"; public static final String FLOWFILE_ENTRY_DATE = "FlowFile Entry Date"; diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java index 8c82b110fdec8..8671704a85d35 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java @@ -20,6 +20,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; @@ -111,6 +113,8 @@ public Object getFieldValue(final String fieldName) { return event.getTransitUri(); case EventFieldNames.UPDATED_ATTRIBUTES: return event.getUpdatedAttributes(); + case EventFieldNames.PREVIOUS_EVENT_IDENTIFIERS: + return event.getPreviousEventIds(); } return null; @@ -157,6 +161,11 @@ public static StandardProvenanceEventRecord getEvent(final Record record, final (Long) currentClaimRecord.getFieldValue(EventFieldNames.CONTENT_CLAIM_SIZE)); } + final Set previousEventIDs = (Set) record.getFieldValue(EventFieldNames.PREVIOUS_EVENT_IDENTIFIERS); + if (previousEventIDs != null) { + builder.setPreviousEventIds(previousEventIDs); + } + final Record previousClaimRecord = (Record) record.getFieldValue(EventFieldNames.PREVIOUS_CONTENT_CLAIM); if (previousClaimRecord != null) { builder.setPreviousContentClaim( @@ -177,7 +186,7 @@ private static Map truncateAttributes(final Map // Check if any attribute value exceeds the attribute length final boolean anyExceedsLength = attributes.values().stream() - .filter(value -> value != null) + .filter(Objects::nonNull) .anyMatch(value -> value.length() > maxAttributeLength); if (!anyExceedsLength) { diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java index 2fe16761d957c..99c7df580ab7f 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecord.java @@ -17,10 +17,13 @@ package org.apache.nifi.provenance.schema; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -224,6 +227,8 @@ public Object getFieldValue(final String fieldName) { return event.getUpdatedAttributes(); case EventFieldNames.FLOWFILE_UUID: return event.getAttribute(CoreAttributes.UUID.key()); + case EventFieldNames.PREVIOUS_EVENT_IDENTIFIERS: + return event.getPreviousEventIds(); } return null; @@ -353,6 +358,11 @@ public static StandardProvenanceEventRecord getEvent(final Record record, final } } + final Collection previousEventIDsRecordField = (Collection) record.getFieldValue(EventFieldNames.PREVIOUS_EVENT_IDENTIFIERS); + if (previousEventIDsRecordField != null) { + final Set previousEventIDs = new HashSet<>(previousEventIDsRecordField); + builder.setPreviousEventIds(previousEventIDs); + } return builder.build(); } diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java index 057763656dd37..d44b2d0bf5fb0 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventRecordFields.java @@ -33,6 +33,7 @@ public class LookupTableEventRecordFields { // General Event fields. public static final RecordField RECORD_IDENTIFIER_OFFSET = new SimpleRecordField(EventFieldNames.EVENT_IDENTIFIER, FieldType.INT, EXACTLY_ONE); + public static final RecordField PREVIOUS_EVENT_IDENTIFIERS = new SimpleRecordField(EventFieldNames.PREVIOUS_EVENT_IDENTIFIERS, FieldType.LONG, ZERO_OR_MORE); public static final RecordField EVENT_TYPE_ORDINAL = new SimpleRecordField(EventFieldNames.EVENT_TYPE, FieldType.INT, EXACTLY_ONE); public static final RecordField EVENT_TIME_OFFSET = new SimpleRecordField(EventFieldNames.EVENT_TIME, FieldType.INT, EXACTLY_ONE); public static final RecordField FLOWFILE_ENTRY_DATE_OFFSET = new SimpleRecordField(EventFieldNames.FLOWFILE_ENTRY_DATE, FieldType.INT, EXACTLY_ONE); diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java index d596c8e80061e..cda43d0856ae6 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/LookupTableEventSchema.java @@ -34,6 +34,7 @@ import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.PARENT_UUIDS; import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.PREVIOUS_ATTRIBUTES; import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.PREVIOUS_CONTENT_CLAIM; +import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.PREVIOUS_EVENT_IDENTIFIERS; import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.RECORD_IDENTIFIER_OFFSET; import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.RELATIONSHIP; import static org.apache.nifi.provenance.schema.LookupTableEventRecordFields.SOURCE_QUEUE_ID; @@ -64,6 +65,7 @@ private static RecordSchema buildSchemaV1(final boolean includeEventId) { fields.add(RECORD_IDENTIFIER_OFFSET); } + fields.add(PREVIOUS_EVENT_IDENTIFIERS); fields.add(EVENT_TYPE_ORDINAL); fields.add(EVENT_TIME_OFFSET); fields.add(FLOWFILE_ENTRY_DATE_OFFSET); diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java index 766278ae21ca0..7a6728fc6ce8d 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/util/StorageSummaryEvent.java @@ -19,11 +19,14 @@ import java.util.List; import java.util.Map; +import java.util.Set; + import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.UpdateableProvenanceEventRecord; import org.apache.nifi.provenance.serialization.StorageSummary; -public class StorageSummaryEvent implements ProvenanceEventRecord { +public class StorageSummaryEvent implements UpdateableProvenanceEventRecord { private final ProvenanceEventRecord event; private final StorageSummary storageSummary; @@ -37,6 +40,25 @@ public long getEventId() { return storageSummary.getEventId(); } + @Override + public void setEventId(long eventId) { + if (event instanceof UpdateableProvenanceEventRecord) { + ((UpdateableProvenanceEventRecord) event).setEventId(eventId); + } + } + + @Override + public Set getPreviousEventIds() { + return event.getPreviousEventIds(); + } + + @Override + public void setPreviousEventIds(Set previousEventIds) { + if (event instanceof UpdateableProvenanceEventRecord) { + ((UpdateableProvenanceEventRecord) event).setPreviousEventIds(previousEventIds); + } + } + @Override public long getEventTime() { return event.getEventTime(); diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index 607ab97dc6501..10984600bd74e 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -63,7 +63,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; -public class VolatileProvenanceRepository implements ProvenanceRepository { +public class VolatileProvenanceRepository extends AbstractProvenanceRepository { // properties public static final String BUFFER_SIZE = "nifi.provenance.repository.buffer.size"; @@ -786,7 +786,7 @@ public void run() { } } - private static class IdEnrichedProvEvent implements ProvenanceEventRecord { + private static class IdEnrichedProvEvent implements UpdateableProvenanceEventRecord { private final ProvenanceEventRecord record; private final long id; @@ -801,6 +801,25 @@ public long getEventId() { return id; } + @Override + public void setEventId(long eventId) { + if (record instanceof UpdateableProvenanceEventRecord) { + ((UpdateableProvenanceEventRecord) record).setEventId(eventId); + } + } + + @Override + public Set getPreviousEventIds() { + return record.getPreviousEventIds(); + } + + @Override + public void setPreviousEventIds(Set previousEventIds) { + if (record instanceof UpdateableProvenanceEventRecord) { + ((UpdateableProvenanceEventRecord) record).setPreviousEventIds(previousEventIds); + } + } + @Override public long getEventTime() { return record.getEventTime(); diff --git a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index 2b2b539aa65cd..e0be0e209ed4a 100644 --- a/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-extension-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -30,6 +30,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; @@ -384,6 +385,15 @@ private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectB addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri(), allowNullValues); addField(builder, "platform", platform, allowNullValues); addField(builder, "application", applicationName, allowNullValues); + List previousEventIdsList = null; + Set previousEventIds = event.getPreviousEventIds(); + if (previousEventIds != null) { + previousEventIdsList = new ArrayList<>(previousEventIds.size()); + for (Long previousEventId : previousEventIds) { + previousEventIdsList.add(String.valueOf(previousEventId)); + } + } + addField(builder, factory, "previousEventIds", previousEventIdsList, allowNullValues); return builder.build(); } diff --git a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProvenanceDataSource.java b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProvenanceDataSource.java index f9089d47df214..bfc66f3966639 100644 --- a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProvenanceDataSource.java +++ b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/datasources/ProvenanceDataSource.java @@ -38,6 +38,7 @@ public class ProvenanceDataSource implements ResettableDataSource { public static final NiFiTableSchema SCHEMA = new NiFiTableSchema(List.of( new ColumnSchema("eventId", long.class, false), + new ColumnSchema("previousEventIds", new ArrayType(ScalarType.LONG), true), new ColumnSchema("eventType", String.class, false), new ColumnSchema("timestampMillis", long.class, true), new ColumnSchema("durationMillis", long.class, true), @@ -118,6 +119,7 @@ private Object[] toArray(final ProvenanceEventRecord provenanceEvent) { final ArrayList rowList = new ArrayList<>(); rowList.add(provenanceEvent.getEventId()); + rowList.add(provenanceEvent.getPreviousEventIds().toArray(new Long[provenanceEvent.getPreviousEventIds().size()])); rowList.add(provenanceEvent.getEventType().name()); rowList.add(provenanceEvent.getEventTime()); rowList.add(provenanceEvent.getEventDuration()); diff --git a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java index d79e2473a2105..c334fd4017918 100644 --- a/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java +++ b/nifi-extension-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java @@ -62,7 +62,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; @@ -75,6 +74,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -539,11 +539,16 @@ void testProvenanceTable() throws InitializationException { assertEquals(1001, rows.size()); // Validate the first row Map row = rows.get(0); - assertEquals(24, row.size()); + assertEquals(25, row.size()); // Verify the first row contents final Long firstEventId = (Long) row.get("eventId"); assertEquals("CREATE", row.get("eventType")); assertEquals(12L, row.get("entitySize")); + final Object previousEventIdsObj = row.get("previousEventIds"); + assertNotNull(previousEventIdsObj); + Object[] previousEventIds = (Object[]) previousEventIdsObj; + assertEquals(1, previousEventIds.length); + assertEquals(10L, previousEventIds[0]); assertNull(row.get("contentPath")); assertNull(row.get("previousContentPath")); @@ -559,14 +564,14 @@ void testProvenanceTable() throws InitializationException { // Verify some fields in the second row row = rows.get(1); - assertEquals(24, row.size()); + assertEquals(25, row.size()); // Verify the second row contents assertEquals(firstEventId + 1, row.get("eventId")); assertEquals("DROP", row.get("eventType")); // Verify some fields in the last row row = rows.get(1000); - assertEquals(24, row.size()); + assertEquals(25, row.size()); // Verify the last row contents assertEquals(firstEventId + 1000L, row.get("eventId")); assertEquals("DROP", row.get("eventType")); @@ -700,6 +705,7 @@ private MockQueryNiFiReportingTask initTask(final Map>() { - @Override - public List answer(final InvocationOnMock invocation) throws Throwable { - final long startEventId = invocation.getArgument(0); - final int max = invocation.getArgument(1); - return mockProvenanceRepository.getEvents(startEventId, max); - } + Mockito.when(eventAccess.getProvenanceEvents(anyLong(), anyInt())).thenAnswer((Answer>) invocation -> { + final long startEventId = invocation.getArgument(0); + final int max = invocation.getArgument(1); + return mockProvenanceRepository.getEvents(startEventId, max); }); } catch (final IOException e) { // Won't happen @@ -794,6 +797,7 @@ public List findBulletinsForController() { private static class MockProvenanceRepository implements ProvenanceEventRepository { private final List events = new ArrayList<>(); + protected final Map> previousEventIdsMap = new HashMap<>(); @Override public ProvenanceEventBuilder eventBuilder() { @@ -837,5 +841,19 @@ public ProvenanceEventRecord getEvent(final long id) { @Override public void close() { } + + @Override + public Set getPreviousEventIds(String flowFileUUID) { + return previousEventIdsMap.get(flowFileUUID); + } + + @Override + public void updatePreviousEventIds(ProvenanceEventRecord record, Set previousIds) { + if (previousIds == null) { + previousEventIdsMap.remove(record.getFlowFileUuid()); + } else { + previousEventIdsMap.put(record.getFlowFileUuid(), previousIds); + } + } } } \ No newline at end of file diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/AbstractProvenanceRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/AbstractProvenanceRepository.java new file mode 100644 index 0000000000000..a3e05b0c915fd --- /dev/null +++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/AbstractProvenanceRepository.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public abstract class AbstractProvenanceRepository implements ProvenanceRepository { + + protected final Map> previousEventIdsMap = new HashMap<>(); + + @Override + public Set getPreviousEventIds(String flowFileUUID) { + return previousEventIdsMap.get(flowFileUUID); + } + + @Override + public void updatePreviousEventIds(ProvenanceEventRecord record, Set previousIds) { + if (previousIds == null) { + previousEventIdsMap.remove(record.getFlowFileUuid()); + } else { + previousEventIdsMap.put(record.getFlowFileUuid(), previousIds); + } + } +} \ No newline at end of file diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java index 3ac141368b27d..7f8f1a08b7d5d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java @@ -17,6 +17,7 @@ package org.apache.nifi.controller.repository; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.FlowFileHandlingException; import org.apache.nifi.provenance.InternalProvenanceReporter; @@ -29,6 +30,7 @@ import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.Set; import java.util.function.Predicate; @@ -115,7 +117,7 @@ public void receiveMigration(final Set events) { } /** - * Generates a Fork event for the given child and parents but does not register the event. This is useful so that a ProcessSession has the ability to de-dupe events, since one or more events may + * Generates a Join event for the given child and parents but does not register the event. This is useful so that a ProcessSession has the ability to de-dupe events, since one or more events may * be created by the session itself, as well as by the Processor * * @param parents parents @@ -127,16 +129,59 @@ public ProvenanceEventRecord generateJoinEvent(final Collection parent final ProvenanceEventBuilder eventBuilder = build(child, ProvenanceEventType.JOIN); eventBuilder.addChildFlowFile(child); + Set parentEventIds = new HashSet<>(parents.size()); for (final FlowFile parent : parents) { eventBuilder.addParentFlowFile(parent); + parentEventIds.addAll(repository.getPreviousEventIds(parent.getAttribute(CoreAttributes.UUID.key()))); } + eventBuilder.setPreviousEventIds(parentEventIds); - return eventBuilder.build(); + ProvenanceEventRecord record = eventBuilder.build(); + repository.updatePreviousEventIds(record, parentEventIds); + return record; } @Override public ProvenanceEventRecord generateDropEvent(final FlowFile flowFile, final String details) { - return build(flowFile, ProvenanceEventType.DROP).setDetails(details).build(); + final String flowFileUUID = flowFile.getAttribute(CoreAttributes.UUID.key()); + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.DROP) + .setDetails(details) + .setPreviousEventIds(repository.getPreviousEventIds(flowFileUUID)) + .build(); + repository.updatePreviousEventIds(record, null); + return record; + } + + @Override + public ProvenanceEventRecord generateModifyContentEvent(final FlowFile flowFile, final String details) { + final String flowFileUUID = flowFile.getAttribute(CoreAttributes.UUID.key()); + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CONTENT_MODIFIED) + .setDetails(details) + .setPreviousEventIds(repository.getPreviousEventIds(flowFileUUID)) + .build(); + repository.updatePreviousEventIds(record, Collections.singleton(record.getEventId())); + return record; + } + + @Override + public ProvenanceEventRecord generateModifyAttributesEvent(final FlowFile flowFile, final String details) { + final String flowFileUUID = flowFile.getAttribute(CoreAttributes.UUID.key()); + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED) + .setDetails(details) + .setPreviousEventIds(repository.getPreviousEventIds(flowFileUUID)) + .build(); + repository.updatePreviousEventIds(record, Collections.singleton(record.getEventId())); + return record; + } + + @Override + public ProvenanceEventRecord generateCreateEvent(final FlowFile flowFile, final String details) { + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CREATE) + .setDetails(details) + .setPreviousEventIds(Collections.emptySet()) + .build(); + repository.updatePreviousEventIds(record, Collections.singleton(record.getEventId())); + return record; } private void verifyFlowFileKnown(final FlowFile flowFile) { @@ -170,13 +215,15 @@ public void receive(final FlowFile flowFile, final String transitUri, final Stri verifyFlowFileKnown(flowFile); try { - final ProvenanceEventBuilder builder = build(flowFile, ProvenanceEventType.RECEIVE); - builder.setTransitUri(transitUri); - builder.setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier); - builder.setEventDuration(transmissionMillis); - builder.setDetails(details); - final ProvenanceEventRecord record = builder.build(); + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.RECEIVE) + .setTransitUri(transitUri) + .setSourceSystemFlowFileIdentifier(sourceSystemFlowFileIdentifier) + .setEventDuration(transmissionMillis) + .setDetails(details) + .setPreviousEventIds(Collections.singleton(-1L)) + .build(); events.add(record); + repository.updatePreviousEventIds(record, Collections.singleton(record.getEventId())); bytesReceived += flowFile.getSize(); flowFilesReceived++; @@ -200,13 +247,16 @@ public void fetch(final FlowFile flowFile, final String transitUri, final String verifyFlowFileKnown(flowFile); try { + final String flowFileUUID = flowFile.getAttribute(CoreAttributes.UUID.key()); final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.FETCH) .setTransitUri(transitUri) .setEventDuration(transmissionMillis) .setDetails(details) + .setPreviousEventIds(repository.getPreviousEventIds(flowFileUUID)) .build(); events.add(record); + repository.updatePreviousEventIds(record, Collections.singleton(record.getEventId())); bytesFetched += flowFile.getSize(); flowFilesFetched++; @@ -253,7 +303,12 @@ public void send(final FlowFile flowFile, final String transitUri, final String @Override public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis, final boolean force) { try { - final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build(); + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND) + .setTransitUri(transitUri) + .setEventDuration(transmissionMillis) + .setDetails(details) + .setPreviousEventIds(repository.getPreviousEventIds(flowFile.getAttribute(CoreAttributes.UUID.key()))) + .build(); // If the transmissionMillis field has been populated, use zero as the value of commitNanos (the call to System.nanoTime() is expensive but the value will be ignored). final long commitNanos = transmissionMillis < 0 ? System.nanoTime() : 0L; final ProvenanceEventRecord enriched = eventEnricher == null ? record : eventEnricher.enrich(record, flowFile, commitNanos); @@ -263,6 +318,7 @@ public void send(final FlowFile flowFile, final String transitUri, final String } else { events.add(enriched); } + repository.updatePreviousEventIds(enriched, enriched.getPreviousEventIds()); bytesSent += flowFile.getSize(); flowFilesSent++; @@ -311,7 +367,10 @@ public void invokeRemoteProcess(final FlowFile flowFile, final String transitUri public void invokeRemoteProcess(FlowFile flowFile, String transitUri, String details) { try { final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.REMOTE_INVOCATION) - .setTransitUri(transitUri).setDetails(details).build(); + .setTransitUri(transitUri) + .setDetails(details) + .setPreviousEventIds(repository.getPreviousEventIds(flowFile.getAttribute(CoreAttributes.UUID.key()))) + .build(); events.add(record); } catch (final Exception e) { logger.error("Failed to generate Provenance Event", e); @@ -335,8 +394,12 @@ public void associate(final FlowFile flowFile, final String alternateIdentifierN } final String alternateIdentifierUri = trimmedNamespace + ":" + trimmedIdentifier; - final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ADDINFO).setAlternateIdentifierUri(alternateIdentifierUri).build(); + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ADDINFO) + .setAlternateIdentifierUri(alternateIdentifierUri) + .setPreviousEventIds(repository.getPreviousEventIds(flowFile.getAttribute(CoreAttributes.UUID.key()))) + .build(); events.add(record); + repository.updatePreviousEventIds(record, Collections.singleton(record.getEventId())); } catch (final Exception e) { logger.error("Failed to generate Provenance Event", e); } @@ -349,8 +412,11 @@ public ProvenanceEventRecord drop(final FlowFile flowFile, final String reason) if (reason != null) { builder.setDetails("Discard reason: " + reason); } + builder.setPreviousEventIds(repository.getPreviousEventIds(flowFile.getAttribute(CoreAttributes.UUID.key()))); + final ProvenanceEventRecord record = builder.build(); events.add(record); + repository.updatePreviousEventIds(record, null); return record; } catch (final Exception e) { logger.error("Failed to generate Provenance Event", e); @@ -361,8 +427,12 @@ public ProvenanceEventRecord drop(final FlowFile flowFile, final String reason) @Override public void expire(final FlowFile flowFile, final String details) { try { - final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.EXPIRE).setDetails(details).build(); + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.EXPIRE) + .setDetails(details) + .setPreviousEventIds(repository.getPreviousEventIds(flowFile.getAttribute(CoreAttributes.UUID.key()))) + .build(); events.add(record); + repository.updatePreviousEventIds(record, null); } catch (final Exception e) { logger.error("Failed to generate Provenance Event", e); } @@ -403,6 +473,15 @@ public void fork(final FlowFile parent, final Collection children, fin } events.add(eventBuilder.build()); + final ProvenanceEventRecord record = eventBuilder.build(); + events.add(record); + // TODO is this right? + for (final FlowFile child : children) { + // Add the child FlowFiles to the previous event ID map with the parent's entry in the map + repository.updatePreviousEventIds( + record, + repository.getPreviousEventIds(child.getAttribute(CoreAttributes.UUID.key()))); + } } catch (final Exception e) { logger.error("Failed to generate Provenance Event", e); } @@ -431,12 +510,20 @@ public void join(final Collection parents, final FlowFile child, final final ProvenanceEventBuilder eventBuilder = build(child, ProvenanceEventType.JOIN); eventBuilder.addChildFlowFile(child); eventBuilder.setDetails(details); + Set parentEventIds = new HashSet<>(parents.size()); for (final FlowFile parent : parents) { eventBuilder.addParentFlowFile(parent); + Set previousEventIds = repository.getPreviousEventIds(parent.getAttribute(CoreAttributes.UUID.key())); + if (previousEventIds != null) { + parentEventIds.addAll(previousEventIds); + } } + eventBuilder.setPreviousEventIds(parentEventIds); - events.add(eventBuilder.build()); + final ProvenanceEventRecord record = eventBuilder.build(); + events.add(record); + repository.updatePreviousEventIds(record, parentEventIds); } catch (final Exception e) { logger.error("Failed to generate Provenance Event", e); } @@ -455,9 +542,13 @@ public void clone(final FlowFile parent, final FlowFile child, final boolean ver try { final ProvenanceEventBuilder eventBuilder = build(parent, ProvenanceEventType.CLONE); - eventBuilder.addChildFlowFile(child); - eventBuilder.addParentFlowFile(parent); - events.add(eventBuilder.build()); + final ProvenanceEventRecord event = eventBuilder + .addChildFlowFile(child) + .addParentFlowFile(parent) + .setPreviousEventIds(repository.getPreviousEventIds(parent.getAttribute(CoreAttributes.UUID.key()))) + .build(); + events.add(event); + repository.updatePreviousEventIds(event, Collections.singleton(event.getEventId())); } catch (final Exception e) { logger.error("Failed to generate Provenance Event", e); } @@ -483,8 +574,13 @@ public void modifyContent(final FlowFile flowFile, final String details, final l verifyFlowFileKnown(flowFile); try { - final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CONTENT_MODIFIED).setEventDuration(processingMillis).setDetails(details).build(); + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CONTENT_MODIFIED) + .setEventDuration(processingMillis) + .setDetails(details) + .setPreviousEventIds(repository.getPreviousEventIds(flowFile.getAttribute(CoreAttributes.UUID.key()))) + .build(); events.add(record); + repository.updatePreviousEventIds(record, Collections.singleton(record.getEventId())); } catch (final Exception e) { logger.error("Failed to generate Provenance Event", e); } @@ -500,8 +596,12 @@ public void modifyAttributes(final FlowFile flowFile, final String details) { verifyFlowFileKnown(flowFile); try { - final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED).setDetails(details).build(); + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ATTRIBUTES_MODIFIED) + .setDetails(details) + .setPreviousEventIds(repository.getPreviousEventIds(flowFile.getAttribute(CoreAttributes.UUID.key()))) + .build(); events.add(record); + repository.updatePreviousEventIds(record, Collections.singleton(record.getEventId())); } catch (final Exception e) { logger.error("Failed to generate Provenance Event", e); } @@ -527,8 +627,13 @@ public void route(final FlowFile flowFile, final Relationship relationship, fina verifyFlowFileKnown(flowFile); try { - final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ROUTE).setRelationship(relationship).setDetails(details).setEventDuration(processingDuration).build(); + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.ROUTE).setRelationship(relationship) + .setDetails(details) + .setEventDuration(processingDuration) + .setPreviousEventIds(repository.getPreviousEventIds(flowFile.getAttribute(CoreAttributes.UUID.key()))) + .build(); events.add(record); + repository.updatePreviousEventIds(record, Collections.singleton(record.getEventId())); } catch (final Exception e) { logger.error("Failed to generate Provenance Event", e); } @@ -544,7 +649,10 @@ public void create(final FlowFile flowFile, final String details) { verifyFlowFileKnown(flowFile); try { - final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CREATE).setDetails(details).build(); + final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.CREATE) + .setDetails(details) + .setPreviousEventIds(Collections.emptySet()) + .build(); events.add(record); } catch (final Exception e) { logger.error("Failed to generate Provenance Event", e); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/provenance/InternalProvenanceReporter.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/provenance/InternalProvenanceReporter.java index 6570865ae2a4a..24b1d73e923bf 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/provenance/InternalProvenanceReporter.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/provenance/InternalProvenanceReporter.java @@ -33,6 +33,12 @@ public interface InternalProvenanceReporter extends ProvenanceReporter { ProvenanceEventRecord generateJoinEvent(Collection parents, FlowFile child); + ProvenanceEventRecord generateModifyContentEvent(FlowFile flowFile, String explanation); + + ProvenanceEventRecord generateCreateEvent(FlowFile flowFile, String explanation); + + ProvenanceEventRecord generateModifyAttributesEvent(FlowFile flowFile, String explanation); + void remove(ProvenanceEventRecord event); void removeEventsForFlowFile(String uuid); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java index 56ba6391389c4..4574ffa76f713 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/AbstractFlowFileQueue.java @@ -423,6 +423,7 @@ private ProvenanceEventRecord createDropProvenanceEvent(final FlowFileRecord flo builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), contentClaim.getOffset(), flowFile.getSize()); } + builder.setPreviousEventIds(provRepository.getPreviousEventIds(flowFile.getAttribute(CoreAttributes.UUID.key()))); return builder.build(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/provenance/MockProvenanceRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/provenance/MockProvenanceRepository.java index 8eeccacebd0f5..bb1f59b1d5399 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/provenance/MockProvenanceRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/provenance/MockProvenanceRepository.java @@ -32,7 +32,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -public class MockProvenanceRepository implements ProvenanceRepository { +public class MockProvenanceRepository extends AbstractProvenanceRepository { private final List records = new ArrayList<>(); private final AtomicLong idGenerator = new AtomicLong(0L); diff --git a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEvent.java b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEvent.java index 31366c3d658d4..c4f048b148d86 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEvent.java +++ b/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEvent.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; public class MockProvenanceEvent implements ProvenanceEventRecord { @@ -64,6 +65,7 @@ public class MockProvenanceEvent implements ProvenanceEventRecord { private final Map updatedAttributes; private volatile long eventId = -1L; + private volatile Set previousEventIds; private MockProvenanceEvent(final Builder builder) { this.eventTime = builder.eventTime; @@ -102,6 +104,8 @@ private MockProvenanceEvent(final Builder builder) { if (builder.eventId != null) { eventId = builder.eventId; } + + previousEventIds = builder.previousEventIds; } @@ -114,6 +118,15 @@ public long getEventId() { return eventId; } + @Override + public Set getPreviousEventIds() { + return previousEventIds; + } + + public void setPreviousEventIds(Set previousEventIds) { + this.previousEventIds = previousEventIds; + } + @Override public long getEventTime() { return eventTime; @@ -440,6 +453,7 @@ public static class Builder implements ProvenanceEventBuilder { private String relationship = null; private long eventDuration = -1L; private Long eventId = eventIdGenerator.getAndIncrement(); + private Set previousEventIds; private String contentClaimSection; private String contentClaimContainer; @@ -492,6 +506,8 @@ public Builder fromEvent(final ProvenanceEventRecord event) { sourceQueueIdentifier = event.getSourceQueueIdentifier(); + previousEventIds = event.getPreviousEventIds(); + return this; } @@ -543,6 +559,10 @@ public ProvenanceEventBuilder copy() { copy.sourceQueueIdentifier = sourceQueueIdentifier; + if (previousEventIds != null) { + copy.previousEventIds = previousEventIds; + } + return copy; } @@ -675,6 +695,12 @@ public Builder setDetails(String details) { return this; } + @Override + public ProvenanceEventBuilder setPreviousEventIds(Set previousEventIds) { + this.previousEventIds = previousEventIds; + return this; + } + @Override public Builder setRelationship(Relationship relationship) { this.relationship = relationship.getName(); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java index cb084d76d76c6..69bd7cc51e66e 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java @@ -19,6 +19,7 @@ import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.provenance.AbstractProvenanceRepository; import org.apache.nifi.provenance.AsyncLineageSubmission; import org.apache.nifi.provenance.IdentifierLookup; import org.apache.nifi.provenance.ProvenanceAuthorizableFactory; @@ -26,8 +27,8 @@ import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; -import org.apache.nifi.provenance.ProvenanceRepository; import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.provenance.UpdateableProvenanceEventRecord; import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.search.QuerySubmission; @@ -41,7 +42,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -public class StatelessProvenanceRepository implements ProvenanceRepository { +public class StatelessProvenanceRepository extends AbstractProvenanceRepository { public static String CONTAINER_NAME = "in-memory"; @@ -106,23 +107,13 @@ public Long getMaxEventId() { } public ProvenanceEventRecord getEvent(final String identifier) throws IOException { - final List records = ringBuffer.getSelectedElements(new RingBuffer.Filter() { - @Override - public boolean select(final ProvenanceEventRecord event) { - return identifier.equals(event.getFlowFileUuid()); - } - }, 1); + final List records = ringBuffer.getSelectedElements(event -> identifier.equals(event.getFlowFileUuid()), 1); return records.isEmpty() ? null : records.get(0); } @Override public ProvenanceEventRecord getEvent(final long id) { - final List records = ringBuffer.getSelectedElements(new RingBuffer.Filter() { - @Override - public boolean select(final ProvenanceEventRecord event) { - return event.getEventId() == id; - } - }, 1); + final List records = ringBuffer.getSelectedElements(event -> event.getEventId() == id, 1); return records.isEmpty() ? null : records.get(0); } @@ -207,7 +198,7 @@ public String getContainerFileStoreName(String containerName) { return null; } - private static class IdEnrichedProvEvent implements ProvenanceEventRecord { + private static class IdEnrichedProvEvent implements UpdateableProvenanceEventRecord { private final ProvenanceEventRecord record; private final long id; @@ -222,6 +213,25 @@ public long getEventId() { return id; } + @Override + public void setEventId(long eventId) { + if (record instanceof UpdateableProvenanceEventRecord) { + ((UpdateableProvenanceEventRecord) record).setEventId(eventId); + } + } + + @Override + public Set getPreviousEventIds() { + return record.getPreviousEventIds(); + } + + @Override + public void setPreviousEventIds(Set previousEventIds) { + if (record instanceof UpdateableProvenanceEventRecord) { + ((UpdateableProvenanceEventRecord) record).setPreviousEventIds(previousEventIds); + } + } + @Override public long getEventTime() { return record.getEventTime();