16
16
use Pheanstalk \Contract \PheanstalkSubscriberInterface ;
17
17
use Pheanstalk \Contract \SocketFactoryInterface ;
18
18
use Pheanstalk \Exception ;
19
+ use Pheanstalk \Exception \ConnectionException ;
19
20
use Pheanstalk \Pheanstalk ;
20
21
use Pheanstalk \Values \Job as PheanstalkJob ;
21
22
use Pheanstalk \Values \JobId ;
@@ -137,19 +138,17 @@ public function send(string $body, array $headers, int $delay = 0, ?int $priorit
137
138
throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
138
139
}
139
140
140
- try {
141
+ return $ this -> withReconnect ( function () use ( $ message , $ delay , $ priority ) {
141
142
$ this ->client ->useTube ($ this ->tube );
142
143
$ job = $ this ->client ->put (
143
144
$ message ,
144
145
$ priority ?? PheanstalkPublisherInterface::DEFAULT_PRIORITY ,
145
146
(int ) ($ delay / 1000 ),
146
147
$ this ->ttr
147
148
);
148
- } catch (Exception $ exception ) {
149
- throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
150
- }
151
149
152
- return $ job ->getId ();
150
+ return $ job ->getId ();
151
+ });
153
152
}
154
153
155
154
public function get (): ?array
@@ -177,7 +176,7 @@ public function get(): ?array
177
176
178
177
private function getFromTube (): ?PheanstalkJob
179
178
{
180
- try {
179
+ return $ this -> withReconnect ( function () {
181
180
if ($ this ->client ->watch ($ this ->tube ) > 1 ) {
182
181
foreach ($ this ->client ->listTubesWatched () as $ tube ) {
183
182
if ((string ) $ tube !== (string ) $ this ->tube ) {
@@ -187,66 +186,69 @@ private function getFromTube(): ?PheanstalkJob
187
186
}
188
187
189
188
return $ this ->client ->reserveWithTimeout ($ this ->timeout );
190
- } catch (Exception $ exception ) {
191
- throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
192
- }
189
+ });
193
190
}
194
191
195
192
public function ack (string $ id ): void
196
193
{
197
- try {
194
+ $ this -> withReconnect ( function () use ( $ id ) {
198
195
$ this ->client ->useTube ($ this ->tube );
199
196
$ this ->client ->delete (new JobId ($ id ));
200
- } catch (Exception $ exception ) {
201
- throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
202
- }
197
+ });
203
198
}
204
199
205
200
public function reject (string $ id , ?int $ priority = null , bool $ forceDelete = false ): void
206
201
{
207
- try {
202
+ $ this -> withReconnect ( function () use ( $ id , $ priority , $ forceDelete ) {
208
203
$ this ->client ->useTube ($ this ->tube );
209
204
210
205
if (!$ forceDelete && $ this ->buryOnReject ) {
211
206
$ this ->client ->bury (new JobId ($ id ), $ priority ?? PheanstalkPublisherInterface::DEFAULT_PRIORITY );
212
207
} else {
213
208
$ this ->client ->delete (new JobId ($ id ));
214
209
}
215
- } catch (Exception $ exception ) {
216
- throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
217
- }
210
+ });
218
211
}
219
212
220
213
public function keepalive (string $ id ): void
221
214
{
222
- try {
215
+ $ this -> withReconnect ( function () use ( $ id ) {
223
216
$ this ->client ->useTube ($ this ->tube );
224
217
$ this ->client ->touch (new JobId ($ id ));
225
- } catch (Exception $ exception ) {
226
- throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
227
- }
218
+ });
228
219
}
229
220
230
221
public function getMessageCount (): int
231
222
{
232
- try {
223
+ return $ this -> withReconnect ( function () {
233
224
$ this ->client ->useTube ($ this ->tube );
234
225
$ tubeStats = $ this ->client ->statsTube ($ this ->tube );
235
- } catch (Exception $ exception ) {
236
- throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
237
- }
238
226
239
- return $ tubeStats ->currentJobsReady ;
227
+ return $ tubeStats ->currentJobsReady ;
228
+ });
240
229
}
241
230
242
231
public function getMessagePriority (string $ id ): int
243
232
{
244
- try {
233
+ return $ this -> withReconnect ( function () use ( $ id ) {
245
234
$ jobStats = $ this ->client ->statsJob (new JobId ($ id ));
235
+
236
+ return $ jobStats ->priority ;
237
+ });
238
+ }
239
+
240
+ private function withReconnect (callable $ command ): mixed
241
+ {
242
+ try {
243
+ try {
244
+ return $ command ();
245
+ } catch (ConnectionException ) {
246
+ $ this ->client ->disconnect ();
247
+
248
+ return $ command ();
249
+ }
246
250
} catch (Exception $ exception ) {
247
251
throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
248
252
}
249
-
250
- return $ jobStats ->priority ;
251
253
}
252
254
}
0 commit comments