Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on May 7, 2026. It is now read-only.

Commit ae35a98

Browse filesBrowse files
feat: Initial support for biglake iceberg tables (#2409)
1 parent b925aa2 commit ae35a98
Copy full SHA for ae35a98

23 files changed

+753-262Lines changed: 753 additions & 262 deletions
Expand file treeCollapse file tree
Open diff view settings
Collapse file

‎bigframes/core/array_value.py‎

Copy file name to clipboardExpand all lines: bigframes/core/array_value.py
+6-7Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
import datetime
1818
import functools
1919
import typing
20-
from typing import Iterable, List, Mapping, Optional, Sequence, Tuple
20+
from typing import Iterable, List, Mapping, Optional, Sequence, Tuple, Union
2121

22-
import google.cloud.bigquery
2322
import pandas
2423
import pyarrow as pa
2524

@@ -91,7 +90,7 @@ def from_range(cls, start, end, step):
9190
@classmethod
9291
def from_table(
9392
cls,
94-
table: google.cloud.bigquery.Table,
93+
table: Union[bq_data.BiglakeIcebergTable, bq_data.GbqNativeTable],
9594
session: Session,
9695
*,
9796
columns: Optional[Sequence[str]] = None,
@@ -103,8 +102,6 @@ def from_table(
103102
):
104103
if offsets_col and primary_key:
105104
raise ValueError("must set at most one of 'offests', 'primary_key'")
106-
# define data source only for needed columns, this makes row-hashing cheaper
107-
table_def = bq_data.GbqTable.from_table(table, columns=columns or ())
108105

109106
# create ordering from info
110107
ordering = None
@@ -115,7 +112,9 @@ def from_table(
115112
[ids.ColumnId(key_part) for key_part in primary_key]
116113
)
117114

118-
bf_schema = schemata.ArraySchema.from_bq_table(table, columns=columns)
115+
bf_schema = schemata.ArraySchema.from_bq_schema(
116+
table.physical_schema, columns=columns
117+
)
119118
# Scan all columns by default, we define this list as it can be pruned while preserving source_def
120119
scan_list = nodes.ScanList(
121120
tuple(
@@ -124,7 +123,7 @@ def from_table(
124123
)
125124
)
126125
source_def = bq_data.BigqueryDataSource(
127-
table=table_def,
126+
table=table,
128127
schema=bf_schema,
129128
at_time=at_time,
130129
sql_predicate=predicate,
Collapse file

‎bigframes/core/bq_data.py‎

Copy file name to clipboardExpand all lines: bigframes/core/bq_data.py
+173-13Lines changed: 173 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,74 +22,214 @@
2222
import queue
2323
import threading
2424
import typing
25-
from typing import Any, Iterator, Optional, Sequence, Tuple
25+
from typing import Any, Iterator, List, Literal, Optional, Sequence, Tuple, Union
2626

2727
from google.cloud import bigquery_storage_v1
2828
import google.cloud.bigquery as bq
2929
import google.cloud.bigquery_storage_v1.types as bq_storage_types
3030
from google.protobuf import timestamp_pb2
3131
import pyarrow as pa
3232

33+
import bigframes.constants
3334
from bigframes.core import pyarrow_utils
3435
import bigframes.core.schema
3536

3637
if typing.TYPE_CHECKING:
3738
import bigframes.core.ordering as orderings
3839

3940

41+
def _resolve_standard_gcp_region(bq_region: str):
    """
    Resolve a BigQuery location to a standard, specific GCP region name.

    The ``US`` and ``EU`` multi-region names (matched case-insensitively) are
    translated to ``us-central1`` and ``europe-west4`` respectively. Any other
    value is returned unchanged.
    """
    # ``casefold`` produces lowercase text, so the comparison targets must be
    # lowercase as well; comparing against "US"/"EU" could never match.
    normalized = bq_region.casefold()
    if normalized == "us":
        return "us-central1"
    if normalized == "eu":
        return "europe-west4"
    return bq_region
50+
51+
52+
def is_irc_table(table_id: str):
    """
    Determines if a table id should be resolved through the iceberg rest catalog.

    A table id qualifies when it is made of exactly four dot-separated parts,
    i.e. when it contains exactly three ``.`` separators.
    """
    separator_count = table_id.count(".")
    return separator_count == 3
57+
58+
59+
def is_compatible(
    data_region: Union[GcsRegion, BigQueryRegion], session_location: str
) -> bool:
    """
    Report whether ``session_location`` is compatible with ``data_region``.

    For a ``BigQueryRegion`` the names must be equal. For a ``GcsRegion`` the
    session location must not be one of ``BIGQUERY_MULTIREGIONS`` and, once
    resolved to a standard region name, must be listed in
    ``data_region.included``.
    """
    # based on https://docs.cloud.google.com/bigquery/docs/locations#storage-location-considerations
    if not isinstance(data_region, BigQueryRegion):
        assert isinstance(data_region, GcsRegion)
        # TODO(b/463675088): Multi-regions don't yet support rest catalog tables
        session_is_multiregion = (
            session_location in bigframes.constants.BIGQUERY_MULTIREGIONS
        )
        if session_is_multiregion:
            return False
        resolved_location = _resolve_standard_gcp_region(session_location)
        return resolved_location in data_region.included

    return data_region.name == session_location
71+
72+
73+
def get_default_bq_region(data_region: Union[GcsRegion, BigQueryRegion]) -> str:
    """
    Pick the region name to use by default for ``data_region``.

    Args:
        data_region: Either a ``BigQueryRegion`` or a ``GcsRegion``.

    Returns:
        ``data_region.name`` for a ``BigQueryRegion``; the first entry of
        ``data_region.included`` for a ``GcsRegion``.

    Raises:
        TypeError: If ``data_region`` is neither supported type. Previously the
            function fell through and returned ``None`` despite the ``str``
            return annotation.
        IndexError: If a ``GcsRegion`` has an empty ``included`` tuple.
    """
    if isinstance(data_region, BigQueryRegion):
        return data_region.name
    if isinstance(data_region, GcsRegion):
        # should maybe try to track and prefer primary replica?
        return data_region.included[0]
    raise TypeError(
        f"Unsupported data region type: {type(data_region).__name__}"
    )
79+
80+
81+
@dataclasses.dataclass(frozen=True)
class BigQueryRegion:
    """Immutable holder for a single location name."""

    # The location name; ``is_compatible`` compares it directly with the
    # session location.
    name: str
84+
85+
86+
@dataclasses.dataclass(frozen=True)
class GcsRegion:
    """Immutable description of the GCS location(s) backing a table."""

    # Names of the gcs regions. These may be multi-region names, so they
    # shouldn't be compared with non-gcs locations.
    storage_regions: tuple[str, ...]
    # All the included standard, specific regions (eg us-east1). These should
    # be comparable to bq regions (except non-standard US, EU, omni regions).
    included: tuple[str, ...]
92+
93+
94+
# What is the line between metadata and core fields? Mostly, metadata fields are optional or unreliable, but it's fuzzy.
4095
@dataclasses.dataclass(frozen=True)
class TableMetadata:
    """Immutable bundle of descriptive information about a table."""

    # Where the table's data lives.
    location: Union[BigQueryRegion, GcsRegion]
    # Table kind. "MATERIALIZED_VIEW" matches the spelling checked by
    # ``is_physically_stored`` (the previous "MATERIALIZE_VIEW" was a typo).
    type: Literal["TABLE", "EXTERNAL", "VIEW", "MATERIALIZED_VIEW", "SNAPSHOT"]
    # this size metadata might be stale, don't use where strict correctness is needed
    numBytes: Optional[int] = None
    numRows: Optional[int] = None
    created_time: Optional[datetime.datetime] = None
    modified_time: Optional[datetime.datetime] = None
104+
105+
106+
@dataclasses.dataclass(frozen=True)
class GbqNativeTable:
    """Immutable description of a table addressed as project.dataset.table."""

    project_id: str = dataclasses.field()
    dataset_id: str = dataclasses.field()
    table_id: str = dataclasses.field()
    physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
    metadata: TableMetadata = dataclasses.field()
    partition_col: Optional[str] = None
    cluster_cols: typing.Optional[Tuple[str, ...]] = None
    primary_key: Optional[Tuple[str, ...]] = None

    @staticmethod
    def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqNativeTable:
        """Build a ``GbqNativeTable`` from a BigQuery client ``Table``.

        Args:
            table: The table object to read identifiers, schema and metadata from.
            columns: Optional column names; when non-empty, only schema fields
                with one of these names are kept (in the table's schema order).
        """
        # Subsetting fields with columns can reduce cost of row-hash default ordering
        if columns:
            # Build the lookup set once instead of scanning ``columns`` per field.
            wanted = set(columns)
            schema = tuple(item for item in table.schema if item.name in wanted)
        else:
            schema = tuple(table.schema)

        metadata = TableMetadata(
            numBytes=table.num_bytes,
            numRows=table.num_rows,
            location=BigQueryRegion(table.location),  # type: ignore
            type=table.table_type or "TABLE",  # type: ignore
            created_time=table.created,
            modified_time=table.modified,
        )
        # Range partitioning takes precedence over time partitioning.
        partition_col = None
        if table.range_partitioning:
            partition_col = table.range_partitioning.field
        elif table.time_partitioning:
            partition_col = table.time_partitioning.field

        return GbqNativeTable(
            project_id=table.project,
            dataset_id=table.dataset_id,
            table_id=table.table_id,
            physical_schema=schema,
            partition_col=partition_col,
            cluster_cols=None
            if (table.clustering_fields is None)
            else tuple(table.clustering_fields),
            primary_key=tuple(_get_primary_keys(table)),
            metadata=metadata,
        )

    @staticmethod
    def from_ref_and_schema(
        table_ref: bq.TableReference,
        schema: Sequence[bq.SchemaField],
        location: str,
        table_type: Literal["TABLE"] = "TABLE",
        cluster_cols: Optional[Sequence[str]] = None,
    ) -> GbqNativeTable:
        """Build a ``GbqNativeTable`` from a table reference and explicit schema.

        Only ``location`` and ``table_type`` are recorded in the metadata; an
        empty or missing ``cluster_cols`` is stored as ``None``.
        """
        return GbqNativeTable(
            project_id=table_ref.project,
            dataset_id=table_ref.dataset_id,
            table_id=table_ref.table_id,
            metadata=TableMetadata(location=BigQueryRegion(location), type=table_type),
            physical_schema=tuple(schema),
            cluster_cols=tuple(cluster_cols) if cluster_cols else None,
        )

    @property
    def is_physically_stored(self) -> bool:
        """True when the metadata type is ``TABLE`` or ``MATERIALIZED_VIEW``."""
        return self.metadata.type in ["TABLE", "MATERIALIZED_VIEW"]

    def get_table_ref(self) -> bq.TableReference:
        """Return a ``TableReference`` built from the project, dataset and table ids."""
        return bq.TableReference(
            bq.DatasetReference(self.project_id, self.dataset_id), self.table_id
        )

    def get_full_id(self, quoted: bool = False) -> str:
        """Return ``project.dataset.table``, backtick-quoting each part if ``quoted``."""
        if quoted:
            return f"`{self.project_id}`.`{self.dataset_id}`.`{self.table_id}`"
        return f"{self.project_id}.{self.dataset_id}.{self.table_id}"

    # ``cached_property`` stores the mapping on the instance itself. The previous
    # ``property`` + ``functools.cache`` pairing kept every instance alive in a
    # module-level cache and hashed the whole instance on each access.
    @functools.cached_property
    def schema_by_id(self):
        """Mapping from column name to its field in ``physical_schema``."""
        return {col.name: col for col in self.physical_schema}
91187

92188

189+
@dataclasses.dataclass(frozen=True)
class BiglakeIcebergTable:
    """Immutable description of a table addressed as project.catalog.namespace.table."""

    project_id: str = dataclasses.field()
    catalog_id: str = dataclasses.field()
    namespace_id: str = dataclasses.field()
    table_id: str = dataclasses.field()
    physical_schema: Tuple[bq.SchemaField, ...] = dataclasses.field()
    cluster_cols: typing.Optional[Tuple[str, ...]]
    metadata: TableMetadata

    def get_full_id(self, quoted: bool = False) -> str:
        """Return the four-part id, backtick-quoting each part if ``quoted``."""
        if quoted:
            return f"`{self.project_id}`.`{self.catalog_id}`.`{self.namespace_id}`.`{self.table_id}`"
        return (
            f"{self.project_id}.{self.catalog_id}.{self.namespace_id}.{self.table_id}"
        )

    # ``cached_property`` stores the mapping on the instance itself. The previous
    # ``property`` + ``functools.cache`` pairing kept every instance alive in a
    # module-level cache and hashed the whole instance on each access.
    @functools.cached_property
    def schema_by_id(self):
        """Mapping from column name to its field in ``physical_schema``."""
        return {col.name: col for col in self.physical_schema}

    @property
    def partition_col(self) -> Optional[str]:
        """Always ``None`` for now; partition metadata is not read yet."""
        # TODO: Use iceberg partition metadata
        return None

    @property
    def dataset_id(self) -> str:
        """
        Not a true dataset, but serves as the dataset component of the identifier in sql queries
        """
        return f"{self.catalog_id}.{self.namespace_id}"

    @property
    def primary_key(self) -> Optional[Tuple[str, ...]]:
        """Always ``None``; no primary key is reported for these tables."""
        return None

    def get_table_ref(self) -> bq.TableReference:
        """Return a ``TableReference`` using ``dataset_id`` as the dataset component."""
        return bq.TableReference(
            bq.DatasetReference(self.project_id, self.dataset_id), self.table_id
        )
231+
232+
93233
@dataclasses.dataclass(frozen=True)
94234
class BigqueryDataSource:
95235
"""
@@ -104,13 +244,13 @@ def __post_init__(self):
104244
self.schema.names
105245
)
106246

107-
table: GbqTable
247+
table: Union[GbqNativeTable, BiglakeIcebergTable]
108248
schema: bigframes.core.schema.ArraySchema
109249
at_time: typing.Optional[datetime.datetime] = None
110250
# Added for backwards compatibility, not validated
111251
sql_predicate: typing.Optional[str] = None
112252
ordering: typing.Optional[orderings.RowOrdering] = None
113-
# Optimization field
253+
# Optimization field, must be correct if set, don't put maybe-stale number here
114254
n_rows: Optional[int] = None
115255

116256

@@ -188,6 +328,8 @@ def get_arrow_batches(
188328
project_id: str,
189329
sample_rate: Optional[float] = None,
190330
) -> ReadResult:
331+
assert isinstance(data.table, GbqNativeTable)
332+
191333
table_mod_options = {}
192334
read_options_dict: dict[str, Any] = {"selected_fields": list(columns)}
193335

@@ -245,3 +387,21 @@ def process_batch(pa_batch):
245387
return ReadResult(
246388
batches, session.estimated_row_count, session.estimated_total_bytes_scanned
247389
)
390+
391+
392+
def _get_primary_keys(
    table: bq.Table,
) -> List[str]:
    """Get primary keys from table if they are set.

    Returns:
        A new list with the primary key column names, or an empty list when
        the table has no ``table_constraints`` attribute, no primary key, or a
        primary key with no columns.
    """
    # ``getattr`` with a default tolerates table objects that lack the
    # ``table_constraints`` attribute.
    table_constraints = getattr(table, "table_constraints", None)
    if table_constraints is None:
        return []

    primary_key = table_constraints.primary_key
    if primary_key is None:
        return []

    # ``columns`` may be None or empty when no key columns are set. Copying
    # always yields a fresh ``list`` and never aliases the caller's object.
    return list(primary_key.columns or ())
Collapse file

‎bigframes/core/compile/ibis_compiler/ibis_compiler.py‎

Copy file name to clipboardExpand all lines: bigframes/core/compile/ibis_compiler/ibis_compiler.py
+1-3Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,7 @@ def _table_to_ibis(
215215
source: bq_data.BigqueryDataSource,
216216
scan_cols: typing.Sequence[str],
217217
) -> ibis_types.Table:
218-
full_table_name = (
219-
f"{source.table.project_id}.{source.table.dataset_id}.{source.table.table_id}"
220-
)
218+
full_table_name = source.table.get_full_id(quoted=False)
221219
# Physical schema might include unused columns, unsupported datatypes like JSON
222220
physical_schema = ibis_bigquery.BigQuerySchema.to_ibis(
223221
list(source.table.physical_schema)
Collapse file

‎bigframes/core/nodes.py‎

Copy file name to clipboardExpand all lines: bigframes/core/nodes.py
+1-3Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -825,9 +825,7 @@ def variables_introduced(self) -> int:
825825

826826
@property
def row_count(self) -> typing.Optional[int]:
    """Row count recorded on the source (``source.n_rows``), or ``None`` if unset."""
    source = self.source
    return source.n_rows
831829

832830
@property
833831
def node_defined_ids(self) -> Tuple[identifiers.ColumnId, ...]:
Collapse file

‎bigframes/core/schema.py‎

Copy file name to clipboardExpand all lines: bigframes/core/schema.py
+6-21Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from dataclasses import dataclass
1818
import functools
1919
import typing
20-
from typing import Dict, List, Optional, Sequence
20+
from typing import Dict, Optional, Sequence
2121

2222
import google.cloud.bigquery
2323
import pyarrow
@@ -40,31 +40,16 @@ class ArraySchema:
4040
def __iter__(self):
4141
yield from self.items
4242

43-
@classmethod
44-
def from_bq_table(
45-
cls,
46-
table: google.cloud.bigquery.Table,
47-
column_type_overrides: Optional[
48-
typing.Dict[str, bigframes.dtypes.Dtype]
49-
] = None,
50-
columns: Optional[Sequence[str]] = None,
51-
):
52-
if not columns:
53-
fields = table.schema
54-
else:
55-
lookup = {field.name: field for field in table.schema}
56-
fields = [lookup[col] for col in columns]
57-
58-
return ArraySchema.from_bq_schema(
59-
fields, column_type_overrides=column_type_overrides
60-
)
61-
6243
@classmethod
6344
def from_bq_schema(
6445
cls,
65-
schema: List[google.cloud.bigquery.SchemaField],
46+
schema: Sequence[google.cloud.bigquery.SchemaField],
6647
column_type_overrides: Optional[Dict[str, bigframes.dtypes.Dtype]] = None,
48+
columns: Optional[Sequence[str]] = None,
6749
):
50+
if columns:
51+
lookup = {field.name: field for field in schema}
52+
schema = [lookup[col] for col in columns]
6853
if column_type_overrides is None:
6954
column_type_overrides = {}
7055
items = tuple(

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.