From dc5412608273e7d9ec78c3fe7bd77d66dc882488 Mon Sep 17 00:00:00 2001 From: zixingdeng Date: Tue, 25 Nov 2025 17:29:20 +0800 Subject: [PATCH 01/10] feat(reporter): refactor reporter implementation to use FileReporter - Update run.sh to use --file-report-mode flag - Replace Reporter with BaseReporter and FileReporter in collector.py - Replace Reporter with BaseReporter and FileReporter in executor.py - Add CHANGELOG.md to track changes --- pytest/CHANGELOG.md | 15 +++++++++++++++ pytest/script/linux/run.sh | 2 +- pytest/src/testsolar_pytestx/collector.py | 10 +++++----- pytest/src/testsolar_pytestx/executor.py | 9 +++++---- 4 files changed, 26 insertions(+), 10 deletions(-) create mode 100644 pytest/CHANGELOG.md diff --git a/pytest/CHANGELOG.md b/pytest/CHANGELOG.md new file mode 100644 index 0000000..6118249 --- /dev/null +++ b/pytest/CHANGELOG.md @@ -0,0 +1,15 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +### Changed +- Update file reporting mode in run script +- Refactor reporter implementation to use FileReporter instead of Reporter + +### Fixed +- Improve test case collection and execution with better file handling \ No newline at end of file diff --git a/pytest/script/linux/run.sh b/pytest/script/linux/run.sh index ec37d99..4d61e32 100644 --- a/pytest/script/linux/run.sh +++ b/pytest/script/linux/run.sh @@ -10,4 +10,4 @@ export PYTHONUNBUFFERED=1 export TESTSOLAR_TTP_LOADINSUBPROC=1 # 隔离环境 /usr/local/bin/testtools_sdk version -/usr/local/bin/testtools_sdk serve --tool pytest \ No newline at end of file +/usr/local/bin/testtools_sdk serve --tool pytest --file-report-mode \ No newline at end of file diff --git a/pytest/src/testsolar_pytestx/collector.py b/pytest/src/testsolar_pytestx/collector.py index 83ac916..b499094 100644 --- a/pytest/src/testsolar_pytestx/collector.py +++ b/pytest/src/testsolar_pytestx/collector.py @@ -15,7 +15,7 @@ from testsolar_testtool_sdk.model.load import LoadResult, LoadError from testsolar_testtool_sdk.model.param import EntryParam from testsolar_testtool_sdk.model.test import TestCase -from testsolar_testtool_sdk.reporter import Reporter +from testsolar_testtool_sdk.reporter import BaseReporter, FileReporter from .converter import selector_to_pytest, pytest_to_selector, CASE_DRIVE_SEPARATOR from .filter import filter_invalid_selector_path @@ -25,10 +25,10 @@ class PytestCollector: - def __init__(self, pipe_io: Optional[BinaryIO] = None): + def __init__(self, report_file_path: Path): self.collected: List[Item] = [] self.errors: Dict[str, str] = {} - self.reporter: Reporter = Reporter(pipe_io=pipe_io) + self.reporter: BaseReporter = FileReporter(report_file_path) def pytest_collection_modifyitems(self, items: Sequence[Union[Item, Collector]]) -> None: for item in items: @@ -105,7 +105,7 @@ def collect_testcases( testcase_list = [os.path.join(entry_param.ProjectPath, it) for it in pytest_paths if it] - my_plugin = PytestCollector(pipe_io) + my_plugin = PytestCollector(Path(entry_param.FileReportPath)) args = [ f"--rootdir={entry_param.ProjectPath}", "--collect-only", @@ -150,7 +150,7 @@ def collect_testcases( print(f"[Load] collect testcase count: {len(load_result.Tests)}") print(f"[Load] collect load error count: {len(load_result.LoadErrors)}") - reporter = Reporter(pipe_io=pipe_io) + reporter = FileReporter(Path(entry_param.FileReportPath)) reporter.report_load_result(load_result) diff --git a/pytest/src/testsolar_pytestx/executor.py b/pytest/src/testsolar_pytestx/executor.py index a5fda0b..00bea78 100644 --- a/pytest/src/testsolar_pytestx/executor.py +++ b/pytest/src/testsolar_pytestx/executor.py @@ -1,4 +1,5 @@ import os +from pathlib import Path import sys from datetime import datetime, timedelta from typing import BinaryIO, Optional, Dict, Any, List, Callable @@ -18,7 +19,7 @@ ResultType, TestCaseStep, ) -from testsolar_testtool_sdk.reporter import Reporter +from testsolar_testtool_sdk.reporter import BaseReporter, FileReporter from enum import Enum from .case_log import gen_logs @@ -46,11 +47,11 @@ class RunMode(Enum): class PytestExecutor: def __init__( self, - reporter: Reporter, + reporter: BaseReporter, comment_fields: Optional[List[str]] = None, data_drive_key: Optional[str] = None, ) -> None: - self.reporter: Reporter = reporter + self.reporter: BaseReporter = reporter self.testcase_count = 0 self.testdata: Dict[str, TestResult] = {} self.skipped_testcase: Dict[str, str] = {} @@ -265,7 +266,7 @@ def run_testcases( append_extra_args(args) - reporter: Reporter = Reporter(pipe_io=pipe_io) + reporter: BaseReporter = FileReporter(report_path=Path(entry.FileReportPath)) exit_code = 0 captured_stderr = "" if run_mode == RunMode.SINGLE: From b3231b5f90942c36923e4f668845ec67ad1c2eaa Mon Sep 17 00:00:00 2001 From: zixingdeng Date: Tue, 25 Nov 2025 17:30:37 +0800 Subject: [PATCH 02/10] chore: add .vscode/ to gitignore --- pytest/.gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pytest/.gitignore b/pytest/.gitignore index aa9a7a8..7bd7ad1 100644 --- a/pytest/.gitignore +++ b/pytest/.gitignore @@ -161,4 +161,5 @@ cython_debug/ # option (not recommended) you can uncomment the following to ignore the entire idea folder. .idea/ -testdata/allure_results/* \ No newline at end of file +testdata/allure_results/*.vscode/ +.vscode/ From 0a0db5306a6926ba556d7e368aa65e7a24656823 Mon Sep 17 00:00:00 2001 From: zixingdeng Date: Wed, 26 Nov 2025 12:05:10 +0800 Subject: [PATCH 03/10] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=8D=95=E5=85=83?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytest/src/testsolar_pytestx/parser.py | 2 +- pytest/tests/test_collector.py | 446 +++++++++++------------ pytest/tests/test_executor.py | 481 ++++++++++++------------- pytest/tests/test_load.py | 58 +-- pytest/tests/test_run.py | 122 ++++--- pytest/tests/test_run_allure.py | 58 +-- pytest/uv.lock | 5 +- 7 files changed, 606 insertions(+), 566 deletions(-) diff --git a/pytest/src/testsolar_pytestx/parser.py b/pytest/src/testsolar_pytestx/parser.py index 04ebfa7..6d4d81e 100644 --- a/pytest/src/testsolar_pytestx/parser.py +++ b/pytest/src/testsolar_pytestx/parser.py @@ -46,7 +46,7 @@ def parse_case_attributes(item: Item, comment_fields: Optional[List[str]] = None for data_name in mark.args[0]: if data_name == case_data_name: attributes["coding_testcase_id"] = mark.args[0][data_name] - + attributes["tags"] = json.dumps(tags) return attributes diff --git a/pytest/tests/test_collector.py b/pytest/tests/test_collector.py index 7feeb42..e4df2f2 100644 --- a/pytest/tests/test_collector.py +++ b/pytest/tests/test_collector.py @@ -1,11 +1,11 @@ -import io +import tempfile import unittest from pathlib import Path from typing import Dict, List from testsolar_testtool_sdk.model.param import EntryParam from testsolar_testtool_sdk.model.load import LoadResult -from testsolar_testtool_sdk.pipe_reader import read_load_result +from testsolar_testtool_sdk.file_reader import read_file_load_result from src.testsolar_pytestx.collector import collect_testcases @@ -14,238 +14,240 @@ class CollectorTest(unittest.TestCase): testdata_dir: str = str(Path(__file__).parent.parent.absolute().joinpath("testdata")) def test_collect_testcases_when_selector_is_valid(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=self.testdata_dir, - TestSelectors=[ - "test_normal_case.py?test_success", - "aa/bb/cc/test_in_sub_class.py", - "test_data_drive.py", + with tempfile.TemporaryDirectory() as tmpdir: + report_file = Path(tmpdir) / "result.json" + entry = EntryParam( + TaskId="aa", + ProjectPath=self.testdata_dir, + TestSelectors=[ + "test_normal_case.py?test_success", + "aa/bb/cc/test_in_sub_class.py", + "test_data_drive.py", + "errors/test_import_error.py", + "errors/test_load_error.py", + ], + FileReportPath=str(report_file), + ) + + collect_testcases(entry) + + re = read_file_load_result(report_file) + + self.assertEqual(len(re.Tests), 6) + self.assertEqual(len(re.LoadErrors), 2) + re.Tests.sort(key=lambda x: x.Name) + re.LoadErrors.sort(key=lambda x: x.name) + self.assertEqual(re.Tests[0].Name, "aa/bb/cc/test_in_sub_class.py?TestCompute/test_add") + self.assertEqual(re.Tests[1].Name, "test_data_drive.py?test_eval/[2+4-6]") + self.assertEqual(re.Tests[2].Name, "test_data_drive.py?test_eval/[3+5-8]") + self.assertEqual(re.Tests[3].Name, "test_data_drive.py?test_eval/[6*9-42]") + self.assertEqual( + re.Tests[4].Name, + "test_data_drive.py?test_special_data_drive_name/[中文-分号+[id:32]]", + ) + + self.assertEqual(re.Tests[5].Name, "test_normal_case.py?test_success") + self.assertEqual(re.Tests[5].Attributes["owner"], "foo") + self.assertEqual(re.Tests[5].Attributes["description"], "测试获取答案") + self.assertEqual(re.Tests[5].Attributes["tags"], '["high"]') + self.assertEqual(re.Tests[5].Attributes["extra_attributes"], '[{"env": ["AA", "BB"]}]') + + self.assertEqual( + re.LoadErrors[0].name, "errors/test_import_error.py", - "errors/test_load_error.py", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - collect_testcases(entry, pipe_io) - pipe_io.seek(0) - - re = read_load_result(pipe_io) - - self.assertEqual(len(re.Tests), 6) - self.assertEqual(len(re.LoadErrors), 2) - re.Tests.sort(key=lambda x: x.Name) - re.LoadErrors.sort(key=lambda x: x.name) - self.assertEqual(re.Tests[0].Name, "aa/bb/cc/test_in_sub_class.py?TestCompute/test_add") - self.assertEqual(re.Tests[1].Name, "test_data_drive.py?test_eval/[2+4-6]") - self.assertEqual(re.Tests[2].Name, "test_data_drive.py?test_eval/[3+5-8]") - self.assertEqual(re.Tests[3].Name, "test_data_drive.py?test_eval/[6*9-42]") - self.assertEqual( - re.Tests[4].Name, - "test_data_drive.py?test_special_data_drive_name/[中文-分号+[id:32]]", - ) - - self.assertEqual(re.Tests[5].Name, "test_normal_case.py?test_success") - self.assertEqual(re.Tests[5].Attributes["owner"], "foo") - self.assertEqual(re.Tests[5].Attributes["description"], "测试获取答案") - self.assertEqual(re.Tests[5].Attributes["tags"], '["high"]') - self.assertEqual(re.Tests[5].Attributes["extra_attributes"], '[{"env": ["AA", "BB"]}]') - - self.assertEqual( - re.LoadErrors[0].name, - "errors/test_import_error.py", - ) - self.assertIn( - "ModuleNotFoundError: No module named 'bad_import'", - re.LoadErrors[0].message, - ) - self.assertEqual(re.LoadErrors[1].name, "errors/test_load_error.py") - self.assertIn("SyntaxError: ", re.LoadErrors[1].message) + ) + self.assertIn( + "ModuleNotFoundError: No module named 'bad_import'", + re.LoadErrors[0].message, + ) + self.assertEqual(re.LoadErrors[1].name, "errors/test_load_error.py") + self.assertIn("SyntaxError: ", re.LoadErrors[1].message) def test_collect_testcases_when_select_not_valid(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=self.testdata_dir, - TestSelectors=[ - "test_data_drive.py", - "test_not_exist.py", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - collect_testcases(entry, pipe_io) - pipe_io.seek(0) - - re = read_load_result(pipe_io) - re.Tests.sort(key=lambda x: x.Name) - re.LoadErrors.sort(key=lambda x: x.name) - self.assertEqual(len(re.Tests), 4) - self.assertEqual(len(re.LoadErrors), 1) - self.assertIn("test_not_exist.py does not exist, SKIP it", re.LoadErrors[0].message) + with tempfile.TemporaryDirectory() as tmpdir: + report_file = Path(tmpdir) / "result.json" + entry = EntryParam( + TaskId="aa", + ProjectPath=self.testdata_dir, + TestSelectors=[ + "test_data_drive.py", + "test_not_exist.py", + ], + FileReportPath=str(report_file), + ) + + collect_testcases(entry) + + re = read_file_load_result(report_file) + re.Tests.sort(key=lambda x: x.Name) + re.LoadErrors.sort(key=lambda x: x.name) + self.assertEqual(len(re.Tests), 4) + self.assertEqual(len(re.LoadErrors), 1) + self.assertIn("test_not_exist.py does not exist, SKIP it", re.LoadErrors[0].message) def test_collect_testcases_with_utf8_chars(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=self.testdata_dir, - TestSelectors=[ - "test_data_drive_zh_cn.py", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - collect_testcases(entry, pipe_io) - pipe_io.seek(0) - - re = read_load_result(pipe_io) - re.Tests.sort(key=lambda x: x.Name) - re.LoadErrors.sort(key=lambda x: x.name) - self.assertEqual(len(re.Tests), 3) - self.assertEqual(len(re.LoadErrors), 0) - - self.assertEqual( - re.Tests[0].Name, - "test_data_drive_zh_cn.py?test_include/[#?-#?^$%!/]", - ) - self.assertEqual( - re.Tests[1].Name, - "test_data_drive_zh_cn.py?test_include/[中文-中文汉字]", - ) - self.assertEqual( - re.Tests[2].Name, - "test_data_drive_zh_cn.py?test_include/[파일을 찾을 수 없습니다-ファイルが見つかりません]", - ) + with tempfile.TemporaryDirectory() as tmpdir: + report_file = Path(tmpdir) / "result.json" + entry = EntryParam( + TaskId="aa", + ProjectPath=self.testdata_dir, + TestSelectors=[ + "test_data_drive_zh_cn.py", + ], + FileReportPath=str(report_file), + ) + + collect_testcases(entry) + + re = read_file_load_result(report_file) + re.Tests.sort(key=lambda x: x.Name) + re.LoadErrors.sort(key=lambda x: x.name) + self.assertEqual(len(re.Tests), 3) + self.assertEqual(len(re.LoadErrors), 0) + + self.assertEqual( + re.Tests[0].Name, + "test_data_drive_zh_cn.py?test_include/[#?-#?^$%!/]", + ) + self.assertEqual( + re.Tests[1].Name, + "test_data_drive_zh_cn.py?test_include/[中文-中文汉字]", + ) + self.assertEqual( + re.Tests[2].Name, + "test_data_drive_zh_cn.py?test_include/[파일을 찾을 수 없습니다-ファイルが見つかりません]", + ) def test_collect_testcases_with_case_drive_separator(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=self.testdata_dir, - TestSelectors=[ - "test_normal_case.py?test_success→压缩机测试", - "test_normal_case.py?test_success→解压机测试", - "test_normal_case.py?test_success→循环机测试", - ], - FileReportPath="", - ) - - case_records = {} - - def loader_extend(param_1: str, param_2: LoadResult, param_3: Dict[str, List[str]]) -> None: - case_records.update(param_3) - - pipe_io = io.BytesIO() - collect_testcases(entry, pipe_io, extra_load_function=loader_extend) - pipe_io.seek(0) - - re = read_load_result(pipe_io) - re.Tests.sort(key=lambda x: x.Name) - re.LoadErrors.sort(key=lambda x: x.name) - self.assertEqual(len(re.Tests), 1) - self.assertEqual(len(re.LoadErrors), 0) - - self.assertEqual(re.Tests[0].Name, "test_normal_case.py?test_success") - - self.assertEqual(len(case_records), 1) - self.assertIn("test_normal_case.py?test_success", case_records) - - records = case_records["test_normal_case.py?test_success"] - self.assertEqual(len(records), 3) - self.assertEqual(records[0], "压缩机测试") - self.assertEqual(records[1], "解压机测试") - self.assertEqual(records[2], "循环机测试") + with tempfile.TemporaryDirectory() as tmpdir: + report_file = Path(tmpdir) / "result.json" + entry = EntryParam( + TaskId="aa", + ProjectPath=self.testdata_dir, + TestSelectors=[ + "test_normal_case.py?test_success→压缩机测试", + "test_normal_case.py?test_success→解压机测试", + "test_normal_case.py?test_success→循环机测试", + ], + FileReportPath=str(report_file), + ) + + case_records = {} + + def loader_extend( + param_1: str, param_2: LoadResult, param_3: Dict[str, List[str]] + ) -> None: + case_records.update(param_3) + + collect_testcases(entry, extra_load_function=loader_extend) + + re = read_file_load_result(report_file) + re.Tests.sort(key=lambda x: x.Name) + re.LoadErrors.sort(key=lambda x: x.name) + self.assertEqual(len(re.Tests), 1) + self.assertEqual(len(re.LoadErrors), 0) + + self.assertEqual(re.Tests[0].Name, "test_normal_case.py?test_success") + + self.assertEqual(len(case_records), 1) + self.assertIn("test_normal_case.py?test_success", case_records) + + records = case_records["test_normal_case.py?test_success"] + self.assertEqual(len(records), 3) + self.assertEqual(records[0], "压缩机测试") + self.assertEqual(records[1], "解压机测试") + self.assertEqual(records[2], "循环机测试") def test_collect_testcases_when_testcase_not_exist(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=self.testdata_dir, - TestSelectors=[ + with tempfile.TemporaryDirectory() as tmpdir: + report_file = Path(tmpdir) / "result.json" + entry = EntryParam( + TaskId="aa", + ProjectPath=self.testdata_dir, + TestSelectors=[ + "test_normal_case.py?name=not_exist", + ], + FileReportPath=str(report_file), + ) + + collect_testcases(entry) + + re = read_file_load_result(report_file) + re.Tests.sort(key=lambda x: x.Name) + re.LoadErrors.sort(key=lambda x: x.name) + self.assertEqual(len(re.LoadErrors), 1) + + self.assertEqual( + re.LoadErrors[0].name, "test_normal_case.py?name=not_exist", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - collect_testcases(entry, pipe_io) - pipe_io.seek(0) - - re = read_load_result(pipe_io) - re.Tests.sort(key=lambda x: x.Name) - re.LoadErrors.sort(key=lambda x: x.name) - self.assertEqual(len(re.LoadErrors), 1) - - self.assertEqual( - re.LoadErrors[0].name, - "test_normal_case.py?name=not_exist", - ) + ) def test_collect_testcases_with_skipp_error(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=self.testdata_dir, - TestSelectors=[ - "test_normal_case.py", - "test_skipped_error.py", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - collect_testcases(entry, pipe_io) - pipe_io.seek(0) - - re = read_load_result(pipe_io) - re.Tests.sort(key=lambda x: x.Name) - re.LoadErrors.sort(key=lambda x: x.name) - self.assertEqual(len(re.Tests), 3) - self.assertEqual(len(re.LoadErrors), 1) + with tempfile.TemporaryDirectory() as tmpdir: + report_file = Path(tmpdir) / "result.json" + entry = EntryParam( + TaskId="aa", + ProjectPath=self.testdata_dir, + TestSelectors=[ + "test_normal_case.py", + "test_skipped_error.py", + ], + FileReportPath=str(report_file), + ) + + collect_testcases(entry) + + re = read_file_load_result(report_file) + re.Tests.sort(key=lambda x: x.Name) + re.LoadErrors.sort(key=lambda x: x.name) + self.assertEqual(len(re.Tests), 3) + self.assertEqual(len(re.LoadErrors), 1) def test_collect_testcases_with_emoji(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=self.testdata_dir, - TestSelectors=[ - "test_emoji_data_drive.py", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - collect_testcases(entry, pipe_io) - pipe_io.seek(0) - - re = read_load_result(pipe_io) - re.Tests.sort(key=lambda x: x.Name) - re.LoadErrors.sort(key=lambda x: x.name) - self.assertEqual(len(re.Tests), 1) - self.assertEqual(len(re.LoadErrors), 0) - self.assertEqual( - re.Tests[0].Name, - "test_emoji_data_drive.py?test_emoji_data_drive_name/[😄]", - ) - + with tempfile.TemporaryDirectory() as tmpdir: + report_file = Path(tmpdir) / "result.json" + entry = EntryParam( + TaskId="aa", + ProjectPath=self.testdata_dir, + TestSelectors=[ + "test_emoji_data_drive.py", + ], + FileReportPath=str(report_file), + ) + + collect_testcases(entry) + + re = read_file_load_result(report_file) + re.Tests.sort(key=lambda x: x.Name) + re.LoadErrors.sort(key=lambda x: x.name) + self.assertEqual(len(re.Tests), 1) + self.assertEqual(len(re.LoadErrors), 0) + self.assertEqual( + re.Tests[0].Name, + "test_emoji_data_drive.py?test_emoji_data_drive_name/[😄]", + ) + def test_collect_testcases_with_coding_testcase_id(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=self.testdata_dir, - TestSelectors=[ - "test_coding_id.py", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - collect_testcases(entry, pipe_io) - pipe_io.seek(0) - - re = read_load_result(pipe_io) - re.Tests.sort(key=lambda x: x.Name) - re.LoadErrors.sort(key=lambda x: x.name) - self.assertEqual(len(re.Tests), 3) - self.assertEqual(len(re.LoadErrors), 0) - self.assertEqual( - re.Tests[0].Name, - 'test_coding_id.py?test_eval/[2+4-6]', - ) - self.assertEqual(re.Tests[1].Attributes['coding_testcase_id'], '789') + with tempfile.TemporaryDirectory() as tmpdir: + report_file = Path(tmpdir) / "result.json" + entry = EntryParam( + TaskId="aa", + ProjectPath=self.testdata_dir, + TestSelectors=[ + "test_coding_id.py", + ], + FileReportPath=str(report_file), + ) + + collect_testcases(entry) + + re = read_file_load_result(report_file) + re.Tests.sort(key=lambda x: x.Name) + re.LoadErrors.sort(key=lambda x: x.name) + self.assertEqual(len(re.Tests), 3) + self.assertEqual(len(re.LoadErrors), 0) + self.assertEqual( + re.Tests[0].Name, + "test_coding_id.py?test_eval/[2+4-6]", + ) + self.assertEqual(re.Tests[1].Attributes["coding_testcase_id"], "789") diff --git a/pytest/tests/test_executor.py b/pytest/tests/test_executor.py index beadf32..d65a8b7 100644 --- a/pytest/tests/test_executor.py +++ b/pytest/tests/test_executor.py @@ -1,4 +1,4 @@ -import io +import tempfile import unittest from datetime import datetime, timedelta from pathlib import Path @@ -6,7 +6,8 @@ from testsolar_testtool_sdk.model.param import EntryParam from testsolar_testtool_sdk.model.testresult import ResultType, LogLevel -from testsolar_testtool_sdk.pipe_reader import read_test_result +from testsolar_testtool_sdk.file_reader import read_file_test_result +from testsolar_testtool_sdk.model.test import TestCase from src.testsolar_pytestx.executor import run_testcases, append_extra_args @@ -20,228 +21,217 @@ class ExecutorTest(unittest.TestCase): testdata_dir = Path(__file__).parent.parent.absolute().joinpath("testdata") def test_run_success_testcase_with_logs(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=str(self.testdata_dir), - TestSelectors=[ - "test_normal_case.py?name=test_success&tag=A&priority=High", - ], - FileReportPath="", - ) - - current_time = datetime.utcnow() - - pipe_io = io.BytesIO() - run_testcases(entry, pipe_io) - pipe_io.seek(0) - - start = read_test_result(pipe_io) - self.assertEqual(start.ResultType, ResultType.RUNNING) - - end = read_test_result(pipe_io) - self.assertEqual(end.Test.Name, "test_normal_case.py?test_success") - self.assertEqual(end.Test.Attributes["tags"], '["high"]') - self.assertEqual(end.Test.Attributes["owner"], "foo") - elapse: timedelta = convert_to_datetime(str(end.StartTime)) - current_time - self.assertLess(elapse.total_seconds(), 0.2) - elapse_end: timedelta = convert_to_datetime(str(end.EndTime)) - current_time - self.assertLess(elapse_end.total_seconds(), 0.2) - self.assertGreater(elapse_end.total_seconds(), 0) - self.assertEqual(end.ResultType, ResultType.SUCCEED) - - self.assertEqual(len(end.Steps), 3) - - # 检查Setup的时间是否符合要求 - step1 = end.Steps[0] - self.assertEqual(step1.Title, "Setup") - elapse = convert_to_datetime(str(step1.StartTime)) - current_time - self.assertLess(elapse.total_seconds(), 0.2) - self.assertGreater(elapse.total_seconds(), 0) - elapse = convert_to_datetime(str(step1.EndTime)) - current_time - self.assertLess(elapse.total_seconds(), 0.2) - self.assertGreater(elapse.total_seconds(), 0) - - # 检查Log的时间是否符合要求 - self.assertEqual(len(step1.Logs), 1) - self.assertEqual(step1.ResultType, ResultType.SUCCEED) - log = step1.Logs[0] - self.assertEqual(log.Level, LogLevel.INFO) - self.assertIn("this is setup", log.Content) - elapse = convert_to_datetime(str(log.Time)) - current_time - self.assertGreater(elapse.total_seconds(), 0) - self.assertLess(elapse.total_seconds(), 0.2) - - # 检查 Run TestCase 时间是否符合要求 - step2 = end.Steps[1] - self.assertEqual(step2.Title, "Run TestCase") - elapse = convert_to_datetime(str(step2.StartTime)) - current_time - self.assertLess(elapse.total_seconds(), 0.2) - self.assertGreater(elapse.total_seconds(), 0) - elapse = convert_to_datetime(str(step2.EndTime)) - current_time - self.assertLess(elapse.total_seconds(), 0.2) - self.assertGreater(elapse.total_seconds(), 0) - - self.assertEqual(len(step2.Logs), 1) - self.assertEqual(step2.Logs[0].Level, LogLevel.INFO) - self.assertEqual(step2.ResultType, ResultType.SUCCEED) - self.assertIn("this is print sample output", step2.Logs[0].Content) - - # 检查 Teardown 是否是否符合要求 - step3 = end.Steps[2] - self.assertEqual(step3.Title, "Teardown") - self.assertEqual(len(step3.Logs), 1) - self.assertEqual(step3.Logs[0].Level, LogLevel.INFO) - self.assertEqual(step3.ResultType, ResultType.SUCCEED) - self.assertEqual( - step3.Logs[0].Content, - """this is setup + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) + entry = EntryParam( + TaskId="aa", + ProjectPath=str(self.testdata_dir), + TestSelectors=[ + "test_normal_case.py?name=test_success&tag=A&priority=High", + ], + FileReportPath=str(report_dir), + ) + + current_time = datetime.utcnow() + + run_testcases(entry) + + test_case = TestCase(Name="test_normal_case.py?test_success", Attributes={}) + end = read_file_test_result(report_dir, test_case) + self.assertEqual(end.Test.Name, "test_normal_case.py?test_success") + self.assertEqual(end.Test.Attributes["tags"], '["high"]') + self.assertEqual(end.Test.Attributes["owner"], "foo") + elapse: timedelta = convert_to_datetime(str(end.StartTime)) - current_time + self.assertLess(elapse.total_seconds(), 0.2) + elapse_end: timedelta = convert_to_datetime(str(end.EndTime)) - current_time + self.assertLess(elapse_end.total_seconds(), 0.2) + self.assertGreater(elapse_end.total_seconds(), 0) + self.assertEqual(end.ResultType, ResultType.SUCCEED) + + self.assertEqual(len(end.Steps), 3) + + # 检查Setup的时间是否符合要求 + step1 = end.Steps[0] + self.assertEqual(step1.Title, "Setup") + elapse = convert_to_datetime(str(step1.StartTime)) - current_time + self.assertLess(elapse.total_seconds(), 0.2) + self.assertGreater(elapse.total_seconds(), 0) + elapse = convert_to_datetime(str(step1.EndTime)) - current_time + self.assertLess(elapse.total_seconds(), 0.2) + self.assertGreater(elapse.total_seconds(), 0) + + # 检查Log的时间是否符合要求 + self.assertEqual(len(step1.Logs), 1) + self.assertEqual(step1.ResultType, ResultType.SUCCEED) + log = step1.Logs[0] + self.assertEqual(log.Level, LogLevel.INFO) + self.assertIn("this is setup", log.Content) + elapse = convert_to_datetime(str(log.Time)) - current_time + self.assertGreater(elapse.total_seconds(), 0) + self.assertLess(elapse.total_seconds(), 0.2) + + # 检查 Run TestCase 时间是否符合要求 + step2 = end.Steps[1] + self.assertEqual(step2.Title, "Run TestCase") + elapse = convert_to_datetime(str(step2.StartTime)) - current_time + self.assertLess(elapse.total_seconds(), 0.2) + self.assertGreater(elapse.total_seconds(), 0) + elapse = convert_to_datetime(str(step2.EndTime)) - current_time + self.assertLess(elapse.total_seconds(), 0.2) + self.assertGreater(elapse.total_seconds(), 0) + + self.assertEqual(len(step2.Logs), 1) + self.assertEqual(step2.Logs[0].Level, LogLevel.INFO) + self.assertEqual(step2.ResultType, ResultType.SUCCEED) + self.assertIn("this is print sample output", step2.Logs[0].Content) + + # 检查 Teardown 是否是否符合要求 + step3 = end.Steps[2] + self.assertEqual(step3.Title, "Teardown") + self.assertEqual(len(step3.Logs), 1) + self.assertEqual(step3.Logs[0].Level, LogLevel.INFO) + self.assertEqual(step3.ResultType, ResultType.SUCCEED) + self.assertEqual( + step3.Logs[0].Content, + """this is setup this is print sample output this is teardown """, - ) - elapse = convert_to_datetime(str(step3.StartTime)) - current_time - self.assertLess(elapse.total_seconds(), 0.2) - self.assertGreater(elapse.total_seconds(), 0) - elapse = convert_to_datetime(str(step3.EndTime)) - current_time - self.assertLess(elapse.total_seconds(), 0.2) - self.assertGreater(elapse.total_seconds(), 0) + ) + elapse = convert_to_datetime(str(step3.StartTime)) - current_time + self.assertLess(elapse.total_seconds(), 0.2) + self.assertGreater(elapse.total_seconds(), 0) + elapse = convert_to_datetime(str(step3.EndTime)) - current_time + self.assertLess(elapse.total_seconds(), 0.2) + self.assertGreater(elapse.total_seconds(), 0) def test_run_success_testcase_with_one_invalid_selector(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=str(self.testdata_dir), - TestSelectors=[ - "test_normal_case.py?name=test_success", - "test_invalid_case.py?test_success", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - run_testcases(entry, pipe_io) - pipe_io.seek(0) - - start = read_test_result(pipe_io) - self.assertEqual(start.ResultType, ResultType.RUNNING) + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) + entry = EntryParam( + TaskId="aa", + ProjectPath=str(self.testdata_dir), + TestSelectors=[ + "test_normal_case.py?name=test_success", + "test_invalid_case.py?test_success", + ], + FileReportPath=str(report_dir), + ) + + run_testcases(entry) def test_run_failed_testcase_with_log(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=str(self.testdata_dir), - TestSelectors=[ - "test_normal_case.py?test_failed&priority=High", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - run_testcases(entry, pipe_io) - pipe_io.seek(0) - - start = read_test_result(pipe_io) - self.assertEqual(start.ResultType, ResultType.RUNNING) - - end = read_test_result(pipe_io) - self.assertEqual(end.ResultType, ResultType.FAILED) - self.assertEqual(len(end.Steps), 3) - self.assertIn("testdata/test_normal_case.py", end.Message) - - step2 = end.Steps[1] - self.assertEqual(len(step2.Logs), 1) - self.assertEqual(step2.Logs[0].Level, LogLevel.ERROR) - self.assertEqual(step2.ResultType, ResultType.FAILED) - self.assertIn("E assert 4 == 6", step2.Logs[0].Content) + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) + entry = EntryParam( + TaskId="aa", + ProjectPath=str(self.testdata_dir), + TestSelectors=[ + "test_normal_case.py?test_failed&priority=High", + ], + FileReportPath=str(report_dir), + ) + + run_testcases(entry) + + test_case = TestCase(Name="test_normal_case.py?test_failed", Attributes={}) + end = read_file_test_result(report_dir, test_case) + self.assertEqual(end.ResultType, ResultType.FAILED) + self.assertEqual(len(end.Steps), 3) + self.assertIn("testdata/test_normal_case.py", end.Message) + + step2 = end.Steps[1] + self.assertEqual(len(step2.Logs), 1) + self.assertEqual(step2.Logs[0].Level, LogLevel.ERROR) + self.assertEqual(step2.ResultType, ResultType.FAILED) + self.assertIn("E assert 4 == 6", step2.Logs[0].Content) def test_run_failed_testcase_with_raise_error(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=str(self.testdata_dir), - TestSelectors=[ - "test_normal_case.py?test_raise_error", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - run_testcases(entry, pipe_io) - pipe_io.seek(0) - - start = read_test_result(pipe_io) - self.assertEqual(start.ResultType, ResultType.RUNNING) - - end = read_test_result(pipe_io) - self.assertEqual(end.ResultType, ResultType.FAILED) - self.assertEqual(len(end.Steps), 3) - - step2 = end.Steps[1] - self.assertEqual(len(step2.Logs), 1) - self.assertEqual(step2.Logs[0].Level, LogLevel.ERROR) - self.assertEqual(step2.ResultType, ResultType.FAILED) - self.assertIn("E RuntimeError: this is raise runtime error", step2.Logs[0].Content) + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) + entry = EntryParam( + TaskId="aa", + ProjectPath=str(self.testdata_dir), + TestSelectors=[ + "test_normal_case.py?test_raise_error", + ], + FileReportPath=str(report_dir), + ) + + run_testcases(entry) + + test_case = TestCase(Name="test_normal_case.py?test_raise_error", Attributes={}) + end = read_file_test_result(report_dir, test_case) + self.assertEqual(end.ResultType, ResultType.FAILED) + self.assertEqual(len(end.Steps), 3) + + step2 = end.Steps[1] + self.assertEqual(len(step2.Logs), 1) + self.assertEqual(step2.Logs[0].Level, LogLevel.ERROR) + self.assertEqual(step2.ResultType, ResultType.FAILED) + self.assertIn( + "E RuntimeError: this is raise runtime error", step2.Logs[0].Content + ) def test_run_skipped_testcase(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=str(self.testdata_dir), - TestSelectors=[ - "test_skipped.py?test_filtered", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - run_testcases(entry, pipe_io) - pipe_io.seek(0) - - start = read_test_result(pipe_io) - self.assertEqual(start.ResultType, ResultType.RUNNING) - - end = read_test_result(pipe_io) - self.assertEqual(end.ResultType, ResultType.IGNORED) - self.assertEqual(len(end.Steps), 2) - self.assertEqual(end.Message, "Skipped: no way of currently testing this") + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) + entry = EntryParam( + TaskId="aa", + ProjectPath=str(self.testdata_dir), + TestSelectors=[ + "test_skipped.py?test_filtered", + ], + FileReportPath=str(report_dir), + ) + + run_testcases(entry) + + test_case = TestCase(Name="test_skipped.py?test_filtered", Attributes={}) + end = read_file_test_result(report_dir, test_case) + self.assertEqual(end.ResultType, ResultType.IGNORED) + self.assertEqual(len(end.Steps), 2) + self.assertEqual(end.Message, "Skipped: no way of currently testing this") def test_run_datadrive_with_single_value(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=str(self.testdata_dir), - TestSelectors=[ - "test_data_drive.py?test_eval/[2+4-6]", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - run_testcases(entry, pipe_io) - pipe_io.seek(0) - - start = read_test_result(pipe_io) - self.assertEqual(start.ResultType, ResultType.RUNNING) - - end = read_test_result(pipe_io) - self.assertEqual(end.ResultType, ResultType.SUCCEED) - self.assertEqual(len(end.Steps), 3) + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) + entry = EntryParam( + TaskId="aa", + ProjectPath=str(self.testdata_dir), + TestSelectors=[ + "test_data_drive.py?test_eval/[2+4-6]", + ], + FileReportPath=str(report_dir), + ) + + run_testcases(entry) + + test_case = TestCase(Name="test_data_drive.py?test_eval/[2+4-6]", Attributes={}) + end = read_file_test_result(report_dir, test_case) + self.assertEqual(end.ResultType, ResultType.SUCCEED) + self.assertEqual(len(end.Steps), 3) def test_run_datadrive_with_utf8_str(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=str(self.testdata_dir), - TestSelectors=[ - "test_data_drive_zh_cn.py?test_include/[中文-中文汉字]", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - run_testcases(entry, pipe_io) - pipe_io.seek(0) - - start = read_test_result(pipe_io) - self.assertEqual(start.ResultType, ResultType.RUNNING) - - end = read_test_result(pipe_io) - self.assertEqual(end.ResultType, ResultType.SUCCEED) - self.assertEqual(len(end.Steps), 3) + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) + entry = EntryParam( + TaskId="aa", + ProjectPath=str(self.testdata_dir), + TestSelectors=[ + "test_data_drive_zh_cn.py?test_include/[中文-中文汉字]", + ], + FileReportPath=str(report_dir), + ) + + run_testcases(entry) + + test_case = TestCase( + Name="test_data_drive_zh_cn.py?test_include/[中文-中文汉字]", Attributes={} + ) + end = read_file_test_result(report_dir, test_case) + self.assertEqual(end.ResultType, ResultType.SUCCEED) + self.assertEqual(len(end.Steps), 3) def test_split_args_with_space(self): args = [] @@ -262,40 +252,43 @@ def test_split_args_with_space(self): self.assertEqual(args[5], "fast iu897 nuh") def test_run_not_exist_selector(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=str(self.testdata_dir), - TestSelectors=[ - "test_normal_case.py?name=not_exist", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - run_testcases(entry, pipe_io) - pipe_io.seek(0) - - result = read_test_result(pipe_io) - self.assertEqual(result.ResultType, ResultType.FAILED) - self.assertEqual(result.Test.Name, "test_normal_case.py?name=not_exist") + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) + entry = EntryParam( + TaskId="aa", + ProjectPath=str(self.testdata_dir), + TestSelectors=[ + "test_normal_case.py?name=not_exist", + ], + FileReportPath=str(report_dir), + ) + + run_testcases(entry) + + test_case = TestCase(Name="test_normal_case.py?name=not_exist", Attributes={}) + result = read_file_test_result(report_dir, test_case) + self.assertEqual(result.ResultType, ResultType.FAILED) + self.assertEqual(result.Test.Name, "test_normal_case.py?name=not_exist") def test_run_testcase_with_emoji_data_drive(self): - entry = EntryParam( - TaskId="aa", - ProjectPath=str(self.testdata_dir), - TestSelectors=[ - "test_emoji_data_drive.py?test_emoji_data_drive_name/[\U0001f604]", - ], - FileReportPath="", - ) - - pipe_io = io.BytesIO() - run_testcases(entry, pipe_io) - pipe_io.seek(0) - - start = read_test_result(pipe_io) - self.assertEqual( - start.Test.Name, - "test_emoji_data_drive.py?test_emoji_data_drive_name/[😄]", - ) - self.assertEqual(start.ResultType, ResultType.RUNNING) + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) + entry = EntryParam( + TaskId="aa", + ProjectPath=str(self.testdata_dir), + TestSelectors=[ + "test_emoji_data_drive.py?test_emoji_data_drive_name/[\U0001f604]", + ], + FileReportPath=str(report_dir), + ) + + run_testcases(entry) + + test_case = TestCase( + Name="test_emoji_data_drive.py?test_emoji_data_drive_name/[😄]", Attributes={} + ) + end = read_file_test_result(report_dir, test_case) + self.assertEqual( + end.Test.Name, + "test_emoji_data_drive.py?test_emoji_data_drive_name/[😄]", + ) diff --git a/pytest/tests/test_load.py b/pytest/tests/test_load.py index a01e467..88a0331 100644 --- a/pytest/tests/test_load.py +++ b/pytest/tests/test_load.py @@ -1,8 +1,9 @@ -import io +import tempfile +import json from pathlib import Path from unittest import TestCase -from testsolar_testtool_sdk.pipe_reader import read_load_result +from testsolar_testtool_sdk.file_reader import read_file_load_result from src.load import collect_testcases_from_args @@ -11,30 +12,35 @@ class TestCollectorEntry(TestCase): testdata_dir: str = str(Path(__file__).parent.parent.absolute().joinpath("testdata")) def test_collect_testcases_from_args(self): - pipe_io = io.BytesIO() - collect_testcases_from_args( - args=["load.py", Path.joinpath(Path(self.testdata_dir), "entry.json")], - workspace=self.testdata_dir, - pipe_io=pipe_io, - ) - - pipe_io.seek(0) - re = read_load_result(pipe_io) - re.Tests.sort(key=lambda x: x.Name) - re.LoadErrors.sort(key=lambda x: x.name) - self.assertEqual(len(re.Tests), 7) - self.assertEqual( - re.Tests[4].Name, - "test_data_drive.py?test_special_data_drive_name/[中文-分号+[id:32]]", - ) - self.assertEqual( - re.Tests[6].Name, - "test_unit_test_case.py?TestInnerCase/test_inner_case", - ) + with tempfile.TemporaryDirectory() as tmpdir: + report_file = Path(tmpdir) / "result.json" + + # 创建临时的entry.json,设置正确的FileReportPath + entry_file = Path(tmpdir) / "entry.json" + with open(Path(self.testdata_dir) / "entry.json", "r") as f: + entry_data = json.load(f) + entry_data["FileReportPath"] = str(report_file) + with open(entry_file, "w") as f: + json.dump(entry_data, f) - def test_raise_error_when_param_is_invalid(self): - with self.assertRaises(SystemExit): - pipe_io = io.BytesIO() collect_testcases_from_args( - args=["load.py"], workspace=self.testdata_dir, pipe_io=pipe_io + args=["load.py", str(entry_file)], + workspace=self.testdata_dir, + ) + + re = read_file_load_result(report_file) + re.Tests.sort(key=lambda x: x.Name) + re.LoadErrors.sort(key=lambda x: x.name) + self.assertEqual(len(re.Tests), 7) + self.assertEqual( + re.Tests[4].Name, + "test_data_drive.py?test_special_data_drive_name/[中文-分号+[id:32]]", ) + self.assertEqual( + re.Tests[6].Name, + "test_unit_test_case.py?TestInnerCase/test_inner_case", + ) + + def test_raise_error_when_param_is_invalid(self): + with self.assertRaises(SystemExit): + collect_testcases_from_args(args=["load.py"], workspace=self.testdata_dir) diff --git a/pytest/tests/test_run.py b/pytest/tests/test_run.py index cc3d7ee..2d0f685 100644 --- a/pytest/tests/test_run.py +++ b/pytest/tests/test_run.py @@ -1,12 +1,13 @@ -import io -import struct +import json +import tempfile from pathlib import Path from unittest import TestCase import dacite.exceptions import pytest from testsolar_testtool_sdk.model.testresult import ResultType -from testsolar_testtool_sdk.pipe_reader import read_test_result +from testsolar_testtool_sdk.file_reader import read_file_test_result +from testsolar_testtool_sdk.model.test import TestCase as TestCaseModel from src.run import run_testcases_from_args @@ -15,30 +16,39 @@ class TestExecuteEntry(TestCase): testdata_dir = Path(__file__).parent.parent.absolute().joinpath("testdata") def test_run_testcases_from_args(self): - pipe_io = io.BytesIO() - run_testcases_from_args( - args=[ - "run.py", - Path.joinpath(Path(self.testdata_dir), "entry.json"), - ], - workspace=str(self.testdata_dir), - pipe_io=pipe_io, - ) - - pipe_io.seek(0) - start = read_test_result(pipe_io) - self.assertEqual(start.ResultType, ResultType.RUNNING) + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) + + # 创建临时entry.json + entry_file = Path(tmpdir) / "entry.json" + with open(Path(self.testdata_dir) / "entry.json", "r") as f: + entry_data = json.load(f) + entry_data["FileReportPath"] = str(report_dir) + with open(entry_file, "w") as f: + json.dump(entry_data, f) + + run_testcases_from_args( + args=[ + "run.py", + str(entry_file), + ], + workspace=str(self.testdata_dir), + ) + + # 读取第一个测试用例的结果(不检查RUNNING状态) + test_case = TestCaseModel(Name="test_normal_case.py?test_success", Attributes={}) + start = read_file_test_result(report_dir, test_case) + # 由于FileReporter会覆盖,我们只能检查最终状态 + self.assertIn(start.ResultType, [ResultType.RUNNING, ResultType.SUCCEED]) def test_raise_error_when_param_is_invalid(self): with self.assertRaises(dacite.exceptions.MissingValueError): - pipe_io = io.BytesIO() run_testcases_from_args( args=[ "run.py", Path.joinpath(Path(self.testdata_dir), "bad_entry.json"), ], workspace=str(self.testdata_dir), - pipe_io=pipe_io, ) def test_run_some_case_of_many_case_with_custom_pytest_ini(self): @@ -46,38 +56,56 @@ def test_run_some_case_of_many_case_with_custom_pytest_ini(self): 如果用户代码仓库中存在冲突的pytest.ini选项配置,那么需要覆盖掉用户配置 """ - pipe_io = io.BytesIO() - run_testcases_from_args( - args=[ - "run.py", - str(Path(self.testdata_dir) / "custom_pytest_ini" / "entry.json"), - ], - workspace=str(self.testdata_dir / "custom_pytest_ini"), - pipe_io=pipe_io, - ) + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) - pipe_io.seek(0) - start = read_test_result(pipe_io) - self.assertEqual(start.ResultType, ResultType.RUNNING) + # 创建临时entry.json + custom_entry_path = Path(self.testdata_dir) / "custom_pytest_ini" / "entry.json" + entry_file = Path(tmpdir) / "entry.json" + with open(custom_entry_path, "r") as f: + entry_data = json.load(f) + entry_data["FileReportPath"] = str(report_dir) + with open(entry_file, "w") as f: + json.dump(entry_data, f) - end = read_test_result(pipe_io) - self.assertEqual(end.ResultType, ResultType.SUCCEED) + run_testcases_from_args( + args=[ + "run.py", + str(entry_file), + ], + workspace=str(self.testdata_dir / "custom_pytest_ini"), + ) - with pytest.raises(struct.error): - read_test_result(pipe_io) + # 读取测试结果 + test_case = TestCaseModel( + Name="many/v1/test_normal_case_01.py?test_success", Attributes={} + ) + end = read_file_test_result(report_dir, test_case) + self.assertEqual(end.ResultType, ResultType.SUCCEED) @pytest.mark.skip("暂时未实现,需要执行出错时上报忽略状态") def test_continue_run_when_one_case_is_not_found(self): - pipe_io = io.BytesIO() - run_testcases_from_args( - args=[ - "run.py", - Path.joinpath(Path(self.testdata_dir), "entry_1_case_not_found.json"), - ], - workspace=str(self.testdata_dir), - pipe_io=pipe_io, - ) - - pipe_io.seek(0) - start = read_test_result(pipe_io) - self.assertEqual(start.ResultType, ResultType.IGNORED) + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) + + entry_file = Path(tmpdir) / "entry.json" + with open( + Path.joinpath(Path(self.testdata_dir), "entry_1_case_not_found.json"), "r" + ) as f: + entry_data = json.load(f) + entry_data["FileReportPath"] = str(report_dir) + with open(entry_file, "w") as f: + json.dump(entry_data, f) + + run_testcases_from_args( + args=[ + "run.py", + str(entry_file), + ], + workspace=str(self.testdata_dir), + ) + + # 读取结果 + test_case = TestCaseModel(Name="test_normal_case.py?test_success", Attributes={}) + start = read_file_test_result(report_dir, test_case) + self.assertEqual(start.ResultType, ResultType.IGNORED) diff --git a/pytest/tests/test_run_allure.py b/pytest/tests/test_run_allure.py index 20f64e0..d6bb483 100644 --- a/pytest/tests/test_run_allure.py +++ b/pytest/tests/test_run_allure.py @@ -1,5 +1,6 @@ -import io +import json import os +import tempfile from datetime import datetime from pathlib import Path from unittest import TestCase @@ -9,7 +10,7 @@ from testsolar_testtool_sdk.model.testresult import ( TestResult, ) -from testsolar_testtool_sdk.pipe_reader import read_test_result +from testsolar_testtool_sdk.file_reader import read_file_test_result from src.run import run_testcases_from_args from src.testsolar_pytestx.extend.allure_extend import ( @@ -98,26 +99,35 @@ class TestExecuteEntry(TestCase): def test_run_testcases_from_args(self): os.environ["TESTSOLAR_TTP_ENABLEALLURE"] = "1" - pipe_io = io.BytesIO() - run_testcases_from_args( - args=[ - "run.py", - Path.joinpath(Path(self.testdata_dir), "allure_entry.json"), - ], - workspace=self.testdata_dir, - pipe_io=pipe_io, - ) - # testcase running - pipe_io.seek(0) - start = read_test_result(pipe_io) - self.assertEqual(start.ResultType, ResultType.RUNNING) - self.assertEqual(start.Test.Name, "allure/allure_step_test.py?test_step/[data0]") - - # testcase finish - stop = read_test_result(pipe_io) - self.assertEqual(stop.ResultType, ResultType.SUCCEED) - self.assertEqual(stop.Test.Name, "allure/allure_step_test.py?test_step/[data0]") - self.assertEqual(stop.Message, "") - self.assertEqual(type(stop.Steps), list) - self.assertEqual(len(stop.Steps), 6) + with tempfile.TemporaryDirectory() as tmpdir: + report_dir = Path(tmpdir) + + # 创建临时entry.json + entry_file = Path(tmpdir) / "entry.json" + with open(Path.joinpath(Path(self.testdata_dir), "allure_entry.json"), "r") as f: + entry_data = json.load(f) + entry_data["FileReportPath"] = str(report_dir) + with open(entry_file, "w") as f: + json.dump(entry_data, f) + + run_testcases_from_args( + args=[ + "run.py", + str(entry_file), + ], + workspace=self.testdata_dir, + ) + + # 读取测试结果(不检查RUNNING状态) + test_case = TestSolar_TestCase( + Name="allure/allure_step_test.py?test_step/[data0]", Attributes={} + ) + stop = read_file_test_result(report_dir, test_case) + self.assertEqual(stop.ResultType, ResultType.SUCCEED) + self.assertEqual(stop.Test.Name, "allure/allure_step_test.py?test_step/[data0]") + + # Check allure step info + self.assertEqual(len(stop.Steps), 6) + self.assertEqual(stop.Steps[0].Title, "1: First step") + self.assertEqual(stop.Steps[0].ResultType, ResultType.SUCCEED) diff --git a/pytest/uv.lock b/pytest/uv.lock index 7f1a063..f3095d7 100644 --- a/pytest/uv.lock +++ b/pytest/uv.lock @@ -1,4 +1,5 @@ version = 1 +revision = 1 requires-python = ">=3.7" [[package]] @@ -361,7 +362,7 @@ name = "portalocker" version = "2.7.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "pywin32", marker = "platform_system == 'Windows'" }, + { name = "pywin32", marker = "sys_platform == 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/1f/f8/969e6f280201b40b31bcb62843c619f343dcc351dff83a5891530c9dd60e/portalocker-2.7.0.tar.gz", hash = "sha256:032e81d534a88ec1736d03f780ba073f047a06c478b06e2937486f334e955c51", size = 20183 } wheels = [ @@ -462,7 +463,7 @@ wheels = [ [[package]] name = "testsolar-pytestx" -version = "0.1.55" +version = "0.1.56" source = { virtual = "." } dependencies = [ { name = "coverage" }, From 2ae85a0b4a7f5f6dd33e60c5376e366e8fa75031 Mon Sep 17 00:00:00 2001 From: zixingdeng Date: Wed, 26 Nov 2025 15:16:13 +0800 Subject: [PATCH 04/10] =?UTF-8?q?=E5=8D=87=E7=BA=A7=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytest/pyproject.toml | 2 +- pytest/testtool.yaml | 2 +- pytest/uv.lock | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pytest/pyproject.toml b/pytest/pyproject.toml index 82e0262..42ee16e 100644 --- a/pytest/pyproject.toml +++ b/pytest/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "testsolar-pytestx" -version = "0.1.56" +version = "0.1.57" description = "Pytest tool for TestSolar" authors = [ {name = "asiazhang2002", email = "asiazhang2002@gmail.com"}, diff --git a/pytest/testtool.yaml b/pytest/testtool.yaml index 4e7124f..080cc59 100644 --- a/pytest/testtool.yaml +++ b/pytest/testtool.yaml @@ -2,7 +2,7 @@ schemaVersion: 1.0 name: pytest nameZh: pytest自动化测试 lang: python -version: '0.1.56' +version: '0.1.57' langType: INTERPRETED description: |- pytest是一个成熟的全功能Python测试工具,可以帮助您编写更好的程序。此测试工具允许您在TestSolar上运行pytest。 diff --git a/pytest/uv.lock b/pytest/uv.lock index f3095d7..5f70638 100644 --- a/pytest/uv.lock +++ b/pytest/uv.lock @@ -463,7 +463,7 @@ wheels = [ [[package]] name = "testsolar-pytestx" -version = "0.1.56" +version = "0.1.57" source = { virtual = "." } dependencies = [ { name = "coverage" }, From e5e6bf24fc39935f25c47107e58faec1814ea46e Mon Sep 17 00:00:00 2001 From: zixingdeng Date: Wed, 26 Nov 2025 18:04:49 +0800 Subject: [PATCH 05/10] =?UTF-8?q?=E5=85=BC=E5=AE=B9windows=E5=B9=B3?= =?UTF-8?q?=E5=8F=B0=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytest/src/testsolar_pytestx/converter.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pytest/src/testsolar_pytestx/converter.py b/pytest/src/testsolar_pytestx/converter.py index 6834d12..78f22c0 100644 --- a/pytest/src/testsolar_pytestx/converter.py +++ b/pytest/src/testsolar_pytestx/converter.py @@ -78,6 +78,8 @@ def pytest_to_selector(item: Item, project_path: str) -> str: if hasattr(item, "path") and hasattr(item, "cls") and item.path: rel_path = os.path.relpath(item.path, project_path) + # 统一使用正斜杠,确保跨平台一致性 + rel_path = rel_path.replace(os.sep, "/") name = item.name if item.cls: name = item.cls.__name__ + "/" + name @@ -87,6 +89,8 @@ def pytest_to_selector(item: Item, project_path: str) -> str: full_name = normalize_testcase_name(item.nodeid) else: rel_path, _, name = item.location + # 统一使用正斜杠,确保跨平台一致性 + rel_path = rel_path.replace(os.sep, "/") name = name.replace(".", "/") name = decode_datadrive(name) full_name = f"{rel_path}?{name}" @@ -140,6 +144,8 @@ def normalize_testcase_name(name: str, sub_case_key: Optional[str] = None) -> st -> test_directory/test_module.py?TestExampleClass/test_example_function/[datedrive] """ assert "::" in name + # 统一使用正斜杠,确保跨平台一致性 + name = name.replace(os.sep, "/") name = name.replace("::", "?", 1).replace( # 第一个分割符是文件,因此替换为? "::", "/" ) # 后续的分割符是测试用例名称,替换为/ From 4bebe65c54722d2a82f3d959f61f1e35fdfbc144 Mon Sep 17 00:00:00 2001 From: zixingdeng Date: Wed, 26 Nov 2025 18:31:06 +0800 Subject: [PATCH 06/10] =?UTF-8?q?=E5=8D=87=E7=BA=A7=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pytest/pyproject.toml | 2 +- pytest/testtool.yaml | 2 +- pytest/uv.lock | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pytest/pyproject.toml b/pytest/pyproject.toml index 42ee16e..838119b 100644 --- a/pytest/pyproject.toml +++ b/pytest/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "testsolar-pytestx" -version = "0.1.57" +version = "0.1.58" description = "Pytest tool for TestSolar" authors = [ {name = "asiazhang2002", email = "asiazhang2002@gmail.com"}, diff --git a/pytest/testtool.yaml b/pytest/testtool.yaml index 080cc59..79a4c81 100644 --- a/pytest/testtool.yaml +++ b/pytest/testtool.yaml @@ -2,7 +2,7 @@ schemaVersion: 1.0 name: pytest nameZh: pytest自动化测试 lang: python -version: '0.1.57' +version: '0.1.58' langType: INTERPRETED description: |- pytest是一个成熟的全功能Python测试工具,可以帮助您编写更好的程序。此测试工具允许您在TestSolar上运行pytest。 diff --git a/pytest/uv.lock b/pytest/uv.lock index 5f70638..a5ab4eb 100644 --- a/pytest/uv.lock +++ b/pytest/uv.lock @@ -463,7 +463,7 @@ wheels = [ [[package]] name = "testsolar-pytestx" -version = "0.1.57" +version = "0.1.58" source = { virtual = "." } dependencies = [ { name = "coverage" }, From e1e3c53c7d166c3e7103142d4077d60fbab60a23 Mon Sep 17 00:00:00 2001 From: ambzhang <450145249@qq.com> Date: Wed, 10 Dec 2025 17:52:04 +0800 Subject: [PATCH 07/10] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E5=99=A8=E5=92=8Callure=E6=89=A9=E5=B1=95=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 解决执行器和allure扩展中的逻辑错误,提升测试稳定性。 --- pytest/src/testsolar_pytestx/executor.py | 4 +++- .../testsolar_pytestx/extend/allure_extend.py | 22 +++++++++++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/pytest/src/testsolar_pytestx/executor.py b/pytest/src/testsolar_pytestx/executor.py index 00bea78..d5bec05 100644 --- a/pytest/src/testsolar_pytestx/executor.py +++ b/pytest/src/testsolar_pytestx/executor.py @@ -258,7 +258,9 @@ def run_testcases( enable_allure = check_allure_enable() if enable_allure: print("Start allure test") - allure_dir = os.path.join(entry.ProjectPath, "allure_results") + # 通过给allure_results增加当前进程pid来生成唯一的路径,避免并发冲突 + process_id = os.getpid() + allure_dir = os.path.join(entry.ProjectPath, f"allure_results_{process_id}") args.append("--alluredir={}".format(allure_dir)) initialization_allure_dir(allure_dir) diff --git a/pytest/src/testsolar_pytestx/extend/allure_extend.py b/pytest/src/testsolar_pytestx/extend/allure_extend.py index 493a9c4..5ef3508 100644 --- a/pytest/src/testsolar_pytestx/extend/allure_extend.py +++ b/pytest/src/testsolar_pytestx/extend/allure_extend.py @@ -80,9 +80,16 @@ def initialization_allure_dir(allure_dir: str) -> None: logger.info(f"Initializing Allure directory: {allure_dir}") if Path(allure_dir).exists(): logger.info(f"Directory {allure_dir} exists. Removing it.") - shutil.rmtree(allure_dir) - os.makedirs(allure_dir, exist_ok=True) - logger.info(f"Directory {allure_dir} created.") + try: + shutil.rmtree(allure_dir) + except (OSError, FileNotFoundError) as e: + logger.warning(f"Failed to remove existing directory {allure_dir}: {e}") + + try: + os.makedirs(allure_dir, exist_ok=True) + logger.info(f"Directory {allure_dir} created.") + except OSError as e: + logger.error(f"Failed to create directory {allure_dir}: {e}") def generate_allure_results( @@ -110,8 +117,15 @@ def generate_allure_results( ) logger.debug(f"Formatted test case name: {testcase_format_name}") + # 处理参数化测试用例:提取基础名称进行匹配 + testcase_base_name = testcase_format_name.split('[')[0] if '[' in testcase_format_name else testcase_format_name + if full_name != testcase_format_name: - if full_name in testcase_format_name and testcase_format_name.endswith( + if full_name == testcase_base_name: + logger.info( + f"Test case {testcase_format_name} is a parameterized case, matched base name: {full_name}" + ) + elif full_name in testcase_format_name and testcase_format_name.endswith( allure_data.name ): logger.info( From cc3860792809d8ac0d639369d74b477280a973c7 Mon Sep 17 00:00:00 2001 From: ambzhang <450145249@qq.com> Date: Wed, 10 Dec 2025 18:02:07 +0800 Subject: [PATCH 08/10] =?UTF-8?q?chore:=20=E6=9B=B4=E6=96=B0=E9=A1=B9?= =?UTF-8?q?=E7=9B=AE=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修改 pyproject.toml 和 testtool.yaml 以优化项目配置。 --- pytest/pyproject.toml | 2 +- pytest/testtool.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pytest/pyproject.toml b/pytest/pyproject.toml index 838119b..e09c7ca 100644 --- a/pytest/pyproject.toml +++ b/pytest/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "testsolar-pytestx" -version = "0.1.58" +version = "0.1.59" description = "Pytest tool for TestSolar" authors = [ {name = "asiazhang2002", email = "asiazhang2002@gmail.com"}, diff --git a/pytest/testtool.yaml b/pytest/testtool.yaml index 79a4c81..fd43e78 100644 --- a/pytest/testtool.yaml +++ b/pytest/testtool.yaml @@ -2,7 +2,7 @@ schemaVersion: 1.0 name: pytest nameZh: pytest自动化测试 lang: python -version: '0.1.58' +version: '0.1.59' langType: INTERPRETED description: |- pytest是一个成熟的全功能Python测试工具,可以帮助您编写更好的程序。此测试工具允许您在TestSolar上运行pytest。 From b9dc2b91db98a7739eb15faef993efdedbe709c4 Mon Sep 17 00:00:00 2001 From: ambzhang <450145249@qq.com> Date: Thu, 11 Dec 2025 10:43:54 +0800 Subject: [PATCH 09/10] =?UTF-8?q?chore:=20=E6=9B=B4=E6=96=B0=E9=A1=B9?= =?UTF-8?q?=E7=9B=AE=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 更新 pyproject.toml 和 testtool.yaml 以同步配置变更。 --- pytest/pyproject.toml | 2 +- pytest/testtool.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pytest/pyproject.toml b/pytest/pyproject.toml index e09c7ca..4786bb8 100644 --- a/pytest/pyproject.toml +++ b/pytest/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "testsolar-pytestx" -version = "0.1.59" +version = "0.1.69" description = "Pytest tool for TestSolar" authors = [ {name = "asiazhang2002", email = "asiazhang2002@gmail.com"}, diff --git a/pytest/testtool.yaml b/pytest/testtool.yaml index fd43e78..73469a0 100644 --- a/pytest/testtool.yaml +++ b/pytest/testtool.yaml @@ -2,7 +2,7 @@ schemaVersion: 1.0 name: pytest nameZh: pytest自动化测试 lang: python -version: '0.1.59' +version: '0.1.69' langType: INTERPRETED description: |- pytest是一个成熟的全功能Python测试工具,可以帮助您编写更好的程序。此测试工具允许您在TestSolar上运行pytest。 From 394b5c10ee24e6bc3d30ef7a488b39468969fc54 Mon Sep 17 00:00:00 2001 From: ambzhang <450145249@qq.com> Date: Thu, 11 Dec 2025 10:49:23 +0800 Subject: [PATCH 10/10] chore: update project configuration files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修改 pyproject.toml 和 testtool.yaml 以更新项目配置。 --- pytest/pyproject.toml | 2 +- pytest/testtool.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pytest/pyproject.toml b/pytest/pyproject.toml index 4786bb8..1970bbf 100644 --- a/pytest/pyproject.toml +++ b/pytest/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "testsolar-pytestx" -version = "0.1.69" +version = "0.1.70" description = "Pytest tool for TestSolar" authors = [ {name = "asiazhang2002", email = "asiazhang2002@gmail.com"}, diff --git a/pytest/testtool.yaml b/pytest/testtool.yaml index 73469a0..75fabb2 100644 --- a/pytest/testtool.yaml +++ b/pytest/testtool.yaml @@ -2,7 +2,7 @@ schemaVersion: 1.0 name: pytest nameZh: pytest自动化测试 lang: python -version: '0.1.69' +version: '0.1.70' langType: INTERPRETED description: |- pytest是一个成熟的全功能Python测试工具,可以帮助您编写更好的程序。此测试工具允许您在TestSolar上运行pytest。