[HELP] - Kafka Producer IAM Role Authentication Fails After Idle Period with MSK #1960

Open
@Sivakajan-Galaxy

Description


I need to create a Kafka producer with confluent_kafka.Producer that works with AWS MSK using IAM role-based authentication. I have an implementation that connects and sends messages, shown below.

import json
import logging
import socket

from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from confluent_kafka import Producer

from config.settings import KAFKA_BOOTSTRAP_SERVERS, REGION

logger = logging.getLogger(__name__)


class KafkaProducerService:
    def __init__(self):
        self.producer = Producer(
            {
                "bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
                "acks": "all",
                "security.protocol": "SASL_SSL",
                "sasl.mechanism": "OAUTHBEARER",
                "oauth_cb": self.oauth_cb,
                "client.id": socket.gethostname()
            }
        )

    def oauth_cb(self, oauth_config):
        auth_token, exp_ms = MSKAuthTokenProvider.generate_auth_token(REGION)
        return auth_token, exp_ms/1000

    def produce_message(self, topic, message):
        try:
            self.producer.produce(topic=topic, value=json.dumps(message).encode("utf-8"))
            self.producer.flush()
            self.producer.poll(10)
            logger.info(f"Produced message to {topic}: {message}")
        except Exception as e:
            logger.error(f"Failed to send message to {topic}: {str(e)}")
            raise


kafka_producer_service = KafkaProducerService()

The producer is able to establish a healthy connection and successfully send messages to the consumer. However, if the connection remains idle for a few minutes, it starts logging MSK-related connection errors repeatedly.

|1743698633.362|FAIL|api-service-29457134-mr5tm#producer-2| [thrd:sasl_ssl://b-1.*******devmsk.dc43-1948-12.kafka.us-east-1.a]: sasl_ssl://b-1.********devmsk.dc256562362.kafka.us-east-1.amazonaws.com:9098/1: SASL authentication error: [20234d-243-49fb-92e8-4]2324234: Access denied (after 235ms in state AUTH_REQ)

The error logs keep appearing repeatedly while the connection is idle. However, as soon as a new message is sent, it reaches the consumer successfully and the error logs stop. Once the connection becomes idle again, the same errors start appearing after a short timeout.

Can anyone help me solve this and suggest the best approach for this case?
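
A possible cause, not confirmed for this setup, is that the OAUTHBEARER token expires while the application is idle. The confluent-kafka-python documentation states that oauth_cb is triggered from poll() or flush(), so a producer that only calls them inside produce_message never refreshes its token between messages. Below is a minimal sketch of one workaround under that assumption: a daemon thread that calls poll(0) at a fixed interval. The BackgroundPoller class and its interval_s parameter are hypothetical names introduced for illustration, not part of confluent_kafka.

```python
import threading


class BackgroundPoller:
    """Hypothetical helper: periodically calls producer.poll(0) from a daemon thread.

    Assumption: the idle-time errors come from an expired token, and regular
    poll() calls give the client a chance to invoke oauth_cb and refresh it.
    """

    def __init__(self, producer, interval_s=1.0):
        self._producer = producer
        self._interval_s = interval_s
        self._stop = threading.Event()
        self._thread = threading.Thread(
            target=self._run, name="kafka-poller", daemon=True
        )

    def start(self):
        self._thread.start()
        return self

    def stop(self):
        # Signal the loop to exit and wait for the thread to finish.
        self._stop.set()
        self._thread.join()

    def _run(self):
        while not self._stop.is_set():
            # poll(0) serves pending callbacks without blocking.
            self._producer.poll(0)
            # Sleep until the next tick, waking early if stop() is called.
            self._stop.wait(self._interval_s)
```

With the class from this issue, it could be started at the end of KafkaProducerService.__init__ as BackgroundPoller(self.producer).start(), and stopped before a final flush() on shutdown. Whether this removes the Access denied errors in this environment is untested.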
