Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-11556: Added ability to use a Process Group as a Stateless Flow #7253

Closed
wants to merge 11 commits into from

Conversation

markap14
Copy link
Contributor

Summary

NIFI-00000

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 11
    • JDK 17

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for introducing this new feature @markap14! The changes cover a lot of ground, most of the file adjustments are straightforward given the nature of the changes.

I only noted a couple minor questions on initial code review. I will run through some additional runtime testing.

@markap14
Copy link
Contributor Author

Rebased to main in order to address conflicts and squashed commits in order to make that rebase easier. Thanks for the feedback @exceptionfactory

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @markap14.

While going through some runtime testing, it looks like Controller Services remain in Enabled status when a Process Group is configured for Stateless execution. As a result, the Services cannot be disabled or changed.

@markap14
Copy link
Contributor Author

Force pushed to rebase against main, due to conflicts.

@joewitt
Copy link
Contributor

joewitt commented Jul 24, 2023

Just experimenting with it. It appears the 'stateless flow timeout' is always shown on the process group settings but presumably only matters when the execution engine is 'stateless'. Perhaps we can ensure a tooltip shows up on the process group properties so a user could quickly read that property is often irrelevant/only useful if the engine selection is stateless?

@joewitt
Copy link
Contributor

joewitt commented Jul 24, 2023

Went to try and put a stateful group inside a stateless group and it does block it which is cool. But the error message buries the lede a bit.

"Cannot change Execution Engine for StandardProcessGroup[identifier=87dcc86d-0189-1000-9176-d4a998c280f0,name=Stateful Group] to STANDARD because parent group StandardProcessGroup[identifier=87d912dd-0189-1000-13e8-b1b11eb1c2e6,name=Stateless Group] is configured to use the Stateless Engine. A Process Group using the Stateless Engine may be embedded within a Process Group using the Traditional Engine, but the reverse is not allowed."

Instead can we reverse the order of that output, be consistent about references to 'standard vs traditional' and simplify the language to avoid referring to the 'reverse' situation.

For instance:

"A process group using a standard engine may not be a child of a process group using the stateless engine. Cannot set PG[bla-standard] as a child of PG[bla2-stateless]."

@joewitt
Copy link
Contributor

joewitt commented Jul 24, 2023

On root group created a process group and selected stateless engine.

In that stateless group created a GenerateFlowFile -> UpdateAttribute flow with a batch size of 500 and 1KB non unique text objects. Realized I cannot start/stop individual components and instead must do so at the group level based on error responses. Ideally we would prevent those buttons from being used at all as otherwise we're doing a user experience based on 'try and fail' which is often not fun. Ideally we improve that now or in time.

When I started the group I realized I had the wrong scheduling period set as the default is 1min so a single flowfile went through. I changed to '0 secs' and tried again. Nothing happened and I noticed that scheduling period was returned to '1 min' suggesting my changes aren't taking effect.

@joewitt
Copy link
Contributor

joewitt commented Jul 24, 2023

If I then change the flow to be traditional with generate flowfile feeding into stateless group with updateattribute the strange scheduling behavior remains. Cannot change from 1min to 0 secs as it keeps returning to its old form. But when I start things it runs a couple times then stops. The behavior seems a good bit off even for traditional flow. I did copy/paste from the stateless group to the traditional so maybe that is a factor.

@markap14
Copy link
Contributor Author

markap14 commented Jul 24, 2023

Thanks for testing it out @joewitt. I agree that the message on that error could be improved. Will update that.
I will try to replicate the issue that you pointed out regarding the scheduling.

As for the UI showing things that aren't really applicable: I agree, ideally we hide things that are not applicable. However, my UI skills are very much lacking. So I did what I felt was minimally necessary in terms of UI. I think we could actually go pretty far there - even removing the 'Scheduling' tab from processors in a Stateless Group that have incoming connections. I do feel that could be a follow-on Jira, though, to improve the UI and remove the elements that are not relevant in a Stateless group.

@markap14
Copy link
Contributor Author

@joewitt I was able to replicate the issue around the scheduling period. It looks like this bug is actually present on the main branch. Was brought into this branch through a recent rebase. The bug is actually in the UI and was introduced as part of NIFI-11813. I have re-opened that Jira. Thanks for catching & calling that out!

@markap14
Copy link
Contributor Author

OK pushed some changes @joewitt . Will now automatically show/hide the "Max Concurrent Tasks" and "Stateless Flow Timeout" settings based on the chosen Execution Engine. Also addressed wording of the error messages.

@joewitt
Copy link
Contributor

joewitt commented Jul 24, 2023

Had a scenario I figured would cause problems. Nope. It works perfectly

Cannot change Execution Engine for StandardProcessGroup[identifier=888d6dd0-0189-1000-e88e-b0a720bf53e6,name=Stateless Group] while components are running. UpdateAttribute[id=888d916e-0189-1000-749a-0d0497e2b6e2] is currently running.

Running a typical GenerateFF->UpdateAttr flow in both standard and stateless. Seeing performance being double the speed in standard vs stateless which I think surprises me. Stateless has 5 concurrent threads whereas standard has 1 task for generate and update. It seems like standard runs both generate and update at once whereas stateless runs one then the other and never more than one. So that makes sense speed is double in standard case.

Other note: We should deprecate in 1.x line the ExecuteStateless processor. This will be superior in every way to that.

@joewitt
Copy link
Contributor

joewitt commented Jul 24, 2023

The above info was with run duration set above zero. With run duration at 0 on all things the behavior seems to possibly flip with standard being slower due to backlogs on connection hitting 10K slowing it down.

@joewitt
Copy link
Contributor

joewitt commented Jul 24, 2023

I had a flow running in a PG that is traditional and a PG that is stateless. After restart the stateless PG did not automatically restart.

@joewitt
Copy link
Contributor

joewitt commented Jul 24, 2023

With provenance in Volatile mode now I see stateless flow running with all 5 threads. I think my laptop will melt

@joewitt
Copy link
Contributor

joewitt commented Jul 24, 2023

Ok again it did not restart the stateless group.

So build a stateless flow in nifi. Have it running. Restart NiFi. Then it all comes back up running. Restart nifi again without doing anything else and the stateless flow will come up not running. Traditional flows do run though.

Repeated that pattern twice. and it happened twice.

@joewitt
Copy link
Contributor

joewitt commented Jul 24, 2023

I see this in the logs once I tell nifi to restart.

2023-07-24 12:48:52,219 ERROR [Run Dataflow Stateless Group] o.a.n.s.flow.StandardStatelessFlow Failed to execute dataflow java.lang.RuntimeException: Dataflow canceled at org.apache.nifi.stateless.engine.StandardExecutionProgress.enqueueTriggerResult(StandardExecutionProgress.java:178) at org.apache.nifi.stateless.flow.StandardStatelessFlow.executeDataflow(StandardStatelessFlow.java:571) at org.apache.nifi.stateless.flow.StandardStatelessFlow.lambda$trigger$5(StandardStatelessFlow.java:511) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833)

@joewitt
Copy link
Contributor

joewitt commented Jul 24, 2023

nifi appears stuck runnig the integration tests/system tests in nifi-system-tests with mvn clean install -Pintegration-tests

It isn't moving and stack dump shows

"main" #1 prio=5 os_prio=31 cpu=1795.56ms elapsed=2750.21s tid=0x000000011a008a00 nid=0x2203 in Object.wait() [0x0000000170083000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(java.base@17.0.8/Native Method) - waiting on <0x0000000700a00058> (a java.lang.Thread) at java.lang.Thread.join(java.base@17.0.8/Thread.java:1304) - locked <0x0000000700a00058> (a java.lang.Thread) at java.lang.Thread.join(java.base@17.0.8/Thread.java:1372) at org.apache.nifi.tests.system.AggregateNiFiInstance.start(AggregateNiFiInstance.java:49) at org.apache.nifi.tests.system.NiFiInstanceCache$CachedNiFiInstance.start(NiFiInstanceCache.java:189) at org.apache.nifi.tests.system.NiFiInstance.start(NiFiInstance.java:32) at org.apache.nifi.tests.system.NiFiSystemIT.setup(NiFiSystemIT.java:121) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@17.0.8/Native Method) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@17.0.8/NativeMethodAccessorImpl.java:77) at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@17.0.8/DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(java.base@17.0.8/Method.java:568) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)

@markap14
Copy link
Contributor Author

Thanks @joewitt . I haven't been able to replicate an issue with the system test freezing, but based on the thread dump it looks like the nifi instance failed to startup. Perhaps due to a port conflict or something of that nature?

I did track down the issue with the Stateless Group not starting up when NiFi is restarted, though. I pushed a fix for that. Great catch!

…ge; automatically show/hide the Stateless-specific settings for a ProcessGroup based on the chosen Execution Engine
…rt after restart, if there were two restarted in a row without changing the flow in any way
…Processors, Controller Services (even those not executed by the Stateless Engine) are stopped/disabled before considering the Stateless Group to be fully STOPPED.
…, ensure that we wait for the ports' active threads to complete before returning
…eck the result and if the queue still has data (because a Processor hasn't acknowledged the data, for example) then continue issuing request until the queue fully becomes empty.
Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing several recent issues @markap14, this looks close to completion.

Running through several sample flows, the basic operations appear to be covered well.

I noticed one minor issue, where it is possible to set Max Concurrent Tasks to -1, which allows the Stateless group to be started, but it never runs. That could be addressed as a follow-on issue if needed.

I observed one other issue, however, that seems worth correcting before merging. When configured a nested group for Stateless execution, and having Processors in the nested group reference Controller Services in a parent group, the NiFi system does not shutdown properly. The logs indicate application shutdown, and Jetty stops, but the process itself is still running, leaving the bootstrap handler to kill the NiFI process after the grace period. This seems to indicate a thread or executor that is not stopping properly. It appears to be related to the fact that Controller Services in a parent Standard-execution group are enabled, and disabling does not work as expected. This is not a problem when Controller Services are defined in the Stateless-execution group itself, as they are Disabled in terms of framework status. If we can track down this issue, I think this pull request should be ready to go.

… set less than 1 for stateless group; fixed typo in ProcessGroupDTO's docs; on shutdown, we may need to disable controller services asynchronously. At that point, the thread pool used to do so may already be shutdown. If so, catch this and create a new single-thread pool, disable the service, and immediately shutdown the pool. Also, if we fail to disable services on shutdown of a stateless flow, instead of throwing an Exception, just log it and move on - it doesn't make much sense for shutdown() to throw an Exception in that case.
Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the latest updates @markap14. The validation for Max Concurrent Tasks is working as expected.

On further testing, there still seems to be a timing issue with shutting down the system when a Stateless Process Group is running. When configuring 2 or 4 Max Concurrent Tasks for a Stateless group, the following exception occurs when trying to stop NiFi, which appears to indicate that the executor is unable to complete the job of disabling references Controller Services:

2023-08-08 12:15:36,071 INFO [Framework Task Thread Thread-3] o.a.n.s.flow.StandardStatelessFlow Disabling 1 Controller Services
2023-08-08 12:15:36,072 ERROR [Framework Task Thread Thread-3] org.apache.nifi.engine.FlowEngine Uncaught Exception in Runnable task
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@6228b4ee[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@554f4591[Wrapped task = org.apache.nifi.engine.FlowEngine$2@7a54fe16]] rejected from org.apache.nifi.engine.FlowEngine@34a20a50[Shutting down, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 6]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
	at org.apache.nifi.engine.FlowEngine.schedule(FlowEngine.java:87)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:715)
	at org.apache.nifi.controller.scheduling.StandardProcessScheduler.submitFrameworkTask(StandardProcessScheduler.java:148)
	at org.apache.nifi.controller.service.StandardControllerServiceProvider.disableControllerServicesAsync(StandardControllerServiceProvider.java:405)
	at org.apache.nifi.stateless.flow.StandardStatelessFlow.shutdown(StandardStatelessFlow.java:387)
	at org.apache.nifi.controller.tasks.StatelessFlowTask.shutdown(StatelessFlowTask.java:150)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at org.apache.nifi.groups.StandardStatelessGroupNode.shutdownFlows(StandardStatelessGroupNode.java:315)
	at org.apache.nifi.groups.StandardStatelessGroupNode.lambda$stop$3(StandardStatelessGroupNode.java:405)
	at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

markap14 and others added 2 commits August 8, 2023 17:27
…tdownSeconds seconds for the components to stop before shutting down thread pools. This allows for asynchronous operations such as disableControllerServicesAsync to complete during shutdown. Updated StandardStatelessFlow so that on shutdown it catches more general Exception to ensure that shutdown succeeds
@markobean
Copy link
Contributor

I am just beginning to review this and it will take some time because it is a significant change. A couple comments off the bat.

Can you add "help" dialogs for the new properties for the configuration of a process group - specifically Execution Engine, Max Concurrent Tasks, Stateless Flow Timeout? See the Log File Suffix property for an example.

Can you add some information in the PR description to assist in evaluating this one? It is non-trivial to say the least and not having any description of what was added/modified, why, or what areas to focus on makes reviewing even more challenging.

Thanks!

@exceptionfactory
Copy link
Contributor

The latest version has a clean shutdown with the Controller Service reference scenario described, thanks @markap14!

I pushed a correction for the unused import, otherwise this looks ready to go pending successful builds.

@exceptionfactory
Copy link
Contributor

Thanks for taking a look @markobean. As you noted, there is a lot here, but it builds on the core capabilities of Stateless execution. If you are not familiar with Stateless operation, or the ExecuteStateless Processor, it would be worth reviewing the associated documentation for those features.

It is worth giving some consideration to how approachable this capability is for general users, so improving usability is worth doing. However, some of those items could be considered under a follow-on effort. One of the main concerns with this pull request is to ensure existing functionality works, and the new optional functionality operates within reasonable parameters.

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following another set of successful automated builds, this looks ready to merge. Thanks for the testing @joewitt, and thanks for resolving the various issues raised @markap14!

@markobean as mentioned, please feel free to review the details and raise additional Jira issues for improvements. Similar to the support for Python-based components, this has significant impact on the framework for NiFi 2.0, but we still have some time to making adjustments as we continue working on technical debt reduction.

+1 merging

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants