@@ -94,15 +94,15 @@ protected function configure(): void
94
94
*/
95
95
protected function interact (InputInterface $ input , OutputInterface $ output )
96
96
{
97
- $ style = new SymfonyStyle ($ input , $ output );
97
+ $ io = new SymfonyStyle ($ input , $ output instanceof ConsoleOutputInterface ? $ output -> getErrorOutput () : $ output );
98
98
99
99
if ($ this ->receiverNames && !$ this ->receiverLocator ->has ($ receiverName = $ input ->getArgument ('receiver ' ))) {
100
100
if (null === $ receiverName ) {
101
- $ style ->block ('Missing receiver argument. ' , null , 'error ' , ' ' , true );
102
- $ input ->setArgument ('receiver ' , $ style ->choice ('Select one of the available receivers ' , $ this ->receiverNames ));
101
+ $ io ->block ('Missing receiver argument. ' , null , 'error ' , ' ' , true );
102
+ $ input ->setArgument ('receiver ' , $ io ->choice ('Select one of the available receivers ' , $ this ->receiverNames ));
103
103
} elseif ($ alternatives = $ this ->findAlternatives ($ receiverName , $ this ->receiverNames )) {
104
- $ style ->block (sprintf ('Receiver "%s" is not defined. ' , $ receiverName ), null , 'error ' , ' ' , true );
105
- if ($ style ->confirm (sprintf ('Do you want to receive from "%s" instead? ' , $ alternatives [0 ]), false )) {
104
+ $ io ->block (sprintf ('Receiver "%s" is not defined. ' , $ receiverName ), null , 'error ' , ' ' , true );
105
+ if ($ io ->confirm (sprintf ('Do you want to receive from "%s" instead? ' , $ alternatives [0 ]), false )) {
106
106
$ input ->setArgument ('receiver ' , $ alternatives [0 ]);
107
107
}
108
108
}
@@ -111,17 +111,17 @@ protected function interact(InputInterface $input, OutputInterface $output)
111
111
$ busName = $ input ->getOption ('bus ' );
112
112
if ($ this ->busNames && !$ this ->busLocator ->has ($ busName )) {
113
113
if (null === $ busName ) {
114
- $ style ->block ('Missing bus argument. ' , null , 'error ' , ' ' , true );
115
- $ input ->setOption ('bus ' , $ style ->choice ('Select one of the available buses ' , $ this ->busNames ));
114
+ $ io ->block ('Missing bus argument. ' , null , 'error ' , ' ' , true );
115
+ $ input ->setOption ('bus ' , $ io ->choice ('Select one of the available buses ' , $ this ->busNames ));
116
116
} elseif ($ alternatives = $ this ->findAlternatives ($ busName , $ this ->busNames )) {
117
- $ style ->block (sprintf ('Bus "%s" is not defined. ' , $ busName ), null , 'error ' , ' ' , true );
117
+ $ io ->block (sprintf ('Bus "%s" is not defined. ' , $ busName ), null , 'error ' , ' ' , true );
118
118
119
119
if (1 === \count ($ alternatives )) {
120
- if ($ style ->confirm (sprintf ('Do you want to dispatch to "%s" instead? ' , $ alternatives [0 ]), true )) {
120
+ if ($ io ->confirm (sprintf ('Do you want to dispatch to "%s" instead? ' , $ alternatives [0 ]), true )) {
121
121
$ input ->setOption ('bus ' , $ alternatives [0 ]);
122
122
}
123
123
} else {
124
- $ input ->setOption ('bus ' , $ style ->choice ('Did you mean one of the following buses instead? ' , $ alternatives , $ alternatives [0 ]));
124
+ $ input ->setOption ('bus ' , $ io ->choice ('Did you mean one of the following buses instead? ' , $ alternatives , $ alternatives [0 ]));
125
125
}
126
126
}
127
127
}
@@ -143,18 +143,37 @@ protected function execute(InputInterface $input, OutputInterface $output): void
143
143
$ receiver = $ this ->receiverLocator ->get ($ receiverName );
144
144
$ bus = $ this ->busLocator ->get ($ busName );
145
145
146
+ $ stopsWhen = [];
146
147
if ($ limit = $ input ->getOption ('limit ' )) {
148
+ $ stopsWhen [] = "processed {$ limit } messages " ;
147
149
$ receiver = new StopWhenMessageCountIsExceededReceiver ($ receiver , $ limit , $ this ->logger );
148
150
}
149
151
150
152
if ($ memoryLimit = $ input ->getOption ('memory-limit ' )) {
153
+ $ stopsWhen [] = "exceeded {$ memoryLimit } of memory " ;
151
154
$ receiver = new StopWhenMemoryUsageIsExceededReceiver ($ receiver , $ this ->convertToBytes ($ memoryLimit ), $ this ->logger );
152
155
}
153
156
154
157
if ($ timeLimit = $ input ->getOption ('time-limit ' )) {
158
+ $ stopsWhen [] = "been running for {$ timeLimit }s " ;
155
159
$ receiver = new StopWhenTimeLimitIsReachedReceiver ($ receiver , $ timeLimit , $ this ->logger );
156
160
}
157
161
162
+ $ io = new SymfonyStyle ($ input , $ output instanceof ConsoleOutputInterface ? $ output ->getErrorOutput () : $ output );
163
+ $ io ->success (sprintf ('Consuming messages from transport "%s" on bus "%s". ' , $ receiverName , $ busName ));
164
+
165
+ if ($ stopsWhen ) {
166
+ $ last = array_pop ($ stopsWhen );
167
+ $ stopsWhen = ($ stopsWhen ? implode (', ' , $ stopsWhen ).' or ' : '' ).$ last ;
168
+ $ io ->comment ("The worker will automatically exit once it has {$ stopsWhen }. " );
169
+ }
170
+
171
+ $ io ->comment ('Quit the worker with CONTROL-C. ' );
172
+
173
+ if (!$ output ->isDebug ()) {
174
+ $ io ->comment ('Re-run the command with a -vvv option to see logs about consumed messages. ' );
175
+ }
176
+
158
177
$ worker = new Worker ($ receiver , $ bus );
159
178
$ worker ->run ();
160
179
}
@@ -171,7 +190,7 @@ private function convertToBytes(string $memoryLimit): int
171
190
$ max = (int ) $ max ;
172
191
}
173
192
174
- switch (substr ($ memoryLimit , -1 )) {
193
+ switch (substr (rtrim ( $ memoryLimit, ' b ' ) , -1 )) {
175
194
case 't ' : $ max *= 1024 ;
176
195
// no break
177
196
case 'g ' : $ max *= 1024 ;
0 commit comments