Description
In my project, I am using confluent-kafka-python 1.9.2 to consume and produce messages on a Kafka topic. The OAuth provider was already set up by another team, and the token expires after 30 minutes. In my code I use this configuration for the consumer:
config = {
    'bootstrap.servers': '<server-url>:9093',
    'group.id': 'consumer-group',
    'auto.offset.reset': 'latest',
    'queued.max.messages.kbytes': 100000,
    'enable.auto.commit': False,
    'fetch.min.bytes': 100000,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauthbearer.config': 'oauth_cb',
    'oauth_cb': functools.partial(_get_token, <client_id>, <client_secret>),
}
_get_token function:
def _get_token(client_id, client_secret, oauth_config):
    realm = KeycloakRealm(
        server_url="<auth_server_url>",
        realm_name="<realm_name>",
    )
    oidc_client = realm.open_id_connect(
        client_id=client_id,
        client_secret=client_secret,
    )
    client_credentials = oidc_client.client_credentials()
    access_token = client_credentials["access_token"]
    expires_in = client_credentials["expires_in"]
    print(client_credentials)
    return access_token, time.time() + expires_in
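For reference, the contract that oauth_cb is documented to follow can be sketched without any network call: the callback receives the value of sasl.oauthbearer.config as its last argument and returns a (token, expiry) tuple, where the expiry is an absolute time in seconds since the epoch. In the sketch below, fetch_client_credentials, the client id and secret, and the dummy token are hypothetical stand-ins for the Keycloak request, not part of my real code.

```python
import functools
import time


def fetch_client_credentials(client_id, client_secret):
    """Hypothetical stand-in for the Keycloak client-credentials request."""
    return {"access_token": "dummy-token", "expires_in": 1800}


def get_token(client_id, client_secret, oauth_config):
    # oauth_config receives the value of 'sasl.oauthbearer.config'.
    creds = fetch_client_credentials(client_id, client_secret)
    # The expiry must be absolute: seconds since the epoch, as a float.
    expiry = time.time() + float(creds["expires_in"])
    return creds["access_token"], expiry


# Bind the credentials so the client only has to pass the config string.
oauth_cb = functools.partial(get_token, "my-client", "my-secret")
token, expiry = oauth_cb("oauth_cb")
```

The same partial object is what would go under the 'oauth_cb' key of the consumer configuration.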
Consumer code:
#.....
config.update({"key.deserializer": key_deserializer,
               "value.deserializer": value_deserializer})
consumer = DeserializingConsumer(config)
while True:
    message = self._client.poll(timeout) if timeout else self._client.poll()
    if message is None:
        logger.debug("No messages found")
        continue
    message_error = message.error()
    if message_error:
        logger.error(message_error)
    # processing message code
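One detail about this loop: DeserializingConsumer.poll() raises ConsumeError when the underlying message carries an error, instead of returning that message, so a message.error() check is not where such failures show up. A minimal sketch of catching the exception instead follows. FakeConsumer and poll_once are hypothetical names used only so the example runs without a broker.

```python
import logging

logger = logging.getLogger(__name__)


class FakeConsumer:
    """Hypothetical stand-in for DeserializingConsumer, for illustration only."""

    def __init__(self, results):
        self._results = list(results)

    def poll(self, timeout=None):
        item = self._results.pop(0)
        if isinstance(item, Exception):
            raise item  # mimics poll() raising instead of returning an error message
        return item


def poll_once(consumer, timeout=1.0, error_types=(Exception,)):
    """Return the next message, or None on timeout or on a consume error."""
    try:
        return consumer.poll(timeout)
    except error_types as exc:
        logger.error("poll failed: %s", exc)
        return None


demo = FakeConsumer(["msg-1", None, RuntimeError("Topic authorization failed")])
results = [poll_once(demo) for _ in range(3)]
```

With the real client, error_types would be (confluent_kafka.error.ConsumeError,), so that other exceptions still propagate.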
When I run the consumer, it fetches the token the first time and consumes messages without any issue. But once the token expires after 30 minutes, I start getting the following error:
confluent_kafka.error.ConsumeError: KafkaError{code=TOPIC_AUTHORIZATION_FAILED,val=29,str="Fetch from broker 31 failed: Broker: Topic authorization failed"}
Even if I set expires_in to 30 seconds or 1 minute, I still get the above error. I don't understand this: with that setting _get_token is called every minute, yet after 30 minutes I still get the above error.
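One check that might help narrow this down is to compare the expiry handed to the client with the expiry stored in the token itself. The sketch below assumes the access token is a JWT carrying an exp claim (an assumption about this Keycloak setup, not something confirmed above). It decodes the payload without verifying the signature, and the helper names and demo token are hypothetical.

```python
import base64
import json
import time


def jwt_expiry(token):
    """Return the `exp` claim (epoch seconds) of a JWT without verifying it."""
    payload_b64 = token.split(".")[1]
    # JWT segments are base64url-encoded with padding stripped; restore it.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    return payload["exp"]


def seconds_left(token, now=None):
    """Seconds until the token's own `exp`, relative to `now` (default: current time)."""
    now = time.time() if now is None else now
    return jwt_expiry(token) - now


def _b64url(obj):
    # Helper for building a demo token only.
    raw = json.dumps(obj).encode()
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


# Unsigned demo token with a fixed expiry, for illustration only.
demo_token = ".".join([_b64url({"alg": "none"}), _b64url({"exp": 1700000000}), "sig"])
```

Logging seconds_left(access_token) inside _get_token on every call would show whether each refresh really returns a new token with a later exp, or the same token again.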
I also tried to set oauthbearer_token_refresh_cb but got this error:
cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Property "oauthbearer_token_refresh_cb" must be set through dedicated .._set_..() function"}
So I would like to know how to refresh the token.
How to reproduce
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
- Apache Kafka broker version:
- Client configuration:
{...}
- Operating system:
- Provide client logs (with 'debug': '..' as necessary)
- Provide broker log excerpts
- Critical issue