Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e8d89e1

Browse filesBrowse files
committed
BC path
1 parent ab94974 commit e8d89e1
Copy full SHA for e8d89e1

24 files changed

+1099
-278
lines changed

‎pkg/sns/SnsAsyncClient.php

Copy file name to clipboard
+53Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Sns;
6+
7+
use AsyncAws\Sns\Result\CreateTopicResponse;
8+
use AsyncAws\Sns\Result\ListSubscriptionsByTopicResponse;
9+
use AsyncAws\Sns\Result\PublishResponse;
10+
use AsyncAws\Sns\Result\SubscribeResponse;
11+
use AsyncAws\Sns\SnsClient as AwsSnsClient;
12+
13+
class SnsAsyncClient
14+
{
15+
private $client;
16+
17+
public function __construct(AwsSnsClient $client)
18+
{
19+
$this->client = $client;
20+
}
21+
22+
public function createTopic(array $args): CreateTopicResponse
23+
{
24+
return $this->client->CreateTopic($args);
25+
}
26+
27+
public function deleteTopic(string $topicArn): void
28+
{
29+
$this->client->DeleteTopic([
30+
'TopicArn' => $topicArn,
31+
]);
32+
}
33+
34+
public function publish(array $args): PublishResponse
35+
{
36+
return $this->client->Publish($args);
37+
}
38+
39+
public function subscribe(array $args): SubscribeResponse
40+
{
41+
return $this->client->Subscribe($args);
42+
}
43+
44+
public function unsubscribe(array $args): void
45+
{
46+
$this->client->Unsubscribe($args);
47+
}
48+
49+
public function listSubscriptionsByTopic(array $args): ListSubscriptionsByTopicResponse
50+
{
51+
return $this->client->ListSubscriptionsByTopic($args);
52+
}
53+
}

‎pkg/sns/SnsClient.php

Copy file name to clipboardExpand all lines: pkg/sns/SnsClient.php
+88-27Lines changed: 88 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,82 +4,143 @@
44

55
namespace Enqueue\Sns;
66

7-
use AsyncAws\Sns\Result\CreateTopicResponse;
8-
use AsyncAws\Sns\Result\ListSubscriptionsByTopicResponse;
9-
use AsyncAws\Sns\Result\PublishResponse;
10-
use AsyncAws\Sns\Result\SubscribeResponse;
11-
use AsyncAws\Sns\SnsClient as AwsSnsClient;
7+
use Aws\MultiRegionClient;
8+
use Aws\Result;
9+
use Aws\Sns\SnsClient as AwsSnsClient;
10+
11+
@trigger_error(sprintf('The class "%s" is deprecated since 0.10. Use "%s" instead.', __CLASS__, SnsAsyncClient::class), E_USER_DEPRECATED);
1212

1313
class SnsClient
1414
{
1515
/**
1616
* @var AwsSnsClient
1717
*/
18-
private $client;
18+
private $singleClient;
19+
20+
/**
21+
* @var MultiRegionClient
22+
*/
23+
private $multiClient;
1924

2025
/**
2126
* @var callable
2227
*/
2328
private $inputClient;
2429

2530
/**
26-
* @param AwsSnsClient|callable $inputClient
31+
* @param AwsSnsClient|MultiRegionClient|callable $inputClient
2732
*/
2833
public function __construct($inputClient)
2934
{
3035
$this->inputClient = $inputClient;
3136
}
3237

33-
public function createTopic(array $args): CreateTopicResponse
38+
public function createTopic(array $args): Result
3439
{
35-
return $this->getAWSClient()->CreateTopic($args);
40+
return $this->callApi('createTopic', $args);
3641
}
3742

38-
public function deleteTopic(string $topicArn): void
43+
public function deleteTopic(string $topicArn): Result
3944
{
40-
$this->getAWSClient()->DeleteTopic([
45+
return $this->callApi('DeleteTopic', [
4146
'TopicArn' => $topicArn,
4247
]);
4348
}
4449

45-
public function publish(array $args): PublishResponse
50+
public function publish(array $args): Result
4651
{
47-
return $this->getAWSClient()->Publish($args);
52+
return $this->callApi('publish', $args);
4853
}
4954

50-
public function subscribe(array $args): SubscribeResponse
55+
public function subscribe(array $args): Result
5156
{
52-
return $this->getAWSClient()->Subscribe($args);
57+
return $this->callApi('subscribe', $args);
5358
}
5459

55-
public function unsubscribe(array $args): void
60+
public function unsubscribe(array $args): Result
5661
{
57-
$this->getAWSClient()->Unsubscribe($args);
62+
return $this->callApi('unsubscribe', $args);
5863
}
5964

60-
public function listSubscriptionsByTopic(array $args): ListSubscriptionsByTopicResponse
65+
public function listSubscriptionsByTopic(array $args): Result
6166
{
62-
return $this->getAWSClient()->ListSubscriptionsByTopic($args);
67+
return $this->callApi('ListSubscriptionsByTopic', $args);
6368
}
6469

6570
public function getAWSClient(): AwsSnsClient
6671
{
67-
if ($this->client) {
68-
return $this->client;
72+
$this->resolveClient();
73+
74+
if ($this->singleClient) {
75+
return $this->singleClient;
6976
}
7077

71-
$client = $this->inputClient;
72-
if (is_callable($client)) {
73-
$client = $client();
78+
if ($this->multiClient) {
79+
$mr = new \ReflectionMethod($this->multiClient, 'getClientFromPool');
80+
$mr->setAccessible(true);
81+
$singleClient = $mr->invoke($this->multiClient, $this->multiClient->getRegion());
82+
$mr->setAccessible(false);
83+
84+
return $singleClient;
7485
}
7586

76-
if ($client instanceof AwsSnsClient) {
77-
return $this->client = $client;
87+
throw new \LogicException('The multi or single client must be set');
88+
}
89+
90+
private function callApi(string $name, array $args): Result
91+
{
92+
$this->resolveClient();
93+
94+
if ($this->singleClient) {
95+
if (false == empty($args['@region'])) {
96+
throw new \LogicException('Cannot send message to another region because transport is configured with single aws client');
97+
}
98+
99+
unset($args['@region']);
100+
101+
return call_user_func([$this->singleClient, $name], $args);
102+
}
103+
104+
if ($this->multiClient) {
105+
return call_user_func([$this->multiClient, $name], $args);
106+
}
107+
108+
throw new \LogicException('The multi or single client must be set');
109+
}
110+
111+
private function resolveClient(): void
112+
{
113+
if ($this->singleClient || $this->multiClient) {
114+
return;
115+
}
116+
117+
$client = $this->inputClient;
118+
if ($client instanceof MultiRegionClient) {
119+
$this->multiClient = $client;
120+
121+
return;
122+
} elseif ($client instanceof AwsSnsClient) {
123+
$this->singleClient = $client;
124+
125+
return;
126+
} elseif (is_callable($client)) {
127+
$client = call_user_func($client);
128+
if ($client instanceof MultiRegionClient) {
129+
$this->multiClient = $client;
130+
131+
return;
132+
}
133+
if ($client instanceof AwsSnsClient) {
134+
$this->singleClient = $client;
135+
136+
return;
137+
}
78138
}
79139

80140
throw new \LogicException(sprintf(
81-
'The input client must be an instance of "%s" or a callable that returns it. Got "%s"',
141+
'The input client must be an instance of "%s" or "%s" or a callable that returns one of those. Got "%s"',
82142
AwsSnsClient::class,
143+
MultiRegionClient::class,
83144
is_object($client) ? get_class($client) : gettype($client)
84145
));
85146
}

‎pkg/sns/SnsConnectionFactory.php

Copy file name to clipboardExpand all lines: pkg/sns/SnsConnectionFactory.php
+18-18Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
namespace Enqueue\Sns;
66

7-
use AsyncAws\Sns\SnsClient as AwsSnsClient;
7+
use AsyncAws\Sns\SnsClient as AsyncAwsSnsClient;
8+
use Aws\Sns\SnsClient as AwsSnsClient;
89
use Enqueue\Dsn\Dsn;
910
use Interop\Queue\ConnectionFactory;
1011
use Interop\Queue\Context;
@@ -17,7 +18,7 @@ class SnsConnectionFactory implements ConnectionFactory
1718
private $config;
1819

1920
/**
20-
* @var SnsClient
21+
* @var SnsClient|SnsAsyncClient
2122
*/
2223
private $client;
2324

@@ -27,7 +28,6 @@ class SnsConnectionFactory implements ConnectionFactory
2728
* 'secret' => null, AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment.
2829
* 'token' => null, AWS credentials. If no credentials are provided, the SDK will attempt to load them from the environment.
2930
* 'region' => null, (string, required) Region to connect to. See http://docs.aws.amazon.com/general/latest/gr/rande.html for a list of available regions.
30-
* 'lazy' => true, Enable lazy connection (boolean)
3131
* 'endpoint' => null (string, default=null) The full URI of the webservice. This is only required when connecting to a custom endpoint e.g. localstack
3232
* 'profile' => null, (string, default=null) The name of an AWS profile to used, if provided the SDK will attempt to read associated credentials from the ~/.aws/credentials file.
3333
* ].
@@ -37,10 +37,16 @@ class SnsConnectionFactory implements ConnectionFactory
3737
* sns:
3838
* sns::?key=aKey&secret=aSecret&token=aToken
3939
*
40-
* @param array|string|AwsSnsClient|null $config
40+
* @param array|string|AwsSnsClient|AsyncAwsSnsClient|null $config
4141
*/
4242
public function __construct($config = 'sns:')
4343
{
44+
if ($config instanceof AsyncAwsSnsClient) {
45+
$this->client = new SnsAsyncClient($config);
46+
$this->config = $this->defaultConfig();
47+
48+
return;
49+
}
4450
if ($config instanceof AwsSnsClient) {
4551
$this->client = new SnsClient($config);
4652
$this->config = ['lazy' => false] + $this->defaultConfig();
@@ -59,7 +65,7 @@ public function __construct($config = 'sns:')
5965
unset($config['dsn']);
6066
}
6167
} else {
62-
throw new \LogicException(sprintf('The config must be either an array of options, a DSN string, null or instance of %s', AwsSnsClient::class));
68+
throw new \LogicException(sprintf('The config must be either an array of options, a DSN string, null or instance of %s', AsyncAwsSnsClient::class));
6369
}
6470

6571
$this->config = array_replace($this->defaultConfig(), $config);
@@ -73,7 +79,12 @@ public function createContext(): Context
7379
return new SnsContext($this->establishConnection(), $this->config);
7480
}
7581

76-
private function establishConnection(): SnsClient
82+
/**
83+
* @todo in 0.11 restore return typehint
84+
*
85+
* @return SnsAsyncClient|SnsClient
86+
*/
87+
private function establishConnection()
7788
{
7889
if ($this->client) {
7990
return $this->client;
@@ -100,16 +111,7 @@ private function establishConnection(): SnsClient
100111
}
101112
}
102113

103-
$establishConnection = function () use ($config) {
104-
return new AwsSnsClient($config);
105-
};
106-
107-
$this->client = $this->config['lazy'] ?
108-
new SnsClient($establishConnection) :
109-
new SnsClient($establishConnection())
110-
;
111-
112-
return $this->client;
114+
return $this->client = new SnsAsyncClient(new AsyncAwsSnsClient($config));
113115
}
114116

115117
private function parseDsn(string $dsn): array
@@ -125,7 +127,6 @@ private function parseDsn(string $dsn): array
125127
'secret' => $dsn->getString('secret'),
126128
'token' => $dsn->getString('token'),
127129
'region' => $dsn->getString('region'),
128-
'lazy' => $dsn->getBool('lazy'),
129130
'endpoint' => $dsn->getString('endpoint'),
130131
'profile' => $dsn->getString('profile'),
131132
]), function ($value) { return null !== $value; });
@@ -138,7 +139,6 @@ private function defaultConfig(): array
138139
'secret' => null,
139140
'token' => null,
140141
'region' => null,
141-
'lazy' => true,
142142
'endpoint' => null,
143143
'profile' => null,
144144
];

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.