
Core API

api

Public API functions for gepa-adk evolution engine.

This module provides high-level async functions for evolving agent instructions using the GEPA (Genetic-Pareto) approach. Pre-flight validation runs synchronously before any LLM calls to give developers immediate feedback on invalid configurations. Each entry point has a dedicated pre-flight validator (_pre_flight_validate_evolve, _pre_flight_validate_group, _pre_flight_validate_workflow).

Note

The public API exposes evolve(), evolve_group(), evolve_workflow(), and run_sync() as primary entry points. All async functions should be awaited. For synchronous usage in scripts, use run_sync(evolve(...)) which handles event loop management internally. evolve_sync() is deprecated in favor of run_sync(). For reproducible evolution, pass a seeded config: config=EvolutionConfig(seed=42).

Examples:

Single-agent evolution (synchronous):

from google.adk.agents import LlmAgent
from gepa_adk.api import evolve, run_sync

agent = LlmAgent(name="helper", model="gemini-2.5-flash", instruction="Be helpful.")
result = run_sync(evolve(agent, trainset=[{"input": "hi"}]))

Workflow evolution across a multi-agent graph:

from gepa_adk.api import evolve_workflow

result = await evolve_workflow(workflow_root, trainset=trainset)

See Also

gepa_adk.engine: Core evolution engine and mutation proposers.
gepa_adk.ports: Protocol definitions consumed by this module.
gepa_adk.domain.models: Candidate, EvolutionConfig, and EvolutionResult.

SchemaBasedScorer

Scorer that extracts scores from an agent's structured output_schema.

When an agent has an output_schema, its output is structured JSON. This scorer parses that JSON and extracts a "score" field.

ATTRIBUTE DESCRIPTION
output_schema

The Pydantic BaseModel schema class from agent.output_schema. Must contain a "score" field.

TYPE: type[BaseModel]

Examples:

Basic usage:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk.api import SchemaBasedScorer


class OutputSchema(BaseModel):
    score: float = Field(ge=0.0, le=1.0)
    result: str


agent = LlmAgent(
    name="agent",
    model="gemini-2.5-flash",
    output_schema=OutputSchema,
)

scorer = SchemaBasedScorer(output_schema=OutputSchema)
score, metadata = await scorer.async_score(
    input_text="test",
    output='{"score": 0.8, "result": "good"}',
)
Note

Adheres to Scorer protocol. Requires output_schema to have a "score" field. If score field is missing, raises MissingScoreFieldError.

Source code in src/gepa_adk/api.py
class SchemaBasedScorer:
    """Scorer that extracts scores from agent's structured output_schema.

    When an agent has an output_schema, its output is structured JSON.
    This scorer parses that JSON and extracts a "score" field.

    Attributes:
        output_schema (type[BaseModel]): The Pydantic BaseModel schema class
            from agent.output_schema. Must contain a "score" field.

    Examples:
        Basic usage:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk.api import SchemaBasedScorer


        class OutputSchema(BaseModel):
            score: float = Field(ge=0.0, le=1.0)
            result: str


        agent = LlmAgent(
            name="agent",
            model="gemini-2.5-flash",
            output_schema=OutputSchema,
        )

        scorer = SchemaBasedScorer(output_schema=OutputSchema)
        score, metadata = await scorer.async_score(
            input_text="test",
            output='{"score": 0.8, "result": "good"}',
        )
        ```

    Note:
        Adheres to Scorer protocol. Requires output_schema to have a "score"
        field. If score field is missing, raises MissingScoreFieldError.
    """

    def __init__(self, output_schema: type[BaseModel]) -> None:
        """Initialize schema-based scorer.

        Args:
            output_schema: Pydantic BaseModel class from agent.output_schema.

        Raises:
            ConfigurationError: If output_schema doesn't have a "score" field.

        Note:
            Checks that the schema contains a "score" field during initialization.
        """
        self.output_schema = output_schema

        # Verify schema has score field
        if (
            not hasattr(output_schema, "model_fields")
            or "score" not in output_schema.model_fields
        ):
            raise ConfigurationError(
                f"output_schema {output_schema.__name__} must have a 'score' field",
                field="output_schema",
                value=output_schema.__name__,
                constraint="must have 'score' field",
            )

    def score(
        self,
        input_text: str,
        output: str,
        expected: str | None = None,
    ) -> tuple[float, dict[str, Any]]:
        """Score an agent output synchronously.

        Args:
            input_text: The input provided to the agent.
            output: The agent's structured JSON output.
            expected: Optional expected output (not used for schema-based scoring).

        Returns:
            Tuple of (score, metadata) where score is extracted from output JSON
            and metadata contains all other fields from the schema.

        Raises:
            OutputParseError: If output cannot be parsed as JSON.
            SchemaValidationError: If output doesn't match the schema.
            MissingScoreFieldError: If score field is null in parsed output.

        Examples:
            Basic scoring with JSON output:

            ```python
            scorer = SchemaBasedScorer(output_schema=MySchema)
            score, metadata = scorer.score(
                input_text="What is 2+2?",
                output='{"score": 0.9, "result": "4"}',
            )
            # score == 0.9, metadata == {"result": "4"}
            ```

        Note:
            Operates synchronously by parsing JSON and extracting the score field.
            The expected parameter is ignored for schema-based scoring.
        """
        try:
            # Parse JSON output
            parsed = json.loads(output)
            # Parse with Pydantic schema for validation
            schema_instance = self.output_schema.model_validate(parsed)

            # Extract score - schema validated in __init__ has "score" field,
            # and model_validate succeeded, so score attribute exists.
            # The value could still be None if schema allows nullable scores.
            score_value = cast(_ScoreSchema, schema_instance).score
            if score_value is None:
                raise MissingScoreFieldError(
                    f"output_schema {self.output_schema.__name__} has score=None; "
                    "score must be a numeric value",
                    parsed_output=parsed,
                )

            # Explicit cast since we've validated it's not None
            score = float(score_value)

            # Build metadata from all other fields
            metadata = schema_instance.model_dump(exclude={"score"})

            return score, metadata

        except json.JSONDecodeError as e:
            raise OutputParseError(
                f"Failed to parse output as JSON: {e}",
                raw_output=output,
                parse_error=str(e),
                cause=e,
            ) from e
        except ValidationError as e:
            raise SchemaValidationError(
                f"Output does not match schema {self.output_schema.__name__}: {e}",
                raw_output=output,
                validation_error=str(e),
                cause=e,
            ) from e

    async def async_score(
        self,
        input_text: str,
        output: str,
        expected: str | None = None,
    ) -> tuple[float, dict[str, Any]]:
        """Score an agent output asynchronously.

        Args:
            input_text: The input provided to the agent.
            output: The agent's structured JSON output.
            expected: Optional expected output (not used for schema-based scoring).

        Returns:
            Tuple of (score, metadata) where score is extracted from output JSON
            and metadata contains all other fields from the schema.

        Raises:
            OutputParseError: If output cannot be parsed as JSON.
            SchemaValidationError: If output doesn't match the schema.
            MissingScoreFieldError: If score field is null in parsed output.

        Examples:
            Async scoring with JSON output:

            ```python
            scorer = SchemaBasedScorer(output_schema=MySchema)
            score, metadata = await scorer.async_score(
                input_text="What is 2+2?",
                output='{"score": 0.9, "result": "4"}',
            )
            # score == 0.9, metadata == {"result": "4"}
            ```

        Note:
            Operates by delegating to synchronous score() since JSON parsing
            does not require async I/O operations.
        """
        # Schema-based scoring is synchronous (just JSON parsing)
        return self.score(input_text, output, expected)

__init__

__init__(output_schema: type[BaseModel]) -> None

Initialize schema-based scorer.

PARAMETER DESCRIPTION
output_schema

Pydantic BaseModel class from agent.output_schema.

TYPE: type[BaseModel]

RAISES DESCRIPTION
ConfigurationError

If output_schema doesn't have a "score" field.

Note

Checks that the schema contains a "score" field during initialization.

Source code in src/gepa_adk/api.py
def __init__(self, output_schema: type[BaseModel]) -> None:
    """Initialize schema-based scorer.

    Args:
        output_schema: Pydantic BaseModel class from agent.output_schema.

    Raises:
        ConfigurationError: If output_schema doesn't have a "score" field.

    Note:
        Checks that the schema contains a "score" field during initialization.
    """
    self.output_schema = output_schema

    # Verify schema has score field
    if (
        not hasattr(output_schema, "model_fields")
        or "score" not in output_schema.model_fields
    ):
        raise ConfigurationError(
            f"output_schema {output_schema.__name__} must have a 'score' field",
            field="output_schema",
            value=output_schema.__name__,
            constraint="must have 'score' field",
        )

score

score(
    input_text: str,
    output: str,
    expected: str | None = None,
) -> tuple[float, dict[str, Any]]

Score an agent output synchronously.

PARAMETER DESCRIPTION
input_text

The input provided to the agent.

TYPE: str

output

The agent's structured JSON output.

TYPE: str

expected

Optional expected output (not used for schema-based scoring).

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
tuple[float, dict[str, Any]]

Tuple of (score, metadata) where score is extracted from the output JSON and metadata contains all other fields from the schema.

RAISES DESCRIPTION
OutputParseError

If output cannot be parsed as JSON.

SchemaValidationError

If output doesn't match the schema.

MissingScoreFieldError

If score field is null in parsed output.

Examples:

Basic scoring with JSON output:

scorer = SchemaBasedScorer(output_schema=MySchema)
score, metadata = scorer.score(
    input_text="What is 2+2?",
    output='{"score": 0.9, "result": "4"}',
)
# score == 0.9, metadata == {"result": "4"}
Note

Operates synchronously by parsing JSON and extracting the score field. The expected parameter is ignored for schema-based scoring.

Source code in src/gepa_adk/api.py
def score(
    self,
    input_text: str,
    output: str,
    expected: str | None = None,
) -> tuple[float, dict[str, Any]]:
    """Score an agent output synchronously.

    Args:
        input_text: The input provided to the agent.
        output: The agent's structured JSON output.
        expected: Optional expected output (not used for schema-based scoring).

    Returns:
        Tuple of (score, metadata) where score is extracted from output JSON
        and metadata contains all other fields from the schema.

    Raises:
        OutputParseError: If output cannot be parsed as JSON.
        SchemaValidationError: If output doesn't match the schema.
        MissingScoreFieldError: If score field is null in parsed output.

    Examples:
        Basic scoring with JSON output:

        ```python
        scorer = SchemaBasedScorer(output_schema=MySchema)
        score, metadata = scorer.score(
            input_text="What is 2+2?",
            output='{"score": 0.9, "result": "4"}',
        )
        # score == 0.9, metadata == {"result": "4"}
        ```

    Note:
        Operates synchronously by parsing JSON and extracting the score field.
        The expected parameter is ignored for schema-based scoring.
    """
    try:
        # Parse JSON output
        parsed = json.loads(output)
        # Parse with Pydantic schema for validation
        schema_instance = self.output_schema.model_validate(parsed)

        # Extract score - schema validated in __init__ has "score" field,
        # and model_validate succeeded, so score attribute exists.
        # The value could still be None if schema allows nullable scores.
        score_value = cast(_ScoreSchema, schema_instance).score
        if score_value is None:
            raise MissingScoreFieldError(
                f"output_schema {self.output_schema.__name__} has score=None; "
                "score must be a numeric value",
                parsed_output=parsed,
            )

        # Explicit cast since we've validated it's not None
        score = float(score_value)

        # Build metadata from all other fields
        metadata = schema_instance.model_dump(exclude={"score"})

        return score, metadata

    except json.JSONDecodeError as e:
        raise OutputParseError(
            f"Failed to parse output as JSON: {e}",
            raw_output=output,
            parse_error=str(e),
            cause=e,
        ) from e
    except ValidationError as e:
        raise SchemaValidationError(
            f"Output does not match schema {self.output_schema.__name__}: {e}",
            raw_output=output,
            validation_error=str(e),
            cause=e,
        ) from e

async_score async

async_score(
    input_text: str,
    output: str,
    expected: str | None = None,
) -> tuple[float, dict[str, Any]]

Score an agent output asynchronously.

PARAMETER DESCRIPTION
input_text

The input provided to the agent.

TYPE: str

output

The agent's structured JSON output.

TYPE: str

expected

Optional expected output (not used for schema-based scoring).

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
tuple[float, dict[str, Any]]

Tuple of (score, metadata) where score is extracted from the output JSON and metadata contains all other fields from the schema.

RAISES DESCRIPTION
OutputParseError

If output cannot be parsed as JSON.

SchemaValidationError

If output doesn't match the schema.

MissingScoreFieldError

If score field is null in parsed output.

Examples:

Async scoring with JSON output:

scorer = SchemaBasedScorer(output_schema=MySchema)
score, metadata = await scorer.async_score(
    input_text="What is 2+2?",
    output='{"score": 0.9, "result": "4"}',
)
# score == 0.9, metadata == {"result": "4"}
Note

Operates by delegating to synchronous score() since JSON parsing does not require async I/O operations.

Source code in src/gepa_adk/api.py
async def async_score(
    self,
    input_text: str,
    output: str,
    expected: str | None = None,
) -> tuple[float, dict[str, Any]]:
    """Score an agent output asynchronously.

    Args:
        input_text: The input provided to the agent.
        output: The agent's structured JSON output.
        expected: Optional expected output (not used for schema-based scoring).

    Returns:
        Tuple of (score, metadata) where score is extracted from output JSON
        and metadata contains all other fields from the schema.

    Raises:
        OutputParseError: If output cannot be parsed as JSON.
        SchemaValidationError: If output doesn't match the schema.
        MissingScoreFieldError: If score field is null in parsed output.

    Examples:
        Async scoring with JSON output:

        ```python
        scorer = SchemaBasedScorer(output_schema=MySchema)
        score, metadata = await scorer.async_score(
            input_text="What is 2+2?",
            output='{"score": 0.9, "result": "4"}',
        )
        # score == 0.9, metadata == {"result": "4"}
        ```

    Note:
        Operates by delegating to synchronous score() since JSON parsing
        does not require async I/O operations.
    """
    # Schema-based scoring is synchronous (just JSON parsing)
    return self.score(input_text, output, expected)

evolve_group async

evolve_group(
    agents: dict[str, LlmAgent],
    primary: str,
    trainset: list[dict[str, Any]],
    *,
    components: dict[str, list[str]] | None = None,
    critic: LlmAgent | None = None,
    share_session: bool = True,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol
    | str
    | None = None,
    reflection_agent: LlmAgent | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    workflow: SequentialAgent
    | LoopAgent
    | ParallelAgent
    | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult

Evolve multiple agents together with per-agent component configuration.

Optimizes specified components for each agent by targeting the primary agent's output score. When share_session=True, agents execute sequentially with shared session state, enabling later agents to access earlier agents' outputs via template strings.

PARAMETER DESCRIPTION
agents

Named ADK agents to evolve together as dict mapping agent names to LlmAgent instances. Must have at least one agent.

TYPE: dict[str, LlmAgent]

primary

Name of the agent whose output is used for scoring. Must match one of the agent names in the dict.

TYPE: str

trainset

Training examples for evaluation. Each example should have an "input" key and optionally an "expected" key.

TYPE: list[dict[str, Any]]

PARAMETER DESCRIPTION
components

Per-agent component configuration mapping agent names to lists of component names to evolve. If None, defaults to evolving "instruction" for all agents. Use empty list to exclude an agent from evolution. Available component names: "instruction", "output_schema", "generate_content_config".

TYPE: dict[str, list[str]] | None

critic

Optional critic agent for scoring. If None, the primary agent must have an output_schema for schema-based scoring.

TYPE: LlmAgent | None

share_session

Whether agents share session state during execution. When True (default), uses SequentialAgent. When False, agents execute with isolated sessions.

TYPE: bool

config

Evolution configuration. If None, uses EvolutionConfig defaults.

TYPE: EvolutionConfig | None

state_guard

Optional StateGuard instance for validating and repairing state injection tokens in evolved instructions.

TYPE: StateGuard | None

component_selector

Optional selector instance or selector name for choosing which components to update.

TYPE: ComponentSelectorProtocol | str | None

reflection_agent

Optional ADK agent for proposals. If None, creates a default reflection agent using config.reflection_model.

TYPE: LlmAgent | None

trajectory_config

Trajectory capture settings (uses defaults if None).

TYPE: TrajectoryConfig | None

workflow

Optional original workflow structure to preserve during evaluation. When provided, LoopAgent iterations and ParallelAgent concurrency are preserved instead of flattening to SequentialAgent. Used internally by evolve_workflow(); not typically set directly.

TYPE: SequentialAgent | LoopAgent | ParallelAgent | None

session_service

Optional ADK session service for state management. If None (default), creates an InMemorySessionService internally. Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService) to persist sessions alongside other agent executions in a shared database.

TYPE: BaseSessionService | None

app

Optional ADK App instance. When provided, evolution uses the app's configuration. Note that App does not hold services directly; pass a Runner for service extraction, or combine with session_service param.

TYPE: App | None

runner

Optional ADK Runner instance. When provided, evolution extracts and uses the runner's session_service for all agent executions (evolved agents, critic, and reflection agent). Takes precedence over both app and session_service parameters. This enables seamless integration with existing ADK infrastructure.

TYPE: Runner | None

RETURNS DESCRIPTION
MultiAgentEvolutionResult

MultiAgentEvolutionResult containing an evolved_components dict mapping qualified component names (agent.component format) to their optimized values, along with score metrics, iteration history, original_components (filtered to the qualified keyspace), schema_version, and stop_reason propagated from the engine result.

RAISES DESCRIPTION
ConfigurationError

If pre-flight validation fails: invalid agent names, non-LlmAgent critic, empty trainset, duplicate or empty component names per agent, or EvolutionConfig consistency errors.

MultiAgentValidationError

If agents dict is empty, primary agent not found, or no scorer and primary lacks output_schema.

ValueError

If components mapping contains unknown agents, unknown component handlers, or is missing entries for agents.

EvolutionError

If evolution fails during execution.

Examples:

Basic usage with per-agent components (API v0.3.x):

from google.adk.agents import LlmAgent
from gepa_adk import evolve_group

generator = LlmAgent(
    name="generator",
    model="gemini-2.5-flash",
    instruction="Generate code based on the requirement.",
)
critic = LlmAgent(
    name="critic",
    model="gemini-2.5-flash",
    instruction="Review the code in {generator_output}.",
)
validator = LlmAgent(
    name="validator",
    model="gemini-2.5-flash",
    instruction="Validate the reviewed code.",
    output_schema=ValidationResult,
)

result = await evolve_group(
    agents={
        "generator": generator,
        "critic": critic,
        "validator": validator,
    },
    primary="validator",
    trainset=training_data,
    components={
        "generator": ["instruction", "output_schema"],
        "critic": ["instruction"],
        "validator": ["instruction"],
    },
)

# Access evolved components using qualified names
print(result.evolved_components["generator.instruction"])
print(result.evolved_components["critic.instruction"])
print(result.evolved_components["validator.instruction"])

Exclude an agent from evolution:

result = await evolve_group(
    agents={"generator": gen, "static_validator": val},
    primary="generator",
    trainset=training_data,
    components={
        "generator": ["instruction"],
        "static_validator": [],  # Excluded from evolution
    },
)

Using custom session service for persistence:

from google.adk.sessions import SqliteSessionService

# Use SQLite for session persistence
session_service = SqliteSessionService(db_path="evolution_sessions.db")

result = await evolve_group(
    agents={"generator": gen, "critic": critic},
    primary="critic",
    trainset=training_data,
    session_service=session_service,  # Sessions persisted to SQLite
)

Using App/Runner for existing infrastructure integration:

from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService

# Configure Runner with your production session service
runner = Runner(
    app_name="my_app",
    agent=generator,  # Any agent from the group
    session_service=DatabaseSessionService(connection_string="..."),
)

# Evolution uses Runner's session_service for all operations
result = await evolve_group(
    agents={"generator": gen, "refiner": ref},
    primary="refiner",
    trainset=training_data,
    runner=runner,  # Services extracted from runner
)
Note

Breaking change in v0.3.x: The agents parameter changed from list[LlmAgent] to dict[str, LlmAgent]. Candidate keys now use qualified names (agent.component) instead of {agent_name}_instruction.

For reproducible evolution, pass a seeded config: config=EvolutionConfig(seed=42).

Source code in src/gepa_adk/api.py
async def evolve_group(
    agents: dict[str, LlmAgent],
    primary: str,
    trainset: list[dict[str, Any]],
    *,
    components: dict[str, list[str]] | None = None,
    critic: LlmAgent | None = None,
    share_session: bool = True,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    reflection_agent: LlmAgent | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    workflow: SequentialAgent | LoopAgent | ParallelAgent | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult:
    """Evolve multiple agents together with per-agent component configuration.

    Optimizes specified components for each agent by targeting the primary
    agent's output score. When share_session=True, agents execute sequentially
    with shared session state, enabling later agents to access earlier
    agents' outputs via template strings.

    Args:
        agents: Named ADK agents to evolve together as dict mapping agent
            names to LlmAgent instances. Must have at least one agent.
        primary: Name of the agent whose output is used for scoring.
            Must match one of the agent names in the dict.
        trainset: Training examples for evaluation. Each example should
            have an "input" key and optionally an "expected" key.

    Keyword Args:
        components: Per-agent component configuration mapping agent names
            to lists of component names to evolve. If None, defaults to
            evolving "instruction" for all agents. Use empty list to
            exclude an agent from evolution. Available component names:
            "instruction", "output_schema", "generate_content_config".
        critic: Optional critic agent for scoring. If None, the primary
            agent must have an output_schema for schema-based scoring.
        share_session: Whether agents share session state during
            execution. When True (default), uses SequentialAgent.
            When False, agents execute with isolated sessions.
        config: Evolution configuration. If None, uses EvolutionConfig
            defaults.
        state_guard: Optional StateGuard instance for validating and
            repairing state injection tokens in evolved instructions.
        component_selector: Optional selector instance or selector name for
            choosing which components to update.
        reflection_agent: Optional ADK agent for proposals. If None, creates a
            default reflection agent using config.reflection_model.
        trajectory_config: Trajectory capture settings (uses defaults if None).
        workflow: Optional original workflow structure to preserve during
            evaluation. When provided, LoopAgent iterations and ParallelAgent
            concurrency are preserved instead of flattening to SequentialAgent.
            Used internally by evolve_workflow(); not typically set directly.
        session_service: Optional ADK session service for state management.
            If None (default), creates an InMemorySessionService internally.
            Pass a custom service (e.g., SqliteSessionService, DatabaseSessionService)
            to persist sessions alongside other agent executions in a shared database.
        app: Optional ADK App instance. When provided, evolution uses the app's
            configuration. Note that App does not hold services directly; pass
            a Runner for service extraction, or combine with session_service param.
        runner: Optional ADK Runner instance. When provided, evolution extracts
            and uses the runner's session_service for all agent executions
            (evolved agents, critic, and reflection agent). Takes precedence
            over both app and session_service parameters. This enables seamless
            integration with existing ADK infrastructure.

    Returns:
        MultiAgentEvolutionResult containing evolved_components dict
        mapping qualified component names (agent.component format) to their
        optimized values, along with score metrics, iteration history,
        original_components (filtered to the qualified keyspace),
        schema_version, and stop_reason propagated from the engine result.

    Raises:
        ConfigurationError: If pre-flight validation fails: invalid agent
            names, non-LlmAgent critic, empty trainset, duplicate or empty
            component names per agent, or EvolutionConfig consistency errors.
        MultiAgentValidationError: If agents dict is empty, primary agent
            not found, or no scorer and primary lacks output_schema.
        ValueError: If components mapping contains unknown agents, unknown
            component handlers, or is missing entries for agents.
        EvolutionError: If evolution fails during execution.

    Examples:
        Basic usage with per-agent components (API v0.3.x):

        ```python
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve_group

        generator = LlmAgent(
            name="generator",
            model="gemini-2.5-flash",
            instruction="Generate code based on the requirement.",
        )
        critic = LlmAgent(
            name="critic",
            model="gemini-2.5-flash",
            instruction="Review the code in {generator_output}.",
        )
        validator = LlmAgent(
            name="validator",
            model="gemini-2.5-flash",
            instruction="Validate the reviewed code.",
            output_schema=ValidationResult,
        )

        result = await evolve_group(
            agents={
                "generator": generator,
                "critic": critic,
                "validator": validator,
            },
            primary="validator",
            trainset=training_data,
            components={
                "generator": ["instruction", "output_schema"],
                "critic": ["instruction"],
                "validator": ["instruction"],
            },
        )

        # Access evolved components using qualified names
        print(result.evolved_components["generator.instruction"])
        print(result.evolved_components["critic.instruction"])
        print(result.evolved_components["validator.instruction"])
        ```

        Exclude an agent from evolution:

        ```python
        result = await evolve_group(
            agents={"generator": gen, "static_validator": val},
            primary="generator",
            trainset=training_data,
            components={
                "generator": ["instruction"],
                "static_validator": [],  # Excluded from evolution
            },
        )
        ```

        Using custom session service for persistence:

        ```python
        from google.adk.sessions import DatabaseSessionService

        # Use SQLite for session persistence
        session_service = DatabaseSessionService(db_url="sqlite:///evolution_sessions.db")

        result = await evolve_group(
            agents={"generator": gen, "critic": critic},
            primary="critic",
            trainset=training_data,
            session_service=session_service,  # Sessions persisted to SQLite
        )
        ```

        Using App/Runner for existing infrastructure integration:

        ```python
        from google.adk.runners import Runner
        from google.adk.sessions import DatabaseSessionService

        # Configure Runner with your production session service
        runner = Runner(
            app_name="my_app",
            agent=generator,  # Any agent from the group
            session_service=DatabaseSessionService(db_url="..."),
        )

        # Evolution uses Runner's session_service for all operations
        result = await evolve_group(
            agents={"generator": gen, "refiner": ref},
            primary="refiner",
            trainset=training_data,
            runner=runner,  # Services extracted from runner
        )
        ```

    Note:
        Breaking change in v0.3.x: The `agents` parameter changed from
        `list[LlmAgent]` to `dict[str, LlmAgent]`. Candidate keys now use
        qualified names (agent.component) instead of {agent_name}_instruction.

        For reproducible evolution, pass a seeded config:
        ``config=EvolutionConfig(seed=42)``.
    """
    # Pre-flight validation (T012a + Story 2.5)
    _pre_flight_validate_group(agents, trainset, critic, components)

    # Default components: evolve "instruction" for all agents
    if components is None:
        components = {name: ["instruction"] for name in agents}

    # Capture original instructions for StateGuard validation
    original_instructions = {
        name: str(agent.instruction) for name, agent in agents.items()
    }

    # Log precedence warnings if multiple config sources provided (#227 T009)
    if runner is not None and app is not None:
        logger.warning(
            "evolve_group.precedence.runner_over_app",
            message="Both runner and app provided; using runner (runner takes precedence)",
            runner_app_name=runner.app_name,
            app_name=app.name,
        )

    # Resolve services using precedence rules: runner > app > session_service > default (#227)
    resolved_session_service, _artifact_service = _resolve_evolution_services(
        runner=runner,
        app=app,
        session_service=session_service,
    )

    # Resolve app_name for session isolation (#239)
    resolved_app_name = _resolve_app_name(runner=runner, app=app)

    # Create unified executor for consistent session management (FR-003)
    executor = AgentExecutor(
        session_service=resolved_session_service,
        app_name=resolved_app_name,
    )

    # Build scorer with executor (FR-005)
    scorer = None
    if critic:
        scorer = CriticScorer(critic_agent=critic, executor=executor)

    # Resolve config for reflection_model
    resolved_config = config or EvolutionConfig()
    rng = (
        random.Random(resolved_config.seed)
        if resolved_config.seed is not None
        else None
    )

    # Create reflection-based proposer with executor (FR-006)
    # Use provided reflection_agent or create a default one
    if reflection_agent is None:
        reflection_agent = LlmAgent(
            name="reflection_agent",
            model=_resolve_model_for_agent(resolved_config.reflection_model),
            instruction=resolved_config.reflection_prompt or REFLECTION_INSTRUCTION,
        )
    adk_reflection_fn = create_adk_reflection_fn(
        reflection_agent,
        executor=executor,
    )
    proposer = AsyncReflectiveMutationProposer(adk_reflection_fn=adk_reflection_fn)

    # Create adapter with executor (FR-004)
    adapter = MultiAgentAdapter(
        agents=agents,
        primary=primary,
        components=components,
        scorer=scorer,
        share_session=share_session,
        session_service=resolved_session_service,
        trajectory_config=trajectory_config,
        proposer=proposer,
        executor=executor,
        workflow=workflow,  # Preserve workflow structure (#215)
    )

    # Build seed candidate using qualified names (agent.component format per ADR-012)
    primary_agent = agents[primary]
    # Extract all configured components using their handlers
    seed_candidate_components: dict[str, str] = {}
    for agent_name, comp_list in components.items():
        agent = agents[agent_name]
        for comp_name in comp_list:
            qualified_name = f"{agent_name}.{comp_name}"
            handler = get_handler(comp_name)
            seed_candidate_components[qualified_name] = handler.serialize(agent)
    # Add required "instruction" key for engine compatibility
    seed_candidate_components["instruction"] = str(primary_agent.instruction)
    initial_candidate = Candidate(components=seed_candidate_components)

    # Create engine
    resolved_component_selector: ComponentSelectorProtocol | None = None
    if component_selector is not None:
        if isinstance(component_selector, str):
            resolved_component_selector = create_component_selector(component_selector)
        else:
            resolved_component_selector = component_selector

    engine = AsyncGEPAEngine(
        adapter=adapter,
        config=resolved_config,
        initial_candidate=initial_candidate,
        batch=trainset,
        component_selector=resolved_component_selector,
        rng=rng,
    )

    # Run evolution
    evolution_result = await engine.run()

    # Extract best candidate components from engine state using qualified names
    # The engine stores evolved_components from the candidate, which now uses
    # qualified names (agent.component format per ADR-012)
    evolved_components = _extract_evolved_components(
        evolution_result=evolution_result,
        seed_components=seed_candidate_components,
        agents=agents,
        components=components,
        primary=primary,
    )

    # Apply StateGuard validation to instruction components only
    if state_guard is not None:
        validated_components = {}
        for qualified_name, evolved_value in evolved_components.items():
            # Only apply StateGuard to instruction components
            if qualified_name.endswith(".instruction"):
                agent_name = qualified_name.rsplit(".", 1)[0]
                original_instruction = original_instructions.get(agent_name, "")
                validated_components[qualified_name] = _apply_state_guard_validation(
                    state_guard=state_guard,
                    original_component_text=original_instruction,
                    evolved_component_text=evolved_value,
                    agent_name=agent_name,
                )
            else:
                # Non-instruction components pass through unchanged
                validated_components[qualified_name] = evolved_value
        evolved_components = validated_components

    # Filter original_components to the qualified keyspace used by
    # evolved_components, stripping engine-internal keys (e.g. bare
    # "instruction" added for engine compatibility).
    orig = evolution_result.original_components
    if orig is not None:
        orig = {k: v for k, v in orig.items() if k in evolved_components}

    # Convert EvolutionResult to MultiAgentEvolutionResult
    return MultiAgentEvolutionResult(
        schema_version=evolution_result.schema_version,
        stop_reason=evolution_result.stop_reason,
        evolved_components=evolved_components,
        original_score=evolution_result.original_score,
        final_score=evolution_result.final_score,
        primary_agent=primary,
        iteration_history=evolution_result.iteration_history,
        total_iterations=evolution_result.total_iterations,
        original_components=orig,
    )
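
The qualified-name convention used by `evolved_components` ("agent.component" per ADR-012) can be unpacked with a plain `rsplit`, as the StateGuard path above does. A minimal sketch with illustrative values, not gepa_adk code:

```python
# Split qualified "agent.component" keys back into (agent, component)
# pairs; the values here are stand-ins for evolved component text.
evolved = {
    "generator.instruction": "Write tested code.",
    "validator.instruction": "Check edge cases.",
}
by_agent: dict[str, dict[str, str]] = {}
for qualified, text in evolved.items():
    agent, component = qualified.rsplit(".", 1)
    by_agent.setdefault(agent, {})[component] = text
```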

evolve_workflow async

evolve_workflow(
    workflow: SequentialAgent | LoopAgent | ParallelAgent,
    trainset: list[dict[str, Any]],
    *,
    critic: LlmAgent | None = None,
    primary: str | None = None,
    max_depth: int = 5,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol
    | str
    | None = None,
    round_robin: bool = False,
    components: dict[str, list[str]] | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult

Evolve LlmAgents within a workflow agent structure.

Discovers all LlmAgent instances within a workflow (SequentialAgent, LoopAgent, or ParallelAgent) and evolves them while preserving the workflow structure. Uses shared session state to maintain workflow context during evaluation.

PARAMETER DESCRIPTION
workflow

Workflow agent containing LlmAgents to evolve. Must be SequentialAgent, LoopAgent, or ParallelAgent.

TYPE: SequentialAgent | LoopAgent | ParallelAgent

trainset

Training examples for evaluation. Each example should have an "input" key and optionally an "expected" key.

TYPE: list[dict[str, Any]]

PARAMETER DESCRIPTION
critic

Optional critic agent for scoring. If None, the primary agent must have an output_schema for schema-based scoring.

TYPE: LlmAgent | None

primary

Name of the agent to score. Defaults to the last LlmAgent found in the workflow (for sequential workflows, this is typically the final output producer).

TYPE: str | None

max_depth

Maximum recursion depth for nested workflows (default: 5). Limits how deeply nested workflow structures are traversed.

TYPE: int

config

Evolution configuration. If None, uses EvolutionConfig defaults.

TYPE: EvolutionConfig | None

state_guard

Optional StateGuard instance for validating and repairing state injection tokens in evolved component_text.

TYPE: StateGuard | None

component_selector

Optional selector instance or selector name for choosing which components to update.

TYPE: ComponentSelectorProtocol | str | None

round_robin

If False (default), only the first discovered agent's instruction is evolved across all iterations. If True, all agents' instructions are evolved in round-robin fashion (the engine cycles through agents each iteration). Ignored when components is provided.

TYPE: bool

components

Optional per-agent component configuration mapping agent names to lists of component names to evolve. When provided, takes precedence over round_robin. Use empty list to exclude an agent.

TYPE: dict[str, list[str]] | None

session_service

Optional ADK session service for state management. If None (default), creates an InMemorySessionService internally. Pass a custom service (e.g., DatabaseSessionService) to persist sessions alongside other agent executions in a shared database.

TYPE: BaseSessionService | None

app

Optional ADK App instance. When provided, evolution uses the app's configuration. Note that App does not hold services directly; pass a Runner for service extraction, or combine with session_service param.

TYPE: App | None

runner

Optional ADK Runner instance. When provided, evolution extracts and uses the runner's session_service for all agent executions (evolved agents, critic, and reflection agent). Takes precedence over both app and session_service parameters. This enables seamless integration with existing ADK infrastructure.

TYPE: Runner | None

RETURNS DESCRIPTION
MultiAgentEvolutionResult

MultiAgentEvolutionResult containing evolved_components dict mapping agent names to their optimized component_text, along with score metrics and iteration history.

RAISES DESCRIPTION
ConfigurationError

If pre-flight validation fails: non-LlmAgent critic, empty trainset, duplicate or empty component names, or EvolutionConfig consistency errors.

WorkflowEvolutionError

If workflow contains no LlmAgents.

MultiAgentValidationError

If primary agent not found or no scorer available.

EvolutionError

If evolution fails during execution.

Examples:

Default behavior (evolve first agent only):

from google.adk.agents import LlmAgent, SequentialAgent
from gepa_adk import evolve_workflow

generator = LlmAgent(name="generator", instruction="Generate code")
refiner = LlmAgent(name="refiner", instruction="Refine code")
writer = LlmAgent(name="writer", instruction="Write docs")
pipeline = SequentialAgent(
    name="Pipeline", sub_agents=[generator, refiner, writer]
)

# Only generator.instruction is evolved across all iterations
result = await evolve_workflow(workflow=pipeline, trainset=trainset)

Round-robin evolution (evolve all agents):

# All agents are evolved in round-robin: generator -> refiner -> writer -> ...
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    round_robin=True,
)
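
The round-robin behavior can be pictured with `itertools.cycle`; this is a hypothetical sketch of the cycling idea, not the engine's actual selector:

```python
from itertools import cycle

# In round-robin mode the engine cycles through evolvable agents,
# mutating one agent's instruction per iteration (sketch only).
agents = ["generator", "refiner", "writer"]
selector = cycle(agents)
order = [next(selector) for _ in range(5)]
```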

Explicit components override (takes precedence over round_robin):

# Only generator and writer are evolved; refiner is excluded
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    components={
        "generator": ["instruction"],
        "writer": ["instruction"],
        "refiner": [],  # Excluded
    },
)

Using custom session service for persistence:

from google.adk.sessions import DatabaseSessionService

# Persist workflow evolution sessions to SQLite
session_service = DatabaseSessionService(db_url="sqlite:///workflow_sessions.db")

result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    session_service=session_service,
)

Using App/Runner for existing infrastructure integration:

from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService

# Configure Runner with your production session service
runner = Runner(
    app_name="my_workflow_app",
    agent=pipeline,  # The workflow agent
    session_service=DatabaseSessionService(db_url="..."),
)

# Evolution uses Runner's session_service for all operations
result = await evolve_workflow(
    workflow=pipeline,
    trainset=trainset,
    runner=runner,  # Services extracted from runner
)
Note

Pre-flight validation runs synchronously before any LLM calls. Supports workflow agents (SequentialAgent, LoopAgent, ParallelAgent) with recursive traversal and depth limiting via max_depth parameter. Handles nested structures. LoopAgent and ParallelAgent configurations (max_iterations, etc.) are preserved during evolution. Always uses share_session=True to maintain workflow context (FR-010).

For reproducible evolution, pass a seeded config: config=EvolutionConfig(seed=42).
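
The max_depth traversal described above can be sketched as a simple depth-limited recursive walk. `Node` and `find_llm` are stand-ins for illustration, not gepa_adk's find_llm_agents implementation:

```python
# Hypothetical sketch of depth-limited LlmAgent discovery in a nested
# workflow tree; Node stands in for ADK agent classes.
class Node:
    def __init__(self, name, children=(), is_llm=False):
        self.name = name
        self.children = list(children)
        self.is_llm = is_llm

def find_llm(node, max_depth=5, depth=0):
    if depth > max_depth:
        return []  # stop traversing below the depth limit
    found = [node] if node.is_llm else []
    for child in node.children:
        found.extend(find_llm(child, max_depth, depth + 1))
    return found

root = Node("pipeline", children=[
    Node("generator", is_llm=True),
    Node("inner", children=[Node("refiner", is_llm=True)]),
])
names = [n.name for n in find_llm(root)]
shallow = [n.name for n in find_llm(root, max_depth=1)]
```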

Source code in src/gepa_adk/api.py
async def evolve_workflow(
    workflow: SequentialAgent | LoopAgent | ParallelAgent,
    trainset: list[dict[str, Any]],
    *,
    critic: LlmAgent | None = None,
    primary: str | None = None,
    max_depth: int = 5,
    config: EvolutionConfig | None = None,
    state_guard: StateGuard | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    round_robin: bool = False,
    components: dict[str, list[str]] | None = None,
    session_service: BaseSessionService | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> MultiAgentEvolutionResult:
    """Evolve LlmAgents within a workflow agent structure.

    Discovers all LlmAgent instances within a workflow (SequentialAgent,
    LoopAgent, or ParallelAgent) and evolves them while preserving the
    workflow structure. Uses shared session state to maintain workflow
    context during evaluation.

    Args:
        workflow: Workflow agent containing LlmAgents to evolve. Must be
            SequentialAgent, LoopAgent, or ParallelAgent.
        trainset: Training examples for evaluation. Each example should have
            an "input" key and optionally an "expected" key.

    Keyword Args:
        critic: Optional critic agent for scoring. If None, the primary agent
            must have an output_schema for schema-based scoring.
        primary: Name of the agent to score. Defaults to the last LlmAgent
            found in the workflow (for sequential workflows, this is typically
            the final output producer).
        max_depth: Maximum recursion depth for nested workflows (default: 5).
            Limits how deeply nested workflow structures are traversed.
        config: Evolution configuration. If None, uses EvolutionConfig defaults.
        state_guard: Optional StateGuard instance for validating and
            repairing state injection tokens in evolved component_text.
        component_selector: Optional selector instance or selector name for
            choosing which components to update.
        round_robin: If False (default), only the first discovered agent's
            instruction is evolved across all iterations. If True, all agents'
            instructions are evolved in round-robin fashion (the engine cycles
            through agents each iteration). Ignored when components is provided.
        components: Optional per-agent component configuration mapping agent
            names to lists of component names to evolve. When provided, takes
            precedence over round_robin. Use empty list to exclude an agent.
        session_service: Optional ADK session service for state management.
            If None (default), creates an InMemorySessionService internally.
            Pass a custom service (e.g., DatabaseSessionService)
            to persist sessions alongside other agent executions in a shared database.
        app: Optional ADK App instance. When provided, evolution uses the app's
            configuration. Note that App does not hold services directly; pass
            a Runner for service extraction, or combine with session_service param.
        runner: Optional ADK Runner instance. When provided, evolution extracts
            and uses the runner's session_service for all agent executions
            (evolved agents, critic, and reflection agent). Takes precedence
            over both app and session_service parameters. This enables seamless
            integration with existing ADK infrastructure.

    Returns:
        MultiAgentEvolutionResult containing evolved_components dict mapping
        agent names to their optimized component_text, along with score
        metrics and iteration history.

    Raises:
        ConfigurationError: If pre-flight validation fails: non-LlmAgent
            critic, empty trainset, duplicate or empty component names,
            or EvolutionConfig consistency errors.
        WorkflowEvolutionError: If workflow contains no LlmAgents.
        MultiAgentValidationError: If primary agent not found or no scorer
            available.
        EvolutionError: If evolution fails during execution.

    Examples:
        Default behavior (evolve first agent only):

        ```python
        from google.adk.agents import LlmAgent, SequentialAgent
        from gepa_adk import evolve_workflow

        generator = LlmAgent(name="generator", instruction="Generate code")
        refiner = LlmAgent(name="refiner", instruction="Refine code")
        writer = LlmAgent(name="writer", instruction="Write docs")
        pipeline = SequentialAgent(
            name="Pipeline", sub_agents=[generator, refiner, writer]
        )

        # Only generator.instruction is evolved across all iterations
        result = await evolve_workflow(workflow=pipeline, trainset=trainset)
        ```

        Round-robin evolution (evolve all agents):

        ```python
        # All agents are evolved in round-robin: generator -> refiner -> writer -> ...
        result = await evolve_workflow(
            workflow=pipeline,
            trainset=trainset,
            round_robin=True,
        )
        ```

        Explicit components override (takes precedence over round_robin):

        ```python
        # Only generator and writer are evolved; refiner is excluded
        result = await evolve_workflow(
            workflow=pipeline,
            trainset=trainset,
            components={
                "generator": ["instruction"],
                "writer": ["instruction"],
                "refiner": [],  # Excluded
            },
        )
        ```

        Using custom session service for persistence:

        ```python
        from google.adk.sessions import DatabaseSessionService

        # Persist workflow evolution sessions to SQLite
        session_service = DatabaseSessionService(db_url="sqlite:///workflow_sessions.db")

        result = await evolve_workflow(
            workflow=pipeline,
            trainset=trainset,
            session_service=session_service,
        )
        ```

        Using App/Runner for existing infrastructure integration:

        ```python
        from google.adk.runners import Runner
        from google.adk.sessions import DatabaseSessionService

        # Configure Runner with your production session service
        runner = Runner(
            app_name="my_workflow_app",
            agent=pipeline,  # The workflow agent
            session_service=DatabaseSessionService(db_url="..."),
        )

        # Evolution uses Runner's session_service for all operations
        result = await evolve_workflow(
            workflow=pipeline,
            trainset=trainset,
            runner=runner,  # Services extracted from runner
        )
        ```

    Note:
        Pre-flight validation runs synchronously before any LLM calls.
        Supports workflow agents (SequentialAgent, LoopAgent, ParallelAgent)
        with recursive traversal and depth limiting via max_depth parameter.
        Handles nested structures. LoopAgent and ParallelAgent configurations
        (max_iterations, etc.) are preserved during evolution. Always uses
        share_session=True to maintain workflow context (FR-010).

        For reproducible evolution, pass a seeded config:
        ``config=EvolutionConfig(seed=42)``.
    """
    # Pre-flight validation (Story 2.5)
    _pre_flight_validate_workflow(trainset, critic, components)

    logger.info(
        "Starting workflow evolution",
        workflow_name=workflow.name,
        workflow_type=type(workflow).__name__,
    )

    # Find all LlmAgents in the workflow recursively up to max_depth (US3)
    llm_agents = find_llm_agents(workflow, max_depth=max_depth)

    # Validate that at least one LlmAgent was found
    if not llm_agents:
        error_msg = (
            f"No LlmAgents found in workflow '{workflow.name}'. "
            "Workflow must contain at least one LlmAgent to evolve."
        )
        logger.error(
            "Workflow evolution failed", workflow_name=workflow.name, error=error_msg
        )
        raise WorkflowEvolutionError(
            error_msg,
            workflow_name=workflow.name,
        )

    logger.info(
        "Found LlmAgents in workflow",
        workflow_name=workflow.name,
        agent_count=len(llm_agents),
        agent_names=[agent.name for agent in llm_agents],
    )

    # Determine primary agent (default to last agent for sequential workflows)
    if primary is None:
        primary = llm_agents[-1].name
        logger.debug(
            "Using default primary agent",
            workflow_name=workflow.name,
            primary=primary,
        )

    # Convert list to dict for evolve_group (API v0.3.x)
    agents_dict = {agent.name: agent for agent in llm_agents}

    # Build components dict based on round_robin flag
    # Explicit components parameter takes precedence over round_robin
    resolved_components: dict[str, list[str]] | None = None
    if components is not None:
        # Explicit components provided - use as-is
        resolved_components = components
        logger.debug(
            "Using explicit components",
            workflow_name=workflow.name,
            components=list(components.keys()),
        )
    elif round_robin:
        # round_robin=True: evolve all agents
        resolved_components = {agent.name: ["instruction"] for agent in llm_agents}
        logger.debug(
            "Using round_robin mode - evolving all agents",
            workflow_name=workflow.name,
            agents=[agent.name for agent in llm_agents],
        )
    else:
        # Default: evolve only the first agent
        first_agent = llm_agents[0]
        resolved_components = {first_agent.name: ["instruction"]}
        # Add empty lists for other agents (excluded from evolution)
        for agent in llm_agents[1:]:
            resolved_components[agent.name] = []
        logger.debug(
            "Using default mode - evolving first agent only",
            workflow_name=workflow.name,
            first_agent=first_agent.name,
        )

    # Delegate to evolve_group with share_session=True (FR-010)
    logger.debug(
        "Delegating to evolve_group",
        workflow_name=workflow.name,
        agent_count=len(llm_agents),
        primary=primary,
        share_session=True,
        round_robin=round_robin,
    )

    return await evolve_group(
        agents=agents_dict,
        primary=primary,
        trainset=trainset,
        components=resolved_components,
        critic=critic,
        share_session=True,  # FR-010: Always use shared session for workflow context
        config=config,
        state_guard=state_guard,
        component_selector=component_selector,
        workflow=workflow,  # Preserve workflow structure (#215)
        session_service=session_service,  # Pass through for persistence (#226)
        app=app,  # Pass through for App/Runner pattern (#227)
        runner=runner,  # Pass through for App/Runner pattern (#227)
    )

evolve async

evolve(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    *,
    valset: list[dict[str, Any]] | None = None,
    critic: LlmAgent | None = None,
    reflection_agent: LlmAgent | None = None,
    config: EvolutionConfig | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    state_guard: StateGuard | None = None,
    candidate_selector: CandidateSelectorProtocol
    | str
    | None = None,
    component_selector: ComponentSelectorProtocol
    | str
    | None = None,
    executor: AgentExecutorProtocol | None = None,
    components: list[str] | None = None,
    schema_constraints: SchemaConstraints | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> EvolutionResult

Evolve an ADK agent's instruction.

Optimizes the instruction for a single ADK agent using evolutionary optimization. The agent's instruction is iteratively improved based on performance on the training set.

PARAMETER DESCRIPTION
agent

The ADK LlmAgent to evolve.

TYPE: LlmAgent

trainset

Training examples [{"input": "...", "expected": "..."}].

TYPE: list[dict[str, Any]]

PARAMETER DESCRIPTION
valset

Optional validation examples used for scoring and acceptance. Defaults to the trainset when omitted.

TYPE: list[dict[str, Any]] | None

critic

Optional ADK agent for scoring (uses schema scoring if None).

TYPE: LlmAgent | None

reflection_agent

Optional ADK agent for proposals. If None, creates a default reflection agent using config.reflection_model.

TYPE: LlmAgent | None

config

Evolution configuration (uses defaults if None).

TYPE: EvolutionConfig | None

trajectory_config

Trajectory capture settings (uses defaults if None).

TYPE: TrajectoryConfig | None

state_guard

Optional state token preservation settings.

TYPE: StateGuard | None

candidate_selector

Optional selector instance or selector name.

TYPE: CandidateSelectorProtocol | str | None

component_selector

Optional selector instance or selector name for choosing which components to update.

TYPE: ComponentSelectorProtocol | str | None

executor

Optional AgentExecutorProtocol implementation for unified agent execution. When provided, both the ADKAdapter and CriticScorer use this executor for consistent session management and execution. If None, creates an AgentExecutor automatically.

TYPE: AgentExecutorProtocol | None

components

List of component names to include in evolution. Supported:

- "instruction": The agent's instruction text (default if None).
- "output_schema": The agent's Pydantic output_schema (serialized).

When None, defaults to ["instruction"]. Use ["output_schema"] with a schema reflection agent to evolve the output schema.

TYPE: list[str] | None

schema_constraints

Optional SchemaConstraints for output_schema evolution. When provided, proposed schema mutations are validated against these constraints. Mutations that violate constraints (e.g., remove required fields) are rejected and the original schema is preserved.

TYPE: SchemaConstraints | None

app

Optional ADK App instance. When provided, evolution uses the app's configuration. Note that App does not hold services directly; pass a Runner for service extraction, or combine with session_service param. See the App/Runner integration guide for details.

TYPE: App | None

runner

Optional ADK Runner instance. When provided, evolution extracts and uses the runner's session_service for all agent executions (evolved agents, critic, and reflection agent). Takes precedence over both app and executor parameters. This enables seamless integration with existing ADK infrastructure.

TYPE: Runner | None

RETURNS DESCRIPTION
EvolutionResult

EvolutionResult with evolved_components dict and metrics.

RAISES DESCRIPTION
ConfigurationError

If invalid parameters provided, including pre-flight validation failures: non-LlmAgent agent or critic, empty trainset, duplicate or empty component names, missing critic and output_schema, or EvolutionConfig consistency errors.

EvolutionError

If evolution fails during execution.

Note

Pre-flight validation runs synchronously before any LLM calls. Single-agent evolution with trainset reflection and valset scoring.

For reproducible evolution, pass a seeded config: config=EvolutionConfig(seed=42).
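
The seed's effect can be illustrated with `random.Random` directly, which is how the engine derives its RNG when `config.seed` is set. A minimal stdlib-only sketch:

```python
import random

# Two RNGs seeded identically produce identical draws, which is what
# makes a seeded EvolutionConfig reproducible across runs.
rng_a = random.Random(42)
rng_b = random.Random(42)
draws_a = [rng_a.random() for _ in range(3)]
draws_b = [rng_b.random() for _ in range(3)]
```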

Examples:

Basic usage with output_schema:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk import evolve


class OutputSchema(BaseModel):
    answer: str
    score: float = Field(ge=0.0, le=1.0)


agent = LlmAgent(
    name="assistant",
    model="gemini-2.5-flash",
    instruction="You are a helpful assistant.",
    output_schema=OutputSchema,
)

trainset = [
    {"input": "What is 2+2?", "expected": "4"},
    {"input": "What is the capital of France?", "expected": "Paris"},
]

result = await evolve(agent, trainset)
print(f"Evolved: {result.evolved_components['instruction']}")

With critic agent:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk import evolve


class CriticOutput(BaseModel):
    score: float = Field(ge=0.0, le=1.0)


critic = LlmAgent(
    name="critic",
    model="gemini-2.5-flash",
    instruction="Score the response quality.",
    output_schema=CriticOutput,
)

result = await evolve(agent, trainset, critic=critic)

Evolving output_schema with schema reflection:

from gepa_adk.adapters.agents.reflection_agents import (
    create_schema_reflection_agent,
)

# Create schema reflection agent with validation tool
schema_reflector = create_schema_reflection_agent("gemini-2.5-flash")

# Evolve output_schema component
result = await evolve(
    agent,
    trainset,
    critic=critic,
    reflection_agent=schema_reflector,
    components=["output_schema"],  # Evolve schema, not instruction
)
print(f"Evolved schema: {result.evolved_components['output_schema']}")

Using App/Runner for existing infrastructure integration:

from google.adk.apps.app import App
from google.adk.runners import Runner
from google.adk.sessions import DatabaseSessionService

# Configure Runner with your production session service
session_service = DatabaseSessionService(connection_string="...")
runner = Runner(
    app_name="my_app",
    agent=agent,
    session_service=session_service,
)

# Evolution uses your Runner's session_service for all operations
result = await evolve(
    agent,
    trainset,
    runner=runner,  # Services extracted from runner
)
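Reproducibility comes from the seeded RNG: when config.seed is set, the engine draws all random choices from random.Random(seed), so two runs with the same seed make identical selections. A minimal illustration of that mechanism:

```python
import random

# With a fixed seed, the engine's RNG (random.Random(seed)) produces
# the same sequence of choices on every run:
rng_a = random.Random(42)
rng_b = random.Random(42)
picks_a = [rng_a.choice(["instruction", "output_schema"]) for _ in range(5)]
picks_b = [rng_b.choice(["instruction", "output_schema"]) for _ in range(5)]
print(picks_a == picks_b)  # True
```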
Source code in src/gepa_adk/api.py
async def evolve(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    *,
    valset: list[dict[str, Any]] | None = None,
    critic: LlmAgent | None = None,
    reflection_agent: LlmAgent | None = None,
    config: EvolutionConfig | None = None,
    trajectory_config: TrajectoryConfig | None = None,
    state_guard: StateGuard | None = None,
    candidate_selector: CandidateSelectorProtocol | str | None = None,
    component_selector: ComponentSelectorProtocol | str | None = None,
    executor: AgentExecutorProtocol | None = None,
    components: list[str] | None = None,
    schema_constraints: SchemaConstraints | None = None,
    app: App | None = None,
    runner: Runner | None = None,
) -> EvolutionResult:
    """Evolve an ADK agent's instruction.

    Optimizes the instruction for a single ADK agent using evolutionary
    optimization. The agent's instruction is iteratively improved based on
    performance on the training set.

    Args:
        agent: The ADK LlmAgent to evolve.
        trainset: Training examples [{"input": "...", "expected": "..."}].

    Keyword Args:
        valset: Optional validation examples used for scoring and acceptance.
            Defaults to the trainset when omitted.
        critic: Optional ADK agent for scoring (uses schema scoring if None).
        reflection_agent: Optional ADK agent for proposals. If None, creates a
            default reflection agent using config.reflection_model.
        config: Evolution configuration (uses defaults if None).
        trajectory_config: Trajectory capture settings (uses defaults if None).
        state_guard: Optional state token preservation settings.
        candidate_selector: Optional selector instance or selector name.
        component_selector: Optional selector instance or selector name for
            choosing which components to update.
        executor: Optional AgentExecutorProtocol implementation for unified
            agent execution. When provided, both the ADKAdapter and CriticScorer
            use this executor for consistent session management and execution.
            If None, creates an AgentExecutor automatically.
        components: List of component names to include in evolution. Supported:
            - "instruction": The agent's instruction text (default if None).
            - "output_schema": The agent's Pydantic output_schema (serialized).
            When None, defaults to ["instruction"]. Use ["output_schema"] with
            a schema reflection agent to evolve the output schema.
        schema_constraints: Optional SchemaConstraints for output_schema evolution.
            When provided, proposed schema mutations are validated against these
            constraints. Mutations that violate constraints (e.g., remove required
            fields) are rejected and the original schema is preserved.
        app: Optional ADK App instance. When provided, evolution uses the app's
            configuration. Note that App does not hold services directly; pass
            a Runner for service extraction, or combine with session_service param.
            See the App/Runner integration guide for details.
        runner: Optional ADK Runner instance. When provided, evolution extracts
            and uses the runner's session_service for all agent executions
            (evolved agents, critic, and reflection agent). Takes precedence
            over both app and executor parameters. This enables seamless
            integration with existing ADK infrastructure.

    Returns:
        EvolutionResult with evolved_components dict and metrics.

    Raises:
        ConfigurationError: If invalid parameters provided, including
            pre-flight validation failures: non-LlmAgent agent or critic,
            empty trainset, duplicate or empty component names, missing
            critic and output_schema, or EvolutionConfig consistency errors.
        EvolutionError: If evolution fails during execution.

    Note:
        Pre-flight validation runs synchronously before any LLM calls.
        Single-agent evolution with trainset reflection and valset scoring.

        For reproducible evolution, pass a seeded config:
        ``config=EvolutionConfig(seed=42)``.

    Examples:
        Basic usage with output_schema:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve


        class OutputSchema(BaseModel):
            answer: str
            score: float = Field(ge=0.0, le=1.0)


        agent = LlmAgent(
            name="assistant",
            model="gemini-2.5-flash",
            instruction="You are a helpful assistant.",
            output_schema=OutputSchema,
        )

        trainset = [
            {"input": "What is 2+2?", "expected": "4"},
            {"input": "What is the capital of France?", "expected": "Paris"},
        ]

        result = await evolve(agent, trainset)
        print(f"Evolved: {result.evolved_components['instruction']}")
        ```

        With critic agent:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve


        class CriticOutput(BaseModel):
            score: float = Field(ge=0.0, le=1.0)


        critic = LlmAgent(
            name="critic",
            model="gemini-2.5-flash",
            instruction="Score the response quality.",
            output_schema=CriticOutput,
        )

        result = await evolve(agent, trainset, critic=critic)
        ```

        Evolving output_schema with schema reflection:

        ```python
        from gepa_adk.adapters.agents.reflection_agents import (
            create_schema_reflection_agent,
        )

        # Create schema reflection agent with validation tool
        schema_reflector = create_schema_reflection_agent("gemini-2.5-flash")

        # Evolve output_schema component
        result = await evolve(
            agent,
            trainset,
            critic=critic,
            reflection_agent=schema_reflector,
            components=["output_schema"],  # Evolve schema, not instruction
        )
        print(f"Evolved schema: {result.evolved_components['output_schema']}")
        ```

        Using App/Runner for existing infrastructure integration:

        ```python
        from google.adk.apps.app import App
        from google.adk.runners import Runner
        from google.adk.sessions import DatabaseSessionService

        # Configure Runner with your production session service
        session_service = DatabaseSessionService(connection_string="...")
        runner = Runner(
            app_name="my_app",
            agent=agent,
            session_service=session_service,
        )

        # Evolution uses your Runner's session_service for all operations
        result = await evolve(
            agent,
            trainset,
            runner=runner,  # Services extracted from runner
        )
        ```
    """
    # Pre-flight validation (Story 2.5)
    _pre_flight_validate_evolve(agent, trainset, critic, components)
    required_keys = (
        set(trainset[0].keys()) if trainset and len(trainset) > 0 else {"input"}
    )

    resolved_valset = valset if valset is not None else trainset
    if valset is not None:
        _validate_dataset(
            valset,
            "valset",
            allow_empty=False,
            required_keys=required_keys,
        )

    # Log reflection_agent configuration if provided
    if reflection_agent is not None:
        logger.debug(
            "evolve.reflection_agent.configured",
            agent_name=agent.name,
            reflection_agent_name=reflection_agent.name,
            message="Using ADK reflection agent for instruction improvement",
        )

    # Capture original instruction for StateGuard validation
    original_instruction = str(agent.instruction)

    candidate_selector_label = (
        candidate_selector
        if isinstance(candidate_selector, str)
        else type(candidate_selector).__name__
        if candidate_selector is not None
        else None
    )

    component_selector_label = (
        component_selector
        if isinstance(component_selector, str)
        else type(component_selector).__name__
        if component_selector is not None
        else None
    )

    # Log evolution start
    logger.info(
        "evolve.start",
        agent_name=agent.name,
        trainset_size=len(trainset),
        valset_size=len(resolved_valset),
        valset_defaulted=valset is None,
        has_critic=critic is not None,
        has_reflection_agent=reflection_agent is not None,
        has_state_guard=state_guard is not None,
        candidate_selector=candidate_selector_label,
        component_selector=component_selector_label,
    )

    # Resolve services from runner/app with precedence warnings (#227)
    # Log precedence warnings if multiple config sources provided (T009)
    if runner is not None and app is not None:
        logger.warning(
            "evolve.precedence.runner_over_app",
            message="Both runner and app provided; using runner (runner takes precedence)",
            runner_app_name=runner.app_name,
            app_name=app.name,
        )
    if runner is not None and executor is not None:
        logger.warning(
            "evolve.precedence.runner_over_executor",
            message="Both runner and executor provided; using runner's session_service",
            runner_app_name=runner.app_name,
        )

    # Extract services using precedence rules (T006)
    resolved_session_service, _artifact_service = _resolve_evolution_services(
        runner=runner,
        app=app,
        session_service=None,  # evolve() doesn't have direct session_service param
    )

    # Resolve app_name for session isolation (#239)
    resolved_app_name = _resolve_app_name(runner=runner, app=app)

    # Create executor with resolved session_service (T007)
    # Runner takes precedence over user-provided executor
    if runner is not None:
        resolved_executor = AgentExecutor(
            session_service=resolved_session_service,
            app_name=resolved_app_name,
        )
    else:
        resolved_executor = executor or AgentExecutor(
            session_service=resolved_session_service,
            app_name=resolved_app_name,
        )

    # Build scorer
    scorer: Scorer
    if critic:
        scorer = CriticScorer(critic_agent=critic, executor=resolved_executor)
    elif hasattr(agent, "output_schema") and agent.output_schema is not None:
        # Use schema-based scorer when agent has output_schema
        scorer = SchemaBasedScorer(output_schema=agent.output_schema)
    else:
        raise ConfigurationError(
            "Either critic must be provided or agent must have output_schema",
            field="critic",
            value=None,
            constraint="must provide critic or agent.output_schema",
        )

    # Resolve config
    resolved_config = config or EvolutionConfig()
    rng = (
        random.Random(resolved_config.seed)
        if resolved_config.seed is not None
        else None
    )

    # Create reflection agent if not provided
    resolved_reflection_agent = reflection_agent
    if resolved_reflection_agent is None:
        # Create default reflection agent with config settings
        resolved_reflection_agent = LlmAgent(
            name="reflection_agent",
            model=_resolve_model_for_agent(resolved_config.reflection_model),
            instruction=resolved_config.reflection_prompt or REFLECTION_INSTRUCTION,
        )
        logger.debug(
            "evolve.reflection_agent.default",
            reflection_model=resolved_config.reflection_model,
        )

    # Build proposer chain in the composition root (api.py)
    adk_reflection_fn = create_adk_reflection_fn(
        resolved_reflection_agent,
        executor=resolved_executor,
    )
    proposer = AsyncReflectiveMutationProposer(adk_reflection_fn=adk_reflection_fn)

    # Create adapter with resolved session_service (T008)
    adapter = ADKAdapter(
        agent=agent,
        scorer=scorer,
        trajectory_config=trajectory_config,
        proposer=proposer,
        executor=resolved_executor,
        schema_constraints=schema_constraints,
        session_service=resolved_session_service,
    )

    # Build initial candidate components based on requested components
    resolved_components = components if components else [DEFAULT_COMPONENT_NAME]
    initial_components: dict[str, str] = {}
    original_component_values: dict[str, str] = {}

    for comp_name in resolved_components:
        if comp_name == DEFAULT_COMPONENT_NAME:
            initial_components[comp_name] = original_instruction
            original_component_values[comp_name] = original_instruction
        elif comp_name == COMPONENT_OUTPUT_SCHEMA:
            if not hasattr(agent, "output_schema") or agent.output_schema is None:
                raise ConfigurationError(
                    f"Cannot evolve '{COMPONENT_OUTPUT_SCHEMA}': agent has no output_schema",
                    field="components",
                    value=comp_name,
                    constraint="agent must have output_schema to evolve it",
                )
            schema_text = serialize_pydantic_schema(agent.output_schema)
            initial_components[comp_name] = schema_text
            original_component_values[comp_name] = schema_text
        else:
            raise ConfigurationError(
                f"Unknown component: '{comp_name}'. Supported: "
                f"'{DEFAULT_COMPONENT_NAME}', '{COMPONENT_OUTPUT_SCHEMA}'",
                field="components",
                value=comp_name,
                constraint="must be a supported component name",
            )

    logger.debug(
        "evolve.components.resolved",
        agent_name=agent.name,
        components=resolved_components,
    )

    initial_candidate = Candidate(components=initial_components)

    # Create engine
    resolved_candidate_selector: CandidateSelectorProtocol | None = None
    if candidate_selector is not None:
        if isinstance(candidate_selector, str):
            resolved_candidate_selector = create_candidate_selector(
                candidate_selector, rng=rng
            )
        else:
            resolved_candidate_selector = candidate_selector

    resolved_component_selector: ComponentSelectorProtocol | None = None
    if component_selector is not None:
        if isinstance(component_selector, str):
            resolved_component_selector = create_component_selector(component_selector)
        else:
            resolved_component_selector = component_selector

    engine = AsyncGEPAEngine(
        adapter=adapter,
        config=resolved_config,
        initial_candidate=initial_candidate,
        batch=trainset,
        valset=resolved_valset,
        candidate_selector=resolved_candidate_selector,
        component_selector=resolved_component_selector,
        rng=rng,
    )

    # Run evolution with cleanup
    try:
        result = await engine.run()

        valset_score = result.valset_score
        trainset_score = result.trainset_score

        if trainset_score is not None:
            logger.info(
                "evolve.trainset.scored",
                agent_name=agent.name,
                trainset_size=len(trainset),
                trainset_score=trainset_score,
            )
        if valset_score is not None:
            logger.info(
                "evolve.valset.scored",
                agent_name=agent.name,
                valset_size=len(resolved_valset),
                valset_score=valset_score,
                valset_defaulted=valset is None,
            )

        # Apply state guard validation if provided (for token preservation)
        # Only applies to text components (instruction), not to output_schema
        validated_components = dict(result.evolved_components)
        for comp_name in resolved_components:
            if comp_name in result.evolved_components:
                if comp_name == DEFAULT_COMPONENT_NAME and state_guard is not None:
                    validated_components[comp_name] = _apply_state_guard_validation(
                        state_guard=state_guard,
                        original_component_text=original_component_values[comp_name],
                        evolved_component_text=result.evolved_components[comp_name],
                        agent_name=agent.name,
                    )
                else:
                    validated_components[comp_name] = result.evolved_components[
                        comp_name
                    ]

        # Log evolution completion
        logger.info(
            "evolve.complete",
            agent_name=agent.name,
            original_score=result.original_score,
            final_score=result.final_score,
            improvement=result.improvement,
            total_iterations=result.total_iterations,
            valset_score=valset_score,
            trainset_score=trainset_score,
            components=resolved_components,
        )

        # Return result with validated evolved_components and valset_score
        # (creates new instance since frozen)
        return EvolutionResult(
            original_score=result.original_score,
            final_score=result.final_score,
            evolved_components=validated_components,
            iteration_history=result.iteration_history,
            total_iterations=result.total_iterations,
            valset_score=valset_score,
            trainset_score=trainset_score,
        )
    finally:
        # Clean up adapter resources (clears handler constraints)
        adapter.cleanup()

run_sync

run_sync(coro: Coroutine[Any, Any, _T]) -> _T

Run an async coroutine synchronously and return its result.

Universal sync wrapper that accepts any coroutine (e.g., evolve(), evolve_group(), evolve_workflow()) and runs it in a blocking manner. Uses asyncio.run() as the primary mechanism, with nest_asyncio as a fallback for environments with a running event loop. The fallback saves and restores the original event loop to avoid polluting the event loop policy state.
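The happy path is just asyncio.run(); a stripped-down stand-in (omitting the nest_asyncio fallback) behaves like this:

```python
import asyncio


# Simplified stand-in for run_sync: validate the argument, then delegate
# to asyncio.run(). The real implementation additionally falls back to
# nest_asyncio when an event loop is already running.
def run_sync_sketch(coro):
    if not asyncio.iscoroutine(coro):
        raise TypeError(f"expected a coroutine, got {type(coro).__name__}")
    return asyncio.run(coro)


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)  # exercise the event loop
    return a + b


print(run_sync_sketch(add(2, 3)))  # 5
```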

PARAMETER DESCRIPTION
coro

A coroutine object to execute (e.g., evolve(agent, trainset)). Must be a coroutine, not a function or other awaitable.

TYPE: Coroutine[Any, Any, _T]

RETURNS DESCRIPTION
_T

The result of the coroutine execution. The return type matches the coroutine's return type (e.g., EvolutionResult for evolve(), MultiAgentEvolutionResult for evolve_group()).

RAISES DESCRIPTION
TypeError

If coro is not a coroutine object.

RuntimeError

If a running event loop is detected and nest_asyncio is not installed.

Examples:

Single-agent evolution:

from gepa_adk import run_sync, evolve

result = run_sync(evolve(agent, trainset=trainset))

Multi-agent group evolution:

from gepa_adk import run_sync, evolve_group

result = run_sync(evolve_group(agents, "primary", trainset=trainset))
Note

In Jupyter notebooks or IPython, the event loop is already running. Use await evolve(...) directly instead of run_sync(evolve(...)). The nest_asyncio fallback may work but await is preferred.
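The coroutine-vs-function distinction behind the TypeError check can be verified directly with the stdlib:

```python
import asyncio


async def work():
    return "done"


# A function is not a coroutine object; calling it creates one.
print(asyncio.iscoroutine(work))   # False -> run_sync raises TypeError
coro = work()
print(asyncio.iscoroutine(coro))   # True -> run_sync accepts it
coro.close()  # close the unawaited coroutine to avoid a RuntimeWarning
```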

Source code in src/gepa_adk/api.py
def run_sync(coro: Coroutine[Any, Any, _T]) -> _T:
    """Run an async coroutine synchronously and return its result.

    Universal sync wrapper that accepts any coroutine (e.g., evolve(),
    evolve_group(), evolve_workflow()) and runs it in a blocking manner.
    Uses ``asyncio.run()`` as the primary mechanism, with ``nest_asyncio``
    as a fallback for environments with a running event loop.  The fallback
    saves and restores the original event loop to avoid polluting the
    event loop policy state.

    Args:
        coro: A coroutine object to execute (e.g., ``evolve(agent, trainset)``).
            Must be a coroutine, not a function or other awaitable.

    Returns:
        The result of the coroutine execution. The return type matches
        the coroutine's return type (e.g., EvolutionResult for evolve(),
        MultiAgentEvolutionResult for evolve_group()).

    Raises:
        TypeError: If ``coro`` is not a coroutine object.
        RuntimeError: If a running event loop is detected and ``nest_asyncio``
            is not installed.

    Examples:
        Single-agent evolution:

        ```python
        from gepa_adk import run_sync, evolve

        result = run_sync(evolve(agent, trainset=trainset))
        ```

        Multi-agent group evolution:

        ```python
        from gepa_adk import run_sync, evolve_group

        result = run_sync(evolve_group(agents, "primary", trainset=trainset))
        ```

    Note:
        In Jupyter notebooks or IPython, the event loop is already running.
        Use ``await evolve(...)`` directly instead of ``run_sync(evolve(...))``.
        The ``nest_asyncio`` fallback may work but ``await`` is preferred.
    """
    import asyncio

    if not asyncio.iscoroutine(coro):
        raise TypeError(
            f"run_sync() requires a coroutine object, got {type(coro).__name__}. "
            "Call the async function first: run_sync(evolve(...)) not run_sync(evolve)"
        )

    try:
        return asyncio.run(coro)
    except RuntimeError as e:
        if "asyncio.run() cannot be called from a running event loop" in str(e):
            try:
                import nest_asyncio

                nest_asyncio.apply()
                try:
                    old_loop = asyncio.get_running_loop()
                except RuntimeError:
                    old_loop = None
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                try:
                    return loop.run_until_complete(coro)
                finally:
                    loop.close()
                    if old_loop is not None:
                        asyncio.set_event_loop(old_loop)
            except ImportError:
                raise RuntimeError(
                    "A running event loop was detected and nest_asyncio is not "
                    "installed. Install it with: uv add nest-asyncio\n"
                    "Alternatively, use 'await evolve(...)' directly in async "
                    "contexts like Jupyter notebooks."
                ) from e
        raise

evolve_sync

evolve_sync(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    **kwargs: Any,
) -> EvolutionResult

Synchronous wrapper for evolve().

Deprecated: Use run_sync(evolve(agent, trainset, ...)) instead.

Runs the async evolve() function in a blocking manner. Handles nested event loops automatically.

PARAMETER DESCRIPTION
agent

The ADK LlmAgent to evolve.

TYPE: LlmAgent

trainset

Training examples.

TYPE: list[dict[str, Any]]

**kwargs

Optional keyword arguments passed to evolve().

TYPE: Any DEFAULT: {}

OTHER PARAMETERS DESCRIPTION
valset

Optional validation examples for held-out evaluation.

TYPE: list[dict[str, Any]] | None

critic

Optional ADK agent for scoring.

TYPE: LlmAgent | None

reflection_agent

Optional ADK agent for proposals. If None, a default reflection agent is created using config.reflection_model.

TYPE: LlmAgent | None

config

EvolutionConfig for customizing evolution parameters.

TYPE: EvolutionConfig | None

trajectory_config

TrajectoryConfig for trace capture settings.

TYPE: TrajectoryConfig | None

state_guard

Optional state token preservation settings.

TYPE: StateGuard | None

candidate_selector

Optional selector instance or selector name.

TYPE: CandidateSelectorProtocol | str | None

executor

Optional unified agent executor for consistent session management across all agent types.

TYPE: AgentExecutorProtocol | None

RETURNS DESCRIPTION
EvolutionResult

EvolutionResult with evolved_components dict and metrics.

RAISES DESCRIPTION
ConfigurationError

If invalid parameters provided.

EvolutionError

If evolution fails during execution.

WARNS DESCRIPTION
DeprecationWarning

Always emitted when called. Use run_sync(evolve(...)) instead.

Examples:

Basic usage in a script:

from pydantic import BaseModel, Field
from google.adk.agents import LlmAgent
from gepa_adk import evolve_sync


class OutputSchema(BaseModel):
    answer: str
    score: float = Field(ge=0.0, le=1.0)


agent = LlmAgent(
    name="assistant",
    model="gemini-2.5-flash",
    instruction="You are a helpful assistant.",
    output_schema=OutputSchema,
)

trainset = [
    {"input": "What is 2+2?", "expected": "4"},
]

result = evolve_sync(agent, trainset)
print(f"Evolved: {result.evolved_components['instruction']}")

With configuration:

from gepa_adk import evolve_sync, EvolutionConfig

config = EvolutionConfig(max_iterations=50)
result = evolve_sync(agent, trainset, config=config)
Note

Deprecated. Use run_sync(evolve(agent, trainset, ...)) instead. run_sync is a universal wrapper that works with all async evolution functions.
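Callers migrating off evolve_sync() can make the deprecation visible in tests via the stdlib warnings machinery; the message below mirrors the one the shim emits:

```python
import warnings

# Record warnings so the DeprecationWarning can be asserted on.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    warnings.warn(
        "evolve_sync() is deprecated, use run_sync(evolve(...)) instead",
        DeprecationWarning,
        stacklevel=2,
    )
print(issubclass(caught[0].category, DeprecationWarning))  # True
```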

Source code in src/gepa_adk/api.py
def evolve_sync(
    agent: LlmAgent,
    trainset: list[dict[str, Any]],
    **kwargs: Any,
) -> EvolutionResult:
    """Synchronous wrapper for evolve().

    .. deprecated::
        Use ``run_sync(evolve(agent, trainset, ...))`` instead.

    Runs the async evolve() function in a blocking manner.
    Handles nested event loops automatically.

    Args:
        agent: The ADK LlmAgent to evolve.
        trainset: Training examples.
        **kwargs: Optional keyword arguments passed to evolve().

    Other Parameters:
        valset (list[dict[str, Any]] | None): Optional validation examples for
            held-out evaluation.
        critic (LlmAgent | None): Optional ADK agent for scoring.
        reflection_agent (LlmAgent | None): Optional ADK agent for proposals
            (not yet implemented).
        config (EvolutionConfig | None): EvolutionConfig for customizing
            evolution parameters.
        trajectory_config (TrajectoryConfig | None): TrajectoryConfig for trace
            capture settings.
        state_guard (StateGuard | None): Optional state token preservation
            settings.
        candidate_selector (CandidateSelectorProtocol | str | None): Optional
            selector instance or selector name.
        executor (AgentExecutorProtocol | None): Optional unified agent executor
            for consistent session management across all agent types.

    Returns:
        EvolutionResult with evolved_components dict and metrics.

    Raises:
        ConfigurationError: If invalid parameters provided.
        EvolutionError: If evolution fails during execution.

    Warns:
        DeprecationWarning: Always emitted when called. Use
            ``run_sync(evolve(...))`` instead.

    Examples:
        Basic usage in a script:

        ```python
        from pydantic import BaseModel, Field
        from google.adk.agents import LlmAgent
        from gepa_adk import evolve_sync


        class OutputSchema(BaseModel):
            answer: str
            score: float = Field(ge=0.0, le=1.0)


        agent = LlmAgent(
            name="assistant",
            model="gemini-2.5-flash",
            instruction="You are a helpful assistant.",
            output_schema=OutputSchema,
        )

        trainset = [
            {"input": "What is 2+2?", "expected": "4"},
        ]

        result = evolve_sync(agent, trainset)
        print(f"Evolved: {result.evolved_components['instruction']}")
        ```

        With configuration:

        ```python
        from gepa_adk import evolve_sync, EvolutionConfig

        config = EvolutionConfig(max_iterations=50)
        result = evolve_sync(agent, trainset, config=config)
        ```

    Note:
        Deprecated. Use ``run_sync(evolve(agent, trainset, ...))`` instead.
        ``run_sync`` is a universal wrapper that works with all async
        evolution functions.
    """
    import warnings

    warnings.warn(
        "evolve_sync() is deprecated, use run_sync(evolve(...)) instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return run_sync(evolve(agent, trainset, **kwargs))