Skip to content

Commit

Permalink
Add integration test for recordtransform partitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Sep 16, 2024
1 parent a20252d commit a46024b
Showing 1 changed file with 85 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,91 @@ public void testRecordTransform() throws NiFiClientException, IOException, Inter
assertEquals("HELLO", secondRecordValues.get( headerIndices.get("greeting") ));
}

@Test
public void testRecordTransformPartitioning() throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity setRecordField = getClientUtil().createPythonProcessor("SetRecordField");
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");

// Add Reader and Writer
final ControllerServiceEntity csvReader = getClientUtil().createControllerService("MockCSVReader");
final ControllerServiceEntity csvWriter = getClientUtil().createControllerService("MockCSVWriter");

getClientUtil().enableControllerService(csvReader);
getClientUtil().enableControllerService(csvWriter);

// Configure the SetRecordField property
final Map<String, String> fieldMap = new HashMap<>();
fieldMap.put("Record Reader", csvReader.getId());
fieldMap.put("Record Writer", csvWriter.getId());
getClientUtil().updateProcessorProperties(setRecordField, fieldMap);
getClientUtil().setAutoTerminatedRelationships(setRecordField, new HashSet<>(Arrays.asList("original", "failure")));

// Set contents of GenerateFlowFile
getClientUtil().updateProcessorProperties(generate,
Collections.singletonMap("Text", "name, group\nJane Doe, default\nJake Doe, other"));

// Connect flow
getClientUtil().createConnection(generate, setRecordField, "success");
final ConnectionEntity outputConnection = getClientUtil().createConnection(setRecordField, terminate, "success");

// Wait for processor validation to complete
getClientUtil().waitForValidProcessor(generate.getId());
getClientUtil().waitForValidProcessor(setRecordField.getId());

// Run the flow
getClientUtil().startProcessor(generate);
getClientUtil().startProcessor(setRecordField);

// Wait for output data
waitForQueueCount(outputConnection.getId(), 2);

// Verify output contents. We don't know the order that the fields will be in, but we know that we should get back 6 fields per record: name, age, color, sport, greeting, group.
final String ff1Contents = getClientUtil().getFlowFileContentAsUtf8(outputConnection.getId(), 0);
final String[] ff1Lines = ff1Contents.split("\n");
final String ff1HeaderLine = ff1Lines[0];
final List<String> ff1Headers = Stream.of(ff1HeaderLine.split(","))
.map(String::trim)
.toList();
assertTrue(ff1Headers.contains("name"));
assertTrue(ff1Headers.contains("group"));

final Map<String, Integer> ff1HeaderIndices = new HashMap<>();
int index = 0;
for (final String header : ff1Headers) {
ff1HeaderIndices.put(header, index++);
}

final String firstRecordLine = ff1Lines[1];
final List<String> firstRecordValues = Stream.of(firstRecordLine.split(","))
.map(String::trim)
.toList();
assertEquals("Jane Doe", firstRecordValues.get( ff1HeaderIndices.get("name") ));
assertEquals("default", firstRecordValues.get( ff1HeaderIndices.get("group") ));

final String ff2Contents = getClientUtil().getFlowFileContentAsUtf8(outputConnection.getId(), 1);
final String[] ff2Lines = ff2Contents.split("\n");
final String ff2HeaderLine = ff2Lines[0];
final List<String> ff2Headers = Stream.of(ff2HeaderLine.split(","))
.map(String::trim)
.toList();
assertTrue(ff2Headers.contains("name"));
assertTrue(ff2Headers.contains("group"));

final Map<String, Integer> headerIndices = new HashMap<>();
index = 0;
for (final String header : ff2Headers) {
headerIndices.put(header, index++);
}

final String secondRecordLine = ff2Lines[1];
final List<String> secondRecordValues = Stream.of(secondRecordLine.split(","))
.map(String::trim)
.toList();
assertEquals("Jake Doe", secondRecordValues.get( headerIndices.get("name") ));
assertEquals("other", secondRecordValues.get( headerIndices.get("group") ));
}

@Test
public void testFlowFileSource() throws NiFiClientException, IOException, InterruptedException {
final String messageContents = "Hello World";
Expand Down

0 comments on commit a46024b

Please sign in to comment.