Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

refactor: Extract data loading logic into class #913

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion 3 bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2542,7 +2542,8 @@ def _get_rows_as_json_values(self) -> Block:
SELECT {select_columns_csv} FROM T1
"""
# The only way this code is used is through the df.apply(axis=1) code path
destination, query_job = self.session._query_to_destination(
# TODO: Stop using internal API
destination, query_job = self.session._loader._query_to_destination(
json_sql, index_cols=[ordering_column_name], api_name="apply"
)
if not destination:
Expand Down
79 changes: 26 additions & 53 deletions 79 bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2956,17 +2956,20 @@ def to_csv(
if "*" not in path_or_buf:
raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)

result_table = self._run_io_query(
index=index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID
)
export_data_statement = bigframes.session._io.bigquery.create_export_csv_statement(
f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}",
uri=path_or_buf,
field_delimiter=sep,
header=header,
export_array, id_overrides = self._prepare_export(
index=index and self._has_index,
ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
)
_, query_job = self._block.expr.session._start_query(
export_data_statement, api_name="dataframe-to_csv"
options = {
"field_delimiter": sep,
"header": header,
}
query_job = self._session._executor.export_gcs(
export_array,
id_overrides,
path_or_buf,
format="csv",
export_options=options,
)
self._set_internal_query_job(query_job)
return None
Expand Down Expand Up @@ -3006,17 +3009,12 @@ def to_json(
"'lines' keyword is only valid when 'orient' is 'records'."
)

result_table = self._run_io_query(
index=index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID
)
export_data_statement = bigframes.session._io.bigquery.create_export_data_statement(
f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}",
uri=path_or_buf,
format="JSON",
export_options={},
export_array, id_overrides = self._prepare_export(
index=index and self._has_index,
ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
)
_, query_job = self._block.expr.session._start_query(
export_data_statement, api_name="dataframe-to_json"
query_job = self._session._executor.export_gcs(
export_array, id_overrides, path_or_buf, format="json", export_options={}
)
self._set_internal_query_job(query_job)
return None
Expand Down Expand Up @@ -3145,18 +3143,17 @@ def to_parquet(
if compression:
export_options["compression"] = compression.upper()

result_table = self._run_io_query(
index=index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID
export_array, id_overrides = self._prepare_export(
index=index and self._has_index,
ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
)
export_data_statement = bigframes.session._io.bigquery.create_export_data_statement(
f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}",
uri=path,
format="PARQUET",
query_job = self._session._executor.export_gcs(
export_array,
id_overrides,
path,
format="parquet",
export_options=export_options,
)
_, query_job = self._block.expr.session._start_query(
export_data_statement, api_name="dataframe-to_parquet"
)
self._set_internal_query_job(query_job)
return None

Expand Down Expand Up @@ -3386,30 +3383,6 @@ def _prepare_export(
array_value = array_value.promote_offsets(ordering_id)
return array_value, id_overrides

def _run_io_query(
self,
index: bool,
ordering_id: Optional[str] = None,
) -> bigquery.TableReference:
"""Executes a query job presenting this dataframe and returns the destination
table."""
session = self._block.expr.session
export_array, id_overrides = self._prepare_export(
index=index and self._has_index, ordering_id=ordering_id
)

_, query_job = session._execute(
export_array,
ordered=False,
col_id_overrides=id_overrides,
)
self._set_internal_query_job(query_job)

# The query job should have finished, so there should be always be a result table.
result_table = query_job.destination
assert result_table is not None
return result_table

def map(self, func, na_action: Optional[str] = None) -> DataFrame:
if not callable(func):
raise TypeError("the first argument must be callable")
Expand Down
9 changes: 8 additions & 1 deletion 9 bigframes/functions/_remote_function_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import google.api_core.retry
from google.cloud import bigquery, functions_v2

import bigframes.session._io.bigquery

from . import _utils

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -142,7 +144,12 @@ def create_bq_remote_function(
self._bq_client.create_dataset(dataset, exists_ok=True)

# TODO(swast): plumb through the original, user-facing api_name.
_, query_job = self._session._start_query(create_function_ddl)
_, query_job = bigframes.session._io.bigquery.start_query_with_client(
self._session.bqclient,
create_function_ddl,
job_config=bigquery.QueryJobConfig(),
)

logger.info(f"Created remote function {query_job.ddl_target_routine}")

def get_cloud_function_fully_qualified_parent(self):
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.