From a46024b7a2293ff9c6f21703aafa7fc1d19e334c Mon Sep 17 00:00:00 2001
From: Gabor Gyimesi
Date: Mon, 16 Sep 2024 15:40:27 +0200
Subject: [PATCH] Add integration test for recordtransform partitioning

---
 .../system/python/PythonProcessorIT.java | 85 +++++++++++++++++++
 1 file changed, 85 insertions(+)

diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonProcessorIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonProcessorIT.java
index 9e9064ea113b..a94c312cebc6 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonProcessorIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/python/PythonProcessorIT.java
@@ -171,6 +171,91 @@ public void testRecordTransform() throws NiFiClientException, IOException, Inter
         assertEquals("HELLO", secondRecordValues.get( headerIndices.get("greeting") ));
     }
 
+    @Test
+    public void testRecordTransformPartitioning() throws NiFiClientException, IOException, InterruptedException {
+        final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
+        final ProcessorEntity setRecordField = getClientUtil().createPythonProcessor("SetRecordField");
+        final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
+
+        // Add Reader and Writer
+        final ControllerServiceEntity csvReader = getClientUtil().createControllerService("MockCSVReader");
+        final ControllerServiceEntity csvWriter = getClientUtil().createControllerService("MockCSVWriter");
+
+        getClientUtil().enableControllerService(csvReader);
+        getClientUtil().enableControllerService(csvWriter);
+
+        // Configure the SetRecordField property
+        final Map<String, String> fieldMap = new HashMap<>();
+        fieldMap.put("Record Reader", csvReader.getId());
+        fieldMap.put("Record Writer", csvWriter.getId());
+        getClientUtil().updateProcessorProperties(setRecordField, fieldMap);
+        getClientUtil().setAutoTerminatedRelationships(setRecordField, new HashSet<>(Arrays.asList("original", "failure")));
+
+        // Set contents of GenerateFlowFile
+        getClientUtil().updateProcessorProperties(generate,
+            Collections.singletonMap("Text", "name, group\nJane Doe, default\nJake Doe, other"));
+
+        // Connect flow
+        getClientUtil().createConnection(generate, setRecordField, "success");
+        final ConnectionEntity outputConnection = getClientUtil().createConnection(setRecordField, terminate, "success");
+
+        // Wait for processor validation to complete
+        getClientUtil().waitForValidProcessor(generate.getId());
+        getClientUtil().waitForValidProcessor(setRecordField.getId());
+
+        // Run the flow
+        getClientUtil().startProcessor(generate);
+        getClientUtil().startProcessor(setRecordField);
+
+        // Wait for output data
+        waitForQueueCount(outputConnection.getId(), 2);
+
+        // Verify output contents. We don't know the order that the fields will be in, but each record should contain the name and group fields, and the records should be partitioned into one FlowFile per group value.
+        final String ff1Contents = getClientUtil().getFlowFileContentAsUtf8(outputConnection.getId(), 0);
+        final String[] ff1Lines = ff1Contents.split("\n");
+        final String ff1HeaderLine = ff1Lines[0];
+        final List<String> ff1Headers = Stream.of(ff1HeaderLine.split(","))
+            .map(String::trim)
+            .toList();
+        assertTrue(ff1Headers.contains("name"));
+        assertTrue(ff1Headers.contains("group"));
+
+        final Map<String, Integer> ff1HeaderIndices = new HashMap<>();
+        int index = 0;
+        for (final String header : ff1Headers) {
+            ff1HeaderIndices.put(header, index++);
+        }
+
+        final String firstRecordLine = ff1Lines[1];
+        final List<String> firstRecordValues = Stream.of(firstRecordLine.split(","))
+            .map(String::trim)
+            .toList();
+        assertEquals("Jane Doe", firstRecordValues.get( ff1HeaderIndices.get("name") ));
+        assertEquals("default", firstRecordValues.get( ff1HeaderIndices.get("group") ));
+
+        final String ff2Contents = getClientUtil().getFlowFileContentAsUtf8(outputConnection.getId(), 1);
+        final String[] ff2Lines = ff2Contents.split("\n");
+        final String ff2HeaderLine = ff2Lines[0];
+        final List<String> ff2Headers = Stream.of(ff2HeaderLine.split(","))
+            .map(String::trim)
+            .toList();
+        assertTrue(ff2Headers.contains("name"));
+        assertTrue(ff2Headers.contains("group"));
+
+        final Map<String, Integer> headerIndices = new HashMap<>();
+        index = 0;
+        for (final String header : ff2Headers) {
+            headerIndices.put(header, index++);
+        }
+
+        final String secondRecordLine = ff2Lines[1];
+        final List<String> secondRecordValues = Stream.of(secondRecordLine.split(","))
+            .map(String::trim)
+            .toList();
+        assertEquals("Jake Doe", secondRecordValues.get( headerIndices.get("name") ));
+        assertEquals("other", secondRecordValues.get( headerIndices.get("group") ));
+    }
+
     @Test
     public void testFlowFileSource() throws NiFiClientException, IOException, InterruptedException {
         final String messageContents = "Hello World";
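
Note on the behavior under test: in NiFi's Python API, a RecordTransform processor returns a RecordTransformResult per record, and records whose results carry the same partition value are written to the same output FlowFile; that is why the two "group" values in the generated CSV are expected to yield exactly two FlowFiles on the output connection. The sketch below is a hypothetical illustration only, not the SetRecordField.py implementation this test exercises: the class name PartitionByGroup is invented, and the constructor arguments (record, relationship, partition) are assumed to match the NiFi Python developer guide.

# Hypothetical RecordTransform sketch; assumes the nifiapi.recordtransform API
# as documented in the NiFi Python developer guide.
from nifiapi.recordtransform import RecordTransform, RecordTransformResult


class PartitionByGroup(RecordTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.RecordTransform']

    class ProcessorDetails:
        version = '2.0.0-SNAPSHOT'
        description = 'Passes records through unchanged, partitioning output by the "group" field.'

    def transform(self, context, record, schema, attributemap):
        # Records whose results share the same partition map are grouped into the same
        # output FlowFile, so the two distinct "group" values in the test input
        # ("default" and "other") produce two FlowFiles on the "success" relationship.
        return RecordTransformResult(
            record=record,
            relationship='success',
            partition={'group': record['group']})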