# Source code for ml_toolkit.functions.eval_utils.results

"""Public API for eval results retrieval.

This module provides functions to retrieve and analyze evaluation results
for both individual eval runs and experiments.

Naming convention:
- get_{entity}()      -> Returns dataclass/object
- get_{entity}_df()   -> Returns detailed DataFrame
- list_{entity}s_df() -> Returns listing DataFrame
- compare_{entity}s() -> Returns comparison DataFrame
"""

from pathlib import Path
import tempfile
from typing import TYPE_CHECKING, Literal

from pyspark.sql import functions as F
import yipit_databricks_client as ydbc

from ml_toolkit.functions.eval_utils.constants import (
    EVAL_EXPERIMENTS_FULL_TABLE,
    EVAL_RUNS_FULL_TABLE,
    ExperimentStatus,
    RunStatus,
)
from ml_toolkit.functions.eval_utils.helpers.results import (
    EVAL_RUNS_FULL_TABLE_SCHEMA,
    format_run_summary,
    get_run_registry_entry,
)
from ml_toolkit.functions.eval_utils.helpers.types import (
    EvalRunResult,
    ExperimentResult,
)
from ml_toolkit.ops.helpers.exceptions import (
    MLOpsToolkitEvalRunNotFoundException,
    MLOpsToolkitExperimentNotFoundException,
)
from ml_toolkit.ops.helpers.mlflow import get_mlflow_client
from ml_toolkit.ops.helpers.validation import table_exists

if TYPE_CHECKING:
    from pyspark.sql import DataFrame


# =============================================================================
# Run-level functions (individual model evaluations)
# =============================================================================


def get_run(run_id: str) -> EvalRunResult:
    """Get metadata for a single eval run.

    Args:
        run_id: The eval run ID returned from run_evaluation().

    Returns:
        EvalRunResult containing run metadata and metrics summary.

    Raises:
        MLOpsToolkitEvalRunNotFoundException: If run not found.

    Example:
        >>> run = get_run("eval_run_20260126_143052_abc123")
        >>> print(run.metrics_summary)
        {'latency_ms_p50': 145.2, 'exact_match_score_accuracy': 0.87}
    """
    entry = get_run_registry_entry(run_id)
    summary = format_run_summary(entry)

    # Every field other than run_id comes straight from the formatted summary.
    passthrough_fields = (
        "mlflow_run_id",
        "results_table",
        "metrics_summary",
        "row_count",
        "error_count",
        "created_at",
    )
    return EvalRunResult(
        run_id=run_id,
        **{name: summary[name] for name in passthrough_fields},
    )
def get_run_df(
    run_id: str,
    *,
    include_metrics: bool = True,
    filter_errors: bool = False,
) -> "DataFrame":
    """Get per-row results for an eval run as a DataFrame.

    Args:
        run_id: The eval run ID returned from run_evaluation().
        include_metrics: If True, include per-row metric columns.
            Defaults to True.
        filter_errors: If True, exclude rows where inference failed.
            Defaults to False.

    Returns:
        Spark DataFrame with columns:
        - All columns from original eval table
        - model_output: The model's response text
        - latency_ms: Inference latency (if metric enabled)
        - prompt_tokens: Input token count (if metric enabled)
        - completion_tokens: Output token count (if metric enabled)
        - exact_match_score: 1.0 if exact match, 0.0 otherwise
        - fuzzy_match_score: Fuzzy match ratio [0.0, 1.0]
        - llm_judge_score: LLM judge score (if metric enabled)
        - error_message: Error message if inference failed, else NULL

    Raises:
        MLOpsToolkitEvalRunNotFoundException: If run not found.

    Example:
        >>> df = get_run_df("eval_run_20260126_143052_abc123")
        >>> df.filter(df.exact_match_score == 0).show()
    """
    spark = ydbc.get_spark_session()

    # The registry entry tells us where this run's results were written.
    entry = get_run_registry_entry(run_id)
    table_name = entry.get("results_table")
    if not (table_name and table_exists(table_name)):
        raise MLOpsToolkitEvalRunNotFoundException(run_id)

    result = spark.table(table_name)

    if filter_errors:
        # A NULL error_message marks a successful inference row.
        result = result.where(F.col("error_message").isNull())

    if not include_metrics:
        droppable = {
            "latency_ms",
            "prompt_tokens",
            "completion_tokens",
            "total_tokens",
            "exact_match_score",
            "fuzzy_match_score",
            "llm_judge_score",
            "llm_judge_reasoning",
        }
        result = result.drop(*[c for c in result.columns if c in droppable])

    return result
def list_runs_df(
    eval_table: str | None = None,
    *,
    experiment_id: str | None = None,
    status: Literal["all", "completed", "running", "failed"] = "all",
    limit: int = 100,
    order_by: Literal["created_at", "completed_at", "run_name"] = "created_at",
    descending: bool = True,
) -> "DataFrame":
    """List eval runs as a DataFrame.

    Can filter by eval table, experiment, or status.

    Args:
        eval_table: Optional eval table name to filter by.
        experiment_id: Optional experiment ID to filter by.
        status: Filter runs by status. Options: 'all', 'completed',
            'running', 'failed'. Defaults to 'all'.
        limit: Maximum number of runs to return. Defaults to 100.
        order_by: Column to sort results by. Defaults to 'created_at'.
        descending: Sort in descending order. Defaults to True.

    Returns:
        Spark DataFrame with run metadata columns.

    Example:
        >>> runs = list_runs_df(eval_table="catalog.schema.vendor_eval")
        >>> runs.show()

        >>> runs = list_runs_df(experiment_id="exp_20260126_143052_abc123")
        >>> runs.show()
    """
    spark = ydbc.get_spark_session()

    # No registry table yet -> empty frame with the canonical schema.
    if not table_exists(EVAL_RUNS_FULL_TABLE):
        return spark.createDataFrame([], schema=EVAL_RUNS_FULL_TABLE_SCHEMA)

    runs = spark.table(EVAL_RUNS_FULL_TABLE)

    # Optional equality filters.
    if eval_table:
        runs = runs.where(F.col("eval_table") == eval_table)
    if experiment_id:
        runs = runs.where(F.col("experiment_id") == experiment_id)
    if status != "all":
        wanted_status = {
            "completed": RunStatus.COMPLETED,
            "running": RunStatus.RUNNING,
            "failed": RunStatus.FAILED,
        }[status]
        runs = runs.where(F.col("status") == wanted_status)

    # Derived error_rate, guarding against zero row counts.
    runs = runs.withColumn(
        "error_rate",
        F.when(
            F.col("row_count") > 0, F.col("error_count") / F.col("row_count")
        ).otherwise(0.0),
    )

    runs = runs.select(
        "run_id",
        "run_name",
        "eval_table",
        "endpoint",
        "litellm_model",
        "status",
        "row_count",
        "error_count",
        "error_rate",
        "metrics_enabled",
        "mlflow_run_id",
        "mlflow_experiment",
        "created_at",
        "completed_at",
        "duration_seconds",
        "experiment_id",
        "model_name",
    )

    sort_expr = F.col(order_by).desc() if descending else F.col(order_by)
    return runs.orderBy(sort_expr).limit(limit)
def compare_runs_df(
    run_ids: list[str] | None = None,
    *,
    eval_table: str | None = None,
    limit: int = 100,
) -> "DataFrame":
    """Get a comparison DataFrame for multiple eval runs.

    Retrieves aggregated metrics across multiple runs for comparison.

    Args:
        run_ids: Optional list of specific run IDs to include.
        eval_table: Optional eval table name to filter by.
        limit: Maximum number of runs to return. Defaults to 100.

    Returns:
        Spark DataFrame with comparison metrics.

    Example:
        >>> summary = compare_runs_df(eval_table="catalog.schema.vendor_eval")
        >>> summary.orderBy("exact_match_accuracy", ascending=False).show()
    """
    spark = ydbc.get_spark_session()

    if not table_exists(EVAL_RUNS_FULL_TABLE):
        return spark.createDataFrame([], schema=EVAL_RUNS_FULL_TABLE_SCHEMA)

    runs = spark.table(EVAL_RUNS_FULL_TABLE)

    if run_ids:
        runs = runs.where(F.col("run_id").isin(run_ids))
    if eval_table:
        runs = runs.where(F.col("eval_table") == eval_table)

    # Comparisons only include runs that finished successfully.
    runs = runs.where(F.col("status") == RunStatus.COMPLETED)

    # metrics_summary is stored as a JSON string; parse it into a map so
    # individual metric values can be pulled out as columns.
    runs = runs.withColumn(
        "metrics", F.from_json(F.col("metrics_summary"), "MAP<STRING, DOUBLE>")
    )

    error_rate_col = (
        F.when(F.col("row_count") > 0, F.col("error_count") / F.col("row_count"))
        .otherwise(0.0)
        .alias("error_rate")
    )

    comparison = runs.select(
        "run_id",
        "eval_table",
        F.coalesce(F.col("endpoint"), F.col("litellm_model")).alias("model"),
        "run_name",
        "created_at",
        "row_count",
        error_rate_col,
        F.col("metrics.latency_ms_p50").alias("latency_ms_p50"),
        F.col("metrics.latency_ms_p99").alias("latency_ms_p99"),
        F.col("metrics.exact_match_score_accuracy").alias("exact_match_accuracy"),
        F.col("metrics.fuzzy_match_score_avg").alias("fuzzy_match_score_avg"),
        F.col("metrics.llm_judge_score_mean").alias("llm_judge_score_mean"),
        F.col("metrics.token_count_count").cast("long").alias("total_tokens"),
        "mlflow_run_id",
        "experiment_id",
        "model_name",
    )

    return comparison.orderBy(F.col("created_at").desc()).limit(limit)
# =============================================================================
# Experiment-level functions (bundles of model runs)
# =============================================================================


def _get_experiment_registry_entry(experiment_id: str) -> dict:
    """Get experiment entry from registry.

    Args:
        experiment_id: The experiment ID.

    Returns:
        Dict with experiment info.

    Raises:
        MLOpsToolkitExperimentNotFoundException: If experiment not found.
    """
    spark = ydbc.get_spark_session()

    if not table_exists(EVAL_EXPERIMENTS_FULL_TABLE):
        raise MLOpsToolkitExperimentNotFoundException(experiment_id)

    # Use a single .first() action instead of count() + first(): the
    # original pattern launched two Spark jobs for one lookup. first()
    # returns None when no row matches.
    row = (
        spark.table(EVAL_EXPERIMENTS_FULL_TABLE)
        .where(F.col("experiment_id") == experiment_id)
        .limit(1)
        .first()
    )
    if row is None:
        raise MLOpsToolkitExperimentNotFoundException(experiment_id)

    return row.asDict()
def get_experiment(experiment_id: str) -> ExperimentResult:
    """Get an experiment by its ID.

    Args:
        experiment_id: The experiment ID returned from run_experiment().

    Returns:
        ExperimentResult with all model results populated.

    Raises:
        MLOpsToolkitExperimentNotFoundException: If experiment not found.

    Example:
        >>> exp = get_experiment("exp_20260126_143052_abc123")
        >>> exp.summary_df.show()
    """
    entry = _get_experiment_registry_entry(experiment_id)

    # Collect the full run result for every model run in this experiment;
    # rows without a model_name are skipped.
    model_results: dict[str, EvalRunResult] = {}
    for run_row in list_runs_df(experiment_id=experiment_id).collect():
        data = run_row.asDict()
        name = data.get("model_name")
        if name:
            model_results[name] = get_run(data["run_id"])

    return ExperimentResult(
        experiment_id=experiment_id,
        experiment_name=entry["experiment_name"],
        parent_run_id=entry["parent_mlflow_run_id"],
        mlflow_experiment=entry["mlflow_experiment"],
        dataset_hash=entry["dataset_hash"],
        results_table=entry["results_table"],
        model_results=model_results,
    )
[docs] def get_experiment_yaml( experiment_id: str, file_name: str | None = None, ) -> str: """Download the experiment YAML config artifact from MLflow. Retrieves the YAML configuration that was saved when the experiment was run via run_experiment(). This YAML can be used to recreate the experiment: ``Experiment(experiment_yaml="restored.yaml")``. Args: experiment_id: The experiment ID returned from run_experiment(). file_name: Optional file path to save the YAML to. If provided, the YAML content is written to this path in addition to being returned as a string. Returns: The YAML config content as a string. Raises: MLOpsToolkitExperimentNotFoundException: If experiment not found. FileNotFoundError: If the YAML artifact does not exist in MLflow (e.g., experiment was run before this feature was added). Example: >>> yaml_content = get_experiment_yaml("exp_20260126_143052_abc123") >>> print(yaml_content) >>> # Save to file and recreate experiment >>> get_experiment_yaml("exp_20260126_143052_abc123", file_name="config.yaml") >>> experiment = Experiment(experiment_yaml="config.yaml") """ if file_name is not None and not file_name.endswith(".yaml"): raise ValueError("file_name must end with .yaml") exp_entry = _get_experiment_registry_entry(experiment_id) parent_mlflow_run_id = exp_entry["parent_mlflow_run_id"] client = get_mlflow_client() with tempfile.TemporaryDirectory() as tmp_dir: try: artifact_path = client.download_artifacts( run_id=parent_mlflow_run_id, path="experiment_config.yaml", dst_path=tmp_dir, ) except Exception as e: raise FileNotFoundError( f"YAML artifact 'experiment_config.yaml' not found for experiment " f"'{experiment_id}' (MLflow run: {parent_mlflow_run_id}). " f"This experiment may have been run before YAML logging was added. 
" f"Original error: {e}" ) from e yaml_content = Path(artifact_path).read_text() if file_name is not None: output_path = Path(file_name) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(yaml_content) return yaml_content
def get_experiment_df(
    experiment_id: str,
    *,
    include_metrics: bool = True,
) -> "DataFrame":
    """Get combined per-row results for an experiment.

    Returns the combined results table with all model outputs.

    Args:
        experiment_id: The experiment ID.
        include_metrics: If True, include per-row metric columns.
            Defaults to True.

    Returns:
        Spark DataFrame with combined results from all models, including
        a 'config_name' column to identify the model.

    Raises:
        MLOpsToolkitExperimentNotFoundException: If experiment not found.

    Example:
        >>> df = get_experiment_df("exp_20260126_143052_abc123")
        >>> df.filter(df.exact_match_score == 0).show()
    """
    spark = ydbc.get_spark_session()

    entry = _get_experiment_registry_entry(experiment_id)
    table_name = entry.get("results_table")
    if not (table_name and table_exists(table_name)):
        raise MLOpsToolkitExperimentNotFoundException(experiment_id)

    combined = spark.table(table_name)

    if not include_metrics:
        droppable = {
            "latency_ms",
            "prompt_tokens",
            "completion_tokens",
            "total_tokens",
            "exact_match_score",
            "fuzzy_match_score",
            "llm_judge_score",
            "llm_judge_reasoning",
        }
        combined = combined.drop(*[c for c in combined.columns if c in droppable])

    return combined
def list_experiments_df(
    eval_table: str | None = None,
    *,
    status: Literal["all", "completed", "running", "partial", "failed"] = "all",
    limit: int = 100,
    order_by: Literal["created_at", "completed_at", "experiment_name"] = "created_at",
    descending: bool = True,
) -> "DataFrame":
    """List experiments as a DataFrame.

    Args:
        eval_table: Optional eval table name to filter by.
        status: Filter experiments by status. Options: 'all', 'completed',
            'running', 'partial', 'failed'. Defaults to 'all'.
        limit: Maximum number of experiments to return. Defaults to 100.
        order_by: Column to sort results by. Defaults to 'created_at'.
        descending: Sort in descending order. Defaults to True.

    Returns:
        Spark DataFrame with experiment metadata.

    Example:
        >>> experiments = list_experiments_df(eval_table="catalog.schema.vendor_eval")
        >>> experiments.show()
    """
    from pyspark.sql.types import (
        DoubleType,
        IntegerType,
        StringType,
        StructField,
        StructType,
        TimestampType,
    )

    spark = ydbc.get_spark_session()

    # Explicit schema so an empty result (registry table missing) still has
    # well-typed columns.
    empty_schema = StructType(
        [
            StructField("experiment_id", StringType(), True),
            StructField("experiment_name", StringType(), True),
            StructField("eval_table", StringType(), True),
            StructField("status", StringType(), True),
            StructField("num_models", IntegerType(), True),
            StructField("row_count", IntegerType(), True),
            StructField("created_at", TimestampType(), True),
            StructField("completed_at", TimestampType(), True),
            StructField("duration_seconds", DoubleType(), True),
        ]
    )

    if not table_exists(EVAL_EXPERIMENTS_FULL_TABLE):
        return spark.createDataFrame([], schema=empty_schema)

    experiments = spark.table(EVAL_EXPERIMENTS_FULL_TABLE)

    if eval_table:
        experiments = experiments.where(F.col("eval_table") == eval_table)
    if status != "all":
        wanted_status = {
            "completed": ExperimentStatus.COMPLETED,
            "running": ExperimentStatus.RUNNING,
            "partial": ExperimentStatus.PARTIAL,
            "failed": ExperimentStatus.FAILED,
        }[status]
        experiments = experiments.where(F.col("status") == wanted_status)

    experiments = experiments.select(
        "experiment_id",
        "experiment_name",
        "eval_table",
        "dataset_hash",
        "status",
        "num_models",
        "row_count",
        "metrics_enabled",
        "mlflow_experiment",
        "parent_mlflow_run_id",
        "results_table",
        "created_at",
        "completed_at",
        "duration_seconds",
        "description",
    )

    sort_expr = F.col(order_by).desc() if descending else F.col(order_by)
    return experiments.orderBy(sort_expr).limit(limit)
def compare_experiments_df(
    experiment_ids: list[str] | None = None,
    *,
    eval_table: str | None = None,
    limit: int = 100,
) -> "DataFrame":
    """Get a comparison DataFrame for multiple completed experiments.

    Returns one metadata row per completed experiment (model count, row
    count, timing, results table, etc.) so experiments can be compared
    side by side. Note: this does not aggregate per-metric winners; use
    compare_runs_df() for metric-level comparisons across runs.

    Args:
        experiment_ids: Optional list of specific experiment IDs to include.
        eval_table: Optional eval table name to filter by.
        limit: Maximum number of experiments to return. Defaults to 100.

    Returns:
        Spark DataFrame with experiment comparison metrics.

    Example:
        >>> summary = compare_experiments_df(eval_table="catalog.schema.vendor_eval")
        >>> summary.show()
    """
    from pyspark.sql.types import (
        IntegerType,
        StringType,
        StructField,
        StructType,
        TimestampType,
    )

    spark = ydbc.get_spark_session()

    # Minimal typed schema for the empty result when the registry table
    # does not exist yet.
    schema = StructType(
        [
            StructField("experiment_id", StringType(), True),
            StructField("experiment_name", StringType(), True),
            StructField("eval_table", StringType(), True),
            StructField("num_models", IntegerType(), True),
            StructField("created_at", TimestampType(), True),
        ]
    )

    if not table_exists(EVAL_EXPERIMENTS_FULL_TABLE):
        return spark.createDataFrame([], schema=schema)

    df = spark.table(EVAL_EXPERIMENTS_FULL_TABLE)

    # Apply optional filters.
    if experiment_ids:
        df = df.filter(F.col("experiment_id").isin(experiment_ids))
    if eval_table:
        df = df.filter(F.col("eval_table") == eval_table)

    # Only completed experiments are comparable.
    df = df.filter(F.col("status") == ExperimentStatus.COMPLETED)

    df = df.select(
        "experiment_id",
        "experiment_name",
        "eval_table",
        "dataset_hash",
        "num_models",
        "row_count",
        "metrics_enabled",
        "results_table",
        "created_at",
        "completed_at",
        "duration_seconds",
        "description",
    )

    return df.orderBy(F.col("created_at").desc()).limit(limit)