Source code for pharia_studio_sdk.evaluation.run.run_repository

from abc import ABC, abstractmethod
from collections.abc import Iterable, Sequence
from multiprocessing import Lock as lock
from multiprocessing.synchronize import Lock
from typing import final

from pharia_inference_sdk.core import Output, Tracer
from pydantic import BaseModel

from pharia_studio_sdk.evaluation.run.domain import (
    ExampleOutput,
    FailedExampleRun,
    RunOverview,
)


class RecoveryData(BaseModel):
    run_id: str
    finished_examples: list[str] = []


[docs] class RunRepository(ABC): """Base run repository interface. Provides methods to store and load run results: :class:`RunOverview` and :class:`ExampleOutput`. A :class:`RunOverview` is created from and is linked (by its ID) to multiple :class:`ExampleOutput`s representing results of a dataset. """ def __init__(self) -> None: self.locks: dict[str, Lock] = {}
[docs] @abstractmethod def store_run_overview(self, overview: RunOverview) -> None: """Stores a :class:`RunOverview`. Args: overview: The overview to be persisted. """ pass
@abstractmethod def _create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None: pass @abstractmethod def _delete_temporary_run_data(self, tmp_hash: str) -> None: pass @abstractmethod def _temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: pass
[docs] @abstractmethod def finished_examples(self, tmp_hash: str) -> RecoveryData | None: pass
[docs] @final def create_temporary_run_data(self, tmp_hash: str, run_id: str) -> None: self.locks[tmp_hash] = lock() self._create_temporary_run_data(tmp_hash, run_id)
[docs] @final def delete_temporary_run_data(self, tmp_hash: str) -> None: del self.locks[tmp_hash] self._delete_temporary_run_data(tmp_hash)
[docs] @final def temp_store_finished_example(self, tmp_hash: str, example_id: str) -> None: with self.locks[tmp_hash]: self._temp_store_finished_example(tmp_hash, example_id)
[docs] @abstractmethod def run_overview(self, run_id: str) -> RunOverview | None: """Returns a :class:`RunOverview` for the given ID. Args: run_id: ID of the run overview to retrieve. Returns: :class:`RunOverview` if it was found, `None` otherwise. """ ...
[docs] def run_overviews(self) -> Iterable[RunOverview]: """Returns all :class:`RunOverview`s sorted by their ID. Yields: :class:`Iterable` of :class:`RunOverview`s. """ for run_id in self.run_overview_ids(): run_overview = self.run_overview(run_id) if run_overview is not None: yield run_overview
[docs] @abstractmethod def run_overview_ids(self) -> Sequence[str]: """Returns sorted IDs of all stored :class:`RunOverview`s. Returns: A :class:`Sequence` of the :class:`RunOverview` IDs. """ ...
[docs] @abstractmethod def store_example_output(self, example_output: ExampleOutput[Output]) -> None: """Stores an :class:`ExampleOutput`. Args: example_output: The example output to be persisted. """ ...
[docs] @final def store_example_output_parallel( self, tmp_hash: str, example_output: ExampleOutput[Output] ) -> None: with self.locks[tmp_hash]: self.store_example_output(example_output)
[docs] @abstractmethod def example_output( self, run_id: str, example_id: str, output_type: type[Output] ) -> ExampleOutput[Output] | ExampleOutput[FailedExampleRun] | None: """Returns :class:`ExampleOutput` for the given run ID and example ID. Args: run_id: The ID of the linked run overview. example_id: ID of the example to retrieve. output_type: Type of output that the `Task` returned in :func:`Task.do_run` Returns: class:`ExampleOutput` if it was found, `None` otherwise. """ ...
[docs] @abstractmethod def example_outputs( self, run_id: str, output_type: type[Output] ) -> Iterable[ExampleOutput[Output] | ExampleOutput[FailedExampleRun]]: """Returns all :class:`ExampleOutput` for a given run ID sorted by their example ID. Args: run_id: The ID of the run overview. output_type: Type of output that the `Task` returned in :func:`Task.do_run` Returns: :class:`Iterable` of :class:`ExampleOutput`s. """ ...
[docs] @abstractmethod def example_output_ids(self, run_id: str) -> Sequence[str]: """Returns the sorted IDs of all :class:`ExampleOutput`s for a given run ID. Args: run_id: The ID of the run overview. Returns: A :class:`Sequence` of all :class:`ExampleOutput` IDs. """ ...
[docs] def successful_example_outputs( self, run_id: str, output_type: type[Output] ) -> Iterable[ExampleOutput[Output]]: """Returns all :class:`ExampleOutput` for successful example runs with a given run-overview ID sorted by their example ID. Args: run_id: The ID of the run overview. output_type: Type of output that the `Task` returned in :func:`Task.do_run` Returns: :class:`Iterable` of :class:`ExampleOutput`s. """ results = self.example_outputs(run_id, output_type) return (r for r in results if not isinstance(r.output, FailedExampleRun)) # type: ignore
[docs] def failed_example_outputs( self, run_id: str, output_type: type[Output] ) -> Iterable[ExampleOutput[FailedExampleRun]]: """Returns all :class:`ExampleOutput` for failed example runs with a given run-overview ID sorted by their example ID. Args: run_id: The ID of the run overview. output_type: Type of output that the `Task` returned in :func:`Task.do_run` Returns: :class:`Iterable` of :class:`ExampleOutput`s. """ results = self.example_outputs(run_id, output_type) return (r for r in results if isinstance(r.output, FailedExampleRun)) # type: ignore
[docs] @abstractmethod def example_tracer(self, run_id: str, example_id: str) -> Tracer | None: """Returns an :class:`Optional[Tracer]` for the given run ID and example ID. Args: run_id: The ID of the linked run overview. example_id: ID of the example whose :class:`Tracer` should be retrieved. Returns: A :class:`Tracer` if it was found, `None` otherwise. """ ...
[docs] @abstractmethod def create_tracer_for_example(self, run_id: str, example_id: str) -> Tracer: """Creates and returns a :class:`Tracer` for the given run ID and example ID. Args: run_id: The ID of the linked run overview. example_id: ID of the example whose :class:`Tracer` should be retrieved. Returns: A :.class:`Tracer`. """ ...