feat: label query jobs with a bigframes-api-xx key indicating the Python API(s) used to construct them #55

Closed · wants to merge 11 commits
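For context on the mechanism this change builds on: BigQuery lets clients attach key/value labels to a query job through `QueryJobConfig`, and those labels remain visible on the finished job for usage attribution. The diff below threads the originating BigFrames method names (`api_methods`) down to `Session._start_query` so they can be attached as such labels. A minimal sketch of the labeling idea, assuming the session maps the method names onto keys in the `bigframes-api-xx` style named in the title (the session-side label construction is not part of this diff, so the exact keys here are an assumption):

```python
from google.cloud import bigquery

client = bigquery.Client()  # assumes default project and credentials

# Hypothetical labels mirroring the "bigframes-api-xx" keys from the PR
# title; the real label construction happens in Session._start_query,
# which is outside this diff.
job_config = bigquery.QueryJobConfig(
    labels={
        "bigframes-api-0": "aggregate",
        "bigframes-api-1": "to_pandas",
    }
)

job = client.query("SELECT COUNT(*) FROM `my_dataset.my_table`", job_config=job_config)
job.result()
print(job.labels)  # {'bigframes-api-0': 'aggregate', 'bigframes-api-1': 'to_pandas'}
```

Label keys and values may only contain lowercase letters, digits, underscores, and hyphens, which is why method names like `to_pandas` can be used as label values directly.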
bigframes/core/__init__.py (30 changes: 28 additions & 2 deletions)
@@ -94,6 +94,7 @@ def __init__(
         hidden_ordering_columns: Optional[Sequence[ibis_types.Value]] = None,
         ordering: ExpressionOrdering = ExpressionOrdering(),
         predicates: Optional[Collection[ibis_types.BooleanValue]] = None,
+        api_methods: Sequence[str] = [],
     ):
         self._session = session
         self._table = table
@@ -120,6 +121,9 @@ def __init__(
         self._hidden_ordering_column_names = {
             column.get_name(): column for column in self._hidden_ordering_columns
         }
+
+        self._api_methods = api_methods
+
         ### Validation
         value_col_ids = self._column_names.keys()
         hidden_col_ids = self._hidden_ordering_column_names.keys()
@@ -234,6 +238,10 @@ def _ibis_order(self) -> Sequence[ibis_types.Value]:
             self._ordering.all_ordering_columns,
         )
 
+    @property
+    def api_methods(self) -> Sequence[str]:
+        return self._api_methods
+
     def builder(self) -> ArrayValueBuilder:
         """Creates a mutable builder for expressions."""
         # Since ArrayValue is intended to be immutable (immutability offers
@@ -456,11 +464,13 @@ def projection(self, columns: Iterable[ibis_types.Value]) -> ArrayValue:
         new_expr = builder.build()
         return new_expr
 
-    def shape(self) -> typing.Tuple[int, int]:
+    def shape(self, api_method: str) -> typing.Tuple[int, int]:
         """Returns dimensions as (length, width) tuple."""
         width = len(self.columns)
         count_expr = self._to_ibis_expr(ordering_mode="unordered").count()
         sql = self._session.ibis_client.compile(count_expr)
+        add_api_methods = list(self.api_methods)
+        add_api_methods.append(api_method)
 
         # Support in-memory engines for hermetic unit tests.
         if not isinstance(sql, str):
@@ -469,6 +479,7 @@ def shape(self) -> typing.Tuple[int, int]:
         row_iterator, _ = self._session._start_query(
             sql=sql,
             max_results=1,
+            api_methods=add_api_methods,
         )
         length = next(row_iterator)[0]
         return (length, width)
@@ -566,6 +577,7 @@ def aggregate(
         aggregations: typing.Sequence[typing.Tuple[str, agg_ops.AggregateOp, str]],
         by_column_ids: typing.Sequence[str] = (),
         dropna: bool = True,
+        api_method: str = "",
     ) -> ArrayValue:
         """
         Apply aggregations to the expression.
@@ -579,6 +591,8 @@ def aggregate(
             col_out: agg_op._as_ibis(table[col_in])
             for col_in, agg_op, col_out in aggregations
         }
+        add_api_methods = list(self.api_methods)
+        add_api_methods.append(api_method)
         if by_column_ids:
             result = table.group_by(by_column_ids).aggregate(**stats)
             # Must have deterministic ordering, so order by the unique "by" column
@@ -590,7 +604,13 @@ def aggregate(
                 total_ordering_columns=frozenset(by_column_ids),
             )
             columns = tuple(result[key] for key in result.columns)
-            expr = ArrayValue(self._session, result, columns=columns, ordering=ordering)
+            expr = ArrayValue(
+                self._session,
+                result,
+                columns=columns,
+                ordering=ordering,
+                api_methods=add_api_methods,
+            )
             if dropna:
                 for column_id in by_column_ids:
                     expr = expr._filter(
@@ -613,6 +633,7 @@ def aggregate(
                 columns=[result[col_id] for col_id in [*stats.keys()]],
                 hidden_ordering_columns=[result[ORDER_ID_COLUMN]],
                 ordering=ordering,
+                api_methods=add_api_methods,
             )
 
     def corr_aggregate(
@@ -871,6 +892,7 @@ def start_query(
         job_config: Optional[bigquery.job.QueryJobConfig] = None,
         max_results: Optional[int] = None,
         expose_extra_columns: bool = False,
+        api_name: str = "",
     ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
         """Execute a query and return metadata about the results."""
         # TODO(swast): Cache the job ID so we can look it up again if they ask
@@ -885,10 +907,14 @@ def start_query(
         # maybe we just print the job metadata that we have so far?
         table = self._to_ibis_expr(expose_hidden_cols=expose_extra_columns)
         sql = self._session.ibis_client.compile(table)  # type:ignore
+
+        add_api_methods = list(self.api_methods)
+        add_api_methods.append(api_name)
         return self._session._start_query(
             sql=sql,
             job_config=job_config,
             max_results=max_results,
+            api_methods=add_api_methods,
         )
 
     def _get_table_size(self, destination_table):
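Every call site above follows the same pattern: copy the accumulated `api_methods`, append the current method name, and hand the list to `_start_query`. Copying with `list(self.api_methods)` rather than appending in place keeps `ArrayValue` immutable, matching the builder comment above. How `_start_query` maps that list onto job labels is not shown in this diff; a plausible sketch of such a helper, with the `bigframes-api-{i}` key format assumed from the PR title:

```python
from typing import Dict, Sequence


def _build_api_labels(api_methods: Sequence[str]) -> Dict[str, str]:
    """Hypothetical helper turning accumulated method names into job labels.

    Empty entries (the default api_name="") are skipped, so internal calls
    that do not set a name produce no label. The real Session._start_query
    implementation is outside this diff.
    """
    named = [method for method in api_methods if method]
    return {f"bigframes-api-{i}": name for i, name in enumerate(named)}


# e.g. an aggregate followed by to_pandas:
assert _build_api_labels(["aggregate", "", "to_pandas"]) == {
    "bigframes-api-0": "aggregate",
    "bigframes-api-1": "to_pandas",
}
```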
bigframes/core/blocks.py (24 changes: 18 additions & 6 deletions)
@@ -134,7 +134,7 @@ def index(self) -> indexes.IndexValue:
     @functools.cached_property
     def shape(self) -> typing.Tuple[int, int]:
         """Returns dimensions as (length, width) tuple."""
-        impl_length, _ = self._expr.shape()
+        impl_length, _ = self._expr.shape("shape")
         return (impl_length, len(self.value_columns))
 
     @property
@@ -429,6 +429,7 @@ def to_pandas(
             max_download_size=max_download_size,
             sampling_method=sampling_method,
             random_state=random_state,
+            api_name="to_pandas",
         )
         return df, query_job
 
@@ -439,13 +440,16 @@ def _compute_and_count(
         max_download_size: Optional[int] = None,
         sampling_method: Optional[str] = None,
         random_state: Optional[int] = None,
+        api_name: str = "",
     ) -> Tuple[pd.DataFrame, int, bigquery.QueryJob]:
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
         expr = self._apply_value_keys_to_expr(value_keys=value_keys)
 
         results_iterator, query_job = expr.start_query(
-            max_results=max_results, expose_extra_columns=True
+            max_results=max_results,
+            expose_extra_columns=True,
+            api_name=api_name,
         )
 
         table_size = expr._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES
@@ -602,11 +606,13 @@ def _split(
         return [sliced_block.drop_columns(drop_cols) for sliced_block in sliced_blocks]
 
     def _compute_dry_run(
-        self, value_keys: Optional[Iterable[str]] = None
+        self,
+        value_keys: Optional[Iterable[str]] = None,
+        api_name: str = "",
     ) -> bigquery.QueryJob:
         expr = self._apply_value_keys_to_expr(value_keys=value_keys)
         job_config = bigquery.QueryJobConfig(dry_run=True)
-        _, query_job = expr.start_query(job_config=job_config)
+        _, query_job = expr.start_query(job_config=job_config, api_name=api_name)
         return query_job
 
     def _apply_value_keys_to_expr(self, value_keys: Optional[Iterable[str]] = None):
@@ -924,6 +930,7 @@ def aggregate(
         *,
         as_index: bool = True,
         dropna: bool = True,
+        api_method: str = "",
     ) -> typing.Tuple[Block, typing.Sequence[str]]:
         """
         Apply aggregations to the block. Callers responsible for setting index column(s) after.
@@ -938,7 +945,9 @@ def aggregate(
             for input_id, operation in aggregations
         ]
         output_col_ids = [agg_spec[2] for agg_spec in agg_specs]
-        result_expr = self.expr.aggregate(agg_specs, by_column_ids, dropna=dropna)
+        result_expr = self.expr.aggregate(
+            agg_specs, by_column_ids, dropna=dropna, api_method=api_method
+        )
 
         aggregate_labels = self._get_labels_for_columns(
             [agg[0] for agg in aggregations]
@@ -1026,6 +1035,7 @@ def summarize(
         self,
         column_ids: typing.Sequence[str],
         stats: typing.Sequence[agg_ops.AggregateOp],
+        api_method: str,
     ):
         """Get a list of stats as a deferred block object."""
         label_col_id = guid.generate_guid()
@@ -1039,7 +1049,9 @@ def summarize(
             (col_id, [f"{col_id}-{stat.name}" for stat in stats])
             for col_id in column_ids
         ]
-        expr = self.expr.aggregate(aggregations).unpivot(
+        expr = self.expr.aggregate(
+            aggregations=aggregations, api_method=api_method
+        ).unpivot(
             labels,
             unpivot_columns=columns,
             index_col_ids=[label_col_id],
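With both files plumbed through, the labels could be checked end to end after running something like `df.to_pandas()`. A hedged verification sketch using the google-cloud-bigquery client (default project and credentials assumed, and assuming the session emits `bigframes-api`-prefixed label keys as discussed above):

```python
from google.cloud import bigquery

client = bigquery.Client()  # assumes default project and credentials

# List recent jobs and surface any BigFrames API labels they carry.
for job in client.list_jobs(max_results=10):
    api_labels = {
        key: value
        for key, value in (job.labels or {}).items()
        if key.startswith("bigframes-api")
    }
    if api_labels:
        print(job.job_id, api_labels)
```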