Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit c267c7c

Browse filesBrowse files
committed
[Messenger] Add worker metadata to worker lifecycle events
Fix changelog Use a separate class instead fix CS fix Fix typo Add tests CS fix
1 parent f1353d5 commit c267c7c
Copy full SHA for c267c7c

File tree

7 files changed

+119
-8
lines changed
Filter options

7 files changed

+119
-8
lines changed

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ CHANGELOG
55
---
66

77
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
8+
* Added `WorkerMetadata` class which allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from.
9+
* New optional argument `WorkerMetadata $workerMetadata` was added to `WorkerStartedEvent`,`WorkerRunningEvent` and `WorkerStoppedEvent` event classes.
10+
* New method `getWorkerMetadata()` was added to `WorkerStartedEvent`,`WorkerRunningEvent` and `WorkerStoppedEvent` event classes which returns the `WorkerMetadata` object.
811

912
5.3
1013
---

‎src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Event/WorkerRunningEvent.php
+9-1Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Event;
1313

1414
use Symfony\Component\Messenger\Worker;
15+
use Symfony\Component\Messenger\WorkerMetadata;
1516

1617
/**
1718
* Dispatched after the worker processed a message or didn't receive a message at all.
@@ -22,11 +23,13 @@ final class WorkerRunningEvent
2223
{
2324
private $worker;
2425
private $isWorkerIdle;
26+
private $workerMetadata;
2527

26-
public function __construct(Worker $worker, bool $isWorkerIdle)
28+
public function __construct(Worker $worker, bool $isWorkerIdle, WorkerMetadata $workerMetadata = null)
2729
{
2830
$this->worker = $worker;
2931
$this->isWorkerIdle = $isWorkerIdle;
32+
$this->workerMetadata = $workerMetadata ?? new WorkerMetadata([]);
3033
}
3134

3235
public function getWorker(): Worker
@@ -41,4 +44,9 @@ public function isWorkerIdle(): bool
4144
{
4245
return $this->isWorkerIdle;
4346
}
47+
48+
public function getWorkerMetadata(): WorkerMetadata
49+
{
50+
return $this->workerMetadata;
51+
}
4452
}

‎src/Symfony/Component/Messenger/Event/WorkerStartedEvent.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Event/WorkerStartedEvent.php
+9-1Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Event;
1313

1414
use Symfony\Component\Messenger\Worker;
15+
use Symfony\Component\Messenger\WorkerMetadata;
1516

1617
/**
1718
* Dispatched when a worker has been started.
@@ -21,14 +22,21 @@
2122
final class WorkerStartedEvent
2223
{
2324
private $worker;
25+
private $workerMetadata;
2426

25-
public function __construct(Worker $worker)
27+
public function __construct(Worker $worker, WorkerMetadata $workerMetadata = null)
2628
{
2729
$this->worker = $worker;
30+
$this->workerMetadata = $workerMetadata ?? new WorkerMetadata([]);
2831
}
2932

3033
public function getWorker(): Worker
3134
{
3235
return $this->worker;
3336
}
37+
38+
public function getWorkerMetadata(): WorkerMetadata
39+
{
40+
return $this->workerMetadata;
41+
}
3442
}

‎src/Symfony/Component/Messenger/Event/WorkerStoppedEvent.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Event/WorkerStoppedEvent.php
+9-1Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Event;
1313

1414
use Symfony\Component\Messenger\Worker;
15+
use Symfony\Component\Messenger\WorkerMetadata;
1516

1617
/**
1718
* Dispatched when a worker has been stopped.
@@ -21,14 +22,21 @@
2122
final class WorkerStoppedEvent
2223
{
2324
private $worker;
25+
private $workerMetadata;
2426

25-
public function __construct(Worker $worker)
27+
public function __construct(Worker $worker, WorkerMetadata $workerMetadata = null)
2628
{
2729
$this->worker = $worker;
30+
$this->workerMetadata = $workerMetadata ?? new WorkerMetadata([]);
2831
}
2932

3033
public function getWorker(): Worker
3134
{
3235
return $this->worker;
3336
}
37+
38+
public function getWorkerMetadata(): WorkerMetadata
39+
{
40+
return $this->workerMetadata;
41+
}
3442
}

‎src/Symfony/Component/Messenger/Tests/WorkerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/WorkerTest.php
+36Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,34 @@ public function testWorkerDispatchesEventsOnError()
167167
$worker->run();
168168
}
169169

170+
public function testWorkerEventsContainMetadata()
171+
{
172+
$envelope = new Envelope(new DummyMessage('Hello'));
173+
$receiver = new DummyQueueReceiver([[$envelope]]);
174+
175+
$bus = $this->createMock(MessageBusInterface::class);
176+
$bus->method('dispatch')->willReturn($envelope);
177+
178+
$dispatcher = new EventDispatcher();
179+
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) {
180+
$this->assertEquals(['dummyReceiver'], $event->getWorkerMetadata()->getTransportNames());
181+
$this->assertEquals(['queue1', 'queue2'], $event->getWorkerMetadata()->getQueueNames());
182+
183+
$event->getWorker()->stop();
184+
});
185+
$dispatcher->addListener(WorkerStartedEvent::class, function (WorkerStartedEvent $event) {
186+
$this->assertEquals(['dummyReceiver'], $event->getWorkerMetadata()->getTransportNames());
187+
$this->assertEquals(['queue1', 'queue2'], $event->getWorkerMetadata()->getQueueNames());
188+
});
189+
$dispatcher->addListener(WorkerStoppedEvent::class, function (WorkerStoppedEvent $event) {
190+
$this->assertEquals(['dummyReceiver'], $event->getWorkerMetadata()->getTransportNames());
191+
$this->assertEquals(['queue1', 'queue2'], $event->getWorkerMetadata()->getQueueNames());
192+
});
193+
194+
$worker = new Worker(['dummyReceiver' => $receiver], $bus, $dispatcher);
195+
$worker->run(['queues' => ['queue1', 'queue2']]);
196+
}
197+
170198
public function testTimeoutIsConfigurable()
171199
{
172200
$apiMessage = new DummyMessage('API');
@@ -359,3 +387,11 @@ public function getAcknowledgedEnvelopes(): array
359387
return $this->acknowledgedEnvelopes;
360388
}
361389
}
390+
391+
class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface
392+
{
393+
public function getFromQueues(array $queueNames): iterable
394+
{
395+
return $this->get();
396+
}
397+
}

‎src/Symfony/Component/Messenger/Worker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker.php
+10-5Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,18 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis
6363
*/
6464
public function run(array $options = []): void
6565
{
66-
$this->dispatchEvent(new WorkerStartedEvent($this));
67-
6866
$options = array_merge([
6967
'sleep' => 1000000,
7068
], $options);
7169
$queueNames = $options['queues'] ?? false;
7270

71+
$workerMetadata = new WorkerMetadata([
72+
'queueNames' => $queueNames,
73+
'transportNames' => array_keys($this->receivers),
74+
]);
75+
76+
$this->dispatchEvent(new WorkerStartedEvent($this, $workerMetadata));
77+
7378
if ($queueNames) {
7479
// if queue names are specified, all receivers must implement the QueueReceiverInterface
7580
foreach ($this->receivers as $transportName => $receiver) {
@@ -92,7 +97,7 @@ public function run(array $options = []): void
9297
$envelopeHandled = true;
9398

9499
$this->handleMessage($envelope, $receiver, $transportName);
95-
$this->dispatchEvent(new WorkerRunningEvent($this, false));
100+
$this->dispatchEvent(new WorkerRunningEvent($this, false, $workerMetadata));
96101

97102
if ($this->shouldStop) {
98103
break 2;
@@ -108,13 +113,13 @@ public function run(array $options = []): void
108113
}
109114

110115
if (false === $envelopeHandled) {
111-
$this->dispatchEvent(new WorkerRunningEvent($this, true));
116+
$this->dispatchEvent(new WorkerRunningEvent($this, true, $workerMetadata));
112117

113118
usleep($options['sleep']);
114119
}
115120
}
116121

117-
$this->dispatchEvent(new WorkerStoppedEvent($this));
122+
$this->dispatchEvent(new WorkerStoppedEvent($this, $workerMetadata));
118123
}
119124

120125
private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void
+43Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
final class WorkerMetadata
15+
{
16+
private $queueNames;
17+
private $transportNames;
18+
19+
public function __construct(array $metadata)
20+
{
21+
$this->queueNames = $metadata['queueNames'] ?? false;
22+
$this->transportNames = $metadata['transportNames'] ?? [];
23+
}
24+
25+
/**
26+
* Returns the queue names the worker consumes from, if "--queues" option was used.
27+
* Returns false otherwise.
28+
*
29+
* @return array|false
30+
*/
31+
public function getQueueNames()
32+
{
33+
return $this->queueNames;
34+
}
35+
36+
/**
37+
* Returns an array of unique identifiers for transport receivers the worker consumes from.
38+
*/
39+
public function getTransportNames(): array
40+
{
41+
return $this->transportNames;
42+
}
43+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.