VALUE_DESERIALIZATION - Connection reset by peer #1334

Open
@mugx-fc

Description


Hello 👋
I'm responsible for two app containers consuming from two different Kafka topics: one with an Avro schema, the other schema-less.
The schema-less consumer has never crashed, but the one with a schema crashes (and auto-restarts) periodically, every two weeks. This is a bit noisy, and the unwanted downtime of 30 seconds isn't nice either.

Now, let's assume the crashing container has a temporary networking issue when connecting to its schema registry. How would the library (in particular SchemaRegistryClient / DeserializingConsumer) behave in this case? Would it retry for a little while, or just crash immediately?

My setup is as follows:

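from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

schema_registry_client = SchemaRegistryClient(...)
avro_deserializer = AvroDeserializer(schema_registry_client)
string_deserializer = StringDeserializer("utf_8")
consumer = DeserializingConsumer(...)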

The exception being raised is:

Traceback (most recent call last):
  File "/urllib3/connectionpool.py", line 703, in urlopen
    httplib_response = self._make_request(
  File "/urllib3/connectionpool.py", line 449, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/urllib3/connectionpool.py", line 444, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.9/http/client.py", line 1377, in getresponse
    response.begin()
  File "/usr/local/lib/python3.9/http/client.py", line 320, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.9/http/client.py", line 281, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/lib/python3.9/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/local/lib/python3.9/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/requests/adapters.py", line 440, in send
    resp = conn.urlopen(
  File "/urllib3/connectionpool.py", line 785, in urlopen
    retries = retries.increment(
  File "/urllib3/util/retry.py", line 550, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/urllib3/packages/six.py", line 769, in reraise
    raise value.with_traceback(tb)
  File "/urllib3/connectionpool.py", line 703, in urlopen
    httplib_response = self._make_request(
  File "/urllib3/connectionpool.py", line 449, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/urllib3/connectionpool.py", line 444, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.9/http/client.py", line 1377, in getresponse
    response.begin()
  File "/usr/local/lib/python3.9/http/client.py", line 320, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.9/http/client.py", line 281, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.9/socket.py", line 704, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/lib/python3.9/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/local/lib/python3.9/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
urllib3.exceptions.ProtocolError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/confluent_kafka/deserializing_consumer.py", line 137, in poll
    value = self._value_deserializer(value, ctx)
  File "/confluent_kafka/schema_registry/avro.py", line 348, in __call__
    schema = self._registry.get_schema(schema_id)
  File "/confluent_kafka/schema_registry/schema_registry_client.py", line 368, in get_schema
    response = self._rest_client.get('schemas/ids/{}'.format(schema_id))
  File "/confluent_kafka/schema_registry/schema_registry_client.py", line 124, in get
    return self.send_request(url, method='GET', query=query)
  File "/confluent_kafka/schema_registry/schema_registry_client.py", line 167, in send_request
    response = self.session.request(
  File "/requests/sessions.py", line 529, in request
    resp = self.send(prep, **send_kwargs)
  File "/requests/sessions.py", line 645, in send
    r = adapter.send(request, **kwargs)
  File "/requests/adapters.py", line 501, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/src/kafka_consumer.py", line 88, in <module>
    kafka_consumer()
  File "/src/kafka_consumer.py", line 62, in kafka_consumer
    raise e
  File "/src/kafka_consumer.py", line 48, in kafka_consumer
    message_raw = consumer.poll(1.0)
  File "/confluent_kafka/deserializing_consumer.py", line 139, in poll
    raise ValueDeserializationError(exception=se, kafka_message=msg)
confluent_kafka.error.ValueDeserializationError: KafkaError{code=_VALUE_DESERIALIZATION,val=-159,str="('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))"}
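
The traceback shows the connection reset surfacing from the schema lookup inside poll() with no retry in between. As an illustration of what "retrying for a little bit" could look like, here is a minimal sketch of a generic retry-with-backoff wrapper. The names `retry_call` and `FlakyLookup` are hypothetical and not part of confluent_kafka; the sketch uses only the standard library and a stand-in for the schema lookup, so it says nothing about how the library itself behaves.

```python
import time


def retry_call(func, *args, retries=3, base_delay=0.5,
               retry_on=(ConnectionError,), sleep=time.sleep):
    """Call func(*args), retrying on the given exception types.

    Hypothetical helper, not part of confluent_kafka. Waits
    base_delay * 2**attempt seconds between attempts and re-raises
    the last error once the retries are exhausted.
    """
    for attempt in range(retries + 1):
        try:
            return func(*args)
        except retry_on:
            if attempt == retries:
                raise
            sleep(base_delay * (2 ** attempt))


class FlakyLookup:
    """Stand-in for a schema lookup that fails a few times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def __call__(self, schema_id):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionResetError(104, "Connection reset by peer")
        return {"id": schema_id}


# Record the delays instead of actually sleeping, to keep the demo fast.
delays = []
lookup = FlakyLookup(failures=2)
result = retry_call(lookup, 42, sleep=delays.append)
```

In the consumer above, the equivalent idea would be to catch ValueDeserializationError around consumer.poll(1.0) and retry the deserialization of the raw message (the traceback shows the error is raised with kafka_message=msg). In that case `retry_on` would need to include requests.exceptions.ConnectionError, which is what the traceback shows the schema lookup raising; it is not a subclass of the built-in ConnectionError used as the default here.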

Thank you for any assistance or suggestions 🙏
