Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[5.x] Add autoScalingStrategy option #1254

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/AutoScaler.php
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,16 @@ protected function timeToClearPerQueue(Supervisor $supervisor, Collection $pools
protected function numberOfWorkersPerQueue(Supervisor $supervisor, Collection $queues)
{
$timeToClearAll = $queues->sum('time');
$totalJobs = $queues->sum('size');

return $queues->mapWithKeys(function ($timeToClear, $queue) use ($supervisor, $timeToClearAll) {
return $queues->mapWithKeys(function ($timeToClear, $queue) use ($supervisor, $timeToClearAll, $totalJobs) {
if ($timeToClearAll > 0 &&
$supervisor->options->autoScaling()) {
return [$queue => (($timeToClear['time'] / $timeToClearAll) * $supervisor->options->maxProcesses)];
$numberOfProcesses = $supervisor->options->autoScaleByNumberOfJobs()
? ($timeToClear['size'] / $totalJobs)
: ($timeToClear['time'] / $timeToClearAll);

return [$queue => $numberOfProcesses *= $supervisor->options->maxProcesses];
} elseif ($timeToClearAll == 0 &&
$supervisor->options->autoScaling()) {
return [
Expand Down
10 changes: 8 additions & 2 deletions src/Console/SupervisorCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class SupervisorCommand extends Command
{--sleep=3 : Number of seconds to sleep when no job is available}
{--timeout=60 : The number of seconds a child process can run}
{--tries=0 : Number of times to attempt a job before logging it failed}
{--auto-scaling-strategy=time : If supervisor should scale by jobs or time to complete}
{--balance-cooldown=3 : The number of seconds to wait in between auto-scaling attempts}
{--balance-max-shift=1 : The maximum number of processes to increase or decrease per one scaling}
{--workers-name=default : The name that should be assigned to the workers}
Expand Down Expand Up @@ -111,12 +112,16 @@ protected function supervisorOptions()
? $this->option('backoff')
: $this->option('delay');

$balance = $this->option('balance');

$autoScalingStrategy = $balance === 'auto' ? $this->option('auto-scaling-strategy') : null;

return new SupervisorOptions(
$this->argument('name'),
$this->argument('connection'),
$this->getQueue($this->argument('connection')),
$this->option('workers-name'),
$this->option('balance'),
$balance,
$backoff,
$this->option('max-time'),
$this->option('max-jobs'),
Expand All @@ -131,7 +136,8 @@ protected function supervisorOptions()
$this->option('balance-cooldown'),
$this->option('balance-max-shift'),
$this->option('parent-id'),
$this->option('rest')
$this->option('rest'),
$autoScalingStrategy
);
}

Expand Down
3 changes: 2 additions & 1 deletion src/QueueCommandString.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static function toWorkerOptionsString(SupervisorOptions $options)
*/
public static function toSupervisorOptionsString(SupervisorOptions $options)
{
return sprintf('--workers-name=%s --balance=%s --max-processes=%s --min-processes=%s --nice=%s --balance-cooldown=%s --balance-max-shift=%s --parent-id=%s %s',
return sprintf('--workers-name=%s --balance=%s --max-processes=%s --min-processes=%s --nice=%s --balance-cooldown=%s --balance-max-shift=%s --parent-id=%s --auto-scaling-strategy=%s %s',
$options->workersName,
$options->balance,
$options->maxProcesses,
Expand All @@ -36,6 +36,7 @@ public static function toSupervisorOptionsString(SupervisorOptions $options)
$options->balanceCooldown,
$options->balanceMaxShift,
$options->parentId,
$options->autoScalingStrategy,
static::toOptionsString($options)
);
}
Expand Down
32 changes: 23 additions & 9 deletions src/SupervisorOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ class SupervisorOptions
*/
public $balance = 'off';

/**
* Indicates whether auto-scaling strategy should use "time" (time-to-complete) or "size" (total count of jobs) strategies.
*
* @var string|null
*/
public $autoScalingStrategy = null;

/**
* The maximum number of total processes to start when auto-scaling.
*
Expand Down Expand Up @@ -151,13 +158,6 @@ class SupervisorOptions
*/
public $rest;

/**
* Indicates if the supervisor should auto-scale.
*
* @var bool
*/
public $autoScale;

/**
* Create a new worker options instance.
*
Expand All @@ -181,6 +181,7 @@ class SupervisorOptions
* @param int $balanceMaxShift
* @param int $parentId
* @param int $rest
* @param string|null $autoScalingStrategy
*/
public function __construct($name,
$connection,
Expand All @@ -201,8 +202,9 @@ public function __construct($name,
$balanceCooldown = 3,
$balanceMaxShift = 1,
$parentId = 0,
$rest = 0)
{
$rest = 0,
$autoScalingStrategy = 'time'
) {
$this->name = $name;
$this->connection = $connection;
$this->queue = $queue ?: config('queue.connections.'.$connection.'.queue');
Expand All @@ -223,6 +225,7 @@ public function __construct($name,
$this->balanceMaxShift = $balanceMaxShift;
$this->parentId = $parentId;
$this->rest = $rest;
$this->autoScalingStrategy = $autoScalingStrategy;
}

/**
Expand Down Expand Up @@ -258,6 +261,16 @@ public function autoScaling()
return $this->balance === 'auto';
}

/**
* Determine if auto-scaling should be based on the number of jobs on the queue instead of time-to-clear.
*
* @return bool
*/
public function autoScaleByNumberOfJobs()
{
return $this->autoScalingStrategy === 'size';
}

/**
* Get the command-line representation of the options for a supervisor.
*
Expand Down Expand Up @@ -316,6 +329,7 @@ public function toArray()
'balanceMaxShift' => $this->balanceMaxShift,
'parentId' => $this->parentId,
'rest' => $this->rest,
'autoScalingStrategy' => $this->autoScalingStrategy,
];
}

Expand Down
2 changes: 1 addition & 1 deletion tests/Feature/AddSupervisorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public function test_add_supervisor_command_creates_new_supervisor_on_master_pro
$this->assertCount(1, $master->supervisors);

$this->assertSame(
'exec '.$phpBinary.' artisan horizon:supervisor my-supervisor redis --workers-name=default --balance=off --max-processes=1 --min-processes=1 --nice=0 --balance-cooldown=3 --balance-max-shift=1 --parent-id=0 --backoff=0 --max-time=0 --max-jobs=0 --memory=128 --queue="default" --sleep=3 --timeout=60 --tries=0 --rest=0',
'exec '.$phpBinary.' artisan horizon:supervisor my-supervisor redis --workers-name=default --balance=off --max-processes=1 --min-processes=1 --nice=0 --balance-cooldown=3 --balance-max-shift=1 --parent-id=0 --auto-scaling-strategy=time --backoff=0 --max-time=0 --max-jobs=0 --memory=128 --queue="default" --sleep=3 --timeout=60 --tries=0 --rest=0',
$master->supervisors->first()->process->getCommandLine()
);
}
Expand Down
18 changes: 18 additions & 0 deletions tests/Feature/AutoScalerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,22 @@ public function test_scaler_does_not_permit_going_to_zero_processes_despite_exce
$this->assertSame(14, $supervisor->processPools['first']->totalProcessCount());
$this->assertSame(1, $supervisor->processPools['second']->totalProcessCount());
}

public function test_scaler_assigns_more_processes_to_queue_with_more_jobs_when_using_size_strategy()
{
[$scaler, $supervisor] = $this->with_scaling_scenario(100, [
'first' => ['current' => 50, 'size' => 1000, 'runtime' => 10],
'second' => ['current' => 50, 'size' => 500, 'runtime' => 1000],
], ['autoScalingStrategy' => 'size']);

$scaler->scale($supervisor);

$this->assertSame(51, $supervisor->processPools['first']->totalProcessCount());
$this->assertSame(49, $supervisor->processPools['second']->totalProcessCount());

$scaler->scale($supervisor);

$this->assertSame(52, $supervisor->processPools['first']->totalProcessCount());
$this->assertSame(48, $supervisor->processPools['second']->totalProcessCount());
}
}