Forward Airflow Dag params to Databricks job parameters in CreateJobs/SubmitRun/RunNow #66613

Open
moomindani wants to merge 5 commits into apache/airflow:main from moomindani:providers/39002-databricks-airflow-params-as-job-params

Conversation

@moomindani
Contributor

The Databricks operators currently require job-level parameters to be hardcoded inside the json argument. This PR forwards the operator's self.params (Airflow DAG / task / dag_run.conf params) into the corresponding Databricks parameter slot whenever the user has not explicitly populated it:

  • DatabricksCreateJobsOperator -> top-level parameters list ([{"name": k, "default": v}, ...]).
  • DatabricksRunNowOperator -> top-level job_parameters dict.
  • DatabricksSubmitRunOperator -> dict-shaped per-task fields: notebook_task.base_parameters, python_wheel_task.named_parameters, sql_task.parameters, run_job_task.job_parameters. Tasks whose only parameter slot is List[str] (spark_jar_task, spark_python_task, spark_submit_task) are skipped because there is no canonical mapping from a key/value dict to positional CLI arguments.

The injection only fires when the corresponding slot is empty, so users who explicitly pass parameters in json keep their existing behaviour.
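
For illustration, a minimal sketch of what this looks like from the DAG author's side (the DAG id, job name, and notebook path are made up, and the "parameters" payload shown in the trailing comment is only an approximation of what the operator would build from self.params):

from airflow.providers.databricks.operators.databricks import DatabricksCreateJobsOperator
from airflow.sdk import DAG

with DAG(
    dag_id="example_params_forwarding",
    schedule=None,
    params={"env": "prod", "batch_size": "100"},
) as dag:
    # No "parameters" key in json, so the operator fills the slot from self.params.
    DatabricksCreateJobsOperator(
        task_id="create_job",
        json={
            "name": "example-job",
            "tasks": [{"task_key": "noop", "notebook_task": {"notebook_path": "/Shared/noop"}}],
        },
    )
    # The CreateJobs request body then ends up containing, roughly:
    #   "parameters": [{"name": "env", "default": "prod"},
    #                  {"name": "batch_size", "default": "100"}]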

Builds on @SubhamSinghal's earlier work in #39007 (closed as stale). Picks up @dirrao's and @Lee-W's review feedback (list-comprehension refactor) and @galafis's request to extend the feature to RunNow / SubmitRun.

closes: #39002


Was generative AI tooling used to co-author this PR?
  • Yes — Claude Code (Opus 4.7)

Generated-by: Claude Code (Opus 4.7) following the guidelines


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

Apply Lee-W's review suggestion from PR apache#39007: replace the manual loop
with a list comprehension that uses ``params.dump()`` (the original
``params.items()`` iteration yielded ``Param`` objects rather than the
resolved values, which would not serialise into the Databricks API).

Extend the same pattern to:
* DatabricksRunNowOperator -> populate top-level ``job_parameters`` (the
  dict-shaped slot already supported by the run-now endpoint).
* DatabricksSubmitRunOperator -> populate dict-shaped per-task parameter
  fields (notebook_task.base_parameters, python_wheel_task.named_parameters,
  sql_task.parameters, run_job_task.job_parameters). Tasks whose only
  parameter field is ``List[str]`` (spark_jar_task, spark_python_task,
  spark_submit_task) are intentionally skipped because there is no
  canonical mapping from a key/value dict to positional CLI arguments.

Drop the ``"parameters": []`` expectation that was added to the existing
test_exec_create / test_exec_reset cases by PR apache#39007 — it never matched
the source logic (``self.params`` is falsy when no params are set, so no
``parameters`` key is added).

Add tests covering: auto-injection for each operator, no override when
the field is already populated, and the per-task injection rules for
SubmitRun.
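
For readers skimming the diff, the per-task rule described above can be summarised roughly as follows. The field names come from the commit message; the helper and its names are an illustrative sketch, not the exact code in the PR:

# Illustrative sketch only: which dict-shaped parameter field each SubmitRun
# task type exposes (field names taken from the commit message above).
DICT_SHAPED_PARAM_FIELDS = {
    "notebook_task": "base_parameters",
    "python_wheel_task": "named_parameters",
    "sql_task": "parameters",
    "run_job_task": "job_parameters",
}
# spark_jar_task / spark_python_task / spark_submit_task only accept List[str]
# positional arguments, so they are deliberately left untouched.


def inject_params_into_submit_run(json_body: dict, params: dict) -> None:
    """Fill empty dict-shaped parameter slots from params, never overriding explicit values.

    Assumes the legacy single-task SubmitRun json shape for brevity.
    """
    for task_field, param_field in DICT_SHAPED_PARAM_FIELDS.items():
        task_conf = json_body.get(task_field)
        if isinstance(task_conf, dict) and not task_conf.get(param_field):
            task_conf[param_field] = dict(params)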
moomindani force-pushed the providers/39002-databricks-airflow-params-as-job-params branch from 2126811 to ec4120f (May 9, 2026 00:03)
@moomindani
Contributor Author

Real-environment validation

Ran the operators end-to-end through airflow dags test against a real Databricks workspace, with verification tasks in the same DAG that read the workspace state back via the REST API. All seven tasks succeeded.

Tasks (DAG pr66613_realenv, with DAG-level params={"env": "prod", "batch_size": "100"})

Task | Status
create_job (DatabricksCreateJobsOperator) | success
verify_create_job: assert workspace settings.parameters contains env=prod, batch_size=100 | success
run_now (DatabricksRunNowOperator, params={"env": "staging", "batch_size": "42"}) | success
verify_run_now: assert the run's job_parameters is {"env": "staging", "batch_size": "42"} (operator params override DAG params) | success
submit_run (DatabricksSubmitRunOperator, params={"env": "dev", "shard": "1"}) | success
verify_submit_run: assert the task's notebook_task.base_parameters is {"env": "dev", "batch_size": "100", "shard": "1"} (DAG-level batch_size is inherited where the operator does not override it) | success
cleanup_job | success

The verification tasks confirm the params actually arrive at Databricks (not just that the request body is constructed locally), and that DAG-level params and operator-level params merge correctly.

DAG

dev/dag_pr66613_realenv.py
"""Real-environment validation DAG for PR #66613 (GH-39002)."""
from __future__ import annotations

import os
from datetime import datetime

from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.operators.databricks import (
    DatabricksCreateJobsOperator,
    DatabricksRunNowOperator,
    DatabricksSubmitRunOperator,
)
from airflow.sdk import DAG, task

NOTEBOOK_PATH = os.environ.get(
    "PR66613_NOTEBOOK_PATH", "/Users/<your-user>@example.com/airflow-pr66613-noop"
)


def _hook() -> DatabricksHook:
    return DatabricksHook(databricks_conn_id="databricks_default")


with DAG(
    dag_id="pr66613_realenv",
    start_date=datetime(2026, 1, 1),
    schedule=None,
    catchup=False,
    params={"env": "prod", "batch_size": "100"},
    tags=["databricks", "pr66613"],
) as dag:
    create_job = DatabricksCreateJobsOperator(
        task_id="create_job",
        json={
            "name": "{{ dag.dag_id }}-{{ ts_nodash }}",
            "tasks": [{"task_key": "noop", "notebook_task": {"notebook_path": NOTEBOOK_PATH}}],
        },
    )

    @task(task_id="verify_create_job")
    def verify_create_job(job_id: int) -> int:
        job = _hook()._do_api_call(("GET", "2.2/jobs/get"), {"job_id": job_id})
        params = job["settings"].get("parameters", [])
        assert {"name": "env", "default": "prod"} in params, params
        assert {"name": "batch_size", "default": "100"} in params, params
        return job_id

    run_now = DatabricksRunNowOperator(
        task_id="run_now",
        job_id="{{ ti.xcom_pull(task_ids='verify_create_job') }}",
        params={"env": "staging", "batch_size": "42"},
        wait_for_termination=False,
    )

    @task(task_id="verify_run_now")
    def verify_run_now(databricks_run_id: int) -> None:
        run = _hook()._do_api_call(("GET", "2.2/jobs/runs/get"), {"run_id": databricks_run_id})
        found = {p["name"]: p.get("value", p.get("default")) for p in run.get("job_parameters", [])}
        assert found.get("env") == "staging", found
        assert found.get("batch_size") == "42", found
        _hook().cancel_run(databricks_run_id)

    submit_run = DatabricksSubmitRunOperator(
        task_id="submit_run",
        notebook_task={"notebook_path": NOTEBOOK_PATH},
        new_cluster={
            "spark_version": "15.4.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 0,
            "spark_conf": {"spark.master": "local[*]"},
            "custom_tags": {"ResourceClass": "SingleNode"},
        },
        params={"env": "dev", "shard": "1"},
        wait_for_termination=False,
    )

    @task(task_id="verify_submit_run")
    def verify_submit_run(databricks_run_id: int) -> None:
        run = _hook()._do_api_call(("GET", "2.2/jobs/runs/get"), {"run_id": databricks_run_id})
        base_params = run["tasks"][0]["notebook_task"]["base_parameters"]
        # Operator-level params override DAG-level params for shared keys; DAG-level
        # keys not overridden are still inherited (here: batch_size from the DAG).
        assert base_params == {"env": "dev", "batch_size": "100", "shard": "1"}, base_params
        _hook().cancel_run(databricks_run_id)

    @task(task_id="cleanup_job", trigger_rule="all_done")
    def cleanup_job(job_id: int) -> None:
        try:
            _hook()._do_api_call(("POST", "2.2/jobs/delete"), {"job_id": job_id})
        except Exception:  # noqa: BLE001
            pass

    job_id_xcom = verify_create_job(create_job.output)
    run_now_done = verify_run_now(
        databricks_run_id="{{ ti.xcom_pull(task_ids='run_now', key='run_id') }}"
    )
    submit_run_done = verify_submit_run(
        databricks_run_id="{{ ti.xcom_pull(task_ids='submit_run', key='run_id') }}"
    )
    create_job >> job_id_xcom >> run_now >> run_now_done
    submit_run >> submit_run_done
    [run_now_done, submit_run_done] >> cleanup_job(job_id_xcom)

Run with:

export AIRFLOW_CONN_DATABRICKS_DEFAULT='{"conn_type":"databricks","host":"https://<workspace>","password":"<token>"}'
export PR66613_NOTEBOOK_PATH=/Users/<you>/airflow-pr66613-noop  # any notebook in the workspace
airflow dags test pr66613_realenv

Add a "Forwarding Airflow Dag params" section to the jobs_create, run_now,
and submit_run operator guides describing the new behaviour: when the
operator's params dict is non-empty and the corresponding json slot is
empty, params are auto-injected as job-level parameters / job_parameters /
per-task dict-shaped parameters respectively.
moomindani force-pushed the providers/39002-databricks-airflow-params-as-job-params branch from cbe66a8 to 942de26 (May 9, 2026 00:31)
@eladkal
Contributor

eladkal commented May 9, 2026

Static checks are failing

…params

- pytest.mark.parametrize first arg must be a tuple of names, not a comma-separated
  string (PT006).
- Replace self.params.dump() with dict(self.params) so the call works for both the
  ParamsDict and plain-dict branches of self.params' union type, satisfying
  mypy's union-attr check.
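
For context, the two fixes look roughly like this. The test data and names below are invented for illustration; only the tuple-of-names form and the dict(self.params) substitution reflect the actual change:

import pytest

CASES = [("DatabricksCreateJobsOperator", "parameters")]  # hypothetical test data


# PT006 fix: the first argument is a tuple of names, not "operator_class, expected_field".
@pytest.mark.parametrize(("operator_class", "expected_field"), CASES)
def test_param_field_name(operator_class, expected_field):
    assert isinstance(expected_field, str)


# union-attr fix: self.params may be a ParamsDict or a plain dict, and only the
# former has .dump(); dict(...) yields the resolved values for both.
def resolve_params(params) -> dict:
    return dict(params)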
@moomindani
Contributor Author

Thanks @eladkal — pushed fixes for both static check failures:

  • PT006 on the new @pytest.mark.parametrize calls (changed the first arg to a tuple of names).
  • union-attr mypy errors on self.params.dump() (self.params can be a plain dict; switched to dict(self.params) which works on both ParamsDict and plain dict).
