Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

SaslAuthenticate error: SaslAuthenticationFailedError #2638

Copy link
Copy link
Closed
@harisiqbal12

Description

@harisiqbal12
Issue body actions

We are experiencing persistent SaslAuthenticationFailedError: Access denied errors when our Python application, running in an EKS pod, attempts to connect to an AWS MSK cluster using IAM authentication. The issue occurs even though the IAM user (arn:aws:iam::xxxxxxxx) whose credentials we use has the necessary MSK permissions: another service, written in Java and using the same credentials, is able to connect to the same MSK cluster.

from kafka.sasl.oauth import AbstractTokenProvider
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

from app.core.config import settings
from app.core.logger import logger

class MSKTokenProvider(AbstractTokenProvider):
    """Token provider that hands kafka-python a signed AWS MSK IAM auth token.

    Each call to :meth:`token` asks ``MSKAuthTokenProvider`` to sign a new
    token for the region configured in ``settings.AWS_DEFAULT_REGION``.
    """

    def token(self):
        """Generate a fresh MSK auth token and return it.

        Returns:
            The token string produced by
            ``MSKAuthTokenProvider.generate_auth_token``.
        """
        import time  # local import: only needed for the expiry log line

        oauth2_token, expiry_millis = MSKAuthTokenProvider.generate_auth_token(
            region=settings.AWS_DEFAULT_REGION,
        )
        # The signer reports the expiry as an absolute epoch timestamp in
        # milliseconds, not as a duration, so the remaining lifetime is the
        # difference to the current time.
        remaining_minutes = (expiry_millis / 1000 - time.time()) / 60
        logger.info(
            f"MSKTOKENPROVIDER: Token generated successfully. Expires in approx {remaining_minutes:.1f} minutes."
        )
        return oauth2_token


class KafkaTransactionEventConsumer:
    """Kafka consumer for the transaction-events topic on an AWS MSK cluster.

    Authentication uses SASL_SSL with an OAuth bearer token supplied by
    ``MSKTokenProvider``.
    """

    def __init__(self, loop=None):
        """Initialise consumer state without opening any connection.

        Args:
            loop: Event loop to associate with this consumer. When omitted,
                ``asyncio.get_event_loop()`` is used.
        """
        self.consumer = None  # set by start()
        self.admin_client = None
        self.running = False
        self._stop_event = threading.Event()
        self.consumer_thread = None
        self.loop = loop or asyncio.get_event_loop()
        self.token_provider = MSKTokenProvider()

    async def start(self):
        """Resolve the bootstrap brokers and create the ``KafkaConsumer``.

        The consumer subscribes to ``settings.KAFKA_TOPIC_TRANSACTION_EVENTS``
        and is stored on ``self.consumer``.
        """
        # Retrieve the broker list (MSK API first, configured servers as fallback).
        bootstrap_servers = get_bootstrap_servers()
        logger.info(f"Bootstrap servers: {bootstrap_servers}")

        # Shared prefix for the client id and the consumer group id.
        client_prefix = settings.CLIENT_ID or "default_client_id"

        consumer_config = {
            "bootstrap_servers": bootstrap_servers,
            "security_protocol": "SASL_SSL",
            # The token provider is only consulted for the OAUTHBEARER
            # mechanism; this is the mechanism the AWS MSK IAM signer
            # documents for kafka-python.
            "sasl_mechanism": "OAUTHBEARER",
            "sasl_oauth_token_provider": self.token_provider,
            "client_id": client_prefix + "-main-consumer",
            "group_id": client_prefix + "-consumer-group",
            "value_deserializer": lambda m: json.loads(m.decode("utf-8")),
            "auto_offset_reset": "earliest",
            "consumer_timeout_ms": 1000,
            "api_version": (2, 6, 0),
            "api_version_auto_timeout_ms": 10000,
            "request_timeout_ms": 30000,
            "connections_max_idle_ms": 540000,
            "session_timeout_ms": 30000,
            "heartbeat_interval_ms": 10000,
            "max_poll_records": 100,
            "enable_auto_commit": True,
            "auto_commit_interval_ms": 5000,
        }

        logger.info("Creating Kafka consumer with config...")
        self.consumer = KafkaConsumer(
            settings.KAFKA_TOPIC_TRANSACTION_EVENTS,
            **consumer_config,
        )

Logs

2025-05-31 11:48:58,379 - INFO - Found credentials in environment variables.
2025-05-31 11:48:58,690 - ERROR - <BrokerConnection client_id=REPORTING_API-admin-client, node_id=bootstrap-2 host=b-3.xxxx.i3df6u.c2.kafka.me-central-1.amazonaws.com:9098 <authenticating> [IPv4 ('10.106.2.49', 9098)]>: SaslAuthenticate error: SaslAuthenticationFailedError ([cc8333d3-37a9-4bb8-9d5a-1b1127502cad]: Access denied)
2025-05-31 11:48:58,690 - ERROR - <BrokerConnection client_id=REPORTING_API-admin-client, node_id=bootstrap-2 host=b-3.xxxx.i3df6u.c2.kafka.me-central-1.amazonaws.com:9098 <authenticating> [IPv4 ('10.106.2.49', 9098)]>: Closing connection. [Error 58] SaslAuthenticationFailedError: [cc8333d3-37a9-4bb8-9d5a-1b1127502cad]: Access denied
2025-05-31 11:49:15,701 - INFO - <BrokerConnection client_id=REPORTING_API-admin-client, node_id=bootstrap-0 host=b-2.xxx.i3df6u.c2.kafka.me-central-1.amazonaws.com:9098 <connecting> [IPv4 ('10.106.0.16', 9098)]>: connecting to b-2.xxx.i3df6u.c2.kafka.me-central-1.amazonaws.com:9098 [('10.xxx.xx.xx', 9098) IPv4]
2025-05-31 11:49:15,727 - INFO - Found credentials in environment variables.

Note

  • I'm able to get the brokers with the same credentials and Kafka ARN. This is the code that retrieves the brokers:
def get_bootstrap_servers() -> list:
    """Return the Kafka bootstrap broker addresses as a list of strings.

    The MSK API is queried for the cluster named by
    ``settings.AWS_KAFKA_ARN``. The first broker string found in the response
    (checked in a fixed priority order) is split on commas and returned. If
    the lookup fails or yields no broker string, the comma-separated
    ``settings.KAFKA_BOOTSTRAP_SERVERS`` value is used instead; when that is
    empty too, an empty list is returned.
    """
    logger.info(settings.AWS_KAFKA_ARN)

    # Response keys in the order they are preferred; the first one present wins.
    broker_keys = (
        "BootstrapBrokerStringSaslIam",
        "BootstrapBrokerStringPublicTls",
        "BootstrapBrokerString",
        "BootstrapBrokerStringTls",
    )

    try:
        config = Config(region_name=settings.AWS_DEFAULT_REGION)
        msk = boto3.client(
            "kafka",
            region_name=settings.AWS_DEFAULT_REGION,
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            config=config,
        )
        bootstrap_response = msk.get_bootstrap_brokers(
            ClusterArn=settings.AWS_KAFKA_ARN
        )
        logger.info(bootstrap_response)

        for key in broker_keys:
            if key in bootstrap_response:
                return bootstrap_response[key].split(",")

        logger.warning(
            "No bootstrap servers found in MSK response, falling back to configured servers"
        )
    except Exception as exc:
        # Best effort: any failure talking to MSK falls through to the
        # statically configured servers below.
        logger.error(f"Error connecting to MSK: {exc}")
        logger.warning("Falling back to configured bootstrap servers")

    if not settings.KAFKA_BOOTSTRAP_SERVERS:
        logger.error("No bootstrap servers available from MSK or environment")
        return []
    return settings.KAFKA_BOOTSTRAP_SERVERS.split(",")
  • libraries
aws-msk-iam-sasl-signer-python==1.0.2
kafka-python==2.2.10
python==3.13.1

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions

      Morty Proxy This is a proxified and sanitized view of the page, visit original site.