Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-3065 Per Process Group logging #7315

Merged
merged 5 commits into from
Jun 24, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class VersionedProcessGroup extends VersionedComponent {
private Long defaultBackPressureObjectThreshold;
private String defaultBackPressureDataSizeThreshold;

private String logFileSuffix;


@ApiModelProperty("The child Process Groups")
public Set<VersionedProcessGroup> getProcessGroups() {
Expand Down Expand Up @@ -205,4 +207,13 @@ public String getDefaultBackPressureDataSizeThreshold() {
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
}

@ApiModelProperty(value = "The log file suffix for this Process Group for dedicated logging.")
public String getLogFileSuffix() {
return logFileSuffix;
}

public void setLogFileSuffix(final String logFileSuffix) {
this.logFileSuffix = logFileSuffix;
}
}
6 changes: 6 additions & 0 deletions nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ language governing permissions and limitations under the License. -->
<artifactId>slf4j-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-per-process-group-logging</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
Expand Down
1 change: 1 addition & 0 deletions nifi-assembly/src/main/assembly/common.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<include>*:logback-core</include>
<include>*:nifi-api</include>
<include>*:nifi-property-protection-api</include>
<include>*:nifi-per-process-group-logging</include>
</includes>
</dependencySet>

Expand Down
31 changes: 31 additions & 0 deletions nifi-commons/nifi-per-process-group-logging/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-per-process-group-logging</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.logging;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.sift.Discriminator;
import org.slf4j.event.KeyValuePair;

public class NifiDiscriminator implements Discriminator<ILoggingEvent> {
private static final String KEY = "logFileSuffix";

private boolean started;

@Override
public String getDiscriminatingValue(final ILoggingEvent iLoggingEvent) {
for (KeyValuePair keyValuePair : iLoggingEvent.getKeyValuePairs()) {
if (keyValuePair.key.equals(getKey())) {
return keyValuePair.value.toString();
}
}
return null;
}

@Override
public String getKey() {
return KEY;
}

@Override
public void start() {
started = true;
}

@Override
public void stop() {
started = false;
}

@Override
public boolean isStarted() {
return started;
}
}
1 change: 1 addition & 0 deletions nifi-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<module>nifi-logging-utils</module>
<module>nifi-metrics</module>
<module>nifi-parameter</module>
<module>nifi-per-process-group-logging</module>
<module>nifi-property-encryptor</module>
<module>nifi-property-utils</module>
<module>nifi-properties</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class ProcessGroupDTO extends ComponentDTO {
private String defaultFlowFileExpiration;
private Long defaultBackPressureObjectThreshold;
private String defaultBackPressureDataSizeThreshold;
private String logFileSuffix;

private Integer runningCount;
private Integer stoppedCount;
Expand Down Expand Up @@ -403,4 +404,13 @@ public String getDefaultBackPressureDataSizeThreshold() {
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
}

@ApiModelProperty(value = "The log file suffix for this Process Group for dedicated logging.")
public String getLogFileSuffix() {
return logFileSuffix;
}

public void setLogFileSuffix(final String logFileSuffix) {
this.logFileSuffix = logFileSuffix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.InstanceClassLoader;
import org.apache.nifi.nar.NarCloseable;
Expand Down Expand Up @@ -1572,7 +1573,7 @@ private void run(ScheduledExecutorService taskScheduler, long administrativeYiel
SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping, ScheduledState desiredState, ScheduledState scheduledState) {

final Processor processor = processorRef.get().getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor, new StandardLoggingContext(StandardProcessorNode.this));
LOG.info("Starting {}", this);

ScheduledState currentState;
Expand Down Expand Up @@ -1714,7 +1715,7 @@ private void initiateStart(final ScheduledExecutorService taskScheduler, final l
AtomicLong startupAttemptCount, final Supplier<ProcessContext> processContextFactory, final SchedulingAgentCallback schedulingAgentCallback) {

final Processor processor = getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor, new StandardLoggingContext(StandardProcessorNode.this));

// Completion Timestamp is set to MAX_VALUE because we don't want to timeout until the task has a chance to run.
final AtomicLong completionTimestampRef = new AtomicLong(Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.FormatUtils;
Expand Down Expand Up @@ -596,7 +597,8 @@ public void run() {

enablingAttemptCount.incrementAndGet();
if (enablingAttemptCount.get() == 120 || enablingAttemptCount.get() % 3600 == 0) {
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this,
new StandardLoggingContext(StandardControllerServiceNode.this));
componentLog.error("Encountering difficulty enabling. (Validation State is {}: {}). Will continue trying to enable.",
validationState, validationState.getValidationErrors());
}
Expand Down Expand Up @@ -635,7 +637,8 @@ public void run() {
future.completeExceptionally(e);

final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this,
new StandardLoggingContext(StandardControllerServiceNode.this));
componentLog.error("Failed to invoke @OnEnabled method", cause);
invokeDisable(configContext);

Expand Down Expand Up @@ -717,7 +720,7 @@ private void invokeDisable(ConfigurationContext configContext) {
LOG.debug("Successfully disabled {}", this);
} catch (Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this, new StandardLoggingContext(StandardControllerServiceNode.this));
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;

import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -52,7 +53,7 @@ private ComponentLog getLogger(final String componentId) {
final LogRepository repo = LogRepositoryFactory.getRepository(componentId);
final ComponentLog logger = (repo == null) ? null : repo.getLogger();
if (repo == null || logger == null) {
return new SimpleProcessLogger(componentId, this);
return new SimpleProcessLogger(componentId, this, new StandardLoggingContext(null));
}

return logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.nifi.parameter.ParameterParser;
import org.apache.nifi.parameter.ParameterTokenList;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.processor.StandardValidationContext;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.security.util.SslContextFactory;
Expand Down Expand Up @@ -341,7 +342,7 @@ private static StateProvider createStateProvider(
propertyMap.put(descriptor, new StandardPropertyValue(resourceContext, entry.getValue(),null, parameterLookup, variableRegistry));
}

final ComponentLog logger = new SimpleProcessLogger(providerConfig.getId(), provider);
final ComponentLog logger = new SimpleProcessLogger(providerConfig.getId(), provider, new StandardLoggingContext(null));
final StateProviderInitializationContext initContext = new StandardStateProviderInitializationContext(providerConfig.getId(), propertyMap, sslContext, logger);

synchronized (provider) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ private void synchronize(final ProcessGroup group, final VersionedProcessGroup p
group.setDefaultBackPressureObjectThreshold(proposed.getDefaultBackPressureObjectThreshold());
group.setDefaultBackPressureDataSizeThreshold(proposed.getDefaultBackPressureDataSizeThreshold());

group.setLogFileSuffix(proposed.getLogFileSuffix());

final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
if (remoteCoordinates == null) {
group.disconnectVersionControl(false);
Expand Down Expand Up @@ -1795,6 +1797,7 @@ public void synchronizeProcessGroupSettings(final ProcessGroup processGroup, fin
groupToUpdate.setComments(proposed.getComments());
groupToUpdate.setName(proposed.getName());
groupToUpdate.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
groupToUpdate.setLogFileSuffix(proposed.getLogFileSuffix());

if (processGroup == null) {
LOG.info("Successfully synchronized {} by adding it to the flow", groupToUpdate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private static final String DEFAULT_FLOWFILE_EXPIRATION = "0 sec";
private static final long DEFAULT_BACKPRESSURE_OBJECT = 10_000L;
private static final String DEFAULT_BACKPRESSURE_DATA_SIZE = "1 GB";
private static final String VALID_DIRECTORY_NAME_REGEX = "[\\s\\<\\>:\\'\\\"\\/\\\\\\|\\?\\*]";
timeabarna marked this conversation as resolved.
Show resolved Hide resolved
private volatile String logFileSuffix;


public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
Expand Down Expand Up @@ -244,6 +246,7 @@ public StandardProcessGroup(final String id, final ControllerServiceProvider ser
this.defaultFlowFileExpiration = new AtomicReference<>();
this.defaultBackPressureObjectThreshold = new AtomicReference<>();
this.defaultBackPressureDataSizeThreshold = new AtomicReference<>();
this.logFileSuffix = null;

// save only the nifi properties needed, and account for the possibility those properties are missing
if (nifiProperties == null) {
Expand Down Expand Up @@ -4422,6 +4425,21 @@ public QueueSize getQueueSize() {
return new QueueSize(count, contentSize);
}

@Override
public String getLogFileSuffix() {
return logFileSuffix;
}

@Override
public void setLogFileSuffix(final String logFileSuffix) {
final Pattern pattern = Pattern.compile(VALID_DIRECTORY_NAME_REGEX);
if (logFileSuffix != null && pattern.matcher(logFileSuffix).find()) {
throw new IllegalArgumentException("Log file suffix can not contain the following characters: space, <, >, :, \', \", /, \\, |, ?, *");
} else {
this.logFileSuffix = logFileSuffix;
}
}

@Override
public String getDefaultBackPressureDataSizeThreshold() {
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.logging;

import java.util.Optional;

public interface LoggingContext {
/**
* @return the log file name suffix. This will be the discriminating value for the dedicated logging.
*/
Optional<String> getLogFileSuffix();

/**
* @return The key under which the discriminating value should be exported into the host environment.
*/
String getDiscriminatorKey();
}
Loading