Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions 14 airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3026,6 +3026,20 @@ state_store:
example: "mypackage.state.CustomStateBackend"
default: "airflow.state.metastore.MetastoreStateBackend"

deadlines:
description: |
Configuration for deadline alerts feature.
options:
callback_execution_timeout:
description: |
Maximum time in seconds that a deadline callback is allowed to execute before being
terminated. If a callback does not complete within this timeout, the supervisor will
kill the subprocess. Set to 0 to disable the timeout (callbacks may run indefinitely).
version_added: 3.3.0
type: integer
example: ~
default: "300"

profiling:
description: |
Configuration for memory profiling in Airflow component.
Expand Down
28 changes: 25 additions & 3 deletions 28 task-sdk/src/airflow/sdk/execution_time/callback_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from __future__ import annotations

import signal
import sys
import time
from importlib import import_module
Expand All @@ -29,6 +30,7 @@
from pydantic import Field, TypeAdapter

from airflow.sdk._shared.module_loading import accepts_context, accepts_keyword_args
from airflow.sdk.configuration import conf
from airflow.sdk.exceptions import ErrorType
from airflow.sdk.execution_time.comms import (
ErrorResponse,
Expand Down Expand Up @@ -67,6 +69,10 @@ class _BundleInfoLike(Protocol):

log: FilteringBoundLogger = structlog.get_logger(logger_name="callback_supervisor")

# Maximum time (in seconds) a deadline callback is allowed to run before being killed.
# A value of 0 disables the timeout.
CALLBACK_EXECUTION_TIMEOUT: int = conf.getint("deadlines", "callback_execution_timeout", fallback=300)


# The set of messages that a callback subprocess can send to the supervisor.
# This is a minimal subset of ToSupervisor: read-only access to Connections
Expand Down Expand Up @@ -246,14 +252,30 @@ def wait(self) -> int:

def _monitor_subprocess(self):
"""
Monitor the subprocess until it exits.
Monitor the subprocess until it exits or exceeds the callback execution timeout.

A simplified version of ActivitySubprocess._monitor_subprocess() without heartbeating
or timeout handling, just process output monitoring and stuck-socket cleanup.
Enforces the ``[deadlines] callback_execution_timeout`` configuration: if the
subprocess runs longer than the configured limit, it is killed with SIGTERM
(escalating to SIGKILL if necessary).
"""
start_monotonic = time.monotonic()

while self._exit_code is None or self._open_sockets:
self._service_subprocess(max_wait_time=MIN_HEARTBEAT_INTERVAL)

# Enforce callback execution timeout (0 means disabled).
if (
CALLBACK_EXECUTION_TIMEOUT > 0
and self._exit_code is None
and (time.monotonic() - start_monotonic) > CALLBACK_EXECUTION_TIMEOUT
):
log.warning(
"Callback execution timeout reached; terminating subprocess",
pid=self.pid,
timeout_seconds=CALLBACK_EXECUTION_TIMEOUT,
)
self.kill(signal.SIGTERM, force=True)

# If the process has exited but sockets remain open, apply a timeout
# to prevent hanging indefinitely on stuck sockets.
if self._exit_code is not None and self._open_sockets:
Expand Down
102 changes: 102 additions & 0 deletions 102 task-sdk/tests/task_sdk/execution_time/test_callback_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from __future__ import annotations

import signal
import socket
from dataclasses import dataclass
from operator import attrgetter
Expand Down Expand Up @@ -228,3 +229,104 @@ def test_handle_requests(

if client_mock:
mock_client_method.assert_called_once_with(*client_mock.args, **client_mock.kwargs)


class TestCallbackExecutionTimeout:
"""Verify that CallbackSubprocess kills the subprocess when the execution timeout is exceeded."""

@pytest.fixture
def callback_subprocess(self, mocker):
read_end, write_end = socket.socketpair()
proc = CallbackSubprocess(
process_log=mocker.MagicMock(),
id="12345678-1234-5678-1234-567812345678",
pid=12345,
stdin=write_end,
client=mocker.Mock(),
process=mocker.Mock(),
)
return proc, read_end

def test_timeout_kills_subprocess(self, monkeypatch, mocker, callback_subprocess, captured_logs):
"""When the callback exceeds the configured timeout, the subprocess is terminated."""
timeout_seconds = 10
monkeypatch.setattr(
"airflow.sdk.execution_time.callback_supervisor.CALLBACK_EXECUTION_TIMEOUT", timeout_seconds
)

proc, _read_end = callback_subprocess

# Patch the kill method so we can assert it was called
mock_kill = mocker.patch("airflow.sdk.execution_time.callback_supervisor.CallbackSubprocess.kill")

# Simulate time progressing past the timeout.
# First call returns 0 (start), subsequent calls return past-timeout values.
call_count = 0

def mock_monotonic():
nonlocal call_count
call_count += 1
if call_count <= 1:
return 0.0
# After the first call, time has exceeded the timeout
return timeout_seconds + 1.0

# Patch _service_subprocess to simulate one loop iteration where the process is still alive
service_call_count = 0

def mock_service_subprocess(max_wait_time):
nonlocal service_call_count
service_call_count += 1
# After the kill is called (which sets _exit_code via the mock), stop the loop
if service_call_count > 1:
proc._exit_code = -signal.SIGTERM
return None

mocker.patch.object(CallbackSubprocess, "_service_subprocess", side_effect=mock_service_subprocess)

with patch(
"airflow.sdk.execution_time.callback_supervisor.time.monotonic", side_effect=mock_monotonic
):
proc._monitor_subprocess()

mock_kill.assert_called_once_with(signal.SIGTERM, force=True)
assert any(
"Callback execution timeout reached" in record.get("event", "") for record in captured_logs
)

def test_no_timeout_when_disabled(self, monkeypatch, mocker, callback_subprocess):
"""When timeout is 0, the subprocess is never killed for exceeding a time limit."""
monkeypatch.setattr("airflow.sdk.execution_time.callback_supervisor.CALLBACK_EXECUTION_TIMEOUT", 0)

proc, _read_end = callback_subprocess

mock_kill = mocker.patch("airflow.sdk.execution_time.callback_supervisor.CallbackSubprocess.kill")

# Simulate time progressing well past any reasonable timeout
call_count = 0

def mock_monotonic():
nonlocal call_count
call_count += 1
# Return ever-increasing time (simulating long-running callback)
return call_count * 1000.0

# Subprocess exits normally after one iteration
service_call_count = 0

def mock_service_subprocess(max_wait_time):
nonlocal service_call_count
service_call_count += 1
if service_call_count >= 2:
proc._exit_code = 0
return None

mocker.patch.object(CallbackSubprocess, "_service_subprocess", side_effect=mock_service_subprocess)

with patch(
"airflow.sdk.execution_time.callback_supervisor.time.monotonic", side_effect=mock_monotonic
):
proc._monitor_subprocess()

# kill should never be called when timeout is disabled
mock_kill.assert_not_called()
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.