Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2.0] Expire monitored jobs #484

Merged
merged 1 commit into from
Jan 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/horizon.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
'trim' => [
'recent' => 60,
'failed' => 10080,
'monitored' => 10080,
],

/*
Expand Down
7 changes: 7 additions & 0 deletions src/Contracts/JobRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ public function trimRecentJobs();
*/
public function trimFailedJobs();

/**
* Trim the monitored job list.
*
* @return void
*/
public function trimMonitoredJobs();

/**
* Find a failed job by ID.
*
Expand Down
1 change: 1 addition & 0 deletions src/EventMap.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ trait EventMap
Events\MasterSupervisorLooped::class => [
Listeners\TrimRecentJobs::class,
Listeners\TrimFailedJobs::class,
Listeners\TrimMonitoredJobs::class,
Listeners\ExpireSupervisors::class,
Listeners\MonitorMasterSupervisorMemory::class,
],
Expand Down
47 changes: 47 additions & 0 deletions src/Listeners/TrimMonitoredJobs.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

namespace Laravel\Horizon\Listeners;

use Cake\Chronos\Chronos;
use Laravel\Horizon\Contracts\JobRepository;
use Laravel\Horizon\Events\MasterSupervisorLooped;

class TrimMonitoredJobs
{
/**
* The last time the monitored jobs were trimmed.
*
* @var \Cake\Chronos\Chronos
*/
public $lastTrimmed;

/**
* How many minutes to wait in between each trim.
*
* @var int
*/
public $frequency = 1440;

/**
* Handle the event.
*
* @param \Laravel\Horizon\Events\MasterSupervisorLooped $event
* @return void
*/
public function handle(MasterSupervisorLooped $event)
{
if (! isset($this->lastTrimmed)) {
$this->frequency = max(1, intdiv(
config('horizon.trim.monitored', 10080), 12
));

$this->lastTrimmed = Chronos::now()->subMinutes($this->frequency + 1);
}

if ($this->lastTrimmed->lte(Chronos::now()->subMinutes($this->frequency))) {
app(JobRepository::class)->trimMonitoredJobs();

$this->lastTrimmed = Chronos::now();
}
}
}
40 changes: 38 additions & 2 deletions src/Repositories/RedisJobRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ class RedisJobRepository implements JobRepository
*/
public $failedJobExpires;

/**
* The number of minutes until monitored jobs should be purged.
*
* @var int
*/
public $monitoredJobExpires;

/**
* Create a new repository instance.
*
Expand All @@ -53,6 +60,7 @@ public function __construct(RedisFactory $redis)
$this->redis = $redis;
$this->recentJobExpires = config('horizon.trim.recent', 60);
$this->failedJobExpires = config('horizon.trim.failed', 10080);
$this->monitoredJobExpires = config('horizon.trim.monitored', 10080);
}

/**
Expand Down Expand Up @@ -293,7 +301,7 @@ public function released($connection, $queue, JobPayload $payload)
}

/**
* Store a monitored job.
* Mark the job as completed and monitored.
*
* @param string $connection
* @param string $queue
Expand All @@ -303,6 +311,8 @@ public function released($connection, $queue, JobPayload $payload)
public function remember($connection, $queue, JobPayload $payload)
{
$this->connection()->pipeline(function ($pipe) use ($connection, $queue, $payload) {
$this->storeMonitoredReferences($pipe, $payload->id());

$pipe->hmset(
$payload->id(), [
'id' => $payload->id(),
Expand All @@ -315,10 +325,24 @@ public function remember($connection, $queue, JobPayload $payload)
]
);

$pipe->persist($payload->id());
$pipe->expireat(
$payload->id(), Chronos::now()->addMinutes($this->monitoredJobExpires)->getTimestamp()
);
});
}

/**
* Store the look-up references for a monitored job.
*
* @param mixed $pipe
* @param string $id
* @return void
*/
protected function storeMonitoredReferences($pipe, $id)
{
$pipe->zadd('monitored_jobs', str_replace(',', '.', microtime(true) * -1), $id);
}

/**
* Mark the given jobs as released / pending.
*
Expand Down Expand Up @@ -457,6 +481,18 @@ public function trimFailedJobs()
);
}

/**
* Trim the monitored job list.
*
* @return void
*/
public function trimMonitoredJobs()
{
$this->connection()->zremrangebyscore(
'monitored_jobs', Chronos::now()->subMinutes($this->monitoredJobExpires)->getTimestamp() * -1, '+inf'
);
}

/**
* Find a failed job by ID.
*
Expand Down
1 change: 1 addition & 0 deletions src/ServiceBindings.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ trait ServiceBindings
Contracts\HorizonCommandQueue::class => RedisHorizonCommandQueue::class,
Listeners\TrimRecentJobs::class,
Listeners\TrimFailedJobs::class,
Listeners\TrimMonitoredJobs::class,
Lock::class,
Stopwatch::class,

Expand Down
2 changes: 1 addition & 1 deletion tests/Feature/MonitoringTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public function test_completed_jobs_are_stored_in_database_when_one_of_their_tag
$id = Queue::push(new Jobs\BasicJob);
$this->work();
$this->assertEquals(1, $this->monitoredJobs('first'));
$this->assertEquals(-1, Redis::connection('horizon')->ttl($id));
$this->assertGreaterThan(0, Redis::connection('horizon')->ttl($id));
}

public function test_completed_jobs_are_removed_from_database_when_their_tag_is_no_longer_monitored()
Expand Down
35 changes: 35 additions & 0 deletions tests/Feature/TrimMonitoredJobsTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace Laravel\Horizon\Tests\Feature;

use Mockery;
use Cake\Chronos\Chronos;
use Laravel\Horizon\MasterSupervisor;
use Laravel\Horizon\Tests\IntegrationTest;
use Laravel\Horizon\Contracts\JobRepository;
use Laravel\Horizon\Listeners\TrimMonitoredJobs;
use Laravel\Horizon\Events\MasterSupervisorLooped;

class TrimMonitoredJobsTest extends IntegrationTest
{
public function test_trimmer_has_a_cooldown_period()
{
$trim = new TrimMonitoredJobs;

$repository = Mockery::mock(JobRepository::class);
$repository->shouldReceive('trimMonitoredJobs')->twice();
$this->app->instance(JobRepository::class, $repository);

// Should not be called first time since date is initialized...
$trim->handle(new MasterSupervisorLooped(Mockery::mock(MasterSupervisor::class)));

Chronos::setTestNow(Chronos::now()->addMinutes(1600));

// Should only be called twice...
$trim->handle(new MasterSupervisorLooped(Mockery::mock(MasterSupervisor::class)));
$trim->handle(new MasterSupervisorLooped(Mockery::mock(MasterSupervisor::class)));
$trim->handle(new MasterSupervisorLooped(Mockery::mock(MasterSupervisor::class)));

Chronos::setTestNow();
}
}