Skip to content

Commit

Permalink
NIFI-11556: Fixed bug that caused Stateless Process Groups not to sta…
Browse files Browse the repository at this point in the history
…rt after restart, if there were two restarted in a row without changing the flow in any way
  • Loading branch information
markap14 committed Jul 31, 2023
1 parent 7b5e484 commit 473d00b
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public ScheduledState getState(final ControllerServiceNode serviceNode) {

@Override
public ScheduledState getState(final ProcessGroup group) {
if (group.getStatelessScheduledState() == StatelessGroupScheduledState.RUNNING) {
if (group.getDesiredStatelessScheduledState() == StatelessGroupScheduledState.RUNNING) {
return ScheduledState.RUNNING;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
import org.apache.nifi.groups.BundleUpdateStrategy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.StatelessGroupScheduledState;
import org.apache.nifi.nar.ExtensionDefinition;
import org.apache.nifi.nar.ExtensionDiscoveringManager;
import org.apache.nifi.nar.ExtensionManager;
Expand Down Expand Up @@ -1196,7 +1197,7 @@ public void trigger(final ComponentNode component) {
remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
startedTransmitting++;
} catch (final Throwable t) {
LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t});
LOG.error("Unable to start transmitting with {}", remoteGroupPort, t);
}
}

Expand All @@ -1211,7 +1212,7 @@ public void trigger(final ComponentNode component) {
startConnectable(connectable);
}
} catch (final Throwable t) {
LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
LOG.error("Unable to start {}", connectable, t);
}
}

Expand Down Expand Up @@ -1568,6 +1569,15 @@ public ScheduledState getScheduledState(final Port port) {

return port.getScheduledState();
}

@Override
public ScheduledState getScheduledState(final ProcessGroup processGroup) {
if (startGroupsAfterInitialization.contains(processGroup)) {
return ScheduledState.RUNNING;
}

return processGroup.getDesiredStatelessScheduledState() == StatelessGroupScheduledState.RUNNING ? ScheduledState.RUNNING : ScheduledState.STOPPED;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.StatelessGroupScheduledState;

public interface ScheduledStateLookup {

ScheduledState getScheduledState(ProcessorNode procNode);

ScheduledState getScheduledState(Port port);

ScheduledState getScheduledState(ProcessGroup processGroup);


ScheduledStateLookup IDENTITY_LOOKUP = new ScheduledStateLookup() {
@Override
Expand All @@ -38,5 +42,10 @@ public ScheduledState getScheduledState(final ProcessorNode procNode) {
public ScheduledState getScheduledState(final Port port) {
return port.getScheduledState();
}

@Override
public ScheduledState getScheduledState(final ProcessGroup processGroup) {
return processGroup.getDesiredStatelessScheduledState() == StatelessGroupScheduledState.RUNNING ? ScheduledState.RUNNING : ScheduledState.STOPPED;
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,16 @@
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.flow.VersionedDataflow;
import org.apache.nifi.controller.flow.VersionedFlowEncodingVersion;
import org.apache.nifi.flow.VersionedFlowRegistryClient;
import org.apache.nifi.controller.flow.VersionedTemplate;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.flow.VersionedFlowRegistryClient;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedParameterProvider;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedReportingTask;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.StatelessGroupScheduledState;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.registry.flow.FlowRegistryClientNode;
Expand Down Expand Up @@ -221,7 +220,7 @@ public ScheduledState getState(final ControllerServiceNode serviceNode) {

@Override
public ScheduledState getState(final ProcessGroup group) {
return group.getDesiredStatelessScheduledState() == StatelessGroupScheduledState.RUNNING ? ScheduledState.RUNNING : ScheduledState.ENABLED;
return map(stateLookup.getScheduledState(group));
}
};
}
Expand Down

0 comments on commit 473d00b

Please sign in to comment.