test: refactor remote function tests #147


Merged · 5 commits · Oct 27, 2023
90 changes: 90 additions & 0 deletions — tests/system/large/test_remote_function.py
@@ -27,6 +27,7 @@
 import pytest
 import test_utils.prefixer
 
+import bigframes
 from bigframes.remote_function import (
     get_cloud_function_name,
     get_remote_function_locations,
@@ -1120,3 +1121,92 @@ def plusone(x):
     )
     for dir_ in dirs_to_cleanup:
         shutil.rmtree(dir_)
+
+
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_via_session_context_connection_setter(
+    scalars_dfs, dataset_id, bq_cf_connection
+):
+    # Create a session scoped only to this test, as we will be setting a
+    # property on it.
+    context = bigframes.BigQueryOptions()
+    context.bq_connection = bq_cf_connection
+    session = bigframes.connect(context)
+
+    try:
+        # Without an explicit bigquery connection, the one present in the
+        # Session, set via the context setter, would be used. Without an
+        # explicit `reuse`, the default behavior of reuse=True takes effect.
+        # Note that the udf is the same as the one used in other tests in
+        # this file, so the underlying cloud function would be shared with
+        # reuse=True. Since we are using a unique dataset_id, the bq remote
+        # function would still be created even though the cloud function is
+        # reused, making use of the bq connection set in BigQueryOptions above.
+        @session.remote_function([int], int, dataset=dataset_id)
+        def square(x):
+            return x * x
+
+        scalars_df, scalars_pandas_df = scalars_dfs
+
+        bf_int64_col = scalars_df["int64_col"]
+        bf_int64_col_filter = bf_int64_col.notnull()
+        bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
+        bf_result_col = bf_int64_col_filtered.apply(square)
+        bf_result = (
+            bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
+        )
+
+        pd_int64_col = scalars_pandas_df["int64_col"]
+        pd_int64_col_filter = pd_int64_col.notnull()
+        pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
+        pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x)
+        # TODO(shobs): Figure out why pandas .apply() changes the dtype, i.e.
+        # pd_int64_col_filtered.dtype is Int64Dtype()
+        # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64.
+        # For this test let's force the pandas dtype to be same as bigframes' dtype.
+        pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
+        pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)
+
+        assert_pandas_df_equal_ignore_ordering(bf_result, pd_result)
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, square
+        )
+
+
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_default_connection(session, scalars_dfs, dataset_id):
+    try:
+
+        @session.remote_function([int], int, dataset=dataset_id)
+        def square(x):
+            return x * x
+
+        scalars_df, scalars_pandas_df = scalars_dfs
+
+        bf_int64_col = scalars_df["int64_col"]
+        bf_int64_col_filter = bf_int64_col.notnull()
+        bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
+        bf_result_col = bf_int64_col_filtered.apply(square)
+        bf_result = (
+            bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
+        )
+
+        pd_int64_col = scalars_pandas_df["int64_col"]
+        pd_int64_col_filter = pd_int64_col.notnull()
+        pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
+        pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x)
+        # TODO(shobs): Figure out why pandas .apply() changes the dtype, i.e.
+        # pd_int64_col_filtered.dtype is Int64Dtype()
+        # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64.
+        # For this test let's force the pandas dtype to be same as bigframes' dtype.
+        pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
+        pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)
+
+        assert_pandas_df_equal_ignore_ordering(bf_result, pd_result)
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, square
+        )
156 changes: 30 additions & 126 deletions — tests/system/small/test_remote_function.py
@@ -13,14 +13,11 @@
 # limitations under the License.
 
 from google.cloud import bigquery
-from ibis.backends.bigquery import datatypes as bq_types
-from ibis.expr import datatypes as ibis_types
 import pandas as pd
 import pytest
 
 import bigframes
 from bigframes import remote_function as rf
-import bigframes.pandas as bpd
 from tests.system.utils import assert_pandas_df_equal_ignore_ordering
 
 
@@ -65,45 +62,14 @@ def bq_cf_connection_location_project_mismatched() -> str:
 
 
-@pytest.fixture(scope="module")
-def session_with_bq_connection(bq_cf_connection) -> bigframes.Session:
-    return bigframes.Session(bigframes.BigQueryOptions(bq_connection=bq_cf_connection))
-
-
-@pytest.fixture(scope="module")
-def session_with_bq_connection_location_specified(
-    bq_cf_connection_location,
-) -> bigframes.Session:
-    return bigframes.Session(
-        bigframes.BigQueryOptions(bq_connection=bq_cf_connection_location)
-    )
-
-
-@pytest.fixture(scope="module")
-def session_with_bq_connection_location_mistached(
-    bq_cf_connection_location_mistached,
-) -> bigframes.Session:
-    return bigframes.Session(
-        bigframes.BigQueryOptions(bq_connection=bq_cf_connection_location_mistached)
-    )
-
-
 @pytest.fixture(scope="module")
-def session_with_bq_connection_location_project_specified(
-    bq_cf_connection_location_project,
+def session_with_bq_connection_and_permanent_dataset(
+    bq_cf_connection, dataset_id_permanent
 ) -> bigframes.Session:
-    return bigframes.Session(
-        bigframes.BigQueryOptions(bq_connection=bq_cf_connection_location_project)
+    session = bigframes.Session(
+        bigframes.BigQueryOptions(bq_connection=bq_cf_connection)
     )
-
-
-def test_supported_types_correspond():
-    # The same types should be representable by the supported Python and BigQuery types.
-    ibis_types_from_python = {ibis_types.dtype(t) for t in rf.SUPPORTED_IO_PYTHON_TYPES}
-    ibis_types_from_bigquery = {
-        bq_types.BigQueryType.to_ibis(tk) for tk in rf.SUPPORTED_IO_BIGQUERY_TYPEKINDS
-    }
-
-    assert ibis_types_from_python == ibis_types_from_bigquery
+    session._session_dataset = bigquery.Dataset(dataset_id_permanent)
+    return session
 
 
 @pytest.mark.flaky(retries=2, delay=120)
@@ -311,11 +277,13 @@ def square(x):
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_direct_session_param(session_with_bq_connection, scalars_dfs):
+def test_remote_function_direct_session_param(
+    session_with_bq_connection_and_permanent_dataset, scalars_dfs
+):
     @rf.remote_function(
         [int],
         int,
-        session=session_with_bq_connection,
+        session=session_with_bq_connection_and_permanent_dataset,
     )
     def square(x):
         return x * x
@@ -345,15 +313,17 @@ def square(x):
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_via_session_default(session_with_bq_connection, scalars_dfs):
+def test_remote_function_via_session_default(
+    session_with_bq_connection_and_permanent_dataset, scalars_dfs
+):
     # Session has bigquery connection initialized via context. Without an
     # explicit dataset the default dataset from the session would be used.
     # Without an explicit bigquery connection, the one present in the Session
     # set through the explicit BigQueryOptions would be used. Without an
     # explicit `reuse`, the default behavior of reuse=True takes effect. Note
     # that the udf is the same as the one used in other tests in this file,
     # so the underlying cloud function would be common and quickly reused.
-    @session_with_bq_connection.remote_function([int], int)
+    @session_with_bq_connection_and_permanent_dataset.remote_function([int], int)
     def square(x):
         return x * x
 
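The reuse behavior described in the comment above can also be overridden explicitly; a hedged sketch, where `session` and `square_fresh` stand in for any configured bigframes.Session and a hypothetical function name:

# reuse=True is the default noted in the comment above; passing reuse=False
# would force a fresh cloud function instead of sharing an existing one.
@session.remote_function([int], int, reuse=False)
def square_fresh(x):
    return x * x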
@@ -421,87 +391,15 @@ def square(x):
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_via_session_context_connection_setter(
-    scalars_dfs, dataset_id, bq_cf_connection
+def test_dataframe_applymap(
+    session_with_bq_connection_and_permanent_dataset, scalars_dfs
 ):
-    # Creating a session scoped only to this test as we would be setting a
-    # property in it
-    context = bigframes.BigQueryOptions()
-    context.bq_connection = bq_cf_connection
-    session = bigframes.connect(context)
-
-    # Without an explicit bigquery connection, the one present in Session,
-    # set via context setter would be used. Without an explicit `reuse` the
-    # default behavior of reuse=True will take effect. Please note that the
-    # udf is same as the one used in other tests in this file so the underlying
-    # cloud function would be common with reuse=True. Since we are using a
-    # unique dataset_id, even though the cloud function would be reused, the bq
-    # remote function would still be created, making use of the bq connection
-    # set in the BigQueryOptions above.
-    @session.remote_function([int], int, dataset=dataset_id)
-    def square(x):
-        return x * x
-
-    scalars_df, scalars_pandas_df = scalars_dfs
-
-    bf_int64_col = scalars_df["int64_col"]
-    bf_int64_col_filter = bf_int64_col.notnull()
-    bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
-    bf_result_col = bf_int64_col_filtered.apply(square)
-    bf_result = (
-        bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
-    )
-
-    pd_int64_col = scalars_pandas_df["int64_col"]
-    pd_int64_col_filter = pd_int64_col.notnull()
-    pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
-    pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x)
-    # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e.
-    # pd_int64_col_filtered.dtype is Int64Dtype()
-    # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64.
-    # For this test let's force the pandas dtype to be same as bigframes' dtype.
-    pd_result_col = pd_result_col.astype(pd.Int64Dtype())
-    pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)
-
-    assert_pandas_df_equal_ignore_ordering(bf_result, pd_result)
-
-
-@pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_default_connection(scalars_dfs, dataset_id):
-    @bpd.remote_function([int], int, dataset=dataset_id)
-    def square(x):
-        return x * x
-
-    scalars_df, scalars_pandas_df = scalars_dfs
-
-    bf_int64_col = scalars_df["int64_col"]
-    bf_int64_col_filter = bf_int64_col.notnull()
-    bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
-    bf_result_col = bf_int64_col_filtered.apply(square)
-    bf_result = (
-        bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
-    )
-
-    pd_int64_col = scalars_pandas_df["int64_col"]
-    pd_int64_col_filter = pd_int64_col.notnull()
-    pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
-    pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x)
-    # TODO(shobs): Figure why pandas .apply() changes the dtype, i.e.
-    # pd_int64_col_filtered.dtype is Int64Dtype()
-    # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64.
-    # For this test let's force the pandas dtype to be same as bigframes' dtype.
-    pd_result_col = pd_result_col.astype(pd.Int64Dtype())
-    pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)
-
-    assert_pandas_df_equal_ignore_ordering(bf_result, pd_result)
-
-
-@pytest.mark.flaky(retries=2, delay=120)
-def test_dataframe_applymap(session_with_bq_connection, scalars_dfs):
     def add_one(x):
         return x + 1
 
-    remote_add_one = session_with_bq_connection.remote_function([int], int)(add_one)
+    remote_add_one = session_with_bq_connection_and_permanent_dataset.remote_function(
+        [int], int
+    )(add_one)
 
     scalars_df, scalars_pandas_df = scalars_dfs
     int64_cols = ["int64_col", "int64_too"]
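For reference, the applymap path that the renamed test exercises applies a remote function elementwise over selected columns; a minimal sketch, with `session` standing in for a configured bigframes.Session and the column names taken from the test above:

def add_one(x):
    return x + 1

# Wrap the plain function as a remote function, then map it over a DataFrame.
remote_add_one = session.remote_function([int], int)(add_one)
result = scalars_df[["int64_col", "int64_too"]].applymap(remote_add_one).to_pandas()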
@@ -524,11 +422,15 @@ def add_one(x):
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_dataframe_applymap_na_ignore(session_with_bq_connection, scalars_dfs):
+def test_dataframe_applymap_na_ignore(
+    session_with_bq_connection_and_permanent_dataset, scalars_dfs
+):
     def add_one(x):
         return x + 1
 
-    remote_add_one = session_with_bq_connection.remote_function([int], int)(add_one)
+    remote_add_one = session_with_bq_connection_and_permanent_dataset.remote_function(
+        [int], int
+    )(add_one)
 
     scalars_df, scalars_pandas_df = scalars_dfs
     int64_cols = ["int64_col", "int64_too"]
@@ -549,11 +451,13 @@ def add_one(x):
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_series_map(session_with_bq_connection, scalars_dfs):
+def test_series_map(session_with_bq_connection_and_permanent_dataset, scalars_dfs):
     def add_one(x):
         return x + 1
 
-    remote_add_one = session_with_bq_connection.remote_function([int], int)(add_one)
+    remote_add_one = session_with_bq_connection_and_permanent_dataset.remote_function(
+        [int], int
+    )(add_one)
 
     scalars_df, scalars_pandas_df = scalars_dfs
 
@@ -635,7 +539,7 @@ def square1(x):
 
 
 @pytest.mark.flaky(retries=2, delay=120)
-def test_read_gbq_function_reads_udfs(bigquery_client, scalars_dfs, dataset_id):
+def test_read_gbq_function_reads_udfs(bigquery_client, dataset_id):
     dataset_ref = bigquery.DatasetReference.from_string(dataset_id)
     arg = bigquery.RoutineArgument(
         name="x",
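The hunk above is truncated, but the test it belongs to creates a BigQuery routine and reads it back as a callable; a hedged sketch of that read path, assuming the session-level read_gbq_function API and a hypothetical routine path:

# Wrap an existing BigQuery UDF (created elsewhere, e.g. via bigquery_client)
# as a callable; "your_dataset.square" is a placeholder routine path.
square = session.read_gbq_function("your_dataset.square")
result = scalars_df["int64_col"].apply(square)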
28 changes: 28 additions & 0 deletions — tests/unit/test_remote_function.py
@@ -0,0 +1,28 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ibis.backends.bigquery import datatypes as bq_types
+from ibis.expr import datatypes as ibis_types
+
+from bigframes import remote_function as rf
+
+
+def test_supported_types_correspond():
+    # The same types should be representable by the supported Python and BigQuery types.
+    ibis_types_from_python = {ibis_types.dtype(t) for t in rf.SUPPORTED_IO_PYTHON_TYPES}
+    ibis_types_from_bigquery = {
+        bq_types.BigQueryType.to_ibis(tk) for tk in rf.SUPPORTED_IO_BIGQUERY_TYPEKINDS
+    }
+
+    assert ibis_types_from_python == ibis_types_from_bigquery
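As a quick illustration of the correspondence this relocated unit test asserts, plain Python types and BigQuery type kinds should resolve to the same ibis dtypes; a small sketch using only the imports above (the printed names are what current ibis versions report):

from ibis.expr import datatypes as ibis_types

# Python types resolve to concrete ibis dtypes; BigQueryType.to_ibis maps
# BigQuery type kinds (e.g. "INT64", "FLOAT64") onto the same set.
print(ibis_types.dtype(int))    # int64
print(ibis_types.dtype(float))  # float64
print(ibis_types.dtype(bool))   # boolean
print(ibis_types.dtype(str))    # string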
Morty Proxy This is a proxified and sanitized view of the page, visit original site.