From 8cf0e7664dc16ff97ad677309713099d55edca75 Mon Sep 17 00:00:00 2001 From: Artur Piotr Izaak Laskowski Date: Tue, 17 Oct 2023 16:32:22 +0200 Subject: [PATCH 1/2] Update Cleanup DAG --- composer/workflows/airflow_db_cleanup.py | 39 ++++++++++++++++++------ 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/composer/workflows/airflow_db_cleanup.py b/composer/workflows/airflow_db_cleanup.py index 65cc48c4688..daec0893545 100644 --- a/composer/workflows/airflow_db_cleanup.py +++ b/composer/workflows/airflow_db_cleanup.py @@ -53,7 +53,6 @@ import airflow from airflow import settings -from airflow.jobs.base_job import BaseJob from airflow.models import ( DAG, DagModel, @@ -101,13 +100,6 @@ # List of all the objects that will be deleted. Comment out the DB objects you # want to skip. DATABASE_OBJECTS = [ - { - "airflow_db_model": BaseJob, - "age_check_column": BaseJob.latest_heartbeat, - "keep_last": False, - "keep_last_filters": None, - "keep_last_group_by": None, - }, { "airflow_db_model": DagRun, "age_check_column": DagRun.execution_date, @@ -228,6 +220,35 @@ except Exception as e: logging.error(e) +if AIRFLOW_VERSION < ["2", "6", "0"]: + try: + from airflow.jobs.base_job import BaseJob + DATABASE_OBJECTS.append( + { + "airflow_db_model": BaseJob, + "age_check_column": BaseJob.latest_heartbeat, + "keep_last": False, + "keep_last_filters": None, + "keep_last_group_by": None, + } + ) + except Exception as e: + logging.error(e) +else: + try: + from airflow.jobs.job import Job + DATABASE_OBJECTS.append( + { + "airflow_db_model": Job, + "age_check_column": Job.latest_heartbeat, + "keep_last": False, + "keep_last_filters": None, + "keep_last_group_by": None, + } + ) + except Exception as e: + logging.error(e) + default_args = { "owner": DAG_OWNER_NAME, "depends_on_past": False, @@ -462,4 +483,4 @@ def analyze_db(): print_configuration.set_downstream(cleanup_op) cleanup_op.set_downstream(analyze_op) -# [END composer_metadb_cleanup] +# [END composer_metadb_cleanup] \ No newline at end of file From 12ce6872e3281d5d0e91de3b780009473c75da08 Mon Sep 17 00:00:00 2001 From: Artur Piotr Izaak Laskowski Date: Tue, 17 Oct 2023 16:38:16 +0200 Subject: [PATCH 2/2] Update airflow_db_cleanup.py Missing empty line at the end. --- composer/workflows/airflow_db_cleanup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer/workflows/airflow_db_cleanup.py b/composer/workflows/airflow_db_cleanup.py index daec0893545..4b8f0105a76 100644 --- a/composer/workflows/airflow_db_cleanup.py +++ b/composer/workflows/airflow_db_cleanup.py @@ -483,4 +483,4 @@ def analyze_db(): print_configuration.set_downstream(cleanup_op) cleanup_op.set_downstream(analyze_op) -# [END composer_metadb_cleanup] \ No newline at end of file +# [END composer_metadb_cleanup]