From af27b52ccac951f0aa62961058fbdc9bd944c962 Mon Sep 17 00:00:00 2001 From: Marat Pak Date: Tue, 20 May 2025 11:35:58 +0200 Subject: [PATCH 1/2] feat: Make serializer configurable via YAML configuration - Add support for configuring serializers through YAML in RdKafkaContext - Allow serializer specification as a class name, array with options, or instance --- pkg/rdkafka/RdKafkaContext.php | 30 +++++++++++++++++++++++- pkg/rdkafka/Tests/RdKafkaContextTest.php | 22 +++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index a252fcfd5..1e1fd9dbd 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -16,6 +16,7 @@ use Interop\Queue\Queue; use Interop\Queue\SubscriptionConsumer; use Interop\Queue\Topic; +use InvalidArgumentException; use RdKafka\Conf; use RdKafka\KafkaConsumer; use RdKafka\Producer as VendorProducer; @@ -54,8 +55,35 @@ public function __construct(array $config) $this->config = $config; $this->kafkaConsumers = []; $this->rdKafkaConsumers = []; + $this->configureSerializer($config); + } + + /** + * @param array $config + * @return void + */ + private function configureSerializer(array $config): void + { + if (!isset($config['serializer'])) { + $this->setSerializer(new JsonSerializer()); + return; + } - $this->setSerializer(new JsonSerializer()); + if (is_string($config['serializer'])) { + $this->setSerializer(new $config['serializer']()); + } elseif (is_array($config['serializer']) && isset($config['serializer']['class'])) { + $serializerClass = $config['serializer']['class']; + $serializerOptions = $config['serializer']['options'] ?? []; + if (!empty($serializerOptions)) { + $this->setSerializer(new $serializerClass($serializerOptions)); + } else { + $this->setSerializer(new $serializerClass()); + } + } elseif ($config['serializer'] instanceof Serializer) { + $this->setSerializer($config['serializer']); + } else { + throw new InvalidArgumentException('Invalid serializer configuration'); + } } /** diff --git a/pkg/rdkafka/Tests/RdKafkaContextTest.php b/pkg/rdkafka/Tests/RdKafkaContextTest.php index dc1b597de..f67408baa 100644 --- a/pkg/rdkafka/Tests/RdKafkaContextTest.php +++ b/pkg/rdkafka/Tests/RdKafkaContextTest.php @@ -8,6 +8,7 @@ use Enqueue\RdKafka\Serializer; use Interop\Queue\Exception\InvalidDestinationException; use Interop\Queue\Exception\TemporaryQueueNotSupportedException; +use InvalidArgumentException; use PHPUnit\Framework\TestCase; class RdKafkaContextTest extends TestCase @@ -36,6 +37,27 @@ public function testShouldSetJsonSerializerInConstructor() $this->assertInstanceOf(JsonSerializer::class, $context->getSerializer()); } + public function testShouldUseStringSerializerClassFromConfig() + { + $mockSerializerClass = get_class($this->createMock(Serializer::class)); + + $context = new RdKafkaContext([ + 'serializer' => $mockSerializerClass + ]); + + $this->assertInstanceOf($mockSerializerClass, $context->getSerializer()); + } + + public function testShouldThrowExceptionOnInvalidSerializerConfig() + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Invalid serializer configuration'); + + new RdKafkaContext([ + 'serializer' => 123 + ]); + } + public function testShouldAllowGetPreviouslySetSerializer() { $context = new RdKafkaContext([]); From 55597235ef721317d33fe9da4572ebdfc9309e25 Mon Sep 17 00:00:00 2001 From: Marat Pak Date: Thu, 22 May 2025 09:28:01 +0200 Subject: [PATCH 2/2] Add fallback to JsonSerializer in configureSerializer - Ensures robustness by providing a default serializer - Refactoring --- pkg/rdkafka/RdKafkaContext.php | 81 +++++++++++++++--------- pkg/rdkafka/Tests/RdKafkaContextTest.php | 14 ++-- 2 files changed, 62 insertions(+), 33 deletions(-) diff --git a/pkg/rdkafka/RdKafkaContext.php b/pkg/rdkafka/RdKafkaContext.php index 1e1fd9dbd..cd0ff3c1a 100644 --- a/pkg/rdkafka/RdKafkaContext.php +++ b/pkg/rdkafka/RdKafkaContext.php @@ -16,7 +16,6 @@ use Interop\Queue\Queue; use Interop\Queue\SubscriptionConsumer; use Interop\Queue\Topic; -use InvalidArgumentException; use RdKafka\Conf; use RdKafka\KafkaConsumer; use RdKafka\Producer as VendorProducer; @@ -58,34 +57,6 @@ public function __construct(array $config) $this->configureSerializer($config); } - /** - * @param array $config - * @return void - */ - private function configureSerializer(array $config): void - { - if (!isset($config['serializer'])) { - $this->setSerializer(new JsonSerializer()); - return; - } - - if (is_string($config['serializer'])) { - $this->setSerializer(new $config['serializer']()); - } elseif (is_array($config['serializer']) && isset($config['serializer']['class'])) { - $serializerClass = $config['serializer']['class']; - $serializerOptions = $config['serializer']['options'] ?? []; - if (!empty($serializerOptions)) { - $this->setSerializer(new $serializerClass($serializerOptions)); - } else { - $this->setSerializer(new $serializerClass()); - } - } elseif ($config['serializer'] instanceof Serializer) { - $this->setSerializer($config['serializer']); - } else { - throw new InvalidArgumentException('Invalid serializer configuration'); - } - } - /** * @return RdKafkaMessage */ @@ -208,6 +179,58 @@ public static function getLibrdKafkaVersion(): string return "$major.$minor.$patch"; } + /** + * @return void + * JsonSerializer should be the default fallback if no serializer is specified + */ + private function configureSerializer(array $config): void + { + if (!isset($config['serializer'])) { + $this->setSerializer(new JsonSerializer()); + + return; + } + + $serializer = $config['serializer']; + + if ($serializer instanceof Serializer) { + $this->setSerializer($serializer); + + return; + } + + $serializerClass = $this->resolveSerializerClass($serializer); + + if (!class_exists($serializerClass) || !is_a($serializerClass, Serializer::class, true)) { + throw $this->createInvalidSerializerException($serializerClass); + } + + $serializerOptions = $serializer['options'] ?? []; + $this->setSerializer(new $serializerClass($serializerOptions)); + } + + private function resolveSerializerClass(mixed $serializer): string + { + if (is_string($serializer)) { + return $serializer; + } + + if (is_array($serializer) && isset($serializer['class'])) { + return $serializer['class']; + } + + throw $this->createInvalidSerializerException($serializer); + } + + private function createInvalidSerializerException(mixed $value): \InvalidArgumentException + { + return new \InvalidArgumentException(sprintf( + 'Invalid serializer configuration. Expected "serializer" to be a string, an array with a "class" key, or a %s instance. Received %s instead.', + Serializer::class, + get_debug_type($value) + )); + } + private function getConf(): Conf { if (null === $this->conf) { diff --git a/pkg/rdkafka/Tests/RdKafkaContextTest.php b/pkg/rdkafka/Tests/RdKafkaContextTest.php index f67408baa..a691fcc5b 100644 --- a/pkg/rdkafka/Tests/RdKafkaContextTest.php +++ b/pkg/rdkafka/Tests/RdKafkaContextTest.php @@ -8,7 +8,6 @@ use Enqueue\RdKafka\Serializer; use Interop\Queue\Exception\InvalidDestinationException; use Interop\Queue\Exception\TemporaryQueueNotSupportedException; -use InvalidArgumentException; use PHPUnit\Framework\TestCase; class RdKafkaContextTest extends TestCase @@ -42,19 +41,26 @@ public function testShouldUseStringSerializerClassFromConfig() $mockSerializerClass = get_class($this->createMock(Serializer::class)); $context = new RdKafkaContext([ - 'serializer' => $mockSerializerClass + 'serializer' => $mockSerializerClass, ]); $this->assertInstanceOf($mockSerializerClass, $context->getSerializer()); } + public function testShouldUseJsonSerializer() + { + $context = new RdKafkaContext([]); + + $this->assertInstanceOf(JsonSerializer::class, $context->getSerializer()); + } + public function testShouldThrowExceptionOnInvalidSerializerConfig() { - $this->expectException(InvalidArgumentException::class); + $this->expectException(\InvalidArgumentException::class); $this->expectExceptionMessage('Invalid serializer configuration'); new RdKafkaContext([ - 'serializer' => 123 + 'serializer' => 123, ]); }