Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d771742

Browse filesBrowse files
fix: improve pipeline create_from (#1158)
Improves Query to Pipeline logic: - add support for `cursor` and `limit_to_last` - apply fewer `exists` clauses, so pipelines give the same results as matching RunQuery statements
1 parent cdec6a4 commit d771742
Copy full SHA for d771742

7 files changed

+387-63Lines changed: 387 additions & 63 deletions

File tree

Expand file treeCollapse file tree
Open diff view settings
Filter options
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎packages/google-cloud-firestore/google/cloud/firestore_v1/base_aggregation.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-firestore/google/cloud/firestore_v1/base_aggregation.py
-4Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -361,12 +361,8 @@ def _build_pipeline(self, source: "PipelineSource"):
361361
"""
362362
Convert this query into a Pipeline
363363
364-
Queries containing a `cursor` or `limit_to_last` are not currently supported
365-
366364
Args:
367365
source: the PipelineSource to build the pipeline off of
368-
Raises:
369-
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
370366
Returns:
371367
a Pipeline representing the query
372368
"""
Collapse file

‎packages/google-cloud-firestore/google/cloud/firestore_v1/base_collection.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-firestore/google/cloud/firestore_v1/base_collection.py
-4Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -608,12 +608,8 @@ def _build_pipeline(self, source: "PipelineSource"):
608608
"""
609609
Convert this query into a Pipeline
610610
611-
Queries containing a `cursor` or `limit_to_last` are not currently supported
612-
613611
Args:
614612
source: the PipelineSource to build the pipeline off o
615-
Raises:
616-
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
617613
Returns:
618614
a Pipeline representing the query
619615
"""
Collapse file

‎packages/google-cloud-firestore/google/cloud/firestore_v1/base_query.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-firestore/google/cloud/firestore_v1/base_query.py
+136-33Lines changed: 136 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,12 +1134,8 @@ def _build_pipeline(self, source: "PipelineSource"):
11341134
"""
11351135
Convert this query into a Pipeline
11361136
1137-
Queries containing a `cursor` or `limit_to_last` are not currently supported
1138-
11391137
Args:
11401138
source: the PipelineSource to build the pipeline off of
1141-
Raises:
1142-
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
11431139
Returns:
11441140
a Pipeline representing the query
11451141
"""
@@ -1161,39 +1157,61 @@ def _build_pipeline(self, source: "PipelineSource"):
11611157
ppl = ppl.select(*[field.field_path for field in self._projection.fields])
11621158

11631159
# Orders
1164-
orders = self._normalize_orders()
1165-
if orders:
1166-
exists = []
1167-
orderings = []
1168-
for order in orders:
1169-
field = pipeline_expressions.Field.of(order.field.field_path)
1170-
exists.append(field.exists())
1171-
direction = (
1172-
"ascending"
1173-
if order.direction == StructuredQuery.Direction.ASCENDING
1174-
else "descending"
1175-
)
1176-
orderings.append(pipeline_expressions.Ordering(field, direction))
11771160

1178-
# Add exists filters to match Query's implicit orderby semantics.
1179-
if len(exists) == 1:
1180-
ppl = ppl.where(exists[0])
1181-
else:
1182-
ppl = ppl.where(pipeline_expressions.And(*exists))
1161+
# "explicit_orders" are only those explicitly added by the user via order_by().
1162+
# We only generate existence filters for these fields.
1163+
if self._orders:
1164+
exists = [
1165+
pipeline_expressions.Field.of(o.field.field_path).exists()
1166+
for o in self._orders
1167+
]
1168+
ppl = ppl.where(
1169+
pipeline_expressions.And(*exists) if len(exists) > 1 else exists[0]
1170+
)
1171+
1172+
# "normalized_orders" includes both user-specified orders and implicit orders
1173+
# (e.g. __name__ or inequality fields) required by Firestore semantics.
1174+
normalized_orders = self._normalize_orders()
1175+
orderings = [
1176+
pipeline_expressions.Ordering(
1177+
pipeline_expressions.Field.of(o.field.field_path),
1178+
"ascending"
1179+
if o.direction == StructuredQuery.Direction.ASCENDING
1180+
else "descending",
1181+
)
1182+
for o in normalized_orders
1183+
]
1184+
1185+
# Apply cursors as filters.
1186+
if orderings:
1187+
for cursor, is_start in [(self._start_at, True), (self._end_at, False)]:
1188+
cursor = self._normalize_cursor(cursor, normalized_orders)
1189+
if cursor:
1190+
ppl = ppl.where(
1191+
_where_conditions_from_cursor(cursor, orderings, is_start)
1192+
)
1193+
1194+
# Handle sort and limit, including limit_to_last semantics.
1195+
is_limit_to_last = self._limit_to_last and bool(orderings)
11831196

1184-
# Add sort orderings
1197+
if is_limit_to_last:
1198+
# If limit_to_last is set, we need to reverse the orderings to find the
1199+
# "last" N documents (which effectively become the "first" N in reverse order).
1200+
ppl = ppl.sort(*_reverse_orderings(orderings))
1201+
elif orderings:
11851202
ppl = ppl.sort(*orderings)
11861203

1187-
# Cursors, Limit and Offset
1188-
if self._start_at or self._end_at or self._limit_to_last:
1189-
raise NotImplementedError(
1190-
"Query to Pipeline conversion: cursors and limit_to_last is not supported yet."
1191-
)
1192-
else: # Limit & Offset without cursors
1193-
if self._offset:
1194-
ppl = ppl.offset(self._offset)
1195-
if self._limit:
1196-
ppl = ppl.limit(self._limit)
1204+
if self._limit is not None and (not self._limit_to_last or orderings):
1205+
ppl = ppl.limit(self._limit)
1206+
1207+
if is_limit_to_last:
1208+
# If we reversed the orderings for limit_to_last, we must now re-sort
1209+
# using the original orderings to return the results in the user-requested order.
1210+
ppl = ppl.sort(*orderings)
1211+
1212+
# Offset
1213+
if self._offset:
1214+
ppl = ppl.offset(self._offset)
11971215

11981216
return ppl
11991217

@@ -1366,6 +1384,91 @@ def _cursor_pb(cursor_pair: Optional[Tuple[list, bool]]) -> Optional[Cursor]:
13661384
return None
13671385

13681386

1387+
def _get_cursor_exclusive_condition(
1388+
is_start_cursor: bool,
1389+
ordering: pipeline_expressions.Ordering,
1390+
value: pipeline_expressions.Constant,
1391+
) -> pipeline_expressions.BooleanExpression:
1392+
"""
1393+
Helper to determine the correct comparison operator (greater_than or less_than)
1394+
based on the cursor type (start/end) and the sort direction (ascending/descending).
1395+
"""
1396+
field = ordering.expr
1397+
if (
1398+
is_start_cursor
1399+
and ordering.order_dir == pipeline_expressions.Ordering.Direction.ASCENDING
1400+
) or (
1401+
not is_start_cursor
1402+
and ordering.order_dir == pipeline_expressions.Ordering.Direction.DESCENDING
1403+
):
1404+
return field.greater_than(value)
1405+
else:
1406+
return field.less_than(value)
1407+
1408+
1409+
def _where_conditions_from_cursor(
1410+
cursor: Tuple[List, bool],
1411+
orderings: List[pipeline_expressions.Ordering],
1412+
is_start_cursor: bool,
1413+
) -> pipeline_expressions.BooleanExpression:
1414+
"""
1415+
Converts a cursor into a filter condition for the pipeline.
1416+
1417+
Args:
1418+
cursor: The cursor values and the 'before' flag.
1419+
orderings: The list of ordering expressions used in the query.
1420+
is_start_cursor: True if this is a start_at/start_after cursor, False if it is an end_at/end_before cursor.
1421+
Returns:
1422+
A BooleanExpression representing the cursor condition.
1423+
"""
1424+
cursor_values, before = cursor
1425+
size = len(cursor_values)
1426+
1427+
ordering = orderings[size - 1]
1428+
field = ordering.expr
1429+
value = pipeline_expressions.Constant(cursor_values[size - 1])
1430+
1431+
# Add condition for last bound
1432+
condition = _get_cursor_exclusive_condition(is_start_cursor, ordering, value)
1433+
1434+
if (is_start_cursor and before) or (not is_start_cursor and not before):
1435+
# When the cursor bound is inclusive, then the last bound
1436+
# can be equal to the value, otherwise it's not equal
1437+
condition = pipeline_expressions.Or(condition, field.equal(value))
1438+
1439+
# Iterate backwards over the remaining bounds, adding a condition for each one
1440+
for i in range(size - 2, -1, -1):
1441+
ordering = orderings[i]
1442+
field = ordering.expr
1443+
value = pipeline_expressions.Constant(cursor_values[i])
1444+
1445+
# For each field in the orderings, the condition is either
1446+
# a) lessThan|greaterThan the cursor value,
1447+
# b) or equal the cursor value and lessThan|greaterThan the cursor values for other fields
1448+
exclusive_condition = _get_cursor_exclusive_condition(
1449+
is_start_cursor, ordering, value
1450+
)
1451+
condition = pipeline_expressions.Or(
1452+
exclusive_condition,
1453+
pipeline_expressions.And(field.equal(value), condition),
1454+
)
1455+
1456+
return condition
1457+
1458+
1459+
def _reverse_orderings(
1460+
orderings: List[pipeline_expressions.Ordering],
1461+
) -> List[pipeline_expressions.Ordering]:
1462+
reversed_orderings = []
1463+
for o in orderings:
1464+
if o.order_dir == pipeline_expressions.Ordering.Direction.ASCENDING:
1465+
new_dir = "descending"
1466+
else:
1467+
new_dir = "ascending"
1468+
reversed_orderings.append(pipeline_expressions.Ordering(o.expr, new_dir))
1469+
return reversed_orderings
1470+
1471+
13691472
def _query_response_to_snapshot(
13701473
response_pb: RunQueryResponse, collection, expected_prefix: str
13711474
) -> Optional[document.DocumentSnapshot]:
Collapse file

‎packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py
+8-2Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1833,15 +1833,21 @@ def _from_query_filter_pb(filter_pb, client):
18331833
elif filter_pb.op == Query_pb.FieldFilter.Operator.EQUAL:
18341834
return And(field.exists(), field.equal(value))
18351835
elif filter_pb.op == Query_pb.FieldFilter.Operator.NOT_EQUAL:
1836-
return And(field.exists(), field.not_equal(value))
1836+
# In Enterprise DBs NOT_EQUAL will match a field that does not exist,
1837+
# therefore we do not want an existence filter for the NOT_EQUAL conversion
1838+
# so the Query and Pipeline behavior are consistent in Enterprise.
1839+
return field.not_equal(value)
18371840
if filter_pb.op == Query_pb.FieldFilter.Operator.ARRAY_CONTAINS:
18381841
return And(field.exists(), field.array_contains(value))
18391842
elif filter_pb.op == Query_pb.FieldFilter.Operator.ARRAY_CONTAINS_ANY:
18401843
return And(field.exists(), field.array_contains_any(value))
18411844
elif filter_pb.op == Query_pb.FieldFilter.Operator.IN:
18421845
return And(field.exists(), field.equal_any(value))
18431846
elif filter_pb.op == Query_pb.FieldFilter.Operator.NOT_IN:
1844-
return And(field.exists(), field.not_equal_any(value))
1847+
# In Enterprise DBs NOT_IN will match a field that does not exist,
1848+
# therefore we do not want an existence filter for the NOT_IN conversion
1849+
# so the Query and Pipeline behavior are consistent in Enterprise.
1850+
return field.not_equal_any(value)
18451851
else:
18461852
raise TypeError(f"Unexpected FieldFilter operator type: {filter_pb.op}")
18471853
elif isinstance(filter_pb, Query_pb.Filter):
Collapse file

‎packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py‎

Copy file name to clipboardExpand all lines: packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py
-4Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,8 @@ def create_from(
5757
"""
5858
Create a pipeline from an existing query
5959
60-
Queries containing a `cursor` or `limit_to_last` are not currently supported
61-
6260
Args:
6361
query: the query to build the pipeline off of
64-
Raises:
65-
- NotImplementedError: raised if the query contains a `cursor` or `limit_to_last`
6662
Returns:
6763
a new pipeline instance representing the query
6864
"""

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.