Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit bc7fad5

Browse filesBrowse files
committed
Add reconnect
1 parent b8fa38c commit bc7fad5
Copy full SHA for bc7fad5

File tree

1 file changed

+31
-29
lines changed
Filter options
  • src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport

1 file changed

+31
-29
lines changed

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php
+31-29Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Pheanstalk\Contract\PheanstalkSubscriberInterface;
1717
use Pheanstalk\Contract\SocketFactoryInterface;
1818
use Pheanstalk\Exception;
19+
use Pheanstalk\Exception\ConnectionException;
1920
use Pheanstalk\Pheanstalk;
2021
use Pheanstalk\Values\Job as PheanstalkJob;
2122
use Pheanstalk\Values\JobId;
@@ -137,19 +138,17 @@ public function send(string $body, array $headers, int $delay = 0, ?int $priorit
137138
throw new TransportException($exception->getMessage(), 0, $exception);
138139
}
139140

140-
try {
141+
return $this->withReconnect(function () use ($message, $delay, $priority) {
141142
$this->client->useTube($this->tube);
142143
$job = $this->client->put(
143144
$message,
144145
$priority ?? PheanstalkPublisherInterface::DEFAULT_PRIORITY,
145146
(int) ($delay / 1000),
146147
$this->ttr
147148
);
148-
} catch (Exception $exception) {
149-
throw new TransportException($exception->getMessage(), 0, $exception);
150-
}
151149

152-
return $job->getId();
150+
return $job->getId();
151+
});
153152
}
154153

155154
public function get(): ?array
@@ -177,7 +176,7 @@ public function get(): ?array
177176

178177
private function getFromTube(): ?PheanstalkJob
179178
{
180-
try {
179+
return $this->withReconnect(function () {
181180
if ($this->client->watch($this->tube) > 1) {
182181
foreach ($this->client->listTubesWatched() as $tube) {
183182
if ((string) $tube !== (string) $this->tube) {
@@ -187,66 +186,69 @@ private function getFromTube(): ?PheanstalkJob
187186
}
188187

189188
return $this->client->reserveWithTimeout($this->timeout);
190-
} catch (Exception $exception) {
191-
throw new TransportException($exception->getMessage(), 0, $exception);
192-
}
189+
});
193190
}
194191

195192
public function ack(string $id): void
196193
{
197-
try {
194+
$this->withReconnect(function () use ($id) {
198195
$this->client->useTube($this->tube);
199196
$this->client->delete(new JobId($id));
200-
} catch (Exception $exception) {
201-
throw new TransportException($exception->getMessage(), 0, $exception);
202-
}
197+
});
203198
}
204199

205200
public function reject(string $id, ?int $priority = null, bool $forceDelete = false): void
206201
{
207-
try {
202+
$this->withReconnect(function () use ($id, $priority, $forceDelete) {
208203
$this->client->useTube($this->tube);
209204

210205
if (!$forceDelete && $this->buryOnReject) {
211206
$this->client->bury(new JobId($id), $priority ?? PheanstalkPublisherInterface::DEFAULT_PRIORITY);
212207
} else {
213208
$this->client->delete(new JobId($id));
214209
}
215-
} catch (Exception $exception) {
216-
throw new TransportException($exception->getMessage(), 0, $exception);
217-
}
210+
});
218211
}
219212

220213
public function keepalive(string $id): void
221214
{
222-
try {
215+
$this->withReconnect(function () use ($id) {
223216
$this->client->useTube($this->tube);
224217
$this->client->touch(new JobId($id));
225-
} catch (Exception $exception) {
226-
throw new TransportException($exception->getMessage(), 0, $exception);
227-
}
218+
});
228219
}
229220

230221
public function getMessageCount(): int
231222
{
232-
try {
223+
return $this->withReconnect(function () {
233224
$this->client->useTube($this->tube);
234225
$tubeStats = $this->client->statsTube($this->tube);
235-
} catch (Exception $exception) {
236-
throw new TransportException($exception->getMessage(), 0, $exception);
237-
}
238226

239-
return $tubeStats->currentJobsReady;
227+
return $tubeStats->currentJobsReady;
228+
});
240229
}
241230

242231
public function getMessagePriority(string $id): int
243232
{
244-
try {
233+
return $this->withReconnect(function () use ($id) {
245234
$jobStats = $this->client->statsJob(new JobId($id));
235+
236+
return $jobStats->priority;
237+
});
238+
}
239+
240+
private function withReconnect(callable $command): mixed
241+
{
242+
try {
243+
try {
244+
return $command();
245+
} catch (ConnectionException) {
246+
$this->client->disconnect();
247+
248+
return $command();
249+
}
246250
} catch (Exception $exception) {
247251
throw new TransportException($exception->getMessage(), 0, $exception);
248252
}
249-
250-
return $jobStats->priority;
251253
}
252254
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.