Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit cefce15

Browse filesBrowse files
committed
feat: make QueryJob.done() method more performant
1 parent 3ce826e commit cefce15
Copy full SHA for cefce15

File tree

Expand file treeCollapse file tree

2 files changed

+129
-184
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

2 files changed

+129
-184
lines changed
Open diff view settings
Collapse file

‎google/cloud/bigquery/job/query.py‎

Copy file name to clipboardExpand all lines: google/cloud/bigquery/job/query.py
+35-56Lines changed: 35 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import re
2020

2121
from google.api_core import exceptions
22+
from google.api_core.future import polling as polling_future
2223
import requests
2324

2425
from google.cloud.bigquery.dataset import Dataset
@@ -42,7 +43,6 @@
4243
from google.cloud.bigquery._tqdm_helpers import wait_for_query
4344

4445
from google.cloud.bigquery.job.base import _AsyncJob
45-
from google.cloud.bigquery.job.base import _DONE_STATE
4646
from google.cloud.bigquery.job.base import _JobConfig
4747
from google.cloud.bigquery.job.base import _JobReference
4848

@@ -974,61 +974,6 @@ def estimated_bytes_processed(self):
974974
result = int(result)
975975
return result
976976

977-
def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
978-
"""Refresh the job and checks if it is complete.
979-
980-
Args:
981-
retry (Optional[google.api_core.retry.Retry]):
982-
How to retry the call that retrieves query results. If the job state is
983-
``DONE``, retrying is aborted early, as the job will not change anymore.
984-
timeout (Optional[float]):
985-
The number of seconds to wait for the underlying HTTP transport
986-
before using ``retry``.
987-
reload (Optional[bool]):
988-
If ``True``, make an API call to refresh the job state of
989-
unfinished jobs before checking. Default ``True``.
990-
991-
Returns:
992-
bool: ``True`` if the job is complete or if fetching its status resulted in
993-
an error, ``False`` otherwise.
994-
"""
995-
# Do not refresh if the state is already done, as the job will not
996-
# change once complete.
997-
is_done = self.state == _DONE_STATE
998-
if not reload or is_done:
999-
return is_done
1000-
1001-
# If an explicit timeout is not given, fall back to the transport timeout
1002-
# stored in _blocking_poll() in the process of polling for job completion.
1003-
transport_timeout = timeout if timeout is not None else self._transport_timeout
1004-
1005-
try:
1006-
self._reload_query_results(retry=retry, timeout=transport_timeout)
1007-
except exceptions.GoogleAPIError as exc:
1008-
# Reloading also updates error details on self, thus no need for an
1009-
# explicit self.set_exception() call if reloading succeeds.
1010-
try:
1011-
self.reload(retry=retry, timeout=transport_timeout)
1012-
except exceptions.GoogleAPIError:
1013-
# Use the query results reload exception, as it generally contains
1014-
# much more useful error information.
1015-
self.set_exception(exc)
1016-
return True
1017-
else:
1018-
return self.state == _DONE_STATE
1019-
1020-
# Only reload the job once we know the query is complete.
1021-
# This will ensure that fields such as the destination table are
1022-
# correctly populated.
1023-
if self._query_results.complete:
1024-
try:
1025-
self.reload(retry=retry, timeout=transport_timeout)
1026-
except exceptions.GoogleAPIError as exc:
1027-
self.set_exception(exc)
1028-
return True
1029-
1030-
return self.state == _DONE_STATE
1031-
1032977
def _blocking_poll(self, timeout=None, **kwargs):
1033978
self._done_timeout = timeout
1034979
self._transport_timeout = timeout
@@ -1130,6 +1075,40 @@ def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None):
11301075
timeout=transport_timeout,
11311076
)
11321077

1078+
def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None):
1079+
"""Check if the query has finished running and raise if it's not.
1080+
1081+
If the query has finished, also reload the job itself.
1082+
"""
1083+
# If an explicit timeout is not given, fall back to the transport timeout
1084+
# stored in _blocking_poll() in the process of polling for job completion.
1085+
transport_timeout = timeout if timeout is not None else self._transport_timeout
1086+
1087+
try:
1088+
self._reload_query_results(retry=retry, timeout=transport_timeout)
1089+
except exceptions.GoogleAPIError as exc:
1090+
# Reloading also updates error details on self, thus no need for an
1091+
# explicit self.set_exception() call if reloading succeeds.
1092+
try:
1093+
self.reload(retry=retry, timeout=transport_timeout)
1094+
except exceptions.GoogleAPIError:
1095+
# Use the query results reload exception, as it generally contains
1096+
# much more useful error information.
1097+
self.set_exception(exc)
1098+
finally:
1099+
return
1100+
1101+
# Only reload the job once we know the query is complete.
1102+
# This will ensure that fields such as the destination table are
1103+
# correctly populated.
1104+
if not self._query_results.complete:
1105+
raise polling_future._OperationNotComplete()
1106+
else:
1107+
try:
1108+
self.reload(retry=retry, timeout=transport_timeout)
1109+
except exceptions.GoogleAPIError as exc:
1110+
self.set_exception(exc)
1111+
11331112
def result(
11341113
self,
11351114
page_size=None,
Collapse file

‎tests/unit/job/test_query.py‎

Copy file name to clipboardExpand all lines: tests/unit/job/test_query.py
+94-128Lines changed: 94 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -309,132 +309,6 @@ def test_cancelled(self):
309309

310310
self.assertTrue(job.cancelled())
311311

312-
def test_done_job_complete(self):
313-
client = _make_client(project=self.PROJECT)
314-
resource = self._make_resource(ended=True)
315-
job = self._get_target_class().from_api_repr(resource, client)
316-
job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
317-
{"jobComplete": True, "jobReference": resource["jobReference"]}
318-
)
319-
self.assertTrue(job.done())
320-
321-
def test_done_w_timeout(self):
322-
client = _make_client(project=self.PROJECT)
323-
resource = self._make_resource(ended=False)
324-
job = self._get_target_class().from_api_repr(resource, client)
325-
326-
with mock.patch.object(
327-
client, "_get_query_results"
328-
) as fake_get_results, mock.patch.object(job, "reload") as fake_reload:
329-
job.done(timeout=42)
330-
331-
fake_get_results.assert_called_once()
332-
call_args = fake_get_results.call_args
333-
self.assertEqual(call_args.kwargs.get("timeout"), 42)
334-
335-
call_args = fake_reload.call_args
336-
self.assertEqual(call_args.kwargs.get("timeout"), 42)
337-
338-
def test_done_w_timeout_and_longer_internal_api_timeout(self):
339-
client = _make_client(project=self.PROJECT)
340-
resource = self._make_resource(ended=False)
341-
job = self._get_target_class().from_api_repr(resource, client)
342-
job._done_timeout = 8.8
343-
344-
with mock.patch.object(
345-
client, "_get_query_results"
346-
) as fake_get_results, mock.patch.object(job, "reload") as fake_reload:
347-
job.done(timeout=5.5)
348-
349-
# The expected timeout used is simply the given timeout, as the latter
350-
# is shorter than the job's internal done timeout.
351-
expected_timeout = 5.5
352-
353-
fake_get_results.assert_called_once()
354-
call_args = fake_get_results.call_args
355-
self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout)
356-
357-
call_args = fake_reload.call_args
358-
self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout)
359-
360-
def test_done_w_query_results_error_reload_ok_job_finished(self):
361-
client = _make_client(project=self.PROJECT)
362-
bad_request_error = exceptions.BadRequest("Error in query")
363-
client._get_query_results = mock.Mock(side_effect=bad_request_error)
364-
365-
resource = self._make_resource(ended=False)
366-
job = self._get_target_class().from_api_repr(resource, client)
367-
job._exception = None
368-
369-
def fake_reload(self, *args, **kwargs):
370-
self._properties["status"]["state"] = "DONE"
371-
self.set_exception(copy.copy(bad_request_error))
372-
373-
fake_reload_method = types.MethodType(fake_reload, job)
374-
375-
with mock.patch.object(job, "reload", new=fake_reload_method):
376-
is_done = job.done()
377-
378-
assert is_done
379-
assert isinstance(job._exception, exceptions.BadRequest)
380-
381-
def test_done_w_query_results_error_reload_ok_job_still_running(self):
382-
client = _make_client(project=self.PROJECT)
383-
retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError)
384-
client._get_query_results = mock.Mock(side_effect=retry_error)
385-
386-
resource = self._make_resource(ended=False)
387-
job = self._get_target_class().from_api_repr(resource, client)
388-
job._exception = None
389-
390-
def fake_reload(self, *args, **kwargs):
391-
self._properties["status"]["state"] = "RUNNING"
392-
393-
fake_reload_method = types.MethodType(fake_reload, job)
394-
395-
with mock.patch.object(job, "reload", new=fake_reload_method):
396-
is_done = job.done()
397-
398-
assert not is_done
399-
assert job._exception is None
400-
401-
def test_done_w_query_results_error_reload_error(self):
402-
client = _make_client(project=self.PROJECT)
403-
bad_request_error = exceptions.BadRequest("Error in query")
404-
client._get_query_results = mock.Mock(side_effect=bad_request_error)
405-
406-
resource = self._make_resource(ended=False)
407-
job = self._get_target_class().from_api_repr(resource, client)
408-
reload_error = exceptions.DataLoss("Oops, sorry!")
409-
job.reload = mock.Mock(side_effect=reload_error)
410-
job._exception = None
411-
412-
is_done = job.done()
413-
414-
assert is_done
415-
assert job._exception is bad_request_error
416-
417-
def test_done_w_job_query_results_ok_reload_error(self):
418-
client = _make_client(project=self.PROJECT)
419-
query_results = google.cloud.bigquery.query._QueryResults(
420-
properties={
421-
"jobComplete": True,
422-
"jobReference": {"projectId": self.PROJECT, "jobId": "12345"},
423-
}
424-
)
425-
client._get_query_results = mock.Mock(return_value=query_results)
426-
427-
resource = self._make_resource(ended=False)
428-
job = self._get_target_class().from_api_repr(resource, client)
429-
retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError)
430-
job.reload = mock.Mock(side_effect=retry_error)
431-
job._exception = None
432-
433-
is_done = job.done()
434-
435-
assert is_done
436-
assert job._exception is retry_error
437-
438312
def test_query_plan(self):
439313
from google.cloud._helpers import _RFC3339_MICROS
440314
from google.cloud.bigquery.job import QueryPlanEntry
@@ -1905,8 +1779,6 @@ def test_reload_w_timeout(self):
19051779
)
19061780

19071781
def test_iter(self):
1908-
import types
1909-
19101782
begun_resource = self._make_resource()
19111783
query_resource = {
19121784
"jobComplete": True,
@@ -1921,3 +1793,97 @@ def test_iter(self):
19211793
job = self._make_one(self.JOB_ID, self.QUERY, client)
19221794

19231795
self.assertIsInstance(iter(job), types.GeneratorType)
1796+
1797+
def test__done_or_raise_w_timeout(self):
1798+
client = _make_client(project=self.PROJECT)
1799+
resource = self._make_resource(ended=False)
1800+
job = self._get_target_class().from_api_repr(resource, client)
1801+
1802+
with mock.patch.object(
1803+
client, "_get_query_results"
1804+
) as fake_get_results, mock.patch.object(job, "reload") as fake_reload:
1805+
job._done_or_raise(timeout=42)
1806+
1807+
fake_get_results.assert_called_once()
1808+
call_args = fake_get_results.call_args
1809+
self.assertEqual(call_args.kwargs.get("timeout"), 42)
1810+
1811+
call_args = fake_reload.call_args
1812+
self.assertEqual(call_args.kwargs.get("timeout"), 42)
1813+
1814+
def test__done_or_raise_w_timeout_and_longer_internal_api_timeout(self):
1815+
client = _make_client(project=self.PROJECT)
1816+
resource = self._make_resource(ended=False)
1817+
job = self._get_target_class().from_api_repr(resource, client)
1818+
job._done_timeout = 8.8
1819+
1820+
with mock.patch.object(
1821+
client, "_get_query_results"
1822+
) as fake_get_results, mock.patch.object(job, "reload") as fake_reload:
1823+
job._done_or_raise(timeout=5.5)
1824+
1825+
# The expected timeout used is simply the given timeout, as the latter
1826+
# is shorter than the job's internal done timeout.
1827+
expected_timeout = 5.5
1828+
1829+
fake_get_results.assert_called_once()
1830+
call_args = fake_get_results.call_args
1831+
self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout)
1832+
1833+
call_args = fake_reload.call_args
1834+
self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout)
1835+
1836+
def test__done_or_raise_w_query_results_error_reload_ok(self):
1837+
client = _make_client(project=self.PROJECT)
1838+
bad_request_error = exceptions.BadRequest("Error in query")
1839+
client._get_query_results = mock.Mock(side_effect=bad_request_error)
1840+
1841+
resource = self._make_resource(ended=False)
1842+
job = self._get_target_class().from_api_repr(resource, client)
1843+
job._exception = None
1844+
1845+
def fake_reload(self, *args, **kwargs):
1846+
self._properties["status"]["state"] = "DONE"
1847+
self.set_exception(copy.copy(bad_request_error))
1848+
1849+
fake_reload_method = types.MethodType(fake_reload, job)
1850+
1851+
with mock.patch.object(job, "reload", new=fake_reload_method):
1852+
job._done_or_raise()
1853+
1854+
assert isinstance(job._exception, exceptions.BadRequest)
1855+
1856+
def test__done_or_raise_w_query_results_error_reload_error(self):
1857+
client = _make_client(project=self.PROJECT)
1858+
bad_request_error = exceptions.BadRequest("Error in query")
1859+
client._get_query_results = mock.Mock(side_effect=bad_request_error)
1860+
1861+
resource = self._make_resource(ended=False)
1862+
job = self._get_target_class().from_api_repr(resource, client)
1863+
reload_error = exceptions.DataLoss("Oops, sorry!")
1864+
job.reload = mock.Mock(side_effect=reload_error)
1865+
job._exception = None
1866+
1867+
job._done_or_raise()
1868+
1869+
assert job._exception is bad_request_error
1870+
1871+
def test__done_or_raise_w_job_query_results_ok_reload_error(self):
1872+
client = _make_client(project=self.PROJECT)
1873+
query_results = google.cloud.bigquery.query._QueryResults(
1874+
properties={
1875+
"jobComplete": True,
1876+
"jobReference": {"projectId": self.PROJECT, "jobId": "12345"},
1877+
}
1878+
)
1879+
client._get_query_results = mock.Mock(return_value=query_results)
1880+
1881+
resource = self._make_resource(ended=False)
1882+
job = self._get_target_class().from_api_repr(resource, client)
1883+
retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError)
1884+
job.reload = mock.Mock(side_effect=retry_error)
1885+
job._exception = None
1886+
1887+
job._done_or_raise()
1888+
1889+
assert job._exception is retry_error

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.