Skip to content

Commit

Permalink
fix LOGBACK-1716 by adding a ThreadPoolExecutor in addition to the Sc…
Browse files Browse the repository at this point in the history
…heduledExecutor in Context interface

Signed-off-by: Ceki Gulcu <ceki@qos.ch>
  • Loading branch information
ceki committed Apr 17, 2023
1 parent d378e8b commit 5f4b9fe
Show file tree
Hide file tree
Showing 19 changed files with 83 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import ch.qos.logback.classic.blackbox.BlackboxClassicTestConstants;
Expand Down Expand Up @@ -52,10 +53,7 @@
import jakarta.mail.internet.MimeMessage;
import jakarta.mail.internet.MimeMultipart;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;

public class SMTPAppender_GreenTest {

Expand Down Expand Up @@ -161,8 +159,14 @@ private MimeMultipart verifyAndExtractMimeMultipart(String subject)
}

void waitUntilEmailIsSent() throws InterruptedException {
loggerContext.getScheduledExecutorService().shutdown();
loggerContext.getScheduledExecutorService().awaitTermination(1000, TimeUnit.MILLISECONDS);
ExecutorService es = loggerContext.getExecutorService();
es.shutdown();
boolean terminated = es.awaitTermination(1000, TimeUnit.MILLISECONDS);
// this assertion may be needlessly strict
if(!terminated) {
fail("executor elapsed before accorded delay");
}

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public final void start() {
throw new IllegalStateException("context not set");
}
if (shouldStart()) {
getContext().getScheduledExecutorService().execute(getRunnableTask());
getContext().getExecutorService().execute(getRunnableTask());
started = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ protected boolean shouldStart() {

ServerListener<RemoteAppenderClient> listener = createServerListener(serverSocket);

runner = createServerRunner(listener, getContext().getScheduledExecutorService());
runner = createServerRunner(listener, getContext().getExecutorService());
runner.setContext(getContext());
return true;
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private void updateMaskIfNecessary(long now) {
// reader lock.
void detachReconfigurationToNewThread() {
addInfo("Detected change in [" + configurationWatchList.getCopyOfFileWatchList() + "]");
context.getScheduledExecutorService().submit(new ReconfiguringThread());
context.getExecutorService().submit(new ReconfiguringThread());
}

void updateNextCheck(long now) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import ch.qos.logback.core.Context;
import ch.qos.logback.core.ContextBase;


// LOGBACK-329
public class Barebones {

public static void main(String[] args) {
Context context = new ContextBase();
for (int i = 0; i < 3; i++) {
SenderRunnable senderRunnable = new SenderRunnable("" + i);
context.getScheduledExecutorService().execute(senderRunnable);
context.getExecutorService().execute(senderRunnable);
}
System.out.println("done");
// System.exit(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import ch.qos.logback.classic.Level;
Expand All @@ -45,7 +44,6 @@
* interface, and validate that it receives messages and delivers them to its
* appender.
*/
@Disabled
public class ServerSocketReceiverFunctionalTest {

private static final int EVENT_COUNT = 10;
Expand Down Expand Up @@ -78,7 +76,7 @@ public void setUp() throws Exception {
@AfterEach
public void tearDown() throws Exception {
receiver.stop();
ExecutorService executor = lc.getScheduledExecutorService();
ExecutorService executor = lc.getExecutorService();
executor.shutdownNow();
executor.awaitTermination(SHUTDOWN_DELAY, TimeUnit.MILLISECONDS);
assertTrue(executor.isTerminated());
Expand Down
11 changes: 6 additions & 5 deletions logback-core/src/main/java/ch/qos/logback/core/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;

import ch.qos.logback.core.spi.ConfigurationEvent;
import ch.qos.logback.core.spi.ConfigurationEventListener;
Expand Down Expand Up @@ -111,12 +112,11 @@ public interface Context extends PropertyContainer {
/**
* Returns the ScheduledExecutorService for this context.
*
* @return
* @return ScheduledExecutorService
* @since 1.1.7
*/
// Apparently ScheduledThreadPoolExecutor has limitation where a task cannot be
// submitted from
// within a running task. ThreadPoolExecutor does not have this limitation.
// submitted from within a running task. ThreadPoolExecutor does not have this limitation.
// This causes tests failures in
// SocketReceiverTest.testDispatchEventForEnabledLevel and
// ServerSocketReceiverFunctionalTest.testLogEventFromClient.
Expand All @@ -127,11 +127,12 @@ public interface Context extends PropertyContainer {
* tasks in a separate thread.
*
* @return the executor for this context.
* @since 1.0.0
* @deprecated use {@link#getScheduledExecutorService()} instead
* @since 1.0.00 (undeprecated in 1.4.7)
*
*/
ExecutorService getExecutorService();


/**
* Register a component that participates in the context's life cycle.
* <p>
Expand Down
35 changes: 19 additions & 16 deletions logback-core/src/main/java/ch/qos/logback/core/ContextBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.*;

import ch.qos.logback.core.rolling.helper.FileNamePattern;
import ch.qos.logback.core.spi.ConfigurationEvent;
Expand Down Expand Up @@ -56,6 +52,9 @@ public class ContextBase implements Context, LifeCycle {
final private List<ConfigurationEventListener> configurationEventListenerList = new CopyOnWriteArrayList<>();

private ScheduledExecutorService scheduledExecutorService;

private ThreadPoolExecutor threadPoolExecutor;

protected List<ScheduledFuture<?>> scheduledFutures = new ArrayList<ScheduledFuture<?>>(1);
private LifeCycleManager lifeCycleManager;
private SequenceNumberGenerator sequenceNumberGenerator;
Expand Down Expand Up @@ -166,9 +165,9 @@ public void start() {
}

public void stop() {
// We don't check "started" here, because the executor service uses
// We don't check "started" here, because the executor services use
// lazy initialization, rather than being created in the start method
stopExecutorService();
stopExecutorServices();

started = false;
}
Expand Down Expand Up @@ -216,14 +215,17 @@ public Object getConfigurationLock() {
return configurationLock;
}



@Override
/**
* @deprecated replaced by getScheduledExecutorService
*/
public synchronized ExecutorService getExecutorService() {
return getScheduledExecutorService();
if (threadPoolExecutor == null) {
threadPoolExecutor = (ThreadPoolExecutor) ExecutorServiceUtil.newThreadPoolExecutor();
}
return threadPoolExecutor;
}


@Override
public synchronized ScheduledExecutorService getScheduledExecutorService() {
if (scheduledExecutorService == null) {
Expand All @@ -232,11 +234,12 @@ public synchronized ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
}

private synchronized void stopExecutorService() {
if (scheduledExecutorService != null) {
ExecutorServiceUtil.shutdown(scheduledExecutorService);
scheduledExecutorService = null;
}
private synchronized void stopExecutorServices() {
ExecutorServiceUtil.shutdown(scheduledExecutorService);
scheduledExecutorService = null;

ExecutorServiceUtil.shutdown(threadPoolExecutor);
threadPoolExecutor = null;
}

private void removeShutdownHook() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@ public class CoreConstants {
public static final int CORE_POOL_SIZE = 0;

// Apparently ScheduledThreadPoolExecutor has limitation where a task cannot be
// submitted from
// within a running task unless the pool has worker threads already available.
// ThreadPoolExecutor
// does not have this limitation.
// This causes tests failures in
// SocketReceiverTest.testDispatchEventForEnabledLevel and
// submitted from within a running task unless the pool has worker threads already available.
// ThreadPoolExecutor does not have this limitation.
// This causes tests failures in SocketReceiverTest.testDispatchEventForEnabledLevel and
// ServerSocketReceiverFunctionalTest.testLogEventFromClient.
// We thus set a pool size > 0 for tests to pass.
public static final int SCHEDULED_EXECUTOR_POOL_SIZE = 1;
public static final int SCHEDULED_EXECUTOR_POOL_SIZE = 2;

/**
* Maximum number of threads to allow in a context's executor service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void start() {
deque = queueFactory.newLinkedBlockingDeque(queueSize);
peerId = "remote peer " + remoteHost + ":" + port + ": ";
connector = createConnector(address, port, 0, reconnectionDelay.getMilliseconds());
task = getContext().getScheduledExecutorService().submit(new Runnable() {
task = getContext().getExecutorService().submit(new Runnable() {
@Override
public void run() {
connectSocketAndDispatchEvents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ protected void append(E eventObject) {
if (asynchronousSending) {
// perform actual sending asynchronously
SenderRunnable senderRunnable = new SenderRunnable(cbClone, eventObject);
this.asynchronousSendingFuture = context.getScheduledExecutorService().submit(senderRunnable);
this.asynchronousSendingFuture = context.getExecutorService().submit(senderRunnable);
} else {
// synchronous sending
sendBuffer(cbClone, eventObject);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public void start() {
getInetAddress());
ServerListener<RemoteReceiverClient> listener = createServerListener(socket);

runner = createServerRunner(listener, getContext().getScheduledExecutorService());
runner = createServerRunner(listener, getContext().getExecutorService());
runner.setContext(getContext());
getContext().getScheduledExecutorService().execute(runner);
getContext().getExecutorService().execute(runner);
super.start();
} catch (Exception ex) {
addError("server startup error: " + ex, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public Future<?> asyncCompress(String nameOfFile2Compress, String nameOfCompress
throws RolloverFailure {
CompressionRunnable runnable = new CompressionRunnable(nameOfFile2Compress, nameOfCompressedFile,
innerEntryName);
ExecutorService executorService = context.getScheduledExecutorService();
ExecutorService executorService = context.getExecutorService();
Future<?> future = executorService.submit(runnable);
return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.io.File;
import java.time.Instant;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -221,7 +220,7 @@ public String toString() {

public Future<?> cleanAsynchronously(Instant now) {
ArhiveRemoverRunnable runnable = new ArhiveRemoverRunnable(now);
ExecutorService executorService = context.getScheduledExecutorService();
ExecutorService executorService = context.getExecutorService();
Future<?> future = executorService.submit(runnable);
return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,20 @@ static public ScheduledExecutorService newScheduledExecutorService() {
}

/**
* Creates an executor service suitable for use by logback components.
*
* @return executor service
* @deprecated replaced by {@link #newThreadPoolExecutor()}
*/
static public ExecutorService newExecutorService() {
return newThreadPoolExecutor();
}


/**
* Creates an ThreadPoolExecutor suitable for use by logback components.
*
* @since 1.4.7
* @return ThreadPoolExecutor
*/
static public ThreadPoolExecutor newThreadPoolExecutor() {
return new ThreadPoolExecutor(CoreConstants.CORE_POOL_SIZE, CoreConstants.MAX_POOL_SIZE, 0L,
TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), THREAD_FACTORY);
}
Expand All @@ -69,7 +78,9 @@ static public ExecutorService newExecutorService() {
* @param executorService the executor service to shut down
*/
static public void shutdown(ExecutorService executorService) {
executorService.shutdownNow();
if(executorService != null) {
executorService.shutdownNow();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.net.Socket;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.AfterEach;
Expand All @@ -51,8 +52,8 @@ public class AbstractSocketAppenderIntegrationTest {

private static final int TIMEOUT = 2000;

private ScheduledExecutorService executorService = ExecutorServiceUtil.newScheduledExecutorService();
private MockContext mockContext = new MockContext(executorService);
private ThreadPoolExecutor threadPoolExecutor = ExecutorServiceUtil.newThreadPoolExecutor();
private MockContext mockContext = new MockContext(threadPoolExecutor);
private AutoFlushingObjectWriter objectWriter;
private ObjectWriterFactory objectWriterFactory = new SpyProducingObjectWriterFactory();
private LinkedBlockingDeque<String> deque = spy(new LinkedBlockingDeque<String>(1));
Expand All @@ -70,8 +71,8 @@ public void setUp() throws Exception {
public void tearDown() throws Exception {
instrumentedAppender.stop();
Assertions.assertFalse(instrumentedAppender.isStarted());
executorService.shutdownNow();
Assertions.assertTrue(executorService.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS));
threadPoolExecutor.shutdownNow();
Assertions.assertTrue(threadPoolExecutor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS));
}

@Disabled // JDK 16
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
package ch.qos.logback.core.net.mock;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import ch.qos.logback.core.Context;
import ch.qos.logback.core.ContextBase;
Expand All @@ -29,22 +31,22 @@
*/
public class MockContext extends ContextBase {

private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorService executorService;

private Status lastStatus;

public MockContext() {
this(new MockScheduledExecutorService());
}

public MockContext(ScheduledExecutorService executorService) {
public MockContext(ExecutorService executorService) {
this.setStatusManager(new MockStatusManager());
this.scheduledExecutorService = executorService;
this.executorService = executorService;
}

@Override
public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
public ExecutorService getExecutorService() {
return executorService;
}

public Status getLastStatus() {
Expand Down
Loading

0 comments on commit 5f4b9fe

Please sign in to comment.