Confluent-Kafka-Python producing SSL handshake error #1163

Open
@arjun180

Description


I'm trying to run a Kafka consumer using the confluent-kafka Python package, with this example as a reference: https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/oauth_producer.py. We have OAuth set up, and the same setup works from the command line. The client fails with the following SSL handshake error:

%3|1626297590.465|FAIL|rdkafka#consumer-1| [thrd:sasl_ssl://<kakfa-server>.com:9093/bootstrap]: sasl_ssl://<kakfa-server>.com:9093/bootstrap: SSL handshake failed: error:14090086:SSL routines:ssl3_get_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl) (after 40ms in state SSL_HANDSHAKE, 1 identical error(s) suppressed)
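
The error text points at `ssl.ca.location`, so one way to narrow this down is to test the same CA file against the broker outside of librdkafka. Below is a minimal sketch that uses only Python's standard `ssl` and `socket` modules. The helper name `check_broker_cert` is hypothetical, and the host, port, and CA path passed to it are up to the caller; none of them come from this setup. Python's `ssl` module and librdkafka may be linked against different OpenSSL builds, and this sketch also verifies the hostname, which may be stricter than the client configuration, so treat the result as a hint rather than proof.

```python
import socket
import ssl


def check_broker_cert(host, port, cafile, timeout=5.0):
    """Try a TLS handshake with the broker, trusting only the CAs in `cafile`.

    Returns (ok, detail). Hypothetical helper for illustration only.
    """
    try:
        # With cafile given, the system default CAs are not loaded.
        # Raises OSError if the file is missing or is not valid PEM.
        context = ssl.create_default_context(cafile=cafile)
    except OSError as exc:
        return False, "could not load CA file: {}".format(exc)

    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            # server_hostname enables SNI and hostname verification.
            with context.wrap_socket(sock, server_hostname=host) as tls:
                return True, "handshake ok ({})".format(tls.version())
    except ssl.SSLCertVerificationError as exc:
        # Same class of failure as "certificate verify failed" in the log.
        return False, "certificate verify failed: {}".format(exc.verify_message)
    except OSError as exc:
        return False, "connection or TLS error: {}".format(exc)
```

Calling it with the broker host, port 9093, and the file configured as `ssl.ca.location` shows whether that CA bundle alone is enough to verify the broker certificate.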

How to reproduce

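import functools
import logging
import time

import requests
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer

# token_url, client_id, client_secret, scopes, bootstrap_servers and topic
# are not defined in this snippet and are assumed to be set elsewhere.

def _get_token(config):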
    """Note here value of config comes from sasl.oauthbearer.config below.
    It is not used in this example but you can put arbitrary values to
    configure how you can get the token (e.g. which token URL to use)
    """
    payload = {
        'grant_type': 'client_credentials',
        'scope': scopes
    }
    resp = requests.post(token_url,
                         auth=(client_id, client_secret),
                         data=payload)
    token = resp.json()
    return token['access_token'], time.time() + float(token['expires_in'])

def producer_config():
    logger = logging.getLogger(__name__)
    return {
        'bootstrap.servers': bootstrap_servers,
        'key.serializer': StringSerializer('utf_8'),
        'value.serializer': StringSerializer('utf_8'),
        'sasl.mechanisms': 'OAUTHBEARER',
        # sasl.oauthbearer.config can be used to pass argument to your oauth_cb
        # It is not used in this example since we are passing all the arguments
        # from command line
        # 'sasl.oauthbearer.config': 'not-used',
        'oauth_cb': functools.partial(_get_token),
        'logger': logger,
        'security.protocol': 'SASL_SSL',
        'ssl.ca.location': '<CA-CERT>',
        'ssl.certificate.location': '<CERT-LOCATION>',
        'ssl.key.location': '<KEY-LOCATION>'
    }

def delivery_report(err, msg):
    """
    Reports the failure or success of a message delivery.
    Args:
        err (KafkaError): The error that occurred, or None on success.
        msg (Message): The message that was produced or failed.
    Note:
        In the delivery report callback the Message.key() and Message.value()
        will be the binary format as encoded by any configured Serializers and
        not the same object that was passed to produce().
        If you wish to pass the original object(s) for key and value to delivery
        report callback we recommend a bound callback or lambda where you pass
        the objects along.
    """
    if err is not None:
        print('Delivery failed for User record {}: {}'.format(msg.key(), err))
        return
    print('User record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))

def main():
    producer_conf = producer_config()
    delimiter = "|"
    producer = SerializingProducer(producer_conf)
    print('Producing records to topic {}. ^C to exit.'.format(topic))
    while True:
        # Serve on_delivery callbacks from previous calls to produce()
        producer.poll(0.0)
        try:
            msg_data = input(">")
            msg = msg_data.split(delimiter)
            if len(msg) == 2:
                producer.produce(topic=topic, key=msg[0], value=msg[1],
                                 on_delivery=delivery_report)
            else:
                producer.produce(topic=topic, value=msg[0],
                                 on_delivery=delivery_report)
        except KeyboardInterrupt:
            break
    print('\nFlushing {} records...'.format(len(producer)))
    producer.flush()

main()
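
The `oauth_cb` callback returns the token string together with its expiry as an absolute time in seconds since the epoch, which is what `_get_token` above does. A more defensive variant might validate the token endpoint's response before returning it. The sketch below assumes a standard OAuth 2.0 client-credentials JSON response and uses a hypothetical helper name, `parse_token_response`. It does not address the certificate failure itself, since the TLS handshake happens before SASL authentication on the connection.

```python
import time


def parse_token_response(body, now=None):
    """Turn a decoded token-endpoint JSON body into (token, expiry_epoch_seconds).

    Hypothetical helper; assumes the body is a dict such as resp.json().
    """
    if now is None:
        now = time.time()
    if "access_token" not in body:
        # OAuth 2.0 error responses carry an "error" field instead of a token.
        raise ValueError(
            "token request failed: {}".format(body.get("error", "unknown error")))
    # expires_in is a lifetime in seconds; convert it to an absolute time.
    return body["access_token"], now + float(body["expires_in"])
```

Inside `_get_token`, the last two lines could then become `return parse_token_response(resp.json())`, so that a rejected token request surfaces as a clear error instead of a `KeyError`.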

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue

confluent-kafka-python version: 1.7.0
librdkafka version: 1.7.0
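
For the remaining checklist items, the version strings can be read from the two functions named in the checklist, and extra client logging around the handshake can be requested through the `debug` property. A minimal sketch follows. The helper names are illustrative, the `security,broker` debug contexts are one reasonable choice rather than a requirement, and the import is guarded so the snippet also runs where `confluent_kafka` is not installed.

```python
def client_versions():
    """Return client and librdkafka version strings, or None if the package is missing."""
    try:
        import confluent_kafka
    except ImportError:
        return None
    # Both functions return a (version_string, version_int) tuple.
    return {
        "confluent-kafka-python": confluent_kafka.version()[0],
        "librdkafka": confluent_kafka.libversion()[0],
    }


def with_debug(conf, contexts="security,broker"):
    """Return a copy of a client config dict with librdkafka debug logging enabled."""
    debug_conf = dict(conf)
    debug_conf["debug"] = contexts
    return debug_conf
```

Passing `with_debug(producer_config())` to `SerializingProducer` would leave the original configuration untouched while producing the more detailed client logs the checklist asks for.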
