Full Module Index

gepa_adk

GEPA-ADK: Async-first evolution engine for agentic development.

This package provides domain models and utilities for evolving agent instructions using the GEPA (Genetic-Pareto) approach.

ATTRIBUTE DESCRIPTION
__version__

Package version from pyproject.toml.

TYPE: str

EvolutionConfig

Configuration parameters for evolution runs.

TYPE: class

EvolutionResult

Outcome of a completed evolution run.

TYPE: class

MultiAgentEvolutionResult

Outcome of a multi-agent evolution run.

TYPE: class

Candidate

Instruction candidate being evolved.

TYPE: class

IterationRecord

Metrics for a single evolution iteration.

TYPE: class

Score

Type alias for normalized scores.

TYPE: type

ComponentName

Type alias for component identifiers.

TYPE: type

DEFAULT_COMPONENT_NAME

Default component name constant.

TYPE: str

ModelName

Type alias for model identifiers.

TYPE: type

StopReason

Why an evolution run terminated.

TYPE: enum

FrontierType

Pareto frontier type selector.

TYPE: enum

CURRENT_SCHEMA_VERSION

Current result schema version.

TYPE: int

TrajectoryConfig

Configuration for trajectory extraction.

TYPE: class

SchemaConstraints

Schema-level field constraints for evolution.

TYPE: class

EvolutionError

Base exception for all gepa-adk errors.

TYPE: class

ConfigurationError

Raised when configuration validation fails.

TYPE: class

VideoValidationError

Raised for invalid video input.

TYPE: class

AsyncGEPAEngine

Core async evolution engine.

TYPE: class

MergeProposer

Proposes merged candidates from the Pareto frontier.

TYPE: class

AgentProvider

Protocol for loading and persisting agents.

TYPE: protocol

AsyncGEPAAdapter

Async adapter protocol for evaluation.

TYPE: protocol

ComponentHandler

Protocol for component serialization/application.

TYPE: protocol

EvaluationBatch

Evaluation results container for adapters.

TYPE: class

DataInst

Type variable for adapter input instances.

TYPE: type

Trajectory

Type variable for adapter traces.

TYPE: type

RolloutOutput

Type variable for adapter outputs.

TYPE: type

ComponentSelectorProtocol

Protocol for component selection.

TYPE: protocol

RoundRobinComponentSelector

Round-robin component selector.

TYPE: class

AllComponentSelector

Selector that returns all components.

TYPE: class

create_component_selector

Factory for component selectors.

TYPE: function

SimpleCriticOutput

Pydantic schema for simple critic output.

TYPE: class

CriticOutput

Pydantic schema for advanced critic output.

TYPE: class

SIMPLE_CRITIC_INSTRUCTION

Default simple critic instruction.

TYPE: str

ADVANCED_CRITIC_INSTRUCTION

Default advanced critic instruction.

TYPE: str

STRUCTURED_OUTPUT_CRITIC_INSTRUCTION

Preset instruction for structure evaluation.

TYPE: str

ACCURACY_CRITIC_INSTRUCTION

Preset instruction for factual accuracy evaluation.

TYPE: str

RELEVANCE_CRITIC_INSTRUCTION

Preset instruction for relevance evaluation.

TYPE: str

normalize_feedback

Normalize critic feedback to standard form.

TYPE: function

create_critic

Factory for pre-configured critic agents by preset name.

TYPE: function

critic_presets

Maps preset name to human-readable description.

TYPE: dict

evolve

Async single-agent evolution entry point.

TYPE: function

evolve_sync

Deprecated synchronous wrapper for evolve().

TYPE: function

evolve_group

Async multi-agent group evolution.

TYPE: function

evolve_workflow

Async workflow-level evolution.

TYPE: function

run_sync

Universal sync wrapper for any async evolution call.

TYPE: function

Examples:

Basic usage with configuration and candidates:

from gepa_adk import EvolutionConfig, Candidate

config = EvolutionConfig(max_iterations=10, patience=3)
candidate = Candidate(components={"instruction": "Be helpful"})

Configuring trajectory extraction:

from gepa_adk import TrajectoryConfig

trajectory_config = TrajectoryConfig(
    redact_sensitive=True,
    max_string_length=5000,
)
Note

This is the main entry point for the gepa-adk package. Domain models are re-exported here for convenient top-level access.

CURRENT_SCHEMA_VERSION module-attribute

CURRENT_SCHEMA_VERSION = 1

Schema version for evolution result serialization.

Incremented when the result schema changes in a way that requires migration logic in from_dict().
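A minimal sketch of the migration guard this note describes. The helper name `load_result` and the `best_score` field are illustrative, not the real `from_dict()` implementation:

```python
# Illustrative sketch (not the real from_dict): reject results written
# by a schema version newer than this code understands.
CURRENT_SCHEMA_VERSION = 1  # mirrors the package constant

def load_result(data: dict) -> dict:
    version = data.get("schema_version", CURRENT_SCHEMA_VERSION)
    if version > CURRENT_SCHEMA_VERSION:
        raise ValueError(
            f"result schema v{version} is newer than supported "
            f"v{CURRENT_SCHEMA_VERSION}"
        )
    # When the schema bumps to v2, migration branches would go here.
    return data

load_result({"schema_version": 1, "best_score": 0.9})  # passes through
```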

DEFAULT_COMPONENT_NAME module-attribute

DEFAULT_COMPONENT_NAME: ComponentName = 'instruction'

Default component name for single-component evolution.

This constant provides a single source of truth for the default component name used when evolving a single component (typically an agent's instruction). Use this constant instead of hardcoding 'instruction' throughout the codebase.

ComponentName module-attribute

ComponentName: TypeAlias = str

Name of a candidate component (e.g., 'instruction', 'output_schema').

ModelName module-attribute

ModelName: TypeAlias = str

Model identifier (e.g., 'gemini-2.5-flash', 'gpt-4o').

Score module-attribute

Score: TypeAlias = float

Normalized score, typically in [0.0, 1.0].

CriticOutput

Bases: BaseModel



Advanced schema for structured critic feedback with dimensions.

This schema defines the expected JSON structure that critic agents should return when configured with output_schema. The score field is required, while other fields are optional and will be preserved in metadata.

ATTRIBUTE DESCRIPTION
score

Score value between 0.0 and 1.0 (required).

TYPE: float

feedback

Human-readable feedback text (optional).

TYPE: str

dimension_scores

Per-dimension evaluation scores (optional).

TYPE: dict[str, float]

actionable_guidance

Specific improvement suggestions (optional).

TYPE: str

Examples:

Advanced critic output:

{
    "score": 0.75,
    "feedback": "Good response but could be more concise",
    "dimension_scores": {
        "accuracy": 0.9,
        "clarity": 0.6,
        "completeness": 0.8
    },
    "actionable_guidance": "Reduce response length by 30%"
}
Note

All critic agents using this schema must return structured JSON. When this schema is used as output_schema on an LlmAgent, the agent can ONLY reply and CANNOT use any tools. This is acceptable for critic agents focused on scoring.

See Also

gepa_adk.adapters.scoring.critic_scorer.SimpleCriticOutput: KISS schema with just score + feedback.
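The required/optional semantics above can be sketched with a stdlib-only parser. `parse_critic_output` is a hypothetical helper for illustration, not part of the gepa-adk API (the package uses the Pydantic model shown below):

```python
# Stdlib sketch of the schema semantics: score is required and bounded,
# the remaining fields default when absent.
def parse_critic_output(data: dict) -> dict:
    score = data["score"]  # required; KeyError if missing
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"score {score} outside [0.0, 1.0]")
    return {
        "score": score,
        "feedback": data.get("feedback", ""),                  # optional
        "dimension_scores": data.get("dimension_scores", {}),  # optional
        "actionable_guidance": data.get("actionable_guidance", ""),
    }

parse_critic_output({"score": 0.75, "feedback": "Good but verbose"})
```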

Source code in src/gepa_adk/adapters/scoring/critic_scorer.py
class CriticOutput(BaseModel):
    """Advanced schema for structured critic feedback with dimensions.

    This schema defines the expected JSON structure that critic agents
    should return when configured with output_schema. The score field is
    required, while other fields are optional and will be preserved in
    metadata.

    Attributes:
        score (float): Score value between 0.0 and 1.0 (required).
        feedback (str): Human-readable feedback text (optional).
        dimension_scores (dict[str, float]): Per-dimension evaluation scores (optional).
        actionable_guidance (str): Specific improvement suggestions (optional).

    Examples:
        Advanced critic output:

        ```json
        {
            "score": 0.75,
            "feedback": "Good response but could be more concise",
            "dimension_scores": {
                "accuracy": 0.9,
                "clarity": 0.6,
                "completeness": 0.8
            },
            "actionable_guidance": "Reduce response length by 30%"
        }
        ```

    Note:
        All critic agents using this schema must return structured JSON.
        When this schema is used as output_schema on an LlmAgent, the
        agent can ONLY reply and CANNOT use any tools. This is acceptable
        for critic agents focused on scoring.

    See Also:
        [gepa_adk.adapters.scoring.critic_scorer.SimpleCriticOutput][]:
            KISS schema with just score + feedback.
    """

    score: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Score from 0.0 to 1.0",
    )
    feedback: str = Field(
        default="",
        description="Human-readable feedback",
    )
    dimension_scores: dict[str, float] = Field(
        default_factory=dict,
        description="Per-dimension scores",
    )
    actionable_guidance: str = Field(
        default="",
        description="Improvement suggestions",
    )

SimpleCriticOutput

Bases: BaseModel



KISS schema for basic critic feedback.

This is the minimal schema for critic agents that only need to provide a score and text feedback. Use this for straightforward evaluation tasks where dimension breakdowns are not needed.

ATTRIBUTE DESCRIPTION
score

Score value between 0.0 and 1.0 (required).

TYPE: float

feedback

Human-readable feedback text (required).

TYPE: str

Examples:

Simple critic output:

{
    "score": 0.75,
    "feedback": "Good response but could be more concise."
}

Using with LlmAgent:

from google.adk.agents import LlmAgent
from gepa_adk.adapters.scoring.critic_scorer import SimpleCriticOutput

critic = LlmAgent(
    name="simple_critic",
    model="gemini-2.5-flash",
    instruction=SIMPLE_CRITIC_INSTRUCTION,
    output_schema=SimpleCriticOutput,
)
Note

Applies to basic evaluation tasks where only a score and feedback are needed. For more detailed evaluations with dimension scores, use CriticOutput instead.

See Also

gepa_adk.adapters.scoring.critic_scorer.CriticOutput: Advanced schema with dimension scores and guidance.

Source code in src/gepa_adk/adapters/scoring/critic_scorer.py
class SimpleCriticOutput(BaseModel):
    """KISS schema for basic critic feedback.

    This is the minimal schema for critic agents that only need to provide
    a score and text feedback. Use this for straightforward evaluation tasks
    where dimension breakdowns are not needed.

    Attributes:
        score (float): Score value between 0.0 and 1.0 (required).
        feedback (str): Human-readable feedback text (required).

    Examples:
        Simple critic output:

        ```json
        {
            "score": 0.75,
            "feedback": "Good response but could be more concise."
        }
        ```

        Using with LlmAgent:

        ```python
        from google.adk.agents import LlmAgent
        from gepa_adk.adapters.scoring.critic_scorer import SimpleCriticOutput

        critic = LlmAgent(
            name="simple_critic",
            model="gemini-2.5-flash",
            instruction=SIMPLE_CRITIC_INSTRUCTION,
            output_schema=SimpleCriticOutput,
        )
        ```

    Note:
        Applies to basic evaluation tasks where only a score and feedback
        are needed. For more detailed evaluations with dimension scores,
        use CriticOutput instead.

    See Also:
        [gepa_adk.adapters.scoring.critic_scorer.CriticOutput][]:
            Advanced schema with dimension scores and guidance.
    """

    score: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Score from 0.0 to 1.0",
    )
    feedback: str = Field(
        ...,
        description="Human-readable feedback explaining the score",
    )

AllComponentSelector

Selects all available components for simultaneous update.

This selector returns the full list of components every time, enabling simultaneous evolution of all parts of the candidate.

Examples:

selector = AllComponentSelector()
all_comps = await selector.select_components(["a", "b"], 1, 0)
# Returns ["a", "b"]
Note

Always returns all components, enabling comprehensive mutations across the entire candidate in a single iteration.

Source code in src/gepa_adk/adapters/selection/component_selector.py
class AllComponentSelector:
    """Selects all available components for simultaneous update.

    This selector returns the full list of components every time, enabling
    simultaneous evolution of all parts of the candidate.

    Examples:
        ```python
        selector = AllComponentSelector()
        all_comps = await selector.select_components(["a", "b"], 1, 0)
        # Returns ["a", "b"]
        ```

    Note:
        Always returns all components, enabling comprehensive mutations
        across the entire candidate in a single iteration.
    """

    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        """Select all components to update.

        Args:
            components: List of available component keys.
            iteration: Current global iteration number (unused).
            candidate_idx: Index of the candidate being evolved (unused).

        Returns:
            List containing all component keys.

        Raises:
            ValueError: If components list is empty.

        Examples:
            ```python
            selected = await selector.select_components(["a", "b"], 1, 0)
            ```

        Note:
            Outputs the complete component list unchanged, enabling
            simultaneous evolution of all candidate parts.
        """
        if not components:
            raise ValueError("No components provided for selection")

        return list(components)

select_components async

select_components(
    components: list[str],
    iteration: int,
    candidate_idx: int,
) -> list[str]

Select all components to update.

PARAMETER DESCRIPTION
components

List of available component keys.

TYPE: list[str]

iteration

Current global iteration number (unused).

TYPE: int

candidate_idx

Index of the candidate being evolved (unused).

TYPE: int

RETURNS DESCRIPTION
list[str]

List containing all component keys.

RAISES DESCRIPTION
ValueError

If components list is empty.

Examples:

selected = await selector.select_components(["a", "b"], 1, 0)
Note

Outputs the complete component list unchanged, enabling simultaneous evolution of all candidate parts.

Source code in src/gepa_adk/adapters/selection/component_selector.py
async def select_components(
    self, components: list[str], iteration: int, candidate_idx: int
) -> list[str]:
    """Select all components to update.

    Args:
        components: List of available component keys.
        iteration: Current global iteration number (unused).
        candidate_idx: Index of the candidate being evolved (unused).

    Returns:
        List containing all component keys.

    Raises:
        ValueError: If components list is empty.

    Examples:
        ```python
        selected = await selector.select_components(["a", "b"], 1, 0)
        ```

    Note:
        Outputs the complete component list unchanged, enabling
        simultaneous evolution of all candidate parts.
    """
    if not components:
        raise ValueError("No components provided for selection")

    return list(components)

RoundRobinComponentSelector

Selects components in a round-robin fashion.

This selector cycles through the list of components one by one, maintaining state per candidate index to ensure consistent rotation.

ATTRIBUTE DESCRIPTION
_next_index

Mapping of candidate_idx to next component index.

TYPE: dict[int, int]

Examples:

selector = RoundRobinComponentSelector()
# First call selects first component
c1 = await selector.select_components(["a", "b"], 1, 0)  # ["a"]
# Second call selects second component
c2 = await selector.select_components(["a", "b"], 2, 0)  # ["b"]
Note

Alternates through components sequentially, ensuring balanced evolution across all candidate parts.

Source code in src/gepa_adk/adapters/selection/component_selector.py
class RoundRobinComponentSelector:
    """Selects components in a round-robin fashion.

    This selector cycles through the list of components one by one, maintaining
    state per candidate index to ensure consistent rotation.

    Attributes:
        _next_index (dict[int, int]): Mapping of candidate_idx to next component index.

    Examples:
        ```python
        selector = RoundRobinComponentSelector()
        # First call selects first component
        c1 = await selector.select_components(["a", "b"], 1, 0)  # ["a"]
        # Second call selects second component
        c2 = await selector.select_components(["a", "b"], 2, 0)  # ["b"]
        ```

    Note:
        Alternates through components sequentially, ensuring balanced evolution
        across all candidate parts.
    """

    def __init__(self) -> None:
        """Initialize the round-robin selector.

        Note:
            Creates empty index tracking dictionary for per-candidate rotation state.
        """
        self._next_index: dict[int, int] = defaultdict(int)

    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        """Select a single component to update using round-robin logic.

        Args:
            components: List of available component keys.
            iteration: Current global iteration number (unused by this strategy).
            candidate_idx: Index of the candidate being evolved.

        Returns:
            List containing the single selected component key.

        Raises:
            ValueError: If components list is empty.

        Examples:
            ```python
            selected = await selector.select_components(["a", "b"], 1, 0)
            ```

        Note:
            Outputs one component per call, advancing the rotation index for
            the specified candidate.
        """
        if not components:
            raise ValueError("No components provided for selection")

        # Get current index for this candidate
        current_idx = self._next_index[candidate_idx]

        # Select component
        selected = components[current_idx % len(components)]

        # Advance index for next time
        self._next_index[candidate_idx] = (current_idx + 1) % len(components)

        return [selected]

__init__

__init__() -> None

Initialize the round-robin selector.

Note

Creates empty index tracking dictionary for per-candidate rotation state.

Source code in src/gepa_adk/adapters/selection/component_selector.py
def __init__(self) -> None:
    """Initialize the round-robin selector.

    Note:
        Creates empty index tracking dictionary for per-candidate rotation state.
    """
    self._next_index: dict[int, int] = defaultdict(int)

select_components async

select_components(
    components: list[str],
    iteration: int,
    candidate_idx: int,
) -> list[str]

Select a single component to update using round-robin logic.

PARAMETER DESCRIPTION
components

List of available component keys.

TYPE: list[str]

iteration

Current global iteration number (unused by this strategy).

TYPE: int

candidate_idx

Index of the candidate being evolved.

TYPE: int

RETURNS DESCRIPTION
list[str]

List containing the single selected component key.

RAISES DESCRIPTION
ValueError

If components list is empty.

Examples:

selected = await selector.select_components(["a", "b"], 1, 0)
Note

Outputs one component per call, advancing the rotation index for the specified candidate.

Source code in src/gepa_adk/adapters/selection/component_selector.py
async def select_components(
    self, components: list[str], iteration: int, candidate_idx: int
) -> list[str]:
    """Select a single component to update using round-robin logic.

    Args:
        components: List of available component keys.
        iteration: Current global iteration number (unused by this strategy).
        candidate_idx: Index of the candidate being evolved.

    Returns:
        List containing the single selected component key.

    Raises:
        ValueError: If components list is empty.

    Examples:
        ```python
        selected = await selector.select_components(["a", "b"], 1, 0)
        ```

    Note:
        Outputs one component per call, advancing the rotation index for
        the specified candidate.
    """
    if not components:
        raise ValueError("No components provided for selection")

    # Get current index for this candidate
    current_idx = self._next_index[candidate_idx]

    # Select component
    selected = components[current_idx % len(components)]

    # Advance index for next time
    self._next_index[candidate_idx] = (current_idx + 1) % len(components)

    return [selected]

Candidate dataclass

Represents an instruction candidate being evolved.

Unlike GEPA's simple dict[str, str] type alias, this class provides richer state tracking for async scenarios including lineage and metadata.

ATTRIBUTE DESCRIPTION
components

Component name to text value mapping. Common keys include 'instruction' (main agent prompt) and 'output_schema'.

TYPE: dict[str, str]

generation

Generation number in the evolution lineage (0 = initial).

TYPE: int

parent_id

ID of the parent candidate for lineage tracking (legacy field, retained for compatibility).

TYPE: str | None

parent_ids

Multi-parent indices for merge operations. None for seed candidates, [single_idx] for mutations, [idx1, idx2] for merges.

TYPE: list[int] | None

metadata

Extensible metadata dict for async tracking and debugging.

TYPE: dict[str, Any]

Examples:

Creating a candidate:

from gepa_adk.domain.models import Candidate

candidate = Candidate(
    components={"instruction": "Be helpful"},
    generation=0,
)
print(candidate.components["instruction"])  # Be helpful
print(candidate.generation)  # 0
Note

A mutable candidate representation with richer state tracking than GEPA's simple dict. Components and metadata can be modified during the evolution process. Use generation and parent_id to track lineage.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, kw_only=True)
class Candidate:
    """Represents an instruction candidate being evolved.

    Unlike GEPA's simple `dict[str, str]` type alias, this class provides
    richer state tracking for async scenarios including lineage and metadata.

    Attributes:
        components (dict[str, str]): Component name to text value mapping.
            Common keys include 'instruction' (main agent prompt) and
            'output_schema'.
        generation (int): Generation number in the evolution lineage
            (0 = initial).
        parent_id (str | None): ID of the parent candidate for lineage
            tracking (legacy field, retained for compatibility).
        parent_ids (list[int] | None): Multi-parent indices for merge operations.
            None for seed candidates, [single_idx] for mutations, [idx1, idx2] for merges.
        metadata (dict[str, Any]): Extensible metadata dict for async tracking
            and debugging.

    Examples:
        Creating a candidate:

        ```python
        from gepa_adk.domain.models import Candidate

        candidate = Candidate(
            components={"instruction": "Be helpful"},
            generation=0,
        )
        print(candidate.components["instruction"])  # Be helpful
        print(candidate.generation)  # 0
        ```

    Note:
        A mutable candidate representation with richer state tracking than
        GEPA's simple dict. Components and metadata can be modified during
        the evolution process. Use generation and parent_id to track lineage.
    """

    components: dict[str, str] = field(default_factory=dict)
    generation: int = 0
    parent_id: str | None = None
    parent_ids: list[int] | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

ConfigurationError

Bases: EvolutionError



Raised when configuration validation fails.

This exception is raised during EvolutionConfig initialization when a parameter violates its validation constraints.

ATTRIBUTE DESCRIPTION
field

The name of the configuration field that failed validation.

TYPE: str | None

value

The invalid value that was provided.

TYPE: object

constraint

Description of the validation constraint that was violated.

TYPE: str | None

Examples:

Creating a configuration error with context:

from gepa_adk.domain.exceptions import ConfigurationError

error = ConfigurationError(
    "max_iterations must be non-negative",
    field="max_iterations",
    value=-5,
    constraint=">= 0",
)
print(error.field, error.value, error.constraint)
# Output: max_iterations -5 >= 0
Note

Arises from user-provided invalid settings, not programming errors. Should be caught and reported with clear guidance on valid values.
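The catch-and-report pattern this note recommends can be sketched with a minimal stand-in class mirroring the documented attributes (not the real import, so the snippet is self-contained):

```python
# Minimal stand-in mirroring ConfigurationError's documented attributes.
class ConfigurationError(Exception):
    def __init__(self, message, *, field=None, value=None, constraint=None):
        super().__init__(message)
        self.field = field
        self.value = value
        self.constraint = constraint

try:
    raise ConfigurationError(
        "max_iterations must be non-negative",
        field="max_iterations", value=-5, constraint=">= 0",
    )
except ConfigurationError as e:
    # Report the offending field alongside the valid range.
    print(f"{e} (got {e.field}={e.value!r}, expected {e.constraint})")
```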

Source code in src/gepa_adk/domain/exceptions.py
class ConfigurationError(EvolutionError):
    """Raised when configuration validation fails.

    This exception is raised during EvolutionConfig initialization
    when a parameter violates its validation constraints.

    Attributes:
        field (str | None): The name of the configuration field that failed
            validation.
        value (object): The invalid value that was provided.
        constraint (str | None): Description of the validation constraint that
            was violated.

    Examples:
        Creating a configuration error with context:

        ```python
        from gepa_adk.domain.exceptions import ConfigurationError

        error = ConfigurationError(
            "max_iterations must be non-negative",
            field="max_iterations",
            value=-5,
            constraint=">= 0",
        )
        print(error.field, error.value, error.constraint)
        # Output: max_iterations -5 >= 0
        ```

    Note:
        Arises from user-provided invalid settings, not programming errors.
        Should be caught and reported with clear guidance on valid values.
    """

    def __init__(
        self,
        message: str,
        *,
        field: str | None = None,
        value: object = None,
        constraint: str | None = None,
    ) -> None:
        """Initialize ConfigurationError with context.

        Args:
            message: Human-readable error description.
            field: Name of the invalid configuration field.
            value: The invalid value provided.
            constraint: Description of the validation constraint.

        Note:
            Context fields use keyword-only syntax to ensure explicit labeling
            and prevent positional argument mistakes.
        """
        super().__init__(message)
        self.field = field
        self.value = value
        self.constraint = constraint

    def __str__(self) -> str:
        """Return string representation with context.

        Returns:
            Formatted error message including field and value context.

        Note:
            Outputs formatted error message with field and value context
            when available, preserving base message structure.
        """
        base = super().__str__()
        context_parts = []
        if self.field is not None:
            context_parts.append(f"field={self.field!r}")
        if self.value is not None:
            context_parts.append(f"value={self.value!r}")
        if context_parts:
            return f"{base} [{', '.join(context_parts)}]"
        return base

__init__

__init__(
    message: str,
    *,
    field: str | None = None,
    value: object = None,
    constraint: str | None = None,
) -> None

Initialize ConfigurationError with context.

PARAMETER DESCRIPTION
message

Human-readable error description.

TYPE: str

field

Name of the invalid configuration field.

TYPE: str | None DEFAULT: None

value

The invalid value provided.

TYPE: object DEFAULT: None

constraint

Description of the validation constraint.

TYPE: str | None DEFAULT: None

Note

Context fields use keyword-only syntax to ensure explicit labeling and prevent positional argument mistakes.

Source code in src/gepa_adk/domain/exceptions.py
def __init__(
    self,
    message: str,
    *,
    field: str | None = None,
    value: object = None,
    constraint: str | None = None,
) -> None:
    """Initialize ConfigurationError with context.

    Args:
        message: Human-readable error description.
        field: Name of the invalid configuration field.
        value: The invalid value provided.
        constraint: Description of the validation constraint.

    Note:
        Context fields use keyword-only syntax to ensure explicit labeling
        and prevent positional argument mistakes.
    """
    super().__init__(message)
    self.field = field
    self.value = value
    self.constraint = constraint

__str__

__str__() -> str

Return string representation with context.

RETURNS DESCRIPTION
str

Formatted error message including field and value context.

Note

Outputs formatted error message with field and value context when available, preserving base message structure.

Source code in src/gepa_adk/domain/exceptions.py
def __str__(self) -> str:
    """Return string representation with context.

    Returns:
        Formatted error message including field and value context.

    Note:
        Outputs formatted error message with field and value context
        when available, preserving base message structure.
    """
    base = super().__str__()
    context_parts = []
    if self.field is not None:
        context_parts.append(f"field={self.field!r}")
    if self.value is not None:
        context_parts.append(f"value={self.value!r}")
    if context_parts:
        return f"{base} [{', '.join(context_parts)}]"
    return base

EvolutionConfig dataclass

Configuration parameters for an evolution run.

Defines the parameters that control how evolution proceeds, including iteration limits, concurrency settings, and stopping criteria.

ATTRIBUTE DESCRIPTION
max_iterations

Maximum number of evolution iterations. 0 means just evaluate baseline without evolving.

TYPE: int

max_concurrent_evals

Number of concurrent batch evaluations. Must be at least 1.

TYPE: int

min_improvement_threshold

Minimum score improvement to accept a new candidate. Set to 0.0 to accept any improvement.

TYPE: float

patience

Number of iterations without improvement before stopping early. Set to 0 to disable early stopping.

TYPE: int

reflection_model

Model identifier for reflection/mutation operations.

TYPE: str

frontier_type

Frontier tracking strategy for Pareto selection (default: INSTANCE).

TYPE: FrontierType

acceptance_metric

Aggregation method for acceptance decisions on iteration evaluation batches. "sum" uses sum of scores (default, aligns with upstream GEPA). "mean" uses mean of scores (legacy behavior).

TYPE: Literal['sum', 'mean']

use_merge

Enable merge proposals for genetic crossover. Defaults to False.

TYPE: bool

max_merge_invocations

Maximum number of merge attempts per run. Defaults to 10. Must be non-negative.

TYPE: int

reflection_prompt

Custom reflection/mutation prompt template. If provided, this template is used instead of the default when the reflection model proposes improved text. Required placeholders:

- {component_text}: The current component text being evolved
- {trials}: Trial data with feedback and trajectory for each test case

If None or empty string, the default prompt template is used.

TYPE: str | None

stop_callbacks

List of stopper callbacks for custom stop conditions. Each callback receives a StopperState and returns True to signal stop. Defaults to an empty list.

TYPE: list[StopperProtocol]

seed

Random seed for deterministic engine decisions. When set, a seeded random.Random is created and shared across all stochastic components (candidate selector, merge proposer). None (default) preserves current random behavior.

TYPE: int | None

Examples:

Creating a configuration with defaults:

from gepa_adk.domain.models import EvolutionConfig

config = EvolutionConfig(max_iterations=100, patience=10)
print(config.max_iterations)  # 100
print(config.reflection_model)  # ollama_chat/gpt-oss:20b
Note

All numeric parameters are validated in __post_init__ to ensure they meet their constraints. Cross-field consistency is also checked (e.g., use_merge requires max_merge_invocations > 0, stop_callbacks must be callable). Invalid values raise ConfigurationError.

Determinism applies to engine decisions only (candidate selection, component selection, merge proposals). LLM inference is inherently stochastic and not covered by the seed guarantee.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, kw_only=True)
class EvolutionConfig:
    """Configuration parameters for an evolution run.

    Defines the parameters that control how evolution proceeds, including
    iteration limits, concurrency settings, and stopping criteria.

    Attributes:
        max_iterations (int): Maximum number of evolution iterations. 0 means
            just evaluate baseline without evolving.
        max_concurrent_evals (int): Number of concurrent batch evaluations.
            Must be at least 1.
        min_improvement_threshold (float): Minimum score improvement to accept
            a new candidate. Set to 0.0 to accept any improvement.
        patience (int): Number of iterations without improvement before stopping
            early. Set to 0 to disable early stopping.
        reflection_model (str): Model identifier for reflection/mutation
            operations.
        frontier_type (FrontierType): Frontier tracking strategy for Pareto
            selection (default: INSTANCE).
        acceptance_metric (Literal["sum", "mean"]): Aggregation method for
            acceptance decisions on iteration evaluation batches. "sum" uses
            sum of scores (default, aligns with upstream GEPA). "mean" uses
            mean of scores (legacy behavior).
        use_merge (bool): Enable merge proposals for genetic crossover.
            Defaults to False.
        max_merge_invocations (int): Maximum number of merge attempts per run.
            Defaults to 10. Must be non-negative.
        reflection_prompt (str | None): Custom reflection/mutation prompt template.
            If provided, this template is used instead of the default when the
            reflection model proposes improved text. Required placeholders:
            - {component_text}: The current component text being evolved
            - {trials}: Trial data with feedback and trajectory for each test case
            If None or empty string, the default prompt template is used.
        stop_callbacks (list[StopperProtocol]): List of stopper callbacks for
            custom stop conditions. Each callback receives a StopperState and
            returns True to signal stop. Defaults to an empty list.
        seed (int | None): Random seed for deterministic engine decisions.
            When set, a seeded ``random.Random`` is created and shared across
            all stochastic components (candidate selector, merge proposer).
            ``None`` (default) preserves current random behavior.

    Examples:
        Creating a configuration with defaults:

        ```python
        from gepa_adk.domain.models import EvolutionConfig

        config = EvolutionConfig(max_iterations=100, patience=10)
        print(config.max_iterations)  # 100
        print(config.reflection_model)  # ollama_chat/gpt-oss:20b
        ```

    Note:
        All numeric parameters are validated in __post_init__ to ensure
        they meet their constraints. Cross-field consistency is also checked
        (e.g., use_merge requires max_merge_invocations > 0, stop_callbacks
        must be callable). Invalid values raise ConfigurationError.

        Determinism applies to engine decisions only (candidate selection,
        component selection, merge proposals). LLM inference is inherently
        stochastic and not covered by the seed guarantee.
    """

    max_iterations: int = 50
    max_concurrent_evals: int = 5
    min_improvement_threshold: float = 0.01
    patience: int = 5
    reflection_model: str = "ollama_chat/gpt-oss:20b"
    frontier_type: FrontierType = FrontierType.INSTANCE
    acceptance_metric: Literal["sum", "mean"] = "sum"
    use_merge: bool = False
    max_merge_invocations: int = 10
    reflection_prompt: str | None = None
    stop_callbacks: list["StopperProtocol"] = field(default_factory=list)
    seed: int | None = None

    def __post_init__(self) -> None:
        """Validate configuration parameters after initialization.

        Raises:
            ConfigurationError: If any parameter violates its constraints,
                including non-finite floats (NaN, Inf), cross-field consistency
                rules (e.g., use_merge requires max_merge_invocations > 0,
                stop_callbacks must be callable).

        Note:
            Operates automatically after dataclass __init__ completes. Validates
            all fields including finite-float checks, cross-field consistency,
            and raises ConfigurationError with context on failure.
        """
        if self.max_iterations < 0:
            raise ConfigurationError(
                "max_iterations must be non-negative",
                field="max_iterations",
                value=self.max_iterations,
                constraint=">= 0",
            )

        if self.max_concurrent_evals < 1:
            raise ConfigurationError(
                "max_concurrent_evals must be at least 1",
                field="max_concurrent_evals",
                value=self.max_concurrent_evals,
                constraint=">= 1",
            )

        if not math.isfinite(self.min_improvement_threshold):
            raise ConfigurationError(
                "min_improvement_threshold must be a finite number",
                field="min_improvement_threshold",
                value=self.min_improvement_threshold,
                constraint="finite float",
            )

        if self.min_improvement_threshold < 0.0:
            raise ConfigurationError(
                "min_improvement_threshold must be non-negative",
                field="min_improvement_threshold",
                value=self.min_improvement_threshold,
                constraint=">= 0.0",
            )

        if self.patience < 0:
            raise ConfigurationError(
                "patience must be non-negative",
                field="patience",
                value=self.patience,
                constraint=">= 0",
            )

        if not self.reflection_model:
            raise ConfigurationError(
                "reflection_model must be a non-empty string",
                field="reflection_model",
                value=self.reflection_model,
                constraint="non-empty string",
            )

        if not isinstance(self.frontier_type, FrontierType):
            try:
                self.frontier_type = FrontierType(self.frontier_type)
            except ValueError as exc:
                raise ConfigurationError(
                    "frontier_type must be a supported FrontierType value",
                    field="frontier_type",
                    value=self.frontier_type,
                    constraint=", ".join(t.value for t in FrontierType),
                ) from exc

        if self.acceptance_metric not in ("sum", "mean"):
            raise ConfigurationError(
                "acceptance_metric must be 'sum' or 'mean'",
                field="acceptance_metric",
                value=self.acceptance_metric,
                constraint="sum|mean",
            )

        if self.max_merge_invocations < 0:
            raise ConfigurationError(
                "max_merge_invocations must be non-negative",
                field="max_merge_invocations",
                value=self.max_merge_invocations,
                constraint=">= 0",
            )

        # Cross-field consistency checks
        self._validate_consistency()

        # Validate reflection_prompt if provided
        self._validate_reflection_prompt()

    def _validate_consistency(self) -> None:
        """Validate cross-field consistency rules.

        Raises:
            ConfigurationError: If use_merge is True but max_merge_invocations
                is zero, or if stop_callbacks contains non-callable items.

        Note:
            Hard errors raise ConfigurationError; soft issues log warnings.
            Called from __post_init__ after individual field validation.
        """
        if self.use_merge and self.max_merge_invocations == 0:
            raise ConfigurationError(
                "use_merge=True requires max_merge_invocations > 0",
                field="max_merge_invocations",
                value=self.max_merge_invocations,
                constraint="> 0 when use_merge=True",
            )

        if (
            self.patience > 0
            and self.max_iterations > 0
            and self.patience > self.max_iterations
        ):
            logger.warning(
                "config.patience.exceeds_max_iterations",
                patience=self.patience,
                max_iterations=self.max_iterations,
                message="patience exceeds max_iterations; early stopping will never trigger",
            )

        for i, callback in enumerate(self.stop_callbacks):
            if not callable(callback):
                raise ConfigurationError(
                    f"stop_callbacks[{i}] is not callable",
                    field=f"stop_callbacks[{i}]",
                    value=type(callback).__name__,
                    constraint="must be callable",
                )

    def _validate_reflection_prompt(self) -> None:
        """Validate reflection_prompt and handle empty string.

        Converts empty string to None with info log. Warns if required
        placeholders are missing but allows the config to be created.

        Note:
            Soft validation approach - missing placeholders trigger warnings
            but don't prevent config creation for maximum flexibility.
        """
        # Handle empty string as "use default"
        if self.reflection_prompt == "":
            logger.info(
                "config.reflection_prompt.empty",
                message="Empty reflection_prompt provided, using default template",
            )
            # Normalize the empty string to None via object.__setattr__
            # (also keeps working if the class is ever made frozen)
            object.__setattr__(self, "reflection_prompt", None)
            return

        # Skip validation if None
        if self.reflection_prompt is None:
            return

        # Warn about missing placeholders
        if "{component_text}" not in self.reflection_prompt:
            logger.warning(
                "config.reflection_prompt.missing_placeholder",
                placeholder="component_text",
                message="reflection_prompt is missing {component_text} placeholder",
            )

        if "{trials}" not in self.reflection_prompt:
            logger.warning(
                "config.reflection_prompt.missing_placeholder",
                placeholder="trials",
                message="reflection_prompt is missing {trials} placeholder",
            )

__post_init__

__post_init__() -> None

Validate configuration parameters after initialization.

RAISES DESCRIPTION
ConfigurationError

If any parameter violates its constraints, including non-finite floats (NaN, Inf) and cross-field consistency rules (e.g., use_merge requires max_merge_invocations > 0; stop_callbacks must be callable).

Note

Operates automatically after dataclass __init__ completes. Validates all fields including finite-float checks, cross-field consistency, and raises ConfigurationError with context on failure.
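The dedicated finite-float check matters because every ordered comparison with NaN evaluates to False, so a plain `< 0.0` guard alone would silently accept NaN. A quick stdlib illustration:

```python
import math

nan = float("nan")

# Ordered comparisons with NaN are always False, so a `< 0.0` guard
# alone would let NaN through; math.isfinite rejects NaN and +/-inf.
print(nan < 0.0)                     # False
print(nan >= 0.0)                    # False
print(math.isfinite(nan))            # False
print(math.isfinite(float("inf")))   # False
print(math.isfinite(0.01))           # True
```

This is why min_improvement_threshold is checked with math.isfinite before the non-negativity check.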

Source code in src/gepa_adk/domain/models.py
def __post_init__(self) -> None:
    """Validate configuration parameters after initialization.

    Raises:
        ConfigurationError: If any parameter violates its constraints,
            including non-finite floats (NaN, Inf), cross-field consistency
            rules (e.g., use_merge requires max_merge_invocations > 0,
            stop_callbacks must be callable).

    Note:
        Operates automatically after dataclass __init__ completes. Validates
        all fields including finite-float checks, cross-field consistency,
        and raises ConfigurationError with context on failure.
    """
    if self.max_iterations < 0:
        raise ConfigurationError(
            "max_iterations must be non-negative",
            field="max_iterations",
            value=self.max_iterations,
            constraint=">= 0",
        )

    if self.max_concurrent_evals < 1:
        raise ConfigurationError(
            "max_concurrent_evals must be at least 1",
            field="max_concurrent_evals",
            value=self.max_concurrent_evals,
            constraint=">= 1",
        )

    if not math.isfinite(self.min_improvement_threshold):
        raise ConfigurationError(
            "min_improvement_threshold must be a finite number",
            field="min_improvement_threshold",
            value=self.min_improvement_threshold,
            constraint="finite float",
        )

    if self.min_improvement_threshold < 0.0:
        raise ConfigurationError(
            "min_improvement_threshold must be non-negative",
            field="min_improvement_threshold",
            value=self.min_improvement_threshold,
            constraint=">= 0.0",
        )

    if self.patience < 0:
        raise ConfigurationError(
            "patience must be non-negative",
            field="patience",
            value=self.patience,
            constraint=">= 0",
        )

    if not self.reflection_model:
        raise ConfigurationError(
            "reflection_model must be a non-empty string",
            field="reflection_model",
            value=self.reflection_model,
            constraint="non-empty string",
        )

    if not isinstance(self.frontier_type, FrontierType):
        try:
            self.frontier_type = FrontierType(self.frontier_type)
        except ValueError as exc:
            raise ConfigurationError(
                "frontier_type must be a supported FrontierType value",
                field="frontier_type",
                value=self.frontier_type,
                constraint=", ".join(t.value for t in FrontierType),
            ) from exc

    if self.acceptance_metric not in ("sum", "mean"):
        raise ConfigurationError(
            "acceptance_metric must be 'sum' or 'mean'",
            field="acceptance_metric",
            value=self.acceptance_metric,
            constraint="sum|mean",
        )

    if self.max_merge_invocations < 0:
        raise ConfigurationError(
            "max_merge_invocations must be non-negative",
            field="max_merge_invocations",
            value=self.max_merge_invocations,
            constraint=">= 0",
        )

    # Cross-field consistency checks
    self._validate_consistency()

    # Validate reflection_prompt if provided
    self._validate_reflection_prompt()

EvolutionError

Bases: Exception



Base exception for all gepa-adk errors.

All custom exceptions in gepa-adk should inherit from this class to allow for unified exception handling.

Examples:

Catching evolution errors:

from gepa_adk.domain.exceptions import EvolutionError

try:
    raise EvolutionError("Evolution failed unexpectedly")
except EvolutionError as e:
    print(f"Caught: {e}")
# Output: Caught: Evolution failed unexpectedly
Note

Always use this base class or its subclasses for domain errors. Standard Python exceptions should still be raised for programming errors (e.g., TypeError, ValueError for developer mistakes).
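The unified-handling pattern can be sketched with stand-in classes; the snippet below is an illustration only (the ConfigurationError context fields mirror the field/value/constraint keywords shown in the source listings on this page, but the classes here are local stand-ins, not imports from gepa_adk):

```python
# Stand-ins mirroring the gepa-adk exception hierarchy (illustration only).
class EvolutionError(Exception):
    """Base exception for all gepa-adk errors."""


class ConfigurationError(EvolutionError):
    """Raised when configuration validation fails."""

    def __init__(self, message, *, field=None, value=None, constraint=None):
        super().__init__(message)
        self.field = field
        self.value = value
        self.constraint = constraint


def run():
    # A subclass error raised somewhere deep in the library...
    raise ConfigurationError(
        "max_iterations must be non-negative",
        field="max_iterations",
        value=-1,
        constraint=">= 0",
    )


# ...is caught uniformly via the base class at the call site.
try:
    run()
except EvolutionError as e:
    print(f"Caught: {e}")
```

Catching EvolutionError at an application boundary handles every domain error in one place, while TypeError/ValueError still surface programming mistakes separately.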

Source code in src/gepa_adk/domain/exceptions.py
class EvolutionError(Exception):
    """Base exception for all gepa-adk errors.

    All custom exceptions in gepa-adk should inherit from this class
    to allow for unified exception handling.

    Examples:
        Catching evolution errors:

        ```python
        from gepa_adk.domain.exceptions import EvolutionError

        try:
            raise EvolutionError("Evolution failed unexpectedly")
        except EvolutionError as e:
            print(f"Caught: {e}")
        # Output: Caught: Evolution failed unexpectedly
        ```

    Note:
        Always use this base class or its subclasses for domain errors.
        Standard Python exceptions should still be raised for programming
        errors (e.g., TypeError, ValueError for developer mistakes).
    """

EvolutionResult dataclass

Outcome of a completed evolution run.

Contains the final results after evolution completes, including all evolved component values, performance metrics, and full history.

ATTRIBUTE DESCRIPTION
schema_version

Schema version for forward-compatible serialization. Always CURRENT_SCHEMA_VERSION for newly created results.

TYPE: int

stop_reason

Why the evolution run terminated. Defaults to StopReason.COMPLETED.

TYPE: StopReason

original_score

Starting performance score (baseline).

TYPE: float

final_score

Ending performance score (best achieved).

TYPE: float

evolved_components

Dictionary mapping component names to their final evolved text values. Keys include "instruction" and optionally "output_schema" or other components. Access individual components via result.evolved_components["instruction"].

TYPE: dict[str, str]

iteration_history

Chronological list of iteration records.

TYPE: list[IterationRecord]

total_iterations

Number of iterations performed.

TYPE: int

valset_score

Score on validation set used for acceptance decisions. None if no validation set was used.

TYPE: float | None

trainset_score

Score on trainset used for reflection diagnostics. None if not computed.

TYPE: float | None

objective_scores

Optional per-example multi-objective scores from the best candidate's final evaluation. None when no objective scores were tracked. Each dict maps objective name to score value. Index-aligned with evaluation batch examples.

TYPE: list[dict[str, float]] | None

original_components

Optional snapshot of pre-evolution component values. When present, enables zero-arg show_diff() calls. None for results created before this field was added or when originals were not captured.

TYPE: dict[str, str] | None

reflection_reasoning

Read-only property returning the reflection reasoning from the last iteration. Convenience accessor; None if no iterations or last iteration has no reasoning.

TYPE: str | None

Examples:

Creating and analyzing a result:

from gepa_adk.domain.models import EvolutionResult, IterationRecord

result = EvolutionResult(
    original_score=0.60,
    final_score=0.85,
    evolved_components={"instruction": "Be helpful and concise"},
    original_components={"instruction": "Be helpful"},
    iteration_history=[],
    total_iterations=10,
)
print(result.improvement)  # 0.25
print(result.show_diff())  # unified diff of instruction changes

Serialization round-trip:

import json

d = result.to_dict()
json_str = json.dumps(d)
restored = EvolutionResult.from_dict(json.loads(json_str))
Note

As a frozen dataclass, EvolutionResult instances cannot be modified.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, frozen=True, kw_only=True)
class EvolutionResult:
    """Outcome of a completed evolution run.

    Contains the final results after evolution completes, including
    all evolved component values, performance metrics, and full history.

    Attributes:
        schema_version (int): Schema version for forward-compatible serialization.
            Always ``CURRENT_SCHEMA_VERSION`` for newly created results.
        stop_reason (StopReason): Why the evolution run terminated. Defaults to
            ``StopReason.COMPLETED``.
        original_score (float): Starting performance score (baseline).
        final_score (float): Ending performance score (best achieved).
        evolved_components (dict[str, str]): Dictionary mapping component names
            to their final evolved text values. Keys include "instruction" and
            optionally "output_schema" or other components. Access individual
            components via ``result.evolved_components["instruction"]``.
        iteration_history (list[IterationRecord]): Chronological list of
            iteration records.
        total_iterations (int): Number of iterations performed.
        valset_score (float | None): Score on validation set used for
            acceptance decisions. None if no validation set was used.
        trainset_score (float | None): Score on trainset used for reflection
            diagnostics. None if not computed.
        objective_scores (list[dict[str, float]] | None): Optional per-example
            multi-objective scores from the best candidate's final evaluation.
            None when no objective scores were tracked. Each dict maps objective
            name to score value. Index-aligned with evaluation batch examples.
        original_components (dict[str, str] | None): Optional snapshot of
            pre-evolution component values. When present, enables zero-arg
            ``show_diff()`` calls. None for results created before this field
            was added or when originals were not captured.
        reflection_reasoning (str | None): Read-only property returning the
            reflection reasoning from the last iteration. Convenience
            accessor; None if no iterations or last iteration has no reasoning.

    Examples:
        Creating and analyzing a result:

        ```python
        from gepa_adk.domain.models import EvolutionResult, IterationRecord

        result = EvolutionResult(
            original_score=0.60,
            final_score=0.85,
            evolved_components={"instruction": "Be helpful and concise"},
            original_components={"instruction": "Be helpful"},
            iteration_history=[],
            total_iterations=10,
        )
        print(result.improvement)  # 0.25
        print(result.show_diff())  # unified diff of instruction changes
        ```

        Serialization round-trip:

        ```python
        import json

        d = result.to_dict()
        json_str = json.dumps(d)
        restored = EvolutionResult.from_dict(json.loads(json_str))
        ```

    Note:
        As a frozen dataclass, EvolutionResult instances cannot be modified.
    """

    schema_version: int = CURRENT_SCHEMA_VERSION
    stop_reason: StopReason = StopReason.COMPLETED
    original_score: float
    final_score: float
    evolved_components: dict[str, str]
    iteration_history: list[IterationRecord]
    total_iterations: int
    valset_score: float | None = None
    trainset_score: float | None = None
    objective_scores: list[dict[str, float]] | None = None
    original_components: dict[str, str] | None = None

    @property
    def reflection_reasoning(self) -> str | None:
        """Return the reflection reasoning from the last iteration.

        Convenience accessor for the most recent iteration's reasoning
        explaining why the reflection agent proposed its mutation.

        Returns:
            The reasoning string from the last iteration record, or None
            if no iterations exist or the last iteration has no reasoning.
        """
        if not self.iteration_history:
            return None
        return self.iteration_history[-1].reflection_reasoning

    def to_dict(self) -> dict[str, Any]:
        """Serialize this result to a stdlib-only dict.

        Returns:
            Dict containing all fields. ``stop_reason`` is serialized
            as its string value. ``iteration_history`` is serialized as a
            list of dicts. Output is directly ``json.dumps()``-compatible.
        """
        return {
            "schema_version": self.schema_version,
            "stop_reason": self.stop_reason.value,
            "original_score": self.original_score,
            "final_score": self.final_score,
            "evolved_components": self.evolved_components,
            "iteration_history": [r.to_dict() for r in self.iteration_history],
            "total_iterations": self.total_iterations,
            "valset_score": self.valset_score,
            "trainset_score": self.trainset_score,
            "objective_scores": self.objective_scores,
            "original_components": self.original_components,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "EvolutionResult":
        """Reconstruct an EvolutionResult from a dict.

        Validates schema version, applies migration if needed, and
        reconstructs all nested objects including optional
        original_components.

        Args:
            data: Dict containing evolution result fields.

        Returns:
            Reconstructed EvolutionResult instance.

        Raises:
            ConfigurationError: If ``schema_version`` exceeds the current
                version or ``stop_reason`` is not a valid enum value.
            KeyError: If a required field is missing from the dict.
        """
        version = data.get("schema_version", 1)
        if version > CURRENT_SCHEMA_VERSION:
            raise ConfigurationError(
                f"Cannot deserialize result with schema_version {version} "
                f"(current version is {CURRENT_SCHEMA_VERSION}). "
                f"Upgrade gepa-adk to load this result.",
                field="schema_version",
                value=version,
                constraint=f"<= {CURRENT_SCHEMA_VERSION}",
            )
        migrated = _migrate_result_dict(data, from_version=version)
        try:
            stop_reason = StopReason(migrated.get("stop_reason", "completed"))
        except ValueError:
            raw = migrated.get("stop_reason")
            raise ConfigurationError(
                f"Invalid stop_reason value: {raw!r}",
                field="stop_reason",
                value=raw,
                constraint=("one of: " + ", ".join(sr.value for sr in StopReason)),
            ) from None
        return cls(
            schema_version=migrated["schema_version"],
            stop_reason=stop_reason,
            original_score=migrated["original_score"],
            final_score=migrated["final_score"],
            evolved_components=migrated["evolved_components"],
            iteration_history=[
                IterationRecord.from_dict(r)
                for r in migrated.get("iteration_history", [])
            ],
            total_iterations=migrated["total_iterations"],
            valset_score=migrated.get("valset_score"),
            trainset_score=migrated.get("trainset_score"),
            objective_scores=migrated.get("objective_scores"),
            original_components=migrated.get("original_components"),
        )

    @property
    def improvement(self) -> float:
        """Calculate the score improvement from original to final.

        Returns:
            The difference between final_score and original_score.
            Positive values indicate improvement, negative indicates degradation.

        Note:
            Computed on access, not stored; properties work normally on
            frozen dataclasses since only field assignment is blocked.
        """
        return self.final_score - self.original_score

    @property
    def improved(self) -> bool:
        """Check if the final score is better than the original.

        Returns:
            True if final_score > original_score, False otherwise.

        Note:
            Only returns True for strict improvement, not equal scores.
        """
        return self.final_score > self.original_score

    def __repr__(self) -> str:
        """Narrative summary of the evolution result.

        Returns:
            Human-readable multi-line summary with improvement percentage,
            iterations, stop reason, component names, and acceptance rate.
            Uses 2-space indent, no box-drawing characters, every line
            greppable.
        """
        if abs(self.original_score) < 1e-9:
            imp_str = f"{self.improvement:+.4f} improvement"
        else:
            pct = self.improvement * 100 / abs(self.original_score)
            sign = "+" if pct > 0 else ""
            imp_str = f"{sign}{pct:.1f}% improvement"
        lines = [
            f"EvolutionResult: {imp_str} "
            f"({self.original_score:.2f} \u2192 {self.final_score:.2f})",
            f"  iterations: {self.total_iterations}, "
            f"stop_reason: {self.stop_reason.value}",
            f"  components: {', '.join(sorted(self.evolved_components))}",
        ]
        if self.total_iterations > 0:
            accepted = sum(1 for r in self.iteration_history if r.accepted)
            lines.append(f"  acceptance_rate: {accepted}/{self.total_iterations}")
        return "\n".join(lines)

    def show_diff(self, original_components: dict[str, str] | None = None) -> str:
        """Show unified diff between original and evolved components.

        Uses stored ``original_components`` if no explicit argument is
        provided. Produces git-diff-style output (``---``/``+++``/``@@``)
        for each component that changed.

        Args:
            original_components: Pre-evolution component values. If None,
                falls back to ``self.original_components``.

        Returns:
            Unified diff string, or ``"No changes detected."`` if all
            components are identical.

        Raises:
            ValueError: If both the argument and ``self.original_components``
                are None.
        """
        originals = (
            original_components
            if original_components is not None
            else self.original_components
        )
        if originals is None:
            raise ValueError(
                "No original components available. "
                "Pass original_components or use a result that stores them."
            )
        return _build_diff(self.evolved_components, originals)

    def _repr_html_(self) -> str:
        """Render an HTML summary for Jupyter notebooks.

        Returns:
            HTML string with summary and components tables. Uses semantic
            ``<th>`` header elements and inline CSS for portability across
            JupyterLab, Colab, and VS Code. Iteration history is wrapped
            in a collapsible ``<details>``/``<summary>`` block.
        """
        if abs(self.original_score) < 1e-9:
            imp_html = f"{self.improvement:+.4f}"
        else:
            pct = self.improvement * 100 / abs(self.original_score)
            sign = "+" if pct > 0 else ""
            imp_html = f"{sign}{pct:.1f}%"
        style = (
            'style="border-collapse:collapse;'
            "font-family:monospace;font-size:13px;"
            'margin:8px 0"'
        )
        td = 'style="padding:4px 12px;border:1px solid #ddd"'
        th = 'style="padding:4px 12px;border:1px solid #ddd;background:#f5f5f5"'

        rows = [
            f"<tr><th {th}>Improvement</th><td {td}>{imp_html}</td></tr>",
            f"<tr><th {th}>Original Score</th>"
            f"<td {td}>{self.original_score:.4f}</td></tr>",
            f"<tr><th {th}>Final Score</th><td {td}>{self.final_score:.4f}</td></tr>",
            f"<tr><th {th}>Iterations</th><td {td}>{self.total_iterations}</td></tr>",
            f"<tr><th {th}>Stop Reason</th>"
            f"<td {td}>{html_mod.escape(self.stop_reason.value)}</td></tr>",
        ]
        summary_table = f"<table {style}>{''.join(rows)}</table>"

        comp_rows = []
        for name in sorted(self.evolved_components):
            val = html_mod.escape(_truncate(self.evolved_components[name], 200))
            comp_rows.append(
                f"<tr><td {td}>{html_mod.escape(name)}</td><td {td}>{val}</td></tr>"
            )
        comp_header = f"<tr><th {th}>Component</th><th {th}>Evolved Value</th></tr>"
        comp_table = f"<table {style}>{comp_header}{''.join(comp_rows)}</table>"

        history_rows = []
        for rec in self.iteration_history:
            accepted_mark = "\u2713" if rec.accepted else ""
            history_rows.append(
                f"<tr><td {td}>{rec.iteration_number}</td>"
                f"<td {td}>{rec.score:.4f}</td>"
                f"<td {td}>{html_mod.escape(rec.evolved_component)}</td>"
                f"<td {td}>{accepted_mark}</td></tr>"
            )
        history_header = (
            f"<tr><th {th}>#</th><th {th}>Score</th>"
            f"<th {th}>Component</th><th {th}>Accepted</th></tr>"
        )
        history_table = (
            f"<table {style}>{history_header}{''.join(history_rows)}</table>"
        )
        history_section = (
            "<details><summary>Iteration History "
            f"({len(self.iteration_history)} records)</summary>"
            f"{history_table}</details>"
        )

        return (
            "<div>"
            "<strong>EvolutionResult</strong>"
            f"{summary_table}{comp_table}{history_section}"
            "</div>"
        )

reflection_reasoning property

reflection_reasoning: str | None

Return the reflection reasoning from the last iteration.

Convenience accessor for the most recent iteration's reasoning explaining why the reflection agent proposed its mutation.

RETURNS DESCRIPTION
str | None

The reasoning string from the last iteration record, or None if no iterations exist or the last iteration has no reasoning.

improvement property

improvement: float

Calculate the score improvement from original to final.

RETURNS DESCRIPTION
float

The difference between final_score and original_score. Positive values indicate improvement, negative indicates degradation.

Note

Override is not needed since frozen dataclasses support properties.

improved property

improved: bool

Check if the final score is better than the original.

RETURNS DESCRIPTION
bool

True if final_score > original_score, False otherwise.

Note

Only returns True for strict improvement, not equal scores.

to_dict

to_dict() -> dict[str, Any]

Serialize this result to a stdlib-only dict.

RETURNS DESCRIPTION
dict[str, Any]

Dict containing all fields. stop_reason is serialized as its string value. iteration_history is serialized as a list of dicts. Output is directly json.dumps()-compatible.

Source code in src/gepa_adk/domain/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialize this result to a stdlib-only dict.

    Returns:
        Dict containing all fields. ``stop_reason`` is serialized
        as its string value. ``iteration_history`` is serialized as a
        list of dicts. Output is directly ``json.dumps()``-compatible.
    """
    return {
        "schema_version": self.schema_version,
        "stop_reason": self.stop_reason.value,
        "original_score": self.original_score,
        "final_score": self.final_score,
        "evolved_components": self.evolved_components,
        "iteration_history": [r.to_dict() for r in self.iteration_history],
        "total_iterations": self.total_iterations,
        "valset_score": self.valset_score,
        "trainset_score": self.trainset_score,
        "objective_scores": self.objective_scores,
        "original_components": self.original_components,
    }

from_dict classmethod

from_dict(data: dict[str, Any]) -> EvolutionResult

Reconstruct an EvolutionResult from a dict.

Validates schema version, applies migration if needed, and reconstructs all nested objects including optional original_components.

PARAMETER DESCRIPTION
data

Dict containing evolution result fields.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
EvolutionResult

Reconstructed EvolutionResult instance.

RAISES DESCRIPTION
ConfigurationError

If schema_version exceeds the current version or stop_reason is not a valid enum value.

KeyError

If a required field is missing from the dict.

Source code in src/gepa_adk/domain/models.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "EvolutionResult":
    """Reconstruct an EvolutionResult from a dict.

    Validates schema version, applies migration if needed, and
    reconstructs all nested objects including optional
    original_components.

    Args:
        data: Dict containing evolution result fields.

    Returns:
        Reconstructed EvolutionResult instance.

    Raises:
        ConfigurationError: If ``schema_version`` exceeds the current
            version or ``stop_reason`` is not a valid enum value.
        KeyError: If a required field is missing from the dict.
    """
    version = data.get("schema_version", 1)
    if version > CURRENT_SCHEMA_VERSION:
        raise ConfigurationError(
            f"Cannot deserialize result with schema_version {version} "
            f"(current version is {CURRENT_SCHEMA_VERSION}). "
            f"Upgrade gepa-adk to load this result.",
            field="schema_version",
            value=version,
            constraint=f"<= {CURRENT_SCHEMA_VERSION}",
        )
    migrated = _migrate_result_dict(data, from_version=version)
    try:
        stop_reason = StopReason(migrated.get("stop_reason", "completed"))
    except ValueError:
        raw = migrated.get("stop_reason")
        raise ConfigurationError(
            f"Invalid stop_reason value: {raw!r}",
            field="stop_reason",
            value=raw,
            constraint=("one of: " + ", ".join(sr.value for sr in StopReason)),
        ) from None
    return cls(
        schema_version=migrated["schema_version"],
        stop_reason=stop_reason,
        original_score=migrated["original_score"],
        final_score=migrated["final_score"],
        evolved_components=migrated["evolved_components"],
        iteration_history=[
            IterationRecord.from_dict(r)
            for r in migrated.get("iteration_history", [])
        ],
        total_iterations=migrated["total_iterations"],
        valset_score=migrated.get("valset_score"),
        trainset_score=migrated.get("trainset_score"),
        objective_scores=migrated.get("objective_scores"),
        original_components=migrated.get("original_components"),
    )
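The version guard at the top of `from_dict` is the standard forward-compatibility check: data written by a newer library version is rejected outright rather than misread. A standalone sketch of the same pattern (`CURRENT_VERSION` and the plain `ValueError` stand in for `CURRENT_SCHEMA_VERSION` and `ConfigurationError`):

```python
CURRENT_VERSION = 1  # stands in for CURRENT_SCHEMA_VERSION


def check_schema_version(data: dict) -> int:
    # A missing version key means the original (v1) schema.
    version = data.get("schema_version", 1)
    if version > CURRENT_VERSION:
        raise ValueError(
            f"Cannot deserialize schema_version {version} "
            f"(current version is {CURRENT_VERSION})."
        )
    return version


assert check_schema_version({}) == 1
rejected = False
try:
    check_schema_version({"schema_version": 99})
except ValueError:
    rejected = True
assert rejected
```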

__repr__

__repr__() -> str

Narrative summary of the evolution result.

RETURNS DESCRIPTION
str

Human-readable multi-line summary with improvement percentage, iterations, stop reason, component names, and acceptance rate. Uses 2-space indent, no box-drawing characters, every line greppable.

Source code in src/gepa_adk/domain/models.py
def __repr__(self) -> str:
    """Narrative summary of the evolution result.

    Returns:
        Human-readable multi-line summary with improvement percentage,
        iterations, stop reason, component names, and acceptance rate.
        Uses 2-space indent, no box-drawing characters, every line
        greppable.
    """
    if abs(self.original_score) < 1e-9:
        imp_str = f"{self.improvement:+.4f} improvement"
    else:
        pct = self.improvement * 100 / abs(self.original_score)
        sign = "+" if pct > 0 else ""
        imp_str = f"{sign}{pct:.1f}% improvement"
    lines = [
        f"EvolutionResult: {imp_str} "
        f"({self.original_score:.2f} \u2192 {self.final_score:.2f})",
        f"  iterations: {self.total_iterations}, "
        f"stop_reason: {self.stop_reason.value}",
        f"  components: {', '.join(sorted(self.evolved_components))}",
    ]
    if self.total_iterations > 0:
        accepted = sum(1 for r in self.iteration_history if r.accepted)
        lines.append(f"  acceptance_rate: {accepted}/{self.total_iterations}")
    return "\n".join(lines)
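The `1e-9` guard in `__repr__` avoids dividing by a near-zero baseline: percentage improvement is undefined when `original_score` is effectively 0, so the absolute delta is shown instead. That formatting logic, extracted into a hypothetical helper:

```python
def format_improvement(original: float, final: float) -> str:
    improvement = final - original
    if abs(original) < 1e-9:
        # Percentage is meaningless against a ~0 baseline; show the raw delta.
        return f"{improvement:+.4f} improvement"
    pct = improvement * 100 / abs(original)
    sign = "+" if pct > 0 else ""
    return f"{sign}{pct:.1f}% improvement"


assert format_improvement(0.0, 0.5) == "+0.5000 improvement"
assert format_improvement(0.60, 0.85) == "+41.7% improvement"
```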

show_diff

show_diff(
    original_components: dict[str, str] | None = None,
) -> str

Show unified diff between original and evolved components.

Uses stored original_components if no explicit argument is provided. Produces git-diff-style output (---/+++/@@) for each component that changed.

PARAMETER DESCRIPTION
original_components

Pre-evolution component values. If None, falls back to self.original_components.

TYPE: dict[str, str] | None DEFAULT: None

RETURNS DESCRIPTION
str

Unified diff string, or "No changes detected." if all components are identical.

RAISES DESCRIPTION
ValueError

If both the argument and self.original_components are None.

Source code in src/gepa_adk/domain/models.py
def show_diff(self, original_components: dict[str, str] | None = None) -> str:
    """Show unified diff between original and evolved components.

    Uses stored ``original_components`` if no explicit argument is
    provided. Produces git-diff-style output (``---``/``+++``/``@@``)
    for each component that changed.

    Args:
        original_components: Pre-evolution component values. If None,
            falls back to ``self.original_components``.

    Returns:
        Unified diff string, or ``"No changes detected."`` if all
        components are identical.

    Raises:
        ValueError: If both the argument and ``self.original_components``
            are None.
    """
    originals = (
        original_components
        if original_components is not None
        else self.original_components
    )
    if originals is None:
        raise ValueError(
            "No original components available. "
            "Pass original_components or use a result that stores them."
        )
    return _build_diff(self.evolved_components, originals)
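`_build_diff` is an internal helper, but its documented output (`---`/`+++`/`@@` hunks per changed component, or the "No changes detected." fallback) matches what `difflib.unified_diff` produces. A minimal, hypothetical equivalent:

```python
import difflib


def build_diff(evolved: dict[str, str], originals: dict[str, str]) -> str:
    chunks = []
    for name in sorted(evolved):
        before = originals.get(name, "")
        after = evolved[name]
        if before == after:
            continue  # unchanged components produce no hunk
        diff = difflib.unified_diff(
            before.splitlines(keepends=True),
            after.splitlines(keepends=True),
            fromfile=f"{name} (original)",
            tofile=f"{name} (evolved)",
        )
        chunks.append("".join(diff))
    return "\n".join(chunks) if chunks else "No changes detected."


assert build_diff({"instruction": "Be helpful"}, {"instruction": "Be helpful"}) == "No changes detected."
assert "+++" in build_diff({"instruction": "Be very helpful"}, {"instruction": "Be helpful"})
```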

FrontierType

Bases: str, Enum



Supported frontier tracking strategies for Pareto selection.

Note

All four frontier types enable different Pareto dominance tracking strategies for multi-objective optimization.

Examples:

frontier_type = FrontierType.INSTANCE
Source code in src/gepa_adk/domain/types.py
class FrontierType(str, Enum):
    """Supported frontier tracking strategies for Pareto selection.

    Note:
        All four frontier types enable different Pareto dominance tracking
        strategies for multi-objective optimization.

    Examples:
        ```python
        frontier_type = FrontierType.INSTANCE
        ```
    """

    INSTANCE = "instance"
    OBJECTIVE = "objective"
    HYBRID = "hybrid"
    CARTESIAN = "cartesian"

IterationRecord dataclass

Captures metrics for a single evolution iteration.

This is an immutable record of what happened during one iteration of the evolution process. Records are created by the engine and stored in EvolutionResult.iteration_history.

ATTRIBUTE DESCRIPTION
iteration_number

1-indexed iteration number for human readability.

TYPE: int

score

Score achieved in this iteration (typically in [0.0, 1.0]).

TYPE: float

component_text

The component_text that was evaluated in this iteration (e.g., the instruction text for the "instruction" component).

TYPE: str

evolved_component

The name of the component that was evolved in this iteration (e.g., "instruction", "output_schema"). Used for tracking which component changed in round-robin evolution strategies.

TYPE: str

accepted

Whether this proposal was accepted as the new best.

TYPE: bool

objective_scores

Optional per-example multi-objective scores from the valset evaluation. None when adapter does not provide objective scores. Each dict maps objective name to score value. Index-aligned with evaluation batch examples.

TYPE: list[dict[str, float]] | None

reflection_reasoning

Optional natural language reasoning from the reflection agent explaining why the mutation was proposed. None when reasoning is not available (e.g., model without thinking support or older data without this field).

TYPE: str | None

Examples:

Creating an iteration record:

from gepa_adk.domain.models import IterationRecord

record = IterationRecord(
    iteration_number=1,
    score=0.85,
    component_text="Be helpful",
    evolved_component="instruction",
    accepted=True,
)
print(record.score)  # 0.85
print(record.evolved_component)  # "instruction"
print(record.accepted)  # True

Serialization round-trip:

d = record.to_dict()
restored = IterationRecord.from_dict(d)
assert restored.score == record.score
Note

An immutable record that captures iteration metrics. Once created, IterationRecord instances cannot be modified, ensuring historical accuracy of the evolution trace.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, frozen=True, kw_only=True)
class IterationRecord:
    """Captures metrics for a single evolution iteration.

    This is an immutable record of what happened during one iteration
    of the evolution process. Records are created by the engine and
    stored in EvolutionResult.iteration_history.

    Attributes:
        iteration_number (int): 1-indexed iteration number for human
            readability.
        score (float): Score achieved in this iteration (typically in
            [0.0, 1.0]).
        component_text (str): The component_text that was evaluated in this
            iteration (e.g., the instruction text for the "instruction" component).
        evolved_component (str): The name of the component that was evolved
            in this iteration (e.g., "instruction", "output_schema"). Used for
            tracking which component changed in round-robin evolution strategies.
        accepted (bool): Whether this proposal was accepted as the new best.
        objective_scores (list[dict[str, float]] | None): Optional per-example
            multi-objective scores from the valset evaluation. None when adapter
            does not provide objective scores. Each dict maps objective name to
            score value. Index-aligned with evaluation batch examples.
        reflection_reasoning (str | None): Optional natural language reasoning
            from the reflection agent explaining why the mutation was proposed.
            None when reasoning is not available (e.g., model without thinking
            support or older data without this field).

    Examples:
        Creating an iteration record:

        ```python
        from gepa_adk.domain.models import IterationRecord

        record = IterationRecord(
            iteration_number=1,
            score=0.85,
            component_text="Be helpful",
            evolved_component="instruction",
            accepted=True,
        )
        print(record.score)  # 0.85
        print(record.evolved_component)  # "instruction"
        print(record.accepted)  # True
        ```

        Serialization round-trip:

        ```python
        d = record.to_dict()
        restored = IterationRecord.from_dict(d)
        assert restored.score == record.score
        ```

    Note:
        An immutable record that captures iteration metrics. Once created,
        IterationRecord instances cannot be modified, ensuring historical
        accuracy of the evolution trace.
    """

    iteration_number: int
    score: float
    component_text: str
    evolved_component: str
    accepted: bool
    objective_scores: list[dict[str, float]] | None = None
    reflection_reasoning: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize this record to a stdlib-only dict.

        Returns:
            Dict containing all 7 fields. Output is directly
            ``json.dumps()``-compatible.
        """
        return {
            "iteration_number": self.iteration_number,
            "score": self.score,
            "component_text": self.component_text,
            "evolved_component": self.evolved_component,
            "accepted": self.accepted,
            "objective_scores": self.objective_scores,
            "reflection_reasoning": self.reflection_reasoning,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "IterationRecord":
        """Reconstruct an IterationRecord from a dict.

        Unknown keys are silently ignored for forward compatibility,
        allowing older code to load records produced by newer versions.
        Optional fields (``objective_scores``, ``reflection_reasoning``)
        default to None when missing from the input dict.

        Args:
            data: Dict containing iteration record fields.

        Returns:
            Reconstructed IterationRecord instance.

        Raises:
            KeyError: If a required field is missing from the dict.
        """
        return cls(
            iteration_number=data["iteration_number"],
            score=data["score"],
            component_text=data["component_text"],
            evolved_component=data["evolved_component"],
            accepted=data["accepted"],
            objective_scores=data.get("objective_scores"),
            reflection_reasoning=data.get("reflection_reasoning"),
        )

to_dict

to_dict() -> dict[str, Any]

Serialize this record to a stdlib-only dict.

RETURNS DESCRIPTION
dict[str, Any]

Dict containing all 7 fields. Output is directly json.dumps()-compatible.

Source code in src/gepa_adk/domain/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialize this record to a stdlib-only dict.

    Returns:
        Dict containing all 7 fields. Output is directly
        ``json.dumps()``-compatible.
    """
    return {
        "iteration_number": self.iteration_number,
        "score": self.score,
        "component_text": self.component_text,
        "evolved_component": self.evolved_component,
        "accepted": self.accepted,
        "objective_scores": self.objective_scores,
        "reflection_reasoning": self.reflection_reasoning,
    }

from_dict classmethod

from_dict(data: dict[str, Any]) -> IterationRecord

Reconstruct an IterationRecord from a dict.

Unknown keys are silently ignored for forward compatibility, allowing older code to load records produced by newer versions. Optional fields (objective_scores, reflection_reasoning) default to None when missing from the input dict.

PARAMETER DESCRIPTION
data

Dict containing iteration record fields.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
IterationRecord

Reconstructed IterationRecord instance.

RAISES DESCRIPTION
KeyError

If a required field is missing from the dict.

Source code in src/gepa_adk/domain/models.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "IterationRecord":
    """Reconstruct an IterationRecord from a dict.

    Unknown keys are silently ignored for forward compatibility,
    allowing older code to load records produced by newer versions.
    Optional fields (``objective_scores``, ``reflection_reasoning``)
    default to None when missing from the input dict.

    Args:
        data: Dict containing iteration record fields.

    Returns:
        Reconstructed IterationRecord instance.

    Raises:
        KeyError: If a required field is missing from the dict.
    """
    return cls(
        iteration_number=data["iteration_number"],
        score=data["score"],
        component_text=data["component_text"],
        evolved_component=data["evolved_component"],
        accepted=data["accepted"],
        objective_scores=data.get("objective_scores"),
        reflection_reasoning=data.get("reflection_reasoning"),
    )

MultiAgentEvolutionResult dataclass

Outcome of a completed multi-agent evolution run.

Contains evolved component_text for all agents in the group, along with performance metrics and evolution history.

ATTRIBUTE DESCRIPTION
schema_version

Schema version for forward-compatible serialization.

TYPE: int

stop_reason

Why the evolution run terminated.

TYPE: StopReason

evolved_components

Mapping of agent name to evolved component_text.

TYPE: dict[str, str]

original_score

Starting performance score (baseline).

TYPE: float

final_score

Ending performance score (best achieved).

TYPE: float

primary_agent

Name of the agent whose output was used for scoring.

TYPE: str

iteration_history

Chronological list of iteration records.

TYPE: list[IterationRecord]

total_iterations

Number of iterations performed.

TYPE: int

original_components

Optional snapshot of pre-evolution component values. When present, enables zero-arg show_diff() calls. None for results created before this field was added or when originals were not captured.

TYPE: dict[str, str] | None

Examples:

Creating and analyzing a multi-agent result:

from gepa_adk.domain.models import MultiAgentEvolutionResult, IterationRecord

result = MultiAgentEvolutionResult(
    evolved_components={
        "generator": "Generate high-quality code",
        "critic": "Review code thoroughly",
    },
    original_components={
        "generator": "Generate code",
        "critic": "Review code",
    },
    original_score=0.60,
    final_score=0.85,
    primary_agent="generator",
    iteration_history=[],
    total_iterations=10,
)
print(result.improvement)  # 0.25
print(result.show_diff())  # unified diff of component changes
print(result.agent_names)  # ["critic", "generator"]
assert result.schema_version == 1

Serialization round-trip:

import json

d = result.to_dict()
restored = MultiAgentEvolutionResult.from_dict(json.loads(json.dumps(d)))
Note

An immutable result container for multi-agent evolution. Once created, MultiAgentEvolutionResult instances cannot be modified. Use computed properties like improvement, improved, and agent_names to analyze results without modifying the underlying data.

Source code in src/gepa_adk/domain/models.py
@dataclass(slots=True, frozen=True, kw_only=True)
class MultiAgentEvolutionResult:
    """Outcome of a completed multi-agent evolution run.

    Contains evolved component_text for all agents in the group,
    along with performance metrics and evolution history.

    Attributes:
        schema_version (int): Schema version for forward-compatible serialization.
        stop_reason (StopReason): Why the evolution run terminated.
        evolved_components (dict[str, str]): Mapping of agent name to evolved
            component_text.
        original_score (float): Starting performance score (baseline).
        final_score (float): Ending performance score (best achieved).
        primary_agent (str): Name of the agent whose output was used for scoring.
        iteration_history (list[IterationRecord]): Chronological list of iteration records.
        total_iterations (int): Number of iterations performed.
        original_components (dict[str, str] | None): Optional snapshot of
            pre-evolution component values. When present, enables zero-arg
            ``show_diff()`` calls. None for results created before this field
            was added or when originals were not captured.

    Examples:
        Creating and analyzing a multi-agent result:

        ```python
        from gepa_adk.domain.models import MultiAgentEvolutionResult, IterationRecord

        result = MultiAgentEvolutionResult(
            evolved_components={
                "generator": "Generate high-quality code",
                "critic": "Review code thoroughly",
            },
            original_components={
                "generator": "Generate code",
                "critic": "Review code",
            },
            original_score=0.60,
            final_score=0.85,
            primary_agent="generator",
            iteration_history=[],
            total_iterations=10,
        )
        print(result.improvement)  # 0.25
        print(result.show_diff())  # unified diff of component changes
        print(result.agent_names)  # ["critic", "generator"]
        assert result.schema_version == 1
        ```

        Serialization round-trip:

        ```python
        import json

        d = result.to_dict()
        restored = MultiAgentEvolutionResult.from_dict(json.loads(json.dumps(d)))
        ```

    Note:
        An immutable result container for multi-agent evolution. Once created,
        MultiAgentEvolutionResult instances cannot be modified. Use computed
        properties like `improvement`, `improved`, and `agent_names` to analyze
        results without modifying the underlying data.
    """

    schema_version: int = CURRENT_SCHEMA_VERSION
    stop_reason: StopReason = StopReason.COMPLETED
    evolved_components: dict[str, str]
    original_score: float
    final_score: float
    primary_agent: str
    iteration_history: list[IterationRecord]
    total_iterations: int
    original_components: dict[str, str] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize this result to a stdlib-only dict.

        Returns:
            Dict containing all 9 fields. ``stop_reason`` is serialized
            as its string value. ``iteration_history`` is serialized as a
            list of dicts. Output is directly ``json.dumps()``-compatible.
        """
        return {
            "schema_version": self.schema_version,
            "stop_reason": self.stop_reason.value,
            "evolved_components": self.evolved_components,
            "original_score": self.original_score,
            "final_score": self.final_score,
            "primary_agent": self.primary_agent,
            "iteration_history": [r.to_dict() for r in self.iteration_history],
            "total_iterations": self.total_iterations,
            "original_components": self.original_components,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MultiAgentEvolutionResult":
        """Reconstruct a MultiAgentEvolutionResult from a dict.

        Validates schema version, applies migration if needed, and
        reconstructs all nested objects including optional
        original_components.

        Args:
            data: Dict containing multi-agent evolution result fields.

        Returns:
            Reconstructed MultiAgentEvolutionResult instance.

        Raises:
            ConfigurationError: If ``schema_version`` exceeds the current
                version or ``stop_reason`` is not a valid enum value.
            KeyError: If a required field is missing from the dict.
        """
        version = data.get("schema_version", 1)
        if version > CURRENT_SCHEMA_VERSION:
            raise ConfigurationError(
                f"Cannot deserialize result with schema_version {version} "
                f"(current version is {CURRENT_SCHEMA_VERSION}). "
                f"Upgrade gepa-adk to load this result.",
                field="schema_version",
                value=version,
                constraint=f"<= {CURRENT_SCHEMA_VERSION}",
            )
        migrated = _migrate_result_dict(data, from_version=version)
        try:
            stop_reason = StopReason(migrated.get("stop_reason", "completed"))
        except ValueError:
            raw = migrated.get("stop_reason")
            raise ConfigurationError(
                f"Invalid stop_reason value: {raw!r}",
                field="stop_reason",
                value=raw,
                constraint=("one of: " + ", ".join(sr.value for sr in StopReason)),
            ) from None
        return cls(
            schema_version=migrated["schema_version"],
            stop_reason=stop_reason,
            evolved_components=migrated["evolved_components"],
            original_score=migrated["original_score"],
            final_score=migrated["final_score"],
            primary_agent=migrated["primary_agent"],
            iteration_history=[
                IterationRecord.from_dict(r)
                for r in migrated.get("iteration_history", [])
            ],
            total_iterations=migrated["total_iterations"],
            original_components=migrated.get("original_components"),
        )

    @property
    def improvement(self) -> float:
        """Calculate the score improvement from original to final.

        Returns:
            The difference between final_score and original_score.
            Positive values indicate improvement, negative indicates degradation.

        Note:
            Override is not needed since frozen dataclasses support properties.
        """
        return self.final_score - self.original_score

    @property
    def improved(self) -> bool:
        """Check if the final score is better than the original.

        Returns:
            True if final_score > original_score, False otherwise.

        Note:
            Only returns True for strict improvement, not equal scores.
        """
        return self.final_score > self.original_score

    @property
    def agent_names(self) -> list[str]:
        """Get sorted list of evolved agent names.

        Returns:
            Sorted list of agent names from evolved_components keys.

        Note:
            Outputs a new list each time, sorted alphabetically for
            consistent ordering regardless of insertion order.
        """
        return sorted(self.evolved_components.keys())

    def __repr__(self) -> str:
        """Narrative summary of the multi-agent evolution result.

        Returns:
            Human-readable multi-line summary with improvement percentage,
            iterations, stop reason, primary agent, agent names, and
            acceptance rate. Uses 2-space indent, no box-drawing characters,
            every line greppable.
        """
        if abs(self.original_score) < 1e-9:
            imp_str = f"{self.improvement:+.4f} improvement"
        else:
            pct = self.improvement * 100 / abs(self.original_score)
            sign = "+" if pct > 0 else ""
            imp_str = f"{sign}{pct:.1f}% improvement"
        lines = [
            f"MultiAgentEvolutionResult: {imp_str} "
            f"({self.original_score:.2f} \u2192 {self.final_score:.2f})",
            f"  iterations: {self.total_iterations}, "
            f"stop_reason: {self.stop_reason.value}",
            f"  primary_agent: {self.primary_agent}",
            f"  agents: {', '.join(sorted(self.evolved_components))}",
        ]
        if self.total_iterations > 0:
            accepted = sum(1 for r in self.iteration_history if r.accepted)
            lines.append(f"  acceptance_rate: {accepted}/{self.total_iterations}")
        return "\n".join(lines)

    def show_diff(self, original_components: dict[str, str] | None = None) -> str:
        """Show unified diff between original and evolved components.

        Uses stored ``original_components`` if no explicit argument is
        provided. Produces git-diff-style output (``---``/``+++``/``@@``)
        for each component that changed.

        Args:
            original_components: Pre-evolution component values. If None,
                falls back to ``self.original_components``.

        Returns:
            Unified diff string, or ``"No changes detected."`` if all
            components are identical.

        Raises:
            ValueError: If both the argument and ``self.original_components``
                are None.
        """
        originals = (
            original_components
            if original_components is not None
            else self.original_components
        )
        if originals is None:
            raise ValueError(
                "No original components available. "
                "Pass original_components or use a result that stores them."
            )
        return _build_diff(self.evolved_components, originals)

    def _repr_html_(self) -> str:
        """Render an HTML summary for Jupyter notebooks.

        Returns:
            HTML string with summary and components tables. Uses semantic
            ``<th>`` header elements and inline CSS for portability across
            JupyterLab, Colab, and VS Code. Iteration history is wrapped
            in a collapsible ``<details>``/``<summary>`` block.
        """
        if abs(self.original_score) < 1e-9:
            imp_html = f"{self.improvement:+.4f}"
        else:
            pct = self.improvement * 100 / abs(self.original_score)
            sign = "+" if pct > 0 else ""
            imp_html = f"{sign}{pct:.1f}%"
        style = (
            'style="border-collapse:collapse;'
            "font-family:monospace;font-size:13px;"
            'margin:8px 0"'
        )
        td = 'style="padding:4px 12px;border:1px solid #ddd"'
        th = 'style="padding:4px 12px;border:1px solid #ddd;background:#f5f5f5"'

        rows = [
            f"<tr><th {th}>Improvement</th><td {td}>{imp_html}</td></tr>",
            f"<tr><th {th}>Original Score</th>"
            f"<td {td}>{self.original_score:.4f}</td></tr>",
            f"<tr><th {th}>Final Score</th><td {td}>{self.final_score:.4f}</td></tr>",
            f"<tr><th {th}>Iterations</th><td {td}>{self.total_iterations}</td></tr>",
            f"<tr><th {th}>Stop Reason</th>"
            f"<td {td}>{html_mod.escape(self.stop_reason.value)}</td></tr>",
            f"<tr><th {th}>Primary Agent</th>"
            f"<td {td}>{html_mod.escape(self.primary_agent)}</td></tr>",
        ]
        summary_table = f"<table {style}>{''.join(rows)}</table>"

        comp_rows = []
        for name in sorted(self.evolved_components):
            val = html_mod.escape(_truncate(self.evolved_components[name], 200))
            comp_rows.append(
                f"<tr><td {td}>{html_mod.escape(name)}</td><td {td}>{val}</td></tr>"
            )
        comp_header = f"<tr><th {th}>Agent</th><th {th}>Evolved Value</th></tr>"
        comp_table = f"<table {style}>{comp_header}{''.join(comp_rows)}</table>"

        history_rows = []
        for rec in self.iteration_history:
            accepted_mark = "\u2713" if rec.accepted else ""
            history_rows.append(
                f"<tr><td {td}>{rec.iteration_number}</td>"
                f"<td {td}>{rec.score:.4f}</td>"
                f"<td {td}>{html_mod.escape(rec.evolved_component)}</td>"
                f"<td {td}>{accepted_mark}</td></tr>"
            )
        history_header = (
            f"<tr><th {th}>#</th><th {th}>Score</th>"
            f"<th {th}>Component</th><th {th}>Accepted</th></tr>"
        )
        history_table = (
            f"<table {style}>{history_header}{''.join(history_rows)}</table>"
        )
        history_section = (
            "<details><summary>Iteration History "
            f"({len(self.iteration_history)} records)</summary>"
            f"{history_table}</details>"
        )

        return (
            "<div>"
            "<strong>MultiAgentEvolutionResult</strong>"
            f"{summary_table}{comp_table}{history_section}"
            "</div>"
        )

improvement property

improvement: float

Calculate the score improvement from original to final.

RETURNS DESCRIPTION
float

The difference between final_score and original_score.
Positive values indicate improvement, negative indicates degradation.

Note

Override is not needed since frozen dataclasses support properties.

improved property

improved: bool

Check if the final score is better than the original.

RETURNS DESCRIPTION
bool

True if final_score > original_score, False otherwise.

Note

Only returns True for strict improvement, not equal scores.

agent_names property

agent_names: list[str]

Get sorted list of evolved agent names.

RETURNS DESCRIPTION
list[str]

Sorted list of agent names from evolved_components keys.

Note

Outputs a new list each time, sorted alphabetically for consistent ordering regardless of insertion order.

to_dict

to_dict() -> dict[str, Any]

Serialize this result to a stdlib-only dict.

RETURNS DESCRIPTION
dict[str, Any]

Dict containing all 9 fields. stop_reason is serialized as its string value. iteration_history is serialized as a list of dicts. Output is directly json.dumps()-compatible.

Source code in src/gepa_adk/domain/models.py
def to_dict(self) -> dict[str, Any]:
    """Serialize this result to a stdlib-only dict.

    Returns:
        Dict containing all 9 fields. ``stop_reason`` is serialized
        as its string value. ``iteration_history`` is serialized as a
        list of dicts. Output is directly ``json.dumps()``-compatible.
    """
    return {
        "schema_version": self.schema_version,
        "stop_reason": self.stop_reason.value,
        "evolved_components": self.evolved_components,
        "original_score": self.original_score,
        "final_score": self.final_score,
        "primary_agent": self.primary_agent,
        "iteration_history": [r.to_dict() for r in self.iteration_history],
        "total_iterations": self.total_iterations,
        "original_components": self.original_components,
    }

from_dict classmethod

from_dict(
    data: dict[str, Any],
) -> MultiAgentEvolutionResult

Reconstruct a MultiAgentEvolutionResult from a dict.

Validates schema version, applies migration if needed, and reconstructs all nested objects including optional original_components.

PARAMETER DESCRIPTION
data

Dict containing multi-agent evolution result fields.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
MultiAgentEvolutionResult

Reconstructed MultiAgentEvolutionResult instance.

RAISES DESCRIPTION
ConfigurationError

If schema_version exceeds the current version or stop_reason is not a valid enum value.

KeyError

If a required field is missing from the dict.

Source code in src/gepa_adk/domain/models.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "MultiAgentEvolutionResult":
    """Reconstruct a MultiAgentEvolutionResult from a dict.

    Validates schema version, applies migration if needed, and
    reconstructs all nested objects including optional
    original_components.

    Args:
        data: Dict containing multi-agent evolution result fields.

    Returns:
        Reconstructed MultiAgentEvolutionResult instance.

    Raises:
        ConfigurationError: If ``schema_version`` exceeds the current
            version or ``stop_reason`` is not a valid enum value.
        KeyError: If a required field is missing from the dict.
    """
    version = data.get("schema_version", 1)
    if version > CURRENT_SCHEMA_VERSION:
        raise ConfigurationError(
            f"Cannot deserialize result with schema_version {version} "
            f"(current version is {CURRENT_SCHEMA_VERSION}). "
            f"Upgrade gepa-adk to load this result.",
            field="schema_version",
            value=version,
            constraint=f"<= {CURRENT_SCHEMA_VERSION}",
        )
    migrated = _migrate_result_dict(data, from_version=version)
    try:
        stop_reason = StopReason(migrated.get("stop_reason", "completed"))
    except ValueError:
        raw = migrated.get("stop_reason")
        raise ConfigurationError(
            f"Invalid stop_reason value: {raw!r}",
            field="stop_reason",
            value=raw,
            constraint=("one of: " + ", ".join(sr.value for sr in StopReason)),
        ) from None
    return cls(
        schema_version=migrated["schema_version"],
        stop_reason=stop_reason,
        evolved_components=migrated["evolved_components"],
        original_score=migrated["original_score"],
        final_score=migrated["final_score"],
        primary_agent=migrated["primary_agent"],
        iteration_history=[
            IterationRecord.from_dict(r)
            for r in migrated.get("iteration_history", [])
        ],
        total_iterations=migrated["total_iterations"],
        original_components=migrated.get("original_components"),
    )
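The `to_dict`/`from_dict` pair is designed to round-trip through plain JSON, with `from_dict` refusing payloads written by a newer library version. A minimal stdlib-only sketch of the same pattern (the class, field set, and `CURRENT_SCHEMA_VERSION` value here are illustrative stand-ins, not the real `MultiAgentEvolutionResult`):

```python
import json
from dataclasses import dataclass

CURRENT_SCHEMA_VERSION = 2  # illustrative value


@dataclass(frozen=True)
class ResultSketch:
    """Stand-in showing the serialization pattern, not the real class."""

    schema_version: int
    stop_reason: str
    final_score: float

    def to_dict(self) -> dict:
        # Stdlib-only dict, directly json.dumps()-compatible.
        return {
            "schema_version": self.schema_version,
            "stop_reason": self.stop_reason,
            "final_score": self.final_score,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "ResultSketch":
        # Missing version defaults to 1; newer-than-current is rejected.
        version = data.get("schema_version", 1)
        if version > CURRENT_SCHEMA_VERSION:
            raise ValueError(
                f"schema_version {version} > {CURRENT_SCHEMA_VERSION}"
            )
        return cls(
            schema_version=version,
            stop_reason=data["stop_reason"],
            final_score=data["final_score"],
        )


# Round-trip through JSON text, as the docstrings promise.
original = ResultSketch(schema_version=2, stop_reason="completed", final_score=0.9)
restored = ResultSketch.from_dict(json.loads(json.dumps(original.to_dict())))
assert restored == original
```

The real `from_dict` additionally migrates older payloads via `_migrate_result_dict` and rebuilds nested `IterationRecord` objects; the version gate shown here is the same.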

__repr__

__repr__() -> str

Narrative summary of the multi-agent evolution result.

RETURNS DESCRIPTION
str

Human-readable multi-line summary with improvement percentage, iterations, stop reason, primary agent, agent names, and acceptance rate. Uses 2-space indent, no box-drawing characters, every line greppable.

Source code in src/gepa_adk/domain/models.py
def __repr__(self) -> str:
    """Narrative summary of the multi-agent evolution result.

    Returns:
        Human-readable multi-line summary with improvement percentage,
        iterations, stop reason, primary agent, agent names, and
        acceptance rate. Uses 2-space indent, no box-drawing characters,
        every line greppable.
    """
    if abs(self.original_score) < 1e-9:
        imp_str = f"{self.improvement:+.4f} improvement"
    else:
        pct = self.improvement * 100 / abs(self.original_score)
        sign = "+" if pct > 0 else ""
        imp_str = f"{sign}{pct:.1f}% improvement"
    lines = [
        f"MultiAgentEvolutionResult: {imp_str} "
        f"({self.original_score:.2f} \u2192 {self.final_score:.2f})",
        f"  iterations: {self.total_iterations}, "
        f"stop_reason: {self.stop_reason.value}",
        f"  primary_agent: {self.primary_agent}",
        f"  agents: {', '.join(sorted(self.evolved_components))}",
    ]
    if self.total_iterations > 0:
        accepted = sum(1 for r in self.iteration_history if r.accepted)
        lines.append(f"  acceptance_rate: {accepted}/{self.total_iterations}")
    return "\n".join(lines)
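The near-zero guard in the formatting above matters: when `original_score` is (almost) zero a percentage is undefined, so the absolute delta is reported instead. The same logic extracted as a standalone helper (the function name is ours, the arithmetic is taken from the source above):

```python
def format_improvement(original_score: float, final_score: float) -> str:
    """Format an improvement as a percentage, falling back to an
    absolute delta when the original score is (near) zero."""
    improvement = final_score - original_score
    if abs(original_score) < 1e-9:
        # Percentage of ~zero is undefined; report the raw delta.
        return f"{improvement:+.4f} improvement"
    pct = improvement * 100 / abs(original_score)
    sign = "+" if pct > 0 else ""  # negative pct already carries its sign
    return f"{sign}{pct:.1f}% improvement"


print(format_improvement(0.5, 0.6))  # +20.0% improvement
print(format_improvement(0.0, 0.3))  # +0.3000 improvement
```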

show_diff

show_diff(
    original_components: dict[str, str] | None = None,
) -> str

Show unified diff between original and evolved components.

Uses stored original_components if no explicit argument is provided. Produces git-diff-style output (---/+++/@@) for each component that changed.

PARAMETER DESCRIPTION
original_components

Pre-evolution component values. If None, falls back to self.original_components.

TYPE: dict[str, str] | None DEFAULT: None

RETURNS DESCRIPTION
str

Unified diff string, or "No changes detected." if all components are identical.

RAISES DESCRIPTION
ValueError

If both the argument and self.original_components are None.

Source code in src/gepa_adk/domain/models.py
def show_diff(self, original_components: dict[str, str] | None = None) -> str:
    """Show unified diff between original and evolved components.

    Uses stored ``original_components`` if no explicit argument is
    provided. Produces git-diff-style output (``---``/``+++``/``@@``)
    for each component that changed.

    Args:
        original_components: Pre-evolution component values. If None,
            falls back to ``self.original_components``.

    Returns:
        Unified diff string, or ``"No changes detected."`` if all
        components are identical.

    Raises:
        ValueError: If both the argument and ``self.original_components``
            are None.
    """
    originals = (
        original_components
        if original_components is not None
        else self.original_components
    )
    if originals is None:
        raise ValueError(
            "No original components available. "
            "Pass original_components or use a result that stores them."
        )
    return _build_diff(self.evolved_components, originals)
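The internal `_build_diff` helper is not shown in this reference, but the promised git-diff-style output (`---`/`+++`/`@@` per changed component) can be sketched with stdlib `difflib`. This is an assumption about its shape, not the actual implementation:

```python
import difflib


def build_component_diff(
    evolved: dict[str, str], originals: dict[str, str]
) -> str:
    """Unified diff of every component whose value changed."""
    chunks = []
    for name in sorted(evolved):
        before = originals.get(name, "")
        after = evolved[name]
        if before == after:
            continue  # unchanged components are skipped
        diff = difflib.unified_diff(
            before.splitlines(keepends=True),
            after.splitlines(keepends=True),
            fromfile=f"a/{name}",
            tofile=f"b/{name}",
        )
        chunks.append("".join(diff))
    return "\n".join(chunks) if chunks else "No changes detected."


print(build_component_diff(
    {"planner": "Be concise.\n"},
    {"planner": "Be helpful.\n"},
))
```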

SchemaConstraints dataclass

Constraints for output schema evolution.

Controls which fields must be preserved during schema evolution, including field existence and type constraints. Used by the OutputSchemaHandler to validate proposed schema mutations.

ATTRIBUTE DESCRIPTION
required_fields

Field names that must exist in evolved schemas. Mutations removing these fields are rejected.

TYPE: tuple[str, ...]

preserve_types

Mapping of field names to allowed type(s). Mutations changing a field's type to an incompatible type are rejected.

TYPE: dict[str, type | tuple[type, ...]]

Examples:

Preserve required fields only:

from gepa_adk.domain.types import SchemaConstraints

constraints = SchemaConstraints(
    required_fields=("score", "feedback"),
)

Preserve required fields with type constraints:

constraints = SchemaConstraints(
    required_fields=("score",),
    preserve_types={
        "score": (float, int),  # Allow numeric types
        "order_id": str,  # Must stay string
    },
)
Note

A frozen dataclass ensures immutability during evolution runs. Configuration is validated at evolution start.

Source code in src/gepa_adk/domain/types.py
@dataclass(frozen=True, slots=True)
class SchemaConstraints:
    """Constraints for output schema evolution.

    Controls which fields must be preserved during schema evolution,
    including field existence and type constraints. Used by the
    OutputSchemaHandler to validate proposed schema mutations.

    Attributes:
        required_fields (tuple[str, ...]): Field names that must exist
            in evolved schemas. Mutations removing these fields are rejected.
        preserve_types (dict[str, type | tuple[type, ...]]): Mapping of
            field names to allowed type(s). Mutations changing a field's
            type to an incompatible type are rejected.

    Examples:
        Preserve required fields only:

        ```python
        from gepa_adk.domain.types import SchemaConstraints

        constraints = SchemaConstraints(
            required_fields=("score", "feedback"),
        )
        ```

        Preserve required fields with type constraints:

        ```python
        constraints = SchemaConstraints(
            required_fields=("score",),
            preserve_types={
                "score": (float, int),  # Allow numeric types
                "order_id": str,  # Must stay string
            },
        )
        ```

    Note:
        A frozen dataclass ensures immutability during evolution runs.
        Configuration is validated at evolution start.
    """

    required_fields: tuple[str, ...] = ()
    preserve_types: dict[str, type | tuple[type, ...]] = field(default_factory=dict)

StopReason

Bases: str, Enum



Why an evolution run terminated.

Each value represents a distinct termination condition. The engine sets the appropriate value when building the final EvolutionResult.

KEYBOARD_INTERRUPT, TIMEOUT, and CANCELLED are defined for future graceful interrupt support and are not set by the engine in this release.

Examples:

reason = StopReason.MAX_ITERATIONS
assert reason == "max_iterations"
Source code in src/gepa_adk/domain/types.py
class StopReason(str, Enum):
    """Why an evolution run terminated.

    Each value represents a distinct termination condition. The engine sets the
    appropriate value when building the final ``EvolutionResult``.

    ``KEYBOARD_INTERRUPT``, ``TIMEOUT``, and ``CANCELLED`` are defined for
    future graceful interrupt support and are not set by the engine in this
    release.

    Examples:
        ```python
        reason = StopReason.MAX_ITERATIONS
        assert reason == "max_iterations"
        ```
    """

    COMPLETED = "completed"
    MAX_ITERATIONS = "max_iterations"
    STOPPER_TRIGGERED = "stopper_triggered"
    KEYBOARD_INTERRUPT = "keyboard_interrupt"
    TIMEOUT = "timeout"
    CANCELLED = "cancelled"

TrajectoryConfig dataclass

Configuration for trajectory extraction behavior.

Controls which components are extracted from ADK event streams, whether sensitive data should be redacted, and whether large values should be truncated.

Note

All configuration fields are immutable after instantiation, ensuring consistent extraction behavior throughout evolution.

ATTRIBUTE DESCRIPTION
include_tool_calls

Extract tool/function call records. Defaults to True.

TYPE: bool

include_state_deltas

Extract session state changes. Defaults to True.

TYPE: bool

include_token_usage

Extract LLM token consumption metrics. Defaults to True.

TYPE: bool

redact_sensitive

Apply sensitive data redaction. When True, fields matching sensitive_keys will be replaced with "[REDACTED]". Defaults to True for secure-by-default behavior.

TYPE: bool

sensitive_keys

Field names to redact via exact match. Case-sensitive. Defaults to DEFAULT_SENSITIVE_KEYS.

TYPE: tuple[str, ...]

max_string_length

Truncate strings longer than this with a marker indicating truncation. None disables truncation. Defaults to 10000 characters.

TYPE: int | None

Examples:

Default configuration (secure, with truncation):

config = TrajectoryConfig()  # All features enabled, redaction ON

Minimal configuration (tool calls only):

config = TrajectoryConfig(
    include_tool_calls=True,
    include_state_deltas=False,
    include_token_usage=False,
    redact_sensitive=False,
)

Custom sensitive keys (extends defaults):

config = TrajectoryConfig(
    sensitive_keys=DEFAULT_SENSITIVE_KEYS + ("ssn",),
    max_string_length=5000,  # Truncate DOM/screenshots earlier
)

Disable truncation (keep full values):

config = TrajectoryConfig(max_string_length=None)
Note

Redaction takes precedence over truncation. Sensitive values are replaced with "[REDACTED]" first, then truncation applies to the remaining (non-sensitive) strings.

Source code in src/gepa_adk/domain/types.py
@dataclass(frozen=True, slots=True)
class TrajectoryConfig:
    """Configuration for trajectory extraction behavior.

    Controls which components are extracted from ADK event streams,
    whether sensitive data should be redacted, and whether large
    values should be truncated.

    Note:
        All configuration fields are immutable after instantiation,
        ensuring consistent extraction behavior throughout evolution.

    Attributes:
        include_tool_calls (bool): Extract tool/function call records.
            Defaults to True.
        include_state_deltas (bool): Extract session state changes.
            Defaults to True.
        include_token_usage (bool): Extract LLM token consumption metrics.
            Defaults to True.
        redact_sensitive (bool): Apply sensitive data redaction. When True,
            fields matching sensitive_keys will be replaced with "[REDACTED]".
            Defaults to True for secure-by-default behavior.
        sensitive_keys (tuple[str, ...]): Field names to redact via exact
            match. Case-sensitive. Defaults to ``DEFAULT_SENSITIVE_KEYS``.
        max_string_length (int | None): Truncate strings longer than this
            with a marker indicating truncation. None disables truncation.
            Defaults to 10000 characters.

    Examples:
        Default configuration (secure, with truncation):

        ```python
        config = TrajectoryConfig()  # All features enabled, redaction ON
        ```

        Minimal configuration (tool calls only):

        ```python
        config = TrajectoryConfig(
            include_tool_calls=True,
            include_state_deltas=False,
            include_token_usage=False,
            redact_sensitive=False,
        )
        ```

        Custom sensitive keys (extends defaults):

        ```python
        config = TrajectoryConfig(
            sensitive_keys=DEFAULT_SENSITIVE_KEYS + ("ssn",),
            max_string_length=5000,  # Truncate DOM/screenshots earlier
        )
        ```

        Disable truncation (keep full values):

        ```python
        config = TrajectoryConfig(max_string_length=None)
        ```

    Note:
        Redaction takes precedence over truncation. Sensitive values are
        replaced with "[REDACTED]" first, then truncation applies to the
        remaining (non-sensitive) strings.
    """

    include_tool_calls: bool = True
    include_state_deltas: bool = True
    include_token_usage: bool = True
    redact_sensitive: bool = True
    sensitive_keys: tuple[str, ...] = DEFAULT_SENSITIVE_KEYS
    max_string_length: int | None = 10000

VideoValidationError

Bases: ConfigurationError



Raised when video file validation fails.

This exception is raised during video file processing when a video file does not exist, exceeds size limits, or has an invalid MIME type.

ATTRIBUTE DESCRIPTION
video_path

The path to the video file that failed validation.

TYPE: str

field

The configuration field name (default "video").

TYPE: str

constraint

Description of the validation constraint that was violated.

TYPE: str

Examples:

Raising a video validation error:

from gepa_adk.domain.exceptions import VideoValidationError

raise VideoValidationError(
    "Video file not found",
    video_path="/path/to/missing.mp4",
    constraint="file must exist",
)

Handling video validation errors:

from gepa_adk.domain.exceptions import VideoValidationError

try:
    await video_service.prepare_video_parts(["/bad/path.mp4"])
except VideoValidationError as e:
    print(f"Invalid video: {e.video_path}")
    print(f"Constraint violated: {e.constraint}")
Note

Arises from video file validation failures during multimodal input processing. File existence, size limits (2GB), and MIME type (video/*) are validated before loading video content.

Source code in src/gepa_adk/domain/exceptions.py
class VideoValidationError(ConfigurationError):
    """Raised when video file validation fails.

    This exception is raised during video file processing when a video
    file does not exist, exceeds size limits, or has an invalid MIME type.

    Attributes:
        video_path (str): The path to the video file that failed validation.
        field (str): The configuration field name (default "video").
        constraint (str): Description of the validation constraint that was violated.

    Examples:
        Raising a video validation error:

        ```python
        from gepa_adk.domain.exceptions import VideoValidationError

        raise VideoValidationError(
            "Video file not found",
            video_path="/path/to/missing.mp4",
            constraint="file must exist",
        )
        ```

        Handling video validation errors:

        ```python
        from gepa_adk.domain.exceptions import VideoValidationError

        try:
            await video_service.prepare_video_parts(["/bad/path.mp4"])
        except VideoValidationError as e:
            print(f"Invalid video: {e.video_path}")
            print(f"Constraint violated: {e.constraint}")
        ```

    Note:
        Arises from video file validation failures during multimodal input
        processing. File existence, size limits (2GB), and MIME type
        (video/*) are validated before loading video content.
    """

    def __init__(
        self,
        message: str,
        *,
        video_path: str,
        field: str = "video",
        constraint: str,
    ) -> None:
        """Initialize VideoValidationError with video file context.

        Args:
            message: Human-readable error description.
            video_path: The path to the video file that failed validation.
            field: Name of the configuration field (default "video").
            constraint: Description of the validation constraint violated.

        Note:
            Context fields use keyword-only syntax to ensure explicit labeling
            and prevent positional argument mistakes.
        """
        super().__init__(message, field=field, value=video_path, constraint=constraint)
        self.video_path = video_path

    def __str__(self) -> str:
        """Return string representation with video path context.

        Returns:
            Formatted error message including video_path and constraint.

        Note:
            Outputs formatted error message with video_path for easy
            identification of the problematic file in error logs.
        """
        base = Exception.__str__(self)
        return (
            f"{base} [video_path={self.video_path!r}, constraint={self.constraint!r}]"
        )

__init__

__init__(
    message: str,
    *,
    video_path: str,
    field: str = "video",
    constraint: str,
) -> None

Initialize VideoValidationError with video file context.

PARAMETER DESCRIPTION
message

Human-readable error description.

TYPE: str

video_path

The path to the video file that failed validation.

TYPE: str

field

Name of the configuration field (default "video").

TYPE: str DEFAULT: 'video'

constraint

Description of the validation constraint violated.

TYPE: str

Note

Context fields use keyword-only syntax to ensure explicit labeling and prevent positional argument mistakes.

Source code in src/gepa_adk/domain/exceptions.py
def __init__(
    self,
    message: str,
    *,
    video_path: str,
    field: str = "video",
    constraint: str,
) -> None:
    """Initialize VideoValidationError with video file context.

    Args:
        message: Human-readable error description.
        video_path: The path to the video file that failed validation.
        field: Name of the configuration field (default "video").
        constraint: Description of the validation constraint violated.

    Note:
        Context fields use keyword-only syntax to ensure explicit labeling
        and prevent positional argument mistakes.
    """
    super().__init__(message, field=field, value=video_path, constraint=constraint)
    self.video_path = video_path

__str__

__str__() -> str

Return string representation with video path context.

RETURNS DESCRIPTION
str

Formatted error message including video_path and constraint.

Note

Outputs formatted error message with video_path for easy identification of the problematic file in error logs.

Source code in src/gepa_adk/domain/exceptions.py
def __str__(self) -> str:
    """Return string representation with video path context.

    Returns:
        Formatted error message including video_path and constraint.

    Note:
        Outputs formatted error message with video_path for easy
        identification of the problematic file in error logs.
    """
    base = Exception.__str__(self)
    return (
        f"{base} [video_path={self.video_path!r}, constraint={self.constraint!r}]"
    )

AsyncGEPAEngine

Bases: Generic[DataInst, Trajectory, RolloutOutput]



Async evolution engine orchestrating the GEPA loop.

This engine executes the core evolution algorithm:

1. Evaluate the baseline candidate.
2. For each iteration until max_iterations or convergence:
   a. Generate a reflective dataset from traces.
   b. Propose new candidate text.
   c. Evaluate the proposal.
   d. Accept if it improves above the threshold.
   e. Record the iteration.
3. Return a frozen EvolutionResult.

ATTRIBUTE DESCRIPTION
adapter

Implementation of AsyncGEPAAdapter protocol.

TYPE: AsyncGEPAAdapter

config

Evolution parameters.

TYPE: EvolutionConfig

Examples:

Basic usage:

from gepa_adk.engine import AsyncGEPAEngine
from gepa_adk.domain.models import EvolutionConfig, Candidate

engine = AsyncGEPAEngine(
    adapter=my_adapter,
    config=EvolutionConfig(max_iterations=50),
    initial_candidate=Candidate(components={"instruction": "Be helpful"}),
    batch=training_data,
)
result = await engine.run()
print(f"Final score: {result.final_score}")
Note

Avoid reusing engine instances after run() completes. When a seeded rng is provided, it is used for the auto-created merge proposer. The API layer passes the same rng to candidate selectors for full determinism across stochastic components.

Source code in src/gepa_adk/engine/async_engine.py
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
class AsyncGEPAEngine(Generic[DataInst, Trajectory, RolloutOutput]):
    """Async evolution engine orchestrating the GEPA loop.

    This engine executes the core evolution algorithm:
    1. Evaluate baseline candidate
    2. For each iteration until max_iterations or convergence:
       a. Generate reflective dataset from traces
       b. Propose new candidate text
       c. Evaluate proposal
       d. Accept if improves above threshold
       e. Record iteration
    3. Return frozen EvolutionResult

    Attributes:
        adapter (AsyncGEPAAdapter): Implementation of AsyncGEPAAdapter protocol.
        config (EvolutionConfig): Evolution parameters.

    Examples:
        Basic usage:

        ```python
        from gepa_adk.engine import AsyncGEPAEngine
        from gepa_adk.domain.models import EvolutionConfig, Candidate

        engine = AsyncGEPAEngine(
            adapter=my_adapter,
            config=EvolutionConfig(max_iterations=50),
            initial_candidate=Candidate(components={"instruction": "Be helpful"}),
            batch=training_data,
        )
        result = await engine.run()
        print(f"Final score: {result.final_score}")
        ```

    Note:
        Avoid reusing engine instances after run() completes.
        When a seeded ``rng`` is provided, it is used for the auto-created
        merge proposer. The API layer passes the same ``rng`` to candidate
        selectors for full determinism across stochastic components.
    """

    def __init__(
        self,
        adapter: AsyncGEPAAdapter[DataInst, Trajectory, RolloutOutput],
        config: EvolutionConfig,
        initial_candidate: Candidate,
        batch: list[DataInst],
        valset: list[DataInst] | None = None,
        candidate_selector: CandidateSelectorProtocol | None = None,
        component_selector: ComponentSelectorProtocol | None = None,
        evaluation_policy: EvaluationPolicyProtocol | None = None,
        merge_proposer: ProposerProtocol | None = None,
        rng: random.Random | None = None,
    ) -> None:
        """Initialize the evolution engine.

        Args:
            adapter: Implementation of AsyncGEPAAdapter protocol for evaluation
                and proposal generation.
            config: Evolution parameters controlling iterations, thresholds,
                and early stopping.
            initial_candidate: Starting candidate with at least one component.
            batch: Trainset data instances for reflection and mutation.
            valset: Optional validation data for scoring candidates. Defaults
                to trainset when omitted. Must be non-empty if provided.
            candidate_selector: Optional selector strategy for Pareto-aware
                candidate sampling. When provided, initializes ParetoState
                for multi-objective tracking.
            component_selector: Optional selector strategy for choosing which
                components to update. Defaults to RoundRobinComponentSelector.
            evaluation_policy: Optional policy for selecting which validation
                examples to evaluate per iteration. Defaults to FullEvaluationPolicy.
            merge_proposer: Optional proposer for merge operations. If provided
                and config.use_merge is True, merge proposals will be attempted
                after successful mutations.
            rng: Optional seeded random.Random instance for deterministic engine
                decisions. When provided, used for the auto-created merge proposer.
                None preserves current random behavior.

        Raises:
            ValueError: If batch is empty, valset is provided but empty,
                or initial_candidate has no components.

        Examples:
            Creating an engine:

            ```python
            engine = AsyncGEPAEngine(
                adapter=my_adapter,
                config=EvolutionConfig(max_iterations=50),
                initial_candidate=Candidate(components={"instruction": "Be helpful"}),
                batch=training_data,
                candidate_selector=selector,
            )
            ```

        Note:
            Configures trainset and valset routing for reflection and scoring.
            Initializes stopper lifecycle tracking for custom stop callbacks.
        """
        # Validation
        if len(batch) == 0:
            raise ValueError("batch must contain at least one data instance")
        if valset is not None and len(valset) == 0:
            raise ValueError(
                "valset must contain at least one validation data instance"
            )

        if not initial_candidate.components:
            raise ValueError("initial_candidate must have at least one component")

        # Store dependencies
        self.adapter = adapter
        self.config = config
        self._initial_candidate = initial_candidate
        self._trainset = batch
        self._valset = valset if valset is not None else batch
        self._state: _EngineState | None = None
        self._rng = rng
        self._candidate_selector = candidate_selector
        self._component_selector = component_selector or RoundRobinComponentSelector()
        self._pareto_state: ParetoState | None = None
        self._candidate_eval_batches: dict[int, EvaluationBatch] = {}
        if merge_proposer is not None:
            self._merge_proposer = merge_proposer
        elif config.use_merge:
            from gepa_adk.engine.merge_proposer import MergeProposer

            self._merge_proposer = MergeProposer(rng=rng or random.Random())
        else:
            self._merge_proposer = None
        self._merges_due: int = 0
        self._merge_invocations: int = 0
        # Stopper state tracking (T001, T002)
        self._start_time: float | None = None
        self._total_evaluations: int = 0
        self._active_stoppers: list[object] = []
        # Import here to avoid circular dependency
        if evaluation_policy is None:
            from gepa_adk.adapters.selection.evaluation_policy import (
                FullEvaluationPolicy,
            )

            self._evaluation_policy: EvaluationPolicyProtocol = FullEvaluationPolicy()
        else:
            self._evaluation_policy = evaluation_policy

    def _aggregate_acceptance_score(self, scores: list[float]) -> float:
        """Aggregate scores for acceptance decisions based on acceptance_metric.

        Args:
            scores: List of per-example scores from evaluation batch.

        Returns:
            Aggregated acceptance score (sum or mean based on config).

        Raises:
            InvalidScoreListError: If scores list is empty or contains
                non-finite values.
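
        Examples:
            A standalone sketch of the aggregation rule, using hypothetical
            scores (real scores come from the evaluation batch):

            ```python
            scores = [0.8, 0.6, 1.0]
            # acceptance_metric == "mean"
            mean_score = sum(scores) / len(scores)
            # acceptance_metric == "sum"
            sum_score = sum(scores)
            ```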

        Note:
            Sums or averages acceptance scores after validating they are
            non-empty and finite. Uses sum or mean based on config.acceptance_metric.
        """
        # Validate scores are non-empty
        if not scores:
            raise InvalidScoreListError(
                "Cannot aggregate acceptance score from empty score list",
                scores=scores,
                reason="empty",
            )

        # Validate scores are finite
        if not all(math.isfinite(score) for score in scores):
            raise InvalidScoreListError(
                "Cannot aggregate acceptance score from non-finite values (NaN/inf)",
                scores=scores,
                reason="non-finite",
            )

        # Aggregate based on acceptance_metric
        if self.config.acceptance_metric == "sum":
            return sum(scores)
        else:  # acceptance_metric == "mean"
            return sum(scores) / len(scores)

    def _setup_stoppers(self) -> list[object]:
        """Call setup() on stoppers that have lifecycle methods.

        Returns:
            List of stoppers that had setup() called (for cleanup in reverse order).
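
        Examples:
            A sketch of the duck-typed lifecycle check, with a hypothetical
            stopper class:

            ```python
            class MyStopper:
                def setup(self):
                    self.ready = True

            stopper = MyStopper()
            # Only call setup() when the stopper actually implements it.
            setup_method = getattr(stopper, "setup", None)
            if setup_method is not None and callable(setup_method):
                setup_method()
            ```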

        Note:
            Only invokes setup() on stoppers implementing the lifecycle method.
            Stoppers that fail setup() are excluded from stop_callbacks for the
            remainder of execution to prevent inconsistent state.
        """
        setup_stoppers: list[object] = []
        active_stoppers: list[object] = []
        stop_callbacks = self.config.stop_callbacks
        if stop_callbacks:
            for stopper in stop_callbacks:
                setup_method = getattr(stopper, "setup", None)
                if setup_method is not None and callable(setup_method):
                    try:
                        setup_method()
                        setup_stoppers.append(stopper)
                        active_stoppers.append(stopper)
                    except Exception:
                        logger.exception(
                            "stopper.setup_error",
                            stopper=type(stopper).__name__,
                        )
                        # Stopper excluded from active list due to setup failure
                else:
                    # Stopper has no setup() method, still active
                    active_stoppers.append(stopper)
        # Store active stoppers for _should_stop() to use
        self._active_stoppers = active_stoppers
        return setup_stoppers

    def _cleanup_stoppers(self, setup_stoppers: list[object]) -> None:
        """Call cleanup() on stoppers in reverse order of setup.

        Args:
            setup_stoppers: List of stoppers that had setup() called.

        Note:
            Observes reverse-order cleanup contract (T025).
            If cleanup() raises, logs error and continues (T026).
        """
        for stopper in reversed(setup_stoppers):
            cleanup_method = getattr(stopper, "cleanup", None)
            if cleanup_method is not None and callable(cleanup_method):
                try:
                    cleanup_method()
                except Exception:
                    logger.exception(
                        "stopper.cleanup_error",
                        stopper=type(stopper).__name__,
                    )

    def _build_stopper_state(self) -> StopperState:
        """Build a StopperState snapshot from current engine state.

        Constructs an immutable snapshot of evolution state for stopper
        callbacks to evaluate. Captures all metrics needed by stoppers
        including elapsed time and total evaluations.

        Returns:
            Frozen StopperState containing current iteration, best score,
            stagnation counter, total evaluations, candidates count, and
            elapsed time.

        Note:
            Obtains elapsed_seconds from monotonic time since run() started.
            Uses zero if _start_time has not yet been set.
        """
        assert self._state is not None, "Engine state not initialized"
        elapsed = (
            time.monotonic() - self._start_time if self._start_time is not None else 0.0
        )
        candidates_count = (
            len(self._pareto_state.candidates) if self._pareto_state is not None else 0
        )
        return StopperState(
            iteration=self._state.iteration,
            best_score=self._state.best_score,
            stagnation_counter=self._state.stagnation_counter,
            total_evaluations=self._total_evaluations,
            candidates_count=candidates_count,
            elapsed_seconds=elapsed,
        )

    @property
    def pareto_state(self) -> ParetoState | None:
        """Return the current Pareto state, if initialized."""
        return self._pareto_state

    def _build_component_list(self, candidate: Candidate) -> list[str]:
        """Build list of available component keys from candidate.

        Excludes generic 'instruction' alias if agent-specific keys exist
        (e.g., 'agent1_instruction').

        Args:
            candidate: Candidate to extract component keys from.

        Returns:
            List of component keys to consider for update.
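
        Examples:
            A sketch of the filtering rule, with hypothetical component names
            ("instruction" stands in for DEFAULT_COMPONENT_NAME):

            ```python
            components = {
                "instruction": "Be helpful",
                "agent1_instruction": "Summarize",
                "agent2_instruction": "Critique",
            }
            keys = list(components)
            # Drop the default alias when agent-specific keys exist.
            if len(keys) > 1 and "instruction" in keys:
                keys = [k for k in keys if k != "instruction"]
            ```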

        Note:
            Selects component keys, filtering out the default component name when
            more specific per-agent component keys are present.
        """
        keys = list(candidate.components.keys())
        if len(keys) > 1 and DEFAULT_COMPONENT_NAME in keys:
            # When agent-specific keys are present, treat the default
            # component name as an alias and exclude it from updates.
            return [k for k in keys if k != DEFAULT_COMPONENT_NAME]
        return keys

    async def _initialize_baseline(self) -> None:
        """Initialize baseline evaluation.

        Evaluates the initial candidate on trainset for reflection and
        on valset for scoring. Caches the reflection batch for use in
        the first mutation proposal.

        Note:
            Sets up both reflection and scoring baselines up front.
        """
        # Create pareto_state before evaluation if candidate_selector exists
        # so that _evaluate_scoring can use evaluation_policy
        if self._candidate_selector is not None:
            self._pareto_state = ParetoState(frontier_type=self.config.frontier_type)

        reflection_batch = await self.adapter.evaluate(
            self._trainset,
            self._initial_candidate.components,
            capture_traces=True,
        )
        self._total_evaluations += len(reflection_batch.scores)
        # Use _evaluate_scoring for baseline to get eval_indices
        (
            baseline_score,
            scoring_batch,
            baseline_eval_indices,
        ) = await self._evaluate_scoring(self._initial_candidate)
        baseline_reflection_score = sum(reflection_batch.scores) / len(
            reflection_batch.scores
        )
        baseline_valset_mean = (
            sum(scoring_batch.scores) / len(scoring_batch.scores)
            if scoring_batch.scores
            else 0.0
        )
        self._state = _EngineState(
            best_candidate=self._initial_candidate,
            best_score=baseline_score,
            original_score=baseline_score,
            iteration=0,
            stagnation_counter=0,
            iteration_history=[],
            last_eval_batch=reflection_batch,
            best_reflection_score=baseline_reflection_score,
            best_valset_mean=baseline_valset_mean,
            best_objective_scores=scoring_batch.objective_scores,
        )
        if self._candidate_selector is not None:
            # Prepare objective scores for baseline if needed
            objective_scores: dict[str, float] | None = None
            per_example_objective_scores: dict[int, dict[str, float]] | None = None

            if scoring_batch.objective_scores is not None:
                from statistics import fmean

                if self.config.frontier_type in (
                    FrontierType.OBJECTIVE,
                    FrontierType.HYBRID,
                ):
                    objective_scores_by_name: dict[str, list[float]] = {}
                    for obj_scores in scoring_batch.objective_scores:
                        for obj_name, obj_score in obj_scores.items():
                            objective_scores_by_name.setdefault(obj_name, []).append(
                                obj_score
                            )
                    objective_scores = {
                        obj_name: fmean(scores)
                        for obj_name, scores in objective_scores_by_name.items()
                    }

                if self.config.frontier_type == FrontierType.CARTESIAN:
                    per_example_objective_scores = {
                        baseline_eval_indices[i]: scoring_batch.objective_scores[i]
                        for i in range(len(baseline_eval_indices))
                    }
                    objective_scores_by_name: dict[str, list[float]] = {}
                    for obj_scores in scoring_batch.objective_scores:
                        for obj_name, obj_score in obj_scores.items():
                            objective_scores_by_name.setdefault(obj_name, []).append(
                                obj_score
                            )
                    objective_scores = {
                        obj_name: fmean(scores)
                        for obj_name, scores in objective_scores_by_name.items()
                    }

            assert self._pareto_state is not None, "Pareto state not initialized"
            candidate_idx = self._pareto_state.add_candidate(
                self._initial_candidate,
                scoring_batch.scores,
                score_indices=baseline_eval_indices,
                objective_scores=objective_scores,
                per_example_objective_scores=per_example_objective_scores,
                logger=logger,
            )
            self._candidate_eval_batches[candidate_idx] = reflection_batch

    async def _evaluate_reflection(
        self, candidate: Candidate
    ) -> tuple[float, EvaluationBatch]:
        """Evaluate a candidate on the trainset for reflection.

        Args:
            candidate: Candidate to evaluate.

        Returns:
            Tuple of (mean score across trainset examples, evaluation batch).

        Note:
            Supplies trajectories for reflective dataset construction.
        """
        eval_batch = await self.adapter.evaluate(
            self._trainset,
            candidate.components,
            capture_traces=True,
        )
        self._total_evaluations += len(eval_batch.scores)
        score = sum(eval_batch.scores) / len(eval_batch.scores)
        return score, eval_batch

    async def _evaluate_scoring(
        self, candidate: Candidate
    ) -> tuple[float, EvaluationBatch, list[int]]:
        """Evaluate a candidate on the valset for scoring decisions.

        Args:
            candidate: Candidate to evaluate on the validation set.

        Returns:
            Tuple of (aggregated acceptance score, evaluation batch, eval_indices).
            Score is aggregated using acceptance_metric (sum or mean).
            eval_indices are the valset indices that were actually evaluated.
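
        Examples:
            A sketch of the subset selection, with hypothetical valset rows:
            the policy picks indices, and only those rows are evaluated:

            ```python
            valset = ["ex0", "ex1", "ex2", "ex3"]
            eval_indices = [0, 2]  # chosen by the evaluation policy
            eval_valset = [valset[i] for i in eval_indices]
            ```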

        Note:
            Supplies scores without traces for acceptance decisions.
            Aggregation method (sum/mean) is determined by config.acceptance_metric.
            Uses evaluation_policy to determine which examples to evaluate.
        """
        # Get indices to evaluate from evaluation policy
        valset_ids = list(range(len(self._valset)))
        if self._pareto_state is not None:
            eval_indices = self._evaluation_policy.get_eval_batch(
                valset_ids, self._pareto_state
            )
        else:
            # Fallback to all indices if no pareto state yet
            eval_indices = valset_ids

        # Filter valset to only include selected indices
        is_full_eval = len(eval_indices) == len(valset_ids) and set(
            eval_indices
        ) == set(valset_ids)
        eval_valset = (
            self._valset if is_full_eval else [self._valset[i] for i in eval_indices]
        )

        eval_batch = await self.adapter.evaluate(
            eval_valset,
            candidate.components,
            capture_traces=False,
        )
        self._total_evaluations += len(eval_batch.scores)
        score = self._aggregate_acceptance_score(eval_batch.scores)
        return score, eval_batch, eval_indices

    async def _propose_mutation(self) -> tuple[Candidate, list[str]]:
        """Propose a new candidate via reflective mutation.

        Uses the cached evaluation batch from the most recent best candidate
        evaluation to generate the reflective dataset, avoiding redundant
        adapter calls.

        Returns:
            Tuple of (new candidate with proposed component updates,
            list of component names that were updated).

        Note:
            Spawns a new candidate with updated components based on reflective
            dataset analysis and component selector strategy.
        """
        assert self._state is not None, "Engine state not initialized"
        assert self._state.last_eval_batch is not None, "No eval batch cached"

        selected_candidate = self._state.best_candidate
        selected_idx: int | None = None
        eval_batch = self._state.last_eval_batch

        if self._candidate_selector is not None and self._pareto_state is not None:
            try:
                selected_idx = await self._candidate_selector.select_candidate(
                    self._pareto_state
                )
                selected_candidate = self._pareto_state.candidates[selected_idx]
                eval_batch = self._candidate_eval_batches.get(selected_idx)
                logger.info(
                    "pareto_selection.mutation_parent_selected",
                    candidate_idx=selected_idx,
                    iteration=self._state.iteration,
                    selector_type=type(self._candidate_selector).__name__,
                )
            except NoCandidateAvailableError as exc:
                logger.info(
                    "pareto_selection.empty_frontier_fallback",
                    iteration=self._state.iteration,
                    selector_type=type(self._candidate_selector).__name__,
                    error=str(exc),
                )
                eval_batch = self._state.last_eval_batch

        if eval_batch is None:
            eval_batch = await self.adapter.evaluate(
                self._trainset,
                selected_candidate.components,
                capture_traces=True,
            )
            self._total_evaluations += len(eval_batch.scores)
            if selected_idx is not None:
                self._candidate_eval_batches[selected_idx] = eval_batch

        # Build component list
        available_components = self._build_component_list(selected_candidate)

        # Select components to update
        components_to_update = await self._component_selector.select_components(
            components=available_components,
            iteration=self._state.iteration,
            candidate_idx=selected_idx if selected_idx is not None else 0,
        )

        logger.info(
            "mutation.components_selected",
            iteration=self._state.iteration,
            components=components_to_update,
            selector=type(self._component_selector).__name__,
        )

        # Build reflective dataset
        reflective_dataset = await self.adapter.make_reflective_dataset(
            selected_candidate.components,
            eval_batch,
            components_to_update,
        )

        # Propose new texts
        proposed_components = await self.adapter.propose_new_texts(
            selected_candidate.components,
            reflective_dataset,
            components_to_update,
        )

        # Create new candidate with proposed components
        new_components = dict(selected_candidate.components)
        new_components.update(proposed_components)
        return (
            Candidate(
                components=new_components,
                generation=selected_candidate.generation,
                parent_id=selected_candidate.parent_id,
            ),
            components_to_update,
        )

    def _record_iteration(
        self,
        score: float,
        component_text: str,
        evolved_component: str,
        accepted: bool,
        objective_scores: list[dict[str, float]] | None = None,
        reflection_reasoning: str | None = None,
    ) -> None:
        """Record iteration outcome.

        Args:
            score: Score achieved in this iteration.
            component_text: The text of the component that was evaluated.
            evolved_component: The name of the component that was evolved
                (e.g., "instruction", "output_schema").
            accepted: Whether proposal was accepted.
            objective_scores: Optional objective scores from this iteration's
                evaluation. None when adapter does not provide objective scores.
            reflection_reasoning: Optional natural language reasoning from
                the reflection agent explaining the mutation. None when
                reasoning is not available.

        Note:
            Stores an IterationRecord in the engine state's iteration_history,
            preserving chronological evolution trace for analysis.
        """
        assert self._state is not None, "Engine state not initialized"
        record = IterationRecord(
            iteration_number=self._state.iteration,
            score=score,
            component_text=component_text,
            evolved_component=evolved_component,
            accepted=accepted,
            objective_scores=objective_scores,
            reflection_reasoning=reflection_reasoning,
        )
        self._state.iteration_history.append(record)

    def _should_stop(self) -> StopReason | None:
        """Check if evolution should terminate.

        Returns:
            The ``StopReason`` that triggered termination, or ``None`` if
            evolution should continue. Conditions are checked in priority
            order: max iterations, early stopping patience, custom stoppers.
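
        Examples:
            A simplified sketch of the priority order (not the engine
            implementation; custom stoppers are omitted):

            ```python
            def should_stop(iteration, max_iterations, stagnation, patience):
                if iteration >= max_iterations:
                    return "MAX_ITERATIONS"
                if patience > 0 and stagnation >= patience:
                    # Patience exhaustion is a built-in convergence criterion.
                    return "MAX_ITERATIONS"
                return None
            ```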

        Note:
            Only active stoppers (those that passed setup or have no setup
            method) are invoked. Patience-based early stopping maps to
            ``MAX_ITERATIONS`` because it is a built-in convergence
            criterion, not a user-provided custom stopper.
        """
        assert self._state is not None, "Engine state not initialized"
        # Condition 1: Max iterations reached (built-in, fast path)
        if self._state.iteration >= self.config.max_iterations:
            return StopReason.MAX_ITERATIONS

        # Condition 2: Early stopping (patience exhausted, built-in)
        if self.config.patience > 0:
            if self._state.stagnation_counter >= self.config.patience:
                return StopReason.MAX_ITERATIONS

        # Condition 3: Custom stoppers (T010-T013)
        # Use _active_stoppers which excludes stoppers that failed setup
        active_stoppers = getattr(self, "_active_stoppers", None)
        if active_stoppers:
            stopper_state = self._build_stopper_state()
            for stopper in active_stoppers:
                try:
                    if stopper(stopper_state):
                        # Log stopper trigger (T013)
                        logger.info(
                            "stopper.triggered",
                            stopper=type(stopper).__name__,
                            iteration=self._state.iteration,
                        )
                        return StopReason.STOPPER_TRIGGERED
                except Exception:
                    # T032: Handle stopper exception gracefully
                    logger.exception(
                        "stopper.error",
                        stopper=type(stopper).__name__,
                        iteration=self._state.iteration,
                    )
                    # Continue checking other stoppers

        return None

    def _should_accept(self, proposal_score: float, best_score: float) -> bool:
        """Check if proposal should be accepted.

        Args:
            proposal_score: Score of the proposed candidate.
            best_score: Current best score.

        Returns:
            True if proposal_score > best_score + min_improvement_threshold.
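
        Examples:
            A sketch of the acceptance rule with hypothetical values:

            ```python
            best_score = 0.80
            threshold = 0.01  # hypothetical min_improvement_threshold

            def should_accept(proposal_score):
                # Strict inequality: matching the bar exactly is rejected.
                return proposal_score > best_score + threshold
            ```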

        Note:
            Signals True when proposal exceeds best score by the configured
            improvement threshold, enabling configurable acceptance sensitivity.
        """
        threshold = self.config.min_improvement_threshold
        return proposal_score > best_score + threshold

    def _validate_schema_component(self, proposal: Candidate) -> bool:
        """Validate output_schema component if present.

        Validates that proposed schema text is syntactically correct and
        structurally valid (inherits from BaseModel, no imports/functions).
        Invalid schemas are rejected to prevent evolution from accepting
        non-functional schema proposals.

        Args:
            proposal: Candidate containing components to validate.

        Returns:
            True if valid or no output_schema component present.
            False if output_schema validation fails.

        Note:
            This validation runs before expensive evaluation to reject
            invalid schemas early. Security checks (no imports, no functions)
            are enforced to prevent code injection.
        """
        if "output_schema" not in proposal.components:
            return True

        schema_text = proposal.components["output_schema"]

        try:
            # Import here to avoid circular dependency at module load
            from gepa_adk.utils.schema_utils import validate_schema_text

            validate_schema_text(schema_text)
            logger.debug(
                "schema_validation.passed",
                iteration=self._state.iteration if self._state else None,
            )
            return True
        except SchemaValidationError as e:
            logger.warning(
                "schema_validation.rejected",
                iteration=self._state.iteration if self._state else None,
                validation_stage=e.validation_stage,
                line_number=e.line_number,
                error=e.validation_error,
            )
            return False

    def _accept_proposal(
        self,
        proposal: Candidate,
        score: float,
        eval_batch: EvaluationBatch,
        *,
        candidate_idx: int | None = None,
        reflection_score: float | None = None,
        valset_mean: float | None = None,
        objective_scores: list[dict[str, float]] | None = None,
    ) -> None:
        """Accept a proposal and update state.

        Args:
            proposal: Proposed candidate to accept.
            score: Acceptance score of the proposed candidate (sum or mean).
            eval_batch: Reflection batch from proposal evaluation (cached for
                next iteration's reflective dataset generation).
            candidate_idx: Optional ParetoState candidate index to update with
                lineage metadata.
            reflection_score: Optional trainset score to store with best
                candidate metadata.
            valset_mean: Optional valset mean score to track separately from
                acceptance score.
            objective_scores: Optional objective scores from scoring batch.
                None when adapter does not provide objective scores.

        Note:
            Swaps cached reflection batch for next proposal iteration.
            Tracks acceptance score and valset mean separately.
        """
        assert self._state is not None, "Engine state not initialized"
        # Create new candidate with lineage
        new_candidate = Candidate(
            components=dict(proposal.components),
            generation=self._state.best_candidate.generation + 1,
            parent_id=f"gen-{self._state.best_candidate.generation}",
        )
        if candidate_idx is not None and self._pareto_state is not None:
            self._pareto_state.candidates[candidate_idx] = new_candidate
        self._state.best_candidate = new_candidate
        self._state.best_score = score
        self._state.stagnation_counter = 0
        self._state.last_eval_batch = eval_batch
        if reflection_score is not None:
            self._state.best_reflection_score = reflection_score
        if valset_mean is not None:
            self._state.best_valset_mean = valset_mean
        self._state.best_objective_scores = objective_scores

    def _build_result(
        self, stop_reason: StopReason = StopReason.COMPLETED
    ) -> EvolutionResult:
        """Build final result from current state.

        Args:
            stop_reason: Why the evolution run terminated.

        Returns:
            Frozen EvolutionResult with all metrics and original_components.

        Note:
            Synthesizes a frozen EvolutionResult containing all evolution metrics,
            history, and original_components snapshot, suitable for immutable
            result reporting. The evolved_components dict contains all component
            values from the best candidate.
        """
        assert self._state is not None, "Engine state not initialized"
        return EvolutionResult(
            stop_reason=stop_reason,
            original_score=self._state.original_score,
            final_score=self._state.best_score,
            evolved_components=dict(self._state.best_candidate.components),
            iteration_history=self._state.iteration_history,
            total_iterations=self._state.iteration,
            valset_score=self._state.best_valset_mean,
            trainset_score=self._state.best_reflection_score,
            objective_scores=self._state.best_objective_scores,
            original_components=dict(self._initial_candidate.components),
        )

    async def run(self) -> EvolutionResult:
        """Execute the evolution loop.

        Runs the core evolution loop:
        1. Evaluate baseline candidate
        2. For each iteration until max_iterations or convergence:
           a. Generate reflective dataset from traces
           b. Propose new candidate text
           c. Evaluate proposal
           d. Accept if improves above threshold
           e. Record iteration
        3. Return frozen EvolutionResult
        4. On ``KeyboardInterrupt`` or ``asyncio.CancelledError``, return
           partial result with best-so-far components

        Returns:
            EvolutionResult containing evolution metrics and components.
            On normal completion, ``stop_reason`` reflects the termination
            condition. On interrupt, a partial result is returned with
            ``StopReason.KEYBOARD_INTERRUPT`` or ``StopReason.CANCELLED``.

        Raises:
            KeyboardInterrupt: Re-raised if interrupt occurs before baseline
                evaluation completes (no meaningful partial result possible).
            asyncio.CancelledError: Re-raised if cancellation occurs before
                baseline evaluation completes.
            Exception: Adapter ``Exception`` subclasses propagate unchanged.

        Examples:
            Running evolution:

            ```python
            result = await engine.run()
            print(f"Improved: {result.improved}")
            print(f"Best score: {result.final_score}")
            ```

        Note:
            Outputs a frozen EvolutionResult after completing the evolution
            loop. The engine instance is not designed for reuse; calling
            run() again restarts evolution from scratch rather than
            resuming.
            Fail-fast behavior: adapter ``Exception`` subclasses propagate
            unchanged. ``KeyboardInterrupt`` and ``asyncio.CancelledError``
            (``BaseException`` subclasses) are caught and converted to partial
            results with appropriate ``StopReason``. Logs seed value at start
            for reproducibility tracking.
        """
        logger.info("engine.start", seed=self.config.seed)

        # Initialize stopper state tracking (T004)
        self._start_time = time.monotonic()
        self._total_evaluations = 0

        # Setup stopper lifecycle (T023)
        setup_stoppers = self._setup_stoppers()

        try:
            return await self._run_evolution_loop()
        except KeyboardInterrupt:
            logger.info(
                "evolution.interrupted",
                iteration=self._state.iteration if self._state else 0,
            )
            if self._state is None:
                raise
            return self._build_result(stop_reason=StopReason.KEYBOARD_INTERRUPT)
        except asyncio.CancelledError:
            logger.info(
                "evolution.cancelled",
                iteration=self._state.iteration if self._state else 0,
            )
            if self._state is None:
                raise
            return self._build_result(stop_reason=StopReason.CANCELLED)
        finally:
            # Cleanup stopper lifecycle (T024)
            self._cleanup_stoppers(setup_stoppers)

    async def _run_evolution_loop(self) -> EvolutionResult:
        """Execute the core evolution loop.

        This method contains the actual evolution loop logic, separated
        from lifecycle management for clean try/finally handling.

        Returns:
            EvolutionResult with evolution outcomes.

        Note:
            Only called from run(). Handles the evolution loop body
            while run() manages stopper lifecycle. The loop tracks
            ``StopReason`` to report why evolution terminated.
            Each iteration records ``reflection_reasoning`` from the
            adapter's proposer via a ``getattr`` chain when available.
        """
        # Initialize baseline
        await self._initialize_baseline()
        assert self._state is not None, "Engine state not initialized"

        # Evolution loop
        stop_reason = self._should_stop()
        while stop_reason is None:
            self._state.iteration += 1

            # Propose mutation (returns candidate and list of components evolved)
            proposal, evolved_components_list = await self._propose_mutation()

            # Validate schema component if present (reject invalid early)
            if not self._validate_schema_component(proposal):
                # Invalid schema - skip evaluation and count as stagnation
                self._state.stagnation_counter += 1
                logger.debug(
                    "evolution.proposal_skipped",
                    iteration=self._state.iteration,
                    reason="schema_validation_failed",
                )
                continue

            # Evaluate proposal
            reflection_score, reflection_batch = await self._evaluate_reflection(
                proposal
            )
            proposal_score, scoring_batch, eval_indices = await self._evaluate_scoring(
                proposal
            )

            candidate_idx = None
            if self._pareto_state is not None:
                # Use eval_indices returned from _evaluate_scoring (T066)
                # Prepare objective scores if available (T068)
                objective_scores: dict[str, float] | None = None
                per_example_objective_scores: dict[int, dict[str, float]] | None = None

                if scoring_batch.objective_scores is not None:
                    from statistics import fmean

                    if self.config.frontier_type in (
                        FrontierType.OBJECTIVE,
                        FrontierType.HYBRID,
                    ):
                        # Aggregate objective scores across evaluated examples
                        objective_scores_accum: dict[str, list[float]] = {}
                        for obj_scores in scoring_batch.objective_scores:
                            for obj_name, obj_score in obj_scores.items():
                                objective_scores_accum.setdefault(obj_name, []).append(
                                    obj_score
                                )
                        # Take mean per objective
                        objective_scores = {
                            obj_name: fmean(scores)
                            for obj_name, scores in objective_scores_accum.items()
                        }

                    if self.config.frontier_type == FrontierType.CARTESIAN:
                        # For CARTESIAN, need per-example objective scores mapped to valset indices
                        per_example_objective_scores = {
                            eval_indices[i]: scoring_batch.objective_scores[i]
                            for i in range(len(eval_indices))
                        }
                        # Also need aggregated for validation
                        objective_scores_by_name: dict[str, list[float]] = {}
                        for obj_scores in scoring_batch.objective_scores:
                            for obj_name, obj_score in obj_scores.items():
                                objective_scores_by_name.setdefault(
                                    obj_name, []
                                ).append(obj_score)
                        objective_scores = {
                            obj_name: fmean(scores)
                            for obj_name, scores in objective_scores_by_name.items()
                        }

                # Determine parent indices for genealogy tracking
                parent_indices: list[int] | None = None
                if self._candidate_selector is not None:
                    try:
                        parent_idx = await self._candidate_selector.select_candidate(
                            self._pareto_state
                        )
                        parent_indices = [parent_idx]
                    except NoCandidateAvailableError:
                        parent_indices = None
                else:
                    # Use best candidate as parent
                    if self._pareto_state.best_average_idx is not None:
                        parent_indices = [self._pareto_state.best_average_idx]

                # Pass scores with correct index mapping (T066)
                candidate_idx = self._pareto_state.add_candidate(
                    proposal,
                    scoring_batch.scores,
                    score_indices=eval_indices,
                    objective_scores=objective_scores,
                    per_example_objective_scores=per_example_objective_scores,
                    parent_indices=parent_indices,
                    logger=logger,
                )
                self._candidate_eval_batches[candidate_idx] = reflection_batch
                logger.info(
                    "pareto_frontier.candidate_added",
                    candidate_idx=candidate_idx,
                    iteration=self._state.iteration,
                )

            # Calculate valset mean using only evaluated scores (T067)
            valset_mean = (
                sum(scoring_batch.scores) / len(scoring_batch.scores)
                if scoring_batch.scores
                else 0.0
            )

            # Accept if improves above threshold
            accepted = self._should_accept(proposal_score, self._state.best_score)
            if accepted:
                self._accept_proposal(
                    proposal,
                    proposal_score,
                    reflection_batch,
                    candidate_idx=candidate_idx,
                    reflection_score=reflection_score,
                    valset_mean=valset_mean,
                    objective_scores=scoring_batch.objective_scores,
                )
                # Schedule merge if enabled
                if (
                    self.config.use_merge
                    and self._merge_proposer is not None
                    and self._merge_invocations < self.config.max_merge_invocations
                ):
                    self._merges_due += 1
                    logger.debug(
                        "merge_scheduling.merge_scheduled",
                        iteration=self._state.iteration,
                        merges_due=self._merges_due,
                    )
            else:
                # Increment stagnation counter on rejection
                self._state.stagnation_counter += 1

            # Attempt merge if scheduled
            if (
                self._merges_due > 0
                and self._merge_proposer is not None
                and self._pareto_state is not None
                and self._merge_invocations < self.config.max_merge_invocations
            ):
                merge_result = await self._merge_proposer.propose(self._pareto_state)
                if merge_result is not None:
                    self._merges_due -= 1
                    self._merge_invocations += 1
                    logger.info(
                        "merge_scheduling.merge_attempted",
                        iteration=self._state.iteration,
                        parent_indices=merge_result.parent_indices,
                        ancestor_idx=merge_result.metadata.get("ancestor_idx"),
                        merges_due=self._merges_due,
                        total_invocations=self._merge_invocations,
                    )
                    # Validate schema component in merge proposal
                    if not self._validate_schema_component(merge_result.candidate):
                        logger.debug(
                            "merge.proposal_skipped",
                            iteration=self._state.iteration,
                            reason="schema_validation_failed",
                        )
                        continue
                    # Evaluate merge proposal
                    (
                        merge_reflection_score,
                        merge_reflection_batch,
                    ) = await self._evaluate_reflection(merge_result.candidate)
                    (
                        merge_proposal_score,
                        merge_scoring_batch,
                        merge_eval_indices,
                    ) = await self._evaluate_scoring(merge_result.candidate)

                    # Add merge candidate to ParetoState
                    merge_candidate_idx = None
                    assert self._pareto_state is not None, (
                        "Pareto state not initialized"
                    )
                    merge_objective_scores: dict[str, float] | None = None
                    merge_per_example_objective_scores: (
                        dict[int, dict[str, float]] | None
                    ) = None

                    if merge_scoring_batch.objective_scores is not None:
                        from statistics import fmean

                        if self.config.frontier_type in (
                            FrontierType.OBJECTIVE,
                            FrontierType.HYBRID,
                        ):
                            merge_objective_scores_accum: dict[str, list[float]] = {}
                            for obj_scores in merge_scoring_batch.objective_scores:
                                for obj_name, obj_score in obj_scores.items():
                                    merge_objective_scores_accum.setdefault(
                                        obj_name, []
                                    ).append(obj_score)
                            merge_objective_scores = {
                                obj_name: fmean(scores)
                                for obj_name, scores in merge_objective_scores_accum.items()
                            }

                        if self.config.frontier_type == FrontierType.CARTESIAN:
                            merge_per_example_objective_scores = {
                                merge_eval_indices[
                                    i
                                ]: merge_scoring_batch.objective_scores[i]
                                for i in range(len(merge_eval_indices))
                            }
                            merge_objective_scores_by_name: dict[str, list[float]] = {}
                            for obj_scores in merge_scoring_batch.objective_scores:
                                for obj_name, obj_score in obj_scores.items():
                                    merge_objective_scores_by_name.setdefault(
                                        obj_name, []
                                    ).append(obj_score)
                            merge_objective_scores = {
                                obj_name: fmean(scores)
                                for obj_name, scores in merge_objective_scores_by_name.items()
                            }

                    merge_candidate_idx = self._pareto_state.add_candidate(
                        merge_result.candidate,
                        merge_scoring_batch.scores,
                        score_indices=merge_eval_indices,
                        objective_scores=merge_objective_scores,
                        per_example_objective_scores=merge_per_example_objective_scores,
                        parent_indices=merge_result.parent_indices,
                        logger=logger,
                    )
                    self._candidate_eval_batches[merge_candidate_idx] = (
                        merge_reflection_batch
                    )

                    merge_valset_mean = (
                        sum(merge_scoring_batch.scores)
                        / len(merge_scoring_batch.scores)
                        if merge_scoring_batch.scores
                        else 0.0
                    )

                    # Accept merge if improves
                    merge_accepted = self._should_accept(
                        merge_proposal_score, self._state.best_score
                    )
                    if merge_accepted:
                        self._accept_proposal(
                            merge_result.candidate,
                            merge_proposal_score,
                            merge_reflection_batch,
                            candidate_idx=merge_candidate_idx,
                            reflection_score=merge_reflection_score,
                            valset_mean=merge_valset_mean,
                            objective_scores=merge_scoring_batch.objective_scores,
                        )
                        logger.info(
                            "merge_scheduling.merge_accepted",
                            iteration=self._state.iteration,
                            merge_score=merge_proposal_score,
                        )
                    else:
                        logger.debug(
                            "merge_scheduling.merge_rejected",
                            iteration=self._state.iteration,
                            merge_score=merge_proposal_score,
                            best_score=self._state.best_score,
                        )
                else:
                    # Merge not possible, decrement counter
                    if self._merges_due > 0:
                        self._merges_due -= 1

            # Record iteration with actual evolved component name (T033)
            # For single-component evolution, use the first (and only) component.
            # For multi-component round-robin, this tracks which component was
            # evolved in this iteration.
            if evolved_components_list:
                evolved_component_name = evolved_components_list[0]
            else:
                # Empty list indicates logic error - use first key from proposal
                logger.warning(
                    "engine.empty_evolved_components_list",
                    iteration=self._state.iteration,
                    proposal_keys=list(proposal.components.keys()),
                )
                evolved_component_name = next(iter(proposal.components))
            self._record_iteration(
                score=proposal_score,
                component_text=proposal.components.get(evolved_component_name, ""),
                evolved_component=evolved_component_name,
                accepted=accepted,
                objective_scores=scoring_batch.objective_scores,
                # TODO: Surface last_reasoning through adapter protocol instead
                # of reaching into private _proposer attribute. Safe (defaults
                # to None) but couples engine to adapter internals.
                reflection_reasoning=getattr(
                    getattr(self.adapter, "_proposer", None),
                    "last_reasoning",
                    None,
                ),
            )

            stop_reason = self._should_stop()

        # Build and return result
        return self._build_result(stop_reason=stop_reason)

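The acceptance rule in `_should_accept` can be illustrated in isolation. Below is a minimal sketch of the same strict-inequality check; the threshold value 0.05 is hypothetical (the engine reads the real value from `EvolutionConfig.min_improvement_threshold`):

```python
def should_accept(proposal_score: float, best_score: float, threshold: float) -> bool:
    # Mirrors the engine's rule: accept only on strict improvement beyond the threshold.
    return proposal_score > best_score + threshold


# With a hypothetical threshold of 0.05:
print(should_accept(0.90, 0.80, 0.05))  # clear improvement -> accepted
print(should_accept(0.84, 0.80, 0.05))  # gain inside the threshold band -> rejected
print(should_accept(0.85, 0.80, 0.05))  # equality does not clear the strict ">" -> rejected
```

Rejected proposals increment the stagnation counter, which feeds the stop conditions checked by `_should_stop`.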
pareto_state property

pareto_state: ParetoState | None

Return the current Pareto state, if initialized.
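When an adapter supplies per-example objective scores, the evolution loop averages them into one value per objective (for `OBJECTIVE` and `HYBRID` frontiers) using `statistics.fmean`. A self-contained sketch of that aggregation, with hypothetical objective names:

```python
from statistics import fmean

# Hypothetical per-example objective scores, one dict per evaluated example,
# shaped like EvaluationBatch.objective_scores.
per_example = [
    {"accuracy": 0.8, "brevity": 0.5},
    {"accuracy": 0.6, "brevity": 0.9},
]

# Accumulate scores per objective name, then take the mean of each list.
accum: dict[str, list[float]] = {}
for obj_scores in per_example:
    for name, score in obj_scores.items():
        accum.setdefault(name, []).append(score)

aggregated = {name: fmean(scores) for name, scores in accum.items()}
print(aggregated)  # one mean per objective (both near 0.7 here)
```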

__init__

__init__(
    adapter: AsyncGEPAAdapter[
        DataInst, Trajectory, RolloutOutput
    ],
    config: EvolutionConfig,
    initial_candidate: Candidate,
    batch: list[DataInst],
    valset: list[DataInst] | None = None,
    candidate_selector: CandidateSelectorProtocol
    | None = None,
    component_selector: ComponentSelectorProtocol
    | None = None,
    evaluation_policy: EvaluationPolicyProtocol
    | None = None,
    merge_proposer: ProposerProtocol | None = None,
    rng: Random | None = None,
) -> None

Initialize the evolution engine.

PARAMETER DESCRIPTION
adapter

Implementation of AsyncGEPAAdapter protocol for evaluation and proposal generation.

TYPE: AsyncGEPAAdapter[DataInst, Trajectory, RolloutOutput]

config

Evolution parameters controlling iterations, thresholds, and early stopping.

TYPE: EvolutionConfig

initial_candidate

Starting candidate with at least one component.

TYPE: Candidate

batch

Trainset data instances for reflection and mutation.

TYPE: list[DataInst]

valset

Optional validation data for scoring candidates. Defaults to trainset when omitted. Must be non-empty if provided.

TYPE: list[DataInst] | None DEFAULT: None

candidate_selector

Optional selector strategy for Pareto-aware candidate sampling. When provided, initializes ParetoState for multi-objective tracking.

TYPE: CandidateSelectorProtocol | None DEFAULT: None

component_selector

Optional selector strategy for choosing which components to update. Defaults to RoundRobinComponentSelector.

TYPE: ComponentSelectorProtocol | None DEFAULT: None

evaluation_policy

Optional policy for selecting which validation examples to evaluate per iteration. Defaults to FullEvaluationPolicy.

TYPE: EvaluationPolicyProtocol | None DEFAULT: None

merge_proposer

Optional proposer for merge operations. If provided and config.use_merge is True, merge proposals will be attempted after successful mutations.

TYPE: ProposerProtocol | None DEFAULT: None

rng

Optional seeded random.Random instance for deterministic engine decisions. When provided, used for the auto-created merge proposer. None preserves current random behavior.

TYPE: Random | None DEFAULT: None

RAISES DESCRIPTION
ValueError

If batch is empty, valset is provided but empty, or initial_candidate has no components.

Examples:

Creating an engine:

engine = AsyncGEPAEngine(
    adapter=my_adapter,
    config=EvolutionConfig(max_iterations=50),
    initial_candidate=Candidate(components={"instruction": "Be helpful"}),
    batch=training_data,
    candidate_selector=selector,
)
Note

Configures trainset and valset routing for reflection and scoring. Initializes stopper lifecycle tracking for custom stop callbacks.

Source code in src/gepa_adk/engine/async_engine.py
def __init__(
    self,
    adapter: AsyncGEPAAdapter[DataInst, Trajectory, RolloutOutput],
    config: EvolutionConfig,
    initial_candidate: Candidate,
    batch: list[DataInst],
    valset: list[DataInst] | None = None,
    candidate_selector: CandidateSelectorProtocol | None = None,
    component_selector: ComponentSelectorProtocol | None = None,
    evaluation_policy: EvaluationPolicyProtocol | None = None,
    merge_proposer: ProposerProtocol | None = None,
    rng: random.Random | None = None,
) -> None:
    """Initialize the evolution engine.

    Args:
        adapter: Implementation of AsyncGEPAAdapter protocol for evaluation
            and proposal generation.
        config: Evolution parameters controlling iterations, thresholds,
            and early stopping.
        initial_candidate: Starting candidate with at least one component.
        batch: Trainset data instances for reflection and mutation.
        valset: Optional validation data for scoring candidates. Defaults
            to trainset when omitted. Must be non-empty if provided.
        candidate_selector: Optional selector strategy for Pareto-aware
            candidate sampling. When provided, initializes ParetoState
            for multi-objective tracking.
        component_selector: Optional selector strategy for choosing which
            components to update. Defaults to RoundRobinComponentSelector.
        evaluation_policy: Optional policy for selecting which validation
            examples to evaluate per iteration. Defaults to FullEvaluationPolicy.
        merge_proposer: Optional proposer for merge operations. If provided
            and config.use_merge is True, merge proposals will be attempted
            after successful mutations.
        rng: Optional seeded random.Random instance for deterministic engine
            decisions. When provided, used for the auto-created merge proposer.
            None preserves current random behavior.

    Raises:
        ValueError: If batch is empty, valset is provided but empty,
            or initial_candidate has no components.

    Examples:
        Creating an engine:

        ```python
        engine = AsyncGEPAEngine(
            adapter=my_adapter,
            config=EvolutionConfig(max_iterations=50),
            initial_candidate=Candidate(components={"instruction": "Be helpful"}),
            batch=training_data,
            candidate_selector=selector,
        )
        ```

    Note:
        Configures trainset and valset routing for reflection and scoring.
        Initializes stopper lifecycle tracking for custom stop callbacks.
    """
    # Validation
    if len(batch) == 0:
        raise ValueError("batch must contain at least one data instance")
    if valset is not None and len(valset) == 0:
        raise ValueError(
            "valset must contain at least one validation data instance"
        )

    if not initial_candidate.components:
        raise ValueError("initial_candidate must have at least one component")

    # Store dependencies
    self.adapter = adapter
    self.config = config
    self._initial_candidate = initial_candidate
    self._trainset = batch
    self._valset = valset if valset is not None else batch
    self._state: _EngineState | None = None
    self._rng = rng
    self._candidate_selector = candidate_selector
    self._component_selector = component_selector or RoundRobinComponentSelector()
    self._pareto_state: ParetoState | None = None
    self._candidate_eval_batches: dict[int, EvaluationBatch] = {}
    if merge_proposer is not None:
        self._merge_proposer = merge_proposer
    elif config.use_merge:
        from gepa_adk.engine.merge_proposer import MergeProposer

        self._merge_proposer = MergeProposer(rng=rng or random.Random())
    else:
        self._merge_proposer = None
    self._merges_due: int = 0
    self._merge_invocations: int = 0
    # Stopper state tracking (T001, T002)
    self._start_time: float | None = None
    self._total_evaluations: int = 0
    self._active_stoppers: list[object] = []
    # Import here to avoid circular dependency
    if evaluation_policy is None:
        from gepa_adk.adapters.selection.evaluation_policy import (
            FullEvaluationPolicy,
        )

        self._evaluation_policy: EvaluationPolicyProtocol = FullEvaluationPolicy()
    else:
        self._evaluation_policy = evaluation_policy
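The `rng` parameter follows the standard seeded `random.Random` pattern: two generators constructed from the same seed produce identical sequences, which is what makes decisions such as merge-proposer sampling reproducible across runs. A quick illustration (the seed 42 is arbitrary):

```python
import random

rng_a = random.Random(42)
rng_b = random.Random(42)

# Same seed -> identical decision sequences from independent instances.
draws_a = [rng_a.random() for _ in range(3)]
draws_b = [rng_b.random() for _ in range(3)]
print(draws_a == draws_b)  # True
```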

run async

run() -> EvolutionResult

Execute the evolution loop.

Runs the core evolution loop:

1. Evaluate baseline candidate
2. For each iteration until max_iterations or convergence:
   a. Generate reflective dataset from traces
   b. Propose new candidate text
   c. Evaluate proposal
   d. Accept if improves above threshold
   e. Record iteration
3. Return frozen EvolutionResult
4. On KeyboardInterrupt or asyncio.CancelledError, return partial result with best-so-far components

RETURNS DESCRIPTION
EvolutionResult

EvolutionResult containing evolution metrics and components. On normal completion, stop_reason reflects the termination condition. On interrupt, a partial result is returned with StopReason.KEYBOARD_INTERRUPT or StopReason.CANCELLED.

RAISES DESCRIPTION
KeyboardInterrupt

Re-raised if interrupt occurs before baseline evaluation completes (no meaningful partial result possible).

CancelledError

Re-raised if cancellation occurs before baseline evaluation completes.

Exception

Adapter Exception subclasses propagate unchanged.

Examples:

Running evolution:

result = await engine.run()
print(f"Improved: {result.improved}")
print(f"Best score: {result.final_score}")
Note

Outputs a frozen EvolutionResult after completing the evolution loop. Engine instance should not be reused after run() completes. Method is idempotent if called multiple times (restarts fresh). Fail-fast behavior: adapter Exception subclasses propagate unchanged. KeyboardInterrupt and asyncio.CancelledError (BaseException subclasses) are caught and converted to partial results with appropriate StopReason. Logs seed value at start for reproducibility tracking.

Source code in src/gepa_adk/engine/async_engine.py
async def run(self) -> EvolutionResult:
    """Execute the evolution loop.

    Runs the core evolution loop:
    1. Evaluate baseline candidate
    2. For each iteration until max_iterations or convergence:
       a. Generate reflective dataset from traces
       b. Propose new candidate text
       c. Evaluate proposal
       d. Accept if improves above threshold
       e. Record iteration
    3. Return frozen EvolutionResult
    4. On ``KeyboardInterrupt`` or ``asyncio.CancelledError``, return
       partial result with best-so-far components

    Returns:
        EvolutionResult containing evolution metrics and components.
        On normal completion, ``stop_reason`` reflects the termination
        condition. On interrupt, a partial result is returned with
        ``StopReason.KEYBOARD_INTERRUPT`` or ``StopReason.CANCELLED``.

    Raises:
        KeyboardInterrupt: Re-raised if interrupt occurs before baseline
            evaluation completes (no meaningful partial result possible).
        asyncio.CancelledError: Re-raised if cancellation occurs before
            baseline evaluation completes.
        Exception: Adapter ``Exception`` subclasses propagate unchanged.

    Examples:
        Running evolution:

        ```python
        result = await engine.run()
        print(f"Improved: {result.improved}")
        print(f"Best score: {result.final_score}")
        ```

    Note:
        Outputs a frozen EvolutionResult after completing the evolution
        loop. Engine instance should not be reused after run() completes.
        Method is idempotent if called multiple times (restarts fresh).
        Fail-fast behavior: adapter ``Exception`` subclasses propagate
        unchanged. ``KeyboardInterrupt`` and ``asyncio.CancelledError``
        (``BaseException`` subclasses) are caught and converted to partial
        results with appropriate ``StopReason``. Logs seed value at start
        for reproducibility tracking.
    """
    logger.info("engine.start", seed=self.config.seed)

    # Initialize stopper state tracking (T004)
    self._start_time = time.monotonic()
    self._total_evaluations = 0

    # Setup stopper lifecycle (T023)
    setup_stoppers = self._setup_stoppers()

    try:
        return await self._run_evolution_loop()
    except KeyboardInterrupt:
        logger.info(
            "evolution.interrupted",
            iteration=self._state.iteration if self._state else 0,
        )
        if self._state is None:
            raise
        return self._build_result(stop_reason=StopReason.KEYBOARD_INTERRUPT)
    except asyncio.CancelledError:
        logger.info(
            "evolution.cancelled",
            iteration=self._state.iteration if self._state else 0,
        )
        if self._state is None:
            raise
        return self._build_result(stop_reason=StopReason.CANCELLED)
    finally:
        # Cleanup stopper lifecycle (T024)
        self._cleanup_stoppers(setup_stoppers)
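The cancellation behavior above can be exercised from the caller's side. The sketch below is illustrative, not part of the library: `cancel_after` and its parameters are hypothetical names, and it assumes only what the docstring states, namely that `run()` catches `asyncio.CancelledError` after baseline evaluation and returns a partial result instead of re-raising.

```python
# Sketch: cancel a running evolution after a deadline, assuming run()
# converts the cancellation into a partial EvolutionResult (per the
# documented behavior above). `engine` is any object with async run().
import asyncio


async def cancel_after(engine, seconds: float):
    task = asyncio.ensure_future(engine.run())
    await asyncio.sleep(seconds)
    if not task.done():
        # Throws CancelledError into run(); after baseline evaluation
        # the engine returns a partial result with StopReason.CANCELLED.
        task.cancel()
    return await task
```

Note that if cancellation lands before the baseline evaluation completes, `run()` re-raises, so `await task` would raise `asyncio.CancelledError` in that window.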

MergeProposer

Proposer that combines two Pareto-optimal candidates via genetic crossover.

Selects two candidates from the frontier that share a common ancestor, identifies which components each improved, and creates a merged candidate that combines improvements from both branches.

ATTRIBUTE DESCRIPTION
rng

Random number generator for candidate selection.

TYPE: Random

val_overlap_floor

Minimum overlapping validation coverage required.

TYPE: int

max_attempts

Maximum merge attempts before giving up.

TYPE: int

attempted_merges

Set of attempted merge triplets to prevent duplicates.

TYPE: set[AncestorLog]

Examples:

Creating a merge proposer:

proposer = MergeProposer(rng=random.Random(42))
result = await proposer.propose(state)
if result:
    print(f"Merged from parents {result.parent_indices}")

Note: A proposer that combines two Pareto-optimal candidates via genetic crossover. Selects candidates from the frontier that share a common ancestor and merges their complementary component improvements.

Source code in src/gepa_adk/engine/merge_proposer.py
class MergeProposer:
    """Proposer that combines two Pareto-optimal candidates via genetic crossover.

    Selects two candidates from the frontier that share a common ancestor,
    identifies which components each improved, and creates a merged candidate
    that combines improvements from both branches.

    Attributes:
        rng (random.Random): Random number generator for candidate selection.
        val_overlap_floor (int): Minimum overlapping validation coverage required.
        max_attempts (int): Maximum merge attempts before giving up.
        attempted_merges (set[AncestorLog]): Set of attempted merge triplets to prevent duplicates.

    Examples:
        Creating a merge proposer:

        ```python
        proposer = MergeProposer(rng=random.Random(42))
        result = await proposer.propose(state)
        if result:
            print(f"Merged from parents {result.parent_indices}")
        ```
    Note:
        A proposer that combines two Pareto-optimal candidates via genetic crossover.
        Selects candidates from the frontier that share a common ancestor and merges
        their complementary component improvements.
    """

    def __init__(
        self,
        rng: random.Random,
        val_overlap_floor: int = 5,
        max_attempts: int = 10,
    ) -> None:
        """Initialize MergeProposer.

        Args:
            rng: Random number generator for candidate selection.
            val_overlap_floor: Minimum overlapping validation examples required.
            max_attempts: Maximum merge attempts before giving up.

        Note:
            Creates a new MergeProposer instance with the specified random number
            generator and configuration parameters. The attempted_merges set is
            initialized empty to track merge attempts.
        """
        self.rng = rng
        self.val_overlap_floor = val_overlap_floor
        self.max_attempts = max_attempts
        self.attempted_merges: set[AncestorLog] = set()

    async def propose(
        self,
        state: ParetoState,
        eval_batch: object | None = None,  # Ignored for merge proposals
    ) -> ProposalResult | None:
        """Attempt to merge two frontier candidates.

        Args:
            state (ParetoState): Current Pareto state with candidates, scores, and genealogy.
                Must contain at least 2 candidates on the Pareto frontier with a shared
                common ancestor for merge to succeed.
            eval_batch (object | None): Ignored for merge proposals. Merge operations
                do not require evaluation batch data as they combine existing candidates.

        Returns:
            ProposalResult | None: ProposalResult with merged candidate and both parent indices,
            or None if merge not possible (e.g., no common ancestor, insufficient frontier,
            or no complementary component changes).

        Examples:
            Proposing a merge from evolution state:

            ```python
            proposer = MergeProposer(rng=random.Random(42))
            result = await proposer.propose(state)
            if result:
                print(f"Merged from parents {result.parent_indices}")
                print(f"Ancestor: {result.metadata['ancestor_idx']}")
            ```

        Note:
            Operations select candidates from Pareto frontier only. Requires common ancestor
            and complementary component changes for successful merge. Validates
            minimum validation overlap before merging.
        """
        # Find suitable merge candidates
        merge_candidates = self._find_merge_candidates(state)
        if merge_candidates is None:
            logger.debug("merge_proposer.no_candidates", reason="no_suitable_pair")
            return None

        parent1_idx, parent2_idx, ancestor_idx = merge_candidates

        # Get candidate components
        ancestor = state.candidates[ancestor_idx]
        parent1 = state.candidates[parent1_idx]
        parent2 = state.candidates[parent2_idx]

        # Check if merge is desirable (complementary changes)
        if not has_desirable_predictors(
            ancestor.components, parent1.components, parent2.components
        ):
            logger.debug(
                "merge_proposer.no_desirable_predictors",
                parent1_idx=parent1_idx,
                parent2_idx=parent2_idx,
                ancestor_idx=ancestor_idx,
            )
            return None

        # Check validation overlap
        scores1 = state.candidate_scores.get(parent1_idx, {})
        scores2 = state.candidate_scores.get(parent2_idx, {})
        overlap = set(scores1.keys()) & set(scores2.keys())
        valid_overlap = {idx for idx in overlap if isinstance(idx, int) and idx >= 0}
        if len(valid_overlap) < self.val_overlap_floor:
            logger.debug(
                "merge_proposer.insufficient_overlap",
                parent1_idx=parent1_idx,
                parent2_idx=parent2_idx,
                overlap_count=len(valid_overlap),
                required=self.val_overlap_floor,
            )
            return None

        # Calculate average scores for component selection
        avg_score1 = fmean(scores1.values()) if scores1 else 0.0
        avg_score2 = fmean(scores2.values()) if scores2 else 0.0

        # Merge components
        merged_components = self._merge_components(
            ancestor.components,
            parent1.components,
            parent2.components,
            avg_score1,
            avg_score2,
        )

        # Create merged candidate
        merged_candidate = Candidate(
            components=merged_components,
            generation=max(parent1.generation, parent2.generation) + 1,
            parent_ids=[parent1_idx, parent2_idx],
        )

        # Log successful merge
        logger.info(
            "merge_proposer.merge_success",
            parent1_idx=parent1_idx,
            parent2_idx=parent2_idx,
            ancestor_idx=ancestor_idx,
            components_merged=list(merged_components.keys()),
        )

        return ProposalResult(
            candidate=merged_candidate,
            parent_indices=[parent1_idx, parent2_idx],
            tag="merge",
            metadata={"ancestor_idx": ancestor_idx},
        )

    def _find_merge_candidates(
        self,
        state: ParetoState,
    ) -> tuple[int, int, int] | None:
        """Find two candidates suitable for merging.

        Args:
            state: Current Pareto state.

        Returns:
            Tuple of (parent1_idx, parent2_idx, ancestor_idx) or None if no
            suitable pair found.

        Note:
            Searches for suitable merge candidates from the Pareto frontier.
            Requires common ancestor and prevents duplicate merge attempts.
        """
        # Get frontier candidates (non-dominated)
        frontier_candidates = state.frontier.get_non_dominated()

        if len(frontier_candidates) < 2:
            logger.debug(
                "merge_proposer.insufficient_frontier",
                frontier_size=len(frontier_candidates),
            )
            return None

        # Try to find a suitable pair
        for _ in range(self.max_attempts):
            # Sample two different candidates from frontier
            candidate_list = list(frontier_candidates)
            if len(candidate_list) < 2:
                return None

            parent1_idx = self.rng.choice(candidate_list)
            parent2_idx = self.rng.choice(candidate_list)

            # Must be different candidates
            if parent1_idx == parent2_idx:
                continue

            # Ensure consistent ordering for deduplication
            if parent1_idx > parent2_idx:
                parent1_idx, parent2_idx = parent2_idx, parent1_idx

            # Find common ancestor
            ancestor_idx = find_common_ancestor(
                parent1_idx, parent2_idx, state.parent_indices
            )

            if ancestor_idx is None:
                continue

            # Check if already attempted
            merge_log: AncestorLog = (parent1_idx, parent2_idx, ancestor_idx)
            if merge_log in self.attempted_merges:
                continue

            # Check ancestor score constraint (ancestor should not be better than descendants)
            ancestor_scores = state.candidate_scores.get(ancestor_idx, {})
            parent1_scores = state.candidate_scores.get(parent1_idx, {})
            parent2_scores = state.candidate_scores.get(parent2_idx, {})

            if ancestor_scores and parent1_scores and parent2_scores:
                ancestor_avg = fmean(ancestor_scores.values())
                parent1_avg = fmean(parent1_scores.values())
                parent2_avg = fmean(parent2_scores.values())

                # Ancestor should not be better than both descendants
                if ancestor_avg > max(parent1_avg, parent2_avg):
                    logger.debug(
                        "merge_proposer.ancestor_too_good",
                        ancestor_idx=ancestor_idx,
                        ancestor_avg=ancestor_avg,
                        parent1_avg=parent1_avg,
                        parent2_avg=parent2_avg,
                    )
                    continue

            # Mark as attempted
            self.attempted_merges.add(merge_log)

            return (parent1_idx, parent2_idx, ancestor_idx)

        logger.debug("merge_proposer.max_attempts_exceeded", attempts=self.max_attempts)
        return None

    def _merge_components(
        self,
        ancestor: dict[str, str],
        parent1: dict[str, str],
        parent2: dict[str, str],
        score1: float,
        score2: float,
    ) -> dict[str, str]:
        """Merge components from two parents based on ancestor divergence.

        Args:
            ancestor: Component dictionary from common ancestor.
            parent1: Component dictionary from first parent.
            parent2: Component dictionary from second parent.
            score1: Average score of first parent.
            score2: Average score of second parent.

        Returns:
            Merged component dictionary.

        Note:
            Strategy for merging components:
            - If both parents same → take either
            - If one unchanged from ancestor, other changed → take changed value
            - If both changed differently → take higher scorer's value
            Components present only in parents (not ancestor) are ignored.
        """
        merged: dict[str, str] = {}

        for key in ancestor.keys():
            anc_val = ancestor[key]
            p1_val = parent1.get(key, anc_val)
            p2_val = parent2.get(key, anc_val)

            if p1_val == p2_val:
                # Both same - take either
                merged[key] = p1_val
            elif p1_val == anc_val and p2_val != anc_val:
                # P1 unchanged, P2 changed - take P2's innovation
                merged[key] = p2_val
            elif p2_val == anc_val and p1_val != anc_val:
                # P2 unchanged, P1 changed - take P1's innovation
                merged[key] = p1_val
            else:
                # Both changed differently - take higher scorer's value
                merged[key] = p1_val if score1 >= score2 else p2_val

        return merged
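The three-way merge strategy in `_merge_components` can be sketched as a standalone function. This is an illustrative restatement of the rules listed in the Note above, not library API; `merge_three_way` is a hypothetical name.

```python
# Sketch of the three-way component merge: prefer the parent that
# diverged from the ancestor; on a conflict, the higher scorer wins.
def merge_three_way(
    ancestor: dict[str, str],
    parent1: dict[str, str],
    parent2: dict[str, str],
    score1: float,
    score2: float,
) -> dict[str, str]:
    merged: dict[str, str] = {}
    for key, anc_val in ancestor.items():  # keys only in parents are ignored
        p1 = parent1.get(key, anc_val)
        p2 = parent2.get(key, anc_val)
        if p1 == p2:
            merged[key] = p1  # both parents agree: take either
        elif p1 == anc_val:
            merged[key] = p2  # only parent2 diverged: take its change
        elif p2 == anc_val:
            merged[key] = p1  # only parent1 diverged: take its change
        else:
            merged[key] = p1 if score1 >= score2 else p2  # conflict
    return merged
```

For example, if only one parent changed a component, its change survives regardless of scores; scores matter only when both parents changed the same component differently.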

__init__

__init__(
    rng: Random,
    val_overlap_floor: int = 5,
    max_attempts: int = 10,
) -> None

Initialize MergeProposer.

PARAMETER DESCRIPTION
rng

Random number generator for candidate selection.

TYPE: Random

val_overlap_floor

Minimum overlapping validation examples required.

TYPE: int DEFAULT: 5

max_attempts

Maximum merge attempts before giving up.

TYPE: int DEFAULT: 10

Note

Creates a new MergeProposer instance with the specified random number generator and configuration parameters. The attempted_merges set is initialized empty to track merge attempts.

Source code in src/gepa_adk/engine/merge_proposer.py
def __init__(
    self,
    rng: random.Random,
    val_overlap_floor: int = 5,
    max_attempts: int = 10,
) -> None:
    """Initialize MergeProposer.

    Args:
        rng: Random number generator for candidate selection.
        val_overlap_floor: Minimum overlapping validation examples required.
        max_attempts: Maximum merge attempts before giving up.

    Note:
        Creates a new MergeProposer instance with the specified random number
        generator and configuration parameters. The attempted_merges set is
        initialized empty to track merge attempts.
    """
    self.rng = rng
    self.val_overlap_floor = val_overlap_floor
    self.max_attempts = max_attempts
    self.attempted_merges: set[AncestorLog] = set()

propose async

propose(
    state: ParetoState, eval_batch: object | None = None
) -> ProposalResult | None

Attempt to merge two frontier candidates.

PARAMETER DESCRIPTION
state

Current Pareto state with candidates, scores, and genealogy. Must contain at least 2 candidates on the Pareto frontier with a shared common ancestor for merge to succeed.

TYPE: ParetoState

eval_batch

Ignored for merge proposals. Merge operations do not require evaluation batch data as they combine existing candidates.

TYPE: object | None DEFAULT: None

RETURNS DESCRIPTION
ProposalResult | None

ProposalResult with merged candidate and both parent indices, or None if merge not possible (e.g., no common ancestor, insufficient frontier, or no complementary component changes).

Examples:

Proposing a merge from evolution state:

proposer = MergeProposer(rng=random.Random(42))
result = await proposer.propose(state)
if result:
    print(f"Merged from parents {result.parent_indices}")
    print(f"Ancestor: {result.metadata['ancestor_idx']}")
Note

Operations select candidates from Pareto frontier only. Requires common ancestor and complementary component changes for successful merge. Validates minimum validation overlap before merging.

Source code in src/gepa_adk/engine/merge_proposer.py
async def propose(
    self,
    state: ParetoState,
    eval_batch: object | None = None,  # Ignored for merge proposals
) -> ProposalResult | None:
    """Attempt to merge two frontier candidates.

    Args:
        state (ParetoState): Current Pareto state with candidates, scores, and genealogy.
            Must contain at least 2 candidates on the Pareto frontier with a shared
            common ancestor for merge to succeed.
        eval_batch (object | None): Ignored for merge proposals. Merge operations
            do not require evaluation batch data as they combine existing candidates.

    Returns:
        ProposalResult | None: ProposalResult with merged candidate and both parent indices,
        or None if merge not possible (e.g., no common ancestor, insufficient frontier,
        or no complementary component changes).

    Examples:
        Proposing a merge from evolution state:

        ```python
        proposer = MergeProposer(rng=random.Random(42))
        result = await proposer.propose(state)
        if result:
            print(f"Merged from parents {result.parent_indices}")
            print(f"Ancestor: {result.metadata['ancestor_idx']}")
        ```

    Note:
        Operations select candidates from Pareto frontier only. Requires common ancestor
        and complementary component changes for successful merge. Validates
        minimum validation overlap before merging.
    """
    # Find suitable merge candidates
    merge_candidates = self._find_merge_candidates(state)
    if merge_candidates is None:
        logger.debug("merge_proposer.no_candidates", reason="no_suitable_pair")
        return None

    parent1_idx, parent2_idx, ancestor_idx = merge_candidates

    # Get candidate components
    ancestor = state.candidates[ancestor_idx]
    parent1 = state.candidates[parent1_idx]
    parent2 = state.candidates[parent2_idx]

    # Check if merge is desirable (complementary changes)
    if not has_desirable_predictors(
        ancestor.components, parent1.components, parent2.components
    ):
        logger.debug(
            "merge_proposer.no_desirable_predictors",
            parent1_idx=parent1_idx,
            parent2_idx=parent2_idx,
            ancestor_idx=ancestor_idx,
        )
        return None

    # Check validation overlap
    scores1 = state.candidate_scores.get(parent1_idx, {})
    scores2 = state.candidate_scores.get(parent2_idx, {})
    overlap = set(scores1.keys()) & set(scores2.keys())
    valid_overlap = {idx for idx in overlap if isinstance(idx, int) and idx >= 0}
    if len(valid_overlap) < self.val_overlap_floor:
        logger.debug(
            "merge_proposer.insufficient_overlap",
            parent1_idx=parent1_idx,
            parent2_idx=parent2_idx,
            overlap_count=len(valid_overlap),
            required=self.val_overlap_floor,
        )
        return None

    # Calculate average scores for component selection
    avg_score1 = fmean(scores1.values()) if scores1 else 0.0
    avg_score2 = fmean(scores2.values()) if scores2 else 0.0

    # Merge components
    merged_components = self._merge_components(
        ancestor.components,
        parent1.components,
        parent2.components,
        avg_score1,
        avg_score2,
    )

    # Create merged candidate
    merged_candidate = Candidate(
        components=merged_components,
        generation=max(parent1.generation, parent2.generation) + 1,
        parent_ids=[parent1_idx, parent2_idx],
    )

    # Log successful merge
    logger.info(
        "merge_proposer.merge_success",
        parent1_idx=parent1_idx,
        parent2_idx=parent2_idx,
        ancestor_idx=ancestor_idx,
        components_merged=list(merged_components.keys()),
    )

    return ProposalResult(
        candidate=merged_candidate,
        parent_indices=[parent1_idx, parent2_idx],
        tag="merge",
        metadata={"ancestor_idx": ancestor_idx},
    )
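The deduplication scheme in `_find_merge_candidates` relies on canonically ordering the parent pair before recording the attempt triplet. A minimal standalone sketch of that idea (the function and variable names here are illustrative, not library API):

```python
# Sketch: canonical ordering means (3, 1, ancestor) and (1, 3, ancestor)
# record the same attempt, so the same pair is never merged twice.
attempted: set[tuple[int, int, int]] = set()


def record_attempt(p1: int, p2: int, ancestor: int) -> bool:
    """Record a merge attempt; return False if it was already tried."""
    if p1 > p2:
        p1, p2 = p2, p1  # enforce p1 < p2 for a canonical key
    key = (p1, p2, ancestor)
    if key in attempted:
        return False
    attempted.add(key)
    return True
```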

AgentProvider

Bases: Protocol



Protocol for loading and persisting agents.

Implementations provide agent configuration storage and retrieval, enabling the evolution system to load agents and persist evolved instructions.

Examples:

Implement a minimal in-memory provider:

class InMemoryProvider:
    def __init__(self):
        self._agents = {}

    def get_agent(self, name: str) -> LlmAgent:
        if not name:
            raise ValueError("Agent name cannot be empty")
        if name not in self._agents:
            raise KeyError(f"Agent not found: {name}")
        return self._agents[name]

    def save_instruction(self, name: str, instruction: str) -> None:
        if not name:
            raise ValueError("Agent name cannot be empty")
        if name not in self._agents:
            raise KeyError(f"Agent not found: {name}")
        self._agents[name].instruction = instruction

    def list_agents(self) -> list[str]:
        return list(self._agents.keys())

Verify protocol compliance:

from gepa_adk.ports import AgentProvider

provider = InMemoryProvider()
assert isinstance(provider, AgentProvider)  # Runtime check works
Note

All implementations must provide get_agent(), save_instruction(), and list_agents() methods. Use @runtime_checkable to enable isinstance() checks for protocol compliance.

Source code in src/gepa_adk/ports/agent_provider.py
@runtime_checkable
class AgentProvider(Protocol):
    """Protocol for loading and persisting agents.

    Implementations provide agent configuration storage and retrieval,
    enabling the evolution system to load agents and persist evolved
    instructions.

    Examples:
        Implement a minimal in-memory provider:

        ```python
        class InMemoryProvider:
            def __init__(self):
                self._agents = {}

            def get_agent(self, name: str) -> LlmAgent:
                if not name:
                    raise ValueError("Agent name cannot be empty")
                if name not in self._agents:
                    raise KeyError(f"Agent not found: {name}")
                return self._agents[name]

            def save_instruction(self, name: str, instruction: str) -> None:
                if not name:
                    raise ValueError("Agent name cannot be empty")
                if name not in self._agents:
                    raise KeyError(f"Agent not found: {name}")
                self._agents[name].instruction = instruction

            def list_agents(self) -> list[str]:
                return list(self._agents.keys())
        ```

        Verify protocol compliance:

        ```python
        from gepa_adk.ports import AgentProvider

        provider = InMemoryProvider()
        assert isinstance(provider, AgentProvider)  # Runtime check works
        ```

    Note:
        All implementations must provide get_agent(), save_instruction(),
        and list_agents() methods. Use @runtime_checkable to enable
        isinstance() checks for protocol compliance.
    """

    def get_agent(self, name: str) -> "LlmAgent":
        """Load an agent by its unique name.

        Args:
            name: The unique identifier for the agent.

        Returns:
            The configured LlmAgent instance ready for use.

        Raises:
            KeyError: If no agent with the given name exists.
            ValueError: If name is empty or invalid.

        Examples:
            Load a named agent:

            ```python
            provider = MyAgentProvider()
            agent = provider.get_agent("my_agent")
            print(agent.instruction)
            ```

        Note:
            Only non-empty names are accepted. Configured agents ready
            for use with the evolution system should be returned.
        """
        ...

    def save_instruction(self, name: str, instruction: str) -> None:
        """Persist an evolved instruction for a named agent.

        Args:
            name: The unique identifier for the agent.
            instruction: The new instruction text to persist.

        Raises:
            KeyError: If no agent with the given name exists.
            ValueError: If name is empty or invalid.
            IOError: If persistence fails (implementation-specific).

        Examples:
            Save an evolved instruction:

            ```python
            provider.save_instruction(
                "my_agent", "You are a helpful assistant specialized in coding."
            )
            ```

        Note:
            Only after successful persistence should subsequent calls to
            get_agent() return an agent with the updated instruction.
            Implementations should validate empty names before processing.
        """
        ...

    def list_agents(self) -> list[str]:
        """List all available agent names.

        Returns:
            A list of agent name strings. Empty list if no agents.

        Examples:
            Discover available agents:

            ```python
            provider = MyAgentProvider()
            for name in provider.list_agents():
                print(f"Found agent: {name}")
            ```

        Note:
            Ordering of returned names is not guaranteed by the protocol.
            Some implementations may return names in a specific order,
            but callers should not rely upon any particular sequence.
        """
        ...

get_agent

get_agent(name: str) -> 'LlmAgent'

Load an agent by its unique name.

PARAMETER DESCRIPTION
name

The unique identifier for the agent.

TYPE: str

RETURNS DESCRIPTION
'LlmAgent'

The configured LlmAgent instance ready for use.

RAISES DESCRIPTION
KeyError

If no agent with the given name exists.

ValueError

If name is empty or invalid.

Examples:

Load a named agent:

provider = MyAgentProvider()
agent = provider.get_agent("my_agent")
print(agent.instruction)
Note

Only non-empty names are accepted. Configured agents ready for use with the evolution system should be returned.

Source code in src/gepa_adk/ports/agent_provider.py
def get_agent(self, name: str) -> "LlmAgent":
    """Load an agent by its unique name.

    Args:
        name: The unique identifier for the agent.

    Returns:
        The configured LlmAgent instance ready for use.

    Raises:
        KeyError: If no agent with the given name exists.
        ValueError: If name is empty or invalid.

    Examples:
        Load a named agent:

        ```python
        provider = MyAgentProvider()
        agent = provider.get_agent("my_agent")
        print(agent.instruction)
        ```

    Note:
        Only non-empty names are accepted. Configured agents ready
        for use with the evolution system should be returned.
    """
    ...

save_instruction

save_instruction(name: str, instruction: str) -> None

Persist an evolved instruction for a named agent.

PARAMETER DESCRIPTION
name

The unique identifier for the agent.

TYPE: str

instruction

The new instruction text to persist.

TYPE: str

RAISES DESCRIPTION
KeyError

If no agent with the given name exists.

ValueError

If name is empty or invalid.

IOError

If persistence fails (implementation-specific).

Examples:

Save an evolved instruction:

provider.save_instruction(
    "my_agent", "You are a helpful assistant specialized in coding."
)
Note

Only after successful persistence should subsequent calls to get_agent() return an agent with the updated instruction. Implementations should validate empty names before processing.

Source code in src/gepa_adk/ports/agent_provider.py
def save_instruction(self, name: str, instruction: str) -> None:
    """Persist an evolved instruction for a named agent.

    Args:
        name: The unique identifier for the agent.
        instruction: The new instruction text to persist.

    Raises:
        KeyError: If no agent with the given name exists.
        ValueError: If name is empty or invalid.
        IOError: If persistence fails (implementation-specific).

    Examples:
        Save an evolved instruction:

        ```python
        provider.save_instruction(
            "my_agent", "You are a helpful assistant specialized in coding."
        )
        ```

    Note:
        Only after successful persistence should subsequent calls to
        get_agent() return an agent with the updated instruction.
        Implementations should validate empty names before processing.
    """
    ...

list_agents

list_agents() -> list[str]

List all available agent names.

RETURNS DESCRIPTION
list[str]

A list of agent name strings. Empty list if no agents.

Examples:

Discover available agents:

provider = MyAgentProvider()
for name in provider.list_agents():
    print(f"Found agent: {name}")
Note

Ordering of returned names is not guaranteed by the protocol. Some implementations may return names in a specific order, but callers should not rely upon any particular sequence.

Source code in src/gepa_adk/ports/agent_provider.py
def list_agents(self) -> list[str]:
    """List all available agent names.

    Returns:
        A list of agent name strings. Empty list if no agents.

    Examples:
        Discover available agents:

        ```python
        provider = MyAgentProvider()
        for name in provider.list_agents():
            print(f"Found agent: {name}")
        ```

    Note:
        Ordering of returned names is not guaranteed by the protocol.
        Some implementations may return names in a specific order,
        but callers should not rely upon any particular sequence.
    """
    ...

AsyncGEPAAdapter

Bases: Protocol[DataInst, Trajectory, RolloutOutput]



Protocol for async GEPA adapters used by the evolution engine.

Implementations provide evaluation, reflection dataset generation, and proposal updates for candidate component texts.

Examples:

Implement a minimal adapter:

class MyAdapter:
    async def evaluate(self, batch, candidate, capture_traces=False):
        return EvaluationBatch(outputs=[], scores=[])

    async def make_reflective_dataset(
        self, candidate, eval_batch, components_to_update
    ):
        return {component: [] for component in components_to_update}

    async def propose_new_texts(
        self, candidate, reflective_dataset, components_to_update
    ):
        return {
            component: candidate[component]
            for component in components_to_update
        }
Note

Adapters must implement all three async methods to satisfy the protocol. Use runtime_checkable for isinstance() checks.

Source code in src/gepa_adk/ports/adapter.py
@runtime_checkable
class AsyncGEPAAdapter(Protocol[DataInst, Trajectory, RolloutOutput]):
    """Protocol for async GEPA adapters used by the evolution engine.

    Implementations provide evaluation, reflection dataset generation, and
    proposal updates for candidate component texts.

    Examples:
        Implement a minimal adapter:

        ```python
        class MyAdapter:
            async def evaluate(self, batch, candidate, capture_traces=False):
                return EvaluationBatch(outputs=[], scores=[])

            async def make_reflective_dataset(
                self, candidate, eval_batch, components_to_update
            ):
                return {component: [] for component in components_to_update}

            async def propose_new_texts(
                self, candidate, reflective_dataset, components_to_update
            ):
                return {
                    component: candidate[component]
                    for component in components_to_update
                }
        ```

    Note:
        Adapters must implement all three async methods to satisfy
        the protocol. Use runtime_checkable for isinstance() checks.
    """

    async def evaluate(
        self,
        batch: list[DataInst],
        candidate: dict[str, str],
        capture_traces: bool = False,
    ) -> EvaluationBatch[Trajectory, RolloutOutput]:
        """Evaluate a candidate over a batch of inputs.

        Args:
            batch: Input data instances to evaluate.
            candidate: Component name to text mapping.
            capture_traces: Whether to capture execution traces.

        Returns:
            Evaluation results with outputs, scores, and optional traces.

        Examples:
            Basic evaluation:

            ```python
            result = await adapter.evaluate(batch, candidate)
            assert len(result.scores) == len(batch)
            ```

        Note:
            Output and score lists must have the same length as the
            input batch. Set capture_traces=True to enable reflection.
        """

    async def make_reflective_dataset(
        self,
        candidate: dict[str, str],
        eval_batch: EvaluationBatch[Trajectory, RolloutOutput],
        components_to_update: list[str],
    ) -> Mapping[str, Sequence[Mapping[str, Any]]]:
        """Build reflective datasets from evaluation traces.

        Args:
            candidate: Current candidate components.
            eval_batch: Evaluation results with traces.
            components_to_update: Components to generate datasets for.

        Returns:
            Mapping of component name to reflective examples.

        Examples:
            Build datasets for specific components:

            ```python
            dataset = await adapter.make_reflective_dataset(
                candidate,
                eval_batch,
                ["instruction"],
            )
            ```

        Note:
            Only call this method when eval_batch contains trajectories.
            Each component receives its own list of reflective examples.
        """

    async def propose_new_texts(
        self,
        candidate: dict[str, str],
        reflective_dataset: Mapping[str, Sequence[Mapping[str, Any]]],
        components_to_update: list[str],
    ) -> dict[str, str]:
        """Propose updated component texts from reflective datasets.

        Args:
            candidate: Current candidate components.
            reflective_dataset: Reflective examples per component.
            components_to_update: Components to propose updates for.

        Returns:
            Mapping of component name to new proposed text.

        Examples:
            Generate new component texts:

            ```python
            proposals = await adapter.propose_new_texts(
                candidate,
                reflective_dataset,
                ["instruction"],
            )
            ```

        Note:
            Outputs should contain improved text for each requested
            component. The evolution engine uses these as mutation candidates.
        """

evaluate async

evaluate(
    batch: list[DataInst],
    candidate: dict[str, str],
    capture_traces: bool = False,
) -> EvaluationBatch[Trajectory, RolloutOutput]

Evaluate a candidate over a batch of inputs.

PARAMETER DESCRIPTION
batch

Input data instances to evaluate.

TYPE: list[DataInst]

candidate

Component name to text mapping.

TYPE: dict[str, str]

capture_traces

Whether to capture execution traces.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
EvaluationBatch[Trajectory, RolloutOutput]

Evaluation results with outputs, scores, and optional traces.

Examples:

Basic evaluation:

result = await adapter.evaluate(batch, candidate)
assert len(result.scores) == len(batch)
Note

Output and score lists must have the same length as the input batch. Set capture_traces=True to enable reflection.

Source code in src/gepa_adk/ports/adapter.py
async def evaluate(
    self,
    batch: list[DataInst],
    candidate: dict[str, str],
    capture_traces: bool = False,
) -> EvaluationBatch[Trajectory, RolloutOutput]:
    """Evaluate a candidate over a batch of inputs.

    Args:
        batch: Input data instances to evaluate.
        candidate: Component name to text mapping.
        capture_traces: Whether to capture execution traces.

    Returns:
        Evaluation results with outputs, scores, and optional traces.

    Examples:
        Basic evaluation:

        ```python
        result = await adapter.evaluate(batch, candidate)
        assert len(result.scores) == len(batch)
        ```

    Note:
        Output and score lists must have the same length as the
        input batch. Set capture_traces=True to enable reflection.
    """

make_reflective_dataset async

make_reflective_dataset(
    candidate: dict[str, str],
    eval_batch: EvaluationBatch[Trajectory, RolloutOutput],
    components_to_update: list[str],
) -> Mapping[str, Sequence[Mapping[str, Any]]]

Build reflective datasets from evaluation traces.

PARAMETER DESCRIPTION
candidate

Current candidate components.

TYPE: dict[str, str]

eval_batch

Evaluation results with traces.

TYPE: EvaluationBatch[Trajectory, RolloutOutput]

components_to_update

Components to generate datasets for.

TYPE: list[str]

RETURNS DESCRIPTION
Mapping[str, Sequence[Mapping[str, Any]]]

Mapping of component name to reflective examples.

Examples:

Build datasets for specific components:

dataset = await adapter.make_reflective_dataset(
    candidate,
    eval_batch,
    ["instruction"],
)
Note

Only call this method when eval_batch contains trajectories. Each component receives its own list of reflective examples.

Source code in src/gepa_adk/ports/adapter.py
async def make_reflective_dataset(
    self,
    candidate: dict[str, str],
    eval_batch: EvaluationBatch[Trajectory, RolloutOutput],
    components_to_update: list[str],
) -> Mapping[str, Sequence[Mapping[str, Any]]]:
    """Build reflective datasets from evaluation traces.

    Args:
        candidate: Current candidate components.
        eval_batch: Evaluation results with traces.
        components_to_update: Components to generate datasets for.

    Returns:
        Mapping of component name to reflective examples.

    Examples:
        Build datasets for specific components:

        ```python
        dataset = await adapter.make_reflective_dataset(
            candidate,
            eval_batch,
            ["instruction"],
        )
        ```

    Note:
        Only call this method when eval_batch contains trajectories.
        Each component receives its own list of reflective examples.
    """

propose_new_texts async

propose_new_texts(
    candidate: dict[str, str],
    reflective_dataset: Mapping[
        str, Sequence[Mapping[str, Any]]
    ],
    components_to_update: list[str],
) -> dict[str, str]

Propose updated component texts from reflective datasets.

PARAMETER DESCRIPTION
candidate

Current candidate components.

TYPE: dict[str, str]

reflective_dataset

Reflective examples per component.

TYPE: Mapping[str, Sequence[Mapping[str, Any]]]

components_to_update

Components to propose updates for.

TYPE: list[str]

RETURNS DESCRIPTION
dict[str, str]

Mapping of component name to new proposed text.

Examples:

Generate new component texts:

proposals = await adapter.propose_new_texts(
    candidate,
    reflective_dataset,
    ["instruction"],
)
Note

Outputs should contain improved text for each requested component. The evolution engine uses these as mutation candidates.

Source code in src/gepa_adk/ports/adapter.py
async def propose_new_texts(
    self,
    candidate: dict[str, str],
    reflective_dataset: Mapping[str, Sequence[Mapping[str, Any]]],
    components_to_update: list[str],
) -> dict[str, str]:
    """Propose updated component texts from reflective datasets.

    Args:
        candidate: Current candidate components.
        reflective_dataset: Reflective examples per component.
        components_to_update: Components to propose updates for.

    Returns:
        Mapping of component name to new proposed text.

    Examples:
        Generate new component texts:

        ```python
        proposals = await adapter.propose_new_texts(
            candidate,
            reflective_dataset,
            ["instruction"],
        )
        ```

    Note:
        Outputs should contain improved text for each requested
        component. The evolution engine uses these as mutation candidates.
    """

ComponentHandler

Bases: Protocol



Protocol for component serialization and application.

Handles the serialize/apply/restore cycle for one component type during evolution. Implementations must be stateless and thread-safe.

The protocol defines three operations:

1. serialize: Extract current component value as string
2. apply: Set new value, return original for restoration
3. restore: Reinstate original value after evaluation

Examples:

Implement a custom handler:

class TemperatureHandler:
    def serialize(self, agent: LlmAgent) -> str:
        config = getattr(agent, "generate_content_config", None)
        if config and hasattr(config, "temperature"):
            return str(config.temperature)
        return "1.0"

    def apply(self, agent: LlmAgent, value: str) -> Any:
        config = getattr(agent, "generate_content_config", None)
        original = config.temperature if config else 1.0
        if config:
            config.temperature = float(value)
        return original

    def restore(self, agent: LlmAgent, original: Any) -> None:
        config = getattr(agent, "generate_content_config", None)
        if config:
            config.temperature = original
See Also

InstructionHandler: Handler for agent instruction.
OutputSchemaHandler: Handler for agent output schema.
Note

All methods are synchronous; no I/O should be performed. On invalid input, apply() should log a warning and keep the original value rather than raise an exception.

Source code in src/gepa_adk/ports/component_handler.py
@runtime_checkable
class ComponentHandler(Protocol):
    """Protocol for component serialization and application.

    Handles the serialize/apply/restore cycle for one component type during
    evolution. Implementations must be stateless and thread-safe.

    The protocol defines three operations:
    1. **serialize**: Extract current component value as string
    2. **apply**: Set new value, return original for restoration
    3. **restore**: Reinstate original value after evaluation

    Examples:
        Implement a custom handler:

        ```python
        class TemperatureHandler:
            def serialize(self, agent: LlmAgent) -> str:
                config = getattr(agent, "generate_content_config", None)
                if config and hasattr(config, "temperature"):
                    return str(config.temperature)
                return "1.0"

            def apply(self, agent: LlmAgent, value: str) -> Any:
                config = getattr(agent, "generate_content_config", None)
                original = config.temperature if config else 1.0
                if config:
                    config.temperature = float(value)
                return original

            def restore(self, agent: LlmAgent, original: Any) -> None:
                config = getattr(agent, "generate_content_config", None)
                if config:
                    config.temperature = original
        ```

    See Also:
        - [`InstructionHandler`]\
[gepa_adk.adapters.components.component_handlers.InstructionHandler]:
            Handler for agent instruction.
        - [`OutputSchemaHandler`]\
[gepa_adk.adapters.components.component_handlers.OutputSchemaHandler]:
            Handler for agent output schema.

    Note:
        All methods are synchronous - no I/O operations should be performed.
        Apply() should log warnings and keep the original value rather than
        raising exceptions on invalid inputs for error safety.
    """

    def serialize(self, agent: "LlmAgent") -> str:
        """Extract component value from agent as string for evolution.

        Args:
            agent: The LlmAgent instance to extract component from.

        Returns:
            String representation of the component value.
            Returns empty string if component is not set.

        Examples:
            ```python
            handler = InstructionHandler()
            text = handler.serialize(agent)
            # text == "You are a helpful assistant."
            ```

        Note:
            Operations must never raise exceptions for missing values.
            Return empty string or sensible default instead.
        """
        ...

    def apply(self, agent: "LlmAgent", value: str) -> Any:
        """Apply evolved value to agent, return original for restore.

        Args:
            agent: The LlmAgent instance to modify.
            value: The new component value as string.

        Returns:
            The original component value (type depends on component).
            This value will be passed to restore() later.

        Examples:
            ```python
            handler = InstructionHandler()
            original = handler.apply(agent, "New instruction")
            # agent.instruction is now "New instruction"
            # original contains previous instruction value
            ```

        Note:
            On application failure (e.g., invalid schema), log warning
            and return original without modifying agent. Never raise
            exceptions - graceful degradation is required.
        """
        ...

    def restore(self, agent: "LlmAgent", original: Any) -> None:
        """Restore original value after evaluation.

        Args:
            agent: The LlmAgent instance to restore.
            original: The original value returned by apply().

        Examples:
            ```python
            handler = InstructionHandler()
            original = handler.apply(agent, "Temp instruction")
            # ... run evaluation ...
            handler.restore(agent, original)
            # agent.instruction is back to original value
            ```

        Note:
            Original value restoration always succeeds - never raises exceptions.
            None values reset to component default.
        """
        ...

serialize

serialize(agent: 'LlmAgent') -> str

Extract component value from agent as string for evolution.

PARAMETER DESCRIPTION
agent

The LlmAgent instance to extract component from.

TYPE: 'LlmAgent'

RETURNS DESCRIPTION
str

String representation of the component value.

str

Returns empty string if component is not set.

Examples:

handler = InstructionHandler()
text = handler.serialize(agent)
# text == "You are a helpful assistant."
Note

Operations must never raise exceptions for missing values. Return empty string or sensible default instead.

Source code in src/gepa_adk/ports/component_handler.py
def serialize(self, agent: "LlmAgent") -> str:
    """Extract component value from agent as string for evolution.

    Args:
        agent: The LlmAgent instance to extract component from.

    Returns:
        String representation of the component value.
        Returns empty string if component is not set.

    Examples:
        ```python
        handler = InstructionHandler()
        text = handler.serialize(agent)
        # text == "You are a helpful assistant."
        ```

    Note:
        Operations must never raise exceptions for missing values.
        Return empty string or sensible default instead.
    """
    ...

apply

apply(agent: 'LlmAgent', value: str) -> Any

Apply evolved value to agent, return original for restore.

PARAMETER DESCRIPTION
agent

The LlmAgent instance to modify.

TYPE: 'LlmAgent'

value

The new component value as string.

TYPE: str

RETURNS DESCRIPTION
Any

The original component value (type depends on component).

Any

This value will be passed to restore() later.

Examples:

handler = InstructionHandler()
original = handler.apply(agent, "New instruction")
# agent.instruction is now "New instruction"
# original contains previous instruction value
Note

On application failure (e.g., invalid schema), log warning and return original without modifying agent. Never raise exceptions - graceful degradation is required.

Source code in src/gepa_adk/ports/component_handler.py
def apply(self, agent: "LlmAgent", value: str) -> Any:
    """Apply evolved value to agent, return original for restore.

    Args:
        agent: The LlmAgent instance to modify.
        value: The new component value as string.

    Returns:
        The original component value (type depends on component).
        This value will be passed to restore() later.

    Examples:
        ```python
        handler = InstructionHandler()
        original = handler.apply(agent, "New instruction")
        # agent.instruction is now "New instruction"
        # original contains previous instruction value
        ```

    Note:
        On application failure (e.g., invalid schema), log warning
        and return original without modifying agent. Never raise
        exceptions - graceful degradation is required.
    """
    ...

restore

restore(agent: 'LlmAgent', original: Any) -> None

Restore original value after evaluation.

PARAMETER DESCRIPTION
agent

The LlmAgent instance to restore.

TYPE: 'LlmAgent'

original

The original value returned by apply().

TYPE: Any

Examples:

handler = InstructionHandler()
original = handler.apply(agent, "Temp instruction")
# ... run evaluation ...
handler.restore(agent, original)
# agent.instruction is back to original value
Note

restore() must always succeed and never raise exceptions. A None original resets the component to its default.

Source code in src/gepa_adk/ports/component_handler.py
def restore(self, agent: "LlmAgent", original: Any) -> None:
    """Restore original value after evaluation.

    Args:
        agent: The LlmAgent instance to restore.
        original: The original value returned by apply().

    Examples:
        ```python
        handler = InstructionHandler()
        original = handler.apply(agent, "Temp instruction")
        # ... run evaluation ...
        handler.restore(agent, original)
        # agent.instruction is back to original value
        ```

    Note:
        Original value restoration always succeeds - never raises exceptions.
        None values reset to component default.
    """
    ...
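The serialize/apply/restore cycle pairs naturally with try/finally so the agent is always returned to its prior state. A minimal sketch with stand-ins (ToyAgent and ToyInstructionHandler are local illustrations, not the library's classes):

```python
from typing import Any


class ToyAgent:
    """Stand-in for LlmAgent, for illustration only."""

    def __init__(self, instruction: str) -> None:
        self.instruction = instruction


class ToyInstructionHandler:
    """Minimal handler following the serialize/apply/restore contract."""

    def serialize(self, agent: ToyAgent) -> str:
        # Never raise on missing values; fall back to an empty string.
        return agent.instruction or ""

    def apply(self, agent: ToyAgent, value: str) -> Any:
        original = agent.instruction
        agent.instruction = value
        return original

    def restore(self, agent: ToyAgent, original: Any) -> None:
        agent.instruction = original


agent = ToyAgent("Original instruction")
handler = ToyInstructionHandler()
original = handler.apply(agent, "Candidate instruction")
try:
    pass  # ... evaluate the agent with the candidate applied ...
finally:
    handler.restore(agent, original)  # agent always reverts to prior state
```

Wrapping the evaluation in try/finally guarantees restore() runs even when evaluation fails, which is what makes the cycle safe to repeat across iterations.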

EvaluationBatch dataclass

Bases: Generic[Trajectory, RolloutOutput]



Container for evaluation outputs and scores.

ATTRIBUTE DESCRIPTION
outputs

Per-example outputs produced during evaluation.

TYPE: list[RolloutOutput]

scores

Per-example normalized scores (higher is better).

TYPE: list[Score]

trajectories

Optional per-example execution traces.

TYPE: list[Trajectory] | None

objective_scores

Optional multi-objective scores per example.

TYPE: list[dict[ComponentName, Score]] | None

metadata

Optional per-example scorer metadata. When provided, metadata[i] corresponds to outputs[i] and scores[i] (index-aligned). Metadata dicts may contain scorer-specific fields like 'feedback', 'actionable_guidance', or 'dimension_scores' from CriticScorer implementations.

TYPE: list[dict[str, Any]] | None

inputs

Optional per-example input text that was used to generate each output. Used by make_reflective_dataset to provide context for reflection. When provided, inputs[i] corresponds to the input that produced outputs[i].

TYPE: list[str] | None

Examples:

Create a batch with optional traces:

batch = EvaluationBatch(
    outputs=["ok", "ok"],
    scores=[0.9, 0.8],
    trajectories=[{"trace": 1}, {"trace": 2}],
)

Create a batch with inputs for reflection:

batch = EvaluationBatch(
    outputs=["Hello!", "Good morrow!"],
    scores=[0.3, 0.9],
    inputs=["I am the King", "I am your friend"],
)
Note

All fields are immutable once created due to frozen=True. Use this as the standard return type from adapter evaluations. When metadata is not None, len(metadata) must equal len(outputs) and len(scores).

Source code in src/gepa_adk/ports/adapter.py
@dataclass(frozen=True, slots=True)
class EvaluationBatch(Generic[Trajectory, RolloutOutput]):
    """Container for evaluation outputs and scores.

    Attributes:
        outputs (list[RolloutOutput]): Per-example outputs produced during evaluation.
        scores (list[Score]): Per-example normalized scores (higher is better).
        trajectories (list[Trajectory] | None): Optional per-example execution traces.
        objective_scores (list[dict[ComponentName, Score]] | None): Optional
            multi-objective scores per example.
        metadata (list[dict[str, Any]] | None): Optional per-example scorer metadata.
            When provided, metadata[i] corresponds to outputs[i] and scores[i]
            (index-aligned). Metadata dicts may contain scorer-specific fields like
            'feedback', 'actionable_guidance', or 'dimension_scores' from
            CriticScorer implementations.
        inputs (list[str] | None): Optional per-example input text that was used
            to generate each output. Used by make_reflective_dataset to provide
            context for reflection. When provided, inputs[i] corresponds to the
            input that produced outputs[i].

    Examples:
        Create a batch with optional traces:

        ```python
        batch = EvaluationBatch(
            outputs=["ok", "ok"],
            scores=[0.9, 0.8],
            trajectories=[{"trace": 1}, {"trace": 2}],
        )
        ```

        Create a batch with inputs for reflection:

        ```python
        batch = EvaluationBatch(
            outputs=["Hello!", "Good morrow!"],
            scores=[0.3, 0.9],
            inputs=["I am the King", "I am your friend"],
        )
        ```

    Note:
        All fields are immutable once created due to frozen=True.
        Use this as the standard return type from adapter evaluations.
        When metadata is not None, len(metadata) must equal len(outputs) and len(scores).
    """

    outputs: list[RolloutOutput]
    scores: list[Score]
    trajectories: list[Trajectory] | None = None
    objective_scores: list[dict[ComponentName, Score]] | None = None
    metadata: list[dict[str, Any]] | None = None
    inputs: list[str] | None = None

ComponentSelectorProtocol

Bases: Protocol



Async protocol for component selection strategies.

Note

Adapters implementing this protocol determine which candidate components to update during mutation, enabling flexible evolution strategies.

Examples:

class MySelector:
    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        return components[:1]
Source code in src/gepa_adk/ports/component_selector.py
@runtime_checkable
class ComponentSelectorProtocol(Protocol):
    """Async protocol for component selection strategies.

    Note:
        Adapters implementing this protocol determine which candidate components
        to update during mutation, enabling flexible evolution strategies.

    Examples:
        ```python
        class MySelector:
            async def select_components(
                self, components: list[str], iteration: int, candidate_idx: int
            ) -> list[str]:
                return components[:1]
        ```
    """

    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        """Select components to update for the current iteration.

        Args:
            components: List of available component keys (e.g. ["instruction", "input_schema"]).
            iteration: Current global iteration number (0-based).
            candidate_idx: Index of the candidate being evolved.

        Returns:
            List of component keys to update.

        Raises:
            ValueError: If components list is empty.

        Note:
            Outputs a list of component keys to update, enabling selective
            mutation of specific candidate components.

        Examples:
            ```python
            selected = await selector.select_components(
                components=["instruction", "schema"], iteration=1, candidate_idx=0
            )
            ```
        """
        ...

select_components async

select_components(
    components: list[str],
    iteration: int,
    candidate_idx: int,
) -> list[str]

Select components to update for the current iteration.

PARAMETER DESCRIPTION
components

List of available component keys (e.g. ["instruction", "input_schema"]).

TYPE: list[str]

iteration

Current global iteration number (0-based).

TYPE: int

candidate_idx

Index of the candidate being evolved.

TYPE: int

RETURNS DESCRIPTION
list[str]

List of component keys to update.

RAISES DESCRIPTION
ValueError

If components list is empty.

Note

Outputs a list of component keys to update, enabling selective mutation of specific candidate components.

Examples:

selected = await selector.select_components(
    components=["instruction", "schema"], iteration=1, candidate_idx=0
)
Source code in src/gepa_adk/ports/component_selector.py
async def select_components(
    self, components: list[str], iteration: int, candidate_idx: int
) -> list[str]:
    """Select components to update for the current iteration.

    Args:
        components: List of available component keys (e.g. ["instruction", "input_schema"]).
        iteration: Current global iteration number (0-based).
        candidate_idx: Index of the candidate being evolved.

    Returns:
        List of component keys to update.

    Raises:
        ValueError: If components list is empty.

    Note:
        Outputs a list of component keys to update, enabling selective
        mutation of specific candidate components.

    Examples:
        ```python
        selected = await selector.select_components(
            components=["instruction", "schema"], iteration=1, candidate_idx=0
        )
        ```
    """
    ...
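A concrete strategy satisfying this protocol can be as small as a deterministic round-robin over the component keys. RoundRobinSelector is a sketch, not a library class; it assumes only the signature shown above:

```python
import asyncio


class RoundRobinSelector:
    """Selects one component per iteration, cycling deterministically."""

    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        if not components:
            raise ValueError("components list is empty")
        # Cycle through components by global iteration number.
        return [components[iteration % len(components)]]


async def main() -> list[str]:
    selector = RoundRobinSelector()
    picks: list[str] = []
    for i in range(3):
        picks += await selector.select_components(
            ["instruction", "schema"], iteration=i, candidate_idx=0
        )
    return picks


print(asyncio.run(main()))  # ['instruction', 'schema', 'instruction']
```

Because the strategy is deterministic in (iteration, candidate_idx), runs remain reproducible, which may matter when comparing evolution configurations.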

create_critic

create_critic(
    name: str, *, model: str | None = None
) -> LlmAgent

Create a pre-configured critic agent by preset name.

PARAMETER DESCRIPTION
name

Preset name. Must be a key in _PRESET_INSTRUCTIONS.

TYPE: str

model

Optional model override. When None, ADK uses its default.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
LlmAgent

Configured LlmAgent with CriticOutput schema and preset instruction.

RAISES DESCRIPTION
ConfigurationError

If name is not a valid preset.

Source code in src/gepa_adk/adapters/scoring/critic_scorer.py
def create_critic(name: str, *, model: str | None = None) -> LlmAgent:
    """Create a pre-configured critic agent by preset name.

    Args:
        name: Preset name. Must be a key in ``_PRESET_INSTRUCTIONS``.
        model: Optional model override. When None, ADK uses its default.

    Returns:
        Configured LlmAgent with CriticOutput schema and preset instruction.

    Raises:
        ConfigurationError: If name is not a valid preset.
    """
    if name not in _PRESET_INSTRUCTIONS:
        valid_presets = ", ".join(sorted(_PRESET_INSTRUCTIONS))
        raise ConfigurationError(
            f"Unknown critic preset '{name}'. Valid presets: {valid_presets}",
            constraint=f"Must be one of: {valid_presets}",
            value=name,
            field="name",
        )

    model_kwargs: dict[str, Any] = {}
    if model is not None:
        model_kwargs["model"] = model

    return LlmAgent(
        name=f"{name}_critic",
        instruction=_PRESET_INSTRUCTIONS[name],
        output_schema=CriticOutput,
        **model_kwargs,
    )
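The lookup-or-fail behavior above can be sketched with stand-ins. The preset names and the ConfigurationError class below are illustrative only; the real registry keys and error signature live in gepa_adk:

```python
# Stand-in registry; real preset names are defined in gepa_adk.
_PRESET_INSTRUCTIONS = {
    "quality": "Judge overall response quality.",
    "safety": "Judge response safety.",
}


class ConfigurationError(ValueError):
    """Stand-in for gepa_adk's ConfigurationError."""


def resolve_preset(name: str) -> str:
    """Mirror create_critic's lookup-or-fail behavior for preset names."""
    if name not in _PRESET_INSTRUCTIONS:
        valid = ", ".join(sorted(_PRESET_INSTRUCTIONS))
        raise ConfigurationError(
            f"Unknown critic preset '{name}'. Valid presets: {valid}"
        )
    return _PRESET_INSTRUCTIONS[name]


print(resolve_preset("quality"))  # Judge overall response quality.
```

Failing fast with the sorted list of valid presets in the message makes typos in preset names immediately diagnosable.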

normalize_feedback

normalize_feedback(
    score: float, metadata: dict[str, Any] | None
) -> dict[str, Any]

Normalize critic feedback to consistent trial format.

Converts both simple and advanced critic outputs to a standardized format for use in trial records. This enables the reflection agent to receive consistent feedback regardless of which critic schema was used.

PARAMETER DESCRIPTION
score

The numeric score from the critic (0.0-1.0).

TYPE: float

metadata

Optional metadata dict from critic output. May contain:

- feedback (str): Simple feedback text
- dimension_scores (dict): Per-dimension scores
- actionable_guidance (str): Improvement suggestions
- Any additional fields from critic output

TYPE: dict[str, Any] | None

RETURNS DESCRIPTION
dict[str, Any]

Normalized feedback dict with structure:

```python
{
    "score": 0.75,
    "feedback_text": "Main feedback message",
    "dimension_scores": {...},  # Optional
    "actionable_guidance": "...",  # Optional
}
```

Examples:

Normalize simple feedback:

normalized = normalize_feedback(0.8, {"feedback": "Good job"})
# {"score": 0.8, "feedback_text": "Good job"}

Normalize advanced feedback:

normalized = normalize_feedback(
    0.6,
    {
        "feedback": "Needs work",
        "dimension_scores": {"clarity": 0.5},
        "actionable_guidance": "Add examples",
    },
)
# {
#     "score": 0.6,
#     "feedback_text": "Needs work",
#     "dimension_scores": {"clarity": 0.5},
#     "actionable_guidance": "Add examples",
# }

Handle missing feedback:

normalized = normalize_feedback(0.5, None)
# {"score": 0.5, "feedback_text": ""}
Note

Supports both SimpleCriticOutput and CriticOutput schemas for flexible critic integration. Extracts the "feedback" field and renames it to "feedback_text" for consistent trial structure. Additional fields like dimension_scores are preserved when present.

Source code in src/gepa_adk/adapters/scoring/critic_scorer.py
def normalize_feedback(
    score: float,
    metadata: dict[str, Any] | None,
) -> dict[str, Any]:
    """Normalize critic feedback to consistent trial format.

    Converts both simple and advanced critic outputs to a standardized
    format for use in trial records. This enables the reflection agent
    to receive consistent feedback regardless of which critic schema
    was used.

    Args:
        score: The numeric score from the critic (0.0-1.0).
        metadata: Optional metadata dict from critic output. May contain:
            - feedback (str): Simple feedback text
            - dimension_scores (dict): Per-dimension scores
            - actionable_guidance (str): Improvement suggestions
            - Any additional fields from critic output

    Returns:
        Normalized feedback dict with structure:
        ```python
        {
            "score": 0.75,
            "feedback_text": "Main feedback message",
            "dimension_scores": {...},  # Optional
            "actionable_guidance": "...",  # Optional
        }
        ```

    Examples:
        Normalize simple feedback:

        ```python
        normalized = normalize_feedback(0.8, {"feedback": "Good job"})
        # {"score": 0.8, "feedback_text": "Good job"}
        ```

        Normalize advanced feedback:

        ```python
        normalized = normalize_feedback(
            0.6,
            {
                "feedback": "Needs work",
                "dimension_scores": {"clarity": 0.5},
                "actionable_guidance": "Add examples",
            },
        )
        # {
        #     "score": 0.6,
        #     "feedback_text": "Needs work",
        #     "dimension_scores": {"clarity": 0.5},
        #     "actionable_guidance": "Add examples",
        # }
        ```

        Handle missing feedback:

        ```python
        normalized = normalize_feedback(0.5, None)
        # {"score": 0.5, "feedback_text": ""}
        ```

    Note:
        Supports both SimpleCriticOutput and CriticOutput schemas for flexible
        critic integration. Extracts the "feedback" field and renames it to
        "feedback_text" for consistent trial structure. Additional fields
        like dimension_scores are preserved when present.
    """
    result: dict[str, Any] = {"score": score}

    if metadata is None:
        result["feedback_text"] = ""
        return result

    # Extract feedback text - handle both "feedback" and "feedback_text" keys
    feedback_text = metadata.get("feedback_text") or metadata.get("feedback") or ""
    if isinstance(feedback_text, str) and feedback_text.strip():
        result["feedback_text"] = feedback_text.strip()
    else:
        result["feedback_text"] = ""

    # Preserve dimension_scores if present
    dimension_scores = metadata.get("dimension_scores")
    if dimension_scores and isinstance(dimension_scores, dict):
        result["dimension_scores"] = dimension_scores

    # Preserve actionable_guidance if present
    actionable_guidance = metadata.get("actionable_guidance")
    if actionable_guidance and isinstance(actionable_guidance, str):
        guidance_str = actionable_guidance.strip()
        if guidance_str:
            result["actionable_guidance"] = guidance_str

    return result

create_component_selector

create_component_selector(
    selector_type: str,
) -> ComponentSelectorProtocol

Create a component selector strategy from a string alias.

PARAMETER DESCRIPTION
selector_type

Name of the selector strategy. Supported values:

- 'round_robin', 'roundrobin': Round-robin cycling.
- 'all', 'all_components': All components simultaneously.

TYPE: str

RETURNS DESCRIPTION
ComponentSelectorProtocol

Instance of requested component selector.

RAISES DESCRIPTION
ValueError

If selector_type is unknown.

Examples:

# Create round-robin selector
selector = create_component_selector("round_robin")

# Create all-components selector
selector = create_component_selector("all")
Note

Supports flexible string aliases with normalization for common variations (underscores, hyphens, case-insensitive).

Source code in src/gepa_adk/adapters/selection/component_selector.py
def create_component_selector(selector_type: str) -> ComponentSelectorProtocol:
    """Create a component selector strategy from a string alias.

    Args:
        selector_type: Name of the selector strategy.
            Supported values:
            - 'round_robin', 'roundrobin': Round-robin cycling.
            - 'all', 'all_components': All components simultaneously.

    Returns:
        Instance of requested component selector.

    Raises:
        ValueError: If selector_type is unknown.

    Examples:
        ```python
        # Create round-robin selector
        selector = create_component_selector("round_robin")

        # Create all-components selector
        selector = create_component_selector("all")
        ```

    Note:
        Supports flexible string aliases with normalization for common
        variations (underscores, hyphens, case-insensitive).
    """
    normalized = selector_type.lower().replace("_", "").replace("-", "")

    if normalized == "roundrobin":
        return RoundRobinComponentSelector()
    elif normalized in ("all", "allcomponents"):
        return AllComponentSelector()

    raise ValueError(f"Unknown component selector: {selector_type}")
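The alias handling described in the Note reduces to one normalization step; a minimal sketch of it, mirroring the source above:

```python
# Sketch of the alias normalization in create_component_selector:
# lowercase, then strip underscores and hyphens, so common spelling
# variants collapse to the same canonical key before dispatch.
def normalize_alias(selector_type: str) -> str:
    return selector_type.lower().replace("_", "").replace("-", "")
```

With this, "Round_Robin", "round-robin", and "roundrobin" all dispatch to the same selector.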

evolve async

evolve(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    *,
    valset: list[dict[str, Any]] | None = None,
    critic: LlmAgent | None = None,
    reflection_agent: LlmAgent | None = None,
    config: EvolutionConfig | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    state_guard: StateGuard | None = None,
    candidate_selector: CandidateSelectorProtocol
    | str
    | None = None,
    component_selector: ComponentSelectorProtocol
    | str
    | None = None,
    executor: AgentExecutorProtocol | None = None,
    components: list[str] | None = None,
    schema_constraints: SchemaConstraints | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> EvolutionResult

Evolve an ADK agent's instruction.

Optimizes the instruction for a single ADK agent using evolutionary optimization. The agent's instruction is iteratively improved based on performance on the training set.

PARAMETER DESCRIPTION
agent

The ADK LlmAgent to evolve.

TYPE: LlmAgent

trainset

Training examples [{"input": "...", "expected": "..."}].

TYPE: list[dict[str, Any]]

PARAMETER DESCRIPTION
valset

Optional validation examples used for scoring and acceptance. Defaults to the trainset when omitted.

TYPE: list[dict[str, Any]] | None

critic

Optional ADK agent for scoring (uses schema scoring if None).

TYPE: LlmAgent | None

reflection_agent

Optional ADK agent for proposals. If None, creates a default reflection agent using config.reflection_model.

TYPE: LlmAgent | None

config

Evolution configuration (uses defaults if None).

TYPE: EvolutionConfig | None

trajectory_config

Trajectory capture settings (uses defaults if None).

TYPE: TrajectoryConfig | None

state_guard

Optional state token preservation settings.

TYPE: StateGuard | None

candidate_selector

Optional selector instance or selector name.

TYPE: CandidateSelectorProtocol | str | None

component_selector

Optional selector instance or selector name for choosing which components to update.

TYPE: ComponentSelectorProtocol | str | None

executor

Optional AgentExecutorProtocol implementation for unified agent execution. When provided, both the ADKAdapter and CriticScorer use this executor for consistent session management and execution. If None, creates an AgentExecutor automatically.

TYPE: AgentExecutorProtocol | None

components

List of component names to include in evolution. Supported:

- "instruction": The agent's instruction text (default if None).
- "output_schema": The agent's Pydantic output_schema (serialized).

When None, defaults to ["instruction"]. Use ["output_schema"] with a schema reflection agent to evolve the output schema.

TYPE: list[str] | None

schema_constraints

Optional SchemaConstraints for output_schema evolution. When provided, proposed schema mutations are validated against these constraints. Mutations that violate constraints (e.g., remove required fields) are rejected and the original schema is preserved.

TYPE: SchemaConstraints | None

app

Optional ADK App instance. When provided, evolution uses the app's configuration. Note that App does not hold services directly; pass a Runner for service extraction, or combine with session_service param. See the App/Runner integration guide for details.

TYPE: App | None

runner

Optional ADK Runner instance. When provided, evolution extracts and uses the runner's session_service for all agent executions (evolved agents, critic, and reflection agent). Takes precedence over both app and executor parameters. This enables seamless integration with existing ADK infrastructure.

TYPE: Runner | None

RETURNS DESCRIPTION
EvolutionResult

EvolutionResult with evolved_components dict and metrics.

RAISES DESCRIPTION
ConfigurationError

If invalid parameters provided, including pre-flight validation failures: non-LlmAgent agent or critic, empty trainset, duplicate or empty component names, missing critic and output_schema, or EvolutionConfig consistency errors.

EvolutionError

If evolution fails during execution.

Note

Pre-flight validation runs synchronously before any LLM calls. Single-agent evolution with trainset reflection and valset scoring.

For reproducible evolution, pass a seeded config: config=EvolutionConfig(seed=42).

Examples:

Basic usage with output_schema:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk import evolve


class OutputSchema(BaseModel):
    answer: str
    score: float = Field(ge=0.0, le=1.0)


agent = LlmAgent(
    name="assistant",
    model="gemini-2.5-flash",
    instruction="You are a helpful assistant.",
    output_schema=OutputSchema,
)

trainset = [
    {"input": "What is 2+2?", "expected": "4"},
    {"input": "What is the capital of France?", "expected": "Paris"},
]

result = await evolve(agent, trainset)
print(f"Evolved: {result.evolved_components['instruction']}")

With critic agent:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk import evolve


class CriticOutput(BaseModel):
    score: float = Field(ge=0.0, le=1.0)


critic = LlmAgent(
    name="critic",
    model="gemini-2.5-flash",
    instruction="Score the response quality.",
    output_schema=CriticOutput,
)

result = await evolve(agent, trainset, critic=critic)

Evolving output_schema with schema reflection:

from gepa_adk.adapters.agents.reflection_agents import (
    create_schema_reflection_agent,
)

# Create schema reflection agent with validation tool
schema_reflector = create_schema_reflection_agent("gemini-2.5-flash")

# Evolve output_schema component
result = await evolve(
    agent,
    trainset,
    critic=critic,
    reflection_agent=schema_reflector,
    components=["output_schema"],  # Evolve schema, not instruction
)
print(f"Evolved schema: {result.evolved_components['output_schema']}")

Using App/Runner for existing infrastructure integration:

from google.adk.apps.app import App
from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService

# Configure Runner with your production session service
session_service = DatabaseSessionService(connection_string="...")
runner = Runner(
    app_name="my_app",
    agent=agent,
    session_service=session_service,
)

# Evolution uses your Runner's session_service for all operations
result = await evolve(
    agent,
    trainset,
    runner=runner,  # Services extracted from runner
)
Source code in src/gepa_adk/api.py
async def evolve(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    *,
    valset: list[dict[str, Any]] | None = None,
    critic: LlmAgent | None = None,
    reflection_agent: LlmAgent | None = None,
    config: EvolutionConfig | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    state_guard: StateGuard | None = None,
    candidate_selector: CandidateSelectorProtocol | str | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    executor: AgentExecutorProtocol | None = None,
    components: list[str] | None = None,
    schema_constraints: SchemaConstraints | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> EvolutionResult:
    """Evolve an ADK agent's instruction.

    Optimizes the instruction for a single ADK agent using evolutionary
    optimization. The agent's instruction is iteratively improved based on
    performance on the training set.

    Args:
        agent: The ADK LlmAgent to evolve.
        trainset: Training examples [{"input": "...", "expected": "..."}].

    Keyword Args:
        valset: Optional validation examples used for scoring and acceptance.
            Defaults to the trainset when omitted.
        critic: Optional ADK agent for scoring (uses schema scoring if None).
        reflection_agent: Optional ADK agent for proposals. If None, creates a
            default reflection agent using config.reflection_model.
        config: Evolution configuration (uses defaults if None).
        trajectory_config: Trajectory capture settings (uses defaults if None).
        state_guard: Optional state token preservation settings.
        candidate_selector: Optional selector instance or selector name.
        component_selector: Optional selector instance or selector name for
            choosing which components to update.
        executor: Optional AgentExecutorProtocol implementation for unified
            agent execution. When provided, both the ADKAdapter and CriticScorer
            use this executor for consistent session management and execution.
            If None, creates an AgentExecutor automatically.
        components: List of component names to include in evolution. Supported:
            - "instruction": The agent's instruction text (default if None).
            - "output_schema": The agent's Pydantic output_schema (serialized).
            When None, defaults to ["instruction"]. Use ["output_schema"] with
            a schema reflection agent to evolve the output schema.
        schema_constraints: Optional SchemaConstraints for output_schema evolution.
            When provided, proposed schema mutations are validated against these
            constraints. Mutations that violate constraints (e.g., remove required
            fields) are rejected and the original schema is preserved.
        app: Optional ADK App instance. When provided, evolution uses the app's
            configuration. Note that App does not hold services directly; pass
            a Runner for service extraction, or combine with session_service param.
            See the App/Runner integration guide for details.
        runner: Optional ADK Runner instance. When provided, evolution extracts
            and uses the runner's session_service for all agent executions
            (evolved agents, critic, and reflection agent). Takes precedence
            over both app and executor parameters. This enables seamless
            integration with existing ADK infrastructure.

    Returns:
        EvolutionResult with evolved_components dict and metrics.

    Raises:
        ConfigurationError: If invalid parameters provided, including
            pre-flight validation failures: non-LlmAgent agent or critic,
            empty trainset, duplicate or empty component names, missing
            critic and output_schema, or EvolutionConfig consistency errors.
        EvolutionError: If evolution fails during execution.

    Note:
        Pre-flight validation runs synchronously before any LLM calls.
        Single-agent evolution with trainset reflection and valset scoring.

        For reproducible evolution, pass a seeded config:
        ``config=EvolutionConfig(seed=42)``.

    Examples:
        Basic usage with output_schema:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve


        class OutputSchema(BaseModel):
            answer: str
            score: float = Field(ge=0.0, le=1.0)


        agent = LlmAgent(
            name="assistant",
            model="gemini-2.5-flash",
            instruction="You are a helpful assistant.",
            output_schema=OutputSchema,
        )

        trainset = [
            {"input": "What is 2+2?", "expected": "4"},
            {"input": "What is the capital of France?", "expected": "Paris"},
        ]

        result = await evolve(agent, trainset)
        print(f"Evolved: {result.evolved_components['instruction']}")
        ```

        With critic agent:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve


        class CriticOutput(BaseModel):
            score: float = Field(ge=0.0, le=1.0)


        critic = LlmAgent(
            name="critic",
            model="gemini-2.5-flash",
            instruction="Score the response quality.",
            output_schema=CriticOutput,
        )

        result = await evolve(agent, trainset, critic=critic)
        ```

        Evolving output_schema with schema reflection:

        ```python
        from gepa_adk.adapters.agents.reflection_agents import (
            create_schema_reflection_agent,
        )

        # Create schema reflection agent with validation tool
        schema_reflector = create_schema_reflection_agent("gemini-2.5-flash")

        # Evolve output_schema component
        result = await evolve(
            agent,
            trainset,
            critic=critic,
            reflection_agent=schema_reflector,
            components=["output_schema"],  # Evolve schema, not instruction
        )
        print(f"Evolved schema: {result.evolved_components['output_schema']}")
        ```

        Using App/Runner for existing infrastructure integration:

        ```python
        from google.adk.apps.app import App
        from google.adk.runners import Runner
        from google.adk.sessions import DatabaseSessionService

        # Configure Runner with your production session service
        session_service = DatabaseSessionService(connection_string="...")
        runner = Runner(
            app_name="my_app",
            agent=agent,
            session_service=session_service,
        )

        # Evolution uses your Runner's session_service for all operations
        result = await evolve(
            agent,
            trainset,
            runner=runner,  # Services extracted from runner
        )
        ```
    """
    # Pre-flight validation (Story 2.5)
    _pre_flight_validate_evolve(agent, trainset, critic, components)
    required_keys = (
        set(trainset[0].keys()) if trainset and len(trainset) > 0 else {"input"}
    )

    resolved_valset = valset if valset is not None else trainset
    if valset is not None:
        _validate_dataset(
            valset,
            "valset",
            allow_empty=False,
            required_keys=required_keys,
        )

    # Log reflection_agent configuration if provided
    if reflection_agent is not None:
        logger.debug(
            "evolve.reflection_agent.configured",
            agent_name=agent.name,
            reflection_agent_name=reflection_agent.name,
            message="Using ADK reflection agent for instruction improvement",
        )

    # Capture original instruction for StateGuard validation
    original_instruction = str(agent.instruction)

    candidate_selector_label = (
        candidate_selector
        if isinstance(candidate_selector, str)
        else type(candidate_selector).__name__
        if candidate_selector is not None
        else None
    )

    component_selector_label = (
        component_selector
        if isinstance(component_selector, str)
        else type(component_selector).__name__
        if component_selector is not None
        else None
    )

    # Log evolution start
    logger.info(
        "evolve.start",
        agent_name=agent.name,
        trainset_size=len(trainset),
        valset_size=len(resolved_valset),
        valset_defaulted=valset is None,
        has_critic=critic is not None,
        has_reflection_agent=reflection_agent is not None,
        has_state_guard=state_guard is not None,
        candidate_selector=candidate_selector_label,
        component_selector=component_selector_label,
    )

    # Resolve services from runner/app with precedence warnings (#227)
    # Log precedence warnings if multiple config sources provided (T009)
    if runner is not None and app is not None:
        logger.warning(
            "evolve.precedence.runner_over_app",
            message="Both runner and app provided; using runner (runner takes precedence)",
            runner_app_name=runner.app_name,
            app_name=app.name,
        )
    if runner is not None and executor is not None:
        logger.warning(
            "evolve.precedence.runner_over_executor",
            message="Both runner and executor provided; using runner's session_service",
            runner_app_name=runner.app_name,
        )

    # Extract services using precedence rules (T006)
    resolved_session_service, _artifact_service = _resolve_evolution_services(
        runner=runner,
        app=app,
        session_service=None,  # evolve() doesn't have direct session_service param
    )

    # Resolve app_name for session isolation (#239)
    resolved_app_name = _resolve_app_name(runner=runner, app=app)

    # Create executor with resolved session_service (T007)
    # Runner takes precedence over user-provided executor
    if runner is not None:
        resolved_executor = AgentExecutor(
            session_service=resolved_session_service,
            app_name=resolved_app_name,
        )
    else:
        resolved_executor = executor or AgentExecutor(
            session_service=resolved_session_service,
            app_name=resolved_app_name,
        )

    # Build scorer
    scorer: Scorer
    if critic:
        scorer = CriticScorer(critic_agent=critic, executor=resolved_executor)
    elif hasattr(agent, "output_schema") and agent.output_schema is not None:
        # Use schema-based scorer when agent has output_schema
        scorer = SchemaBasedScorer(output_schema=agent.output_schema)
    else:
        raise ConfigurationError(
            "Either critic must be provided or agent must have output_schema",
            field="critic",
            value=None,
            constraint="must provide critic or agent.output_schema",
        )

    # Resolve config
    resolved_config = config or EvolutionConfig()
    rng = (
        random.Random(resolved_config.seed)
        if resolved_config.seed is not None
        else None
    )

    # Create reflection agent if not provided
    resolved_reflection_agent = reflection_agent
    if resolved_reflection_agent is None:
        # Create default reflection agent with config settings
        resolved_reflection_agent = LlmAgent(
            name="reflection_agent",
            model=_resolve_model_for_agent(resolved_config.reflection_model),
            instruction=resolved_config.reflection_prompt or REFLECTION_INSTRUCTION,
        )
        logger.debug(
            "evolve.reflection_agent.default",
            reflection_model=resolved_config.reflection_model,
        )

    # Build proposer chain in the composition root (api.py)
    adk_reflection_fn = create_adk_reflection_fn(
        resolved_reflection_agent,
        executor=resolved_executor,
    )
    proposer = AsyncReflectiveMutationProposer(adk_reflection_fn=adk_reflection_fn)

    # Create adapter with resolved session_service (T008)
    adapter = ADKAdapter(
        agent=agent,
        scorer=scorer,
        trajectory_config=trajectory_config,
        proposer=proposer,
        executor=resolved_executor,
        schema_constraints=schema_constraints,
        session_service=resolved_session_service,
    )

    # Build initial candidate components based on requested components
    resolved_components = components if components else [DEFAULT_COMPONENT_NAME]
    initial_components: dict[str, str] = {}
    original_component_values: dict[str, str] = {}

    for comp_name in resolved_components:
        if comp_name == DEFAULT_COMPONENT_NAME:
            initial_components[comp_name] = original_instruction
            original_component_values[comp_name] = original_instruction
        elif comp_name == COMPONENT_OUTPUT_SCHEMA:
            if not hasattr(agent, "output_schema") or agent.output_schema is None:
                raise ConfigurationError(
                    f"Cannot evolve '{COMPONENT_OUTPUT_SCHEMA}': agent has no output_schema",
                    field="components",
                    value=comp_name,
                    constraint="agent must have output_schema to evolve it",
                )
            schema_text = serialize_pydantic_schema(agent.output_schema)
            initial_components[comp_name] = schema_text
            original_component_values[comp_name] = schema_text
        else:
            raise ConfigurationError(
                f"Unknown component: '{comp_name}'. Supported: "
                f"'{DEFAULT_COMPONENT_NAME}', '{COMPONENT_OUTPUT_SCHEMA}'",
                field="components",
                value=comp_name,
                constraint="must be a supported component name",
            )

    logger.debug(
        "evolve.components.resolved",
        agent_name=agent.name,
        components=resolved_components,
    )

    initial_candidate = Candidate(components=initial_components)

    # Create engine
    resolved_candidate_selector: CandidateSelectorProtocol | None = None
    if candidate_selector is not None:
        if isinstance(candidate_selector, str):
            resolved_candidate_selector = create_candidate_selector(
                candidate_selector, rng=rng
            )
        else:
            resolved_candidate_selector = candidate_selector

    resolved_component_selector: ComponentSelectorProtocol | None = None
    if component_selector is not None:
        if isinstance(component_selector, str):
            resolved_component_selector = create_component_selector(component_selector)
        else:
            resolved_component_selector = component_selector

    engine = AsyncGEPAEngine(
        adapter=adapter,
        config=resolved_config,
        initial_candidate=initial_candidate,
        batch=trainset,
        valset=resolved_valset,
        candidate_selector=resolved_candidate_selector,
        component_selector=resolved_component_selector,
        rng=rng,
    )

    # Run evolution with cleanup
    try:
        result = await engine.run()

        valset_score = result.valset_score
        trainset_score = result.trainset_score

        if trainset_score is not None:
            logger.info(
                "evolve.trainset.scored",
                agent_name=agent.name,
                trainset_size=len(trainset),
                trainset_score=trainset_score,
            )
        if valset_score is not None:
            logger.info(
                "evolve.valset.scored",
                agent_name=agent.name,
                valset_size=len(resolved_valset),
                valset_score=valset_score,
                valset_defaulted=valset is None,
            )

        # Apply state guard validation if provided (for token preservation)
        # Only applies to text components (instruction), not to output_schema
        validated_components = dict(result.evolved_components)
        for comp_name in resolved_components:
            if comp_name in result.evolved_components:
                if comp_name == DEFAULT_COMPONENT_NAME and state_guard is not None:
                    validated_components[comp_name] = _apply_state_guard_validation(
                        state_guard=state_guard,
                        original_component_text=original_component_values[comp_name],
                        evolved_component_text=result.evolved_components[comp_name],
                        agent_name=agent.name,
                    )
                else:
                    validated_components[comp_name] = result.evolved_components[
                        comp_name
                    ]

        # Log evolution completion
        logger.info(
            "evolve.complete",
            agent_name=agent.name,
            original_score=result.original_score,
            final_score=result.final_score,
            improvement=result.improvement,
            total_iterations=result.total_iterations,
            valset_score=valset_score,
            trainset_score=trainset_score,
            components=resolved_components,
        )

        # Return result with validated evolved_components and valset_score
        # (creates new instance since frozen)
        return EvolutionResult(
            original_score=result.original_score,
            final_score=result.final_score,
            evolved_components=validated_components,
            iteration_history=result.iteration_history,
            total_iterations=result.total_iterations,
            valset_score=valset_score,
            trainset_score=trainset_score,
        )
    finally:
        # Clean up adapter resources (clears handler constraints)
        adapter.cleanup()

evolve_group async

evolve_group(
    agents: dict[str, LlmAgent],
    primary: str,
    trainset: list[dict[str, Any]],
    *,
    components: dict[str, list[str]] | None = None,
    critic: LlmAgent | None = None,
    share_session: bool = True,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol
    | str
    | None = None,
    reflection_agent: LlmAgent | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    workflow: SequentialAgent
    | LoopAgent
    | ParallelAgent
    | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult

Evolve multiple agents together with per-agent component configuration.

Optimizes specified components for each agent by targeting the primary agent's output score. When share_session=True, agents execute sequentially with shared session state, enabling later agents to access earlier agents' outputs via template strings.
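Conceptually, shared session state lets a later agent's instruction reference an earlier agent's output by key. A minimal pure-Python sketch of that substitution idea (illustrative only; ADK's actual state resolution is richer than a `str.format` call):

```python
# Illustrative sketch of shared-session template substitution,
# not ADK's actual implementation.
session_state: dict[str, str] = {}

def run_agent(name: str, instruction_template: str) -> str:
    # Resolve {key} placeholders from the shared session state
    instruction = instruction_template.format(**session_state)
    output = f"<{name} output for: {instruction}>"  # stand-in for an LLM call
    session_state[f"{name}_output"] = output
    return output

run_agent("generator", "Generate code for the requirement.")
reviewed = run_agent("critic", "Review the code in {generator_output}.")
```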

PARAMETER DESCRIPTION
agents

Named ADK agents to evolve together as dict mapping agent names to LlmAgent instances. Must have at least one agent.

TYPE: dict[str, LlmAgent]

primary

Name of the agent whose output is used for scoring. Must match one of the agent names in the dict.

TYPE: str

trainset

Training examples for evaluation. Each example should have an "input" key and optionally an "expected" key.

TYPE: list[dict[str, Any]]

PARAMETER DESCRIPTION
components

Per-agent component configuration mapping agent names to lists of component names to evolve. If None, defaults to evolving "instruction" for all agents. Use empty list to exclude an agent from evolution. Available component names: "instruction", "output_schema", "generate_content_config".

TYPE: dict[str, list[str]] | None

critic

Optional critic agent for scoring. If None, the primary agent must have an output_schema for schema-based scoring.

TYPE: LlmAgent | None
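The schema-based fallback can be pictured as scoring whether the primary agent's output parses against its declared schema. A hypothetical toy version (the library's actual scoring is more nuanced; `schema_score` and `required_fields` are illustrative names):

```python
import json

def schema_score(output_text: str, required_fields: set[str]) -> float:
    """Toy schema-based scorer: 1.0 if output is JSON with all required fields."""
    try:
        data = json.loads(output_text)
    except json.JSONDecodeError:
        return 0.0
    if not isinstance(data, dict):
        return 0.0
    return 1.0 if required_fields <= data.keys() else 0.0

schema_score('{"answer": "4", "score": 0.9}', {"answer", "score"})  # 1.0
schema_score('not json', {"answer"})  # 0.0
```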

share_session

Whether agents share session state during execution. When True (default), uses SequentialAgent. When False, agents execute with isolated sessions.

TYPE: bool

config

Evolution configuration. If None, uses EvolutionConfig defaults.

TYPE: EvolutionConfig | None

state_guard

Optional StateGuard instance for validating and repairing state injection tokens in evolved instructions.

TYPE: StateGuard | None
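The idea behind token preservation: evolved instructions must keep the `{state_key}` placeholders the original relied on. A hypothetical check-and-repair sketch (not the library's StateGuard implementation):

```python
import re

TOKEN_RE = re.compile(r"\{[A-Za-z_][A-Za-z0-9_]*\}")

def repair_state_tokens(original: str, evolved: str) -> str:
    # Re-append any state-injection token the rewrite dropped
    missing = set(TOKEN_RE.findall(original)) - set(TOKEN_RE.findall(evolved))
    if not missing:
        return evolved
    return evolved + "\nAlways consider: " + " ".join(sorted(missing))

repaired = repair_state_tokens(
    "Review the code in {generator_output}.",
    "Carefully review the provided code.",  # token dropped by the rewrite
)
```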

component_selector

Optional selector instance or selector name for choosing which components to update.

TYPE: ComponentSelectorProtocol | str | None

reflection_agent

Optional ADK agent for proposals. If None, creates a default reflection agent using config.reflection_model.

TYPE: LlmAgent | None

trajectory_config

Trajectory capture settings (uses defaults if None).

TYPE: TrajectoryConfig | None

workflow

Optional original workflow structure to preserve during evaluation. When provided, LoopAgent iterations and ParallelAgent concurrency are preserved instead of flattening to SequentialAgent. Used internally by evolve_workflow(); not typically set directly.

TYPE: SequentialAgent | LoopAgent | ParallelAgent | None

session_service

Optional ADK session service for state management. If None (default), creates an InMemorySessionService internally. Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService) to persist sessions alongside other agent executions in a shared database.

TYPE: BaseSessionService | None

app

Optional ADK App instance. When provided, evolution uses the app's configuration. Note that App does not hold services directly; pass a Runner for service extraction, or combine with session_service param.

TYPE: App | None

runner

Optional ADK Runner instance. When provided, evolution extracts and uses the runner's session_service for all agent executions (evolved agents, critic, and reflection agent). Takes precedence over both app and session_service parameters. This enables seamless integration with existing ADK infrastructure.

TYPE: Runner | None

RETURNS DESCRIPTION
MultiAgentEvolutionResult

MultiAgentEvolutionResult containing an evolved_components dict mapping qualified component names (agent.component format) to their optimized values, along with score metrics, iteration history, original_components (filtered to the qualified keyspace), schema_version, and stop_reason propagated from the engine result.
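Qualified keys split cleanly back into their agent and component parts with a right-split on the last dot, mirroring the `rsplit(".", 1)` parsing in the library source:

```python
# "agent.component" qualified keys: rsplit on the last dot recovers the pair,
# so agent names containing dots still parse correctly.
def split_qualified(name: str) -> tuple[str, str]:
    agent_name, component = name.rsplit(".", 1)
    return agent_name, component

split_qualified("generator.instruction")  # ("generator", "instruction")
```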

RAISES DESCRIPTION
ConfigurationError

If pre-flight validation fails: invalid agent names, non-LlmAgent critic, empty trainset, duplicate or empty component names per agent, or EvolutionConfig consistency errors.

MultiAgentValidationError

If agents dict is empty, primary agent not found, or no scorer and primary lacks output_schema.

ValueError

If components mapping contains unknown agents, unknown component handlers, or is missing entries for agents.

EvolutionError

If evolution fails during execution.

Examples:

Basic usage with per-agent components (API v0.3.x):

from google.adk.agents import LlmAgent
from gepa_adk import evolve_group

generator = LlmAgent(
    name="generator",
    model="gemini-2.5-flash",
    instruction="Generate code based on the requirement.",
)
critic = LlmAgent(
    name="critic",
    model="gemini-2.5-flash",
    instruction="Review the code in {generator_output}.",
)
validator = LlmAgent(
    name="validator",
    model="gemini-2.5-flash",
    instruction="Validate the reviewed code.",
    output_schema=ValidationResult,
)

result = await evolve_group(
    agents={
        "generator": generator,
        "critic": critic,
        "validator": validator,
    },
    primary="validator",
    trainset=training_data,
    components={
        "generator": ["instruction", "output_schema"],
        "critic": ["instruction"],
        "validator": ["instruction"],
    },
)

# Access evolved components using qualified names
print(result.evolved_components["generator.instruction"])
print(result.evolved_components["critic.instruction"])
print(result.evolved_components["validator.instruction"])

Exclude an agent from evolution:

result = await evolve_group(
    agents={"generator": gen, "static_validator": val},
    primary="generator",
    trainset=training_data,
    components={
        "generator": ["instruction"],
        "static_validator": [],  # Excluded from evolution
    },
)

Using custom session service for persistence:

from google.adk.sessions import SqliteSessionService

# Use SQLite for session persistence
session_service = SqliteSessionService(db_path="evolution_sessions.db")

result = await evolve_group(
    agents={"generator": gen, "critic": critic},
    primary="critic",
    trainset=training_data,
    session_service=session_service,  # Sessions persisted to SQLite
)

Using App/Runner for existing infrastructure integration:

from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService

# Configure Runner with your production session service
runner = Runner(
    app_name="my_app",
    agent=generator,  # Any agent from the group
    session_service=DatabaseSessionService(connection_string="..."),
)

# Evolution uses Runner's session_service for all operations
result = await evolve_group(
    agents={"generator": gen, "refiner": ref},
    primary="refiner",
    trainset=training_data,
    runner=runner,  # Services extracted from runner
)
Note

Breaking change in v0.3.x: The agents parameter changed from list[LlmAgent] to dict[str, LlmAgent]. Candidate keys now use qualified names (agent.component) instead of {agent_name}_instruction.

For reproducible evolution, pass a seeded config: config=EvolutionConfig(seed=42).

Source code in src/gepa_adk/api.py
async def evolve_group(
    agents: dict[str, LlmAgent],
    primary: str,
    trainset: list[dict[str, Any]],
    *,
    components: dict[str, list[str]] | None = None,
    critic: LlmAgent | None = None,
    share_session: bool = True,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    reflection_agent: LlmAgent | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    workflow: SequentialAgent | LoopAgent | ParallelAgent | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult:
    """Evolve multiple agents together with per-agent component configuration.

    Optimizes specified components for each agent by targeting the primary
    agent's output score. When share_session=True, agents execute sequentially
    with shared session state, enabling later agents to access earlier
    agents' outputs via template strings.

    Args:
        agents: Named ADK agents to evolve together as dict mapping agent
            names to LlmAgent instances. Must have at least one agent.
        primary: Name of the agent whose output is used for scoring.
            Must match one of the agent names in the dict.
        trainset: Training examples for evaluation. Each example should
            have an "input" key and optionally an "expected" key.

    Keyword Args:
        components: Per-agent component configuration mapping agent names
            to lists of component names to evolve. If None, defaults to
            evolving "instruction" for all agents. Use empty list to
            exclude an agent from evolution. Available component names:
            "instruction", "output_schema", "generate_content_config".
        critic: Optional critic agent for scoring. If None, the primary
            agent must have an output_schema for schema-based scoring.
        share_session: Whether agents share session state during
            execution. When True (default), uses SequentialAgent.
            When False, agents execute with isolated sessions.
        config: Evolution configuration. If None, uses EvolutionConfig
            defaults.
        state_guard: Optional StateGuard instance for validating and
            repairing state injection tokens in evolved instructions.
        component_selector: Optional selector instance or selector name for
            choosing which components to update.
        reflection_agent: Optional ADK agent for proposals. If None, creates a
            default reflection agent using config.reflection_model.
        trajectory_config: Trajectory capture settings (uses defaults if None).
        workflow: Optional original workflow structure to preserve during
            evaluation. When provided, LoopAgent iterations and ParallelAgent
            concurrency are preserved instead of flattening to SequentialAgent.
            Used internally by evolve_workflow(); not typically set directly.
        session_service: Optional ADK session service for state management.
            If None (default), creates an InMemorySessionService internally.
            Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService)
            to persist sessions alongside other agent executions in a shared database.
        app: Optional ADK App instance. When provided, evolution uses the app's
            configuration. Note that App does not hold services directly; pass
            a Runner for service extraction, or combine with session_service param.
        runner: Optional ADK Runner instance. When provided, evolution extracts
            and uses the runner's session_service for all agent executions
            (evolved agents, critic, and reflection agent). Takes precedence
            over both app and session_service parameters. This enables seamless
            integration with existing ADK infrastructure.

    Returns:
        MultiAgentEvolutionResult containing evolved_components dict
        mapping qualified component names (agent.component format) to their
        optimized values, along with score metrics, iteration history,
        original_components (filtered to the qualified keyspace),
        schema_version, and stop_reason propagated from the engine result.

    Raises:
        ConfigurationError: If pre-flight validation fails: invalid agent
            names, non-LlmAgent critic, empty trainset, duplicate or empty
            component names per agent, or EvolutionConfig consistency errors.
        MultiAgentValidationError: If agents dict is empty, primary agent
            not found, or no scorer and primary lacks output_schema.
        ValueError: If components mapping contains unknown agents, unknown
            component handlers, or is missing entries for agents.
        EvolutionError: If evolution fails during execution.

    Examples:
        Basic usage with per-agent components (API v0.3.x):

        ```python
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve_group

        generator = LlmAgent(
            name="generator",
            model="gemini-2.5-flash",
            instruction="Generate code based on the requirement.",
        )
        critic = LlmAgent(
            name="critic",
            model="gemini-2.5-flash",
            instruction="Review the code in {generator_output}.",
        )
        validator = LlmAgent(
            name="validator",
            model="gemini-2.5-flash",
            instruction="Validate the reviewed code.",
            output_schema=ValidationResult,
        )

        result = await evolve_group(
            agents={
                "generator": generator,
                "critic": critic,
                "validator": validator,
            },
            primary="validator",
            trainset=training_data,
            components={
                "generator": ["instruction", "output_schema"],
                "critic": ["instruction"],
                "validator": ["instruction"],
            },
        )

        # Access evolved components using qualified names
        print(result.evolved_components["generator.instruction"])
        print(result.evolved_components["critic.instruction"])
        print(result.evolved_components["validator.instruction"])
        ```

        Exclude an agent from evolution:

        ```python
        result = await evolve_group(
            agents={"generator": gen, "static_validator": val},
            primary="generator",
            trainset=training_data,
            components={
                "generator": ["instruction"],
                "static_validator": [],  # Excluded from evolution
            },
        )
        ```

        Using custom session service for persistence:

        ```python
        from google.adk.sessions import SqliteSessionService

        # Use SQLite for session persistence
        session_service = SqliteSessionService(db_path="evolution_sessions.db")

        result = await evolve_group(
            agents={"generator": gen, "critic": critic},
            primary="critic",
            trainset=training_data,
            session_service=session_service,  # Sessions persisted to SQLite
        )
        ```

        Using App/Runner for existing infrastructure integration:

        ```python
        from google.adk.runners import Runner
        from google.adk.sessions import DatabaseSessionService

        # Configure Runner with your production session service
        runner = Runner(
            app_name="my_app",
            agent=generator,  # Any agent from the group
            session_service=DatabaseSessionService(connection_string="..."),
        )

        # Evolution uses Runner's session_service for all operations
        result = await evolve_group(
            agents={"generator": gen, "refiner": ref},
            primary="refiner",
            trainset=training_data,
            runner=runner,  # Services extracted from runner
        )
        ```

    Note:
        Breaking change in v0.3.x: The `agents` parameter changed from
        `list[LlmAgent]` to `dict[str, LlmAgent]`. Candidate keys now use
        qualified names (agent.component) instead of {agent_name}_instruction.

        For reproducible evolution, pass a seeded config:
        ``config=EvolutionConfig(seed=42)``.
    """
    # Pre-flight validation (T012a + Story 2.5)
    _pre_flight_validate_group(agents, trainset, critic, components)

    # Default components: evolve "instruction" for all agents
    if components is None:
        components = {name: ["instruction"] for name in agents}

    # Capture original instructions for StateGuard validation
    original_instructions = {
        name: str(agent.instruction) for name, agent in agents.items()
    }

    # Log precedence warnings if multiple config sources provided (#227 T009)
    if runner is not None and app is not None:
        logger.warning(
            "evolve_group.precedence.runner_over_app",
            message="Both runner and app provided; using runner (runner takes precedence)",
            runner_app_name=runner.app_name,
            app_name=app.name,
        )

    # Resolve services using precedence rules: runner > app > session_service > default (#227)
    resolved_session_service, _artifact_service = _resolve_evolution_services(
        runner=runner,
        app=app,
        session_service=session_service,
    )

    # Resolve app_name for session isolation (#239)
    resolved_app_name = _resolve_app_name(runner=runner, app=app)

    # Create unified executor for consistent session management (FR-003)
    executor = AgentExecutor(
        session_service=resolved_session_service,
        app_name=resolved_app_name,
    )

    # Build scorer with executor (FR-005)
    scorer = None
    if critic:
        scorer = CriticScorer(critic_agent=critic, executor=executor)

    # Resolve config for reflection_model
    resolved_config = config or EvolutionConfig()
    rng = (
        random.Random(resolved_config.seed)
        if resolved_config.seed is not None
        else None
    )

    # Create reflection-based proposer with executor (FR-006)
    # Use provided reflection_agent or create a default one
    if reflection_agent is None:
        reflection_agent = LlmAgent(
            name="reflection_agent",
            model=_resolve_model_for_agent(resolved_config.reflection_model),
            instruction=resolved_config.reflection_prompt or REFLECTION_INSTRUCTION,
        )
    adk_reflection_fn = create_adk_reflection_fn(
        reflection_agent,
        executor=executor,
    )
    proposer = AsyncReflectiveMutationProposer(adk_reflection_fn=adk_reflection_fn)

    # Create adapter with executor (FR-004)
    adapter = MultiAgentAdapter(
        agents=agents,
        primary=primary,
        components=components,
        scorer=scorer,
        share_session=share_session,
        session_service=resolved_session_service,
        trajectory_config=trajectory_config,
        proposer=proposer,
        executor=executor,
        workflow=workflow,  # Preserve workflow structure (#215)
    )

    # Build seed candidate using qualified names (agent.component format per ADR-012)
    primary_agent = agents[primary]
    # Extract all configured components using their handlers
    seed_candidate_components: dict[str, str] = {}
    for agent_name, comp_list in components.items():
        agent = agents[agent_name]
        for comp_name in comp_list:
            qualified_name = f"{agent_name}.{comp_name}"
            handler = get_handler(comp_name)
            seed_candidate_components[qualified_name] = handler.serialize(agent)
    # Add required "instruction" key for engine compatibility
    seed_candidate_components["instruction"] = str(primary_agent.instruction)
    initial_candidate = Candidate(components=seed_candidate_components)

    # Create engine
    resolved_component_selector: ComponentSelectorProtocol | None = None
    if component_selector is not None:
        if isinstance(component_selector, str):
            resolved_component_selector = create_component_selector(component_selector)
        else:
            resolved_component_selector = component_selector

    engine = AsyncGEPAEngine(
        adapter=adapter,
        config=resolved_config,
        initial_candidate=initial_candidate,
        batch=trainset,
        component_selector=resolved_component_selector,
        rng=rng,
    )

    # Run evolution
    evolution_result = await engine.run()

    # Extract best candidate components from engine state using qualified names
    # The engine stores evolved_components from the candidate, which now uses
    # qualified names (agent.component format per ADR-012)
    evolved_components = _extract_evolved_components(
        evolution_result=evolution_result,
        seed_components=seed_candidate_components,
        agents=agents,
        components=components,
        primary=primary,
    )

    # Apply StateGuard validation to instruction components only
    if state_guard is not None:
        validated_components = {}
        for qualified_name, evolved_value in evolved_components.items():
            # Only apply StateGuard to instruction components
            if qualified_name.endswith(".instruction"):
                agent_name = qualified_name.rsplit(".", 1)[0]
                original_instruction = original_instructions.get(agent_name, "")
                validated_components[qualified_name] = _apply_state_guard_validation(
                    state_guard=state_guard,
                    original_component_text=original_instruction,
                    evolved_component_text=evolved_value,
                    agent_name=agent_name,
                )
            else:
                # Non-instruction components pass through unchanged
                validated_components[qualified_name] = evolved_value
        evolved_components = validated_components

    # Filter original_components to the qualified keyspace used by
    # evolved_components, stripping engine-internal keys (e.g. bare
    # "instruction" added for engine compatibility).
    orig = evolution_result.original_components
    if orig is not None:
        orig = {k: v for k, v in orig.items() if k in evolved_components}

    # Convert EvolutionResult to MultiAgentEvolutionResult
    return MultiAgentEvolutionResult(
        schema_version=evolution_result.schema_version,
        stop_reason=evolution_result.stop_reason,
        evolved_components=evolved_components,
        original_score=evolution_result.original_score,
        final_score=evolution_result.final_score,
        primary_agent=primary,
        iteration_history=evolution_result.iteration_history,
        total_iterations=evolution_result.total_iterations,
        original_components=orig,
    )

evolve_sync

evolve_sync(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    **kwargs: Any,
) -> EvolutionResult

Synchronous wrapper for evolve().

Deprecated: Use run_sync(evolve(agent, trainset, ...)) instead.

Runs the async evolve() function in a blocking manner. Handles nested event loops automatically.
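A common pattern for a blocking wrapper that tolerates an already-running event loop is to fall back to a worker thread; a sketch of the general technique (not necessarily this library's run_sync implementation):

```python
import asyncio
import concurrent.futures

def run_sync_sketch(coro):
    """Run a coroutine to completion, even from inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running: safe to use asyncio.run directly
        return asyncio.run(coro)
    # Nested case: run the coroutine on a fresh loop in a worker thread
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(asyncio.run, coro).result()
```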

PARAMETER DESCRIPTION
agent

The ADK LlmAgent to evolve.

TYPE: LlmAgent

trainset

Training examples.

TYPE: list[dict[str, Any]]

**kwargs

Optional keyword arguments passed to evolve().

TYPE: Any DEFAULT: {}

PARAMETER DESCRIPTION
valset

Optional validation examples for held-out evaluation.

TYPE: list[dict[str, Any]] | None

critic

Optional ADK agent for scoring.

TYPE: LlmAgent | None

reflection_agent

Optional ADK agent for proposals (not yet implemented).

TYPE: LlmAgent | None

config

EvolutionConfig for customizing evolution parameters.

TYPE: EvolutionConfig | None

trajectory_config

TrajectoryConfig for trace capture settings.

TYPE: TrajectoryConfig | None

state_guard

Optional state token preservation settings.

TYPE: StateGuard | None

candidate_selector

Optional selector instance or selector name.

TYPE: CandidateSelectorProtocol | str | None

executor

Optional unified agent executor for consistent session management across all agent types.

TYPE: AgentExecutorProtocol | None

RETURNS DESCRIPTION
EvolutionResult

EvolutionResult with evolved_components dict and metrics.

RAISES DESCRIPTION
ConfigurationError

If invalid parameters provided.

EvolutionError

If evolution fails during execution.

WARNS DESCRIPTION
DeprecationWarning

Always emitted when called. Use run_sync(evolve(...)) instead.

Examples:

Basic usage in a script:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk import evolve_sync


class OutputSchema(BaseModel):
    answer: str
    score: float = Field(ge=0.0, le=1.0)


agent = LlmAgent(
    name="assistant",
    model="gemini-2.5-flash",
    instruction="You are a helpful assistant.",
    output_schema=OutputSchema,
)

trainset = [
    {"input": "What is 2+2?", "expected": "4"},
]

result = evolve_sync(agent, trainset)
print(f"Evolved: {result.evolved_components['instruction']}")

With configuration:

from gepa_adk import evolve_sync, EvolutionConfig

config = EvolutionConfig(max_iterations=50)
result = evolve_sync(agent, trainset, config=config)
Note

Deprecated. Use run_sync(evolve(agent, trainset, ...)) instead. run_sync is a universal wrapper that works with all async evolution functions.

Source code in src/gepa_adk/api.py
def evolve_sync(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    **kwargs: Any,
) -> EvolutionResult:
    """Synchronous wrapper for evolve().

    .. deprecated::
        Use ``run_sync(evolve(agent, trainset, ...))`` instead.

    Runs the async evolve() function in a blocking manner.
    Handles nested event loops automatically.

    Args:
        agent: The ADK LlmAgent to evolve.
        trainset: Training examples.
        **kwargs: Optional keyword arguments passed to evolve().

    Other Parameters:
        valset (list[dict[str, Any]] | None): Optional validation examples for
            held-out evaluation.
        critic (LlmAgent | None): Optional ADK agent for scoring.
        reflection_agent (LlmAgent | None): Optional ADK agent for proposals
            (not yet implemented).
        config (EvolutionConfig | None): EvolutionConfig for customizing
            evolution parameters.
        trajectory_config (TrajectoryConfig | None): TrajectoryConfig for trace
            capture settings.
        state_guard (StateGuard | None): Optional state token preservation
            settings.
        candidate_selector (CandidateSelectorProtocol | str | None): Optional
            selector instance or selector name.
        executor (AgentExecutorProtocol | None): Optional unified agent executor
            for consistent session management across all agent types.

    Returns:
        EvolutionResult with evolved_components dict and metrics.

    Raises:
        ConfigurationError: If invalid parameters provided.
        EvolutionError: If evolution fails during execution.

    Warns:
        DeprecationWarning: Always emitted when called. Use
            ``run_sync(evolve(...))`` instead.

    Examples:
        Basic usage in a script:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve_sync


        class OutputSchema(BaseModel):
            answer: str
            score: float = Field(ge=0.0, le=1.0)


        agent = LlmAgent(
            name="assistant",
            model="gemini-2.5-flash",
            instruction="You are a helpful assistant.",
            output_schema=OutputSchema,
        )

        trainset = [
            {"input": "What is 2+2?", "expected": "4"},
        ]

        result = evolve_sync(agent, trainset)
        print(f"Evolved: {result.evolved_components['instruction']}")
        ```

        With configuration:

        ```python
        from gepa_adk import evolve_sync, EvolutionConfig

        config = EvolutionConfig(max_iterations=50)
        result = evolve_sync(agent, trainset, config=config)
        ```

    Note:
        Deprecated. Use ``run_sync(evolve(agent, trainset, ...))`` instead.
        ``run_sync`` is a universal wrapper that works with all async
        evolution functions.
    """
    import warnings

    warnings.warn(
        "evolve_sync() is deprecated, use run_sync(evolve(...)) instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return run_sync(evolve(agent, trainset, **kwargs))

evolve_workflow async

evolve_workflow(
    workflow: SequentialAgent | LoopAgent | ParallelAgent,
    trainset: list[dict[str, Any]],
    *,
    critic: LlmAgent | None = None,
    primary: str | None = None,
    max_depth: int = 5,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol
    | str
    | None = None,
    round_robin: bool = False,
    components: dict[str, list[str]] | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult

Evolve LlmAgents within a workflow agent structure.

Discovers all LlmAgent instances within a workflow (SequentialAgent, LoopAgent, or ParallelAgent) and evolves them while preserving the workflow structure. Uses shared session state to maintain workflow context during evaluation.
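The discovery step can be sketched independently of ADK. A minimal depth-limited traversal, using a hypothetical `Agent` dataclass as a stand-in for the real ADK agent classes:

```python
from dataclasses import dataclass, field


@dataclass
class Agent:
    """Hypothetical stand-in for ADK agent types (not the real classes)."""

    name: str
    sub_agents: list["Agent"] = field(default_factory=list)
    is_llm: bool = False  # True for LlmAgent stand-ins


def find_llm_agents(agent: Agent, max_depth: int = 5, _depth: int = 0) -> list[Agent]:
    # Depth-limited recursive traversal: collect LLM agents, then descend
    # into sub_agents until max_depth is exceeded.
    if _depth > max_depth:
        return []
    found = [agent] if agent.is_llm else []
    for sub in agent.sub_agents:
        found.extend(find_llm_agents(sub, max_depth, _depth + 1))
    return found


pipeline = Agent("Pipeline", [
    Agent("generator", is_llm=True),
    Agent("inner_loop", [Agent("refiner", is_llm=True)]),
])
print([a.name for a in find_llm_agents(pipeline)])  # → ['generator', 'refiner']
```

When this list comes back empty, evolve_workflow raises WorkflowEvolutionError, as documented below.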

PARAMETER DESCRIPTION
workflow

Workflow agent containing LlmAgents to evolve. Must be SequentialAgent, LoopAgent, or ParallelAgent.

TYPE: SequentialAgent | LoopAgent | ParallelAgent

trainset

Training examples for evaluation. Each example should have an "input" key and optionally an "expected" key.

TYPE: list[dict[str, Any]]

KEYWORD PARAMETER DESCRIPTION
critic

Optional critic agent for scoring. If None, the primary agent must have an output_schema for schema-based scoring.

TYPE: LlmAgent | None

primary

Name of the agent to score. Defaults to the last LlmAgent found in the workflow (for sequential workflows, this is typically the final output producer).

TYPE: str | None

max_depth

Maximum recursion depth for nested workflows (default: 5). Limits how deeply nested workflow structures are traversed.

TYPE: int

config

Evolution configuration. If None, uses EvolutionConfig defaults.

TYPE: EvolutionConfig | None

state_guard

Optional StateGuard instance for validating and repairing state injection tokens in evolved component_text.

TYPE: StateGuard | None

component_selector

Optional selector instance or selector name for choosing which components to update.

TYPE: ComponentSelectorProtocol | str | None

round_robin

If False (default), only the first discovered agent's instruction is evolved across all iterations. If True, all agents' instructions are evolved in round-robin fashion (the engine cycles through agents each iteration). Ignored when components is provided.

TYPE: bool

components

Optional per-agent component configuration mapping agent names to lists of component names to evolve. When provided, takes precedence over round_robin. Use empty list to exclude an agent.

TYPE: dict[str, list[str]] | None

session_service

Optional ADK session service for state management. If None (default), creates an InMemorySessionService internally. Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService) to persist sessions alongside other agent executions in a shared database.

TYPE: BaseSessionService | None

app

Optional ADK App instance. When provided, evolution uses the app's configuration. Note that App does not hold services directly; pass a Runner for service extraction, or combine with session_service param.

TYPE: App | None

runner

Optional ADK Runner instance. When provided, evolution extracts and uses the runner's session_service for all agent executions (evolved agents, critic, and reflection agent). Takes precedence over both app and session_service parameters. This enables seamless integration with existing ADK infrastructure.

TYPE: Runner | None

RETURNS DESCRIPTION
MultiAgentEvolutionResult

MultiAgentEvolutionResult containing evolved_components dict mapping agent names to their optimized component_text, along with score metrics and iteration history.

RAISES DESCRIPTION
ConfigurationError

If pre-flight validation fails: non-LlmAgent critic, empty trainset, duplicate or empty component names, or EvolutionConfig consistency errors.

WorkflowEvolutionError

If workflow contains no LlmAgents.

MultiAgentValidationError

If primary agent not found or no scorer available.

EvolutionError

If evolution fails during execution.

Examples:

Default behavior (evolve first agent only):

```python
from google.adk.agents import LlmAgent, SequentialAgent
from gepa_adk import evolve_workflow

generator = LlmAgent(name="generator", instruction="Generate code")
refiner = LlmAgent(name="refiner", instruction="Refine code")
writer = LlmAgent(name="writer", instruction="Write docs")
pipeline = SequentialAgent(
    name="Pipeline", sub_agents=[generator, refiner, writer]
)

# Only generator.instruction is evolved across all iterations
result = await evolve_workflow(workflow=pipeline, trainset=trainset)
```

Round-robin evolution (evolve all agents):

```python
# All agents are evolved in round-robin: generator -> refiner -> writer -> ...
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    round_robin=True,
)
```

Explicit components override (takes precedence over round_robin):

```python
# Only generator and writer are evolved; refiner is excluded
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    components={
        "generator": ["instruction"],
        "writer": ["instruction"],
        "refiner": [],  # Excluded
    },
)
```

Using custom session service for persistence:

```python
from google.adk.sessions import SqliteSessionService

# Persist workflow evolution sessions to SQLite
session_service = SqliteSessionService(db_path="workflow_sessions.db")

result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    session_service=session_service,
)
```

Using App/Runner for existing infrastructure integration:

```python
from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService

# Configure Runner with your production session service
runner = Runner(
    app_name="my_workflow_app",
    agent=pipeline,  # The workflow agent
    session_service=DatabaseSessionService(connection_string="..."),
)

# Evolution uses Runner's session_service for all operations
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    runner=runner,  # Services extracted from runner
)
```
Note

Pre-flight validation runs synchronously before any LLM calls. Supports workflow agents (SequentialAgent, LoopAgent, ParallelAgent) with recursive traversal and depth limiting via max_depth parameter. Handles nested structures. LoopAgent and ParallelAgent configurations (max_iterations, etc.) are preserved during evolution. Always uses share_session=True to maintain workflow context (FR-010).

For reproducible evolution, pass a seeded config: config=EvolutionConfig(seed=42).

Source code in src/gepa_adk/api.py
async def evolve_workflow(
    workflow: SequentialAgent | LoopAgent | ParallelAgent,
    trainset: list[dict[str, Any]],
    *,
    critic: LlmAgent | None = None,
    primary: str | None = None,
    max_depth: int = 5,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    round_robin: bool = False,
    components: dict[str, list[str]] | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult:
    """Evolve LlmAgents within a workflow agent structure.

    Discovers all LlmAgent instances within a workflow (SequentialAgent,
    LoopAgent, or ParallelAgent) and evolves them while preserving the
    workflow structure. Uses shared session state to maintain workflow
    context during evaluation.

    Args:
        workflow: Workflow agent containing LlmAgents to evolve. Must be
            SequentialAgent, LoopAgent, or ParallelAgent.
        trainset: Training examples for evaluation. Each example should have
            an "input" key and optionally an "expected" key.

    Keyword Args:
        critic: Optional critic agent for scoring. If None, the primary agent
            must have an output_schema for schema-based scoring.
        primary: Name of the agent to score. Defaults to the last LlmAgent
            found in the workflow (for sequential workflows, this is typically
            the final output producer).
        max_depth: Maximum recursion depth for nested workflows (default: 5).
            Limits how deeply nested workflow structures are traversed.
        config: Evolution configuration. If None, uses EvolutionConfig defaults.
        state_guard: Optional StateGuard instance for validating and
            repairing state injection tokens in evolved component_text.
        component_selector: Optional selector instance or selector name for
            choosing which components to update.
        round_robin: If False (default), only the first discovered agent's
            instruction is evolved across all iterations. If True, all agents'
            instructions are evolved in round-robin fashion (the engine cycles
            through agents each iteration). Ignored when components is provided.
        components: Optional per-agent component configuration mapping agent
            names to lists of component names to evolve. When provided, takes
            precedence over round_robin. Use empty list to exclude an agent.
        session_service: Optional ADK session service for state management.
            If None (default), creates an InMemorySessionService internally.
            Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService)
            to persist sessions alongside other agent executions in a shared database.
        app: Optional ADK App instance. When provided, evolution uses the app's
            configuration. Note that App does not hold services directly; pass
            a Runner for service extraction, or combine with session_service param.
        runner: Optional ADK Runner instance. When provided, evolution extracts
            and uses the runner's session_service for all agent executions
            (evolved agents, critic, and reflection agent). Takes precedence
            over both app and session_service parameters. This enables seamless
            integration with existing ADK infrastructure.

    Returns:
        MultiAgentEvolutionResult containing evolved_components dict mapping
        agent names to their optimized component_text, along with score
        metrics and iteration history.

    Raises:
        ConfigurationError: If pre-flight validation fails: non-LlmAgent
            critic, empty trainset, duplicate or empty component names,
            or EvolutionConfig consistency errors.
        WorkflowEvolutionError: If workflow contains no LlmAgents.
        MultiAgentValidationError: If primary agent not found or no scorer
            available.
        EvolutionError: If evolution fails during execution.

    Examples:
        Default behavior (evolve first agent only):

        ```python
        from google.adk.agents import LlmAgent, SequentialAgent
        from gepa_adk import evolve_workflow

        generator = LlmAgent(name="generator", instruction="Generate code")
        refiner = LlmAgent(name="refiner", instruction="Refine code")
        writer = LlmAgent(name="writer", instruction="Write docs")
        pipeline = SequentialAgent(
            name="Pipeline", sub_agents=[generator, refiner, writer]
        )

        # Only generator.instruction is evolved across all iterations
        result = await evolve_workflow(workflow=pipeline, trainset=trainset)
        ```

        Round-robin evolution (evolve all agents):

        ```python
        # All agents are evolved in round-robin: generator -> refiner -> writer -> ...
        result = await evolve_workflow(
            workflow=pipeline,
            trainset=trainset,
            round_robin=True,
        )
        ```

        Explicit components override (takes precedence over round_robin):

        ```python
        # Only generator and writer are evolved; refiner is excluded
        result = await evolve_workflow(
            workflow=pipeline,
            trainset=trainset,
            components={
                "generator": ["instruction"],
                "writer": ["instruction"],
                "refiner": [],  # Excluded
            },
        )
        ```

        Using custom session service for persistence:

        ```python
        from google.adk.sessions import SqliteSessionService

        # Persist workflow evolution sessions to SQLite
        session_service = SqliteSessionService(db_path="workflow_sessions.db")

        result = await evolve_workflow(
            workflow=pipeline,
            trainset=trainset,
            session_service=session_service,
        )
        ```

        Using App/Runner for existing infrastructure integration:

        ```python
        from google.adk.runners import Runner
        from google.adk.sessions import DatabaseSessionService

        # Configure Runner with your production session service
        runner = Runner(
            app_name="my_workflow_app",
            agent=pipeline,  # The workflow agent
            session_service=DatabaseSessionService(connection_string="..."),
        )

        # Evolution uses Runner's session_service for all operations
        result = await evolve_workflow(
            workflow=pipeline,
            trainset=trainset,
            runner=runner,  # Services extracted from runner
        )
        ```

    Note:
        Pre-flight validation runs synchronously before any LLM calls.
        Supports workflow agents (SequentialAgent, LoopAgent, ParallelAgent)
        with recursive traversal and depth limiting via max_depth parameter.
        Handles nested structures. LoopAgent and ParallelAgent configurations
        (max_iterations, etc.) are preserved during evolution. Always uses
        share_session=True to maintain workflow context (FR-010).

        For reproducible evolution, pass a seeded config:
        ``config=EvolutionConfig(seed=42)``.
    """
    # Pre-flight validation (Story 2.5)
    _pre_flight_validate_workflow(trainset, critic, components)

    logger.info(
        "Starting workflow evolution",
        workflow_name=workflow.name,
        workflow_type=type(workflow).__name__,
    )

    # Find all LlmAgents in the workflow recursively up to max_depth (US3)
    llm_agents = find_llm_agents(workflow, max_depth=max_depth)

    # Validate that at least one LlmAgent was found
    if not llm_agents:
        error_msg = (
            f"No LlmAgents found in workflow '{workflow.name}'. "
            "Workflow must contain at least one LlmAgent to evolve."
        )
        logger.error(
            "Workflow evolution failed", workflow_name=workflow.name, error=error_msg
        )
        raise WorkflowEvolutionError(
            error_msg,
            workflow_name=workflow.name,
        )

    logger.info(
        "Found LlmAgents in workflow",
        workflow_name=workflow.name,
        agent_count=len(llm_agents),
        agent_names=[agent.name for agent in llm_agents],
    )

    # Determine primary agent (default to last agent for sequential workflows)
    if primary is None:
        primary = llm_agents[-1].name
        logger.debug(
            "Using default primary agent",
            workflow_name=workflow.name,
            primary=primary,
        )

    # Convert list to dict for evolve_group (API v0.3.x)
    agents_dict = {agent.name: agent for agent in llm_agents}

    # Build components dict based on round_robin flag
    # Explicit components parameter takes precedence over round_robin
    resolved_components: dict[str, list[str]] | None = None
    if components is not None:
        # Explicit components provided - use as-is
        resolved_components = components
        logger.debug(
            "Using explicit components",
            workflow_name=workflow.name,
            components=list(components.keys()),
        )
    elif round_robin:
        # round_robin=True: evolve all agents
        resolved_components = {agent.name: ["instruction"] for agent in llm_agents}
        logger.debug(
            "Using round_robin mode - evolving all agents",
            workflow_name=workflow.name,
            agents=[agent.name for agent in llm_agents],
        )
    else:
        # Default: evolve only the first agent
        first_agent = llm_agents[0]
        resolved_components = {first_agent.name: ["instruction"]}
        # Add empty lists for other agents (excluded from evolution)
        for agent in llm_agents[1:]:
            resolved_components[agent.name] = []
        logger.debug(
            "Using default mode - evolving first agent only",
            workflow_name=workflow.name,
            first_agent=first_agent.name,
        )

    # Delegate to evolve_group with share_session=True (FR-010)
    logger.debug(
        "Delegating to evolve_group",
        workflow_name=workflow.name,
        agent_count=len(llm_agents),
        primary=primary,
        share_session=True,
        round_robin=round_robin,
    )

    return await evolve_group(
        agents=agents_dict,
        primary=primary,
        trainset=trainset,
        components=resolved_components,
        critic=critic,
        share_session=True,  # FR-010: Always use shared session for workflow context
        config=config,
        state_guard=state_guard,
        component_selector=component_selector,
        workflow=workflow,  # Preserve workflow structure (#215)
        session_service=session_service,  # Pass through for persistence (#226)
        app=app,  # Pass through for App/Runner pattern (#227)
        runner=runner,  # Pass through for App/Runner pattern (#227)
    )

run_sync

run_sync(coro: Coroutine[Any, Any, _T]) -> _T

Run an async coroutine synchronously and return its result.

Universal sync wrapper that accepts any coroutine (e.g., evolve(), evolve_group(), evolve_workflow()) and runs it in a blocking manner. Uses asyncio.run() as the primary mechanism, with nest_asyncio as a fallback for environments with a running event loop. The fallback saves and restores the original event loop to avoid polluting the event loop policy state.

PARAMETER DESCRIPTION
coro

A coroutine object to execute (e.g., evolve(agent, trainset)). Must be a coroutine, not a function or other awaitable.

TYPE: Coroutine[Any, Any, _T]

RETURNS DESCRIPTION
_T

The result of the coroutine execution. The return type matches the coroutine's return type (e.g., EvolutionResult for evolve(), MultiAgentEvolutionResult for evolve_group()).

RAISES DESCRIPTION
TypeError

If coro is not a coroutine object.

RuntimeError

If a running event loop is detected and nest_asyncio is not installed.

Examples:

Single-agent evolution:

```python
from gepa_adk import run_sync, evolve

result = run_sync(evolve(agent, trainset=trainset))
```

Multi-agent group evolution:

```python
from gepa_adk import run_sync, evolve_group

result = run_sync(evolve_group(agents, "primary", trainset=trainset))
```
Note

In Jupyter notebooks or IPython, the event loop is already running. Use await evolve(...) directly instead of run_sync(evolve(...)). The nest_asyncio fallback may work but await is preferred.
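The core of this pattern, minus the nest_asyncio fallback, is a few lines of stdlib code. A self-contained sketch with a hypothetical `demo` coroutine:

```python
import asyncio
from typing import Any, Coroutine, TypeVar

_T = TypeVar("_T")


def run_sync_sketch(coro: Coroutine[Any, Any, _T]) -> _T:
    # Reject plain functions early: run_sync(evolve) is a common mistake
    # when run_sync(evolve(...)) was intended.
    if not asyncio.iscoroutine(coro):
        raise TypeError(f"expected a coroutine object, got {type(coro).__name__}")
    return asyncio.run(coro)


async def demo(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


print(run_sync_sketch(demo(21)))  # → 42
```

Calling `run_sync_sketch(demo)` (the function itself, not a coroutine) raises TypeError, matching the guard documented above.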

Source code in src/gepa_adk/api.py
def run_sync(coro: Coroutine[Any, Any, _T]) -> _T:
    """Run an async coroutine synchronously and return its result.

    Universal sync wrapper that accepts any coroutine (e.g., evolve(),
    evolve_group(), evolve_workflow()) and runs it in a blocking manner.
    Uses ``asyncio.run()`` as the primary mechanism, with ``nest_asyncio``
    as a fallback for environments with a running event loop.  The fallback
    saves and restores the original event loop to avoid polluting the
    event loop policy state.

    Args:
        coro: A coroutine object to execute (e.g., ``evolve(agent, trainset)``).
            Must be a coroutine, not a function or other awaitable.

    Returns:
        The result of the coroutine execution. The return type matches
        the coroutine's return type (e.g., EvolutionResult for evolve(),
        MultiAgentEvolutionResult for evolve_group()).

    Raises:
        TypeError: If ``coro`` is not a coroutine object.
        RuntimeError: If a running event loop is detected and ``nest_asyncio``
            is not installed.

    Examples:
        Single-agent evolution:

        ```python
        from gepa_adk import run_sync, evolve

        result = run_sync(evolve(agent, trainset=trainset))
        ```

        Multi-agent group evolution:

        ```python
        from gepa_adk import run_sync, evolve_group

        result = run_sync(evolve_group(agents, "primary", trainset=trainset))
        ```

    Note:
        In Jupyter notebooks or IPython, the event loop is already running.
        Use ``await evolve(...)`` directly instead of ``run_sync(evolve(...))``.
        The ``nest_asyncio`` fallback may work but ``await`` is preferred.
    """
    import asyncio

    if not asyncio.iscoroutine(coro):
        raise TypeError(
            f"run_sync() requires a coroutine object, got {type(coro).__name__}. "
            "Call the async function first: run_sync(evolve(...)) not run_sync(evolve)"
        )

    try:
        return asyncio.run(coro)
    except RuntimeError as e:
        if "asyncio.run() cannot be called from a running event loop" in str(e):
            try:
                import nest_asyncio

                nest_asyncio.apply()
                try:
                    old_loop = asyncio.get_running_loop()
                except RuntimeError:
                    old_loop = None
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                try:
                    return loop.run_until_complete(coro)
                finally:
                    loop.close()
                    if old_loop is not None:
                        asyncio.set_event_loop(old_loop)
            except ImportError:
                raise RuntimeError(
                    "A running event loop was detected and nest_asyncio is not "
                    "installed. Install it with: uv add nest-asyncio\n"
                    "Alternatively, use 'await evolve(...)' directly in async "
                    "contexts like Jupyter notebooks."
                ) from e
        raise

main

main() -> None

Entry point for CLI invocation.

Source code in src/gepa_adk/__init__.py
def main() -> None:
    """Entry point for CLI invocation."""
    print("Hello from gepa-adk!")