
Ports

ports

Protocol and interface definitions for external integrations.

This package defines the ports layer of the hexagonal architecture, providing protocol interfaces that adapters implement to integrate with external systems. All protocols follow async-first design principles and support runtime checking.

ATTRIBUTE DESCRIPTION
AgentProvider

Protocol for loading and persisting agents.

TYPE: protocol

AsyncGEPAAdapter

Async adapter contract for evaluations.

TYPE: protocol

EvaluationBatch

Container for evaluation outputs and scores.

TYPE: class

Scorer

Protocol for scoring agent outputs.

TYPE: protocol

ProposerProtocol

Protocol for candidate proposal strategies.

TYPE: protocol

AgentExecutorProtocol

Protocol for unified agent execution.

TYPE: protocol

ExecutionResult

Result of an agent execution.

TYPE: dataclass

ExecutionStatus

Status of agent execution.

TYPE: enum

DataInst

Type variable for input instances.

TYPE: type

Trajectory

Type variable for execution traces.

TYPE: type

RolloutOutput

Type variable for evaluation outputs.

TYPE: type

Examples:

Import the agent provider protocol:

from gepa_adk.ports import AgentProvider

Import the adapter protocol:

from gepa_adk.ports import AsyncGEPAAdapter, EvaluationBatch

Import the scorer protocol:

from gepa_adk.ports import Scorer

Import the proposer protocol:

from gepa_adk.ports import ProposerProtocol
from gepa_adk.domain.types import ProposalResult

Import the agent executor protocol:

from gepa_adk.ports import (
    AgentExecutorProtocol,
    ExecutionResult,
    ExecutionStatus,
)
Note

This layer follows hexagonal architecture principles, defining ports that adapters implement to integrate with external systems.

ProposalResult dataclass

Result of a successful proposal operation.

ATTRIBUTE DESCRIPTION
candidate

The proposed candidate with components.

TYPE: Candidate

parent_indices

Indices of parent candidate(s) in ParetoState.

TYPE: list[int]

tag

Type of proposal ("mutation" or "merge").

TYPE: str

metadata

Additional proposal-specific metadata.

TYPE: dict[str, Any]

Examples:

Creating a mutation proposal result:

from gepa_adk.domain.types import ProposalResult
from gepa_adk.domain.models import Candidate

result = ProposalResult(
    candidate=Candidate(components={"instruction": "Be helpful"}),
    parent_indices=[5],
    tag="mutation",
)

Creating a merge proposal result:

result = ProposalResult(
    candidate=Candidate(components={"instruction": "..."}),
    parent_indices=[5, 8],
    tag="merge",
    metadata={"ancestor_idx": 2},
)
Note

A frozen dataclass ensures immutability of proposal results. Parent indices must be valid indices into the ParetoState.candidates list.
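Because the dataclass is frozen, reassigning a field raises dataclasses.FrozenInstanceError (standard Python behaviour; this sketch reuses the mutation example above):

import dataclasses

try:
    result.tag = "merge"
except dataclasses.FrozenInstanceError:
    pass  # frozen dataclasses reject attribute assignment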

Source code in src/gepa_adk/domain/types.py
@dataclass(frozen=True, slots=True)
class ProposalResult:
    """Result of a successful proposal operation.

    Attributes:
        candidate (Candidate): The proposed candidate with components.
        parent_indices (list[int]): Indices of parent candidate(s) in ParetoState.
        tag (str): Type of proposal ("mutation" or "merge").
        metadata (dict[str, Any]): Additional proposal-specific metadata.

    Examples:
        Creating a mutation proposal result:

        ```python
        from gepa_adk.domain.types import ProposalResult
        from gepa_adk.domain.models import Candidate

        result = ProposalResult(
            candidate=Candidate(components={"instruction": "Be helpful"}),
            parent_indices=[5],
            tag="mutation",
        )
        ```

        Creating a merge proposal result:

        ```python
        result = ProposalResult(
            candidate=Candidate(components={"instruction": "..."}),
            parent_indices=[5, 8],
            tag="merge",
            metadata={"ancestor_idx": 2},
        )
        ```

    Note:
        A frozen dataclass ensures immutability of proposal results.
        Parent indices must be valid indices into the ParetoState.candidates list.
    """

    candidate: "Candidate"  # Forward reference to avoid circular import
    parent_indices: list[int]
    tag: str
    metadata: dict[str, Any] = field(default_factory=dict)

AsyncGEPAAdapter

Bases: Protocol[DataInst, Trajectory, RolloutOutput]



Protocol for async GEPA adapters used by the evolution engine.

Implementations provide evaluation, reflection dataset generation, and proposal updates for candidate component texts.

Examples:

Implement a minimal adapter:

class MyAdapter:
    async def evaluate(self, batch, candidate, capture_traces=False):
        return EvaluationBatch(outputs=[], scores=[])

    async def make_reflective_dataset(
        self, candidate, eval_batch, components_to_update
    ):
        return {component: [] for component in components_to_update}

    async def propose_new_texts(
        self, candidate, reflective_dataset, components_to_update
    ):
        return {
            component: candidate[component]
            for component in components_to_update
        }
Note

Adapters must implement all three async methods to satisfy the protocol. Use runtime_checkable for isinstance() checks.
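A quick structural check, assuming the MyAdapter sketch above is in scope (runtime_checkable verifies method presence only, not signatures):

adapter = MyAdapter()
assert isinstance(adapter, AsyncGEPAAdapter)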

Source code in src/gepa_adk/ports/adapter.py
@runtime_checkable
class AsyncGEPAAdapter(Protocol[DataInst, Trajectory, RolloutOutput]):
    """Protocol for async GEPA adapters used by the evolution engine.

    Implementations provide evaluation, reflection dataset generation, and
    proposal updates for candidate component texts.

    Examples:
        Implement a minimal adapter:

        ```python
        class MyAdapter:
            async def evaluate(self, batch, candidate, capture_traces=False):
                return EvaluationBatch(outputs=[], scores=[])

            async def make_reflective_dataset(
                self, candidate, eval_batch, components_to_update
            ):
                return {component: [] for component in components_to_update}

            async def propose_new_texts(
                self, candidate, reflective_dataset, components_to_update
            ):
                return {
                    component: candidate[component]
                    for component in components_to_update
                }
        ```

    Note:
        Adapters must implement all three async methods to satisfy
        the protocol. Use runtime_checkable for isinstance() checks.
    """

    async def evaluate(
        self,
        batch: list[DataInst],
        candidate: dict[str, str],
        capture_traces: bool = False,
    ) -> EvaluationBatch[Trajectory, RolloutOutput]:
        """Evaluate a candidate over a batch of inputs.

        Args:
            batch: Input data instances to evaluate.
            candidate: Component name to text mapping.
            capture_traces: Whether to capture execution traces.

        Returns:
            Evaluation results with outputs, scores, and optional traces.

        Examples:
            Basic evaluation:

            ```python
            result = await adapter.evaluate(batch, candidate)
            assert len(result.scores) == len(batch)
            ```

        Note:
            Output and score lists must have the same length as the
            input batch. Set capture_traces=True to enable reflection.
        """

    async def make_reflective_dataset(
        self,
        candidate: dict[str, str],
        eval_batch: EvaluationBatch[Trajectory, RolloutOutput],
        components_to_update: list[str],
    ) -> Mapping[str, Sequence[Mapping[str, Any]]]:
        """Build reflective datasets from evaluation traces.

        Args:
            candidate: Current candidate components.
            eval_batch: Evaluation results with traces.
            components_to_update: Components to generate datasets for.

        Returns:
            Mapping of component name to reflective examples.

        Examples:
            Build datasets for specific components:

            ```python
            dataset = await adapter.make_reflective_dataset(
                candidate,
                eval_batch,
                ["instruction"],
            )
            ```

        Note:
            Only call this method when eval_batch contains trajectories.
            Each component receives its own list of reflective examples.
        """

    async def propose_new_texts(
        self,
        candidate: dict[str, str],
        reflective_dataset: Mapping[str, Sequence[Mapping[str, Any]]],
        components_to_update: list[str],
    ) -> dict[str, str]:
        """Propose updated component texts from reflective datasets.

        Args:
            candidate: Current candidate components.
            reflective_dataset: Reflective examples per component.
            components_to_update: Components to propose updates for.

        Returns:
            Mapping of component name to new proposed text.

        Examples:
            Generate new component texts:

            ```python
            proposals = await adapter.propose_new_texts(
                candidate,
                reflective_dataset,
                ["instruction"],
            )
            ```

        Note:
            Outputs should contain improved text for each requested
            component. The evolution engine uses these as mutation candidates.
        """

evaluate async

evaluate(
    batch: list[DataInst],
    candidate: dict[str, str],
    capture_traces: bool = False,
) -> EvaluationBatch[Trajectory, RolloutOutput]

Evaluate a candidate over a batch of inputs.

PARAMETER DESCRIPTION
batch

Input data instances to evaluate.

TYPE: list[DataInst]

candidate

Component name to text mapping.

TYPE: dict[str, str]

capture_traces

Whether to capture execution traces.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
EvaluationBatch[Trajectory, RolloutOutput]

Evaluation results with outputs, scores, and optional traces.

Examples:

Basic evaluation:

result = await adapter.evaluate(batch, candidate)
assert len(result.scores) == len(batch)
Note

Output and score lists must have the same length as the input batch. Set capture_traces=True to enable reflection.
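A sketch of a trace-capturing evaluation, assuming adapter, batch, and candidate are defined as in the examples above:

traced = await adapter.evaluate(batch, candidate, capture_traces=True)
assert len(traced.outputs) == len(traced.scores) == len(batch)
# traced.trajectories is expected to be populated for reflection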

Source code in src/gepa_adk/ports/adapter.py
async def evaluate(
    self,
    batch: list[DataInst],
    candidate: dict[str, str],
    capture_traces: bool = False,
) -> EvaluationBatch[Trajectory, RolloutOutput]:
    """Evaluate a candidate over a batch of inputs.

    Args:
        batch: Input data instances to evaluate.
        candidate: Component name to text mapping.
        capture_traces: Whether to capture execution traces.

    Returns:
        Evaluation results with outputs, scores, and optional traces.

    Examples:
        Basic evaluation:

        ```python
        result = await adapter.evaluate(batch, candidate)
        assert len(result.scores) == len(batch)
        ```

    Note:
        Output and score lists must have the same length as the
        input batch. Set capture_traces=True to enable reflection.
    """

make_reflective_dataset async

make_reflective_dataset(
    candidate: dict[str, str],
    eval_batch: EvaluationBatch[Trajectory, RolloutOutput],
    components_to_update: list[str],
) -> Mapping[str, Sequence[Mapping[str, Any]]]

Build reflective datasets from evaluation traces.

PARAMETER DESCRIPTION
candidate

Current candidate components.

TYPE: dict[str, str]

eval_batch

Evaluation results with traces.

TYPE: EvaluationBatch[Trajectory, RolloutOutput]

components_to_update

Components to generate datasets for.

TYPE: list[str]

RETURNS DESCRIPTION
Mapping[str, Sequence[Mapping[str, Any]]]

Mapping of component name to reflective examples.

Examples:

Build datasets for specific components:

dataset = await adapter.make_reflective_dataset(
    candidate,
    eval_batch,
    ["instruction"],
)
Note

Only call this method when eval_batch contains trajectories. Each component receives its own list of reflective examples.
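Guarding the call as the note suggests (illustrative; eval_batch comes from a prior evaluate call with capture_traces=True):

if eval_batch.trajectories is not None:
    dataset = await adapter.make_reflective_dataset(
        candidate, eval_batch, ["instruction"]
    )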

Source code in src/gepa_adk/ports/adapter.py
async def make_reflective_dataset(
    self,
    candidate: dict[str, str],
    eval_batch: EvaluationBatch[Trajectory, RolloutOutput],
    components_to_update: list[str],
) -> Mapping[str, Sequence[Mapping[str, Any]]]:
    """Build reflective datasets from evaluation traces.

    Args:
        candidate: Current candidate components.
        eval_batch: Evaluation results with traces.
        components_to_update: Components to generate datasets for.

    Returns:
        Mapping of component name to reflective examples.

    Examples:
        Build datasets for specific components:

        ```python
        dataset = await adapter.make_reflective_dataset(
            candidate,
            eval_batch,
            ["instruction"],
        )
        ```

    Note:
        Only call this method when eval_batch contains trajectories.
        Each component receives its own list of reflective examples.
    """

propose_new_texts async

propose_new_texts(
    candidate: dict[str, str],
    reflective_dataset: Mapping[
        str, Sequence[Mapping[str, Any]]
    ],
    components_to_update: list[str],
) -> dict[str, str]

Propose updated component texts from reflective datasets.

PARAMETER DESCRIPTION
candidate

Current candidate components.

TYPE: dict[str, str]

reflective_dataset

Reflective examples per component.

TYPE: Mapping[str, Sequence[Mapping[str, Any]]]

components_to_update

Components to propose updates for.

TYPE: list[str]

RETURNS DESCRIPTION
dict[str, str]

Mapping of component name to new proposed text.

Examples:

Generate new component texts:

proposals = await adapter.propose_new_texts(
    candidate,
    reflective_dataset,
    ["instruction"],
)
Note

Outputs should contain improved text for each requested component. The evolution engine uses these as mutation candidates.
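One illustrative way a caller might fold the proposals back into a candidate mapping (a sketch, not necessarily how the evolution engine applies mutations):

new_candidate = {**candidate, **proposals}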

Source code in src/gepa_adk/ports/adapter.py
async def propose_new_texts(
    self,
    candidate: dict[str, str],
    reflective_dataset: Mapping[str, Sequence[Mapping[str, Any]]],
    components_to_update: list[str],
) -> dict[str, str]:
    """Propose updated component texts from reflective datasets.

    Args:
        candidate: Current candidate components.
        reflective_dataset: Reflective examples per component.
        components_to_update: Components to propose updates for.

    Returns:
        Mapping of component name to new proposed text.

    Examples:
        Generate new component texts:

        ```python
        proposals = await adapter.propose_new_texts(
            candidate,
            reflective_dataset,
            ["instruction"],
        )
        ```

    Note:
        Outputs should contain improved text for each requested
        component. The evolution engine uses these as mutation candidates.
    """

EvaluationBatch dataclass

Bases: Generic[Trajectory, RolloutOutput]



Container for evaluation outputs and scores.

ATTRIBUTE DESCRIPTION
outputs

Per-example outputs produced during evaluation.

TYPE: list[RolloutOutput]

scores

Per-example normalized scores (higher is better).

TYPE: list[Score]

trajectories

Optional per-example execution traces.

TYPE: list[Trajectory] | None

objective_scores

Optional multi-objective scores per example.

TYPE: list[dict[ComponentName, Score]] | None

metadata

Optional per-example scorer metadata. When provided, metadata[i] corresponds to outputs[i] and scores[i] (index-aligned). Metadata dicts may contain scorer-specific fields like 'feedback', 'actionable_guidance', or 'dimension_scores' from CriticScorer implementations.

TYPE: list[dict[str, Any]] | None

inputs

Optional per-example input text that was used to generate each output. Used by make_reflective_dataset to provide context for reflection. When provided, inputs[i] corresponds to the input that produced outputs[i].

TYPE: list[str] | None

Examples:

Create a batch with optional traces:

batch = EvaluationBatch(
    outputs=["ok", "ok"],
    scores=[0.9, 0.8],
    trajectories=[{"trace": 1}, {"trace": 2}],
)

Create a batch with inputs for reflection:

batch = EvaluationBatch(
    outputs=["Hello!", "Good morrow!"],
    scores=[0.3, 0.9],
    inputs=["I am the King", "I am your friend"],
)
Note

All fields are immutable once created due to frozen=True. Use this as the standard return type from adapter evaluations. When metadata is not None, len(metadata) must equal len(outputs) and len(scores).
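An index-aligned metadata sketch (the feedback strings are illustrative):

batch = EvaluationBatch(
    outputs=["draft A", "draft B"],
    scores=[0.4, 0.7],
    metadata=[
        {"feedback": "Too short."},
        {"feedback": "Good structure."},
    ],
)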

Source code in src/gepa_adk/ports/adapter.py
@dataclass(frozen=True, slots=True)
class EvaluationBatch(Generic[Trajectory, RolloutOutput]):
    """Container for evaluation outputs and scores.

    Attributes:
        outputs (list[RolloutOutput]): Per-example outputs produced during evaluation.
        scores (list[Score]): Per-example normalized scores (higher is better).
        trajectories (list[Trajectory] | None): Optional per-example execution traces.
        objective_scores (list[dict[ComponentName, Score]] | None): Optional
            multi-objective scores per example.
        metadata (list[dict[str, Any]] | None): Optional per-example scorer metadata.
            When provided, metadata[i] corresponds to outputs[i] and scores[i]
            (index-aligned). Metadata dicts may contain scorer-specific fields like
            'feedback', 'actionable_guidance', or 'dimension_scores' from
            CriticScorer implementations.
        inputs (list[str] | None): Optional per-example input text that was used
            to generate each output. Used by make_reflective_dataset to provide
            context for reflection. When provided, inputs[i] corresponds to the
            input that produced outputs[i].

    Examples:
        Create a batch with optional traces:

        ```python
        batch = EvaluationBatch(
            outputs=["ok", "ok"],
            scores=[0.9, 0.8],
            trajectories=[{"trace": 1}, {"trace": 2}],
        )
        ```

        Create a batch with inputs for reflection:

        ```python
        batch = EvaluationBatch(
            outputs=["Hello!", "Good morrow!"],
            scores=[0.3, 0.9],
            inputs=["I am the King", "I am your friend"],
        )
        ```

    Note:
        All fields are immutable once created due to frozen=True.
        Use this as the standard return type from adapter evaluations.
        When metadata is not None, len(metadata) must equal len(outputs) and len(scores).
    """

    outputs: list[RolloutOutput]
    scores: list[Score]
    trajectories: list[Trajectory] | None = None
    objective_scores: list[dict[ComponentName, Score]] | None = None
    metadata: list[dict[str, Any]] | None = None
    inputs: list[str] | None = None

AgentExecutorProtocol

Bases: Protocol



Protocol for unified agent execution.

Defines the interface for executing any ADK agent type (LlmAgent, workflow agents) with consistent behavior, session management, and result handling.

This protocol enables:

- Feature parity across generator, critic, and reflection agents
- Dependency injection for testing
- Single point of change for new ADK features

Examples:

Using an executor:

executor: AgentExecutorProtocol = AgentExecutor()

result = await executor.execute_agent(
    agent=my_agent,
    input_text="Hello",
    timeout_seconds=60,
)

if result.status == ExecutionStatus.SUCCESS:
    print(result.extracted_value)

With instruction override (for evolution):

result = await executor.execute_agent(
    agent=my_agent,
    input_text="Hello",
    instruction_override="You are a helpful assistant.",
)
See Also

- ExecutionResult: Return type for execute_agent.
- ExecutionStatus: Status enum for execution outcomes.
Note

All implementations should validate that the agent parameter is a valid ADK LlmAgent. The Any type is used here to avoid coupling the ports layer to ADK types.

Source code in src/gepa_adk/ports/agent_executor.py
@runtime_checkable
class AgentExecutorProtocol(Protocol):
    """Protocol for unified agent execution.

    Defines the interface for executing any ADK agent type (LlmAgent,
    workflow agents) with consistent behavior, session management, and
    result handling.

    This protocol enables:
    - Feature parity across generator, critic, and reflection agents
    - Dependency injection for testing
    - Single point of change for new ADK features

    Examples:
        Using an executor:

        ```python
        executor: AgentExecutorProtocol = AgentExecutor()

        result = await executor.execute_agent(
            agent=my_agent,
            input_text="Hello",
            timeout_seconds=60,
        )

        if result.status == ExecutionStatus.SUCCESS:
            print(result.extracted_value)
        ```

        With instruction override (for evolution):

        ```python
        result = await executor.execute_agent(
            agent=my_agent,
            input_text="Hello",
            instruction_override="You are a helpful assistant.",
        )
        ```

    See Also:
        - [`ExecutionResult`][gepa_adk.ports.agent_executor.ExecutionResult]:
            Return type for execute_agent.
        - [`ExecutionStatus`][gepa_adk.ports.agent_executor.ExecutionStatus]:
            Status enum for execution outcomes.

    Note:
        All implementations should validate that the agent parameter is a
        valid ADK LlmAgent. The Any type is used here to avoid coupling
        the ports layer to ADK types.
    """

    async def execute_agent(
        self,
        agent: Any,
        input_text: str,
        *,
        input_content: Any | None = None,
        instruction_override: str | None = None,
        output_schema_override: Any | None = None,
        session_state: dict[str, Any] | None = None,
        existing_session_id: str | None = None,
        timeout_seconds: int = 300,
    ) -> ExecutionResult:
        """Execute an agent and return structured result.

        Runs the specified agent with the given input, optionally applying
        instruction or schema overrides for evolution scenarios. Manages
        session lifecycle and captures execution events.

        Args:
            agent: ADK LlmAgent to execute. The agent's tools, output_key,
                and other ADK features are preserved during execution.
            input_text: User message to send to the agent. Used as fallback
                if input_content is not provided.
            input_content: Optional ADK Content object for multimodal input
                (e.g., video parts). If provided, takes precedence over input_text.
            instruction_override: If provided, replaces the agent's instruction
                for this execution only. Original agent is not modified.
            output_schema_override: If provided, replaces the agent's output
                schema for this execution only (type[BaseModel]). Used for schema evolution.
            session_state: Initial state to inject into the session. Used for
                template variable substitution (e.g., {component_text}).
            existing_session_id: If provided, reuses an existing session instead
                of creating a new one. Useful for critic accessing generator state.
            timeout_seconds: Maximum execution time in seconds. Defaults to 300.
                Execution terminates with TIMEOUT status if exceeded.

        Returns:
            ExecutionResult with status, output, and debugging information.

        Raises:
            SessionNotFoundError: If existing_session_id is provided but session
                does not exist.

        Examples:
            Basic execution:

            ```python
            result = await executor.execute_agent(
                agent=greeter,
                input_text="Hello!",
            )
            print(result.extracted_value)  # "Hello! How can I help?"
            ```

            With session state for reflection:

            ```python
            result = await executor.execute_agent(
                agent=reflector,
                input_text="Improve the instruction",
                session_state={
                    "component_text": "Be helpful.",
                    "trials": '[{"score": 0.5}]',
                },
            )
            ```

            Session sharing between agents:

            ```python
            # Generator creates session
            gen_result = await executor.execute_agent(
                agent=generator,
                input_text="Write a story.",
            )

            # Critic reuses generator's session
            critic_result = await executor.execute_agent(
                agent=critic,
                input_text=f"Evaluate: {gen_result.extracted_value}",
                existing_session_id=gen_result.session_id,
            )
            ```

        Note:
            The agent parameter is typed as Any to avoid coupling to ADK types
            in the ports layer. Implementations should validate that the agent
            is a valid LlmAgent.
        """
        ...

execute_agent async

execute_agent(
    agent: Any,
    input_text: str,
    *,
    input_content: Any | None = None,
    instruction_override: str | None = None,
    output_schema_override: Any | None = None,
    session_state: dict[str, Any] | None = None,
    existing_session_id: str | None = None,
    timeout_seconds: int = 300,
) -> ExecutionResult

Execute an agent and return structured result.

Runs the specified agent with the given input, optionally applying instruction or schema overrides for evolution scenarios. Manages session lifecycle and captures execution events.

PARAMETER DESCRIPTION
agent

ADK LlmAgent to execute. The agent's tools, output_key, and other ADK features are preserved during execution.

TYPE: Any

input_text

User message to send to the agent. Used as fallback if input_content is not provided.

TYPE: str

input_content

Optional ADK Content object for multimodal input (e.g., video parts). If provided, takes precedence over input_text.

TYPE: Any | None DEFAULT: None

instruction_override

If provided, replaces the agent's instruction for this execution only. Original agent is not modified.

TYPE: str | None DEFAULT: None

output_schema_override

If provided, replaces the agent's output schema for this execution only (type[BaseModel]). Used for schema evolution.

TYPE: Any | None DEFAULT: None

session_state

Initial state to inject into the session. Used for template variable substitution (e.g., {component_text}).

TYPE: dict[str, Any] | None DEFAULT: None

existing_session_id

If provided, reuses an existing session instead of creating a new one. Useful for critic accessing generator state.

TYPE: str | None DEFAULT: None

timeout_seconds

Maximum execution time in seconds. Defaults to 300. Execution terminates with TIMEOUT status if exceeded.

TYPE: int DEFAULT: 300

RETURNS DESCRIPTION
ExecutionResult

ExecutionResult with status, output, and debugging information.

RAISES DESCRIPTION
SessionNotFoundError

If existing_session_id is provided but session does not exist.

Examples:

Basic execution:

result = await executor.execute_agent(
    agent=greeter,
    input_text="Hello!",
)
print(result.extracted_value)  # "Hello! How can I help?"

With session state for reflection:

result = await executor.execute_agent(
    agent=reflector,
    input_text="Improve the instruction",
    session_state={
        "component_text": "Be helpful.",
        "trials": '[{"score": 0.5}]',
    },
)

Session sharing between agents:

# Generator creates session
gen_result = await executor.execute_agent(
    agent=generator,
    input_text="Write a story.",
)

# Critic reuses generator's session
critic_result = await executor.execute_agent(
    agent=critic,
    input_text=f"Evaluate: {gen_result.extracted_value}",
    existing_session_id=gen_result.session_id,
)
Note

The agent parameter is typed as Any to avoid coupling to ADK types in the ports layer. Implementations should validate that the agent is a valid LlmAgent.

Source code in src/gepa_adk/ports/agent_executor.py
async def execute_agent(
    self,
    agent: Any,
    input_text: str,
    *,
    input_content: Any | None = None,
    instruction_override: str | None = None,
    output_schema_override: Any | None = None,
    session_state: dict[str, Any] | None = None,
    existing_session_id: str | None = None,
    timeout_seconds: int = 300,
) -> ExecutionResult:
    """Execute an agent and return structured result.

    Runs the specified agent with the given input, optionally applying
    instruction or schema overrides for evolution scenarios. Manages
    session lifecycle and captures execution events.

    Args:
        agent: ADK LlmAgent to execute. The agent's tools, output_key,
            and other ADK features are preserved during execution.
        input_text: User message to send to the agent. Used as fallback
            if input_content is not provided.
        input_content: Optional ADK Content object for multimodal input
            (e.g., video parts). If provided, takes precedence over input_text.
        instruction_override: If provided, replaces the agent's instruction
            for this execution only. Original agent is not modified.
        output_schema_override: If provided, replaces the agent's output
            schema for this execution only (type[BaseModel]). Used for schema evolution.
        session_state: Initial state to inject into the session. Used for
            template variable substitution (e.g., {component_text}).
        existing_session_id: If provided, reuses an existing session instead
            of creating a new one. Useful for critic accessing generator state.
        timeout_seconds: Maximum execution time in seconds. Defaults to 300.
            Execution terminates with TIMEOUT status if exceeded.

    Returns:
        ExecutionResult with status, output, and debugging information.

    Raises:
        SessionNotFoundError: If existing_session_id is provided but session
            does not exist.

    Examples:
        Basic execution:

        ```python
        result = await executor.execute_agent(
            agent=greeter,
            input_text="Hello!",
        )
        print(result.extracted_value)  # "Hello! How can I help?"
        ```

        With session state for reflection:

        ```python
        result = await executor.execute_agent(
            agent=reflector,
            input_text="Improve the instruction",
            session_state={
                "component_text": "Be helpful.",
                "trials": '[{"score": 0.5}]',
            },
        )
        ```

        Session sharing between agents:

        ```python
        # Generator creates session
        gen_result = await executor.execute_agent(
            agent=generator,
            input_text="Write a story.",
        )

        # Critic reuses generator's session
        critic_result = await executor.execute_agent(
            agent=critic,
            input_text=f"Evaluate: {gen_result.extracted_value}",
            existing_session_id=gen_result.session_id,
        )
        ```

    Note:
        The agent parameter is typed as Any to avoid coupling to ADK types
        in the ports layer. Implementations should validate that the agent
        is a valid LlmAgent.
    """
    ...

ExecutionResult dataclass

Result of an agent execution.

Provides consistent return type across all agent types (generator, critic, reflection) with status, output, timing, and debugging information.

ATTRIBUTE DESCRIPTION
status

Outcome status of the execution.

TYPE: ExecutionStatus

session_id

ADK session identifier used for this execution.

TYPE: str

extracted_value

Output text extracted from agent response. None if agent produced no output or execution failed.

TYPE: str | None

error_message

Error details if status is FAILED or TIMEOUT. None for successful executions.

TYPE: str | None

execution_time_seconds

Duration of execution in seconds.

TYPE: float

captured_events

ADK events captured during execution for debugging and trajectory analysis. None if event capture disabled.

TYPE: list[Any] | None

Examples:

Successful execution:

result = ExecutionResult(
    status=ExecutionStatus.SUCCESS,
    session_id="sess_123",
    extracted_value="Hello, world!",
    execution_time_seconds=1.5,
)

Failed execution:

result = ExecutionResult(
    status=ExecutionStatus.FAILED,
    session_id="sess_456",
    error_message="Model rate limit exceeded",
    execution_time_seconds=0.2,
)
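
Timed-out execution (values are illustrative):

result = ExecutionResult(
    status=ExecutionStatus.TIMEOUT,
    session_id="sess_789",
    error_message="Execution exceeded 300s timeout",
    execution_time_seconds=300.0,
)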
Note

All captured_events contain raw ADK Event objects for debugging and trajectory recording. Events are captured even on timeout or failure.

Source code in src/gepa_adk/ports/agent_executor.py
@dataclass
class ExecutionResult:
    """Result of an agent execution.

    Provides consistent return type across all agent types (generator, critic,
    reflection) with status, output, timing, and debugging information.

    Attributes:
        status (ExecutionStatus): Outcome status of the execution.
        session_id (str): ADK session identifier used for this execution.
        extracted_value (str | None): Output text extracted from agent response. None if
            agent produced no output or execution failed.
        error_message (str | None): Error details if status is FAILED or TIMEOUT. None
            for successful executions.
        execution_time_seconds (float): Duration of execution in seconds.
        captured_events (list[Any] | None): ADK events captured during execution for debugging
            and trajectory analysis. None if event capture disabled.

    Examples:
        Successful execution:

        ```python
        result = ExecutionResult(
            status=ExecutionStatus.SUCCESS,
            session_id="sess_123",
            extracted_value="Hello, world!",
            execution_time_seconds=1.5,
        )
        ```

        Failed execution:

        ```python
        result = ExecutionResult(
            status=ExecutionStatus.FAILED,
            session_id="sess_456",
            error_message="Model rate limit exceeded",
            execution_time_seconds=0.2,
        )
        ```

    Note:
        All captured_events contain raw ADK Event objects for debugging
        and trajectory recording. Events are captured even on timeout
        or failure.
    """

    status: ExecutionStatus
    session_id: str
    extracted_value: str | None = None
    error_message: str | None = None
    execution_time_seconds: float = 0.0
    captured_events: list[Any] | None = field(default=None)

ExecutionStatus

Bases: str, Enum



Status of agent execution.

This enum represents the outcome status of an agent execution, used by ExecutionResult to indicate how the execution completed.

ATTRIBUTE DESCRIPTION
SUCCESS

Agent completed execution normally with valid output.

FAILED

Agent encountered an error during execution.

TIMEOUT

Agent execution exceeded configured timeout.

Examples:

Checking execution status:

from gepa_adk.ports.agent_executor import ExecutionStatus

if result.status == ExecutionStatus.SUCCESS:
    print(f"Output: {result.extracted_value}")
elif result.status == ExecutionStatus.TIMEOUT:
    print(f"Timed out after {result.execution_time_seconds}s")
elif result.status == ExecutionStatus.FAILED:
    print(f"Error: {result.error_message}")
Note

All status values reflect execution outcome, not output quality. A successful execution may still produce low-quality output that fails scoring.

Source code in src/gepa_adk/ports/agent_executor.py
class ExecutionStatus(str, Enum):
    """Status of agent execution.

    This enum represents the outcome status of an agent execution,
    used by ExecutionResult to indicate how the execution completed.

    Attributes:
        SUCCESS: Agent completed execution normally with valid output.
        FAILED: Agent encountered an error during execution.
        TIMEOUT: Agent execution exceeded configured timeout.

    Examples:
        Checking execution status:

        ```python
        from gepa_adk.ports.agent_executor import ExecutionStatus

        if result.status == ExecutionStatus.SUCCESS:
            print(f"Output: {result.extracted_value}")
        elif result.status == ExecutionStatus.TIMEOUT:
            print(f"Timed out after {result.execution_time_seconds}s")
        elif result.status == ExecutionStatus.FAILED:
            print(f"Error: {result.error_message}")
        ```

    Note:
        All status values reflect execution outcome, not output quality.
        A successful execution may still produce low-quality output
        that fails scoring.
    """

    SUCCESS = "success"
    FAILED = "failed"
    TIMEOUT = "timeout"

AgentProvider

Bases: Protocol



Protocol for loading and persisting agents.

Implementations provide agent configuration storage and retrieval, enabling the evolution system to load agents and persist evolved instructions.

Examples:

Implement a minimal in-memory provider:

class InMemoryProvider:
    def __init__(self):
        self._agents = {}

    def get_agent(self, name: str) -> LlmAgent:
        if not name:
            raise ValueError("Agent name cannot be empty")
        if name not in self._agents:
            raise KeyError(f"Agent not found: {name}")
        return self._agents[name]

    def save_instruction(self, name: str, instruction: str) -> None:
        if not name:
            raise ValueError("Agent name cannot be empty")
        if name not in self._agents:
            raise KeyError(f"Agent not found: {name}")
        self._agents[name].instruction = instruction

    def list_agents(self) -> list[str]:
        return list(self._agents.keys())

Verify protocol compliance:

from gepa_adk.ports import AgentProvider

provider = InMemoryProvider()
assert isinstance(provider, AgentProvider)  # Runtime check works
Note

All implementations must provide get_agent(), save_instruction(), and list_agents() methods. Use @runtime_checkable to enable isinstance() checks for protocol compliance.

Source code in src/gepa_adk/ports/agent_provider.py
@runtime_checkable
class AgentProvider(Protocol):
    """Protocol for loading and persisting agents.

    Implementations provide agent configuration storage and retrieval,
    enabling the evolution system to load agents and persist evolved
    instructions.

    Examples:
        Implement a minimal in-memory provider:

        ```python
        class InMemoryProvider:
            def __init__(self):
                self._agents = {}

            def get_agent(self, name: str) -> LlmAgent:
                if not name:
                    raise ValueError("Agent name cannot be empty")
                if name not in self._agents:
                    raise KeyError(f"Agent not found: {name}")
                return self._agents[name]

            def save_instruction(self, name: str, instruction: str) -> None:
                if not name:
                    raise ValueError("Agent name cannot be empty")
                if name not in self._agents:
                    raise KeyError(f"Agent not found: {name}")
                self._agents[name].instruction = instruction

            def list_agents(self) -> list[str]:
                return list(self._agents.keys())
        ```

        Verify protocol compliance:

        ```python
        from gepa_adk.ports import AgentProvider

        provider = InMemoryProvider()
        assert isinstance(provider, AgentProvider)  # Runtime check works
        ```

    Note:
        All implementations must provide get_agent(), save_instruction(),
        and list_agents() methods. Use @runtime_checkable to enable
        isinstance() checks for protocol compliance.
    """

    def get_agent(self, name: str) -> "LlmAgent":
        """Load an agent by its unique name.

        Args:
            name: The unique identifier for the agent.

        Returns:
            The configured LlmAgent instance ready for use.

        Raises:
            KeyError: If no agent with the given name exists.
            ValueError: If name is empty or invalid.

        Examples:
            Load a named agent:

            ```python
            provider = MyAgentProvider()
            agent = provider.get_agent("my_agent")
            print(agent.instruction)
            ```

        Note:
            Only non-empty names are accepted. Configured agents ready
            for use with the evolution system should be returned.
        """
        ...

    def save_instruction(self, name: str, instruction: str) -> None:
        """Persist an evolved instruction for a named agent.

        Args:
            name: The unique identifier for the agent.
            instruction: The new instruction text to persist.

        Raises:
            KeyError: If no agent with the given name exists.
            ValueError: If name is empty or invalid.
            IOError: If persistence fails (implementation-specific).

        Examples:
            Save an evolved instruction:

            ```python
            provider.save_instruction(
                "my_agent", "You are a helpful assistant specialized in coding."
            )
            ```

        Note:
            Only after successful persistence should subsequent calls to
            get_agent() return an agent with the updated instruction.
            Implementations should validate empty names before processing.
        """
        ...

    def list_agents(self) -> list[str]:
        """List all available agent names.

        Returns:
            A list of agent name strings. Empty list if no agents.

        Examples:
            Discover available agents:

            ```python
            provider = MyAgentProvider()
            for name in provider.list_agents():
                print(f"Found agent: {name}")
            ```

        Note:
            Ordering of returned names is not guaranteed by the protocol.
            Some implementations may return names in a specific order,
            but callers should not rely upon any particular sequence.
        """
        ...

get_agent

get_agent(name: str) -> 'LlmAgent'

Load an agent by its unique name.

PARAMETER DESCRIPTION
name

The unique identifier for the agent.

TYPE: str

RETURNS DESCRIPTION
'LlmAgent'

The configured LlmAgent instance ready for use.

RAISES DESCRIPTION
KeyError

If no agent with the given name exists.

ValueError

If name is empty or invalid.

Examples:

Load a named agent:

provider = MyAgentProvider()
agent = provider.get_agent("my_agent")
print(agent.instruction)
Note

Only non-empty names are accepted. Configured agents ready for use with the evolution system should be returned.

Source code in src/gepa_adk/ports/agent_provider.py
def get_agent(self, name: str) -> "LlmAgent":
    """Load an agent by its unique name.

    Args:
        name: The unique identifier for the agent.

    Returns:
        The configured LlmAgent instance ready for use.

    Raises:
        KeyError: If no agent with the given name exists.
        ValueError: If name is empty or invalid.

    Examples:
        Load a named agent:

        ```python
        provider = MyAgentProvider()
        agent = provider.get_agent("my_agent")
        print(agent.instruction)
        ```

    Note:
        Only non-empty names are accepted. Configured agents ready
        for use with the evolution system should be returned.
    """
    ...

save_instruction

save_instruction(name: str, instruction: str) -> None

Persist an evolved instruction for a named agent.

PARAMETER DESCRIPTION
name

The unique identifier for the agent.

TYPE: str

instruction

The new instruction text to persist.

TYPE: str

RAISES DESCRIPTION
KeyError

If no agent with the given name exists.

ValueError

If name is empty or invalid.

IOError

If persistence fails (implementation-specific).

Examples:

Save an evolved instruction:

provider.save_instruction(
    "my_agent", "You are a helpful assistant specialized in coding."
)
Note

Only after successful persistence should subsequent calls to get_agent() return an agent with the updated instruction. Implementations should validate empty names before processing.
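A read-after-write check implied by the note, assuming an in-memory provider like the sketch above with a registered "my_agent":

provider.save_instruction("my_agent", "Be concise.")
assert provider.get_agent("my_agent").instruction == "Be concise."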

Source code in src/gepa_adk/ports/agent_provider.py
def save_instruction(self, name: str, instruction: str) -> None:
    """Persist an evolved instruction for a named agent.

    Args:
        name: The unique identifier for the agent.
        instruction: The new instruction text to persist.

    Raises:
        KeyError: If no agent with the given name exists.
        ValueError: If name is empty or invalid.
        IOError: If persistence fails (implementation-specific).

    Examples:
        Save an evolved instruction:

        ```python
        provider.save_instruction(
            "my_agent", "You are a helpful assistant specialized in coding."
        )
        ```

    Note:
        Only after successful persistence should subsequent calls to
        get_agent() return an agent with the updated instruction.
        Implementations should validate empty names before processing.
    """
    ...

list_agents

list_agents() -> list[str]

List all available agent names.

RETURNS DESCRIPTION
list[str]

A list of agent name strings. Empty list if no agents.

Examples:

Discover available agents:

provider = MyAgentProvider()
for name in provider.list_agents():
    print(f"Found agent: {name}")
Note

Ordering of returned names is not guaranteed by the protocol. Some implementations may return names in a specific order, but callers should not rely upon any particular sequence.

Source code in src/gepa_adk/ports/agent_provider.py
def list_agents(self) -> list[str]:
    """List all available agent names.

    Returns:
        A list of agent name strings. Empty list if no agents.

    Examples:
        Discover available agents:

        ```python
        provider = MyAgentProvider()
        for name in provider.list_agents():
            print(f"Found agent: {name}")
        ```

    Note:
        Ordering of returned names is not guaranteed by the protocol.
        Some implementations may return names in a specific order,
        but callers should not rely upon any particular sequence.
    """
    ...

ComponentHandler

Bases: Protocol



Protocol for component serialization and application.

Handles the serialize/apply/restore cycle for one component type during evolution. Implementations must be stateless and thread-safe.

The protocol defines three operations:

1. serialize: Extract current component value as string
2. apply: Set new value, return original for restoration
3. restore: Reinstate original value after evaluation

Examples:

Implement a custom handler:

class TemperatureHandler:
    def serialize(self, agent: LlmAgent) -> str:
        config = getattr(agent, "generate_content_config", None)
        if config and hasattr(config, "temperature"):
            return str(config.temperature)
        return "1.0"

    def apply(self, agent: LlmAgent, value: str) -> Any:
        config = getattr(agent, "generate_content_config", None)
        original = config.temperature if config else 1.0
        if config:
            config.temperature = float(value)
        return original

    def restore(self, agent: LlmAgent, original: Any) -> None:
        config = getattr(agent, "generate_content_config", None)
        if config:
            config.temperature = original
See Also

- InstructionHandler: Handler for agent instruction.
- OutputSchemaHandler: Handler for agent output schema.
Note

All methods are synchronous - no I/O operations should be performed. Apply() should log warnings and keep the original value rather than raising exceptions on invalid inputs for error safety.
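A sketch of the full serialize/apply/restore cycle, using the hypothetical TemperatureHandler above, with try/finally to guarantee restoration:

handler = TemperatureHandler()
baseline = handler.serialize(agent)  # e.g. "1.0"
original = handler.apply(agent, "0.2")
try:
    pass  # run the evaluation with the overridden temperature
finally:
    handler.restore(agent, original)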

Source code in src/gepa_adk/ports/component_handler.py
@runtime_checkable
class ComponentHandler(Protocol):
    """Protocol for component serialization and application.

    Handles the serialize/apply/restore cycle for one component type during
    evolution. Implementations must be stateless and thread-safe.

    The protocol defines three operations:
    1. **serialize**: Extract current component value as string
    2. **apply**: Set new value, return original for restoration
    3. **restore**: Reinstate original value after evaluation

    Examples:
        Implement a custom handler:

        ```python
        class TemperatureHandler:
            def serialize(self, agent: LlmAgent) -> str:
                config = getattr(agent, "generate_content_config", None)
                if config and hasattr(config, "temperature"):
                    return str(config.temperature)
                return "1.0"

            def apply(self, agent: LlmAgent, value: str) -> Any:
                config = getattr(agent, "generate_content_config", None)
                original = config.temperature if config else 1.0
                if config:
                    config.temperature = float(value)
                return original

            def restore(self, agent: LlmAgent, original: Any) -> None:
                config = getattr(agent, "generate_content_config", None)
                if config:
                    config.temperature = original
        ```

    See Also:
        - [`InstructionHandler`][gepa_adk.adapters.component_handlers.InstructionHandler]:
            Handler for agent instruction.
        - [`OutputSchemaHandler`][gepa_adk.adapters.component_handlers.OutputSchemaHandler]:
            Handler for agent output schema.

    Note:
        All methods are synchronous - no I/O operations should be performed.
        Apply() should log warnings and keep the original value rather than
        raising exceptions on invalid inputs for error safety.
    """

    def serialize(self, agent: "LlmAgent") -> str:
        """Extract component value from agent as string for evolution.

        Args:
            agent: The LlmAgent instance to extract component from.

        Returns:
            String representation of the component value.
            Returns empty string if component is not set.

        Examples:
            ```python
            handler = InstructionHandler()
            text = handler.serialize(agent)
            # text == "You are a helpful assistant."
            ```

        Note:
            Operations must never raise exceptions for missing values.
            Return empty string or sensible default instead.
        """
        ...

    def apply(self, agent: "LlmAgent", value: str) -> Any:
        """Apply evolved value to agent, return original for restore.

        Args:
            agent: The LlmAgent instance to modify.
            value: The new component value as string.

        Returns:
            The original component value (type depends on component).
            This value will be passed to restore() later.

        Examples:
            ```python
            handler = InstructionHandler()
            original = handler.apply(agent, "New instruction")
            # agent.instruction is now "New instruction"
            # original contains previous instruction value
            ```

        Note:
            On application failure (e.g., invalid schema), log warning
            and return original without modifying agent. Never raise
            exceptions - graceful degradation is required.
        """
        ...

    def restore(self, agent: "LlmAgent", original: Any) -> None:
        """Restore original value after evaluation.

        Args:
            agent: The LlmAgent instance to restore.
            original: The original value returned by apply().

        Examples:
            ```python
            handler = InstructionHandler()
            original = handler.apply(agent, "Temp instruction")
            # ... run evaluation ...
            handler.restore(agent, original)
            # agent.instruction is back to original value
            ```

        Note:
            Original value restoration always succeeds - never raises exceptions.
            None values reset to component default.
        """
        ...

serialize

serialize(agent: 'LlmAgent') -> str

Extract component value from agent as string for evolution.

PARAMETER DESCRIPTION
agent

The LlmAgent instance to extract component from.

TYPE: 'LlmAgent'

RETURNS DESCRIPTION
str

String representation of the component value.

str

Returns empty string if component is not set.

Examples:

handler = InstructionHandler()
text = handler.serialize(agent)
# text == "You are a helpful assistant."
Note

Operations must never raise exceptions for missing values. Return empty string or sensible default instead.

Source code in src/gepa_adk/ports/component_handler.py
def serialize(self, agent: "LlmAgent") -> str:
    """Extract component value from agent as string for evolution.

    Args:
        agent: The LlmAgent instance to extract component from.

    Returns:
        String representation of the component value.
        Returns empty string if component is not set.

    Examples:
        ```python
        handler = InstructionHandler()
        text = handler.serialize(agent)
        # text == "You are a helpful assistant."
        ```

    Note:
        Operations must never raise exceptions for missing values.
        Return empty string or sensible default instead.
    """
    ...

apply

apply(agent: 'LlmAgent', value: str) -> Any

Apply evolved value to agent, return original for restore.

PARAMETER DESCRIPTION
agent

The LlmAgent instance to modify.

TYPE: 'LlmAgent'

value

The new component value as string.

TYPE: str

RETURNS DESCRIPTION
Any

The original component value (type depends on component).

Any

This value will be passed to restore() later.

Examples:

handler = InstructionHandler()
original = handler.apply(agent, "New instruction")
# agent.instruction is now "New instruction"
# original contains previous instruction value
Note

On application failure (e.g., invalid schema), log warning and return original without modifying agent. Never raise exceptions - graceful degradation is required.

Source code in src/gepa_adk/ports/component_handler.py
def apply(self, agent: "LlmAgent", value: str) -> Any:
    """Apply evolved value to agent, return original for restore.

    Args:
        agent: The LlmAgent instance to modify.
        value: The new component value as string.

    Returns:
        The original component value (type depends on component).
        This value will be passed to restore() later.

    Examples:
        ```python
        handler = InstructionHandler()
        original = handler.apply(agent, "New instruction")
        # agent.instruction is now "New instruction"
        # original contains previous instruction value
        ```

    Note:
        On application failure (e.g., invalid schema), log warning
        and return original without modifying agent. Never raise
        exceptions - graceful degradation is required.
    """
    ...

restore

restore(agent: 'LlmAgent', original: Any) -> None

Restore original value after evaluation.

PARAMETER DESCRIPTION
agent

The LlmAgent instance to restore.

TYPE: 'LlmAgent'

original

The original value returned by apply().

TYPE: Any

Examples:

handler = InstructionHandler()
original = handler.apply(agent, "Temp instruction")
# ... run evaluation ...
handler.restore(agent, original)
# agent.instruction is back to original value
Note

Original value restoration always succeeds - never raises exceptions. None values reset to component default.

Source code in src/gepa_adk/ports/component_handler.py
def restore(self, agent: "LlmAgent", original: Any) -> None:
    """Restore original value after evaluation.

    Args:
        agent: The LlmAgent instance to restore.
        original: The original value returned by apply().

    Examples:
        ```python
        handler = InstructionHandler()
        original = handler.apply(agent, "Temp instruction")
        # ... run evaluation ...
        handler.restore(agent, original)
        # agent.instruction is back to original value
        ```

    Note:
        Original value restoration always succeeds - never raises exceptions.
        None values reset to component default.
    """
    ...
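
Taken together, serialize(), apply(), and restore() form the component handler lifecycle: read the current value, swap in an evolved value for an evaluation run, then put the original back. The following is a minimal sketch of that lifecycle, assuming an LlmAgent that exposes a plain instruction attribute as the examples above suggest; it is illustrative only and not the InstructionHandler used in those examples.

from typing import Any


class SketchInstructionHandler:
    """Illustrative handler for the "instruction" component (attribute assumed)."""

    def serialize(self, agent: "LlmAgent") -> str:
        # Never raise for a missing value: fall back to an empty string.
        return agent.instruction or ""

    def apply(self, agent: "LlmAgent", value: str) -> Any:
        original = agent.instruction
        agent.instruction = value
        return original

    def restore(self, agent: "LlmAgent", original: Any) -> None:
        # A None original resets the component to its default (empty here).
        agent.instruction = original if original is not None else ""

During evolution the optimizer calls apply() before an evaluation run and restore() afterwards, so the agent is never left holding a candidate value.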

ProposerProtocol

Bases: Protocol



Protocol for candidate proposal strategies.

Proposers generate new candidates for evolution. The two main implementations are mutation-based (reflective improvement) and merge-based (genetic crossover).

Note

Attributes are not required by this protocol. Implementations may define configuration attributes as needed.

Examples:

class MyProposer:
    async def propose(
        self,
        state: ParetoState,
        eval_batch: EvaluationBatch | None = None,
    ) -> ProposalResult | None:
        # Generate proposal
        return ProposalResult(candidate=..., parent_indices=[...], tag="custom")
Note

All proposers return None when no valid proposal can be generated.

Source code in src/gepa_adk/ports/proposer.py
@runtime_checkable
class ProposerProtocol(Protocol):
    """Protocol for candidate proposal strategies.

    Proposers generate new candidates for evolution. The two main implementations
    are mutation-based (reflective improvement) and merge-based (genetic crossover).

    Note:
        Attributes are not required by this protocol. Implementations may define
        configuration attributes as needed.

    Examples:
        ```python
        class MyProposer:
            async def propose(
                self,
                state: ParetoState,
                eval_batch: EvaluationBatch | None = None,
            ) -> ProposalResult | None:
                # Generate proposal
                return ProposalResult(candidate=..., parent_indices=[...], tag="custom")
        ```

    Note:
        All proposers return None when no valid proposal can be generated.
    """

    async def propose(
        self,
        state: ParetoState,
        eval_batch: EvaluationBatch | None = None,
    ) -> ProposalResult | None:
        """Propose a new candidate based on current evolution state.

        Args:
            state (ParetoState): Current Pareto state with candidates and frontier.
                Contains the current evolution state including all discovered candidates,
                their scores, and the Pareto frontier for selection.
            eval_batch (EvaluationBatch | None): Optional evaluation batch for reflective proposals.
                Used by mutation-based proposers to generate reflective datasets. Ignored
                by merge-based proposers which operate on existing candidates.

        Returns:
            ProposalResult | None: ProposalResult containing the proposed candidate and metadata,
            or None if no proposal is possible (e.g., no suitable candidates, insufficient
            frontier, or proposal generation failed).

        Examples:
            Implementing a custom proposer:

            ```python
            class MyProposer:
                async def propose(
                    self,
                    state: ParetoState,
                    eval_batch: EvaluationBatch | None = None,
                ) -> ProposalResult | None:
                    # Generate proposal logic
                    return ProposalResult(
                        candidate=Candidate(components={"instruction": "..."}),
                        parent_indices=[5],
                        tag="mutation",
                    )
            ```

        Note:
            Operations must be idempotent and not modify state. The method should
            be safe to call multiple times with the same state without side effects.
        """
        ...

propose async

propose(
    state: ParetoState,
    eval_batch: EvaluationBatch | None = None,
) -> ProposalResult | None

Propose a new candidate based on current evolution state.

PARAMETER DESCRIPTION
state

Current Pareto state with candidates and frontier. Contains the current evolution state including all discovered candidates, their scores, and the Pareto frontier for selection.

TYPE: ParetoState

eval_batch

Optional evaluation batch for reflective proposals. Used by mutation-based proposers to generate reflective datasets. Ignored by merge-based proposers which operate on existing candidates.

TYPE: EvaluationBatch | None DEFAULT: None

RETURNS DESCRIPTION
ProposalResult | None

ProposalResult containing the proposed candidate and metadata, or None if no proposal is possible (e.g., no suitable candidates, insufficient frontier, or proposal generation failed).

Examples:

Implementing a custom proposer:

class MyProposer:
    async def propose(
        self,
        state: ParetoState,
        eval_batch: EvaluationBatch | None = None,
    ) -> ProposalResult | None:
        # Generate proposal logic
        return ProposalResult(
            candidate=Candidate(components={"instruction": "..."}),
            parent_indices=[5],
            tag="mutation",
        )
Note

Operations must be idempotent and not modify state. The method should be safe to call multiple times with the same state without side effects.

Source code in src/gepa_adk/ports/proposer.py
async def propose(
    self,
    state: ParetoState,
    eval_batch: EvaluationBatch | None = None,
) -> ProposalResult | None:
    """Propose a new candidate based on current evolution state.

    Args:
        state (ParetoState): Current Pareto state with candidates and frontier.
            Contains the current evolution state including all discovered candidates,
            their scores, and the Pareto frontier for selection.
        eval_batch (EvaluationBatch | None): Optional evaluation batch for reflective proposals.
            Used by mutation-based proposers to generate reflective datasets. Ignored
            by merge-based proposers which operate on existing candidates.

    Returns:
        ProposalResult | None: ProposalResult containing the proposed candidate and metadata,
        or None if no proposal is possible (e.g., no suitable candidates, insufficient
        frontier, or proposal generation failed).

    Examples:
        Implementing a custom proposer:

        ```python
        class MyProposer:
            async def propose(
                self,
                state: ParetoState,
                eval_batch: EvaluationBatch | None = None,
            ) -> ProposalResult | None:
                # Generate proposal logic
                return ProposalResult(
                    candidate=Candidate(components={"instruction": "..."}),
                    parent_indices=[5],
                    tag="mutation",
                )
        ```

    Note:
        Operations must be idempotent and not modify state. The method should
        be safe to call multiple times with the same state without side effects.
    """
    ...
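
As a concrete, non-authoritative illustration of this contract, the sketch below clones the first candidate and appends a fixed hint to its instruction component. It assumes ParetoState exposes a candidates list (as noted for ProposalResult) and that Candidate components form a string-to-string mapping; it is not one of the shipped mutation or merge proposers.

from gepa_adk.domain.models import Candidate
from gepa_adk.domain.types import ProposalResult
from gepa_adk.ports import EvaluationBatch


class SuffixProposer:
    """Illustrative proposer: append a fixed hint to the first candidate."""

    async def propose(
        self,
        state: "ParetoState",
        eval_batch: EvaluationBatch | None = None,
    ) -> ProposalResult | None:
        if not state.candidates:
            return None  # no valid proposal can be generated
        parent = state.candidates[0]
        components = dict(parent.components)  # assumes a str -> str mapping
        components["instruction"] = components.get("instruction", "") + "\nBe concise."
        return ProposalResult(
            candidate=Candidate(components=components),
            parent_indices=[0],
            tag="mutation",
        )

Because ProposerProtocol is runtime checkable, isinstance(SuffixProposer(), ProposerProtocol) should return True.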

Scorer

Bases: Protocol



Protocol for scoring agent outputs.

Implementations provide scoring logic that evaluates how well an agent's output matches expected results or quality criteria.

Both synchronous and asynchronous methods are defined. Implementations should provide both, though callers may use only one based on context.

Examples:

Implement a simple fixed scorer for testing:

class FixedScorer:
    def score(
        self,
        input_text: str,
        output: str,
        expected: str | None = None,
    ) -> tuple[float, dict]:
        return 0.5, {"note": "Fixed score for testing"}

    async def async_score(
        self,
        input_text: str,
        output: str,
        expected: str | None = None,
    ) -> tuple[float, dict]:
        return self.score(input_text, output, expected)

Verify protocol compliance:

from gepa_adk.ports import Scorer

scorer = FixedScorer()
assert isinstance(scorer, Scorer)  # Runtime check works
Note

All implementations must provide both score() and async_score() methods to satisfy the protocol. Score values should be normalized between 0.0 and 1.0 by convention, with higher values indicating better performance. The protocol does not enforce this range.

Source code in src/gepa_adk/ports/scorer.py
@runtime_checkable
class Scorer(Protocol):
    """Protocol for scoring agent outputs.

    Implementations provide scoring logic that evaluates how well
    an agent's output matches expected results or quality criteria.

    Both synchronous and asynchronous methods are defined. Implementations
    should provide both, though callers may use only one based on context.

    Examples:
        Implement a simple fixed scorer for testing:

        ```python
        class FixedScorer:
            def score(
                self,
                input_text: str,
                output: str,
                expected: str | None = None,
            ) -> tuple[float, dict]:
                return 0.5, {"note": "Fixed score for testing"}

            async def async_score(
                self,
                input_text: str,
                output: str,
                expected: str | None = None,
            ) -> tuple[float, dict]:
                return self.score(input_text, output, expected)
        ```

        Verify protocol compliance:

        ```python
        from gepa_adk.ports import Scorer

        scorer = FixedScorer()
        assert isinstance(scorer, Scorer)  # Runtime check works
        ```

    Note:
        All implementations must provide both score() and async_score()
        methods to satisfy the protocol. Score values should be normalized
        between 0.0 and 1.0 by convention, with higher values indicating
        better performance. The protocol does not enforce this range.
    """

    def score(
        self,
        input_text: str,
        output: str,
        expected: str | None = None,
    ) -> tuple[float, dict[str, Any]]:
        """Score an agent output synchronously.

        Args:
            input_text: The input provided to the agent.
            output: The agent's generated output to score.
            expected: Optional expected/reference output for comparison.
                Pass None for open-ended evaluation without expected output.

        Returns:
            A tuple of (score, metadata) where:

            - score: Float value, conventionally 0.0-1.0, higher is better
            - metadata: Dict with arbitrary scoring details (e.g., feedback,
                dimension_scores, reasoning). Should be JSON-serializable.

        Examples:
            Basic usage:

            ```python
            score, meta = scorer.score("What is 2+2?", "4", "4")
            assert score == 1.0
            assert meta.get("exact_match") is True
            ```

            Scoring without expected output:

            ```python
            score, meta = scorer.score("Explain gravity", response)
            # Scorer evaluates based on quality criteria, not exact match
            ```

        Note:
            Operations complete synchronously and block until scoring finishes.
            Use async_score() for I/O-bound operations like LLM calls.
        """
        ...

    async def async_score(
        self,
        input_text: str,
        output: str,
        expected: str | None = None,
    ) -> tuple[float, dict[str, Any]]:
        """Score an agent output asynchronously.

        Args:
            input_text: The input provided to the agent.
            output: The agent's generated output to score.
            expected: Optional expected/reference output for comparison.
                Pass None for open-ended evaluation without expected output.

        Returns:
            A tuple of (score, metadata) where:

            - score: Float value, conventionally 0.0-1.0, higher is better
            - metadata: Dict with arbitrary scoring details (e.g., feedback,
                dimension_scores, reasoning). Should be JSON-serializable.

        Examples:
            Async usage:

            ```python
            score, meta = await scorer.async_score("Explain gravity", response)
            print(f"Quality: {score:.2f} - {meta.get('feedback')}")
            ```

            Concurrent scoring:

            ```python
            import asyncio

            tasks = [
                scorer.async_score(input, output, expected)
                for input, output, expected in batch
            ]
            scores = await asyncio.gather(*tasks)
            ```

        Note:
            Operations run asynchronously and can be executed concurrently.
            Prefer this method for I/O-bound scoring operations such as
            LLM-based evaluation or external API calls.
        """
        ...

score

score(
    input_text: str,
    output: str,
    expected: str | None = None,
) -> tuple[float, dict[str, Any]]

Score an agent output synchronously.

PARAMETER DESCRIPTION
input_text

The input provided to the agent.

TYPE: str

output

The agent's generated output to score.

TYPE: str

expected

Optional expected/reference output for comparison. Pass None for open-ended evaluation without expected output.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
tuple[float, dict[str, Any]]

A tuple of (score, metadata) where:

  • score: Float value, conventionally 0.0-1.0, higher is better
  • metadata: Dict with arbitrary scoring details (e.g., feedback, dimension_scores, reasoning). Should be JSON-serializable.

Examples:

Basic usage:

score, meta = scorer.score("What is 2+2?", "4", "4")
assert score == 1.0
assert meta.get("exact_match") is True

Scoring without expected output:

score, meta = scorer.score("Explain gravity", response)
# Scorer evaluates based on quality criteria, not exact match
Note

Operations complete synchronously and block until scoring finishes. Use async_score() for I/O-bound operations like LLM calls.

Source code in src/gepa_adk/ports/scorer.py
def score(
    self,
    input_text: str,
    output: str,
    expected: str | None = None,
) -> tuple[float, dict[str, Any]]:
    """Score an agent output synchronously.

    Args:
        input_text: The input provided to the agent.
        output: The agent's generated output to score.
        expected: Optional expected/reference output for comparison.
            Pass None for open-ended evaluation without expected output.

    Returns:
        A tuple of (score, metadata) where:

        - score: Float value, conventionally 0.0-1.0, higher is better
        - metadata: Dict with arbitrary scoring details (e.g., feedback,
            dimension_scores, reasoning). Should be JSON-serializable.

    Examples:
        Basic usage:

        ```python
        score, meta = scorer.score("What is 2+2?", "4", "4")
        assert score == 1.0
        assert meta.get("exact_match") is True
        ```

        Scoring without expected output:

        ```python
        score, meta = scorer.score("Explain gravity", response)
        # Scorer evaluates based on quality criteria, not exact match
        ```

    Note:
        Operations complete synchronously and block until scoring finishes.
        Use async_score() for I/O-bound operations like LLM calls.
    """
    ...

async_score async

async_score(
    input_text: str,
    output: str,
    expected: str | None = None,
) -> tuple[float, dict[str, Any]]

Score an agent output asynchronously.

PARAMETER DESCRIPTION
input_text

The input provided to the agent.

TYPE: str

output

The agent's generated output to score.

TYPE: str

expected

Optional expected/reference output for comparison. Pass None for open-ended evaluation without expected output.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
tuple[float, dict[str, Any]]

A tuple of (score, metadata) where:

  • score: Float value, conventionally 0.0-1.0, higher is better
  • metadata: Dict with arbitrary scoring details (e.g., feedback, dimension_scores, reasoning). Should be JSON-serializable.

Examples:

Async usage:

score, meta = await scorer.async_score("Explain gravity", response)
print(f"Quality: {score:.2f} - {meta.get('feedback')}")

Concurrent scoring:

import asyncio

tasks = [
    scorer.async_score(input, output, expected)
    for input, output, expected in batch
]
scores = await asyncio.gather(*tasks)
Note

Operations run asynchronously and can be executed concurrently. Prefer this method for I/O-bound scoring operations such as LLM-based evaluation or external API calls.

Source code in src/gepa_adk/ports/scorer.py
async def async_score(
    self,
    input_text: str,
    output: str,
    expected: str | None = None,
) -> tuple[float, dict[str, Any]]:
    """Score an agent output asynchronously.

    Args:
        input_text: The input provided to the agent.
        output: The agent's generated output to score.
        expected: Optional expected/reference output for comparison.
            Pass None for open-ended evaluation without expected output.

    Returns:
        A tuple of (score, metadata) where:

        - score: Float value, conventionally 0.0-1.0, higher is better
        - metadata: Dict with arbitrary scoring details (e.g., feedback,
            dimension_scores, reasoning). Should be JSON-serializable.

    Examples:
        Async usage:

        ```python
        score, meta = await scorer.async_score("Explain gravity", response)
        print(f"Quality: {score:.2f} - {meta.get('feedback')}")
        ```

        Concurrent scoring:

        ```python
        import asyncio

        tasks = [
            scorer.async_score(input, output, expected)
            for input, output, expected in batch
        ]
        scores = await asyncio.gather(*tasks)
        ```

    Note:
        Operations run asynchronously and can be executed concurrently.
        Prefer this method for I/O-bound scoring operations such as
        LLM-based evaluation or external API calls.
    """
    ...
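
A scorer does not need any I/O to satisfy the protocol. The sketch below is a minimal exact-match scorer following the (score, metadata) convention described above; it is illustrative only, not a packaged scorer.

from typing import Any


class ExactMatchScorer:
    """Illustrative scorer: 1.0 on an exact (whitespace-trimmed) match."""

    def score(
        self,
        input_text: str,
        output: str,
        expected: str | None = None,
    ) -> tuple[float, dict[str, Any]]:
        if expected is None:
            # Open-ended evaluation: there is no reference to compare against.
            return 0.0, {"exact_match": None, "note": "no expected output"}
        matched = output.strip() == expected.strip()
        return (1.0 if matched else 0.0), {"exact_match": matched}

    async def async_score(
        self,
        input_text: str,
        output: str,
        expected: str | None = None,
    ) -> tuple[float, dict[str, Any]]:
        # Pure string comparison involves no I/O, so delegate to score().
        return self.score(input_text, output, expected)

Since Scorer is runtime checkable, isinstance(ExactMatchScorer(), Scorer) holds.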

CandidateSelectorProtocol

Bases: Protocol



Async protocol for candidate selection strategies.

Note

Adapters implementing this protocol provide strategies for selecting candidates from the Pareto frontier for mutation.

Examples:

class Selector:
    async def select_candidate(self, state: ParetoState) -> int:
        return 0
Source code in src/gepa_adk/ports/selector.py
@runtime_checkable
class CandidateSelectorProtocol(Protocol):
    """Async protocol for candidate selection strategies.

    Note:
        Adapters implementing this protocol provide strategies for selecting
        candidates from the Pareto frontier for mutation.

    Examples:
        ```python
        class Selector:
            async def select_candidate(self, state: ParetoState) -> int:
                return 0
        ```
    """

    async def select_candidate(self, state: ParetoState) -> int:
        """Select a candidate index for mutation.

        Args:
            state: Current evolution state with Pareto frontier tracking.

        Returns:
            Index of selected candidate.

        Raises:
            NoCandidateAvailableError: If state has no candidates.

        Note:
            Outputs a candidate index from the frontier for mutation,
            enabling Pareto-aware selection strategies.

        Examples:
            ```python
            candidate_idx = await selector.select_candidate(state)
            ```
        """
        ...

select_candidate async

select_candidate(state: ParetoState) -> int

Select a candidate index for mutation.

PARAMETER DESCRIPTION
state

Current evolution state with Pareto frontier tracking.

TYPE: ParetoState

RETURNS DESCRIPTION
int

Index of selected candidate.

RAISES DESCRIPTION
NoCandidateAvailableError

If state has no candidates.

Note

Outputs a candidate index from the frontier for mutation, enabling Pareto-aware selection strategies.

Examples:

candidate_idx = await selector.select_candidate(state)
Source code in src/gepa_adk/ports/selector.py
async def select_candidate(self, state: ParetoState) -> int:
    """Select a candidate index for mutation.

    Args:
        state: Current evolution state with Pareto frontier tracking.

    Returns:
        Index of selected candidate.

    Raises:
        NoCandidateAvailableError: If state has no candidates.

    Note:
        Outputs a candidate index from the frontier for mutation,
        enabling Pareto-aware selection strategies.

    Examples:
        ```python
        candidate_idx = await selector.select_candidate(state)
        ```
    """
    ...
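
For illustration, a selector that picks uniformly at random over all candidates satisfies this protocol. The sketch assumes only that ParetoState exposes a candidates list; the import path for NoCandidateAvailableError is an assumption by analogy with the other domain exceptions.

import random

from gepa_adk.domain.exceptions import NoCandidateAvailableError  # assumed path


class RandomCandidateSelector:
    """Illustrative selector: uniform random choice over all candidates."""

    async def select_candidate(self, state: "ParetoState") -> int:
        if not state.candidates:
            raise NoCandidateAvailableError("ParetoState has no candidates")
        return random.randrange(len(state.candidates))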

ComponentSelectorProtocol

Bases: Protocol



Async protocol for component selection strategies.

Note

Adapters implementing this protocol determine which candidate components to update during mutation, enabling flexible evolution strategies.

Examples:

class MySelector:
    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        return components[:1]
Source code in src/gepa_adk/ports/selector.py
@runtime_checkable
class ComponentSelectorProtocol(Protocol):
    """Async protocol for component selection strategies.

    Note:
        Adapters implementing this protocol determine which candidate components
        to update during mutation, enabling flexible evolution strategies.

    Examples:
        ```python
        class MySelector:
            async def select_components(
                self, components: list[str], iteration: int, candidate_idx: int
            ) -> list[str]:
                return components[:1]
        ```
    """

    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        """Select components to update for the current iteration.

        Args:
            components: List of available component keys (e.g. ["instruction", "input_schema"]).
            iteration: Current global iteration number (0-based).
            candidate_idx: Index of the candidate being evolved.

        Returns:
            List of component keys to update.

        Raises:
            ValueError: If components list is empty.

        Note:
            Outputs a list of component keys to update, enabling selective
            mutation of specific candidate components.

        Examples:
            ```python
            selected = await selector.select_components(
                components=["instruction", "schema"], iteration=1, candidate_idx=0
            )
            ```
        """
        ...

select_components async

select_components(
    components: list[str],
    iteration: int,
    candidate_idx: int,
) -> list[str]

Select components to update for the current iteration.

PARAMETER DESCRIPTION
components

List of available component keys (e.g. ["instruction", "input_schema"]).

TYPE: list[str]

iteration

Current global iteration number (0-based).

TYPE: int

candidate_idx

Index of the candidate being evolved.

TYPE: int

RETURNS DESCRIPTION
list[str]

List of component keys to update.

RAISES DESCRIPTION
ValueError

If components list is empty.

Note

Outputs a list of component keys to update, enabling selective mutation of specific candidate components.

Examples:

selected = await selector.select_components(
    components=["instruction", "schema"], iteration=1, candidate_idx=0
)
Source code in src/gepa_adk/ports/selector.py
async def select_components(
    self, components: list[str], iteration: int, candidate_idx: int
) -> list[str]:
    """Select components to update for the current iteration.

    Args:
        components: List of available component keys (e.g. ["instruction", "input_schema"]).
        iteration: Current global iteration number (0-based).
        candidate_idx: Index of the candidate being evolved.

    Returns:
        List of component keys to update.

    Raises:
        ValueError: If components list is empty.

    Note:
        Outputs a list of component keys to update, enabling selective
        mutation of specific candidate components.

    Examples:
        ```python
        selected = await selector.select_components(
            components=["instruction", "schema"], iteration=1, candidate_idx=0
        )
        ```
    """
    ...
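
A minimal sketch of this contract is a round-robin selector that mutates exactly one component per iteration. It relies only on the documented signature; the class name is hypothetical.

class RoundRobinComponentSelector:
    """Illustrative selector: rotate through components, one per iteration."""

    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        if not components:
            raise ValueError("components list is empty")
        # Iteration 0 picks components[0], iteration 1 picks components[1], ...
        return [components[iteration % len(components)]]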

EvaluationPolicyProtocol

Bases: Protocol



Protocol for valset evaluation strategies.

Determines which validation examples to evaluate per iteration and how to identify the best candidate based on evaluation results.

Note

Adapters implementing this protocol control evaluation cost and candidate selection strategies for scalable evolution runs.

Examples:

class MyPolicy:
    def get_eval_batch(
        self, valset_ids: Sequence[int], state: ParetoState
    ) -> list[int]:
        return list(valset_ids)

    def get_best_candidate(self, state: ParetoState) -> int:
        return 0

    def get_valset_score(self, candidate_idx: int, state: ParetoState) -> float:
        return 0.5
Source code in src/gepa_adk/ports/selector.py
@runtime_checkable
class EvaluationPolicyProtocol(Protocol):
    """Protocol for valset evaluation strategies.

    Determines which validation examples to evaluate per iteration
    and how to identify the best candidate based on evaluation results.

    Note:
        Adapters implementing this protocol control evaluation cost and
        candidate selection strategies for scalable evolution runs.

    Examples:
        ```python
        class MyPolicy:
            def get_eval_batch(
                self, valset_ids: Sequence[int], state: ParetoState
            ) -> list[int]:
                return list(valset_ids)

            def get_best_candidate(self, state: ParetoState) -> int:
                return 0

            def get_valset_score(self, candidate_idx: int, state: ParetoState) -> float:
                return 0.5
        ```
    """

    def get_eval_batch(
        self,
        valset_ids: Sequence[int],
        state: ParetoState,
        target_candidate_idx: int | None = None,
    ) -> list[int]:
        """Select validation example indices to evaluate.

        Args:
            valset_ids: All available validation example indices (0 to N-1).
            state: Current evolution state with candidate scores.
            target_candidate_idx: Optional candidate being evaluated (for adaptive policies).

        Returns:
            List of example indices to evaluate this iteration.
            Must be a subset of valset_ids (or equal).

        Note:
            Outputs a list of valset indices to evaluate, which may be a subset
            or the full valset depending on the policy implementation.

        Examples:
            ```python
            batch = policy.get_eval_batch([0, 1, 2, 3, 4], state)
            # Returns: [0, 1, 2, 3, 4] for full evaluation
            ```
        """
        ...

    def get_best_candidate(self, state: ParetoState) -> int:
        """Return index of best candidate based on evaluation results.

        Args:
            state: Current evolution state with candidate_scores populated.

        Returns:
            Index of best performing candidate.

        Raises:
            NoCandidateAvailableError: If state has no candidates.

        Note:
            Outputs the index of the best candidate based on the policy's
            scoring strategy (e.g., highest average score).

        Examples:
            ```python
            best_idx = policy.get_best_candidate(state)
            ```
        """
        ...

    def get_valset_score(self, candidate_idx: int, state: ParetoState) -> float:
        """Return aggregated valset score for a candidate.

        Args:
            candidate_idx: Index of candidate to score.
            state: Current evolution state.

        Returns:
            Aggregated score (typically mean across evaluated examples).
            Returns float('-inf') if candidate has no evaluated scores.

        Note:
            Outputs an aggregated score for the candidate, typically the mean
            across evaluated examples, or negative infinity if no scores exist.

        Examples:
            ```python
            score = policy.get_valset_score(0, state)
            ```
        """
        ...

get_eval_batch

get_eval_batch(
    valset_ids: Sequence[int],
    state: ParetoState,
    target_candidate_idx: int | None = None,
) -> list[int]

Select validation example indices to evaluate.

PARAMETER DESCRIPTION
valset_ids

All available validation example indices (0 to N-1).

TYPE: Sequence[int]

state

Current evolution state with candidate scores.

TYPE: ParetoState

target_candidate_idx

Optional candidate being evaluated (for adaptive policies).

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
list[int]

List of example indices to evaluate this iteration. Must be a subset of valset_ids (or equal).

Note

Outputs a list of valset indices to evaluate, which may be a subset or the full valset depending on the policy implementation.

Examples:

batch = policy.get_eval_batch([0, 1, 2, 3, 4], state)
# Returns: [0, 1, 2, 3, 4] for full evaluation
Source code in src/gepa_adk/ports/selector.py
def get_eval_batch(
    self,
    valset_ids: Sequence[int],
    state: ParetoState,
    target_candidate_idx: int | None = None,
) -> list[int]:
    """Select validation example indices to evaluate.

    Args:
        valset_ids: All available validation example indices (0 to N-1).
        state: Current evolution state with candidate scores.
        target_candidate_idx: Optional candidate being evaluated (for adaptive policies).

    Returns:
        List of example indices to evaluate this iteration.
        Must be a subset of valset_ids (or equal).

    Note:
        Outputs a list of valset indices to evaluate, which may be a subset
        or the full valset depending on the policy implementation.

    Examples:
        ```python
        batch = policy.get_eval_batch([0, 1, 2, 3, 4], state)
        # Returns: [0, 1, 2, 3, 4] for full evaluation
        ```
    """
    ...

get_best_candidate

get_best_candidate(state: ParetoState) -> int

Return index of best candidate based on evaluation results.

PARAMETER DESCRIPTION
state

Current evolution state with candidate_scores populated.

TYPE: ParetoState

RETURNS DESCRIPTION
int

Index of best performing candidate.

RAISES DESCRIPTION
NoCandidateAvailableError

If state has no candidates.

Note

Outputs the index of the best candidate based on the policy's scoring strategy (e.g., highest average score).

Examples:

best_idx = policy.get_best_candidate(state)
Source code in src/gepa_adk/ports/selector.py
def get_best_candidate(self, state: ParetoState) -> int:
    """Return index of best candidate based on evaluation results.

    Args:
        state: Current evolution state with candidate_scores populated.

    Returns:
        Index of best performing candidate.

    Raises:
        NoCandidateAvailableError: If state has no candidates.

    Note:
        Outputs the index of the best candidate based on the policy's
        scoring strategy (e.g., highest average score).

    Examples:
        ```python
        best_idx = policy.get_best_candidate(state)
        ```
    """
    ...

get_valset_score

get_valset_score(
    candidate_idx: int, state: ParetoState
) -> float

Return aggregated valset score for a candidate.

PARAMETER DESCRIPTION
candidate_idx

Index of candidate to score.

TYPE: int

state

Current evolution state.

TYPE: ParetoState

RETURNS DESCRIPTION
float

Aggregated score (typically mean across evaluated examples). Returns float('-inf') if candidate has no evaluated scores.

Note

Outputs an aggregated score for the candidate, typically the mean across evaluated examples, or negative infinity if no scores exist.

Examples:

score = policy.get_valset_score(0, state)
Source code in src/gepa_adk/ports/selector.py
def get_valset_score(self, candidate_idx: int, state: ParetoState) -> float:
    """Return aggregated valset score for a candidate.

    Args:
        candidate_idx: Index of candidate to score.
        state: Current evolution state.

    Returns:
        Aggregated score (typically mean across evaluated examples).
        Returns float('-inf') if candidate has no evaluated scores.

    Note:
        Outputs an aggregated score for the candidate, typically the mean
        across evaluated examples, or negative infinity if no scores exist.

    Examples:
        ```python
        score = policy.get_valset_score(0, state)
        ```
    """
    ...
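
The sketch below is one possible full-evaluation policy. It rests on two explicit assumptions beyond the documented signatures: that ParetoState exposes a candidates list, and that state.candidate_scores[i] is a sequence of per-example scores for candidate i (that shape is not specified here). The exception import path is likewise assumed. Treat it as an illustration of the three-method contract, not a shipped policy.

from collections.abc import Sequence

from gepa_adk.domain.exceptions import NoCandidateAvailableError  # assumed path


class FullEvaluationPolicy:
    """Illustrative policy: evaluate every example, rank candidates by mean score."""

    def get_eval_batch(
        self,
        valset_ids: Sequence[int],
        state: "ParetoState",
        target_candidate_idx: int | None = None,
    ) -> list[int]:
        # Always evaluate the full validation set.
        return list(valset_ids)

    def get_valset_score(self, candidate_idx: int, state: "ParetoState") -> float:
        scores = state.candidate_scores[candidate_idx]  # assumed per-example scores
        if not scores:
            return float("-inf")
        return sum(scores) / len(scores)

    def get_best_candidate(self, state: "ParetoState") -> int:
        if not state.candidates:
            raise NoCandidateAvailableError("ParetoState has no candidates")
        return max(
            range(len(state.candidates)),
            key=lambda idx: self.get_valset_score(idx, state),
        )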

StopperProtocol

Bases: Protocol



Protocol for stop condition objects.

A stopper is a callable that returns True when evolution should stop. Stoppers receive a StopperState snapshot of current evolution progress.

The protocol uses structural typing - any callable with the correct signature satisfies it, no explicit inheritance required.

Examples:

Class-based stopper:

from gepa_adk.ports.stopper import StopperProtocol
from gepa_adk.domain.stopper import StopperState


class TimeoutStopper:
    def __init__(self, max_seconds: float) -> None:
        self.max_seconds = max_seconds

    def __call__(self, state: StopperState) -> bool:
        return state.elapsed_seconds >= self.max_seconds


stopper = TimeoutStopper(3600.0)  # 1 hour
isinstance(stopper, StopperProtocol)  # True

Function-based stopper:

def score_threshold_stopper(state: StopperState) -> bool:
    return state.best_score >= 0.95


isinstance(score_threshold_stopper, StopperProtocol)  # True
Note

All stoppers should be pure functions of their input state - they ought not have side effects or depend on external mutable state for deterministic behavior.

Source code in src/gepa_adk/ports/stopper.py
@runtime_checkable
class StopperProtocol(Protocol):
    """Protocol for stop condition objects.

    A stopper is a callable that returns True when evolution should stop.
    Stoppers receive a StopperState snapshot of current evolution progress.

    The protocol uses structural typing - any callable with the correct
    signature satisfies it, no explicit inheritance required.

    Examples:
        Class-based stopper:

        ```python
        from gepa_adk.ports.stopper import StopperProtocol
        from gepa_adk.domain.stopper import StopperState


        class TimeoutStopper:
            def __init__(self, max_seconds: float) -> None:
                self.max_seconds = max_seconds

            def __call__(self, state: StopperState) -> bool:
                return state.elapsed_seconds >= self.max_seconds


        stopper = TimeoutStopper(3600.0)  # 1 hour
        isinstance(stopper, StopperProtocol)  # True
        ```

        Function-based stopper:

        ```python
        def score_threshold_stopper(state: StopperState) -> bool:
            return state.best_score >= 0.95


        isinstance(score_threshold_stopper, StopperProtocol)  # True
        ```

    Note:
        All stoppers should be pure functions of their input state - they
        ought not have side effects or depend on external mutable state for
        deterministic behavior.
    """

    def __call__(self, state: StopperState) -> bool:
        """Check if evolution should stop.

        Args:
            state: Current evolution state snapshot containing iteration count,
                best score, stagnation counter, and other metrics.

        Returns:
            True if evolution should stop, False to continue.

        Note:
            Often this method is called after each iteration. Return True as
            soon as the stop condition is met to avoid unnecessary computation.
        """
        ...

__call__

__call__(state: StopperState) -> bool

Check if evolution should stop.

PARAMETER DESCRIPTION
state

Current evolution state snapshot containing iteration count, best score, stagnation counter, and other metrics.

TYPE: StopperState

RETURNS DESCRIPTION
bool

True if evolution should stop, False to continue.

Note

Often this method is called after each iteration. Return True as soon as the stop condition is met to avoid unnecessary computation.

Source code in src/gepa_adk/ports/stopper.py
def __call__(self, state: StopperState) -> bool:
    """Check if evolution should stop.

    Args:
        state: Current evolution state snapshot containing iteration count,
            best score, stagnation counter, and other metrics.

    Returns:
        True if evolution should stop, False to continue.

    Note:
        Often this method is called after each iteration. Return True as
        soon as the stop condition is met to avoid unnecessary computation.
    """
    ...
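
Because stoppers are plain callables, they compose easily. The sketch below combines several stoppers and fires when any of them does; it reuses the TimeoutStopper and score_threshold_stopper defined in the class-level examples above and assumes nothing else.

from gepa_adk.domain.stopper import StopperState
from gepa_adk.ports.stopper import StopperProtocol


class AnyStopper:
    """Illustrative composite stopper: stop when any wrapped condition fires."""

    def __init__(self, *stoppers: StopperProtocol) -> None:
        self.stoppers = stoppers

    def __call__(self, state: StopperState) -> bool:
        return any(stopper(state) for stopper in self.stoppers)


# Stop after one hour or once the best score reaches 0.95, whichever comes first.
stop = AnyStopper(TimeoutStopper(3600.0), score_threshold_stopper)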

VideoBlobServiceProtocol

Bases: Protocol



Protocol for loading video files as multimodal content parts.

Implementations provide video file I/O and validation logic, converting video files to Part objects for multimodal agent input.

The protocol defines an async method for loading multiple videos and a sync method for validating individual files.

Examples:

Implement a video blob service:

class SimpleVideoBlobService:
    async def prepare_video_parts(
        self,
        video_paths: list[str],
    ) -> list[Any]:
        # Load videos and return Part objects
        parts = []
        for path in video_paths:
            self.validate_video_file(path)
            # ... load and create Part
        return parts

    def validate_video_file(
        self,
        video_path: str,
    ) -> VideoFileInfo:
        # Validate and return metadata
        ...

Verify protocol compliance:

from gepa_adk.ports import VideoBlobServiceProtocol

service = SimpleVideoBlobService()
assert isinstance(service, VideoBlobServiceProtocol)
Note

All implementations must provide both prepare_video_parts() and validate_video_file() methods. Video size is limited to 2GB per the Gemini API constraint. Only video/* MIME types are accepted.

Source code in src/gepa_adk/ports/video_blob_service.py
@runtime_checkable
class VideoBlobServiceProtocol(Protocol):
    """Protocol for loading video files as multimodal content parts.

    Implementations provide video file I/O and validation logic,
    converting video files to Part objects for multimodal agent input.

    The protocol defines an async method for loading multiple videos
    and a sync method for validating individual files.

    Examples:
        Implement a video blob service:

        ```python
        class SimpleVideoBlobService:
            async def prepare_video_parts(
                self,
                video_paths: list[str],
            ) -> list[Any]:
                # Load videos and return Part objects
                parts = []
                for path in video_paths:
                    self.validate_video_file(path)
                    # ... load and create Part
                return parts

            def validate_video_file(
                self,
                video_path: str,
            ) -> VideoFileInfo:
                # Validate and return metadata
                ...
        ```

        Verify protocol compliance:

        ```python
        from gepa_adk.ports import VideoBlobServiceProtocol

        service = SimpleVideoBlobService()
        assert isinstance(service, VideoBlobServiceProtocol)
        ```

    Note:
        All implementations must provide both prepare_video_parts() and
        validate_video_file() methods. Video size is limited to 2GB per
        the Gemini API constraint. Only video/* MIME types are accepted.
    """

    async def prepare_video_parts(
        self,
        video_paths: list[str],
    ) -> list[Any]:
        """Load video files and create Part objects for multimodal content.

        Reads video files from disk and converts them to Part objects
        suitable for inclusion in ADK Content. Files are validated before
        loading.

        Args:
            video_paths: List of absolute paths to video files. Must be
                non-empty. All paths must point to existing video files
                under 2GB with video/* MIME types.

        Returns:
            List of Part objects, one per input path. Order is preserved
            such that output[i] corresponds to video_paths[i]. Part objects
            contain inline video data with appropriate MIME type.

        Raises:
            ValueError: If video_paths is empty.
            VideoValidationError: If any file is not found, exceeds 2GB,
                or has a non-video MIME type.
            PermissionError: If any file cannot be read due to permissions.
            OSError: If any file cannot be read due to I/O errors.

        Examples:
            Load a single video:

            ```python
            parts = await service.prepare_video_parts(["/data/video.mp4"])
            assert len(parts) == 1
            ```

            Load multiple videos preserving order:

            ```python
            paths = ["/data/intro.mp4", "/data/main.mp4", "/data/outro.mp4"]
            parts = await service.prepare_video_parts(paths)
            assert len(parts) == 3
            # parts[0] is intro, parts[1] is main, parts[2] is outro
            ```

        Note:
            Operations include async file I/O for reading video bytes.
            Video files are validated before loading to provide early
            failure with clear error messages.
        """
        ...

    def validate_video_file(
        self,
        video_path: str,
    ) -> "VideoFileInfo":
        """Validate a video file and return its metadata.

        Checks that the file exists, is within size limits, and has
        a valid video MIME type. Returns metadata on success.

        Args:
            video_path: Absolute path to the video file to validate.

        Returns:
            VideoFileInfo containing validated metadata:

            - path: The validated file path
            - size_bytes: File size in bytes
            - mime_type: Detected MIME type (e.g., "video/mp4")

        Raises:
            VideoValidationError: If the file does not exist, exceeds 2GB,
                or has a non-video MIME type. The exception includes the
                video_path and constraint fields for debugging.

        Examples:
            Validate a video file:

            ```python
            info = service.validate_video_file("/data/lecture.mp4")
            print(f"Size: {info.size_bytes} bytes")
            print(f"Type: {info.mime_type}")
            ```

            Handle validation errors:

            ```python
            from gepa_adk.domain.exceptions import VideoValidationError

            try:
                info = service.validate_video_file("/missing.mp4")
            except VideoValidationError as e:
                print(f"Invalid: {e.video_path} - {e.constraint}")
            ```

        Note:
            Operates synchronously for fast pre-validation without async
            context. File content is not read, only metadata is checked.
        """
        ...

prepare_video_parts async

prepare_video_parts(video_paths: list[str]) -> list[Any]

Load video files and create Part objects for multimodal content.

Reads video files from disk and converts them to Part objects suitable for inclusion in ADK Content. Files are validated before loading.

PARAMETER DESCRIPTION
video_paths

List of absolute paths to video files. Must be non-empty. All paths must point to existing video files under 2GB with video/* MIME types.

TYPE: list[str]

RETURNS DESCRIPTION
list[Any]

List of Part objects, one per input path. Order is preserved such that output[i] corresponds to video_paths[i]. Part objects contain inline video data with appropriate MIME type.

RAISES DESCRIPTION
ValueError

If video_paths is empty.

VideoValidationError

If any file is not found, exceeds 2GB, or has a non-video MIME type.

PermissionError

If any file cannot be read due to permissions.

OSError

If any file cannot be read due to I/O errors.

Examples:

Load a single video:

parts = await service.prepare_video_parts(["/data/video.mp4"])
assert len(parts) == 1

Load multiple videos preserving order:

paths = ["/data/intro.mp4", "/data/main.mp4", "/data/outro.mp4"]
parts = await service.prepare_video_parts(paths)
assert len(parts) == 3
# parts[0] is intro, parts[1] is main, parts[2] is outro
Note

Operations include async file I/O for reading video bytes. Video files are validated before loading to provide early failure with clear error messages.

Source code in src/gepa_adk/ports/video_blob_service.py
async def prepare_video_parts(
    self,
    video_paths: list[str],
) -> list[Any]:
    """Load video files and create Part objects for multimodal content.

    Reads video files from disk and converts them to Part objects
    suitable for inclusion in ADK Content. Files are validated before
    loading.

    Args:
        video_paths: List of absolute paths to video files. Must be
            non-empty. All paths must point to existing video files
            under 2GB with video/* MIME types.

    Returns:
        List of Part objects, one per input path. Order is preserved
        such that output[i] corresponds to video_paths[i]. Part objects
        contain inline video data with appropriate MIME type.

    Raises:
        ValueError: If video_paths is empty.
        VideoValidationError: If any file is not found, exceeds 2GB,
            or has a non-video MIME type.
        PermissionError: If any file cannot be read due to permissions.
        OSError: If any file cannot be read due to I/O errors.

    Examples:
        Load a single video:

        ```python
        parts = await service.prepare_video_parts(["/data/video.mp4"])
        assert len(parts) == 1
        ```

        Load multiple videos preserving order:

        ```python
        paths = ["/data/intro.mp4", "/data/main.mp4", "/data/outro.mp4"]
        parts = await service.prepare_video_parts(paths)
        assert len(parts) == 3
        # parts[0] is intro, parts[1] is main, parts[2] is outro
        ```

    Note:
        Operations include async file I/O for reading video bytes.
        Video files are validated before loading to provide early
        failure with clear error messages.
    """
    ...

validate_video_file

validate_video_file(video_path: str) -> 'VideoFileInfo'

Validate a video file and return its metadata.

Checks that the file exists, is within size limits, and has a valid video MIME type. Returns metadata on success.

PARAMETER DESCRIPTION
video_path

Absolute path to the video file to validate.

TYPE: str

RETURNS DESCRIPTION
'VideoFileInfo'

VideoFileInfo containing validated metadata:

  • path: The validated file path
  • size_bytes: File size in bytes
  • mime_type: Detected MIME type (e.g., "video/mp4")

RAISES DESCRIPTION
VideoValidationError

If the file does not exist, exceeds 2GB, or has a non-video MIME type. The exception includes the video_path and constraint fields for debugging.

Examples:

Validate a video file:

info = service.validate_video_file("/data/lecture.mp4")
print(f"Size: {info.size_bytes} bytes")
print(f"Type: {info.mime_type}")

Handle validation errors:

from gepa_adk.domain.exceptions import VideoValidationError

try:
    info = service.validate_video_file("/missing.mp4")
except VideoValidationError as e:
    print(f"Invalid: {e.video_path} - {e.constraint}")
Note

Operates synchronously for fast pre-validation without async context. File content is not read, only metadata is checked.

Source code in src/gepa_adk/ports/video_blob_service.py
def validate_video_file(
    self,
    video_path: str,
) -> "VideoFileInfo":
    """Validate a video file and return its metadata.

    Checks that the file exists, is within size limits, and has
    a valid video MIME type. Returns metadata on success.

    Args:
        video_path: Absolute path to the video file to validate.

    Returns:
        VideoFileInfo containing validated metadata:

        - path: The validated file path
        - size_bytes: File size in bytes
        - mime_type: Detected MIME type (e.g., "video/mp4")

    Raises:
        VideoValidationError: If the file does not exist, exceeds 2GB,
            or has a non-video MIME type. The exception includes the
            video_path and constraint fields for debugging.

    Examples:
        Validate a video file:

        ```python
        info = service.validate_video_file("/data/lecture.mp4")
        print(f"Size: {info.size_bytes} bytes")
        print(f"Type: {info.mime_type}")
        ```

        Handle validation errors:

        ```python
        from gepa_adk.domain.exceptions import VideoValidationError

        try:
            info = service.validate_video_file("/missing.mp4")
        except VideoValidationError as e:
            print(f"Invalid: {e.video_path} - {e.constraint}")
        ```

    Note:
        Operates synchronously for fast pre-validation without async
        context. File content is not read, only metadata is checked.
    """
    ...
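
To illustrate the checks described above, here is one possible stdlib-only validation routine. The VideoFileInfo and VideoValidationError constructor signatures, and the VideoFileInfo import location, are assumptions inferred from the documented fields (path, size_bytes, mime_type and video_path, constraint); a real adapter may construct them differently.

import mimetypes
from pathlib import Path

from gepa_adk.domain.exceptions import VideoValidationError
from gepa_adk.ports.video_blob_service import VideoFileInfo  # assumed location

MAX_VIDEO_BYTES = 2 * 1024**3  # 2GB limit noted for the Gemini API


def validate_video_file_sketch(video_path: str) -> VideoFileInfo:
    """Illustrative validation: existence, size, and MIME checks only."""
    path = Path(video_path)
    if not path.is_file():
        # Keyword arguments are assumed from the documented exception fields.
        raise VideoValidationError(video_path=video_path, constraint="file not found")
    size_bytes = path.stat().st_size
    if size_bytes > MAX_VIDEO_BYTES:
        raise VideoValidationError(video_path=video_path, constraint="exceeds 2GB")
    mime_type, _ = mimetypes.guess_type(video_path)
    if not mime_type or not mime_type.startswith("video/"):
        raise VideoValidationError(video_path=video_path, constraint="non-video MIME type")
    # Only metadata is checked; file content is not read.
    return VideoFileInfo(path=str(path), size_bytes=size_bytes, mime_type=mime_type)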