Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-11556: Added ability to use a Process Group as a Stateless Flow #7253

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.components;

public enum PortFunction {
/**
* Port functions as a standard port, transferring FlowFiles to all outgoing connections.
*/
STANDARD,
exceptionfactory marked this conversation as resolved.
Show resolved Hide resolved

/**
* Port denotes that the invocation of the stateless flow has failed. If run using the Standard Engine, will operate as a Standard port. If running
* using the Stateless Engine, the transaction is rolled back, and any input FlowFiles will be transferred to this Port.
*/
FAILURE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ProcessGroupStatus implements Cloneable {
private Integer outputCount;
private Long outputContentSize;
private Integer activeThreadCount;
private Integer statelessActiveThreadCount;
private Integer terminatedThreadCount;
private Integer queuedCount;
private Long queuedContentSize;
Expand Down Expand Up @@ -152,6 +153,14 @@ public void setActiveThreadCount(final Integer activeThreadCount) {
this.activeThreadCount = activeThreadCount;
}

public Integer getStatelessActiveThreadCount() {
return statelessActiveThreadCount;
}

public void setStatelessActiveThreadCount(final Integer statelessActiveThreadCount) {
this.statelessActiveThreadCount = statelessActiveThreadCount;
}

public Integer getTerminatedThreadCount() {
return terminatedThreadCount;
}
Expand Down Expand Up @@ -427,6 +436,7 @@ public static void merge(final ProcessGroupStatus target, final ProcessGroupStat
target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());
target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
target.setStatelessActiveThreadCount(target.getStatelessActiveThreadCount() + toMerge.getStatelessActiveThreadCount());
target.setTerminatedThreadCount(target.getTerminatedThreadCount() + toMerge.getTerminatedThreadCount());
target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred());
target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred());
Expand Down
35 changes: 35 additions & 0 deletions nifi-api/src/main/java/org/apache/nifi/flow/ExecutionEngine.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.flow;

public enum ExecutionEngine {
/**
* Run using the standard NiFi engine
*/
STANDARD,

/**
* Run using the Stateless engine
*/
STATELESS,

/**
* Use the Execution Engine that is configured for the parent Process Group. If there is no parent Process Group, default to the standard engine.
*/
INHERITED;
}
11 changes: 11 additions & 0 deletions nifi-api/src/main/java/org/apache/nifi/flow/VersionedPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package org.apache.nifi.flow;

import io.swagger.annotations.ApiModelProperty;
import org.apache.nifi.components.PortFunction;

public class VersionedPort extends VersionedComponent {
private PortType type;
private Integer concurrentlySchedulableTaskCount;
private ScheduledState scheduledState;
private Boolean allowRemoteAccess;
private PortFunction portFunction;

@ApiModelProperty("The number of tasks that should be concurrently scheduled for the port.")
public Integer getConcurrentlySchedulableTaskCount() {
Expand Down Expand Up @@ -65,6 +67,15 @@ public void setAllowRemoteAccess(Boolean allowRemoteAccess) {
this.allowRemoteAccess = allowRemoteAccess;
}

@ApiModelProperty("Specifies how the Port should function")
public PortFunction getPortFunction() {
return portFunction;
}

public void setPortFunction(final PortFunction portFunction) {
this.portFunction = portFunction;
}

@Override
public ComponentType getComponentType() {
if (type == PortType.OUTPUT_PORT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public class VersionedProcessGroup extends VersionedComponent {
private String defaultFlowFileExpiration;
private Long defaultBackPressureObjectThreshold;
private String defaultBackPressureDataSizeThreshold;
private ScheduledState scheduledState;
private ExecutionEngine executionEngine;
private Integer maxConcurrentTasks;
private String statelessFlowTimeout;

private String logFileSuffix;

Expand Down Expand Up @@ -216,4 +220,40 @@ public String getLogFileSuffix() {
public void setLogFileSuffix(final String logFileSuffix) {
this.logFileSuffix = logFileSuffix;
}

@ApiModelProperty("The Scheduled State of the Process Group, if the group is configured to use the Stateless Execution Engine. Otherwise, this value has no relevance.")
public ScheduledState getScheduledState() {
return scheduledState;
}

public void setScheduledState(final ScheduledState scheduledState) {
this.scheduledState = scheduledState;
}

@ApiModelProperty("The Execution Engine that should be used to run the components within the group.")
public ExecutionEngine getExecutionEngine() {
return executionEngine;
}

public void setExecutionEngine(final ExecutionEngine executionEngine) {
this.executionEngine = executionEngine;
}

@ApiModelProperty("The maximum number of concurrent tasks that should be scheduled for this Process Group when using the Stateless Engine")
public Integer getMaxConcurrentTasks() {
return maxConcurrentTasks;
}

public void setMaxConcurrentTasks(final Integer maxConcurrentTasks) {
this.maxConcurrentTasks = maxConcurrentTasks;
}

@ApiModelProperty("The maximum amount of time that the flow is allows to run using the Stateless engine before it times out and is considered a failure")
public String getStatelessFlowTimeout() {
return statelessFlowTimeout;
}

public void setStatelessFlowTimeout(final String timeout) {
this.statelessFlowTimeout = timeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public enum ComponentType {
*/
REPORTING_TASK,

/**
* Bulletin is associated with a Process Group
*/
PROCESS_GROUP,

/**
* Bulletin is associated with a Parameter Provider
*/
Expand Down
1 change: 1 addition & 0 deletions nifi-docs/src/main/asciidoc/administration-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3405,6 +3405,7 @@ The contents of the _nifi.properties_ file are relatively stable but can change
Consider configuring items below marked with an asterisk (`*`) in such a way that upgrading will be easier. For example, change the default directory configurations to locations outside the main root installation. In this way, these items can remain in their configured location through an upgrade, allowing NiFi to find all the repositories and configuration files and pick up where it left off as soon as the old version is stopped and the new version is started. Furthermore, the administrator may reuse this _nifi.properties_ file and any other configuration files without having to re-configure them each time an upgrade takes place. See <<upgrading_nifi>> for more details.


[[core_properties]]
=== Core Properties +

The first section of the _nifi.properties_ file is for the Core Properties. These properties apply to the core framework as a whole.
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Loading