Forward Airflow Dag params to Databricks job parameters in CreateJobs/SubmitRun/RunNow #66613

Open — moomindani wants to merge 5 commits into `apache/airflow:main` from `moomindani/airflow:providers/39002-databricks-airflow-params-as-job-params`.
Conversation
Apply Lee-W's review suggestion from PR apache#39007: replace the manual loop with a list comprehension that uses `params.dump()` (the original `params.items()` iteration yielded `Param` objects rather than the resolved values, which would not serialise into the Databricks API). Extend the same pattern to:

* `DatabricksRunNowOperator` -> populate the top-level `job_parameters` dict (the dict-shaped slot already supported by the run-now endpoint).
* `DatabricksSubmitRunOperator` -> populate the dict-shaped per-task parameter fields (`notebook_task.base_parameters`, `python_wheel_task.named_parameters`, `sql_task.parameters`, `run_job_task.job_parameters`). Tasks whose only parameter field is `List[str]` (`spark_jar_task`, `spark_python_task`, `spark_submit_task`) are intentionally skipped because there is no canonical mapping from a key/value dict to positional CLI arguments.

Drop the `"parameters": []` expectation that PR apache#39007 added to the existing `test_exec_create` / `test_exec_reset` cases — it never matched the source logic (`self.params` is falsy when no params are set, so no `parameters` key is added). Add tests covering: auto-injection for each operator, no override when the field is already populated, and the per-task injection rules for SubmitRun.
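A minimal sketch of the shape change described above (illustrative, not the exact provider diff; the plain dict below stands in for the operator's resolved `self.params`):

```python
# Resolved param values, as params.dump() / dict(self.params) would return them.
# Iterating a ParamsDict directly can yield Param objects, which the Databricks
# API cannot serialise; resolving first avoids that.
params = {"env": "prod", "batch_size": "100"}

# CreateJobs expects a list of name/default pairs; one comprehension replaces
# the manual append loop.
parameters = [{"name": k, "default": v} for k, v in params.items()]

assert parameters == [
    {"name": "env", "default": "prod"},
    {"name": "batch_size", "default": "100"},
]
```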
moomindani force-pushed from 2126811 to ec4120f.
moomindani (Contributor, Author):
Real-environment validation: ran the operators end-to-end against a live workspace (DAG below):
| Task | Status |
|---|---|
| `create_job` (`DatabricksCreateJobsOperator`) | success |
| `verify_create_job` — assert the workspace job's `settings.parameters` contains `env=prod`, `batch_size=100` | success |
| `run_now` (`DatabricksRunNowOperator`, `params={"env": "staging", "batch_size": "42"}`) | success |
| `verify_run_now` — assert the run's `job_parameters` is `{"env": "staging", "batch_size": "42"}` (operator overrides DAG) | success |
| `submit_run` (`DatabricksSubmitRunOperator`, `params={"env": "dev", "shard": "1"}`) | success |
| `verify_submit_run` — assert the task's `notebook_task.base_parameters` is `{"env": "dev", "batch_size": "100", "shard": "1"}` (DAG-level `batch_size` is inherited where the operator does not override) | success |
| `cleanup_job` | success |
The verification tasks confirm the params actually arrive at Databricks (not just that the request body is constructed locally), and that DAG-level params and operator-level params merge correctly.
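The merge rule the last assertion relies on can be shown in isolation (a minimal sketch of standard Airflow param layering, not code from this PR):

```python
# DAG-level params are the base; operator-level params overlay them per key.
dag_params = {"env": "prod", "batch_size": "100"}  # DAG(params=...)
op_params = {"env": "dev", "shard": "1"}           # DatabricksSubmitRunOperator(params=...)

merged = {**dag_params, **op_params}
assert merged == {"env": "dev", "batch_size": "100", "shard": "1"}
```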
DAG: `dev/dag_pr66613_realenv.py`
"""Real-environment validation DAG for PR #66613 (GH-39002)."""
from __future__ import annotations
import os
from datetime import datetime
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from airflow.providers.databricks.operators.databricks import (
DatabricksCreateJobsOperator,
DatabricksRunNowOperator,
DatabricksSubmitRunOperator,
)
from airflow.sdk import DAG, task
NOTEBOOK_PATH = os.environ.get(
"PR66613_NOTEBOOK_PATH", "/Users/<your-user>@example.com/airflow-pr66613-noop"
)
def _hook() -> DatabricksHook:
return DatabricksHook(databricks_conn_id="databricks_default")
with DAG(
dag_id="pr66613_realenv",
start_date=datetime(2026, 1, 1),
schedule=None,
catchup=False,
params={"env": "prod", "batch_size": "100"},
tags=["databricks", "pr66613"],
) as dag:
create_job = DatabricksCreateJobsOperator(
task_id="create_job",
json={
"name": "{{ dag.dag_id }}-{{ ts_nodash }}",
"tasks": [{"task_key": "noop", "notebook_task": {"notebook_path": NOTEBOOK_PATH}}],
},
)
@task(task_id="verify_create_job")
def verify_create_job(job_id: int) -> int:
job = _hook()._do_api_call(("GET", "2.2/jobs/get"), {"job_id": job_id})
params = job["settings"].get("parameters", [])
assert {"name": "env", "default": "prod"} in params, params
assert {"name": "batch_size", "default": "100"} in params, params
return job_id
run_now = DatabricksRunNowOperator(
task_id="run_now",
job_id="{{ ti.xcom_pull(task_ids='verify_create_job') }}",
params={"env": "staging", "batch_size": "42"},
wait_for_termination=False,
)
@task(task_id="verify_run_now")
def verify_run_now(databricks_run_id: int) -> None:
run = _hook()._do_api_call(("GET", "2.2/jobs/runs/get"), {"run_id": databricks_run_id})
found = {p["name"]: p.get("value", p.get("default")) for p in run.get("job_parameters", [])}
assert found.get("env") == "staging", found
assert found.get("batch_size") == "42", found
_hook().cancel_run(databricks_run_id)
submit_run = DatabricksSubmitRunOperator(
task_id="submit_run",
notebook_task={"notebook_path": NOTEBOOK_PATH},
new_cluster={
"spark_version": "15.4.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 0,
"spark_conf": {"spark.master": "local[*]"},
"custom_tags": {"ResourceClass": "SingleNode"},
},
params={"env": "dev", "shard": "1"},
wait_for_termination=False,
)
@task(task_id="verify_submit_run")
def verify_submit_run(databricks_run_id: int) -> None:
run = _hook()._do_api_call(("GET", "2.2/jobs/runs/get"), {"run_id": databricks_run_id})
base_params = run["tasks"][0]["notebook_task"]["base_parameters"]
# Operator-level params override DAG-level params for shared keys; DAG-level
# keys not overridden are still inherited (here: batch_size from the DAG).
assert base_params == {"env": "dev", "batch_size": "100", "shard": "1"}, base_params
_hook().cancel_run(databricks_run_id)
@task(task_id="cleanup_job", trigger_rule="all_done")
def cleanup_job(job_id: int) -> None:
try:
_hook()._do_api_call(("POST", "2.2/jobs/delete"), {"job_id": job_id})
except Exception: # noqa: BLE001
pass
job_id_xcom = verify_create_job(create_job.output)
run_now_done = verify_run_now(
databricks_run_id="{{ ti.xcom_pull(task_ids='run_now', key='run_id') }}"
)
submit_run_done = verify_submit_run(
databricks_run_id="{{ ti.xcom_pull(task_ids='submit_run', key='run_id') }}"
)
create_job >> job_id_xcom >> run_now >> run_now_done
submit_run >> submit_run_done
[run_now_done, submit_run_done] >> cleanup_job(job_id_xcom)Run with:
```bash
export AIRFLOW_CONN_DATABRICKS_DEFAULT='{"conn_type":"databricks","host":"https://<workspace>","password":"<token>"}'
export PR66613_NOTEBOOK_PATH=/Users/<you>/airflow-pr66613-noop  # any notebook in the workspace
airflow dags test pr66613_realenv
```
Add a "Forwarding Airflow Dag params" section to the jobs_create, run_now, and submit_run operator guides describing the new behaviour: when the operator's params dict is non-empty and the corresponding json slot is empty, params are auto-injected as job-level parameters / job_parameters / per-task dict-shaped parameters respectively.
moomindani force-pushed from cbe66a8 to 942de26.
eladkal (Contributor): Static checks are failing.
…params

- `pytest.mark.parametrize` first arg must be a tuple of names, not a comma-separated string (PT006).
- Replace `self.params.dump()` with `dict(self.params)` so the call works on both the `ParamsDict` and the plain-dict legs of `self.params`' union type, satisfying mypy `union-attr`.
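Illustrative forms of the two fixes (assumed shapes, not the exact diff; the real changes are in the provider's test and operator modules):

```python
import pytest


# PT006: argnames as a tuple, not "params, expected".
@pytest.mark.parametrize(
    ("params", "expected"),
    [({"env": "prod"}, [{"name": "env", "default": "prod"}])],
)
def test_injection(params, expected):
    # dict(x) works for both ParamsDict and plain dict, whereas
    # x.dump() exists only on ParamsDict (the mypy union-attr error).
    assert [{"name": k, "default": v} for k, v in dict(params).items()] == expected
```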
moomindani (Contributor, Author): Thanks @eladkal — pushed fixes for both static check failures (see the commit above).
The Databricks operators currently require job-level parameters to be hardcoded inside `json`. This forwards the operator's `self.params` (Airflow Dag / task / `dag_run.conf` params) into the corresponding Databricks parameter slot when the user has not explicitly populated it:

* `DatabricksCreateJobsOperator` -> top-level `parameters` list (`[{"name": k, "default": v}, ...]`).
* `DatabricksRunNowOperator` -> top-level `job_parameters` dict.
* `DatabricksSubmitRunOperator` -> dict-shaped per-task fields: `notebook_task.base_parameters`, `python_wheel_task.named_parameters`, `sql_task.parameters`, `run_job_task.job_parameters`. Tasks whose only parameter slot is `List[str]` (`spark_jar_task`, `spark_python_task`, `spark_submit_task`) are skipped because there is no canonical mapping from a key/value dict to positional CLI arguments.

The injection only fires when the corresponding slot is empty, so users who explicitly pass parameters in `json` keep their existing behaviour (usage sketch below).

Builds on @SubhamSinghal's earlier work in #39007 (closed as stale). Picks up @dirrao's and @Lee-W's review feedback (list-comprehension refactor) and @galafis's request to extend the feature to RunNow / SubmitRun.
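A hedged usage sketch of the RunNow case (the operator import path is real; the job id is hypothetical, and the final comment shows the slot the description says gets filled, not captured output):

```python
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

run_now = DatabricksRunNowOperator(
    task_id="run_now",
    job_id=123,               # hypothetical job id
    params={"env": "prod"},   # forwarded: no explicit job_parameters given
)
# Per the description above, the request body gains:
#   {"job_parameters": {"env": "prod"}}
```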
closes: #39002
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.7) following the guidelines
In case of backwards-incompatible changes, add a newsfragment file named `{pr_number}.significant.rst` in `airflow-core/newsfragments`. You can add this file in a follow-up commit after the PR is created so you know the PR number.