From 62445525bbe88f451f96d03bf622040276968829 Mon Sep 17 00:00:00 2001
From: Leah Cole
Date: Tue, 17 Oct 2023 16:41:49 -0400
Subject: [PATCH 1/4] fix: dataproc retries

---
 dataproc/snippets/submit_job_test.py     | 132 ++++++++++++-----
 dataproc/snippets/update_cluster_test.py |  56 ++++++----
 2 files changed, 105 insertions(+), 83 deletions(-)

diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py
index 826e0c8a7cf..21d64433115 100644
--- a/dataproc/snippets/submit_job_test.py
+++ b/dataproc/snippets/submit_job_test.py
@@ -12,106 +12,114 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# This sample walks a user through updating the number of clusters using the Dataproc
+# client library.
+
 import os
 import uuid
 
 import backoff
 from google.api_core.exceptions import (
     AlreadyExists,
+    Cancelled,
     InternalServerError,
     InvalidArgument,
     NotFound,
     ServiceUnavailable,
 )
-from google.cloud import dataproc_v1 as dataproc
+from google.cloud.dataproc_v1 import ClusterStatus, GetClusterRequest
+from google.cloud.dataproc_v1.services.cluster_controller.client import (
+    ClusterControllerClient,
+)
 import pytest
 
 import submit_job
 
 PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
 REGION = "us-central1"
+CLUSTER_NAME = f"py-sj-test-{str(uuid.uuid4())}"
+NEW_NUM_INSTANCES = 3
+CLUSTER = {
+    "project_id": PROJECT_ID,
+    "cluster_name": CLUSTER_NAME,
+    "config": {
+        "master_config": {
+            "num_instances": 1,
+            "machine_type_uri": "n1-standard-2",
+            "disk_config": {"boot_disk_size_gb": 100},
+        },
+        "worker_config": {
+            "num_instances": 2,
+            "machine_type_uri": "n1-standard-2",
+            "disk_config": {"boot_disk_size_gb": 100},
+        },
+    },
+}
 
 
 @pytest.fixture(scope="module")
 def cluster_client():
-    return dataproc.ClusterControllerClient(
+    cluster_client = ClusterControllerClient(
         client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
     )
+    return cluster_client
 
 
 @backoff.on_exception(backoff.expo, (ServiceUnavailable, InvalidArgument), max_tries=5)
-def setup_cluster(cluster_client, curr_cluster_name):
-    CLUSTER = {
-        "project_id": PROJECT_ID,
-        "cluster_name": curr_cluster_name,
-        "config": {
-            "master_config": {
-                "num_instances": 1,
-                "machine_type_uri": "n1-standard-2",
-                "disk_config": {"boot_disk_size_gb": 100},
-            },
-            "worker_config": {
-                "num_instances": 2,
-                "machine_type_uri": "n1-standard-2",
-                "disk_config": {"boot_disk_size_gb": 100},
-            },
-        },
-    }
-
-    # Create the cluster.
-    operation = cluster_client.create_cluster(
-        request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
-    )
-    operation.result()
+def setup_cluster(cluster_client):
+    try:
+        # Create the cluster.
+        operation = cluster_client.create_cluster(
+            request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
+        )
+        operation.result()
+    except AlreadyExists:
+        print("Cluster already exists, utilize existing cluster")
 
 
 @backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
-def teardown_cluster(cluster_client, curr_cluster_name):
+def teardown_cluster(cluster_client):
     try:
         operation = cluster_client.delete_cluster(
             request={
                 "project_id": PROJECT_ID,
                 "region": REGION,
-                "cluster_name": curr_cluster_name,
+                "cluster_name": CLUSTER_NAME,
             }
         )
         operation.result()
-
     except NotFound:
         print("Cluster already deleted")
 
 
-@pytest.fixture(scope="module")
-def cluster_name(cluster_client):
-    curr_cluster_name = f"py-sj-test-{str(uuid.uuid4())}"
-
-    try:
-        setup_cluster(cluster_client, curr_cluster_name)
-        yield curr_cluster_name
-    except (
-        AlreadyExists
-    ):  # 409 can happen when we backoff on service errors during submission
-        print("Already exists, skipping cluster creation")
-        yield curr_cluster_name
-    finally:
-        teardown_cluster(cluster_client, curr_cluster_name)
-
-
-# InvalidArgument is thrown when the subnetwork is not ready
-@backoff.on_exception(
-    backoff.expo,
-    (InvalidArgument, InternalServerError, ServiceUnavailable),
-    max_tries=5,
-)
-def test_submit_job(capsys, cluster_name, cluster_client):
-    request = dataproc.GetClusterRequest(
-        project_id=PROJECT_ID, region=REGION, cluster_name=cluster_name
-    )
-    response = cluster_client.get_cluster(request=request)
-    # verify the cluster is in the RUNNING state before proceeding
-    # this prevents a retry on InvalidArgument if the cluster is in an ERROR state
-    assert response.status.state == dataproc.ClusterStatus.State.RUNNING
-    submit_job.submit_job(PROJECT_ID, REGION, cluster_name)
-    out, _ = capsys.readouterr()
-
-    assert "Job finished successfully" in out
+@backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable, Cancelled), max_tries=5)
+def test_submit_job(capsys, cluster_client: ClusterControllerClient):
+    # using this inner function instead of backoff to retry on an Error in the created cluster
+    # means that we can retry on the AssertionError of an errored out cluster but not other
+    # AssertionErrors, and it means we don't have to retry on an InvalidArgument that would occur in
+    # update cluster if the cluster were in an error state
+    def test_submit_job_inner(cluster_client: ClusterControllerClient, submit_retries: int):
+        try:
+            setup_cluster(cluster_client)
+            request = GetClusterRequest(
+                project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
+            )
+            response = cluster_client.get_cluster(request=request)
+            # verify the cluster is in the RUNNING state before proceeding
+            # this prevents a retry on InvalidArgument if the cluster is in an ERROR state
+            assert response.status.state == ClusterStatus.State.RUNNING
+            submit_job.submit_job(PROJECT_ID, REGION, CLUSTER_NAME)
+            out, _ = capsys.readouterr()
+
+            assert "Job finished successfully" in out
+        except AssertionError as e:
+            if submit_retries < 3 and response.status.state == ClusterStatus.State.ERROR:
+                teardown_cluster(cluster_client)
+                test_submit_job_inner(cluster_client=cluster_client, submit_retries=submit_retries+1)
+            else:
+                # if we have exceeded the number of retries or the assertion error
+                # is not related to the cluster being in error, raise it
+                raise e
+        finally:
+            teardown_cluster(cluster_client)
+    test_submit_job_inner(cluster_client=cluster_client, submit_retries=0)
diff --git a/dataproc/snippets/update_cluster_test.py b/dataproc/snippets/update_cluster_test.py
index 91b0afb93ce..f5c0c7c9f4a 100644
--- a/dataproc/snippets/update_cluster_test.py
+++ b/dataproc/snippets/update_cluster_test.py
@@ -96,26 +96,40 @@ def teardown_cluster(cluster_client):
     backoff.expo, (InternalServerError, ServiceUnavailable, Cancelled), max_tries=5
 )
 def test_update_cluster(capsys, cluster_client: ClusterControllerClient):
-    try:
-        setup_cluster(cluster_client)
-        request = GetClusterRequest(
-            project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
-        )
-        response = cluster_client.get_cluster(request=request)
-        # verify the cluster is in the RUNNING state before proceeding
-        # this prevents a retry on InvalidArgument if the cluster is in an ERROR state
-        assert response.status.state == ClusterStatus.State.RUNNING
+    # using this inner function instead of backoff to retry on an Error in the created cluster
+    # means that we can retry on the AssertionError of an errored out cluster but not other
+    # AssertionErrors, and it means we don't have to retry on an InvalidArgument that would occur in
+    # update cluster if the cluster were in an error state
+    def test_update_cluster_inner(cluster_client: ClusterControllerClient, update_retries: int):
+        try:
+            setup_cluster(cluster_client)
+            request = GetClusterRequest(
+                project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
+            )
+            response = cluster_client.get_cluster(request=request)
 
-    # Wrapper function for client library function
-    update_cluster.update_cluster(
-        PROJECT_ID, REGION, CLUSTER_NAME, NEW_NUM_INSTANCES
-    )
-    new_num_cluster = cluster_client.get_cluster(
-        project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
-    )
-    out, _ = capsys.readouterr()
-    assert CLUSTER_NAME in out
-    assert new_num_cluster.config.worker_config.num_instances == NEW_NUM_INSTANCES
+            # verify the cluster is in the RUNNING state before proceeding
+            # this prevents a retry on InvalidArgument if the cluster is in an ERROR state
+            assert response.status.state == ClusterStatus.State.RUNNING
 
-    finally:
-        teardown_cluster(cluster_client)
+            # Wrapper function for client library function
+            update_cluster.update_cluster(
+                PROJECT_ID, REGION, CLUSTER_NAME, NEW_NUM_INSTANCES
+            )
+            new_num_cluster = cluster_client.get_cluster(
+                project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME
+            )
+            out, _ = capsys.readouterr()
+            assert CLUSTER_NAME in out
+            assert new_num_cluster.config.worker_config.num_instances == NEW_NUM_INSTANCES
+        except AssertionError as e:
+            if update_retries < 3 and response.status.state == ClusterStatus.State.ERROR:
+                teardown_cluster(cluster_client)
+                test_update_cluster_inner(cluster_client=cluster_client, update_retries=update_retries+1)
+            else:
+                # if we have exceeded the number of retries or the assertion error
+                # is not related to the cluster being in error, raise it
+                raise e
+        finally:
+            teardown_cluster(cluster_client)
+    test_update_cluster_inner(cluster_client=cluster_client, update_retries=0)

From c12c6fd64979abd9dc70a8e51118147dda9d17c4 Mon Sep 17 00:00:00 2001
From: Leah Cole
Date: Tue, 17 Oct 2023 16:45:07 -0400
Subject: [PATCH 2/4] remove comment

---
 dataproc/snippets/submit_job_test.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py
index 21d64433115..5c07bfdf155 100644
--- a/dataproc/snippets/submit_job_test.py
+++ b/dataproc/snippets/submit_job_test.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-# This sample walks a user through updating the number of clusters using the Dataproc
-# client library.
 
 import os
 import uuid
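
The heart of patch 1 is a hand-rolled, bounded retry: the whole provision-and-verify cycle is re-run only when the failure can be pinned on the cluster itself having entered an ERROR state, while any other AssertionError propagates immediately. A minimal, generic sketch of that shape follows, assuming nothing beyond the standard library; `provision`, `resource_errored`, `verify`, and `teardown` are hypothetical stand-ins for setup_cluster, the ClusterStatus check, the test assertions, and teardown_cluster, and the bound of 3 mirrors the `submit_retries < 3` check in the patch.

    from typing import Callable

    MAX_RETRIES = 3  # assumed bound, mirroring `submit_retries < 3` in the patch


    def run_with_error_state_retry(
        provision: Callable[[], None],
        resource_errored: Callable[[], bool],
        verify: Callable[[], None],
        teardown: Callable[[], None],
        retries: int = 0,
    ) -> None:
        try:
            provision()
            verify()  # raises AssertionError on failure
        except AssertionError:
            if retries < MAX_RETRIES and resource_errored():
                # The resource went bad, not the test: rebuild and try again.
                teardown()
                run_with_error_state_retry(
                    provision, resource_errored, verify, teardown, retries + 1
                )
            else:
                # Retries exhausted, or a genuine test failure: propagate.
                raise
        finally:
            teardown()

Note that `teardown` runs again in the `finally` block after a successful recursive call, so it must tolerate repeated invocation; the patch guarantees this by having teardown_cluster swallow NotFound.
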
From d590562b259f26e27adcd6aad7af7a1d87858dc7 Mon Sep 17 00:00:00 2001
From: Owl Bot
Date: Tue, 17 Oct 2023 20:46:24 +0000
Subject: [PATCH 3/4] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?=
 =?UTF-8?q?st-processor?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
---
 dataproc/snippets/submit_job_test.py     | 18 ++++++++++++++----
 dataproc/snippets/update_cluster_test.py | 18 ++++++++++++++----
 2 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py
index 21d64433115..1bc2102b6d9 100644
--- a/dataproc/snippets/submit_job_test.py
+++ b/dataproc/snippets/submit_job_test.py
@@ -92,13 +92,17 @@ def teardown_cluster(cluster_client):
         print("Cluster already deleted")
 
 
-@backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable, Cancelled), max_tries=5)
+@backoff.on_exception(
+    backoff.expo, (InternalServerError, ServiceUnavailable, Cancelled), max_tries=5
+)
 def test_submit_job(capsys, cluster_client: ClusterControllerClient):
     # using this inner function instead of backoff to retry on an Error in the created cluster
     # means that we can retry on the AssertionError of an errored out cluster but not other
     # AssertionErrors, and it means we don't have to retry on an InvalidArgument that would occur in
     # update cluster if the cluster were in an error state
-    def test_submit_job_inner(cluster_client: ClusterControllerClient, submit_retries: int):
+    def test_submit_job_inner(
+        cluster_client: ClusterControllerClient, submit_retries: int
+    ):
         try:
             setup_cluster(cluster_client)
             request = GetClusterRequest(
@@ -113,13 +117,19 @@ def test_submit_job_inner(cluster_client: ClusterControllerClient, submit_retrie
 
             assert "Job finished successfully" in out
         except AssertionError as e:
-            if submit_retries < 3 and response.status.state == ClusterStatus.State.ERROR:
+            if (
+                submit_retries < 3
+                and response.status.state == ClusterStatus.State.ERROR
+            ):
                 teardown_cluster(cluster_client)
-                test_submit_job_inner(cluster_client=cluster_client, submit_retries=submit_retries+1)
+                test_submit_job_inner(
+                    cluster_client=cluster_client, submit_retries=submit_retries + 1
+                )
             else:
                 # if we have exceeded the number of retries or the assertion error
                 # is not related to the cluster being in error, raise it
                 raise e
         finally:
             teardown_cluster(cluster_client)
+
     test_submit_job_inner(cluster_client=cluster_client, submit_retries=0)
diff --git a/dataproc/snippets/update_cluster_test.py b/dataproc/snippets/update_cluster_test.py
index f5c0c7c9f4a..09fba7dce31 100644
--- a/dataproc/snippets/update_cluster_test.py
+++ b/dataproc/snippets/update_cluster_test.py
@@ -100,7 +100,9 @@ def test_update_cluster(capsys, cluster_client: ClusterControllerClient):
     # means that we can retry on the AssertionError of an errored out cluster but not other
     # AssertionErrors, and it means we don't have to retry on an InvalidArgument that would occur in
     # update cluster if the cluster were in an error state
-    def test_update_cluster_inner(cluster_client: ClusterControllerClient, update_retries: int):
+    def test_update_cluster_inner(
+        cluster_client: ClusterControllerClient, update_retries: int
+    ):
         try:
             setup_cluster(cluster_client)
             request = GetClusterRequest(
@@ -121,15 +123,23 @@ def test_update_cluster_inner(cluster_client: ClusterControllerClient, update_re
             )
             out, _ = capsys.readouterr()
             assert CLUSTER_NAME in out
-            assert new_num_cluster.config.worker_config.num_instances == NEW_NUM_INSTANCES
+            assert (
+                new_num_cluster.config.worker_config.num_instances == NEW_NUM_INSTANCES
+            )
         except AssertionError as e:
-            if update_retries < 3 and response.status.state == ClusterStatus.State.ERROR:
+            if (
+                update_retries < 3
+                and response.status.state == ClusterStatus.State.ERROR
+            ):
                 teardown_cluster(cluster_client)
-                test_update_cluster_inner(cluster_client=cluster_client, update_retries=update_retries+1)
+                test_update_cluster_inner(
+                    cluster_client=cluster_client, update_retries=update_retries + 1
+                )
             else:
                 # if we have exceeded the number of retries or the assertion error
                 # is not related to the cluster being in error, raise it
                 raise e
         finally:
             teardown_cluster(cluster_client)
+
     test_update_cluster_inner(cluster_client=cluster_client, update_retries=0)
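
Patches 3 and 4 are mechanical formatting passes from the OwlBot post-processor, but the reformatted code makes the final layering easy to read: exponential backoff handles transient service-side failures on the outside, and the inner function handles the one application-level failure mode (a cluster stuck in ERROR). A sketch of the outer layer, using the real `backoff` and `google-api-core` exception types; `run_test_body` is a hypothetical placeholder for the bounded inner retry shown earlier:

    import backoff
    from google.api_core.exceptions import (
        Cancelled,
        InternalServerError,
        ServiceUnavailable,
    )


    @backoff.on_exception(
        backoff.expo,  # exponentially growing wait between attempts
        (InternalServerError, ServiceUnavailable, Cancelled),
        max_tries=5,
    )
    def test_with_transient_retries():
        run_test_body()  # hypothetical: the bounded inner retry sketched above

Keeping InvalidArgument out of this tuple is the point of the change: per the patch comments, the old test only retried on InvalidArgument because operating on an ERROR-state cluster raises it, and asserting the RUNNING state before proceeding removes that need.
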
From a4ca853eb2a637f522c8ecd1b955fc70f6c0d78b Mon Sep 17 00:00:00 2001
From: Owl Bot
Date: Tue, 17 Oct 2023 20:47:40 +0000
Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?=
 =?UTF-8?q?st-processor?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
---
 dataproc/snippets/submit_job_test.py     | 18 ++++++++++++++----
 dataproc/snippets/update_cluster_test.py | 18 ++++++++++++++----
 2 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/dataproc/snippets/submit_job_test.py b/dataproc/snippets/submit_job_test.py
index 5c07bfdf155..d2201c15230 100644
--- a/dataproc/snippets/submit_job_test.py
+++ b/dataproc/snippets/submit_job_test.py
@@ -90,13 +90,17 @@ def teardown_cluster(cluster_client):
         print("Cluster already deleted")
 
 
-@backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable, Cancelled), max_tries=5)
+@backoff.on_exception(
+    backoff.expo, (InternalServerError, ServiceUnavailable, Cancelled), max_tries=5
+)
 def test_submit_job(capsys, cluster_client: ClusterControllerClient):
     # using this inner function instead of backoff to retry on an Error in the created cluster
     # means that we can retry on the AssertionError of an errored out cluster but not other
     # AssertionErrors, and it means we don't have to retry on an InvalidArgument that would occur in
     # update cluster if the cluster were in an error state
-    def test_submit_job_inner(cluster_client: ClusterControllerClient, submit_retries: int):
+    def test_submit_job_inner(
+        cluster_client: ClusterControllerClient, submit_retries: int
+    ):
         try:
             setup_cluster(cluster_client)
             request = GetClusterRequest(
@@ -111,13 +115,19 @@ def test_submit_job_inner(cluster_client: ClusterControllerClient, submit_retrie
 
             assert "Job finished successfully" in out
         except AssertionError as e:
-            if submit_retries < 3 and response.status.state == ClusterStatus.State.ERROR:
+            if (
+                submit_retries < 3
+                and response.status.state == ClusterStatus.State.ERROR
+            ):
                 teardown_cluster(cluster_client)
-                test_submit_job_inner(cluster_client=cluster_client, submit_retries=submit_retries+1)
+                test_submit_job_inner(
+                    cluster_client=cluster_client, submit_retries=submit_retries + 1
+                )
             else:
                 # if we have exceeded the number of retries or the assertion error
                 # is not related to the cluster being in error, raise it
                 raise e
         finally:
             teardown_cluster(cluster_client)
+
     test_submit_job_inner(cluster_client=cluster_client, submit_retries=0)
diff --git a/dataproc/snippets/update_cluster_test.py b/dataproc/snippets/update_cluster_test.py
index f5c0c7c9f4a..09fba7dce31 100644
--- a/dataproc/snippets/update_cluster_test.py
+++ b/dataproc/snippets/update_cluster_test.py
@@ -100,7 +100,9 @@ def test_update_cluster(capsys, cluster_client: ClusterControllerClient):
     # means that we can retry on the AssertionError of an errored out cluster but not other
     # AssertionErrors, and it means we don't have to retry on an InvalidArgument that would occur in
     # update cluster if the cluster were in an error state
-    def test_update_cluster_inner(cluster_client: ClusterControllerClient, update_retries: int):
+    def test_update_cluster_inner(
+        cluster_client: ClusterControllerClient, update_retries: int
+    ):
         try:
             setup_cluster(cluster_client)
             request = GetClusterRequest(
@@ -121,15 +123,23 @@ def test_update_cluster_inner(cluster_client: ClusterControllerClient, update_re
             )
             out, _ = capsys.readouterr()
             assert CLUSTER_NAME in out
-            assert new_num_cluster.config.worker_config.num_instances == NEW_NUM_INSTANCES
+            assert (
+                new_num_cluster.config.worker_config.num_instances == NEW_NUM_INSTANCES
+            )
         except AssertionError as e:
-            if update_retries < 3 and response.status.state == ClusterStatus.State.ERROR:
+            if (
+                update_retries < 3
+                and response.status.state == ClusterStatus.State.ERROR
+            ):
                 teardown_cluster(cluster_client)
-                test_update_cluster_inner(cluster_client=cluster_client, update_retries=update_retries+1)
+                test_update_cluster_inner(
+                    cluster_client=cluster_client, update_retries=update_retries + 1
+                )
             else:
                 # if we have exceeded the number of retries or the assertion error
                 # is not related to the cluster being in error, raise it
                 raise e
         finally:
             teardown_cluster(cluster_client)
+
     test_update_cluster_inner(cluster_client=cluster_client, update_retries=0)
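
The whole scheme rests on setup and teardown both being safe to call repeatedly: the inner retry tears the cluster down and re-provisions it, and the `finally` block tears it down once more after a successful retry. The patches achieve this by treating AlreadyExists on create and NotFound on delete as success. A condensed sketch of that idempotent pair, using the same ClusterControllerClient calls as the patched tests, with the project, region, and cluster values assumed to be defined as they are there:

    from google.api_core.exceptions import AlreadyExists, NotFound


    def ensure_cluster(cluster_client, project_id, region, cluster):
        """Create the cluster, treating an already-existing one as success."""
        try:
            operation = cluster_client.create_cluster(
                request={"project_id": project_id, "region": region, "cluster": cluster}
            )
            operation.result()  # block until the long-running operation completes
        except AlreadyExists:
            pass  # an earlier attempt already created it; reuse the cluster


    def ensure_deleted(cluster_client, project_id, region, cluster_name):
        """Delete the cluster, treating a missing one as success."""
        try:
            operation = cluster_client.delete_cluster(
                request={
                    "project_id": project_id,
                    "region": region,
                    "cluster_name": cluster_name,
                }
            )
            operation.result()
        except NotFound:
            pass  # already gone, which is fine for a teardown that may run twice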