19
19
use Symfony \Component \Console \Input \InputOption ;
20
20
use Symfony \Component \Console \Output \ConsoleOutputInterface ;
21
21
use Symfony \Component \Console \Output \OutputInterface ;
22
+ use Symfony \Component \Console \SignalRegistry \SignalRegistry ;
22
23
use Symfony \Component \Console \Style \SymfonyStyle ;
23
24
use Symfony \Component \EventDispatcher \EventDispatcherInterface ;
24
25
use Symfony \Component \Messenger \Event \WorkerMessageReceivedEvent ;
26
+ use Symfony \Component \Messenger \Event \WorkerStartedEvent ;
25
27
use Symfony \Component \Messenger \EventListener \StopWorkerOnMessageLimitListener ;
26
28
use Symfony \Component \Messenger \MessageBusInterface ;
27
29
use Symfony \Component \Messenger \Stamp \MessageDecodingFailedStamp ;
@@ -165,6 +167,16 @@ private function runInteractive(string $failureTransportName, SymfonyStyle $io,
165
167
166
168
private function runWorker (string $ failureTransportName , ReceiverInterface $ receiver , SymfonyStyle $ io , bool $ shouldForce ): int
167
169
{
170
+ $ this ->eventDispatcher ->addListener (WorkerStartedEvent::class, $ signalsListener = function () {
171
+ if (!\defined ('SIGINT ' ) || !SignalRegistry::isSupported ()) {
172
+ return ;
173
+ }
174
+
175
+ foreach ([\SIGINT , \SIGTERM ] as $ signal ) {
176
+ pcntl_signal ($ signal , $ this ->getApplication ()->getSignalRegistry ()->handle (...));
177
+ }
178
+ });
179
+
168
180
$ count = 0 ;
169
181
$ listener = function (WorkerMessageReceivedEvent $ messageReceivedEvent ) use ($ io , $ receiver , $ shouldForce , &$ count ) {
170
182
++$ count ;
@@ -197,6 +209,7 @@ private function runWorker(string $failureTransportName, ReceiverInterface $rece
197
209
try {
198
210
$ worker ->run ();
199
211
} finally {
212
+ $ this ->eventDispatcher ->removeListener (WorkerStartedEvent::class, $ signalsListener );
200
213
$ this ->eventDispatcher ->removeListener (WorkerMessageReceivedEvent::class, $ listener );
201
214
}
202
215
0 commit comments