Source code for pharia_studio_sdk.evaluation.benchmark.studio_benchmark

import itertools
from collections.abc import Sequence
from datetime import datetime, timezone
from http import HTTPStatus
from typing import Any

import requests
from pharia_inference_sdk.core import Input, Output
from pharia_inference_sdk.core.task import Task
from pharia_inference_sdk.core.tracer.tracer import ExportedSpan
from pydantic import TypeAdapter
from tqdm import tqdm

from pharia_studio_sdk.connectors.studio.studio import (
    AggregationLogicIdentifier,
    BenchmarkLineage,
    EvaluationLogicIdentifier,
    PostBenchmarkExecution,
    StudioClient,
)
from pharia_studio_sdk.evaluation.aggregation.aggregator import (
    AggregationLogic,
    Aggregator,
)
from pharia_studio_sdk.evaluation.aggregation.domain import AggregatedEvaluation
from pharia_studio_sdk.evaluation.aggregation.in_memory_aggregation_repository import (
    InMemoryAggregationRepository,
)
from pharia_studio_sdk.evaluation.benchmark.benchmark import (
    Benchmark,
    BenchmarkRepository,
)
from pharia_studio_sdk.evaluation.benchmark.get_code import get_source_notebook_safe
from pharia_studio_sdk.evaluation.benchmark.trace_information import (
    extract_latency_from_trace,
    extract_token_count_from_trace,
)
from pharia_studio_sdk.evaluation.dataset.domain import ExpectedOutput
from pharia_studio_sdk.evaluation.dataset.studio_dataset_repository import (
    StudioDatasetRepository,
)
from pharia_studio_sdk.evaluation.evaluation.domain import Evaluation
from pharia_studio_sdk.evaluation.evaluation.evaluator.evaluator import (
    EvaluationLogic,
    Evaluator,
)
from pharia_studio_sdk.evaluation.evaluation.in_memory_evaluation_repository import (
    InMemoryEvaluationRepository,
)
from pharia_studio_sdk.evaluation.infrastructure.repository_navigator import (
    EvaluationLineage,
)
from pharia_studio_sdk.evaluation.run.in_memory_run_repository import (
    InMemoryRunRepository,
)
from pharia_studio_sdk.evaluation.run.runner import Runner


[docs] class StudioBenchmark(Benchmark): def __init__( self, benchmark_id: str, name: str, dataset_id: str, eval_logic: EvaluationLogic[Input, Output, ExpectedOutput, Evaluation], aggregation_logic: AggregationLogic[Evaluation, AggregatedEvaluation], studio_client: StudioClient, **kwargs: Any, ): self.id = benchmark_id self.name = name self.dataset_id = dataset_id self.eval_logic = eval_logic self.aggregation_logic = aggregation_logic self.client = studio_client self.run_repository = InMemoryRunRepository() self.evaluation_repository = InMemoryEvaluationRepository() self.aggregation_repository = InMemoryAggregationRepository() self.dataset_repository = StudioDatasetRepository(self.client) self.evaluator = Evaluator( self.dataset_repository, self.run_repository, self.evaluation_repository, f"benchmark-{self.id}-evaluator", self.eval_logic, ) self.aggregator = Aggregator( self.evaluation_repository, self.aggregation_repository, f"benchmark-{self.id}-aggregator", self.aggregation_logic, )
[docs] def execute( self, task: Task[Input, Output], name: str, description: str | None = None, labels: set[str] | None = None, metadata: dict[str, Any] | None = None, max_workers: int = 10, ) -> str: start = datetime.now(timezone.utc) runner = Runner( task, self.dataset_repository, self.run_repository, f"benchmark-{self.id}-runner", ) run_overview = runner.run_dataset( self.dataset_id, description=description, labels=labels, metadata=metadata, max_workers=max_workers, ) evaluation_overview = self.evaluator.evaluate_runs( run_overview.id, description=description, labels=labels, metadata=metadata ) aggregation_overview = self.aggregator.aggregate_evaluation( evaluation_overview.id, description=description, labels=labels, metadata=metadata, ) end = datetime.now(timezone.utc) evaluation_lineages = list( self.evaluator.evaluation_lineages(evaluation_overview.id) ) run_traces = [ self._trace_from_lineage(lineage) for lineage in evaluation_lineages ] tokens_per_trace = [ extract_token_count_from_trace(trace) for trace in run_traces ] latency_per_trace = [extract_latency_from_trace(trace) for trace in run_traces] tokens_per_successful_trace, latency_per_successful_trace = ( self._filter_for_succesful_runs( (tokens_per_trace, latency_per_trace), source_lineage_list=evaluation_lineages, run_id=run_overview.id, ) ) def average_or_zero(list: list) -> float: return sum(list) / len(list) if len(list) > 0 else 0 benchmark_execution_data = PostBenchmarkExecution( name=name, description=description, labels=labels, metadata=metadata, start=start, end=end, run_start=run_overview.start, run_end=run_overview.end, run_successful_count=run_overview.successful_example_count, run_failed_count=run_overview.failed_example_count, run_success_avg_latency=average_or_zero(latency_per_successful_trace), run_success_avg_token_count=average_or_zero(tokens_per_successful_trace), eval_start=evaluation_overview.start_date, eval_end=evaluation_overview.end_date, eval_successful_count=evaluation_overview.successful_evaluation_count, eval_failed_count=evaluation_overview.failed_evaluation_count, aggregation_start=aggregation_overview.start, aggregation_end=aggregation_overview.end, statistics=aggregation_overview.statistics.model_dump_json(), ) benchmark_execution_id = self.client.submit_benchmark_execution( benchmark_id=self.id, data=benchmark_execution_data ) trace_ids = [] for trace in tqdm(run_traces, desc="Submitting traces to Studio"): trace_id = self.client.submit_trace(trace) trace_ids.append(trace_id) benchmark_lineages = self._create_benchmark_lineages( eval_lineages=evaluation_lineages, trace_ids=trace_ids, latencies_per_trace=latency_per_trace, tokens_per_trace=tokens_per_trace, ) self.client.submit_benchmark_lineages( benchmark_lineages=benchmark_lineages, execution_id=benchmark_execution_id, benchmark_id=self.id, ) return benchmark_execution_id
def _filter_for_succesful_runs( self, lists_to_filter: tuple[list, ...], source_lineage_list: list[ EvaluationLineage[Input, ExpectedOutput, Output, Evaluation] ], run_id: str, ) -> tuple[list, ...]: """This method assumes that lists_to_filter and source_lineage_list are all equal length.""" failed_example_output_ids = [ example_output.example_id for example_output in self.run_repository.failed_example_outputs( run_id=run_id, output_type=self.evaluator.output_type() ) ] is_successful_run = [ lineage.example.id not in failed_example_output_ids for lineage in source_lineage_list ] return tuple( list(itertools.compress(sublist, is_successful_run)) for sublist in lists_to_filter ) def _trace_from_lineage( self, eval_lineage: EvaluationLineage[Input, ExpectedOutput, Output, Evaluation] ) -> Sequence[ExportedSpan]: # since we have 1 output only in this scenario, we expected to have exactly 1 tracer trace = eval_lineage.tracers[0] assert trace, "eval lineages always should have at least 1 tracer" return trace.export_for_viewing() def _create_benchmark_lineages( self, eval_lineages: list[ EvaluationLineage[Input, ExpectedOutput, Output, Evaluation] ], trace_ids: list[str], latencies_per_trace: list[int], tokens_per_trace: list[int], ) -> Sequence[BenchmarkLineage[Input, ExpectedOutput, Output, Evaluation]]: return [ self._create_benchmark_lineage( eval_lineage, trace_id, run_latency, run_tokens ) for eval_lineage, trace_id, run_latency, run_tokens in zip( eval_lineages, trace_ids, latencies_per_trace, tokens_per_trace, strict=True, ) ] def _create_benchmark_lineage( self, eval_lineage: EvaluationLineage[Input, ExpectedOutput, Output, Evaluation], trace_id: str, run_latency: int, run_tokens: int, ) -> BenchmarkLineage: return BenchmarkLineage( trace_id=trace_id, input=eval_lineage.example.input, expected_output=eval_lineage.example.expected_output, example_metadata=eval_lineage.example.metadata, output=eval_lineage.outputs[0].output, evaluation=eval_lineage.evaluation.result, run_latency=run_latency, run_tokens=run_tokens, )
[docs] class StudioBenchmarkRepository(BenchmarkRepository): def __init__(self, studio_client: StudioClient): self.client = studio_client
[docs] def create_benchmark( self, dataset_id: str, eval_logic: EvaluationLogic[Input, Output, ExpectedOutput, Evaluation], aggregation_logic: AggregationLogic[Evaluation, AggregatedEvaluation], name: str, metadata: dict[str, Any] | None = None, description: str | None = None, ) -> StudioBenchmark: try: benchmark_id = self.client.submit_benchmark( dataset_id, create_evaluation_logic_identifier(eval_logic), create_aggregation_logic_identifier(aggregation_logic), name, description, metadata, ) except requests.HTTPError as e: if ( e.response.status_code == HTTPStatus.BAD_REQUEST or e.response.status_code == HTTPStatus.NOT_FOUND ): raise ValueError(f"Dataset with ID {dataset_id} not found") from e if e.response.status_code == HTTPStatus.CONFLICT: raise ValueError( f"""Benchmark with name "{name}" already exists. Names of Benchmarks in the same Project must be unique.""" ) from e else: raise ValueError( "An error occurred when attempting to create a benchmark." ) from e return StudioBenchmark( benchmark_id, name, dataset_id, eval_logic, aggregation_logic, studio_client=self.client, )
[docs] def get_benchmark( self, benchmark_id: str, eval_logic: EvaluationLogic[Input, Output, ExpectedOutput, Evaluation], aggregation_logic: AggregationLogic[Evaluation, AggregatedEvaluation], allow_diff: bool = False, ) -> StudioBenchmark | None: benchmark = self.client.get_benchmark(benchmark_id) if benchmark is None: return None return StudioBenchmark( benchmark_id, benchmark.name, benchmark.dataset_id, eval_logic, aggregation_logic, self.client, )
def type_to_schema(type_: type) -> dict[str, Any]: return TypeAdapter(type_).json_schema() def create_evaluation_logic_identifier( eval_logic: EvaluationLogic[Input, Output, ExpectedOutput, Evaluation], ) -> EvaluationLogicIdentifier: evaluator = Evaluator( dataset_repository=None, # type: ignore run_repository=None, # type: ignore evaluation_repository=None, # type: ignore description="", evaluation_logic=eval_logic, ) return EvaluationLogicIdentifier( logic=get_source_notebook_safe(eval_logic), input_schema=type_to_schema(evaluator.input_type()), output_schema=type_to_schema(evaluator.output_type()), expected_output_schema=type_to_schema(evaluator.expected_output_type()), evaluation_schema=type_to_schema(evaluator.evaluation_type()), ) def create_aggregation_logic_identifier( aggregation_logic: AggregationLogic[Evaluation, AggregatedEvaluation], ) -> AggregationLogicIdentifier: aggregator = Aggregator( evaluation_repository=None, # type: ignore aggregation_repository=None, # type: ignore description="", aggregation_logic=aggregation_logic, ) return AggregationLogicIdentifier( logic=get_source_notebook_safe(aggregation_logic), evaluation_schema=type_to_schema(aggregator.evaluation_type()), aggregation_schema=type_to_schema(aggregator.aggregated_evaluation_type()), )