Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit bf89cd6

Browse filesBrowse files
committed
Making sleep configurable
1 parent 19a3c7f commit bf89cd6
Copy full SHA for bf89cd6

7 files changed

+63
-18
lines changed

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+4-1Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ protected function configure(): void
7474
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
7575
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
7676
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
77+
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
7778
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
7879
])
7980
->setDescription('Consumes messages')
@@ -184,7 +185,9 @@ protected function execute(InputInterface $input, OutputInterface $output): void
184185
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
185186
}
186187

187-
$worker->run();
188+
$worker->run([
189+
'sleep' => $input->getOption('sleep') * 1000000
190+
]);
188191
}
189192

190193
private function convertToBytes(string $memoryLimit): int

‎src/Symfony/Component/Messenger/Tests/WorkerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/WorkerTest.php
+44-8Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public function testWorkerDispatchTheReceivedMessage()
4444
$bus->expects($this->at(1))->method('dispatch')->with($envelope = new Envelope($ipaMessage, new ReceivedStamp()))->willReturn($envelope);
4545

4646
$worker = new Worker($receiver, $bus, 'receiver_id');
47-
$worker->run(function (?Envelope $envelope) use ($worker) {
47+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
4848
// stop after the messages finish
4949
if (null === $envelope) {
5050
$worker->stop();
@@ -65,7 +65,7 @@ public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
6565
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
6666

6767
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
68-
$worker->run(function (?Envelope $envelope) use ($worker) {
68+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
6969
// stop after the messages finish
7070
if (null === $envelope) {
7171
$worker->stop();
@@ -101,7 +101,7 @@ public function testDispatchCausesRetry()
101101
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);
102102

103103
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
104-
$worker->run(function (?Envelope $envelope) use ($worker) {
104+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
105105
// stop after the messages finish
106106
if (null === $envelope) {
107107
$worker->stop();
@@ -125,7 +125,7 @@ public function testDispatchCausesRejectWhenNoRetry()
125125
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);
126126

127127
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
128-
$worker->run(function (?Envelope $envelope) use ($worker) {
128+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
129129
// stop after the messages finish
130130
if (null === $envelope) {
131131
$worker->stop();
@@ -148,7 +148,7 @@ public function testDispatchCausesRejectOnUnrecoverableMessage()
148148
$retryStrategy->expects($this->never())->method('isRetryable');
149149

150150
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
151-
$worker->run(function (?Envelope $envelope) use ($worker) {
151+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
152152
// stop after the messages finish
153153
if (null === $envelope) {
154154
$worker->stop();
@@ -168,7 +168,7 @@ public function testWorkerDoesNotSendNullMessagesToTheBus()
168168
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
169169

170170
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy);
171-
$worker->run(function (?Envelope $envelope) use ($worker) {
171+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
172172
// stop after the messages finish
173173
if (null === $envelope) {
174174
$worker->stop();
@@ -195,7 +195,7 @@ public function testWorkerDispatchesEventsOnSuccess()
195195
);
196196

197197
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
198-
$worker->run(function (?Envelope $envelope) use ($worker) {
198+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
199199
// stop after the messages finish
200200
if (null === $envelope) {
201201
$worker->stop();
@@ -223,13 +223,49 @@ public function testWorkerDispatchesEventsOnError()
223223
);
224224

225225
$worker = new Worker($receiver, $bus, 'receiver_id', $retryStrategy, $eventDispatcher);
226-
$worker->run(function (?Envelope $envelope) use ($worker) {
226+
$worker->run(['sleep' => 0], function (?Envelope $envelope) use ($worker) {
227227
// stop after the messages finish
228228
if (null === $envelope) {
229229
$worker->stop();
230230
}
231231
});
232232
}
233+
234+
public function testTimeoutIsConfigurable()
235+
{
236+
$apiMessage = new DummyMessage('API');
237+
$receiver = new DummyReceiver([
238+
[new Envelope($apiMessage), new Envelope($apiMessage)],
239+
null, // will cause a wait
240+
null, // will cause a wait
241+
[new Envelope($apiMessage)],
242+
[new Envelope($apiMessage)],
243+
null, // will cause a wait
244+
[new Envelope($apiMessage)],
245+
]);
246+
247+
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
248+
249+
$worker = new Worker($receiver, $bus, 'receiver_id');
250+
$receivedCount = 0;
251+
$startTime = microtime(true);
252+
// sleep .1 after each idle
253+
$worker->run(['sleep' => 100000], function (?Envelope $envelope) use ($worker, &$receivedCount, $startTime) {
254+
if (null !== $envelope) {
255+
$receivedCount++;
256+
}
257+
258+
if (5 === $receivedCount) {
259+
$worker->stop();
260+
$duration = microtime(true) - $startTime;
261+
262+
// wait time should be .3 seconds, so execution should
263+
// be only a bit more than that
264+
$this->assertGreaterThanOrEqual(.3, $duration);
265+
$this->assertLessThan(.5, $duration);
266+
}
267+
});
268+
}
233269
}
234270

235271
class DummyReceiver implements ReceiverInterface

‎src/Symfony/Component/Messenger/Worker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker.php
+6-2Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,12 @@ public function __construct(ReceiverInterface $receiver, MessageBusInterface $bu
6060
/**
6161
* Receive the messages and dispatch them to the bus.
6262
*/
63-
public function run(callable $onHandledCallback = null): void
63+
public function run(array $options = [], callable $onHandledCallback = null): void
6464
{
65+
$options = array_merge([
66+
'sleep' => 1000000,
67+
], $options);
68+
6569
if (\function_exists('pcntl_signal')) {
6670
pcntl_signal(SIGTERM, function () {
6771
$this->stop();
@@ -92,7 +96,7 @@ public function run(callable $onHandledCallback = null): void
9296
if (false === $envelopeHandled) {
9397
$handled(null);
9498

95-
usleep(1000000);
99+
usleep($options['sleep']);
96100
}
97101
}
98102
}

‎src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker/StopWhenMemoryUsageIsExceededWorker.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ public function __construct(WorkerInterface $decoratedWorker, int $memoryLimit,
3737
};
3838
}
3939

40-
public function run(callable $onHandledCallback = null): void
40+
public function run(array $options = [], callable $onHandledCallback = null): void
4141
{
42-
$this->decoratedWorker->run(function (?Envelope $envelope) use ($onHandledCallback) {
42+
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback) {
4343
if (null !== $onHandledCallback) {
4444
$onHandledCallback($envelope);
4545
}

‎src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker/StopWhenMessageCountIsExceededWorker.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ public function __construct(WorkerInterface $decoratedWorker, int $maximumNumber
3333
$this->logger = $logger;
3434
}
3535

36-
public function run(callable $onHandledCallback = null): void
36+
public function run(array $options = [], callable $onHandledCallback = null): void
3737
{
3838
$receivedMessages = 0;
3939

40-
$this->decoratedWorker->run(function (?Envelope $envelope) use ($onHandledCallback, &$receivedMessages) {
40+
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, &$receivedMessages) {
4141
if (null !== $onHandledCallback) {
4242
$onHandledCallback($envelope);
4343
}

‎src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker/StopWhenTimeLimitIsReachedWorker.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ public function __construct(WorkerInterface $decoratedWorker, int $timeLimitInSe
3333
$this->logger = $logger;
3434
}
3535

36-
public function run(callable $onHandledCallback = null): void
36+
public function run(array $options = [], callable $onHandledCallback = null): void
3737
{
3838
$startTime = microtime(true);
3939
$endTime = $startTime + $this->timeLimitInSeconds;
4040

41-
$this->decoratedWorker->run(function (?Envelope $envelope) use ($onHandledCallback, $endTime) {
41+
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $endTime) {
4242
if (null !== $onHandledCallback) {
4343
$onHandledCallback($envelope);
4444
}

‎src/Symfony/Component/Messenger/WorkerInterface.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/WorkerInterface.php
+3-1Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ interface WorkerInterface
2424
*
2525
* The $onHandledCallback will be passed the Envelope that was just
2626
* handled or null if nothing was handled.
27+
*
28+
* @param mixed[] $options Options used to control worker behavior.
2729
*/
28-
public function run(callable $onHandledCallback = null): void;
30+
public function run(array $options = [], callable $onHandledCallback = null): void;
2931

3032
/**
3133
* Stop receiving messages.

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.