Skip to content

Commit

Permalink
fixup! Create JobKitWatchdog, clean SpoolerTest and BackgroundService…
Browse files Browse the repository at this point in the history
…EventTest #140
  • Loading branch information
hdsdi3g committed Jun 26, 2023
1 parent 8a347ef commit 0aa4d2a
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class BackgroundService {
private final BackgroundServiceEvent event;
private final RunnableWithException task;
private final RunnableWithException disableTask;
private final JobKitWatchdog jobKitWatchdog;

private boolean enabled;
private ScheduledFuture<?> nextRunReference;
Expand All @@ -36,13 +37,15 @@ public BackgroundService(final String name,
final Spooler spooler,
final ScheduledExecutorService scheduledExecutor,
final BackgroundServiceEvent event,
final JobKitWatchdog jobKitWatchdog,
final RunnableWithException task,
final RunnableWithException disableTask) {
this.name = name;
this.spoolName = spoolName;
this.spooler = spooler;
this.scheduledExecutor = scheduledExecutor;
this.event = event;
this.jobKitWatchdog = jobKitWatchdog;
this.task = task;
this.disableTask = disableTask;
hasFirstStarted = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class FlatBackgroundService extends BackgroundService {
final String spoolName,
final Runnable task,
final RunnableWithException disableTask) {
super(null, spoolName, null, scheduledExecutor, null, null, null);
super(null, spoolName, null, scheduledExecutor, null, null, null, null);
runReference = new FlatScheduledFuture(task);
this.disableTask = disableTask;
this.scheduledExecutor = scheduledExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public BackgroundService createService(final String name, // NOSONAR S1133
spooler,
scheduledExecutor,
backgroundServiceEvent,
jobKitWatchdog,
serviceTask,
onServiceDisableTask);
backgroundServices.add(service);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,23 @@ public class JobKitWatchdog {
this.scheduledExecutor = scheduledExecutor;
}

void startJob(final WatchableSpoolJob spoolJob, final long startTime) {
// TODO Auto-generated method stub

}

void endJob(final WatchableSpoolJob spoolJob, final long endTime) {
// TODO Auto-generated method stub
// spoolJob.getExecutorReferer().getQueueSize();
}

void addJob(final WatchableSpoolJob newJob) {
// TODO Auto-generated method stub

}

void shutdown() {
// TODO shutdown
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
@Slf4j
class SpoolExecutor {

@Getter
private final String name;
private final ExecutionEvent event;
private final AtomicLong threadCount;
Expand All @@ -26,16 +27,19 @@ class SpoolExecutor {
private final PriorityBlockingQueue<SpoolJob> queue;
private final AtomicBoolean shutdown;
private final SupervisableEvents supervisableEvents;
private final JobKitWatchdog jobKitWatchdog;

SpoolExecutor(final String name,
final ExecutionEvent event,
final AtomicLong threadCount,
final SupervisableEvents supervisableEvents) {
final SupervisableEvents supervisableEvents,
final JobKitWatchdog jobKitWatchdog) {
this.name = name;
this.event = event;
this.threadCount = threadCount;
this.jobKitWatchdog = jobKitWatchdog;
queueComparator = (l, r) -> {
final var compared = Integer.compare(r.priority, l.priority);
final var compared = Integer.compare(r.jobPriority, l.jobPriority);
if (compared == 0) {
return Long.compare(l.createdIndex, r.createdIndex);
}
Expand All @@ -55,7 +59,9 @@ boolean addToQueue(final RunnableWithException command,
log.error("Can't add to queue new command \"{}\" by \"{}\": the spool is shutdown", name, this.name);
return false;
}
queue.add(new SpoolJob(command, name, priority, afterRunCommand, this));
final var newJob = new SpoolJob(command, name, priority, afterRunCommand, this);
queue.add(newJob);
jobKitWatchdog.addJob(newJob);
log.debug("Add new command \"{}\" by \"{}\" with P{}", name, this.name, priority);
runNext();
return true;
Expand Down Expand Up @@ -134,21 +140,24 @@ private static Optional<StackTraceElement> getCaller() {
.findFirst();
}

private class SpoolJob extends Thread implements SupervisableSupplier {
private class SpoolJob extends Thread implements SupervisableSupplier, WatchableSpoolJob {

final RunnableWithException command;
@Getter
final String commandName;
final int priority;
final int jobPriority;
final Consumer<Exception> afterRunCommand;
@Getter
final SpoolExecutor executorReferer;
final AtomicReference<Supervisable> supervisableReference;
@Getter
final long createdIndex;
@Getter
final Optional<StackTraceElement> creator;

SpoolJob(final RunnableWithException command,
final String commandName,
final int priority,
final int jobPriority,
final Consumer<Exception> afterRunCommand,
final SpoolExecutor executorReferer) {
super();
Expand All @@ -159,7 +168,7 @@ private class SpoolJob extends Thread implements SupervisableSupplier {

this.command = command;
this.commandName = commandName;
this.priority = priority;
this.jobPriority = jobPriority;
this.afterRunCommand = afterRunCommand;
this.executorReferer = executorReferer;
supervisableReference = new AtomicReference<>();
Expand All @@ -174,6 +183,9 @@ private Supervisable createSupervisable(final String jobName) {

@Override
public void run() {
final var startTime = System.currentTimeMillis();
jobKitWatchdog.startJob(this, startTime);

var currentSupervisable = createSupervisable(commandName + " beforeRunJob");
try {
supervisableReference.set(currentSupervisable);
Expand All @@ -186,7 +198,6 @@ public void run() {
}

currentSupervisable = createSupervisable(commandName);
final var startTime = System.currentTimeMillis();
Exception error = null;
try {
log.debug("Start new command \"{}\" by \"{}\"", commandName, name);
Expand Down Expand Up @@ -233,6 +244,8 @@ public void run() {

supervisableReference.set(null);

jobKitWatchdog.endJob(this, endTime);

synchronized (queue) {
currentOperation.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -17,6 +18,7 @@ public class Spooler {
private final AtomicLong threadCount;
private final AtomicBoolean shutdown;
private final SupervisableEvents supervisableEvents;
@Getter
private final JobKitWatchdog jobKitWatchdog;

public Spooler(final ExecutionEvent event,
Expand All @@ -43,7 +45,7 @@ public SpoolExecutor getExecutor(final String name) {
return spoolExecutors.get(name);
}
return spoolExecutors.computeIfAbsent(name,
n -> new SpoolExecutor(n, event, threadCount, supervisableEvents));
n -> new SpoolExecutor(n, event, threadCount, supervisableEvents, jobKitWatchdog));
}

public int getAllQueuesSize() {
Expand All @@ -61,6 +63,7 @@ public void shutdown(final Set<String> spoolsNamesToKeepRunningToTheEnd) {
if (shutdown.get()) {
return;
}
jobKitWatchdog.shutdown();
shutdown.set(true);

final var count = getRunningQueuesCount();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* This file is part of jobkit-engine.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* Copyright (C) hdsdi3g for hd3g.tv 2023
*
*/
package tv.hd3g.jobkit.engine;

import java.util.Optional;

interface WatchableSpoolJob {

Optional<StackTraceElement> getCreator();

String getCommandName();

long getCreatedIndex();

SpoolExecutor getExecutorReferer();

default String getSpoolName() {
return getExecutorReferer().getName();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.longThat;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.internal.verification.VerificationModeFactory.atLeast;
Expand Down Expand Up @@ -54,21 +58,27 @@ void init() throws Exception {
spooler = new Spooler(new ExecutionEvent() {}, new SupervisableEvents() {}, jobKitWatchdog);
error = new AtomicReference<>();

service = new BackgroundService(name, spoolName, spooler, scheduledExecutorService, event, () -> {
latch.await(500, MILLISECONDS);
final var e = error.get();
if (e != null) {
throw e;
}
}, () -> {
endLatch.await(100, MILLISECONDS);
});
service = new BackgroundService(name, spoolName, spooler, scheduledExecutorService, event, jobKitWatchdog,
() -> {
latch.await(500, MILLISECONDS);
final var e = error.get();
if (e != null) {
throw e;
}
}, () -> {
endLatch.await(100, MILLISECONDS);
});
service.setTimedInterval(interval, TimeUnit.MILLISECONDS);
}

@AfterEach
void close() {
verify(jobKitWatchdog, atMost(100)).addJob(any(WatchableSpoolJob.class));
verify(jobKitWatchdog, atMost(100)).startJob(any(WatchableSpoolJob.class), anyLong());
verify(jobKitWatchdog, atMost(100)).endJob(any(WatchableSpoolJob.class), anyLong());

verifyNoMoreInteractions(event, jobKitWatchdog);

service.disable();
spooler.shutdown(Set.of());
scheduledExecutorService.shutdown();
Expand All @@ -91,9 +101,10 @@ void completeCycle_scheduleNextBackgroundServiceTask() throws InterruptedExcepti
verify(event, times(1)).onChangeEnabled(eq(name), eq(spoolName), any(boolean.class));
verify(event, atLeast(1)).onChangeTimedInterval(name, spoolName, interval);
verify(event, atLeast(1)).scheduleNextBackgroundServiceTask(
eq(name), eq(spoolName), eq(0), anyLong());
anyString(), anyString(), anyInt(), anyLong());
verify(event, atLeast(1)).planNextExec(eq(name), eq(spoolName), longThat(m -> m >= interval - 1l));
verify(event, atLeast(1)).nextBackgroundServiceTask(name, spoolName, 0);
reset(event);
}

@Test
Expand Down Expand Up @@ -160,6 +171,7 @@ void completeCycle_onPreviousRunWithError() throws InterruptedException {
eq(name), eq(spoolName), eq(0), longThat(m -> m >= interval));
verify(event, atLeast(1)).planNextExec(eq(name), eq(spoolName), longThat(m -> m >= interval - 1l));
verify(event, atLeast(1)).nextBackgroundServiceTask(name, spoolName, 0);
reset(event);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class BackgroundServiceTest {
ScheduledFuture<Object> nextRunReference;
@Mock
SpoolExecutor spoolExecutor;
@Mock
JobKitWatchdog jobKitWatchdog;

@Captor
ArgumentCaptor<RunnableWithException> commandCaptor;
Expand All @@ -69,7 +71,7 @@ void init() throws Exception {
spoolName = String.valueOf(System.nanoTime());
timedInterval = Math.abs(random.nextLong());
backgroundService = new BackgroundService(
name, spoolName, spooler, scheduledExecutor, event, task, disableTask);
name, spoolName, spooler, scheduledExecutor, event, jobKitWatchdog, task, disableTask);

when(scheduledExecutor.schedule(any(Runnable.class), eq(timedInterval), eq(MILLISECONDS)))
.then(invocation -> nextRunReference);
Expand Down
Loading

0 comments on commit 0aa4d2a

Please sign in to comment.