diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java index d920cb1085a9..2323010421ca 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java @@ -47,7 +47,8 @@ @WritesAttribute(attribute = "ftp.remote.port", description = "The port that was used to communicate with the remote FTP server"), @WritesAttribute(attribute = "ftp.remote.filename", description = "The name of the remote file that was pulled"), @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"), - @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute") + @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute"), + @WritesAttribute(attribute = "fetch.failure.reason", description = "The name of the failure relationship applied when routing to any failure relationship") }) @MultiProcessorUseCase( description = "Retrieve all files in a directory of an FTP Server", diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java index 227904282d95..1e8efba2c06a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFileTransfer.java @@ -61,7 +61,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor { static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is"); static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the property"); static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system"); - + static final String FAILURE_REASON_ATTRIBUTE = "fetch.failure.reason"; static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() .name("Hostname") @@ -254,36 +254,33 @@ public void onTrigger(final ProcessContext context, final ProcessSession session transfer = transferWrapper.getFileTransfer(); } + Relationship failureRelationship = null; + boolean closeConnOnFailure = false; + try { // Pull data from remote system. try { flowFile = transfer.getRemoteFile(filename, flowFile, session); - } catch (final FileNotFoundException e) { + failureRelationship = REL_NOT_FOUND; getLogger().log(levelFileNotFound, "Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", - flowFile, filename, host, REL_NOT_FOUND.getName()); - session.transfer(session.penalize(flowFile), REL_NOT_FOUND); - session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND); - cleanupTransfer(transfer, false, transferQueue, host, port); - return; + flowFile, filename, host, failureRelationship.getName()); } catch (final PermissionDeniedException e) { + failureRelationship = REL_PERMISSION_DENIED; getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}", - flowFile, filename, host, REL_PERMISSION_DENIED.getName()); - session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED); - session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED); - cleanupTransfer(transfer, false, transferQueue, host, port); - return; + flowFile, filename, host, failureRelationship.getName()); } catch (final ProcessException | IOException e) { - getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure", - flowFile, filename, host, port, e.toString(), e); - session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE); - cleanupTransfer(transfer, true, transferQueue, host, port); - return; + failureRelationship = REL_COMMS_FAILURE; + getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to {}", + flowFile, filename, host, port, e.toString(), failureRelationship.getName(), e); + + closeConnOnFailure = true; } // Add FlowFile attributes - final String protocolName = transfer.getProtocolName(); final Map attributes = new HashMap<>(); + final String protocolName = transfer.getProtocolName(); + attributes.put(protocolName + ".remote.host", host); attributes.put(protocolName + ".remote.port", String.valueOf(port)); attributes.put(protocolName + ".remote.filename", filename); @@ -296,6 +293,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } else { attributes.put(CoreAttributes.FILENAME.key(), filename); } + + if (failureRelationship != null) { + attributes.put(FAILURE_REASON_ATTRIBUTE, failureRelationship.getName()); + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(session.penalize(flowFile), failureRelationship); + session.getProvenanceReporter().route(flowFile, failureRelationship); + cleanupTransfer(transfer, closeConnOnFailure, transferQueue, host, port); + return; + } + flowFile = session.putAllAttributes(flowFile, attributes); // emit provenance event and transfer FlowFile diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java index 9fde9114e4c6..e35c45cfd32c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchSFTP.java @@ -47,7 +47,8 @@ @WritesAttribute(attribute = "sftp.remote.port", description = "The port that was used to communicate with the remote SFTP server"), @WritesAttribute(attribute = "sftp.remote.filename", description = "The name of the remote file that was pulled"), @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"), - @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute") + @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute"), + @WritesAttribute(attribute = "fetch.failure.reason", description = "The name of the failure relationship applied when routing to any failure relationship") }) @MultiProcessorUseCase( description = "Retrieve all files in a directory of an SFTP Server", diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java index 97b6a8e7377a..6695bb50a7d3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFTP.java @@ -269,6 +269,7 @@ public void testFetchFtpFileNotFound() { runner.run(); runner.assertAllFlowFilesTransferred(FetchFTP.REL_NOT_FOUND); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); } @Test @@ -290,6 +291,7 @@ public void testFetchFtpFilePermissionDenied() { runner.run(); runner.assertAllFlowFilesTransferred(FetchFTP.REL_PERMISSION_DENIED); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); } @Test diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java index 5f636880ec62..4fcdf7a21ac7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFTP.java @@ -113,6 +113,9 @@ public void testContentNotFound() { runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 1); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); + MockFlowFile transferredFlowFile = runner.getPenalizedFlowFiles().get(0); + assertEquals(FetchFileTransfer.REL_NOT_FOUND.getName(), transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE)); } @Test @@ -122,6 +125,9 @@ public void testInsufficientPermissions() { runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); + MockFlowFile transferredFlowFile = runner.getPenalizedFlowFiles().get(0); + assertEquals(FetchFileTransfer.REL_PERMISSION_DENIED.getName(), transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE)); } @Test @@ -132,6 +138,7 @@ public void testInsufficientPermissionsDoesNotCloseConnection() { runner.run(2, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 2); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); assertEquals(1, proc.numberOfFileTransfers); assertFalse(proc.isClosed); @@ -145,6 +152,7 @@ public void testFileNotFoundDoesNotCloseConnection() { runner.run(2, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 2); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); assertEquals(1, proc.numberOfFileTransfers); assertFalse(proc.isClosed); @@ -157,6 +165,9 @@ public void testCommunicationFailureClosesConnection() { runner.run(1, false, false); runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_COMMS_FAILURE, 1); + runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE); + MockFlowFile transferredFlowFile = runner.getPenalizedFlowFiles().get(0); + assertEquals(FetchFileTransfer.REL_COMMS_FAILURE.getName(), transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE)); assertTrue(proc.isClosed); }