"""Public API for eval results retrieval.
This module provides functions to retrieve and analyze evaluation results
for both individual eval runs and experiments.
Naming convention:
- get_{entity}() -> Returns dataclass/object
- get_{entity}_df() -> Returns detailed DataFrame
- list_{entity}s_df() -> Returns listing DataFrame
- compare_{entity}s() -> Returns comparison DataFrame
"""
from pathlib import Path
import tempfile
from typing import TYPE_CHECKING, Literal
from pyspark.sql import functions as F
import yipit_databricks_client as ydbc
from ml_toolkit.functions.eval_utils.constants import (
EVAL_EXPERIMENTS_FULL_TABLE,
EVAL_RUNS_FULL_TABLE,
ExperimentStatus,
RunStatus,
)
from ml_toolkit.functions.eval_utils.helpers.results import (
EVAL_RUNS_FULL_TABLE_SCHEMA,
format_run_summary,
get_run_registry_entry,
)
from ml_toolkit.functions.eval_utils.helpers.types import (
EvalRunResult,
ExperimentResult,
)
from ml_toolkit.ops.helpers.exceptions import (
MLOpsToolkitEvalRunNotFoundException,
MLOpsToolkitExperimentNotFoundException,
)
from ml_toolkit.ops.helpers.mlflow import get_mlflow_client
from ml_toolkit.ops.helpers.validation import table_exists
if TYPE_CHECKING:
from pyspark.sql import DataFrame
# =============================================================================
# Run-level functions (individual model evaluations)
# =============================================================================
def get_run(run_id: str) -> EvalRunResult:
    """Get metadata for a single eval run.

    Args:
        run_id: The eval run ID returned from run_evaluation().

    Returns:
        EvalRunResult containing run metadata and metrics summary.

    Raises:
        MLOpsToolkitEvalRunNotFoundException: If run not found.

    Example:
        >>> run = get_run("eval_run_20260126_143052_abc123")
        >>> print(run.metrics_summary)
        {'latency_ms_p50': 145.2, 'exact_match_score_accuracy': 0.87}
    """
    # Look up the registry row, then normalize it into the summary fields
    # EvalRunResult expects.
    summary = format_run_summary(get_run_registry_entry(run_id))
    return EvalRunResult(
        run_id=run_id,
        mlflow_run_id=summary["mlflow_run_id"],
        results_table=summary["results_table"],
        metrics_summary=summary["metrics_summary"],
        row_count=summary["row_count"],
        error_count=summary["error_count"],
        created_at=summary["created_at"],
    )
def get_run_df(
    run_id: str,
    *,
    include_metrics: bool = True,
    filter_errors: bool = False,
) -> "DataFrame":
    """Get per-row results for an eval run as a DataFrame.

    Args:
        run_id: The eval run ID returned from run_evaluation().
        include_metrics: If True, include per-row metric columns.
            Defaults to True.
        filter_errors: If True, exclude rows where inference failed.
            Defaults to False.

    Returns:
        Spark DataFrame with columns:
            - All columns from original eval table
            - model_output: The model's response text
            - latency_ms: Inference latency (if metric enabled)
            - prompt_tokens: Input token count (if metric enabled)
            - completion_tokens: Output token count (if metric enabled)
            - exact_match_score: 1.0 if exact match, 0.0 otherwise
            - fuzzy_match_score: Fuzzy match ratio [0.0, 1.0]
            - llm_judge_score: LLM judge score (if metric enabled)
            - error_message: Error message if inference failed, else NULL

    Raises:
        MLOpsToolkitEvalRunNotFoundException: If run not found.

    Example:
        >>> df = get_run_df("eval_run_20260126_143052_abc123")
        >>> df.filter(df.exact_match_score == 0).show()
    """
    spark = ydbc.get_spark_session()
    # Resolve the run's per-row results table via the registry entry.
    entry = get_run_registry_entry(run_id)
    table_name = entry.get("results_table")
    if not table_name or not table_exists(table_name):
        raise MLOpsToolkitEvalRunNotFoundException(run_id)

    results = spark.table(table_name)
    if filter_errors:
        results = results.where(F.col("error_message").isNull())
    if include_metrics:
        return results

    # Strip the per-row metric columns; only drop those actually present.
    metric_columns = {
        "latency_ms",
        "prompt_tokens",
        "completion_tokens",
        "total_tokens",
        "exact_match_score",
        "fuzzy_match_score",
        "llm_judge_score",
        "llm_judge_reasoning",
    }
    droppable = [name for name in results.columns if name in metric_columns]
    return results.drop(*droppable)
def list_runs_df(
    eval_table: str | None = None,
    *,
    experiment_id: str | None = None,
    status: Literal["all", "completed", "running", "failed"] = "all",
    limit: int = 100,
    order_by: Literal["created_at", "completed_at", "run_name"] = "created_at",
    descending: bool = True,
) -> "DataFrame":
    """List eval runs as a DataFrame.

    Can filter by eval table, experiment, or status.

    Args:
        eval_table: Optional eval table name to filter by.
        experiment_id: Optional experiment ID to filter by.
        status: Filter runs by status. Options: 'all', 'completed',
            'running', 'failed'. Defaults to 'all'.
        limit: Maximum number of runs to return. Defaults to 100.
        order_by: Column to sort results by. Defaults to 'created_at'.
        descending: Sort in descending order. Defaults to True.

    Returns:
        Spark DataFrame with run metadata columns.

    Example:
        >>> runs = list_runs_df(eval_table="catalog.schema.vendor_eval")
        >>> runs.show()
        >>> runs = list_runs_df(experiment_id="exp_20260126_143052_abc123")
        >>> runs.show()
    """
    spark = ydbc.get_spark_session()
    if not table_exists(EVAL_RUNS_FULL_TABLE):
        # Registry table not created yet: return an empty, correctly-typed frame.
        return spark.createDataFrame([], schema=EVAL_RUNS_FULL_TABLE_SCHEMA)

    runs = spark.table(EVAL_RUNS_FULL_TABLE)

    # Optional equality filters, applied only when a value was supplied.
    for column, value in (
        ("eval_table", eval_table),
        ("experiment_id", experiment_id),
    ):
        if value:
            runs = runs.where(F.col(column) == value)
    if status != "all":
        wanted = {
            "completed": RunStatus.COMPLETED,
            "running": RunStatus.RUNNING,
            "failed": RunStatus.FAILED,
        }[status]
        runs = runs.where(F.col("status") == wanted)

    # Derived column: fraction of rows that errored (0.0 when row_count is 0,
    # avoiding a divide-by-zero NULL).
    error_rate = F.when(
        F.col("row_count") > 0, F.col("error_count") / F.col("row_count")
    ).otherwise(0.0)

    runs = runs.withColumn("error_rate", error_rate).select(
        "run_id",
        "run_name",
        "eval_table",
        "endpoint",
        "litellm_model",
        "status",
        "row_count",
        "error_count",
        "error_rate",
        "metrics_enabled",
        "mlflow_run_id",
        "mlflow_experiment",
        "created_at",
        "completed_at",
        "duration_seconds",
        "experiment_id",
        "model_name",
    )

    sort_key = F.col(order_by).desc() if descending else F.col(order_by)
    return runs.orderBy(sort_key).limit(limit)
def compare_runs_df(
    run_ids: list[str] | None = None,
    *,
    eval_table: str | None = None,
    limit: int = 100,
) -> "DataFrame":
    """Get a comparison DataFrame for multiple eval runs.

    Retrieves aggregated metrics across multiple runs for comparison.

    Args:
        run_ids: Optional list of specific run IDs to include.
        eval_table: Optional eval table name to filter by.
        limit: Maximum number of runs to return. Defaults to 100.

    Returns:
        Spark DataFrame with comparison metrics.

    Example:
        >>> summary = compare_runs_df(eval_table="catalog.schema.vendor_eval")
        >>> summary.orderBy("exact_match_accuracy", ascending=False).show()
    """
    spark = ydbc.get_spark_session()
    if not table_exists(EVAL_RUNS_FULL_TABLE):
        return spark.createDataFrame([], schema=EVAL_RUNS_FULL_TABLE_SCHEMA)

    runs = spark.table(EVAL_RUNS_FULL_TABLE)
    if run_ids:
        runs = runs.where(F.col("run_id").isin(run_ids))
    if eval_table:
        runs = runs.where(F.col("eval_table") == eval_table)
    # Comparison only makes sense across finished runs.
    runs = runs.where(F.col("status") == RunStatus.COMPLETED)

    # metrics_summary is stored as a JSON string; decode it into a map so
    # individual metrics can be pulled out by key below.
    runs = runs.withColumn(
        "metrics", F.from_json(F.col("metrics_summary"), "MAP<STRING, DOUBLE>")
    )

    error_rate = (
        F.when(F.col("row_count") > 0, F.col("error_count") / F.col("row_count"))
        .otherwise(0.0)
        .alias("error_rate")
    )
    comparison = runs.select(
        "run_id",
        "eval_table",
        F.coalesce(F.col("endpoint"), F.col("litellm_model")).alias("model"),
        "run_name",
        "created_at",
        "row_count",
        error_rate,
        F.col("metrics.latency_ms_p50").alias("latency_ms_p50"),
        F.col("metrics.latency_ms_p99").alias("latency_ms_p99"),
        F.col("metrics.exact_match_score_accuracy").alias("exact_match_accuracy"),
        F.col("metrics.fuzzy_match_score_avg").alias("fuzzy_match_score_avg"),
        F.col("metrics.llm_judge_score_mean").alias("llm_judge_score_mean"),
        F.col("metrics.token_count_count").cast("long").alias("total_tokens"),
        "mlflow_run_id",
        "experiment_id",
        "model_name",
    )
    return comparison.orderBy(F.col("created_at").desc()).limit(limit)
# =============================================================================
# Experiment-level functions (bundles of model runs)
# =============================================================================
def _get_experiment_registry_entry(experiment_id: str) -> dict:
    """Get experiment entry from registry.

    Args:
        experiment_id: The experiment ID.

    Returns:
        Dict with experiment info.

    Raises:
        MLOpsToolkitExperimentNotFoundException: If experiment not found.
    """
    spark = ydbc.get_spark_session()
    if not table_exists(EVAL_EXPERIMENTS_FULL_TABLE):
        raise MLOpsToolkitExperimentNotFoundException(experiment_id)
    # first() returns None when no row matched, so a single Spark action
    # suffices (the previous count()-then-first() ran two actions and
    # scanned the filtered table twice).
    row = (
        spark.table(EVAL_EXPERIMENTS_FULL_TABLE)
        .where(F.col("experiment_id") == experiment_id)
        .limit(1)
        .first()
    )
    if row is None:
        raise MLOpsToolkitExperimentNotFoundException(experiment_id)
    return row.asDict()
def get_experiment(experiment_id: str) -> ExperimentResult:
    """Get an experiment by its ID.

    Args:
        experiment_id: The experiment ID returned from run_experiment().

    Returns:
        ExperimentResult with all model results populated.

    Raises:
        MLOpsToolkitExperimentNotFoundException: If experiment not found.

    Example:
        >>> exp = get_experiment("exp_20260126_143052_abc123")
        >>> exp.summary_df.show()
    """
    # Registry row for the experiment itself.
    entry = _get_experiment_registry_entry(experiment_id)

    # Hydrate a full EvalRunResult for every named model run in the experiment.
    model_results: dict = {}
    for run_row in list_runs_df(experiment_id=experiment_id).collect():
        run_info = run_row.asDict()
        name = run_info.get("model_name")
        if not name:
            continue
        model_results[name] = get_run(run_info["run_id"])

    return ExperimentResult(
        experiment_id=experiment_id,
        experiment_name=entry["experiment_name"],
        parent_run_id=entry["parent_mlflow_run_id"],
        mlflow_experiment=entry["mlflow_experiment"],
        dataset_hash=entry["dataset_hash"],
        results_table=entry["results_table"],
        model_results=model_results,
    )
def get_experiment_yaml(
    experiment_id: str,
    file_name: str | None = None,
) -> str:
    """Download the experiment YAML config artifact from MLflow.

    Retrieves the YAML configuration that was saved when the experiment
    was run via run_experiment(). This YAML can be used to recreate
    the experiment: ``Experiment(experiment_yaml="restored.yaml")``.

    Args:
        experiment_id: The experiment ID returned from run_experiment().
        file_name: Optional file path to save the YAML to. If provided,
            the YAML content is written to this path in addition to being
            returned as a string. Must end with ``.yaml`` or ``.yml``.

    Returns:
        The YAML config content as a string.

    Raises:
        MLOpsToolkitExperimentNotFoundException: If experiment not found.
        FileNotFoundError: If the YAML artifact does not exist in MLflow
            (e.g., experiment was run before this feature was added).
        ValueError: If file_name does not have a YAML extension.

    Example:
        >>> yaml_content = get_experiment_yaml("exp_20260126_143052_abc123")
        >>> print(yaml_content)
        >>> # Save to file and recreate experiment
        >>> get_experiment_yaml("exp_20260126_143052_abc123", file_name="config.yaml")
        >>> experiment = Experiment(experiment_yaml="config.yaml")
    """
    # Accept both standard YAML extensions (the previous check rejected .yml).
    if file_name is not None and not file_name.endswith((".yaml", ".yml")):
        raise ValueError("file_name must end with .yaml or .yml")

    exp_entry = _get_experiment_registry_entry(experiment_id)
    parent_mlflow_run_id = exp_entry["parent_mlflow_run_id"]

    client = get_mlflow_client()
    with tempfile.TemporaryDirectory() as tmp_dir:
        try:
            artifact_path = client.download_artifacts(
                run_id=parent_mlflow_run_id,
                path="experiment_config.yaml",
                dst_path=tmp_dir,
            )
        except Exception as e:
            # MLflow raises client-specific errors when the artifact is
            # missing; surface a clearer, actionable message instead.
            raise FileNotFoundError(
                f"YAML artifact 'experiment_config.yaml' not found for experiment "
                f"'{experiment_id}' (MLflow run: {parent_mlflow_run_id}). "
                f"This experiment may have been run before YAML logging was added. "
                f"Original error: {e}"
            ) from e
        # Read the content before the temp directory is cleaned up.
        yaml_content = Path(artifact_path).read_text()

    if file_name is not None:
        output_path = Path(file_name)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(yaml_content)

    return yaml_content
def get_experiment_df(
    experiment_id: str,
    *,
    include_metrics: bool = True,
) -> "DataFrame":
    """Get combined per-row results for an experiment.

    Returns the combined results table with all model outputs.

    Args:
        experiment_id: The experiment ID.
        include_metrics: If True, include per-row metric columns.
            Defaults to True.

    Returns:
        Spark DataFrame with combined results from all models,
        including a 'config_name' column to identify the model.

    Raises:
        MLOpsToolkitExperimentNotFoundException: If experiment not found.

    Example:
        >>> df = get_experiment_df("exp_20260126_143052_abc123")
        >>> df.filter(df.exact_match_score == 0).show()
    """
    spark = ydbc.get_spark_session()
    entry = _get_experiment_registry_entry(experiment_id)
    table_name = entry.get("results_table")
    if not table_name or not table_exists(table_name):
        raise MLOpsToolkitExperimentNotFoundException(experiment_id)

    results = spark.table(table_name)
    if include_metrics:
        return results

    # Strip per-row metric columns; only drop those actually present.
    metric_columns = {
        "latency_ms",
        "prompt_tokens",
        "completion_tokens",
        "total_tokens",
        "exact_match_score",
        "fuzzy_match_score",
        "llm_judge_score",
        "llm_judge_reasoning",
    }
    droppable = [name for name in results.columns if name in metric_columns]
    return results.drop(*droppable)
def list_experiments_df(
    eval_table: str | None = None,
    *,
    status: Literal["all", "completed", "running", "partial", "failed"] = "all",
    limit: int = 100,
    order_by: Literal["created_at", "completed_at", "experiment_name"] = "created_at",
    descending: bool = True,
) -> "DataFrame":
    """List experiments as a DataFrame.

    Args:
        eval_table: Optional eval table name to filter by.
        status: Filter experiments by status. Options: 'all', 'completed',
            'running', 'partial', 'failed'. Defaults to 'all'.
        limit: Maximum number of experiments to return. Defaults to 100.
        order_by: Column to sort results by. Defaults to 'created_at'.
        descending: Sort in descending order. Defaults to True.

    Returns:
        Spark DataFrame with experiment metadata.

    Example:
        >>> experiments = list_experiments_df(eval_table="catalog.schema.vendor_eval")
        >>> experiments.show()
    """
    from pyspark.sql.types import (
        DoubleType,
        IntegerType,
        StringType,
        StructField,
        StructType,
        TimestampType,
    )

    spark = ydbc.get_spark_session()
    # Schema for the empty-registry case. It must mirror the select() below:
    # previously only 9 of the 15 selected columns were declared, so callers
    # touching e.g. 'results_table' or 'description' broke on the empty path.
    schema = StructType(
        [
            StructField("experiment_id", StringType(), True),
            StructField("experiment_name", StringType(), True),
            StructField("eval_table", StringType(), True),
            StructField("dataset_hash", StringType(), True),
            StructField("status", StringType(), True),
            StructField("num_models", IntegerType(), True),
            StructField("row_count", IntegerType(), True),
            # NOTE(review): assumed string-typed — confirm against the
            # registry table's actual 'metrics_enabled' column type.
            StructField("metrics_enabled", StringType(), True),
            StructField("mlflow_experiment", StringType(), True),
            StructField("parent_mlflow_run_id", StringType(), True),
            StructField("results_table", StringType(), True),
            StructField("created_at", TimestampType(), True),
            StructField("completed_at", TimestampType(), True),
            StructField("duration_seconds", DoubleType(), True),
            StructField("description", StringType(), True),
        ]
    )
    if not table_exists(EVAL_EXPERIMENTS_FULL_TABLE):
        return spark.createDataFrame([], schema=schema)

    df = spark.table(EVAL_EXPERIMENTS_FULL_TABLE)
    # Apply filters
    if eval_table:
        df = df.where(F.col("eval_table") == eval_table)
    if status != "all":
        status_value = {
            "completed": ExperimentStatus.COMPLETED,
            "running": ExperimentStatus.RUNNING,
            "partial": ExperimentStatus.PARTIAL,
            "failed": ExperimentStatus.FAILED,
        }[status]
        df = df.where(F.col("status") == status_value)
    # Select columns (keep in sync with the empty-case schema above)
    df = df.select(
        "experiment_id",
        "experiment_name",
        "eval_table",
        "dataset_hash",
        "status",
        "num_models",
        "row_count",
        "metrics_enabled",
        "mlflow_experiment",
        "parent_mlflow_run_id",
        "results_table",
        "created_at",
        "completed_at",
        "duration_seconds",
        "description",
    )
    # Order and limit
    order_col = F.col(order_by)
    if descending:
        order_col = order_col.desc()
    df = df.orderBy(order_col).limit(limit)
    return df
def compare_experiments_df(
    experiment_ids: list[str] | None = None,
    *,
    eval_table: str | None = None,
    limit: int = 100,
) -> "DataFrame":
    """Get a comparison DataFrame for multiple experiments.

    For each experiment, shows the best-performing model on each metric.

    Args:
        experiment_ids: Optional list of specific experiment IDs to include.
        eval_table: Optional eval table name to filter by.
        limit: Maximum number of experiments to return. Defaults to 100.

    Returns:
        Spark DataFrame with experiment comparison metrics.

    Example:
        >>> summary = compare_experiments_df(eval_table="catalog.schema.vendor_eval")
        >>> summary.show()
    """
    from pyspark.sql.types import (
        DoubleType,
        IntegerType,
        StringType,
        StructField,
        StructType,
        TimestampType,
    )

    spark = ydbc.get_spark_session()
    # Schema for the empty-registry case. It must mirror the select() below:
    # previously only 5 of the 12 selected columns were declared, so callers
    # touching e.g. 'row_count' or 'duration_seconds' broke on the empty path.
    schema = StructType(
        [
            StructField("experiment_id", StringType(), True),
            StructField("experiment_name", StringType(), True),
            StructField("eval_table", StringType(), True),
            StructField("dataset_hash", StringType(), True),
            StructField("num_models", IntegerType(), True),
            StructField("row_count", IntegerType(), True),
            # NOTE(review): assumed string-typed — confirm against the
            # registry table's actual 'metrics_enabled' column type.
            StructField("metrics_enabled", StringType(), True),
            StructField("results_table", StringType(), True),
            StructField("created_at", TimestampType(), True),
            StructField("completed_at", TimestampType(), True),
            StructField("duration_seconds", DoubleType(), True),
            StructField("description", StringType(), True),
        ]
    )
    if not table_exists(EVAL_EXPERIMENTS_FULL_TABLE):
        return spark.createDataFrame([], schema=schema)

    df = spark.table(EVAL_EXPERIMENTS_FULL_TABLE)
    # Apply filters
    if experiment_ids:
        df = df.filter(F.col("experiment_id").isin(experiment_ids))
    if eval_table:
        df = df.filter(F.col("eval_table") == eval_table)
    # Only completed experiments
    df = df.filter(F.col("status") == ExperimentStatus.COMPLETED)
    # Select columns (keep in sync with the empty-case schema above)
    df = df.select(
        "experiment_id",
        "experiment_name",
        "eval_table",
        "dataset_hash",
        "num_models",
        "row_count",
        "metrics_enabled",
        "results_table",
        "created_at",
        "completed_at",
        "duration_seconds",
        "description",
    )
    df = df.orderBy(F.col("created_at").desc()).limit(limit)
    return df