Skip to content

Commit

Permalink
NIFI-12249 FetchFTP and FetchSFTP set fetch.failure.reason on failures
Browse files Browse the repository at this point in the history
- Set fetch.failure.reason to relationship name when routing to failure relationships

This closes apache#7929

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
annanys23 authored and exceptionfactory committed Oct 28, 2023
1 parent ae14738 commit 90498a3
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
@WritesAttribute(attribute = "ftp.remote.port", description = "The port that was used to communicate with the remote FTP server"),
@WritesAttribute(attribute = "ftp.remote.filename", description = "The name of the remote file that was pulled"),
@WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"),
@WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute")
@WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute"),
@WritesAttribute(attribute = "fetch.failure.reason", description = "The name of the failure relationship applied when routing to any failure relationship")
})
@MultiProcessorUseCase(
description = "Retrieve all files in a directory of an FTP Server",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public abstract class FetchFileTransfer extends AbstractProcessor {
static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is");
static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the <Move Destination Directory> property");
static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system");

static final String FAILURE_REASON_ATTRIBUTE = "fetch.failure.reason";

static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
Expand Down Expand Up @@ -254,36 +254,33 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
transfer = transferWrapper.getFileTransfer();
}

Relationship failureRelationship = null;
boolean closeConnOnFailure = false;

try {
// Pull data from remote system.
try {
flowFile = transfer.getRemoteFile(filename, flowFile, session);

} catch (final FileNotFoundException e) {
failureRelationship = REL_NOT_FOUND;
getLogger().log(levelFileNotFound, "Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}",
flowFile, filename, host, REL_NOT_FOUND.getName());
session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
cleanupTransfer(transfer, false, transferQueue, host, port);
return;
flowFile, filename, host, failureRelationship.getName());
} catch (final PermissionDeniedException e) {
failureRelationship = REL_PERMISSION_DENIED;
getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}",
flowFile, filename, host, REL_PERMISSION_DENIED.getName());
session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED);
session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
cleanupTransfer(transfer, false, transferQueue, host, port);
return;
flowFile, filename, host, failureRelationship.getName());
} catch (final ProcessException | IOException e) {
getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure",
flowFile, filename, host, port, e.toString(), e);
session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
cleanupTransfer(transfer, true, transferQueue, host, port);
return;
failureRelationship = REL_COMMS_FAILURE;
getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to {}",
flowFile, filename, host, port, e.toString(), failureRelationship.getName(), e);

closeConnOnFailure = true;
}

// Add FlowFile attributes
final String protocolName = transfer.getProtocolName();
final Map<String, String> attributes = new HashMap<>();
final String protocolName = transfer.getProtocolName();

attributes.put(protocolName + ".remote.host", host);
attributes.put(protocolName + ".remote.port", String.valueOf(port));
attributes.put(protocolName + ".remote.filename", filename);
Expand All @@ -296,6 +293,16 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
} else {
attributes.put(CoreAttributes.FILENAME.key(), filename);
}

if (failureRelationship != null) {
attributes.put(FAILURE_REASON_ATTRIBUTE, failureRelationship.getName());
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(session.penalize(flowFile), failureRelationship);
session.getProvenanceReporter().route(flowFile, failureRelationship);
cleanupTransfer(transfer, closeConnOnFailure, transferQueue, host, port);
return;
}

flowFile = session.putAllAttributes(flowFile, attributes);

// emit provenance event and transfer FlowFile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
@WritesAttribute(attribute = "sftp.remote.port", description = "The port that was used to communicate with the remote SFTP server"),
@WritesAttribute(attribute = "sftp.remote.filename", description = "The name of the remote file that was pulled"),
@WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename fo the remote file"),
@WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute")
@WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added to the FlowFile using the 'path' attribute"),
@WritesAttribute(attribute = "fetch.failure.reason", description = "The name of the failure relationship applied when routing to any failure relationship")
})
@MultiProcessorUseCase(
description = "Retrieve all files in a directory of an SFTP Server",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ public void testFetchFtpFileNotFound() {
runner.run();

runner.assertAllFlowFilesTransferred(FetchFTP.REL_NOT_FOUND);
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
}

@Test
Expand All @@ -290,6 +291,7 @@ public void testFetchFtpFilePermissionDenied() {
runner.run();

runner.assertAllFlowFilesTransferred(FetchFTP.REL_PERMISSION_DENIED);
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ public void testContentNotFound() {

runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 1);
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
MockFlowFile transferredFlowFile = runner.getPenalizedFlowFiles().get(0);
assertEquals(FetchFileTransfer.REL_NOT_FOUND.getName(), transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE));
}

@Test
Expand All @@ -122,6 +125,9 @@ public void testInsufficientPermissions() {

runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 1);
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
MockFlowFile transferredFlowFile = runner.getPenalizedFlowFiles().get(0);
assertEquals(FetchFileTransfer.REL_PERMISSION_DENIED.getName(), transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE));
}

@Test
Expand All @@ -132,6 +138,7 @@ public void testInsufficientPermissionsDoesNotCloseConnection() {

runner.run(2, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_PERMISSION_DENIED, 2);
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);

assertEquals(1, proc.numberOfFileTransfers);
assertFalse(proc.isClosed);
Expand All @@ -145,6 +152,7 @@ public void testFileNotFoundDoesNotCloseConnection() {

runner.run(2, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_NOT_FOUND, 2);
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);

assertEquals(1, proc.numberOfFileTransfers);
assertFalse(proc.isClosed);
Expand All @@ -157,6 +165,9 @@ public void testCommunicationFailureClosesConnection() {

runner.run(1, false, false);
runner.assertAllFlowFilesTransferred(FetchFileTransfer.REL_COMMS_FAILURE, 1);
runner.assertAllFlowFilesContainAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE);
MockFlowFile transferredFlowFile = runner.getPenalizedFlowFiles().get(0);
assertEquals(FetchFileTransfer.REL_COMMS_FAILURE.getName(), transferredFlowFile.getAttribute(FetchFileTransfer.FAILURE_REASON_ATTRIBUTE));

assertTrue(proc.isClosed);
}
Expand Down

0 comments on commit 90498a3

Please sign in to comment.