Feature/pass airflow confi as job param #39007

Closed

SubhamSinghal wants to merge 4 commits into apache/airflow:main from SubhamSinghal/airflow:feature/pass-airflow-confi-as-job-param

Conversation

@SubhamSinghal (Contributor) commented Apr 14, 2024

Fixes: #39002

Tests:

  1. Empty airflow config and empty job parameter: job parameters stay empty.
  2. Empty airflow config with job parameters set on the Databricks operator: the operator's job params are applied.
  3. Non-empty airflow config with empty job parameter: the airflow config is passed as job params.
  4. Non-empty airflow config with non-empty job parameter: the operator's job params take precedence.
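The precedence these four cases describe can be sketched as a plain Python function (a hypothetical helper for illustration, not code from the PR):

```python
def resolve_job_parameters(airflow_params: dict, operator_job_params: dict) -> dict:
    """Operator-level job parameters win; otherwise fall back to the airflow config."""
    if operator_job_params:
        # Cases 2 and 4: params set on the operator are applied as-is.
        return operator_job_params
    # Case 3: airflow config is passed through; case 1: both empty, result empty.
    return dict(airflow_params)

print(resolve_job_parameters({}, {}))                  # {}
print(resolve_job_parameters({}, {"a": 1}))            # {'a': 1}
print(resolve_job_parameters({"cfg": "x"}, {}))        # {'cfg': 'x'}
print(resolve_job_parameters({"cfg": "x"}, {"a": 1}))  # {'a': 1}
```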

Comment thread airflow/providers/databricks/operators/databricks.py Outdated
@SubhamSinghal (Contributor, Author):
@dirrao Let me know if it makes more sense to add a flag "import_airflow_config" and, only when it is True, set the airflow config as job_params in Databricks jobs.

@eladkal eladkal requested review from Lee-W and pankajkoti April 16, 2024 08:00
Comment on lines +317 to +321
    job_params = self.params.items() if self.params.items() else {}
    param_list = []
    for k, v in job_params:
        param_list.append({"name": k, "default": v})
    self.json["parameters"] = param_list
Member:

Suggested change:

    - job_params = self.params.items() if self.params.items() else {}
    - param_list = []
    - for k, v in job_params:
    -     param_list.append({"name": k, "default": v})
    - self.json["parameters"] = param_list
    + if self.params.items() is not None:
    +     self.json["parameters"] = [{"name": k, "default": v} for k, v in self.params.items()]
    + else:
    +     self.json["parameters"] = {}

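With a plain dict standing in for `self.params`, the suggested comprehension builds the name/default list the Databricks API expects:

```python
# Stand-in for self.params (illustrative values, not from the PR).
params = {"retries": 3, "env": "prod"}

# The suggested list comprehension, applied to the stand-in dict.
parameters = [{"name": k, "default": v} for k, v in params.items()]
print(parameters)
# [{'name': 'retries', 'default': 3}, {'name': 'env', 'default': 'prod'}]
```

Note that `dict.items()` returns a view object that is never `None`, so the `is not None` guard in the suggestion is always true; a plain truthiness check (`if self.params:`) would express the intended empty/non-empty branch.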
Member:
Where is this self.params from?

@SubhamSinghal (Contributor, Author):
These are airflow configs

github-actions Bot commented Jul 8, 2024

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions Bot added the "stale" label (Stale PRs per the .github/workflows/stale.yml policy file) Jul 8, 2024
@github-actions github-actions Bot closed this Jul 14, 2024
moomindani added a commit to moomindani/airflow that referenced this pull request May 9, 2026
Apply Lee-W's review suggestion from PR apache#39007: replace the manual loop
with a list comprehension that uses ``params.dump()`` (the original
``params.items()`` iteration yielded ``Param`` objects rather than the
resolved values, which would not serialise into the Databricks API).

Extend the same pattern to:
* DatabricksRunNowOperator -> populate top-level ``job_parameters`` (the
  dict-shaped slot already supported by the run-now endpoint).
* DatabricksSubmitRunOperator -> populate dict-shaped per-task parameter
  fields (notebook_task.base_parameters, python_wheel_task.named_parameters,
  sql_task.parameters, run_job_task.job_parameters). Tasks whose only
  parameter field is ``List[str]`` (spark_jar_task, spark_python_task,
  spark_submit_task) are intentionally skipped because there is no
  canonical mapping from a key/value dict to positional CLI arguments.

Drop the ``"parameters": []`` expectation that was added to the existing
test_exec_create / test_exec_reset cases by PR apache#39007 — it never matched
the source logic (``self.params`` is falsy when no params are set, so no
``parameters`` key is added).

Add tests covering: auto-injection for each operator, no override when
the field is already populated, and the per-task injection rules for
SubmitRun.
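The `.items()` vs `.dump()` distinction the commit message describes can be illustrated with minimal stand-ins for Airflow's `Param`/`ParamsDict` (simplified mock classes mimicking the behaviour described above, not Airflow's real implementation):

```python
class Param:
    """Stand-in for airflow.models.param.Param: wraps a default value."""
    def __init__(self, default):
        self.value = default


class ParamsDict(dict):
    """Stand-in for Airflow's ParamsDict, whose dump() resolves Param wrappers."""
    def dump(self):
        # Resolve each Param wrapper to its plain value.
        return {k: v.value for k, v in self.items()}


params = ParamsDict({"notebook": Param("etl.py"), "retries": Param(3)})

# Iterating .items() yields Param objects, which do not serialise into the
# Databricks API payload:
print({k: type(v).__name__ for k, v in params.items()})
# {'notebook': 'Param', 'retries': 'Param'}

# The commit switches to .dump(), which yields the resolved values:
parameters = [{"name": k, "default": v} for k, v in params.dump().items()]
print(parameters)
# [{'name': 'notebook', 'default': 'etl.py'}, {'name': 'retries', 'default': 3}]
```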

Labels

area:providers provider:databricks stale Stale PRs per the .github/workflows/stale.yml policy file

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Allow passing airflow params as job parameter in databricks job

4 participants
