-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_python_script_test.py
More file actions
124 lines (99 loc) · 3.5 KB
/
run_python_script_test.py
File metadata and controls
124 lines (99 loc) · 3.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python3
from __future__ import annotations
import json
import os
import subprocess
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
RUNNER = ROOT / "scripts" / "run-python-script.py"
class RunPythonScriptTest(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
completed, payload = cls.run_action({
"script": "return None",
"timeoutSeconds": 120,
})
if completed.returncode != 0:
raise AssertionError(f"prewarm failed: {payload}")
@staticmethod
def run_action(params: dict) -> tuple[subprocess.CompletedProcess[str], dict]:
env = os.environ.copy()
env["DAG_PARAMS_JSON"] = json.dumps(params)
completed = subprocess.run(
[sys.executable, str(RUNNER)],
cwd=ROOT,
env=env,
text=True,
capture_output=True,
timeout=180,
)
payload = json.loads(completed.stdout)
return completed, payload
def test_returns_structured_output(self) -> None:
completed, payload = self.run_action({
"input": {"name": "dagu", "numbers": [2, 3, 5]},
"env": {"GREETING": "hello"},
"script": """
import os
print(os.environ["GREETING"], input["name"])
return {
"total": sum(input["numbers"]),
"message": f'{env["GREETING"]} {input["name"]}',
}
""",
})
self.assertEqual(completed.returncode, 0)
self.assertTrue(payload["ok"])
self.assertEqual(payload["result"], {"total": 10, "message": "hello dagu"})
self.assertEqual(payload["stdout"], "hello dagu\n")
self.assertIn("hello dagu", completed.stderr)
self.assertRegex(payload["pythonVersion"], r"^3\.13\.\d+$")
def test_supports_async_scripts(self) -> None:
completed, payload = self.run_action({
"script": """
import asyncio
await asyncio.sleep(0.01)
return {"done": True}
""",
})
self.assertEqual(completed.returncode, 0)
self.assertTrue(payload["ok"])
self.assertEqual(payload["result"], {"done": True})
def test_installs_requirements(self) -> None:
completed, payload = self.run_action({
"requirements": ["packaging==25.0"],
"timeoutSeconds": 120,
"script": """
from packaging.version import Version
return {"major": Version("2.3.4").major}
""",
})
self.assertEqual(completed.returncode, 0)
self.assertTrue(payload["ok"])
self.assertEqual(payload["result"], {"major": 2})
def test_returns_failure_payload_when_script_throws(self) -> None:
completed, payload = self.run_action({
"script": "raise ValueError('bad input')",
})
self.assertNotEqual(completed.returncode, 0)
self.assertFalse(payload["ok"])
self.assertEqual(payload["result"], None)
self.assertEqual(payload["error"]["name"], "ValueError")
self.assertEqual(payload["error"]["message"], "bad input")
self.assertIn("<dagu-python-script>", payload["error"]["stack"])
def test_enforces_timeout(self) -> None:
completed, payload = self.run_action({
"timeoutSeconds": 1,
"script": """
import time
while True:
time.sleep(1)
""",
})
self.assertNotEqual(completed.returncode, 0)
self.assertFalse(payload["ok"])
self.assertEqual(payload["error"]["name"], "TimeoutError")
if __name__ == "__main__":
unittest.main()