Description
I'm trying to run a Kafka consumer using the confluent-kafka Python package, with this example as a reference: https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/oauth_producer.py. We have OAuth set up, and the same setup works from the command line. The client, however, fails the SSL handshake:
%3|1626297590.465|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://<kakfa-server>.com:9093/bootstrap]: sasl_ssl://<kakfa-server>.com:9093/bootstrap: SSL handshake failed: error:14090086:SSL routines:ssl3_get_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl) (after 40ms in state SSL_HANDSHAKE, 1 identical error(s) suppressed)
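The error says the broker certificate could not be verified, which usually means librdkafka cannot find a trusted root CA bundle. As a sanity check (this snippet is an addition, not part of the original report), Python's standard library can show where this interpreter's OpenSSL expects its CA bundle; that path can then be tried as the value of 'ssl.ca.location':

```python
import ssl

# Ask this interpreter's OpenSSL where it looks for root CAs by default.
# librdkafka does not pick these paths up automatically, so the resolved
# cafile can be passed explicitly as 'ssl.ca.location' in the client config.
paths = ssl.get_default_verify_paths()
print(paths.openssl_cafile)  # platform dependent, e.g. /usr/lib/ssl/cert.pem
print(paths.openssl_capath)
```

If the handshake works on the command line but not from Python, comparing the CA path the CLI tooling uses against this one is a reasonable first step.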
How to reproduce
import functools
import logging
import time

import requests
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer

# These values are supplied on the command line in the upstream example:
# token_url, client_id, client_secret, scopes, bootstrap_servers, topic


def _get_token(config):
    """Note here the value of config comes from sasl.oauthbearer.config below.
    It is not used in this example, but you can put arbitrary values in it to
    configure how you get the token (e.g. which token URL to use).
    """
    payload = {
        'grant_type': 'client_credentials',
        'scope': scopes
    }
    resp = requests.post(token_url,
                         auth=(client_id, client_secret),
                         data=payload)
    token = resp.json()
    return token['access_token'], time.time() + float(token['expires_in'])


def producer_config():
    logger = logging.getLogger(__name__)
    return {
        'bootstrap.servers': bootstrap_servers,
        'key.serializer': StringSerializer('utf_8'),
        'value.serializer': StringSerializer('utf_8'),
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'OAUTHBEARER',
        # sasl.oauthbearer.config can be used to pass arguments to your oauth_cb.
        # It is not used in this example since we are passing all the arguments
        # from the command line.
        # 'sasl.oauthbearer.config': 'not-used',
        'oauth_cb': functools.partial(_get_token),
        'logger': logger,
        'ssl.ca.location': '<CA-CERT>',
        'ssl.certificate.location': '<CERT-LOCATION>',
        'ssl.key.location': '<KEY-LOCATION>'
    }
def delivery_report(err, msg):
    """
    Reports the failure or success of a message delivery.

    Args:
        err (KafkaError): The error that occurred, or None on success.
        msg (Message): The message that was produced or failed.

    Note:
        In the delivery report callback, Message.key() and Message.value()
        will be in the binary format as encoded by any configured serializers,
        not the same objects that were passed to produce().
        If you wish to pass the original object(s) for key and value to the
        delivery report callback, we recommend a bound callback or lambda
        where you pass the objects along.
    """
    if err is not None:
        print('Delivery failed for User record {}: {}'.format(msg.key(), err))
        return
    print('User record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


def main():
    producer_conf = producer_config()
    delimiter = "|"
    producer = SerializingProducer(producer_conf)
    print('Producing records to topic {}. ^C to exit.'.format(topic))
    while True:
        # Serve on_delivery callbacks from previous calls to produce()
        producer.poll(0.0)
        try:
            msg_data = input(">")
            msg = msg_data.split(delimiter)
            if len(msg) == 2:
                producer.produce(topic=topic, key=msg[0], value=msg[1],
                                 on_delivery=delivery_report)
            else:
                producer.produce(topic=topic, value=msg[0],
                                 on_delivery=delivery_report)
        except KeyboardInterrupt:
            break
    print('\nFlushing {} records...'.format(len(producer)))
    producer.flush()


main()
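For reference, the contract the oauth_cb above relies on: librdkafka invokes the callback with the 'sasl.oauthbearer.config' string and expects a tuple of (token, absolute expiry as a Unix timestamp) back. A minimal stand-in illustrating just the shape of the return value (the function name and dummy values here are hypothetical):

```python
import time

def fake_get_token(config):
    # config receives the 'sasl.oauthbearer.config' string; a real callback
    # would fetch a bearer token from the OAuth token endpoint instead.
    # Returns: (token string, absolute expiry time in seconds since epoch).
    return "dummy-token", time.time() + 300.0

token, expiry = fake_get_token("not-used")
```

A stub like this can help isolate the SSL handshake failure from the OAuth flow, since the handshake error above occurs before any token is ever requested.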
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  - confluent-kafka-python: 1.7.0
  - librdkafka: 1.7.0
- Apache Kafka broker version:
- Client configuration: {...}
- Operating system:
- Provide client logs (with 'debug': '..' as necessary)
- Provide broker log excerpts
- Critical issue