2 namespace Drush\Drupal\Commands\core;
4 use Consolidation\AnnotatedCommand\CommandData;
5 use Consolidation\AnnotatedCommand\CommandError;
6 use Consolidation\OutputFormatters\StructuredData\RowsOfFields;
7 use Drupal\Core\Queue\QueueFactory;
8 use Drupal\Core\Queue\QueueInterface;
9 use Drupal\Core\Queue\QueueWorkerManagerInterface;
10 use Drupal\Core\Queue\RequeueException;
11 use Drupal\Core\Queue\SuspendQueueException;
12 use Drush\Commands\DrushCommands;
14 class QueueCommands extends DrushCommands
// Queue worker plugin manager handed to the constructor; the class uses it
// to list queue definitions and to create the worker for a queue.
// NOTE(review): the constructor type-hints QueueWorkerManagerInterface, so
// the concrete class named in the @var line below is narrower than what is
// actually guaranteed - confirm and widen if appropriate.
18 * @var \Drupal\Core\Queue\QueueWorkerManager
20 protected $workerManager;
// Queue factory handed to the constructor; getQueue() asks it for the queue
// object that belongs to a given name.
22 protected $queueService;
/**
 * Stores the injected queue services for later use by the commands.
 *
 * @param \Drupal\Core\Queue\QueueWorkerManagerInterface $workerManager
 *   Plugin manager used to list queue definitions and create workers.
 * @param \Drupal\Core\Queue\QueueFactory $queueService
 *   Factory used to obtain a queue object by name.
 */
public function __construct(QueueWorkerManagerInterface $workerManager, QueueFactory $queueService)
{
    // The two assignments are independent of each other.
    $this->queueService = $queueService;
    $this->workerManager = $workerManager;
}
/**
 * Gives access to the queue worker plugin manager supplied at construction.
 *
 * @return \Drupal\Core\Queue\QueueWorkerManagerInterface
 */
public function getWorkerManager()
{
    $manager = $this->workerManager;

    return $manager;
}
/**
 * Gives access to the queue factory supplied at construction.
 *
 * @return \Drupal\Core\Queue\QueueFactory
 */
public function getQueueService()
{
    $factory = $this->queueService;

    return $factory;
}
// Static, so the stored list is shared by every instance of the class.
// getQueues() fills it on first use with the worker manager's definitions,
// keyed by queue name, and returns the stored list on later calls.
47 * Keep track of queue definitions.
51 protected static $queues;
/**
 * Run a specific queue by name.
 *
 * Claims items from the queue one at a time and passes each to the queue's
 * worker until no further item can be claimed or the time limit has passed.
 * Items the worker processes without an exception are deleted from the queue.
 *
 * @param string $name The name of the queue to run, as defined in either hook_queue_info or hook_cron_queue_info.
 * @validate-queue name
 * @option time-limit The maximum number of seconds allowed to run the queue
 *
 * @throws \Exception When the worker suspends the queue; the SuspendQueueException is attached as the previous exception.
 */
public function run($name, $options = ['time-limit' => self::REQ])
{
    $time_limit = (int) $options['time-limit'];
    $start = microtime(true);
    $worker = $this->getWorkerManager()->createInstance($name);
    // Deadline in wall-clock seconds; ignored by the loop when the limit is 0.
    $end = time() + $time_limit;
    $queue = $this->getQueue($name);
    $count = 0;

    // A time limit of 0 means "no limit": keep going until nothing can be claimed.
    while ((!$time_limit || time() < $end) && ($item = $queue->claimItem())) {
        try {
            $this->logger()->info(dt('Processing item @id from @name queue.', ['@name' => $name, '@id' => $item->item_id]));
            $worker->processItem($item->data);
            $queue->deleteItem($item);
            // Only items that were processed and removed are counted.
            $count++;
        } catch (RequeueException $e) {
            // The worker requested the task to be immediately requeued.
            $queue->releaseItem($item);
        } catch (SuspendQueueException $e) {
            // If the worker indicates there is a problem with the whole queue,
            // release the item and stop. The original exception is chained so
            // its type and stack trace stay available to whoever handles this.
            $queue->releaseItem($item);
            throw new \Exception($e->getMessage(), 0, $e);
        }
    }

    $elapsed = microtime(true) - $start;
    $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $count, '@name' => $name, '@elapsed' => round($elapsed, 2)]));
}
// Reports the defined queues as structured rows for the output formatter.
// NOTE(review): the initialisation of $result and the opening of each row
// are not visible in this excerpt - confirm against the full file before
// relying on the line notes below.
92 * Returns a list of all defined queues.
101 * @return \Consolidation\OutputFormatters\StructuredData\RowsOfFields
// The 'format' option defaults to 'table'.
103 public function qList($options = ['format' => 'table'])
// Visit every queue name returned by getQueues().
106 foreach (array_keys($this->getQueues()) as $name) {
// Fetch the queue object for this name so it can be inspected.
107 $q = $this->getQueue($name);
// Item count as reported by the queue object itself.
110 'items' => $q->numberOfItems(),
// Class name of the queue object that getQueue() returned.
111 'class' => get_class($q),
// Wrap the collected rows in RowsOfFields, the declared return type.
114 return new RowsOfFields($result);
/**
 * Delete all items in a specific queue.
 *
 * @command queue:delete
 * @aliases queue-delete
 * @param $name The name of the queue to delete, as defined in either hook_queue_info or hook_cron_queue_info.
 * @validate-queue name
 */
public function delete($name)
{
    // Drop everything held under this queue name, then report it.
    $this->getQueue($name)->deleteQueue();
    $this->logger()->success(dt('All items in @name queue deleted.', ['@name' => $name]));
}
/**
 * Validate that the named queue exists.
 *
 * Annotation value should be the name of the argument/option containing the name.
 *
 * @hook validate @validate-queue
 * @param \Consolidation\AnnotatedCommand\CommandData $commandData
 * @return \Consolidation\AnnotatedCommand\CommandError|null
 */
public function validateQueueName(CommandData $commandData)
{
    $arg_name = $commandData->annotationData()->get('validate-queue', null);
    $name = $commandData->input()->getArgument($arg_name);

    // Called on the instance (as qList() does) so a subclass override of
    // getQueues() is honoured.
    $queues = $this->getQueues();

    // Look the name up as an array key instead of loosely comparing it with
    // the list of keys: a loose comparison can report a match between an
    // arbitrary string and an integer key on older PHP versions.
    $is_key = is_string($name) || is_int($name);
    if (!$is_key || !array_key_exists($name, $queues)) {
        $msg = dt('Queue not found: !name', ['!name' => $name]);
        return new CommandError($msg);
    }

    return null;
}
/**
 * Returns the queue definitions reported by the worker manager, keyed by name.
 *
 * The definitions are collected on the first call and kept in a static
 * property, so later calls return the stored list.
 *
 * @return array
 */
public function getQueues()
{
    if (isset(static::$queues)) {
        return static::$queues;
    }

    // Start from an empty list, then copy each definition under its own key.
    static::$queues = [];
    $definitions = $this->getWorkerManager()->getDefinitions();
    foreach ($definitions as $queueName => $definition) {
        static::$queues[$queueName] = $definition;
    }

    return static::$queues;
}
/**
 * Asks the queue factory for the queue object registered under a name.
 *
 * @param string $name
 *   Name of the queue to fetch.
 *
 * @return \Drupal\Core\Queue\QueueInterface
 */
public function getQueue($name)
{
    $factory = $this->getQueueService();

    return $factory->get($name);
}