diff --git a/.kokoro/continuous/doctest.cfg b/.kokoro/continuous/doctest.cfg
index dca21d43fd..6016700408 100644
--- a/.kokoro/continuous/doctest.cfg
+++ b/.kokoro/continuous/doctest.cfg
@@ -3,7 +3,7 @@
 # Only run this nox session.
 env_vars: {
     key: "NOX_SESSION"
-    value: "doctest"
+    value: "doctest cleanup"
 }
 
 env_vars: {
diff --git a/.kokoro/presubmit/doctest.cfg b/.kokoro/presubmit/doctest.cfg
index dca21d43fd..6016700408 100644
--- a/.kokoro/presubmit/doctest.cfg
+++ b/.kokoro/presubmit/doctest.cfg
@@ -3,7 +3,7 @@
 # Only run this nox session.
 env_vars: {
     key: "NOX_SESSION"
-    value: "doctest"
+    value: "doctest cleanup"
 }
 
 env_vars: {
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 0d7a90c250..072bcc5781 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -274,6 +274,10 @@ def __init__(
             metrics=self._metrics,
         )
 
+    def __del__(self):
+        """Automatic cleanup of internal resources."""
+        self.close()
+
     @property
     def bqclient(self):
         return self._clients_provider.bqclient
diff --git a/noxfile.py b/noxfile.py
index 92f8acad7f..ef4bf1a37a 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -697,8 +697,8 @@ def system_prerelease(session: nox.sessions.Session):
 
 @nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS)
 def notebook(session: nox.Session):
-    GOOGLE_CLOUD_PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
-    if not GOOGLE_CLOUD_PROJECT:
+    google_cloud_project = os.getenv("GOOGLE_CLOUD_PROJECT")
+    if not google_cloud_project:
         session.error(
             "Set GOOGLE_CLOUD_PROJECT environment variable to run notebook session."
         )
@@ -937,3 +937,31 @@ def release_dry_run(session):
     ):
         env["PROJECT_ROOT"] = "."
     session.run(".kokoro/release-nightly.sh", "--dry-run", env=env)
+
+
+@nox.session(python=DEFAULT_PYTHON_VERSION)
+def cleanup(session):
+    """Clean up stale and/or temporary resources in the test project."""
+    google_cloud_project = os.getenv("GOOGLE_CLOUD_PROJECT")
+    if not google_cloud_project:
+        session.error(
+            "Set GOOGLE_CLOUD_PROJECT environment variable to run cleanup session."
+        )
+
+    # Clean up a few stale (more than 12 hours old) temporary cloud run
+    # functions created by bigframes. This helps keep the test GCP project
+    # within the "Number of functions" quota:
+    # https://cloud.google.com/functions/quotas#resource_limits
+    recency_cutoff_hours = 12
+    cleanup_count_per_location = 10
+
+    session.install("-e", ".")
+
+    session.run(
+        "python",
+        "scripts/manage_cloud_functions.py",
+        f"--project-id={google_cloud_project}",
+        f"--recency-cutoff={recency_cutoff_hours}",
+        "cleanup",
+        f"--number={cleanup_count_per_location}",
+    )
diff --git a/scripts/manage_cloud_functions.py b/scripts/manage_cloud_functions.py
index 6b69089089..33af8463c9 100644
--- a/scripts/manage_cloud_functions.py
+++ b/scripts/manage_cloud_functions.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 import argparse
-from datetime import datetime
+import datetime as dt
 import sys
 import time
 
@@ -94,8 +94,10 @@ def summarize_gcfs(args):
         # Count how many GCFs are newer than a day
         recent = 0
         for f in functions:
-            age = datetime.now() - datetime.fromtimestamp(f.update_time.timestamp())
-            if age.days <= 0:
+            age = dt.datetime.now() - dt.datetime.fromtimestamp(
+                f.update_time.timestamp()
+            )
+            if age.total_seconds() < args.recency_cutoff:
                 recent += 1
 
         region_counts[region] = (functions_count, recent)
@@ -106,7 +108,7 @@ def summarize_gcfs(args):
         region = item[0]
         count, recent = item[1]
         print(
-            "{}: Total={}, Recent={}, OlderThanADay={}".format(
+            "{}: Total={}, Recent={}, Older={}".format(
                 region, count, recent, count - recent
             )
         )
@@ -120,8 +122,10 @@ def cleanup_gcfs(args):
         functions = get_bigframes_functions(args.project_id, region)
         count = 0
         for f in functions:
-            age = datetime.now() - datetime.fromtimestamp(f.update_time.timestamp())
-            if age.days > 0:
+            age = dt.datetime.now() - dt.datetime.fromtimestamp(
+                f.update_time.timestamp()
+            )
+            if age.total_seconds() >= args.recency_cutoff:
                 try:
                     count += 1
                     GCF_CLIENT.delete_function(name=f.name)
@@ -134,12 +138,15 @@ def cleanup_gcfs(args):
                     # that for this clean-up, i.e. 6 mutations per minute. So wait for
                     # 60/6 = 10 seconds
                     time.sleep(10)
+                except google.api_core.exceptions.NotFound:
+                    # The function was most likely already deleted by other means
+                    pass
                 except google.api_core.exceptions.ResourceExhausted:
                     # Stop deleting in this region for now
                     print(
-                        f"Cannot delete any more functions in region {region} due to quota exhaustion. Please try again later."
+                        f"Failed to delete function in region {region} due to quota exhaustion. Pausing for 2 minutes."
                     )
-                    break
+                    time.sleep(120)
 
 
 def list_str(values):
@@ -168,6 +175,19 @@ def list_str(values):
         help="Cloud functions region(s). If multiple regions, Specify comma separated (e.g. region1,region2)",
     )
 
+    def hours_to_timedelta(hrs):
+        return dt.timedelta(hours=int(hrs)).total_seconds()
+
+    parser.add_argument(
+        "-c",
+        "--recency-cutoff",
+        type=hours_to_timedelta,
+        required=False,
+        default=hours_to_timedelta("24"),
+        action="store",
+        help="Age cutoff in hours; cloud functions older than this are considered stale and eligible for cleanup.",
+    )
+
     subparsers = parser.add_subparsers(title="subcommands", required=True)
     parser_summary = subparsers.add_parser(
         "summary",