Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[VarDumper] Add a RdKafka caster #35347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions 15 .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ before_install:
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'

- |
# Start Kafka and install an up-to-date librdkafka
docker network create kafka_network
docker pull wurstmeister/zookeeper:3.4.6
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6
docker pull wurstmeister/kafka:2.12-2.3.1
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=false" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.12-2.3.1
export KAFKA_BROKER=kafka:9092
sudo sh -c 'echo "\n127.0.0.1 kafka\n" >> /etc/hosts'

mkdir /tmp/librdkafka
curl https://codeload.github.com/edenhill/librdkafka/tar.gz/v0.11.6 | tar xzf - -C /tmp/librdkafka
(cd /tmp/librdkafka/librdkafka-0.11.6 && ./configure && make && sudo make install)

- |
# General configuration
set -e
Expand Down Expand Up @@ -175,6 +189,7 @@ before_install:
tfold ext.igbinary tpecl igbinary-3.1.2 igbinary.so $INI
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
tfold ext.rdkafka tpecl rdkafka-4.0.2 rdkafka.so $INI
tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
done
- |
Expand Down
5 changes: 5 additions & 0 deletions 5 src/Symfony/Component/VarDumper/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGELOG
=========

5.1.0
-----

* added `RdKafka` support

4.4.0
-----

Expand Down
187 changes: 187 additions & 0 deletions 187 src/Symfony/Component/VarDumper/Caster/RdKafkaCaster.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\VarDumper\Caster;

use RdKafka;
use RdKafka\Conf;
use RdKafka\Exception as RdKafkaException;
use RdKafka\KafkaConsumer;
use RdKafka\Message;
use RdKafka\Metadata\Broker as BrokerMetadata;
use RdKafka\Metadata\Collection as CollectionMetadata;
use RdKafka\Metadata\Partition as PartitionMetadata;
use RdKafka\Metadata\Topic as TopicMetadata;
use RdKafka\Topic;
use RdKafka\TopicConf;
use RdKafka\TopicPartition;
use Symfony\Component\VarDumper\Cloner\Stub;

/**
* Casts RdKafka related classes to array representation.
*
* @author Romain Neutron <imprec@gmail.com>
*/
class RdKafkaCaster
{
public static function castKafkaConsumer(KafkaConsumer $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;

try {
$assignment = $c->getAssignment();
} catch (RdKafkaException $e) {
$assignment = [];
}

$a += [
$prefix.'subscription' => $c->getSubscription(),
$prefix.'assignment' => $assignment,
];

$a += self::extractMetadata($c);

return $a;
}

public static function castTopic(Topic $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;

$a += [
$prefix.'name' => $c->getName(),
];

return $a;
}

public static function castTopicPartition(TopicPartition $c, array $a)
{
$prefix = Caster::PREFIX_VIRTUAL;

$a += [
$prefix.'offset' => $c->getOffset(),
$prefix.'partition' => $c->getPartition(),
$prefix.'topic' => $c->getTopic(),
];

return $a;
}

public static function castMessage(Message $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;

$a += [
$prefix.'errstr' => $c->errstr(),
];

return $a;
}

public static function castConf(Conf $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;

foreach ($c->dump() as $key => $value) {
$a[$prefix.$key] = $value;
}

return $a;
}

public static function castTopicConf(TopicConf $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;

foreach ($c->dump() as $key => $value) {
$a[$prefix.$key] = $value;
}

return $a;
}

public static function castRdKafka(\RdKafka $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;

$a += [
$prefix.'out_q_len' => $c->getOutQLen(),
];

$a += self::extractMetadata($c);

return $a;
}

public static function castCollectionMetadata(CollectionMetadata $c, array $a, Stub $stub, $isNested)
{
$a += iterator_to_array($c);

return $a;
}

public static function castTopicMetadata(TopicMetadata $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;

$a += [
$prefix.'name' => $c->getTopic(),
$prefix.'partitions' => $c->getPartitions(),
];

return $a;
}

public static function castPartitionMetadata(PartitionMetadata $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;

$a += [
$prefix.'id' => $c->getId(),
$prefix.'err' => $c->getErr(),
$prefix.'leader' => $c->getLeader(),
];

return $a;
}

public static function castBrokerMetadata(BrokerMetadata $c, array $a, Stub $stub, $isNested)
{
$prefix = Caster::PREFIX_VIRTUAL;

$a += [
$prefix.'id' => $c->getId(),
$prefix.'host' => $c->getHost(),
$prefix.'port' => $c->getPort(),
];

return $a;
}

private static function extractMetadata($c)
{
$prefix = Caster::PREFIX_VIRTUAL;

try {
$m = $c->getMetadata(true, null, 500);
} catch (RdKafkaException $e) {
return [];
}

return [
$prefix.'orig_broker_id' => $m->getOrigBrokerId(),
$prefix.'orig_broker_name' => $m->getOrigBrokerName(),
$prefix.'brokers' => $m->getBrokers(),
$prefix.'topics' => $m->getTopics(),
];
}
}
12 changes: 12 additions & 0 deletions 12 src/Symfony/Component/VarDumper/Cloner/AbstractCloner.php
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,18 @@ abstract class AbstractCloner implements ClonerInterface
':persistent stream' => ['Symfony\Component\VarDumper\Caster\ResourceCaster', 'castStream'],
':stream-context' => ['Symfony\Component\VarDumper\Caster\ResourceCaster', 'castStreamContext'],
':xml' => ['Symfony\Component\VarDumper\Caster\XmlResourceCaster', 'castXml'],

'RdKafka' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castRdKafka'],
'RdKafka\Conf' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castConf'],
'RdKafka\KafkaConsumer' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castKafkaConsumer'],
'RdKafka\Metadata\Broker' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castBrokerMetadata'],
'RdKafka\Metadata\Collection' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castCollectionMetadata'],
'RdKafka\Metadata\Partition' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castPartitionMetadata'],
'RdKafka\Metadata\Topic' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicMetadata'],
'RdKafka\Message' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castMessage'],
'RdKafka\Topic' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopic'],
'RdKafka\TopicPartition' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicPartition'],
'RdKafka\TopicConf' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicConf'],
];

protected $maxItems = 2500;
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.