
Full Module Index

gepa_adk

GEPA-ADK: Async-first evolution engine for agentic development.

This package provides domain models and utilities for evolving agent instructions using the GEPA (Generalized Evolutionary Prompt-programming Architecture) approach.

ATTRIBUTE DESCRIPTION
__version__

Package version from pyproject.toml.

TYPE: str

EvolutionConfig

Configuration parameters for evolution runs.

TYPE: class

EvolutionResult

Outcome of a completed evolution run.

TYPE: class

Candidate

Instruction candidate being evolved.

TYPE: class

IterationRecord

Metrics for a single evolution iteration.

TYPE: class

Score

Type alias for normalized scores.

TYPE: type

ComponentName

Type alias for component identifiers.

TYPE: type

ModelName

Type alias for model identifiers.

TYPE: type

TrajectoryConfig

Configuration for trajectory extraction.

TYPE: class

EvolutionError

Base exception for all gepa-adk errors.

TYPE: class

ConfigurationError

Raised when configuration validation fails.

TYPE: class

AsyncGEPAAdapter

Async adapter protocol for evaluation.

TYPE: protocol

EvaluationBatch

Evaluation results container for adapters.

TYPE: class

DataInst

Type variable for adapter input instances.

TYPE: type

Trajectory

Type variable for adapter traces.

TYPE: type

RolloutOutput

Type variable for adapter outputs.

TYPE: type

Examples:

Basic usage with configuration and candidates:

from gepa_adk import EvolutionConfig, Candidate

config = EvolutionConfig(max_iterations=10, patience=3)
candidate = Candidate(components={"instruction": "Be helpful"})

Configuring trajectory extraction:

from gepa_adk import TrajectoryConfig

trajectory_config = TrajectoryConfig(
    redact_sensitive=True,
    max_string_length=5000,
)
See Also
Note

This is the main entry point for the gepa-adk package. Domain models are re-exported here for convenient top-level access.

DEFAULT_COMPONENT_NAME module-attribute

DEFAULT_COMPONENT_NAME: ComponentName = 'instruction'

Default component name for single-component evolution.

This constant provides a single source of truth for the default component name used when evolving a single component (typically an agent's instruction). Use this constant instead of hardcoding 'instruction' throughout the codebase.
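A minimal sketch of using the constant when building a single-component candidate (assumes both names are importable from the package root, as listed in the index above):

```python
from gepa_adk import Candidate, DEFAULT_COMPONENT_NAME

# Key the seed candidate by the default component name
# rather than a hardcoded "instruction" string.
seed = Candidate(components={DEFAULT_COMPONENT_NAME: "Be helpful"})
print(seed.components[DEFAULT_COMPONENT_NAME])  # Be helpful
```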

ComponentName module-attribute

ComponentName: TypeAlias = str

Name of a candidate component (e.g., 'instruction', 'output_schema').

ModelName module-attribute

ModelName: TypeAlias = str

Model identifier (e.g., 'gemini-2.5-flash', 'gpt-4o').

Score module-attribute

Score: TypeAlias = float

Normalized score, typically in [0.0, 1.0].
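Since these aliases are plain str/float at runtime, they are mainly useful as annotations. A hedged sketch (the function name and body are illustrative only):

```python
from gepa_adk import ComponentName, ModelName, Score

def normalize(raw: float, component: ComponentName, model: ModelName) -> Score:
    # Clamp an arbitrary raw value into the documented [0.0, 1.0] range.
    return max(0.0, min(1.0, raw))
```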

AllComponentSelector

Selects all available components for simultaneous update.

This selector returns the full list of components every time, enabling simultaneous evolution of all parts of the candidate.

Examples:

selector = AllComponentSelector()
all_comps = await selector.select_components(["a", "b"], 1, 0)
# Returns ["a", "b"]
Note

Always returns all components, enabling comprehensive mutations across the entire candidate in a single iteration.

Source code in src/gepa_adk/adapters/component_selector.py
class AllComponentSelector:
    """Selects all available components for simultaneous update.

    This selector returns the full list of components every time, enabling
    simultaneous evolution of all parts of the candidate.

    Examples:
        ```python
        selector = AllComponentSelector()
        all_comps = await selector.select_components(["a", "b"], 1, 0)
        # Returns ["a", "b"]
        ```

    Note:
        Always returns all components, enabling comprehensive mutations
        across the entire candidate in a single iteration.
    """

    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        """Select all components to update.

        Args:
            components: List of available component keys.
            iteration: Current global iteration number (unused).
            candidate_idx: Index of the candidate being evolved (unused).

        Returns:
            List containing all component keys.

        Raises:
            ValueError: If components list is empty.

        Examples:
            ```python
            selected = await selector.select_components(["a", "b"], 1, 0)
            ```

        Note:
            Outputs the complete component list unchanged, enabling
            simultaneous evolution of all candidate parts.
        """
        if not components:
            raise ValueError("No components provided for selection")

        return list(components)

select_components async

select_components(
    components: list[str],
    iteration: int,
    candidate_idx: int,
) -> list[str]

Select all components to update.

PARAMETER DESCRIPTION
components

List of available component keys.

TYPE: list[str]

iteration

Current global iteration number (unused).

TYPE: int

candidate_idx

Index of the candidate being evolved (unused).

TYPE: int

RETURNS DESCRIPTION
list[str]

List containing all component keys.

RAISES DESCRIPTION
ValueError

If components list is empty.

Examples:

selected = await selector.select_components(["a", "b"], 1, 0)
Note

Outputs the complete component list unchanged, enabling simultaneous evolution of all candidate parts.

Source code in src/gepa_adk/adapters/component_selector.py
async def select_components(
    self, components: list[str], iteration: int, candidate_idx: int
) -> list[str]:
    """Select all components to update.

    Args:
        components: List of available component keys.
        iteration: Current global iteration number (unused).
        candidate_idx: Index of the candidate being evolved (unused).

    Returns:
        List containing all component keys.

    Raises:
        ValueError: If components list is empty.

    Examples:
        ```python
        selected = await selector.select_components(["a", "b"], 1, 0)
        ```

    Note:
        Outputs the complete component list unchanged, enabling
        simultaneous evolution of all candidate parts.
    """
    if not components:
        raise ValueError("No components provided for selection")

    return list(components)

RoundRobinComponentSelector

Selects components in a round-robin fashion.

This selector cycles through the list of components one by one, maintaining state per candidate index to ensure consistent rotation.

ATTRIBUTE DESCRIPTION
_next_index

Mapping of candidate_idx to next component index.

TYPE: dict[int, int]

Examples:

selector = RoundRobinComponentSelector()
# First call selects first component
c1 = await selector.select_components(["a", "b"], 1, 0)  # ["a"]
# Second call selects second component
c2 = await selector.select_components(["a", "b"], 2, 0)  # ["b"]
Note

Alternates through components sequentially, ensuring balanced evolution across all candidate parts.
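To illustrate the per-candidate rotation described above, a small asyncio sketch based on the documented behavior (output comments mirror the docstring examples):

```python
import asyncio

from gepa_adk.adapters.component_selector import RoundRobinComponentSelector

async def main() -> None:
    selector = RoundRobinComponentSelector()
    # Candidate 0 advances through the component list call by call...
    print(await selector.select_components(["a", "b"], 1, 0))  # ["a"]
    print(await selector.select_components(["a", "b"], 2, 0))  # ["b"]
    # ...while candidate 1 keeps its own independent rotation state.
    print(await selector.select_components(["a", "b"], 1, 1))  # ["a"]

asyncio.run(main())
```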

Source code in src/gepa_adk/adapters/component_selector.py
class RoundRobinComponentSelector:
    """Selects components in a round-robin fashion.

    This selector cycles through the list of components one by one, maintaining
    state per candidate index to ensure consistent rotation.

    Attributes:
        _next_index (dict[int, int]): Mapping of candidate_idx to next component index.

    Examples:
        ```python
        selector = RoundRobinComponentSelector()
        # First call selects first component
        c1 = await selector.select_components(["a", "b"], 1, 0)  # ["a"]
        # Second call selects second component
        c2 = await selector.select_components(["a", "b"], 2, 0)  # ["b"]
        ```

    Note:
        Alternates through components sequentially, ensuring balanced evolution
        across all candidate parts.
    """

    def __init__(self) -> None:
        """Initialize the round-robin selector.

        Note:
            Creates empty index tracking dictionary for per-candidate rotation state.
        """
        self._next_index: dict[int, int] = defaultdict(int)

    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        """Select a single component to update using round-robin logic.

        Args:
            components: List of available component keys.
            iteration: Current global iteration number (unused by this strategy).
            candidate_idx: Index of the candidate being evolved.

        Returns:
            List containing the single selected component key.

        Raises:
            ValueError: If components list is empty.

        Examples:
            ```python
            selected = await selector.select_components(["a", "b"], 1, 0)
            ```

        Note:
            Outputs one component per call, advancing the rotation index for
            the specified candidate.
        """
        if not components:
            raise ValueError("No components provided for selection")

        # Get current index for this candidate
        current_idx = self._next_index[candidate_idx]

        # Select component
        selected = components[current_idx % len(components)]

        # Advance index for next time
        self._next_index[candidate_idx] = (current_idx + 1) % len(components)

        return [selected]

__init__

__init__() -> None

Initialize the round-robin selector.

Note

Creates empty index tracking dictionary for per-candidate rotation state.

Source code in src/gepa_adk/adapters/component_selector.py
def __init__(self) -> None:
    """Initialize the round-robin selector.

    Note:
        Creates empty index tracking dictionary for per-candidate rotation state.
    """
    self._next_index: dict[int, int] = defaultdict(int)

select_components async

select_components(
    components: list[str],
    iteration: int,
    candidate_idx: int,
) -> list[str]

Select a single component to update using round-robin logic.

PARAMETER DESCRIPTION
components

List of available component keys.

TYPE: list[str]

iteration

Current global iteration number (unused by this strategy).

TYPE: int

candidate_idx

Index of the candidate being evolved.

TYPE: int

RETURNS DESCRIPTION
list[str]

List containing the single selected component key.

RAISES DESCRIPTION
ValueError

If components list is empty.

Examples:

selected = await selector.select_components(["a", "b"], 1, 0)
Note

Outputs one component per call, advancing the rotation index for the specified candidate.

Source code in src/gepa_adk/adapters/component_selector.py
async def select_components(
    self, components: list[str], iteration: int, candidate_idx: int
) -> list[str]:
    """Select a single component to update using round-robin logic.

    Args:
        components: List of available component keys.
        iteration: Current global iteration number (unused by this strategy).
        candidate_idx: Index of the candidate being evolved.

    Returns:
        List containing the single selected component key.

    Raises:
        ValueError: If components list is empty.

    Examples:
        ```python
        selected = await selector.select_components(["a", "b"], 1, 0)
        ```

    Note:
        Outputs one component per call, advancing the rotation index for
        the specified candidate.
    """
    if not components:
        raise ValueError("No components provided for selection")

    # Get current index for this candidate
    current_idx = self._next_index[candidate_idx]

    # Select component
    selected = components[current_idx % len(components)]

    # Advance index for next time
    self._next_index[candidate_idx] = (current_idx + 1) % len(components)

    return [selected]

CriticOutput

Bases: BaseModel



Advanced schema for structured critic feedback with dimensions.

This schema defines the expected JSON structure that critic agents should return when configured with output_schema. The score field is required, while other fields are optional and will be preserved in metadata.

ATTRIBUTE DESCRIPTION
score

Score value between 0.0 and 1.0 (required).

TYPE: float

feedback

Human-readable feedback text (optional).

TYPE: str

dimension_scores

Per-dimension evaluation scores (optional).

TYPE: dict[str, float]

actionable_guidance

Specific improvement suggestions (optional).

TYPE: str

Examples:

Advanced critic output:

{
    "score": 0.75,
    "feedback": "Good response but could be more concise",
    "dimension_scores": {
        "accuracy": 0.9,
        "clarity": 0.6,
        "completeness": 0.8
    },
    "actionable_guidance": "Reduce response length by 30%"
}
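
Wiring the schema into a critic agent, by analogy with the SimpleCriticOutput example below (the instruction string here is a hypothetical placeholder, not a constant shipped by the package):

```python
from google.adk.agents import LlmAgent
from gepa_adk.adapters.critic_scorer import CriticOutput

critic = LlmAgent(
    name="dimension_critic",
    model="gemini-2.5-flash",
    # Hypothetical instruction; any prompt requesting the CriticOutput JSON shape works.
    instruction="Score the response from 0.0 to 1.0 and break it down by dimension.",
    output_schema=CriticOutput,
)
```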
Note

All critic agents using this schema must return structured JSON. When this schema is used as output_schema on an LlmAgent, the agent can ONLY reply and CANNOT use any tools. This is acceptable for critic agents focused on scoring.

See Also

SimpleCriticOutput: KISS schema with just score + feedback.

Source code in src/gepa_adk/adapters/critic_scorer.py
class CriticOutput(BaseModel):
    """Advanced schema for structured critic feedback with dimensions.

    This schema defines the expected JSON structure that critic agents
    should return when configured with output_schema. The score field is
    required, while other fields are optional and will be preserved in
    metadata.

    Attributes:
        score: Score value between 0.0 and 1.0 (required).
        feedback: Human-readable feedback text (optional).
        dimension_scores: Per-dimension evaluation scores (optional).
        actionable_guidance: Specific improvement suggestions (optional).

    Examples:
        Advanced critic output:

        ```json
        {
            "score": 0.75,
            "feedback": "Good response but could be more concise",
            "dimension_scores": {
                "accuracy": 0.9,
                "clarity": 0.6,
                "completeness": 0.8
            },
            "actionable_guidance": "Reduce response length by 30%"
        }
        ```

    Note:
        All critic agents using this schema must return structured JSON.
        When this schema is used as output_schema on an LlmAgent, the
        agent can ONLY reply and CANNOT use any tools. This is acceptable
        for critic agents focused on scoring.

    See Also:
        SimpleCriticOutput: KISS schema with just score + feedback.
    """

    score: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Score from 0.0 to 1.0",
    )
    feedback: str = Field(
        default="",
        description="Human-readable feedback",
    )
    dimension_scores: dict[str, float] = Field(
        default_factory=dict,
        description="Per-dimension scores",
    )
    actionable_guidance: str = Field(
        default="",
        description="Improvement suggestions",
    )

SimpleCriticOutput

Bases: BaseModel



KISS schema for basic critic feedback.

This is the minimal schema for critic agents that only need to provide a score and text feedback. Use this for straightforward evaluation tasks where dimension breakdowns are not needed.

ATTRIBUTE DESCRIPTION
score

Score value between 0.0 and 1.0 (required).

TYPE: float

feedback

Human-readable feedback text (required).

TYPE: str

Examples:

Simple critic output:

{
    "score": 0.75,
    "feedback": "Good response but could be more concise."
}

Using with LlmAgent:

from google.adk.agents import LlmAgent
from gepa_adk.adapters.critic_scorer import SimpleCriticOutput

critic = LlmAgent(
    name="simple_critic",
    model="gemini-2.5-flash",
    instruction=SIMPLE_CRITIC_INSTRUCTION,
    output_schema=SimpleCriticOutput,
)
Note

Applies to basic evaluation tasks where only a score and feedback are needed. For more detailed evaluations with dimension scores, use CriticOutput instead.

See Also

CriticOutput: Advanced schema with dimension scores and guidance.

Source code in src/gepa_adk/adapters/critic_scorer.py
class SimpleCriticOutput(BaseModel):
    """KISS schema for basic critic feedback.

    This is the minimal schema for critic agents that only need to provide
    a score and text feedback. Use this for straightforward evaluation tasks
    where dimension breakdowns are not needed.

    Attributes:
        score: Score value between 0.0 and 1.0 (required).
        feedback: Human-readable feedback text (required).

    Examples:
        Simple critic output:

        ```json
        {
            "score": 0.75,
            "feedback": "Good response but could be more concise."
        }
        ```

        Using with LlmAgent:

        ```python
        from google.adk.agents import LlmAgent
        from gepa_adk.adapters.critic_scorer import SimpleCriticOutput

        critic = LlmAgent(
            name="simple_critic",
            model="gemini-2.5-flash",
            instruction=SIMPLE_CRITIC_INSTRUCTION,
            output_schema=SimpleCriticOutput,
        )
        ```

    Note:
        Applies to basic evaluation tasks where only a score and feedback
        are needed. For more detailed evaluations with dimension scores,
        use CriticOutput instead.

    See Also:
        CriticOutput: Advanced schema with dimension scores and guidance.
    """

    score: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Score from 0.0 to 1.0",
    )
    feedback: str = Field(
        ...,
        description="Human-readable feedback explaining the score",
    )

Candidate dataclass

Represents an instruction candidate being evolved.

Unlike GEPA's simple dict[str, str] type alias, this class provides richer state tracking for async scenarios including lineage and metadata.

ATTRIBUTE DESCRIPTION
components

Component name to text value mapping. Common keys include 'instruction' (main agent prompt) and 'output_schema'.

TYPE: dict[str, str]

generation

Generation number in the evolution lineage (0 = initial).

TYPE: int

parent_id

ID of the parent candidate for lineage tracking (legacy field, retained for compatibility).

TYPE: str | None

parent_ids

Multi-parent indices for merge operations. None for seed candidates, [single_idx] for mutations, [idx1, idx2] for merges.

TYPE: list[int] | None

metadata

Extensible metadata dict for async tracking and debugging.

TYPE: dict[str, Any]

Examples:

Creating a candidate:

from gepa_adk.domain.models import Candidate

candidate = Candidate(
    components={"instruction": "Be helpful"},
    generation=0,
)
print(candidate.components["instruction"])  # Be helpful
print(candidate.generation)  # 0
Note

A mutable candidate representation with richer state tracking than GEPA's simple dict. Components and metadata can be modified during the evolution process. Use generation and parent_id to track lineage.
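To illustrate the lineage fields, a hedged sketch of a mutated child derived from a seed candidate (index 0 in a hypothetical candidate pool):

```python
from gepa_adk.domain.models import Candidate

seed = Candidate(components={"instruction": "Be helpful"}, generation=0)

child = Candidate(
    components={"instruction": "Be helpful and cite sources"},
    generation=seed.generation + 1,
    parent_ids=[0],  # single-parent mutation, per the parent_ids convention above
    metadata={"mutation": "reflection"},  # free-form tracking data
)
```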

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, kw_only=True)
class Candidate:
    """Represents an instruction candidate being evolved.

    Unlike GEPA's simple `dict[str, str]` type alias, this class provides
    richer state tracking for async scenarios including lineage and metadata.

    Attributes:
        components (dict[str, str]): Component name to text value mapping.
            Common keys include 'instruction' (main agent prompt) and
            'output_schema'.
        generation (int): Generation number in the evolution lineage
            (0 = initial).
        parent_id (str | None): ID of the parent candidate for lineage
            tracking (legacy field, retained for compatibility).
        parent_ids (list[int] | None): Multi-parent indices for merge operations.
            None for seed candidates, [single_idx] for mutations, [idx1, idx2] for merges.
        metadata (dict[str, Any]): Extensible metadata dict for async tracking
            and debugging.

    Examples:
        Creating a candidate:

        ```python
        from gepa_adk.domain.models import Candidate

        candidate = Candidate(
            components={"instruction": "Be helpful"},
            generation=0,
        )
        print(candidate.components["instruction"])  # Be helpful
        print(candidate.generation)  # 0
        ```

    Note:
        A mutable candidate representation with richer state tracking than
        GEPA's simple dict. Components and metadata can be modified during
        the evolution process. Use generation and parent_id to track lineage.
    """

    components: dict[str, str] = field(default_factory=dict)
    generation: int = 0
    parent_id: str | None = None
    parent_ids: list[int] | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

ConfigurationError

Bases: EvolutionError



Raised when configuration validation fails.

This exception is raised during EvolutionConfig initialization when a parameter violates its validation constraints.

ATTRIBUTE DESCRIPTION
field

The name of the configuration field that failed validation.

TYPE: str | None

value

The invalid value that was provided.

TYPE: object

constraint

Description of the validation constraint that was violated.

TYPE: str | None

Examples:

Creating a configuration error with context:

from gepa_adk.domain.exceptions import ConfigurationError

error = ConfigurationError(
    "max_iterations must be non-negative",
    field="max_iterations",
    value=-5,
    constraint=">= 0",
)
print(error.field, error.value, error.constraint)
# Output: max_iterations -5 >= 0
Note

Arises from user-provided invalid settings, not programming errors. Should be caught and reported with clear guidance on valid values.
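In practice this error is raised by EvolutionConfig validation; a short sketch of catching it:

```python
from gepa_adk import EvolutionConfig
from gepa_adk.domain.exceptions import ConfigurationError

try:
    EvolutionConfig(max_iterations=-5)
except ConfigurationError as exc:
    # __str__ appends field/value context, e.g.
    # "max_iterations must be non-negative [field='max_iterations', value=-5]"
    print(exc)
```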

Source code in src/gepa_adk/domain/exceptions.py
class ConfigurationError(EvolutionError):
    """Raised when configuration validation fails.

    This exception is raised during EvolutionConfig initialization
    when a parameter violates its validation constraints.

    Attributes:
        field (str | None): The name of the configuration field that failed
            validation.
        value (object): The invalid value that was provided.
        constraint (str | None): Description of the validation constraint that
            was violated.

    Examples:
        Creating a configuration error with context:

        ```python
        from gepa_adk.domain.exceptions import ConfigurationError

        error = ConfigurationError(
            "max_iterations must be non-negative",
            field="max_iterations",
            value=-5,
            constraint=">= 0",
        )
        print(error.field, error.value, error.constraint)
        # Output: max_iterations -5 >= 0
        ```

    Note:
        Arises from user-provided invalid settings, not programming errors.
        Should be caught and reported with clear guidance on valid values.
    """

    def __init__(
        self,
        message: str,
        *,
        field: str | None = None,
        value: object = None,
        constraint: str | None = None,
    ) -> None:
        """Initialize ConfigurationError with context.

        Args:
            message: Human-readable error description.
            field: Name of the invalid configuration field.
            value: The invalid value provided.
            constraint: Description of the validation constraint.

        Note:
            Context fields use keyword-only syntax to ensure explicit labeling
            and prevent positional argument mistakes.
        """
        super().__init__(message)
        self.field = field
        self.value = value
        self.constraint = constraint

    def __str__(self) -> str:
        """Return string representation with context.

        Returns:
            Formatted error message including field and value context.

        Note:
            Outputs formatted error message with field and value context
            when available, preserving base message structure.
        """
        base = super().__str__()
        context_parts = []
        if self.field is not None:
            context_parts.append(f"field={self.field!r}")
        if self.value is not None:
            context_parts.append(f"value={self.value!r}")
        if context_parts:
            return f"{base} [{', '.join(context_parts)}]"
        return base

__init__

__init__(
    message: str,
    *,
    field: str | None = None,
    value: object = None,
    constraint: str | None = None,
) -> None

Initialize ConfigurationError with context.

PARAMETER DESCRIPTION
message

Human-readable error description.

TYPE: str

field

Name of the invalid configuration field.

TYPE: str | None DEFAULT: None

value

The invalid value provided.

TYPE: object DEFAULT: None

constraint

Description of the validation constraint.

TYPE: str | None DEFAULT: None

Note

Context fields use keyword-only syntax to ensure explicit labeling and prevent positional argument mistakes.

Source code in src/gepa_adk/domain/exceptions.py
def __init__(
    self,
    message: str,
    *,
    field: str | None = None,
    value: object = None,
    constraint: str | None = None,
) -> None:
    """Initialize ConfigurationError with context.

    Args:
        message: Human-readable error description.
        field: Name of the invalid configuration field.
        value: The invalid value provided.
        constraint: Description of the validation constraint.

    Note:
        Context fields use keyword-only syntax to ensure explicit labeling
        and prevent positional argument mistakes.
    """
    super().__init__(message)
    self.field = field
    self.value = value
    self.constraint = constraint

__str__

__str__() -> str

Return string representation with context.

RETURNS DESCRIPTION
str

Formatted error message including field and value context.

Note

Outputs formatted error message with field and value context when available, preserving base message structure.

Source code in src/gepa_adk/domain/exceptions.py
def __str__(self) -> str:
    """Return string representation with context.

    Returns:
        Formatted error message including field and value context.

    Note:
        Outputs formatted error message with field and value context
        when available, preserving base message structure.
    """
    base = super().__str__()
    context_parts = []
    if self.field is not None:
        context_parts.append(f"field={self.field!r}")
    if self.value is not None:
        context_parts.append(f"value={self.value!r}")
    if context_parts:
        return f"{base} [{', '.join(context_parts)}]"
    return base

EvolutionConfig dataclass

Configuration parameters for an evolution run.

Defines the parameters that control how evolution proceeds, including iteration limits, concurrency settings, and stopping criteria.

ATTRIBUTE DESCRIPTION
max_iterations

Maximum number of evolution iterations. 0 means just evaluate baseline without evolving.

TYPE: int

max_concurrent_evals

Number of concurrent batch evaluations. Must be at least 1.

TYPE: int

min_improvement_threshold

Minimum score improvement to accept a new candidate. Set to 0.0 to accept any improvement.

TYPE: float

patience

Number of iterations without improvement before stopping early. Set to 0 to disable early stopping.

TYPE: int

reflection_model

Model identifier for reflection/mutation operations.

TYPE: str

frontier_type

Frontier tracking strategy for Pareto selection (default: INSTANCE).

TYPE: FrontierType

acceptance_metric

Aggregation method for acceptance decisions on iteration evaluation batches. "sum" uses sum of scores (default, aligns with upstream GEPA). "mean" uses mean of scores (legacy behavior).

TYPE: Literal['sum', 'mean']

use_merge

Enable merge proposals for genetic crossover. Defaults to False.

TYPE: bool

max_merge_invocations

Maximum number of merge attempts per run. Defaults to 10. Must be non-negative.

TYPE: int

reflection_prompt

Custom reflection/mutation prompt template. If provided, this template is used instead of the default when the reflection model proposes improved text. Required placeholders:

- {component_text}: The current component text being evolved
- {trials}: Trial data with feedback and trajectory for each test case

If None or empty string, the default prompt template is used.

TYPE: str | None

stop_callbacks

List of stopper callbacks for custom stop conditions. Each callback receives a StopperState and returns True to signal stop. Defaults to an empty list.

TYPE: list[StopperProtocol]

Examples:

Creating a configuration with defaults:

from gepa_adk.domain.models import EvolutionConfig

config = EvolutionConfig(max_iterations=100, patience=10)
print(config.max_iterations)  # 100
print(config.reflection_model)  # ollama_chat/gpt-oss:20b
Note

All numeric parameters are validated in __post_init__ to ensure they meet their constraints. Invalid values raise ConfigurationError.
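
A hedged sketch of a more customized configuration, combining the documented options (the prompt text itself is illustrative):

```python
from gepa_adk import EvolutionConfig

config = EvolutionConfig(
    max_iterations=30,
    patience=5,
    acceptance_metric="sum",   # upstream-GEPA-style aggregation
    use_merge=True,            # enable crossover proposals
    max_merge_invocations=5,
    reflection_prompt=(
        "Current component:\n{component_text}\n\n"
        "Trial results:\n{trials}\n\n"
        "Propose an improved version."
    ),
)
```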

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, kw_only=True)
class EvolutionConfig:
    """Configuration parameters for an evolution run.

    Defines the parameters that control how evolution proceeds, including
    iteration limits, concurrency settings, and stopping criteria.

    Attributes:
        max_iterations (int): Maximum number of evolution iterations. 0 means
            just evaluate baseline without evolving.
        max_concurrent_evals (int): Number of concurrent batch evaluations.
            Must be at least 1.
        min_improvement_threshold (float): Minimum score improvement to accept
            a new candidate. Set to 0.0 to accept any improvement.
        patience (int): Number of iterations without improvement before stopping
            early. Set to 0 to disable early stopping.
        reflection_model (str): Model identifier for reflection/mutation
            operations.
        frontier_type (FrontierType): Frontier tracking strategy for Pareto
            selection (default: INSTANCE).
        acceptance_metric (Literal["sum", "mean"]): Aggregation method for
            acceptance decisions on iteration evaluation batches. "sum" uses
            sum of scores (default, aligns with upstream GEPA). "mean" uses
            mean of scores (legacy behavior).
        use_merge (bool): Enable merge proposals for genetic crossover.
            Defaults to False.
        max_merge_invocations (int): Maximum number of merge attempts per run.
            Defaults to 10. Must be non-negative.
        reflection_prompt (str | None): Custom reflection/mutation prompt template.
            If provided, this template is used instead of the default when the
            reflection model proposes improved text. Required placeholders:
            - {component_text}: The current component text being evolved
            - {trials}: Trial data with feedback and trajectory for each test case
            If None or empty string, the default prompt template is used.
        stop_callbacks (list[StopperProtocol]): List of stopper callbacks for
            custom stop conditions. Each callback receives a StopperState and
            returns True to signal stop. Defaults to an empty list.

    Examples:
        Creating a configuration with defaults:

        ```python
        from gepa_adk.domain.models import EvolutionConfig

        config = EvolutionConfig(max_iterations=100, patience=10)
        print(config.max_iterations)  # 100
        print(config.reflection_model)  # ollama_chat/gpt-oss:20b
        ```

    Note:
        All numeric parameters are validated in __post_init__ to ensure
        they meet their constraints. Invalid values raise ConfigurationError.
    """

    max_iterations: int = 50
    max_concurrent_evals: int = 5
    min_improvement_threshold: float = 0.01
    patience: int = 5
    reflection_model: str = "ollama_chat/gpt-oss:20b"
    frontier_type: FrontierType = FrontierType.INSTANCE
    acceptance_metric: Literal["sum", "mean"] = "sum"
    use_merge: bool = False
    max_merge_invocations: int = 10
    reflection_prompt: str | None = None
    stop_callbacks: list["StopperProtocol"] = field(default_factory=list)

    def __post_init__(self) -> None:
        """Validate configuration parameters after initialization.

        Raises:
            ConfigurationError: If any parameter violates its constraints.

        Note:
            Operates automatically after dataclass __init__ completes. Validates
            all fields and raises ConfigurationError with context on failure.
        """
        if self.max_iterations < 0:
            raise ConfigurationError(
                "max_iterations must be non-negative",
                field="max_iterations",
                value=self.max_iterations,
                constraint=">= 0",
            )

        if self.max_concurrent_evals < 1:
            raise ConfigurationError(
                "max_concurrent_evals must be at least 1",
                field="max_concurrent_evals",
                value=self.max_concurrent_evals,
                constraint=">= 1",
            )

        if self.min_improvement_threshold < 0.0:
            raise ConfigurationError(
                "min_improvement_threshold must be non-negative",
                field="min_improvement_threshold",
                value=self.min_improvement_threshold,
                constraint=">= 0.0",
            )

        if self.patience < 0:
            raise ConfigurationError(
                "patience must be non-negative",
                field="patience",
                value=self.patience,
                constraint=">= 0",
            )

        if not self.reflection_model:
            raise ConfigurationError(
                "reflection_model must be a non-empty string",
                field="reflection_model",
                value=self.reflection_model,
                constraint="non-empty string",
            )

        if not isinstance(self.frontier_type, FrontierType):
            try:
                self.frontier_type = FrontierType(self.frontier_type)
            except ValueError as exc:
                raise ConfigurationError(
                    "frontier_type must be a supported FrontierType value",
                    field="frontier_type",
                    value=self.frontier_type,
                    constraint=", ".join(t.value for t in FrontierType),
                ) from exc

        if self.acceptance_metric not in ("sum", "mean"):
            raise ConfigurationError(
                "acceptance_metric must be 'sum' or 'mean'",
                field="acceptance_metric",
                value=self.acceptance_metric,
                constraint="sum|mean",
            )

        if self.max_merge_invocations < 0:
            raise ConfigurationError(
                "max_merge_invocations must be non-negative",
                field="max_merge_invocations",
                value=self.max_merge_invocations,
                constraint=">= 0",
            )

        # Validate reflection_prompt if provided
        self._validate_reflection_prompt()

    def _validate_reflection_prompt(self) -> None:
        """Validate reflection_prompt and handle empty string.

        Converts empty string to None with info log. Warns if required
        placeholders are missing but allows the config to be created.

        Note:
            Soft validation approach - missing placeholders trigger warnings
            but don't prevent config creation for maximum flexibility.
        """
        # Handle empty string as "use default"
        if self.reflection_prompt == "":
            logger.info(
                "config.reflection_prompt.empty",
                message="Empty reflection_prompt provided, using default template",
            )
            # Use object.__setattr__ because slots=True prevents direct assignment
            object.__setattr__(self, "reflection_prompt", None)
            return

        # Skip validation if None
        if self.reflection_prompt is None:
            return

        # Warn about missing placeholders
        if "{component_text}" not in self.reflection_prompt:
            logger.warning(
                "config.reflection_prompt.missing_placeholder",
                placeholder="component_text",
                message="reflection_prompt is missing {component_text} placeholder",
            )

        if "{trials}" not in self.reflection_prompt:
            logger.warning(
                "config.reflection_prompt.missing_placeholder",
                placeholder="trials",
                message="reflection_prompt is missing {trials} placeholder",
            )

__post_init__

__post_init__() -> None

Validate configuration parameters after initialization.

RAISES DESCRIPTION
ConfigurationError

If any parameter violates its constraints.

Note

Operates automatically after dataclass __init__ completes. Validates all fields and raises ConfigurationError with context on failure.

Source code in src/gepa_adk/domain/models.py
def __post_init__(self) -> None:
    """Validate configuration parameters after initialization.

    Raises:
        ConfigurationError: If any parameter violates its constraints.

    Note:
        Operates automatically after dataclass __init__ completes. Validates
        all fields and raises ConfigurationError with context on failure.
    """
    if self.max_iterations < 0:
        raise ConfigurationError(
            "max_iterations must be non-negative",
            field="max_iterations",
            value=self.max_iterations,
            constraint=">= 0",
        )

    if self.max_concurrent_evals < 1:
        raise ConfigurationError(
            "max_concurrent_evals must be at least 1",
            field="max_concurrent_evals",
            value=self.max_concurrent_evals,
            constraint=">= 1",
        )

    if self.min_improvement_threshold < 0.0:
        raise ConfigurationError(
            "min_improvement_threshold must be non-negative",
            field="min_improvement_threshold",
            value=self.min_improvement_threshold,
            constraint=">= 0.0",
        )

    if self.patience < 0:
        raise ConfigurationError(
            "patience must be non-negative",
            field="patience",
            value=self.patience,
            constraint=">= 0",
        )

    if not self.reflection_model:
        raise ConfigurationError(
            "reflection_model must be a non-empty string",
            field="reflection_model",
            value=self.reflection_model,
            constraint="non-empty string",
        )

    if not isinstance(self.frontier_type, FrontierType):
        try:
            self.frontier_type = FrontierType(self.frontier_type)
        except ValueError as exc:
            raise ConfigurationError(
                "frontier_type must be a supported FrontierType value",
                field="frontier_type",
                value=self.frontier_type,
                constraint=", ".join(t.value for t in FrontierType),
            ) from exc

    if self.acceptance_metric not in ("sum", "mean"):
        raise ConfigurationError(
            "acceptance_metric must be 'sum' or 'mean'",
            field="acceptance_metric",
            value=self.acceptance_metric,
            constraint="sum|mean",
        )

    if self.max_merge_invocations < 0:
        raise ConfigurationError(
            "max_merge_invocations must be non-negative",
            field="max_merge_invocations",
            value=self.max_merge_invocations,
            constraint=">= 0",
        )

    # Validate reflection_prompt if provided
    self._validate_reflection_prompt()

EvolutionError

Bases: Exception



Base exception for all gepa-adk errors.

All custom exceptions in gepa-adk should inherit from this class to allow for unified exception handling.

Examples:

Catching evolution errors:

from gepa_adk.domain.exceptions import EvolutionError

try:
    raise EvolutionError("Evolution failed unexpectedly")
except EvolutionError as e:
    print(f"Caught: {e}")
# Output: Caught: Evolution failed unexpectedly
Note

Always use this base class or its subclasses for domain errors. Standard Python exceptions should still be raised for programming errors (e.g., TypeError, ValueError for developer mistakes).

Source code in src/gepa_adk/domain/exceptions.py
class EvolutionError(Exception):
    """Base exception for all gepa-adk errors.

    All custom exceptions in gepa-adk should inherit from this class
    to allow for unified exception handling.

    Examples:
        Catching evolution errors:

        ```python
        from gepa_adk.domain.exceptions import EvolutionError

        try:
            raise EvolutionError("Evolution failed unexpectedly")
        except EvolutionError as e:
            print(f"Caught: {e}")
        # Output: Caught: Evolution failed unexpectedly
        ```

    Note:
        Always use this base class or its subclasses for domain errors.
        Standard Python exceptions should still be raised for programming
        errors (e.g., TypeError, ValueError for developer mistakes).
    """

EvolutionResult dataclass

Outcome of a completed evolution run.

Contains the final results after evolution completes, including all evolved component values, performance metrics, and full history.

ATTRIBUTE DESCRIPTION
original_score

Starting performance score (baseline).

TYPE: float

final_score

Ending performance score (best achieved).

TYPE: float

evolved_components

Dictionary mapping component names to their final evolved text values. Keys include "instruction" and optionally "output_schema" or other components. Access individual components via result.evolved_components["instruction"].

TYPE: dict[str, str]

iteration_history

Chronological list of iteration records.

TYPE: list[IterationRecord]

total_iterations

Number of iterations performed.

TYPE: int

valset_score

Score on validation set used for acceptance decisions. None if no validation set was used.

TYPE: float | None

trainset_score

Score on trainset used for reflection diagnostics. None if not computed.

TYPE: float | None

objective_scores

Optional per-example multi-objective scores from the best candidate's final evaluation. None when no objective scores were tracked. Each dict maps objective name to score value. Index-aligned with evaluation batch examples.

TYPE: list[dict[str, float]] | None

Examples:

Creating and analyzing a result:

from gepa_adk.domain.models import EvolutionResult, IterationRecord

result = EvolutionResult(
    original_score=0.60,
    final_score=0.85,
    evolved_components={"instruction": "Be helpful and concise"},
    iteration_history=[],
    total_iterations=10,
)
print(result.evolved_components["instruction"])  # "Be helpful and concise"
print(result.improvement)  # 0.25
print(result.improved)  # True
Note

As a frozen dataclass, EvolutionResult instances cannot be modified.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, frozen=True, kw_only=True)
class EvolutionResult:
    """Outcome of a completed evolution run.

    Contains the final results after evolution completes, including
    all evolved component values, performance metrics, and full history.

    Attributes:
        original_score (float): Starting performance score (baseline).
        final_score (float): Ending performance score (best achieved).
        evolved_components (dict[str, str]): Dictionary mapping component names
            to their final evolved text values. Keys include "instruction" and
            optionally "output_schema" or other components. Access individual
            components via ``result.evolved_components["instruction"]``.
        iteration_history (list[IterationRecord]): Chronological list of
            iteration records.
        total_iterations (int): Number of iterations performed.
        valset_score (float | None): Score on validation set used for
            acceptance decisions. None if no validation set was used.
        trainset_score (float | None): Score on trainset used for reflection
            diagnostics. None if not computed.
        objective_scores (list[dict[str, float]] | None): Optional per-example
            multi-objective scores from the best candidate's final evaluation.
            None when no objective scores were tracked. Each dict maps objective
            name to score value. Index-aligned with evaluation batch examples.

    Examples:
        Creating and analyzing a result:

        ```python
        from gepa_adk.domain.models import EvolutionResult, IterationRecord

        result = EvolutionResult(
            original_score=0.60,
            final_score=0.85,
            evolved_components={"instruction": "Be helpful and concise"},
            iteration_history=[],
            total_iterations=10,
        )
        print(result.evolved_components["instruction"])  # "Be helpful and concise"
        print(result.improvement)  # 0.25
        print(result.improved)  # True
        ```

    Note:
        As a frozen dataclass, EvolutionResult instances cannot be modified.
    """

    original_score: float
    final_score: float
    evolved_components: dict[str, str]
    iteration_history: list[IterationRecord]
    total_iterations: int
    valset_score: float | None = None
    trainset_score: float | None = None
    objective_scores: list[dict[str, float]] | None = None

    @property
    def improvement(self) -> float:
        """Calculate the score improvement from original to final.

        Returns:
            The difference between final_score and original_score.
            Positive values indicate improvement, negative indicates degradation.

        Note:
            Override is not needed since frozen dataclasses support properties.
        """
        return self.final_score - self.original_score

    @property
    def improved(self) -> bool:
        """Check if the final score is better than the original.

        Returns:
            True if final_score > original_score, False otherwise.

        Note:
            Only returns True for strict improvement, not equal scores.
        """
        return self.final_score > self.original_score

improvement property

improvement: float

Calculate the score improvement from original to final.

RETURNS DESCRIPTION
float

The difference between final_score and original_score.

float

Positive values indicate improvement, negative indicates degradation.

Note

Override is not needed since frozen dataclasses support properties.

improved property

improved: bool

Check if the final score is better than the original.

RETURNS DESCRIPTION
bool

True if final_score > original_score, False otherwise.

Note

Only returns True for strict improvement, not equal scores.

FrontierType

Bases: str, Enum



Supported frontier tracking strategies for Pareto selection.

Note

All four frontier types enable different Pareto dominance tracking strategies for multi-objective optimization.

Examples:

frontier_type = FrontierType.INSTANCE
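
Based on the coercion in EvolutionConfig.__post_init__ shown earlier, a plain string value should also be accepted and converted (a hedged sketch):

```python
from gepa_adk import EvolutionConfig
from gepa_adk.domain.types import FrontierType

config = EvolutionConfig(frontier_type="objective")
assert config.frontier_type is FrontierType.OBJECTIVE
```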
Source code in src/gepa_adk/domain/types.py
class FrontierType(str, Enum):
    """Supported frontier tracking strategies for Pareto selection.

    Note:
        All four frontier types enable different Pareto dominance tracking
        strategies for multi-objective optimization.

    Examples:
        ```python
        frontier_type = FrontierType.INSTANCE
        ```
    """

    INSTANCE = "instance"
    OBJECTIVE = "objective"
    HYBRID = "hybrid"
    CARTESIAN = "cartesian"

IterationRecord dataclass

Captures metrics for a single evolution iteration.

This is an immutable record of what happened during one iteration of the evolution process. Records are created by the engine and stored in EvolutionResult.iteration_history.

ATTRIBUTE DESCRIPTION
iteration_number

1-indexed iteration number for human readability.

TYPE: int

score

Score achieved in this iteration (typically in [0.0, 1.0]).

TYPE: float

component_text

The component_text that was evaluated in this iteration (e.g., the instruction text for the "instruction" component).

TYPE: str

evolved_component

The name of the component that was evolved in this iteration (e.g., "instruction", "output_schema"). Used for tracking which component changed in round-robin evolution strategies.

TYPE: str

accepted

Whether this proposal was accepted as the new best.

TYPE: bool

objective_scores

Optional per-example multi-objective scores from the valset evaluation. None when adapter does not provide objective scores. Each dict maps objective name to score value. Index-aligned with evaluation batch examples.

TYPE: list[dict[str, float]] | None

Examples:

Creating an iteration record:

from gepa_adk.domain.models import IterationRecord

record = IterationRecord(
    iteration_number=1,
    score=0.85,
    component_text="Be helpful",
    evolved_component="instruction",
    accepted=True,
)
print(record.score)  # 0.85
print(record.evolved_component)  # "instruction"
print(record.accepted)  # True
Note

An immutable record that captures iteration metrics. Once created, IterationRecord instances cannot be modified, ensuring historical accuracy of the evolution trace.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, frozen=True, kw_only=True)
class IterationRecord:
    """Captures metrics for a single evolution iteration.

    This is an immutable record of what happened during one iteration
    of the evolution process. Records are created by the engine and
    stored in EvolutionResult.iteration_history.

    Attributes:
        iteration_number (int): 1-indexed iteration number for human
            readability.
        score (float): Score achieved in this iteration (typically in
            [0.0, 1.0]).
        component_text (str): The component_text that was evaluated in this
            iteration (e.g., the instruction text for the "instruction" component).
        evolved_component (str): The name of the component that was evolved
            in this iteration (e.g., "instruction", "output_schema"). Used for
            tracking which component changed in round-robin evolution strategies.
        accepted (bool): Whether this proposal was accepted as the new best.
        objective_scores (list[dict[str, float]] | None): Optional per-example
            multi-objective scores from the valset evaluation. None when adapter
            does not provide objective scores. Each dict maps objective name to
            score value. Index-aligned with evaluation batch examples.

    Examples:
        Creating an iteration record:

        ```python
        from gepa_adk.domain.models import IterationRecord

        record = IterationRecord(
            iteration_number=1,
            score=0.85,
            component_text="Be helpful",
            evolved_component="instruction",
            accepted=True,
        )
        print(record.score)  # 0.85
        print(record.evolved_component)  # "instruction"
        print(record.accepted)  # True
        ```

    Note:
        An immutable record that captures iteration metrics. Once created,
        IterationRecord instances cannot be modified, ensuring historical
        accuracy of the evolution trace.
    """

    iteration_number: int
    score: float
    component_text: str
    evolved_component: str
    accepted: bool
    objective_scores: list[dict[str, float]] | None = None

MultiAgentEvolutionResult dataclass

Outcome of a completed multi-agent evolution run.

Contains evolved component_text for all agents in the group, along with performance metrics and evolution history.

ATTRIBUTE DESCRIPTION
evolved_components

Mapping of agent name to evolved component_text.

TYPE: dict[str, str]

original_score

Starting performance score (baseline).

TYPE: float

final_score

Ending performance score (best achieved).

TYPE: float

primary_agent

Name of the agent whose output was used for scoring.

TYPE: str

iteration_history

Chronological list of iteration records.

TYPE: list[IterationRecord]

total_iterations

Number of iterations performed.

TYPE: int

Examples:

Creating and analyzing a multi-agent result:

from gepa_adk.domain.models import MultiAgentEvolutionResult, IterationRecord

result = MultiAgentEvolutionResult(
    evolved_components={
        "generator": "Generate high-quality code",
        "critic": "Review code thoroughly",
    },
    original_score=0.60,
    final_score=0.85,
    primary_agent="generator",
    iteration_history=[],
    total_iterations=10,
)
print(result.improvement)  # 0.25
print(result.improved)  # True
print(result.agent_names)  # ["critic", "generator"]
Note

An immutable result container for multi-agent evolution. Once created, MultiAgentEvolutionResult instances cannot be modified. Use computed properties like improvement, improved, and agent_names to analyze results without modifying the underlying data.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, frozen=True, kw_only=True)
class MultiAgentEvolutionResult:
    """Outcome of a completed multi-agent evolution run.

    Contains evolved component_text for all agents in the group,
    along with performance metrics and evolution history.

    Attributes:
        evolved_components (dict[str, str]): Mapping of agent name to evolved
            component_text.
        original_score (float): Starting performance score (baseline).
        final_score (float): Ending performance score (best achieved).
        primary_agent (str): Name of the agent whose output was used for scoring.
        iteration_history (list[IterationRecord]): Chronological list of iteration records.
        total_iterations (int): Number of iterations performed.

    Examples:
        Creating and analyzing a multi-agent result:

        ```python
        from gepa_adk.domain.models import MultiAgentEvolutionResult, IterationRecord

        result = MultiAgentEvolutionResult(
            evolved_components={
                "generator": "Generate high-quality code",
                "critic": "Review code thoroughly",
            },
            original_score=0.60,
            final_score=0.85,
            primary_agent="generator",
            iteration_history=[],
            total_iterations=10,
        )
        print(result.improvement)  # 0.25
        print(result.improved)  # True
        print(result.agent_names)  # ["critic", "generator"]
        ```

    Note:
        An immutable result container for multi-agent evolution. Once created,
        MultiAgentEvolutionResult instances cannot be modified. Use computed
        properties like `improvement`, `improved`, and `agent_names` to analyze
        results without modifying the underlying data.
    """

    evolved_components: dict[str, str]
    original_score: float
    final_score: float
    primary_agent: str
    iteration_history: list[IterationRecord]
    total_iterations: int

    @property
    def improvement(self) -> float:
        """Calculate the score improvement from original to final.

        Returns:
            The difference between final_score and original_score.
            Positive values indicate improvement, negative indicates degradation.

        Note:
            Override is not needed since frozen dataclasses support properties.
        """
        return self.final_score - self.original_score

    @property
    def improved(self) -> bool:
        """Check if the final score is better than the original.

        Returns:
            True if final_score > original_score, False otherwise.

        Note:
            Only returns True for strict improvement, not equal scores.
        """
        return self.final_score > self.original_score

    @property
    def agent_names(self) -> list[str]:
        """Get sorted list of evolved agent names.

        Returns:
            Sorted list of agent names from evolved_components keys.

        Note:
            Outputs a new list each time, sorted alphabetically for
            consistent ordering regardless of insertion order.
        """
        return sorted(self.evolved_components.keys())

improvement property

improvement: float

Calculate the score improvement from original to final.

RETURNS DESCRIPTION
float

The difference between final_score and original_score.

float

Positive values indicate improvement, negative indicates degradation.

Note

Override is not needed since frozen dataclasses support properties.

improved property

improved: bool

Check if the final score is better than the original.

RETURNS DESCRIPTION
bool

True if final_score > original_score, False otherwise.

Note

Only returns True for strict improvement, not equal scores.

agent_names property

agent_names: list[str]

Get sorted list of evolved agent names.

RETURNS DESCRIPTION
list[str]

Sorted list of agent names from evolved_components keys.

Note

Outputs a new list each time, sorted alphabetically for consistent ordering regardless of insertion order.

SchemaConstraints dataclass

Constraints for output schema evolution.

Controls which fields must be preserved during schema evolution, including field existence and type constraints. Used by the OutputSchemaHandler to validate proposed schema mutations.

ATTRIBUTE DESCRIPTION
required_fields

Field names that must exist in evolved schemas. Mutations removing these fields are rejected.

TYPE: tuple[str, ...]

preserve_types

Mapping of field names to allowed type(s). Mutations changing a field's type to an incompatible type are rejected.

TYPE: dict[str, type | tuple[type, ...]]

Examples:

Preserve required fields only:

from gepa_adk.domain.types import SchemaConstraints

constraints = SchemaConstraints(
    required_fields=("score", "feedback"),
)

Preserve required fields with type constraints:

constraints = SchemaConstraints(
    required_fields=("score",),
    preserve_types={
        "score": (float, int),  # Allow numeric types
        "order_id": str,  # Must stay string
    },
)
Note

A frozen dataclass ensures immutability during evolution runs. Configuration is validated at evolution start.

Source code in src/gepa_adk/domain/types.py
@dataclass(frozen=True, slots=True)
class SchemaConstraints:
    """Constraints for output schema evolution.

    Controls which fields must be preserved during schema evolution,
    including field existence and type constraints. Used by the
    OutputSchemaHandler to validate proposed schema mutations.

    Attributes:
        required_fields (tuple[str, ...]): Field names that must exist
            in evolved schemas. Mutations removing these fields are rejected.
        preserve_types (dict[str, type | tuple[type, ...]]): Mapping of
            field names to allowed type(s). Mutations changing a field's
            type to an incompatible type are rejected.

    Examples:
        Preserve required fields only:

        ```python
        from gepa_adk.domain.types import SchemaConstraints

        constraints = SchemaConstraints(
            required_fields=("score", "feedback"),
        )
        ```

        Preserve required fields with type constraints:

        ```python
        constraints = SchemaConstraints(
            required_fields=("score",),
            preserve_types={
                "score": (float, int),  # Allow numeric types
                "order_id": str,  # Must stay string
            },
        )
        ```

    Note:
        A frozen dataclass ensures immutability during evolution runs.
        Configuration is validated at evolution start.
    """

    required_fields: tuple[str, ...] = ()
    preserve_types: dict[str, type | tuple[type, ...]] = field(default_factory=dict)
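
A minimal sketch of how such constraints can be enforced during mutation review. The violates helper below and the field-name-to-type mapping used for the proposed schema are illustrative assumptions for this example, not the OutputSchemaHandler API:

from gepa_adk.domain.types import SchemaConstraints

constraints = SchemaConstraints(
    required_fields=("score",),
    preserve_types={"score": (float, int), "order_id": str},
)


def violates(proposed: dict[str, type], constraints: SchemaConstraints) -> bool:
    # Hypothetical check, not the library's OutputSchemaHandler implementation.
    if any(name not in proposed for name in constraints.required_fields):
        return True  # a required field was removed by the mutation
    for name, allowed in constraints.preserve_types.items():
        allowed_types = allowed if isinstance(allowed, tuple) else (allowed,)
        if name in proposed and proposed[name] not in allowed_types:
            return True  # a preserved field changed to an incompatible type
    return False


print(violates({"score": float, "order_id": str}, constraints))  # False
print(violates({"feedback": str}, constraints))  # True: "score" was dropped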

TrajectoryConfig dataclass

Configuration for trajectory extraction behavior.

Controls which components are extracted from ADK event streams, whether sensitive data should be redacted, and whether large values should be truncated.

Note

All configuration fields are immutable after instantiation, ensuring consistent extraction behavior throughout evolution.

ATTRIBUTE DESCRIPTION
include_tool_calls

Extract tool/function call records. Defaults to True.

TYPE: bool

include_state_deltas

Extract session state changes. Defaults to True.

TYPE: bool

include_token_usage

Extract LLM token consumption metrics. Defaults to True.

TYPE: bool

redact_sensitive

Apply sensitive data redaction. When True, fields matching sensitive_keys will be replaced with "[REDACTED]". Defaults to True for secure-by-default behavior.

TYPE: bool

sensitive_keys

Field names to redact via exact match. Case-sensitive. Defaults to ("password", "api_key", "token").

TYPE: tuple[str, ...]

max_string_length

Truncate strings longer than this with a marker indicating truncation. None disables truncation. Defaults to 10000 characters.

TYPE: int | None

Examples:

Default configuration (secure, with truncation):

config = TrajectoryConfig()  # All features enabled, redaction ON

Minimal configuration (tool calls only):

config = TrajectoryConfig(
    include_tool_calls=True,
    include_state_deltas=False,
    include_token_usage=False,
    redact_sensitive=False,
)

Custom sensitive keys and truncation:

config = TrajectoryConfig(
    sensitive_keys=("password", "api_key", "token", "ssn"),
    max_string_length=5000,  # Truncate DOM/screenshots earlier
)

Disable truncation (keep full values):

config = TrajectoryConfig(max_string_length=None)
Note

Redaction takes precedence over truncation. Sensitive values are replaced with "[REDACTED]" first, then truncation applies to the remaining (non-sensitive) strings.

Source code in src/gepa_adk/domain/types.py
@dataclass(frozen=True, slots=True)
class TrajectoryConfig:
    """Configuration for trajectory extraction behavior.

    Controls which components are extracted from ADK event streams,
    whether sensitive data should be redacted, and whether large
    values should be truncated.

    Note:
        All configuration fields are immutable after instantiation,
        ensuring consistent extraction behavior throughout evolution.

    Attributes:
        include_tool_calls (bool): Extract tool/function call records.
            Defaults to True.
        include_state_deltas (bool): Extract session state changes.
            Defaults to True.
        include_token_usage (bool): Extract LLM token consumption metrics.
            Defaults to True.
        redact_sensitive (bool): Apply sensitive data redaction. When True,
            fields matching sensitive_keys will be replaced with "[REDACTED]".
            Defaults to True for secure-by-default behavior.
        sensitive_keys (tuple[str, ...]): Field names to redact via exact
            match. Case-sensitive. Defaults to ("password", "api_key", "token").
        max_string_length (int | None): Truncate strings longer than this
            with a marker indicating truncation. None disables truncation.
            Defaults to 10000 characters.

    Examples:
        Default configuration (secure, with truncation):

        ```python
        config = TrajectoryConfig()  # All features enabled, redaction ON
        ```

        Minimal configuration (tool calls only):

        ```python
        config = TrajectoryConfig(
            include_tool_calls=True,
            include_state_deltas=False,
            include_token_usage=False,
            redact_sensitive=False,
        )
        ```

        Custom sensitive keys and truncation:

        ```python
        config = TrajectoryConfig(
            sensitive_keys=("password", "api_key", "token", "ssn"),
            max_string_length=5000,  # Truncate DOM/screenshots earlier
        )
        ```

        Disable truncation (keep full values):

        ```python
        config = TrajectoryConfig(max_string_length=None)
        ```

    Note:
        Redaction takes precedence over truncation. Sensitive values are
        replaced with "[REDACTED]" first, then truncation applies to the
        remaining (non-sensitive) strings.
    """

    include_tool_calls: bool = True
    include_state_deltas: bool = True
    include_token_usage: bool = True
    redact_sensitive: bool = True
    sensitive_keys: tuple[str, ...] = ("password", "api_key", "token")
    max_string_length: int | None = 10000
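
A minimal sketch of the redaction-before-truncation order described above. The process_value helper and the exact truncation marker are illustrative assumptions, not the library's extractor:

from gepa_adk import TrajectoryConfig


def process_value(key: str, value: str, config: TrajectoryConfig) -> str:
    # Hypothetical helper: redaction runs first, so sensitive values are
    # replaced outright and never reach the truncation step.
    if config.redact_sensitive and key in config.sensitive_keys:
        return "[REDACTED]"
    if config.max_string_length is not None and len(value) > config.max_string_length:
        return value[: config.max_string_length] + "...[truncated]"
    return value


config = TrajectoryConfig(max_string_length=20)
print(process_value("api_key", "sk-" + "x" * 100, config))   # [REDACTED]
print(process_value("dom_snapshot", "y" * 100, config))      # first 20 chars + "...[truncated]"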

VideoValidationError

Bases: ConfigurationError


Inheritance: EvolutionError -> ConfigurationError -> VideoValidationError

Raised when video file validation fails.

This exception is raised during video file processing when a video file does not exist, exceeds size limits, or has an invalid MIME type.

ATTRIBUTE DESCRIPTION
video_path

The path to the video file that failed validation.

TYPE: str

field

The configuration field name (default "video").

TYPE: str

constraint

Description of the validation constraint that was violated.

TYPE: str

Examples:

Raising a video validation error:

from gepa_adk.domain.exceptions import VideoValidationError

raise VideoValidationError(
    "Video file not found",
    video_path="/path/to/missing.mp4",
    constraint="file must exist",
)

Handling video validation errors:

from gepa_adk.domain.exceptions import VideoValidationError

try:
    await video_service.prepare_video_parts(["/bad/path.mp4"])
except VideoValidationError as e:
    print(f"Invalid video: {e.video_path}")
    print(f"Constraint violated: {e.constraint}")
Note

Arises from video file validation failures during multimodal input processing. File existence, size limits (2GB), and MIME type (video/*) are validated before loading video content.

Source code in src/gepa_adk/domain/exceptions.py
class VideoValidationError(ConfigurationError):
    """Raised when video file validation fails.

    This exception is raised during video file processing when a video
    file does not exist, exceeds size limits, or has an invalid MIME type.

    Attributes:
        video_path (str): The path to the video file that failed validation.
        field (str): The configuration field name (default "video").
        constraint (str): Description of the validation constraint that was violated.

    Examples:
        Raising a video validation error:

        ```python
        from gepa_adk.domain.exceptions import VideoValidationError

        raise VideoValidationError(
            "Video file not found",
            video_path="/path/to/missing.mp4",
            constraint="file must exist",
        )
        ```

        Handling video validation errors:

        ```python
        from gepa_adk.domain.exceptions import VideoValidationError

        try:
            await video_service.prepare_video_parts(["/bad/path.mp4"])
        except VideoValidationError as e:
            print(f"Invalid video: {e.video_path}")
            print(f"Constraint violated: {e.constraint}")
        ```

    Note:
        Arises from video file validation failures during multimodal input
        processing. File existence, size limits (2GB), and MIME type
        (video/*) are validated before loading video content.
    """

    def __init__(
        self,
        message: str,
        *,
        video_path: str,
        field: str = "video",
        constraint: str,
    ) -> None:
        """Initialize VideoValidationError with video file context.

        Args:
            message: Human-readable error description.
            video_path: The path to the video file that failed validation.
            field: Name of the configuration field (default "video").
            constraint: Description of the validation constraint violated.

        Note:
            Context fields use keyword-only syntax to ensure explicit labeling
            and prevent positional argument mistakes.
        """
        super().__init__(message, field=field, value=video_path, constraint=constraint)
        self.video_path = video_path

    def __str__(self) -> str:
        """Return string representation with video path context.

        Returns:
            Formatted error message including video_path and constraint.

        Note:
            Outputs formatted error message with video_path for easy
            identification of the problematic file in error logs.
        """
        base = Exception.__str__(self)
        return (
            f"{base} [video_path={self.video_path!r}, constraint={self.constraint!r}]"
        )

__init__

__init__(
    message: str,
    *,
    video_path: str,
    field: str = "video",
    constraint: str,
) -> None

Initialize VideoValidationError with video file context.

PARAMETER DESCRIPTION
message

Human-readable error description.

TYPE: str

video_path

The path to the video file that failed validation.

TYPE: str

field

Name of the configuration field (default "video").

TYPE: str DEFAULT: 'video'

constraint

Description of the validation constraint violated.

TYPE: str

Note

Context fields use keyword-only syntax to ensure explicit labeling and prevent positional argument mistakes.

Source code in src/gepa_adk/domain/exceptions.py
def __init__(
    self,
    message: str,
    *,
    video_path: str,
    field: str = "video",
    constraint: str,
) -> None:
    """Initialize VideoValidationError with video file context.

    Args:
        message: Human-readable error description.
        video_path: The path to the video file that failed validation.
        field: Name of the configuration field (default "video").
        constraint: Description of the validation constraint violated.

    Note:
        Context fields use keyword-only syntax to ensure explicit labeling
        and prevent positional argument mistakes.
    """
    super().__init__(message, field=field, value=video_path, constraint=constraint)
    self.video_path = video_path

__str__

__str__() -> str

Return string representation with video path context.

RETURNS DESCRIPTION
str

Formatted error message including video_path and constraint.

Note

Outputs formatted error message with video_path for easy identification of the problematic file in error logs.

Source code in src/gepa_adk/domain/exceptions.py
def __str__(self) -> str:
    """Return string representation with video path context.

    Returns:
        Formatted error message including video_path and constraint.

    Note:
        Outputs formatted error message with video_path for easy
        identification of the problematic file in error logs.
    """
    base = Exception.__str__(self)
    return (
        f"{base} [video_path={self.video_path!r}, constraint={self.constraint!r}]"
    )
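
For example, assuming ConfigurationError forwards only the message to the base Exception, the formatted output looks roughly like this:

from gepa_adk.domain.exceptions import VideoValidationError

err = VideoValidationError(
    "Video file too large",
    video_path="/videos/demo.mp4",
    constraint="size must be <= 2GB",
)
print(str(err))
# Video file too large [video_path='/videos/demo.mp4', constraint='size must be <= 2GB']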

AsyncGEPAEngine

Bases: Generic[DataInst, Trajectory, RolloutOutput]



Async evolution engine orchestrating the GEPA loop.

This engine executes the core evolution algorithm:

1. Evaluate baseline candidate
2. For each iteration until max_iterations or convergence:
   a. Generate reflective dataset from traces
   b. Propose new candidate text
   c. Evaluate proposal
   d. Accept if improves above threshold
   e. Record iteration
3. Return frozen EvolutionResult

ATTRIBUTE DESCRIPTION
adapter

Implementation of AsyncGEPAAdapter protocol.

TYPE: AsyncGEPAAdapter

config

Evolution parameters.

TYPE: EvolutionConfig

Examples:

Basic usage:

from gepa_adk.engine import AsyncGEPAEngine
from gepa_adk.domain.models import EvolutionConfig, Candidate

engine = AsyncGEPAEngine(
    adapter=my_adapter,
    config=EvolutionConfig(max_iterations=50),
    initial_candidate=Candidate(components={"instruction": "Be helpful"}),
    batch=training_data,
)
result = await engine.run()
print(f"Final score: {result.final_score}")
Note

Avoid reusing engine instances after run() completes.

Source code in src/gepa_adk/engine/async_engine.py
class AsyncGEPAEngine(Generic[DataInst, Trajectory, RolloutOutput]):
    """Async evolution engine orchestrating the GEPA loop.

    This engine executes the core evolution algorithm:
    1. Evaluate baseline candidate
    2. For each iteration until max_iterations or convergence:
       a. Generate reflective dataset from traces
       b. Propose new candidate text
       c. Evaluate proposal
       d. Accept if improves above threshold
       e. Record iteration
    3. Return frozen EvolutionResult

    Attributes:
        adapter (AsyncGEPAAdapter): Implementation of AsyncGEPAAdapter protocol.
        config (EvolutionConfig): Evolution parameters.

    Examples:
        Basic usage:

        ```python
        from gepa_adk.engine import AsyncGEPAEngine
        from gepa_adk.domain.models import EvolutionConfig, Candidate

        engine = AsyncGEPAEngine(
            adapter=my_adapter,
            config=EvolutionConfig(max_iterations=50),
            initial_candidate=Candidate(components={"instruction": "Be helpful"}),
            batch=training_data,
        )
        result = await engine.run()
        print(f"Final score: {result.final_score}")
        ```

    Note:
        Avoid reusing engine instances after run() completes.
    """

    def __init__(
        self,
        adapter: AsyncGEPAAdapter[DataInst, Trajectory, RolloutOutput],
        config: EvolutionConfig,
        initial_candidate: Candidate,
        batch: list[DataInst],
        valset: list[DataInst] | None = None,
        candidate_selector: CandidateSelectorProtocol | None = None,
        component_selector: ComponentSelectorProtocol | None = None,
        evaluation_policy: EvaluationPolicyProtocol | None = None,
        merge_proposer: ProposerProtocol | None = None,
    ) -> None:
        """Initialize the evolution engine.

        Args:
            adapter: Implementation of AsyncGEPAAdapter protocol for evaluation
                and proposal generation.
            config: Evolution parameters controlling iterations, thresholds,
                and early stopping.
            initial_candidate: Starting candidate with 'instruction' component.
            batch: Trainset data instances for reflection and mutation.
            valset: Optional validation data for scoring candidates. Defaults
                to trainset when omitted.
            candidate_selector: Optional selector strategy for Pareto-aware
                candidate sampling.
            component_selector: Optional selector strategy for choosing which
                components to update. Defaults to RoundRobinComponentSelector.
            evaluation_policy: Optional policy for selecting which validation
                examples to evaluate per iteration. Defaults to FullEvaluationPolicy.
            merge_proposer: Optional proposer for merge operations. If provided
                and config.use_merge is True, merge proposals will be attempted
                after successful mutations.

        Raises:
            ValueError: If batch is empty or initial_candidate lacks 'instruction'.
            ConfigurationError: If config validation fails (via EvolutionConfig).

        Examples:
            Creating an engine:

            ```python
            engine = AsyncGEPAEngine(
                adapter=my_adapter,
                config=EvolutionConfig(max_iterations=50),
                initial_candidate=Candidate(components={"instruction": "Be helpful"}),
                batch=training_data,
                candidate_selector=selector,
            )
            ```

        Note:
            Configures trainset and valset routing for reflection and scoring.
        """
        # Validation
        if len(batch) == 0:
            raise ValueError("batch must contain at least one data instance")
        if valset is not None and len(valset) == 0:
            raise ValueError(
                "valset must contain at least one validation data instance"
            )

        if not initial_candidate.components:
            raise ValueError("initial_candidate must have at least one component")

        # Store dependencies
        self.adapter = adapter
        self.config = config
        self._initial_candidate = initial_candidate
        self._trainset = batch
        self._valset = valset if valset is not None else batch
        self._state: _EngineState | None = None
        self._candidate_selector = candidate_selector
        self._component_selector = component_selector or RoundRobinComponentSelector()
        self._pareto_state: ParetoState | None = None
        self._candidate_eval_batches: dict[int, EvaluationBatch] = {}
        self._merge_proposer = merge_proposer
        self._merges_due: int = 0
        self._merge_invocations: int = 0
        # Stopper state tracking (T001, T002)
        self._start_time: float | None = None
        self._total_evaluations: int = 0
        self._active_stoppers: list[object] = []
        # Import here to avoid circular dependency
        if evaluation_policy is None:
            from gepa_adk.adapters.evaluation_policy import FullEvaluationPolicy

            self._evaluation_policy: EvaluationPolicyProtocol = FullEvaluationPolicy()
        else:
            self._evaluation_policy = evaluation_policy

    def _aggregate_acceptance_score(self, scores: list[float]) -> float:
        """Aggregate scores for acceptance decisions based on acceptance_metric.

        Args:
            scores: List of per-example scores from evaluation batch.

        Returns:
            Aggregated acceptance score (sum or mean based on config).

        Raises:
            InvalidScoreListError: If scores list is empty or contains
                non-finite values.

        Note:
            Sums or averages acceptance scores after validating they are
            non-empty and finite. Uses sum or mean based on config.acceptance_metric.
        """
        # Validate scores are non-empty
        if not scores:
            raise InvalidScoreListError(
                "Cannot aggregate acceptance score from empty score list",
                scores=scores,
                reason="empty",
            )

        # Validate scores are finite
        if not all(math.isfinite(score) for score in scores):
            raise InvalidScoreListError(
                "Cannot aggregate acceptance score from non-finite values (NaN/inf)",
                scores=scores,
                reason="non-finite",
            )

        # Aggregate based on acceptance_metric
        if self.config.acceptance_metric == "sum":
            return sum(scores)
        else:  # acceptance_metric == "mean"
            return sum(scores) / len(scores)
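
    # Illustrative note (added for this reference, not library code): with
    # scores [0.8, 0.6, 0.7], acceptance_metric="sum" yields ~2.1 and
    # acceptance_metric="mean" yields ~0.7, subject to floating-point rounding.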

    def _setup_stoppers(self) -> list[object]:
        """Call setup() on stoppers that have lifecycle methods.

        Returns:
            List of stoppers that had setup() called (for cleanup in reverse order).

        Note:
            Only invokes setup() on stoppers implementing the lifecycle method.
            Stoppers that fail setup() are excluded from stop_callbacks for the
            remainder of execution to prevent inconsistent state.
        """
        setup_stoppers: list[object] = []
        active_stoppers: list[object] = []
        stop_callbacks = self.config.stop_callbacks
        if stop_callbacks:
            for stopper in stop_callbacks:
                setup_method = getattr(stopper, "setup", None)
                if setup_method is not None and callable(setup_method):
                    try:
                        setup_method()
                        setup_stoppers.append(stopper)
                        active_stoppers.append(stopper)
                    except Exception:
                        logger.exception(
                            "stopper.setup_error",
                            stopper=type(stopper).__name__,
                        )
                        # Stopper excluded from active list due to setup failure
                else:
                    # Stopper has no setup() method, still active
                    active_stoppers.append(stopper)
        # Store active stoppers for _should_stop() to use
        self._active_stoppers = active_stoppers
        return setup_stoppers

    def _cleanup_stoppers(self, setup_stoppers: list[object]) -> None:
        """Call cleanup() on stoppers in reverse order of setup.

        Args:
            setup_stoppers: List of stoppers that had setup() called.

        Note:
            Observes reverse-order cleanup contract (T025).
            If cleanup() raises, logs error and continues (T026).
        """
        for stopper in reversed(setup_stoppers):
            cleanup_method = getattr(stopper, "cleanup", None)
            if cleanup_method is not None and callable(cleanup_method):
                try:
                    cleanup_method()
                except Exception:
                    logger.exception(
                        "stopper.cleanup_error",
                        stopper=type(stopper).__name__,
                    )

    def _build_stopper_state(self) -> StopperState:
        """Build a StopperState snapshot from current engine state.

        Constructs an immutable snapshot of evolution state for stopper
        callbacks to evaluate. Captures all metrics needed by stoppers
        including elapsed time and total evaluations.

        Returns:
            Frozen StopperState containing current iteration, best score,
            stagnation counter, total evaluations, candidates count, and
            elapsed time.

        Note:
            Obtains elapsed_seconds from monotonic time since run() started.
            Uses zero if _start_time has not yet been set.
        """
        assert self._state is not None, "Engine state not initialized"
        elapsed = (
            time.monotonic() - self._start_time if self._start_time is not None else 0.0
        )
        candidates_count = (
            len(self._pareto_state.candidates) if self._pareto_state is not None else 0
        )
        return StopperState(
            iteration=self._state.iteration,
            best_score=self._state.best_score,
            stagnation_counter=self._state.stagnation_counter,
            total_evaluations=self._total_evaluations,
            candidates_count=candidates_count,
            elapsed_seconds=elapsed,
        )

    @property
    def pareto_state(self) -> ParetoState | None:
        """Return the current Pareto state, if initialized."""
        return self._pareto_state

    def _build_component_list(self, candidate: Candidate) -> list[str]:
        """Build list of available component keys from candidate.

        Excludes generic 'instruction' alias if agent-specific keys exist
        (e.g., 'agent1_instruction').

        Args:
            candidate: Candidate to extract component keys from.

        Returns:
            List of component keys to consider for update.

        Note:
            Selects component keys, filtering out the default component name when
            more specific per-agent component keys are present.
        """
        keys = list(candidate.components.keys())
        if len(keys) > 1 and DEFAULT_COMPONENT_NAME in keys:
            # If multiple keys exist, assume default component might be an alias/proxy
            # or simply one of many.
            # For now, simplistic rule: if other keys exist, exclude default.
            return [k for k in keys if k != DEFAULT_COMPONENT_NAME]
        return keys
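
    # Illustrative note (added for this reference, not library code): a candidate
    # with components {"generator_instruction": ..., "critic_instruction": ...,
    # "instruction": ...} yields ["generator_instruction", "critic_instruction"];
    # a candidate with only {"instruction": ...} yields ["instruction"].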

    async def _initialize_baseline(self) -> None:
        """Initialize baseline evaluation.

        Evaluates the initial candidate on trainset for reflection and
        on valset for scoring. Caches the reflection batch for use in
        the first mutation proposal.

        Note:
            Sets up both reflection and scoring baselines up front.
        """
        # Create pareto_state before evaluation if candidate_selector exists
        # so that _evaluate_scoring can use evaluation_policy
        if self._candidate_selector is not None:
            self._pareto_state = ParetoState(frontier_type=self.config.frontier_type)

        reflection_batch = await self.adapter.evaluate(
            self._trainset,
            self._initial_candidate.components,
            capture_traces=True,
        )
        self._total_evaluations += len(reflection_batch.scores)
        # Use _evaluate_scoring for baseline to get eval_indices
        (
            baseline_score,
            scoring_batch,
            baseline_eval_indices,
        ) = await self._evaluate_scoring(self._initial_candidate)
        baseline_reflection_score = sum(reflection_batch.scores) / len(
            reflection_batch.scores
        )
        baseline_valset_mean = (
            sum(scoring_batch.scores) / len(scoring_batch.scores)
            if scoring_batch.scores
            else 0.0
        )
        self._state = _EngineState(
            best_candidate=self._initial_candidate,
            best_score=baseline_score,
            original_score=baseline_score,
            iteration=0,
            stagnation_counter=0,
            iteration_history=[],
            last_eval_batch=reflection_batch,
            best_reflection_score=baseline_reflection_score,
            best_valset_mean=baseline_valset_mean,
            best_objective_scores=scoring_batch.objective_scores,
        )
        if self._candidate_selector is not None:
            # Prepare objective scores for baseline if needed
            objective_scores: dict[str, float] | None = None
            per_example_objective_scores: dict[int, dict[str, float]] | None = None

            if scoring_batch.objective_scores is not None:
                from statistics import fmean

                if self.config.frontier_type in (
                    FrontierType.OBJECTIVE,
                    FrontierType.HYBRID,
                ):
                    objective_scores_by_name: dict[str, list[float]] = {}
                    for obj_scores in scoring_batch.objective_scores:
                        for obj_name, obj_score in obj_scores.items():
                            objective_scores_by_name.setdefault(obj_name, []).append(
                                obj_score
                            )
                    objective_scores = {
                        obj_name: fmean(scores)
                        for obj_name, scores in objective_scores_by_name.items()
                    }

                if self.config.frontier_type == FrontierType.CARTESIAN:
                    per_example_objective_scores = {
                        baseline_eval_indices[i]: scoring_batch.objective_scores[i]
                        for i in range(len(baseline_eval_indices))
                    }
                    objective_scores_by_name: dict[str, list[float]] = {}
                    for obj_scores in scoring_batch.objective_scores:
                        for obj_name, obj_score in obj_scores.items():
                            objective_scores_by_name.setdefault(obj_name, []).append(
                                obj_score
                            )
                    objective_scores = {
                        obj_name: fmean(scores)
                        for obj_name, scores in objective_scores_by_name.items()
                    }

            assert self._pareto_state is not None, "Pareto state not initialized"
            candidate_idx = self._pareto_state.add_candidate(
                self._initial_candidate,
                scoring_batch.scores,
                score_indices=baseline_eval_indices,
                objective_scores=objective_scores,
                per_example_objective_scores=per_example_objective_scores,
                logger=logger,
            )
            self._candidate_eval_batches[candidate_idx] = reflection_batch

    async def _evaluate_reflection(
        self, candidate: Candidate
    ) -> tuple[float, EvaluationBatch]:
        """Evaluate a candidate on the trainset for reflection.

        Args:
            candidate: Candidate to evaluate.

        Returns:
            Tuple of (mean score across trainset examples, evaluation batch).

        Note:
            Supplies trajectories for reflective dataset construction.
        """
        eval_batch = await self.adapter.evaluate(
            self._trainset,
            candidate.components,
            capture_traces=True,
        )
        self._total_evaluations += len(eval_batch.scores)
        score = sum(eval_batch.scores) / len(eval_batch.scores)
        return score, eval_batch

    async def _evaluate_scoring(
        self, candidate: Candidate
    ) -> tuple[float, EvaluationBatch, list[int]]:
        """Evaluate a candidate on the valset for scoring decisions.

        Args:
            candidate: Candidate to evaluate on the validation set.

        Returns:
            Tuple of (aggregated acceptance score, evaluation batch, eval_indices).
            Score is aggregated using acceptance_metric (sum or mean).
            eval_indices are the valset indices that were actually evaluated.

        Note:
            Supplies scores without traces for acceptance decisions.
            Aggregation method (sum/mean) is determined by config.acceptance_metric.
            Uses evaluation_policy to determine which examples to evaluate.
        """
        # Get indices to evaluate from evaluation policy
        valset_ids = list(range(len(self._valset)))
        if self._pareto_state is not None:
            eval_indices = self._evaluation_policy.get_eval_batch(
                valset_ids, self._pareto_state
            )
        else:
            # Fallback to all indices if no pareto state yet
            eval_indices = valset_ids

        # Filter valset to only include selected indices
        is_full_eval = len(eval_indices) == len(valset_ids) and set(
            eval_indices
        ) == set(valset_ids)
        eval_valset = (
            self._valset if is_full_eval else [self._valset[i] for i in eval_indices]
        )

        eval_batch = await self.adapter.evaluate(
            eval_valset,
            candidate.components,
            capture_traces=False,
        )
        self._total_evaluations += len(eval_batch.scores)
        score = self._aggregate_acceptance_score(eval_batch.scores)
        return score, eval_batch, eval_indices

    async def _propose_mutation(self) -> tuple[Candidate, list[str]]:
        """Propose a new candidate via reflective mutation.

        Uses the cached evaluation batch from the most recent best candidate
        evaluation to generate the reflective dataset, avoiding redundant
        adapter calls.

        Returns:
            Tuple of (new candidate with proposed component updates,
            list of component names that were updated).

        Note:
            Spawns a new candidate with updated components based on reflective
            dataset analysis and component selector strategy.
        """
        assert self._state is not None, "Engine state not initialized"
        assert self._state.last_eval_batch is not None, "No eval batch cached"

        selected_candidate = self._state.best_candidate
        selected_idx: int | None = None
        eval_batch = self._state.last_eval_batch

        if self._candidate_selector is not None and self._pareto_state is not None:
            try:
                selected_idx = await self._candidate_selector.select_candidate(
                    self._pareto_state
                )
                selected_candidate = self._pareto_state.candidates[selected_idx]
                eval_batch = self._candidate_eval_batches.get(selected_idx)
                logger.info(
                    "pareto_selection.mutation_parent_selected",
                    candidate_idx=selected_idx,
                    iteration=self._state.iteration,
                    selector_type=type(self._candidate_selector).__name__,
                )
            except NoCandidateAvailableError as exc:
                logger.info(
                    "pareto_selection.empty_frontier_fallback",
                    iteration=self._state.iteration,
                    selector_type=type(self._candidate_selector).__name__,
                    error=str(exc),
                )
                eval_batch = self._state.last_eval_batch

        if eval_batch is None:
            eval_batch = await self.adapter.evaluate(
                self._trainset,
                selected_candidate.components,
                capture_traces=True,
            )
            self._total_evaluations += len(eval_batch.scores)
            if selected_idx is not None:
                self._candidate_eval_batches[selected_idx] = eval_batch

        # Build component list
        available_components = self._build_component_list(selected_candidate)

        # Select components to update
        components_to_update = await self._component_selector.select_components(
            components=available_components,
            iteration=self._state.iteration,
            candidate_idx=selected_idx if selected_idx is not None else 0,
        )

        logger.info(
            "mutation.components_selected",
            iteration=self._state.iteration,
            components=components_to_update,
            selector=type(self._component_selector).__name__,
        )

        # Build reflective dataset
        reflective_dataset = await self.adapter.make_reflective_dataset(
            selected_candidate.components,
            eval_batch,
            components_to_update,
        )

        # Propose new texts
        proposed_components = await self.adapter.propose_new_texts(
            selected_candidate.components,
            reflective_dataset,
            components_to_update,
        )

        # Create new candidate with proposed components
        new_components = dict(selected_candidate.components)
        new_components.update(proposed_components)
        return (
            Candidate(
                components=new_components,
                generation=selected_candidate.generation,
                parent_id=selected_candidate.parent_id,
            ),
            components_to_update,
        )

    def _record_iteration(
        self,
        score: float,
        component_text: str,
        evolved_component: str,
        accepted: bool,
        objective_scores: list[dict[str, float]] | None = None,
    ) -> None:
        """Record iteration outcome.

        Args:
            score: Score achieved in this iteration.
            component_text: The text of the component that was evaluated.
            evolved_component: The name of the component that was evolved
                (e.g., "instruction", "output_schema").
            accepted: Whether proposal was accepted.
            objective_scores: Optional objective scores from this iteration's
                evaluation. None when adapter does not provide objective scores.

        Note:
            Stores an IterationRecord in the engine state's iteration_history,
            preserving chronological evolution trace for analysis.
        """
        assert self._state is not None, "Engine state not initialized"
        record = IterationRecord(
            iteration_number=self._state.iteration,
            score=score,
            component_text=component_text,
            evolved_component=evolved_component,
            accepted=accepted,
            objective_scores=objective_scores,
        )
        self._state.iteration_history.append(record)

    def _should_stop(self) -> bool:
        """Check if evolution should terminate.

        Returns:
            True if any stopping condition met:
            - iteration >= max_iterations
            - patience > 0 AND stagnation_counter >= patience
            - any active stopper returns True

        Note:
            Observes termination conditions in priority order: max iterations,
            early stopping patience, then custom stoppers. Only active stoppers
            (those that passed setup or have no setup method) are invoked.
        """
        assert self._state is not None, "Engine state not initialized"
        # Condition 1: Max iterations reached (built-in, fast path)
        if self._state.iteration >= self.config.max_iterations:
            return True

        # Condition 2: Early stopping (patience exhausted, built-in)
        if self.config.patience > 0:
            if self._state.stagnation_counter >= self.config.patience:
                return True

        # Condition 3: Custom stoppers (T010-T013)
        # Use _active_stoppers which excludes stoppers that failed setup
        active_stoppers = getattr(self, "_active_stoppers", None)
        if active_stoppers:
            stopper_state = self._build_stopper_state()
            for stopper in active_stoppers:
                try:
                    if stopper(stopper_state):
                        # Log stopper trigger (T013)
                        logger.info(
                            "stopper.triggered",
                            stopper=type(stopper).__name__,
                            iteration=self._state.iteration,
                        )
                        return True  # Short-circuit on first True (T011)
                except Exception:
                    # T032: Handle stopper exception gracefully
                    logger.exception(
                        "stopper.error",
                        stopper=type(stopper).__name__,
                        iteration=self._state.iteration,
                    )
                    # Continue checking other stoppers

        return False

    def _should_accept(self, proposal_score: float, best_score: float) -> bool:
        """Check if proposal should be accepted.

        Args:
            proposal_score: Score of the proposed candidate.
            best_score: Current best score.

        Returns:
            True if proposal_score > best_score + min_improvement_threshold.

        Note:
            Signals True when proposal exceeds best score by the configured
            improvement threshold, enabling configurable acceptance sensitivity.
        """
        threshold = self.config.min_improvement_threshold
        return proposal_score > best_score + threshold
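
    # Illustrative note (added for this reference, not library code): with
    # min_improvement_threshold=0.01 and best_score=0.70, a proposal scoring
    # 0.705 is rejected while one scoring 0.72 is accepted.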

    def _validate_schema_component(self, proposal: Candidate) -> bool:
        """Validate output_schema component if present.

        Validates that proposed schema text is syntactically correct and
        structurally valid (inherits from BaseModel, no imports/functions).
        Invalid schemas are rejected to prevent evolution from accepting
        non-functional schema proposals.

        Args:
            proposal: Candidate containing components to validate.

        Returns:
            True if valid or no output_schema component present.
            False if output_schema validation fails.

        Note:
            This validation runs before expensive evaluation to reject
            invalid schemas early. Security checks (no imports, no functions)
            are enforced to prevent code injection.
        """
        if "output_schema" not in proposal.components:
            return True

        schema_text = proposal.components["output_schema"]

        try:
            # Import here to avoid circular dependency at module load
            from gepa_adk.utils.schema_utils import validate_schema_text

            validate_schema_text(schema_text)
            logger.debug(
                "schema_validation.passed",
                iteration=self._state.iteration if self._state else None,
            )
            return True
        except SchemaValidationError as e:
            logger.warning(
                "schema_validation.rejected",
                iteration=self._state.iteration if self._state else None,
                validation_stage=e.validation_stage,
                line_number=e.line_number,
                error=e.validation_error,
            )
            return False

    def _accept_proposal(
        self,
        proposal: Candidate,
        score: float,
        eval_batch: EvaluationBatch,
        *,
        candidate_idx: int | None = None,
        reflection_score: float | None = None,
        valset_mean: float | None = None,
        objective_scores: list[dict[str, float]] | None = None,
    ) -> None:
        """Accept a proposal and update state.

        Args:
            proposal: Proposed candidate to accept.
            score: Acceptance score of the proposed candidate (sum or mean).
            eval_batch: Reflection batch from proposal evaluation (cached for
                next iteration's reflective dataset generation).
            candidate_idx: Optional ParetoState candidate index to update with
                lineage metadata.
            reflection_score: Optional trainset score to store with best
                candidate metadata.
            valset_mean: Optional valset mean score to track separately from
                acceptance score.
            objective_scores: Optional objective scores from scoring batch.
                None when adapter does not provide objective scores.

        Note:
            Swaps cached reflection batch for next proposal iteration.
            Tracks acceptance score and valset mean separately.
        """
        assert self._state is not None, "Engine state not initialized"
        # Create new candidate with lineage
        new_candidate = Candidate(
            components=dict(proposal.components),
            generation=self._state.best_candidate.generation + 1,
            parent_id=f"gen-{self._state.best_candidate.generation}",
        )
        if candidate_idx is not None and self._pareto_state is not None:
            self._pareto_state.candidates[candidate_idx] = new_candidate
        self._state.best_candidate = new_candidate
        self._state.best_score = score
        self._state.stagnation_counter = 0
        self._state.last_eval_batch = eval_batch
        if reflection_score is not None:
            self._state.best_reflection_score = reflection_score
        if valset_mean is not None:
            self._state.best_valset_mean = valset_mean
        self._state.best_objective_scores = objective_scores

    def _build_result(self) -> EvolutionResult:
        """Build final result from current state.

        Returns:
            Frozen EvolutionResult with all metrics.

        Note:
            Synthesizes a frozen EvolutionResult containing all evolution metrics
            and history, suitable for immutable result reporting. The evolved_components
            dict contains all component values from the best candidate.
        """
        assert self._state is not None, "Engine state not initialized"
        return EvolutionResult(
            original_score=self._state.original_score,
            final_score=self._state.best_score,
            evolved_components=dict(self._state.best_candidate.components),
            iteration_history=self._state.iteration_history,
            total_iterations=self._state.iteration,
            valset_score=self._state.best_valset_mean,
            trainset_score=self._state.best_reflection_score,
            objective_scores=self._state.best_objective_scores,
        )

    async def run(self) -> EvolutionResult:
        """Execute the evolution loop.

        Runs the core evolution loop:
        1. Evaluate baseline candidate
        2. For each iteration until max_iterations or convergence:
           a. Generate reflective dataset from traces
           b. Propose new candidate text
           c. Evaluate proposal
           d. Accept if improves above threshold
           e. Record iteration
        3. Return frozen EvolutionResult

        Returns:
            EvolutionResult containing:
                - original_score: Baseline score before evolution
                - final_score: Best score achieved
                - evolved_components: Mapping of component name to best text found
                - iteration_history: List of IterationRecord objects
                - total_iterations: Number of iterations performed

        Raises:
            Exception: Any exceptions from adapter methods propagate unchanged.

        Examples:
            Running evolution:

            ```python
            result = await engine.run()
            print(f"Improved: {result.improved}")
            print(f"Best score: {result.final_score}")
            ```

        Note:
            Outputs a frozen EvolutionResult after completing the evolution
            loop. Engine instance should not be reused after run() completes.
            Method is idempotent if called multiple times (restarts fresh).
            Fail-fast behavior: adapter exceptions are not caught.
        """
        # Initialize stopper state tracking (T004)
        self._start_time = time.monotonic()
        self._total_evaluations = 0

        # Setup stopper lifecycle (T023)
        setup_stoppers = self._setup_stoppers()

        try:
            return await self._run_evolution_loop()
        finally:
            # Cleanup stopper lifecycle (T024)
            self._cleanup_stoppers(setup_stoppers)

    async def _run_evolution_loop(self) -> EvolutionResult:
        """Execute the core evolution loop.

        This method contains the actual evolution loop logic, separated
        from lifecycle management for clean try/finally handling.

        Returns:
            EvolutionResult with evolution outcomes.

        Note:
            Only called from run(). Handles the evolution loop body
            while run() manages stopper lifecycle.
        """
        # Initialize baseline
        await self._initialize_baseline()
        assert self._state is not None, "Engine state not initialized"

        # Evolution loop
        while not self._should_stop():
            self._state.iteration += 1

            # Propose mutation (returns candidate and list of components evolved)
            proposal, evolved_components_list = await self._propose_mutation()

            # Validate schema component if present (reject invalid early)
            if not self._validate_schema_component(proposal):
                # Invalid schema - skip evaluation and count as stagnation
                self._state.stagnation_counter += 1
                logger.debug(
                    "evolution.proposal_skipped",
                    iteration=self._state.iteration,
                    reason="schema_validation_failed",
                )
                continue

            # Evaluate proposal
            reflection_score, reflection_batch = await self._evaluate_reflection(
                proposal
            )
            proposal_score, scoring_batch, eval_indices = await self._evaluate_scoring(
                proposal
            )

            candidate_idx = None
            if self._pareto_state is not None:
                # Use eval_indices returned from _evaluate_scoring (T066)
                # Prepare objective scores if available (T068)
                objective_scores: dict[str, float] | None = None
                per_example_objective_scores: dict[int, dict[str, float]] | None = None

                if scoring_batch.objective_scores is not None:
                    from statistics import fmean

                    if self.config.frontier_type in (
                        FrontierType.OBJECTIVE,
                        FrontierType.HYBRID,
                    ):
                        # Aggregate objective scores across evaluated examples
                        objective_scores_accum: dict[str, list[float]] = {}
                        for obj_scores in scoring_batch.objective_scores:
                            for obj_name, obj_score in obj_scores.items():
                                objective_scores_accum.setdefault(obj_name, []).append(
                                    obj_score
                                )
                        # Take mean per objective
                        objective_scores = {
                            obj_name: fmean(scores)
                            for obj_name, scores in objective_scores_accum.items()
                        }

                    if self.config.frontier_type == FrontierType.CARTESIAN:
                        # For CARTESIAN, need per-example objective scores mapped to valset indices
                        per_example_objective_scores = {
                            eval_indices[i]: scoring_batch.objective_scores[i]
                            for i in range(len(eval_indices))
                        }
                        # Also need aggregated for validation
                        objective_scores_by_name: dict[str, list[float]] = {}
                        for obj_scores in scoring_batch.objective_scores:
                            for obj_name, obj_score in obj_scores.items():
                                objective_scores_by_name.setdefault(
                                    obj_name, []
                                ).append(obj_score)
                        objective_scores = {
                            obj_name: fmean(scores)
                            for obj_name, scores in objective_scores_by_name.items()
                        }

                # Determine parent indices for genealogy tracking
                parent_indices: list[int] | None = None
                if self._candidate_selector is not None:
                    try:
                        parent_idx = await self._candidate_selector.select_candidate(
                            self._pareto_state
                        )
                        parent_indices = [parent_idx]
                    except NoCandidateAvailableError:
                        parent_indices = None
                else:
                    # Use best candidate as parent
                    if self._pareto_state.best_average_idx is not None:
                        parent_indices = [self._pareto_state.best_average_idx]

                # Pass scores with correct index mapping (T066)
                candidate_idx = self._pareto_state.add_candidate(
                    proposal,
                    scoring_batch.scores,
                    score_indices=eval_indices,
                    objective_scores=objective_scores,
                    per_example_objective_scores=per_example_objective_scores,
                    parent_indices=parent_indices,
                    logger=logger,
                )
                self._candidate_eval_batches[candidate_idx] = reflection_batch
                logger.info(
                    "pareto_frontier.candidate_added",
                    candidate_idx=candidate_idx,
                    iteration=self._state.iteration,
                )

            # Calculate valset mean using only evaluated scores (T067)
            valset_mean = (
                sum(scoring_batch.scores) / len(scoring_batch.scores)
                if scoring_batch.scores
                else 0.0
            )

            # Accept if improves above threshold
            accepted = self._should_accept(proposal_score, self._state.best_score)
            if accepted:
                self._accept_proposal(
                    proposal,
                    proposal_score,
                    reflection_batch,
                    candidate_idx=candidate_idx,
                    reflection_score=reflection_score,
                    valset_mean=valset_mean,
                    objective_scores=scoring_batch.objective_scores,
                )
                # Schedule merge if enabled
                if (
                    self.config.use_merge
                    and self._merge_proposer is not None
                    and self._merge_invocations < self.config.max_merge_invocations
                ):
                    self._merges_due += 1
                    logger.debug(
                        "merge_scheduling.merge_scheduled",
                        iteration=self._state.iteration,
                        merges_due=self._merges_due,
                    )
            else:
                # Increment stagnation counter on rejection
                self._state.stagnation_counter += 1

            # Attempt merge if scheduled
            if (
                self._merges_due > 0
                and self._merge_proposer is not None
                and self._pareto_state is not None
                and self._merge_invocations < self.config.max_merge_invocations
            ):
                merge_result = await self._merge_proposer.propose(self._pareto_state)
                if merge_result is not None:
                    self._merges_due -= 1
                    self._merge_invocations += 1
                    logger.info(
                        "merge_scheduling.merge_attempted",
                        iteration=self._state.iteration,
                        parent_indices=merge_result.parent_indices,
                        ancestor_idx=merge_result.metadata.get("ancestor_idx"),
                        merges_due=self._merges_due,
                        total_invocations=self._merge_invocations,
                    )
                    # Validate schema component in merge proposal
                    if not self._validate_schema_component(merge_result.candidate):
                        logger.debug(
                            "merge.proposal_skipped",
                            iteration=self._state.iteration,
                            reason="schema_validation_failed",
                        )
                        continue
                    # Evaluate merge proposal
                    (
                        merge_reflection_score,
                        merge_reflection_batch,
                    ) = await self._evaluate_reflection(merge_result.candidate)
                    (
                        merge_proposal_score,
                        merge_scoring_batch,
                        merge_eval_indices,
                    ) = await self._evaluate_scoring(merge_result.candidate)

                    # Add merge candidate to ParetoState
                    merge_candidate_idx = None
                    assert self._pareto_state is not None, (
                        "Pareto state not initialized"
                    )
                    merge_objective_scores: dict[str, float] | None = None
                    merge_per_example_objective_scores: (
                        dict[int, dict[str, float]] | None
                    ) = None

                    if merge_scoring_batch.objective_scores is not None:
                        from statistics import fmean

                        if self.config.frontier_type in (
                            FrontierType.OBJECTIVE,
                            FrontierType.HYBRID,
                        ):
                            merge_objective_scores_accum: dict[str, list[float]] = {}
                            for obj_scores in merge_scoring_batch.objective_scores:
                                for obj_name, obj_score in obj_scores.items():
                                    merge_objective_scores_accum.setdefault(
                                        obj_name, []
                                    ).append(obj_score)
                            merge_objective_scores = {
                                obj_name: fmean(scores)
                                for obj_name, scores in merge_objective_scores_accum.items()
                            }

                        if self.config.frontier_type == FrontierType.CARTESIAN:
                            merge_per_example_objective_scores = {
                                merge_eval_indices[
                                    i
                                ]: merge_scoring_batch.objective_scores[i]
                                for i in range(len(merge_eval_indices))
                            }
                            merge_objective_scores_by_name: dict[str, list[float]] = {}
                            for obj_scores in merge_scoring_batch.objective_scores:
                                for obj_name, obj_score in obj_scores.items():
                                    merge_objective_scores_by_name.setdefault(
                                        obj_name, []
                                    ).append(obj_score)
                            merge_objective_scores = {
                                obj_name: fmean(scores)
                                for obj_name, scores in merge_objective_scores_by_name.items()
                            }

                    merge_candidate_idx = self._pareto_state.add_candidate(
                        merge_result.candidate,
                        merge_scoring_batch.scores,
                        score_indices=merge_eval_indices,
                        objective_scores=merge_objective_scores,
                        per_example_objective_scores=merge_per_example_objective_scores,
                        parent_indices=merge_result.parent_indices,
                        logger=logger,
                    )
                    self._candidate_eval_batches[merge_candidate_idx] = (
                        merge_reflection_batch
                    )

                    merge_valset_mean = (
                        sum(merge_scoring_batch.scores)
                        / len(merge_scoring_batch.scores)
                        if merge_scoring_batch.scores
                        else 0.0
                    )

                    # Accept merge if improves
                    merge_accepted = self._should_accept(
                        merge_proposal_score, self._state.best_score
                    )
                    if merge_accepted:
                        self._accept_proposal(
                            merge_result.candidate,
                            merge_proposal_score,
                            merge_reflection_batch,
                            candidate_idx=merge_candidate_idx,
                            reflection_score=merge_reflection_score,
                            valset_mean=merge_valset_mean,
                            objective_scores=merge_scoring_batch.objective_scores,
                        )
                        logger.info(
                            "merge_scheduling.merge_accepted",
                            iteration=self._state.iteration,
                            merge_score=merge_proposal_score,
                        )
                    else:
                        logger.debug(
                            "merge_scheduling.merge_rejected",
                            iteration=self._state.iteration,
                            merge_score=merge_proposal_score,
                            best_score=self._state.best_score,
                        )
                else:
                    # Merge not possible, decrement counter
                    if self._merges_due > 0:
                        self._merges_due -= 1

            # Record iteration with actual evolved component name (T033)
            # For single-component evolution, use the first (and only) component.
            # For multi-component round-robin, this tracks which component was
            # evolved in this iteration.
            if evolved_components_list:
                evolved_component_name = evolved_components_list[0]
            else:
                # Empty list indicates logic error - use first key from proposal
                logger.warning(
                    "engine.empty_evolved_components_list",
                    iteration=self._state.iteration,
                    proposal_keys=list(proposal.components.keys()),
                )
                evolved_component_name = next(iter(proposal.components.keys()))
            self._record_iteration(
                score=proposal_score,
                component_text=proposal.components.get(evolved_component_name, ""),
                evolved_component=evolved_component_name,
                accepted=accepted,
                objective_scores=scoring_batch.objective_scores,
            )

        # Build and return result
        return self._build_result()
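
For OBJECTIVE and HYBRID frontiers, the loop above collapses the per-example objective scores returned by the adapter into one mean per objective before handing them to the Pareto state. A self-contained sketch of that aggregation step, using made-up objective names:

```python
from statistics import fmean

# Per-example objective scores, shaped like EvaluationBatch.objective_scores
# (one dict per evaluated example); the objective names are illustrative.
objective_scores_per_example = [
    {"accuracy": 0.8, "brevity": 0.6},
    {"accuracy": 0.9, "brevity": 0.4},
    {"accuracy": 0.7, "brevity": 0.9},
]

accum: dict[str, list[float]] = {}
for obj_scores in objective_scores_per_example:
    for name, value in obj_scores.items():
        accum.setdefault(name, []).append(value)

aggregated = {name: fmean(values) for name, values in accum.items()}
# {'accuracy': 0.8, 'brevity': 0.6333...}
```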

pareto_state property

pareto_state: ParetoState | None

Return the current Pareto state, if initialized.

__init__

__init__(
    adapter: AsyncGEPAAdapter[
        DataInst, Trajectory, RolloutOutput
    ],
    config: EvolutionConfig,
    initial_candidate: Candidate,
    batch: list[DataInst],
    valset: list[DataInst] | None = None,
    candidate_selector: CandidateSelectorProtocol
    | None = None,
    component_selector: ComponentSelectorProtocol
    | None = None,
    evaluation_policy: EvaluationPolicyProtocol
    | None = None,
    merge_proposer: ProposerProtocol | None = None,
) -> None

Initialize the evolution engine.

PARAMETER DESCRIPTION
adapter

Implementation of AsyncGEPAAdapter protocol for evaluation and proposal generation.

TYPE: AsyncGEPAAdapter[DataInst, Trajectory, RolloutOutput]

config

Evolution parameters controlling iterations, thresholds, and early stopping.

TYPE: EvolutionConfig

initial_candidate

Starting candidate with 'instruction' component.

TYPE: Candidate

batch

Trainset data instances for reflection and mutation.

TYPE: list[DataInst]

valset

Optional validation data for scoring candidates. Defaults to trainset when omitted.

TYPE: list[DataInst] | None DEFAULT: None

candidate_selector

Optional selector strategy for Pareto-aware candidate sampling.

TYPE: CandidateSelectorProtocol | None DEFAULT: None

component_selector

Optional selector strategy for choosing which components to update. Defaults to RoundRobinComponentSelector.

TYPE: ComponentSelectorProtocol | None DEFAULT: None

evaluation_policy

Optional policy for selecting which validation examples to evaluate per iteration. Defaults to FullEvaluationPolicy.

TYPE: EvaluationPolicyProtocol | None DEFAULT: None

merge_proposer

Optional proposer for merge operations. If provided and config.use_merge is True, merge proposals will be attempted after successful mutations.

TYPE: ProposerProtocol | None DEFAULT: None

RAISES DESCRIPTION
ValueError

If batch is empty, valset is provided but empty, or initial_candidate has no components.

ConfigurationError

If config validation fails (via EvolutionConfig).

Examples:

Creating an engine:

engine = AsyncGEPAEngine(
    adapter=my_adapter,
    config=EvolutionConfig(max_iterations=50),
    initial_candidate=Candidate(components={"instruction": "Be helpful"}),
    batch=training_data,
    candidate_selector=selector,
)
Note

Configures trainset and valset routing for reflection and scoring.

Source code in src/gepa_adk/engine/async_engine.py
def __init__(
    self,
    adapter: AsyncGEPAAdapter[DataInst, Trajectory, RolloutOutput],
    config: EvolutionConfig,
    initial_candidate: Candidate,
    batch: list[DataInst],
    valset: list[DataInst] | None = None,
    candidate_selector: CandidateSelectorProtocol | None = None,
    component_selector: ComponentSelectorProtocol | None = None,
    evaluation_policy: EvaluationPolicyProtocol | None = None,
    merge_proposer: ProposerProtocol | None = None,
) -> None:
    """Initialize the evolution engine.

    Args:
        adapter: Implementation of AsyncGEPAAdapter protocol for evaluation
            and proposal generation.
        config: Evolution parameters controlling iterations, thresholds,
            and early stopping.
        initial_candidate: Starting candidate with 'instruction' component.
        batch: Trainset data instances for reflection and mutation.
        valset: Optional validation data for scoring candidates. Defaults
            to trainset when omitted.
        candidate_selector: Optional selector strategy for Pareto-aware
            candidate sampling.
        component_selector: Optional selector strategy for choosing which
            components to update. Defaults to RoundRobinComponentSelector.
        evaluation_policy: Optional policy for selecting which validation
            examples to evaluate per iteration. Defaults to FullEvaluationPolicy.
        merge_proposer: Optional proposer for merge operations. If provided
            and config.use_merge is True, merge proposals will be attempted
            after successful mutations.

    Raises:
        ValueError: If batch is empty, valset is provided but empty, or
            initial_candidate has no components.
        ConfigurationError: If config validation fails (via EvolutionConfig).

    Examples:
        Creating an engine:

        ```python
        engine = AsyncGEPAEngine(
            adapter=my_adapter,
            config=EvolutionConfig(max_iterations=50),
            initial_candidate=Candidate(components={"instruction": "Be helpful"}),
            batch=training_data,
            candidate_selector=selector,
        )
        ```

    Note:
        Configures trainset and valset routing for reflection and scoring.
    """
    # Validation
    if len(batch) == 0:
        raise ValueError("batch must contain at least one data instance")
    if valset is not None and len(valset) == 0:
        raise ValueError(
            "valset must contain at least one validation data instance"
        )

    if not initial_candidate.components:
        raise ValueError("initial_candidate must have at least one component")

    # Store dependencies
    self.adapter = adapter
    self.config = config
    self._initial_candidate = initial_candidate
    self._trainset = batch
    self._valset = valset if valset is not None else batch
    self._state: _EngineState | None = None
    self._candidate_selector = candidate_selector
    self._component_selector = component_selector or RoundRobinComponentSelector()
    self._pareto_state: ParetoState | None = None
    self._candidate_eval_batches: dict[int, EvaluationBatch] = {}
    self._merge_proposer = merge_proposer
    self._merges_due: int = 0
    self._merge_invocations: int = 0
    # Stopper state tracking (T001, T002)
    self._start_time: float | None = None
    self._total_evaluations: int = 0
    self._active_stoppers: list[object] = []
    # Import here to avoid circular dependency
    if evaluation_policy is None:
        from gepa_adk.adapters.evaluation_policy import FullEvaluationPolicy

        self._evaluation_policy: EvaluationPolicyProtocol = FullEvaluationPolicy()
    else:
        self._evaluation_policy = evaluation_policy
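
When valset is omitted the trainset doubles as the scoring set, and the evaluation policy falls back to FullEvaluationPolicy (imported lazily above). A sketch of passing both explicitly, assuming my_adapter, train_examples, and holdout_examples are defined elsewhere:

```python
from gepa_adk.adapters.evaluation_policy import FullEvaluationPolicy

engine = AsyncGEPAEngine(
    adapter=my_adapter,                        # user-provided AsyncGEPAAdapter implementation
    config=EvolutionConfig(max_iterations=25),
    initial_candidate=Candidate(components={"instruction": "Be helpful"}),
    batch=train_examples,                      # trainset for reflection and mutation
    valset=holdout_examples,                   # omit to score against the trainset instead
    evaluation_policy=FullEvaluationPolicy(),  # explicit, same as the default
)
```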

run async

run() -> EvolutionResult

Execute the evolution loop.

Runs the core evolution loop:

1. Evaluate baseline candidate
2. For each iteration until max_iterations or convergence:
   a. Generate reflective dataset from traces
   b. Propose new candidate text
   c. Evaluate proposal
   d. Accept if improves above threshold
   e. Record iteration
3. Return frozen EvolutionResult

RETURNS DESCRIPTION
EvolutionResult

EvolutionResult containing:

- original_score: Baseline score before evolution
- final_score: Best score achieved
- evolved_components: Best component texts found
- iteration_history: List of IterationRecord objects
- total_iterations: Number of iterations performed

RAISES DESCRIPTION
Exception

Any exceptions from adapter methods propagate unchanged.

Examples:

Running evolution:

result = await engine.run()
print(f"Improved: {result.improved}")
print(f"Best score: {result.final_score}")
Note

Outputs a frozen EvolutionResult after completing the evolution loop. Engine instance should not be reused after run() completes. Method is idempotent if called multiple times (restarts fresh). Fail-fast behavior: adapter exceptions are not caught.

Source code in src/gepa_adk/engine/async_engine.py
async def run(self) -> EvolutionResult:
    """Execute the evolution loop.

    Runs the core evolution loop:
    1. Evaluate baseline candidate
    2. For each iteration until max_iterations or convergence:
       a. Generate reflective dataset from traces
       b. Propose new candidate text
       c. Evaluate proposal
       d. Accept if improves above threshold
       e. Record iteration
    3. Return frozen EvolutionResult

    Returns:
        EvolutionResult containing:
            - original_score: Baseline score before evolution
            - final_score: Best score achieved
            - evolved_components: Best component texts found
            - iteration_history: List of IterationRecord objects
            - total_iterations: Number of iterations performed

    Raises:
        Exception: Any exceptions from adapter methods propagate unchanged.

    Examples:
        Running evolution:

        ```python
        result = await engine.run()
        print(f"Improved: {result.improved}")
        print(f"Best score: {result.final_score}")
        ```

    Note:
        Outputs a frozen EvolutionResult after completing the evolution
        loop. Engine instance should not be reused after run() completes.
        Method is idempotent if called multiple times (restarts fresh).
        Fail-fast behavior: adapter exceptions are not caught.
    """
    # Initialize stopper state tracking (T004)
    self._start_time = time.monotonic()
    self._total_evaluations = 0

    # Setup stopper lifecycle (T023)
    setup_stoppers = self._setup_stoppers()

    try:
        return await self._run_evolution_loop()
    finally:
        # Cleanup stopper lifecycle (T024)
        self._cleanup_stoppers(setup_stoppers)
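
Beyond `final_score`, the returned EvolutionResult carries the fields assembled in `_build_result` (evolved_components, valset_score, trainset_score, objective_scores) plus the iteration history. A sketch of inspecting them after a run; the `score` and `accepted` attributes on IterationRecord are assumed here:

```python
result = await engine.run()

print(f"Baseline {result.original_score:.3f} -> best {result.final_score:.3f}")
print(f"Validation mean: {result.valset_score}")
print(f"Evolved instruction: {result.evolved_components['instruction']}")

for i, record in enumerate(result.iteration_history, start=1):
    # 'score' and 'accepted' are assumed IterationRecord field names
    print(f"iteration {i}: score={record.score}, accepted={record.accepted}")
```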

MergeProposer

Proposer that combines two Pareto-optimal candidates via genetic crossover.

Selects two candidates from the frontier that share a common ancestor, identifies which components each improved, and creates a merged candidate that combines improvements from both branches.

ATTRIBUTE DESCRIPTION
rng

Random number generator for candidate selection.

TYPE: Random

val_overlap_floor

Minimum overlapping validation coverage required.

TYPE: int

max_attempts

Maximum merge attempts before giving up.

TYPE: int

attempted_merges

Set of attempted merge triplets to prevent duplicates.

TYPE: set[AncestorLog]

Examples:

Creating a merge proposer:

proposer = MergeProposer(rng=random.Random(42))
result = await proposer.propose(state)
if result:
    print(f"Merged from parents {result.parent_indices}")

Note: A proposer that combines two Pareto-optimal candidates via genetic crossover. Selects candidates from the frontier that share a common ancestor and merges their complementary component improvements.

Source code in src/gepa_adk/engine/merge_proposer.py
class MergeProposer:
    """Proposer that combines two Pareto-optimal candidates via genetic crossover.

    Selects two candidates from the frontier that share a common ancestor,
    identifies which components each improved, and creates a merged candidate
    that combines improvements from both branches.

    Attributes:
        rng (random.Random): Random number generator for candidate selection.
        val_overlap_floor (int): Minimum overlapping validation coverage required.
        max_attempts (int): Maximum merge attempts before giving up.
        attempted_merges (set[AncestorLog]): Set of attempted merge triplets to prevent duplicates.

    Examples:
        Creating a merge proposer:

        ```python
        proposer = MergeProposer(rng=random.Random(42))
        result = await proposer.propose(state)
        if result:
            print(f"Merged from parents {result.parent_indices}")
        ```
    Note:
        A proposer that combines two Pareto-optimal candidates via genetic crossover.
        Selects candidates from the frontier that share a common ancestor and merges
        their complementary component improvements.
    """

    def __init__(
        self,
        rng: random.Random,
        val_overlap_floor: int = 5,
        max_attempts: int = 10,
    ) -> None:
        """Initialize MergeProposer.

        Args:
            rng: Random number generator for candidate selection.
            val_overlap_floor: Minimum overlapping validation examples required.
            max_attempts: Maximum merge attempts before giving up.

        Note:
            Creates a new MergeProposer instance with the specified random number
            generator and configuration parameters. The attempted_merges set is
            initialized empty to track merge attempts.
        """
        self.rng = rng
        self.val_overlap_floor = val_overlap_floor
        self.max_attempts = max_attempts
        self.attempted_merges: set[AncestorLog] = set()

    async def propose(
        self,
        state: ParetoState,
        eval_batch: object | None = None,  # Ignored for merge proposals
    ) -> ProposalResult | None:
        """Attempt to merge two frontier candidates.

        Args:
            state (ParetoState): Current Pareto state with candidates, scores, and genealogy.
                Must contain at least 2 candidates on the Pareto frontier with a shared
                common ancestor for merge to succeed.
            eval_batch (object | None): Ignored for merge proposals. Merge operations
                do not require evaluation batch data as they combine existing candidates.

        Returns:
            ProposalResult | None: ProposalResult with merged candidate and both parent indices,
            or None if merge not possible (e.g., no common ancestor, insufficient frontier,
            or no complementary component changes).

        Examples:
            Proposing a merge from evolution state:

            ```python
            proposer = MergeProposer(rng=random.Random(42))
            result = await proposer.propose(state)
            if result:
                print(f"Merged from parents {result.parent_indices}")
                print(f"Ancestor: {result.metadata['ancestor_idx']}")
            ```

        Note:
            Operations select candidates from Pareto frontier only. Requires common ancestor
            and complementary component changes for successful merge. Validates
            minimum validation overlap before merging.
        """
        # Find suitable merge candidates
        merge_candidates = self._find_merge_candidates(state)
        if merge_candidates is None:
            logger.debug("merge_proposer.no_candidates", reason="no_suitable_pair")
            return None

        parent1_idx, parent2_idx, ancestor_idx = merge_candidates

        # Get candidate components
        ancestor = state.candidates[ancestor_idx]
        parent1 = state.candidates[parent1_idx]
        parent2 = state.candidates[parent2_idx]

        # Check if merge is desirable (complementary changes)
        if not has_desirable_predictors(
            ancestor.components, parent1.components, parent2.components
        ):
            logger.debug(
                "merge_proposer.no_desirable_predictors",
                parent1_idx=parent1_idx,
                parent2_idx=parent2_idx,
                ancestor_idx=ancestor_idx,
            )
            return None

        # Check validation overlap
        scores1 = state.candidate_scores.get(parent1_idx, {})
        scores2 = state.candidate_scores.get(parent2_idx, {})
        overlap = set(scores1.keys()) & set(scores2.keys())
        valid_overlap = {idx for idx in overlap if isinstance(idx, int) and idx >= 0}
        if len(valid_overlap) < self.val_overlap_floor:
            logger.debug(
                "merge_proposer.insufficient_overlap",
                parent1_idx=parent1_idx,
                parent2_idx=parent2_idx,
                overlap_count=len(valid_overlap),
                required=self.val_overlap_floor,
            )
            return None

        # Calculate average scores for component selection
        avg_score1 = fmean(scores1.values()) if scores1 else 0.0
        avg_score2 = fmean(scores2.values()) if scores2 else 0.0

        # Merge components
        merged_components = self._merge_components(
            ancestor.components,
            parent1.components,
            parent2.components,
            avg_score1,
            avg_score2,
        )

        # Create merged candidate
        merged_candidate = Candidate(
            components=merged_components,
            generation=max(parent1.generation, parent2.generation) + 1,
            parent_ids=[parent1_idx, parent2_idx],
        )

        # Log successful merge
        logger.info(
            "merge_proposer.merge_success",
            parent1_idx=parent1_idx,
            parent2_idx=parent2_idx,
            ancestor_idx=ancestor_idx,
            components_merged=list(merged_components.keys()),
        )

        return ProposalResult(
            candidate=merged_candidate,
            parent_indices=[parent1_idx, parent2_idx],
            tag="merge",
            metadata={"ancestor_idx": ancestor_idx},
        )

    def _find_merge_candidates(
        self,
        state: ParetoState,
    ) -> tuple[int, int, int] | None:
        """Find two candidates suitable for merging.

        Args:
            state: Current Pareto state.

        Returns:
            Tuple of (parent1_idx, parent2_idx, ancestor_idx) or None if no
            suitable pair found.

        Note:
            Searches for suitable merge candidates from the Pareto frontier.
            Requires common ancestor and prevents duplicate merge attempts.
        """
        # Get frontier candidates (non-dominated)
        frontier_candidates = state.frontier.get_non_dominated()

        if len(frontier_candidates) < 2:
            logger.debug(
                "merge_proposer.insufficient_frontier",
                frontier_size=len(frontier_candidates),
            )
            return None

        # Try to find a suitable pair
        for _ in range(self.max_attempts):
            # Sample two different candidates from frontier
            candidate_list = list(frontier_candidates)
            if len(candidate_list) < 2:
                return None

            parent1_idx = self.rng.choice(candidate_list)
            parent2_idx = self.rng.choice(candidate_list)

            # Must be different candidates
            if parent1_idx == parent2_idx:
                continue

            # Ensure consistent ordering for deduplication
            if parent1_idx > parent2_idx:
                parent1_idx, parent2_idx = parent2_idx, parent1_idx

            # Find common ancestor
            ancestor_idx = find_common_ancestor(
                parent1_idx, parent2_idx, state.parent_indices
            )

            if ancestor_idx is None:
                continue

            # Check if already attempted
            merge_log: AncestorLog = (parent1_idx, parent2_idx, ancestor_idx)
            if merge_log in self.attempted_merges:
                continue

            # Check ancestor score constraint (ancestor should not be better than descendants)
            ancestor_scores = state.candidate_scores.get(ancestor_idx, {})
            parent1_scores = state.candidate_scores.get(parent1_idx, {})
            parent2_scores = state.candidate_scores.get(parent2_idx, {})

            if ancestor_scores and parent1_scores and parent2_scores:
                ancestor_avg = fmean(ancestor_scores.values())
                parent1_avg = fmean(parent1_scores.values())
                parent2_avg = fmean(parent2_scores.values())

                # Ancestor should not be better than both descendants
                if ancestor_avg > max(parent1_avg, parent2_avg):
                    logger.debug(
                        "merge_proposer.ancestor_too_good",
                        ancestor_idx=ancestor_idx,
                        ancestor_avg=ancestor_avg,
                        parent1_avg=parent1_avg,
                        parent2_avg=parent2_avg,
                    )
                    continue

            # Mark as attempted
            self.attempted_merges.add(merge_log)

            return (parent1_idx, parent2_idx, ancestor_idx)

        logger.debug("merge_proposer.max_attempts_exceeded", attempts=self.max_attempts)
        return None

    def _merge_components(
        self,
        ancestor: dict[str, str],
        parent1: dict[str, str],
        parent2: dict[str, str],
        score1: float,
        score2: float,
    ) -> dict[str, str]:
        """Merge components from two parents based on ancestor divergence.

        Args:
            ancestor: Component dictionary from common ancestor.
            parent1: Component dictionary from first parent.
            parent2: Component dictionary from second parent.
            score1: Average score of first parent.
            score2: Average score of second parent.

        Returns:
            Merged component dictionary.

        Note:
            Strategy for merging components:
            - If both parents same → take either
            - If one unchanged from ancestor, other changed → take changed value
            - If both changed differently → take higher scorer's value
            Components present only in parents (not ancestor) are ignored.
        """
        merged: dict[str, str] = {}

        for key in ancestor.keys():
            anc_val = ancestor[key]
            p1_val = parent1.get(key, anc_val)
            p2_val = parent2.get(key, anc_val)

            if p1_val == p2_val:
                # Both same - take either
                merged[key] = p1_val
            elif p1_val == anc_val and p2_val != anc_val:
                # P1 unchanged, P2 changed - take P2's innovation
                merged[key] = p2_val
            elif p2_val == anc_val and p1_val != anc_val:
                # P2 unchanged, P1 changed - take P1's innovation
                merged[key] = p1_val
            else:
                # Both changed differently - take higher scorer's value
                merged[key] = p1_val if score1 >= score2 else p2_val

        return merged
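
The component-merge rules are easiest to see on a toy example. The sketch below calls the private `_merge_components` helper directly, purely for illustration; in normal use the engine drives it via propose(). The import path is assumed to mirror the source file location above:

```python
import random

from gepa_adk.engine.merge_proposer import MergeProposer  # assumed import path

proposer = MergeProposer(rng=random.Random(0))

ancestor = {"instruction": "Be helpful", "output_schema": "{}"}
parent1 = {"instruction": "Be helpful and concise", "output_schema": "{}"}      # changed instruction only
parent2 = {"instruction": "Be helpful", "output_schema": '{"type": "object"}'}  # changed schema only

merged = proposer._merge_components(ancestor, parent1, parent2, score1=0.7, score2=0.8)
# Each parent's unique improvement survives:
# {'instruction': 'Be helpful and concise', 'output_schema': '{"type": "object"}'}
```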

__init__

__init__(
    rng: Random,
    val_overlap_floor: int = 5,
    max_attempts: int = 10,
) -> None

Initialize MergeProposer.

PARAMETER DESCRIPTION
rng

Random number generator for candidate selection.

TYPE: Random

val_overlap_floor

Minimum overlapping validation examples required.

TYPE: int DEFAULT: 5

max_attempts

Maximum merge attempts before giving up.

TYPE: int DEFAULT: 10

Note

Creates a new MergeProposer instance with the specified random number generator and configuration parameters. The attempted_merges set is initialized empty to track merge attempts.

Source code in src/gepa_adk/engine/merge_proposer.py
def __init__(
    self,
    rng: random.Random,
    val_overlap_floor: int = 5,
    max_attempts: int = 10,
) -> None:
    """Initialize MergeProposer.

    Args:
        rng: Random number generator for candidate selection.
        val_overlap_floor: Minimum overlapping validation examples required.
        max_attempts: Maximum merge attempts before giving up.

    Note:
        Creates a new MergeProposer instance with the specified random number
        generator and configuration parameters. The attempted_merges set is
        initialized empty to track merge attempts.
    """
    self.rng = rng
    self.val_overlap_floor = val_overlap_floor
    self.max_attempts = max_attempts
    self.attempted_merges: set[AncestorLog] = set()

propose async

propose(
    state: ParetoState, eval_batch: object | None = None
) -> ProposalResult | None

Attempt to merge two frontier candidates.

PARAMETER DESCRIPTION
state

Current Pareto state with candidates, scores, and genealogy. Must contain at least 2 candidates on the Pareto frontier with a shared common ancestor for merge to succeed.

TYPE: ParetoState

eval_batch

Ignored for merge proposals. Merge operations do not require evaluation batch data as they combine existing candidates.

TYPE: object | None DEFAULT: None

RETURNS DESCRIPTION
ProposalResult | None

ProposalResult with merged candidate and both parent indices, or None if merge not possible (e.g., no common ancestor, insufficient frontier, or no complementary component changes).

Examples:

Proposing a merge from evolution state:

proposer = MergeProposer(rng=random.Random(42))
result = await proposer.propose(state)
if result:
    print(f"Merged from parents {result.parent_indices}")
    print(f"Ancestor: {result.metadata['ancestor_idx']}")
Note

Operations select candidates from Pareto frontier only. Requires common ancestor and complementary component changes for successful merge. Validates minimum validation overlap before merging.

Source code in src/gepa_adk/engine/merge_proposer.py
async def propose(
    self,
    state: ParetoState,
    eval_batch: object | None = None,  # Ignored for merge proposals
) -> ProposalResult | None:
    """Attempt to merge two frontier candidates.

    Args:
        state (ParetoState): Current Pareto state with candidates, scores, and genealogy.
            Must contain at least 2 candidates on the Pareto frontier with a shared
            common ancestor for merge to succeed.
        eval_batch (object | None): Ignored for merge proposals. Merge operations
            do not require evaluation batch data as they combine existing candidates.

    Returns:
        ProposalResult | None: ProposalResult with merged candidate and both parent indices,
        or None if merge not possible (e.g., no common ancestor, insufficient frontier,
        or no complementary component changes).

    Examples:
        Proposing a merge from evolution state:

        ```python
        proposer = MergeProposer(rng=random.Random(42))
        result = await proposer.propose(state)
        if result:
            print(f"Merged from parents {result.parent_indices}")
            print(f"Ancestor: {result.metadata['ancestor_idx']}")
        ```

    Note:
        Operations select candidates from Pareto frontier only. Requires common ancestor
        and complementary component changes for successful merge. Validates
        minimum validation overlap before merging.
    """
    # Find suitable merge candidates
    merge_candidates = self._find_merge_candidates(state)
    if merge_candidates is None:
        logger.debug("merge_proposer.no_candidates", reason="no_suitable_pair")
        return None

    parent1_idx, parent2_idx, ancestor_idx = merge_candidates

    # Get candidate components
    ancestor = state.candidates[ancestor_idx]
    parent1 = state.candidates[parent1_idx]
    parent2 = state.candidates[parent2_idx]

    # Check if merge is desirable (complementary changes)
    if not has_desirable_predictors(
        ancestor.components, parent1.components, parent2.components
    ):
        logger.debug(
            "merge_proposer.no_desirable_predictors",
            parent1_idx=parent1_idx,
            parent2_idx=parent2_idx,
            ancestor_idx=ancestor_idx,
        )
        return None

    # Check validation overlap
    scores1 = state.candidate_scores.get(parent1_idx, {})
    scores2 = state.candidate_scores.get(parent2_idx, {})
    overlap = set(scores1.keys()) & set(scores2.keys())
    valid_overlap = {idx for idx in overlap if isinstance(idx, int) and idx >= 0}
    if len(valid_overlap) < self.val_overlap_floor:
        logger.debug(
            "merge_proposer.insufficient_overlap",
            parent1_idx=parent1_idx,
            parent2_idx=parent2_idx,
            overlap_count=len(valid_overlap),
            required=self.val_overlap_floor,
        )
        return None

    # Calculate average scores for component selection
    avg_score1 = fmean(scores1.values()) if scores1 else 0.0
    avg_score2 = fmean(scores2.values()) if scores2 else 0.0

    # Merge components
    merged_components = self._merge_components(
        ancestor.components,
        parent1.components,
        parent2.components,
        avg_score1,
        avg_score2,
    )

    # Create merged candidate
    merged_candidate = Candidate(
        components=merged_components,
        generation=max(parent1.generation, parent2.generation) + 1,
        parent_ids=[parent1_idx, parent2_idx],
    )

    # Log successful merge
    logger.info(
        "merge_proposer.merge_success",
        parent1_idx=parent1_idx,
        parent2_idx=parent2_idx,
        ancestor_idx=ancestor_idx,
        components_merged=list(merged_components.keys()),
    )

    return ProposalResult(
        candidate=merged_candidate,
        parent_indices=[parent1_idx, parent2_idx],
        tag="merge",
        metadata={"ancestor_idx": ancestor_idx},
    )
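
Merge proposals only run when the engine is constructed with a merge_proposer and config.use_merge is enabled. A wiring sketch, assuming EvolutionConfig accepts use_merge and max_merge_invocations as constructor fields and that my_adapter and training_data are defined elsewhere:

```python
import random

engine = AsyncGEPAEngine(
    adapter=my_adapter,
    config=EvolutionConfig(
        max_iterations=30,
        use_merge=True,            # assumed constructor field; read as config.use_merge
        max_merge_invocations=3,   # assumed constructor field; caps merge attempts
    ),
    initial_candidate=Candidate(components={"instruction": "Be helpful"}),
    batch=training_data,
    merge_proposer=MergeProposer(rng=random.Random(42)),
)
result = await engine.run()
```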

AsyncGEPAAdapter

Bases: Protocol[DataInst, Trajectory, RolloutOutput]



Protocol for async GEPA adapters used by the evolution engine.

Implementations provide evaluation, reflection dataset generation, and proposal updates for candidate component texts.

Examples:

Implement a minimal adapter:

class MyAdapter:
    async def evaluate(self, batch, candidate, capture_traces=False):
        return EvaluationBatch(outputs=[], scores=[])

    async def make_reflective_dataset(
        self, candidate, eval_batch, components_to_update
    ):
        return {component: [] for component in components_to_update}

    async def propose_new_texts(
        self, candidate, reflective_dataset, components_to_update
    ):
        return {
            component: candidate[component]
            for component in components_to_update
        }
Note

Adapters must implement all three async methods to satisfy the protocol. Use runtime_checkable for isinstance() checks.
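
Because the protocol is declared runtime_checkable, structural conformance can be spot-checked with isinstance(); note this only verifies that the three methods exist, not their signatures. A minimal sketch reusing MyAdapter from the example above:

```python
adapter = MyAdapter()
assert isinstance(adapter, AsyncGEPAAdapter)  # checks method presence only
```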

Source code in src/gepa_adk/ports/adapter.py
@runtime_checkable
class AsyncGEPAAdapter(Protocol[DataInst, Trajectory, RolloutOutput]):
    """Protocol for async GEPA adapters used by the evolution engine.

    Implementations provide evaluation, reflection dataset generation, and
    proposal updates for candidate component texts.

    Examples:
        Implement a minimal adapter:

        ```python
        class MyAdapter:
            async def evaluate(self, batch, candidate, capture_traces=False):
                return EvaluationBatch(outputs=[], scores=[])

            async def make_reflective_dataset(
                self, candidate, eval_batch, components_to_update
            ):
                return {component: [] for component in components_to_update}

            async def propose_new_texts(
                self, candidate, reflective_dataset, components_to_update
            ):
                return {
                    component: candidate[component]
                    for component in components_to_update
                }
        ```

    Note:
        Adapters must implement all three async methods to satisfy
        the protocol. Use runtime_checkable for isinstance() checks.
    """

    async def evaluate(
        self,
        batch: list[DataInst],
        candidate: dict[str, str],
        capture_traces: bool = False,
    ) -> EvaluationBatch[Trajectory, RolloutOutput]:
        """Evaluate a candidate over a batch of inputs.

        Args:
            batch: Input data instances to evaluate.
            candidate: Component name to text mapping.
            capture_traces: Whether to capture execution traces.

        Returns:
            Evaluation results with outputs, scores, and optional traces.

        Examples:
            Basic evaluation:

            ```python
            result = await adapter.evaluate(batch, candidate)
            assert len(result.scores) == len(batch)
            ```

        Note:
            Output and score lists must have the same length as the
            input batch. Set capture_traces=True to enable reflection.
        """

    async def make_reflective_dataset(
        self,
        candidate: dict[str, str],
        eval_batch: EvaluationBatch[Trajectory, RolloutOutput],
        components_to_update: list[str],
    ) -> Mapping[str, Sequence[Mapping[str, Any]]]:
        """Build reflective datasets from evaluation traces.

        Args:
            candidate: Current candidate components.
            eval_batch: Evaluation results with traces.
            components_to_update: Components to generate datasets for.

        Returns:
            Mapping of component name to reflective examples.

        Examples:
            Build datasets for specific components:

            ```python
            dataset = await adapter.make_reflective_dataset(
                candidate,
                eval_batch,
                ["instruction"],
            )
            ```

        Note:
            Only call this method when eval_batch contains trajectories.
            Each component receives its own list of reflective examples.
        """

    async def propose_new_texts(
        self,
        candidate: dict[str, str],
        reflective_dataset: Mapping[str, Sequence[Mapping[str, Any]]],
        components_to_update: list[str],
    ) -> dict[str, str]:
        """Propose updated component texts from reflective datasets.

        Args:
            candidate: Current candidate components.
            reflective_dataset: Reflective examples per component.
            components_to_update: Components to propose updates for.

        Returns:
            Mapping of component name to new proposed text.

        Examples:
            Generate new component texts:

            ```python
            proposals = await adapter.propose_new_texts(
                candidate,
                reflective_dataset,
                ["instruction"],
            )
            ```

        Note:
            Outputs should contain improved text for each requested
            component. The evolution engine uses these as mutation candidates.
        """

evaluate async

evaluate(
    batch: list[DataInst],
    candidate: dict[str, str],
    capture_traces: bool = False,
) -> EvaluationBatch[Trajectory, RolloutOutput]

Evaluate a candidate over a batch of inputs.

PARAMETER DESCRIPTION
batch

Input data instances to evaluate.

TYPE: list[DataInst]

candidate

Component name to text mapping.

TYPE: dict[str, str]

capture_traces

Whether to capture execution traces.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
EvaluationBatch[Trajectory, RolloutOutput]

Evaluation results with outputs, scores, and optional traces.

Examples:

Basic evaluation:

result = await adapter.evaluate(batch, candidate)
assert len(result.scores) == len(batch)
Note

Output and score lists must have the same length as the input batch. Set capture_traces=True to enable reflection.

Source code in src/gepa_adk/ports/adapter.py
async def evaluate(
    self,
    batch: list[DataInst],
    candidate: dict[str, str],
    capture_traces: bool = False,
) -> EvaluationBatch[Trajectory, RolloutOutput]:
    """Evaluate a candidate over a batch of inputs.

    Args:
        batch: Input data instances to evaluate.
        candidate: Component name to text mapping.
        capture_traces: Whether to capture execution traces.

    Returns:
        Evaluation results with outputs, scores, and optional traces.

    Examples:
        Basic evaluation:

        ```python
        result = await adapter.evaluate(batch, candidate)
        assert len(result.scores) == len(batch)
        ```

    Note:
        Output and score lists must have the same length as the
        input batch. Set capture_traces=True to enable reflection.
    """

make_reflective_dataset async

make_reflective_dataset(
    candidate: dict[str, str],
    eval_batch: EvaluationBatch[Trajectory, RolloutOutput],
    components_to_update: list[str],
) -> Mapping[str, Sequence[Mapping[str, Any]]]

Build reflective datasets from evaluation traces.

PARAMETER DESCRIPTION
candidate

Current candidate components.

TYPE: dict[str, str]

eval_batch

Evaluation results with traces.

TYPE: EvaluationBatch[Trajectory, RolloutOutput]

components_to_update

Components to generate datasets for.

TYPE: list[str]

RETURNS DESCRIPTION
Mapping[str, Sequence[Mapping[str, Any]]]

Mapping of component name to reflective examples.

Examples:

Build datasets for specific components:

dataset = await adapter.make_reflective_dataset(
    candidate,
    eval_batch,
    ["instruction"],
)
Note

Only call this method when eval_batch contains trajectories. Each component receives its own list of reflective examples.

Source code in src/gepa_adk/ports/adapter.py
async def make_reflective_dataset(
    self,
    candidate: dict[str, str],
    eval_batch: EvaluationBatch[Trajectory, RolloutOutput],
    components_to_update: list[str],
) -> Mapping[str, Sequence[Mapping[str, Any]]]:
    """Build reflective datasets from evaluation traces.

    Args:
        candidate: Current candidate components.
        eval_batch: Evaluation results with traces.
        components_to_update: Components to generate datasets for.

    Returns:
        Mapping of component name to reflective examples.

    Examples:
        Build datasets for specific components:

        ```python
        dataset = await adapter.make_reflective_dataset(
            candidate,
            eval_batch,
            ["instruction"],
        )
        ```

    Note:
        Only call this method when eval_batch contains trajectories.
        Each component receives its own list of reflective examples.
    """

propose_new_texts async

propose_new_texts(
    candidate: dict[str, str],
    reflective_dataset: Mapping[
        str, Sequence[Mapping[str, Any]]
    ],
    components_to_update: list[str],
) -> dict[str, str]

Propose updated component texts from reflective datasets.

PARAMETER DESCRIPTION
candidate

Current candidate components.

TYPE: dict[str, str]

reflective_dataset

Reflective examples per component.

TYPE: Mapping[str, Sequence[Mapping[str, Any]]]

components_to_update

Components to propose updates for.

TYPE: list[str]

RETURNS DESCRIPTION
dict[str, str]

Mapping of component name to new proposed text.

Examples:

Generate new component texts:

proposals = await adapter.propose_new_texts(
    candidate,
    reflective_dataset,
    ["instruction"],
)
Note

Outputs should contain improved text for each requested component. The evolution engine uses these as mutation candidates.

Source code in src/gepa_adk/ports/adapter.py
async def propose_new_texts(
    self,
    candidate: dict[str, str],
    reflective_dataset: Mapping[str, Sequence[Mapping[str, Any]]],
    components_to_update: list[str],
) -> dict[str, str]:
    """Propose updated component texts from reflective datasets.

    Args:
        candidate: Current candidate components.
        reflective_dataset: Reflective examples per component.
        components_to_update: Components to propose updates for.

    Returns:
        Mapping of component name to new proposed text.

    Examples:
        Generate new component texts:

        ```python
        proposals = await adapter.propose_new_texts(
            candidate,
            reflective_dataset,
            ["instruction"],
        )
        ```

    Note:
        Outputs should contain improved text for each requested
        component. The evolution engine uses these as mutation candidates.
    """

EvaluationBatch dataclass

Bases: Generic[Trajectory, RolloutOutput]



Container for evaluation outputs and scores.

ATTRIBUTE DESCRIPTION
outputs

Per-example outputs produced during evaluation.

TYPE: list[RolloutOutput]

scores

Per-example normalized scores (higher is better).

TYPE: list[Score]

trajectories

Optional per-example execution traces.

TYPE: list[Trajectory] | None

objective_scores

Optional multi-objective scores per example.

TYPE: list[dict[ComponentName, Score]] | None

metadata

Optional per-example scorer metadata. When provided, metadata[i] corresponds to outputs[i] and scores[i] (index-aligned). Metadata dicts may contain scorer-specific fields like 'feedback', 'actionable_guidance', or 'dimension_scores' from CriticScorer implementations.

TYPE: list[dict[str, Any]] | None

inputs

Optional per-example input text that was used to generate each output. Used by make_reflective_dataset to provide context for reflection. When provided, inputs[i] corresponds to the input that produced outputs[i].

TYPE: list[str] | None

Examples:

Create a batch with optional traces:

batch = EvaluationBatch(
    outputs=["ok", "ok"],
    scores=[0.9, 0.8],
    trajectories=[{"trace": 1}, {"trace": 2}],
)

Create a batch with inputs for reflection:

batch = EvaluationBatch(
    outputs=["Hello!", "Good morrow!"],
    scores=[0.3, 0.9],
    inputs=["I am the King", "I am your friend"],
)
Note

All fields are immutable once created due to frozen=True. Use this as the standard return type from adapter evaluations. When metadata is not None, len(metadata) must equal len(outputs) and len(scores).
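
When scorer metadata is supplied it must be index-aligned with outputs and scores, as the note above requires. A small sketch with illustrative feedback entries:

```python
batch = EvaluationBatch(
    outputs=["Draft reply A", "Draft reply B"],
    scores=[0.6, 0.4],
    metadata=[
        {"feedback": "Clear, but misses the follow-up question."},  # pairs with outputs[0] / scores[0]
        {"feedback": "Too verbose; trim the preamble."},            # pairs with outputs[1] / scores[1]
    ],
)
```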

Source code in src/gepa_adk/ports/adapter.py
@dataclass(frozen=True, slots=True)
class EvaluationBatch(Generic[Trajectory, RolloutOutput]):
    """Container for evaluation outputs and scores.

    Attributes:
        outputs (list[RolloutOutput]): Per-example outputs produced during evaluation.
        scores (list[Score]): Per-example normalized scores (higher is better).
        trajectories (list[Trajectory] | None): Optional per-example execution traces.
        objective_scores (list[dict[ComponentName, Score]] | None): Optional
            multi-objective scores per example.
        metadata (list[dict[str, Any]] | None): Optional per-example scorer metadata.
            When provided, metadata[i] corresponds to outputs[i] and scores[i]
            (index-aligned). Metadata dicts may contain scorer-specific fields like
            'feedback', 'actionable_guidance', or 'dimension_scores' from
            CriticScorer implementations.
        inputs (list[str] | None): Optional per-example input text that was used
            to generate each output. Used by make_reflective_dataset to provide
            context for reflection. When provided, inputs[i] corresponds to the
            input that produced outputs[i].

    Examples:
        Create a batch with optional traces:

        ```python
        batch = EvaluationBatch(
            outputs=["ok", "ok"],
            scores=[0.9, 0.8],
            trajectories=[{"trace": 1}, {"trace": 2}],
        )
        ```

        Create a batch with inputs for reflection:

        ```python
        batch = EvaluationBatch(
            outputs=["Hello!", "Good morrow!"],
            scores=[0.3, 0.9],
            inputs=["I am the King", "I am your friend"],
        )
        ```

    Note:
        All fields are immutable once created due to frozen=True.
        Use this as the standard return type from adapter evaluations.
        When metadata is not None, len(metadata) must equal len(outputs) and len(scores).
    """

    outputs: list[RolloutOutput]
    scores: list[Score]
    trajectories: list[Trajectory] | None = None
    objective_scores: list[dict[ComponentName, Score]] | None = None
    metadata: list[dict[str, Any]] | None = None
    inputs: list[str] | None = None

ComponentSelectorProtocol

Bases: Protocol



Async protocol for component selection strategies.

Note

Adapters implementing this protocol determine which candidate components to update during mutation, enabling flexible evolution strategies.

Examples:

class MySelector:
    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        return components[:1]
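
Because the protocol is declared with @runtime_checkable, a structural isinstance check can confirm that a custom selector satisfies it (a minimal sketch using the MySelector class above; the import path follows the source file location shown below):

from gepa_adk.ports.selector import ComponentSelectorProtocol

selector = MySelector()
assert isinstance(selector, ComponentSelectorProtocol)  # checks method presence only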
Source code in src/gepa_adk/ports/selector.py
@runtime_checkable
class ComponentSelectorProtocol(Protocol):
    """Async protocol for component selection strategies.

    Note:
        Adapters implementing this protocol determine which candidate components
        to update during mutation, enabling flexible evolution strategies.

    Examples:
        ```python
        class MySelector:
            async def select_components(
                self, components: list[str], iteration: int, candidate_idx: int
            ) -> list[str]:
                return components[:1]
        ```
    """

    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        """Select components to update for the current iteration.

        Args:
            components: List of available component keys (e.g. ["instruction", "input_schema"]).
            iteration: Current global iteration number (0-based).
            candidate_idx: Index of the candidate being evolved.

        Returns:
            List of component keys to update.

        Raises:
            ValueError: If components list is empty.

        Note:
            Outputs a list of component keys to update, enabling selective
            mutation of specific candidate components.

        Examples:
            ```python
            selected = await selector.select_components(
                components=["instruction", "schema"], iteration=1, candidate_idx=0
            )
            ```
        """
        ...

select_components async

select_components(
    components: list[str],
    iteration: int,
    candidate_idx: int,
) -> list[str]

Select components to update for the current iteration.

PARAMETER DESCRIPTION
components

List of available component keys (e.g. ["instruction", "input_schema"]).

TYPE: list[str]

iteration

Current global iteration number (0-based).

TYPE: int

candidate_idx

Index of the candidate being evolved.

TYPE: int

RETURNS DESCRIPTION
list[str]

List of component keys to update.

RAISES DESCRIPTION
ValueError

If components list is empty.

Note

Outputs a list of component keys to update, enabling selective mutation of specific candidate components.

Examples:

selected = await selector.select_components(
    components=["instruction", "schema"], iteration=1, candidate_idx=0
)
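
An empty components list violates the contract; conforming implementations raise ValueError (a hedged sketch of the expected behavior, reusing the selector from the example above):

try:
    await selector.select_components(components=[], iteration=0, candidate_idx=0)
except ValueError:
    # Conforming selectors reject an empty component list
    ...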
Source code in src/gepa_adk/ports/selector.py
async def select_components(
    self, components: list[str], iteration: int, candidate_idx: int
) -> list[str]:
    """Select components to update for the current iteration.

    Args:
        components: List of available component keys (e.g. ["instruction", "input_schema"]).
        iteration: Current global iteration number (0-based).
        candidate_idx: Index of the candidate being evolved.

    Returns:
        List of component keys to update.

    Raises:
        ValueError: If components list is empty.

    Note:
        Outputs a list of component keys to update, enabling selective
        mutation of specific candidate components.

    Examples:
        ```python
        selected = await selector.select_components(
            components=["instruction", "schema"], iteration=1, candidate_idx=0
        )
        ```
    """
    ...

create_component_selector

create_component_selector(
    selector_type: str,
) -> ComponentSelectorProtocol

Create a component selector strategy from a string alias.

PARAMETER DESCRIPTION
selector_type

Name of the selector strategy. Supported values:

- 'round_robin', 'roundrobin': Round-robin cycling.
- 'all', 'all_components': All components simultaneously.

TYPE: str

RETURNS DESCRIPTION
ComponentSelectorProtocol

Instance of requested component selector.

RAISES DESCRIPTION
ValueError

If selector_type is unknown.

Examples:

# Create round-robin selector
selector = create_component_selector("round_robin")

# Create all-components selector
selector = create_component_selector("all")
Note

Supports flexible string aliases with normalization for common variations (underscores, hyphens, case-insensitive).
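
The normalization means mixed case, hyphens, and underscores all resolve to the same strategy (a small illustration based on the normalization shown in the source below; the import path follows the source file location):

from gepa_adk.adapters.component_selector import create_component_selector

# All of these return a RoundRobinComponentSelector
create_component_selector("round_robin")
create_component_selector("Round-Robin")
create_component_selector("ROUNDROBIN")

# Unknown names raise ValueError
create_component_selector("random")  # raises ValueError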

Source code in src/gepa_adk/adapters/component_selector.py
def create_component_selector(selector_type: str) -> ComponentSelectorProtocol:
    """Create a component selector strategy from a string alias.

    Args:
        selector_type: Name of the selector strategy.
            Supported values:
            - 'round_robin', 'roundrobin': Round-robin cycling.
            - 'all', 'all_components': All components simultaneously.

    Returns:
        Instance of requested component selector.

    Raises:
        ValueError: If selector_type is unknown.

    Examples:
        ```python
        # Create round-robin selector
        selector = create_component_selector("round_robin")

        # Create all-components selector
        selector = create_component_selector("all")
        ```

    Note:
        Supports flexible string aliases with normalization for common
        variations (underscores, hyphens, case-insensitive).
    """
    normalized = selector_type.lower().replace("_", "").replace("-", "")

    if normalized == "roundrobin":
        return RoundRobinComponentSelector()
    elif normalized in ("all", "allcomponents"):
        return AllComponentSelector()

    raise ValueError(f"Unknown component selector: {selector_type}")

normalize_feedback

normalize_feedback(
    score: float, metadata: dict[str, Any] | None
) -> dict[str, Any]

Normalize critic feedback to consistent trial format.

Converts both simple and advanced critic outputs to a standardized format for use in trial records. This enables the reflection agent to receive consistent feedback regardless of which critic schema was used.

PARAMETER DESCRIPTION
score

The numeric score from the critic (0.0-1.0).

TYPE: float

metadata

Optional metadata dict from critic output. May contain:

- feedback (str): Simple feedback text
- dimension_scores (dict): Per-dimension scores
- actionable_guidance (str): Improvement suggestions
- Any additional fields from critic output

TYPE: dict[str, Any] | None

RETURNS DESCRIPTION
dict[str, Any]

Normalized feedback dict with structure:

{
    "score": 0.75,
    "feedback_text": "Main feedback message",
    "dimension_scores": {...},  # Optional
    "actionable_guidance": "...",  # Optional
}

Examples:

Normalize simple feedback:

normalized = normalize_feedback(0.8, {"feedback": "Good job"})
# {"score": 0.8, "feedback_text": "Good job"}

Normalize advanced feedback:

normalized = normalize_feedback(
    0.6,
    {
        "feedback": "Needs work",
        "dimension_scores": {"clarity": 0.5},
        "actionable_guidance": "Add examples",
    },
)
# {
#     "score": 0.6,
#     "feedback_text": "Needs work",
#     "dimension_scores": {"clarity": 0.5},
#     "actionable_guidance": "Add examples",
# }

Handle missing feedback:

normalized = normalize_feedback(0.5, None)
# {"score": 0.5, "feedback_text": ""}
Note

Supports both SimpleCriticOutput and CriticOutput schemas for flexible critic integration. Extracts the "feedback" field and renames it to "feedback_text" for consistent trial structure. Additional fields like dimension_scores are preserved when present.
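
When both keys are present, "feedback_text" takes precedence over "feedback" (a small sketch based on the lookup order in the source below; the import path follows the source file location):

from gepa_adk.adapters.critic_scorer import normalize_feedback

normalized = normalize_feedback(
    0.7,
    {"feedback_text": "Preferred text", "feedback": "Ignored"},
)
# {"score": 0.7, "feedback_text": "Preferred text"}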

Source code in src/gepa_adk/adapters/critic_scorer.py
def normalize_feedback(
    score: float,
    metadata: dict[str, Any] | None,
) -> dict[str, Any]:
    """Normalize critic feedback to consistent trial format.

    Converts both simple and advanced critic outputs to a standardized
    format for use in trial records. This enables the reflection agent
    to receive consistent feedback regardless of which critic schema
    was used.

    Args:
        score: The numeric score from the critic (0.0-1.0).
        metadata: Optional metadata dict from critic output. May contain:
            - feedback (str): Simple feedback text
            - dimension_scores (dict): Per-dimension scores
            - actionable_guidance (str): Improvement suggestions
            - Any additional fields from critic output

    Returns:
        Normalized feedback dict with structure:
        ```python
        {
            "score": 0.75,
            "feedback_text": "Main feedback message",
            "dimension_scores": {...},  # Optional
            "actionable_guidance": "...",  # Optional
        }
        ```

    Examples:
        Normalize simple feedback:

        ```python
        normalized = normalize_feedback(0.8, {"feedback": "Good job"})
        # {"score": 0.8, "feedback_text": "Good job"}
        ```

        Normalize advanced feedback:

        ```python
        normalized = normalize_feedback(
            0.6,
            {
                "feedback": "Needs work",
                "dimension_scores": {"clarity": 0.5},
                "actionable_guidance": "Add examples",
            },
        )
        # {
        #     "score": 0.6,
        #     "feedback_text": "Needs work",
        #     "dimension_scores": {"clarity": 0.5},
        #     "actionable_guidance": "Add examples",
        # }
        ```

        Handle missing feedback:

        ```python
        normalized = normalize_feedback(0.5, None)
        # {"score": 0.5, "feedback_text": ""}
        ```

    Note:
        Supports both SimpleCriticOutput and CriticOutput schemas for flexible
        critic integration. Extracts the "feedback" field and renames it to
        "feedback_text" for consistent trial structure. Additional fields
        like dimension_scores are preserved when present.
    """
    result: dict[str, Any] = {"score": score}

    if metadata is None:
        result["feedback_text"] = ""
        return result

    # Extract feedback text - handle both "feedback" and "feedback_text" keys
    feedback_text = metadata.get("feedback_text") or metadata.get("feedback") or ""
    if isinstance(feedback_text, str) and feedback_text.strip():
        result["feedback_text"] = feedback_text.strip()
    else:
        result["feedback_text"] = ""

    # Preserve dimension_scores if present
    dimension_scores = metadata.get("dimension_scores")
    if dimension_scores and isinstance(dimension_scores, dict):
        result["dimension_scores"] = dimension_scores

    # Preserve actionable_guidance if present
    actionable_guidance = metadata.get("actionable_guidance")
    if actionable_guidance and isinstance(actionable_guidance, str):
        guidance_str = actionable_guidance.strip()
        if guidance_str:
            result["actionable_guidance"] = guidance_str

    return result

evolve async

evolve(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    valset: list[dict[str, Any]] | None = None,
    critic: LlmAgent | None = None,
    reflection_agent: LlmAgent | None = None,
    config: EvolutionConfig | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    state_guard: StateGuard | None = None,
    candidate_selector: CandidateSelectorProtocol
    | str
    | None = None,
    component_selector: ComponentSelectorProtocol
    | str
    | None = None,
    executor: AgentExecutorProtocol | None = None,
    components: list[str] | None = None,
    schema_constraints: SchemaConstraints | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> EvolutionResult

Evolve an ADK agent's instruction.

Optimizes the instruction for a single ADK agent using evolutionary optimization. The agent's instruction is iteratively improved based on performance on the training set.

PARAMETER DESCRIPTION
agent

The ADK LlmAgent to evolve.

TYPE: LlmAgent

trainset

Training examples [{"input": "...", "expected": "..."}].

TYPE: list[dict[str, Any]]

valset

Optional validation examples used for scoring and acceptance. Defaults to the trainset when omitted.

TYPE: list[dict[str, Any]] | None DEFAULT: None

critic

Optional ADK agent for scoring (uses schema scoring if None).

TYPE: LlmAgent | None DEFAULT: None

reflection_agent

Optional ADK agent for proposals. If None, creates a default reflection agent using config.reflection_model.

TYPE: LlmAgent | None DEFAULT: None

config

Evolution configuration (uses defaults if None).

TYPE: EvolutionConfig | None DEFAULT: None

trajectory_config

Trajectory capture settings (uses defaults if None).

TYPE: TrajectoryConfig | None DEFAULT: None

state_guard

Optional state token preservation settings.

TYPE: StateGuard | None DEFAULT: None

candidate_selector

Optional selector instance or selector name.

TYPE: CandidateSelectorProtocol | str | None DEFAULT: None

component_selector

Optional selector instance or selector name for choosing which components to update.

TYPE: ComponentSelectorProtocol | str | None DEFAULT: None

executor

Optional AgentExecutorProtocol implementation for unified agent execution. When provided, both the ADKAdapter and CriticScorer use this executor for consistent session management and execution. If None, creates an AgentExecutor automatically.

TYPE: AgentExecutorProtocol | None DEFAULT: None

components

List of component names to include in evolution. Supported:

- "instruction": The agent's instruction text (default if None).
- "output_schema": The agent's Pydantic output_schema (serialized).

When None, defaults to ["instruction"]. Use ["output_schema"] with a schema reflection agent to evolve the output schema.

TYPE: list[str] | None DEFAULT: None

schema_constraints

Optional SchemaConstraints for output_schema evolution. When provided, proposed schema mutations are validated against these constraints. Mutations that violate constraints (e.g., remove required fields) are rejected and the original schema is preserved.

TYPE: SchemaConstraints | None DEFAULT: None

app

Optional ADK App instance. When provided, evolution uses the app's configuration. Note that App does not hold services directly; pass a Runner for service extraction, or combine with session_service param. See the App/Runner integration guide for details.

TYPE: App | None DEFAULT: None

runner

Optional ADK Runner instance. When provided, evolution extracts and uses the runner's session_service for all agent executions (evolved agents, critic, and reflection agent). Takes precedence over both app and executor parameters. This enables seamless integration with existing ADK infrastructure.

TYPE: Runner | None DEFAULT: None

RETURNS DESCRIPTION
EvolutionResult

EvolutionResult with evolved_components dict and metrics.

RAISES DESCRIPTION
ConfigurationError

If invalid parameters provided.

EvolutionError

If evolution fails during execution.

Note

Single-agent evolution with trainset reflection and valset scoring.

Examples:

Basic usage with output_schema:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk import evolve


class OutputSchema(BaseModel):
    answer: str
    score: float = Field(ge=0.0, le=1.0)


agent = LlmAgent(
    name="assistant",
    model="gemini-2.5-flash",
    instruction="You are a helpful assistant.",
    output_schema=OutputSchema,
)

trainset = [
    {"input": "What is 2+2?", "expected": "4"},
    {"input": "What is the capital of France?", "expected": "Paris"},
]

result = await evolve(agent, trainset)
print(f"Evolved: {result.evolved_components['instruction']}")

With critic agent:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk import evolve


class CriticOutput(BaseModel):
    score: float = Field(ge=0.0, le=1.0)


critic = LlmAgent(
    name="critic",
    model="gemini-2.5-flash",
    instruction="Score the response quality.",
    output_schema=CriticOutput,
)

result = await evolve(agent, trainset, critic=critic)

Evolving output_schema with schema reflection:

from gepa_adk.engine.reflection_agents import create_schema_reflection_agent

# Create schema reflection agent with validation tool
schema_reflector = create_schema_reflection_agent("gemini-2.5-flash")

# Evolve output_schema component
result = await evolve(
    agent,
    trainset,
    critic=critic,
    reflection_agent=schema_reflector,
    components=["output_schema"],  # Evolve schema, not instruction
)
print(f"Evolved schema: {result.evolved_components['output_schema']}")

Using App/Runner for existing infrastructure integration:

from google.adk.apps.app import App
from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService

# Configure Runner with your production session service
session_service = DatabaseSessionService(connection_string="...")
runner = Runner(
    app_name="my_app",
    agent=agent,
    session_service=session_service,
)

# Evolution uses your Runner's session_service for all operations
result = await evolve(
    agent,
    trainset,
    runner=runner,  # Services extracted from runner
)
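
Using a held-out valset with a custom configuration (a minimal sketch; the dataset contents are illustrative):

from gepa_adk import EvolutionConfig, evolve

valset = [
    {"input": "What is 3+3?", "expected": "6"},
]

result = await evolve(
    agent,
    trainset,
    valset=valset,  # scored separately from the trainset
    config=EvolutionConfig(max_iterations=10, patience=3),
)
print(f"Valset score: {result.valset_score}")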
Source code in src/gepa_adk/api.py
async def evolve(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    valset: list[dict[str, Any]] | None = None,
    critic: LlmAgent | None = None,
    reflection_agent: LlmAgent | None = None,
    config: EvolutionConfig | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    state_guard: StateGuard | None = None,
    candidate_selector: CandidateSelectorProtocol | str | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    executor: AgentExecutorProtocol | None = None,
    components: list[str] | None = None,
    schema_constraints: SchemaConstraints | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> EvolutionResult:
    """Evolve an ADK agent's instruction.

    Optimizes the instruction for a single ADK agent using evolutionary
    optimization. The agent's instruction is iteratively improved based on
    performance on the training set.

    Args:
        agent: The ADK LlmAgent to evolve.
        trainset: Training examples [{"input": "...", "expected": "..."}].
        valset: Optional validation examples used for scoring and acceptance.
            Defaults to the trainset when omitted.
        critic: Optional ADK agent for scoring (uses schema scoring if None).
        reflection_agent: Optional ADK agent for proposals. If None, creates a
            default reflection agent using config.reflection_model.
        config: Evolution configuration (uses defaults if None).
        trajectory_config: Trajectory capture settings (uses defaults if None).
        state_guard: Optional state token preservation settings.
        candidate_selector: Optional selector instance or selector name.
        component_selector: Optional selector instance or selector name for
            choosing which components to update.
        executor: Optional AgentExecutorProtocol implementation for unified
            agent execution. When provided, both the ADKAdapter and CriticScorer
            use this executor for consistent session management and execution.
            If None, creates an AgentExecutor automatically.
        components: List of component names to include in evolution. Supported:
            - "instruction": The agent's instruction text (default if None).
            - "output_schema": The agent's Pydantic output_schema (serialized).
            When None, defaults to ["instruction"]. Use ["output_schema"] with
            a schema reflection agent to evolve the output schema.
        schema_constraints: Optional SchemaConstraints for output_schema evolution.
            When provided, proposed schema mutations are validated against these
            constraints. Mutations that violate constraints (e.g., remove required
            fields) are rejected and the original schema is preserved.
        app: Optional ADK App instance. When provided, evolution uses the app's
            configuration. Note that App does not hold services directly; pass
            a Runner for service extraction, or combine with session_service param.
            See the App/Runner integration guide for details.
        runner: Optional ADK Runner instance. When provided, evolution extracts
            and uses the runner's session_service for all agent executions
            (evolved agents, critic, and reflection agent). Takes precedence
            over both app and executor parameters. This enables seamless
            integration with existing ADK infrastructure.

    Returns:
        EvolutionResult with evolved_components dict and metrics.

    Raises:
        ConfigurationError: If invalid parameters provided.
        EvolutionError: If evolution fails during execution.

    Note:
        Single-agent evolution with trainset reflection and valset scoring.

    Examples:
        Basic usage with output_schema:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve


        class OutputSchema(BaseModel):
            answer: str
            score: float = Field(ge=0.0, le=1.0)


        agent = LlmAgent(
            name="assistant",
            model="gemini-2.5-flash",
            instruction="You are a helpful assistant.",
            output_schema=OutputSchema,
        )

        trainset = [
            {"input": "What is 2+2?", "expected": "4"},
            {"input": "What is the capital of France?", "expected": "Paris"},
        ]

        result = await evolve(agent, trainset)
        print(f"Evolved: {result.evolved_components['instruction']}")
        ```

        With critic agent:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve


        class CriticOutput(BaseModel):
            score: float = Field(ge=0.0, le=1.0)


        critic = LlmAgent(
            name="critic",
            model="gemini-2.5-flash",
            instruction="Score the response quality.",
            output_schema=CriticOutput,
        )

        result = await evolve(agent, trainset, critic=critic)
        ```

        Evolving output_schema with schema reflection:

        ```python
        from gepa_adk.engine.reflection_agents import create_schema_reflection_agent

        # Create schema reflection agent with validation tool
        schema_reflector = create_schema_reflection_agent("gemini-2.5-flash")

        # Evolve output_schema component
        result = await evolve(
            agent,
            trainset,
            critic=critic,
            reflection_agent=schema_reflector,
            components=["output_schema"],  # Evolve schema, not instruction
        )
        print(f"Evolved schema: {result.evolved_components['output_schema']}")
        ```

        Using App/Runner for existing infrastructure integration:

        ```python
        from google.adk.apps.app import App
        from google.adk.runners import Runner
        from google.adk.sessions import DatabaseSessionService

        # Configure Runner with your production session service
        session_service = DatabaseSessionService(connection_string="...")
        runner = Runner(
            app_name="my_app",
            agent=agent,
            session_service=session_service,
        )

        # Evolution uses your Runner's session_service for all operations
        result = await evolve(
            agent,
            trainset,
            runner=runner,  # Services extracted from runner
        )
        ```
    """
    # Validate inputs
    _validate_evolve_inputs(agent, trainset)
    required_keys = (
        set(trainset[0].keys()) if trainset and len(trainset) > 0 else {"input"}
    )

    resolved_valset = valset if valset is not None else trainset
    if valset is not None:
        _validate_dataset(
            valset,
            "valset",
            allow_empty=False,
            required_keys=required_keys,
        )

    # Log reflection_agent configuration if provided
    if reflection_agent is not None:
        logger.debug(
            "evolve.reflection_agent.configured",
            agent_name=agent.name,
            reflection_agent_name=reflection_agent.name,
            message="Using ADK reflection agent for instruction improvement",
        )

    # Capture original instruction for StateGuard validation
    original_instruction = str(agent.instruction)

    candidate_selector_label = (
        candidate_selector
        if isinstance(candidate_selector, str)
        else type(candidate_selector).__name__
        if candidate_selector is not None
        else None
    )

    component_selector_label = (
        component_selector
        if isinstance(component_selector, str)
        else type(component_selector).__name__
        if component_selector is not None
        else None
    )

    # Log evolution start
    logger.info(
        "evolve.start",
        agent_name=agent.name,
        trainset_size=len(trainset),
        valset_size=len(resolved_valset),
        valset_defaulted=valset is None,
        has_critic=critic is not None,
        has_reflection_agent=reflection_agent is not None,
        has_state_guard=state_guard is not None,
        candidate_selector=candidate_selector_label,
        component_selector=component_selector_label,
    )

    # Resolve services from runner/app with precedence warnings (#227)
    # Log precedence warnings if multiple config sources provided (T009)
    if runner is not None and app is not None:
        logger.warning(
            "evolve.precedence.runner_over_app",
            message="Both runner and app provided; using runner (runner takes precedence)",
            runner_app_name=runner.app_name,
            app_name=app.name,
        )
    if runner is not None and executor is not None:
        logger.warning(
            "evolve.precedence.runner_over_executor",
            message="Both runner and executor provided; using runner's session_service",
            runner_app_name=runner.app_name,
        )

    # Extract services using precedence rules (T006)
    resolved_session_service, _artifact_service = _resolve_evolution_services(
        runner=runner,
        app=app,
        session_service=None,  # evolve() doesn't have direct session_service param
    )

    # Resolve app_name for session isolation (#239)
    resolved_app_name = _resolve_app_name(runner=runner, app=app)

    # Create executor with resolved session_service (T007)
    # Runner takes precedence over user-provided executor
    if runner is not None:
        resolved_executor = AgentExecutor(
            session_service=resolved_session_service,
            app_name=resolved_app_name,
        )
    else:
        resolved_executor = executor or AgentExecutor(
            session_service=resolved_session_service,
            app_name=resolved_app_name,
        )

    # Build scorer
    scorer: Scorer
    if critic:
        scorer = CriticScorer(critic_agent=critic, executor=resolved_executor)
    elif hasattr(agent, "output_schema") and agent.output_schema is not None:
        # Use schema-based scorer when agent has output_schema
        scorer = SchemaBasedScorer(output_schema=agent.output_schema)
    else:
        raise ConfigurationError(
            "Either critic must be provided or agent must have output_schema",
            field="critic",
            value=None,
            constraint="must provide critic or agent.output_schema",
        )

    # Resolve config
    resolved_config = config or EvolutionConfig()

    # Create reflection agent if not provided
    resolved_reflection_agent = reflection_agent
    if resolved_reflection_agent is None:
        # Create default reflection agent with config settings
        resolved_reflection_agent = LlmAgent(
            name="reflection_agent",
            model=_resolve_model_for_agent(resolved_config.reflection_model),
            instruction=resolved_config.reflection_prompt or REFLECTION_INSTRUCTION,
        )
        logger.debug(
            "evolve.reflection_agent.default",
            reflection_model=resolved_config.reflection_model,
        )

    # Create adapter with resolved session_service (T008)
    # The adapter passes session_service to create_adk_reflection_fn()
    # ensuring the reflection agent shares the same session service
    adapter = ADKAdapter(
        agent=agent,
        scorer=scorer,
        trajectory_config=trajectory_config,
        reflection_agent=resolved_reflection_agent,
        executor=resolved_executor,
        schema_constraints=schema_constraints,
        session_service=resolved_session_service,
    )

    # Build initial candidate components based on requested components
    resolved_components = components if components else [DEFAULT_COMPONENT_NAME]
    initial_components: dict[str, str] = {}
    original_component_values: dict[str, str] = {}

    for comp_name in resolved_components:
        if comp_name == DEFAULT_COMPONENT_NAME:
            initial_components[comp_name] = original_instruction
            original_component_values[comp_name] = original_instruction
        elif comp_name == COMPONENT_OUTPUT_SCHEMA:
            if not hasattr(agent, "output_schema") or agent.output_schema is None:
                raise ConfigurationError(
                    f"Cannot evolve '{COMPONENT_OUTPUT_SCHEMA}': agent has no output_schema",
                    field="components",
                    value=comp_name,
                    constraint="agent must have output_schema to evolve it",
                )
            schema_text = serialize_pydantic_schema(agent.output_schema)
            initial_components[comp_name] = schema_text
            original_component_values[comp_name] = schema_text
        else:
            raise ConfigurationError(
                f"Unknown component: '{comp_name}'. Supported: "
                f"'{DEFAULT_COMPONENT_NAME}', '{COMPONENT_OUTPUT_SCHEMA}'",
                field="components",
                value=comp_name,
                constraint="must be a supported component name",
            )

    logger.debug(
        "evolve.components.resolved",
        agent_name=agent.name,
        components=resolved_components,
    )

    initial_candidate = Candidate(components=initial_components)

    # Create engine
    resolved_candidate_selector: CandidateSelectorProtocol | None = None
    if candidate_selector is not None:
        if isinstance(candidate_selector, str):
            resolved_candidate_selector = create_candidate_selector(candidate_selector)
        else:
            resolved_candidate_selector = candidate_selector

    resolved_component_selector: ComponentSelectorProtocol | None = None
    if component_selector is not None:
        if isinstance(component_selector, str):
            resolved_component_selector = create_component_selector(component_selector)
        else:
            resolved_component_selector = component_selector

    engine = AsyncGEPAEngine(
        adapter=adapter,
        config=resolved_config,
        initial_candidate=initial_candidate,
        batch=trainset,
        valset=resolved_valset,
        candidate_selector=resolved_candidate_selector,
        component_selector=resolved_component_selector,
    )

    # Run evolution with cleanup
    try:
        result = await engine.run()

        valset_score = result.valset_score
        trainset_score = result.trainset_score

        if trainset_score is not None:
            logger.info(
                "evolve.trainset.scored",
                agent_name=agent.name,
                trainset_size=len(trainset),
                trainset_score=trainset_score,
            )
        if valset_score is not None:
            logger.info(
                "evolve.valset.scored",
                agent_name=agent.name,
                valset_size=len(resolved_valset),
                valset_score=valset_score,
                valset_defaulted=valset is None,
            )

        # Apply state guard validation if provided (for token preservation)
        # Only applies to text components (instruction), not to output_schema
        validated_components = dict(result.evolved_components)
        for comp_name in resolved_components:
            if comp_name in result.evolved_components:
                if comp_name == DEFAULT_COMPONENT_NAME and state_guard is not None:
                    validated_components[comp_name] = _apply_state_guard_validation(
                        state_guard=state_guard,
                        original_component_text=original_component_values[comp_name],
                        evolved_component_text=result.evolved_components[comp_name],
                        agent_name=agent.name,
                    )
                else:
                    validated_components[comp_name] = result.evolved_components[
                        comp_name
                    ]

        # Log evolution completion
        logger.info(
            "evolve.complete",
            agent_name=agent.name,
            original_score=result.original_score,
            final_score=result.final_score,
            improvement=result.improvement,
            total_iterations=result.total_iterations,
            valset_score=valset_score,
            trainset_score=trainset_score,
            components=resolved_components,
        )

        # Return result with validated evolved_components and valset_score
        # (creates new instance since frozen)
        return EvolutionResult(
            original_score=result.original_score,
            final_score=result.final_score,
            evolved_components=validated_components,
            iteration_history=result.iteration_history,
            total_iterations=result.total_iterations,
            valset_score=valset_score,
            trainset_score=trainset_score,
        )
    finally:
        # Clean up adapter resources (clears handler constraints)
        adapter.cleanup()

evolve_group async

evolve_group(
    agents: dict[str, LlmAgent],
    primary: str,
    trainset: list[dict[str, Any]],
    components: dict[str, list[str]] | None = None,
    critic: LlmAgent | None = None,
    share_session: bool = True,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol
    | str
    | None = None,
    reflection_agent: LlmAgent | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    workflow: SequentialAgent
    | LoopAgent
    | ParallelAgent
    | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult

Evolve multiple agents together with per-agent component configuration.

Optimizes specified components for each agent by targeting the primary agent's output score. When share_session=True, agents execute sequentially with shared session state, enabling later agents to access earlier agents' outputs via template strings.

PARAMETER DESCRIPTION
agents

Named ADK agents to evolve together as dict mapping agent names to LlmAgent instances. Must have at least one agent.

TYPE: dict[str, LlmAgent]

primary

Name of the agent whose output is used for scoring. Must match one of the agent names in the dict.

TYPE: str

trainset

Training examples for evaluation. Each example should have an "input" key and optionally an "expected" key.

TYPE: list[dict[str, Any]]

components

Per-agent component configuration mapping agent names to lists of component names to evolve. If None, defaults to evolving "instruction" for all agents. Use empty list to exclude an agent from evolution. Available component names: "instruction", "output_schema", "generate_content_config".

TYPE: dict[str, list[str]] | None DEFAULT: None

critic

Optional critic agent for scoring. If None, the primary agent must have an output_schema for schema-based scoring.

TYPE: LlmAgent | None DEFAULT: None

share_session

Whether agents share session state during execution. When True (default), uses SequentialAgent. When False, agents execute with isolated sessions.

TYPE: bool DEFAULT: True

config

Evolution configuration. If None, uses EvolutionConfig defaults.

TYPE: EvolutionConfig | None DEFAULT: None

state_guard

Optional StateGuard instance for validating and repairing state injection tokens in evolved instructions.

TYPE: StateGuard | None DEFAULT: None

component_selector

Optional selector instance or selector name for choosing which components to update.

TYPE: ComponentSelectorProtocol | str | None DEFAULT: None

reflection_agent

Optional ADK agent for proposals. If None, creates a default reflection agent using config.reflection_model.

TYPE: LlmAgent | None DEFAULT: None

trajectory_config

Trajectory capture settings (uses defaults if None).

TYPE: TrajectoryConfig | None DEFAULT: None

workflow

Optional original workflow structure to preserve during evaluation. When provided, LoopAgent iterations and ParallelAgent concurrency are preserved instead of flattening to SequentialAgent. Used internally by evolve_workflow(); not typically set directly.

TYPE: SequentialAgent | LoopAgent | ParallelAgent | None DEFAULT: None

session_service

Optional ADK session service for state management. If None (default), creates an InMemorySessionService internally. Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService) to persist sessions alongside other agent executions in a shared database.

TYPE: BaseSessionService | None DEFAULT: None

app

Optional ADK App instance. When provided, evolution uses the app's configuration. Note that App does not hold services directly; pass a Runner for service extraction, or combine with session_service param.

TYPE: App | None DEFAULT: None

runner

Optional ADK Runner instance. When provided, evolution extracts and uses the runner's session_service for all agent executions (evolved agents, critic, and reflection agent). Takes precedence over both app and session_service parameters. This enables seamless integration with existing ADK infrastructure.

TYPE: Runner | None DEFAULT: None

RETURNS DESCRIPTION
MultiAgentEvolutionResult

MultiAgentEvolutionResult containing evolved_components dict mapping qualified component names (agent.component format) to their optimized values, along with score metrics and iteration history.

RAISES DESCRIPTION
MultiAgentValidationError

If agents dict is empty, primary agent not found, or no scorer and primary lacks output_schema.

ValueError

If components mapping contains unknown agents, unknown component handlers, or is missing entries for agents.

EvolutionError

If evolution fails during execution.

Examples:

Basic usage with per-agent components (API v0.3.x):

from google.adk.agents import LlmAgent
from gepa_adk import evolve_group

generator = LlmAgent(
    name="generator",
    model="gemini-2.5-flash",
    instruction="Generate code based on the requirement.",
)
critic = LlmAgent(
    name="critic",
    model="gemini-2.5-flash",
    instruction="Review the code in {generator_output}.",
)
validator = LlmAgent(
    name="validator",
    model="gemini-2.5-flash",
    instruction="Validate the reviewed code.",
    output_schema=ValidationResult,
)

result = await evolve_group(
    agents={
        "generator": generator,
        "critic": critic,
        "validator": validator,
    },
    primary="validator",
    trainset=training_data,
    components={
        "generator": ["instruction", "output_schema"],
        "critic": ["instruction"],
        "validator": ["instruction"],
    },
)

# Access evolved components using qualified names
print(result.evolved_components["generator.instruction"])
print(result.evolved_components["critic.instruction"])
print(result.evolved_components["validator.instruction"])

Exclude an agent from evolution:

result = await evolve_group(
    agents={"generator": gen, "static_validator": val},
    primary="generator",
    trainset=training_data,
    components={
        "generator": ["instruction"],
        "static_validator": [],  # Excluded from evolution
    },
)

Using custom session service for persistence:

from google.adk.sessions import SqliteSessionService

# Use SQLite for session persistence
session_service = SqliteSessionService(db_path="evolution_sessions.db")

result = await evolve_group(
    agents={"generator": gen, "critic": critic},
    primary="critic",
    trainset=training_data,
    session_service=session_service,  # Sessions persisted to SQLite
)

Using App/Runner for existing infrastructure integration:

from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService

# Configure Runner with your production session service
runner = Runner(
    app_name="my_app",
    agent=generator,  # Any agent from the group
    session_service=DatabaseSessionService(connection_string="..."),
)

# Evolution uses Runner's session_service for all operations
result = await evolve_group(
    agents={"generator": gen, "refiner": ref},
    primary="refiner",
    trainset=training_data,
    runner=runner,  # Services extracted from runner
)
Note

Breaking change in v0.3.x: The agents parameter changed from list[LlmAgent] to dict[str, LlmAgent]. Candidate keys now use qualified names (agent.component) instead of {agent_name}_instruction.
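
Evolved values can be applied back to the original agents by splitting the qualified names (a sketch, not a provided helper; it assumes instruction is directly assignable on your LlmAgent instances):

for qualified_name, value in result.evolved_components.items():
    if qualified_name.endswith(".instruction"):
        agent_name = qualified_name.rsplit(".", 1)[0]
        if agent_name in agents:
            agents[agent_name].instruction = value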

Source code in src/gepa_adk/api.py
async def evolve_group(
    agents: dict[str, LlmAgent],
    primary: str,
    trainset: list[dict[str, Any]],
    components: dict[str, list[str]] | None = None,
    critic: LlmAgent | None = None,
    share_session: bool = True,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    reflection_agent: LlmAgent | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    workflow: SequentialAgent | LoopAgent | ParallelAgent | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult:
    """Evolve multiple agents together with per-agent component configuration.

    Optimizes specified components for each agent by targeting the primary
    agent's output score. When share_session=True, agents execute sequentially
    with shared session state, enabling later agents to access earlier
    agents' outputs via template strings.

    Args:
        agents: Named ADK agents to evolve together as dict mapping agent
            names to LlmAgent instances. Must have at least one agent.
        primary: Name of the agent whose output is used for scoring.
            Must match one of the agent names in the dict.
        trainset: Training examples for evaluation. Each example should
            have an "input" key and optionally an "expected" key.
        components: Per-agent component configuration mapping agent names
            to lists of component names to evolve. If None, defaults to
            evolving "instruction" for all agents. Use empty list to
            exclude an agent from evolution. Available component names:
            "instruction", "output_schema", "generate_content_config".
        critic: Optional critic agent for scoring. If None, the primary
            agent must have an output_schema for schema-based scoring.
        share_session: Whether agents share session state during
            execution. When True (default), uses SequentialAgent.
            When False, agents execute with isolated sessions.
        config: Evolution configuration. If None, uses EvolutionConfig
            defaults.
        state_guard: Optional StateGuard instance for validating and
            repairing state injection tokens in evolved instructions.
        component_selector: Optional selector instance or selector name for
            choosing which components to update.
        reflection_agent: Optional ADK agent for proposals. If None, creates a
            default reflection agent using config.reflection_model.
        trajectory_config: Trajectory capture settings (uses defaults if None).
        workflow: Optional original workflow structure to preserve during
            evaluation. When provided, LoopAgent iterations and ParallelAgent
            concurrency are preserved instead of flattening to SequentialAgent.
            Used internally by evolve_workflow(); not typically set directly.
        session_service: Optional ADK session service for state management.
            If None (default), creates an InMemorySessionService internally.
            Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService)
            to persist sessions alongside other agent executions in a shared database.
        app: Optional ADK App instance. When provided, evolution uses the app's
            configuration. Note that App does not hold services directly; pass
            a Runner for service extraction, or combine with session_service param.
        runner: Optional ADK Runner instance. When provided, evolution extracts
            and uses the runner's session_service for all agent executions
            (evolved agents, critic, and reflection agent). Takes precedence
            over both app and session_service parameters. This enables seamless
            integration with existing ADK infrastructure.

    Returns:
        MultiAgentEvolutionResult containing evolved_components dict
        mapping qualified component names (agent.component format) to their
        optimized values, along with score metrics and iteration history.

    Raises:
        MultiAgentValidationError: If agents dict is empty, primary agent
            not found, or no scorer and primary lacks output_schema.
        ValueError: If components mapping contains unknown agents, unknown
            component handlers, or is missing entries for agents.
        EvolutionError: If evolution fails during execution.

    Examples:
        Basic usage with per-agent components (API v0.3.x):

        ```python
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve_group

        generator = LlmAgent(
            name="generator",
            model="gemini-2.5-flash",
            instruction="Generate code based on the requirement.",
        )
        critic = LlmAgent(
            name="critic",
            model="gemini-2.5-flash",
            instruction="Review the code in {generator_output}.",
        )
        validator = LlmAgent(
            name="validator",
            model="gemini-2.5-flash",
            instruction="Validate the reviewed code.",
            output_schema=ValidationResult,
        )

        result = await evolve_group(
            agents={
                "generator": generator,
                "critic": critic,
                "validator": validator,
            },
            primary="validator",
            trainset=training_data,
            components={
                "generator": ["instruction", "output_schema"],
                "critic": ["instruction"],
                "validator": ["instruction"],
            },
        )

        # Access evolved components using qualified names
        print(result.evolved_components["generator.instruction"])
        print(result.evolved_components["critic.instruction"])
        print(result.evolved_components["validator.instruction"])
        ```

        Exclude an agent from evolution:

        ```python
        result = await evolve_group(
            agents={"generator": gen, "static_validator": val},
            primary="generator",
            trainset=training_data,
            components={
                "generator": ["instruction"],
                "static_validator": [],  # Excluded from evolution
            },
        )
        ```

        Using custom session service for persistence:

        ```python
        from google.adk.sessions import SqliteSessionService

        # Use SQLite for session persistence
        session_service = SqliteSessionService(db_path="evolution_sessions.db")

        result = await evolve_group(
            agents={"generator": gen, "critic": critic},
            primary="critic",
            trainset=training_data,
            session_service=session_service,  # Sessions persisted to SQLite
        )
        ```

        Using App/Runner for existing infrastructure integration:

        ```python
        from google.adk.runners import Runner
        from google.adk.sessions import DatabaseSessionService

        # Configure Runner with your production session service
        runner = Runner(
            app_name="my_app",
            agent=generator,  # Any agent from the group
            session_service=DatabaseSessionService(connection_string="..."),
        )

        # Evolution uses Runner's session_service for all operations
        result = await evolve_group(
            agents={"generator": gen, "refiner": ref},
            primary="refiner",
            trainset=training_data,
            runner=runner,  # Services extracted from runner
        )
        ```

    Note:
        Breaking change in v0.3.x: The `agents` parameter changed from
        `list[LlmAgent]` to `dict[str, LlmAgent]`. Candidate keys now use
        qualified names (agent.component) instead of {agent_name}_instruction.
    """
    # Validate agent names are valid identifiers (T012a)
    for agent_name in agents:
        _validate_component_name(
            agent_name,
            context="evolve_group agent",
        )

    # Default components: evolve "instruction" for all agents
    if components is None:
        components = {name: ["instruction"] for name in agents}

    # Capture original instructions for StateGuard validation
    original_instructions = {
        name: str(agent.instruction) for name, agent in agents.items()
    }

    # Log precedence warnings if multiple config sources provided (#227 T009)
    if runner is not None and app is not None:
        logger.warning(
            "evolve_group.precedence.runner_over_app",
            message="Both runner and app provided; using runner (runner takes precedence)",
            runner_app_name=runner.app_name,
            app_name=app.name,
        )

    # Resolve services using precedence rules: runner > app > session_service > default (#227)
    resolved_session_service, _artifact_service = _resolve_evolution_services(
        runner=runner,
        app=app,
        session_service=session_service,
    )

    # Resolve app_name for session isolation (#239)
    resolved_app_name = _resolve_app_name(runner=runner, app=app)

    # Create unified executor for consistent session management (FR-003)
    executor = AgentExecutor(
        session_service=resolved_session_service,
        app_name=resolved_app_name,
    )

    # Build scorer with executor (FR-005)
    scorer = None
    if critic:
        scorer = CriticScorer(critic_agent=critic, executor=executor)

    # Resolve config for reflection_model
    resolved_config = config or EvolutionConfig()

    # Create reflection-based proposer with executor (FR-006)
    # Use provided reflection_agent or create a default one
    if reflection_agent is None:
        reflection_agent = LlmAgent(
            name="reflection_agent",
            model=_resolve_model_for_agent(resolved_config.reflection_model),
            instruction=resolved_config.reflection_prompt or REFLECTION_INSTRUCTION,
        )
    adk_reflection_fn = create_adk_reflection_fn(
        reflection_agent,
        executor=executor,
        session_service=resolved_session_service,
    )
    proposer = AsyncReflectiveMutationProposer(adk_reflection_fn=adk_reflection_fn)

    # Create adapter with executor (FR-004)
    adapter = MultiAgentAdapter(
        agents=agents,
        primary=primary,
        components=components,
        scorer=scorer,
        share_session=share_session,
        session_service=resolved_session_service,
        trajectory_config=trajectory_config,
        proposer=proposer,
        executor=executor,
        workflow=workflow,  # Preserve workflow structure (#215)
    )

    # Build seed candidate using qualified names (agent.component format per ADR-012)
    primary_agent = agents[primary]
    # Extract all configured components using their handlers
    seed_candidate_components: dict[str, str] = {}
    for agent_name, comp_list in components.items():
        agent = agents[agent_name]
        for comp_name in comp_list:
            qualified_name = f"{agent_name}.{comp_name}"
            handler = get_handler(comp_name)
            seed_candidate_components[qualified_name] = handler.serialize(agent)
    # Add required "instruction" key for engine compatibility
    seed_candidate_components["instruction"] = str(primary_agent.instruction)
    initial_candidate = Candidate(components=seed_candidate_components)

    # Create engine
    resolved_component_selector: ComponentSelectorProtocol | None = None
    if component_selector is not None:
        if isinstance(component_selector, str):
            resolved_component_selector = create_component_selector(component_selector)
        else:
            resolved_component_selector = component_selector

    engine = AsyncGEPAEngine(
        adapter=adapter,
        config=resolved_config,
        initial_candidate=initial_candidate,
        batch=trainset,
        component_selector=resolved_component_selector,
    )

    # Run evolution
    evolution_result = await engine.run()

    # Extract best candidate components from engine state using qualified names
    # The engine stores evolved_components from the candidate, which now uses
    # qualified names (agent.component format per ADR-012)
    evolved_components = _extract_evolved_components(
        evolution_result=evolution_result,
        seed_components=seed_candidate_components,
        agents=agents,
        components=components,
        primary=primary,
    )

    # Apply StateGuard validation to instruction components only
    if state_guard is not None:
        validated_components = {}
        for qualified_name, evolved_value in evolved_components.items():
            # Only apply StateGuard to instruction components
            if qualified_name.endswith(".instruction"):
                agent_name = qualified_name.rsplit(".", 1)[0]
                original_instruction = original_instructions.get(agent_name, "")
                validated_components[qualified_name] = _apply_state_guard_validation(
                    state_guard=state_guard,
                    original_component_text=original_instruction,
                    evolved_component_text=evolved_value,
                    agent_name=agent_name,
                )
            else:
                # Non-instruction components pass through unchanged
                validated_components[qualified_name] = evolved_value
        evolved_components = validated_components

    # Convert EvolutionResult to MultiAgentEvolutionResult
    return MultiAgentEvolutionResult(
        evolved_components=evolved_components,
        original_score=evolution_result.original_score,
        final_score=evolution_result.final_score,
        primary_agent=primary,
        iteration_history=evolution_result.iteration_history,
        total_iterations=evolution_result.total_iterations,
    )
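
As the comments in the listing above note, the components returned by evolve_group are keyed by qualified agent.component names (ADR-012). A minimal sketch of reading such a result, assuming a completed run bound to result and purely illustrative agent names:

```python
# Hypothetical: `result` is a MultiAgentEvolutionResult from an earlier
# evolve_group run; agent and component names here are placeholders.
evolved = result.evolved_components

for qualified_name, text in evolved.items():
    # Qualified keys use the "agent.component" format described above.
    agent_name, _, component = qualified_name.rpartition(".")
    print(f"{agent_name} / {component}: {text[:60]}")

# The primary agent's evolved instruction, if that component was configured:
new_instruction = evolved.get(f"{result.primary_agent}.instruction")
```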

evolve_sync

evolve_sync(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    **kwargs: Any,
) -> EvolutionResult

Synchronous wrapper for evolve().

Runs the async evolve() function in a blocking manner. Handles nested event loops automatically (Jupyter compatible).

PARAMETER DESCRIPTION
agent

The ADK LlmAgent to evolve.

TYPE: LlmAgent

trainset

Training examples.

TYPE: list[dict[str, Any]]

**kwargs

Optional keyword arguments passed to evolve().

TYPE: Any DEFAULT: {}

KEYWORD ARGUMENT DESCRIPTION
valset

Optional validation examples for held-out evaluation.

TYPE: list[dict[str, Any]] | None

critic

Optional ADK agent for scoring.

TYPE: LlmAgent | None

reflection_agent

Optional ADK agent for proposals (not yet implemented).

TYPE: LlmAgent | None

config

EvolutionConfig for customizing evolution parameters.

TYPE: EvolutionConfig | None

trajectory_config

TrajectoryConfig for trace capture settings.

TYPE: TrajectoryConfig | None

state_guard

Optional state token preservation settings.

TYPE: StateGuard | None

candidate_selector

Optional selector instance or selector name.

TYPE: CandidateSelectorProtocol | str | None

executor

Optional unified agent executor for consistent session management across all agent types.

TYPE: AgentExecutorProtocol | None

RETURNS DESCRIPTION
EvolutionResult

EvolutionResult with evolved_components dict and metrics.

RAISES DESCRIPTION
ConfigurationError

If invalid parameters provided.

EvolutionError

If evolution fails during execution.

Examples:

Basic usage in a script:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk import evolve_sync


class OutputSchema(BaseModel):
    answer: str
    score: float = Field(ge=0.0, le=1.0)


agent = LlmAgent(
    name="assistant",
    model="gemini-2.5-flash",
    instruction="You are a helpful assistant.",
    output_schema=OutputSchema,
)

trainset = [
    {"input": "What is 2+2?", "expected": "4"},
]

result = evolve_sync(agent, trainset)
print(f"Evolved: {result.evolved_components['instruction']}")

With configuration:

from gepa_adk import evolve_sync, EvolutionConfig

config = EvolutionConfig(max_iterations=50)
result = evolve_sync(agent, trainset, config=config)
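
With a held-out validation set (a sketch; valset is simply forwarded to evolve() as documented above, reusing agent, trainset, and config from the previous examples):

```python
valset = [
    {"input": "What is 3+3?", "expected": "6"},
]

result = evolve_sync(agent, trainset, valset=valset, config=config)
```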
Note

Synchronous wrapper for scripts and Jupyter notebooks. Automatically handles nested event loops using nest_asyncio when needed.
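
In a notebook where an event loop is already running, the nest_asyncio fallback described above handles the nested call. A minimal sketch, assuming nest_asyncio is installed and agent and trainset are defined as in the examples above:

```python
# Inside a Jupyter cell an event loop is already running; evolve_sync()
# falls back to nest_asyncio automatically (install it first, e.g. `uv add nest_asyncio`).
from gepa_adk import evolve_sync

result = evolve_sync(agent, trainset)
print(result.evolved_components["instruction"])
```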

Source code in src/gepa_adk/api.py
def evolve_sync(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    **kwargs: Any,
) -> EvolutionResult:
    """Synchronous wrapper for evolve().

    Runs the async evolve() function in a blocking manner.
    Handles nested event loops automatically (Jupyter compatible).

    Args:
        agent: The ADK LlmAgent to evolve.
        trainset: Training examples.
        **kwargs: Optional keyword arguments passed to evolve().

    Keyword Args:
        valset (list[dict[str, Any]] | None): Optional validation examples for
            held-out evaluation.
        critic (LlmAgent | None): Optional ADK agent for scoring.
        reflection_agent (LlmAgent | None): Optional ADK agent for proposals
            (not yet implemented).
        config (EvolutionConfig | None): EvolutionConfig for customizing
            evolution parameters.
        trajectory_config (TrajectoryConfig | None): TrajectoryConfig for trace
            capture settings.
        state_guard (StateGuard | None): Optional state token preservation
            settings.
        candidate_selector (CandidateSelectorProtocol | str | None): Optional
            selector instance or selector name.
        executor (AgentExecutorProtocol | None): Optional unified agent executor
            for consistent session management across all agent types.

    Returns:
        EvolutionResult with evolved_components dict and metrics.

    Raises:
        ConfigurationError: If invalid parameters provided.
        EvolutionError: If evolution fails during execution.

    Examples:
        Basic usage in a script:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve_sync


        class OutputSchema(BaseModel):
            answer: str
            score: float = Field(ge=0.0, le=1.0)


        agent = LlmAgent(
            name="assistant",
            model="gemini-2.5-flash",
            instruction="You are a helpful assistant.",
            output_schema=OutputSchema,
        )

        trainset = [
            {"input": "What is 2+2?", "expected": "4"},
        ]

        result = evolve_sync(agent, trainset)
        print(f"Evolved: {result.evolved_components['instruction']}")
        ```

        With configuration:

        ```python
        from gepa_adk import evolve_sync, EvolutionConfig

        config = EvolutionConfig(max_iterations=50)
        result = evolve_sync(agent, trainset, config=config)
        ```

    Note:
        Synchronous wrapper for scripts and Jupyter notebooks. Automatically
        handles nested event loops using nest_asyncio when needed.
    """
    import asyncio

    try:
        # Try standard asyncio.run() first
        return asyncio.run(evolve(agent, trainset, **kwargs))
    except RuntimeError as e:
        # Handle nested event loop case (e.g., Jupyter notebooks)
        if "asyncio.run() cannot be called from a running event loop" in str(e):
            # Use nest_asyncio for nested event loops
            try:
                import nest_asyncio

                nest_asyncio.apply()
                # Now we can use asyncio.run() even in nested context
                return asyncio.run(evolve(agent, trainset, **kwargs))
            except ImportError:
                # We're here because asyncio.run() failed due to running event loop.
                # Without nest_asyncio, we can't handle nested event loops.
                raise RuntimeError(
                    "nest_asyncio is required for nested event loops. "
                    "Install it with: uv add nest_asyncio"
                ) from e
        raise

evolve_workflow async

evolve_workflow(
    workflow: SequentialAgent | LoopAgent | ParallelAgent,
    trainset: list[dict[str, Any]],
    critic: LlmAgent | None = None,
    primary: str | None = None,
    max_depth: int = 5,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    round_robin: bool = False,
    components: dict[str, list[str]] | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult

Evolve LlmAgents within a workflow agent structure.

Discovers all LlmAgent instances within a workflow (SequentialAgent, LoopAgent, or ParallelAgent) and evolves them while preserving the workflow structure. Uses shared session state to maintain workflow context during evaluation.

PARAMETER DESCRIPTION
workflow

Workflow agent containing LlmAgents to evolve. Must be SequentialAgent, LoopAgent, or ParallelAgent.

TYPE: SequentialAgent | LoopAgent | ParallelAgent

trainset

Training examples for evaluation. Each example should have an "input" key and optionally an "expected" key.

TYPE: list[dict[str, Any]]

critic

Optional critic agent for scoring. If None, the primary agent must have an output_schema for schema-based scoring.

TYPE: LlmAgent | None DEFAULT: None

primary

Name of the agent to score. Defaults to the last LlmAgent found in the workflow (for sequential workflows, this is typically the final output producer).

TYPE: str | None DEFAULT: None

max_depth

Maximum recursion depth for nested workflows (default: 5). Limits how deeply nested workflow structures are traversed.

TYPE: int DEFAULT: 5

config

Evolution configuration. If None, uses EvolutionConfig defaults.

TYPE: EvolutionConfig | None DEFAULT: None

state_guard

Optional StateGuard instance for validating and repairing state injection tokens in evolved component_text.

TYPE: StateGuard | None DEFAULT: None

component_selector

Optional selector instance or selector name for choosing which components to update.

TYPE: ComponentSelectorProtocol | str | None DEFAULT: None

round_robin

If False (default), only the first discovered agent's instruction is evolved across all iterations. If True, all agents' instructions are evolved in round-robin fashion (the engine cycles through agents each iteration). Ignored when components is provided.

TYPE: bool DEFAULT: False

components

Optional per-agent component configuration mapping agent names to lists of component names to evolve. When provided, takes precedence over round_robin. Use empty list to exclude an agent.

TYPE: dict[str, list[str]] | None DEFAULT: None

session_service

Optional ADK session service for state management. If None (default), creates an InMemorySessionService internally. Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService) to persist sessions alongside other agent executions in a shared database.

TYPE: BaseSessionService | None DEFAULT: None

app

Optional ADK App instance. When provided, evolution uses the app's configuration. Note that App does not hold services directly; pass a Runner for service extraction, or combine with session_service param.

TYPE: App | None DEFAULT: None

runner

Optional ADK Runner instance. When provided, evolution extracts and uses the runner's session_service for all agent executions (evolved agents, critic, and reflection agent). Takes precedence over both app and session_service parameters. This enables seamless integration with existing ADK infrastructure.

TYPE: Runner | None DEFAULT: None

RETURNS DESCRIPTION
MultiAgentEvolutionResult

MultiAgentEvolutionResult containing evolved_components dict mapping agent names to their optimized component_text, along with score metrics and iteration history.

RAISES DESCRIPTION
WorkflowEvolutionError

If workflow contains no LlmAgents.

MultiAgentValidationError

If primary agent not found or no scorer available.

EvolutionError

If evolution fails during execution.

Examples:

Default behavior (evolve first agent only):

from google.adk.agents import LlmAgent, SequentialAgent
from gepa_adk import evolve_workflow

generator = LlmAgent(name="generator", instruction="Generate code")
refiner = LlmAgent(name="refiner", instruction="Refine code")
writer = LlmAgent(name="writer", instruction="Write docs")
pipeline = SequentialAgent(
    name="Pipeline", sub_agents=[generator, refiner, writer]
)

# Only generator.instruction is evolved across all iterations
result = await evolve_workflow(workflow=pipeline, trainset=trainset)

Round-robin evolution (evolve all agents):

# All agents are evolved in round-robin: generator -> refiner -> writer -> ...
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    round_robin=True,
)

Explicit components override (takes precedence over round_robin):

# Only generator and writer are evolved; refiner is excluded
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    components={
        "generator": ["instruction"],
        "writer": ["instruction"],
        "refiner": [],  # Excluded
    },
)

Using custom session service for persistence:

from google.adk.sessions import SqliteSessionService

# Persist workflow evolution sessions to SQLite
session_service = SqliteSessionService(db_path="workflow_sessions.db")

result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    session_service=session_service,
)

Using App/Runner for existing infrastructure integration:

from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService

# Configure Runner with your production session service
runner = Runner(
    app_name="my_workflow_app",
    agent=pipeline,  # The workflow agent
    session_service=DatabaseSessionService(connection_string="..."),
)

# Evolution uses Runner's session_service for all operations
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    runner=runner,  # Services extracted from runner
)
Note

Supports workflow agents (SequentialAgent, LoopAgent, ParallelAgent) with recursive traversal and depth limiting via max_depth parameter. Handles nested structures. LoopAgent and ParallelAgent configurations (max_iterations, etc.) are preserved during evolution. Always uses share_session=True to maintain workflow context (FR-010).
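
The recursive discovery described above also reaches agents nested inside inner workflow agents. A hedged sketch with illustrative names, combining a LoopAgent inside a SequentialAgent (trainset assumed to be defined as in the earlier examples):

```python
from google.adk.agents import LlmAgent, LoopAgent, SequentialAgent
from gepa_adk import evolve_workflow

drafter = LlmAgent(name="drafter", instruction="Draft an answer")
reviewer = LlmAgent(name="reviewer", instruction="Review the draft")

# The inner loop structure (including max_iterations) is preserved during evolution.
review_loop = LoopAgent(
    name="ReviewLoop", sub_agents=[drafter, reviewer], max_iterations=3
)
finalizer = LlmAgent(name="finalizer", instruction="Produce the final answer")

pipeline = SequentialAgent(name="NestedPipeline", sub_agents=[review_loop, finalizer])

# As documented above, scoring needs either a critic or an output_schema
# on the primary (last) agent. max_depth bounds the recursive traversal.
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    round_robin=True,  # cycle through drafter, reviewer, and finalizer
    max_depth=3,
)
```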

Source code in src/gepa_adk/api.py
async def evolve_workflow(
    workflow: SequentialAgent | LoopAgent | ParallelAgent,
    trainset: list[dict[str, Any]],
    critic: LlmAgent | None = None,
    primary: str | None = None,
    max_depth: int = 5,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    round_robin: bool = False,
    components: dict[str, list[str]] | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult:
    """Evolve LlmAgents within a workflow agent structure.

    Discovers all LlmAgent instances within a workflow (SequentialAgent,
    LoopAgent, or ParallelAgent) and evolves them while preserving the
    workflow structure. Uses shared session state to maintain workflow
    context during evaluation.

    Args:
        workflow: Workflow agent containing LlmAgents to evolve. Must be
            SequentialAgent, LoopAgent, or ParallelAgent.
        trainset: Training examples for evaluation. Each example should have
            an "input" key and optionally an "expected" key.
        critic: Optional critic agent for scoring. If None, the primary agent
            must have an output_schema for schema-based scoring.
        primary: Name of the agent to score. Defaults to the last LlmAgent
            found in the workflow (for sequential workflows, this is typically
            the final output producer).
        max_depth: Maximum recursion depth for nested workflows (default: 5).
            Limits how deeply nested workflow structures are traversed.
        config: Evolution configuration. If None, uses EvolutionConfig defaults.
        state_guard: Optional StateGuard instance for validating and
            repairing state injection tokens in evolved component_text.
        component_selector: Optional selector instance or selector name for
            choosing which components to update.
        round_robin: If False (default), only the first discovered agent's
            instruction is evolved across all iterations. If True, all agents'
            instructions are evolved in round-robin fashion (the engine cycles
            through agents each iteration). Ignored when components is provided.
        components: Optional per-agent component configuration mapping agent
            names to lists of component names to evolve. When provided, takes
            precedence over round_robin. Use empty list to exclude an agent.
        session_service: Optional ADK session service for state management.
            If None (default), creates an InMemorySessionService internally.
            Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService)
            to persist sessions alongside other agent executions in a shared database.
        app: Optional ADK App instance. When provided, evolution uses the app's
            configuration. Note that App does not hold services directly; pass
            a Runner for service extraction, or combine with session_service param.
        runner: Optional ADK Runner instance. When provided, evolution extracts
            and uses the runner's session_service for all agent executions
            (evolved agents, critic, and reflection agent). Takes precedence
            over both app and session_service parameters. This enables seamless
            integration with existing ADK infrastructure.

    Returns:
        MultiAgentEvolutionResult containing evolved_components dict mapping
        agent names to their optimized component_text, along with score
        metrics and iteration history.

    Raises:
        WorkflowEvolutionError: If workflow contains no LlmAgents.
        MultiAgentValidationError: If primary agent not found or no scorer
            available.
        EvolutionError: If evolution fails during execution.

    Examples:
        Default behavior (evolve first agent only):

        ```python
        from google.adk.agents import LlmAgent, SequentialAgent
        from gepa_adk import evolve_workflow

        generator = LlmAgent(name="generator", instruction="Generate code")
        refiner = LlmAgent(name="refiner", instruction="Refine code")
        writer = LlmAgent(name="writer", instruction="Write docs")
        pipeline = SequentialAgent(
            name="Pipeline", sub_agents=[generator, refiner, writer]
        )

        # Only generator.instruction is evolved across all iterations
        result = await evolve_workflow(workflow=pipeline, trainset=trainset)
        ```

        Round-robin evolution (evolve all agents):

        ```python
        # All agents are evolved in round-robin: generator -> refiner -> writer -> ...
        result = await evolve_workflow(
            workflow=pipeline,
            trainset=trainset,
            round_robin=True,
        )
        ```

        Explicit components override (takes precedence over round_robin):

        ```python
        # Only generator and writer are evolved; refiner is excluded
        result = await evolve_workflow(
            workflow=pipeline,
            trainset=trainset,
            components={
                "generator": ["instruction"],
                "writer": ["instruction"],
                "refiner": [],  # Excluded
            },
        )
        ```

        Using custom session service for persistence:

        ```python
        from google.adk.sessions import SqliteSessionService

        # Persist workflow evolution sessions to SQLite
        session_service = SqliteSessionService(db_path="workflow_sessions.db")

        result = await evolve_workflow(
            workflow=pipeline,
            trainset=trainset,
            session_service=session_service,
        )
        ```

        Using App/Runner for existing infrastructure integration:

        ```python
        from google.adk.runners import Runner
        from google.adk.sessions import DatabaseSessionService

        # Configure Runner with your production session service
        runner = Runner(
            app_name="my_workflow_app",
            agent=pipeline,  # The workflow agent
            session_service=DatabaseSessionService(connection_string="..."),
        )

        # Evolution uses Runner's session_service for all operations
        result = await evolve_workflow(
            workflow=pipeline,
            trainset=trainset,
            runner=runner,  # Services extracted from runner
        )
        ```

    Note:
        Supports workflow agents (SequentialAgent, LoopAgent, ParallelAgent)
        with recursive traversal and depth limiting via max_depth parameter.
        Handles nested structures. LoopAgent and ParallelAgent configurations
        (max_iterations, etc.) are preserved during evolution. Always uses
        share_session=True to maintain workflow context (FR-010).
    """
    logger.info(
        "Starting workflow evolution",
        workflow_name=workflow.name,
        workflow_type=type(workflow).__name__,
    )

    # Find all LlmAgents in the workflow recursively up to max_depth (US3)
    llm_agents = find_llm_agents(workflow, max_depth=max_depth)

    # Validate that at least one LlmAgent was found
    if not llm_agents:
        error_msg = (
            f"No LlmAgents found in workflow '{workflow.name}'. "
            "Workflow must contain at least one LlmAgent to evolve."
        )
        logger.error(
            "Workflow evolution failed", workflow_name=workflow.name, error=error_msg
        )
        raise WorkflowEvolutionError(
            error_msg,
            workflow_name=workflow.name,
        )

    logger.info(
        "Found LlmAgents in workflow",
        workflow_name=workflow.name,
        agent_count=len(llm_agents),
        agent_names=[agent.name for agent in llm_agents],
    )

    # Determine primary agent (default to last agent for sequential workflows)
    if primary is None:
        primary = llm_agents[-1].name
        logger.debug(
            "Using default primary agent",
            workflow_name=workflow.name,
            primary=primary,
        )

    # Convert list to dict for evolve_group (API v0.3.x)
    agents_dict = {agent.name: agent for agent in llm_agents}

    # Build components dict based on round_robin flag
    # Explicit components parameter takes precedence over round_robin
    resolved_components: dict[str, list[str]] | None = None
    if components is not None:
        # Explicit components provided - use as-is
        resolved_components = components
        logger.debug(
            "Using explicit components",
            workflow_name=workflow.name,
            components=list(components.keys()),
        )
    elif round_robin:
        # round_robin=True: evolve all agents
        resolved_components = {agent.name: ["instruction"] for agent in llm_agents}
        logger.debug(
            "Using round_robin mode - evolving all agents",
            workflow_name=workflow.name,
            agents=[agent.name for agent in llm_agents],
        )
    else:
        # Default: evolve only the first agent
        first_agent = llm_agents[0]
        resolved_components = {first_agent.name: ["instruction"]}
        # Add empty lists for other agents (excluded from evolution)
        for agent in llm_agents[1:]:
            resolved_components[agent.name] = []
        logger.debug(
            "Using default mode - evolving first agent only",
            workflow_name=workflow.name,
            first_agent=first_agent.name,
        )

    # Delegate to evolve_group with share_session=True (FR-010)
    logger.debug(
        "Delegating to evolve_group",
        workflow_name=workflow.name,
        agent_count=len(llm_agents),
        primary=primary,
        share_session=True,
        round_robin=round_robin,
    )

    return await evolve_group(
        agents=agents_dict,
        primary=primary,
        trainset=trainset,
        components=resolved_components,
        critic=critic,
        share_session=True,  # FR-010: Always use shared session for workflow context
        config=config,
        state_guard=state_guard,
        component_selector=component_selector,
        workflow=workflow,  # Preserve workflow structure (#215)
        session_service=session_service,  # Pass through for persistence (#226)
        app=app,  # Pass through for App/Runner pattern (#227)
        runner=runner,  # Pass through for App/Runner pattern (#227)
    )

main

main() -> None

Entry point for CLI invocation.

Source code in src/gepa_adk/__init__.py
def main() -> None:
    """Entry point for CLI invocation."""
    print("Hello from gepa-adk!")