Skip to content

Runner

Execute simulations across experimental grids.

trade_study.run_grid(world, scorer, grid, observables, *, annotations=None, n_jobs=1, callback=None)

Run all configurations in a grid.

Parameters:

Name Type Description Default
world Simulator

Simulator that generates (truth, observations).

required
scorer Scorer

Scorer that evaluates observables.

required
grid list[dict[str, Any]]

List of config dicts to evaluate.

required
observables list[Observable]

Observable definitions (for column ordering).

required
annotations list[Annotation] | None

Optional external annotations (costs, etc.).

None
n_jobs int

Number of parallel workers (-1 for all CPUs).

1
callback ProgressCallback | None

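Optional progress callback invoked with (trial_index, total_trials, trial_result). With n_jobs=1 it is called immediately after each trial; with any other n_jobs it is called once per result, in grid order, after all trials have finished.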

None

Returns:

Type Description
ResultsTable

ResultsTable with scored results.

Source code in src/trade_study/runner.py
def run_grid(
    world: Simulator,
    scorer: Scorer,
    grid: list[dict[str, Any]],
    observables: list[Observable],
    *,
    annotations: list[Annotation] | None = None,
    n_jobs: int = 1,
    callback: ProgressCallback | None = None,
) -> ResultsTable:
    """Run all configurations in a grid.

    Args:
        world: Simulator that generates (truth, observations).
        scorer: Scorer that evaluates observables.
        grid: List of config dicts to evaluate.
        observables: Observable definitions (for column ordering).
        annotations: Optional external annotations (costs, etc.).
        n_jobs: Number of parallel workers (-1 for all CPUs). Any value
            other than ``1`` dispatches the trials through joblib.
        callback: Optional progress callback invoked with
            ``(trial_index, total_trials, trial_result)``. With
            ``n_jobs == 1`` it fires right after each trial; on the
            parallel path it is called once per result, in grid order,
            only after the whole batch has returned.

    Returns:
        ResultsTable with scored results: one row per config and one
        score column per observable (NaN where a trial reported no score
        for that observable). Per-row metadata holds ``wall_seconds``.
        An empty ``grid`` yields matrices of shape ``(0, n_columns)``.
    """
    total = len(grid)
    if n_jobs == 1:
        results: list[TrialResult] = []
        for i, cfg in enumerate(grid):
            r = _run_single(world, scorer, cfg)
            results.append(r)
            if callback is not None:
                callback(i, total, r)
    else:
        from joblib import Parallel, delayed  # type: ignore[import-untyped]

        results = Parallel(n_jobs=n_jobs)(
            delayed(_run_single)(world, scorer, cfg) for cfg in grid
        )
        # Results are only available once the parallel call returns, so
        # progress is reported after the fact, in grid order.
        if callback is not None:
            for i, r in enumerate(results):
                callback(i, total, r)

    obs_names = [o.name for o in observables]
    score_rows = [
        [r.scores.get(name, np.nan) for name in obs_names] for r in results
    ]
    # np.array([]) would be 1-D with shape (0,); keep the matrix 2-D so an
    # empty grid has the same layout as the other runners produce.
    score_matrix = (
        np.array(score_rows) if score_rows else np.zeros((0, len(obs_names)))
    )

    ann_matrix = None
    ann_names: list[str] = []
    if annotations:
        ann_names = [a.name for a in annotations]
        ann_rows = [[a.resolve(r.config) for a in annotations] for r in results]
        ann_matrix = (
            np.array(ann_rows) if ann_rows else np.zeros((0, len(ann_names)))
        )

    return ResultsTable(
        configs=[r.config for r in results],
        scores=score_matrix,
        observable_names=obs_names,
        annotations=ann_matrix,
        annotation_names=ann_names,
        metadata=[{"wall_seconds": r.wall_seconds} for r in results],
    )

trade_study.run_adaptive(world, scorer, factors, observables, *, n_trials=100, seed=42)

Run adaptive multi-objective optimization via optuna.

Parameters:

Name Type Description Default
world Simulator

Simulator.

required
scorer Scorer

Scorer for observables.

required
factors list[Factor]

Factor definitions (from design module).

required
observables list[Observable]

Observable definitions.

required
n_trials int

Number of optuna trials.

100
seed int

Random seed.

42

Returns:

Type Description
ResultsTable

ResultsTable with scored results.

Source code in src/trade_study/runner.py
def run_adaptive(
    world: Simulator,
    scorer: Scorer,
    factors: list[Factor],
    observables: list[Observable],
    *,
    n_trials: int = 100,
    seed: int = 42,
) -> ResultsTable:
    """Run adaptive multi-objective optimization via optuna.

    Builds a multi-objective optuna study (NSGA-II sampler, one direction
    per observable) and evaluates ``n_trials`` sampled configurations.

    Args:
        world: Simulator.
        scorer: Scorer for observables.
        factors: Factor definitions (from design module). Continuous
            factors with bounds are sampled as floats; categorical and
            discrete factors with levels are sampled from those levels.
            Factors matching neither case are left out of the config.
        observables: Observable definitions.
        n_trials: Number of optuna trials.
        seed: Random seed.

    Returns:
        ResultsTable with scored results, one row per optuna trial. The
        stored scores are the objective values given to optuna, i.e. each
        score multiplied by its observable's ``weight``. Trials for which
        optuna recorded no values (it leaves ``values`` as ``None`` on
        trials it did not complete, e.g. when an objective value is NaN)
        are kept as all-NaN rows.
    """
    import optuna as _optuna

    from .design import FactorType

    directions_str = [
        "minimize" if o.direction == Direction.MINIMIZE else "maximize"
        for o in observables
    ]

    study = _optuna.create_study(
        directions=directions_str,
        sampler=_optuna.samplers.NSGAIISampler(seed=seed),
    )

    obs_names = [o.name for o in observables]
    obs_weights = [o.weight for o in observables]

    # Annotated via the locally bound ``_optuna`` so the name resolves
    # whether or not annotations are evaluated lazily.
    def objective(trial: _optuna.trial.Trial) -> tuple[float, ...]:
        config: dict[str, Any] = {}
        for f in factors:
            if f.factor_type == FactorType.CONTINUOUS and f.bounds is not None:
                config[f.name] = trial.suggest_float(
                    f.name,
                    f.bounds[0],
                    f.bounds[1],
                )
            elif f.levels is not None and f.factor_type in {
                FactorType.CATEGORICAL,
                FactorType.DISCRETE,
            }:
                config[f.name] = trial.suggest_categorical(f.name, f.levels)
        truth, observations = world.generate(config)
        scores = scorer.score(truth, observations, config)
        return tuple(
            scores.get(name, float("nan")) * w
            for name, w in zip(obs_names, obs_weights, strict=True)
        )

    _optuna.logging.set_verbosity(_optuna.logging.WARNING)
    study.optimize(objective, n_trials=n_trials)

    n_obs = len(obs_names)
    configs = []
    score_rows = []
    for trial in study.trials:
        configs.append(trial.params)
        values = trial.values
        # ``values`` is None for trials without a recorded result; keep the
        # row (and its params) instead of crashing on ``list(None)``.
        score_rows.append(
            list(values) if values is not None else [float("nan")] * n_obs
        )

    return ResultsTable(
        configs=configs,
        # Keep the matrix 2-D even when no trial was run.
        scores=np.array(score_rows) if score_rows else np.zeros((0, n_obs)),
        observable_names=obs_names,
    )

trade_study.run_successive_halving(trials, sim, *, rungs, eta=3.0, metric, mode='min')

Successive-halving multi-fidelity early-stopping (#104).

Evaluates every trial at the lowest rung, keeps the top 1/eta by metric (according to mode), promotes survivors to the next budget, and repeats until the highest rung. Every (trial, rung) evaluation is recorded as one row in the returned ResultsTable, with the rung index and budget stored in per-row metadata.

Parameters:

Name Type Description Default
trials list[dict[str, Any]]

Candidate configurations to evaluate.

required
sim PartialEvaluator

A PartialEvaluator whose evaluate(config, budget) returns observables including metric.

required
rungs list[float]

Strictly ascending list of budgets (e.g. epochs, iterations). Length determines the number of halving rounds.

required
eta float

Reduction factor between rungs (>1). Each rung keeps ceil(n_prev / eta) survivors. Defaults to 3 per Li et al. (2017).

3.0
metric str

Observable name used to rank trials at each rung.

required
mode str

"min" (lower is better) or "max".

'min'

Returns:

Type Description
ResultsTable

A ResultsTable whose rows are (trial, rung) evaluations. Per-row metadata contains rung (0-indexed), budget, trial_index (position in the input trials list), promoted (whether this trial advanced past this rung), and wall_seconds. Propagates ValueError from the input validator when arguments are invalid.

Raises:

Type Description
KeyError

If metric is missing from a returned observables dict.

Source code in src/trade_study/runner.py
def run_successive_halving(
    trials: list[dict[str, Any]],
    sim: PartialEvaluator,
    *,
    rungs: list[float],
    eta: float = 3.0,
    metric: str,
    mode: str = "min",
) -> ResultsTable:
    """Successive-halving multi-fidelity early-stopping (#104).

    All trials are evaluated at the first budget in ``rungs``. After each
    rung except the last, the evaluations are ranked by ``metric`` (best
    first according to ``mode``) and only the leading
    ``max(1, ceil(n / eta))`` trials move on to the next budget. Each
    (trial, rung) evaluation becomes one row of the returned
    :class:`ResultsTable`.

    Args:
        trials: Candidate configurations to evaluate.
        sim: A :class:`PartialEvaluator` whose ``evaluate(config, budget)``
            returns observables including ``metric``.
        rungs: Strictly ascending list of budgets (e.g. epochs, iterations).
            Length determines the number of halving rounds.
        eta: Reduction factor between rungs (>1). Each rung keeps
            ``ceil(n_prev / eta)`` survivors. Defaults to 3 per Li et al.
            (2017).
        metric: Observable name used to rank trials at each rung.
        mode: ``"min"`` (lower is better) or ``"max"``.

    Returns:
        :class:`ResultsTable` whose rows are (trial, rung) evaluations.
        Per-row metadata contains ``rung`` (0-indexed), ``budget``,
        ``trial_index`` (position in the input ``trials`` list),
        ``promoted`` (whether this trial advanced past this rung), and
        ``wall_seconds``. Propagates :class:`ValueError` from the input
        validator when arguments are invalid.

    Raises:
        KeyError: If ``metric`` is missing from a returned observables
            dict.
    """
    _sh_validate_inputs(trials, rungs, eta, metric, mode)

    descending = mode == "max"
    final_rung = len(rungs) - 1

    def _leaders(
        records: list[tuple[int, float, dict[str, float], float]],
    ) -> list[int]:
        # Trial indices of the best ``max(1, ceil(n / eta))`` records.
        ordered = sorted(
            records,
            key=lambda rec: rec[2][metric],
            reverse=descending,
        )
        quota = max(1, int(np.ceil(len(ordered) / eta)))
        return [rec[0] for rec in ordered[:quota]]

    # history[level] holds (trial_index, budget, observables, wall_seconds)
    # for every evaluation performed at that rung.
    history: list[list[tuple[int, float, dict[str, float], float]]] = [
        [] for _ in rungs
    ]
    alive: list[int] = list(range(len(trials)))

    for level, budget in enumerate(rungs):
        bucket = history[level]
        for idx in alive:
            started = time.perf_counter()
            measured = sim.evaluate(trials[idx], budget)
            elapsed = time.perf_counter() - started
            if metric not in measured:
                raise KeyError(
                    f"run_successive_halving: PartialEvaluator did not return "
                    f"metric {metric!r} at rung {level} for trial {idx}"
                )
            bucket.append((idx, budget, measured, elapsed))

        # No selection after the last rung: nothing is left to promote to.
        if level < final_rung:
            alive = _leaders(bucket)

    obs_names = _sh_collect_observables([
        [(i, b, o) for i, b, o, _elapsed in recorded] for recorded in history
    ])

    # Re-rank every non-final rung to flag which trials moved on; the final
    # rung keeps an empty set, so none of its rows are marked promoted.
    promoted: list[set[int]] = [set() for _ in rungs]
    for level in range(final_rung):
        promoted[level] = set(_leaders(history[level]))

    configs: list[dict[str, Any]] = []
    score_rows: list[list[float]] = []
    metadata: list[dict[str, Any]] = []
    for level, bucket in enumerate(history):
        advanced = promoted[level]
        for idx, budget, measured, elapsed in bucket:
            configs.append(trials[idx])
            score_rows.append(
                [measured.get(name, float("nan")) for name in obs_names]
            )
            metadata.append({
                "rung": level,
                "budget": budget,
                "trial_index": idx,
                "promoted": idx in advanced,
                "wall_seconds": elapsed,
            })

    if score_rows:
        scores = np.array(score_rows)
    else:
        scores = np.zeros((0, len(obs_names)))

    return ResultsTable(
        configs=configs,
        scores=scores,
        observable_names=obs_names,
        metadata=metadata,
    )

trade_study.run_hyperband(trial_factory, sim, *, max_budget, eta=3.0, metric, mode='min')

Hyperband: multi-bracket successive-halving (#104).

Wraps run_successive_halving with the bracket schedule from Li et al. (2017). Each bracket trades off the number of initial trials against the minimum budget per trial; together they hedge against picking either ratio wrong.

Parameters:

Name Type Description Default
trial_factory Callable[[int, int], list[dict[str, Any]]]

Callable (bracket_index, n_trials) -> trials that returns a fresh list of candidate configs for each bracket. Typically this wraps trade_study.build_grid with a bracket-derived seed so that brackets sample different points.

required
sim PartialEvaluator

A PartialEvaluator.

required
max_budget float

Maximum resource R per trial.

required
eta float

Reduction factor (>1). Defaults to 3.

3.0
metric str

Observable used for ranking within each bracket.

required
mode str

"min" or "max".

'min'

Returns:

Type Description
ResultsTable

Concatenated ResultsTable across all brackets, with an additional bracket field in each row's metadata.

Raises:

Type Description
ValueError

If max_budget <= 0 or eta <= 1.

Source code in src/trade_study/runner.py
def run_hyperband(
    trial_factory: Callable[[int, int], list[dict[str, Any]]],
    sim: PartialEvaluator,
    *,
    max_budget: float,
    eta: float = 3.0,
    metric: str,
    mode: str = "min",
) -> ResultsTable:
    """Hyperband: multi-bracket successive-halving (#104).

    Wraps :func:`run_successive_halving` with the bracket schedule from
    Li et al. (2017). Each bracket trades off the number of initial
    trials against the minimum budget per trial; together they hedge
    against picking either ratio wrong.

    Args:
        trial_factory: Callable ``(bracket_index, n_trials) -> trials`` that
            returns a fresh list of candidate configs for each bracket.
            Typically this wraps :func:`trade_study.build_grid` with a
            bracket-derived seed so brackets sample different points.
            A bracket for which it returns an empty list is skipped.
        sim: A :class:`PartialEvaluator`.
        max_budget: Maximum resource ``R`` per trial.
        eta: Reduction factor (>1). Defaults to 3.
        metric: Observable used for ranking within each bracket.
        mode: ``"min"`` or ``"max"``.

    Returns:
        Concatenated :class:`ResultsTable` across all brackets, with an
        additional ``bracket`` field in each row's metadata. If brackets
        report different observable names, the columns are the sorted
        union of all names and every row is NaN-padded to that layout.

    Raises:
        ValueError: If ``max_budget <= 0`` or ``eta <= 1``.
    """
    if max_budget <= 0:
        msg = "run_hyperband: max_budget must be positive"
        raise ValueError(msg)
    if eta <= 1:
        msg = "run_hyperband: eta must be > 1"
        raise ValueError(msg)

    brackets = _hyperband_brackets(max_budget, eta)

    all_configs: list[dict[str, Any]] = []
    # Rows are kept keyed by observable name until every bracket has run:
    # the column layout can still widen, and rows laid out against an
    # earlier ``obs_names`` would otherwise end up misaligned or ragged.
    named_rows: list[dict[str, float]] = []
    all_metadata: list[dict[str, Any]] = []
    obs_names: list[str] = []

    for bracket_idx, (n_initial, r_min) in enumerate(brackets):
        trials = trial_factory(bracket_idx, n_initial)
        if not trials:
            continue
        # The number of rungs shrinks by one with each successive bracket.
        s = len(brackets) - 1 - bracket_idx
        rungs = [r_min * eta**i for i in range(s + 1)]
        bracket_results = run_successive_halving(
            trials,
            sim,
            rungs=rungs,
            eta=eta,
            metric=metric,
            mode=mode,
        )
        bracket_names = list(bracket_results.observable_names)
        if not obs_names:
            obs_names = bracket_names
        elif obs_names != bracket_names:
            # Brackets disagree: fall back to the sorted union of names.
            obs_names = sorted(set(obs_names) | set(bracket_names))

        for i, cfg in enumerate(bracket_results.configs):
            all_configs.append(cfg)
            row = bracket_results.scores[i]
            named_rows.append(
                {name: float(row[j]) for j, name in enumerate(bracket_names)}
            )
            meta = dict(bracket_results.metadata[i])
            meta["bracket"] = bracket_idx
            all_metadata.append(meta)

    # Materialise every row against the final column layout.
    nan = float("nan")
    all_scores = [
        [named.get(name, nan) for name in obs_names] for named in named_rows
    ]

    return ResultsTable(
        configs=all_configs,
        scores=np.array(all_scores) if all_scores else np.zeros((0, len(obs_names))),
        observable_names=obs_names,
        metadata=all_metadata,
    )