Skip to content

Commit

Permalink
NIFI-13604 Python Source processors can be triggered without creating…
Browse files Browse the repository at this point in the history
… new FlowFiles

Closes #9159

Signed-off-by: Marton Szasz <szaszm@apache.org>
  • Loading branch information
pgyori authored and szaszm committed Aug 8, 2024
1 parent 9cd319e commit 4f262dc
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 0 deletions.
2 changes: 2 additions & 0 deletions nifi-docs/src/main/asciidoc/python-developer-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ Each processor based on the `FlowFileSource` API has a `success` relationship an
created in the Processor's Python code. `attributes` and `contents` are both optional. If `attributes` is not provided,
the FlowFile will still have the usual `filename`, `path` and `uuid` attributes, but no additional ones.
If `contents` is not provided, a FlowFile with no contents (only attributes) will be created.
In case there is no useful information to return from the `create` method, `return None` can be used instead of returning an
empty `FlowFileSourceResult`. When `create` returns with `None`, the processor does not produce any output.



Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final FlowFileSourceResult result;
try {
result = getTransform().createFlowFile();
if (result == null) {
return;
}
} catch (final Py4JNetworkException e) {
throw new ProcessException("Failed to communicate with Python Process", e);
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,16 @@ public void testCreateFlowFile() throws IOException {
multiLineContent.getBytes(StandardCharsets.UTF_8));
}

@Test
public void testCreateNothing() {
// Test the use-case where the source processor returns with None.
final TestRunner runner = createProcessor("CreateNothing");
waitForValid(runner);
runner.run();

runner.assertTransferCount("success", 0);
}

public interface StringLookupService extends ControllerService {
Optional<String> lookup(Map<String, String> coordinates);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from nifiapi.flowfilesource import FlowFileSource

class CreateNothing(FlowFileSource):
class Java:
implements = ['org.apache.nifi.python.processor.FlowFileSource']

class ProcessorDetails:
version = '0.0.1-SNAPSHOT'
description = '''A Python processor for testing a use-case where the Source processor
does not create any output.'''
tags = ['test', 'python', 'source']

def __init__(self, **kwargs):
pass

def create(self, context):
return None

0 comments on commit 4f262dc

Please sign in to comment.