Skip to content

Core API

api

Public API functions for gepa-adk evolution engine.

This module provides high-level async functions for evolving agent instructions using the GEPA (Genetic-Pareto) approach.

Note

The public API exposes evolve() and evolve_sync() as primary entry points. All async functions should be awaited. For synchronous usage in scripts or notebooks, use evolve_sync() which handles event loop management internally.

SchemaBasedScorer

Scorer that extracts scores from agent's structured output_schema.

When an agent has an output_schema, its output is structured JSON. This scorer parses that JSON and extracts a "score" field.

ATTRIBUTE DESCRIPTION
output_schema

The Pydantic BaseModel schema class from agent.output_schema. Must contain a "score" field.

TYPE: type[BaseModel]

Examples:

Basic usage:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk.api import SchemaBasedScorer


class OutputSchema(BaseModel):
    score: float = Field(ge=0.0, le=1.0)
    result: str


agent = LlmAgent(
    name="agent",
    model="gemini-2.5-flash",
    output_schema=OutputSchema,
)

scorer = SchemaBasedScorer(output_schema=OutputSchema)
score, metadata = await scorer.async_score(
    input_text="test",
    output='{"score": 0.8, "result": "good"}',
)
Note

Adheres to Scorer protocol. Requires output_schema to have a "score" field. If score field is missing, raises MissingScoreFieldError.

Source code in src/gepa_adk/api.py
class SchemaBasedScorer:
    """Extract a numeric score from an agent's structured JSON output.

    An agent configured with an output_schema produces structured JSON.
    This scorer validates that JSON against the schema, reads its "score"
    field, and exposes every remaining field as metadata.

    Attributes:
        output_schema (type[BaseModel]): Pydantic model class taken from
            agent.output_schema. Must declare a "score" field.

    Examples:
        Basic usage:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk.api import SchemaBasedScorer


        class OutputSchema(BaseModel):
            score: float = Field(ge=0.0, le=1.0)
            result: str


        agent = LlmAgent(
            name="agent",
            model="gemini-2.5-flash",
            output_schema=OutputSchema,
        )

        scorer = SchemaBasedScorer(output_schema=OutputSchema)
        score, metadata = await scorer.async_score(
            input_text="test",
            output='{"score": 0.8, "result": "good"}',
        )
        ```

    Note:
        Adheres to the Scorer protocol. The output_schema must define a
        "score" field; a null score raises MissingScoreFieldError.
    """

    def __init__(self, output_schema: type[BaseModel]) -> None:
        """Create a scorer bound to a Pydantic output schema.

        Args:
            output_schema: Pydantic BaseModel class from agent.output_schema.

        Raises:
            ConfigurationError: If output_schema doesn't have a "score" field.

        Note:
            The "score" field is verified eagerly so misconfiguration
            surfaces at construction time rather than at scoring time.
        """
        self.output_schema = output_schema

        # Fail fast when the schema cannot possibly yield a score.
        lacks_model_fields = not hasattr(output_schema, "model_fields")
        if lacks_model_fields or "score" not in output_schema.model_fields:
            raise ConfigurationError(
                f"output_schema {output_schema.__name__} must have a 'score' field",
                field="output_schema",
                value=output_schema.__name__,
                constraint="must have 'score' field",
            )

    def score(
        self,
        input_text: str,
        output: str,
        expected: str | None = None,
    ) -> tuple[float, dict[str, Any]]:
        """Score an agent output synchronously.

        Args:
            input_text: The input provided to the agent.
            output: The agent's structured JSON output.
            expected: Optional expected output (ignored by this scorer).

        Returns:
            Tuple of (score, metadata): the numeric value of the schema's
            "score" field plus a dict of every other schema field.

        Raises:
            OutputParseError: If output is not valid JSON.
            SchemaValidationError: If the JSON does not satisfy the schema.
            MissingScoreFieldError: If the parsed score field is null.

        Examples:
            Basic scoring with JSON output:

            ```python
            scorer = SchemaBasedScorer(output_schema=MySchema)
            score, metadata = scorer.score(
                input_text="What is 2+2?",
                output='{"score": 0.9, "result": "4"}',
            )
            # score == 0.9, metadata == {"result": "4"}
            ```

        Note:
            Pure JSON parsing and validation; no I/O. The expected
            parameter is accepted for protocol compatibility only.
        """
        try:
            payload = json.loads(output)
            validated = self.output_schema.model_validate(payload)

            # __init__ guarantees the schema declares "score", and
            # model_validate succeeded, so the attribute exists; it may
            # still be None when the schema allows a nullable score.
            raw_score = cast(_ScoreSchema, validated).score
            if raw_score is None:
                raise MissingScoreFieldError(
                    f"output_schema {self.output_schema.__name__} has score=None; "
                    "score must be a numeric value",
                    parsed_output=payload,
                )

            numeric_score = float(raw_score)

            # Every field other than the score becomes metadata.
            extras = validated.model_dump(exclude={"score"})

            return numeric_score, extras

        except json.JSONDecodeError as err:
            raise OutputParseError(
                f"Failed to parse output as JSON: {err}",
                raw_output=output,
                parse_error=str(err),
                cause=err,
            ) from err
        except ValidationError as err:
            raise SchemaValidationError(
                f"Output does not match schema {self.output_schema.__name__}: {err}",
                raw_output=output,
                validation_error=str(err),
                cause=err,
            ) from err

    async def async_score(
        self,
        input_text: str,
        output: str,
        expected: str | None = None,
    ) -> tuple[float, dict[str, Any]]:
        """Score an agent output asynchronously.

        Args:
            input_text: The input provided to the agent.
            output: The agent's structured JSON output.
            expected: Optional expected output (ignored by this scorer).

        Returns:
            Tuple of (score, metadata): the numeric value of the schema's
            "score" field plus a dict of every other schema field.

        Raises:
            OutputParseError: If output is not valid JSON.
            SchemaValidationError: If the JSON does not satisfy the schema.
            MissingScoreFieldError: If the parsed score field is null.

        Examples:
            Async scoring with JSON output:

            ```python
            scorer = SchemaBasedScorer(output_schema=MySchema)
            score, metadata = await scorer.async_score(
                input_text="What is 2+2?",
                output='{"score": 0.9, "result": "4"}',
            )
            # score == 0.9, metadata == {"result": "4"}
            ```

        Note:
            Delegates to the synchronous score(); JSON parsing requires
            no async I/O.
        """
        # No async work is required; defer to the synchronous path.
        return self.score(input_text, output, expected)

__init__

__init__(output_schema: type[BaseModel]) -> None

Initialize schema-based scorer.

PARAMETER DESCRIPTION
output_schema

Pydantic BaseModel class from agent.output_schema.

TYPE: type[BaseModel]

RAISES DESCRIPTION
ConfigurationError

If output_schema doesn't have a "score" field.

Note

Checks that the schema contains a "score" field during initialization.

Source code in src/gepa_adk/api.py
def __init__(self, output_schema: type[BaseModel]) -> None:
    """Create a scorer bound to a Pydantic output schema.

    Args:
        output_schema: Pydantic BaseModel class from agent.output_schema.

    Raises:
        ConfigurationError: If output_schema doesn't have a "score" field.

    Note:
        The "score" field is verified eagerly so misconfiguration
        surfaces at construction time rather than at scoring time.
    """
    self.output_schema = output_schema

    # Fail fast when the schema cannot possibly yield a score.
    lacks_model_fields = not hasattr(output_schema, "model_fields")
    if lacks_model_fields or "score" not in output_schema.model_fields:
        raise ConfigurationError(
            f"output_schema {output_schema.__name__} must have a 'score' field",
            field="output_schema",
            value=output_schema.__name__,
            constraint="must have 'score' field",
        )

score

score(
    input_text: str,
    output: str,
    expected: str | None = None,
) -> tuple[float, dict[str, Any]]

Score an agent output synchronously.

PARAMETER DESCRIPTION
input_text

The input provided to the agent.

TYPE: str

output

The agent's structured JSON output.

TYPE: str

expected

Optional expected output (not used for schema-based scoring).

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
tuple[float, dict[str, Any]]

Tuple of (score, metadata) where score is extracted from output JSON and metadata contains all other fields from the schema.

RAISES DESCRIPTION
OutputParseError

If output cannot be parsed as JSON.

SchemaValidationError

If output doesn't match the schema.

MissingScoreFieldError

If score field is null in parsed output.

Examples:

Basic scoring with JSON output:

scorer = SchemaBasedScorer(output_schema=MySchema)
score, metadata = scorer.score(
    input_text="What is 2+2?",
    output='{"score": 0.9, "result": "4"}',
)
# score == 0.9, metadata == {"result": "4"}
Note

Operates synchronously by parsing JSON and extracting the score field. The expected parameter is ignored for schema-based scoring.

Source code in src/gepa_adk/api.py
def score(
    self,
    input_text: str,
    output: str,
    expected: str | None = None,
) -> tuple[float, dict[str, Any]]:
    """Score an agent output synchronously.

    Args:
        input_text: The input provided to the agent.
        output: The agent's structured JSON output.
        expected: Optional expected output (ignored by this scorer).

    Returns:
        Tuple of (score, metadata): the numeric value of the schema's
        "score" field plus a dict of every other schema field.

    Raises:
        OutputParseError: If output is not valid JSON.
        SchemaValidationError: If the JSON does not satisfy the schema.
        MissingScoreFieldError: If the parsed score field is null.

    Examples:
        Basic scoring with JSON output:

        ```python
        scorer = SchemaBasedScorer(output_schema=MySchema)
        score, metadata = scorer.score(
            input_text="What is 2+2?",
            output='{"score": 0.9, "result": "4"}',
        )
        # score == 0.9, metadata == {"result": "4"}
        ```

    Note:
        Pure JSON parsing and validation; no I/O. The expected
        parameter is accepted for protocol compatibility only.
    """
    try:
        payload = json.loads(output)
        validated = self.output_schema.model_validate(payload)

        # __init__ guarantees the schema declares "score", and
        # model_validate succeeded, so the attribute exists; it may
        # still be None when the schema allows a nullable score.
        raw_score = cast(_ScoreSchema, validated).score
        if raw_score is None:
            raise MissingScoreFieldError(
                f"output_schema {self.output_schema.__name__} has score=None; "
                "score must be a numeric value",
                parsed_output=payload,
            )

        numeric_score = float(raw_score)

        # Every field other than the score becomes metadata.
        extras = validated.model_dump(exclude={"score"})

        return numeric_score, extras

    except json.JSONDecodeError as err:
        raise OutputParseError(
            f"Failed to parse output as JSON: {err}",
            raw_output=output,
            parse_error=str(err),
            cause=err,
        ) from err
    except ValidationError as err:
        raise SchemaValidationError(
            f"Output does not match schema {self.output_schema.__name__}: {err}",
            raw_output=output,
            validation_error=str(err),
            cause=err,
        ) from err

async_score async

async_score(
    input_text: str,
    output: str,
    expected: str | None = None,
) -> tuple[float, dict[str, Any]]

Score an agent output asynchronously.

PARAMETER DESCRIPTION
input_text

The input provided to the agent.

TYPE: str

output

The agent's structured JSON output.

TYPE: str

expected

Optional expected output (not used for schema-based scoring).

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
tuple[float, dict[str, Any]]

Tuple of (score, metadata) where score is extracted from output JSON and metadata contains all other fields from the schema.

RAISES DESCRIPTION
OutputParseError

If output cannot be parsed as JSON.

SchemaValidationError

If output doesn't match the schema.

MissingScoreFieldError

If score field is null in parsed output.

Examples:

Async scoring with JSON output:

scorer = SchemaBasedScorer(output_schema=MySchema)
score, metadata = await scorer.async_score(
    input_text="What is 2+2?",
    output='{"score": 0.9, "result": "4"}',
)
# score == 0.9, metadata == {"result": "4"}
Note

Operates by delegating to synchronous score() since JSON parsing does not require async I/O operations.

Source code in src/gepa_adk/api.py
async def async_score(
    self,
    input_text: str,
    output: str,
    expected: str | None = None,
) -> tuple[float, dict[str, Any]]:
    """Score an agent output asynchronously.

    Args:
        input_text: The input provided to the agent.
        output: The agent's structured JSON output.
        expected: Optional expected output (ignored by this scorer).

    Returns:
        Tuple of (score, metadata): the numeric value of the schema's
        "score" field plus a dict of every other schema field.

    Raises:
        OutputParseError: If output is not valid JSON.
        SchemaValidationError: If the JSON does not satisfy the schema.
        MissingScoreFieldError: If the parsed score field is null.

    Examples:
        Async scoring with JSON output:

        ```python
        scorer = SchemaBasedScorer(output_schema=MySchema)
        score, metadata = await scorer.async_score(
            input_text="What is 2+2?",
            output='{"score": 0.9, "result": "4"}',
        )
        # score == 0.9, metadata == {"result": "4"}
        ```

    Note:
        Delegates to the synchronous score(); JSON parsing requires
        no async I/O.
    """
    # No async work is required; defer to the synchronous path.
    return self.score(input_text, output, expected)

evolve_group async

evolve_group(
    agents: dict[str, LlmAgent],
    primary: str,
    trainset: list[dict[str, Any]],
    components: dict[str, list[str]] | None = None,
    critic: LlmAgent | None = None,
    share_session: bool = True,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol
    | str
    | None = None,
    reflection_agent: LlmAgent | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    workflow: SequentialAgent
    | LoopAgent
    | ParallelAgent
    | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult

Evolve multiple agents together with per-agent component configuration.

Optimizes specified components for each agent by targeting the primary agent's output score. When share_session=True, agents execute sequentially with shared session state, enabling later agents to access earlier agents' outputs via template strings.

PARAMETER DESCRIPTION
agents

Named ADK agents to evolve together as dict mapping agent names to LlmAgent instances. Must have at least one agent.

TYPE: dict[str, LlmAgent]

primary

Name of the agent whose output is used for scoring. Must match one of the agent names in the dict.

TYPE: str

trainset

Training examples for evaluation. Each example should have an "input" key and optionally an "expected" key.

TYPE: list[dict[str, Any]]

components

Per-agent component configuration mapping agent names to lists of component names to evolve. If None, defaults to evolving "instruction" for all agents. Use empty list to exclude an agent from evolution. Available component names: "instruction", "output_schema", "generate_content_config".

TYPE: dict[str, list[str]] | None DEFAULT: None

critic

Optional critic agent for scoring. If None, the primary agent must have an output_schema for schema-based scoring.

TYPE: LlmAgent | None DEFAULT: None

share_session

Whether agents share session state during execution. When True (default), uses SequentialAgent. When False, agents execute with isolated sessions.

TYPE: bool DEFAULT: True

config

Evolution configuration. If None, uses EvolutionConfig defaults.

TYPE: EvolutionConfig | None DEFAULT: None

state_guard

Optional StateGuard instance for validating and repairing state injection tokens in evolved instructions.

TYPE: StateGuard | None DEFAULT: None

component_selector

Optional selector instance or selector name for choosing which components to update.

TYPE: ComponentSelectorProtocol | str | None DEFAULT: None

reflection_agent

Optional ADK agent for proposals. If None, creates a default reflection agent using config.reflection_model.

TYPE: LlmAgent | None DEFAULT: None

trajectory_config

Trajectory capture settings (uses defaults if None).

TYPE: TrajectoryConfig | None DEFAULT: None

workflow

Optional original workflow structure to preserve during evaluation. When provided, LoopAgent iterations and ParallelAgent concurrency are preserved instead of flattening to SequentialAgent. Used internally by evolve_workflow(); not typically set directly.

TYPE: SequentialAgent | LoopAgent | ParallelAgent | None DEFAULT: None

session_service

Optional ADK session service for state management. If None (default), creates an InMemorySessionService internally. Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService) to persist sessions alongside other agent executions in a shared database.

TYPE: BaseSessionService | None DEFAULT: None

app

Optional ADK App instance. When provided, evolution uses the app's configuration. Note that App does not hold services directly; pass a Runner for service extraction, or combine with session_service param.

TYPE: App | None DEFAULT: None

runner

Optional ADK Runner instance. When provided, evolution extracts and uses the runner's session_service for all agent executions (evolved agents, critic, and reflection agent). Takes precedence over both app and session_service parameters. This enables seamless integration with existing ADK infrastructure.

TYPE: Runner | None DEFAULT: None

RETURNS DESCRIPTION
MultiAgentEvolutionResult

MultiAgentEvolutionResult containing evolved_components dict mapping qualified component names (agent.component format) to their optimized values, along with score metrics and iteration history.

RAISES DESCRIPTION
MultiAgentValidationError

If agents dict is empty, primary agent not found, or no scorer and primary lacks output_schema.

ValueError

If components mapping contains unknown agents, unknown component handlers, or is missing entries for agents.

EvolutionError

If evolution fails during execution.

Examples:

Basic usage with per-agent components (API v0.3.x):

from google.adk.agents import LlmAgent
from gepa_adk import evolve_group

generator = LlmAgent(
    name="generator",
    model="gemini-2.5-flash",
    instruction="Generate code based on the requirement.",
)
critic = LlmAgent(
    name="critic",
    model="gemini-2.5-flash",
    instruction="Review the code in {generator_output}.",
)
validator = LlmAgent(
    name="validator",
    model="gemini-2.5-flash",
    instruction="Validate the reviewed code.",
    output_schema=ValidationResult,
)

result = await evolve_group(
    agents={
        "generator": generator,
        "critic": critic,
        "validator": validator,
    },
    primary="validator",
    trainset=training_data,
    components={
        "generator": ["instruction", "output_schema"],
        "critic": ["instruction"],
        "validator": ["instruction"],
    },
)

# Access evolved components using qualified names
print(result.evolved_components["generator.instruction"])
print(result.evolved_components["critic.instruction"])
print(result.evolved_components["validator.instruction"])

Exclude an agent from evolution:

result = await evolve_group(
    agents={"generator": gen, "static_validator": val},
    primary="generator",
    trainset=training_data,
    components={
        "generator": ["instruction"],
        "static_validator": [],  # Excluded from evolution
    },
)

Using custom session service for persistence:

from google.adk.sessions import SqliteSessionService

# Use SQLite for session persistence
session_service = SqliteSessionService(db_path="evolution_sessions.db")

result = await evolve_group(
    agents={"generator": gen, "critic": critic},
    primary="critic",
    trainset=training_data,
    session_service=session_service,  # Sessions persisted to SQLite
)

Using App/Runner for existing infrastructure integration:

from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService

# Configure Runner with your production session service
runner = Runner(
    app_name="my_app",
    agent=generator,  # Any agent from the group
    session_service=DatabaseSessionService(connection_string="..."),
)

# Evolution uses Runner's session_service for all operations
result = await evolve_group(
    agents={"generator": gen, "refiner": ref},
    primary="refiner",
    trainset=training_data,
    runner=runner,  # Services extracted from runner
)
Note

Breaking change in v0.3.x: The agents parameter changed from list[LlmAgent] to dict[str, LlmAgent]. Candidate keys now use qualified names (agent.component) instead of {agent_name}_instruction.

Source code in src/gepa_adk/api.py
async def evolve_group(
    agents: dict[str, LlmAgent],
    primary: str,
    trainset: list[dict[str, Any]],
    components: dict[str, list[str]] | None = None,
    critic: LlmAgent | None = None,
    share_session: bool = True,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    reflection_agent: LlmAgent | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    workflow: SequentialAgent | LoopAgent | ParallelAgent | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult:
    """Evolve multiple agents together with per-agent component configuration.

    Optimizes specified components for each agent by targeting the primary
    agent's output score. When share_session=True, agents execute sequentially
    with shared session state, enabling later agents to access earlier
    agents' outputs via template strings.

    Args:
        agents: Named ADK agents to evolve together as dict mapping agent
            names to LlmAgent instances. Must have at least one agent.
        primary: Name of the agent whose output is used for scoring.
            Must match one of the agent names in the dict.
        trainset: Training examples for evaluation. Each example should
            have an "input" key and optionally an "expected" key.
        components: Per-agent component configuration mapping agent names
            to lists of component names to evolve. If None, defaults to
            evolving "instruction" for all agents. Use empty list to
            exclude an agent from evolution. Available component names:
            "instruction", "output_schema", "generate_content_config".
        critic: Optional critic agent for scoring. If None, the primary
            agent must have an output_schema for schema-based scoring.
        share_session: Whether agents share session state during
            execution. When True (default), uses SequentialAgent.
            When False, agents execute with isolated sessions.
        config: Evolution configuration. If None, uses EvolutionConfig
            defaults.
        state_guard: Optional StateGuard instance for validating and
            repairing state injection tokens in evolved instructions.
        component_selector: Optional selector instance or selector name for
            choosing which components to update.
        reflection_agent: Optional ADK agent for proposals. If None, creates a
            default reflection agent using config.reflection_model.
        trajectory_config: Trajectory capture settings (uses defaults if None).
        workflow: Optional original workflow structure to preserve during
            evaluation. When provided, LoopAgent iterations and ParallelAgent
            concurrency are preserved instead of flattening to SequentialAgent.
            Used internally by evolve_workflow(); not typically set directly.
        session_service: Optional ADK session service for state management.
            If None (default), creates an InMemorySessionService internally.
            Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService)
            to persist sessions alongside other agent executions in a shared database.
        app: Optional ADK App instance. When provided, evolution uses the app's
            configuration. Note that App does not hold services directly; pass
            a Runner for service extraction, or combine with session_service param.
        runner: Optional ADK Runner instance. When provided, evolution extracts
            and uses the runner's session_service for all agent executions
            (evolved agents, critic, and reflection agent). Takes precedence
            over both app and session_service parameters. This enables seamless
            integration with existing ADK infrastructure.

    Returns:
        MultiAgentEvolutionResult containing evolved_components dict
        mapping qualified component names (agent.component format) to their
        optimized values, along with score metrics and iteration history.

    Raises:
        MultiAgentValidationError: If agents dict is empty, primary agent
            not found, or no scorer and primary lacks output_schema.
        ValueError: If components mapping contains unknown agents, unknown
            component handlers, or is missing entries for agents.
        EvolutionError: If evolution fails during execution.

    Examples:
        Basic usage with per-agent components (API v0.3.x):

        ```python
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve_group

        generator = LlmAgent(
            name="generator",
            model="gemini-2.5-flash",
            instruction="Generate code based on the requirement.",
        )
        critic = LlmAgent(
            name="critic",
            model="gemini-2.5-flash",
            instruction="Review the code in {generator_output}.",
        )
        validator = LlmAgent(
            name="validator",
            model="gemini-2.5-flash",
            instruction="Validate the reviewed code.",
            output_schema=ValidationResult,
        )

        result = await evolve_group(
            agents={
                "generator": generator,
                "critic": critic,
                "validator": validator,
            },
            primary="validator",
            trainset=training_data,
            components={
                "generator": ["instruction", "output_schema"],
                "critic": ["instruction"],
                "validator": ["instruction"],
            },
        )

        # Access evolved components using qualified names
        print(result.evolved_components["generator.instruction"])
        print(result.evolved_components["critic.instruction"])
        print(result.evolved_components["validator.instruction"])
        ```

        Exclude an agent from evolution:

        ```python
        result = await evolve_group(
            agents={"generator": gen, "static_validator": val},
            primary="generator",
            trainset=training_data,
            components={
                "generator": ["instruction"],
                "static_validator": [],  # Excluded from evolution
            },
        )
        ```

        Using custom session service for persistence:

        ```python
        from google.adk.sessions import SqliteSessionService

        # Use SQLite for session persistence
        session_service = SqliteSessionService(db_path="evolution_sessions.db")

        result = await evolve_group(
            agents={"generator": gen, "critic": critic},
            primary="critic",
            trainset=training_data,
            session_service=session_service,  # Sessions persisted to SQLite
        )
        ```

        Using App/Runner for existing infrastructure integration:

        ```python
        from google.adk.runners import Runner
        from google.adk.sessions import DatabaseSessionService

        # Configure Runner with your production session service
        runner = Runner(
            app_name="my_app",
            agent=generator,  # Any agent from the group
            session_service=DatabaseSessionService(connection_string="..."),
        )

        # Evolution uses Runner's session_service for all operations
        result = await evolve_group(
            agents={"generator": gen, "refiner": ref},
            primary="refiner",
            trainset=training_data,
            runner=runner,  # Services extracted from runner
        )
        ```

    Note:
        Breaking change in v0.3.x: The `agents` parameter changed from
        `list[LlmAgent]` to `dict[str, LlmAgent]`. Candidate keys now use
        qualified names (agent.component) instead of {agent_name}_instruction.
    """
    # Validate agent names are valid identifiers (T012a)
    for agent_name in agents:
        _validate_component_name(
            agent_name,
            context="evolve_group agent",
        )

    # Default components: evolve "instruction" for all agents
    if components is None:
        components = {name: ["instruction"] for name in agents}

    # Capture original instructions for StateGuard validation
    # (taken BEFORE evolution so repairs can compare against the seed text)
    original_instructions = {
        name: str(agent.instruction) for name, agent in agents.items()
    }

    # Log precedence warnings if multiple config sources provided (#227 T009)
    if runner is not None and app is not None:
        logger.warning(
            "evolve_group.precedence.runner_over_app",
            message="Both runner and app provided; using runner (runner takes precedence)",
            runner_app_name=runner.app_name,
            app_name=app.name,
        )

    # Resolve services using precedence rules: runner > app > session_service > default (#227)
    resolved_session_service, _artifact_service = _resolve_evolution_services(
        runner=runner,
        app=app,
        session_service=session_service,
    )

    # Resolve app_name for session isolation (#239)
    resolved_app_name = _resolve_app_name(runner=runner, app=app)

    # Create unified executor for consistent session management (FR-003)
    executor = AgentExecutor(
        session_service=resolved_session_service,
        app_name=resolved_app_name,
    )

    # Build scorer with executor (FR-005)
    # When no critic is given, scorer stays None and the adapter is expected
    # to fall back to schema-based scoring on the primary agent.
    scorer = None
    if critic:
        scorer = CriticScorer(critic_agent=critic, executor=executor)

    # Resolve config for reflection_model
    resolved_config = config or EvolutionConfig()

    # Create reflection-based proposer with executor (FR-006)
    # Use provided reflection_agent or create a default one
    if reflection_agent is None:
        reflection_agent = LlmAgent(
            name="reflection_agent",
            model=_resolve_model_for_agent(resolved_config.reflection_model),
            instruction=resolved_config.reflection_prompt or REFLECTION_INSTRUCTION,
        )
    adk_reflection_fn = create_adk_reflection_fn(
        reflection_agent,
        executor=executor,
        session_service=resolved_session_service,
    )
    proposer = AsyncReflectiveMutationProposer(adk_reflection_fn=adk_reflection_fn)

    # Create adapter with executor (FR-004)
    adapter = MultiAgentAdapter(
        agents=agents,
        primary=primary,
        components=components,
        scorer=scorer,
        share_session=share_session,
        session_service=resolved_session_service,
        trajectory_config=trajectory_config,
        proposer=proposer,
        executor=executor,
        workflow=workflow,  # Preserve workflow structure (#215)
    )

    # Build seed candidate using qualified names (agent.component format per ADR-012)
    # NOTE(review): direct dict lookups below assume `primary` and every key in
    # `components` name a known agent; a missing name raises KeyError here unless
    # MultiAgentAdapter already validated it (docstring promises
    # MultiAgentValidationError/ValueError) — confirm validation location.
    primary_agent = agents[primary]
    # Extract all configured components using their handlers
    seed_candidate_components: dict[str, str] = {}
    for agent_name, comp_list in components.items():
        agent = agents[agent_name]
        for comp_name in comp_list:
            qualified_name = f"{agent_name}.{comp_name}"
            handler = get_handler(comp_name)
            seed_candidate_components[qualified_name] = handler.serialize(agent)
    # Add required "instruction" key for engine compatibility
    seed_candidate_components["instruction"] = str(primary_agent.instruction)
    initial_candidate = Candidate(components=seed_candidate_components)

    # Create engine
    # A string selector name is resolved through the factory; an instance is
    # passed through unchanged.
    resolved_component_selector: ComponentSelectorProtocol | None = None
    if component_selector is not None:
        if isinstance(component_selector, str):
            resolved_component_selector = create_component_selector(component_selector)
        else:
            resolved_component_selector = component_selector

    engine = AsyncGEPAEngine(
        adapter=adapter,
        config=resolved_config,
        initial_candidate=initial_candidate,
        batch=trainset,
        component_selector=resolved_component_selector,
    )

    # Run evolution
    evolution_result = await engine.run()

    # Extract best candidate components from engine state using qualified names
    # The engine stores evolved_components from the candidate, which now uses
    # qualified names (agent.component format per ADR-012)
    evolved_components = _extract_evolved_components(
        evolution_result=evolution_result,
        seed_components=seed_candidate_components,
        agents=agents,
        components=components,
        primary=primary,
    )

    # Apply StateGuard validation to instruction components only
    if state_guard is not None:
        validated_components = {}
        for qualified_name, evolved_value in evolved_components.items():
            # Only apply StateGuard to instruction components
            if qualified_name.endswith(".instruction"):
                # rsplit keeps agent names containing dots intact on the left side
                agent_name = qualified_name.rsplit(".", 1)[0]
                original_instruction = original_instructions.get(agent_name, "")
                validated_components[qualified_name] = _apply_state_guard_validation(
                    state_guard=state_guard,
                    original_component_text=original_instruction,
                    evolved_component_text=evolved_value,
                    agent_name=agent_name,
                )
            else:
                # Non-instruction components pass through unchanged
                validated_components[qualified_name] = evolved_value
        evolved_components = validated_components

    # Convert EvolutionResult to MultiAgentEvolutionResult
    return MultiAgentEvolutionResult(
        evolved_components=evolved_components,
        original_score=evolution_result.original_score,
        final_score=evolution_result.final_score,
        primary_agent=primary,
        iteration_history=evolution_result.iteration_history,
        total_iterations=evolution_result.total_iterations,
    )

evolve_workflow async

evolve_workflow(
    workflow: SequentialAgent | LoopAgent | ParallelAgent,
    trainset: list[dict[str, Any]],
    critic: LlmAgent | None = None,
    primary: str | None = None,
    max_depth: int = 5,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol
    | str
    | None = None,
    round_robin: bool = False,
    components: dict[str, list[str]] | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult

Evolve LlmAgents within a workflow agent structure.

Discovers all LlmAgent instances within a workflow (SequentialAgent, LoopAgent, or ParallelAgent) and evolves them while preserving the workflow structure. Uses shared session state to maintain workflow context during evaluation.

PARAMETER DESCRIPTION
workflow

Workflow agent containing LlmAgents to evolve. Must be SequentialAgent, LoopAgent, or ParallelAgent.

TYPE: SequentialAgent | LoopAgent | ParallelAgent

trainset

Training examples for evaluation. Each example should have an "input" key and optionally an "expected" key.

TYPE: list[dict[str, Any]]

critic

Optional critic agent for scoring. If None, the primary agent must have an output_schema for schema-based scoring.

TYPE: LlmAgent | None DEFAULT: None

primary

Name of the agent to score. Defaults to the last LlmAgent found in the workflow (for sequential workflows, this is typically the final output producer).

TYPE: str | None DEFAULT: None

max_depth

Maximum recursion depth for nested workflows (default: 5). Limits how deeply nested workflow structures are traversed.

TYPE: int DEFAULT: 5

config

Evolution configuration. If None, uses EvolutionConfig defaults.

TYPE: EvolutionConfig | None DEFAULT: None

state_guard

Optional StateGuard instance for validating and repairing state injection tokens in evolved component_text.

TYPE: StateGuard | None DEFAULT: None

component_selector

Optional selector instance or selector name for choosing which components to update.

TYPE: ComponentSelectorProtocol | str | None DEFAULT: None

round_robin

If False (default), only the first discovered agent's instruction is evolved across all iterations. If True, all agents' instructions are evolved in round-robin fashion (the engine cycles through agents each iteration). Ignored when components is provided.

TYPE: bool DEFAULT: False

components

Optional per-agent component configuration mapping agent names to lists of component names to evolve. When provided, takes precedence over round_robin. Use empty list to exclude an agent.

TYPE: dict[str, list[str]] | None DEFAULT: None

session_service

Optional ADK session service for state management. If None (default), creates an InMemorySessionService internally. Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService) to persist sessions alongside other agent executions in a shared database.

TYPE: BaseSessionService | None DEFAULT: None

app

Optional ADK App instance. When provided, evolution uses the app's configuration. Note that App does not hold services directly; pass a Runner for service extraction, or combine with session_service param.

TYPE: App | None DEFAULT: None

runner

Optional ADK Runner instance. When provided, evolution extracts and uses the runner's session_service for all agent executions (evolved agents, critic, and reflection agent). Takes precedence over both app and session_service parameters. This enables seamless integration with existing ADK infrastructure.

TYPE: Runner | None DEFAULT: None

RETURNS DESCRIPTION
MultiAgentEvolutionResult

MultiAgentEvolutionResult containing evolved_components dict mapping agent names to their optimized component_text, along with score metrics and iteration history.

RAISES DESCRIPTION
WorkflowEvolutionError

If workflow contains no LlmAgents.

MultiAgentValidationError

If primary agent not found or no scorer available.

EvolutionError

If evolution fails during execution.

Examples:

Default behavior (evolve first agent only):

from google.adk.agents import LlmAgent, SequentialAgent
from gepa_adk import evolve_workflow

generator = LlmAgent(name="generator", instruction="Generate code")
refiner = LlmAgent(name="refiner", instruction="Refine code")
writer = LlmAgent(name="writer", instruction="Write docs")
pipeline = SequentialAgent(
    name="Pipeline", sub_agents=[generator, refiner, writer]
)

# Only generator.instruction is evolved across all iterations
result = await evolve_workflow(workflow=pipeline, trainset=trainset)

Round-robin evolution (evolve all agents):

# All agents are evolved in round-robin: generator -> refiner -> writer -> ...
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    round_robin=True,
)

Explicit components override (takes precedence over round_robin):

# Only generator and writer are evolved; refiner is excluded
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    components={
        "generator": ["instruction"],
        "writer": ["instruction"],
        "refiner": [],  # Excluded
    },
)

Using custom session service for persistence:

from google.adk.sessions import SqliteSessionService

# Persist workflow evolution sessions to SQLite
session_service = SqliteSessionService(db_path="workflow_sessions.db")

result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    session_service=session_service,
)

Using App/Runner for existing infrastructure integration:

from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService

# Configure Runner with your production session service
runner = Runner(
    app_name="my_workflow_app",
    agent=pipeline,  # The workflow agent
    session_service=DatabaseSessionService(connection_string="..."),
)

# Evolution uses Runner's session_service for all operations
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    runner=runner,  # Services extracted from runner
)
Note

Supports workflow agents (SequentialAgent, LoopAgent, ParallelAgent) with recursive traversal and depth limiting via max_depth parameter. Handles nested structures. LoopAgent and ParallelAgent configurations (max_iterations, etc.) are preserved during evolution. Always uses share_session=True to maintain workflow context (FR-010).

Source code in src/gepa_adk/api.py
async def evolve_workflow(
    workflow: SequentialAgent | LoopAgent | ParallelAgent,
    trainset: list[dict[str, Any]],
    critic: LlmAgent | None = None,
    primary: str | None = None,
    max_depth: int = 5,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    round_robin: bool = False,
    components: dict[str, list[str]] | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult:
    """Evolve the LlmAgents contained in a workflow agent.

    Recursively discovers every LlmAgent inside a SequentialAgent, LoopAgent,
    or ParallelAgent (up to ``max_depth`` levels of nesting), builds a
    per-agent component plan, and delegates to :func:`evolve_group` with
    shared session state so workflow context is preserved during evaluation.
    The original workflow structure (loop iterations, parallel fan-out) is
    passed through and preserved.

    Args:
        workflow: Workflow agent containing LlmAgents to evolve. Must be
            SequentialAgent, LoopAgent, or ParallelAgent.
        trainset: Training examples for evaluation. Each example should have
            an "input" key and optionally an "expected" key.
        critic: Optional critic agent for scoring. If None, the primary agent
            must have an output_schema for schema-based scoring.
        primary: Name of the agent to score. Defaults to the last LlmAgent
            discovered in the workflow (typically the final output producer
            in sequential workflows).
        max_depth: Maximum recursion depth for nested workflows (default: 5).
        config: Evolution configuration. If None, uses EvolutionConfig defaults.
        state_guard: Optional StateGuard instance for validating and repairing
            state injection tokens in evolved component_text.
        component_selector: Optional selector instance or selector name for
            choosing which components to update.
        round_robin: If False (default), only the first discovered agent's
            instruction is evolved. If True, all agents' instructions are
            evolved in round-robin fashion. Ignored when ``components`` is
            provided.
        components: Optional per-agent component configuration mapping agent
            names to component-name lists. Takes precedence over round_robin.
            Use an empty list to exclude an agent from evolution.
        session_service: Optional ADK session service for state management.
            If None, an InMemorySessionService is created internally.
        app: Optional ADK App instance; see evolve_group for precedence rules.
        runner: Optional ADK Runner instance. Its session_service takes
            precedence over both ``app`` and ``session_service``.

    Returns:
        MultiAgentEvolutionResult with evolved_components mapping agent
        names to their optimized component_text, plus score metrics and
        iteration history.

    Raises:
        WorkflowEvolutionError: If workflow contains no LlmAgents.
        MultiAgentValidationError: If primary agent not found or no scorer
            available.
        EvolutionError: If evolution fails during execution.

    Note:
        Always delegates with share_session=True to maintain workflow
        context (FR-010). LoopAgent/ParallelAgent configurations (e.g.
        max_iterations) are preserved during evolution.
    """
    logger.info(
        "Starting workflow evolution",
        workflow_name=workflow.name,
        workflow_type=type(workflow).__name__,
    )

    # Recursively collect every LlmAgent, bounded by max_depth (US3).
    discovered = find_llm_agents(workflow, max_depth=max_depth)

    if not discovered:
        # Nothing to evolve: surface a workflow-specific error.
        message = (
            f"No LlmAgents found in workflow '{workflow.name}'. "
            "Workflow must contain at least one LlmAgent to evolve."
        )
        logger.error(
            "Workflow evolution failed", workflow_name=workflow.name, error=message
        )
        raise WorkflowEvolutionError(
            message,
            workflow_name=workflow.name,
        )

    logger.info(
        "Found LlmAgents in workflow",
        workflow_name=workflow.name,
        agent_count=len(discovered),
        agent_names=[agent.name for agent in discovered],
    )

    # The last discovered agent is the default scoring target — for
    # sequential workflows it usually produces the final output.
    if primary is None:
        primary = discovered[-1].name
        logger.debug(
            "Using default primary agent",
            workflow_name=workflow.name,
            primary=primary,
        )

    # evolve_group (API v0.3.x) expects a name-keyed dict of agents.
    agent_map = {agent.name: agent for agent in discovered}

    # Build the per-agent component plan. Precedence:
    # explicit `components` > round_robin=True > first-agent-only default.
    component_plan: dict[str, list[str]] | None = None
    if components is not None:
        component_plan = components
        logger.debug(
            "Using explicit components",
            workflow_name=workflow.name,
            components=list(components.keys()),
        )
    elif round_robin:
        component_plan = {agent.name: ["instruction"] for agent in discovered}
        logger.debug(
            "Using round_robin mode - evolving all agents",
            workflow_name=workflow.name,
            agents=[agent.name for agent in discovered],
        )
    else:
        lead = discovered[0]
        component_plan = {lead.name: ["instruction"]}
        # Remaining agents get empty lists, excluding them from evolution.
        for other in discovered[1:]:
            component_plan[other.name] = []
        logger.debug(
            "Using default mode - evolving first agent only",
            workflow_name=workflow.name,
            first_agent=lead.name,
        )

    logger.debug(
        "Delegating to evolve_group",
        workflow_name=workflow.name,
        agent_count=len(discovered),
        primary=primary,
        share_session=True,
        round_robin=round_robin,
    )

    return await evolve_group(
        agents=agent_map,
        primary=primary,
        trainset=trainset,
        components=component_plan,
        critic=critic,
        share_session=True,  # FR-010: Always use shared session for workflow context
        config=config,
        state_guard=state_guard,
        component_selector=component_selector,
        workflow=workflow,  # Preserve workflow structure (#215)
        session_service=session_service,  # Pass through for persistence (#226)
        app=app,  # Pass through for App/Runner pattern (#227)
        runner=runner,  # Pass through for App/Runner pattern (#227)
    )

evolve async

evolve(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    valset: list[dict[str, Any]] | None = None,
    critic: LlmAgent | None = None,
    reflection_agent: LlmAgent | None = None,
    config: EvolutionConfig | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    state_guard: StateGuard | None = None,
    candidate_selector: CandidateSelectorProtocol
    | str
    | None = None,
    component_selector: ComponentSelectorProtocol
    | str
    | None = None,
    executor: AgentExecutorProtocol | None = None,
    components: list[str] | None = None,
    schema_constraints: SchemaConstraints | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> EvolutionResult

Evolve an ADK agent's instruction.

Optimizes the instruction for a single ADK agent using evolutionary optimization. The agent's instruction is iteratively improved based on performance on the training set.

PARAMETER DESCRIPTION
agent

The ADK LlmAgent to evolve.

TYPE: LlmAgent

trainset

Training examples [{"input": "...", "expected": "..."}].

TYPE: list[dict[str, Any]]

valset

Optional validation examples used for scoring and acceptance. Defaults to the trainset when omitted.

TYPE: list[dict[str, Any]] | None DEFAULT: None

critic

Optional ADK agent for scoring (uses schema scoring if None).

TYPE: LlmAgent | None DEFAULT: None

reflection_agent

Optional ADK agent for proposals. If None, creates a default reflection agent using config.reflection_model.

TYPE: LlmAgent | None DEFAULT: None

config

Evolution configuration (uses defaults if None).

TYPE: EvolutionConfig | None DEFAULT: None

trajectory_config

Trajectory capture settings (uses defaults if None).

TYPE: TrajectoryConfig | None DEFAULT: None

state_guard

Optional state token preservation settings.

TYPE: StateGuard | None DEFAULT: None

candidate_selector

Optional selector instance or selector name.

TYPE: CandidateSelectorProtocol | str | None DEFAULT: None

component_selector

Optional selector instance or selector name for choosing which components to update.

TYPE: ComponentSelectorProtocol | str | None DEFAULT: None

executor

Optional AgentExecutorProtocol implementation for unified agent execution. When provided, both the ADKAdapter and CriticScorer use this executor for consistent session management and execution. If None, creates an AgentExecutor automatically.

TYPE: AgentExecutorProtocol | None DEFAULT: None

components

List of component names to include in evolution. Supported:

- "instruction": The agent's instruction text (default if None).
- "output_schema": The agent's Pydantic output_schema (serialized).

When None, defaults to ["instruction"]. Use ["output_schema"] with a schema reflection agent to evolve the output schema.

TYPE: list[str] | None DEFAULT: None

schema_constraints

Optional SchemaConstraints for output_schema evolution. When provided, proposed schema mutations are validated against these constraints. Mutations that violate constraints (e.g., remove required fields) are rejected and the original schema is preserved.

TYPE: SchemaConstraints | None DEFAULT: None

app

Optional ADK App instance. When provided, evolution uses the app's configuration. Note that App does not hold services directly; pass a Runner for service extraction, or combine with session_service param. See the App/Runner integration guide for details.

TYPE: App | None DEFAULT: None

runner

Optional ADK Runner instance. When provided, evolution extracts and uses the runner's session_service for all agent executions (evolved agents, critic, and reflection agent). Takes precedence over both app and executor parameters. This enables seamless integration with existing ADK infrastructure.

TYPE: Runner | None DEFAULT: None

RETURNS DESCRIPTION
EvolutionResult

EvolutionResult with evolved_components dict and metrics.

RAISES DESCRIPTION
ConfigurationError

If invalid parameters provided.

EvolutionError

If evolution fails during execution.

Note

Single-agent evolution with trainset reflection and valset scoring.

Examples:

Basic usage with output_schema:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk import evolve


class OutputSchema(BaseModel):
    answer: str
    score: float = Field(ge=0.0, le=1.0)


agent = LlmAgent(
    name="assistant",
    model="gemini-2.5-flash",
    instruction="You are a helpful assistant.",
    output_schema=OutputSchema,
)

trainset = [
    {"input": "What is 2+2?", "expected": "4"},
    {"input": "What is the capital of France?", "expected": "Paris"},
]

result = await evolve(agent, trainset)
print(f"Evolved: {result.evolved_components['instruction']}")

With critic agent:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk import evolve


class CriticOutput(BaseModel):
    score: float = Field(ge=0.0, le=1.0)


critic = LlmAgent(
    name="critic",
    model="gemini-2.5-flash",
    instruction="Score the response quality.",
    output_schema=CriticOutput,
)

result = await evolve(agent, trainset, critic=critic)

Evolving output_schema with schema reflection:

from gepa_adk.engine.reflection_agents import create_schema_reflection_agent

# Create schema reflection agent with validation tool
schema_reflector = create_schema_reflection_agent("gemini-2.5-flash")

# Evolve output_schema component
result = await evolve(
    agent,
    trainset,
    critic=critic,
    reflection_agent=schema_reflector,
    components=["output_schema"],  # Evolve schema, not instruction
)
print(f"Evolved schema: {result.evolved_components['output_schema']}")

Using App/Runner for existing infrastructure integration:

from google.adk.apps.app import App
from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService

# Configure Runner with your production session service
session_service = DatabaseSessionService(connection_string="...")
runner = Runner(
    app_name="my_app",
    agent=agent,
    session_service=session_service,
)

# Evolution uses your Runner's session_service for all operations
result = await evolve(
    agent,
    trainset,
    runner=runner,  # Services extracted from runner
)
Source code in src/gepa_adk/api.py
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
async def evolve(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    valset: list[dict[str, Any]] | None = None,
    critic: LlmAgent | None = None,
    reflection_agent: LlmAgent | None = None,
    config: EvolutionConfig | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    state_guard: StateGuard | None = None,
    candidate_selector: CandidateSelectorProtocol | str | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    executor: AgentExecutorProtocol | None = None,
    components: list[str] | None = None,
    schema_constraints: SchemaConstraints | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> EvolutionResult:
    """Evolve an ADK agent's instruction.

    Optimizes the instruction for a single ADK agent using evolutionary
    optimization. The agent's instruction is iteratively improved based on
    performance on the training set.

    Args:
        agent: The ADK LlmAgent to evolve.
        trainset: Training examples [{"input": "...", "expected": "..."}].
        valset: Optional validation examples used for scoring and acceptance.
            Defaults to the trainset when omitted.
        critic: Optional ADK agent for scoring (uses schema scoring if None).
        reflection_agent: Optional ADK agent for proposals. If None, creates a
            default reflection agent using config.reflection_model.
        config: Evolution configuration (uses defaults if None).
        trajectory_config: Trajectory capture settings (uses defaults if None).
        state_guard: Optional state token preservation settings.
        candidate_selector: Optional selector instance or selector name.
        component_selector: Optional selector instance or selector name for
            choosing which components to update.
        executor: Optional AgentExecutorProtocol implementation for unified
            agent execution. When provided, both the ADKAdapter and CriticScorer
            use this executor for consistent session management and execution.
            If None, creates an AgentExecutor automatically.
        components: List of component names to include in evolution. Supported:
            - "instruction": The agent's instruction text (default if None).
            - "output_schema": The agent's Pydantic output_schema (serialized).
            When None, defaults to ["instruction"]. Use ["output_schema"] with
            a schema reflection agent to evolve the output schema.
        schema_constraints: Optional SchemaConstraints for output_schema evolution.
            When provided, proposed schema mutations are validated against these
            constraints. Mutations that violate constraints (e.g., remove required
            fields) are rejected and the original schema is preserved.
        app: Optional ADK App instance. When provided, evolution uses the app's
            configuration. Note that App does not hold services directly; pass
            a Runner for service extraction, or combine with session_service param.
            See the App/Runner integration guide for details.
        runner: Optional ADK Runner instance. When provided, evolution extracts
            and uses the runner's session_service for all agent executions
            (evolved agents, critic, and reflection agent). Takes precedence
            over both app and executor parameters. This enables seamless
            integration with existing ADK infrastructure.

    Returns:
        EvolutionResult with evolved_components dict and metrics.

    Raises:
        ConfigurationError: If invalid parameters provided.
        EvolutionError: If evolution fails during execution.

    Note:
        Single-agent evolution with trainset reflection and valset scoring.

    Examples:
        Basic usage with output_schema:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve


        class OutputSchema(BaseModel):
            answer: str
            score: float = Field(ge=0.0, le=1.0)


        agent = LlmAgent(
            name="assistant",
            model="gemini-2.5-flash",
            instruction="You are a helpful assistant.",
            output_schema=OutputSchema,
        )

        trainset = [
            {"input": "What is 2+2?", "expected": "4"},
            {"input": "What is the capital of France?", "expected": "Paris"},
        ]

        result = await evolve(agent, trainset)
        print(f"Evolved: {result.evolved_components['instruction']}")
        ```

        With critic agent:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve


        class CriticOutput(BaseModel):
            score: float = Field(ge=0.0, le=1.0)


        critic = LlmAgent(
            name="critic",
            model="gemini-2.5-flash",
            instruction="Score the response quality.",
            output_schema=CriticOutput,
        )

        result = await evolve(agent, trainset, critic=critic)
        ```

        Evolving output_schema with schema reflection:

        ```python
        from gepa_adk.engine.reflection_agents import create_schema_reflection_agent

        # Create schema reflection agent with validation tool
        schema_reflector = create_schema_reflection_agent("gemini-2.5-flash")

        # Evolve output_schema component
        result = await evolve(
            agent,
            trainset,
            critic=critic,
            reflection_agent=schema_reflector,
            components=["output_schema"],  # Evolve schema, not instruction
        )
        print(f"Evolved schema: {result.evolved_components['output_schema']}")
        ```

        Using App/Runner for existing infrastructure integration:

        ```python
        from google.adk.apps.app import App
        from google.adk.runners import Runner
        from google.adk.sessions import DatabaseSessionService

        # Configure Runner with your production session service
        session_service = DatabaseSessionService(connection_string="...")
        runner = Runner(
            app_name="my_app",
            agent=agent,
            session_service=session_service,
        )

        # Evolution uses your Runner's session_service for all operations
        result = await evolve(
            agent,
            trainset,
            runner=runner,  # Services extracted from runner
        )
        ```
    """
    # Validate inputs
    _validate_evolve_inputs(agent, trainset)
    # The first training example's keys define the schema every valset example
    # must match; an empty trainset falls back to requiring only "input".
    required_keys = set(trainset[0].keys()) if trainset else {"input"}

    resolved_valset = valset if valset is not None else trainset
    if valset is not None:
        _validate_dataset(
            valset,
            "valset",
            allow_empty=False,
            required_keys=required_keys,
        )

    # Log reflection_agent configuration if provided
    if reflection_agent is not None:
        logger.debug(
            "evolve.reflection_agent.configured",
            agent_name=agent.name,
            reflection_agent_name=reflection_agent.name,
            message="Using ADK reflection agent for instruction improvement",
        )

    # Capture original instruction for StateGuard validation
    original_instruction = str(agent.instruction)

    candidate_selector_label = (
        candidate_selector
        if isinstance(candidate_selector, str)
        else type(candidate_selector).__name__
        if candidate_selector is not None
        else None
    )

    component_selector_label = (
        component_selector
        if isinstance(component_selector, str)
        else type(component_selector).__name__
        if component_selector is not None
        else None
    )

    # Log evolution start
    logger.info(
        "evolve.start",
        agent_name=agent.name,
        trainset_size=len(trainset),
        valset_size=len(resolved_valset),
        valset_defaulted=valset is None,
        has_critic=critic is not None,
        has_reflection_agent=reflection_agent is not None,
        has_state_guard=state_guard is not None,
        candidate_selector=candidate_selector_label,
        component_selector=component_selector_label,
    )

    # Resolve services from runner/app with precedence warnings (#227)
    # Log precedence warnings if multiple config sources provided (T009)
    if runner is not None and app is not None:
        logger.warning(
            "evolve.precedence.runner_over_app",
            message="Both runner and app provided; using runner (runner takes precedence)",
            runner_app_name=runner.app_name,
            app_name=app.name,
        )
    if runner is not None and executor is not None:
        logger.warning(
            "evolve.precedence.runner_over_executor",
            message="Both runner and executor provided; using runner's session_service",
            runner_app_name=runner.app_name,
        )

    # Extract services using precedence rules (T006)
    resolved_session_service, _artifact_service = _resolve_evolution_services(
        runner=runner,
        app=app,
        session_service=None,  # evolve() doesn't have direct session_service param
    )

    # Resolve app_name for session isolation (#239)
    resolved_app_name = _resolve_app_name(runner=runner, app=app)

    # Create executor with resolved session_service (T007)
    # Runner takes precedence over a user-provided executor: the caller's
    # executor is only honored when no runner was supplied.
    if runner is None and executor is not None:
        resolved_executor = executor
    else:
        resolved_executor = AgentExecutor(
            session_service=resolved_session_service,
            app_name=resolved_app_name,
        )

    # Build scorer
    scorer: Scorer
    if critic:
        scorer = CriticScorer(critic_agent=critic, executor=resolved_executor)
    elif getattr(agent, "output_schema", None) is not None:
        # Use schema-based scorer when agent has output_schema
        scorer = SchemaBasedScorer(output_schema=agent.output_schema)
    else:
        raise ConfigurationError(
            "Either critic must be provided or agent must have output_schema",
            field="critic",
            value=None,
            constraint="must provide critic or agent.output_schema",
        )

    # Resolve config
    resolved_config = config or EvolutionConfig()

    # Create reflection agent if not provided
    resolved_reflection_agent = reflection_agent
    if resolved_reflection_agent is None:
        # Create default reflection agent with config settings
        resolved_reflection_agent = LlmAgent(
            name="reflection_agent",
            model=_resolve_model_for_agent(resolved_config.reflection_model),
            instruction=resolved_config.reflection_prompt or REFLECTION_INSTRUCTION,
        )
        logger.debug(
            "evolve.reflection_agent.default",
            reflection_model=resolved_config.reflection_model,
        )

    # Create adapter with resolved session_service (T008)
    # The adapter passes session_service to create_adk_reflection_fn()
    # ensuring the reflection agent shares the same session service
    adapter = ADKAdapter(
        agent=agent,
        scorer=scorer,
        trajectory_config=trajectory_config,
        reflection_agent=resolved_reflection_agent,
        executor=resolved_executor,
        schema_constraints=schema_constraints,
        session_service=resolved_session_service,
    )

    # Build initial candidate components based on requested components
    resolved_components = components if components else [DEFAULT_COMPONENT_NAME]
    initial_components: dict[str, str] = {}
    original_component_values: dict[str, str] = {}

    for comp_name in resolved_components:
        if comp_name == DEFAULT_COMPONENT_NAME:
            initial_components[comp_name] = original_instruction
            original_component_values[comp_name] = original_instruction
        elif comp_name == COMPONENT_OUTPUT_SCHEMA:
            if getattr(agent, "output_schema", None) is None:
                raise ConfigurationError(
                    f"Cannot evolve '{COMPONENT_OUTPUT_SCHEMA}': agent has no output_schema",
                    field="components",
                    value=comp_name,
                    constraint="agent must have output_schema to evolve it",
                )
            schema_text = serialize_pydantic_schema(agent.output_schema)
            initial_components[comp_name] = schema_text
            original_component_values[comp_name] = schema_text
        else:
            raise ConfigurationError(
                f"Unknown component: '{comp_name}'. Supported: "
                f"'{DEFAULT_COMPONENT_NAME}', '{COMPONENT_OUTPUT_SCHEMA}'",
                field="components",
                value=comp_name,
                constraint="must be a supported component name",
            )

    logger.debug(
        "evolve.components.resolved",
        agent_name=agent.name,
        components=resolved_components,
    )

    initial_candidate = Candidate(components=initial_components)

    # Create engine
    resolved_candidate_selector: CandidateSelectorProtocol | None = None
    if candidate_selector is not None:
        if isinstance(candidate_selector, str):
            resolved_candidate_selector = create_candidate_selector(candidate_selector)
        else:
            resolved_candidate_selector = candidate_selector

    resolved_component_selector: ComponentSelectorProtocol | None = None
    if component_selector is not None:
        if isinstance(component_selector, str):
            resolved_component_selector = create_component_selector(component_selector)
        else:
            resolved_component_selector = component_selector

    engine = AsyncGEPAEngine(
        adapter=adapter,
        config=resolved_config,
        initial_candidate=initial_candidate,
        batch=trainset,
        valset=resolved_valset,
        candidate_selector=resolved_candidate_selector,
        component_selector=resolved_component_selector,
    )

    # Run evolution with cleanup
    try:
        result = await engine.run()

        valset_score = result.valset_score
        trainset_score = result.trainset_score

        if trainset_score is not None:
            logger.info(
                "evolve.trainset.scored",
                agent_name=agent.name,
                trainset_size=len(trainset),
                trainset_score=trainset_score,
            )
        if valset_score is not None:
            logger.info(
                "evolve.valset.scored",
                agent_name=agent.name,
                valset_size=len(resolved_valset),
                valset_score=valset_score,
                valset_defaulted=valset is None,
            )

        # Apply state guard validation if provided (for token preservation)
        # Only applies to the text instruction component, not to output_schema;
        # all other evolved components pass through unchanged via the copy.
        validated_components = dict(result.evolved_components)
        if (
            state_guard is not None
            and DEFAULT_COMPONENT_NAME in resolved_components
            and DEFAULT_COMPONENT_NAME in result.evolved_components
        ):
            validated_components[DEFAULT_COMPONENT_NAME] = (
                _apply_state_guard_validation(
                    state_guard=state_guard,
                    original_component_text=original_component_values[
                        DEFAULT_COMPONENT_NAME
                    ],
                    evolved_component_text=result.evolved_components[
                        DEFAULT_COMPONENT_NAME
                    ],
                    agent_name=agent.name,
                )
            )

        # Log evolution completion
        logger.info(
            "evolve.complete",
            agent_name=agent.name,
            original_score=result.original_score,
            final_score=result.final_score,
            improvement=result.improvement,
            total_iterations=result.total_iterations,
            valset_score=valset_score,
            trainset_score=trainset_score,
            components=resolved_components,
        )

        # Return result with validated evolved_components and valset_score
        # (creates new instance since frozen)
        return EvolutionResult(
            original_score=result.original_score,
            final_score=result.final_score,
            evolved_components=validated_components,
            iteration_history=result.iteration_history,
            total_iterations=result.total_iterations,
            valset_score=valset_score,
            trainset_score=trainset_score,
        )
    finally:
        # Clean up adapter resources (clears handler constraints)
        adapter.cleanup()

evolve_sync

evolve_sync(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    **kwargs: Any,
) -> EvolutionResult

Synchronous wrapper for evolve().

Runs the async evolve() function in a blocking manner. Handles nested event loops automatically (Jupyter compatible).

PARAMETER DESCRIPTION
agent

The ADK LlmAgent to evolve.

TYPE: LlmAgent

trainset

Training examples.

TYPE: list[dict[str, Any]]

**kwargs

Optional keyword arguments passed to evolve().

TYPE: Any DEFAULT: {}

PARAMETER DESCRIPTION
valset

Optional validation examples for held-out evaluation.

TYPE: list[dict[str, Any]] | None

critic

Optional ADK agent for scoring.

TYPE: LlmAgent | None

reflection_agent

Optional ADK agent for proposals. If None, a default reflection agent is created from the evolution config.

TYPE: LlmAgent | None

config

EvolutionConfig for customizing evolution parameters.

TYPE: EvolutionConfig | None

trajectory_config

TrajectoryConfig for trace capture settings.

TYPE: TrajectoryConfig | None

state_guard

Optional state token preservation settings.

TYPE: StateGuard | None

candidate_selector

Optional selector instance or selector name.

TYPE: CandidateSelectorProtocol | str | None

executor

Optional unified agent executor for consistent session management across all agent types.

TYPE: AgentExecutorProtocol | None

RETURNS DESCRIPTION
EvolutionResult

EvolutionResult with evolved_components dict and metrics.

RAISES DESCRIPTION
ConfigurationError

If invalid parameters provided.

EvolutionError

If evolution fails during execution.

Examples:

Basic usage in a script:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk import evolve_sync


class OutputSchema(BaseModel):
    answer: str
    score: float = Field(ge=0.0, le=1.0)


agent = LlmAgent(
    name="assistant",
    model="gemini-2.5-flash",
    instruction="You are a helpful assistant.",
    output_schema=OutputSchema,
)

trainset = [
    {"input": "What is 2+2?", "expected": "4"},
]

result = evolve_sync(agent, trainset)
print(f"Evolved: {result.evolved_components['instruction']}")

With configuration:

from gepa_adk import evolve_sync, EvolutionConfig

config = EvolutionConfig(max_iterations=50)
result = evolve_sync(agent, trainset, config=config)
Note

Synchronous wrapper for scripts and Jupyter notebooks. Automatically handles nested event loops using nest_asyncio when needed.

Source code in src/gepa_adk/api.py
def evolve_sync(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    **kwargs: Any,
) -> EvolutionResult:
    """Synchronous wrapper for evolve().

    Runs the async evolve() function in a blocking manner.
    Handles nested event loops automatically (Jupyter compatible).

    Args:
        agent: The ADK LlmAgent to evolve.
        trainset: Training examples.
        **kwargs: Optional keyword arguments passed to evolve().

    Keyword Args:
        valset (list[dict[str, Any]] | None): Optional validation examples for
            held-out evaluation.
        critic (LlmAgent | None): Optional ADK agent for scoring.
        reflection_agent (LlmAgent | None): Optional ADK agent for proposals.
            If None, evolve() creates a default reflection agent.
        config (EvolutionConfig | None): EvolutionConfig for customizing
            evolution parameters.
        trajectory_config (TrajectoryConfig | None): TrajectoryConfig for trace
            capture settings.
        state_guard (StateGuard | None): Optional state token preservation
            settings.
        candidate_selector (CandidateSelectorProtocol | str | None): Optional
            selector instance or selector name.
        component_selector (ComponentSelectorProtocol | str | None): Optional
            selector instance or selector name for choosing which components
            to update.
        executor (AgentExecutorProtocol | None): Optional unified agent executor
            for consistent session management across all agent types.
        components (list[str] | None): Component names to evolve
            ("instruction" and/or "output_schema"); defaults to
            ["instruction"].
        schema_constraints (SchemaConstraints | None): Optional constraints
            applied when evolving the output_schema component.
        app (App | None): Optional ADK App instance for configuration.
        runner (Runner | None): Optional ADK Runner instance; takes precedence
            over app and executor for service resolution.

    Returns:
        EvolutionResult with evolved_components dict and metrics.

    Raises:
        ConfigurationError: If invalid parameters provided.
        EvolutionError: If evolution fails during execution.

    Examples:
        Basic usage in a script:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve_sync


        class OutputSchema(BaseModel):
            answer: str
            score: float = Field(ge=0.0, le=1.0)


        agent = LlmAgent(
            name="assistant",
            model="gemini-2.5-flash",
            instruction="You are a helpful assistant.",
            output_schema=OutputSchema,
        )

        trainset = [
            {"input": "What is 2+2?", "expected": "4"},
        ]

        result = evolve_sync(agent, trainset)
        print(f"Evolved: {result.evolved_components['instruction']}")
        ```

        With configuration:

        ```python
        from gepa_adk import evolve_sync, EvolutionConfig

        config = EvolutionConfig(max_iterations=50)
        result = evolve_sync(agent, trainset, config=config)
        ```

    Note:
        Synchronous wrapper for scripts and Jupyter notebooks. Automatically
        handles nested event loops using nest_asyncio when needed.
    """
    import asyncio

    try:
        # Try standard asyncio.run() first
        return asyncio.run(evolve(agent, trainset, **kwargs))
    except RuntimeError as e:
        # Handle nested event loop case (e.g., Jupyter notebooks)
        if "asyncio.run() cannot be called from a running event loop" in str(e):
            # Use nest_asyncio for nested event loops
            try:
                import nest_asyncio

                nest_asyncio.apply()
                # Now we can use asyncio.run() even in nested context
                return asyncio.run(evolve(agent, trainset, **kwargs))
            except ImportError:
                # We're here because asyncio.run() failed due to running event loop.
                # Without nest_asyncio, we can't handle nested event loops.
                raise RuntimeError(
                    "nest_asyncio is required for nested event loops. "
                    "Install it with: uv add nest_asyncio"
                ) from e
        raise