Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 14263ee

Browse filesBrowse files
committed
Merge pull request #1493 from dhermes/bigtable-complete-partial-rows-data
Implementing consume_*() methods on Bigtable PartialRowsData.
2 parents 0891906 + 3d98eb7 commit 14263ee
Copy full SHA for 14263ee

File tree

Expand file treeCollapse file tree

2 files changed

+139
-0
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+139
-0
lines changed

‎gcloud/bigtable/row_data.py

Copy file name to clipboardExpand all lines: gcloud/bigtable/row_data.py
+42Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,3 +279,45 @@ def rows(self):
279279
# NOTE: To avoid duplicating large objects, this is just the
280280
# mutable private data.
281281
return self._rows
282+
283+
def cancel(self):
284+
"""Cancels the iterator, closing the stream."""
285+
self._response_iterator.cancel()
286+
287+
def consume_next(self):
288+
"""Consumes the next ``ReadRowsResponse`` from the stream.
289+
290+
Parses the response and stores it as a :class:`PartialRowData`
291+
in a dictionary owned by this object.
292+
293+
:raises: :class:`StopIteration <exceptions.StopIteration>` if the
294+
response iterator has no more responses to stream.
295+
"""
296+
read_rows_response = self._response_iterator.next()
297+
row_key = read_rows_response.row_key
298+
partial_row = self._rows.get(row_key)
299+
if partial_row is None:
300+
partial_row = self._rows[row_key] = PartialRowData(row_key)
301+
# NOTE: This is not atomic in the case of failures.
302+
partial_row.update_from_read_rows(read_rows_response)
303+
304+
def consume_all(self, max_loops=None):
305+
"""Consume the streamed responses until there are no more.
306+
307+
This simply calls :meth:`consume_next` until there are no
308+
more to consume.
309+
310+
:type max_loops: int
311+
:param max_loops: (Optional) Maximum number of times to try to consume
312+
an additional ``ReadRowsResponse``. You can use this
313+
to avoid long wait times.
314+
"""
315+
curr_loop = 0
316+
if max_loops is None:
317+
max_loops = float('inf')
318+
while curr_loop < max_loops:
319+
curr_loop += 1
320+
try:
321+
self.consume_next()
322+
except StopIteration:
323+
break

‎gcloud/bigtable/test_row_data.py

Copy file name to clipboardExpand all lines: gcloud/bigtable/test_row_data.py
+97Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,22 @@ def _getTargetClass(self):
386386
from gcloud.bigtable.row_data import PartialRowsData
387387
return PartialRowsData
388388

389+
def _getDoNothingClass(self):
390+
klass = self._getTargetClass()
391+
392+
class FakePartialRowsData(klass):
393+
394+
def __init__(self, *args, **kwargs):
395+
super(FakePartialRowsData, self).__init__(*args, **kwargs)
396+
self._consumed = []
397+
398+
def consume_next(self):
399+
value = self._response_iterator.next()
400+
self._consumed.append(value)
401+
return value
402+
403+
return FakePartialRowsData
404+
389405
def _makeOne(self, *args, **kwargs):
390406
return self._getTargetClass()(*args, **kwargs)
391407

@@ -425,3 +441,84 @@ def test_rows_getter(self):
425441
partial_rows_data = self._makeOne(None)
426442
partial_rows_data._rows = value = object()
427443
self.assertTrue(partial_rows_data.rows is value)
444+
445+
def test_cancel(self):
446+
response_iterator = _MockCancellableIterator()
447+
partial_rows_data = self._makeOne(response_iterator)
448+
self.assertEqual(response_iterator.cancel_calls, 0)
449+
partial_rows_data.cancel()
450+
self.assertEqual(response_iterator.cancel_calls, 1)
451+
452+
def test_consume_next(self):
453+
from gcloud.bigtable._generated import (
454+
bigtable_service_messages_pb2 as messages_pb2)
455+
from gcloud.bigtable.row_data import PartialRowData
456+
457+
row_key = b'row-key'
458+
value_pb = messages_pb2.ReadRowsResponse(row_key=row_key)
459+
response_iterator = _MockCancellableIterator(value_pb)
460+
partial_rows_data = self._makeOne(response_iterator)
461+
self.assertEqual(partial_rows_data.rows, {})
462+
partial_rows_data.consume_next()
463+
expected_rows = {row_key: PartialRowData(row_key)}
464+
self.assertEqual(partial_rows_data.rows, expected_rows)
465+
466+
def test_consume_next_row_exists(self):
467+
from gcloud.bigtable._generated import (
468+
bigtable_service_messages_pb2 as messages_pb2)
469+
from gcloud.bigtable.row_data import PartialRowData
470+
471+
row_key = b'row-key'
472+
chunk = messages_pb2.ReadRowsResponse.Chunk(commit_row=True)
473+
value_pb = messages_pb2.ReadRowsResponse(row_key=row_key,
474+
chunks=[chunk])
475+
response_iterator = _MockCancellableIterator(value_pb)
476+
partial_rows_data = self._makeOne(response_iterator)
477+
existing_values = PartialRowData(row_key)
478+
partial_rows_data._rows[row_key] = existing_values
479+
self.assertFalse(existing_values.committed)
480+
partial_rows_data.consume_next()
481+
self.assertTrue(existing_values.committed)
482+
self.assertEqual(existing_values.cells, {})
483+
484+
def test_consume_next_empty_iter(self):
485+
response_iterator = _MockCancellableIterator()
486+
partial_rows_data = self._makeOne(response_iterator)
487+
with self.assertRaises(StopIteration):
488+
partial_rows_data.consume_next()
489+
490+
def test_consume_all(self):
491+
klass = self._getDoNothingClass()
492+
493+
value1, value2, value3 = object(), object(), object()
494+
response_iterator = _MockCancellableIterator(value1, value2, value3)
495+
partial_rows_data = klass(response_iterator)
496+
self.assertEqual(partial_rows_data._consumed, [])
497+
partial_rows_data.consume_all()
498+
self.assertEqual(partial_rows_data._consumed, [value1, value2, value3])
499+
500+
def test_consume_all_with_max_loops(self):
501+
klass = self._getDoNothingClass()
502+
503+
value1, value2, value3 = object(), object(), object()
504+
response_iterator = _MockCancellableIterator(value1, value2, value3)
505+
partial_rows_data = klass(response_iterator)
506+
self.assertEqual(partial_rows_data._consumed, [])
507+
partial_rows_data.consume_all(max_loops=1)
508+
self.assertEqual(partial_rows_data._consumed, [value1])
509+
# Make sure the iterator still has the remaining values.
510+
self.assertEqual(list(response_iterator.iter_values), [value2, value3])
511+
512+
513+
class _MockCancellableIterator(object):
514+
515+
cancel_calls = 0
516+
517+
def __init__(self, *values):
518+
self.iter_values = iter(values)
519+
520+
def cancel(self):
521+
self.cancel_calls += 1
522+
523+
def next(self):
524+
return next(self.iter_values)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.