Ports
ports ¶
Protocol and interface definitions for external integrations.
This package defines the ports layer of the hexagonal architecture, providing protocol interfaces that adapters implement to integrate with external systems. All protocols follow async-first design principles and support runtime checking.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| AgentProvider | Protocol for loading and persisting agents. |
| AsyncGEPAAdapter | Async adapter contract for evaluations. |
| EvaluationBatch | Container for evaluation outputs and scores. |
| Scorer | Protocol for scoring agent outputs. |
| ProposerProtocol | Protocol for candidate proposal strategies. |
| AgentExecutorProtocol | Protocol for unified agent execution. |
| ExecutionResult | Result of an agent execution. |
| ExecutionStatus | Status of agent execution. |
| DataInst | Type variable for input instances. |
| Trajectory | Type variable for execution traces. |
| RolloutOutput | Type variable for evaluation outputs. |
Examples:
Import the port protocols and supporting types:
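A minimal import sketch, assuming these names are re-exported from the package root as the attribute table above indicates:

```python
# Assumed re-exports from the package root (see attribute table above).
from gepa_adk.ports import (
    AgentExecutorProtocol,
    AgentProvider,
    AsyncGEPAAdapter,
    EvaluationBatch,
    ProposerProtocol,
    Scorer,
)
```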
See Also
- gepa_adk.ports.agent_provider: Agent provider protocol for loading and persisting agents.
- gepa_adk.ports.adapter: Async adapter protocol and types.
- gepa_adk.ports.scorer: Scorer protocol for custom scoring logic.
- gepa_adk.ports.proposer: Proposer protocol for candidate generation.
- gepa_adk.ports.agent_executor: Agent executor protocol for unified agent execution.
Note
This layer follows hexagonal architecture principles, defining ports that adapters implement to integrate with external systems.
ProposalResult dataclass ¶
Result of a successful proposal operation.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| candidate | The proposed candidate with components. |
| parent_indices | Indices of parent candidate(s) in ParetoState. |
| tag | Type of proposal ("mutation" or "merge"). |
| metadata | Additional proposal-specific metadata. |
Examples:
Creating a mutation proposal result:
from gepa_adk.domain.types import ProposalResult
from gepa_adk.domain.models import Candidate
result = ProposalResult(
candidate=Candidate(components={"instruction": "Be helpful"}),
parent_indices=[5],
tag="mutation",
)
Creating a merge proposal result:
result = ProposalResult(
candidate=Candidate(components={"instruction": "..."}),
parent_indices=[5, 8],
tag="merge",
metadata={"ancestor_idx": 2},
)
Note
A frozen dataclass ensures immutability of proposal results. Parent indices must be valid indices into the ParetoState.candidates list.
Source code in src/gepa_adk/domain/types.py
AsyncGEPAAdapter ¶
Bases: Protocol[DataInst, Trajectory, RolloutOutput]
Protocol for async GEPA adapters used by the evolution engine.
Implementations provide evaluation, reflection dataset generation, and proposal updates for candidate component texts.
Examples:
Implement a minimal adapter:
class MyAdapter:
    async def evaluate(self, batch, candidate, capture_traces=False):
        return EvaluationBatch(outputs=[], scores=[])

    async def make_reflective_dataset(
        self, candidate, eval_batch, components_to_update
    ):
        return {component: [] for component in components_to_update}

    async def propose_new_texts(
        self, candidate, reflective_dataset, components_to_update
    ):
        return {
            component: candidate[component]
            for component in components_to_update
        }
Note
Adapters must implement all three async methods to satisfy the protocol. Use runtime_checkable for isinstance() checks.
Source code in src/gepa_adk/ports/adapter.py
evaluate async ¶
evaluate(
batch: list[DataInst],
candidate: dict[str, str],
capture_traces: bool = False,
) -> EvaluationBatch[Trajectory, RolloutOutput]
Evaluate a candidate over a batch of inputs.
| PARAMETER | DESCRIPTION |
|---|---|
| batch | Input data instances to evaluate. |
| candidate | Component name to text mapping. |
| capture_traces | Whether to capture execution traces. |

| RETURNS | DESCRIPTION |
|---|---|
| EvaluationBatch[Trajectory, RolloutOutput] | Evaluation results with outputs, scores, and optional traces. |
Examples:
Basic evaluation:
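A hedged sketch, assuming an `adapter` instance implementing this protocol and a `train_batch` list of data instances:

```python
# Sketch only: `adapter` and `train_batch` are assumed to exist in scope.
candidate = {"instruction": "You are a helpful assistant."}
result = await adapter.evaluate(train_batch, candidate, capture_traces=True)
assert len(result.outputs) == len(train_batch)
assert len(result.scores) == len(train_batch)
```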
Note
Output and score lists must have the same length as the input batch. Set capture_traces=True to enable reflection.
Source code in src/gepa_adk/ports/adapter.py
make_reflective_dataset async ¶
make_reflective_dataset(
candidate: dict[str, str],
eval_batch: EvaluationBatch[Trajectory, RolloutOutput],
components_to_update: list[str],
) -> Mapping[str, Sequence[Mapping[str, Any]]]
Build reflective datasets from evaluation traces.
| PARAMETER | DESCRIPTION |
|---|---|
| candidate | Current candidate components. |
| eval_batch | Evaluation results with traces. |
| components_to_update | Components to generate datasets for. |

| RETURNS | DESCRIPTION |
|---|---|
| Mapping[str, Sequence[Mapping[str, Any]]] | Mapping of component name to reflective examples. |
Examples:
Build datasets for specific components:
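A hedged sketch, assuming `eval_batch` was produced by evaluate() with capture_traces=True:

```python
# Sketch only: `adapter` and `eval_batch` (with trajectories) are assumed.
datasets = await adapter.make_reflective_dataset(
    candidate={"instruction": "You are a helpful assistant."},
    eval_batch=eval_batch,
    components_to_update=["instruction"],
)
instruction_examples = datasets["instruction"]  # reflective examples for this component
```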
Note
Only call this method when eval_batch contains trajectories. Each component receives its own list of reflective examples.
Source code in src/gepa_adk/ports/adapter.py
propose_new_texts async ¶
propose_new_texts(
candidate: dict[str, str],
reflective_dataset: Mapping[
str, Sequence[Mapping[str, Any]]
],
components_to_update: list[str],
) -> dict[str, str]
Propose updated component texts from reflective datasets.
| PARAMETER | DESCRIPTION |
|---|---|
| candidate | Current candidate components. |
| reflective_dataset | Reflective examples per component. |
| components_to_update | Components to propose updates for. |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, str] | Mapping of component name to new proposed text. |
Examples:
Generate new component texts:
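A hedged sketch, assuming `reflective_dataset` came from make_reflective_dataset():

```python
# Sketch only: `adapter` and `reflective_dataset` are assumed to exist in scope.
new_texts = await adapter.propose_new_texts(
    candidate={"instruction": "Be helpful."},
    reflective_dataset=reflective_dataset,
    components_to_update=["instruction"],
)
print(new_texts["instruction"])  # proposed replacement text for the component
```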
Note
Outputs should contain improved text for each requested component. The evolution engine uses these as mutation candidates.
Source code in src/gepa_adk/ports/adapter.py
EvaluationBatch dataclass ¶
Bases: Generic[Trajectory, RolloutOutput]
Container for evaluation outputs and scores.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| outputs | Per-example outputs produced during evaluation. |
| scores | Per-example normalized scores (higher is better). |
| trajectories | Optional per-example execution traces. |
| objective_scores | Optional multi-objective scores per example. |
| metadata | Optional per-example scorer metadata. When provided, metadata[i] corresponds to outputs[i] and scores[i] (index-aligned). Metadata dicts may contain scorer-specific fields such as 'feedback', 'actionable_guidance', or 'dimension_scores' from CriticScorer implementations. |
| inputs | Optional per-example input text that was used to generate each output. Used by make_reflective_dataset to provide context for reflection. When provided, inputs[i] corresponds to the input that produced outputs[i]. |
Examples:
Create a batch with optional traces:
batch = EvaluationBatch(
outputs=["ok", "ok"],
scores=[0.9, 0.8],
trajectories=[{"trace": 1}, {"trace": 2}],
)
Create a batch with inputs for reflection:
batch = EvaluationBatch(
outputs=["Hello!", "Good morrow!"],
scores=[0.3, 0.9],
inputs=["I am the King", "I am your friend"],
)
Note
All fields are immutable once created due to frozen=True. Use this as the standard return type from adapter evaluations. When metadata is not None, len(metadata) must equal len(outputs) and len(scores).
Source code in src/gepa_adk/ports/adapter.py
AgentExecutorProtocol ¶
Bases: Protocol
Protocol for unified agent execution.
Defines the interface for executing any ADK agent type (LlmAgent, workflow agents) with consistent behavior, session management, and result handling.
This protocol enables:

- Feature parity across generator, critic, and reflection agents
- Dependency injection for testing
- A single point of change for new ADK features
Examples:
Using an executor:
executor: AgentExecutorProtocol = AgentExecutor()
result = await executor.execute_agent(
    agent=my_agent,
    input_text="Hello",
    timeout_seconds=60,
)
if result.status == ExecutionStatus.SUCCESS:
    print(result.extracted_value)
With instruction override (for evolution):
result = await executor.execute_agent(
agent=my_agent,
input_text="Hello",
instruction_override="You are a helpful assistant.",
)
See Also
- ExecutionResult: Return type for execute_agent.
- ExecutionStatus: Status enum for execution outcomes.
Note
All implementations should validate that the agent parameter is a valid ADK LlmAgent. The Any type is used here to avoid coupling the ports layer to ADK types.
Source code in src/gepa_adk/ports/agent_executor.py
execute_agent async ¶
execute_agent(
agent: Any,
input_text: str,
*,
input_content: Any | None = None,
instruction_override: str | None = None,
output_schema_override: Any | None = None,
session_state: dict[str, Any] | None = None,
existing_session_id: str | None = None,
timeout_seconds: int = 300,
) -> ExecutionResult
Execute an agent and return structured result.
Runs the specified agent with the given input, optionally applying instruction or schema overrides for evolution scenarios. Manages session lifecycle and captures execution events.
| PARAMETER | DESCRIPTION |
|---|---|
| agent | ADK LlmAgent to execute. The agent's tools, output_key, and other ADK features are preserved during execution. |
| input_text | User message to send to the agent. Used as a fallback if input_content is not provided. |
| input_content | Optional ADK Content object for multimodal input (e.g., video parts). If provided, takes precedence over input_text. |
| instruction_override | If provided, replaces the agent's instruction for this execution only. The original agent is not modified. |
| output_schema_override | If provided, replaces the agent's output schema for this execution only (type[BaseModel]). Used for schema evolution. |
| session_state | Initial state to inject into the session. Used for template variable substitution (e.g., {component_text}). |
| existing_session_id | If provided, reuses an existing session instead of creating a new one. Useful for a critic accessing generator state. |
| timeout_seconds | Maximum execution time in seconds. Defaults to 300. Execution terminates with TIMEOUT status if exceeded. |

| RETURNS | DESCRIPTION |
|---|---|
| ExecutionResult | ExecutionResult with status, output, and debugging information. |

| RAISES | DESCRIPTION |
|---|---|
| SessionNotFoundError | If existing_session_id is provided but the session does not exist. |
Examples:
Basic execution:
result = await executor.execute_agent(
agent=greeter,
input_text="Hello!",
)
print(result.extracted_value) # "Hello! How can I help?"
With session state for reflection:
result = await executor.execute_agent(
agent=reflector,
input_text="Improve the instruction",
session_state={
"component_text": "Be helpful.",
"trials": '[{"score": 0.5}]',
},
)
Session sharing between agents:
# Generator creates session
gen_result = await executor.execute_agent(
agent=generator,
input_text="Write a story.",
)
# Critic reuses generator's session
critic_result = await executor.execute_agent(
agent=critic,
input_text=f"Evaluate: {gen_result.extracted_value}",
existing_session_id=gen_result.session_id,
)
Note
The agent parameter is typed as Any to avoid coupling to ADK types in the ports layer. Implementations should validate that the agent is a valid LlmAgent.
Source code in src/gepa_adk/ports/agent_executor.py
ExecutionResult dataclass ¶
Result of an agent execution.
Provides consistent return type across all agent types (generator, critic, reflection) with status, output, timing, and debugging information.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| status | Outcome status of the execution. |
| session_id | ADK session identifier used for this execution. |
| extracted_value | Output text extracted from the agent response. None if the agent produced no output or execution failed. |
| error_message | Error details if status is FAILED or TIMEOUT. None for successful executions. |
| execution_time_seconds | Duration of execution in seconds. |
| captured_events | ADK events captured during execution for debugging and trajectory analysis. None if event capture is disabled. |
Examples:
Successful execution:
result = ExecutionResult(
status=ExecutionStatus.SUCCESS,
session_id="sess_123",
extracted_value="Hello, world!",
execution_time_seconds=1.5,
)
Failed execution:
result = ExecutionResult(
status=ExecutionStatus.FAILED,
session_id="sess_456",
error_message="Model rate limit exceeded",
execution_time_seconds=0.2,
)
Note
captured_events contains raw ADK Event objects for debugging and trajectory recording. Events are captured even on timeout or failure.
Source code in src/gepa_adk/ports/agent_executor.py
ExecutionStatus ¶
Bases: str, Enum
Status of agent execution.
This enum represents the outcome status of an agent execution, used by ExecutionResult to indicate how the execution completed.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| SUCCESS | Agent completed execution normally with valid output. |
| FAILED | Agent encountered an error during execution. |
| TIMEOUT | Agent execution exceeded the configured timeout. |
Examples:
Checking execution status:
from gepa_adk.ports.agent_executor import ExecutionStatus
if result.status == ExecutionStatus.SUCCESS:
    print(f"Output: {result.extracted_value}")
elif result.status == ExecutionStatus.TIMEOUT:
    print(f"Timed out after {result.execution_time_seconds}s")
elif result.status == ExecutionStatus.FAILED:
    print(f"Error: {result.error_message}")
Note
All status values reflect execution outcome, not output quality. A successful execution may still produce low-quality output that fails scoring.
Source code in src/gepa_adk/ports/agent_executor.py
AgentProvider ¶
Bases: Protocol
Protocol for loading and persisting agents.
Implementations provide agent configuration storage and retrieval, enabling the evolution system to load agents and persist evolved instructions.
Examples:
Implement a minimal in-memory provider:
class InMemoryProvider:
    def __init__(self):
        self._agents = {}

    def get_agent(self, name: str) -> LlmAgent:
        if not name:
            raise ValueError("Agent name cannot be empty")
        if name not in self._agents:
            raise KeyError(f"Agent not found: {name}")
        return self._agents[name]

    def save_instruction(self, name: str, instruction: str) -> None:
        if not name:
            raise ValueError("Agent name cannot be empty")
        if name not in self._agents:
            raise KeyError(f"Agent not found: {name}")
        self._agents[name].instruction = instruction

    def list_agents(self) -> list[str]:
        return list(self._agents.keys())
Verify protocol compliance:
from gepa_adk.ports import AgentProvider
provider = InMemoryProvider()
assert isinstance(provider, AgentProvider) # Runtime check works
Note
All implementations must provide get_agent(), save_instruction(), and list_agents() methods. Use @runtime_checkable to enable isinstance() checks for protocol compliance.
Source code in src/gepa_adk/ports/agent_provider.py
get_agent ¶
Load an agent by its unique name.
| PARAMETER | DESCRIPTION |
|---|---|
| name | The unique identifier for the agent. |

| RETURNS | DESCRIPTION |
|---|---|
| LlmAgent | The configured LlmAgent instance ready for use. |

| RAISES | DESCRIPTION |
|---|---|
| KeyError | If no agent with the given name exists. |
| ValueError | If name is empty or invalid. |
Examples:
Load a named agent:
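A hedged sketch, assuming `provider` implements AgentProvider and that a hypothetical agent named "greeter" is registered:

```python
# Sketch only: `provider` and the "greeter" agent name are assumptions.
agent = provider.get_agent("greeter")
print(agent.instruction)
```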
Note
Implementations must reject empty names and return configured agents that are ready for use with the evolution system.
Source code in src/gepa_adk/ports/agent_provider.py
save_instruction ¶
Persist an evolved instruction for a named agent.
| PARAMETER | DESCRIPTION |
|---|---|
| name | The unique identifier for the agent. |
| instruction | The new instruction text to persist. |

| RAISES | DESCRIPTION |
|---|---|
| KeyError | If no agent with the given name exists. |
| ValueError | If name is empty or invalid. |
| IOError | If persistence fails (implementation-specific). |
Examples:
Save an evolved instruction:
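A hedged sketch, again assuming `provider` and a hypothetical registered "greeter" agent:

```python
# Sketch only: `provider` and the "greeter" agent name are assumptions.
provider.save_instruction("greeter", "You are a concise, friendly greeter.")
# After successful persistence, get_agent() reflects the update.
assert provider.get_agent("greeter").instruction == "You are a concise, friendly greeter."
```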
Note
After successful persistence, subsequent calls to get_agent() return an agent with the updated instruction. Implementations should validate that the name is non-empty before processing.
Source code in src/gepa_adk/ports/agent_provider.py
list_agents ¶
List all available agent names.
| RETURNS | DESCRIPTION |
|---|---|
| list[str] | A list of agent name strings. Empty list if no agents exist. |
Examples:
Discover available agents:
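A hedged sketch, assuming `provider` implements AgentProvider:

```python
# Sketch only: `provider` is assumed to implement AgentProvider.
names = provider.list_agents()
for name in sorted(names):  # sort locally; the protocol guarantees no ordering
    print(name)
```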
Note
Ordering of returned names is not guaranteed by the protocol. Some implementations may return names in a specific order, but callers should not rely upon any particular sequence.
Source code in src/gepa_adk/ports/agent_provider.py
ComponentHandler ¶
Bases: Protocol
Protocol for component serialization and application.
Handles the serialize/apply/restore cycle for one component type during evolution. Implementations must be stateless and thread-safe.
The protocol defines three operations:

1. serialize: Extract the current component value as a string
2. apply: Set a new value and return the original for restoration
3. restore: Reinstate the original value after evaluation
Examples:
Implement a custom handler:
class TemperatureHandler:
    def serialize(self, agent: LlmAgent) -> str:
        config = getattr(agent, "generate_content_config", None)
        if config and hasattr(config, "temperature"):
            return str(config.temperature)
        return "1.0"

    def apply(self, agent: LlmAgent, value: str) -> Any:
        config = getattr(agent, "generate_content_config", None)
        original = config.temperature if config else 1.0
        if config:
            config.temperature = float(value)
        return original

    def restore(self, agent: LlmAgent, original: Any) -> None:
        config = getattr(agent, "generate_content_config", None)
        if config:
            config.temperature = original
See Also
- InstructionHandler: Handler for the agent instruction.
- OutputSchemaHandler: Handler for the agent output schema.
Note
All methods are synchronous; no I/O operations should be performed. On invalid input, apply() should log a warning and keep the original value rather than raise an exception.
Source code in src/gepa_adk/ports/component_handler.py
serialize ¶
Extract component value from agent as string for evolution.
| PARAMETER | DESCRIPTION |
|---|---|
| agent | The LlmAgent instance to extract the component from. |

| RETURNS | DESCRIPTION |
|---|---|
| str | String representation of the component value. Returns an empty string if the component is not set. |
Examples:
handler = InstructionHandler()
text = handler.serialize(agent)
# text == "You are a helpful assistant."
Note
Operations must never raise exceptions for missing values. Return empty string or sensible default instead.
Source code in src/gepa_adk/ports/component_handler.py
apply ¶
Apply evolved value to agent, return original for restore.
| PARAMETER | DESCRIPTION |
|---|---|
| agent | The LlmAgent instance to modify. |
| value | The new component value as a string. |

| RETURNS | DESCRIPTION |
|---|---|
| Any | The original component value (type depends on the component). This value is later passed to restore(). |
Examples:
handler = InstructionHandler()
original = handler.apply(agent, "New instruction")
# agent.instruction is now "New instruction"
# original contains previous instruction value
Note
On application failure (e.g., an invalid schema), log a warning and return the original value without modifying the agent. Never raise exceptions; graceful degradation is required.
Source code in src/gepa_adk/ports/component_handler.py
restore ¶
Restore original value after evaluation.
| PARAMETER | DESCRIPTION |
|---|---|
| agent | The LlmAgent instance to restore. |
| original | The original value returned by apply(). |
Examples:
handler = InstructionHandler()
original = handler.apply(agent, "Temp instruction")
# ... run evaluation ...
handler.restore(agent, original)
# agent.instruction is back to original value
Note
Restoration must always succeed and never raise exceptions. A None original resets the component to its default.
Source code in src/gepa_adk/ports/component_handler.py
ProposerProtocol ¶
Bases: Protocol
Protocol for candidate proposal strategies.
Proposers generate new candidates for evolution. The two main implementations are mutation-based (reflective improvement) and merge-based (genetic crossover).
Note
Attributes are not required by this protocol. Implementations may define configuration attributes as needed.
Examples:
class MyProposer:
    async def propose(
        self,
        state: ParetoState,
        eval_batch: EvaluationBatch | None = None,
    ) -> ProposalResult | None:
        # Generate proposal
        return ProposalResult(candidate=..., parent_indices=[...], tag="custom")
Note
All proposers return None when no valid proposal can be generated.
Source code in src/gepa_adk/ports/proposer.py
propose async ¶
propose(
state: ParetoState,
eval_batch: EvaluationBatch | None = None,
) -> ProposalResult | None
Propose a new candidate based on current evolution state.
| PARAMETER | DESCRIPTION |
|---|---|
| state | Current Pareto state with candidates and frontier. Contains the current evolution state, including all discovered candidates, their scores, and the Pareto frontier for selection. |
| eval_batch | Optional evaluation batch for reflective proposals. Used by mutation-based proposers to generate reflective datasets. Ignored by merge-based proposers, which operate on existing candidates. |

| RETURNS | DESCRIPTION |
|---|---|
| ProposalResult \| None | ProposalResult containing the proposed candidate and metadata, or None if no proposal is possible (e.g., no suitable candidates, insufficient frontier, or proposal generation failed). |
Examples:
Implementing a custom proposer:
class MyProposer:
    async def propose(
        self,
        state: ParetoState,
        eval_batch: EvaluationBatch | None = None,
    ) -> ProposalResult | None:
        # Generate proposal logic
        return ProposalResult(
            candidate=Candidate(components={"instruction": "..."}),
            parent_indices=[5],
            tag="mutation",
        )
Note
Operations must be idempotent and not modify state. The method should be safe to call multiple times with the same state without side effects.
Source code in src/gepa_adk/ports/proposer.py
Scorer ¶
Bases: Protocol
Protocol for scoring agent outputs.
Implementations provide scoring logic that evaluates how well an agent's output matches expected results or quality criteria.
Both synchronous and asynchronous methods are defined. Implementations should provide both, though callers may use only one based on context.
Examples:
Implement a simple fixed scorer for testing:
class FixedScorer:
    def score(
        self,
        input_text: str,
        output: str,
        expected: str | None = None,
    ) -> tuple[float, dict]:
        return 0.5, {"note": "Fixed score for testing"}

    async def async_score(
        self,
        input_text: str,
        output: str,
        expected: str | None = None,
    ) -> tuple[float, dict]:
        return self.score(input_text, output, expected)
Verify protocol compliance:
from gepa_adk.ports import Scorer
scorer = FixedScorer()
assert isinstance(scorer, Scorer) # Runtime check works
Note
All implementations must provide both score() and async_score() methods to satisfy the protocol. Score values should be normalized between 0.0 and 1.0 by convention, with higher values indicating better performance. The protocol does not enforce this range.
Source code in src/gepa_adk/ports/scorer.py
score ¶
Score an agent output synchronously.
| PARAMETER | DESCRIPTION |
|---|---|
| input_text | The input provided to the agent. |
| output | The agent's generated output to score. |
| expected | Optional expected/reference output for comparison. Pass None for open-ended evaluation without an expected output. |

| RETURNS | DESCRIPTION |
|---|---|
| tuple[float, dict[str, Any]] | A tuple of (score, metadata): the float score and a dict of scorer-specific metadata. |
Examples:
Basic usage:
score, meta = scorer.score("What is 2+2?", "4", "4")
assert score == 1.0
assert meta.get("exact_match") is True
Scoring without expected output:
score, meta = scorer.score("Explain gravity", response)
# Scorer evaluates based on quality criteria, not exact match
Note
Operations complete synchronously and block until scoring finishes. Use async_score() for I/O-bound operations like LLM calls.
Source code in src/gepa_adk/ports/scorer.py
async_score async ¶
async_score(
input_text: str,
output: str,
expected: str | None = None,
) -> tuple[float, dict[str, Any]]
Score an agent output asynchronously.
| PARAMETER | DESCRIPTION |
|---|---|
| input_text | The input provided to the agent. |
| output | The agent's generated output to score. |
| expected | Optional expected/reference output for comparison. Pass None for open-ended evaluation without an expected output. |

| RETURNS | DESCRIPTION |
|---|---|
| tuple[float, dict[str, Any]] | A tuple of (score, metadata): the float score and a dict of scorer-specific metadata. |
Examples:
Async usage:
score, meta = await scorer.async_score("Explain gravity", response)
print(f"Quality: {score:.2f} - {meta.get('feedback')}")
Concurrent scoring:
import asyncio
tasks = [
scorer.async_score(input, output, expected)
for input, output, expected in batch
]
scores = await asyncio.gather(*tasks)
Note
Operations run asynchronously and can be executed concurrently. Prefer this method for I/O-bound scoring operations such as LLM-based evaluation or external API calls.
Source code in src/gepa_adk/ports/scorer.py
CandidateSelectorProtocol ¶
Bases: Protocol
Async protocol for candidate selection strategies.
Note
Adapters implementing this protocol provide strategies for selecting candidates from the Pareto frontier for mutation.
Examples:
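A minimal implementation sketch. The `FirstCandidateSelector` name is hypothetical; it assumes ParetoState exposes a `candidates` list (as noted under ProposalResult) and that NoCandidateAvailableError is available in scope:

```python
# Sketch only: a trivial selector that always picks candidate 0.
class FirstCandidateSelector:
    async def select_candidate(self, state: ParetoState) -> int:
        if not state.candidates:
            raise NoCandidateAvailableError("state has no candidates")
        return 0
```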
Source code in src/gepa_adk/ports/selector.py
select_candidate async ¶
select_candidate(state: ParetoState) -> int
Select a candidate index for mutation.
| PARAMETER | DESCRIPTION |
|---|---|
| state | Current evolution state with Pareto frontier tracking. |

| RETURNS | DESCRIPTION |
|---|---|
| int | Index of the selected candidate. |

| RAISES | DESCRIPTION |
|---|---|
| NoCandidateAvailableError | If the state has no candidates. |
Note
Outputs a candidate index from the frontier for mutation, enabling Pareto-aware selection strategies.
Examples:
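A hedged usage sketch, assuming `selector` implements this protocol and `state` is the current ParetoState:

```python
# Sketch only: `selector` and `state` are assumed to exist in scope.
parent_idx = await selector.select_candidate(state)
parent = state.candidates[parent_idx]
```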
Source code in src/gepa_adk/ports/selector.py
ComponentSelectorProtocol ¶
Bases: Protocol
Async protocol for component selection strategies.
Note
Adapters implementing this protocol determine which candidate components to update during mutation, enabling flexible evolution strategies.
Examples:
class MySelector:
    async def select_components(
        self, components: list[str], iteration: int, candidate_idx: int
    ) -> list[str]:
        return components[:1]
Source code in src/gepa_adk/ports/selector.py
select_components async ¶
Select components to update for the current iteration.
| PARAMETER | DESCRIPTION |
|---|---|
| components | List of available component keys (e.g., ["instruction", "input_schema"]). |
| iteration | Current global iteration number (0-based). |
| candidate_idx | Index of the candidate being evolved. |

| RETURNS | DESCRIPTION |
|---|---|
| list[str] | List of component keys to update. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If the components list is empty. |
Note
Outputs a list of component keys to update, enabling selective mutation of specific candidate components.
Examples:
selected = await selector.select_components(
components=["instruction", "schema"], iteration=1, candidate_idx=0
)
Source code in src/gepa_adk/ports/selector.py
EvaluationPolicyProtocol ¶
Bases: Protocol
Protocol for valset evaluation strategies.
Determines which validation examples to evaluate per iteration and how to identify the best candidate based on evaluation results.
Note
Adapters implementing this protocol control evaluation cost and candidate selection strategies for scalable evolution runs.
Examples:
class MyPolicy:
    def get_eval_batch(
        self, valset_ids: Sequence[int], state: ParetoState
    ) -> list[int]:
        return list(valset_ids)

    def get_best_candidate(self, state: ParetoState) -> int:
        return 0

    def get_valset_score(self, candidate_idx: int, state: ParetoState) -> float:
        return 0.5
Source code in src/gepa_adk/ports/selector.py
get_eval_batch ¶
get_eval_batch(
valset_ids: Sequence[int],
state: ParetoState,
target_candidate_idx: int | None = None,
) -> list[int]
Select validation example indices to evaluate.
| PARAMETER | DESCRIPTION |
|---|---|
| valset_ids | All available validation example indices (0 to N-1). |
| state | Current evolution state with candidate scores. |
| target_candidate_idx | Optional candidate being evaluated (for adaptive policies). |

| RETURNS | DESCRIPTION |
|---|---|
| list[int] | List of example indices to evaluate this iteration. Must be a subset of (or equal to) valset_ids. |
Note
Outputs a list of valset indices to evaluate, which may be a subset or the full valset depending on the policy implementation.
Examples:
batch = policy.get_eval_batch([0, 1, 2, 3, 4], state)
# Returns: [0, 1, 2, 3, 4] for full evaluation
Source code in src/gepa_adk/ports/selector.py
get_best_candidate ¶
get_best_candidate(state: ParetoState) -> int
Return index of best candidate based on evaluation results.
| PARAMETER | DESCRIPTION |
|---|---|
| state | Current evolution state with candidate_scores populated. |

| RETURNS | DESCRIPTION |
|---|---|
| int | Index of the best performing candidate. |

| RAISES | DESCRIPTION |
|---|---|
| NoCandidateAvailableError | If the state has no candidates. |
Note
Outputs the index of the best candidate based on the policy's scoring strategy (e.g., highest average score).
Examples:
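A hedged usage sketch, assuming `policy` implements this protocol and `state` is the current ParetoState:

```python
# Sketch only: `policy` and `state` are assumed to exist in scope.
best_idx = policy.get_best_candidate(state)
best_score = policy.get_valset_score(best_idx, state)
```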
Source code in src/gepa_adk/ports/selector.py
get_valset_score ¶
get_valset_score(
candidate_idx: int, state: ParetoState
) -> float
Return aggregated valset score for a candidate.
| PARAMETER | DESCRIPTION |
|---|---|
| candidate_idx | Index of the candidate to score. |
| state | Current evolution state. |

| RETURNS | DESCRIPTION |
|---|---|
| float | Aggregated score (typically the mean across evaluated examples). Returns float('-inf') if the candidate has no evaluated scores. |
Note
Outputs an aggregated score for the candidate, typically the mean across evaluated examples, or negative infinity if no scores exist.
Examples:
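A hedged usage sketch, assuming `policy` and `state` as above:

```python
# Sketch only: `policy` and `state` are assumed to exist in scope.
score = policy.get_valset_score(candidate_idx=0, state=state)
if score == float("-inf"):
    print("Candidate 0 has no evaluated scores yet")
```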
Source code in src/gepa_adk/ports/selector.py
StopperProtocol ¶
Bases: Protocol
Protocol for stop condition objects.
A stopper is a callable that returns True when evolution should stop. Stoppers receive a StopperState snapshot of current evolution progress.
The protocol uses structural typing - any callable with the correct signature satisfies it, no explicit inheritance required.
Examples:
Class-based stopper:
from gepa_adk.ports.stopper import StopperProtocol
from gepa_adk.domain.stopper import StopperState
class TimeoutStopper:
    def __init__(self, max_seconds: float) -> None:
        self.max_seconds = max_seconds

    def __call__(self, state: StopperState) -> bool:
        return state.elapsed_seconds >= self.max_seconds

stopper = TimeoutStopper(3600.0)  # 1 hour
isinstance(stopper, StopperProtocol)  # True

Function-based stopper:

def score_threshold_stopper(state: StopperState) -> bool:
    return state.best_score >= 0.95

isinstance(score_threshold_stopper, StopperProtocol)  # True
Note
Stoppers should be pure functions of their input state: for deterministic behavior they must not have side effects or depend on external mutable state.
Source code in src/gepa_adk/ports/stopper.py
__call__ ¶
__call__(state: StopperState) -> bool
Check if evolution should stop.
| PARAMETER | DESCRIPTION |
|---|---|
| state | Current evolution state snapshot containing iteration count, best score, stagnation counter, and other metrics. |

| RETURNS | DESCRIPTION |
|---|---|
| bool | True if evolution should stop, False to continue. |
Note
This method is typically called after each iteration. Return True as soon as the stop condition is met to avoid unnecessary computation.
Source code in src/gepa_adk/ports/stopper.py
VideoBlobServiceProtocol ¶
Bases: Protocol
Protocol for loading video files as multimodal content parts.
Implementations provide video file I/O and validation logic, converting video files to Part objects for multimodal agent input.
The protocol defines an async method for loading multiple videos and a sync method for validating individual files.
Examples:
Implement a video blob service:
class SimpleVideoBlobService:
    async def prepare_video_parts(
        self,
        video_paths: list[str],
    ) -> list[Any]:
        # Load videos and return Part objects
        parts = []
        for path in video_paths:
            self.validate_video_file(path)
            # ... load and create Part
        return parts

    def validate_video_file(
        self,
        video_path: str,
    ) -> VideoFileInfo:
        # Validate and return metadata
        ...
Verify protocol compliance:
from gepa_adk.ports import VideoBlobServiceProtocol
service = SimpleVideoBlobService()
assert isinstance(service, VideoBlobServiceProtocol)
Note
All implementations must provide both prepare_video_parts() and validate_video_file() methods. Video size is limited to 2GB per the Gemini API constraint. Only video/* MIME types are accepted.
Source code in src/gepa_adk/ports/video_blob_service.py
prepare_video_parts async ¶
Load video files and create Part objects for multimodal content.
Reads video files from disk and converts them to Part objects suitable for inclusion in ADK Content. Files are validated before loading.
| PARAMETER | DESCRIPTION |
|---|---|
| video_paths | List of absolute paths to video files. Must be non-empty. All paths must point to existing video files under 2GB with video/* MIME types. |

| RETURNS | DESCRIPTION |
|---|---|
| list[Any] | List of Part objects, one per input path. Order is preserved so that output[i] corresponds to video_paths[i]. Part objects contain inline video data with the appropriate MIME type. |

| RAISES | DESCRIPTION |
|---|---|
| ValueError | If video_paths is empty. |
| VideoValidationError | If any file is not found, exceeds 2GB, or has a non-video MIME type. |
| PermissionError | If any file cannot be read due to permissions. |
| OSError | If any file cannot be read due to I/O errors. |
Examples:
Load a single video:
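A hedged sketch, assuming `service` implements VideoBlobServiceProtocol and using a hypothetical file path:

```python
# Sketch only: `service` and the file path are assumptions.
parts = await service.prepare_video_parts(["/data/lecture.mp4"])
assert len(parts) == 1
```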
Load multiple videos preserving order:
paths = ["/data/intro.mp4", "/data/main.mp4", "/data/outro.mp4"]
parts = await service.prepare_video_parts(paths)
assert len(parts) == 3
# parts[0] is intro, parts[1] is main, parts[2] is outro
Note
Operations include async file I/O for reading video bytes. Video files are validated before loading to provide early failure with clear error messages.
Source code in src/gepa_adk/ports/video_blob_service.py
validate_video_file ¶
Validate a video file and return its metadata.
Checks that the file exists, is within size limits, and has a valid video MIME type. Returns metadata on success.
| PARAMETER | DESCRIPTION |
|---|---|
| video_path | Absolute path to the video file to validate. |

| RETURNS | DESCRIPTION |
|---|---|
| VideoFileInfo | VideoFileInfo containing validated metadata, such as the file's size and MIME type. |

| RAISES | DESCRIPTION |
|---|---|
| VideoValidationError | If the file does not exist, exceeds 2GB, or has a non-video MIME type. The exception includes the video_path and constraint fields for debugging. |
Examples:
Validate a video file:
info = service.validate_video_file("/data/lecture.mp4")
print(f"Size: {info.size_bytes} bytes")
print(f"Type: {info.mime_type}")
Handle validation errors:
from gepa_adk.domain.exceptions import VideoValidationError
try:
    info = service.validate_video_file("/missing.mp4")
except VideoValidationError as e:
    print(f"Invalid: {e.video_path} - {e.constraint}")
Note
Operates synchronously for fast pre-validation without async context. File content is not read, only metadata is checked.