Updated Drupal to 8.6. This goes with the following updates because it's possible...
[yaffs-website] / vendor / drush / drush / src / Drupal / Commands / core / QueueCommands.php
1 <?php
2 namespace Drush\Drupal\Commands\core;
3
4 use Consolidation\AnnotatedCommand\CommandData;
5 use Consolidation\AnnotatedCommand\CommandError;
6 use Consolidation\OutputFormatters\StructuredData\RowsOfFields;
7 use Drupal\Core\Queue\QueueFactory;
8 use Drupal\Core\Queue\QueueInterface;
9 use Drupal\Core\Queue\QueueWorkerManagerInterface;
10 use Drupal\Core\Queue\RequeueException;
11 use Drupal\Core\Queue\SuspendQueueException;
12 use Drush\Commands\DrushCommands;
13
14 class QueueCommands extends DrushCommands
15 {
16
17     /**
18      * @var \Drupal\Core\Queue\QueueWorkerManager
19      */
20     protected $workerManager;
21
22     protected $queueService;
23
24     public function __construct(QueueWorkerManagerInterface $workerManager, QueueFactory $queueService)
25     {
26         $this->workerManager = $workerManager;
27         $this->queueService = $queueService;
28     }
29
30     /**
31      * @return \Drupal\Core\Queue\QueueWorkerManager
32      */
33     public function getWorkerManager()
34     {
35         return $this->workerManager;
36     }
37
38     /**
39      * @return \Drupal\Core\Queue\QueueFactory
40      */
41     public function getQueueService()
42     {
43         return $this->queueService;
44     }
45
46     /**
47      * Keep track of queue definitions.
48      *
49      * @var array
50      */
51     protected static $queues;
52
53     /**
54      * Run a specific queue by name.
55      *
56      * @command queue:run
57      * @aliases queue-run
58      * @param string $name The name of the queue to run, as defined in either hook_queue_info or hook_cron_queue_info.
59      * @validate-queue name
60      * @option time-limit The maximum number of seconds allowed to run the queue
61      */
62     public function run($name, $options = ['time-limit' => self::REQ])
63     {
64         $time_limit = (int) $options['time-limit'];
65         $start = microtime(true);
66         $worker = $this->getWorkerManager()->createInstance($name);
67         $end = time() + $time_limit;
68         $queue = $this->getQueue($name);
69         $count = 0;
70
71         while ((!$time_limit || time() < $end) && ($item = $queue->claimItem())) {
72             try {
73                 $this->logger()->info(dt('Processing item @id from @name queue.', ['@name' => $name, '@id' => $item->item_id]));
74                 $worker->processItem($item->data);
75                 $queue->deleteItem($item);
76                 $count++;
77             } catch (RequeueException $e) {
78                 // The worker requested the task to be immediately requeued.
79                 $queue->releaseItem($item);
80             } catch (SuspendQueueException $e) {
81                 // If the worker indicates there is a problem with the whole queue,
82                 // release the item.
83                 $queue->releaseItem($item);
84                 throw new \Exception($e->getMessage());
85             }
86         }
87         $elapsed = microtime(true) - $start;
88         $this->logger()->success(dt('Processed @count items from the @name queue in @elapsed sec.', ['@count' => $count, '@name' => $name, '@elapsed' => round($elapsed, 2)]));
89     }
90
91     /**
92      * Returns a list of all defined queues.
93      *
94      * @command queue:list
95      * @aliases queue-list
96      * @field-labels
97      *   queue: Queue
98      *   items: Items
99      *   class: Class
100      *
101      * @return \Consolidation\OutputFormatters\StructuredData\RowsOfFields
102      */
103     public function qList($options = ['format' => 'table'])
104     {
105         $result = [];
106         foreach (array_keys($this->getQueues()) as $name) {
107             $q = $this->getQueue($name);
108             $result[$name] = [
109             'queue' => $name,
110             'items' => $q->numberOfItems(),
111             'class' => get_class($q),
112             ];
113         }
114         return new RowsOfFields($result);
115     }
116
117     /**
118      * Delete all items in a specific queue.
119      *
120      * @command queue:delete
121      * @aliases queue-delete
122      * @param $name The name of the queue to run, as defined in either hook_queue_info or hook_cron_queue_info.
123      * @validate-queue name
124      */
125     public function delete($name)
126     {
127         $queue = $this->getQueue($name);
128         $queue->deleteQueue();
129         $this->logger()->success(dt('All items in @name queue deleted.', ['@name' => $name]));
130     }
131
132     /**
133      * Validate that queue permission exists.
134      *
135      * Annotation value should be the name of the argument/option containing the name.
136      *
137      * @hook validate @validate-queue
138      * @param \Consolidation\AnnotatedCommand\CommandData $commandData
139      * @return \Consolidation\AnnotatedCommand\CommandError|null
140      */
141     public function validateQueueName(CommandData $commandData)
142     {
143         $arg_name = $commandData->annotationData()->get('validate-queue', null);
144         $name = $commandData->input()->getArgument($arg_name);
145         $all = array_keys(self::getQueues());
146         if (!in_array($name, $all)) {
147             $msg = dt('Queue not found: !name', ['!name' => $name]);
148             return new CommandError($msg);
149         }
150     }
151
152     /**
153      * {@inheritdoc}
154      */
155     public function getQueues()
156     {
157         if (!isset(static::$queues)) {
158             static::$queues = [];
159             foreach ($this->getWorkerManager()->getDefinitions() as $name => $info) {
160                 static::$queues[$name] = $info;
161             }
162         }
163         return static::$queues;
164     }
165
166     /**
167      * {@inheritdoc}
168      *
169      * @return \Drupal\Core\Queue\QueueInterface
170      */
171     public function getQueue($name)
172     {
173         return $this->getQueueService()->get($name);
174     }
175 }