14
14
use Predis \Connection \Factory ;
15
15
use Predis \Connection \Aggregate \PredisCluster ;
16
16
use Predis \Connection \Aggregate \RedisCluster ;
17
+ use Predis \Response \Status ;
17
18
use Symfony \Component \Cache \Exception \InvalidArgumentException ;
18
19
19
20
/**
@@ -136,11 +137,14 @@ public static function createConnection($dsn, array $options = array())
136
137
protected function doFetch (array $ ids )
137
138
{
138
139
if ($ ids ) {
139
- $ values = $ this ->redis ->mGet ($ ids );
140
- $ index = 0 ;
141
- foreach ($ ids as $ id ) {
142
- if ($ value = $ values [$ index ++]) {
143
- yield $ id => parent ::unserialize ($ value );
140
+ $ values = $ this ->pipeline (function () use ($ ids ) {
141
+ foreach ($ ids as $ id ) {
142
+ yield 'get ' => array ($ id );
143
+ }
144
+ });
145
+ foreach ($ values as $ id => $ v ) {
146
+ if ($ v ) {
147
+ yield $ id => parent ::unserialize ($ v );
144
148
}
145
149
}
146
150
}
@@ -251,61 +255,60 @@ protected function doSave(array $values, $lifetime)
251
255
return $ failed ;
252
256
}
253
257
254
- if (0 >= $ lifetime ) {
255
- $ this ->redis ->mSet ($ serialized );
256
-
257
- return $ failed ;
258
- }
259
-
260
- $ this ->pipeline (function ($ pipe ) use (&$ serialized , $ lifetime ) {
258
+ $ results = $ this ->pipeline (function () use ($ serialized , $ lifetime ) {
261
259
foreach ($ serialized as $ id => $ value ) {
262
- $ pipe ('setEx ' , $ id , array ($ lifetime , $ value ));
260
+ if (0 >= $ lifetime ) {
261
+ yield 'set ' => array ($ id , $ value );
262
+ } else {
263
+ yield 'setEx ' => array ($ id , $ lifetime , $ value );
264
+ }
263
265
}
264
266
});
267
+ foreach ($ results as $ id => $ result ) {
268
+ if (true !== $ result && (!$ result instanceof Status || $ result !== Status::get ('OK ' ))) {
269
+ $ failed [] = $ id ;
270
+ }
271
+ }
265
272
266
273
return $ failed ;
267
274
}
268
275
269
- private function execute ( $ command , $ id , array $ args , $ redis = null )
276
+ private function pipeline ( \ Closure $ generator )
270
277
{
271
- array_unshift ($ args , $ id );
272
- call_user_func_array (array ($ redis ?: $ this ->redis , $ command ), $ args );
273
- }
278
+ $ ids = array ();
274
279
275
- private function pipeline (\Closure $ callback )
276
- {
277
- $ redis = $ this ->redis ;
278
-
279
- try {
280
- if ($ redis instanceof \Predis \Client) {
281
- $ redis ->pipeline (function ($ pipe ) use ($ callback ) {
282
- $ this ->redis = $ pipe ;
283
- $ callback (array ($ this , 'execute ' ));
284
- });
285
- } elseif ($ redis instanceof \RedisArray) {
286
- $ connections = array ();
287
- $ callback (function ($ command , $ id , $ args ) use (&$ connections ) {
288
- if (!isset ($ connections [$ h = $ this ->redis ->_target ($ id )])) {
289
- $ connections [$ h ] = $ this ->redis ->_instance ($ h );
290
- $ connections [$ h ]->multi (\Redis::PIPELINE );
291
- }
292
- $ this ->execute ($ command , $ id , $ args , $ connections [$ h ]);
293
- });
294
- foreach ($ connections as $ c ) {
295
- $ c ->exec ();
280
+ if ($ this ->redis instanceof \Predis \Client) {
281
+ $ results = $ this ->redis ->pipeline (function ($ redis ) use ($ generator , &$ ids ) {
282
+ foreach ($ generator () as $ command => $ args ) {
283
+ call_user_func_array (array ($ redis , $ command ), $ args );
284
+ $ ids [] = $ args [0 ];
296
285
}
297
- } else {
298
- $ pipe = $ redis ->multi (\Redis::PIPELINE );
299
- try {
300
- $ callback (array ($ this , 'execute ' ));
301
- } finally {
302
- if ($ pipe ) {
303
- $ redis ->exec ();
304
- }
286
+ });
287
+ } elseif ($ this ->redis instanceof \RedisArray) {
288
+ $ connections = $ results = $ ids = array ();
289
+ foreach ($ generator () as $ command => $ args ) {
290
+ if (!isset ($ connections [$ h = $ this ->redis ->_target ($ args [0 ])])) {
291
+ $ connections [$ h ] = array ($ this ->redis ->_instance ($ h ), array ());
292
+ $ connections [$ h ][0 ]->multi (\Redis::PIPELINE );
305
293
}
294
+ call_user_func_array (array ($ connections [$ h ][0 ], $ command ), $ args );
295
+ $ connections [$ h ][1 ][] = $ args [0 ];
296
+ }
297
+ foreach ($ connections as $ c ) {
298
+ $ results = array_merge ($ results , $ c [0 ]->exec ());
299
+ $ ids = array_merge ($ ids , $ c [1 ]);
306
300
}
307
- } finally {
308
- $ this ->redis = $ redis ;
301
+ } else {
302
+ $ this ->redis ->multi (\Redis::PIPELINE );
303
+ foreach ($ generator () as $ command => $ args ) {
304
+ call_user_func_array (array ($ this ->redis , $ command ), $ args );
305
+ $ ids [] = $ args [0 ];
306
+ }
307
+ $ results = $ this ->redis ->exec ();
308
+ }
309
+
310
+ foreach ($ ids as $ k => $ id ) {
311
+ yield $ id => $ results [$ k ];
309
312
}
310
313
}
311
314
}
0 commit comments