domain

Domain layer for the gepa-adk evolution engine.

This module exports the core domain models for the GEPA-ADK evolution engine.

ATTRIBUTES

EvolutionConfig (class): Configuration parameters for evolution runs.
EvolutionResult (class): Outcome of a completed evolution run.
Candidate (class): Instruction candidate being evolved.
IterationRecord (class): Metrics for a single evolution iteration.
Score (type): Type alias for normalized scores.
ComponentName (type): Type alias for component identifiers.
ModelName (type): Type alias for model identifiers.
TrajectoryConfig (class): Configuration for trajectory extraction.
EvolutionError (class): Base exception for all gepa-adk errors.
ConfigurationError (class): Raised when configuration validation fails.
EvaluationError (class): Raised when batch evaluation fails.
AdapterError (class): Raised when adapter operations fail.

Examples:

Basic usage with configuration and records:

from gepa_adk.domain import EvolutionConfig, IterationRecord

config = EvolutionConfig(max_iterations=20)
record = IterationRecord(
    iteration_number=1,
    score=0.85,
    component_text="Test",
    evolved_component="instruction",
    accepted=True,
)
Note

This package contains pure domain logic with no external dependencies. All models follow hexagonal architecture principles (ADR-000).

COMPONENT_GENERATE_CONFIG module-attribute

COMPONENT_GENERATE_CONFIG: ComponentName = (
    "generate_content_config"
)

Component name for LLM generation configuration (temperature, top_p, etc.).

COMPONENT_INSTRUCTION module-attribute

COMPONENT_INSTRUCTION: ComponentName = 'instruction'

Component name for agent instructions (same as DEFAULT_COMPONENT_NAME).

COMPONENT_OUTPUT_SCHEMA module-attribute

COMPONENT_OUTPUT_SCHEMA: ComponentName = 'output_schema'

Component name for Pydantic output schema definitions.

DEFAULT_COMPONENT_NAME module-attribute

DEFAULT_COMPONENT_NAME: ComponentName = 'instruction'

Default component name for single-component evolution.

This constant provides a single source of truth for the default component name used when evolving a single component (typically an agent's instruction). Use this constant instead of hardcoding 'instruction' throughout the codebase.
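For instance, a minimal sketch of seeding a candidate keyed by this constant rather than a hardcoded string (assuming the constant is importable from gepa_adk.domain as documented here, and using the Candidate import path shown elsewhere on this page):

```python
from gepa_adk.domain import DEFAULT_COMPONENT_NAME
from gepa_adk.domain.models import Candidate

# Build a seed candidate keyed by the shared constant rather than a
# hardcoded 'instruction' string.
seed = Candidate(
    components={DEFAULT_COMPONENT_NAME: "You are a helpful assistant."},
    generation=0,
)
```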

AncestorLog module-attribute

AncestorLog: TypeAlias = tuple[int, int, int]

Type alias for tracking attempted merges.

Represents a merge attempt triplet that has been tried, preventing duplicate merges.

Type: tuple[int, int, int] (parent1_idx, parent2_idx, ancestor_idx)

Examples:

from gepa_adk.domain.types import AncestorLog

# Log of attempted merge
log: AncestorLog = (5, 8, 2)  # (parent1_idx, parent2_idx, ancestor_idx)
Note

Type alias used by MergeProposer to track which merge combinations have already been attempted, preventing redundant merge operations.
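A minimal sketch of the deduplication pattern this note describes; the `attempted` set and the index values are illustrative, not part of the API:

```python
from gepa_adk.domain.types import AncestorLog

attempted: set[AncestorLog] = set()

triplet: AncestorLog = (5, 8, 2)  # (parent1_idx, parent2_idx, ancestor_idx)
if triplet not in attempted:
    attempted.add(triplet)  # record the attempt so this merge is not retried
```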

ComponentName module-attribute

ComponentName: TypeAlias = str

Name of a candidate component (e.g., 'instruction', 'output_schema').

MergeAttempt module-attribute

MergeAttempt: TypeAlias = (
    tuple["Candidate", int, int, int] | None
)

Type alias for merge attempt results.

Represents a successful merge attempt with the merged candidate and parent/ancestor indices, or None if merge was not possible.

Type: tuple[Candidate, int, int, int] | None (merged_candidate, parent1_idx, parent2_idx, ancestor_idx), or None

Examples:

from gepa_adk.domain.types import MergeAttempt
from gepa_adk.domain.models import Candidate

# Successful merge
attempt: MergeAttempt = (
    Candidate(components={"instruction": "..."}),
    5,  # parent1_idx
    8,  # parent2_idx
    2,  # ancestor_idx
)

# Failed merge
failed: MergeAttempt = None
Note

Type alias reserved for future merge reporting; currently unused but kept for parity with the merge-proposer design docs.

ModelName module-attribute

ModelName: TypeAlias = str

Model identifier (e.g., 'gemini-2.5-flash', 'gpt-4o').

MultiAgentCandidate module-attribute

MultiAgentCandidate: TypeAlias = dict[str, str]

Type alias for multi-agent candidate structure.

Maps qualified component names to their values, using the dot-separated format {agent_name}.{component_name} as the key.

See ADR-012 for the addressing scheme rationale.

Examples:

Basic multi-agent candidate:

from gepa_adk.domain.types import MultiAgentCandidate, ComponentSpec

# Using ComponentSpec for type-safe construction
gen_inst = ComponentSpec(agent="generator", component="instruction")
critic_schema = ComponentSpec(agent="critic", component="output_schema")

candidate: MultiAgentCandidate = {
    gen_inst.qualified: "Generate Python code...",
    critic_schema.qualified: "Review code output schema...",
}

# Equivalent direct construction
candidate: MultiAgentCandidate = {
    "generator.instruction": "Generate Python code...",
    "critic.output_schema": "Review code output schema...",
}
Note

This type alias is compatible with GEPA's Candidate type (dict[str, str]), enabling seamless integration with existing mutation proposers and evolution engine components.

Score module-attribute

Score: TypeAlias = float

Normalized score, typically in [0.0, 1.0].

AdapterError

Bases: EvaluationError


Inheritance: EvolutionError → EvaluationError → AdapterError

Raised when an adapter operation fails.

This exception is used by adapter implementations (e.g., ADKAdapter) for adapter-specific failures such as session errors or configuration issues.

Examples:

Raising an adapter error:

from gepa_adk.domain.exceptions import AdapterError

if not self._session_service:
    raise AdapterError(
        "Session service unavailable",
        adapter="ADKAdapter",
        operation="evaluate",
    )
Note

AdapterError is a subclass of EvaluationError, so callers can catch either for different granularity of error handling.
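A short sketch of the two catch granularities this note mentions; `adapter.evaluate_batch` is a hypothetical call standing in for any adapter operation:

```python
from gepa_adk.domain.exceptions import AdapterError, EvaluationError

try:
    results = await adapter.evaluate_batch(batch)
except AdapterError as e:
    # Narrow: only adapter-specific failures (sessions, configuration).
    print(f"Adapter failed: {e}")
except EvaluationError as e:
    # Broad: any other evaluation failure, since AdapterError subclasses it.
    print(f"Evaluation failed: {e}")
```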

Source code in src/gepa_adk/domain/exceptions.py
class AdapterError(EvaluationError):
    """Raised when an adapter operation fails.

    This exception is used by adapter implementations (e.g., ADKAdapter)
    for adapter-specific failures such as session errors or configuration
    issues.

    Examples:
        Raising an adapter error:

        ```python
        from gepa_adk.domain.exceptions import AdapterError

        if not self._session_service:
            raise AdapterError(
                "Session service unavailable",
                adapter="ADKAdapter",
                operation="evaluate",
            )
        ```

    Note:
        AdapterError is a subclass of EvaluationError, so callers can
        catch either for different granularity of error handling.
    """

ConfigurationError

Bases: EvolutionError


Inheritance: EvolutionError → ConfigurationError

Raised when configuration validation fails.

This exception is raised during EvolutionConfig initialization when a parameter violates its validation constraints.

ATTRIBUTES

field (str | None): The name of the configuration field that failed validation.
value (object): The invalid value that was provided.
constraint (str | None): Description of the validation constraint that was violated.

Examples:

Creating a configuration error with context:

from gepa_adk.domain.exceptions import ConfigurationError

error = ConfigurationError(
    "max_iterations must be non-negative",
    field="max_iterations",
    value=-5,
    constraint=">= 0",
)
print(error.field, error.value, error.constraint)
# Output: max_iterations -5 >= 0
Note

Arises from user-provided invalid settings, not programming errors. Should be caught and reported with clear guidance on valid values.

Source code in src/gepa_adk/domain/exceptions.py
class ConfigurationError(EvolutionError):
    """Raised when configuration validation fails.

    This exception is raised during EvolutionConfig initialization
    when a parameter violates its validation constraints.

    Attributes:
        field (str | None): The name of the configuration field that failed
            validation.
        value (object): The invalid value that was provided.
        constraint (str | None): Description of the validation constraint that
            was violated.

    Examples:
        Creating a configuration error with context:

        ```python
        from gepa_adk.domain.exceptions import ConfigurationError

        error = ConfigurationError(
            "max_iterations must be non-negative",
            field="max_iterations",
            value=-5,
            constraint=">= 0",
        )
        print(error.field, error.value, error.constraint)
        # Output: max_iterations -5 >= 0
        ```

    Note:
        Arises from user-provided invalid settings, not programming errors.
        Should be caught and reported with clear guidance on valid values.
    """

    def __init__(
        self,
        message: str,
        *,
        field: str | None = None,
        value: object = None,
        constraint: str | None = None,
    ) -> None:
        """Initialize ConfigurationError with context.

        Args:
            message: Human-readable error description.
            field: Name of the invalid configuration field.
            value: The invalid value provided.
            constraint: Description of the validation constraint.

        Note:
            Context fields use keyword-only syntax to ensure explicit labeling
            and prevent positional argument mistakes.
        """
        super().__init__(message)
        self.field = field
        self.value = value
        self.constraint = constraint

    def __str__(self) -> str:
        """Return string representation with context.

        Returns:
            Formatted error message including field and value context.

        Note:
            Outputs formatted error message with field and value context
            when available, preserving base message structure.
        """
        base = super().__str__()
        context_parts = []
        if self.field is not None:
            context_parts.append(f"field={self.field!r}")
        if self.value is not None:
            context_parts.append(f"value={self.value!r}")
        if context_parts:
            return f"{base} [{', '.join(context_parts)}]"
        return base

__init__

__init__(
    message: str,
    *,
    field: str | None = None,
    value: object = None,
    constraint: str | None = None,
) -> None

Initialize ConfigurationError with context.

PARAMETERS

message (str): Human-readable error description.
field (str | None, default: None): Name of the invalid configuration field.
value (object, default: None): The invalid value provided.
constraint (str | None, default: None): Description of the validation constraint.

Note

Context fields use keyword-only syntax to ensure explicit labeling and prevent positional argument mistakes.

Source code in src/gepa_adk/domain/exceptions.py
def __init__(
    self,
    message: str,
    *,
    field: str | None = None,
    value: object = None,
    constraint: str | None = None,
) -> None:
    """Initialize ConfigurationError with context.

    Args:
        message: Human-readable error description.
        field: Name of the invalid configuration field.
        value: The invalid value provided.
        constraint: Description of the validation constraint.

    Note:
        Context fields use keyword-only syntax to ensure explicit labeling
        and prevent positional argument mistakes.
    """
    super().__init__(message)
    self.field = field
    self.value = value
    self.constraint = constraint

__str__

__str__() -> str

Return string representation with context.

RETURNS

str: Formatted error message including field and value context.

Note

Outputs formatted error message with field and value context when available, preserving base message structure.
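Based on the `__str__` implementation shown below, string conversion appends the field and value context (the constraint is stored on the exception but not included in the formatted message):

```python
from gepa_adk.domain.exceptions import ConfigurationError

error = ConfigurationError(
    "max_iterations must be non-negative",
    field="max_iterations",
    value=-5,
    constraint=">= 0",
)
print(str(error))
# Output: max_iterations must be non-negative [field='max_iterations', value=-5]
```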

Source code in src/gepa_adk/domain/exceptions.py
def __str__(self) -> str:
    """Return string representation with context.

    Returns:
        Formatted error message including field and value context.

    Note:
        Outputs formatted error message with field and value context
        when available, preserving base message structure.
    """
    base = super().__str__()
    context_parts = []
    if self.field is not None:
        context_parts.append(f"field={self.field!r}")
    if self.value is not None:
        context_parts.append(f"value={self.value!r}")
    if context_parts:
        return f"{base} [{', '.join(context_parts)}]"
    return base

CriticOutputParseError

Bases: ScoringError


Inheritance: EvolutionError → ScoringError → CriticOutputParseError

Raised when critic agent output cannot be parsed as valid JSON.

This exception indicates that the critic agent returned output that could not be parsed as JSON or did not conform to the expected schema.

ATTRIBUTES

raw_output (str): The unparseable output from critic.
parse_error (str): Description of parsing failure.

Examples:

Handling parse errors:

from gepa_adk.domain.exceptions import CriticOutputParseError

try:
    score, metadata = await scorer.async_score(...)
except CriticOutputParseError as e:
    print(f"Invalid JSON: {e.raw_output}")
    print(f"Error: {e.parse_error}")
Note

Arises when critic agent output cannot be parsed as valid JSON. Typically occurs when LLM output doesn't follow structured format despite output_schema being set.

Source code in src/gepa_adk/domain/exceptions.py
class CriticOutputParseError(ScoringError):
    """Raised when critic agent output cannot be parsed as valid JSON.

    This exception indicates that the critic agent returned output that
    could not be parsed as JSON or did not conform to the expected schema.

    Attributes:
        raw_output (str): The unparseable output from critic.
        parse_error (str): Description of parsing failure.

    Examples:
        Handling parse errors:

        ```python
        from gepa_adk.domain.exceptions import CriticOutputParseError

        try:
            score, metadata = await scorer.async_score(...)
        except CriticOutputParseError as e:
            print(f"Invalid JSON: {e.raw_output}")
            print(f"Error: {e.parse_error}")
        ```

    Note:
        Arises when critic agent output cannot be parsed as valid JSON.
        Typically occurs when LLM output doesn't follow structured format
        despite output_schema being set.
    """

    def __init__(
        self,
        message: str,
        *,
        raw_output: str,
        parse_error: str,
        cause: Exception | None = None,
    ) -> None:
        """Initialize CriticOutputParseError with context.

        Args:
            message: Human-readable error description.
            raw_output: The unparseable output string from critic.
            parse_error: Description of the parsing failure.
            cause: Original exception that caused this error.

        Note:
            Context fields use keyword-only syntax to ensure explicit labeling
            and prevent positional argument mistakes.
        """
        super().__init__(message, cause=cause)
        self.raw_output = raw_output
        self.parse_error = parse_error

    def __str__(self) -> str:
        """Return string with parse error details.

        Returns:
            Formatted message including parse error and raw output preview.

        Note:
            Outputs formatted error message with parse error and raw output
            preview (truncated to 100 chars), preserving base message structure.
        """
        base = super().__str__()
        output_preview = (
            self.raw_output[:100] + "..."
            if len(self.raw_output) > 100
            else self.raw_output
        )
        return (
            f"{base} [parse_error={self.parse_error!r}, raw_output={output_preview!r}]"
        )

__init__

__init__(
    message: str,
    *,
    raw_output: str,
    parse_error: str,
    cause: Exception | None = None,
) -> None

Initialize CriticOutputParseError with context.

PARAMETERS

message (str): Human-readable error description.
raw_output (str): The unparseable output string from critic.
parse_error (str): Description of the parsing failure.
cause (Exception | None, default: None): Original exception that caused this error.

Note

Context fields use keyword-only syntax to ensure explicit labeling and prevent positional argument mistakes.

Source code in src/gepa_adk/domain/exceptions.py
def __init__(
    self,
    message: str,
    *,
    raw_output: str,
    parse_error: str,
    cause: Exception | None = None,
) -> None:
    """Initialize CriticOutputParseError with context.

    Args:
        message: Human-readable error description.
        raw_output: The unparseable output string from critic.
        parse_error: Description of the parsing failure.
        cause: Original exception that caused this error.

    Note:
        Context fields use keyword-only syntax to ensure explicit labeling
        and prevent positional argument mistakes.
    """
    super().__init__(message, cause=cause)
    self.raw_output = raw_output
    self.parse_error = parse_error

__str__

__str__() -> str

Return string with parse error details.

RETURNS

str: Formatted message including parse error and raw output preview.

Note

Outputs formatted error message with parse error and raw output preview (truncated to 100 chars), preserving base message structure.
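A small sketch of the truncation behavior described above; the raw output here is synthetic:

```python
from gepa_adk.domain.exceptions import CriticOutputParseError

error = CriticOutputParseError(
    "Critic returned invalid JSON",
    raw_output="{" + "x" * 200,  # longer than the 100-char preview limit
    parse_error="Expecting property name enclosed in double quotes",
)
# str(error) includes parse_error and only the first 100 characters of
# raw_output followed by '...'.
print(str(error))
```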

Source code in src/gepa_adk/domain/exceptions.py
def __str__(self) -> str:
    """Return string with parse error details.

    Returns:
        Formatted message including parse error and raw output preview.

    Note:
        Outputs formatted error message with parse error and raw output
        preview (truncated to 100 chars), preserving base message structure.
    """
    base = super().__str__()
    output_preview = (
        self.raw_output[:100] + "..."
        if len(self.raw_output) > 100
        else self.raw_output
    )
    return (
        f"{base} [parse_error={self.parse_error!r}, raw_output={output_preview!r}]"
    )

EvaluationError

Bases: EvolutionError


Inheritance: EvolutionError → EvaluationError

Raised when batch evaluation fails.

This exception indicates failures during agent evaluation, such as agent execution errors, timeout, or malformed output.

ATTRIBUTES

cause (Exception | None): Original exception that caused this error.
context (dict): Additional context for debugging.

Examples:

Wrapping an ADK error:

from gepa_adk.domain.exceptions import EvaluationError

try:
    result = await runner.run_async(...)
except ADKError as e:
    raise EvaluationError(
        "Agent execution failed",
        cause=e,
        agent_name="my_agent",
    ) from e
Note

Always preserves the original cause for debugging while providing a consistent interface for error handling.

Source code in src/gepa_adk/domain/exceptions.py
class EvaluationError(EvolutionError):
    """Raised when batch evaluation fails.

    This exception indicates failures during agent evaluation,
    such as agent execution errors, timeout, or malformed output.

    Attributes:
        cause (Exception | None): Original exception that caused this error.
        context (dict): Additional context for debugging.

    Examples:
        Wrapping an ADK error:

        ```python
        from gepa_adk.domain.exceptions import EvaluationError

        try:
            result = await runner.run_async(...)
        except ADKError as e:
            raise EvaluationError(
                "Agent execution failed",
                cause=e,
                agent_name="my_agent",
            ) from e
        ```

    Note:
        Always preserves the original cause for debugging while providing
        a consistent interface for error handling.
    """

    def __init__(
        self,
        message: str,
        *,
        cause: Exception | None = None,
        **context: object,
    ) -> None:
        """Initialize EvaluationError with cause and context.

        Args:
            message: Human-readable error description.
            cause: Original exception that caused this error.
            **context: Additional context for debugging (agent_name, etc.).

        Note:
            Context is passed via keyword arguments. Positional arguments
            after message are not allowed.
        """
        super().__init__(message)
        self.cause = cause
        self.context = context

    def __str__(self) -> str:
        """Return string with cause chain if present.

        Returns:
            Formatted message with context and cause information.

        Note:
            Outputs formatted error message with context dict and cause chain
            when available, preserving base message structure.
        """
        base = super().__str__()
        if self.context:
            ctx_str = ", ".join(f"{k}={v!r}" for k, v in self.context.items())
            base = f"{base} [{ctx_str}]"
        if self.cause:
            base = f"{base} (caused by: {self.cause})"
        return base

__init__

__init__(
    message: str,
    *,
    cause: Exception | None = None,
    **context: object,
) -> None

Initialize EvaluationError with cause and context.

PARAMETERS

message (str): Human-readable error description.
cause (Exception | None, default: None): Original exception that caused this error.
**context (object): Additional context for debugging (agent_name, etc.).

Note

Context is passed via keyword arguments. Positional arguments after message are not allowed.

Source code in src/gepa_adk/domain/exceptions.py
def __init__(
    self,
    message: str,
    *,
    cause: Exception | None = None,
    **context: object,
) -> None:
    """Initialize EvaluationError with cause and context.

    Args:
        message: Human-readable error description.
        cause: Original exception that caused this error.
        **context: Additional context for debugging (agent_name, etc.).

    Note:
        Context is passed via keyword arguments. Positional arguments
        after message are not allowed.
    """
    super().__init__(message)
    self.cause = cause
    self.context = context

__str__

__str__() -> str

Return string with cause chain if present.

RETURNS

str: Formatted message with context and cause information.

Note

Outputs formatted error message with context dict and cause chain when available, preserving base message structure.
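Following the `__str__` implementation below, context keywords and the cause chain both appear in the formatted message; the values here are illustrative:

```python
from gepa_adk.domain.exceptions import EvaluationError

error = EvaluationError(
    "Agent execution failed",
    cause=TimeoutError("deadline exceeded"),
    agent_name="my_agent",
    batch_size=8,
)
print(str(error))
# Output: Agent execution failed [agent_name='my_agent', batch_size=8] (caused by: deadline exceeded)
```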

Source code in src/gepa_adk/domain/exceptions.py
def __str__(self) -> str:
    """Return string with cause chain if present.

    Returns:
        Formatted message with context and cause information.

    Note:
        Outputs formatted error message with context dict and cause chain
        when available, preserving base message structure.
    """
    base = super().__str__()
    if self.context:
        ctx_str = ", ".join(f"{k}={v!r}" for k, v in self.context.items())
        base = f"{base} [{ctx_str}]"
    if self.cause:
        base = f"{base} (caused by: {self.cause})"
    return base

EvolutionError

Bases: Exception



Base exception for all gepa-adk errors.

All custom exceptions in gepa-adk should inherit from this class to allow for unified exception handling.

Examples:

Catching evolution errors:

from gepa_adk.domain.exceptions import EvolutionError

try:
    raise EvolutionError("Evolution failed unexpectedly")
except EvolutionError as e:
    print(f"Caught: {e}")
# Output: Caught: Evolution failed unexpectedly
Note

Always use this base class or its subclasses for domain errors. Standard Python exceptions should still be raised for programming errors (e.g., TypeError, ValueError for developer mistakes).
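A brief sketch of the unified handling this note recommends; `run_evolution` is a placeholder for any gepa-adk entry point, not a documented function:

```python
from gepa_adk.domain.exceptions import ConfigurationError, EvolutionError

try:
    result = run_evolution(config)
except ConfigurationError as e:
    # Specific subclass first: user-fixable settings problems.
    print(f"Fix your settings: {e}")
except EvolutionError as e:
    # Base class catches every other gepa-adk domain error in one place.
    print(f"Evolution failed: {e}")
```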

Source code in src/gepa_adk/domain/exceptions.py
class EvolutionError(Exception):
    """Base exception for all gepa-adk errors.

    All custom exceptions in gepa-adk should inherit from this class
    to allow for unified exception handling.

    Examples:
        Catching evolution errors:

        ```python
        from gepa_adk.domain.exceptions import EvolutionError

        try:
            raise EvolutionError("Evolution failed unexpectedly")
        except EvolutionError as e:
            print(f"Caught: {e}")
        # Output: Caught: Evolution failed unexpectedly
        ```

    Note:
        Always use this base class or its subclasses for domain errors.
        Standard Python exceptions should still be raised for programming
        errors (e.g., TypeError, ValueError for developer mistakes).
    """

MissingScoreFieldError

Bases: ScoringError


Inheritance: EvolutionError → ScoringError → MissingScoreFieldError

Raised when score field is missing or null in parsed output.

This exception indicates that the output was successfully parsed and validated, but the required score field is missing or null.

ATTRIBUTES

parsed_output (dict[str, Any]): The parsed output without valid score.
available_fields (list[str]): Fields that were present.

Examples:

Handling missing score field:

from gepa_adk.domain.exceptions import MissingScoreFieldError

try:
    score, metadata = await scorer.async_score(...)
except MissingScoreFieldError as e:
    print(f"Missing score. Available fields: {e.available_fields}")
Note

Applies when parsed output lacks a valid score value. The parsed_output may still contain other valid fields; had a valid score been present, those fields would have been preserved in metadata.
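A minimal sketch showing how `available_fields` is derived from the parsed output, per the `__init__` implementation below (the field values are synthetic):

```python
from gepa_adk.domain.exceptions import MissingScoreFieldError

error = MissingScoreFieldError(
    "Critic output has no 'score' field",
    parsed_output={"feedback": "Looks good", "confidence": 0.9},
)
print(error.available_fields)
# Output: ['feedback', 'confidence']
```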

Source code in src/gepa_adk/domain/exceptions.py
class MissingScoreFieldError(ScoringError):
    """Raised when score field is missing or null in parsed output.

    This exception indicates that the output was successfully parsed
    and validated, but the required `score` field is missing or null.

    Attributes:
        parsed_output (dict[str, Any]): The parsed output without valid score.
        available_fields (list[str]): Fields that were present.

    Examples:
        Handling missing score field:

        ```python
        from gepa_adk.domain.exceptions import MissingScoreFieldError

        try:
            score, metadata = await scorer.async_score(...)
        except MissingScoreFieldError as e:
            print(f"Missing score. Available fields: {e.available_fields}")
        ```

    Note:
        Applies when parsed output lacks a valid score value. The
        parsed_output may still contain other valid fields; had a valid
        score been present, those fields would have been preserved in
        metadata.
    """

    def __init__(
        self,
        message: str,
        *,
        parsed_output: dict[str, Any],
        cause: Exception | None = None,
    ) -> None:
        """Initialize MissingScoreFieldError with parsed output.

        Args:
            message: Human-readable error description.
            parsed_output: The parsed dict without valid score field.
            cause: Original exception that caused this error.

        Note:
            Context fields use keyword-only syntax to ensure explicit labeling
            and prevent positional argument mistakes.
        """
        super().__init__(message, cause=cause)
        self.parsed_output = parsed_output
        self.available_fields = list(parsed_output.keys())

    def __str__(self) -> str:
        """Return string with available fields.

        Returns:
            Formatted message including list of available fields.

        Note:
            Outputs formatted error message with list of available fields
            from parsed output, preserving base message structure.
        """
        base = super().__str__()
        return f"{base} [available_fields={self.available_fields}]"

__init__

__init__(
    message: str,
    *,
    parsed_output: dict[str, Any],
    cause: Exception | None = None,
) -> None

Initialize MissingScoreFieldError with parsed output.

PARAMETERS

message (str): Human-readable error description.
parsed_output (dict[str, Any]): The parsed dict without valid score field.
cause (Exception | None, default: None): Original exception that caused this error.

Note

Context fields use keyword-only syntax to ensure explicit labeling and prevent positional argument mistakes.

Source code in src/gepa_adk/domain/exceptions.py
def __init__(
    self,
    message: str,
    *,
    parsed_output: dict[str, Any],
    cause: Exception | None = None,
) -> None:
    """Initialize MissingScoreFieldError with parsed output.

    Args:
        message: Human-readable error description.
        parsed_output: The parsed dict without valid score field.
        cause: Original exception that caused this error.

    Note:
        Context fields use keyword-only syntax to ensure explicit labeling
        and prevent positional argument mistakes.
    """
    super().__init__(message, cause=cause)
    self.parsed_output = parsed_output
    self.available_fields = list(parsed_output.keys())

__str__

__str__() -> str

Return string with available fields.

RETURNS

str: Formatted message including list of available fields.

Note

Outputs formatted error message with list of available fields from parsed output, preserving base message structure.

Source code in src/gepa_adk/domain/exceptions.py
def __str__(self) -> str:
    """Return string with available fields.

    Returns:
        Formatted message including list of available fields.

    Note:
        Outputs formatted error message with list of available fields
        from parsed output, preserving base message structure.
    """
    base = super().__str__()
    return f"{base} [available_fields={self.available_fields}]"

MultiAgentValidationError

Bases: EvolutionError


Inheritance: EvolutionError → MultiAgentValidationError

Raised when multi-agent configuration validation fails.

This exception is raised during MultiAgentAdapter initialization when a parameter violates its validation constraints.

ATTRIBUTES

field (str): The name of the configuration field that failed validation.
value (object): The invalid value that was provided.
constraint (str): Description of the validation constraint that was violated.

Examples:

Creating a multi-agent validation error:

from gepa_adk.domain.exceptions import MultiAgentValidationError

error = MultiAgentValidationError(
    "agents list cannot be empty",
    field="agents",
    value=[],
    constraint="len >= 1",
)
print(error.field, error.value)  # agents []
Note

Arises from user-provided invalid multi-agent settings, not programming errors. Should be caught and reported with clear guidance on valid values.

Source code in src/gepa_adk/domain/exceptions.py
class MultiAgentValidationError(EvolutionError):
    """Raised when multi-agent configuration validation fails.

    This exception is raised during MultiAgentAdapter initialization
    when a parameter violates its validation constraints.

    Attributes:
        field (str): The name of the configuration field that failed
            validation.
        value (object): The invalid value that was provided.
        constraint (str): Description of the validation constraint that
            was violated.

    Examples:
        Creating a multi-agent validation error:

        ```python
        from gepa_adk.domain.exceptions import MultiAgentValidationError

        error = MultiAgentValidationError(
            "agents list cannot be empty",
            field="agents",
            value=[],
            constraint="len >= 1",
        )
        print(error.field, error.value)  # agents []
        ```

    Note:
        Arises from user-provided invalid multi-agent settings, not
        programming errors. Should be caught and reported with clear
        guidance on valid values.
    """

    def __init__(
        self,
        message: str,
        *,
        field: str,
        value: object,
        constraint: str,
    ) -> None:
        """Initialize MultiAgentValidationError with context.

        Args:
            message: Human-readable error description.
            field: Name of the invalid configuration field.
            value: The invalid value provided.
            constraint: Description of the validation constraint.

        Note:
            Context fields use keyword-only syntax to ensure explicit labeling
            and prevent positional argument mistakes.
        """
        super().__init__(message)
        self.field = field
        self.value = value
        self.constraint = constraint

    def __str__(self) -> str:
        """Return string representation with context.

        Returns:
            Formatted error message including field and value context.

        Note:
            Outputs formatted error message with field, value, and constraint
            context when available, preserving base message structure.
        """
        base = super().__str__()
        return (
            f"{base} [field={self.field!r}, value={self.value!r}, "
            f"constraint={self.constraint!r}]"
        )

__init__

__init__(
    message: str,
    *,
    field: str,
    value: object,
    constraint: str,
) -> None

Initialize MultiAgentValidationError with context.

PARAMETERS

message (str): Human-readable error description.
field (str): Name of the invalid configuration field.
value (object): The invalid value provided.
constraint (str): Description of the validation constraint.

Note

Context fields use keyword-only syntax to ensure explicit labeling and prevent positional argument mistakes.

Source code in src/gepa_adk/domain/exceptions.py
def __init__(
    self,
    message: str,
    *,
    field: str,
    value: object,
    constraint: str,
) -> None:
    """Initialize MultiAgentValidationError with context.

    Args:
        message: Human-readable error description.
        field: Name of the invalid configuration field.
        value: The invalid value provided.
        constraint: Description of the validation constraint.

    Note:
        Context fields use keyword-only syntax to ensure explicit labeling
        and prevent positional argument mistakes.
    """
    super().__init__(message)
    self.field = field
    self.value = value
    self.constraint = constraint

__str__

__str__() -> str

Return string representation with context.

RETURNS

str: Formatted error message including field and value context.

Note

Outputs formatted error message with field, value, and constraint context when available, preserving base message structure.

Source code in src/gepa_adk/domain/exceptions.py
def __str__(self) -> str:
    """Return string representation with context.

    Returns:
        Formatted error message including field and value context.

    Note:
        Outputs formatted error message with field, value, and constraint
        context when available, preserving base message structure.
    """
    base = super().__str__()
    return (
        f"{base} [field={self.field!r}, value={self.value!r}, "
        f"constraint={self.constraint!r}]"
    )

NoCandidateAvailableError

Bases: EvolutionError


Inheritance: EvolutionError → NoCandidateAvailableError

Raised when no candidates are available for selection.

ATTRIBUTES

cause (Exception | None): Original exception that caused this error.
context (dict[str, object]): Extra context (candidate_idx, frontier_type).

Examples:

raise NoCandidateAvailableError(
    "No candidates available",
    frontier_type="instance",
)
Note

Arises when candidate selector cannot find any valid candidates from the Pareto frontier, typically due to empty frontier or filtering constraints.
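A short sketch of handling this error and inspecting its context; the `selector.select(frontier)` call is hypothetical and stands in for whatever selection entry point is in use:

```python
from gepa_adk.domain.exceptions import NoCandidateAvailableError

try:
    candidate = selector.select(frontier)
except NoCandidateAvailableError as e:
    # Context kwargs passed at raise time are preserved on the exception.
    print(f"Selection failed: {e}")
    print(e.context)  # e.g. {'frontier_type': 'instance'}
```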

Source code in src/gepa_adk/domain/exceptions.py
class NoCandidateAvailableError(EvolutionError):
    """Raised when no candidates are available for selection.

    Attributes:
        cause (Exception | None): Original exception that caused this error.
        context (dict[str, object]): Extra context (candidate_idx, frontier_type).

    Examples:
        ```python
        raise NoCandidateAvailableError(
            "No candidates available",
            frontier_type="instance",
        )
        ```

    Note:
        Arises when candidate selector cannot find any valid candidates
        from the Pareto frontier, typically due to empty frontier or
        filtering constraints.
    """

    def __init__(
        self,
        message: str,
        *,
        cause: Exception | None = None,
        **context: object,
    ) -> None:
        """Initialize NoCandidateAvailableError with context.

        Args:
            message: Human-readable error description.
            cause: Original exception that caused this error.
            **context: Additional context such as candidate_idx or frontier_type.

        Note:
            Context fields use keyword-only syntax to ensure explicit labeling
            and prevent positional argument mistakes.
        """
        super().__init__(message)
        self.cause = cause
        self.context = context

    def __str__(self) -> str:
        """Return string with context and cause details.

        Returns:
            Formatted error message including context and cause information.

        Note:
            Outputs formatted error message with context dict and cause chain
            when available, preserving base message structure.
        """
        base = super().__str__()
        if self.context:
            ctx_str = ", ".join(f"{k}={v!r}" for k, v in self.context.items())
            base = f"{base} [{ctx_str}]"
        if self.cause:
            base = f"{base} (caused by: {self.cause})"
        return base

__init__

__init__(
    message: str,
    *,
    cause: Exception | None = None,
    **context: object,
) -> None

Initialize NoCandidateAvailableError with context.

PARAMETERS

message (str): Human-readable error description.
cause (Exception | None, default: None): Original exception that caused this error.
**context (object): Additional context such as candidate_idx or frontier_type.

Note

Context fields use keyword-only syntax to ensure explicit labeling and prevent positional argument mistakes.

Source code in src/gepa_adk/domain/exceptions.py
def __init__(
    self,
    message: str,
    *,
    cause: Exception | None = None,
    **context: object,
) -> None:
    """Initialize NoCandidateAvailableError with context.

    Args:
        message: Human-readable error description.
        cause: Original exception that caused this error.
        **context: Additional context such as candidate_idx or frontier_type.

    Note:
        Context fields use keyword-only syntax to ensure explicit labeling
        and prevent positional argument mistakes.
    """
    super().__init__(message)
    self.cause = cause
    self.context = context

__str__

__str__() -> str

Return string with context and cause details.

RETURNS

str: Formatted error message including context and cause information.

Note

Outputs formatted error message with context dict and cause chain when available, preserving base message structure.

Source code in src/gepa_adk/domain/exceptions.py
def __str__(self) -> str:
    """Return string with context and cause details.

    Returns:
        Formatted error message including context and cause information.

    Note:
        Outputs formatted error message with context dict and cause chain
        when available, preserving base message structure.
    """
    base = super().__str__()
    if self.context:
        ctx_str = ", ".join(f"{k}={v!r}" for k, v in self.context.items())
        base = f"{base} [{ctx_str}]"
    if self.cause:
        base = f"{base} (caused by: {self.cause})"
    return base

ScoringError

Bases: EvolutionError


Inheritance: EvolutionError → ScoringError

Base exception for all scoring-related errors.

All scoring exceptions inherit from this class to allow for unified exception handling in scoring operations.

ATTRIBUTES

cause (Exception | None): Original exception that caused this error.

Examples:

Catching scoring errors:

from gepa_adk.domain.exceptions import ScoringError

try:
    score, metadata = await scorer.async_score(...)
except ScoringError as e:
    print(f"Scoring failed: {e}")
Note

All scoring exceptions inherit from this base class. ScoringError extends EvolutionError, following ADR-009 exception hierarchy guidelines.

Source code in src/gepa_adk/domain/exceptions.py
class ScoringError(EvolutionError):
    """Base exception for all scoring-related errors.

    All scoring exceptions inherit from this class to allow for unified
    exception handling in scoring operations.

    Attributes:
        cause (Exception | None): Original exception that caused this error.

    Examples:
        Catching scoring errors:

        ```python
        from gepa_adk.domain.exceptions import ScoringError

        try:
            score, metadata = await scorer.async_score(...)
        except ScoringError as e:
            print(f"Scoring failed: {e}")
        ```

    Note:
        All scoring exceptions inherit from this base class.
        ScoringError extends EvolutionError, following ADR-009 exception
        hierarchy guidelines.
    """

    def __init__(
        self,
        message: str,
        *,
        cause: Exception | None = None,
    ) -> None:
        """Initialize ScoringError with message and optional cause.

        Args:
            message: Human-readable error description.
            cause: Original exception that caused this error.

        Note:
            Context fields use keyword-only syntax to ensure explicit labeling
            and prevent positional argument mistakes.
        """
        super().__init__(message)
        self.cause = cause

    def __str__(self) -> str:
        """Return string with cause chain if present.

        Returns:
            Formatted message with cause information.

        Note:
            Outputs formatted error message with cause chain when available,
            preserving base message structure.
        """
        base = super().__str__()
        if self.cause:
            base = f"{base} (caused by: {self.cause})"
        return base

__init__

__init__(
    message: str, *, cause: Exception | None = None
) -> None

Initialize ScoringError with message and optional cause.

PARAMETERS

message (str): Human-readable error description.
cause (Exception | None, default: None): Original exception that caused this error.

Note

Context fields use keyword-only syntax to ensure explicit labeling and prevent positional argument mistakes.

Source code in src/gepa_adk/domain/exceptions.py
def __init__(
    self,
    message: str,
    *,
    cause: Exception | None = None,
) -> None:
    """Initialize ScoringError with message and optional cause.

    Args:
        message: Human-readable error description.
        cause: Original exception that caused this error.

    Note:
        Context fields use keyword-only syntax to ensure explicit labeling
        and prevent positional argument mistakes.
    """
    super().__init__(message)
    self.cause = cause

__str__

__str__() -> str

Return string with cause chain if present.

RETURNS

str: Formatted message with cause information.

Note

Outputs formatted error message with cause chain when available, preserving base message structure.
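Per the `__str__` implementation below, the cause is appended to the base message; this example constructs the error directly with an illustrative cause:

```python
from gepa_adk.domain.exceptions import ScoringError

error = ScoringError(
    "Scoring failed",
    cause=ValueError("score out of range"),
)
print(str(error))
# Output: Scoring failed (caused by: score out of range)
```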

Source code in src/gepa_adk/domain/exceptions.py
def __str__(self) -> str:
    """Return string with cause chain if present.

    Returns:
        Formatted message with cause information.

    Note:
        Outputs formatted error message with cause chain when available,
        preserving base message structure.
    """
    base = super().__str__()
    if self.cause:
        base = f"{base} (caused by: {self.cause})"
    return base

VideoValidationError

Bases: ConfigurationError


Inheritance: EvolutionError → ConfigurationError → VideoValidationError

Raised when video file validation fails.

This exception is raised during video file processing when a video file does not exist, exceeds size limits, or has an invalid MIME type.

ATTRIBUTES

video_path (str): The path to the video file that failed validation.
field (str): The configuration field name (default "video").
constraint (str): Description of the validation constraint that was violated.

Examples:

Raising a video validation error:

from gepa_adk.domain.exceptions import VideoValidationError

raise VideoValidationError(
    "Video file not found",
    video_path="/path/to/missing.mp4",
    constraint="file must exist",
)

Handling video validation errors:

from gepa_adk.domain.exceptions import VideoValidationError

try:
    await video_service.prepare_video_parts(["/bad/path.mp4"])
except VideoValidationError as e:
    print(f"Invalid video: {e.video_path}")
    print(f"Constraint violated: {e.constraint}")
Note

Arises from video file validation failures during multimodal input processing. File existence, size limits (2GB), and MIME type (video/*) are validated before loading video content.

Source code in src/gepa_adk/domain/exceptions.py
class VideoValidationError(ConfigurationError):
    """Raised when video file validation fails.

    This exception is raised during video file processing when a video
    file does not exist, exceeds size limits, or has an invalid MIME type.

    Attributes:
        video_path (str): The path to the video file that failed validation.
        field (str): The configuration field name (default "video").
        constraint (str): Description of the validation constraint that was violated.

    Examples:
        Raising a video validation error:

        ```python
        from gepa_adk.domain.exceptions import VideoValidationError

        raise VideoValidationError(
            "Video file not found",
            video_path="/path/to/missing.mp4",
            constraint="file must exist",
        )
        ```

        Handling video validation errors:

        ```python
        from gepa_adk.domain.exceptions import VideoValidationError

        try:
            await video_service.prepare_video_parts(["/bad/path.mp4"])
        except VideoValidationError as e:
            print(f"Invalid video: {e.video_path}")
            print(f"Constraint violated: {e.constraint}")
        ```

    Note:
        Arises from video file validation failures during multimodal input
        processing. File existence, size limits (2GB), and MIME type
        (video/*) are validated before loading video content.
    """

    def __init__(
        self,
        message: str,
        *,
        video_path: str,
        field: str = "video",
        constraint: str,
    ) -> None:
        """Initialize VideoValidationError with video file context.

        Args:
            message: Human-readable error description.
            video_path: The path to the video file that failed validation.
            field: Name of the configuration field (default "video").
            constraint: Description of the validation constraint violated.

        Note:
            Context fields use keyword-only syntax to ensure explicit labeling
            and prevent positional argument mistakes.
        """
        super().__init__(message, field=field, value=video_path, constraint=constraint)
        self.video_path = video_path

    def __str__(self) -> str:
        """Return string representation with video path context.

        Returns:
            Formatted error message including video_path and constraint.

        Note:
            Outputs formatted error message with video_path for easy
            identification of the problematic file in error logs.
        """
        base = Exception.__str__(self)
        return (
            f"{base} [video_path={self.video_path!r}, constraint={self.constraint!r}]"
        )

__init__

__init__(
    message: str,
    *,
    video_path: str,
    field: str = "video",
    constraint: str,
) -> None

Initialize VideoValidationError with video file context.

PARAMETERS

message (str): Human-readable error description.
video_path (str): The path to the video file that failed validation.
field (str, default: "video"): Name of the configuration field.
constraint (str): Description of the validation constraint violated.

Note

Context fields use keyword-only syntax to ensure explicit labeling and prevent positional argument mistakes.

Source code in src/gepa_adk/domain/exceptions.py
def __init__(
    self,
    message: str,
    *,
    video_path: str,
    field: str = "video",
    constraint: str,
) -> None:
    """Initialize VideoValidationError with video file context.

    Args:
        message: Human-readable error description.
        video_path: The path to the video file that failed validation.
        field: Name of the configuration field (default "video").
        constraint: Description of the validation constraint violated.

    Note:
        Context fields use keyword-only syntax to ensure explicit labeling
        and prevent positional argument mistakes.
    """
    super().__init__(message, field=field, value=video_path, constraint=constraint)
    self.video_path = video_path

__str__

__str__() -> str

Return string representation with video path context.

RETURNS

str: Formatted error message including video_path and constraint.

Note

Outputs formatted error message with video_path for easy identification of the problematic file in error logs.

Source code in src/gepa_adk/domain/exceptions.py
def __str__(self) -> str:
    """Return string representation with video path context.

    Returns:
        Formatted error message including video_path and constraint.

    Note:
        Outputs formatted error message with video_path for easy
        identification of the problematic file in error logs.
    """
    base = Exception.__str__(self)
    return (
        f"{base} [video_path={self.video_path!r}, constraint={self.constraint!r}]"
    )

Candidate dataclass

Represents an instruction candidate being evolved.

Unlike GEPA's simple dict[str, str] type alias, this class provides richer state tracking for async scenarios including lineage and metadata.

ATTRIBUTES

components (dict[str, str]): Component name to text value mapping. Common keys include 'instruction' (main agent prompt) and 'output_schema'.
generation (int): Generation number in the evolution lineage (0 = initial).
parent_id (str | None): ID of the parent candidate for lineage tracking (legacy field, retained for compatibility).
parent_ids (list[int] | None): Multi-parent indices for merge operations. None for seed candidates, [single_idx] for mutations, [idx1, idx2] for merges.
metadata (dict[str, Any]): Extensible metadata dict for async tracking and debugging.

Examples:

Creating a candidate:

from gepa_adk.domain.models import Candidate

candidate = Candidate(
    components={"instruction": "Be helpful"},
    generation=0,
)
print(candidate.components["instruction"])  # Be helpful
print(candidate.generation)  # 0
Note

A mutable candidate representation with richer state tracking than GEPA's simple dict. Components and metadata can be modified during the evolution process. Use generation and parent_id to track lineage.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, kw_only=True)
class Candidate:
    """Represents an instruction candidate being evolved.

    Unlike GEPA's simple `dict[str, str]` type alias, this class provides
    richer state tracking for async scenarios including lineage and metadata.

    Attributes:
        components (dict[str, str]): Component name to text value mapping.
            Common keys include 'instruction' (main agent prompt) and
            'output_schema'.
        generation (int): Generation number in the evolution lineage
            (0 = initial).
        parent_id (str | None): ID of the parent candidate for lineage
            tracking (legacy field, retained for compatibility).
        parent_ids (list[int] | None): Multi-parent indices for merge operations.
            None for seed candidates, [single_idx] for mutations, [idx1, idx2] for merges.
        metadata (dict[str, Any]): Extensible metadata dict for async tracking
            and debugging.

    Examples:
        Creating a candidate:

        ```python
        from gepa_adk.domain.models import Candidate

        candidate = Candidate(
            components={"instruction": "Be helpful"},
            generation=0,
        )
        print(candidate.components["instruction"])  # Be helpful
        print(candidate.generation)  # 0
        ```

    Note:
        A mutable candidate representation with richer state tracking than
        GEPA's simple dict. Components and metadata can be modified during
        the evolution process. Use generation and parent_id to track lineage.
    """

    components: dict[str, str] = field(default_factory=dict)
    generation: int = 0
    parent_id: str | None = None
    parent_ids: list[int] | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
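
A lineage sketch following the parent_ids convention documented above; the indices and instruction texts are illustrative:

```python
from gepa_adk.domain.models import Candidate

seed = Candidate(components={"instruction": "Be helpful"})  # parent_ids=None for seeds
mutant = Candidate(
    components={"instruction": "Be helpful and concise"},
    generation=1,
    parent_ids=[0],  # mutation: single parent index
)
merged = Candidate(
    components={"instruction": "Be helpful, concise, and accurate"},
    generation=2,
    parent_ids=[0, 1],  # merge: two parent indices
)
```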

EvolutionConfig dataclass

Configuration parameters for an evolution run.

Defines the parameters that control how evolution proceeds, including iteration limits, concurrency settings, and stopping criteria.

ATTRIBUTE DESCRIPTION
max_iterations

Maximum number of evolution iterations. 0 means just evaluate baseline without evolving.

TYPE: int

max_concurrent_evals

Number of concurrent batch evaluations. Must be at least 1.

TYPE: int

min_improvement_threshold

Minimum score improvement to accept a new candidate. Set to 0.0 to accept any improvement.

TYPE: float

patience

Number of iterations without improvement before stopping early. Set to 0 to disable early stopping.

TYPE: int

reflection_model

Model identifier for reflection/mutation operations.

TYPE: str

frontier_type

Frontier tracking strategy for Pareto selection (default: INSTANCE).

TYPE: FrontierType

acceptance_metric

Aggregation method for acceptance decisions on iteration evaluation batches. "sum" uses sum of scores (default, aligns with upstream GEPA). "mean" uses mean of scores (legacy behavior).

TYPE: Literal['sum', 'mean']

use_merge

Enable merge proposals for genetic crossover. Defaults to False.

TYPE: bool

max_merge_invocations

Maximum number of merge attempts per run. Defaults to 10. Must be non-negative.

TYPE: int

reflection_prompt

Custom reflection/mutation prompt template. If provided, this template is used instead of the default when the reflection model proposes improved text. Required placeholders:

- {component_text}: The current component text being evolved
- {trials}: Trial data with feedback and trajectory for each test case

If None or empty string, the default prompt template is used.

TYPE: str | None

stop_callbacks

List of stopper callbacks for custom stop conditions. Each callback receives a StopperState and returns True to signal stop. Defaults to an empty list.

TYPE: list[StopperProtocol]

Examples:

Creating a configuration with defaults:

from gepa_adk.domain.models import EvolutionConfig

config = EvolutionConfig(max_iterations=100, patience=10)
print(config.max_iterations)  # 100
print(config.reflection_model)  # ollama_chat/gpt-oss:20b
Note

All numeric parameters are validated in __post_init__ to ensure they meet their constraints. Invalid values raise ConfigurationError.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, kw_only=True)
class EvolutionConfig:
    """Configuration parameters for an evolution run.

    Defines the parameters that control how evolution proceeds, including
    iteration limits, concurrency settings, and stopping criteria.

    Attributes:
        max_iterations (int): Maximum number of evolution iterations. 0 means
            just evaluate baseline without evolving.
        max_concurrent_evals (int): Number of concurrent batch evaluations.
            Must be at least 1.
        min_improvement_threshold (float): Minimum score improvement to accept
            a new candidate. Set to 0.0 to accept any improvement.
        patience (int): Number of iterations without improvement before stopping
            early. Set to 0 to disable early stopping.
        reflection_model (str): Model identifier for reflection/mutation
            operations.
        frontier_type (FrontierType): Frontier tracking strategy for Pareto
            selection (default: INSTANCE).
        acceptance_metric (Literal["sum", "mean"]): Aggregation method for
            acceptance decisions on iteration evaluation batches. "sum" uses
            sum of scores (default, aligns with upstream GEPA). "mean" uses
            mean of scores (legacy behavior).
        use_merge (bool): Enable merge proposals for genetic crossover.
            Defaults to False.
        max_merge_invocations (int): Maximum number of merge attempts per run.
            Defaults to 10. Must be non-negative.
        reflection_prompt (str | None): Custom reflection/mutation prompt template.
            If provided, this template is used instead of the default when the
            reflection model proposes improved text. Required placeholders:
            - {component_text}: The current component text being evolved
            - {trials}: Trial data with feedback and trajectory for each test case
            If None or empty string, the default prompt template is used.
        stop_callbacks (list[StopperProtocol]): List of stopper callbacks for
            custom stop conditions. Each callback receives a StopperState and
            returns True to signal stop. Defaults to an empty list.

    Examples:
        Creating a configuration with defaults:

        ```python
        from gepa_adk.domain.models import EvolutionConfig

        config = EvolutionConfig(max_iterations=100, patience=10)
        print(config.max_iterations)  # 100
        print(config.reflection_model)  # ollama_chat/gpt-oss:20b
        ```

    Note:
        All numeric parameters are validated in __post_init__ to ensure
        they meet their constraints. Invalid values raise ConfigurationError.
    """

    max_iterations: int = 50
    max_concurrent_evals: int = 5
    min_improvement_threshold: float = 0.01
    patience: int = 5
    reflection_model: str = "ollama_chat/gpt-oss:20b"
    frontier_type: FrontierType = FrontierType.INSTANCE
    acceptance_metric: Literal["sum", "mean"] = "sum"
    use_merge: bool = False
    max_merge_invocations: int = 10
    reflection_prompt: str | None = None
    stop_callbacks: list["StopperProtocol"] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Validate configuration parameters after initialization.

        Raises:
            ConfigurationError: If any parameter violates its constraints.

        Note:
            Operates automatically after dataclass __init__ completes. Validates
            all fields and raises ConfigurationError with context on failure.
        """
        if self.max_iterations < 0:
            raise ConfigurationError(
                "max_iterations must be non-negative",
                field="max_iterations",
                value=self.max_iterations,
                constraint=">= 0",
            )

        if self.max_concurrent_evals < 1:
            raise ConfigurationError(
                "max_concurrent_evals must be at least 1",
                field="max_concurrent_evals",
                value=self.max_concurrent_evals,
                constraint=">= 1",
            )

        if self.min_improvement_threshold < 0.0:
            raise ConfigurationError(
                "min_improvement_threshold must be non-negative",
                field="min_improvement_threshold",
                value=self.min_improvement_threshold,
                constraint=">= 0.0",
            )

        if self.patience < 0:
            raise ConfigurationError(
                "patience must be non-negative",
                field="patience",
                value=self.patience,
                constraint=">= 0",
            )

        if not self.reflection_model:
            raise ConfigurationError(
                "reflection_model must be a non-empty string",
                field="reflection_model",
                value=self.reflection_model,
                constraint="non-empty string",
            )

        if not isinstance(self.frontier_type, FrontierType):
            try:
                self.frontier_type = FrontierType(self.frontier_type)
            except ValueError as exc:
                raise ConfigurationError(
                    "frontier_type must be a supported FrontierType value",
                    field="frontier_type",
                    value=self.frontier_type,
                    constraint=", ".join(t.value for t in FrontierType),
                ) from exc

        if self.acceptance_metric not in ("sum", "mean"):
            raise ConfigurationError(
                "acceptance_metric must be 'sum' or 'mean'",
                field="acceptance_metric",
                value=self.acceptance_metric,
                constraint="sum|mean",
            )

        if self.max_merge_invocations < 0:
            raise ConfigurationError(
                "max_merge_invocations must be non-negative",
                field="max_merge_invocations",
                value=self.max_merge_invocations,
                constraint=">= 0",
            )

        # Validate reflection_prompt if provided
        self._validate_reflection_prompt()

    def _validate_reflection_prompt(self) -> None:
        """Validate reflection_prompt and handle empty string.

        Converts empty string to None with info log. Warns if required
        placeholders are missing but allows the config to be created.

        Note:
            Soft validation approach - missing placeholders trigger warnings
            but don't prevent config creation for maximum flexibility.
        """
        # Handle empty string as "use default"
        if self.reflection_prompt == "":
            logger.info(
                "config.reflection_prompt.empty",
                message="Empty reflection_prompt provided, using default template",
            )
            # Use object.__setattr__ because slots=True prevents direct assignment
            object.__setattr__(self, "reflection_prompt", None)
            return

        # Skip validation if None
        if self.reflection_prompt is None:
            return

        # Warn about missing placeholders
        if "{component_text}" not in self.reflection_prompt:
            logger.warning(
                "config.reflection_prompt.missing_placeholder",
                placeholder="component_text",
                message="reflection_prompt is missing {component_text} placeholder",
            )

        if "{trials}" not in self.reflection_prompt:
            logger.warning(
                "config.reflection_prompt.missing_placeholder",
                placeholder="trials",
                message="reflection_prompt is missing {trials} placeholder",
            )
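
A sketch of supplying a custom reflection prompt with both required placeholders; the template wording is illustrative:

```python
from gepa_adk.domain.models import EvolutionConfig

config = EvolutionConfig(
    max_iterations=20,
    reflection_prompt=(
        "Rewrite the component below to address the trial feedback.\n\n"
        "Current component:\n{component_text}\n\n"
        "Trials:\n{trials}"
    ),
)
# A template missing a placeholder still constructs the config, but logs a
# warning (soft validation, per _validate_reflection_prompt above).
```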

__post_init__

__post_init__() -> None

Validate configuration parameters after initialization.

RAISES DESCRIPTION
ConfigurationError

If any parameter violates its constraints.

Note

Operates automatically after dataclass __init__ completes. Validates all fields and raises ConfigurationError with context on failure.

Source code in src/gepa_adk/domain/models.py
def __post_init__(self) -> None:
    """Validate configuration parameters after initialization.

    Raises:
        ConfigurationError: If any parameter violates its constraints.

    Note:
        Operates automatically after dataclass __init__ completes. Validates
        all fields and raises ConfigurationError with context on failure.
    """
    if self.max_iterations < 0:
        raise ConfigurationError(
            "max_iterations must be non-negative",
            field="max_iterations",
            value=self.max_iterations,
            constraint=">= 0",
        )

    if self.max_concurrent_evals < 1:
        raise ConfigurationError(
            "max_concurrent_evals must be at least 1",
            field="max_concurrent_evals",
            value=self.max_concurrent_evals,
            constraint=">= 1",
        )

    if self.min_improvement_threshold < 0.0:
        raise ConfigurationError(
            "min_improvement_threshold must be non-negative",
            field="min_improvement_threshold",
            value=self.min_improvement_threshold,
            constraint=">= 0.0",
        )

    if self.patience < 0:
        raise ConfigurationError(
            "patience must be non-negative",
            field="patience",
            value=self.patience,
            constraint=">= 0",
        )

    if not self.reflection_model:
        raise ConfigurationError(
            "reflection_model must be a non-empty string",
            field="reflection_model",
            value=self.reflection_model,
            constraint="non-empty string",
        )

    if not isinstance(self.frontier_type, FrontierType):
        try:
            self.frontier_type = FrontierType(self.frontier_type)
        except ValueError as exc:
            raise ConfigurationError(
                "frontier_type must be a supported FrontierType value",
                field="frontier_type",
                value=self.frontier_type,
                constraint=", ".join(t.value for t in FrontierType),
            ) from exc

    if self.acceptance_metric not in ("sum", "mean"):
        raise ConfigurationError(
            "acceptance_metric must be 'sum' or 'mean'",
            field="acceptance_metric",
            value=self.acceptance_metric,
            constraint="sum|mean",
        )

    if self.max_merge_invocations < 0:
        raise ConfigurationError(
            "max_merge_invocations must be non-negative",
            field="max_merge_invocations",
            value=self.max_merge_invocations,
            constraint=">= 0",
        )

    # Validate reflection_prompt if provided
    self._validate_reflection_prompt()
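
A sketch of the failure path, assuming ConfigurationError is imported from gepa_adk.domain (it is exported there per the package overview); the exact rendering of the field/value/constraint context depends on ConfigurationError's own __str__:

```python
from gepa_adk.domain import ConfigurationError
from gepa_adk.domain.models import EvolutionConfig

try:
    EvolutionConfig(max_concurrent_evals=0)  # violates the ">= 1" constraint
except ConfigurationError as exc:
    print(exc)  # "max_concurrent_evals must be at least 1" plus context fields
```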

EvolutionResult dataclass

Outcome of a completed evolution run.

Contains the final results after evolution completes, including all evolved component values, performance metrics, and full history.

ATTRIBUTE DESCRIPTION
original_score

Starting performance score (baseline).

TYPE: float

final_score

Ending performance score (best achieved).

TYPE: float

evolved_components

Dictionary mapping component names to their final evolved text values. Keys include "instruction" and optionally "output_schema" or other components. Access individual components via result.evolved_components["instruction"].

TYPE: dict[str, str]

iteration_history

Chronological list of iteration records.

TYPE: list[IterationRecord]

total_iterations

Number of iterations performed.

TYPE: int

valset_score

Score on validation set used for acceptance decisions. None if no validation set was used.

TYPE: float | None

trainset_score

Score on trainset used for reflection diagnostics. None if not computed.

TYPE: float | None

objective_scores

Optional per-example multi-objective scores from the best candidate's final evaluation. None when no objective scores were tracked. Each dict maps objective name to score value. Index-aligned with evaluation batch examples.

TYPE: list[dict[str, float]] | None

Examples:

Creating and analyzing a result:

from gepa_adk.domain.models import EvolutionResult, IterationRecord

result = EvolutionResult(
    original_score=0.60,
    final_score=0.85,
    evolved_components={"instruction": "Be helpful and concise"},
    iteration_history=[],
    total_iterations=10,
)
print(result.evolved_components["instruction"])  # "Be helpful and concise"
print(result.improvement)  # 0.25
print(result.improved)  # True
Note

As a frozen dataclass, EvolutionResult instances cannot be modified.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, frozen=True, kw_only=True)
class EvolutionResult:
    """Outcome of a completed evolution run.

    Contains the final results after evolution completes, including
    all evolved component values, performance metrics, and full history.

    Attributes:
        original_score (float): Starting performance score (baseline).
        final_score (float): Ending performance score (best achieved).
        evolved_components (dict[str, str]): Dictionary mapping component names
            to their final evolved text values. Keys include "instruction" and
            optionally "output_schema" or other components. Access individual
            components via ``result.evolved_components["instruction"]``.
        iteration_history (list[IterationRecord]): Chronological list of
            iteration records.
        total_iterations (int): Number of iterations performed.
        valset_score (float | None): Score on validation set used for
            acceptance decisions. None if no validation set was used.
        trainset_score (float | None): Score on trainset used for reflection
            diagnostics. None if not computed.
        objective_scores (list[dict[str, float]] | None): Optional per-example
            multi-objective scores from the best candidate's final evaluation.
            None when no objective scores were tracked. Each dict maps objective
            name to score value. Index-aligned with evaluation batch examples.

    Examples:
        Creating and analyzing a result:

        ```python
        from gepa_adk.domain.models import EvolutionResult, IterationRecord

        result = EvolutionResult(
            original_score=0.60,
            final_score=0.85,
            evolved_components={"instruction": "Be helpful and concise"},
            iteration_history=[],
            total_iterations=10,
        )
        print(result.evolved_components["instruction"])  # "Be helpful and concise"
        print(result.improvement)  # 0.25
        print(result.improved)  # True
        ```

    Note:
        As a frozen dataclass, EvolutionResult instances cannot be modified.
    """

    original_score: float
    final_score: float
    evolved_components: dict[str, str]
    iteration_history: list[IterationRecord]
    total_iterations: int
    valset_score: float | None = None
    trainset_score: float | None = None
    objective_scores: list[dict[str, float]] | None = None

    @property
    def improvement(self) -> float:
        """Calculate the score improvement from original to final.

        Returns:
            The difference between final_score and original_score.
            Positive values indicate improvement, negative indicates degradation.

        Note:
            Override is not needed since frozen dataclasses support properties.
        """
        return self.final_score - self.original_score

    @property
    def improved(self) -> bool:
        """Check if the final score is better than the original.

        Returns:
            True if final_score > original_score, False otherwise.

        Note:
            Only returns True for strict improvement, not equal scores.
        """
        return self.final_score > self.original_score

improvement property

improvement: float

Calculate the score improvement from original to final.

RETURNS DESCRIPTION
float

The difference between final_score and original_score. Positive values indicate improvement, negative indicates degradation.

Note

Override is not needed since frozen dataclasses support properties.

improved property

improved: bool

Check if the final score is better than the original.

RETURNS DESCRIPTION
bool

True if final_score > original_score, False otherwise.

Note

Only returns True for strict improvement, not equal scores.
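
A sketch of the strict-improvement semantics (scores illustrative):

```python
from gepa_adk.domain.models import EvolutionResult

result = EvolutionResult(
    original_score=0.70,
    final_score=0.70,
    evolved_components={"instruction": "Be helpful"},
    iteration_history=[],
    total_iterations=3,
)
print(result.improvement)  # 0.0
print(result.improved)     # False: equal scores do not count as improvement
```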

IterationRecord dataclass

Captures metrics for a single evolution iteration.

This is an immutable record of what happened during one iteration of the evolution process. Records are created by the engine and stored in EvolutionResult.iteration_history.

ATTRIBUTE DESCRIPTION
iteration_number

1-indexed iteration number for human readability.

TYPE: int

score

Score achieved in this iteration (typically in [0.0, 1.0]).

TYPE: float

component_text

The component_text that was evaluated in this iteration (e.g., the instruction text for the "instruction" component).

TYPE: str

evolved_component

The name of the component that was evolved in this iteration (e.g., "instruction", "output_schema"). Used for tracking which component changed in round-robin evolution strategies.

TYPE: str

accepted

Whether this proposal was accepted as the new best.

TYPE: bool

objective_scores

Optional per-example multi-objective scores from the valset evaluation. None when adapter does not provide objective scores. Each dict maps objective name to score value. Index-aligned with evaluation batch examples.

TYPE: list[dict[str, float]] | None

Examples:

Creating an iteration record:

from gepa_adk.domain.models import IterationRecord

record = IterationRecord(
    iteration_number=1,
    score=0.85,
    component_text="Be helpful",
    evolved_component="instruction",
    accepted=True,
)
print(record.score)  # 0.85
print(record.evolved_component)  # "instruction"
print(record.accepted)  # True
Note

An immutable record that captures iteration metrics. Once created, IterationRecord instances cannot be modified, ensuring historical accuracy of the evolution trace.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, frozen=True, kw_only=True)
class IterationRecord:
    """Captures metrics for a single evolution iteration.

    This is an immutable record of what happened during one iteration
    of the evolution process. Records are created by the engine and
    stored in EvolutionResult.iteration_history.

    Attributes:
        iteration_number (int): 1-indexed iteration number for human
            readability.
        score (float): Score achieved in this iteration (typically in
            [0.0, 1.0]).
        component_text (str): The component_text that was evaluated in this
            iteration (e.g., the instruction text for the "instruction" component).
        evolved_component (str): The name of the component that was evolved
            in this iteration (e.g., "instruction", "output_schema"). Used for
            tracking which component changed in round-robin evolution strategies.
        accepted (bool): Whether this proposal was accepted as the new best.
        objective_scores (list[dict[str, float]] | None): Optional per-example
            multi-objective scores from the valset evaluation. None when adapter
            does not provide objective scores. Each dict maps objective name to
            score value. Index-aligned with evaluation batch examples.

    Examples:
        Creating an iteration record:

        ```python
        from gepa_adk.domain.models import IterationRecord

        record = IterationRecord(
            iteration_number=1,
            score=0.85,
            component_text="Be helpful",
            evolved_component="instruction",
            accepted=True,
        )
        print(record.score)  # 0.85
        print(record.evolved_component)  # "instruction"
        print(record.accepted)  # True
        ```

    Note:
        An immutable record that captures iteration metrics. Once created,
        IterationRecord instances cannot be modified, ensuring historical
        accuracy of the evolution trace.
    """

    iteration_number: int
    score: float
    component_text: str
    evolved_component: str
    accepted: bool
    objective_scores: list[dict[str, float]] | None = None
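
A sketch of querying a history, assuming result is a completed EvolutionResult with a non-empty iteration_history:

```python
accepted = [r for r in result.iteration_history if r.accepted]
best = max(result.iteration_history, key=lambda r: r.score)
print(f"accepted {len(accepted)}/{result.total_iterations}; best score {best.score:.2f}")
```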

MultiAgentEvolutionResult dataclass

Outcome of a completed multi-agent evolution run.

Contains evolved component_text for all agents in the group, along with performance metrics and evolution history.

ATTRIBUTE DESCRIPTION
evolved_components

Mapping of agent name to evolved component_text.

TYPE: dict[str, str]

original_score

Starting performance score (baseline).

TYPE: float

final_score

Ending performance score (best achieved).

TYPE: float

primary_agent

Name of the agent whose output was used for scoring.

TYPE: str

iteration_history

Chronological list of iteration records.

TYPE: list[IterationRecord]

total_iterations

Number of iterations performed.

TYPE: int

Examples:

Creating and analyzing a multi-agent result:

from gepa_adk.domain.models import MultiAgentEvolutionResult, IterationRecord

result = MultiAgentEvolutionResult(
    evolved_components={
        "generator": "Generate high-quality code",
        "critic": "Review code thoroughly",
    },
    original_score=0.60,
    final_score=0.85,
    primary_agent="generator",
    iteration_history=[],
    total_iterations=10,
)
print(result.improvement)  # 0.25
print(result.improved)  # True
print(result.agent_names)  # ["critic", "generator"]
Note

An immutable result container for multi-agent evolution. Once created, MultiAgentEvolutionResult instances cannot be modified. Use computed properties like improvement, improved, and agent_names to analyze results without modifying the underlying data.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, frozen=True, kw_only=True)
class MultiAgentEvolutionResult:
    """Outcome of a completed multi-agent evolution run.

    Contains evolved component_text for all agents in the group,
    along with performance metrics and evolution history.

    Attributes:
        evolved_components (dict[str, str]): Mapping of agent name to evolved
            component_text.
        original_score (float): Starting performance score (baseline).
        final_score (float): Ending performance score (best achieved).
        primary_agent (str): Name of the agent whose output was used for scoring.
        iteration_history (list[IterationRecord]): Chronological list of iteration records.
        total_iterations (int): Number of iterations performed.

    Examples:
        Creating and analyzing a multi-agent result:

        ```python
        from gepa_adk.domain.models import MultiAgentEvolutionResult, IterationRecord

        result = MultiAgentEvolutionResult(
            evolved_components={
                "generator": "Generate high-quality code",
                "critic": "Review code thoroughly",
            },
            original_score=0.60,
            final_score=0.85,
            primary_agent="generator",
            iteration_history=[],
            total_iterations=10,
        )
        print(result.improvement)  # 0.25
        print(result.improved)  # True
        print(result.agent_names)  # ["critic", "generator"]
        ```

    Note:
        An immutable result container for multi-agent evolution. Once created,
        MultiAgentEvolutionResult instances cannot be modified. Use computed
        properties like `improvement`, `improved`, and `agent_names` to analyze
        results without modifying the underlying data.
    """

    evolved_components: dict[str, str]
    original_score: float
    final_score: float
    primary_agent: str
    iteration_history: list[IterationRecord]
    total_iterations: int

    @property
    def improvement(self) -> float:
        """Calculate the score improvement from original to final.

        Returns:
            The difference between final_score and original_score.
            Positive values indicate improvement, negative indicates degradation.

        Note:
            Override is not needed since frozen dataclasses support properties.
        """
        return self.final_score - self.original_score

    @property
    def improved(self) -> bool:
        """Check if the final score is better than the original.

        Returns:
            True if final_score > original_score, False otherwise.

        Note:
            Only returns True for strict improvement, not equal scores.
        """
        return self.final_score > self.original_score

    @property
    def agent_names(self) -> list[str]:
        """Get sorted list of evolved agent names.

        Returns:
            Sorted list of agent names from evolved_components keys.

        Note:
            Outputs a new list each time, sorted alphabetically for
            consistent ordering regardless of insertion order.
        """
        return sorted(self.evolved_components.keys())

improvement property

improvement: float

Calculate the score improvement from original to final.

RETURNS DESCRIPTION
float

The difference between final_score and original_score. Positive values indicate improvement, negative indicates degradation.

Note

Override is not needed since frozen dataclasses support properties.

improved property

improved: bool

Check if the final score is better than the original.

RETURNS DESCRIPTION
bool

True if final_score > original_score, False otherwise.

Note

Only returns True for strict improvement, not equal scores.

agent_names property

agent_names: list[str]

Get sorted list of evolved agent names.

RETURNS DESCRIPTION
list[str]

Sorted list of agent names from evolved_components keys.

Note

Outputs a new list each time, sorted alphabetically for consistent ordering regardless of insertion order.
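
A sketch of iterating evolved agents in the stable order agent_names provides, reusing the result from the class example above:

```python
for name in result.agent_names:
    print(f"{name}: {result.evolved_components[name]}")
# critic: Review code thoroughly
# generator: Generate high-quality code
```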

ParetoFrontier dataclass

Tracks non-dominated candidates across multiple frontier dimensions.

ATTRIBUTE DESCRIPTION
example_leaders

Instance-level: example_idx → leader candidate indices.

TYPE: dict[int, set[int]]

best_scores

Instance-level: example_idx → best score.

TYPE: dict[int, float]

objective_leaders

Objective-level: objective_name → leader candidate indices.

TYPE: dict[str, set[int]]

objective_best_scores

Objective-level: objective_name → best score.

TYPE: dict[str, float]

cartesian_leaders

Cartesian: (example_idx, objective) → leader candidate indices.

TYPE: dict[tuple[int, str], set[int]]

cartesian_best_scores

Cartesian: (example_idx, objective) → best score.

TYPE: dict[tuple[int, str], float]

Note

A frontier stores the best candidate indices per dimension for sampling. The active dimension depends on frontier_type.

Examples:

frontier = ParetoFrontier()
frontier.update(0, {0: 0.8, 1: 0.6})
Source code in src/gepa_adk/domain/state.py
@dataclass(slots=True)
class ParetoFrontier:
    """Tracks non-dominated candidates across multiple frontier dimensions.

    Attributes:
        example_leaders (dict[int, set[int]]): Instance-level: example_idx →
            leader candidate indices.
        best_scores (dict[int, float]): Instance-level: example_idx → best score.
        objective_leaders (dict[str, set[int]]): Objective-level: objective_name →
            leader candidate indices.
        objective_best_scores (dict[str, float]): Objective-level: objective_name →
            best score.
        cartesian_leaders (dict[tuple[int, str], set[int]]): Cartesian:
            (example_idx, objective) → leader candidate indices.
        cartesian_best_scores (dict[tuple[int, str], float]): Cartesian:
            (example_idx, objective) → best score.

    Note:
        A frontier stores the best candidate indices per dimension for sampling.
        The active dimension depends on frontier_type.

    Examples:
        ```python
        frontier = ParetoFrontier()
        frontier.update(0, {0: 0.8, 1: 0.6})
        ```
    """

    example_leaders: dict[int, set[int]] = field(default_factory=dict)
    best_scores: dict[int, float] = field(default_factory=dict)
    objective_leaders: dict[str, set[int]] = field(default_factory=dict)
    objective_best_scores: dict[str, float] = field(default_factory=dict)
    cartesian_leaders: dict[tuple[int, str], set[int]] = field(default_factory=dict)
    cartesian_best_scores: dict[tuple[int, str], float] = field(default_factory=dict)

    def update(
        self,
        candidate_idx: int,
        scores: dict[int, float],
        *,
        logger: FrontierLogger | None = None,
    ) -> None:
        """Update frontier leadership with a candidate's scores.

        Args:
            candidate_idx: Index of the candidate being added.
            scores: Mapping of example index to score.
            logger: Optional structured logger for leader updates.

        Note:
            Outputs updated leader sets and best scores for instance-level
            frontier tracking.

        Examples:
            ```python
            frontier.update(2, {0: 0.7, 1: 0.9})
            ```
        """
        for example_idx, score in scores.items():
            best_score = self.best_scores.get(example_idx)
            if best_score is None or score > best_score:
                previous_leaders = self.example_leaders.get(example_idx, set())
                self.best_scores[example_idx] = score
                self.example_leaders[example_idx] = {candidate_idx}
                if logger is not None:
                    logger.info(
                        "pareto_frontier.leader_updated",
                        example_idx=example_idx,
                        candidate_idx=candidate_idx,
                        score=score,
                        previous_leaders=sorted(previous_leaders),
                    )
            elif score == best_score:
                leaders = self.example_leaders.setdefault(example_idx, set())
                if candidate_idx not in leaders:
                    leaders.add(candidate_idx)
                    if logger is not None:
                        logger.info(
                            "pareto_frontier.leader_tied",
                            example_idx=example_idx,
                            candidate_idx=candidate_idx,
                            score=score,
                        )

    def get_non_dominated(self) -> set[int]:
        """Return candidate indices that lead any example.

        Note:
            Outputs the union of all leader sets across example indices.
        """
        candidates: set[int] = set()
        for leaders in self.example_leaders.values():
            candidates.update(leaders)
        return candidates

    def get_selection_weights(self) -> dict[int, int]:
        """Return selection weights based on leadership frequency.

        Note:
            Outputs weights proportional to how many examples each candidate
            leads, enabling weighted sampling.
        """
        weights: dict[int, int] = {}
        for leaders in self.example_leaders.values():
            for candidate_idx in leaders:
                weights[candidate_idx] = weights.get(candidate_idx, 0) + 1
        return weights

    def update_objective(
        self,
        candidate_idx: int,
        objective_scores: dict[str, float],
        *,
        logger: FrontierLogger | None = None,
    ) -> None:
        """Update objective-level frontier with a candidate's objective scores.

        Args:
            candidate_idx: Index of the candidate being added.
            objective_scores: Mapping of objective name to score.
            logger: Optional structured logger for leader updates.

        Note:
            Outputs updated objective leader sets and best scores for
            objective-level frontier tracking.

        Examples:
            ```python
            frontier.update_objective(0, {"accuracy": 0.9, "latency": 0.7})
            ```
        """
        for objective_name, score in objective_scores.items():
            best_score = self.objective_best_scores.get(objective_name)
            if best_score is None or score > best_score:
                previous_leaders = self.objective_leaders.get(objective_name, set())
                self.objective_best_scores[objective_name] = score
                self.objective_leaders[objective_name] = {candidate_idx}
                if logger is not None:
                    logger.info(
                        "pareto_frontier.objective_leader_updated",
                        objective_name=objective_name,
                        candidate_idx=candidate_idx,
                        score=score,
                        previous_leaders=sorted(previous_leaders),
                    )
            elif score == best_score:
                leaders = self.objective_leaders.setdefault(objective_name, set())
                if candidate_idx not in leaders:
                    leaders.add(candidate_idx)
                    if logger is not None:
                        logger.info(
                            "pareto_frontier.objective_leader_tied",
                            objective_name=objective_name,
                            candidate_idx=candidate_idx,
                            score=score,
                        )

    def update_cartesian(
        self,
        candidate_idx: int,
        scores: dict[int, float],
        objective_scores: dict[int, dict[str, float]],
        *,
        logger: FrontierLogger | None = None,
    ) -> None:
        """Update cartesian frontier per (example, objective) pair.

        Args:
            candidate_idx: Index of the candidate being added.
            scores: Mapping of example index to score.
            objective_scores: Mapping of example index to objective scores dict.
            logger: Optional structured logger for leader updates.

        Note:
            Outputs updated cartesian leader sets and best scores for
            per (example, objective) pair frontier tracking.

        Examples:
            ```python
            frontier.update_cartesian(
                0, {0: 0.8, 1: 0.6}, {0: {"accuracy": 0.9}, 1: {"accuracy": 0.7}}
            )
            ```
        """
        for example_idx, example_objectives in objective_scores.items():
            for objective_name, score in example_objectives.items():
                key = (example_idx, objective_name)
                best_score = self.cartesian_best_scores.get(key)
                if best_score is None or score > best_score:
                    previous_leaders = self.cartesian_leaders.get(key, set())
                    self.cartesian_best_scores[key] = score
                    self.cartesian_leaders[key] = {candidate_idx}
                    if logger is not None:
                        logger.info(
                            "pareto_frontier.cartesian_leader_updated",
                            example_idx=example_idx,
                            objective_name=objective_name,
                            candidate_idx=candidate_idx,
                            score=score,
                            previous_leaders=sorted(previous_leaders),
                        )
                elif score == best_score:
                    leaders = self.cartesian_leaders.setdefault(key, set())
                    if candidate_idx not in leaders:
                        leaders.add(candidate_idx)
                        if logger is not None:
                            logger.info(
                                "pareto_frontier.cartesian_leader_tied",
                                example_idx=example_idx,
                                objective_name=objective_name,
                                candidate_idx=candidate_idx,
                                score=score,
                            )

    def get_pareto_front_mapping(
        self, frontier_type: FrontierType
    ) -> dict[FrontierKey, set[int]]:
        """Return frontier mapping for specified frontier type.

        Args:
            frontier_type (FrontierType): Type of frontier to return mapping for.

        Returns:
            dict[FrontierKey, set[int]]: Mapping from frontier key to set of
                candidate indices.

        Raises:
            ValueError: If frontier_type is not a supported value.

        Note:
            Outputs a mapping with keys appropriate for the frontier type
            (int for INSTANCE, str for OBJECTIVE, tuples for HYBRID/CARTESIAN).

        Examples:
            ```python
            mapping = frontier.get_pareto_front_mapping(FrontierType.INSTANCE)
            # Returns: {0: {1, 2}, 1: {2, 3}}
            ```
        """
        if frontier_type == FrontierType.INSTANCE:
            return {
                key: leaders.copy() for key, leaders in self.example_leaders.items()
            }
        elif frontier_type == FrontierType.OBJECTIVE:
            return {
                key: leaders.copy() for key, leaders in self.objective_leaders.items()
            }
        elif frontier_type == FrontierType.HYBRID:
            mapping: dict[FrontierKey, set[int]] = {}
            # Add instance-level with type tag
            for example_idx, leaders in self.example_leaders.items():
                mapping[("val_id", example_idx)] = leaders.copy()
            # Add objective-level with type tag
            for objective_name, leaders in self.objective_leaders.items():
                mapping[("objective", objective_name)] = leaders.copy()
            return mapping
        elif frontier_type == FrontierType.CARTESIAN:
            mapping: dict[FrontierKey, set[int]] = {}
            for (
                example_idx,
                objective_name,
            ), leaders in self.cartesian_leaders.items():
                mapping[("cartesian", example_idx, objective_name)] = leaders.copy()
            return mapping
        else:
            raise ValueError(f"Unknown frontier type: {frontier_type}")

update

update(
    candidate_idx: int,
    scores: dict[int, float],
    *,
    logger: FrontierLogger | None = None,
) -> None

Update frontier leadership with a candidate's scores.

PARAMETER DESCRIPTION
candidate_idx

Index of the candidate being added.

TYPE: int

scores

Mapping of example index to score.

TYPE: dict[int, float]

logger

Optional structured logger for leader updates.

TYPE: FrontierLogger | None DEFAULT: None

Note

Outputs updated leader sets and best scores for instance-level frontier tracking.

Examples:

frontier.update(2, {0: 0.7, 1: 0.9})
Source code in src/gepa_adk/domain/state.py
def update(
    self,
    candidate_idx: int,
    scores: dict[int, float],
    *,
    logger: FrontierLogger | None = None,
) -> None:
    """Update frontier leadership with a candidate's scores.

    Args:
        candidate_idx: Index of the candidate being added.
        scores: Mapping of example index to score.
        logger: Optional structured logger for leader updates.

    Note:
        Outputs updated leader sets and best scores for instance-level
        frontier tracking.

    Examples:
        ```python
        frontier.update(2, {0: 0.7, 1: 0.9})
        ```
    """
    for example_idx, score in scores.items():
        best_score = self.best_scores.get(example_idx)
        if best_score is None or score > best_score:
            previous_leaders = self.example_leaders.get(example_idx, set())
            self.best_scores[example_idx] = score
            self.example_leaders[example_idx] = {candidate_idx}
            if logger is not None:
                logger.info(
                    "pareto_frontier.leader_updated",
                    example_idx=example_idx,
                    candidate_idx=candidate_idx,
                    score=score,
                    previous_leaders=sorted(previous_leaders),
                )
        elif score == best_score:
            leaders = self.example_leaders.setdefault(example_idx, set())
            if candidate_idx not in leaders:
                leaders.add(candidate_idx)
                if logger is not None:
                    logger.info(
                        "pareto_frontier.leader_tied",
                        example_idx=example_idx,
                        candidate_idx=candidate_idx,
                        score=score,
                    )
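
A worked sketch of the replace-versus-tie rule implemented above:

```python
frontier = ParetoFrontier()
frontier.update(0, {0: 0.8})   # candidate 0 leads example 0
frontier.update(1, {0: 0.8})   # equal score: candidate 1 joins the tie set
frontier.update(2, {0: 0.9})   # strictly better: candidate 2 replaces both leaders
print(frontier.example_leaders[0])  # {2}
print(frontier.best_scores[0])      # 0.9
```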

get_non_dominated

get_non_dominated() -> set[int]

Return candidate indices that lead any example.

Note

Outputs the union of all leader sets across example indices.

Source code in src/gepa_adk/domain/state.py
def get_non_dominated(self) -> set[int]:
    """Return candidate indices that lead any example.

    Note:
        Outputs the union of all leader sets across example indices.
    """
    candidates: set[int] = set()
    for leaders in self.example_leaders.values():
        candidates.update(leaders)
    return candidates

get_selection_weights

get_selection_weights() -> dict[int, int]

Return selection weights based on leadership frequency.

Note

Outputs weights proportional to how many examples each candidate leads, enabling weighted sampling.

Source code in src/gepa_adk/domain/state.py
def get_selection_weights(self) -> dict[int, int]:
    """Return selection weights based on leadership frequency.

    Note:
        Outputs weights proportional to how many examples each candidate
        leads, enabling weighted sampling.
    """
    weights: dict[int, int] = {}
    for leaders in self.example_leaders.values():
        for candidate_idx in leaders:
            weights[candidate_idx] = weights.get(candidate_idx, 0) + 1
    return weights
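
A sketch combining both queries on a small frontier:

```python
frontier = ParetoFrontier()
frontier.update(0, {0: 0.8, 1: 0.9})  # candidate 0 leads examples 0 and 1
frontier.update(1, {0: 0.9})          # candidate 1 takes over example 0
print(frontier.get_non_dominated())      # {0, 1}
print(frontier.get_selection_weights())  # {1: 1, 0: 1}: one example led by each
```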

update_objective

update_objective(
    candidate_idx: int,
    objective_scores: dict[str, float],
    *,
    logger: FrontierLogger | None = None,
) -> None

Update objective-level frontier with a candidate's objective scores.

PARAMETER DESCRIPTION
candidate_idx

Index of the candidate being added.

TYPE: int

objective_scores

Mapping of objective name to score.

TYPE: dict[str, float]

logger

Optional structured logger for leader updates.

TYPE: FrontierLogger | None DEFAULT: None

Note

Outputs updated objective leader sets and best scores for objective-level frontier tracking.

Examples:

frontier.update_objective(0, {"accuracy": 0.9, "latency": 0.7})
Source code in src/gepa_adk/domain/state.py
def update_objective(
    self,
    candidate_idx: int,
    objective_scores: dict[str, float],
    *,
    logger: FrontierLogger | None = None,
) -> None:
    """Update objective-level frontier with a candidate's objective scores.

    Args:
        candidate_idx: Index of the candidate being added.
        objective_scores: Mapping of objective name to score.
        logger: Optional structured logger for leader updates.

    Note:
        Outputs updated objective leader sets and best scores for
        objective-level frontier tracking.

    Examples:
        ```python
        frontier.update_objective(0, {"accuracy": 0.9, "latency": 0.7})
        ```
    """
    for objective_name, score in objective_scores.items():
        best_score = self.objective_best_scores.get(objective_name)
        if best_score is None or score > best_score:
            previous_leaders = self.objective_leaders.get(objective_name, set())
            self.objective_best_scores[objective_name] = score
            self.objective_leaders[objective_name] = {candidate_idx}
            if logger is not None:
                logger.info(
                    "pareto_frontier.objective_leader_updated",
                    objective_name=objective_name,
                    candidate_idx=candidate_idx,
                    score=score,
                    previous_leaders=sorted(previous_leaders),
                )
        elif score == best_score:
            leaders = self.objective_leaders.setdefault(objective_name, set())
            if candidate_idx not in leaders:
                leaders.add(candidate_idx)
                if logger is not None:
                    logger.info(
                        "pareto_frontier.objective_leader_tied",
                        objective_name=objective_name,
                        candidate_idx=candidate_idx,
                        score=score,
                    )

update_cartesian

update_cartesian(
    candidate_idx: int,
    scores: dict[int, float],
    objective_scores: dict[int, dict[str, float]],
    *,
    logger: FrontierLogger | None = None,
) -> None

Update cartesian frontier per (example, objective) pair.

PARAMETER DESCRIPTION
candidate_idx

Index of the candidate being added.

TYPE: int

scores

Mapping of example index to score.

TYPE: dict[int, float]

objective_scores

Mapping of example index to objective scores dict.

TYPE: dict[int, dict[str, float]]

logger

Optional structured logger for leader updates.

TYPE: FrontierLogger | None DEFAULT: None

Note

Outputs updated cartesian leader sets and best scores for per (example, objective) pair frontier tracking.

Examples:

frontier.update_cartesian(
    0, {0: 0.8, 1: 0.6}, {0: {"accuracy": 0.9}, 1: {"accuracy": 0.7}}
)
Source code in src/gepa_adk/domain/state.py
def update_cartesian(
    self,
    candidate_idx: int,
    scores: dict[int, float],
    objective_scores: dict[int, dict[str, float]],
    *,
    logger: FrontierLogger | None = None,
) -> None:
    """Update cartesian frontier per (example, objective) pair.

    Args:
        candidate_idx: Index of the candidate being added.
        scores: Mapping of example index to score.
        objective_scores: Mapping of example index to objective scores dict.
        logger: Optional structured logger for leader updates.

    Note:
        Outputs updated cartesian leader sets and best scores for
        per (example, objective) pair frontier tracking.

    Examples:
        ```python
        frontier.update_cartesian(
            0, {0: 0.8, 1: 0.6}, {0: {"accuracy": 0.9}, 1: {"accuracy": 0.7}}
        )
        ```
    """
    for example_idx, example_objectives in objective_scores.items():
        for objective_name, score in example_objectives.items():
            key = (example_idx, objective_name)
            best_score = self.cartesian_best_scores.get(key)
            if best_score is None or score > best_score:
                previous_leaders = self.cartesian_leaders.get(key, set())
                self.cartesian_best_scores[key] = score
                self.cartesian_leaders[key] = {candidate_idx}
                if logger is not None:
                    logger.info(
                        "pareto_frontier.cartesian_leader_updated",
                        example_idx=example_idx,
                        objective_name=objective_name,
                        candidate_idx=candidate_idx,
                        score=score,
                        previous_leaders=sorted(previous_leaders),
                    )
            elif score == best_score:
                leaders = self.cartesian_leaders.setdefault(key, set())
                if candidate_idx not in leaders:
                    leaders.add(candidate_idx)
                    if logger is not None:
                        logger.info(
                            "pareto_frontier.cartesian_leader_tied",
                            example_idx=example_idx,
                            objective_name=objective_name,
                            candidate_idx=candidate_idx,
                            score=score,
                        )

get_pareto_front_mapping

get_pareto_front_mapping(
    frontier_type: FrontierType,
) -> dict[FrontierKey, set[int]]

Return frontier mapping for specified frontier type.

PARAMETER DESCRIPTION
frontier_type

Type of frontier to return mapping for.

TYPE: FrontierType

RETURNS DESCRIPTION
dict[FrontierKey, set[int]]

Mapping from frontier key to set of candidate indices.

RAISES DESCRIPTION
ValueError

If frontier_type is not a supported value.

Note

Outputs a mapping with keys appropriate for the frontier type (int for INSTANCE, str for OBJECTIVE, tuples for HYBRID/CARTESIAN).

Examples:

mapping = frontier.get_pareto_front_mapping(FrontierType.INSTANCE)
# Returns: {0: {1, 2}, 1: {2, 3}}
Source code in src/gepa_adk/domain/state.py
def get_pareto_front_mapping(
    self, frontier_type: FrontierType
) -> dict[FrontierKey, set[int]]:
    """Return frontier mapping for specified frontier type.

    Args:
        frontier_type (FrontierType): Type of frontier to return mapping for.

    Returns:
        dict[FrontierKey, set[int]]: Mapping from frontier key to set of
            candidate indices.

    Raises:
        ValueError: If frontier_type is not a supported value.

    Note:
        Outputs a mapping with keys appropriate for the frontier type
        (int for INSTANCE, str for OBJECTIVE, tuples for HYBRID/CARTESIAN).

    Examples:
        ```python
        mapping = frontier.get_pareto_front_mapping(FrontierType.INSTANCE)
        # Returns: {0: {1, 2}, 1: {2, 3}}
        ```
    """
    if frontier_type == FrontierType.INSTANCE:
        return {
            key: leaders.copy() for key, leaders in self.example_leaders.items()
        }
    elif frontier_type == FrontierType.OBJECTIVE:
        return {
            key: leaders.copy() for key, leaders in self.objective_leaders.items()
        }
    elif frontier_type == FrontierType.HYBRID:
        mapping: dict[FrontierKey, set[int]] = {}
        # Add instance-level with type tag
        for example_idx, leaders in self.example_leaders.items():
            mapping[("val_id", example_idx)] = leaders.copy()
        # Add objective-level with type tag
        for objective_name, leaders in self.objective_leaders.items():
            mapping[("objective", objective_name)] = leaders.copy()
        return mapping
    elif frontier_type == FrontierType.CARTESIAN:
        mapping: dict[FrontierKey, set[int]] = {}
        for (
            example_idx,
            objective_name,
        ), leaders in self.cartesian_leaders.items():
            mapping[("cartesian", example_idx, objective_name)] = leaders.copy()
        return mapping
    else:
        raise ValueError(f"Unknown frontier type: {frontier_type}")
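
A sketch of the HYBRID key tagging. The import path for FrontierType is an assumption (the enum is referenced throughout this module, but its home module is not shown here):

```python
from gepa_adk.domain.state import ParetoFrontier
from gepa_adk.domain.models import FrontierType  # assumed import path

frontier = ParetoFrontier()
frontier.update(0, {0: 0.8})
frontier.update_objective(0, {"accuracy": 0.9})
print(frontier.get_pareto_front_mapping(FrontierType.HYBRID))
# {('val_id', 0): {0}, ('objective', 'accuracy'): {0}}
```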

ParetoState dataclass

Tracks evolution state for Pareto-aware selection.

ATTRIBUTE DESCRIPTION
candidates

Candidates discovered during evolution.

TYPE: list[Candidate]

candidate_scores

Per-example scores.

TYPE: dict[int, dict[int, float]]

frontier

Current frontier leader sets.

TYPE: ParetoFrontier

frontier_type

Frontier tracking strategy.

TYPE: FrontierType

iteration

Current iteration number.

TYPE: int

best_average_idx

Index of best-average candidate.

TYPE: int | None

parent_indices

Genealogy map tracking parent relationships. Maps candidate_idx → [parent_idx, ...] or [None] for seeds.

TYPE: dict[int, list[int | None]]

Note

A single state object keeps frontier and candidate metrics aligned.

Examples:

state = ParetoState()
state.add_candidate(Candidate(components={"instruction": "seed"}), [0.5])
Source code in src/gepa_adk/domain/state.py
@dataclass(slots=True)
class ParetoState:
    """Tracks evolution state for Pareto-aware selection.

    Attributes:
        candidates (list[Candidate]): Candidates discovered during evolution.
        candidate_scores (dict[int, dict[int, float]]): Per-example scores.
        frontier (ParetoFrontier): Current frontier leader sets.
        frontier_type (FrontierType): Frontier tracking strategy.
        iteration (int): Current iteration number.
        best_average_idx (int | None): Index of best-average candidate.
        parent_indices (dict[int, list[int | None]]): Genealogy map tracking
            parent relationships. Maps candidate_idx → [parent_idx, ...] or [None] for seeds.

    Note:
        A single state object keeps frontier and candidate metrics aligned.

    Examples:
        ```python
        state = ParetoState()
        state.add_candidate(Candidate(components={"instruction": "seed"}), [0.5])
        ```
    """

    candidates: list[Candidate] = field(default_factory=list)
    candidate_scores: dict[int, dict[int, float]] = field(default_factory=dict)
    candidate_objective_scores: dict[int, dict[str, float]] = field(
        default_factory=dict
    )
    frontier: ParetoFrontier = field(default_factory=ParetoFrontier)
    frontier_type: FrontierType = FrontierType.INSTANCE
    iteration: int = 0
    best_average_idx: int | None = None
    parent_indices: dict[int, list[int | None]] = field(default_factory=dict)
    _frontier_type_initialized: bool = field(default=False, init=False)

    def __post_init__(self) -> None:
        """Validate state configuration and initialize averages.

        Note:
            Checks candidate_scores indices are valid and marks frontier_type
            as initialized for immutability enforcement.
        """
        if self.candidate_scores:
            max_index = len(self.candidates) - 1
            for candidate_idx in self.candidate_scores:
                if candidate_idx > max_index:
                    raise ConfigurationError(
                        "candidate_scores index out of range",
                        field="candidate_scores",
                        value=candidate_idx,
                        constraint="<= len(candidates) - 1",
                    )
        self._frontier_type_initialized = True
        self.update_best_average()

    def __setattr__(self, name: str, value: object) -> None:
        """Enforce frontier_type immutability after initialization (T069).

        Note:
            Only frontier_type is protected because it determines the frontier
            update routing logic in add_candidate(). Other fields (candidates,
            frontier, candidate_scores) are intentionally mutable to support
            evolution state updates. Using frozen=True would prevent all
            mutations, which is too restrictive for evolution state management.
        """
        # Allow setting during __init__ and __post_init__
        if name == "frontier_type":
            # Check if we're in initialization phase
            if hasattr(self, "_frontier_type_initialized"):
                # Already initialized, check if we're trying to change it
                if (
                    self._frontier_type_initialized
                    and getattr(self, "frontier_type", None) != value
                ):
                    raise ConfigurationError(
                        "frontier_type cannot be changed after ParetoState initialization",
                        field="frontier_type",
                        value=value,
                        constraint="immutable after initialization",
                    )
        # Use object.__setattr__ to avoid recursion during initialization
        object.__setattr__(self, name, value)

    @staticmethod
    def _validate_parent_indices(
        indices: Sequence[int | None], label: str
    ) -> list[int | None]:
        """Validate parent indices for genealogy tracking."""
        validated: list[int | None] = []
        for idx, parent_idx in enumerate(indices):
            if not (isinstance(parent_idx, int) or parent_idx is None):
                raise TypeError(
                    f"{label} elements must be int or None; "
                    f"got {type(parent_idx).__name__} at position {idx}"
                )
            validated.append(parent_idx)
        return validated

    def add_candidate(
        self,
        candidate: Candidate,
        scores: Sequence[Score],
        *,
        score_indices: Sequence[int] | None = None,
        objective_scores: dict[str, float] | None = None,
        per_example_objective_scores: dict[int, dict[str, float]] | None = None,
        parent_indices: list[int] | None = None,
        logger: FrontierLogger | None = None,
    ) -> int:
        """Add a candidate and update frontier tracking.

        Args:
            candidate: Candidate to add.
            scores: Per-example scores for the candidate.
            score_indices: Optional sequence mapping scores to example indices.
                If None, scores are assumed to be indexed 0, 1, 2, ... (full valset).
                If provided, scores[i] corresponds to example index score_indices[i].
            objective_scores: Optional aggregated objective scores
                (required for OBJECTIVE, HYBRID, CARTESIAN).
            per_example_objective_scores: Optional per-example objective scores
                (required for CARTESIAN).
            parent_indices: Optional parent candidate indices for genealogy tracking.
                If None, uses candidate.parent_ids if available, otherwise [None] for seed.
            logger: Optional structured logger for frontier updates.

        Returns:
            Index of the newly added candidate.

        Raises:
            ConfigurationError: If objective_scores are required but not provided.

        Note:
            Outputs the new candidate index after routing to the appropriate
            frontier update method based on frontier_type.

        Examples:
            ```python
            candidate_idx = state.add_candidate(candidate, [0.7, 0.8])
            candidate_idx = state.add_candidate(
                candidate, [0.7, 0.8], objective_scores={"accuracy": 0.9}
            )
            ```
        """
        # Validate objective_scores requirement for objective-based frontier types
        if self.frontier_type in (
            FrontierType.OBJECTIVE,
            FrontierType.HYBRID,
            FrontierType.CARTESIAN,
        ):
            if objective_scores is None:
                raise ConfigurationError(
                    "objective_scores required for frontier_type",
                    field="objective_scores",
                    value=None,
                    constraint=f"required for {self.frontier_type}",
                )
            if (
                self.frontier_type == FrontierType.CARTESIAN
                and per_example_objective_scores is None
            ):
                raise ConfigurationError(
                    "per_example_objective_scores required for CARTESIAN frontier_type",
                    field="per_example_objective_scores",
                    value=None,
                    constraint="required for CARTESIAN",
                )

        candidate_idx = len(self.candidates)
        self.candidates.append(candidate)

        # Track parent indices for genealogy
        if parent_indices is not None:
            self.parent_indices[candidate_idx] = self._validate_parent_indices(
                parent_indices, "parent_indices"
            )
        elif candidate.parent_ids is not None:
            self.parent_indices[candidate_idx] = self._validate_parent_indices(
                candidate.parent_ids, "candidate.parent_ids"
            )
        else:
            # Seed candidate with no parents
            self.parent_indices[candidate_idx] = [None]

        # Map scores to example indices
        if score_indices is not None:
            if len(score_indices) != len(scores):
                raise ValueError(
                    f"score_indices length ({len(score_indices)}) must match "
                    f"scores length ({len(scores)})"
                )
            score_map = dict(zip(score_indices, scores))
        else:
            # Default: scores are indexed 0, 1, 2, ... (full valset)
            score_map = dict(enumerate(scores))
        self.candidate_scores[candidate_idx] = score_map

        # Store objective scores if provided
        if objective_scores is not None:
            self.candidate_objective_scores[candidate_idx] = objective_scores

        # Route to appropriate frontier update based on frontier_type
        if self.frontier_type == FrontierType.INSTANCE:
            self.frontier.update(candidate_idx, score_map, logger=logger)
        elif self.frontier_type == FrontierType.OBJECTIVE:
            assert objective_scores is not None  # Validated above
            self.frontier.update_objective(
                candidate_idx, objective_scores, logger=logger
            )
        elif self.frontier_type == FrontierType.HYBRID:
            assert objective_scores is not None  # Validated above
            self.frontier.update(candidate_idx, score_map, logger=logger)
            self.frontier.update_objective(
                candidate_idx, objective_scores, logger=logger
            )
        elif self.frontier_type == FrontierType.CARTESIAN:
            assert objective_scores is not None  # Validated above
            assert per_example_objective_scores is not None  # Validated above
            self.frontier.update_cartesian(
                candidate_idx, score_map, per_example_objective_scores, logger=logger
            )
        else:
            raise ValueError(f"Unknown frontier type: {self.frontier_type}")

        self.update_best_average()
        return candidate_idx

    def get_average_score(self, candidate_idx: int) -> float:
        """Return average score for a candidate.

        Args:
            candidate_idx: Index of the candidate.

        Returns:
            Mean score across examples.

        Raises:
            NoCandidateAvailableError: If candidate scores are missing.

        Note:
            Outputs the arithmetic mean of all scores for the candidate
            across evaluated examples.

        Examples:
            ```python
            average = state.get_average_score(candidate_idx)
            ```
        """
        scores = self.candidate_scores.get(candidate_idx)
        if not scores:
            raise NoCandidateAvailableError(
                "No scores available for candidate",
                candidate_idx=candidate_idx,
            )
        return fmean(scores.values())

    def update_best_average(self) -> None:
        """Update best_average_idx based on current scores.

        Note:
            Outputs the candidate index with the highest mean score, or None
            if no candidates have scores.
        """
        if not self.candidate_scores:
            self.best_average_idx = None
            return
        best_idx = None
        best_score = float("-inf")
        for candidate_idx, scores in self.candidate_scores.items():
            average = fmean(scores.values())
            if average > best_score:
                best_score = average
                best_idx = candidate_idx
        self.best_average_idx = best_idx
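
A slightly fuller sketch than the example above, assuming an OBJECTIVE frontier (the "accuracy" objective name is illustrative):

```python
# Minimal sketch: objective-based frontiers require aggregated objective scores.
state = ParetoState(frontier_type=FrontierType.OBJECTIVE)
idx = state.add_candidate(
    Candidate(components={"instruction": "seed"}),
    [0.6, 0.8],
    objective_scores={"accuracy": 0.7},  # illustrative objective name
)
assert state.best_average_idx == idx
```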

__post_init__

__post_init__() -> None

Validate state configuration and initialize averages.

Note

Checks candidate_scores indices are valid and marks frontier_type as initialized for immutability enforcement.

Source code in src/gepa_adk/domain/state.py
def __post_init__(self) -> None:
    """Validate state configuration and initialize averages.

    Note:
        Checks candidate_scores indices are valid and marks frontier_type
        as initialized for immutability enforcement.
    """
    if self.candidate_scores:
        max_index = len(self.candidates) - 1
        for candidate_idx in self.candidate_scores:
            if candidate_idx > max_index:
                raise ConfigurationError(
                    "candidate_scores index out of range",
                    field="candidate_scores",
                    value=candidate_idx,
                    constraint="<= len(candidates) - 1",
                )
    self._frontier_type_initialized = True
    self.update_best_average()
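
A hedged sketch of the validation this performs; the failing call below is hypothetical:

```python
# Misconfigured state: candidate_scores references index 0, but the
# candidates list is empty, so __post_init__ raises ConfigurationError.
ParetoState(candidates=[], candidate_scores={0: {0: 0.5}})
```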

__setattr__

__setattr__(name: str, value: object) -> None

Enforce frontier_type immutability after initialization (T069).

Note

Only frontier_type is protected because it determines the frontier update routing logic in add_candidate(). Other fields (candidates, frontier, candidate_scores) are intentionally mutable to support evolution state updates. Using frozen=True would prevent all mutations, which is too restrictive for evolution state management.

Source code in src/gepa_adk/domain/state.py
def __setattr__(self, name: str, value: object) -> None:
    """Enforce frontier_type immutability after initialization (T069).

    Note:
        Only frontier_type is protected because it determines the frontier
        update routing logic in add_candidate(). Other fields (candidates,
        frontier, candidate_scores) are intentionally mutable to support
        evolution state updates. Using frozen=True would prevent all
        mutations, which is too restrictive for evolution state management.
    """
    # Allow setting during __init__ and __post_init__
    if name == "frontier_type":
        # Check if we're in initialization phase
        if hasattr(self, "_frontier_type_initialized"):
            # Already initialized, check if we're trying to change it
            if (
                self._frontier_type_initialized
                and getattr(self, "frontier_type", None) != value
            ):
                raise ConfigurationError(
                    "frontier_type cannot be changed after ParetoState initialization",
                    field="frontier_type",
                    value=value,
                    constraint="immutable after initialization",
                )
    # Use object.__setattr__ to avoid recursion during initialization
    object.__setattr__(self, name, value)
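
The guard in practice, sketched:

```python
state = ParetoState(frontier_type=FrontierType.INSTANCE)
state.frontier_type = FrontierType.INSTANCE  # allowed: value is unchanged
state.frontier_type = FrontierType.HYBRID    # raises ConfigurationError
```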

add_candidate

add_candidate(
    candidate: Candidate,
    scores: Sequence[Score],
    *,
    score_indices: Sequence[int] | None = None,
    objective_scores: dict[str, float] | None = None,
    per_example_objective_scores: dict[
        int, dict[str, float]
    ]
    | None = None,
    parent_indices: list[int] | None = None,
    logger: FrontierLogger | None = None,
) -> int

Add a candidate and update frontier tracking.

PARAMETER DESCRIPTION
candidate

Candidate to add.

TYPE: Candidate

scores

Per-example scores for the candidate.

TYPE: Sequence[Score]

score_indices

Optional sequence mapping scores to example indices. If None, scores are assumed to be indexed 0, 1, 2, ... (full valset). If provided, scores[i] corresponds to example index score_indices[i].

TYPE: Sequence[int] | None DEFAULT: None

objective_scores

Optional aggregated objective scores (required for OBJECTIVE, HYBRID, CARTESIAN).

TYPE: dict[str, float] | None DEFAULT: None

per_example_objective_scores

Optional per-example objective scores (required for CARTESIAN).

TYPE: dict[int, dict[str, float]] | None DEFAULT: None

parent_indices

Optional parent candidate indices for genealogy tracking. If None, uses candidate.parent_ids if available, otherwise [None] for seed.

TYPE: list[int] | None DEFAULT: None

logger

Optional structured logger for frontier updates.

TYPE: FrontierLogger | None DEFAULT: None

RETURNS DESCRIPTION
int

Index of the newly added candidate.

RAISES DESCRIPTION
ConfigurationError

If objective_scores are required but not provided.

Note

Outputs the new candidate index after routing to the appropriate frontier update method based on frontier_type.

Examples:

candidate_idx = state.add_candidate(candidate, [0.7, 0.8])
candidate_idx = state.add_candidate(
    candidate, [0.7, 0.8], objective_scores={"accuracy": 0.9}
)
Source code in src/gepa_adk/domain/state.py
def add_candidate(
    self,
    candidate: Candidate,
    scores: Sequence[Score],
    *,
    score_indices: Sequence[int] | None = None,
    objective_scores: dict[str, float] | None = None,
    per_example_objective_scores: dict[int, dict[str, float]] | None = None,
    parent_indices: list[int] | None = None,
    logger: FrontierLogger | None = None,
) -> int:
    """Add a candidate and update frontier tracking.

    Args:
        candidate: Candidate to add.
        scores: Per-example scores for the candidate.
        score_indices: Optional sequence mapping scores to example indices.
            If None, scores are assumed to be indexed 0, 1, 2, ... (full valset).
            If provided, scores[i] corresponds to example index score_indices[i].
        objective_scores: Optional aggregated objective scores
            (required for OBJECTIVE, HYBRID, CARTESIAN).
        per_example_objective_scores: Optional per-example objective scores
            (required for CARTESIAN).
        parent_indices: Optional parent candidate indices for genealogy tracking.
            If None, uses candidate.parent_ids if available, otherwise [None] for seed.
        logger: Optional structured logger for frontier updates.

    Returns:
        Index of the newly added candidate.

    Raises:
        ConfigurationError: If objective_scores are required but not provided.

    Note:
        Outputs the new candidate index after routing to the appropriate
        frontier update method based on frontier_type.

    Examples:
        ```python
        candidate_idx = state.add_candidate(candidate, [0.7, 0.8])
        candidate_idx = state.add_candidate(
            candidate, [0.7, 0.8], objective_scores={"accuracy": 0.9}
        )
        ```
    """
    # Validate objective_scores requirement for objective-based frontier types
    if self.frontier_type in (
        FrontierType.OBJECTIVE,
        FrontierType.HYBRID,
        FrontierType.CARTESIAN,
    ):
        if objective_scores is None:
            raise ConfigurationError(
                "objective_scores required for frontier_type",
                field="objective_scores",
                value=None,
                constraint=f"required for {self.frontier_type}",
            )
        if (
            self.frontier_type == FrontierType.CARTESIAN
            and per_example_objective_scores is None
        ):
            raise ConfigurationError(
                "per_example_objective_scores required for CARTESIAN frontier_type",
                field="per_example_objective_scores",
                value=None,
                constraint="required for CARTESIAN",
            )

    candidate_idx = len(self.candidates)
    self.candidates.append(candidate)

    # Track parent indices for genealogy
    if parent_indices is not None:
        self.parent_indices[candidate_idx] = self._validate_parent_indices(
            parent_indices, "parent_indices"
        )
    elif candidate.parent_ids is not None:
        self.parent_indices[candidate_idx] = self._validate_parent_indices(
            candidate.parent_ids, "candidate.parent_ids"
        )
    else:
        # Seed candidate with no parents
        self.parent_indices[candidate_idx] = [None]

    # Map scores to example indices
    if score_indices is not None:
        if len(score_indices) != len(scores):
            raise ValueError(
                f"score_indices length ({len(score_indices)}) must match "
                f"scores length ({len(scores)})"
            )
        score_map = dict(zip(score_indices, scores))
    else:
        # Default: scores are indexed 0, 1, 2, ... (full valset)
        score_map = dict(enumerate(scores))
    self.candidate_scores[candidate_idx] = score_map

    # Store objective scores if provided
    if objective_scores is not None:
        self.candidate_objective_scores[candidate_idx] = objective_scores

    # Route to appropriate frontier update based on frontier_type
    if self.frontier_type == FrontierType.INSTANCE:
        self.frontier.update(candidate_idx, score_map, logger=logger)
    elif self.frontier_type == FrontierType.OBJECTIVE:
        assert objective_scores is not None  # Validated above
        self.frontier.update_objective(
            candidate_idx, objective_scores, logger=logger
        )
    elif self.frontier_type == FrontierType.HYBRID:
        assert objective_scores is not None  # Validated above
        self.frontier.update(candidate_idx, score_map, logger=logger)
        self.frontier.update_objective(
            candidate_idx, objective_scores, logger=logger
        )
    elif self.frontier_type == FrontierType.CARTESIAN:
        assert objective_scores is not None  # Validated above
        assert per_example_objective_scores is not None  # Validated above
        self.frontier.update_cartesian(
            candidate_idx, score_map, per_example_objective_scores, logger=logger
        )
    else:
        raise ValueError(f"Unknown frontier type: {self.frontier_type}")

    self.update_best_average()
    return candidate_idx
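
A sketch of minibatch scoring via score_indices, assuming `state` and `candidate` from the surrounding examples (example indices 3 and 7 are illustrative):

```python
# Scores cover only validation examples 3 and 7, not the full valset.
idx = state.add_candidate(
    candidate,
    [0.7, 0.9],
    score_indices=[3, 7],  # scores[i] maps to example index score_indices[i]
)
# state.candidate_scores[idx] == {3: 0.7, 7: 0.9}
```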

get_average_score

get_average_score(candidate_idx: int) -> float

Return average score for a candidate.

PARAMETER DESCRIPTION
candidate_idx

Index of the candidate.

TYPE: int

RETURNS DESCRIPTION
float

Mean score across examples.

RAISES DESCRIPTION
NoCandidateAvailableError

If candidate scores are missing.

Note

Outputs the arithmetic mean of all scores for the candidate across evaluated examples.

Examples:

average = state.get_average_score(candidate_idx)
Source code in src/gepa_adk/domain/state.py
def get_average_score(self, candidate_idx: int) -> float:
    """Return average score for a candidate.

    Args:
        candidate_idx: Index of the candidate.

    Returns:
        Mean score across examples.

    Raises:
        NoCandidateAvailableError: If candidate scores are missing.

    Note:
        Outputs the arithmetic mean of all scores for the candidate
        across evaluated examples.

    Examples:
        ```python
        average = state.get_average_score(candidate_idx)
        ```
    """
    scores = self.candidate_scores.get(candidate_idx)
    if not scores:
        raise NoCandidateAvailableError(
            "No scores available for candidate",
            candidate_idx=candidate_idx,
        )
    return fmean(scores.values())
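
Defensive use, sketched (the fallback value is illustrative):

```python
try:
    average = state.get_average_score(idx)
except NoCandidateAvailableError:
    average = float("-inf")  # treat unscored candidates as worst
```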

update_best_average

update_best_average() -> None

Update best_average_idx based on current scores.

Note

Outputs the candidate index with the highest mean score, or None if no candidates have scores.

Source code in src/gepa_adk/domain/state.py
def update_best_average(self) -> None:
    """Update best_average_idx based on current scores.

    Note:
        Outputs the candidate index with the highest mean score, or None
        if no candidates have scores.
    """
    if not self.candidate_scores:
        self.best_average_idx = None
        return
    best_idx = None
    best_score = float("-inf")
    for candidate_idx, scores in self.candidate_scores.items():
        average = fmean(scores.values())
        if average > best_score:
            best_score = average
            best_idx = candidate_idx
    self.best_average_idx = best_idx
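
add_candidate calls this automatically; a manual call is only needed after mutating candidate_scores directly, as in this hypothetical sketch:

```python
state.candidate_scores[0][5] = 0.95  # hypothetical new score for example 5
state.update_best_average()          # re-derive best_average_idx
```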

StopperState dataclass

Immutable snapshot of evolution state for stopper decisions.

Provides stoppers with read-only access to evolution metrics without exposing internal engine state.

ATTRIBUTE DESCRIPTION
iteration

Current iteration number (0-indexed).

TYPE: int

best_score

Best score achieved so far.

TYPE: float

stagnation_counter

Number of iterations without improvement.

TYPE: int

total_evaluations

Count of all evaluate() calls made.

TYPE: int

candidates_count

Number of candidates in the frontier.

TYPE: int

elapsed_seconds

Wall-clock time since evolution started.

TYPE: float

Examples:

Creating a state snapshot:

from gepa_adk.domain.stopper import StopperState

state = StopperState(
    iteration=10,
    best_score=0.92,
    stagnation_counter=3,
    total_evaluations=100,
    candidates_count=5,
    elapsed_seconds=300.0,
)
print(state.best_score)  # 0.92
print(state.stagnation_counter)  # 3

Attempting to modify raises an error:

state.iteration = 11  # Raises FrozenInstanceError
Note

A frozen dataclass: all fields are immutable after creation. slots=True is used for memory efficiency.

Source code in src/gepa_adk/domain/stopper.py
@dataclass(frozen=True, slots=True)
class StopperState:
    """Immutable snapshot of evolution state for stopper decisions.

    Provides stoppers with read-only access to evolution metrics
    without exposing internal engine state.

    Attributes:
        iteration (int): Current iteration number (0-indexed).
        best_score (float): Best score achieved so far.
        stagnation_counter (int): Number of iterations without improvement.
        total_evaluations (int): Count of all evaluate() calls made.
        candidates_count (int): Number of candidates in the frontier.
        elapsed_seconds (float): Wall-clock time since evolution started.

    Examples:
        Creating a state snapshot:

        ```python
        from gepa_adk.domain.stopper import StopperState

        state = StopperState(
            iteration=10,
            best_score=0.92,
            stagnation_counter=3,
            total_evaluations=100,
            candidates_count=5,
            elapsed_seconds=300.0,
        )
        print(state.best_score)  # 0.92
        print(state.stagnation_counter)  # 3
        ```

        Attempting to modify raises an error:

        ```python
        state.iteration = 11  # Raises FrozenInstanceError
        ```

    Note:
        A frozen dataclass: all fields are immutable after creation.
        slots=True is used for memory efficiency.
    """

    iteration: int
    best_score: float
    stagnation_counter: int
    total_evaluations: int
    candidates_count: int
    elapsed_seconds: float
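
A sketch of how a stopper might consume this snapshot; the predicate below is hypothetical, not part of the package:

```python
def should_stop(
    state: StopperState, *, patience: int = 5, budget_s: float = 600.0
) -> bool:
    """Hypothetical predicate: stop on stagnation or a wall-clock budget."""
    return state.stagnation_counter >= patience or state.elapsed_seconds >= budget_s
```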

ADKTrajectory dataclass

Execution trace from ADK agent evaluation.

Captures complete execution details from a single agent evaluation run, including all tool calls, state changes, token usage, and final output. This data enables debugging, optimization, and reflection-based learning.

ATTRIBUTE DESCRIPTION
tool_calls

Immutable sequence of tool invocations during execution.

TYPE: tuple[ToolCallRecord, ...]

state_deltas

Sequence of state changes (session state updates).

TYPE: tuple[dict[str, Any], ...]

token_usage

Optional token consumption metrics from LLM calls.

TYPE: TokenUsage | None

final_output

Final text response from the agent.

TYPE: str

error

Error message if execution failed, None otherwise.

TYPE: str | None

Examples:

trajectory = ADKTrajectory(
    tool_calls=(ToolCallRecord("search", {"query": "AI"}, ["result1"], 0.1),),
    state_deltas=({"search_count": 1},),
    token_usage=TokenUsage(100, 50, 150),
    final_output="Based on the search...",
    error=None,
)
Note

All fields use immutable types (tuples, not lists) to prevent accidental modification of captured trace data.

Source code in src/gepa_adk/domain/trajectory.py
@dataclass(frozen=True, slots=True)
class ADKTrajectory:
    """Execution trace from ADK agent evaluation.

    Captures complete execution details from a single agent evaluation run,
    including all tool calls, state changes, token usage, and final output.
    This data enables debugging, optimization, and reflection-based learning.

    Attributes:
        tool_calls (tuple[ToolCallRecord, ...]): Immutable sequence of tool
            invocations during execution.
        state_deltas (tuple[dict[str, Any], ...]): Sequence of state changes
            (session state updates).
        token_usage (TokenUsage | None): Optional token consumption metrics
            from LLM calls.
        final_output (str): Final text response from the agent.
        error (str | None): Error message if execution failed, None otherwise.

    Examples:
        ```python
        trajectory = ADKTrajectory(
            tool_calls=(ToolCallRecord("search", {"query": "AI"}, ["result1"], 0.1),),
            state_deltas=({"search_count": 1},),
            token_usage=TokenUsage(100, 50, 150),
            final_output="Based on the search...",
            error=None,
        )
        ```

    Note:
        All fields use immutable types (tuples, not lists) to prevent
        accidental modification of captured trace data.
    """

    tool_calls: tuple[ToolCallRecord, ...]
    state_deltas: tuple[dict[str, Any], ...]
    token_usage: TokenUsage | None
    final_output: str
    error: str | None
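
An illustrative helper (not part of the library) that condenses a trajectory into a one-line summary, e.g. for reflection prompts:

```python
def summarize(t: ADKTrajectory) -> str:
    # Field access follows the dataclass above; the output format is illustrative.
    tools = ", ".join(call.name for call in t.tool_calls) or "none"
    tokens = t.token_usage.total_tokens if t.token_usage else 0
    return f"tools=[{tools}] tokens={tokens} error={t.error or 'none'}"
```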

MultiAgentTrajectory dataclass

Execution trace from multi-agent pipeline evaluation.

Captures individual agent trajectories and overall pipeline metrics.

ATTRIBUTE DESCRIPTION
agent_trajectories

Mapping of agent name to trajectory.

TYPE: dict[str, ADKTrajectory]

pipeline_output

Final output from the primary agent.

TYPE: str

total_token_usage

Aggregated token usage across all agents.

TYPE: TokenUsage | None

error

Error message if pipeline execution failed.

TYPE: str | None

Examples:

Creating a multi-agent trajectory:

from gepa_adk.domain.trajectory import (
    MultiAgentTrajectory,
    ADKTrajectory,
    TokenUsage,
)

trajectory = MultiAgentTrajectory(
    agent_trajectories={
        "generator": ADKTrajectory(...),
        "critic": ADKTrajectory(...),
    },
    pipeline_output="Generated code output",
    total_token_usage=TokenUsage(200, 100, 300),
    error=None,
)
Note

The dataclass is frozen to prevent accidental reassignment of captured trace data (though the agent_trajectories dict itself is mutable). agent_trajectories maps agent names to their individual ADKTrajectory records.

Source code in src/gepa_adk/domain/trajectory.py
@dataclass(frozen=True, slots=True)
class MultiAgentTrajectory:
    """Execution trace from multi-agent pipeline evaluation.

    Captures individual agent trajectories and overall pipeline metrics.

    Attributes:
        agent_trajectories (dict[str, ADKTrajectory]): Mapping of agent name to trajectory.
        pipeline_output (str): Final output from the primary agent.
        total_token_usage (TokenUsage | None): Aggregated token usage across all agents.
        error (str | None): Error message if pipeline execution failed.

    Examples:
        Creating a multi-agent trajectory:

        ```python
        from gepa_adk.domain.trajectory import (
            MultiAgentTrajectory,
            ADKTrajectory,
            TokenUsage,
        )

        trajectory = MultiAgentTrajectory(
            agent_trajectories={
                "generator": ADKTrajectory(...),
                "critic": ADKTrajectory(...),
            },
            pipeline_output="Generated code output",
            total_token_usage=TokenUsage(200, 100, 300),
            error=None,
        )
        ```

    Note:
        The dataclass is frozen to prevent accidental reassignment of
        captured trace data (though the agent_trajectories dict itself is
        mutable). agent_trajectories maps agent names to their individual
        ADKTrajectory records.
    """

    agent_trajectories: dict[str, ADKTrajectory]
    pipeline_output: str
    total_token_usage: TokenUsage | None
    error: str | None = None
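
An illustrative aggregation helper (not part of the library), assuming per-agent usage should simply be summed:

```python
def aggregate_usage(traj: MultiAgentTrajectory) -> TokenUsage:
    # Skip agents that reported no token usage.
    usages = [t.token_usage for t in traj.agent_trajectories.values() if t.token_usage]
    return TokenUsage(
        input_tokens=sum(u.input_tokens for u in usages),
        output_tokens=sum(u.output_tokens for u in usages),
        total_tokens=sum(u.total_tokens for u in usages),
    )
```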

TokenUsage dataclass

Token usage statistics from LLM calls.

Tracks token consumption for monitoring costs and performance of language model interactions during agent execution.

ATTRIBUTE DESCRIPTION
input_tokens

Number of tokens in the prompt/context.

TYPE: int

output_tokens

Number of tokens generated in the response.

TYPE: int

total_tokens

Sum of input_tokens and output_tokens.

TYPE: int

Examples:

usage = TokenUsage(input_tokens=150, output_tokens=50, total_tokens=200)
Source code in src/gepa_adk/domain/trajectory.py
@dataclass(frozen=True, slots=True)
class TokenUsage:
    """Token usage statistics from LLM calls.

    Tracks token consumption for monitoring costs and performance of
    language model interactions during agent execution.

    Attributes:
        input_tokens (int): Number of tokens in the prompt/context.
        output_tokens (int): Number of tokens generated in the response.
        total_tokens (int): Sum of input_tokens and output_tokens.

    Examples:
        ```python
        usage = TokenUsage(input_tokens=150, output_tokens=50, total_tokens=200)
        ```
    """

    input_tokens: int
    output_tokens: int
    total_tokens: int
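
Note that the dataclass, as defined above, stores total_tokens rather than deriving it, so the invariant is the caller's responsibility:

```python
usage = TokenUsage(input_tokens=150, output_tokens=50, total_tokens=200)
assert usage.total_tokens == usage.input_tokens + usage.output_tokens
```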

ToolCallRecord dataclass

Record of a single tool call during agent execution.

Captures the invocation details of a tool/function call made by an agent during evaluation, including arguments, results, and timing information.

ATTRIBUTE DESCRIPTION
name

Tool or function name that was called.

TYPE: str

arguments

Dictionary of arguments passed to the tool.

TYPE: dict[str, Any]

result

Return value from the tool execution.

TYPE: Any

timestamp

Relative time in seconds from evaluation start.

TYPE: float

Examples:

record = ToolCallRecord(
    name="get_weather",
    arguments={"city": "Paris"},
    result={"temp": 22, "condition": "sunny"},
    timestamp=0.123,
)
Source code in src/gepa_adk/domain/trajectory.py
@dataclass(frozen=True, slots=True)
class ToolCallRecord:
    """Record of a single tool call during agent execution.

    Captures the invocation details of a tool/function call made by an agent
    during evaluation, including arguments, results, and timing information.

    Attributes:
        name (str): Tool or function name that was called.
        arguments (dict[str, Any]): Dictionary of arguments passed to the tool.
        result (Any): Return value from the tool execution.
        timestamp (float): Relative time in seconds from evaluation start.

    Examples:
        ```python
        record = ToolCallRecord(
            name="get_weather",
            arguments={"city": "Paris"},
            result={"temp": 22, "condition": "sunny"},
            timestamp=0.123,
        )
        ```
    """

    name: str
    arguments: dict[str, Any]
    result: Any
    timestamp: float

FrontierType

Bases: str, Enum

Supported frontier tracking strategies for Pareto selection.

Note

All four frontier types enable different Pareto dominance tracking strategies for multi-objective optimization.

Examples:

frontier_type = FrontierType.INSTANCE
Source code in src/gepa_adk/domain/types.py
class FrontierType(str, Enum):
    """Supported frontier tracking strategies for Pareto selection.

    Note:
        All four frontier types enable different Pareto dominance tracking
        strategies for multi-objective optimization.

    Examples:
        ```python
        frontier_type = FrontierType.INSTANCE
        ```
    """

    INSTANCE = "instance"
    OBJECTIVE = "objective"
    HYBRID = "hybrid"
    CARTESIAN = "cartesian"
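
Because FrontierType subclasses str, members round-trip through plain strings, which is convenient for config files:

```python
assert FrontierType("hybrid") is FrontierType.HYBRID  # parse from a string value
assert FrontierType.INSTANCE == "instance"            # compares equal to its value
```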

SchemaConstraints dataclass

Constraints for output schema evolution.

Controls which fields must be preserved during schema evolution, including field existence and type constraints. Used by the OutputSchemaHandler to validate proposed schema mutations.

ATTRIBUTE DESCRIPTION
required_fields

Field names that must exist in evolved schemas. Mutations removing these fields are rejected.

TYPE: tuple[str, ...]

preserve_types

Mapping of field names to allowed type(s). Mutations changing a field's type to an incompatible type are rejected.

TYPE: dict[str, type | tuple[type, ...]]

Examples:

Preserve required fields only:

from gepa_adk.domain.types import SchemaConstraints

constraints = SchemaConstraints(
    required_fields=("score", "feedback"),
)

Preserve required fields with type constraints:

constraints = SchemaConstraints(
    required_fields=("score",),
    preserve_types={
        "score": (float, int),  # Allow numeric types
        "order_id": str,  # Must stay string
    },
)
Note

A frozen dataclass ensures immutability during evolution runs. Configuration is validated at evolution start.

Source code in src/gepa_adk/domain/types.py
@dataclass(frozen=True, slots=True)
class SchemaConstraints:
    """Constraints for output schema evolution.

    Controls which fields must be preserved during schema evolution,
    including field existence and type constraints. Used by the
    OutputSchemaHandler to validate proposed schema mutations.

    Attributes:
        required_fields (tuple[str, ...]): Field names that must exist
            in evolved schemas. Mutations removing these fields are rejected.
        preserve_types (dict[str, type | tuple[type, ...]]): Mapping of
            field names to allowed type(s). Mutations changing a field's
            type to an incompatible type are rejected.

    Examples:
        Preserve required fields only:

        ```python
        from gepa_adk.domain.types import SchemaConstraints

        constraints = SchemaConstraints(
            required_fields=("score", "feedback"),
        )
        ```

        Preserve required fields with type constraints:

        ```python
        constraints = SchemaConstraints(
            required_fields=("score",),
            preserve_types={
                "score": (float, int),  # Allow numeric types
                "order_id": str,  # Must stay string
            },
        )
        ```

    Note:
        A frozen dataclass ensures immutability during evolution runs.
        Configuration is validated at evolution start.
    """

    required_fields: tuple[str, ...] = ()
    preserve_types: dict[str, type | tuple[type, ...]] = field(default_factory=dict)
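
An illustrative sketch of how these constraints might be checked (the real validation lives in OutputSchemaHandler; `fields` here is a hypothetical name-to-type mapping for an evolved schema):

```python
def satisfies(fields: dict[str, type], c: SchemaConstraints) -> bool:
    # Reject schemas that drop any required field.
    if any(name not in fields for name in c.required_fields):
        return False
    # Reject schemas that change a preserved field to a disallowed type.
    for name, allowed in c.preserve_types.items():
        options = (allowed,) if isinstance(allowed, type) else allowed
        if name in fields and fields[name] not in options:
            return False
    return True
```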

TrajectoryConfig dataclass

Configuration for trajectory extraction behavior.

Controls which components are extracted from ADK event streams, whether sensitive data should be redacted, and whether large values should be truncated.

Note

All configuration fields are immutable after instantiation, ensuring consistent extraction behavior throughout evolution.

ATTRIBUTE DESCRIPTION
include_tool_calls

Extract tool/function call records. Defaults to True.

TYPE: bool

include_state_deltas

Extract session state changes. Defaults to True.

TYPE: bool

include_token_usage

Extract LLM token consumption metrics. Defaults to True.

TYPE: bool

redact_sensitive

Apply sensitive data redaction. When True, fields matching sensitive_keys will be replaced with "[REDACTED]". Defaults to True for secure-by-default behavior.

TYPE: bool

sensitive_keys

Field names to redact via exact match. Case-sensitive. Defaults to ("password", "api_key", "token").

TYPE: tuple[str, ...]

max_string_length

Truncate strings longer than this with a marker indicating truncation. None disables truncation. Defaults to 10000 characters.

TYPE: int | None

Examples:

Default configuration (secure, with truncation):

config = TrajectoryConfig()  # All features enabled, redaction ON

Minimal configuration (tool calls only):

config = TrajectoryConfig(
    include_tool_calls=True,
    include_state_deltas=False,
    include_token_usage=False,
    redact_sensitive=False,
)

Custom sensitive keys and truncation:

config = TrajectoryConfig(
    sensitive_keys=("password", "api_key", "token", "ssn"),
    max_string_length=5000,  # Truncate DOM/screenshots earlier
)

Disable truncation (keep full values):

config = TrajectoryConfig(max_string_length=None)
Note

Redaction takes precedence over truncation. Sensitive values are replaced with "[REDACTED]" first, then truncation applies to the remaining (non-sensitive) strings.

Source code in src/gepa_adk/domain/types.py
@dataclass(frozen=True, slots=True)
class TrajectoryConfig:
    """Configuration for trajectory extraction behavior.

    Controls which components are extracted from ADK event streams,
    whether sensitive data should be redacted, and whether large
    values should be truncated.

    Note:
        All configuration fields are immutable after instantiation,
        ensuring consistent extraction behavior throughout evolution.

    Attributes:
        include_tool_calls (bool): Extract tool/function call records.
            Defaults to True.
        include_state_deltas (bool): Extract session state changes.
            Defaults to True.
        include_token_usage (bool): Extract LLM token consumption metrics.
            Defaults to True.
        redact_sensitive (bool): Apply sensitive data redaction. When True,
            fields matching sensitive_keys will be replaced with "[REDACTED]".
            Defaults to True for secure-by-default behavior.
        sensitive_keys (tuple[str, ...]): Field names to redact via exact
            match. Case-sensitive. Defaults to ("password", "api_key", "token").
        max_string_length (int | None): Truncate strings longer than this
            with a marker indicating truncation. None disables truncation.
            Defaults to 10000 characters.

    Examples:
        Default configuration (secure, with truncation):

        ```python
        config = TrajectoryConfig()  # All features enabled, redaction ON
        ```

        Minimal configuration (tool calls only):

        ```python
        config = TrajectoryConfig(
            include_tool_calls=True,
            include_state_deltas=False,
            include_token_usage=False,
            redact_sensitive=False,
        )
        ```

        Custom sensitive keys and truncation:

        ```python
        config = TrajectoryConfig(
            sensitive_keys=("password", "api_key", "token", "ssn"),
            max_string_length=5000,  # Truncate DOM/screenshots earlier
        )
        ```

        Disable truncation (keep full values):

        ```python
        config = TrajectoryConfig(max_string_length=None)
        ```

    Note:
        Redaction takes precedence over truncation. Sensitive values are
        replaced with "[REDACTED]" first, then truncation applies to the
        remaining (non-sensitive) strings.
    """

    include_tool_calls: bool = True
    include_state_deltas: bool = True
    include_token_usage: bool = True
    redact_sensitive: bool = True
    sensitive_keys: tuple[str, ...] = ("password", "api_key", "token")
    max_string_length: int | None = 10000
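
A sketch of the documented precedence (redaction before truncation); the helper below is illustrative, not the extractor's actual code, and the truncation marker format is an assumption:

```python
def sanitize(key: str, value: str, config: TrajectoryConfig) -> str:
    if config.redact_sensitive and key in config.sensitive_keys:
        return "[REDACTED]"  # redaction wins; no truncation applied
    limit = config.max_string_length
    if limit is not None and len(value) > limit:
        return value[:limit] + "...[truncated]"  # marker format is illustrative
    return value
```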