Skip to content

Commit

Permalink
Create JobKitWatchdog, add Supervisable for it and add Spring Boot co…
Browse files Browse the repository at this point in the history
…nf. Do some clean code on JobJit/test and Mailkit #140
  • Loading branch information
hdsdi3g committed Jul 3, 2023
1 parent 0d5bd23 commit e6c266e
Show file tree
Hide file tree
Showing 47 changed files with 2,777 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class AtomicComputeReference<T> {
private T t;
Expand All @@ -36,8 +37,10 @@ public synchronized void setAnd(final T t, final Consumer<T> process) {
process.accept(t);
}

public synchronized void reset() {
public synchronized T reset() {
final var old = t;
t = null;
return old;
}

public synchronized boolean isSet() {
Expand All @@ -57,4 +60,9 @@ public synchronized boolean computePredicate(final Predicate<T> process) {
}
return process.test(t);
}

public synchronized void replace(final UnaryOperator<T> process) {
t = process.apply(t);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class BackgroundService {
private final BackgroundServiceEvent event;
private final RunnableWithException task;
private final RunnableWithException disableTask;
private final JobKitWatchdog jobKitWatchdog;

private boolean enabled;
private ScheduledFuture<?> nextRunReference;
Expand All @@ -36,13 +37,15 @@ public BackgroundService(final String name,
final Spooler spooler,
final ScheduledExecutorService scheduledExecutor,
final BackgroundServiceEvent event,
final JobKitWatchdog jobKitWatchdog,
final RunnableWithException task,
final RunnableWithException disableTask) {
this.name = name;
this.spoolName = spoolName;
this.spooler = spooler;
this.scheduledExecutor = scheduledExecutor;
this.event = event;
this.jobKitWatchdog = jobKitWatchdog;
this.task = task;
this.disableTask = disableTask;
hasFirstStarted = false;
Expand All @@ -68,7 +71,7 @@ private void ifNextRunReferenceScheduled(final Runnable ifReady) {
private synchronized void planNextExec(final long interval) {
ifNextRunReferenceScheduled(() -> {
throw new IllegalStateException("Beware, the nextRunReference for \"" + name + "\" is still active (in "
+ nextRunReference.getDelay(TimeUnit.MILLISECONDS) + ")");
+ nextRunReference.getDelay(MILLISECONDS) + ")");
});
if (enabled == false) {
throw new IllegalStateException("Beware, this service is not enabled (" + name + ")");
Expand Down Expand Up @@ -96,7 +99,7 @@ private synchronized void planNextExec(final long interval) {
new Date(System.currentTimeMillis() + nextInterval));
event.planNextExec(name, spoolName, nextInterval);
});
}, interval, TimeUnit.MILLISECONDS);
}, interval, MILLISECONDS);
}

private synchronized void refreshInternalState(final boolean newEnabled, final long newTimedInterval) {
Expand All @@ -108,6 +111,7 @@ private synchronized void refreshInternalState(final boolean newEnabled, final l
} else {
refreshToDisabledMode(newTimedInterval);
}
jobKitWatchdog.refreshBackgroundService(name, spoolName, enabled, timedInterval);
}

private void refreshToEnabledMode(final long newTimedInterval) {
Expand All @@ -119,7 +123,7 @@ private void refreshToEnabledMode(final long newTimedInterval) {
log.info("Change Service interval time \"{}\", from {} to {}", name, timedInterval,
newTimedInterval);
ifNextRunReferenceScheduled(() -> {
final var eta = timedInterval - nextRunReference.getDelay(TimeUnit.MILLISECONDS);
final var eta = timedInterval - nextRunReference.getDelay(MILLISECONDS);
if (newTimedInterval > eta) {
/**
* Extend interval: replan next time newTimedInterval-eta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class FlatBackgroundService extends BackgroundService {
final String spoolName,
final Runnable task,
final RunnableWithException disableTask) {
super(null, spoolName, null, scheduledExecutor, null, null, null);
super(null, spoolName, null, scheduledExecutor, null, null, null, null);
runReference = new FlatScheduledFuture(task);
this.disableTask = disableTask;
this.scheduledExecutor = scheduledExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -103,37 +104,42 @@ public void shutdown() {

@Override
public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException();
return List.of();
}

@Override
public boolean isShutdown() {
throw new UnsupportedOperationException();
return false;
}

@Override
public boolean isTerminated() {
throw new UnsupportedOperationException();
return false;
}

@Override
public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
return true;
}

@Override
public <T> Future<T> submit(final Callable<T> task) {
throw new UnsupportedOperationException();
try {
return CompletableFuture.completedFuture(task.call());
} catch (final Exception e) {
return CompletableFuture.failedFuture(e);
}
}

@Override
public <T> Future<T> submit(final Runnable task, final T result) {
throw new UnsupportedOperationException();
task.run();
return CompletableFuture.completedFuture(result);
}

@Override
public Future<?> submit(final Runnable task) {
throw new UnsupportedOperationException();
return submit(task, null);
}

@Override
Expand Down Expand Up @@ -162,7 +168,7 @@ public <T> T invokeAny(final Collection<? extends Callable<T>> tasks,

@Override
public void execute(final Runnable command) {
throw new UnsupportedOperationException();
command.run();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
*/
package tv.hd3g.jobkit.engine;

import java.util.Objects;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import lombok.EqualsAndHashCode;

@EqualsAndHashCode
class FlatScheduledFuture implements ScheduledFuture<Void> {
private final Runnable run;

Expand All @@ -34,26 +36,6 @@ void run() {
run.run();
}

@Override
public int hashCode() {
return Objects.hash(run);
}

@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final var other = (FlatScheduledFuture) obj;
return Objects.equals(run, other.run);
}

@Override
public long getDelay(final TimeUnit unit) {
throw new UnsupportedOperationException();
Expand Down Expand Up @@ -86,7 +68,7 @@ public Void get() throws InterruptedException, ExecutionException {

@Override
public Void get(final long timeout,
final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -26,6 +27,8 @@ public class JobKitEngine implements JobTrait {
private final SupervisableManager supervisableManager;
private final AtomicBoolean shutdown;
private final Set<String> spoolsNamesToKeepRunningToTheEnd;
@Getter
private final JobKitWatchdog jobKitWatchdog;

public JobKitEngine(final ScheduledExecutorService scheduledExecutor,
final ExecutionEvent executionEvent,
Expand All @@ -37,11 +40,12 @@ public JobKitEngine(final ScheduledExecutorService scheduledExecutor,
spoolsNamesToKeepRunningToTheEnd = Collections.synchronizedSet(new HashSet<>());
this.supervisableManager = supervisableManager;
shutdown = new AtomicBoolean(false);
jobKitWatchdog = new JobKitWatchdog(supervisableManager, scheduledExecutor);

if (supervisableManager == null) {
spooler = new Spooler(executionEvent, SupervisableManager.voidSupervisableEvents());
spooler = new Spooler(executionEvent, SupervisableManager.voidSupervisableEvents(), jobKitWatchdog);
} else {
spooler = new Spooler(executionEvent, supervisableManager);
spooler = new Spooler(executionEvent, supervisableManager, jobKitWatchdog);
}

Runtime.getRuntime().addShutdownHook(new ShutdownHook());
Expand All @@ -57,6 +61,7 @@ protected JobKitEngine() {
scheduledExecutor = null;
backgroundServiceEvent = null;
spooler = null;
jobKitWatchdog = null;
backgroundServices = null;
supervisableManager = null;
shutdown = new AtomicBoolean(false);
Expand Down Expand Up @@ -92,6 +97,7 @@ public BackgroundService createService(final String name, // NOSONAR S1133
spooler,
scheduledExecutor,
backgroundServiceEvent,
jobKitWatchdog,
serviceTask,
onServiceDisableTask);
backgroundServices.add(service);
Expand Down Expand Up @@ -145,7 +151,6 @@ void shutdown() {
log.warn("App want to close: shutdown jobKitEngine...");
shutdown.set(true);
backgroundServices.forEach(BackgroundService::disable);
scheduledExecutor.shutdown();
spooler.shutdown(spoolsNamesToKeepRunningToTheEnd);
Optional.ofNullable(supervisableManager).ifPresent(SupervisableManager::close);
}
Expand Down
Loading

0 comments on commit e6c266e

Please sign in to comment.