Skip to content

Commit

Permalink
Remove forking and pcntl requirements while still supporting timeouts. (
Browse files Browse the repository at this point in the history
#15650)

* remove need to use forking while still supporting timeouts

* Applied fixes from StyleCI (#15649)
  • Loading branch information
taylorotwell authored Sep 28, 2016
1 parent 5a719af commit b949244
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 186 deletions.
213 changes: 213 additions & 0 deletions src/Illuminate/Queue/Console/DaemonCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
<?php

namespace Illuminate\Queue\Console;

use Carbon\Carbon;
use Illuminate\Queue\Worker;
use Illuminate\Console\Command;
use Illuminate\Queue\WorkerOptions;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;

class DaemonCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $name = 'queue:daemon';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Daemon worker process that should not be called directly (use queue:work)';

/**
* The queue worker instance.
*
* @var \Illuminate\Queue\Worker
*/
protected $worker;

/**
* Create a new queue listen command.
*
* @param \Illuminate\Queue\Worker $worker
* @return void
*/
public function __construct(Worker $worker)
{
parent::__construct();

$this->worker = $worker;
}

/**
* Execute the console command.
*
* @return void
*/
public function fire()
{
if ($this->downForMaintenance() && $this->option('once')) {
return $this->worker->sleep($this->option('sleep'));
}

// We'll listen to the processed and failed events so we can write information
// to the console as jobs are processed, which will let the developer watch
// which jobs are coming through a queue and be informed on its progress.
$this->listenForEvents();

$connection = $this->argument('connection')
?: $this->laravel['config']['queue.default'];

// We need to get the right queue for the connection which is set in the queue
// configuration file for the application. We will pull it based on the set
// connection being run for the queue operation currently being executed.
$queue = $this->option('queue') ?: $this->laravel['config']->get(
"queue.connections.{$connection}.queue", 'default'
);

$response = $this->runWorker(
$connection, $queue
);
}

/**
* Run the worker instance.
*
* @param string $connection
* @param string $queue
* @return array
*/
protected function runWorker($connection, $queue)
{
$this->worker->setCache($this->laravel['cache']->driver());

return $this->worker->{$this->option('once') ? 'runNextJob' : 'daemon'}(
$connection, $queue, $this->gatherWorkerOptions()
);
}

/**
* Gather all of the queue worker options as a single object.
*
* @return \Illuminate\Queue\WorkerOptions
*/
protected function gatherWorkerOptions()
{
return new WorkerOptions(
$this->option('delay'), $this->option('memory'),
$this->option('timeout'), $this->option('sleep'),
$this->option('tries')
);
}

/**
* Listen for the queue events in order to update the console output.
*
* @return void
*/
protected function listenForEvents()
{
$this->laravel['events']->listen('illuminate.queue.looping', function () {
$this->output->writeln('.');
});

$this->laravel['events']->listen(JobProcessed::class, function ($event) {
$this->writeOutput($event->job, false);
});

$this->laravel['events']->listen(JobFailed::class, function ($event) {
$this->writeOutput($event->job, true);

$this->logFailedJob($event);
});
}

/**
* Write the status output for the queue worker.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param bool $failed
* @return void
*/
protected function writeOutput(Job $job, $failed)
{
if ($failed) {
$this->output->writeln('<error>['.Carbon::now()->format('Y-m-d H:i:s').'] Failed:</error> '.$job->resolveName());
} else {
$this->output->writeln('<info>['.Carbon::now()->format('Y-m-d H:i:s').'] Processed:</info> '.$job->resolveName());
}
}

/**
* Store a failed job event.
*
* @param JobFailed $event
* @return void
*/
protected function logFailedJob(JobFailed $event)
{
$this->laravel['queue.failer']->log(
$event->connectionName, $event->job->getQueue(),
$event->job->getRawBody(), $event->exception
);
}

/**
* Determine if the worker should run in maintenance mode.
*
* @return bool
*/
protected function downForMaintenance()
{
return $this->option('force') ? false : $this->laravel->isDownForMaintenance();
}

/**
* Get the console command arguments.
*
* @return array
*/
protected function getArguments()
{
return [
['connection', InputArgument::OPTIONAL, 'The name of connection', null],
];
}

/**
* Get the console command options.
*
* @return array
*/
protected function getOptions()
{
return [
['queue', null, InputOption::VALUE_OPTIONAL, 'The queue to listen on'],

['daemon', null, InputOption::VALUE_NONE, 'Run the worker in daemon mode (Deprecated)'],

['once', null, InputOption::VALUE_NONE, 'Only process the next job on the queue'],

['delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0],

['force', null, InputOption::VALUE_NONE, 'Force the worker to run even in maintenance mode'],

['memory', null, InputOption::VALUE_OPTIONAL, 'The memory limit in megabytes', 128],

['sleep', null, InputOption::VALUE_OPTIONAL, 'Number of seconds to sleep when no job is available', 3],

['timeout', null, InputOption::VALUE_OPTIONAL, 'The number of seconds a child process can run', 60],

['tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0],
];
}
}
7 changes: 0 additions & 7 deletions src/Illuminate/Queue/Console/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

namespace Illuminate\Queue\Console;

use RuntimeException;
use Illuminate\Queue\Listener;
use Illuminate\Console\Command;
use Symfony\Component\Console\Input\InputOption;
Expand Down Expand Up @@ -48,8 +47,6 @@ public function __construct(Listener $listener)
* Execute the console command.
*
* @return void
*
* @throws \RuntimeException
*/
public function fire()
{
Expand All @@ -66,10 +63,6 @@ public function fire()

$timeout = $this->input->getOption('timeout');

if ($timeout && ! function_exists('pcntl_fork')) {
throw new RuntimeException('Timeouts not supported without the pcntl_fork extension.');
}

// We need to get the right queue for the connection which is set in the queue
// configuration file for the application. We will pull it based on the set
// connection being run for the queue operation currently being executed.
Expand Down
Loading

0 comments on commit b949244

Please sign in to comment.