跳转至

ztxexp.runner

ztxexp.runner

实验执行器。

本模块负责将配置列表调度为具体运行,并按 v2 协议写入以下产物:

- config.json
- run.json
- meta.json(可选)
- metrics.json(可选)
- metrics.jsonl(可选)
- events.jsonl(可选)
- artifacts/
- run.log / error.log(按需)

ExperimentFn module-attribute

# Signature of a user experiment function: it receives the per-run RunContext
# and returns either a metrics dict (the runner writes it to metrics.json) or
# None (no metrics.json is written). Any other return value fails the run.
ExperimentFn = Callable[[RunContext], dict[str, Any] | None]

ExpRunner

实验执行器。

源代码位于: ztxexp/runner.py
class ExpRunner:
    """Experiment runner.

    Schedules a list of config dicts as individual runs (sequentially or in
    worker processes) and aggregates the per-run records into a
    ``RunSummary``.
    """

    def __init__(
        self,
        configs: list[dict[str, Any]],
        results_root: str | Path,
        exp_function: ExperimentFn | None = None,
    ):
        """Create a runner.

        Args:
            configs: Config dicts to execute. Each one is shallow-copied so
                later mutation of the caller's dicts does not affect the runner.
            results_root: Directory under which run outputs are written; it is
                passed to ``utils.create_dir`` (presumably creating it if
                missing — confirm in ``utils``).
            exp_function: Default experiment function, used when ``run`` is
                called without one.
        """
        self.configs = [dict(config) for config in configs]
        self.results_root = Path(results_root)
        self.exp_function = exp_function
        utils.create_dir(self.results_root)

    def run(
        self,
        exp_function: ExperimentFn | None = None,
        mode: str = "sequential",
        workers: int = 1,
        cpu_threshold: int = 80,
        execution_mode: str | None = None,
        num_workers: int | None = None,
        dynamic_cpu_threshold: int | None = None,
        metadata: RunMetadata | None = None,
        max_attempts: int = 1,
        retry_on: tuple[str, ...] = ("Exception",),
        tracker_specs: list[dict[str, Any]] | None = None,
        trackers: list[Tracker] | None = None,
    ) -> RunSummary:
        """Execute every config and return a summary.

        Args:
            exp_function: Single-run experiment function with signature
                ``exp_fn(ctx: RunContext) -> dict | None``. Falls back to the
                function given to the constructor when omitted.
            mode: Execution mode, one of
                ``sequential`` / ``process_pool`` / ``joblib`` / ``dynamic``.
            workers: Number of parallel workers (unused in ``sequential``).
            cpu_threshold: CPU usage (percent) below which ``dynamic`` mode
                submits new tasks.
            execution_mode: Legacy alias for ``mode``; wins when not ``None``.
            num_workers: Legacy alias for ``workers``; wins when not ``None``.
            dynamic_cpu_threshold: Legacy alias for ``cpu_threshold``; wins
                when not ``None``.
            metadata: Run-metadata template; the framework fills in the
                fields it can collect.
            max_attempts: Maximum attempts per config (retry limit on failure).
            retry_on: Names of retryable exception types (parent class names
                such as ``Exception`` are supported).
            tracker_specs: Tracker spec dicts used to construct trackers. A
                ``{"type": "jsonl"}`` spec is appended automatically unless a
                jsonl spec or a ``JsonlTracker`` instance is already supplied.
            trackers: Live tracker instances (in-process objects). Only used
                in ``sequential`` mode; other modes print a notice and ignore
                them.

        Returns:
            RunSummary: Summary of this batch (succeeded/failed/skipped counts
            and duration).

        Raises:
            ValueError: If no ``exp_function`` is available or ``mode`` is not
                a supported mode. Both checks happen before anything runs,
                even when ``configs`` is empty.

        Notes:
            - When ``exp_fn`` returns a ``dict`` it is written to ``metrics.json``;
            - when ``exp_fn`` returns ``None`` no ``metrics.json`` is written;
            - any other return value fails the run and writes ``error.log``;
            - raising ``SkipRun`` marks the run as ``skipped``;
            - success is defined by ``run.json.status == succeeded``.

        Examples:
            >>> def exp_fn(ctx: RunContext):
            ...     return {"score": 0.9}
            >>> summary = ExpRunner([{"lr": 0.001}], "./results").run(exp_fn)
            >>> summary.total
            1
        """
        # Legacy aliases override the primary parameters when provided.
        if execution_mode is not None:
            mode = execution_mode
        if num_workers is not None:
            workers = num_workers
        if dynamic_cpu_threshold is not None:
            cpu_threshold = dynamic_cpu_threshold

        experiment = exp_function or self.exp_function
        if experiment is None:
            raise ValueError("exp_function is required.")

        # Validate up front so an invalid mode is reported even when there is
        # nothing to run (previously an empty config list hid the error).
        valid_modes = ("sequential", "process_pool", "joblib", "dynamic")
        if mode not in valid_modes:
            raise ValueError(
                f"Invalid mode '{mode}'. Choose from sequential/process_pool/joblib/dynamic."
            )

        total = len(self.configs)
        if total == 0:
            return RunSummary(
                total=0,
                succeeded=0,
                failed=0,
                skipped=0,
                duration_sec=0.0,
                failed_run_ids=[],
            )

        # Monotonic clock: elapsed time must not jump with wall-clock changes.
        started = time.perf_counter()

        # Always keep a jsonl tracker unless the caller already supplied one.
        resolved_specs = list(tracker_specs or [])
        has_jsonl_spec = any(
            str(spec.get("type", "")).lower() == "jsonl" for spec in resolved_specs
        )
        has_jsonl_instance = any(isinstance(tracker, JsonlTracker) for tracker in (trackers or []))
        if not has_jsonl_spec and not has_jsonl_instance:
            resolved_specs.append({"type": "jsonl", "kwargs": {}})

        if mode == "sequential":
            records = [
                _execute_single_run(
                    config=config,
                    exp_function=experiment,
                    results_root=self.results_root,
                    metadata=metadata,
                    max_attempts=max_attempts,
                    retry_on=retry_on,
                    tracker_specs=resolved_specs,
                    trackers=trackers,
                )
                for config in self.configs
            ]
        elif mode == "process_pool":
            if trackers:
                print("Live tracker instances are ignored in process_pool mode.")
            records = self._run_process_pool(
                exp_function=experiment,
                workers=workers,
                metadata=metadata,
                max_attempts=max_attempts,
                retry_on=retry_on,
                tracker_specs=resolved_specs,
            )
        elif mode == "joblib":
            if trackers:
                print("Live tracker instances are ignored in joblib mode.")
            records = self._run_joblib(
                exp_function=experiment,
                workers=workers,
                metadata=metadata,
                max_attempts=max_attempts,
                retry_on=retry_on,
                tracker_specs=resolved_specs,
            )
        else:  # "dynamic" — mode was validated above.
            if trackers:
                print("Live tracker instances are ignored in dynamic mode.")
            records = self._run_dynamic(
                exp_function=experiment,
                workers=workers,
                cpu_threshold=cpu_threshold,
                metadata=metadata,
                max_attempts=max_attempts,
                retry_on=retry_on,
                tracker_specs=resolved_specs,
            )

        duration = round(time.perf_counter() - started, 6)
        return self._summarize(records, total, duration)

    def _run_process_pool(
        self,
        exp_function: ExperimentFn,
        workers: int,
        metadata: RunMetadata | None,
        max_attempts: int,
        retry_on: tuple[str, ...],
        tracker_specs: list[dict[str, Any]],
    ) -> list[dict[str, Any]]:
        """Run all configs in a ``ProcessPoolExecutor``.

        Records are collected in completion order. A future that raises
        (e.g. a worker crash) is turned into a failure record.
        """
        records: list[dict[str, Any]] = []
        with ProcessPoolExecutor(max_workers=workers) as executor:
            future_map = {
                executor.submit(
                    _execute_single_run,
                    config,
                    exp_function,
                    self.results_root,
                    metadata,
                    max_attempts,
                    retry_on,
                    tracker_specs,
                    None,  # live trackers cannot cross process boundaries
                ): config
                for config in self.configs
            }
            for future in as_completed(future_map):
                try:
                    records.append(future.result())
                except Exception as exc:  # pragma: no cover
                    records.append(_failure_record_from_exception(exc))
        return records

    def _run_joblib(
        self,
        exp_function: ExperimentFn,
        workers: int,
        metadata: RunMetadata | None,
        max_attempts: int,
        retry_on: tuple[str, ...],
        tracker_specs: list[dict[str, Any]],
    ) -> list[dict[str, Any]]:
        """Run all configs with joblib ``Parallel`` (process backend).

        If the parallel call itself raises, every config is reported as a
        failure record built from that exception.
        """
        try:
            return Parallel(n_jobs=workers, prefer="processes")(
                delayed(_execute_single_run)(
                    config,
                    exp_function,
                    self.results_root,
                    metadata,
                    max_attempts,
                    retry_on,
                    tracker_specs,
                    None,  # live trackers cannot cross process boundaries
                )
                for config in self.configs
            )
        except Exception as exc:  # pragma: no cover
            return [_failure_record_from_exception(exc) for _ in self.configs]

    def _run_dynamic(
        self,
        exp_function: ExperimentFn,
        workers: int,
        cpu_threshold: int,
        metadata: RunMetadata | None,
        max_attempts: int,
        retry_on: tuple[str, ...],
        tracker_specs: list[dict[str, Any]],
    ) -> list[dict[str, Any]]:
        """Dynamically scheduled execution (experimental).

        New tasks are submitted while there is a free worker slot and system
        CPU usage (``psutil.cpu_percent``) is below ``cpu_threshold``. When
        nothing is in flight, one task is always submitted so that sustained
        external CPU load cannot stall the loop forever.
        """
        pending = deque(self.configs)
        in_flight: dict[Any, dict[str, Any]] = {}
        records: list[dict[str, Any]] = []

        with ProcessPoolExecutor(max_workers=workers) as executor:
            while pending or in_flight:
                cpu_usage = psutil.cpu_percent(interval=0.2)

                # `not in_flight` guarantees progress: without it, an idle pool
                # under high external load would never receive work.
                while pending and len(in_flight) < workers and (
                    cpu_usage < cpu_threshold or not in_flight
                ):
                    config = pending.popleft()
                    future = executor.submit(
                        _execute_single_run,
                        config,
                        exp_function,
                        self.results_root,
                        metadata,
                        max_attempts,
                        retry_on,
                        tracker_specs,
                        None,  # live trackers cannot cross process boundaries
                    )
                    in_flight[future] = config
                    cpu_usage = psutil.cpu_percent(interval=0.0)

                # in_flight is non-empty here: either work was already running
                # or the guard above submitted at least one task.
                done, _ = wait(
                    in_flight.keys(),
                    timeout=0.5,
                    return_when=FIRST_COMPLETED,
                )

                for future in done:
                    in_flight.pop(future, None)
                    try:
                        records.append(future.result())
                    except Exception as exc:  # pragma: no cover
                        records.append(_failure_record_from_exception(exc))

        return records

    def _summarize(
        self,
        records: list[dict[str, Any]],
        total: int,
        duration_sec: float,
    ) -> RunSummary:
        """Aggregate per-run records into a ``RunSummary``.

        ``total`` is taken from the caller (number of configs) rather than
        ``len(records)``.
        """
        succeeded = sum(1 for record in records if record.get("status") == RUN_STATUS_SUCCEEDED)
        skipped = sum(1 for record in records if record.get("status") == RUN_STATUS_SKIPPED)
        failed_run_ids = [
            str(record.get("run_id"))
            for record in records
            if record.get("status") == RUN_STATUS_FAILED
        ]

        return RunSummary(
            total=total,
            succeeded=succeeded,
            failed=len(failed_run_ids),
            skipped=skipped,
            duration_sec=duration_sec,
            failed_run_ids=failed_run_ids,
        )

configs instance-attribute

configs = [(dict(config)) for config in configs]

exp_function instance-attribute

exp_function = exp_function

results_root instance-attribute

results_root = Path(results_root)

__init__

__init__(configs: list[dict[str, Any]], results_root: str | Path, exp_function: ExperimentFn | None = None)
源代码位于: ztxexp/runner.py
def __init__(
    self,
    configs: list[dict[str, Any]],
    results_root: str | Path,
    exp_function: ExperimentFn | None = None,
):
    """Store shallow copies of the configs and prepare the results directory.

    Args:
        configs: Config dicts to run; each is copied with ``dict(...)``.
        results_root: Output directory, normalized to a ``Path`` and passed
            to ``utils.create_dir``.
        exp_function: Default experiment function for ``run``.
    """
    self.configs = list(map(dict, configs))
    root = Path(results_root)
    self.results_root = root
    self.exp_function = exp_function
    utils.create_dir(root)

run

run(exp_function: ExperimentFn | None = None, mode: str = 'sequential', workers: int = 1, cpu_threshold: int = 80, execution_mode: str | None = None, num_workers: int | None = None, dynamic_cpu_threshold: int | None = None, metadata: RunMetadata | None = None, max_attempts: int = 1, retry_on: tuple[str, ...] = ('Exception',), tracker_specs: list[dict[str, Any]] | None = None, trackers: list[Tracker] | None = None) -> RunSummary

执行全部配置并返回汇总。

参数:

名称 类型 描述 默认
exp_function ExperimentFn | None

单次实验函数,签名应为 exp_fn(ctx: RunContext) -> dict | None

None
mode str

执行模式。可选 sequential / process_pool / joblib / dynamic

'sequential'
workers int

并行 worker 数。

1
cpu_threshold int

dynamic 模式提交新任务时的 CPU 阈值。

80
execution_mode str | None

兼容参数,等价于 mode

None
num_workers int | None

兼容参数,等价于 workers

None
dynamic_cpu_threshold int | None

兼容参数,等价于 cpu_threshold

None
metadata RunMetadata | None

运行元数据模板。框架会补全可采集字段。

None
max_attempts int

每个配置最大尝试次数(失败重试上限)。

1
retry_on tuple[str, ...]

可重试异常名集合(支持父类名,如 Exception)。

('Exception',)
tracker_specs list[dict[str, Any]] | None

追踪器规格列表(字符串模式构造 tracker)。

None
trackers list[Tracker] | None

追踪器实例列表(当前进程内对象)。

None

返回:

名称 类型 描述
RunSummary RunSummary

本次批量执行汇总(成功/失败/跳过计数与耗时)。

引发:

类型 描述
ValueError

未提供 exp_function 或 mode 不合法时抛出。

Notes
  • exp_fn 返回 dict 时自动写入 metrics.json
  • exp_fn 返回 None 时不写 metrics.json
  • 返回非 dict|None 会判定为失败并写 error.log
  • 抛出 SkipRun 会标记为 skipped
  • 成功判定以 run.json.status == succeeded 为准。

示例:

>>> def exp_fn(ctx: RunContext):
...     return {"score": 0.9}
>>> summary = ExpRunner([{"lr": 0.001}], "./results").run(exp_fn)
>>> summary.total
1
源代码位于: ztxexp/runner.py
def run(
    self,
    exp_function: ExperimentFn | None = None,
    mode: str = "sequential",
    workers: int = 1,
    cpu_threshold: int = 80,
    execution_mode: str | None = None,
    num_workers: int | None = None,
    dynamic_cpu_threshold: int | None = None,
    metadata: RunMetadata | None = None,
    max_attempts: int = 1,
    retry_on: tuple[str, ...] = ("Exception",),
    tracker_specs: list[dict[str, Any]] | None = None,
    trackers: list[Tracker] | None = None,
) -> RunSummary:
    """Execute every config and return a summary.

    Args:
        exp_function: Single-run experiment function with signature
            ``exp_fn(ctx: RunContext) -> dict | None``. Falls back to the
            function given to the constructor when omitted.
        mode: Execution mode, one of
            ``sequential`` / ``process_pool`` / ``joblib`` / ``dynamic``.
        workers: Number of parallel workers (unused in ``sequential``).
        cpu_threshold: CPU usage (percent) below which ``dynamic`` mode
            submits new tasks.
        execution_mode: Legacy alias for ``mode``; wins when not ``None``.
        num_workers: Legacy alias for ``workers``; wins when not ``None``.
        dynamic_cpu_threshold: Legacy alias for ``cpu_threshold``; wins when
            not ``None``.
        metadata: Run-metadata template; the framework fills in the fields it
            can collect.
        max_attempts: Maximum attempts per config (retry limit on failure).
        retry_on: Names of retryable exception types (parent class names such
            as ``Exception`` are supported).
        tracker_specs: Tracker spec dicts used to construct trackers. A
            ``{"type": "jsonl"}`` spec is appended automatically unless a jsonl
            spec or a ``JsonlTracker`` instance is already supplied.
        trackers: Live tracker instances (in-process objects). Only used in
            ``sequential`` mode; other modes print a notice and ignore them.

    Returns:
        RunSummary: Summary of this batch (succeeded/failed/skipped counts and
        duration).

    Raises:
        ValueError: If no ``exp_function`` is available or ``mode`` is not a
            supported mode. Both checks happen before anything runs, even
            when ``configs`` is empty.

    Notes:
        - When ``exp_fn`` returns a ``dict`` it is written to ``metrics.json``;
        - when ``exp_fn`` returns ``None`` no ``metrics.json`` is written;
        - any other return value fails the run and writes ``error.log``;
        - raising ``SkipRun`` marks the run as ``skipped``;
        - success is defined by ``run.json.status == succeeded``.

    Examples:
        >>> def exp_fn(ctx: RunContext):
        ...     return {"score": 0.9}
        >>> summary = ExpRunner([{"lr": 0.001}], "./results").run(exp_fn)
        >>> summary.total
        1
    """
    # Legacy aliases override the primary parameters when provided.
    if execution_mode is not None:
        mode = execution_mode
    if num_workers is not None:
        workers = num_workers
    if dynamic_cpu_threshold is not None:
        cpu_threshold = dynamic_cpu_threshold

    experiment = exp_function or self.exp_function
    if experiment is None:
        raise ValueError("exp_function is required.")

    # Validate up front so an invalid mode is reported even when there is
    # nothing to run (previously an empty config list hid the error).
    valid_modes = ("sequential", "process_pool", "joblib", "dynamic")
    if mode not in valid_modes:
        raise ValueError(
            f"Invalid mode '{mode}'. Choose from sequential/process_pool/joblib/dynamic."
        )

    total = len(self.configs)
    if total == 0:
        return RunSummary(
            total=0,
            succeeded=0,
            failed=0,
            skipped=0,
            duration_sec=0.0,
            failed_run_ids=[],
        )

    # Monotonic clock: elapsed time must not jump with wall-clock changes.
    started = time.perf_counter()

    # Always keep a jsonl tracker unless the caller already supplied one.
    resolved_specs = list(tracker_specs or [])
    has_jsonl_spec = any(
        str(spec.get("type", "")).lower() == "jsonl" for spec in resolved_specs
    )
    has_jsonl_instance = any(isinstance(tracker, JsonlTracker) for tracker in (trackers or []))
    if not has_jsonl_spec and not has_jsonl_instance:
        resolved_specs.append({"type": "jsonl", "kwargs": {}})

    if mode == "sequential":
        records = [
            _execute_single_run(
                config=config,
                exp_function=experiment,
                results_root=self.results_root,
                metadata=metadata,
                max_attempts=max_attempts,
                retry_on=retry_on,
                tracker_specs=resolved_specs,
                trackers=trackers,
            )
            for config in self.configs
        ]
    elif mode == "process_pool":
        if trackers:
            print("Live tracker instances are ignored in process_pool mode.")
        records = self._run_process_pool(
            exp_function=experiment,
            workers=workers,
            metadata=metadata,
            max_attempts=max_attempts,
            retry_on=retry_on,
            tracker_specs=resolved_specs,
        )
    elif mode == "joblib":
        if trackers:
            print("Live tracker instances are ignored in joblib mode.")
        records = self._run_joblib(
            exp_function=experiment,
            workers=workers,
            metadata=metadata,
            max_attempts=max_attempts,
            retry_on=retry_on,
            tracker_specs=resolved_specs,
        )
    else:  # "dynamic" — mode was validated above.
        if trackers:
            print("Live tracker instances are ignored in dynamic mode.")
        records = self._run_dynamic(
            exp_function=experiment,
            workers=workers,
            cpu_threshold=cpu_threshold,
            metadata=metadata,
            max_attempts=max_attempts,
            retry_on=retry_on,
            tracker_specs=resolved_specs,
        )

    duration = round(time.perf_counter() - started, 6)
    return self._summarize(records, total, duration)

SkipRun

Bases: Exception

主动跳过当前运行。

在 exp_fn 中抛出该异常时,当前 run 会被标记为 skipped,而不是 failed。适用于“业务上不合法、无需重试”的配置分支。

示例:

>>> from ztxexp import SkipRun
>>> def exp_fn(ctx):
...     if ctx.config.get("batch_size", 0) <= 0:
...         raise SkipRun("batch_size must be positive")
...     return {"score": 0.9}
源代码位于: ztxexp/runner.py
class SkipRun(Exception):
    """Signal that the current run should be skipped, not failed.

    Raise it from inside ``exp_fn`` and the runner records the run with
    status ``skipped`` instead of ``failed``. Use it for config branches
    that are invalid by design and need no retry.

    Examples:
        >>> from ztxexp import SkipRun
        >>> def exp_fn(ctx):
        ...     if ctx.config.get("batch_size", 0) <= 0:
        ...         raise SkipRun("batch_size must be positive")
        ...     return {"score": 0.9}
    """