Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.x] Prevent horizon:purge from killing too many processes #820

Merged
merged 7 commits into from
Apr 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/MasterSupervisor.php
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ public static function name()
*/
public static function basename()
{
$pathHash = substr(md5(__DIR__), 0, 4);

return static::$nameResolver
? call_user_func(static::$nameResolver)
: Str::slug(gethostname());
? call_user_func(static::$nameResolver).'-'.$pathHash
: Str::slug(gethostname()).'-'.$pathHash;
}

/**
Expand Down
57 changes: 47 additions & 10 deletions src/ProcessInspector.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ public function __construct(Exec $exec)
*/
public function current()
{
return array_diff(
$this->exec->run('pgrep -f [h]orizon'),
$this->exec->run('pgrep -f horizon:purge')
);
$supervisorBasename = MasterSupervisor::basename();

return $this->exec->run("pgrep -f '[h]orizon.*[ =]{$supervisorBasename}-'");
}

/**
Expand All @@ -46,17 +45,27 @@ public function current()
*/
public function orphaned()
{
return array_diff($this->current(), $this->monitoring());
return array_diff(
array_merge($this->current(), $this->mastersWithoutSupervisors()),
$this->validMonitoring()
);
}

/**
* Get all of the process IDs Horizon is actively monitoring.
* Get all of the process IDs that Horizon is actively monitoring and that are valid. For master processes "valid"
* means they have child processes (supervisors). For supervisors it means their parent processes (masters) exist.
*
* @return array
*/
public function monitoring()
public function validMonitoring()
{
$masters = $this->monitoredMastersWithSupervisors();
$masterNames = array_flip(Arr::pluck($masters, 'name'));

return collect(app(SupervisorRepository::class)->all())
->filter(function ($supervisor) use (&$masterNames) {
return isset($masterNames[data_get($supervisor, 'master')]);
})
->pluck('pid')
->pipe(function ($processes) {
$processes->each(function ($process) use (&$processes) {
Expand All @@ -65,8 +74,36 @@ public function monitoring()

return $processes;
})
->merge(
Arr::pluck(app(MasterSupervisorRepository::class)->all(), 'pid')
)->all();
->merge(Arr::pluck($masters, 'pid'))
->all();
}

/**
* Get data of master processes that have child processes (supervisors) and are monitored by Horizon.
*
* @return array
*/
public function monitoredMastersWithSupervisors()
{
return collect(app(MasterSupervisorRepository::class)->all())->filter(function ($master) {
return ! empty($this->exec->run('pgrep -P '.data_get($master, 'pid')));
})
->values()
->all();
}

/**
* Get IDs of all master Horizon processes that don't have any supervisors.
*
* @return array
*/
public function mastersWithoutSupervisors()
{
return collect($this->exec->run('pgrep -f [h]orizon$'))
->filter(function ($pid) {
return empty($this->exec->run('pgrep -P '.$pid));
})
->values()
->all();
}
}
61 changes: 59 additions & 2 deletions tests/Feature/ProcessInspectorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Laravel\Horizon\Contracts\MasterSupervisorRepository;
use Laravel\Horizon\Contracts\SupervisorRepository;
use Laravel\Horizon\Exec;
use Laravel\Horizon\MasterSupervisor;
use Laravel\Horizon\ProcessInspector;
use Laravel\Horizon\Tests\IntegrationTest;
use Mockery;
Expand All @@ -14,8 +15,10 @@ class ProcessInspectorTest extends IntegrationTest
public function test_finds_orphaned_process_ids()
{
$exec = Mockery::mock(Exec::class);
$exec->shouldReceive('run')->with('pgrep -f [h]orizon')->andReturn([1, 2, 3, 4, 5, 6]);
$exec->shouldReceive('run')->with('pgrep -f horizon:purge')->andReturn([]);
$exec->shouldReceive('run')->with(Mockery::pattern('/^pgrep -f \'\[h\]orizon\.\*\[ =\]/'))
->andReturn([1, 2, 3, 4, 5, 6]);
$exec->shouldReceive('run')->with('pgrep -f [h]orizon$')->andReturn([6]);
$exec->shouldReceive('run')->with('pgrep -P 6')->andReturn([2, 3]);
$exec->shouldReceive('run')->with('pgrep -P 2')->andReturn([4]);
$exec->shouldReceive('run')->with('pgrep -P 3')->andReturn([5]);
$this->app->instance(Exec::class, $exec);
Expand All @@ -24,9 +27,11 @@ public function test_finds_orphaned_process_ids()
$supervisors->shouldReceive('all')->andReturn([
[
'pid' => 2,
'master' => 'test',
],
[
'pid' => 3,
'master' => 'test',
],
]);
$this->app->instance(SupervisorRepository::class, $supervisors);
Expand All @@ -35,6 +40,7 @@ public function test_finds_orphaned_process_ids()
$masters->shouldReceive('all')->andReturn([
[
'pid' => 6,
'name' => 'test',
],
]);
$this->app->instance(MasterSupervisorRepository::class, $masters);
Expand All @@ -43,4 +49,55 @@ public function test_finds_orphaned_process_ids()

$this->assertEquals([1], $inspector->orphaned());
}

public function test_it_uses_master_supervisor_basename_to_find_current_processes()
{
$exec = Mockery::mock(Exec::class);
$this->app->instance(Exec::class, $exec);
$masterBasename = MasterSupervisor::basename();
$exec->shouldReceive('run')->with(Mockery::pattern("/pgrep -f '\[h\]orizon\.\*\[ =\].*{$masterBasename}-'/"))
->andReturn([1]);

$this->assertEquals([1], resolve(ProcessInspector::class)->current());
}

public function test_it_finds_monitored_masters_that_have_supervisor_processes()
{
$exec = Mockery::mock(Exec::class);
$this->app->instance(Exec::class, $exec);
$exec->shouldReceive('run')->with('pgrep -P 3')->andReturn([1, 2]);
$exec->shouldReceive('run')->with('pgrep -P 4')->andReturn([]);

$masters = Mockery::mock(MasterSupervisorRepository::class);
$this->app->instance(MasterSupervisorRepository::class, $masters);
$masters->shouldReceive('all')->andReturn([
[
'pid' => 3,
],
[
'pid' => 4,
],
]);

$inspector = resolve(ProcessInspector::class);

$this->assertEquals([
[
'pid' => 3,
],
], $inspector->monitoredMastersWithSupervisors());
}

public function test_it_finds_master_processes_without_supervisor_child_processes()
{
$exec = Mockery::mock(Exec::class);
$this->app->instance(Exec::class, $exec);
$exec->shouldReceive('run')->with('pgrep -f [h]orizon$')->andReturn([1, 2]);
$exec->shouldReceive('run')->with('pgrep -P 1')->andReturn([[3, 4]]);
$exec->shouldReceive('run')->with('pgrep -P 2')->andReturn([]);

$inspector = resolve(ProcessInspector::class);

$this->assertEquals([2], $inspector->mastersWithoutSupervisors());
}
}