refactor: Add mappings from internal dtypes to bq types #810


Merged · 4 commits · Jul 2, 2024
bigframes/core/blocks.py — 21 changes: 9 additions & 12 deletions

@@ -444,27 +444,24 @@ def _to_dataframe(self, result) -> pd.DataFrame:
         # Runs strict validations to ensure internal type predictions and ibis are completely in sync
         # Do not execute these validations outside of testing suite.
         if "PYTEST_CURRENT_TEST" in os.environ:
-            self._validate_result_schema(result_dataframe)
+            self._validate_result_schema(result.schema)
         return result_dataframe

-    def _validate_result_schema(self, result_df: pd.DataFrame):
+    def _validate_result_schema(
+        self, bq_result_schema: list[bigquery.schema.SchemaField]
+    ):
+        actual_schema = tuple(bq_result_schema)
         ibis_schema = self.expr._compiled_schema
         internal_schema = self.expr.node.schema
-        actual_schema = bf_schema.ArraySchema(
-            tuple(
-                bf_schema.SchemaItem(name, dtype)  # type: ignore
-                for name, dtype in result_df.dtypes.items()
-            )
-        )
         if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable:
             return
-        if internal_schema != actual_schema:
+        if internal_schema.to_bigquery() != actual_schema:
             raise ValueError(
-                f"This error should only occur while testing. BigFrames internal schema: {internal_schema} does not match actual schema: {actual_schema}"
+                f"This error should only occur while testing. BigFrames internal schema: {internal_schema.to_bigquery()} does not match actual schema: {actual_schema}"
             )
-        if ibis_schema != actual_schema:
+        if ibis_schema.to_bigquery() != actual_schema:
             raise ValueError(
-                f"This error should only occur while testing. Ibis schema: {ibis_schema} does not match actual schema: {actual_schema}"
+                f"This error should only occur while testing. Ibis schema: {ibis_schema.to_bigquery()} does not match actual schema: {actual_schema}"
             )

     def to_arrow(
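With this change, validation happens in BigQuery terms: instead of rebuilding a BigFrames ArraySchema from the pandas result dtypes, the block converts its predicted schemas via to_bigquery() and compares them against the SchemaField list the query actually returned. A minimal standalone sketch of that comparison style, using only the google-cloud-bigquery client types (column names here are illustrative, not from the PR):

from google.cloud import bigquery

# Schema predicted by the engine, already converted to BigQuery terms.
predicted = (
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("score", "FLOAT"),
)

# Schema actually attached to the query result.
actual = (
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("score", "FLOAT"),
)

# SchemaField implements equality, so comparing the tuples checks each
# field's name, type, and mode in order.
if predicted != actual:
    raise ValueError(f"Predicted schema {predicted} does not match {actual}")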
bigframes/core/schema.py — 6 changes: 6 additions & 0 deletions

@@ -58,6 +58,12 @@ def dtypes(self) -> typing.Tuple[bigframes.dtypes.Dtype, ...]:
     def _mapping(self) -> typing.Dict[ColumnIdentifierType, bigframes.dtypes.Dtype]:
         return {item.column: item.dtype for item in self.items}

+    def to_bigquery(self) -> typing.Tuple[google.cloud.bigquery.SchemaField, ...]:
+        return tuple(
+            bigframes.dtypes.convert_to_schema_field(item.column, item.dtype)
+            for item in self.items
+        )
+
     def drop(self, columns: typing.Iterable[str]) -> ArraySchema:
         return ArraySchema(
             tuple(item for item in self.items if item.column not in columns)
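The new to_bigquery() method is a thin wrapper over convert_to_schema_field (added in bigframes/dtypes.py below): it maps each (column, dtype) pair to a google.cloud.bigquery.SchemaField. A rough self-contained sketch of the same idea — SchemaItem, _SIMPLE, and convert() are hypothetical stand-ins for the real dtype machinery, covering only two simple dtypes:

import typing

import pandas as pd
from google.cloud import bigquery


class SchemaItem(typing.NamedTuple):
    column: str
    dtype: typing.Any


# Hypothetical stand-in for bigframes.dtypes.convert_to_schema_field.
_SIMPLE = {pd.Int64Dtype(): "INTEGER", pd.BooleanDtype(): "BOOLEAN"}


def convert(name: str, dtype) -> bigquery.SchemaField:
    return bigquery.SchemaField(name, _SIMPLE[dtype])


def to_bigquery(items) -> typing.Tuple[bigquery.SchemaField, ...]:
    # Same shape as the method above: one SchemaField per schema item.
    return tuple(convert(item.column, item.dtype) for item in items)


schema = to_bigquery(
    [SchemaItem("id", pd.Int64Dtype()), SchemaItem("flag", pd.BooleanDtype())]
)
# -> (SchemaField('id', 'INTEGER', ...), SchemaField('flag', 'BOOLEAN', ...))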
bigframes/dtypes.py — 54 changes: 46 additions & 8 deletions

@@ -70,7 +70,9 @@ class SimpleDtypeInfo:

     dtype: Dtype
     arrow_dtype: typing.Optional[pa.DataType]
-    type_kind: typing.Tuple[str, ...]  # Should all correspond to the same db type
+    type_kind: typing.Tuple[
+        str, ...
+    ]  # Should all correspond to the same db type. Put preferred canonical sql type name first
     logical_bytes: int = (
         8  # this is approximate only, some types are variably sized, also, compression
     )
@@ -84,20 +86,23 @@ class SimpleDtypeInfo:
     SimpleDtypeInfo(
         dtype=INT_DTYPE,
         arrow_dtype=pa.int64(),
-        type_kind=("INT64", "INTEGER"),
+        type_kind=("INTEGER", "INT64"),
         orderable=True,
         clusterable=True,
     ),
     SimpleDtypeInfo(
         dtype=FLOAT_DTYPE,
         arrow_dtype=pa.float64(),
-        type_kind=("FLOAT64", "FLOAT"),
+        type_kind=("FLOAT", "FLOAT64"),
         orderable=True,
     ),
     SimpleDtypeInfo(
         dtype=BOOL_DTYPE,
         arrow_dtype=pa.bool_(),
-        type_kind=("BOOL", "BOOLEAN"),
+        type_kind=(
+            "BOOLEAN",
+            "BOOL",
+        ),
         logical_bytes=1,
         orderable=True,
         clusterable=True,
@@ -143,15 +148,15 @@ class SimpleDtypeInfo:
     SimpleDtypeInfo(
         dtype=NUMERIC_DTYPE,
         arrow_dtype=pa.decimal128(38, 9),
-        type_kind=("NUMERIC",),
+        type_kind=("NUMERIC", "DECIMAL"),
         logical_bytes=16,
         orderable=True,
         clusterable=True,
     ),
     SimpleDtypeInfo(
         dtype=BIGNUMERIC_DTYPE,
         arrow_dtype=pa.decimal256(76, 38),
-        type_kind=("BIGNUMERIC",),
+        type_kind=("BIGNUMERIC", "BIGDECIMAL"),
         logical_bytes=32,
         orderable=True,
         clusterable=True,
@@ -417,6 +422,7 @@ def infer_literal_arrow_type(literal) -> typing.Optional[pa.DataType]:
     for mapping in SIMPLE_TYPES
     for type_kind in mapping.type_kind
 }
+_BIGFRAMES_TO_TK = {mapping.dtype: mapping.type_kind[0] for mapping in SIMPLE_TYPES}


 def convert_schema_field(
@@ -440,12 +446,44 @@ def convert_schema_field(
         if is_repeated:
             pa_type = pa.list_(bigframes_dtype_to_arrow_dtype(singular_type))
             return field.name, pd.ArrowDtype(pa_type)
-        else:
-            return field.name, singular_type
+        return field.name, singular_type
     else:
         raise ValueError(f"Cannot handle type: {field.field_type}")


+def convert_to_schema_field(
+    name: str,
+    bigframes_dtype: Dtype,
+) -> google.cloud.bigquery.SchemaField:
+    if bigframes_dtype in _BIGFRAMES_TO_TK:
+        return google.cloud.bigquery.SchemaField(
+            name, _BIGFRAMES_TO_TK[bigframes_dtype]
+        )
+    if isinstance(bigframes_dtype, pd.ArrowDtype):
+        if pa.types.is_list(bigframes_dtype.pyarrow_dtype):
+            inner_type = arrow_dtype_to_bigframes_dtype(
+                bigframes_dtype.pyarrow_dtype.value_type
+            )
+            inner_field = convert_to_schema_field(name, inner_type)
+            return google.cloud.bigquery.SchemaField(
+                name, inner_field.field_type, mode="REPEATED", fields=inner_field.fields
+            )
+        if pa.types.is_struct(bigframes_dtype.pyarrow_dtype):
+            inner_fields: list[google.cloud.bigquery.SchemaField] = []
+            struct_type = typing.cast(pa.StructType, bigframes_dtype.pyarrow_dtype)
+            for i in range(struct_type.num_fields):
+                field = struct_type.field(i)
+                inner_bf_type = arrow_dtype_to_bigframes_dtype(field.type)
+                inner_fields.append(convert_to_schema_field(field.name, inner_bf_type))
+
+            return google.cloud.bigquery.SchemaField(
+                name, "RECORD", fields=inner_fields
+            )
+    raise ValueError(
+        f"No arrow conversion for {bigframes_dtype}. {constants.FEEDBACK_LINK}"
+    )


 def bf_type_from_type_kind(
     bq_schema: list[google.cloud.bigquery.SchemaField],
 ) -> typing.Dict[str, Dtype]:
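Taken together, the dtypes.py changes give every simple dtype a canonical SQL name (type_kind[0], e.g. INTEGER rather than INT64), add the reverse mapping _BIGFRAMES_TO_TK built from those first entries, and make convert_to_schema_field recurse through Arrow list and struct dtypes to produce REPEATED and RECORD fields. A hedged sketch of the expected behavior for a nested type — assumes bigframes is installed, and the expected output is inferred from the diff above rather than verified against a live session:

import pandas as pd
import pyarrow as pa

from bigframes import dtypes

# list<struct<x: int64, y: bool>> corresponds to a REPEATED RECORD in BigQuery.
arrow_type = pa.list_(pa.struct([("x", pa.int64()), ("y", pa.bool_())]))
field = dtypes.convert_to_schema_field("points", pd.ArrowDtype(arrow_type))

print(field.field_type, field.mode)
# Expected: RECORD REPEATED
print([(f.name, f.field_type) for f in field.fields])
# Expected: [('x', 'INTEGER'), ('y', 'BOOLEAN')] — canonical names via type_kind[0]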