diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/AtomicComputeReference.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/AtomicComputeReference.java index a69ac08e..12c7013e 100644 --- a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/AtomicComputeReference.java +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/AtomicComputeReference.java @@ -19,6 +19,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.UnaryOperator; public class AtomicComputeReference { private T t; @@ -36,8 +37,10 @@ public synchronized void setAnd(final T t, final Consumer process) { process.accept(t); } - public synchronized void reset() { + public synchronized T reset() { + final var old = t; t = null; + return old; } public synchronized boolean isSet() { @@ -57,4 +60,9 @@ public synchronized boolean computePredicate(final Predicate process) { } return process.test(t); } + + public synchronized void replace(final UnaryOperator process) { + t = process.apply(t); + } + } diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/BackgroundService.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/BackgroundService.java index 8f980ae2..3cfe3de9 100644 --- a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/BackgroundService.java +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/BackgroundService.java @@ -22,6 +22,7 @@ public class BackgroundService { private final BackgroundServiceEvent event; private final RunnableWithException task; private final RunnableWithException disableTask; + private final JobKitWatchdog jobKitWatchdog; private boolean enabled; private ScheduledFuture nextRunReference; @@ -36,6 +37,7 @@ public BackgroundService(final String name, final Spooler spooler, final ScheduledExecutorService scheduledExecutor, final BackgroundServiceEvent event, + final JobKitWatchdog jobKitWatchdog, final RunnableWithException task, final RunnableWithException disableTask) { this.name = name; @@ -43,6 +45,7 @@ public BackgroundService(final String name, this.spooler = spooler; this.scheduledExecutor = scheduledExecutor; this.event = event; + this.jobKitWatchdog = jobKitWatchdog; this.task = task; this.disableTask = disableTask; hasFirstStarted = false; @@ -68,7 +71,7 @@ private void ifNextRunReferenceScheduled(final Runnable ifReady) { private synchronized void planNextExec(final long interval) { ifNextRunReferenceScheduled(() -> { throw new IllegalStateException("Beware, the nextRunReference for \"" + name + "\" is still active (in " - + nextRunReference.getDelay(TimeUnit.MILLISECONDS) + ")"); + + nextRunReference.getDelay(MILLISECONDS) + ")"); }); if (enabled == false) { throw new IllegalStateException("Beware, this service is not enabled (" + name + ")"); @@ -96,7 +99,7 @@ private synchronized void planNextExec(final long interval) { new Date(System.currentTimeMillis() + nextInterval)); event.planNextExec(name, spoolName, nextInterval); }); - }, interval, TimeUnit.MILLISECONDS); + }, interval, MILLISECONDS); } private synchronized void refreshInternalState(final boolean newEnabled, final long newTimedInterval) { @@ -108,6 +111,7 @@ private synchronized void refreshInternalState(final boolean newEnabled, final l } else { refreshToDisabledMode(newTimedInterval); } + jobKitWatchdog.refreshBackgroundService(name, spoolName, enabled, timedInterval); } private void refreshToEnabledMode(final long newTimedInterval) { @@ -119,7 +123,7 @@ private void refreshToEnabledMode(final long newTimedInterval) { log.info("Change Service interval time \"{}\", from {} to {}", name, timedInterval, newTimedInterval); ifNextRunReferenceScheduled(() -> { - final var eta = timedInterval - nextRunReference.getDelay(TimeUnit.MILLISECONDS); + final var eta = timedInterval - nextRunReference.getDelay(MILLISECONDS); if (newTimedInterval > eta) { /** * Extend interval: replan next time newTimedInterval-eta diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/FlatBackgroundService.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/FlatBackgroundService.java index 6095d902..a9ddfc70 100644 --- a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/FlatBackgroundService.java +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/FlatBackgroundService.java @@ -29,7 +29,7 @@ class FlatBackgroundService extends BackgroundService { final String spoolName, final Runnable task, final RunnableWithException disableTask) { - super(null, spoolName, null, scheduledExecutor, null, null, null); + super(null, spoolName, null, scheduledExecutor, null, null, null, null); runReference = new FlatScheduledFuture(task); this.disableTask = disableTask; this.scheduledExecutor = scheduledExecutor; diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/FlatScheduledExecutorService.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/FlatScheduledExecutorService.java index 58617c22..877354e4 100644 --- a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/FlatScheduledExecutorService.java +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/FlatScheduledExecutorService.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -103,37 +104,42 @@ public void shutdown() { @Override public List shutdownNow() { - throw new UnsupportedOperationException(); + return List.of(); } @Override public boolean isShutdown() { - throw new UnsupportedOperationException(); + return false; } @Override public boolean isTerminated() { - throw new UnsupportedOperationException(); + return false; } @Override public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { - throw new UnsupportedOperationException(); + return true; } @Override public Future submit(final Callable task) { - throw new UnsupportedOperationException(); + try { + return CompletableFuture.completedFuture(task.call()); + } catch (final Exception e) { + return CompletableFuture.failedFuture(e); + } } @Override public Future submit(final Runnable task, final T result) { - throw new UnsupportedOperationException(); + task.run(); + return CompletableFuture.completedFuture(result); } @Override public Future submit(final Runnable task) { - throw new UnsupportedOperationException(); + return submit(task, null); } @Override @@ -162,7 +168,7 @@ public T invokeAny(final Collection> tasks, @Override public void execute(final Runnable command) { - throw new UnsupportedOperationException(); + command.run(); } } diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/FlatScheduledFuture.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/FlatScheduledFuture.java index ff31c9fa..ae369870 100644 --- a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/FlatScheduledFuture.java +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/FlatScheduledFuture.java @@ -16,13 +16,15 @@ */ package tv.hd3g.jobkit.engine; -import java.util.Objects; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode class FlatScheduledFuture implements ScheduledFuture { private final Runnable run; @@ -34,26 +36,6 @@ void run() { run.run(); } - @Override - public int hashCode() { - return Objects.hash(run); - } - - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final var other = (FlatScheduledFuture) obj; - return Objects.equals(run, other.run); - } - @Override public long getDelay(final TimeUnit unit) { throw new UnsupportedOperationException(); @@ -86,7 +68,7 @@ public Void get() throws InterruptedException, ExecutionException { @Override public Void get(final long timeout, - final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { throw new UnsupportedOperationException(); } diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/JobKitEngine.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/JobKitEngine.java index 0f0aedfc..db6135f3 100644 --- a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/JobKitEngine.java +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/JobKitEngine.java @@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -26,6 +27,8 @@ public class JobKitEngine implements JobTrait { private final SupervisableManager supervisableManager; private final AtomicBoolean shutdown; private final Set spoolsNamesToKeepRunningToTheEnd; + @Getter + private final JobKitWatchdog jobKitWatchdog; public JobKitEngine(final ScheduledExecutorService scheduledExecutor, final ExecutionEvent executionEvent, @@ -37,11 +40,12 @@ public JobKitEngine(final ScheduledExecutorService scheduledExecutor, spoolsNamesToKeepRunningToTheEnd = Collections.synchronizedSet(new HashSet<>()); this.supervisableManager = supervisableManager; shutdown = new AtomicBoolean(false); + jobKitWatchdog = new JobKitWatchdog(supervisableManager, scheduledExecutor); if (supervisableManager == null) { - spooler = new Spooler(executionEvent, SupervisableManager.voidSupervisableEvents()); + spooler = new Spooler(executionEvent, SupervisableManager.voidSupervisableEvents(), jobKitWatchdog); } else { - spooler = new Spooler(executionEvent, supervisableManager); + spooler = new Spooler(executionEvent, supervisableManager, jobKitWatchdog); } Runtime.getRuntime().addShutdownHook(new ShutdownHook()); @@ -57,6 +61,7 @@ protected JobKitEngine() { scheduledExecutor = null; backgroundServiceEvent = null; spooler = null; + jobKitWatchdog = null; backgroundServices = null; supervisableManager = null; shutdown = new AtomicBoolean(false); @@ -92,6 +97,7 @@ public BackgroundService createService(final String name, // NOSONAR S1133 spooler, scheduledExecutor, backgroundServiceEvent, + jobKitWatchdog, serviceTask, onServiceDisableTask); backgroundServices.add(service); @@ -145,7 +151,6 @@ void shutdown() { log.warn("App want to close: shutdown jobKitEngine..."); shutdown.set(true); backgroundServices.forEach(BackgroundService::disable); - scheduledExecutor.shutdown(); spooler.shutdown(spoolsNamesToKeepRunningToTheEnd); Optional.ofNullable(supervisableManager).ifPresent(SupervisableManager::close); } diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/JobKitWatchdog.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/JobKitWatchdog.java new file mode 100644 index 00000000..3c028b88 --- /dev/null +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/JobKitWatchdog.java @@ -0,0 +1,370 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine; + +import static java.lang.Long.MAX_VALUE; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.function.Predicate.not; +import static java.util.stream.Collectors.toUnmodifiableMap; +import static java.util.stream.Collectors.toUnmodifiableSet; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import lombok.extern.slf4j.Slf4j; +import tv.hd3g.jobkit.engine.watchdog.JobWatchdogPolicy; +import tv.hd3g.jobkit.engine.watchdog.JobWatchdogPolicyWarning; +import tv.hd3g.jobkit.engine.watchdog.JobWatchdogSpoolReport; +import tv.hd3g.jobkit.engine.watchdog.WatchableBackgroundService; +import tv.hd3g.jobkit.engine.watchdog.WatchableSpoolJobState; + +/** + * ThreadSafe + */ +@Slf4j +public class JobKitWatchdog { + + private final SupervisableEvents supervisableEvents; + private final ScheduledExecutorService scheduledExecutor; + private final List policies; + private final Map> activeServicesBySpool; + private final Map> jobsBySpool; + private final Map tiggeredPolicyBySpool; + private final AtomicComputeReference> nextPolicyCheck; + private final AtomicBoolean shutdown; + + JobKitWatchdog(final SupervisableEvents supervisableEvents, + final ScheduledExecutorService scheduledExecutor) { + this.supervisableEvents = supervisableEvents; + this.scheduledExecutor = scheduledExecutor; + policies = new ArrayList<>(); + activeServicesBySpool = new HashMap<>(); + jobsBySpool = new HashMap<>(); + tiggeredPolicyBySpool = new HashMap<>(); + nextPolicyCheck = new AtomicComputeReference<>(); + shutdown = new AtomicBoolean(false); + } + + public JobKitWatchdog addPolicies(final JobWatchdogPolicy... policies) { + Objects.requireNonNull(policies); + synchronized (this.policies) { + this.policies.addAll(Arrays.asList(policies)); + } + return this; + } + + /** + * @return an unmodifiable copy of current policies list + */ + public List getPolicies() { + synchronized (policies) { + return policies.stream().toList(); + } + } + + void refreshBackgroundService(final String serviceName, + final String spoolName, + final boolean enabled, + final long timedInterval) { + synchronized (activeServicesBySpool) { + activeServicesBySpool.putIfAbsent(spoolName, new HashSet<>()); + final var services = activeServicesBySpool.get(spoolName); + services.removeIf(s -> s.serviceName().equals(spoolName)); + if (enabled) { + services.add( + new WatchableBackgroundService( + serviceName, + spoolName, + timedInterval)); + } + } + scheduledExecutor.execute(new Policies()); + } + + private void addWatchableJob(final Set jobs, + final WatchableSpoolJob job, + final Date createdDate, + final long statedDate) { + Optional oStartedDate = Optional.empty(); + if (statedDate > 0l) { + oStartedDate = Optional.ofNullable(statedDate); + } + jobs.add( + new WatchableSpoolJobState(createdDate, + job.getCommandName(), + job.getCreatedIndex(), + job.getCreator(), + oStartedDate)); + } + + void addJob(final WatchableSpoolJob job) { + final var createdDate = new Date(); + synchronized (jobsBySpool) { + jobsBySpool.putIfAbsent(job.getSpoolName(), new HashSet<>()); + final var jobs = jobsBySpool.get(job.getSpoolName()); + addWatchableJob(jobs, job, createdDate, 0); + } + scheduledExecutor.execute(new Policies()); + } + + private WatchableSpoolJobState getOldAndRemoveJobInSpool(final WatchableSpoolJob job, + final Set jobs) { + final var createdIndex = job.getCreatedIndex(); + final var createdJob = jobs.stream() + .filter(j -> j.createdIndex() == createdIndex) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Can't found job {}/{} #{} in current active jobs")); + jobs.remove(createdJob); + return createdJob; + } + + void startJob(final WatchableSpoolJob job, final long startedDate) { + synchronized (jobsBySpool) { + final var jobs = jobsBySpool.get(job.getSpoolName()); + final var createdJob = getOldAndRemoveJobInSpool(job, jobs); + addWatchableJob(jobs, job, createdJob.createdDate(), startedDate); + } + scheduledExecutor.execute(new Policies()); + } + + void endJob(final WatchableSpoolJob job) { + synchronized (jobsBySpool) { + final var jobs = jobsBySpool.get(job.getSpoolName()); + getOldAndRemoveJobInSpool(job, jobs); + } + scheduledExecutor.execute(new Policies()); + } + + private static Map> deepCloneFilterEmpty(final Map> map) { + return map.keySet().stream() + .filter(key -> map.get(key).isEmpty() == false) + .collect(toUnmodifiableMap(key -> key, + key -> map.get(key) + .stream() + .collect(toUnmodifiableSet()))); + } + + private class Policies implements Runnable { + + private Map> currentJobsBySpool; + private Map> currentServicesBySpool; + + private Set getQueuedJobs(final String spoolName) { + return currentJobsBySpool.get(spoolName) + .stream() + .filter(j -> j.startedDate().isPresent() == false) + .collect(toUnmodifiableSet()); + } + + private Optional getActiveJob(final String spoolName) { + return currentJobsBySpool.get(spoolName) + .stream() + .filter(j -> j.startedDate().isPresent()) + .findFirst(); + } + + private void tiggerPolicy(final String spoolName, + final WatchableSpoolJobState activeJob, + final Set queuedJobs, + final Set relativeBackgroundServices, + final JobWatchdogPolicy policy, + final JobWatchdogPolicyWarning warning) { + synchronized (tiggeredPolicyBySpool) { + if (tiggeredPolicyBySpool.containsKey(spoolName)) { + log.trace("Policy \"{}\" was rise a warn, again, on {}", policy.getDescription(), spoolName); + return; + } + final var report = new JobWatchdogSpoolReport( + new Date(), + spoolName, + activeJob, + queuedJobs, + policy, + warning, + relativeBackgroundServices); + tiggeredPolicyBySpool.put(spoolName, report); + + log.warn("Policy \"{}\" rise a warn on {}: {}", policy.getDescription(), spoolName); + log.debug("Send report: {}", report); + supervisableEvents.onJobWatchdogSpoolReport(report); + } + } + + private void releasePolicy(final String spoolName, final JobWatchdogPolicy policy) { + synchronized (tiggeredPolicyBySpool) { + final var oldReport = tiggeredPolicyBySpool.get(spoolName); + if (oldReport == null || oldReport.policy().equals(policy) == false) { + return; + } + tiggeredPolicyBySpool.remove(spoolName); + + log.info("Policy \"{}\" release a warn on {}: {}", policy.getDescription(), spoolName); + log.debug("Send report: {}", oldReport); + supervisableEvents.onJobWatchdogSpoolReleaseReport(oldReport); + } + } + + private long applyPoliciesAndGetLowerNextTimeToCheckOnRegularSpools(final Set regularSpools, + final JobWatchdogPolicy policy, + final String description) { + log.debug("Apply policy: {}, on regular spools: {}", description, regularSpools); + return regularSpools.stream().mapToLong(spoolName -> { + final var oActiveJob = getActiveJob(spoolName); + if (oActiveJob.isEmpty()) { + return MAX_VALUE; + } + final var activeJob = oActiveJob.get(); + final var queuedJobs = getQueuedJobs(spoolName); + try { + final var durationToQueue = policy.isStatusOk( + spoolName, + activeJob, + queuedJobs) + .map(Duration::toMillis) + .orElse(0l); + releasePolicy(spoolName, policy); + if (durationToQueue > 0) { + return durationToQueue; + } + } catch (final JobWatchdogPolicyWarning e) { + tiggerPolicy(spoolName, activeJob, queuedJobs, Set.of(), policy, e); + } + return MAX_VALUE; + }).min().orElse(MAX_VALUE); + } + + private long applyPoliciesAndGetLowerNextTimeToCheckOnServicesSpools(final Set serviceSpools, + final JobWatchdogPolicy policy, + final String description) { + log.debug("Apply policy: {}, on services spools: {}", description, serviceSpools); + return serviceSpools.stream().mapToLong(spoolName -> { + final var oActiveJob = getActiveJob(spoolName); + if (oActiveJob.isEmpty()) { + return MAX_VALUE; + } + final var activeJob = oActiveJob.get(); + final var queuedJobs = getQueuedJobs(spoolName); + final var relativeBackgroundServices = currentServicesBySpool.get(spoolName) + .stream() + .collect(toUnmodifiableSet()); + try { + policy.isStatusOk( + spoolName, + oActiveJob.get(), + queuedJobs, + relativeBackgroundServices); + + final var nextInterval = currentServicesBySpool.get(spoolName).stream() + .mapToLong(WatchableBackgroundService::timedInterval) + .min() + .orElse(MAX_VALUE); + releasePolicy(spoolName, policy); + return nextInterval; + } catch (final JobWatchdogPolicyWarning e) { + tiggerPolicy(spoolName, activeJob, queuedJobs, relativeBackgroundServices, policy, e); + } + return MAX_VALUE; + }).min().orElse(MAX_VALUE); + } + + @Override + public void run() { + if (shutdown.get()) { + log.debug("Don't apply policies: shutdown"); + return; + } + synchronized (activeServicesBySpool) { + currentServicesBySpool = deepCloneFilterEmpty(activeServicesBySpool); + } + synchronized (jobsBySpool) { + currentJobsBySpool = deepCloneFilterEmpty(jobsBySpool); + } + + final var regularSpools = currentJobsBySpool.keySet().stream() + .filter(not(currentServicesBySpool::containsKey)) + .collect(toUnmodifiableSet()); + final var serviceSpools = currentJobsBySpool.keySet().stream() + .filter(currentServicesBySpool::containsKey) + .collect(toUnmodifiableSet()); + + final var oLowerDurationToQueue = getPolicies().stream() + .mapToLong(policy -> { + final var description = policy.getDescription(); + + final var lowerDurationToQueueRegular = applyPoliciesAndGetLowerNextTimeToCheckOnRegularSpools( + regularSpools, policy, description); + final var lowerDurationToQueueService = applyPoliciesAndGetLowerNextTimeToCheckOnServicesSpools( + serviceSpools, policy, description); + + log.trace("Next lowerDurationToQueueRegular={}, lowerDurationToQueueService={}", + Duration.ofMillis(lowerDurationToQueueRegular), + Duration.ofMillis(lowerDurationToQueueService)); + + return Math.min(lowerDurationToQueueRegular, lowerDurationToQueueService); + }) + .min(); + + if (oLowerDurationToQueue.isEmpty() + || oLowerDurationToQueue.getAsLong() == MAX_VALUE) { + return; + } + + final var lowerDurationToQueue = oLowerDurationToQueue.getAsLong(); + + log.debug("Next lowerDurationToQueue={}", Duration.ofMillis(lowerDurationToQueue)); + + nextPolicyCheck.replace(actualSch -> { + if (actualSch != null + && actualSch.isDone() == false + && actualSch.isCancelled() == false) { + if (actualSch.getDelay(MILLISECONDS) < lowerDurationToQueue) { + log.trace("Don't need to remove previous scheduled: {} ms instead of {} ms", + Duration.ofMillis(actualSch.getDelay(MILLISECONDS)), + Duration.ofMillis(lowerDurationToQueue)); + return actualSch; + } + + log.trace("Cancel previous scheduled {}", actualSch); + actualSch.cancel(false); + } + log.trace("Scheduled next {} in {}", actualSch, Duration.ofMillis(lowerDurationToQueue)); + return scheduledExecutor.schedule(new Policies(), lowerDurationToQueue, MILLISECONDS); + }); + + } + + } + + void shutdown() { + shutdown.set(true); + log.debug("Close JobKitWatchDog"); + Optional.ofNullable(nextPolicyCheck.reset()).ifPresent(n -> n.cancel(true)); + } + +} diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SpoolExecutor.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SpoolExecutor.java index fb6a86a1..343da41d 100644 --- a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SpoolExecutor.java +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SpoolExecutor.java @@ -1,5 +1,7 @@ package tv.hd3g.jobkit.engine; +import static java.util.function.Predicate.not; + import java.util.Comparator; import java.util.Optional; import java.util.concurrent.PriorityBlockingQueue; @@ -7,12 +9,15 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Stream; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; @Slf4j class SpoolExecutor { + @Getter private final String name; private final ExecutionEvent event; private final AtomicLong threadCount; @@ -22,16 +27,19 @@ class SpoolExecutor { private final PriorityBlockingQueue queue; private final AtomicBoolean shutdown; private final SupervisableEvents supervisableEvents; + private final JobKitWatchdog jobKitWatchdog; SpoolExecutor(final String name, final ExecutionEvent event, final AtomicLong threadCount, - final SupervisableEvents supervisableEvents) { + final SupervisableEvents supervisableEvents, + final JobKitWatchdog jobKitWatchdog) { this.name = name; this.event = event; this.threadCount = threadCount; + this.jobKitWatchdog = jobKitWatchdog; queueComparator = (l, r) -> { - final var compared = Integer.compare(r.priority, l.priority); + final var compared = Integer.compare(r.jobPriority, l.jobPriority); if (compared == 0) { return Long.compare(l.createdIndex, r.createdIndex); } @@ -51,7 +59,9 @@ boolean addToQueue(final RunnableWithException command, log.error("Can't add to queue new command \"{}\" by \"{}\": the spool is shutdown", name, this.name); return false; } - queue.add(new SpoolJob(command, name, priority, afterRunCommand, this)); + final var newJob = new SpoolJob(command, name, priority, afterRunCommand, this); + queue.add(newJob); + jobKitWatchdog.addJob(newJob); log.debug("Add new command \"{}\" by \"{}\" with P{}", name, this.name, priority); runNext(); return true; @@ -123,19 +133,31 @@ void clean(final boolean purgeWaitList) { log.debug("Spool {} is now empty, without running tasks", name); } - private class SpoolJob extends Thread implements SupervisableSupplier { + private static Optional getCaller() { + return Stream.of(new Throwable().getStackTrace()) + .filter(not(StackTraceElement::isNativeMethod)) + .filter(not(t -> t.getClassName().startsWith(SpoolExecutor.class.getPackageName()))) + .findFirst(); + } + + private class SpoolJob extends Thread implements SupervisableSupplier, WatchableSpoolJob { final RunnableWithException command; + @Getter final String commandName; - final int priority; + final int jobPriority; final Consumer afterRunCommand; + @Getter final SpoolExecutor executorReferer; final AtomicReference supervisableReference; + @Getter final long createdIndex; + @Getter + final Optional creator; SpoolJob(final RunnableWithException command, final String commandName, - final int priority, + final int jobPriority, final Consumer afterRunCommand, final SpoolExecutor executorReferer) { super(); @@ -146,10 +168,11 @@ private class SpoolJob extends Thread implements SupervisableSupplier { this.command = command; this.commandName = commandName; - this.priority = priority; + this.jobPriority = jobPriority; this.afterRunCommand = afterRunCommand; this.executorReferer = executorReferer; supervisableReference = new AtomicReference<>(); + creator = getCaller(); } private Supervisable createSupervisable(final String jobName) { @@ -160,6 +183,9 @@ private Supervisable createSupervisable(final String jobName) { @Override public void run() { + final var startTime = System.currentTimeMillis(); + jobKitWatchdog.startJob(this, startTime); + var currentSupervisable = createSupervisable(commandName + " beforeRunJob"); try { supervisableReference.set(currentSupervisable); @@ -172,7 +198,6 @@ public void run() { } currentSupervisable = createSupervisable(commandName); - final var startTime = System.currentTimeMillis(); Exception error = null; try { log.debug("Start new command \"{}\" by \"{}\"", commandName, name); @@ -219,6 +244,8 @@ public void run() { supervisableReference.set(null); + jobKitWatchdog.endJob(this); + synchronized (queue) { currentOperation.reset(); } diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/Spooler.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/Spooler.java index dab36f8c..bf9741c7 100644 --- a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/Spooler.java +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/Spooler.java @@ -7,6 +7,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -17,13 +18,18 @@ public class Spooler { private final AtomicLong threadCount; private final AtomicBoolean shutdown; private final SupervisableEvents supervisableEvents; + @Getter + private final JobKitWatchdog jobKitWatchdog; - public Spooler(final ExecutionEvent event, final SupervisableEvents supervisableEvents) { + public Spooler(final ExecutionEvent event, + final SupervisableEvents supervisableEvents, + final JobKitWatchdog jobKitWatchdog) { this.event = event; spoolExecutors = new ConcurrentHashMap<>(); threadCount = new AtomicLong(0); shutdown = new AtomicBoolean(false); this.supervisableEvents = supervisableEvents; + this.jobKitWatchdog = jobKitWatchdog; } private Stream getSpoolExecutorStream() { @@ -39,7 +45,7 @@ public SpoolExecutor getExecutor(final String name) { return spoolExecutors.get(name); } return spoolExecutors.computeIfAbsent(name, - n -> new SpoolExecutor(n, event, threadCount, supervisableEvents)); + n -> new SpoolExecutor(n, event, threadCount, supervisableEvents, jobKitWatchdog)); } public int getAllQueuesSize() { @@ -57,6 +63,7 @@ public void shutdown(final Set spoolsNamesToKeepRunningToTheEnd) { if (shutdown.get()) { return; } + jobKitWatchdog.shutdown(); shutdown.set(true); final var count = getRunningQueuesCount(); diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SupervisableEventRegister.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SupervisableEventRegister.java new file mode 100644 index 00000000..9f1a6fbc --- /dev/null +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SupervisableEventRegister.java @@ -0,0 +1,25 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine; + +public interface SupervisableEventRegister { + + void registerOnEndEventConsumer(SupervisableOnEndEventConsumer onEndEventConsumer); + + SupervisableContextExtractor createContextExtractor(SupervisableEndEvent event); + +} diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SupervisableEvents.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SupervisableEvents.java index e72dcc7d..9a6a1789 100644 --- a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SupervisableEvents.java +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SupervisableEvents.java @@ -18,9 +18,16 @@ import java.util.Optional; +import tv.hd3g.jobkit.engine.watchdog.JobWatchdogSpoolReport; + public interface SupervisableEvents extends SupervisableSerializer { default void onEnd(final Supervisable supervisable, final Optional oError) { } + default void onJobWatchdogSpoolReport(final JobWatchdogSpoolReport report) { + } + + default void onJobWatchdogSpoolReleaseReport(final JobWatchdogSpoolReport oldReport) { + } } diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SupervisableManager.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SupervisableManager.java index f0a24903..a675e5db 100644 --- a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SupervisableManager.java +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/SupervisableManager.java @@ -16,8 +16,10 @@ */ package tv.hd3g.jobkit.engine; +import java.time.Duration; import java.util.ArrayDeque; import java.util.Collections; +import java.util.Date; import java.util.HashSet; import java.util.Objects; import java.util.Optional; @@ -29,9 +31,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; +import tv.hd3g.jobkit.engine.watchdog.JobWatchdogSpoolReport; @Slf4j -public class SupervisableManager implements SupervisableEvents { +public class SupervisableManager implements SupervisableEvents, SupervisableEventRegister { + + private static final String JOBKIT_WATCHDOGSPOOL_WARNING_MESSAGE = "jobkit.watchdogspool.warning.message"; + private static final String JOBKIT_WATCHDOGSPOOL_RELEASE_MESSAGE = "jobkit.watchdogspool.warning.releasemessage"; private final String name; private final ObjectMapper objectMapper; @@ -53,6 +59,7 @@ public SupervisableManager(final String name) { this(name, new ObjectMapper(), 10); } + @Override public SupervisableContextExtractor createContextExtractor(final SupervisableEndEvent event) { return new SupervisableContextExtractor(this, event); } @@ -71,6 +78,7 @@ void close() { } } + @Override public void registerOnEndEventConsumer(final SupervisableOnEndEventConsumer onEndEventConsumer) { Objects.requireNonNull(onEndEventConsumer, "\"onEndEventConsumer\" can't to be null"); onEndEventConsumers.add(onEndEventConsumer); @@ -109,6 +117,91 @@ public void onEnd(final Supervisable supervisable, final Optional oEr } } + @Override + public void onJobWatchdogSpoolReport(final JobWatchdogSpoolReport report) { + final var s = new Supervisable(report.spoolName(), report.activeJob().commandName(), this); + + s.start(); + s.markAsUrgent(); + s.markAsInternalStateChange(); + + var i = 0; + s.onMessage( + JOBKIT_WATCHDOGSPOOL_WARNING_MESSAGE + i++, + "For information, a current job execution spooler (\"{0}\") seams to be have some troubles, maybe the current running task is blocked, with {1} waiting task(s). The warning report say: \"{2}\"", + report.spoolName(), + report.queuedJobs().size(), + report.warning().getMessage()); + + s.onMessage( + JOBKIT_WATCHDOGSPOOL_WARNING_MESSAGE + i++, + "The current running job \"{0}\" was created the {1}, started the {2}, by \"{3}\".", + report.activeJob().commandName(), + report.activeJob().createdDate(), + report.activeJob().startedDate().map(Date::new).orElse(null), + report.activeJob().creator().map(StackTraceElement::toString).orElse("(source unknown)")); + + final var queuedJobs = report.queuedJobs().stream() + .sorted((l, r) -> Long.compare(l.createdIndex(), r.createdIndex())).toList(); + if (queuedJobs.isEmpty() == false) { + s.onMessage( + JOBKIT_WATCHDOGSPOOL_WARNING_MESSAGE + i++, + "The older queued (waiting) job in this spooler was created the {0}.", + queuedJobs.get(0).createdDate()); + } + if (queuedJobs.size() > 1) { + s.onMessage( + JOBKIT_WATCHDOGSPOOL_WARNING_MESSAGE + i++, // NOSONAR S1854 + "The most recent queued (waiting) job in this spooler was created the {0}.", + queuedJobs.get(queuedJobs.size() - 1).createdDate()); + } + + if (report.relativeBackgroundServices().isEmpty() == false) { + s.onMessage( + JOBKIT_WATCHDOGSPOOL_WARNING_MESSAGE + i++, // NOSONAR S1854 + "Some jobs (or the totality) was created by one of those application internal service:"); + + report.relativeBackgroundServices() + .forEach(b -> s.onMessage( + JOBKIT_WATCHDOGSPOOL_WARNING_MESSAGE + ".bckservice", // NOSONAR S1854 + "{0}, runned every {1} after the last runned job", + b.serviceName(), Duration.ofMillis(b.timedInterval())) + + ); + } + + s.resultDone( + "jobkit.watchdogspool.warning.policy." + report.policy().getClass().getSimpleName().toLowerCase(), + "[Execution spool warning] {0} on \"{1}\"", + report.policy().getDescription(), + report.spoolName()); + s.setContext(JobWatchdogSpoolReport.class.getName(), report); + s.end(); + } + + @Override + public void onJobWatchdogSpoolReleaseReport(final JobWatchdogSpoolReport report) { + final var s = new Supervisable(report.spoolName(), report.activeJob().commandName(), this); + + s.start(); + s.markAsUrgent(); + s.markAsInternalStateChange(); + + s.onMessage( + JOBKIT_WATCHDOGSPOOL_RELEASE_MESSAGE, + "For information, a job execution spooler (\"{0}\") was triggered an alert. This alert is now closed, the queue resumed a normal activity. The warning report was said: \"{1}\"", + report.spoolName(), + report.warning().getMessage()); + + s.resultDone( + "jobkit.watchdogspool.warning.policy." + report.policy().getClass().getSimpleName().toLowerCase(), + "[Problem closed] {0} on \"{1}\"", + report.policy().getDescription(), + report.spoolName()); + s.setContext(JobWatchdogSpoolReport.class.getName(), report); + s.end(); + } + @Override public JsonNode extractContext(final Object businessObject) { log.trace("Extract {} / {}...", businessObject, businessObject.getClass().getName()); diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/WatchableSpoolJob.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/WatchableSpoolJob.java new file mode 100644 index 00000000..63816b5c --- /dev/null +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/WatchableSpoolJob.java @@ -0,0 +1,35 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine; + +import java.util.Optional; + +interface WatchableSpoolJob { + + Optional getCreator(); + + String getCommandName(); + + long getCreatedIndex(); + + SpoolExecutor getExecutorReferer(); + + default String getSpoolName() { + return getExecutorReferer().getName(); + } + +} diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/JobWatchdogPolicy.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/JobWatchdogPolicy.java new file mode 100644 index 00000000..c15f180b --- /dev/null +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/JobWatchdogPolicy.java @@ -0,0 +1,39 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +import java.time.Duration; +import java.util.Optional; +import java.util.Set; + +public interface JobWatchdogPolicy { + + /** + * @return empty/zero == no plan to future checks, else do a future check after this time. + */ + Optional isStatusOk(String spoolName, + WatchableSpoolJobState activeJob, + Set queuedJobs) throws JobWatchdogPolicyWarning; + + void isStatusOk(String spoolName, + WatchableSpoolJobState activeJob, + Set queuedJobs, + Set relativeBackgroundServices) throws JobWatchdogPolicyWarning; + + String getDescription(); + +} diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/JobWatchdogPolicyWarning.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/JobWatchdogPolicyWarning.java new file mode 100644 index 00000000..79108cac --- /dev/null +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/JobWatchdogPolicyWarning.java @@ -0,0 +1,25 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +public class JobWatchdogPolicyWarning extends Exception { + + public JobWatchdogPolicyWarning(final String message) { + super(message); + } + +} diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/JobWatchdogSpoolReport.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/JobWatchdogSpoolReport.java new file mode 100644 index 00000000..98b89dfe --- /dev/null +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/JobWatchdogSpoolReport.java @@ -0,0 +1,30 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +import java.util.Date; +import java.util.Set; + +public record JobWatchdogSpoolReport(Date created, + String spoolName, + WatchableSpoolJobState activeJob, + Set queuedJobs, + JobWatchdogPolicy policy, + JobWatchdogPolicyWarning warning, + Set relativeBackgroundServices) { + +} diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/LimitedExecTimePolicy.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/LimitedExecTimePolicy.java new file mode 100644 index 00000000..1566a404 --- /dev/null +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/LimitedExecTimePolicy.java @@ -0,0 +1,94 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +import java.time.Duration; +import java.util.Optional; +import java.util.Set; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +@Slf4j +public class LimitedExecTimePolicy extends LimitedSpoolsBasePolicy { + + private static final String A_JOB_EXECUTION_SHOULD_NOT_LAST_LONGER_THAN = "A job execution should not last longer than "; + private Duration maxExecTime; + + @Override + public Optional isStatusOk(final String spoolName, + final WatchableSpoolJobState activeJob, + final Set queuedJobs) throws JobWatchdogPolicyWarning { + if (maxExecTime == null + || maxExecTime.isNegative() + || maxExecTime.isZero()) { + log.warn("You should set a maxExecTime for this policy"); + return Optional.empty(); + } + if (allowSpool(spoolName) == false) { + return Optional.empty(); + } + + final var oRuntime = activeJob.getRunTime(); + if (oRuntime.isEmpty()) { + return Optional.empty(); + } + final var runtime = oRuntime.get(); + final var remainingTimeAllowed = maxExecTime.toMillis() - runtime.toMillis(); + + if (remainingTimeAllowed < 0) { + throw new JobWatchdogPolicyWarning("This job was started " + runtime + + " ago, but the limit for its spool (" + spoolName + ") was " + + maxExecTime + " max"); + } + + return Optional.ofNullable(Duration.ofMillis(remainingTimeAllowed)); + } + + @Override + public void isStatusOk(final String spoolName, + final WatchableSpoolJobState activeJob, + final Set queuedJobs, + final Set relativeBackgroundServices) throws JobWatchdogPolicyWarning { + isStatusOk(spoolName, activeJob, queuedJobs); + } + + @Override + public String getDescription() { + final var allow = getOnlySpools(); + if (allow.isEmpty() == false) { + return A_JOB_EXECUTION_SHOULD_NOT_LAST_LONGER_THAN + maxExecTime + ", only for spools " + allow; + } + + final var deny = getNotSpools(); + if (deny.isEmpty() == false) { + return A_JOB_EXECUTION_SHOULD_NOT_LAST_LONGER_THAN + maxExecTime + ", not for spools " + deny; + } + + return A_JOB_EXECUTION_SHOULD_NOT_LAST_LONGER_THAN + maxExecTime; + } + +} diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/LimitedServiceExecTimePolicy.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/LimitedServiceExecTimePolicy.java new file mode 100644 index 00000000..8d8cc61e --- /dev/null +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/LimitedServiceExecTimePolicy.java @@ -0,0 +1,98 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +import java.time.Duration; +import java.util.Optional; +import java.util.Set; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +@Slf4j +public class LimitedServiceExecTimePolicy extends LimitedSpoolsBasePolicy { + + private static final String AN_INTERNAL_SERVICE_SHOULD_NOT_PRODUCE_JOBS_WHO_RUN_MORE_THAN = "An internal Service should not produce jobs who run more than "; + private int waitFactor; + + @Override + public Optional isStatusOk(final String spoolName, + final WatchableSpoolJobState activeJob, + final Set queuedJobs) throws JobWatchdogPolicyWarning { + return Optional.empty(); + } + + @Override + public void isStatusOk(final String spoolName, + final WatchableSpoolJobState activeJob, + final Set queuedJobs, + final Set relativeBackgroundServices) throws JobWatchdogPolicyWarning { + if (waitFactor < 1) { + log.warn("You should set a valid waitFactor (not {}) for this policy", waitFactor); + return; + } + if (allowSpool(spoolName) == false) { + return; + } + final var oRuntime = activeJob.getRunTime(); + if (oRuntime.isEmpty()) { + return; + } + final var runtime = oRuntime.get(); + final var maxExecTime = relativeBackgroundServices.stream() + .mapToLong(WatchableBackgroundService::timedInterval) + .max() + .orElseThrow(); + + final var remainingTimeAllowed = maxExecTime * waitFactor - runtime.toMillis(); + + if (remainingTimeAllowed < 0) { + throw new JobWatchdogPolicyWarning("This job was started " + runtime + + " ago, but max limit time for all the Services run on this spool (" + + spoolName + ") was " + + Duration.ofMillis(maxExecTime * waitFactor) + " max"); + } + } + + @Override + public String getDescription() { + final var allow = getOnlySpools(); + if (allow.isEmpty() == false) { + return AN_INTERNAL_SERVICE_SHOULD_NOT_PRODUCE_JOBS_WHO_RUN_MORE_THAN + waitFactor + + " times the time between two Service runs, only for spools " + allow; + } + + final var deny = getNotSpools(); + if (deny.isEmpty() == false) { + return AN_INTERNAL_SERVICE_SHOULD_NOT_PRODUCE_JOBS_WHO_RUN_MORE_THAN + waitFactor + + " times the time between two Service runs, not for spools " + allow; + } + + return AN_INTERNAL_SERVICE_SHOULD_NOT_PRODUCE_JOBS_WHO_RUN_MORE_THAN + waitFactor + + " times the time between two Service runs"; + } + +} diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/LimitedSpoolsBasePolicy.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/LimitedSpoolsBasePolicy.java new file mode 100644 index 00000000..b12900d5 --- /dev/null +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/LimitedSpoolsBasePolicy.java @@ -0,0 +1,61 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +import static java.util.stream.Collectors.toUnmodifiableSet; + +import java.util.Optional; +import java.util.Set; + +import lombok.EqualsAndHashCode; +import lombok.Setter; +import lombok.ToString; + +@Setter +@ToString +@EqualsAndHashCode +abstract class LimitedSpoolsBasePolicy implements JobWatchdogPolicy { + + private Set onlySpools; + private Set notSpools; + + public Set getOnlySpools() { + return Optional.ofNullable(onlySpools).stream() + .flatMap(Set::stream) + .collect(toUnmodifiableSet()); + } + + public Set getNotSpools() { + return Optional.ofNullable(notSpools).stream() + .flatMap(Set::stream) + .collect(toUnmodifiableSet()); + } + + protected boolean allowSpool(final String spoolName) { + final var allow = getOnlySpools(); + if (allow.isEmpty() == false) { + return allow.contains(spoolName); + } + + final var deny = getNotSpools(); + if (deny.isEmpty() == false) { + return deny.contains(spoolName) == false; + } + return true; + } + +} diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/MaxSpoolQueueSizePolicy.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/MaxSpoolQueueSizePolicy.java new file mode 100644 index 00000000..65e2495d --- /dev/null +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/MaxSpoolQueueSizePolicy.java @@ -0,0 +1,85 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +import java.time.Duration; +import java.util.Optional; +import java.util.Set; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = true) +@NoArgsConstructor +@Slf4j +public class MaxSpoolQueueSizePolicy extends LimitedSpoolsBasePolicy { + + public static final Duration DEFAULT_CHECKTIME = Duration.ofHours(1); + private static final String THE_QUEUE_SIZE_SHOULD_NOT_FILL_UP_TO = "The queue size should not fill up to "; + private int maxSize; + private Duration checkTime; + + @Override + public Optional isStatusOk(final String spoolName, + final WatchableSpoolJobState activeJob, + final Set queuedJobs) throws JobWatchdogPolicyWarning { + if (maxSize < 0) { + log.warn("You should set a valid maxSize (not {}) for this policy", maxSize); + return Optional.empty(); + } + if (allowSpool(spoolName) == false) { + return Optional.empty(); + } + if (queuedJobs.size() > maxSize) { + throw new JobWatchdogPolicyWarning("The queue size for the spool " + spoolName + + " is actually up to " + queuedJobs.size() + + ", but this policy allow only " + maxSize + " waiting job(s)."); + } + return Optional.ofNullable(checkTime).or(() -> Optional.ofNullable(DEFAULT_CHECKTIME)); + } + + @Override + public void isStatusOk(final String spoolName, + final WatchableSpoolJobState activeJob, + final Set queuedJobs, + final Set relativeBackgroundServices) throws JobWatchdogPolicyWarning { + isStatusOk(spoolName, activeJob, queuedJobs); + } + + @Override + public String getDescription() { + final var allow = getOnlySpools(); + if (allow.isEmpty() == false) { + return THE_QUEUE_SIZE_SHOULD_NOT_FILL_UP_TO + maxSize + " queued jobs, only for spools " + allow; + } + + final var deny = getNotSpools(); + if (deny.isEmpty() == false) { + return THE_QUEUE_SIZE_SHOULD_NOT_FILL_UP_TO + maxSize + " queued jobs, but not for spools " + allow; + } + + return THE_QUEUE_SIZE_SHOULD_NOT_FILL_UP_TO + maxSize + " queued jobs"; + } + +} diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/WatchableBackgroundService.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/WatchableBackgroundService.java new file mode 100644 index 00000000..1c810a7a --- /dev/null +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/WatchableBackgroundService.java @@ -0,0 +1,22 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +public record WatchableBackgroundService(String serviceName, + String spoolName, + long timedInterval) { +} diff --git a/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/WatchableSpoolJobState.java b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/WatchableSpoolJobState.java new file mode 100644 index 00000000..14ee5a76 --- /dev/null +++ b/jobkit/engine/src/main/java/tv/hd3g/jobkit/engine/watchdog/WatchableSpoolJobState.java @@ -0,0 +1,33 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +import java.time.Duration; +import java.util.Date; +import java.util.Optional; + +public record WatchableSpoolJobState(Date createdDate, + String commandName, + long createdIndex, + Optional creator, + Optional startedDate) { + + public Optional getRunTime() { + return startedDate.map(s -> System.currentTimeMillis() - s).map(Duration::ofMillis); + } + +} diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/AtomicComputeReferenceTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/AtomicComputeReferenceTest.java index 523bfba6..8ffb3daf 100644 --- a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/AtomicComputeReferenceTest.java +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/AtomicComputeReferenceTest.java @@ -20,10 +20,14 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.UnaryOperator; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -45,11 +49,13 @@ interface Item { @Mock Item item; @Mock - Object item2; + Item item2; @Mock Function process; @Mock Predicate predicate; + @Mock + UnaryOperator replace; @BeforeEach void init() throws Exception { @@ -59,7 +65,7 @@ void init() throws Exception { @AfterEach void end() { - Mockito.verifyNoMoreInteractions(item, item2, process, predicate); + Mockito.verifyNoMoreInteractions(item, item2, process, predicate, replace); } @Test @@ -86,16 +92,16 @@ void testSetAnd() { } @Test - void testReset() { + void testIsSet() { assertFalse(acr.isSet()); acr.set(item); assertTrue(acr.isSet()); } @Test - void testIsSet() { + void testReset() { acr.set(item); - acr.reset(); + assertEquals(item, acr.reset()); assertNull(acr.get()); } @@ -132,4 +138,14 @@ void testComputePredicate() { })); assertEquals(item, ref.get()); } + + @Test + void testReplace() { + acr.set(item); + when(replace.apply(item)).thenReturn(item2); + acr.replace(replace); + verify(replace, times(1)).apply(item); + assertEquals(item2, acr.get()); + } + } diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/BackgroundServiceEventTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/BackgroundServiceEventTest.java index cb14e783..66215907 100644 --- a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/BackgroundServiceEventTest.java +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/BackgroundServiceEventTest.java @@ -2,9 +2,15 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.longThat; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.internal.verification.VerificationModeFactory.atLeast; import static org.mockito.internal.verification.VerificationModeFactory.times; @@ -26,6 +32,8 @@ class BackgroundServiceEventTest { @Mock BackgroundServiceEvent event; + @Mock + JobKitWatchdog jobKitWatchdog; ScheduledExecutorService scheduledExecutorService; CountDownLatch latch; @@ -47,23 +55,29 @@ void init() throws Exception { endLatch = new CountDownLatch(1); name = String.valueOf(System.nanoTime()); spoolName = String.valueOf(System.nanoTime()); - spooler = new Spooler(new ExecutionEvent() {}, new SupervisableEvents() {}); + spooler = new Spooler(new ExecutionEvent() {}, new SupervisableEvents() {}, jobKitWatchdog); error = new AtomicReference<>(); - service = new BackgroundService(name, spoolName, spooler, scheduledExecutorService, event, () -> { - latch.await(500, MILLISECONDS); - final var e = error.get(); - if (e != null) { - throw e; - } - }, () -> { - endLatch.await(100, MILLISECONDS); - }); + service = new BackgroundService(name, spoolName, spooler, scheduledExecutorService, event, jobKitWatchdog, + () -> { + latch.await(500, MILLISECONDS); + final var e = error.get(); + if (e != null) { + throw e; + } + }, () -> { + endLatch.await(100, MILLISECONDS); + }); service.setTimedInterval(interval, TimeUnit.MILLISECONDS); } @AfterEach void close() { + verify(jobKitWatchdog, atMost(100)).addJob(any(WatchableSpoolJob.class)); + verify(jobKitWatchdog, atMost(100)).startJob(any(WatchableSpoolJob.class), anyLong()); + verify(jobKitWatchdog, atMost(100)).endJob(any(WatchableSpoolJob.class)); + verifyNoMoreInteractions(event, jobKitWatchdog); + service.disable(); spooler.shutdown(Set.of()); scheduledExecutorService.shutdown(); @@ -82,6 +96,17 @@ void completeCycle_scheduleNextBackgroundServiceTask() throws InterruptedExcepti Thread.sleep(10 * interval);// NOSONAR verifyScheduleNextBackgroundServiceTask(atLeast(2)); + + verify(event, times(1)).onChangeEnabled(eq(name), eq(spoolName), any(boolean.class)); + verify(event, atLeast(1)).onChangeTimedInterval(name, spoolName, interval); + verify(event, atLeast(1)).scheduleNextBackgroundServiceTask( + anyString(), anyString(), anyInt(), anyLong()); + verify(event, atLeast(1)).planNextExec(eq(name), eq(spoolName), longThat(m -> m >= interval - 1l)); + verify(event, atLeast(1)).nextBackgroundServiceTask(name, spoolName, 0); + reset(event); + + verify(jobKitWatchdog, atLeast(1)) + .refreshBackgroundService(eq(name), eq(spoolName), any(boolean.class), longThat(m -> m >= interval)); } @Test @@ -92,7 +117,15 @@ void completeCycle_nextBackgroundServiceTask() throws InterruptedException { Thread.sleep(10 * interval);// NOSONAR - verify(event, atLeast(1)).nextBackgroundServiceTask(name, spoolName, 0); + verify(event, atLeast(0)).nextBackgroundServiceTask(name, spoolName, 0); + verify(event, times(1)).onChangeEnabled(eq(name), eq(spoolName), any(boolean.class)); + verify(event, atLeast(1)).onChangeTimedInterval(name, spoolName, interval); + verify(event, atLeast(1)).scheduleNextBackgroundServiceTask( + eq(name), eq(spoolName), eq(0), longThat(m -> m >= interval)); + verify(event, atLeast(1)).planNextExec(eq(name), eq(spoolName), longThat(m -> m >= interval)); + + verify(jobKitWatchdog, atLeast(1)) + .refreshBackgroundService(eq(name), eq(spoolName), any(boolean.class), longThat(m -> m >= interval)); } @Test @@ -115,7 +148,17 @@ void completeCycle_planNextExec() throws InterruptedException { service.setRetryAfterTimeFactor(2); Thread.sleep(10 * interval);// NOSONAR - verify(event, atLeast(1)).planNextExec(eq(name), eq(spoolName), longThat(m -> m > interval)); + verify(event, atLeast(1)).planNextExec(eq(name), eq(spoolName), longThat(m -> m >= interval)); + verify(event, atLeast(1)).onChangeTimedInterval(name, spoolName, interval); + verify(event, atLeast(1)).onChangeEnabled(name, spoolName, true); + verify(event, atLeast(1)).scheduleNextBackgroundServiceTask( + eq(name), eq(spoolName), eq(0), longThat(m -> m >= interval)); + verify(event, atLeast(0)).nextBackgroundServiceTask(name, spoolName, 0); + verify(event, atLeast(1)).onPreviousRunWithError(name, spoolName, error.get()); + verify(event, atLeast(1)).onChangeRetryAfterTimeFactor(name, spoolName, 2); + + verify(jobKitWatchdog, atLeast(1)) + .refreshBackgroundService(eq(name), eq(spoolName), any(boolean.class), longThat(m -> m >= interval)); } @Test @@ -129,7 +172,17 @@ void completeCycle_onPreviousRunWithError() throws InterruptedException { error.set(new RuntimeException("This is a test error")); Thread.sleep(10 * interval);// NOSONAR - verify(event, atLeast(1)).onPreviousRunWithError(name, spoolName, error.get()); + verify(event, atLeast(1)).onPreviousRunWithError(any(), any(), any(Exception.class)); + verify(event, times(1)).onChangeEnabled(eq(name), eq(spoolName), any(boolean.class)); + verify(event, atLeast(1)).onChangeTimedInterval(name, spoolName, interval); + verify(event, atLeast(1)).scheduleNextBackgroundServiceTask( + eq(name), eq(spoolName), eq(0), longThat(m -> m >= interval)); + verify(event, atLeast(1)).planNextExec(eq(name), eq(spoolName), longThat(m -> m >= interval - 1l)); + verify(event, atLeast(1)).nextBackgroundServiceTask(name, spoolName, 0); + reset(event); + + verify(jobKitWatchdog, atLeast(1)) + .refreshBackgroundService(eq(name), eq(spoolName), any(boolean.class), longThat(m -> m >= interval)); } @Test @@ -149,6 +202,13 @@ void completeCycle_onChangeTimedInterval() throws InterruptedException { service.setTimedInterval(interval * 3, TimeUnit.MILLISECONDS); verify(event, times(1)).onChangeTimedInterval(name, spoolName, interval * 3); + verify(event, times(2)).onChangeEnabled(eq(name), eq(spoolName), any(boolean.class)); + verify(event, atLeast(1)).onChangeTimedInterval(name, spoolName, interval); + verify(event, atLeast(1)).scheduleNextBackgroundServiceTask( + eq(name), eq(spoolName), eq(0), longThat(m -> m >= interval)); + + verify(jobKitWatchdog, atLeast(1)) + .refreshBackgroundService(eq(name), eq(spoolName), any(boolean.class), longThat(m -> m >= interval)); } @Test @@ -180,6 +240,13 @@ void completeCycle_onChangeEnabled() throws InterruptedException { verifyOnChangeEnabled(times(2), true); verify(event, times(4)).onChangeEnabled(eq(name), eq(spoolName), any(boolean.class)); + verify(event, atLeast(1)).onChangeTimedInterval(name, spoolName, interval); + verify(event, atLeast(1)).scheduleNextBackgroundServiceTask( + eq(name), eq(spoolName), eq(0), longThat(m -> m >= interval)); + verify(event, atMost(1)).nextBackgroundServiceTask(name, spoolName, 0); + + verify(jobKitWatchdog, atLeast(1)) + .refreshBackgroundService(eq(name), eq(spoolName), any(boolean.class), longThat(m -> m >= interval)); } @Test @@ -195,6 +262,14 @@ void completeCycle_onChangeRetryAfterTimeFactor() throws InterruptedException { verifyOnChangeRetryAfterTimeFactor(times(1), 4); verify(event, times(2)).onChangeRetryAfterTimeFactor(eq(name), eq(spoolName), any(double.class)); + verify(event, times(2)).onChangeEnabled(eq(name), eq(spoolName), any(boolean.class)); + verify(event, atLeast(1)).onChangeTimedInterval(name, spoolName, interval); + verify(event, atMost(1)).nextBackgroundServiceTask(name, spoolName, 0); + verify(event, atLeast(1)).scheduleNextBackgroundServiceTask( + eq(name), eq(spoolName), eq(0), longThat(m -> m >= interval)); + + verify(jobKitWatchdog, atLeast(1)) + .refreshBackgroundService(eq(name), eq(spoolName), any(boolean.class), longThat(m -> m >= interval)); } private void verifyScheduleNextBackgroundServiceTask(final VerificationMode mode) { diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/BackgroundServiceTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/BackgroundServiceTest.java index baa240df..16fc55ba 100644 --- a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/BackgroundServiceTest.java +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/BackgroundServiceTest.java @@ -49,6 +49,8 @@ class BackgroundServiceTest { ScheduledFuture nextRunReference; @Mock SpoolExecutor spoolExecutor; + @Mock + JobKitWatchdog jobKitWatchdog; @Captor ArgumentCaptor commandCaptor; @@ -69,7 +71,7 @@ void init() throws Exception { spoolName = String.valueOf(System.nanoTime()); timedInterval = Math.abs(random.nextLong()); backgroundService = new BackgroundService( - name, spoolName, spooler, scheduledExecutor, event, task, disableTask); + name, spoolName, spooler, scheduledExecutor, event, jobKitWatchdog, task, disableTask); when(scheduledExecutor.schedule(any(Runnable.class), eq(timedInterval), eq(MILLISECONDS))) .then(invocation -> nextRunReference); diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/FlatScheduledExecutorServiceTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/FlatScheduledExecutorServiceTest.java index 342b5e89..d4b69019 100644 --- a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/FlatScheduledExecutorServiceTest.java +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/FlatScheduledExecutorServiceTest.java @@ -21,10 +21,16 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; +import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -33,19 +39,26 @@ class FlatScheduledExecutorServiceTest { - final Callable callable = null; + FlatScheduledExecutorService service; + + @Mock + Callable callable; @Mock FlatScheduledFuture runReference; - - FlatScheduledExecutorService service; + @Mock Runnable task; + @Mock + Object object; @BeforeEach void init() throws Exception { MockitoAnnotations.openMocks(this).close(); service = new FlatScheduledExecutorService(); - task = () -> { - }; + } + + @AfterEach + void end() { + verifyNoMoreInteractions(callable, runReference, task, object); } @Test @@ -110,37 +123,41 @@ void testShutdown() { @Test void testShutdownNow() { - assertThrows(UnsupportedOperationException.class, () -> service.shutdownNow()); + assertEquals(List.of(), service.shutdownNow()); } @Test void testIsShutdown() { - assertThrows(UnsupportedOperationException.class, () -> service.isShutdown()); + assertFalse(service.isShutdown()); } @Test void testIsTerminated() { - assertThrows(UnsupportedOperationException.class, () -> service.isTerminated()); + assertFalse(service.isShutdown()); } @Test - void testAwaitTermination() { - assertThrows(UnsupportedOperationException.class, () -> service.awaitTermination(0, TimeUnit.DAYS)); + void testAwaitTermination() throws InterruptedException { + assertTrue(service.awaitTermination(0, TimeUnit.DAYS)); } @Test - void testSubmitCallableOfT() { - assertThrows(UnsupportedOperationException.class, () -> service.submit(callable)); + void testSubmitCallableOfT() throws Exception { + when(callable.call()).thenReturn(object); + assertEquals(object, service.submit(callable).get()); + verify(callable, times(1)).call(); } @Test - void testSubmitRunnableT() { - assertThrows(UnsupportedOperationException.class, () -> service.submit(task, null)); + void testSubmitRunnableT() throws InterruptedException, ExecutionException { + assertEquals(object, service.submit(task, object).get()); + verify(task, times(1)).run(); } @Test void testSubmitRunnable() { - assertThrows(UnsupportedOperationException.class, () -> service.submit(task)); + service.submit(task); + verify(task, times(1)).run(); } @Test @@ -165,6 +182,7 @@ void testInvokeAnyCollectionOfQextendsCallableOfTLongTimeUnit() { @Test void testExecute() { - assertThrows(UnsupportedOperationException.class, () -> service.execute(task)); + service.execute(task); + verify(task, times(1)).run(); } } diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/JobKitWatchdogTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/JobKitWatchdogTest.java new file mode 100644 index 00000000..31bfee4a --- /dev/null +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/JobKitWatchdogTest.java @@ -0,0 +1,527 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.openMocks; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; + +import net.datafaker.Faker; +import tv.hd3g.jobkit.engine.watchdog.JobWatchdogPolicy; +import tv.hd3g.jobkit.engine.watchdog.JobWatchdogPolicyWarning; +import tv.hd3g.jobkit.engine.watchdog.JobWatchdogSpoolReport; +import tv.hd3g.jobkit.engine.watchdog.WatchableBackgroundService; +import tv.hd3g.jobkit.engine.watchdog.WatchableSpoolJobState; + +class JobKitWatchdogTest { + + private static Faker faker = net.datafaker.Faker.instance(); + + JobKitWatchdog w; + + String serviceNameLow; + String serviceName; + String serviceNameHigh; + String spoolName; + long timedInterval; + String commandName; + long startedDate; + long createdIndex; + long durationToQueue; + + @Mock + ScheduledExecutorService sch; + @Mock + ScheduledFuture scheduledFuture; + @Mock + SupervisableEvents supervisableEvents; + @Mock + JobWatchdogPolicy policy; + @Mock + WatchableSpoolJob job; + @Mock + WatchableSpoolJob waitJob; + @Mock + SpoolExecutor spoolExecutor; + @Mock + Optional creator; + @Captor + ArgumentCaptor run; + @Captor + ArgumentCaptor activeJobCaptor; + @Captor + ArgumentCaptor> queuedJobsCaptor; + @Captor + ArgumentCaptor> relativeBackgroundServicesCaptor; + @Captor + ArgumentCaptor reportCaptor; + + @BeforeEach + void init() throws Exception { + openMocks(this).close(); + w = new JobKitWatchdog(supervisableEvents, sch); + serviceNameLow = faker.numerify("serviceNameLow###"); + serviceName = faker.numerify("serviceName###"); + serviceNameHigh = faker.numerify("serviceNameHigh###"); + spoolName = faker.numerify("spoolName###"); + timedInterval = faker.random().nextLong(1000, 10000000); + durationToQueue = faker.random().nextLong(1000, 10000000); + + when(policy.getDescription()).thenReturn(faker.numerify("description###")); + + when(sch.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))).thenReturn( + (ScheduledFuture) scheduledFuture); + + commandName = faker.numerify("commandName###"); + createdIndex = faker.random().nextLong(); + startedDate = System.currentTimeMillis(); + + when(job.getCommandName()).thenReturn(commandName); + when(job.getSpoolName()).thenReturn(spoolName); + when(job.getCreatedIndex()).thenReturn(createdIndex); + when(job.getCreator()).thenReturn(creator); + when(job.getExecutorReferer()).thenReturn(spoolExecutor); + + when(waitJob.getCommandName()).thenReturn(commandName); + when(waitJob.getSpoolName()).thenReturn(spoolName); + when(waitJob.getCreatedIndex()).thenReturn(createdIndex + 1l); + when(waitJob.getCreator()).thenReturn(creator); + when(waitJob.getExecutorReferer()).thenReturn(spoolExecutor); + } + + @AfterEach + void end() { + verify(policy, atLeast(0)).getDescription(); + verify(scheduledFuture, atLeast(0)).isDone(); + verify(scheduledFuture, atLeast(0)).isCancelled(); + verify(scheduledFuture, atLeast(0)).getDelay(MILLISECONDS); + + verifyNoMoreInteractions(sch, scheduledFuture, supervisableEvents, policy, spoolExecutor, creator); + } + + @Test + void testAddGetPolicies() { + final var actual = w.getPolicies(); + assertEquals(List.of(), actual); + assertThrows(UnsupportedOperationException.class, () -> actual.add(policy)); + w.addPolicies(policy); + assertTrue(w.getPolicies().contains(policy)); + assertFalse(actual.contains(policy)); + } + + @Nested + class OkPolicyOneShot { + + @BeforeEach + void init() throws Exception { + w.addPolicies(policy); + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void testRefreshBackgroundService(final boolean enabled) { + w.refreshBackgroundService(serviceName, spoolName, enabled, timedInterval); + verify(sch, times(1)).execute(run.capture()); + run.getValue().run(); + } + + @Test + void testRefreshBackgroundService_add_enabled() { + w.refreshBackgroundService(serviceNameLow, spoolName, true, timedInterval / 2l); + w.refreshBackgroundService(serviceNameHigh, spoolName, true, timedInterval * 2l); + w.refreshBackgroundService(serviceName, spoolName, true, timedInterval); + + verify(sch, times(3)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + } + + @Test + void testRefreshBackgroundService_addmultiple_enabled() { + w.refreshBackgroundService(serviceNameLow, spoolName, true, timedInterval / 2l); + w.refreshBackgroundService(serviceNameHigh, spoolName, true, timedInterval * 2l); + w.refreshBackgroundService(serviceName, spoolName, true, timedInterval); + w.refreshBackgroundService(serviceNameLow, spoolName, true, timedInterval / 2l); + w.refreshBackgroundService(serviceNameHigh, spoolName, false, timedInterval * 2l); + w.refreshBackgroundService(serviceNameHigh, spoolName, true, timedInterval * 2l); + + verify(sch, times(6)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + } + + @Test + void testAddJob() { + w.addJob(job); + verify(sch, times(1)).execute(run.capture()); + run.getValue().run(); + } + + @Test + void testAddStartJob() throws JobWatchdogPolicyWarning { + w.addJob(job); + w.addJob(waitJob); + w.startJob(job, startedDate); + verify(sch, times(3)).execute(run.capture()); + run.getValue().run(); + verify(policy, times(1)).isStatusOk(eq(spoolName), activeJobCaptor.capture(), queuedJobsCaptor.capture()); + + final var activeJob = activeJobCaptor.getValue(); + assertEquals(commandName, activeJob.commandName()); + assertTrue(activeJob.createdDate().getTime() <= startedDate + 100); + assertEquals(createdIndex, activeJob.createdIndex()); + assertTrue(activeJob.getRunTime().isPresent()); + assertEquals(startedDate, activeJob.startedDate().get()); + assertEquals(creator, activeJob.creator()); + + final var queuedJobs = queuedJobsCaptor.getValue(); + assertFalse(queuedJobs.isEmpty()); + final var queuedJob = queuedJobs.stream().findFirst().get(); + assertEquals(commandName, queuedJob.commandName()); + assertTrue(queuedJob.createdDate().getTime() <= startedDate + 100); + assertEquals(createdIndex + 1, queuedJob.createdIndex()); + assertFalse(queuedJob.getRunTime().isPresent()); + assertFalse(queuedJob.startedDate().isPresent()); + assertEquals(creator, queuedJob.creator()); + } + + @Test + void testAddStartEndJob() throws JobWatchdogPolicyWarning { + w.addJob(job); + w.startJob(job, startedDate); + w.endJob(job); + + verify(sch, times(3)).execute(run.capture()); + run.getValue().run(); + } + + } + + @Nested + class OkPolicyRegular extends OkPolicyOneShot { + + @Override + @BeforeEach + void init() throws Exception { + super.init(); + when(policy.isStatusOk(eq(spoolName), any(WatchableSpoolJobState.class), anySet())) + .thenReturn(Optional.ofNullable(Duration.ofMillis(durationToQueue))); + } + + @Override + @Test + void testAddStartJob() throws JobWatchdogPolicyWarning { + super.testAddStartJob(); + verify(sch, times(1)).schedule(any(Runnable.class), eq(durationToQueue), eq(MILLISECONDS)); + } + + @Test + void testAddStartEndJob_notLowerDurationToQueue() throws JobWatchdogPolicyWarning { + when(scheduledFuture.isDone()).thenReturn(true); + when(scheduledFuture.getDelay(MILLISECONDS)).thenReturn(durationToQueue * 2l); + + w.addJob(job); + verify(sch, times(1)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + + w.addJob(waitJob); + verify(sch, times(2)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + when(scheduledFuture.isDone()).thenReturn(false); + + w.startJob(job, startedDate); + verify(sch, times(3)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + verify(scheduledFuture, times(5)).cancel(false); + + w.endJob(job); + verify(sch, times(4)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + verify(policy, times(6)).isStatusOk(eq(spoolName), any(WatchableSpoolJobState.class), argThat(s -> s.size() == 1)); + verify(sch, times(6)).schedule(any(Runnable.class), eq(durationToQueue), eq(MILLISECONDS)); + } + + @Test + void testAddStartEndJob_lowerDurationToQueue() throws JobWatchdogPolicyWarning { + when(scheduledFuture.isDone()).thenReturn(false); + when(scheduledFuture.getDelay(MILLISECONDS)).thenReturn(durationToQueue / 2l); + + w.addJob(job); + verify(sch, times(1)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.addJob(waitJob); + verify(sch, times(2)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.startJob(job, startedDate); + verify(sch, times(3)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.endJob(job); + verify(sch, times(4)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + verify(policy, times(6)).isStatusOk(eq(spoolName), any(WatchableSpoolJobState.class), argThat(s -> s.size() == 1)); + verify(sch, times(1)).schedule(any(Runnable.class), eq(durationToQueue), eq(MILLISECONDS)); + } + + @Test + void testAddStartEndJob_Service() throws JobWatchdogPolicyWarning { + w.refreshBackgroundService(serviceNameLow, spoolName, true, timedInterval / 2l); + w.refreshBackgroundService(serviceNameHigh, spoolName, true, timedInterval * 2l); + w.refreshBackgroundService(serviceName, spoolName, true, timedInterval); + + verify(sch, times(3)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.addJob(job); + verify(sch, times(4)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.addJob(waitJob); + verify(sch, times(5)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.startJob(job, startedDate); + verify(sch, times(6)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.endJob(job); + verify(sch, times(7)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + verify(sch, times(1)).schedule(any(Runnable.class), eq(timedInterval / 2), eq(MILLISECONDS)); + verify(policy, times(18)) + .isStatusOk( + eq(spoolName), + any(WatchableSpoolJobState.class), + argThat(s -> s.size() == 1), + argThat(s -> s.size() == 3)); + } + + @Test + void testShutdown() throws JobWatchdogPolicyWarning { + super.testAddStartJob(); + w.refreshBackgroundService(serviceName, spoolName, true, timedInterval); + verify(sch, times(4)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + verify(sch, times(1)).schedule(any(Runnable.class), eq(durationToQueue), eq(MILLISECONDS)); + + w.shutdown(); + verify(scheduledFuture, times(1)).cancel(true); + + verify(policy, times(7)).isStatusOk(eq(spoolName), any(WatchableSpoolJobState.class), anySet(), anySet()); + } + + } + + @Test + void testShutdown_notRun() { + w.shutdown(); + w.refreshBackgroundService(serviceName, spoolName, true, timedInterval); + w.addJob(job); + w.addJob(waitJob); + w.startJob(job, startedDate); + w.endJob(job); + + verify(sch, times(5)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.shutdown(); + } + + @Nested + class KoPolicy { + + JobWatchdogPolicyWarning warning; + JobWatchdogSpoolReport report; + AtomicInteger count; + + @BeforeEach + void init() throws Exception { + w.addPolicies(policy); + warning = new JobWatchdogPolicyWarning(faker.numerify("warningMessage###")); + count = new AtomicInteger(0); + } + + @Test + void testAddStartReleaseEndJob() throws JobWatchdogPolicyWarning { + final var now = System.currentTimeMillis(); + when(policy.isStatusOk(eq(spoolName), any(WatchableSpoolJobState.class), anySet())) + .thenAnswer(invocation -> { + if (count.getAndAdd(1) <= 1) { + throw warning; + } + return Optional.ofNullable(Duration.ofMillis(durationToQueue)); + }); + + w.addJob(job); + verify(sch, times(1)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.addJob(waitJob); + verify(sch, times(2)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.startJob(job, startedDate); + verify(sch, times(3)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.endJob(job); + verify(sch, times(4)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + verify(policy, times(6)).isStatusOk( + eq(spoolName), any(WatchableSpoolJobState.class), argThat(s -> s.size() == 1)); + verify(sch, times(1)).schedule(any(Runnable.class), eq(durationToQueue), eq(MILLISECONDS)); + + verify(supervisableEvents, times(1)).onJobWatchdogSpoolReport(reportCaptor.capture()); + final var report = reportCaptor.getValue(); + + assertEquals(commandName, report.activeJob().commandName()); + assertEquals(createdIndex, report.activeJob().createdIndex()); + assertEquals(startedDate, report.activeJob().startedDate().get()); + assertEquals(creator, report.activeJob().creator()); + assertTrue(now <= report.activeJob().createdDate().getTime()); + assertTrue(System.currentTimeMillis() >= report.activeJob().createdDate().getTime()); + assertEquals(spoolName, report.spoolName()); + assertTrue(report.relativeBackgroundServices().isEmpty()); + assertEquals(policy, report.policy()); + assertTrue(now <= report.created().getTime()); + assertTrue(System.currentTimeMillis() >= report.created().getTime()); + assertTrue(System.currentTimeMillis() - startedDate <= report.activeJob().getRunTime().get().toMillis()); + assertEquals(warning, report.warning()); + + assertFalse(report.queuedJobs().isEmpty()); + final var queued = report.queuedJobs().stream().findFirst().get(); + assertTrue(queued.getRunTime().isEmpty()); + assertTrue(queued.startedDate().isEmpty()); + assertEquals(createdIndex + 1, queued.createdIndex()); + + verify(supervisableEvents, times(1)).onJobWatchdogSpoolReleaseReport(reportCaptor.capture()); + assertEquals(report, reportCaptor.getValue()); + } + + @Test + void testAddStartReleaseEndJob_service() throws JobWatchdogPolicyWarning { + final var now = System.currentTimeMillis(); + doAnswer(invocation -> { + if (count.getAndAdd(1) <= 1) { + throw warning; + } + return null; + }).when(policy).isStatusOk(eq(spoolName), any(WatchableSpoolJobState.class), anySet(), anySet()); + when(scheduledFuture.isDone()).thenReturn(true); + + w.refreshBackgroundService(serviceName, spoolName, true, timedInterval); + verify(sch, times(1)).execute(run.capture()); + + w.addJob(job); + verify(sch, times(2)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.addJob(waitJob); + verify(sch, times(3)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.startJob(job, startedDate); + verify(sch, times(4)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + w.endJob(job); + verify(sch, times(5)).execute(run.capture()); + run.getAllValues().forEach(Runnable::run); + + verify(policy, times(10)).isStatusOk( + eq(spoolName), + any(WatchableSpoolJobState.class), + argThat(s -> s.size() == 1), + argThat(s -> s.size() == 1)); + verify(sch, times(8)).schedule(any(Runnable.class), eq(timedInterval), eq(MILLISECONDS)); + + verify(supervisableEvents, times(1)).onJobWatchdogSpoolReport(reportCaptor.capture()); + final var report = reportCaptor.getValue(); + + assertEquals(commandName, report.activeJob().commandName()); + assertEquals(createdIndex, report.activeJob().createdIndex()); + assertEquals(startedDate, report.activeJob().startedDate().get()); + assertEquals(creator, report.activeJob().creator()); + assertTrue(now <= report.activeJob().createdDate().getTime()); + assertTrue(System.currentTimeMillis() >= report.activeJob().createdDate().getTime()); + assertEquals(spoolName, report.spoolName()); + + assertEquals(1, report.relativeBackgroundServices().size()); + final var service = report.relativeBackgroundServices().stream().findFirst().get(); + assertEquals(serviceName, service.serviceName()); + assertEquals(spoolName, service.spoolName()); + assertEquals(timedInterval, service.timedInterval()); + + assertEquals(policy, report.policy()); + assertTrue(now <= report.created().getTime()); + assertTrue(System.currentTimeMillis() >= report.created().getTime()); + assertTrue(System.currentTimeMillis() - startedDate <= report.activeJob().getRunTime().get().toMillis()); + assertEquals(warning, report.warning()); + + assertFalse(report.queuedJobs().isEmpty()); + final var queued = report.queuedJobs().stream().findFirst().get(); + assertTrue(queued.getRunTime().isEmpty()); + assertTrue(queued.startedDate().isEmpty()); + assertEquals(createdIndex + 1, queued.createdIndex()); + + verify(supervisableEvents, times(1)).onJobWatchdogSpoolReleaseReport(reportCaptor.capture()); + assertEquals(report, reportCaptor.getValue()); + } + } + +} diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/SpoolExecutorTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/SpoolExecutorTest.java index e2d4e75c..d3424c65 100644 --- a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/SpoolExecutorTest.java +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/SpoolExecutorTest.java @@ -6,9 +6,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.longThat; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.internal.verification.VerificationModeFactory.times; @@ -34,6 +36,7 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.mockito.verification.VerificationMode; import net.datafaker.Faker; @@ -45,6 +48,8 @@ class SpoolExecutorTest { ExecutionEvent event; @Mock SupervisableEvents sEvent; + @Mock + JobKitWatchdog jobKitWatchdog; @Captor ArgumentCaptor> oExceptionCaptor; @@ -64,12 +69,12 @@ void init() throws Exception { spoolExecutorName = "Internal test Spool executor"; threadName = "SpoolExecutor #0"; threadCount = new AtomicLong(); - spoolExecutor = new SpoolExecutor(spoolExecutorName, event, threadCount, sEvent); + spoolExecutor = new SpoolExecutor(spoolExecutorName, event, threadCount, sEvent, jobKitWatchdog); } @AfterEach void ends() { - verifyNoMoreInteractions(sEvent); + verifyNoMoreInteractions(sEvent, jobKitWatchdog); for (var pos = 0; pos < runnedTasks.size(); pos++) { assertEquals(pos, runnedTasks.get(pos)); @@ -119,6 +124,8 @@ void testAddToQueue() throws InterruptedException { verify(event, times(0)).shutdownSpooler(any(Supervisable.class)); checkSupervisableEventOnEnd(3, true); + + checkWatchdog(1); } @Test @@ -156,6 +163,8 @@ void testAddToQueue_withError() throws InterruptedException { verify(event, times(0)).shutdownSpooler(any(Supervisable.class)); verify(sEvent, times(total * 4)).onEnd(any(), any()); + + checkWatchdog(total); } @Test @@ -178,6 +187,8 @@ void testAddToQueue_multiple() throws InterruptedException { assertEquals(total, count.get()); checkSupervisableEventOnEnd(total * 3, true); + + checkWatchdog(total); } @Test @@ -199,6 +210,8 @@ void testAddToQueue_onebyone() throws InterruptedException { assertEquals(total, count.get()); checkSupervisableEventOnEnd(total * 3, true); + + checkWatchdog(total); } @Test @@ -221,6 +234,8 @@ void testAddToQueue_afterRunError() throws InterruptedException { assertTrue(smCmd1.await(10, SECONDS)); verify(sEvent, atLeast(7)).onEnd(any(), any()); + + checkWatchdog(2); } @Test @@ -251,6 +266,8 @@ void testGetQueueSize() throws InterruptedException { verify(event, times(0)).shutdownSpooler(any(Supervisable.class)); checkSupervisableEventOnEnd(7, true); + + checkWatchdog(2); } @Test @@ -274,6 +291,8 @@ void testIsRunning() throws InterruptedException { assertFalse(spoolExecutor.isRunning()); checkSupervisableEventOnEnd(3, true); + + checkWatchdog(1); } @Test @@ -307,6 +326,8 @@ void testClean_purgeWaitList_noEmpty() { spoolExecutor.clean(true); assertTrue(count.get() / 2 < total); verify(sEvent, atLeast(1)).onEnd(any(), any()); + + checkWatchdog(atMost(1000)); } @Test @@ -339,6 +360,10 @@ void testClean_notPurgeWaitList() throws InterruptedException { assertTrue(smCmd.await(total * 10, SECONDS)); checkSupervisableEventOnEnd(total * 4, true); + + verify(jobKitWatchdog, times(total)).addJob(any(WatchableSpoolJob.class)); + verify(jobKitWatchdog, times(total)).startJob(any(WatchableSpoolJob.class), anyLong()); + verify(jobKitWatchdog, times(total)).endJob(any(WatchableSpoolJob.class)); } @Test @@ -394,6 +419,10 @@ class PJob { assertEquals(prioSort, dateSort); checkSupervisableEventOnEnd(count * 3, true); + + verify(jobKitWatchdog, times(count)).addJob(any(WatchableSpoolJob.class)); + verify(jobKitWatchdog, times(count)).startJob(any(WatchableSpoolJob.class), anyLong()); + verify(jobKitWatchdog, atMost(count)).endJob(any(WatchableSpoolJob.class)); } private void checkSupervisableEventOnEnd(final int count, final boolean normalyDone) { @@ -444,6 +473,8 @@ void testAddToQueue_beforeStartError() throws InterruptedException { assertEquals(error, results.get(0).get()); assertFalse(results.get(1).isPresent()); assertFalse(results.get(2).isPresent()); + + checkWatchdog(1); } @Test @@ -467,6 +498,36 @@ void testAddToQueue_afterStartError() throws InterruptedException { assertFalse(results.get(0).isPresent()); assertFalse(results.get(1).isPresent()); assertEquals(error, results.get(2).get()); + checkWatchdog(atMost(1)); + } + + private void checkWatchdog(final VerificationMode mode) { + final var addCaptor = ArgumentCaptor.forClass(WatchableSpoolJob.class); + verify(jobKitWatchdog, mode).addJob(addCaptor.capture()); + + final var startCaptor = ArgumentCaptor.forClass(WatchableSpoolJob.class); + final var startTimeCaptor = ArgumentCaptor.forClass(Long.class); + verify(jobKitWatchdog, mode).startJob(startCaptor.capture(), startTimeCaptor.capture()); + + final var endCaptor = ArgumentCaptor.forClass(WatchableSpoolJob.class); + verify(jobKitWatchdog, mode).endJob(endCaptor.capture()); + + final var addCaptorValues = addCaptor.getAllValues(); + final var startCaptorValues = startCaptor.getAllValues(); + final var endCaptorValues = endCaptor.getAllValues(); + + final var max = Math.min(addCaptorValues.size(), Math.min(startCaptorValues.size(), endCaptorValues.size())); + for (var pos = 0; pos < max; pos++) { + assertEquals(addCaptorValues.get(pos), startCaptorValues.get(pos)); + assertEquals(endCaptorValues.get(pos), addCaptorValues.get(pos)); + final var startTime = startTimeCaptor.getAllValues().get(pos); + assertTrue(startTime > 0l); + } + + } + + private void checkWatchdog(final int total) { + checkWatchdog(atLeast(total - 1)); } } diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/SpoolerTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/SpoolerTest.java index a2e1abd1..9acfcffd 100644 --- a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/SpoolerTest.java +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/SpoolerTest.java @@ -7,16 +7,21 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static tv.hd3g.jobkit.engine.RunnableWithException.nothing; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import net.datafaker.Faker; @@ -27,6 +32,8 @@ class SpoolerTest { @Mock ExecutionEvent event; + @Mock + JobKitWatchdog jobKitWatchdog; Spooler spooler; String a; @@ -38,7 +45,12 @@ void init() throws Exception { a = faker.numerify("spoolA###"); b = faker.numerify("spoolB###"); - spooler = new Spooler(event, new SupervisableEvents() {}); + spooler = new Spooler(event, new SupervisableEvents() {}, jobKitWatchdog); + } + + @AfterEach + void end() { + verifyNoMoreInteractions(event, jobKitWatchdog); } @Test @@ -80,9 +92,16 @@ void testShutdown_noKeepRun() { })); assertNull(spooler.getExecutor(b)); - verify(event, Mockito.times(1)).shutdownSpooler(any(Supervisable.class)); + verify(event, times(1)).shutdownSpooler(any(Supervisable.class)); spooler.shutdown(Set.of()); + + verify(event, atLeast(1)).beforeStart(anyString(), anyLong(), any()); + verify(event, atLeast(1)).afterRunCorrectly(anyString(), anyLong(), anyLong(), any()); + verify(jobKitWatchdog, atLeast(1)).addJob(any(WatchableSpoolJob.class)); + verify(jobKitWatchdog, atLeast(1)).startJob(any(WatchableSpoolJob.class), anyLong()); + verify(jobKitWatchdog, atLeast(1)).endJob(any(WatchableSpoolJob.class)); + verify(jobKitWatchdog, times(1)).shutdown(); } @Test @@ -110,7 +129,15 @@ void testShutdown_keepRun() { assertEquals(total * 2, countA.get()); assertTrue(total * 2 > countB.get()); - verify(event, Mockito.times(1)).shutdownSpooler(any(Supervisable.class)); + verify(event, times(1)).shutdownSpooler(any(Supervisable.class)); + + verify(event, atLeast(1)).beforeStart(anyString(), anyLong(), any()); + verify(event, atLeast(1)).afterRunCorrectly(anyString(), anyLong(), anyLong(), any()); + + verify(jobKitWatchdog, atLeast(1)).addJob(any(WatchableSpoolJob.class)); + verify(jobKitWatchdog, atLeast(1)).startJob(any(WatchableSpoolJob.class), anyLong()); + verify(jobKitWatchdog, atLeast(1)).endJob(any(WatchableSpoolJob.class)); + verify(jobKitWatchdog, times(1)).shutdown(); } @Test @@ -118,6 +145,7 @@ void testShutdown_empty() { spooler.shutdown(Set.of()); assertNull(spooler.getExecutor(b)); - verify(event, Mockito.times(1)).shutdownSpooler(any(Supervisable.class)); + verify(event, times(1)).shutdownSpooler(any(Supervisable.class)); + verify(jobKitWatchdog, times(1)).shutdown(); } } diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/SupervisableManagerTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/SupervisableManagerTest.java index fd8353a6..1ff5e2ce 100644 --- a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/SupervisableManagerTest.java +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/SupervisableManagerTest.java @@ -18,18 +18,29 @@ import static java.util.Optional.ofNullable; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.openMocks; import static org.mockito.internal.verification.VerificationModeFactory.times; +import static tv.hd3g.jobkit.engine.SupervisableResultState.WORKS_DONE; +import java.time.Duration; +import java.util.Arrays; +import java.util.Date; import java.util.List; import java.util.Optional; +import java.util.Set; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -39,6 +50,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import net.datafaker.Faker; +import tv.hd3g.jobkit.engine.watchdog.JobWatchdogPolicy; +import tv.hd3g.jobkit.engine.watchdog.JobWatchdogPolicyWarning; +import tv.hd3g.jobkit.engine.watchdog.JobWatchdogSpoolReport; +import tv.hd3g.jobkit.engine.watchdog.WatchableBackgroundService; +import tv.hd3g.jobkit.engine.watchdog.WatchableSpoolJobState; class SupervisableManagerTest { static Faker faker = Faker.instance(); @@ -112,7 +128,7 @@ void testGetBusinessObject() throws JsonProcessingException, IllegalArgumentExce void testGetLifeCycle_onEnd() { when(supervisable.getEndEvent(oError, name)).thenReturn(ofNullable(supervisableEndEvent)); s.registerOnEndEventConsumer(eventConsumer); - + s.onEnd(supervisable, oError); verify(eventConsumer, times(1)).afterProcess(supervisableEndEvent); @@ -195,4 +211,169 @@ void testGetLifeCycle_onEnd_non_limited_retention() { verify(eventConsumer2, times(iterations)).afterProcess(supervisableEndEvent); } + private static class Policy implements JobWatchdogPolicy { + @Override + public void isStatusOk(final String spoolName, + final WatchableSpoolJobState activeJob, + final Set queuedJobs, + final Set relativeBackgroundServices) throws JobWatchdogPolicyWarning { + throw new UnsupportedOperationException(); + } + + @Override + public Optional isStatusOk(final String spoolName, + final WatchableSpoolJobState activeJob, + final Set queuedJobs) throws JobWatchdogPolicyWarning { + throw new UnsupportedOperationException(); + } + + @Override + public String getDescription() { + return faker.numerify("policyDescription###"); + } + } + + @Nested + class Report { + + JobWatchdogSpoolReport report; + + String spoolName; + WatchableSpoolJobState activeJob; + Date createdDate; + Set queuedJobs; + JobWatchdogPolicyWarning warning; + Set relativeBackgroundServices; + JobWatchdogPolicy policy; + SupervisableEndEvent endEvent; + + WatchableSpoolJobState queuedJob0; + WatchableSpoolJobState queuedJob1; + WatchableBackgroundService service; + + @Captor + ArgumentCaptor supervisableEndEventCaptor; + + @BeforeEach + void init() throws Exception { + openMocks(this).close(); + s.registerOnEndEventConsumer(eventConsumer); + + spoolName = faker.numerify("spoolName###"); + createdDate = new Date(); + activeJob = new WatchableSpoolJobState( + createdDate, faker.numerify("commandName###"), -1, Optional.empty(), Optional.empty()); + warning = new JobWatchdogPolicyWarning(faker.numerify("warning###")); + queuedJobs = Set.of(); + relativeBackgroundServices = Set.of(); + policy = new Policy(); + + queuedJob0 = new WatchableSpoolJobState( + createdDate, faker.numerify("commandName###"), -1, Optional.empty(), Optional.empty()); + queuedJob1 = new WatchableSpoolJobState( + createdDate, faker.numerify("commandName###"), -1, Optional.empty(), Optional.empty()); + service = new WatchableBackgroundService(faker.numerify("serviceName###"), + spoolName, faker.random().nextLong(10, 10000)); + } + + @AfterEach + void end() { + assertTrue(endEvent.creationDate().getTime() > 0); + assertTrue(new Date().compareTo(endEvent.creationDate()) >= 0); + assertTrue(endEvent.startDate().compareTo(endEvent.creationDate()) >= 0); + assertTrue(endEvent.endDate().compareTo(endEvent.startDate()) >= 0); + + assertEquals(context, endEvent.context()); + assertTrue(endEvent.isInternalStateChangeMarked()); + assertTrue(endEvent.isNotTrivialMarked()); + assertFalse(endEvent.isSecurityMarked()); + assertTrue(endEvent.isTypeName(JobWatchdogSpoolReport.class.getName())); + assertTrue(endEvent.isUrgentMarked()); + assertEquals(WORKS_DONE, endEvent.result().state()); + assertFalse(endEvent.result().message().code().isEmpty()); + assertFalse(endEvent.result().message().defaultResult().isEmpty()); + assertFalse(Arrays.asList(endEvent.result().message().getVarsArray()).isEmpty()); + assertEquals(spoolName, endEvent.spoolName()); + assertEquals(activeJob.commandName(), endEvent.jobName()); + } + + @Test + void testWarnReport() { + report = new JobWatchdogSpoolReport( + createdDate, spoolName, activeJob, queuedJobs, policy, warning, relativeBackgroundServices); + when(objectMapper.valueToTree(report)).thenReturn(context); + + s.onJobWatchdogSpoolReport(report); + + verify(objectMapper, times(1)).valueToTree(report); + verify(eventConsumer, times(1)).afterProcess(supervisableEndEventCaptor.capture()); + endEvent = supervisableEndEventCaptor.getValue(); + + assertEquals(2, endEvent.steps().size()); + } + + @Test + void testWarnReportQueuedJob() { + queuedJobs = Set.of(queuedJob0); + report = new JobWatchdogSpoolReport( + createdDate, spoolName, activeJob, queuedJobs, policy, warning, relativeBackgroundServices); + when(objectMapper.valueToTree(report)).thenReturn(context); + + s.onJobWatchdogSpoolReport(report); + + verify(objectMapper, times(1)).valueToTree(report); + verify(eventConsumer, times(1)).afterProcess(supervisableEndEventCaptor.capture()); + endEvent = supervisableEndEventCaptor.getValue(); + + assertEquals(3, endEvent.steps().size()); + } + + @Test + void testWarnReportQueuedJobs() { + queuedJobs = Set.of(queuedJob0, queuedJob1); + report = new JobWatchdogSpoolReport( + createdDate, spoolName, activeJob, queuedJobs, policy, warning, relativeBackgroundServices); + when(objectMapper.valueToTree(report)).thenReturn(context); + + s.onJobWatchdogSpoolReport(report); + + verify(objectMapper, times(1)).valueToTree(report); + verify(eventConsumer, times(1)).afterProcess(supervisableEndEventCaptor.capture()); + endEvent = supervisableEndEventCaptor.getValue(); + + assertEquals(4, endEvent.steps().size()); + } + + @Test + void testWarnReportServices() { + relativeBackgroundServices = Set.of(service); + report = new JobWatchdogSpoolReport( + createdDate, spoolName, activeJob, queuedJobs, policy, warning, relativeBackgroundServices); + when(objectMapper.valueToTree(report)).thenReturn(context); + + s.onJobWatchdogSpoolReport(report); + + verify(objectMapper, times(1)).valueToTree(report); + verify(eventConsumer, times(1)).afterProcess(supervisableEndEventCaptor.capture()); + endEvent = supervisableEndEventCaptor.getValue(); + + assertEquals(4, endEvent.steps().size()); + } + + @Test + void testReleaseReport() { + report = new JobWatchdogSpoolReport( + createdDate, spoolName, activeJob, queuedJobs, policy, warning, relativeBackgroundServices); + when(objectMapper.valueToTree(report)).thenReturn(context); + + s.onJobWatchdogSpoolReleaseReport(report); + + verify(objectMapper, times(1)).valueToTree(report); + verify(eventConsumer, times(1)).afterProcess(supervisableEndEventCaptor.capture()); + endEvent = supervisableEndEventCaptor.getValue(); + + assertEquals(1, endEvent.steps().size()); + } + + } } diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/LimitedExecTimePolicyTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/LimitedExecTimePolicyTest.java new file mode 100644 index 00000000..b92fa815 --- /dev/null +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/LimitedExecTimePolicyTest.java @@ -0,0 +1,139 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.openMocks; + +import java.time.Duration; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; + +import net.datafaker.Faker; + +class LimitedExecTimePolicyTest { + static Faker faker = net.datafaker.Faker.instance(); + + LimitedExecTimePolicy p; + + String spoolName; + Duration maxExecTime; + + @Mock + WatchableSpoolJobState activeJob; + @Mock + Set queuedJobs; + @Mock + Set relativeBackgroundServices; + + @BeforeEach + void init() throws Exception { + openMocks(this).close(); + p = new LimitedExecTimePolicy(); + spoolName = faker.numerify("spoolName###"); + maxExecTime = Duration.ofMillis(faker.random().nextLong(1, 100000)); + } + + @AfterEach + void ends() { + verifyNoMoreInteractions(activeJob, queuedJobs, relativeBackgroundServices); + } + + private static Stream testIsStatusOk_noMaxExectime() { + return Stream.of( + Arguments.of((Duration) null), + Arguments.of(Duration.ZERO), + Arguments.of(Duration.ofMillis(-1))); + } + + @ParameterizedTest + @MethodSource + void testIsStatusOk_noMaxExectime(final Duration maxExecTime) throws JobWatchdogPolicyWarning { + p.setMaxExecTime(maxExecTime); + assertTrue(p.isStatusOk(spoolName, activeJob, queuedJobs).isEmpty()); + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + } + + @Test + void testIsStatusOk_notThisSpool() throws JobWatchdogPolicyWarning { + p.setMaxExecTime(maxExecTime); + p.setOnlySpools(Set.of("onlythis")); + assertTrue(p.isStatusOk(spoolName, activeJob, queuedJobs).isEmpty()); + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + } + + @Test + void testIsStatusOk_noRun() throws JobWatchdogPolicyWarning { + p.setMaxExecTime(maxExecTime); + when(activeJob.getRunTime()).thenReturn(Optional.empty()); + assertTrue(p.isStatusOk(spoolName, activeJob, queuedJobs).isEmpty()); + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + verify(activeJob, atLeast(1)).getRunTime(); + } + + @Test + void testIsStatusOk_runOk() throws JobWatchdogPolicyWarning { + final var delta = faker.random().nextLong(1, maxExecTime.toMillis() - 1); + p.setMaxExecTime(maxExecTime); + when(activeJob.getRunTime()).thenReturn(Optional.ofNullable(Duration.ofMillis(maxExecTime.toMillis() - delta))); + + assertEquals(Duration.ofMillis(delta), p.isStatusOk(spoolName, activeJob, queuedJobs).get()); + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + + verify(activeJob, atLeast(1)).getRunTime(); + } + + @Test + void testIsStatusOk_runNok() throws JobWatchdogPolicyWarning { + final var delta = faker.random().nextLong(maxExecTime.toMillis() + 1, maxExecTime.toMillis() * 1000); + p.setMaxExecTime(maxExecTime); + when(activeJob.getRunTime()).thenReturn(Optional.ofNullable(Duration.ofMillis(maxExecTime.toMillis() + delta))); + + assertThrows(JobWatchdogPolicyWarning.class, + () -> p.isStatusOk(spoolName, activeJob, queuedJobs)); + assertThrows(JobWatchdogPolicyWarning.class, + () -> p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices)); + + verify(activeJob, atLeast(1)).getRunTime(); + } + + @Test + void testGetDescription() { + assertFalse(p.getDescription().isEmpty()); + p.setNotSpools(Set.of("Something")); + assertFalse(p.getDescription().isEmpty()); + p.setOnlySpools(Set.of("Something")); + assertFalse(p.getDescription().isEmpty()); + } + +} diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/LimitedServiceExecTimePolicyTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/LimitedServiceExecTimePolicyTest.java new file mode 100644 index 00000000..00545abf --- /dev/null +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/LimitedServiceExecTimePolicyTest.java @@ -0,0 +1,142 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.openMocks; + +import java.time.Duration; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +import net.datafaker.Faker; + +class LimitedServiceExecTimePolicyTest { + + static Faker faker = net.datafaker.Faker.instance(); + + LimitedServiceExecTimePolicy p; + + String spoolName; + int waitFactor; + long timedInterval; + WatchableBackgroundService currentService; + WatchableBackgroundService shorterTimeService; + + @Mock + WatchableSpoolJobState activeJob; + @Mock + Set queuedJobs; + @Mock + Set relativeBackgroundServices; + + @BeforeEach + void init() throws Exception { + openMocks(this).close(); + p = new LimitedServiceExecTimePolicy(); + spoolName = faker.numerify("spoolName###"); + waitFactor = faker.random().nextInt(1, 10); + timedInterval = faker.random().nextLong(1, 100000); + + currentService = new WatchableBackgroundService(faker.numerify("serviceName###"), spoolName, timedInterval); + shorterTimeService = new WatchableBackgroundService( + faker.numerify("serviceName###"), spoolName, timedInterval / 2); + when(relativeBackgroundServices.stream()).thenReturn(Stream.of(shorterTimeService, currentService)); + } + + @AfterEach + void ends() { + verifyNoMoreInteractions(activeJob, queuedJobs, relativeBackgroundServices); + } + + @Test + void testIsStatusOk_noWaitFactor() throws JobWatchdogPolicyWarning { + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + p.setWaitFactor(faker.random().nextInt(-100000, -1)); + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + verifyNoInteractions(activeJob, queuedJobs, relativeBackgroundServices); + } + + @Test + void testIsStatusOk_notThisSpool() throws JobWatchdogPolicyWarning { + p.setWaitFactor(waitFactor); + p.setOnlySpools(Set.of("onlythis")); + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + verifyNoInteractions(activeJob, queuedJobs, relativeBackgroundServices); + } + + @Test + void testIsStatusOk_noRun() throws JobWatchdogPolicyWarning { + p.setWaitFactor(waitFactor); + when(activeJob.getRunTime()).thenReturn(Optional.empty()); + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + verify(activeJob, atLeast(1)).getRunTime(); + } + + @Test + void testIsStatusOk_runOk() throws JobWatchdogPolicyWarning { + p.setWaitFactor(waitFactor); + when(activeJob.getRunTime()).thenReturn(Optional.ofNullable(Duration.ofMillis(timedInterval - 1))); + + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + verify(activeJob, atLeast(1)).getRunTime(); + verify(relativeBackgroundServices, atLeast(1)).stream(); + } + + @Test + void testIsStatusOk_runNok() throws JobWatchdogPolicyWarning { + p.setWaitFactor(waitFactor); + when(activeJob.getRunTime()) + .thenReturn(Optional.ofNullable(Duration.ofMillis(timedInterval * waitFactor + 2l))); + + assertThrows(JobWatchdogPolicyWarning.class, + () -> p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices)); + verify(activeJob, atLeast(1)).getRunTime(); + verify(relativeBackgroundServices, atLeast(1)).stream(); + } + + @Test + void testGetDescription() { + assertFalse(p.getDescription().isEmpty()); + p.setNotSpools(Set.of("Something")); + assertFalse(p.getDescription().isEmpty()); + p.setOnlySpools(Set.of("Something")); + assertFalse(p.getDescription().isEmpty()); + } + + @Test + void testIsStatusOk_notService() throws JobWatchdogPolicyWarning { + p.setWaitFactor(waitFactor); + assertTrue(p.isStatusOk(spoolName, activeJob, queuedJobs).isEmpty()); + p.setWaitFactor(faker.random().nextInt()); + assertTrue(p.isStatusOk(spoolName, activeJob, queuedJobs).isEmpty()); + } + +} diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/LimitedSpoolsBasePolicyTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/LimitedSpoolsBasePolicyTest.java new file mode 100644 index 00000000..b701741d --- /dev/null +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/LimitedSpoolsBasePolicyTest.java @@ -0,0 +1,116 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import net.datafaker.Faker; + +class LimitedSpoolsBasePolicyTest { + + private static Faker faker = net.datafaker.Faker.instance(); + + class Policy extends LimitedSpoolsBasePolicy { + + @Override + public Optional isStatusOk(final String spoolName, + final WatchableSpoolJobState activeJob, + final Set queuedJobs) throws JobWatchdogPolicyWarning { + throw new UnsupportedOperationException(); + } + + @Override + public void isStatusOk(final String spoolName, + final WatchableSpoolJobState activeJob, + final Set queuedJobs, + final Set relativeBackgroundServices) throws JobWatchdogPolicyWarning { + throw new UnsupportedOperationException(); + } + + @Override + public String getDescription() { + throw new UnsupportedOperationException(); + } + + } + + Policy p; + Set onlySpools; + Set notSpools; + String value; + + @BeforeEach + void init() throws Exception { + p = new Policy(); + onlySpools = new HashSet<>(); + notSpools = new HashSet<>(); + value = faker.numerify("###"); + } + + @Test + void testGetOnlySpools() { + assertEquals(Set.of(), p.getOnlySpools()); + assertThrows(UnsupportedOperationException.class, () -> p.getOnlySpools().add("")); // NOSONAR S5778 + p.setOnlySpools(onlySpools); + assertEquals(Set.of(), p.getOnlySpools()); + assertThrows(UnsupportedOperationException.class, () -> p.getOnlySpools().add("")); // NOSONAR S5778 + onlySpools.add(value); + assertEquals(Set.of(value), p.getOnlySpools()); + } + + @Test + void testGetNotSpools() { + assertEquals(Set.of(), p.getNotSpools()); + assertThrows(UnsupportedOperationException.class, () -> p.getNotSpools().add("")); // NOSONAR S5778 + p.setNotSpools(notSpools); + assertEquals(Set.of(), p.getNotSpools()); + assertThrows(UnsupportedOperationException.class, () -> p.getNotSpools().add("")); // NOSONAR S5778 + notSpools.add(value); + assertEquals(Set.of(value), p.getNotSpools()); + } + + @Test + void testAllowSpool() { + p.setNotSpools(notSpools); + p.setOnlySpools(onlySpools); + + assertTrue(p.allowSpool(value)); + notSpools.add("A"); + assertTrue(p.allowSpool(value)); + assertFalse(p.allowSpool("A")); + + notSpools.clear(); + onlySpools.add("A"); + assertFalse(p.allowSpool(value)); + assertTrue(p.allowSpool("A")); + + notSpools.add("A"); + assertTrue(p.allowSpool("A")); + assertFalse(p.allowSpool(value)); + } +} diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/MaxSpoolQueueSizePolicyTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/MaxSpoolQueueSizePolicyTest.java new file mode 100644 index 00000000..832de514 --- /dev/null +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/MaxSpoolQueueSizePolicyTest.java @@ -0,0 +1,122 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.MockitoAnnotations.openMocks; + +import java.time.Duration; +import java.util.Set; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +import net.datafaker.Faker; + +class MaxSpoolQueueSizePolicyTest { + static Faker faker = net.datafaker.Faker.instance(); + + MaxSpoolQueueSizePolicy p; + + String spoolName; + Duration checkTime; + + @Mock + WatchableSpoolJobState activeJob; + @Mock + Set queuedJobs; + @Mock + Set relativeBackgroundServices; + + @BeforeEach + void init() throws Exception { + openMocks(this).close(); + p = new MaxSpoolQueueSizePolicy(); + spoolName = faker.numerify("spoolName###"); + checkTime = Duration.ofMillis(faker.random().nextLong(1, 100000)); + p.setCheckTime(checkTime); + } + + @AfterEach + void ends() { + verifyNoMoreInteractions(activeJob, queuedJobs, relativeBackgroundServices); + } + + @Test + void testIsStatusOk_0maxsize() throws JobWatchdogPolicyWarning { + p.setMaxSize(-1); + assertTrue(p.isStatusOk(spoolName, activeJob, queuedJobs).isEmpty()); + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + } + + @Test + void testIsStatusOk_notThisSpool() throws JobWatchdogPolicyWarning { + p.setOnlySpools(Set.of("onlythis")); + assertTrue(p.isStatusOk(spoolName, activeJob, queuedJobs).isEmpty()); + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + } + + @Test + void testIsStatusOk() throws JobWatchdogPolicyWarning { + p.setMaxSize(5); + when(queuedJobs.size()).thenReturn(1); + assertEquals(checkTime, p.isStatusOk(spoolName, activeJob, queuedJobs).get()); + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + verify(queuedJobs, atLeast(1)).size(); + } + + @Test + void testIsStatusOk_noCheckTime() throws JobWatchdogPolicyWarning { + p.setCheckTime(null); + p.setMaxSize(5); + when(queuedJobs.size()).thenReturn(1); + assertEquals(MaxSpoolQueueSizePolicy.DEFAULT_CHECKTIME, + p.isStatusOk(spoolName, activeJob, queuedJobs).get()); + p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices); + verify(queuedJobs, atLeast(1)).size(); + } + + @Test + void testIsStatusOk_maxSize() throws JobWatchdogPolicyWarning { + p.setMaxSize(5); + when(queuedJobs.size()).thenReturn(6); + assertThrows(JobWatchdogPolicyWarning.class, + () -> p.isStatusOk(spoolName, activeJob, queuedJobs)); + assertThrows(JobWatchdogPolicyWarning.class, + () -> p.isStatusOk(spoolName, activeJob, queuedJobs, relativeBackgroundServices)); + verify(queuedJobs, atLeast(1)).size(); + } + + @Test + void testGetDescription() { + assertFalse(p.getDescription().isEmpty()); + p.setNotSpools(Set.of("Something")); + assertFalse(p.getDescription().isEmpty()); + p.setOnlySpools(Set.of("Something")); + assertFalse(p.getDescription().isEmpty()); + } + +} diff --git a/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/WatchableSpoolJobStateTest.java b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/WatchableSpoolJobStateTest.java new file mode 100644 index 00000000..e44a5aeb --- /dev/null +++ b/jobkit/engine/src/test/java/tv/hd3g/jobkit/engine/watchdog/WatchableSpoolJobStateTest.java @@ -0,0 +1,44 @@ +/* + * This file is part of jobkit-engine. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.engine.watchdog; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Date; +import java.util.Optional; + +import org.junit.jupiter.api.Test; + +class WatchableSpoolJobStateTest { + + WatchableSpoolJobState w; + + @Test + void testGetRunTime() { + final var now = System.currentTimeMillis() - 1; + w = new WatchableSpoolJobState(new Date(), null, 0, Optional.empty(), Optional.ofNullable(now)); + assertTrue(w.getRunTime().get().toMillis() > 0l); + assertTrue(w.getRunTime().get().toMillis() < 100l); + } + + @Test + void testGetRunTime_empty() { + w = new WatchableSpoolJobState(new Date(), null, 0, Optional.empty(), Optional.empty()); + assertTrue(w.getRunTime().isEmpty()); + } + +} diff --git a/jobkit/engine/src/test/resources/logback.xml b/jobkit/engine/src/test/resources/logback.xml index c103ef75..7ecd23ce 100644 --- a/jobkit/engine/src/test/resources/logback.xml +++ b/jobkit/engine/src/test/resources/logback.xml @@ -32,6 +32,7 @@ + diff --git a/jobkit/springboot-service/src/main/java/tv/hd3g/jobkit/Setup.java b/jobkit/springboot-service/src/main/java/tv/hd3g/jobkit/Setup.java index 093aa8c8..1b7babe2 100644 --- a/jobkit/springboot-service/src/main/java/tv/hd3g/jobkit/Setup.java +++ b/jobkit/springboot-service/src/main/java/tv/hd3g/jobkit/Setup.java @@ -22,5 +22,4 @@ @Configuration @ComponentScan(basePackages = { "tv.hd3g.jobkit" }) public class Setup { - } diff --git a/jobkit/springboot-service/src/main/java/tv/hd3g/jobkit/mod/JobKitSetup.java b/jobkit/springboot-service/src/main/java/tv/hd3g/jobkit/mod/JobKitSetup.java index dfe93cce..9d9285b3 100644 --- a/jobkit/springboot-service/src/main/java/tv/hd3g/jobkit/mod/JobKitSetup.java +++ b/jobkit/springboot-service/src/main/java/tv/hd3g/jobkit/mod/JobKitSetup.java @@ -69,8 +69,15 @@ SupervisableServiceSupplier getSupervisableSupplier(final SupervisableManager su JobKitEngine getJobKitEngine(final ScheduledExecutorService scheduledExecutor, final ExecutionEvent executionEvent, final BackgroundServiceEvent backgroundServiceEvent, - final SupervisableManager supervisableManager) { - return new JobKitEngine(scheduledExecutor, executionEvent, backgroundServiceEvent, supervisableManager); + final SupervisableManager supervisableManager, + final JobKitWatchdogConfig watchdogConfig) { + final var jobKit = new JobKitEngine(scheduledExecutor, executionEvent, backgroundServiceEvent, + supervisableManager); + final var watchdog = jobKit.getJobKitWatchdog(); + watchdogConfig.getMaxSpoolQueueSize().forEach(watchdog::addPolicies); + watchdogConfig.getLimitedExecTime().forEach(watchdog::addPolicies); + watchdogConfig.getLimitedServiceExecTime().forEach(watchdog::addPolicies); + return jobKit; } } diff --git a/jobkit/springboot-service/src/main/java/tv/hd3g/jobkit/mod/JobKitWatchdogConfig.java b/jobkit/springboot-service/src/main/java/tv/hd3g/jobkit/mod/JobKitWatchdogConfig.java new file mode 100644 index 00000000..19359a3e --- /dev/null +++ b/jobkit/springboot-service/src/main/java/tv/hd3g/jobkit/mod/JobKitWatchdogConfig.java @@ -0,0 +1,48 @@ +/* + * This file is part of jobkit. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * Copyright (C) hdsdi3g for hd3g.tv 2023 + * + */ +package tv.hd3g.jobkit.mod; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import jakarta.annotation.PostConstruct; +import lombok.Data; +import tv.hd3g.jobkit.engine.watchdog.LimitedExecTimePolicy; +import tv.hd3g.jobkit.engine.watchdog.LimitedServiceExecTimePolicy; +import tv.hd3g.jobkit.engine.watchdog.MaxSpoolQueueSizePolicy; + +@Configuration +@ConfigurationProperties(prefix = "jobkit.watchdogpolicies") +@Data +public class JobKitWatchdogConfig { + + private List maxSpoolQueueSize; + private List limitedExecTime; + private List limitedServiceExecTime; + + @PostConstruct + void init() { + maxSpoolQueueSize = Optional.ofNullable(maxSpoolQueueSize).orElse(new ArrayList<>()); + limitedExecTime = Optional.ofNullable(limitedExecTime).orElse(new ArrayList<>()); + limitedServiceExecTime = Optional.ofNullable(limitedServiceExecTime).orElse(new ArrayList<>()); + } + +} diff --git a/jobkit/springboot-service/src/test/java/tv/hd3g/jobkit/mod/JobKitSetupTest.java b/jobkit/springboot-service/src/test/java/tv/hd3g/jobkit/mod/JobKitSetupTest.java index 77fcb7a4..e3184fff 100644 --- a/jobkit/springboot-service/src/test/java/tv/hd3g/jobkit/mod/JobKitSetupTest.java +++ b/jobkit/springboot-service/src/test/java/tv/hd3g/jobkit/mod/JobKitSetupTest.java @@ -17,20 +17,35 @@ package tv.hd3g.jobkit.mod; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.MockitoAnnotations.openMocks; +import java.time.Duration; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.TestPropertySource; + +import tv.hd3g.jobkit.engine.JobKitEngine; +import tv.hd3g.jobkit.engine.watchdog.LimitedExecTimePolicy; +import tv.hd3g.jobkit.engine.watchdog.LimitedServiceExecTimePolicy; +import tv.hd3g.jobkit.engine.watchdog.MaxSpoolQueueSizePolicy; @SpringBootTest class JobKitSetupTest { @Autowired ScheduledExecutorService scheduledExecutorService; + @Autowired + JobKitEngine jobKitEngine; @Test void testGetScheduledExecutor() throws InterruptedException { @@ -39,4 +54,59 @@ void testGetScheduledExecutor() throws InterruptedException { latch.await(100, TimeUnit.MILLISECONDS); } + @Test + void testGetJobKitEngineWatchdog() { + assertTrue(jobKitEngine.getJobKitWatchdog().getPolicies().isEmpty()); + } + + @SpringBootTest + @TestPropertySource(locations = "classpath:application-watchdogpolicies.yml") + static class Watchdog { + + @Autowired + JobKitEngine jobKitEngine; + @Autowired + JobKitWatchdogConfig watchdogConfig; + + @BeforeEach + void init() throws Exception { + openMocks(this).close(); + } + + @AfterEach + void end() { + } + + @Test + void testWatchdogPolicies() { + final var p = jobKitEngine.getJobKitWatchdog().getPolicies(); + assertEquals(3, p.size()); + + final var maxSpoolQueueSize = p.stream() + .filter(f -> f instanceof MaxSpoolQueueSizePolicy) + .map(f -> (MaxSpoolQueueSizePolicy) f) + .findFirst() + .get(); + assertEquals(10, maxSpoolQueueSize.getMaxSize()); + assertEquals(Set.of("AA"), maxSpoolQueueSize.getOnlySpools()); + + final var limitedExecTime = p.stream() + .filter(f -> f instanceof LimitedExecTimePolicy) + .map(f -> (LimitedExecTimePolicy) f) + .findFirst() + .get(); + assertEquals(Duration.ofMillis(10000), limitedExecTime.getMaxExecTime()); + assertEquals(Set.of("BB"), limitedExecTime.getOnlySpools()); + + final var limitedServiceExecTimePolicy = p.stream() + .filter(f -> f instanceof LimitedServiceExecTimePolicy) + .map(f -> (LimitedServiceExecTimePolicy) f) + .findFirst() + .get(); + assertEquals(5, limitedServiceExecTimePolicy.getWaitFactor()); + assertEquals(Set.of("CC"), limitedServiceExecTimePolicy.getOnlySpools()); + } + + } + } diff --git a/jobkit/springboot-service/src/test/resources/application-watchdogpolicies.yml b/jobkit/springboot-service/src/test/resources/application-watchdogpolicies.yml new file mode 100644 index 00000000..11e351d0 --- /dev/null +++ b/jobkit/springboot-service/src/test/resources/application-watchdogpolicies.yml @@ -0,0 +1,17 @@ +spring: + main: + banner-mode: "off" + log-startup-info: false + web-application-type: none + +jobkit: + watchdogpolicies: + maxSpoolQueueSize: + - maxSize: 10 + onlySpools: ["AA"] + limitedExecTime: + - maxExecTime: 10s + onlySpools: ["BB"] + limitedServiceExecTime: + - waitFactor: 5 + onlySpools: ["CC"] diff --git a/jobkit/springboot-service/src/test/resources/application.yml b/jobkit/springboot-service/src/test/resources/application.yml index 8be3542c..6991ded7 100644 --- a/jobkit/springboot-service/src/test/resources/application.yml +++ b/jobkit/springboot-service/src/test/resources/application.yml @@ -5,57 +5,4 @@ spring: main: banner-mode: "off" log-startup-info: false -server: - port: 0 - -jobkit: - processrunners: - disabled-at-start: true - reply-to: reply@jobkkit.local - send-from: from@jobkkit.local - send-to-admin: [admin1@jobkkit.local, admin2@jobkkit.local] - sender-reference: send-ref-email - default-template-name-done: "def-tpl-ok" - default-template-name-error: "def-tpl-err" - services: - #0 - - name: java-version - spool-name: test-spool - command-line: java0 -version - comment: Just run java version - env: - env1: value1 - period-time: 5s - priority: 2 - run-first-at-boot: true - working-dir: "." - retry-after-time-factor: 3 - after-done: - add-to-template-vars: - varA: valueA - lang: fr_FR - reply-to: reply-java-version@jobkkit.local - send-to: java-version@jobkkit.local - send-cc: java-version-cc@jobkkit.local - template-name: tpl-java-version-ok - after-error: - add-to-template-vars: - varB: valueB - lang: en_US - reply-to: never-working-rply@jobkkit.local - send-to: to-never-working@jobkkit.local - send-cc: never-working-cc@jobkkit.local - template-name: tpl-never-working-err - #1 - - name: java-lazy-version - spool-name: test-spool2 - command-line: java-wo-cmdline - comment: Just run lazy java version - period-time: 1m - #2 - - name: never-configured-workingdir - spool-name: test-spool - command-line: java2 -version - comment: Never run - period-time: 5s - working-dir: /this/never/will/exists + web-application-type: none diff --git a/mailkit/src/main/java/tv/hd3g/mailkit/mod/configuration/MailKitSetup.java b/mailkit/src/main/java/tv/hd3g/mailkit/mod/configuration/MailKitSetup.java index 4717b85a..ae4eb472 100644 --- a/mailkit/src/main/java/tv/hd3g/mailkit/mod/configuration/MailKitSetup.java +++ b/mailkit/src/main/java/tv/hd3g/mailkit/mod/configuration/MailKitSetup.java @@ -32,7 +32,7 @@ import org.thymeleaf.templateresolver.ITemplateResolver; import tv.hd3g.commons.version.EnvironmentVersion; -import tv.hd3g.jobkit.engine.SupervisableManager; +import tv.hd3g.jobkit.engine.SupervisableEventRegister; import tv.hd3g.mailkit.mod.component.Translate; import tv.hd3g.mailkit.mod.service.AppNotificationService; import tv.hd3g.mailkit.notification.NotificationManager; @@ -95,14 +95,14 @@ NotificationManager getNotificationManager(final ResourceBundleMessageSource rbm final MailKitConfig config, final JavaMailSender mailSender, final Translate translate, - final SupervisableManager supervisableManager, + final SupervisableEventRegister supervisableEventRegister, final EnvironmentVersion environmentVersion) { Optional.ofNullable(appNotificationService.getMessageSourceBasename()) .ifPresent(rbms::addBasenames); final var toolkit = new NotificationMailTemplateToolkit(translate, config.getEnv(), environmentVersion); final var setupEngine = new NotificationEngineMailSetup( - supervisableManager, + supervisableEventRegister, appNotificationService, mailSender, config.getSenderAddr(), @@ -117,7 +117,7 @@ NotificationManager getNotificationManager(final ResourceBundleMessageSource rbm new NotificationEngineMailTemplateDebug(toolkit), setupEngine); - return new NotificationManager().register(router).register(supervisableManager); + return new NotificationManager().register(router).register(supervisableEventRegister); } } diff --git a/mailkit/src/main/java/tv/hd3g/mailkit/notification/NotificationManager.java b/mailkit/src/main/java/tv/hd3g/mailkit/notification/NotificationManager.java index e57918f5..723b697d 100644 --- a/mailkit/src/main/java/tv/hd3g/mailkit/notification/NotificationManager.java +++ b/mailkit/src/main/java/tv/hd3g/mailkit/notification/NotificationManager.java @@ -22,7 +22,7 @@ import java.util.Objects; import lombok.extern.slf4j.Slf4j; -import tv.hd3g.jobkit.engine.SupervisableManager; +import tv.hd3g.jobkit.engine.SupervisableEventRegister; @Slf4j public class NotificationManager { @@ -32,7 +32,7 @@ public NotificationManager() { routers = Collections.synchronizedList(new ArrayList<>()); } - public NotificationManager register(final SupervisableManager supervisable) { + public NotificationManager register(final SupervisableEventRegister supervisable) { Objects.requireNonNull(supervisable, "\"supervisable\" can't to be null"); if (routers.isEmpty()) { throw new IllegalStateException("Can't register SupervisableManager: no Router is registed."); diff --git a/mailkit/src/main/java/tv/hd3g/mailkit/notification/implmail/NotificationEngineMailSetup.java b/mailkit/src/main/java/tv/hd3g/mailkit/notification/implmail/NotificationEngineMailSetup.java index 693c6de6..098be6a7 100644 --- a/mailkit/src/main/java/tv/hd3g/mailkit/notification/implmail/NotificationEngineMailSetup.java +++ b/mailkit/src/main/java/tv/hd3g/mailkit/notification/implmail/NotificationEngineMailSetup.java @@ -18,11 +18,11 @@ import org.springframework.mail.javamail.JavaMailSender; -import tv.hd3g.jobkit.engine.SupervisableManager; +import tv.hd3g.jobkit.engine.SupervisableEventRegister; import tv.hd3g.mailkit.mod.service.AppNotificationService; import tv.hd3g.mailkit.notification.NotificationGroup; -public record NotificationEngineMailSetup(SupervisableManager supervisableManager, +public record NotificationEngineMailSetup(SupervisableEventRegister supervisableManager, AppNotificationService appNotificationService, JavaMailSender mailSender, String senderAddr, diff --git a/mailkit/src/test/java/tv/hd3g/mailkit/notification/NotificationManagerTest.java b/mailkit/src/test/java/tv/hd3g/mailkit/notification/NotificationManagerTest.java index 9512b370..1318b038 100644 --- a/mailkit/src/test/java/tv/hd3g/mailkit/notification/NotificationManagerTest.java +++ b/mailkit/src/test/java/tv/hd3g/mailkit/notification/NotificationManagerTest.java @@ -32,7 +32,7 @@ import net.datafaker.Faker; import tv.hd3g.jobkit.engine.SupervisableEndEvent; -import tv.hd3g.jobkit.engine.SupervisableManager; +import tv.hd3g.jobkit.engine.SupervisableEventRegister; import tv.hd3g.jobkit.engine.SupervisableOnEndEventConsumer; class NotificationManagerTest { @@ -42,7 +42,7 @@ class NotificationManagerTest { NotificationManager n; @Mock - SupervisableManager supervisable; + SupervisableEventRegister supervisable; @Mock SupervisableEndEvent event; @Mock diff --git a/mailkit/src/test/java/tv/hd3g/mailkit/notification/implmail/NotificationRouterMailTest.java b/mailkit/src/test/java/tv/hd3g/mailkit/notification/implmail/NotificationRouterMailTest.java index 93c4d10e..09dbaa57 100644 --- a/mailkit/src/test/java/tv/hd3g/mailkit/notification/implmail/NotificationRouterMailTest.java +++ b/mailkit/src/test/java/tv/hd3g/mailkit/notification/implmail/NotificationRouterMailTest.java @@ -49,7 +49,7 @@ import tv.hd3g.jobkit.engine.SupervisableContextExtractor; import tv.hd3g.jobkit.engine.SupervisableEndEvent; import tv.hd3g.jobkit.engine.SupervisableEventMark; -import tv.hd3g.jobkit.engine.SupervisableManager; +import tv.hd3g.jobkit.engine.SupervisableEventRegister; import tv.hd3g.mailkit.mod.service.AppNotificationService; import tv.hd3g.mailkit.notification.NotificationGroup; import tv.hd3g.mailkit.notification.SupervisableUtility; @@ -84,7 +84,7 @@ class NotificationRouterMailTest { @Mock NotificationMailMessageProducer engineDebugTemplate; @Mock - SupervisableManager supervisableManager; + SupervisableEventRegister supervisableManager; @Mock AppNotificationService appNotificationService; @Mock