#!/usr/bin/env python3
# Run a user-supplied Python snippet in an isolated ``uv run`` environment.
#
# The step parameters are read as a JSON object from the ``DAG_PARAMS_JSON``
# environment variable.  The snippet is executed by a small runner program
# (``RUNNER_SOURCE``) that is written to a temporary directory and launched as
# a child process.  The child's stdout/stderr are echoed to this process's
# stderr, and exactly one JSON line describing the outcome is written to this
# process's stdout.  The exit status is 0 when the outcome is ``ok`` and 1
# otherwise.
from __future__ import annotations

import json
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
from pathlib import Path
from typing import Any

DEFAULT_TIMEOUT_SECONDS = 30
MAX_TIMEOUT_SECONDS = 300
DEFAULT_PYTHON_VERSION = "3.13.9"

# Program executed inside the ``uv`` environment.  It is invoked as
# ``python runner.py <params.json> <payload.json>``: it wraps the user script
# in an ``async def __dagu_user_main__(input, params, env)`` function, runs
# it, and writes the result (or the error) as JSON to ``<payload.json>``.
RUNNER_SOURCE = r'''
from __future__ import annotations

import asyncio
import inspect
import json
import os
import platform
import sys
import textwrap
import traceback


def write_payload(path, payload):
    with open(path, "w", encoding="utf-8") as file:
        json.dump(payload, file, ensure_ascii=False, allow_nan=False)
        file.write("\n")


def error_payload(error):
    return {
        "name": error.__class__.__name__,
        "message": str(error),
        "stack": "".join(traceback.format_exception(error)),
    }


async def run_user_script(params):
    script = params["script"]
    input_value = params["input"] if "input" in params else None
    extra_env = params.get("env") or {}
    for key, value in extra_env.items():
        os.environ[key] = value

    body = script.rstrip()
    if body:
        body = textwrap.indent(body, " ")
    else:
        body = " pass"
    source = "async def __dagu_user_main__(input, params, env):\n" + body + "\n pass\n"
    namespace = {"__name__": "__dagu_python_script__"}
    exec(compile(source, "", "exec"), namespace)
    result = namespace["__dagu_user_main__"](input_value, params, dict(os.environ))
    if inspect.isawaitable(result):
        result = await result
    json.dumps(result, ensure_ascii=False, allow_nan=False)
    return result


def main():
    params_path, payload_path = sys.argv[1], sys.argv[2]
    try:
        with open(params_path, "r", encoding="utf-8") as file:
            params = json.load(file)
        result = asyncio.run(run_user_script(params))
        write_payload(payload_path, {
            "ok": True,
            "result": result,
            "pythonVersion": platform.python_version(),
        })
        return 0
    except BaseException as error:
        write_payload(payload_path, {
            "ok": False,
            "result": None,
            "pythonVersion": platform.python_version(),
            "error": error_payload(error),
        })
        return 1


if __name__ == "__main__":
    raise SystemExit(main())
'''


class ValidationError(Exception):
    """Raised when the step parameters or the child's payload are malformed."""


def parse_params() -> dict[str, Any]:
    """Load the step parameters from the ``DAG_PARAMS_JSON`` environment variable.

    Returns:
        The decoded JSON object.

    Raises:
        ValidationError: If the variable is unset or empty, is not valid JSON,
            or does not decode to a JSON object.
    """
    raw = os.environ.get("DAG_PARAMS_JSON")
    if not raw:
        raise ValidationError("DAG_PARAMS_JSON is required")
    try:
        params = json.loads(raw)
    except json.JSONDecodeError as error:
        raise ValidationError(f"DAG_PARAMS_JSON must be valid JSON: {error}") from error
    # A JSON object is the only value that decodes to a dict, so this single
    # check also rejects null, arrays, numbers and strings.
    if not isinstance(params, dict):
        raise ValidationError("DAG_PARAMS_JSON must be a JSON object")
    return params


def require_script(params: dict[str, Any]) -> str:
    """Return ``params["script"]``, requiring a non-blank string.

    Raises:
        ValidationError: If the script is missing, not a string, or blank.
    """
    script = params.get("script")
    if not isinstance(script, str) or not script.strip():
        raise ValidationError("script must be a non-empty string")
    return script


def resolve_timeout_seconds(value: Any) -> int:
    """Validate ``timeoutSeconds``; ``None`` and ``""`` select the default.

    Raises:
        ValidationError: If the value is not an integer (booleans are
            rejected) or lies outside ``1..MAX_TIMEOUT_SECONDS``.
    """
    if value in (None, ""):
        return DEFAULT_TIMEOUT_SECONDS
    # bool is a subclass of int, so it has to be excluded explicitly.
    if not isinstance(value, int) or isinstance(value, bool):
        raise ValidationError("timeoutSeconds must be an integer")
    if value < 1 or value > MAX_TIMEOUT_SECONDS:
        raise ValidationError(f"timeoutSeconds must be between 1 and {MAX_TIMEOUT_SECONDS}")
    return value


def resolve_python_version(value: Any) -> str:
    """Validate ``pythonVersion``; ``None`` and ``""`` select the default.

    Returns:
        The version string with surrounding whitespace removed.

    Raises:
        ValidationError: If the value is not a string, is blank, or contains
            whitespace after stripping.
    """
    if value in (None, ""):
        return DEFAULT_PYTHON_VERSION
    if not isinstance(value, str) or not value.strip():
        raise ValidationError("pythonVersion must be a non-empty string")
    version = value.strip()
    if any(char.isspace() for char in version):
        raise ValidationError("pythonVersion must not contain whitespace")
    return version


def resolve_requirements(value: Any) -> list[str]:
    """Validate ``requirements``; ``None`` and ``""`` yield an empty list.

    Returns:
        The requirement strings with surrounding whitespace removed.

    Raises:
        ValidationError: If the value is not a list or any entry is not a
            non-blank string.
    """
    if value in (None, ""):
        return []
    if not isinstance(value, list):
        raise ValidationError("requirements must be an array of strings")
    requirements: list[str] = []
    for index, item in enumerate(value):
        if not isinstance(item, str) or not item.strip():
            raise ValidationError(f"requirements[{index}] must be a non-empty string")
        requirements.append(item.strip())
    return requirements


def resolve_env(value: Any) -> dict[str, str]:
    """Validate ``env`` as a mapping of variable names to string values.

    Names containing ``=`` or NUL, and values containing NUL, are rejected
    here because ``subprocess.Popen`` refuses them with a bare ``ValueError``
    that does not say which entry was at fault.

    Raises:
        ValidationError: If the value is not an object, or any key or value
            is unusable as an environment entry.
    """
    if value is None:
        return {}
    if not isinstance(value, dict):
        raise ValidationError("env must be an object of string values")
    env: dict[str, str] = {}
    for key, item in value.items():
        if not isinstance(key, str) or not key:
            raise ValidationError("env contains an empty key")
        if "=" in key or "\0" in key:
            raise ValidationError(f"env key {key!r} must not contain '=' or NUL characters")
        if not isinstance(item, str):
            raise ValidationError(f"env.{key} must be a string")
        if "\0" in item:
            raise ValidationError(f"env.{key} must not contain NUL characters")
        env[key] = item
    return env


def duration_ms(started_at: float) -> int:
    """Return whole milliseconds elapsed since the ``time.monotonic()`` stamp."""
    return int((time.monotonic() - started_at) * 1000)


def error_payload(error: BaseException) -> dict[str, str]:
    """Describe *error* as a ``name`` / ``message`` / ``stack`` mapping."""
    return {
        "name": error.__class__.__name__,
        "message": str(error),
        "stack": "".join(traceback.format_exception(error)),
    }


def write_output(output: dict[str, Any]) -> None:
    """Write *output* to stdout as a single JSON line and flush."""
    sys.stdout.write(json.dumps(output, ensure_ascii=False, allow_nan=False) + "\n")
    sys.stdout.flush()


def build_child_env(extra_env: dict[str, str]) -> dict[str, str]:
    """Return the child's environment: this process's plus *extra_env*.

    ``PYTHONUNBUFFERED`` is always forced to ``"1"`` (it is assigned after
    *extra_env* is merged); ``UV_NO_PROGRESS`` is only given a default.
    """
    env = os.environ.copy()
    env.update(extra_env)
    env["PYTHONUNBUFFERED"] = "1"
    env.setdefault("UV_NO_PROGRESS", "1")
    return env


def build_command(
    python_version: str,
    requirements: list[str],
    runner_path: Path,
    params_path: Path,
    payload_path: Path,
) -> list[str]:
    """Build the ``uv run`` argument vector that launches the runner.

    Each requirement is passed as its own ``--with`` option.

    Raises:
        ValidationError: If no ``uv`` executable is found on ``PATH``.
    """
    uv = shutil.which("uv")
    if not uv:
        raise ValidationError("uv command was not found in PATH")
    command = [
        uv,
        "run",
        "--quiet",
        "--no-progress",
        "--isolated",
        "--python",
        python_version,
    ]
    for requirement in requirements:
        command.extend(["--with", requirement])
    command.extend(["python", str(runner_path), str(params_path), str(payload_path)])
    return command


def pump_stream(stream: Any, chunks: list[str]) -> None:
    """Copy a binary *stream* line by line into *chunks* and onto stderr.

    Every line is decoded as UTF-8 with replacement of invalid bytes.  The
    stream is closed when it reaches end-of-file or when copying fails.
    """
    try:
        # The b"" sentinel already ends the iteration at end-of-file.
        for data in iter(stream.readline, b""):
            text = data.decode("utf-8", errors="replace")
            chunks.append(text)
            sys.stderr.write(text)
            sys.stderr.flush()
    finally:
        stream.close()


def terminate_process_group(process: subprocess.Popen[bytes]) -> None:
    """Stop *process* and its process group/tree; no-op if it already exited.

    On Windows the tree is killed with ``taskkill /F /T``, falling back to
    ``process.kill()``.  Elsewhere the group receives ``SIGTERM``, then
    ``SIGKILL`` if the process has not exited within two seconds.
    """
    if process.poll() is not None:
        return

    if os.name == "nt":
        try:
            subprocess.run(
                ["taskkill", "/F", "/T", "/PID", str(process.pid)],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False,
            )
        except OSError:
            # taskkill could not be launched; fall through to process.kill().
            pass
        if process.poll() is None:
            process.kill()
        return

    try:
        os.killpg(process.pid, signal.SIGTERM)
    except ProcessLookupError:
        return
    except OSError:
        # Signalling the group failed; signal the process itself instead.
        process.terminate()

    try:
        process.wait(timeout=2)
        return
    except subprocess.TimeoutExpired:
        pass

    try:
        os.killpg(process.pid, signal.SIGKILL)
    except ProcessLookupError:
        return
    except OSError:
        process.kill()
    process.wait()


def start_process(command: list[str], env: dict[str, str]) -> subprocess.Popen[bytes]:
    """Start *command* with piped stdout/stderr in its own process group.

    A new process group (Windows) or session (elsewhere) is requested so that
    ``terminate_process_group`` can signal the child together with anything
    it spawns.
    """
    kwargs: dict[str, Any] = {
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "env": env,
    }
    if os.name == "nt":
        kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    else:
        kwargs["start_new_session"] = True
    return subprocess.Popen(command, **kwargs)


def read_payload(payload_path: Path) -> dict[str, Any] | None:
    """Load the runner's payload file.

    Returns:
        The decoded JSON object, or ``None`` if the file does not exist.

    Raises:
        ValidationError: If the file decodes to something other than an object.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    try:
        with payload_path.open("r", encoding="utf-8") as file:
            payload = json.load(file)
    except FileNotFoundError:
        return None
    if not isinstance(payload, dict):
        raise ValidationError("script payload must be a JSON object")
    return payload


def run_child(
    params: dict[str, Any],
    timeout_seconds: int,
    python_version: str,
    requirements: list[str],
    extra_env: dict[str, str],
) -> dict[str, Any]:
    """Run the user script in a child process and return the outcome mapping.

    The runner source and *params* are written to a temporary directory, the
    runner is launched through ``uv run``, and its output streams are
    captured while being echoed to stderr.  The returned mapping always has
    ``ok``, ``result``, ``stdout``, ``stderr`` and ``pythonVersion``, plus
    ``error`` on failure.  Failures reported this way are: a timeout, a
    missing payload file, an unreadable or malformed payload file, and a
    non-zero exit status despite an ``ok`` payload.

    Raises:
        ValidationError: If ``uv`` is not available (from ``build_command``).
    """
    with tempfile.TemporaryDirectory(prefix="dagu-python-script-") as temp_dir:
        root = Path(temp_dir)
        runner_path = root / "runner.py"
        params_path = root / "params.json"
        payload_path = root / "payload.json"
        runner_path.write_text(RUNNER_SOURCE, encoding="utf-8")
        params_path.write_text(
            json.dumps(params, ensure_ascii=False, allow_nan=False),
            encoding="utf-8",
        )

        command = build_command(python_version, requirements, runner_path, params_path, payload_path)
        process = start_process(command, build_child_env(extra_env))
        stdout_chunks: list[str] = []
        stderr_chunks: list[str] = []
        timed_out = False
        try:
            stdout_thread = threading.Thread(
                target=pump_stream, args=(process.stdout, stdout_chunks), daemon=True
            )
            stderr_thread = threading.Thread(
                target=pump_stream, args=(process.stderr, stderr_chunks), daemon=True
            )
            stdout_thread.start()
            stderr_thread.start()

            try:
                return_code = process.wait(timeout=timeout_seconds)
            except subprocess.TimeoutExpired:
                timed_out = True
                terminate_process_group(process)
                return_code = process.wait()

            # Bounded joins: the pipes may be held open by a process that
            # outlived the child, and the pump threads are daemons anyway.
            stdout_thread.join(timeout=5)
            stderr_thread.join(timeout=5)
        finally:
            # The child runs in its own session/group, so it would keep
            # running if we left here on an exception (e.g. an interrupt
            # while waiting).  This is a no-op once the child has exited.
            terminate_process_group(process)

        stdout = "".join(stdout_chunks)
        stderr = "".join(stderr_chunks)

        if timed_out:
            return {
                "ok": False,
                "result": None,
                "stdout": stdout,
                "stderr": stderr,
                "pythonVersion": python_version,
                "error": {
                    "name": "TimeoutError",
                    "message": f"script timed out after {timeout_seconds}s",
                },
            }

        try:
            payload = read_payload(payload_path)
        except (OSError, ValueError, ValidationError) as error:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            # Report the failure here so the captured output is not lost.
            return {
                "ok": False,
                "result": None,
                "stdout": stdout,
                "stderr": stderr,
                "pythonVersion": python_version,
                "error": error_payload(error),
            }

        if payload is None:
            return {
                "ok": False,
                "result": None,
                "stdout": stdout,
                "stderr": stderr,
                "pythonVersion": python_version,
                "error": {
                    "name": "RuntimeError",
                    "message": (
                        f"python script process exited with code {return_code} "
                        "without writing output"
                    ),
                    "stack": stderr,
                },
            }

        # A success payload combined with a failing exit status is treated as
        # a failure.
        if return_code != 0 and payload.get("ok") is True:
            payload = {
                "ok": False,
                "result": None,
                "pythonVersion": payload.get("pythonVersion", python_version),
                "error": {
                    "name": "RuntimeError",
                    "message": f"python script process exited with code {return_code}",
                    "stack": stderr,
                },
            }

        payload["stdout"] = stdout
        payload["stderr"] = stderr
        payload.setdefault("pythonVersion", python_version)
        return payload


def main() -> int:
    """Validate the parameters, run the script, and print the JSON outcome.

    Returns:
        0 when the outcome's ``ok`` field is ``True``, otherwise 1.
    """
    started_at = time.monotonic()
    try:
        params = parse_params()
        require_script(params)
        timeout_seconds = resolve_timeout_seconds(params.get("timeoutSeconds"))
        python_version = resolve_python_version(params.get("pythonVersion"))
        requirements = resolve_requirements(params.get("requirements"))
        extra_env = resolve_env(params.get("env"))
        output = run_child(params, timeout_seconds, python_version, requirements, extra_env)
    except BaseException as error:
        # Top-level boundary: any failure, interrupts included, is converted
        # into a JSON outcome so that a result line is always written.
        output = {
            "ok": False,
            "result": None,
            "stdout": "",
            "stderr": "",
            "pythonVersion": DEFAULT_PYTHON_VERSION,
            "error": error_payload(error),
        }
    output["durationMs"] = duration_ms(started_at)
    write_output(output)
    return 0 if output.get("ok") is True else 1


if __name__ == "__main__":
    raise SystemExit(main())