Description
I am using the latest version, 2.2.7, and I am getting the error below:

```
Exception in thread Thread-6 (process_data):
Traceback (most recent call last):
  File "/usr/local/lib/python3.13/threading.py", line 1041, in _bootstrap_inner
    self.run()
    ~~~~~~~~^^
  File "/usr/local/lib/python3.13/threading.py", line 992, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.13/site-packages/kconnector/processor.py", line 51, in process_data
    kafka_producer.send(topic, key=json.dumps(source), value=datum)
    ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.13/site-packages/kafka/producer/kafka.py", line 844, in send
    self._wait_on_metadata(topic, timer.timeout_ms)
    ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.13/site-packages/kafka/producer/kafka.py", line 987, in _wait_on_metadata
    raise Errors.KafkaTimeoutError(
        "Failed to update metadata after %.1f secs." % (max_wait_ms * 1000,))
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 49999000.0 secs.
```
I am creating the producer with the following settings:

```python
KafkaProducer(
    acks='all',
    retries=retries,
    linger_ms=linger_ms,
    batch_size=batch_size,
    key_serializer=str.encode,
    value_serializer=msgpack.dumps,
    bootstrap_servers=[bootstrap_server],
    sasl_mechanism=sasl_mechanism,
    security_protocol=sasl_security_protocol,
    sasl_plain_username=sasl_plain_username,
    sasl_plain_password=sasl_plain_password,
    reconnect_backoff_max_ms=reconnection_period,
    api_version=(2, 8, 1)
)
```
The config that ends up being set is:

```
{'bootstrap_servers': ['OCI-xx:9092'], 'client_id': 'kafka-python-producer-2',
 'key_serializer': <method 'encode' of 'str' objects>,
 'value_serializer': <function packb at 0x7353f96e2d40>,
 'enable_idempotence': False, 'transactional_id': None,
 'transaction_timeout_ms': 60000, 'delivery_timeout_ms': 120000, 'acks': 1,
 'bootstrap_topics_filter': set(), 'compression_type': None, 'retries': inf,
 'batch_size': 16384, 'linger_ms': 0,
 'partitioner': <kafka.partitioner.default.DefaultPartitioner object at 0x7353f9839a90>,
 'connections_max_idle_ms': 540000, 'max_block_ms': 60000,
 'max_request_size': 1048576, 'allow_auto_create_topics': True,
 'metadata_max_age_ms': 300000, 'retry_backoff_ms': 100,
 'request_timeout_ms': 30000, 'receive_buffer_bytes': None,
 'send_buffer_bytes': None, 'socket_options': [(6, 1, 1)],
 'sock_chunk_bytes': 4096, 'sock_chunk_buffer_count': 1000,
 'reconnect_backoff_ms': 50, 'reconnect_backoff_max_ms': 30000,
 'max_in_flight_requests_per_connection': 5, 'security_protocol': 'SASL_SSL',
 'ssl_context': None, 'ssl_check_hostname': True, 'ssl_cafile': None,
 'ssl_certfile': None, 'ssl_keyfile': None, 'ssl_crlfile': None,
 'ssl_password': None, 'ssl_ciphers': None, 'api_version': (2, 8),
 'api_version_auto_timeout_ms': 2000, 'metric_reporters': [],
 'metrics_enabled': True, 'metrics_num_samples': 2,
 'metrics_sample_window_ms': 30000, 'selector': <class 'selectors.EpollSelector'>,
 'sasl_mechanism': 'PLAIN', 'sasl_plain_username': 'xxx',
 'sasl_plain_password': 'xx:', 'sasl_kerberos_name': None,
 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None,
 'sasl_oauth_token_provider': None, 'socks5_proxy': None,
 'kafka_client': <class 'kafka.client_async.KafkaClient'>, 'compression_attrs': 0}
```
There are two issues I want to point out:

- `49999000.0 secs` — I never set a timeout anywhere near that many seconds. From the traceback, the message is formatted with `max_wait_ms * 1000`; converting milliseconds to seconds should divide by 1000, so this should presumably read roughly 50 seconds.
- The `KafkaTimeoutError` itself.
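The unit problem can be demonstrated in isolation. This is a minimal sketch: only the format expression is taken from the traceback, and the `max_wait_ms = 49999.0` value is my guess at what the timer held, chosen so the output matches the error message:

```python
# Sketch of the suspicious unit conversion in the error message.
# _wait_on_metadata formats the message with `max_wait_ms * 1000`,
# but milliseconds -> seconds should divide by 1000, not multiply.
max_wait_ms = 49999.0  # assumed timer value (~50 s remaining)

wrong = max_wait_ms * 1000  # what the error message prints
right = max_wait_ms / 1000  # the value the message presumably intends

print("Failed to update metadata after %.1f secs." % wrong)  # -> 49999000.0 secs
print("Failed to update metadata after %.1f secs." % right)  # -> 50.0 secs
```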
I have dug into the code and found that the event attached here:

`future.add_both(lambda e, *args: e.set(), metadata_event)`

is never getting set. Can you check why this error occurs?
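For context, here is a minimal sketch of the pattern I mean, using a toy `ToyFuture` stand-in (not kafka-python's actual `Future` class): a callback registered via `add_both` sets a `threading.Event`, and the caller waits on that event. If nothing ever resolves the future, the event is never set and the wait times out:

```python
import threading

class ToyFuture:
    """Hypothetical stand-in for kafka-python's Future, for illustration only."""
    def __init__(self):
        self._callbacks = []

    def add_both(self, f, *args):
        # register f to run whether the future succeeds or fails
        self._callbacks.append((f, args))

    def resolve(self):
        # fire all registered callbacks with their bound args
        for f, args in self._callbacks:
            f(*args)

future = ToyFuture()
metadata_event = threading.Event()
# the pattern from kafka/producer/kafka.py I am referring to:
future.add_both(lambda e, *args: e.set(), metadata_event)

print(metadata_event.wait(timeout=0.1))  # -> False: nothing resolved the future
future.resolve()                         # once resolved, the callback fires
print(metadata_event.is_set())           # -> True
```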