Description
Hello 👋
I'm responsible for two app containers consuming from two different Kafka topics: one with an Avro schema, the other schema-less.
The schema-less consumer has never crashed, but the one with the schema crashes (and auto-restarts) roughly every two weeks, which is a bit noisy and also means an unwanted downtime of about 30 seconds.
Now, let's assume the crashing container hits a temporary networking issue while connecting to its schema registry: how would the library (in particular SchemaRegistryClient / DeserializingConsumer) behave in this case? Would it retry for a little while, or just crash outright?
My setup looks like the following:
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer
schema_registry_client = SchemaRegistryClient(...)
avro_deserializer = AvroDeserializer(schema_registry_client)
string_deserializer = StringDeserializer("utf_8")
consumer = DeserializingConsumer(...)
The exception being raised is:
Traceback (most recent call last):
File "/urllib3/connectionpool.py", line 703, in urlopen
httplib_response = self._make_request(
File "/urllib3/connectionpool.py", line 449, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/urllib3/connectionpool.py", line 444, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.9/http/client.py", line 1377, in getresponse
response.begin()
File "/usr/local/lib/python3.9/http/client.py", line 320, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.9/http/client.py", line 281, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.9/socket.py", line 704, in readinto
return self._sock.recv_into(b)
File "/usr/local/lib/python3.9/ssl.py", line 1241, in recv_into
return self.read(nbytes, buffer)
File "/usr/local/lib/python3.9/ssl.py", line 1099, in read
return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/requests/adapters.py", line 440, in send
resp = conn.urlopen(
File "/urllib3/connectionpool.py", line 785, in urlopen
retries = retries.increment(
File "/urllib3/util/retry.py", line 550, in increment
raise six.reraise(type(error), error, _stacktrace)
File "/urllib3/packages/six.py", line 769, in reraise
raise value.with_traceback(tb)
File "/urllib3/connectionpool.py", line 703, in urlopen
httplib_response = self._make_request(
File "/urllib3/connectionpool.py", line 449, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/urllib3/connectionpool.py", line 444, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.9/http/client.py", line 1377, in getresponse
response.begin()
File "/usr/local/lib/python3.9/http/client.py", line 320, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.9/http/client.py", line 281, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.9/socket.py", line 704, in readinto
return self._sock.recv_into(b)
File "/usr/local/lib/python3.9/ssl.py", line 1241, in recv_into
return self.read(nbytes, buffer)
File "/usr/local/lib/python3.9/ssl.py", line 1099, in read
return self._sslobj.read(len, buffer)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/confluent_kafka/deserializing_consumer.py", line 137, in poll
value = self._value_deserializer(value, ctx)
File "/confluent_kafka/schema_registry/avro.py", line 348, in __call__
schema = self._registry.get_schema(schema_id)
File "/confluent_kafka/schema_registry/schema_registry_client.py", line 368, in get_schema
response = self._rest_client.get('schemas/ids/{}'.format(schema_id))
File "/confluent_kafka/schema_registry/schema_registry_client.py", line 124, in get
return self.send_request(url, method='GET', query=query)
File "/confluent_kafka/schema_registry/schema_registry_client.py", line 167, in send_request
response = self.session.request(
File "/requests/sessions.py", line 529, in request
resp = self.send(prep, **send_kwargs)
File "/requests/sessions.py", line 645, in send
r = adapter.send(request, **kwargs)
File "/requests/adapters.py", line 501, in send
raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/src/kafka_consumer.py", line 88, in <module>
kafka_consumer()
File "/src/kafka_consumer.py", line 62, in kafka_consumer
raise e
File "/src/kafka_consumer.py", line 48, in kafka_consumer
message_raw = consumer.poll(1.0)
File "/confluent_kafka/deserializing_consumer.py", line 139, in poll
raise ValueDeserializationError(exception=se, kafka_message=msg)
confluent_kafka.error.ValueDeserializationError: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))"}
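For completeness, the only mitigation I've come up with on my side is to catch the deserialization error around poll() and retry the Avro decoding myself, roughly like the sketch below. This is just an idea on my end, not anything the library provides: poll_with_retry and the retry/backoff values are made-up names, and I'm only relying on the fact (visible in the traceback) that the raw message is attached to the exception as kafka_message.
import time
from confluent_kafka.error import ValueDeserializationError
from confluent_kafka.serialization import MessageField, SerializationContext
def poll_with_retry(consumer, value_deserializer, timeout=1.0, retries=3, backoff_secs=2.0):
    # Poll once; on a value-deserialization failure (e.g. a transient registry
    # outage), retry decoding the raw value a few times before giving up.
    try:
        return consumer.poll(timeout)
    except ValueDeserializationError as err:
        msg = err.kafka_message  # raw message attached to the exception
        ctx = SerializationContext(msg.topic(), MessageField.VALUE)
        for attempt in range(retries):
            time.sleep(backoff_secs * (attempt + 1))
            try:
                msg.set_value(value_deserializer(msg.value(), ctx))
                return msg
            except Exception:
                continue
        raise  # still failing after the retries: propagate as before
Calling message_raw = poll_with_retry(consumer, avro_deserializer) instead of the bare consumer.poll(1.0) would at least ride out a short registry blip, though depending on the library version the key may still be raw bytes on this path since the key deserializer never ran. I'd still like to know whether the library itself retries here, or whether this kind of wrapper is the expected approach.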
Thank you for any assistance or suggestions 🙏