Description
I need to create a Kafka producer, using confluent_kafka.Producer, that works with AWS MSK using IAM role-based authentication. To achieve this, I have developed a working solution as outlined below.
import json
import logging
import socket

from confluent_kafka import Producer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

from config.settings import KAFKA_BOOTSTRAP_SERVERS, REGION

logger = logging.getLogger(__name__)


class KafkaProducerService:
    def __init__(self):
        self.producer = Producer(
            {
                "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
                "acks": "all",
                "security.protocol": "SASL_SSL",
                "sasl.mechanism": "OAUTHBEARER",
                "oauth_cb": self.oauth_cb,
                "client.id": socket.gethostname(),
            }
        )

    def oauth_cb(self, oauth_config):
        # The signer returns the expiry in milliseconds; the client expects seconds.
        auth_token, exp_ms = MSKAuthTokenProvider.generate_auth_token(REGION)
        return auth_token, exp_ms / 1000

    def produce_message(self, topic, message):
        try:
            self.producer.produce(topic=topic, value=json.dumps(message).encode("utf-8"))
            self.producer.flush()
            self.producer.poll(10)
            logger.info(f"Produced message to {topic}: {message}")
        except Exception as e:
            logger.error(f"Failed to send message to {topic}: {str(e)}")
            raise


kafka_producer_service = KafkaProducerService()
The producer is able to establish a healthy connection and successfully send messages to the consumer. However, if the connection remains idle for a few minutes, it starts logging MSK-related connection errors repeatedly.
|1743698633.362|FAIL|api-service-29457134-mr5tm#producer-2| [thrd:sasl_ssl://b-1.*******devmsk.dc43-1948-12.kafka.us-east-1.a]: sasl_ssl://b-1.********devmsk.dc256562362.kafka.us-east-1.amazonaws.com:9098/1: SASL authentication error: [20234d-243-49fb-92e8-4]2324234: Access denied (after 235ms in state AUTH_REQ)
The errors keep appearing for as long as the connection is idle. As soon as a new message is sent, it reaches the consumer successfully and the errors stop. Once the connection becomes idle again, the same errors return after a short timeout.
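A possible direction, not yet verified against this setup: the confluent_kafka documentation says the oauth_cb callback is triggered by calling poll() or flush(), so an idle producer that never polls may never refresh its token, and reconnect attempts would then present an expired one. The sketch below assumes that is the cause and simply calls poll(0) from a daemon thread at a fixed interval. BackgroundPoller and interval_s are hypothetical names, not part of confluent_kafka; the only library call relied on is Producer.poll(timeout).

```python
import threading


class BackgroundPoller:
    """Hypothetical helper: calls producer.poll(0) periodically on a daemon thread.

    Assumption: serving callbacks regularly lets the client refresh its
    OAUTHBEARER token while no messages are being produced.
    """

    def __init__(self, producer, interval_s=1.0):
        self._producer = producer      # any object exposing poll(timeout)
        self._interval_s = interval_s  # pause between polls, in seconds
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        while not self._stop.is_set():
            # poll(0) returns immediately after serving any queued callbacks.
            self._producer.poll(0)
            # Sleep until the next round, waking early if stop() is called.
            self._stop.wait(self._interval_s)

    def stop(self):
        self._stop.set()
        self._thread.join()
```

With the service above, this would be wired up as BackgroundPoller(kafka_producer_service.producer).start() right after the producer is created, and stop() would be called on shutdown. A short interval such as one second should be cheap, since poll(0) does not block.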
Can anyone help me solve this and suggest the best approach for this case?