Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e081beb

Browse filesBrowse files
authored
Fix: Surface Fatal Stream Errors to Future; Adjust Retryable Error Codes (#1422)
1 parent 272b09f commit e081beb
Copy full SHA for e081beb

File tree

Expand file treeCollapse file tree

2 files changed

+92
-15
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

2 files changed

+92
-15
lines changed
Open diff view settings
Collapse file

‎google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py‎

Copy file name to clipboardExpand all lines: google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+47-7Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import collections
1818
import functools
19+
import inspect
1920
import itertools
2021
import logging
2122
import threading
@@ -62,14 +63,22 @@
6263
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
6364
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
6465
_RETRYABLE_STREAM_ERRORS = (
66+
exceptions.Aborted,
6567
exceptions.DeadlineExceeded,
66-
exceptions.ServiceUnavailable,
68+
exceptions.GatewayTimeout,
6769
exceptions.InternalServerError,
70+
exceptions.ResourceExhausted,
71+
exceptions.ServiceUnavailable,
6872
exceptions.Unknown,
69-
exceptions.GatewayTimeout,
70-
exceptions.Aborted,
7173
)
72-
_TERMINATING_STREAM_ERRORS = (exceptions.Cancelled,)
74+
_TERMINATING_STREAM_ERRORS = (
75+
exceptions.Cancelled,
76+
exceptions.InvalidArgument,
77+
exceptions.NotFound,
78+
exceptions.PermissionDenied,
79+
exceptions.Unauthenticated,
80+
exceptions.Unauthorized,
81+
)
7382
_MAX_LOAD = 1.0
7483
"""The load threshold above which to pause the incoming message stream."""
7584

@@ -98,6 +107,13 @@
98107
code_pb2.UNAVAILABLE,
99108
}
100109

110+
# `on_fatal_exception` was added in `google-api-core v2.25.1``, which allows us to inform
111+
# callers on unrecoverable errors. We can only pass this arg if it's available in the
112+
# `BackgroundConsumer` spec.
113+
_SHOULD_USE_ON_FATAL_ERROR_CALLBACK = "on_fatal_exception" in inspect.getfullargspec(
114+
bidi.BackgroundConsumer
115+
)
116+
101117

102118
def _wrap_as_exception(maybe_exception: Any) -> BaseException:
103119
"""Wrap an object as a Python exception, if needed.
@@ -876,7 +892,18 @@ def open(
876892
assert self._scheduler is not None
877893
scheduler_queue = self._scheduler.queue
878894
self._dispatcher = dispatcher.Dispatcher(self, scheduler_queue)
879-
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
895+
896+
# `on_fatal_exception` is only available in more recent library versions.
897+
# For backwards compatibility reasons, we only pass it when `google-api-core` supports it.
898+
if _SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
899+
self._consumer = bidi.BackgroundConsumer(
900+
self._rpc,
901+
self._on_response,
902+
on_fatal_exception=self._on_fatal_exception,
903+
)
904+
else:
905+
self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
906+
880907
self._leaser = leaser.Leaser(self)
881908
self._heartbeater = heartbeater.Heartbeater(self)
882909

@@ -1247,6 +1274,17 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
12471274

12481275
self.maybe_pause_consumer()
12491276

1277+
def _on_fatal_exception(self, exception: BaseException) -> None:
1278+
"""
1279+
Called whenever `self.consumer` receives a non-retryable exception.
1280+
We close the manager on such non-retryable cases.
1281+
"""
1282+
_LOGGER.exception(
1283+
"Streaming pull terminating after receiving non-recoverable error: %s",
1284+
exception,
1285+
)
1286+
self.close(exception)
1287+
12501288
def _should_recover(self, exception: BaseException) -> bool:
12511289
"""Determine if an error on the RPC stream should be recovered.
12521290
@@ -1283,8 +1321,10 @@ def _should_terminate(self, exception: BaseException) -> bool:
12831321
in a list of terminating exceptions.
12841322
"""
12851323
exception = _wrap_as_exception(exception)
1286-
if isinstance(exception, _TERMINATING_STREAM_ERRORS):
1287-
_LOGGER.debug("Observed terminating stream error %s", exception)
1324+
is_api_error = isinstance(exception, exceptions.GoogleAPICallError)
1325+
# Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.)
1326+
if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS):
1327+
_LOGGER.error("Observed terminating stream error %s", exception)
12881328
return True
12891329
_LOGGER.debug("Observed non-terminating stream error %s", exception)
12901330
return False
Collapse file

‎tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py‎

Copy file name to clipboardExpand all lines: tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
+45-8Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,7 +1333,13 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
13331333
leaser.return_value.start.assert_called_once()
13341334
assert manager.leaser == leaser.return_value
13351335

1336-
background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
1336+
if streaming_pull_manager._SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
1337+
background_consumer.assert_called_once_with(
1338+
manager._rpc, manager._on_response, manager._on_fatal_exception
1339+
)
1340+
else:
1341+
background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
1342+
13371343
background_consumer.return_value.start.assert_called_once()
13381344
assert manager._consumer == background_consumer.return_value
13391345

@@ -1432,6 +1438,31 @@ def test_close():
14321438
assert manager.is_active is False
14331439

14341440

1441+
def test_closes_on_fatal_consumer_error():
1442+
(
1443+
manager,
1444+
consumer,
1445+
dispatcher,
1446+
leaser,
1447+
heartbeater,
1448+
scheduler,
1449+
) = make_running_manager()
1450+
1451+
if streaming_pull_manager._SHOULD_USE_ON_FATAL_ERROR_CALLBACK:
1452+
error = ValueError("some fatal exception")
1453+
manager._on_fatal_exception(error)
1454+
1455+
await_manager_shutdown(manager, timeout=3)
1456+
1457+
consumer.stop.assert_called_once()
1458+
leaser.stop.assert_called_once()
1459+
dispatcher.stop.assert_called_once()
1460+
heartbeater.stop.assert_called_once()
1461+
scheduler.shutdown.assert_called_once()
1462+
1463+
assert manager.is_active is False
1464+
1465+
14351466
def test_close_inactive_consumer():
14361467
(
14371468
manager,
@@ -2270,18 +2301,24 @@ def test__should_recover_false():
22702301
def test__should_terminate_true():
22712302
manager = make_manager()
22722303

2273-
details = "Cancelled. Go away, before I taunt you a second time."
2274-
exc = exceptions.Cancelled(details)
2275-
2276-
assert manager._should_terminate(exc) is True
2304+
for exc in [
2305+
exceptions.Cancelled(""),
2306+
exceptions.PermissionDenied(""),
2307+
TypeError(),
2308+
ValueError(),
2309+
]:
2310+
assert manager._should_terminate(exc)
22772311

22782312

22792313
def test__should_terminate_false():
22802314
manager = make_manager()
22812315

2282-
exc = TypeError("wahhhhhh")
2283-
2284-
assert manager._should_terminate(exc) is False
2316+
for exc in [
2317+
exceptions.ResourceExhausted(""),
2318+
exceptions.ServiceUnavailable(""),
2319+
exceptions.DeadlineExceeded(""),
2320+
]:
2321+
assert not manager._should_terminate(exc)
22852322

22862323

22872324
@mock.patch("threading.Thread", autospec=True)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.