oauth token not refreshing #1485

Open

@sms1190

Description

In my project, I am using confluent-kafka-python 1.9.2 to consume and produce messages on a Kafka topic. The OAuth provider is already set up by another team, and the token expires after 30 minutes. In my code I have used the following configuration for the consumer.

config = {
    'bootstrap.servers': '<server-url>:9093',
    'group.id': 'consumer-group',
    'auto.offset.reset': 'latest',
    'queued.max.messages.kbytes': 100000,
    'enable.auto.commit': False,
    'fetch.min.bytes': 100000,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauthbearer.config': 'oauth_cb',
    'oauth_cb': functools.partial(_get_token, <client_id>, <client_secret>),
}
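
For context, my understanding of the callback wiring (please correct me if this is wrong): the value of 'sasl.oauthbearer.config' is passed as the argument to oauth_cb, and the callback must return a (token, expiry) tuple where expiry is an absolute Unix timestamp in seconds. A minimal sketch of that contract, with placeholder values:

# Minimal sketch of the contract I assume oauth_cb has to satisfy (placeholder values).
import time

def example_oauth_cb(oauth_config):
    # oauth_config is the 'sasl.oauthbearer.config' string ('oauth_cb' above)
    token = "<opaque-access-token>"    # placeholder
    lifetime_seconds = 1800            # 30 minutes, matching our provider
    return token, time.time() + lifetime_seconds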

_get_token function:

# Assumes python-keycloak-client for KeycloakRealm
import time
from keycloak.realm import KeycloakRealm

def _get_token(client_id, client_secret, oauth_config):
    realm = KeycloakRealm(
        server_url="<auth_server_url>",
        realm_name="<realm_name>",
    )
    oidc_client = realm.open_id_connect(
        client_id=client_id,
        client_secret=client_secret,
    )
    client_credentials = oidc_client.client_credentials()
    access_token = client_credentials["access_token"]
    expires_in = client_credentials["expires_in"]
    print(client_credentials)
    # Return the token plus its absolute expiry time in Unix seconds
    return access_token, time.time() + expires_in
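
For reference, here is a quick standalone check (with placeholder credentials) that _get_token returns a token and a future expiry when called directly, outside of Kafka:

# Quick standalone check of _get_token outside of Kafka (placeholder credentials).
import time

token, expiry = _get_token("<client_id>", "<client_secret>", "oauth_cb")
print("token length:", len(token))
print("seconds until expiry:", expiry - time.time())   # roughly 1800 for a 30-minute token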

Consumer code:

# .....
from confluent_kafka import DeserializingConsumer

config.update({"key.deserializer": key_deserializer,
               "value.deserializer": value_deserializer})
consumer = DeserializingConsumer(config)
while True:
    message = consumer.poll(timeout) if timeout else consumer.poll()
    if message is None:
        logger.debug("No messages found")
        continue
    message_error = message.error()
    if message_error:
        logger.error(message_error)
    # processing message code
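
To watch what the client does around token refresh, my plan is to turn on librdkafka's built-in debug logging via the standard 'debug' property; which components are most useful here is my assumption, and the output goes to stderr unless a logger is configured. A sketch of the extra config:

# Sketch: enable librdkafka debug logging to watch SASL/OAUTHBEARER activity.
# 'debug' is a standard librdkafka property; the chosen components are an assumption.
config.update({'debug': 'security,broker'})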

So while running the consumer, it fetches the token the first time and consumes messages without any issue, but once the token expires after 30 minutes I start getting the following error.

confluent_kafka.error.ConsumeError: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Fetch from broker 31 failed: Broker: Topic authorization failed"}

Even if I set expires_in to 30 seconds or 1 minute, I still get the above error. So I don't understand it: _get_token is then called roughly every minute, yet after 30 minutes I still get the error above.
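
To confirm how often the callback actually fires relative to the expiry I return (my understanding, which may be wrong, is that librdkafka schedules the refresh at around 80% of the remaining token lifetime), the print in _get_token can be replaced with timestamped logging. A sketch, with a placeholder token in place of the real Keycloak call:

# Sketch: timestamped logging in the callback to confirm the refresh cadence.
# The token and lifetime are placeholders; the real version does the Keycloak call above.
import logging
import time

logger = logging.getLogger(__name__)

def _get_token(client_id, client_secret, oauth_config):
    logger.info("oauth_cb fired at %s", time.strftime("%H:%M:%S"))
    access_token, expires_in = "<opaque-access-token>", 60
    logger.info("returning token valid for %s seconds", expires_in)
    return access_token, time.time() + expires_in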

I also tried to set oauthbearer_token_refresh_cb but got this error:

cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Property "oauthbearer_token_refresh_cb" must be set through dedicated .._set_..() function"}
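
From that error, my reading (which may be wrong) is that oauthbearer_token_refresh_cb is the underlying librdkafka C callback and cannot be passed as a plain config property; the Python client appears to expose it as the oauth_cb key instead, which is what I am already using. So the config dict should carry only oauth_cb:

# My assumption: keep only 'oauth_cb' in the Python config dict; passing
# 'oauthbearer_token_refresh_cb' as a string property is what triggers the
# _INVALID_ARG error above.
import functools

config.pop('oauthbearer_token_refresh_cb', None)
config['oauth_cb'] = functools.partial(_get_token, "<client_id>", "<client_secret>")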

So I would like to know: how do I refresh the token correctly?

How to reproduce

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
