Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

feature/aip-93-scoping: Passing Asset name/uri through to BaseEventTrigger#66595

Draft
jroachgolf84 wants to merge 2 commits intoapache:mainapache/airflow:mainfrom
jroachgolf84:feature/aip-93-scopingjroachgolf84/airflow:feature/aip-93-scopingCopy head branch name to clipboard
Draft

feature/aip-93-scoping: Passing Asset name/uri through to BaseEventTrigger#66595
jroachgolf84 wants to merge 2 commits intoapache:mainapache/airflow:mainfrom
jroachgolf84:feature/aip-93-scopingjroachgolf84/airflow:feature/aip-93-scopingCopy head branch name to clipboard

Conversation

@jroachgolf84
Copy link
Copy Markdown
Collaborator

@jroachgolf84 jroachgolf84 commented May 8, 2026

Description

With the foundation laid by AIP-103, the path forward for AIP-93 is a bit more clear; if the BaseEventTrigger can be Asset-aware, the Asset.get(...) and Asset.set(...) functionality can be leveraged.

#65103 drafted an approach that "flipped" the definition of the Asset and AssetWatcher. However, for good reason, the approach was challenged. After conversation, the goal became to "pass" the Asset through to the BaseEventTrigger with some runtime magic.

With this model, defining an Asset and AssetWatcher remains the same as before.

from airflow.sdk import Asset, AssetWatcher, DAG, task
from datetime import datetime
from triggers.event_triggers import GenericEventTrigger


generic_asset_watcher = AssetWatcher(
    name="generic_asset_watcher",
    trigger=GenericEventTrigger(
        random_number=1,
        waiter_delay=15
    )
)

generic_asset = Asset(
    name="generic_asset",
    watchers=[generic_asset_watcher],
)


with DAG(
    dag_id="aip_93_scoping",
    start_date=datetime(2026, 1, 1),
    schedule=[generic_asset]
) as dag:

    @task
    def downstream_task():
        pass

    downstream_task()

Testing

No unit tests have been written for this logic yet. However, testing has been performed E2E locally with breeze. The trigger authored below is what was used for testing. When this ran, the Asset name and uri were output in the Triggerer logs.

class GenericEventTrigger(BaseEventTrigger):
    def __init__(
        self,
        random_number,
        waiter_delay,
        **kwargs
    ):
        super().__init__(**kwargs)

        self.random_number = random_number
        self.waiter_delay = waiter_delay

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """Serialize the Trigger, including the func, params, and waiter_delay."""
        return (
            self.__class__.__module__ + "." + self.__class__.__qualname__,
            {
                "random_number": self.random_number,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Logic that fires a TriggerEvent."""
        logging.info(f"***** watched_asset: {self.watched_asset}")
        logging.info(f"***** watched_asset.name: {self.watched_asset.name}")
        logging.info(f"***** watched_asset.uri: {self.watched_asset.uri}")

        while True:
            result = random.randint(0, 5)
            logging.info(f"result: {result}")

            if result == self.random_number:
                logging.info("yield'ing TriggerEvent")
                yield TriggerEvent({"status": "success", "result": result})
                break

            logging.info(f"Sleeping for {self.waiter_delay} seconds")
            await asyncio.sleep(self.waiter_delay)

@boring-cyborg boring-cyborg Bot added area:Executors-core LocalExecutor & SequentialExecutor area:Triggerer labels May 8, 2026
@jroachgolf84
Copy link
Copy Markdown
Collaborator Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Executors-core LocalExecutor & SequentialExecutor area:Triggerer

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Morty Proxy This is a proxified and sanitized view of the page, visit original site.