"""Main run_evaluation function for executing model evaluations.
This module provides the core run_evaluation function that orchestrates:
- Inference execution against model endpoints
- Metrics computation (latency, token counts, exact/fuzzy match, LLM judge)
- MLflow experiment tracking
- Results persistence to Delta tables
"""
from datetime import datetime
from typing import Any, Callable
import mlflow
from pyspark.sql import functions as F
import yaml
import yipit_databricks_client as ydbc
from yipit_databricks_utils.helpers.delta import create_table
from ml_toolkit.functions.eval_utils.constants import (
BUILTIN_METRICS,
DEFAULT_BATCH_SIZE,
DEFAULT_MAX_CONCURRENT,
DEFAULT_MAX_OUTPUT_TOKENS,
DEFAULT_TIMEOUT_SECONDS,
MAX_ERROR_RATE_THRESHOLD,
ExperimentStatus,
RunStatus,
)
from ml_toolkit.functions.eval_utils.helpers.config import (
DatasetConfig,
Experiment,
ModelConfig,
)
from ml_toolkit.functions.eval_utils.helpers.eval import (
_compute_llm_judge_metrics,
_compute_row_metrics,
_create_combined_results_table,
_generate_experiment_id,
_generate_parent_run_id,
_generate_run_id,
_get_mlflow_experiment_path,
_register_eval_run,
_register_experiment,
)
from ml_toolkit.functions.eval_utils.helpers.inference import run_inference
from ml_toolkit.functions.eval_utils.helpers.metrics import aggregate_all_metrics
from ml_toolkit.functions.eval_utils.helpers.types import (
EvalRunResult,
ExperimentResult,
LLMJudgeConfig,
RemoteExperimentRun,
_EvalRunSnapshot,
)
from ml_toolkit.ops.helpers.exceptions import (
AggregateExceptionHandler,
MLOpsToolkitTableNotFoundException,
)
from ml_toolkit.ops.helpers.logger import get_logger, suppress_table_creation_logs
from ml_toolkit.ops.helpers.validation import table_exists
from ml_toolkit.ops.storage.workspace import create_workspace_directory
def run_evaluation(
    eval_table: str | None = None,
    *,
    primary_key: str | None = None,
    model_name: str | None = None,
    endpoint: str | None = None,
    litellm_model: str | None = None,
    version: int | None = None,
    prompt_template: str | None = None,
    system_prompt: str | None = None,
    prompt_registry_name: str | None = None,
    prompt_version: str | int | None = None,
    prompt_alias: str | None = None,
    metrics: list[str | Callable] | None = None,
    llm_judge_config: LLMJudgeConfig | None = None,
    mlflow_experiment: str | None = None,
    run_name: str | None = None,
    batch_size: int = DEFAULT_BATCH_SIZE,
    max_concurrent: int = DEFAULT_MAX_CONCURRENT,
    timeout_seconds: float = DEFAULT_TIMEOUT_SECONDS,
    input_column: str | None = "input",
    expected_output_column: str | None = "expected_output",
    additional_context_columns: list[str] | None = None,
    tags: dict[str, str] | None = None,
    max_output_tokens: int | None = DEFAULT_MAX_OUTPUT_TOKENS,
    litellm_model_kwargs: dict[str, Any] | None = None,
    # Object-based configuration (values override defaults, but explicit params take precedence)
    dataset: DatasetConfig | None = None,
    model_config: ModelConfig | None = None,
    is_nested: bool = False,
    # Internal parameters for experiment tracking (not part of public API)
    _experiment_id: str | None = None,
    _model_name: str | None = None,
) -> EvalRunResult:
    """Execute an evaluation run against a model endpoint.

    Runs inference on all rows in the eval table using ai_query or OpenAI-compatible
    clients, computes specified metrics, and logs results to MLflow.

    Parameters can be provided directly or via DatasetConfig/ModelConfig objects.
    When both are provided, explicit parameters take precedence over object values.

    Args:
        eval_table: Fully qualified eval table name (catalog.schema.table).
            Can be provided via dataset object instead.
        primary_key: Name of the primary key column. Can be provided via dataset object.
        model_name: Unity Catalog model name (e.g., 'catalog.schema.model_name').
            If provided without endpoint, the endpoint name will be automatically
            inferred from the model name. Either model_name/endpoint or litellm_model
            must be specified. Can be provided via config object.
        endpoint: Databricks Model Serving endpoint name. Optional if model_name
            is provided (will be inferred). Either model_name/endpoint or litellm_model
            must be specified (not both). Can be provided via config object.
        litellm_model: LiteLLM model identifier (e.g., 'gpt-4', 'claude-3').
            Either model_name/endpoint or litellm_model must be specified (not both).
            Can be provided via config object.
        version: Optional integer version for routing to a specific
            model version in multi-version serving endpoints. Only used when
            endpoint is specified. Example: 1, 2, 3. Defaults to None (latest version).
            Can be provided via config object.
        prompt_template: Template for constructing the user message using <<variable>>
            syntax. Available variables: <<input>>, <<candidates>>, and any columns from
            additional_context_columns. Defaults to "<<input>>". Can be provided via config object.
        system_prompt: Optional system prompt for chat completion. Can be provided via config object.
        prompt_registry_name: Fully qualified prompt name from MLflow Prompt Registry
            (e.g., 'catalog.schema.prompt_name') or URI (e.g., 'prompts:/catalog.schema.prompt_name@alias').
            If provided, loads prompt_template from registry (converts {{variable}} to <<variable>>).
            Can be provided via config object.
        prompt_version: Specific version to load from registry. Ignored if prompt_registry_name
            includes version/alias or if prompt_alias is provided. Can be provided via config object.
        prompt_alias: Alias to load from registry (e.g., 'production'). Takes precedence over prompt_version.
            Can be provided via config object.
        metrics: List of metrics to compute. Pass None (the default) to use
            ['latency', 'token_count']. Can include built-in metric names
            ('latency', 'token_count', 'exact_match', 'fuzzy_match', 'llm_judge')
            or custom scorer callables (``Callable[[str, str, dict], float]``).
            Can be provided via config object.
        llm_judge_config: Configuration for LLM-as-judge metric. Required
            if 'llm_judge' is in metrics list. Can be provided via config object.
        mlflow_experiment: MLflow experiment name. If not specified, defaults
            to '/Evals/{schema_name}/{table_base_name}' derived from eval_table.
        run_name: Optional name for the MLflow run. Defaults to
            '{endpoint_or_model}_{timestamp}'. Can be provided via config.name.
        batch_size: Number of rows to process per batch.
        max_concurrent: Maximum concurrent requests to the endpoint.
        timeout_seconds: Timeout per inference request.
        input_column: Name of the input column. Defaults to 'input'. Can be provided via dataset object.
        expected_output_column: Name of the expected output column for
            comparison metrics. Set to None if not available. Can be provided via dataset object.
        additional_context_columns: Additional columns to include in prompt
            template context. Can be provided via dataset object.
        tags: Optional tags to apply to the MLflow run. Merged with config.tags if both provided.
        max_output_tokens: Maximum output tokens for model response. Can be provided via config object.
        litellm_model_kwargs: Additional kwargs for LiteLLM model.
        dataset: DatasetConfig object containing table reference and column configuration.
            Values are used as defaults when explicit parameters are not provided.
        model_config: ModelConfig object containing model and prompt configuration.
            Values are used as defaults when explicit parameters are not provided.

    Returns:
        EvalRunResult containing:
            - run_id: Unique identifier for this eval run
            - mlflow_run_id: MLflow run ID
            - results_table: Fully qualified name of results table
            - metrics_summary: Dict of aggregated metrics
            - row_count: Number of rows evaluated
            - error_count: Number of failed inferences

    Raises:
        ValueError: If neither endpoint nor litellm_model specified.
        ValueError: If both endpoint and litellm_model specified.
        ValueError: If 'llm_judge' in metrics but llm_judge_config is None.
        MLOpsToolkitTableNotFoundException: If eval_table doesn't exist.
        MLOpsToolkitEvalRunFailedException: If error rate exceeds threshold
            (NOTE(review): the current implementation only logs the breach —
            see the NOTE in the body; confirm intended behavior).

    Example:
        >>> # Using model_name (endpoint automatically inferred):
        >>> result = run_evaluation(
        ...     eval_table="catalog.schema.vendor_eval_20260126",
        ...     primary_key="row_id",
        ...     model_name="catalog.schema.vendor_tagger_model",
        ...     prompt_template="Tag the following vendor name: <<input>>",
        ...     metrics=["latency", "token_count", "exact_match"],
        ... )
        >>> # Using explicit endpoint:
        >>> result = run_evaluation(
        ...     eval_table="catalog.schema.vendor_eval_20260126",
        ...     primary_key="row_id",
        ...     endpoint="vendor-tagger-v1",
        ...     prompt_template="Tag the following vendor name: <<input>>",
        ...     metrics=["latency", "token_count", "exact_match"],
        ... )
        >>> # Using version-based routing (defaults to latest version):
        >>> result = run_evaluation(
        ...     eval_table="catalog.schema.vendor_eval",
        ...     model_name="catalog.schema.my_model",
        ...     prompt_template="Extract entity: <<input>>",
        ... )
        >>> # Using specific version:
        >>> result = run_evaluation(
        ...     eval_table="catalog.schema.vendor_eval",
        ...     model_name="catalog.schema.my_model",
        ...     version=1,
        ...     prompt_template="Extract entity: <<input>>",
        ... )
        >>> # Using DatasetConfig and ModelConfig objects:
        >>> dataset = DatasetConfig(table="catalog.schema.vendor_eval", primary_key="row_id")
        >>> model_config = ModelConfig(name="gpt-4o-test", litellm_model="gpt-4o")
        >>> result = run_evaluation(dataset=dataset, model_config=model_config)
        >>> # Using prompt registry:
        >>> result = run_evaluation(
        ...     eval_table="catalog.schema.vendor_eval_20260126",
        ...     primary_key="row_id",
        ...     endpoint="vendor-tagger-v1",
        ...     prompt_registry_name="mycatalog.myschema.vendor_tagging_prompt",
        ...     prompt_alias="production",
        ...     metrics=["latency", "token_count", "exact_match"],
        ... )
    """
    logger = get_logger()
    spark = ydbc.get_spark_session()

    # None sentinel instead of a mutable list default: a literal list default
    # is shared across all calls and can be mutated by one caller for the next.
    if metrics is None:
        metrics = ["latency", "token_count"]

    with AggregateExceptionHandler() as exc_handler:
        if dataset is None:
            dataset = DatasetConfig(
                table=eval_table,
                primary_key=primary_key,
                input_column=input_column,
                expected_output_column=expected_output_column,
                additional_context_columns=additional_context_columns,
            )
        if model_config is None:
            # ModelConfig requires a non-empty name. Derive one from run_name
            # when given, otherwise from model_name/endpoint/litellm_model
            # (previously this logic was duplicated in two near-identical blocks).
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            base_name = model_name or endpoint or litellm_model
            if run_name is None and base_name:
                run_name = f"{base_name}_{timestamp}"
            config_name = run_name or f"{base_name or 'unknown_model'}_{timestamp}"
            model_config = ModelConfig(
                name=config_name,
                model_name=model_name,
                endpoint=endpoint,
                litellm_model=litellm_model,
                version=version,
                prompt_template=prompt_template,
                system_prompt=system_prompt,
                prompt_registry_name=prompt_registry_name,
                prompt_version=prompt_version,
                prompt_alias=prompt_alias,
                max_output_tokens=max_output_tokens,
                tags=tags,
                temperature=litellm_model_kwargs.get("temperature")
                if litellm_model_kwargs
                else None,
                litellm_model_kwargs=litellm_model_kwargs,
            )
        # `in` uses == element-wise, so callables in the list compare False.
        if "llm_judge" in metrics and llm_judge_config is None:
            exc_handler.collect_raise(
                ValueError,
                "'llm_judge_config' is required when 'llm_judge' metric is specified.",
            )
        exc_handler.raise_if_any()

    # Partition requested metrics into recognized built-ins and custom scorers.
    builtin_metrics = [
        m for m in metrics if isinstance(m, str) and m in BUILTIN_METRICS
    ]
    custom_scorers = [m for m in metrics if callable(m)]

    # Generate IDs and paths
    run_id = _generate_run_id()
    mlflow_exp_path = mlflow_experiment or _get_mlflow_experiment_path(dataset.table)

    # Ensure model_config was created successfully
    if model_config is None:
        raise ValueError(
            "model_config is None. This should not happen - ModelConfig creation must have failed. "
            "Check that endpoint or litellm_model is provided, and name is valid."
        )

    model_name = model_config.endpoint or model_config.litellm_model
    if run_name is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        run_name = f"{model_name}_{timestamp}"

    logger.info(f"Starting eval run: {run_id}")
    # Log the resolved table (the raw eval_table param is None when the caller
    # supplied a DatasetConfig object).
    logger.info(f"Eval table: {dataset.table}")
    logger.info(f"Model: {model_name}")

    # Create snapshot of run configuration for logging. Use the resolved
    # dataset values, not the raw parameters, so the snapshot is accurate
    # when configuration came in via DatasetConfig.
    run_snapshot = _EvalRunSnapshot(
        eval_table=dataset.table,
        primary_key=dataset.primary_key,
        endpoint=model_config.endpoint,
        litellm_model=model_config.litellm_model,
        prompt_template=model_config.prompt_template,
        system_prompt=model_config.system_prompt,
        metrics=builtin_metrics,
        llm_judge_config=llm_judge_config,
        batch_size=batch_size,
        max_concurrent=max_concurrent,
        timeout_seconds=timeout_seconds,
        max_output_tokens=model_config.max_output_tokens,
        input_column=dataset.input_column,
        expected_output_column=dataset.expected_output_column,
        additional_context_columns=dataset.additional_context_columns,
        litellm_model_kwargs=model_config.litellm_model_kwargs,
    )

    df = spark.table(dataset.table)
    row_count = df.count()
    logger.info(f"Loaded {row_count} rows from eval table")

    # Ensure workspace directory exists for MLflow experiment
    parent_dir = "/".join(mlflow_exp_path.split("/")[:-1])
    logger.info(f"MLflow experiment path: {mlflow_exp_path}, parent dir: {parent_dir}")
    create_workspace_directory(parent_dir)
    logger.info("Workspace directory created, setting MLflow experiment...")
    mlflow.set_experiment(mlflow_exp_path)
    logger.info("MLflow experiment set successfully")

    with mlflow.start_run(run_name=run_name, nested=is_nested) as mlflow_run:
        mlflow_run_id = mlflow_run.info.run_id
        mlflow.log_params(
            {
                "eval_table": dataset.table,
                "primary_key": dataset.primary_key,
                "endpoint": model_config.endpoint,
                "litellm_model": model_config.litellm_model,
                "metrics": str(builtin_metrics),
                "row_count": row_count,
            }
        )
        # Log config as artifact
        mlflow.log_dict(run_snapshot.to_dict(), "config.json")
        if tags:
            mlflow.set_tags(tags)
        try:
            # Run inference
            logger.info("Running inference...")
            df_with_output = run_inference(
                df=df,
                endpoint=model_config.endpoint,
                litellm_model=model_config.litellm_model,
                prompt_column=dataset.input_column,
                prompt_template=model_config.prompt_template,
                system_prompt=model_config.system_prompt,
                max_output_tokens=model_config.max_output_tokens,
                output_column="model_output",
                additional_context_columns=dataset.additional_context_columns,
                litellm_model_kwargs=model_config.litellm_model_kwargs,
                version=model_config.version,
            )

            # Compute metrics
            logger.info("Computing metrics...")
            df_with_metrics = _compute_row_metrics(
                df=df_with_output,
                metrics=builtin_metrics,
                expected_output_column=dataset.expected_output_column,
                llm_judge_config=llm_judge_config,
                custom_scorers=custom_scorers,
            )

            # Compute LLM judge if requested
            if "llm_judge" in builtin_metrics and llm_judge_config is not None:
                logger.info("Computing LLM judge scores...")
                df_with_metrics = _compute_llm_judge_metrics(
                    df=df_with_metrics,
                    config=llm_judge_config,
                    expected_output_column=dataset.expected_output_column,
                )

            # Add metadata columns
            df_with_metrics = df_with_metrics.withColumn(
                "created_at", F.current_timestamp()
            )

            # Drop internal columns that may contain NullType (not supported by Parquet)
            internal_columns_to_drop = ["_ai_result"]
            for col in internal_columns_to_drop:
                if col in df_with_metrics.columns:
                    df_with_metrics = df_with_metrics.drop(col)

            # Count errors
            error_count = df_with_metrics.filter(
                F.col("error_message").isNotNull()
            ).count()
            error_rate = error_count / row_count if row_count > 0 else 0

            # Check error threshold
            if error_rate > MAX_ERROR_RATE_THRESHOLD:
                # NOTE(review): the docstring promises
                # MLOpsToolkitEvalRunFailedException here, but that class is not
                # imported and the breach is only logged — confirm whether this
                # should raise and fail the run.
                logger.error(
                    f"Eval run failed: error rate {error_rate} exceeds threshold {MAX_ERROR_RATE_THRESHOLD}"
                )

            # Save results to Delta table
            table_parts = dataset.table.split(".")
            results_table_name = f"{table_parts[2]}_results_{run_id}"
            results_full_table = (
                f"{table_parts[0]}.{table_parts[1]}.{results_table_name}"
            )
            logger.info(f"Saving results to: {results_full_table}")
            with suppress_table_creation_logs():
                create_table(
                    schema_name=table_parts[1],
                    table_name=results_table_name,
                    query=df_with_metrics,
                    catalog_name=table_parts[0],
                    overwrite=False,
                )

            # Build metric columns from user-specified metrics
            # latency_ms is always present from inference; other metrics have _score suffix
            metric_columns = ["latency_ms"]
            for m in builtin_metrics:
                if m not in ("latency", "token_count"):
                    metric_columns.append(f"{m}_score")

            # Collect results for aggregation
            results_rows = df_with_metrics.select(*metric_columns).collect()
            results_dicts = [row.asDict() for row in results_rows]
            metrics_summary = aggregate_all_metrics(results_dicts, metric_columns)

            # Add error metrics
            metrics_summary["error_count"] = error_count
            metrics_summary["error_rate"] = error_rate

            # Log metrics to MLflow
            mlflow.log_metrics(metrics_summary)
            # Log results table reference
            mlflow.log_text(results_full_table, "results_table_ref.txt")

            # Register run
            _register_eval_run(
                run_id=run_id,
                eval_table=dataset.table,
                config=run_snapshot,
                results_table=results_full_table,
                mlflow_experiment=mlflow_exp_path,
                mlflow_run_id=mlflow_run_id,
                metrics_summary=metrics_summary,
                row_count=row_count,
                error_count=error_count,
                status=RunStatus.COMPLETED,
                run_name=run_name,
                experiment_id=_experiment_id,
                model_name=_model_name,
            )

            logger.info(f"Eval run completed: {run_id}")
            logger.info(f"Results table: {results_full_table}")
            logger.info(f"Metrics: {metrics_summary}")

            return EvalRunResult(
                run_id=run_id,
                mlflow_run_id=mlflow_run_id,
                results_table=results_full_table,
                metrics_summary=metrics_summary,
                row_count=row_count,
                error_count=error_count,
                created_at=datetime.now(),
            )
        except Exception as e:
            # Record the failure on the MLflow run before propagating.
            logger.error(f"Eval run failed: {e}")
            mlflow.log_param("status", RunStatus.FAILED)
            mlflow.log_param("error", str(e))
            raise
def run_experiment(
    experiment: Experiment,
    *,
    mlflow_experiment_name: str | None = None,
    trigger_remote: bool = False,
    wait: bool = False,
) -> ExperimentResult | RemoteExperimentRun:
    """Run an experiment comparing multiple models against a single dataset.

    Creates a parent MLflow run with nested child runs for each model.
    All results are stored in individual tables plus a combined table
    with a 'model_name' column for easy comparison.

    Metrics are defined once at the experiment level and applied to all models,
    eliminating duplication.

    Args:
        experiment: Experiment object containing dataset, models, and metrics.
        mlflow_experiment_name: MLflow experiment name. If not specified, defaults
            to '/Evals/{schema_name}/{table_base_name}' derived from dataset table.
        trigger_remote: If True, submit the experiment as a remote Databricks
            serverless job instead of running locally. The experiment must not
            contain callable metrics. Defaults to False.
        wait: Only used when trigger_remote=True. If True, block and poll
            until the remote job completes, then return a full ExperimentResult.
            If False (default), return immediately with a RemoteExperimentRun.

    Returns:
        ExperimentResult if running locally or with trigger_remote=True and wait=True.
        RemoteExperimentRun if trigger_remote=True and wait=False (fire-and-forget).

    Raises:
        MLOpsToolkitTableNotFoundException: If dataset table doesn't exist.
        ValueError: If trigger_remote=True and experiment contains callable metrics.

    Example:
        >>> dataset = DatasetConfig(
        ...     table="catalog.schema.vendor_eval",
        ...     primary_key="row_id",
        ...     input_column="vendor_name",
        ...     expected_output_column="canonical_name",
        ... )
        >>> experiment = Experiment(
        ...     name="vendor-tagger-comparison",
        ...     dataset=dataset,
        ...     models=[
        ...         ModelConfig(name="llama-8b", endpoint="llama-endpoint"),
        ...         ModelConfig(name="gpt-4o", litellm_model="gpt-4o"),
        ...     ],
        ...     metrics=[Metric.LATENCY, Metric.EXACT_MATCH],
        ... )
        >>> result = run_experiment(experiment)
        >>> result.summary_df.orderBy("exact_match_accuracy", ascending=False).show()
        >>> # Run remotely (fire-and-forget):
        >>> remote_run = run_experiment(experiment, trigger_remote=True)
        >>> print(remote_run.databricks_url)
        >>> result = remote_run.get_result()  # blocks until done
    """
    # --- Remote execution path ---
    if trigger_remote:
        # Imported lazily so the local path has no dependency on remote helpers.
        from ml_toolkit.functions.eval_utils.helpers.remote import (
            _submit_remote_experiment,
            _validate_experiment_serializable,
        )

        _validate_experiment_serializable(experiment)
        remote_run = _submit_remote_experiment(
            experiment=experiment,
            mlflow_experiment_name=mlflow_experiment_name,
        )
        if wait:
            return remote_run.get_result()
        return remote_run

    # --- Local execution path ---
    logger = get_logger()
    spark = ydbc.get_spark_session()
    dataset = experiment.dataset

    # Validate inputs
    with AggregateExceptionHandler() as exc_handler:
        if not table_exists(dataset.table):
            exc_handler.collect_raise(MLOpsToolkitTableNotFoundException, dataset.table)
        exc_handler.raise_if_any()

    # Generate IDs and paths. Pass dataset.table (a str), consistent with the
    # call in run_evaluation — previously the DatasetConfig object itself was
    # passed to _get_mlflow_experiment_path.
    experiment_id = _generate_experiment_id()
    parent_run_id = _generate_parent_run_id()
    mlflow_exp_path = mlflow_experiment_name or _get_mlflow_experiment_path(
        dataset.table
    )
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    parent_run_name = f"{experiment.name}_{timestamp}"
    dataset_hash = dataset.compute_dataset_hash(spark)
    row_count = spark.table(dataset.table).count()
    experiment_started_at = datetime.now()

    logger.info(f"Starting experiment: {experiment.name} ({experiment_id})")
    logger.info(f"Dataset: {dataset.table} (hash: {dataset_hash})")
    logger.info(f"Models: {[m.name for m in experiment.models]}")
    logger.info(f"Metrics: {[str(m) for m in experiment.metrics if not callable(m)]}")

    # Ensure workspace directory exists for MLflow experiment
    parent_dir = "/".join(mlflow_exp_path.split("/")[:-1])
    logger.info(f"MLflow experiment path: {mlflow_exp_path}, parent dir: {parent_dir}")
    create_workspace_directory(parent_dir)
    logger.info("Workspace directory created, setting MLflow experiment...")
    mlflow.set_experiment(mlflow_exp_path)
    logger.info("MLflow experiment set successfully")

    model_results: dict[str, EvalRunResult] = {}

    # Start parent run
    with mlflow.start_run(run_name=parent_run_name) as parent_run:
        parent_mlflow_run_id = parent_run.info.run_id
        mlflow.log_params(
            {
                "experiment_name": experiment.name,
                "dataset_table": dataset.table,
                "dataset_hash": dataset_hash,
                "num_models": len(experiment.models),
                "model_names": str([m.name for m in experiment.models]),
                "primary_key": dataset.primary_key,
            }
        )
        mlflow.log_dict(experiment.to_dict(), "experiment_info.json")
        # Save experiment config as YAML artifact for reproducibility
        yaml_content = yaml.dump(
            experiment.to_dict(), default_flow_style=False, sort_keys=False
        )
        mlflow.log_text(yaml_content, "experiment_config.yaml")
        mlflow.set_tags(
            {
                "eval_type": "experiment",
                "experiment_id": experiment_id,
                "experiment_name": experiment.name,
                "parent_run_id": parent_run_id,
                **(experiment.tags or {}),
            }
        )

        # Run each model as a nested run
        for model in experiment.models:
            logger.info(f"Running model: {model.name}")
            try:
                # Call existing run_evaluation for this model
                # Pass experiment-level metrics to each model run
                result = run_evaluation(
                    dataset=dataset,
                    model_config=model,
                    tags={
                        "model_name": model.name,
                        "experiment_id": experiment_id,
                        "experiment_name": experiment.name,
                        "parent_run_id": parent_run_id,
                        **(model.tags or {}),
                    },
                    metrics=experiment.metrics,
                    llm_judge_config=experiment.llm_judge_config,
                    is_nested=True,
                    # Internal parameters for experiment tracking
                    _experiment_id=experiment_id,
                    _model_name=model.name,
                )
                model_results[model.name] = result
            except Exception as e:
                # NOTE(review): re-raising here fails the whole experiment on
                # the first model failure, which makes the PARTIAL/FAILED
                # status branches below unreachable — confirm whether
                # continue-on-failure was intended.
                logger.error(f"Model '{model.name}' failed: {e}")
                raise

        combined_results_table = _create_combined_results_table(
            results=model_results,
            dataset=dataset,
            parent_run_id=parent_run_id,
        )

        # Log summary metrics to parent run, namespaced by model name.
        summary_metrics = {}
        for model_name, result in model_results.items():
            for metric_name, value in result.metrics_summary.items():
                summary_metrics[f"{model_name}_{metric_name}"] = value
        mlflow.log_metrics(summary_metrics)
        mlflow.log_text(combined_results_table, "combined_results_table_ref.txt")

        # Determine experiment status
        experiment_completed_at = datetime.now()
        experiment_duration = (
            experiment_completed_at - experiment_started_at
        ).total_seconds()
        if len(model_results) == len(experiment.models):
            experiment_status = ExperimentStatus.COMPLETED
        elif len(model_results) > 0:
            experiment_status = ExperimentStatus.PARTIAL
        else:
            experiment_status = ExperimentStatus.FAILED

        # Get metrics list (filter out callables)
        metrics_enabled = [str(m) for m in experiment.metrics if not callable(m)]

        # Register experiment in the registry
        _register_experiment(
            experiment_id=experiment_id,
            experiment_name=experiment.name,
            eval_table=dataset.table,
            dataset_hash=dataset_hash,
            config=experiment.to_dict(),
            metrics_enabled=metrics_enabled,
            mlflow_experiment=mlflow_exp_path,
            parent_mlflow_run_id=parent_mlflow_run_id,
            results_table=combined_results_table,
            num_models=len(experiment.models),
            row_count=row_count,
            status=experiment_status,
            created_at=experiment_started_at,
            completed_at=experiment_completed_at,
            duration_seconds=experiment_duration,
            description=experiment.description,
            tags=experiment.tags,
        )

        logger.info(f"Experiment completed: {experiment.name} ({experiment_id})")
        logger.info(f"Combined results: {combined_results_table}")

        return ExperimentResult(
            experiment_id=experiment_id,
            experiment_name=experiment.name,
            parent_run_id=parent_mlflow_run_id,
            mlflow_experiment=mlflow_exp_path,
            dataset_hash=dataset_hash,
            results_table=combined_results_table,
            model_results=model_results,
        )