Closed
Description
We are experiencing persistent SaslAuthenticationFailedError: Access denied errors when our Python application, running in an EKS pod, attempts to connect to an AWS MSK cluster using IAM authentication. This issue occurs despite the IAM user (arn:aws:iam::xxxxxxxx) associated with the credentials we are trying to use having the necessary permissions for MSK, as confirmed by other service which is running in java with the same credentials able to connect to the MSK.
from kafka.sasl.oauth import AbstractTokenProvider
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from app.core.config import settings
from app.core.logger import logger
class MSKTokenProvider(AbstractTokenProvider):
def token(self):
oauth2_token, expiry_millis = MSKAuthTokenProvider.generate_auth_token(
region=settings.AWS_DEFAULT_REGION,
)
logger.info(
f"MSKTOKENPROVIDER: Token generated successfully. Expires in approx {expiry_millis / 1000 / 60:.1f} minutes."
)
return oauth2_token
class KafkaTransactionEventConsumer:
def __init__(self, loop=None):
self.consumer = None
self.admin_client = None
self.running = False
self._stop_event = threading.Event()
self.consumer_thread = None
self.loop = loop or asyncio.get_event_loop()
self.token_provider = MSKTokenProvider()
async def start(self):
bootstrap_servers = get_bootstrap_servers()
logger.info(f"Bootstrap servers: {bootstrap_servers}") # retrieving the brokers
consumer_config = {
"bootstrap_servers": bootstrap_servers,
"security_protocol": "SASL_SSL",
"sasl_mechanism": "AWS_MSK_IAM",
"client_id": (settings.CLIENT_ID or "default_client_id")
+ "-main-consumer",
"group_id": (settings.CLIENT_ID or "default_client_id")
+ "-consumer-group",
"value_deserializer": lambda m: json.loads(m.decode("utf-8")),
"auto_offset_reset": "earliest",
"consumer_timeout_ms": 1000,
"sasl_oauth_token_provider": self.token_provider,
"api_version": (2, 6, 0),
"api_version_auto_timeout_ms": 10000,
"request_timeout_ms": 30000,
"connections_max_idle_ms": 540000,
"session_timeout_ms": 30000,
"heartbeat_interval_ms": 10000,
"max_poll_records": 100,
"enable_auto_commit": True,
"auto_commit_interval_ms": 5000,
}
logger.info("Creating Kafka consumer with config...")
self.consumer = KafkaConsumer(
settings.KAFKA_TOPIC_TRANSACTION_EVENTS,
**consumer_config,
)
Logs
2025-05-31 11:48:58,379 - INFO - Found credentials in environment variables.
2025-05-31 11:48:58,690 - ERROR - <BrokerConnection client_id=REPORTING_API-admin-client, node_id=bootstrap-2 host=b-3.xxxx.i3df6u.c2.kafka.me-central-1.amazonaws.com:9098 <authenticating> [IPv4 ('10.106.2.49', 9098)]>: SaslAuthenticate error: SaslAuthenticationFailedError ([cc8333d3-37a9-4bb8-9d5a-1b1127502cad]: Access denied)
2025-05-31 11:48:58,690 - ERROR - <BrokerConnection client_id=REPORTING_API-admin-client, node_id=bootstrap-2 host=b-3.xxxx.i3df6u.c2.kafka.me-central-1.amazonaws.com:9098 <authenticating> [IPv4 ('10.106.2.49', 9098)]>: Closing connection. [Error 58] SaslAuthenticationFailedError: [cc8333d3-37a9-4bb8-9d5a-1b1127502cad]: Access denied
2025-05-31 11:49:15,701 - INFO - <BrokerConnection client_id=REPORTING_API-admin-client, node_id=bootstrap-0 host=b-2.xxx.i3df6u.c2.kafka.me-central-1.amazonaws.com:9098 <connecting> [IPv4 ('10.106.0.16', 9098)]>: connecting to b-2.xxx.i3df6u.c2.kafka.me-central-1.amazonaws.com:9098 [('10.xxx.xx.xx', 9098) IPv4]
2025-05-31 11:49:15,727 - INFO - Found credentials in environment variables.
Note
- I'm able to get the brokers with the same credentials and kafka ARN this is the code of retrieving the brokers.
def get_bootstrap_servers() -> list:
logger.info(settings.AWS_KAFKA_ARN)
try:
config = Config(
region_name=settings.AWS_DEFAULT_REGION,
)
client = boto3.client(
"kafka",
region_name=settings.AWS_DEFAULT_REGION,
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
config=config,
)
bootstrap_response = client.get_bootstrap_brokers(
ClusterArn=settings.AWS_KAFKA_ARN
)
logger.info(bootstrap_response)
if "BootstrapBrokerStringSaslIam" in bootstrap_response:
broker_string = bootstrap_response["BootstrapBrokerStringSaslIam"]
return broker_string.split(",")
elif "BootstrapBrokerStringPublicTls" in bootstrap_response:
broker_string = bootstrap_response["BootstrapBrokerStringPublicTls"]
return broker_string.split(",")
elif "BootstrapBrokerString" in bootstrap_response:
broker_string = bootstrap_response["BootstrapBrokerString"]
return broker_string.split(",")
elif "BootstrapBrokerStringTls" in bootstrap_response:
broker_string = bootstrap_response["BootstrapBrokerStringTls"]
return broker_string.split(",")
logger.warning(
"No bootstrap servers found in MSK response, falling back to configured servers"
)
except Exception as e:
logger.error(f"Error connecting to MSK: {e}")
logger.warning("Falling back to configured bootstrap servers")
if settings.KAFKA_BOOTSTRAP_SERVERS:
return settings.KAFKA_BOOTSTRAP_SERVERS.split(",")
else:
logger.error("No bootstrap servers available from MSK or environment")
return []
- libraries
aws-msk-iam-sasl-signer-python==1.0.2
kafka-python==2.2.10
python==3.13.1
Metadata
Metadata
Assignees
Labels
No labels