utils

Utility functions for trajectory extraction and processing.

This module provides utilities for extracting, redacting, and truncating trajectory data from ADK agent execution events.

The primary function is extract_trajectory which orchestrates the complete extraction pipeline: raw data extraction → redaction → truncation → trajectory construction. Configuration is provided via TrajectoryConfig from the domain layer.

ATTRIBUTE DESCRIPTION
extract_trajectory

Main trajectory extraction API with configuration support for redaction and truncation.

TYPE: function

Examples:

Extract with default configuration (redaction + truncation enabled):

from gepa_adk.utils import extract_trajectory
from gepa_adk.domain.types import TrajectoryConfig

# Extract with defaults
trajectory = extract_trajectory(events, final_output="Response text")

# Custom configuration
config = TrajectoryConfig(
    include_tool_calls=True,
    redact_sensitive=True,
    sensitive_keys=("password", "api_key", "secret"),
    max_string_length=5000,
)
trajectory = extract_trajectory(events, config=config)
Note

These utilities are infrastructure concerns, not domain logic. They consume domain models but don't define them.

EncodingSafeProcessor

Sanitize strings for console encoding compatibility.

A structlog processor that ensures all string values in the event dict can be safely written to the console regardless of its encoding (cp1252 on Windows, UTF-8 on macOS/Linux).

The sanitization strategy:

1. Apply smart character replacements (preserve meaning)
2. Encode to the console encoding with the 'replace' error handler
3. Decode back to a string

This produces strings that are guaranteed to be writable to the console without raising UnicodeEncodeError.
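
A minimal sketch of the encode/decode steps (2-3) in isolation, assuming a cp1252 console:

text = "Résumé \u2603"  # 'é' survives cp1252; the snowman does not
safe = text.encode("cp1252", errors="replace").decode("cp1252")
# safe == "Résumé ?"  (unencodable characters become '?')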

ATTRIBUTE DESCRIPTION
REPLACEMENTS

Class constant mapping Unicode characters to ASCII equivalents that preserve semantic meaning.

TYPE: dict[str, str]

encoding

The target console encoding detected at initialization.

TYPE: str

sanitize_string

Sanitize a single string for console encoding.

TYPE: method

Examples:

Use as a structlog processor:

processor = EncodingSafeProcessor()
event = {"event": "User said \u2018hello\u2019"}
result = processor(None, "info", event)
# result["event"] == "User said 'hello'"
Note

All sanitization is idempotent - processing already-sanitized strings produces identical output.
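
To sanitize all log output rather than individual strings, the processor can be registered in a structlog processor chain; a minimal sketch, assuming console rendering via structlog.dev.ConsoleRenderer:

import structlog

structlog.configure(
    processors=[
        EncodingSafeProcessor(),  # sanitize before rendering
        structlog.dev.ConsoleRenderer(),
    ]
)
structlog.get_logger().info("User said \u2018hello\u2019")  # logs: User said 'hello'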

Source code in src/gepa_adk/utils/encoding.py
class EncodingSafeProcessor:
    r"""Sanitize strings for console encoding compatibility.

    A structlog processor that ensures all string values in the event dict
    can be safely written to the console regardless of its encoding (cp1252
    on Windows, UTF-8 on macOS/Linux).

    The sanitization strategy:
    1. Apply smart character replacements (preserve meaning)
    2. Encode to console encoding with 'replace' error handler
    3. Decode back to string

    This produces strings that are guaranteed to be writable to the console
    without raising UnicodeEncodeError.

    Attributes:
        REPLACEMENTS (dict[str, str]): Class constant mapping Unicode characters
            to ASCII equivalents that preserve semantic meaning.
        encoding (str): The target console encoding detected at initialization.
        sanitize_string (method): Sanitize a single string for console encoding.

    Examples:
        Use as a structlog processor:

        ```python
        processor = EncodingSafeProcessor()
        event = {"event": "User said \u2018hello\u2019"}
        result = processor(None, "info", event)
        # result["event"] == "User said 'hello'"
        ```

    Note:
        All sanitization is idempotent - processing already-sanitized strings
        produces identical output.
    """

    # Smart character replacements (preserve meaning)
    # These map common Unicode characters that cause issues on cp1252
    # to their closest ASCII equivalents
    REPLACEMENTS: dict[str, str] = {
        "\u2018": "'",  # Left single quote → apostrophe
        "\u2019": "'",  # Right single quote → apostrophe
        "\u201c": '"',  # Left double quote → quotation mark
        "\u201d": '"',  # Right double quote → quotation mark
        "\u2011": "-",  # Non-breaking hyphen → hyphen-minus
        "\u2013": "-",  # En dash → hyphen-minus
        "\u2014": "--",  # Em dash → double hyphen
        "\u2026": "...",  # Horizontal ellipsis → three periods
        "\u00a0": " ",  # Non-breaking space → regular space
    }

    def __init__(self) -> None:
        """Initialize the processor with console encoding detection.

        Detects the console encoding from sys.stdout.encoding, falling back
        to UTF-8 if detection fails (e.g., when stdout is redirected or
        unavailable).

        Note:
            Console encoding is cached at initialization time to avoid
            repeated attribute lookups during log processing.
        """
        self.encoding: str = getattr(sys.stdout, "encoding", None) or "utf-8"

    def __call__(
        self,
        logger: Any,
        method_name: str,
        event_dict: MutableMapping[str, Any],
    ) -> MutableMapping[str, Any]:
        """Process event dictionary, sanitizing all string values.

        Implements the structlog processor protocol. Recursively sanitizes
        all string values in the event dictionary, including nested dicts
        and lists.

        Args:
            logger: The wrapped logger instance (unused but required by
                protocol).
            method_name: Name of the log method called (e.g., "info", "debug").
                Unused but required by protocol.
            event_dict: Mutable mapping of log event data to sanitize.

        Returns:
            The event_dict with all string values sanitized for console
            encoding compatibility.

        Note:
            Original event_dict is not modified; a new dict is returned
            with sanitized values.
        """
        return self._sanitize_dict(event_dict)

    def sanitize_string(self, s: str) -> str:
        """Sanitize a single string for console encoding.

        Applies smart character replacements first to preserve meaning,
        then uses encode/decode with 'replace' error handler for any
        remaining unencodable characters.

        Args:
            s (str): The string to sanitize.

        Returns:
            A string that is guaranteed to be encodable to the console
            encoding without raising UnicodeEncodeError.

        Examples:
            Sanitize a string with smart quotes:

            ```python
            processor = EncodingSafeProcessor()
            result = processor.sanitize_string("User said \u2018hello\u2019")
            assert result == "User said 'hello'"
            ```

        Note:
            Order of operations: smart replacements run first to preserve
            semantic meaning, then encode/decode handles remaining characters.
        """
        # Apply smart replacements first (preserve meaning)
        for char, replacement in self.REPLACEMENTS.items():
            s = s.replace(char, replacement)

        # Encode/decode with replace for any remaining unencodable chars
        # This handles any Unicode characters not in our explicit mapping
        return s.encode(self.encoding, errors="replace").decode(self.encoding)

    def _sanitize_value(self, value: Any) -> Any:
        """Recursively sanitize any value.

        Handles strings, dicts, lists, and tuples. Other types pass through
        unchanged.

        Args:
            value: The value to sanitize.

        Returns:
            The sanitized value, with the same type as the input (except
            strings may have different content).
        """
        if isinstance(value, str):
            return self.sanitize_string(value)
        elif isinstance(value, MutableMapping):
            return self._sanitize_dict(value)
        elif isinstance(value, (list, tuple)):
            # Preserve the original type (list or tuple)
            return type(value)(self._sanitize_value(v) for v in value)
        # Non-string, non-collection types pass through unchanged
        return value

    def _sanitize_dict(self, d: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        """Sanitize all values in a dictionary.

        Args:
            d: The dictionary or mutable mapping to sanitize.

        Returns:
            A new dictionary with all string values sanitized.
        """
        return {k: self._sanitize_value(v) for k, v in d.items()}

__init__

__init__() -> None

Initialize the processor with console encoding detection.

Detects the console encoding from sys.stdout.encoding, falling back to UTF-8 if detection fails (e.g., when stdout is redirected or unavailable).

Note

Console encoding is cached at initialization time to avoid repeated attribute lookups during log processing.

Source code in src/gepa_adk/utils/encoding.py
def __init__(self) -> None:
    """Initialize the processor with console encoding detection.

    Detects the console encoding from sys.stdout.encoding, falling back
    to UTF-8 if detection fails (e.g., when stdout is redirected or
    unavailable).

    Note:
        Console encoding is cached at initialization time to avoid
        repeated attribute lookups during log processing.
    """
    self.encoding: str = getattr(sys.stdout, "encoding", None) or "utf-8"

__call__

__call__(
    logger: Any,
    method_name: str,
    event_dict: MutableMapping[str, Any],
) -> MutableMapping[str, Any]

Process event dictionary, sanitizing all string values.

Implements the structlog processor protocol. Recursively sanitizes all string values in the event dictionary, including nested dicts and lists.

PARAMETER DESCRIPTION
logger

The wrapped logger instance (unused but required by protocol).

TYPE: Any

method_name

Name of the log method called (e.g., "info", "debug"). Unused but required by protocol.

TYPE: str

event_dict

Mutable mapping of log event data to sanitize.

TYPE: MutableMapping[str, Any]

RETURNS DESCRIPTION
MutableMapping[str, Any]

The event_dict with all string values sanitized for console encoding compatibility.

Note

Original event_dict is not modified; a new dict is returned with sanitized values.

Source code in src/gepa_adk/utils/encoding.py
def __call__(
    self,
    logger: Any,
    method_name: str,
    event_dict: MutableMapping[str, Any],
) -> MutableMapping[str, Any]:
    """Process event dictionary, sanitizing all string values.

    Implements the structlog processor protocol. Recursively sanitizes
    all string values in the event dictionary, including nested dicts
    and lists.

    Args:
        logger: The wrapped logger instance (unused but required by
            protocol).
        method_name: Name of the log method called (e.g., "info", "debug").
            Unused but required by protocol.
        event_dict: Mutable mapping of log event data to sanitize.

    Returns:
        The event_dict with all string values sanitized for console
        encoding compatibility.

    Note:
        Original event_dict is not modified; a new dict is returned
        with sanitized values.
    """
    return self._sanitize_dict(event_dict)

sanitize_string

sanitize_string(s: str) -> str

Sanitize a single string for console encoding.

Applies smart character replacements first to preserve meaning, then uses encode/decode with 'replace' error handler for any remaining unencodable characters.

PARAMETER DESCRIPTION
s

The string to sanitize.

TYPE: str

RETURNS DESCRIPTION
str

A string that is guaranteed to be encodable to the console encoding without raising UnicodeEncodeError.

Examples:

Sanitize a string with smart quotes:

processor = EncodingSafeProcessor()
result = processor.sanitize_string("User said ‘hello’")
assert result == "User said 'hello'"
Note

Order of operations: smart replacements run first to preserve semantic meaning, then encode/decode handles remaining characters.

Source code in src/gepa_adk/utils/encoding.py
def sanitize_string(self, s: str) -> str:
    """Sanitize a single string for console encoding.

    Applies smart character replacements first to preserve meaning,
    then uses encode/decode with 'replace' error handler for any
    remaining unencodable characters.

    Args:
        s (str): The string to sanitize.

    Returns:
        A string that is guaranteed to be encodable to the console
        encoding without raising UnicodeEncodeError.

    Examples:
        Sanitize a string with smart quotes:

        ```python
        processor = EncodingSafeProcessor()
        result = processor.sanitize_string("User said \u2018hello\u2019")
        assert result == "User said 'hello'"
        ```

    Note:
        Order of operations: smart replacements run first to preserve
        semantic meaning, then encode/decode handles remaining characters.
    """
    # Apply smart replacements first (preserve meaning)
    for char, replacement in self.REPLACEMENTS.items():
        s = s.replace(char, replacement)

    # Encode/decode with replace for any remaining unencodable chars
    # This handles any Unicode characters not in our explicit mapping
    return s.encode(self.encoding, errors="replace").decode(self.encoding)

SchemaValidationResult dataclass

Result of validating schema text.

This dataclass contains both the deserialized Pydantic model class and metadata about the schema structure.

ATTRIBUTE DESCRIPTION
schema_class

The deserialized Pydantic BaseModel subclass.

TYPE: type[BaseModel]

class_name

Name of the class found in the schema text.

TYPE: str

field_count

Number of fields defined in the schema.

TYPE: int

field_names

Tuple of field names in the schema.

TYPE: tuple[str, ...]

Examples:

Validating schema text and inspecting results:

result = validate_schema_text(schema_text)
print(f"Class: {result.class_name}")
print(f"Fields: {result.field_names}")
instance = result.schema_class(name="test", value=42)

Creating a result directly (typically done by validate_schema_text):

result = SchemaValidationResult(
    schema_class=MySchema,
    class_name="MySchema",
    field_count=2,
    field_names=("name", "value"),
)
Source code in src/gepa_adk/utils/schema_utils.py
@dataclass(frozen=True)
class SchemaValidationResult:
    """Result of validating schema text.

    This dataclass contains both the deserialized Pydantic model class
    and metadata about the schema structure.

    Attributes:
        schema_class (type[BaseModel]): The deserialized Pydantic BaseModel subclass.
        class_name (str): Name of the class found in the schema text.
        field_count (int): Number of fields defined in the schema.
        field_names (tuple[str, ...]): Tuple of field names in the schema.

    Examples:
        Validating schema text and inspecting results:

        ```python
        result = validate_schema_text(schema_text)
        print(f"Class: {result.class_name}")
        print(f"Fields: {result.field_names}")
        instance = result.schema_class(name="test", value=42)
        ```

        Creating a result directly (typically done by validate_schema_text):

        ```python
        result = SchemaValidationResult(
            schema_class=MySchema,
            class_name="MySchema",
            field_count=2,
            field_names=("name", "value"),
        )
        ```
    """

    schema_class: type[BaseModel]
    class_name: str
    field_count: int
    field_names: tuple[str, ...]

StateGuard

Validates and repairs mutated component_text to preserve ADK state tokens.

StateGuard ensures that required state injection tokens are preserved during component_text evolution, and escapes unauthorized new tokens introduced by reflection. Supports simple tokens ({name}), prefixed tokens ({app:settings}), optional tokens ({name?}), and combined formats ({app:config?}).

ATTRIBUTE DESCRIPTION
required_tokens

List of tokens that must always be present, including braces (e.g., ["{user_id}", "{app:settings}", "{name?}"]).

TYPE: list[str]

repair_missing

Whether to re-append missing tokens. Defaults to True.

TYPE: bool

escape_unauthorized

Whether to escape new unauthorized tokens. Defaults to True.

TYPE: bool

_token_pattern

Compiled regex for token detection (private).

TYPE: Pattern[str]

Examples:

Basic usage with token repair:

guard = StateGuard(required_tokens=["{user_id}", "{context}"])
original = "Hello {user_id}, context: {context}"
mutated = "Hello {user_id}, welcome!"
result = guard.validate(original, mutated)
# result == "Hello {user_id}, welcome!\n\n{context}"

Escaping unauthorized tokens:

guard = StateGuard(required_tokens=["{user_id}"])
original = "Process for {user_id}"
mutated = "Process for {user_id} with {malicious}"
result = guard.validate(original, mutated)
# result == "Process for {user_id} with {{malicious}}"
Note

All validation logic is stateless and operates on string inputs only. No external dependencies or I/O operations are performed.

Supported token formats:

- Simple tokens: {name}, {user_id}, {context}
- Prefixed tokens: {app:settings}, {user:api_key}, {temp:session}
- Optional tokens: {name?}, {user_id?}
- Combined: {app:config?}, {user:pref?}
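
The same repair and escape rules apply to prefixed and optional tokens; a minimal sketch combining both behaviors:

guard = StateGuard(required_tokens=["{app:settings}", "{name?}"])
original = "Use {app:settings} and {name?}"
mutated = "Use {name?} only, plus {temp:scratch}"
result = guard.validate(original, mutated)
# result == "Use {name?} only, plus {{temp:scratch}}\n\n{app:settings}"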

Source code in src/gepa_adk/utils/state_guard.py
class StateGuard:
    r"""Validates and repairs mutated component_text to preserve ADK state tokens.

    StateGuard ensures that required state injection tokens are preserved
    during component_text evolution, and escapes unauthorized new tokens introduced
    by reflection. Supports simple tokens ({name}), prefixed tokens ({app:settings}),
    optional tokens ({name?}), and combined formats ({app:config?}).

    Attributes:
        required_tokens (list[str]): List of tokens that must always be present,
            including braces (e.g., ["{user_id}", "{app:settings}", "{name?}"]).
        repair_missing (bool): Whether to re-append missing tokens. Defaults to True.
        escape_unauthorized (bool): Whether to escape new unauthorized tokens.
            Defaults to True.
        _token_pattern (re.Pattern[str]): Compiled regex for token detection (private).

    Examples:
        Basic usage with token repair:

        ```python
        guard = StateGuard(required_tokens=["{user_id}", "{context}"])
        original = "Hello {user_id}, context: {context}"
        mutated = "Hello {user_id}, welcome!"
        result = guard.validate(original, mutated)
        # result == "Hello {user_id}, welcome!\n\n{context}"
        ```

        Escaping unauthorized tokens:

        ```python
        guard = StateGuard(required_tokens=["{user_id}"])
        original = "Process for {user_id}"
        mutated = "Process for {user_id} with {malicious}"
        result = guard.validate(original, mutated)
        # result == "Process for {user_id} with {{malicious}}"
        ```

    Note:
        All validation logic is stateless and operates on string inputs only.
        No external dependencies or I/O operations are performed.

        Supported token formats:
            - Simple tokens: {name}, {user_id}, {context}
            - Prefixed tokens: {app:settings}, {user:api_key}, {temp:session}
            - Optional tokens: {name?}, {user_id?}
            - Combined: {app:config?}, {user:pref?}
    """

    def __init__(
        self,
        required_tokens: list[str] | None = None,
        repair_missing: bool = True,
        escape_unauthorized: bool = True,
    ) -> None:
        """Initialize StateGuard with configuration.

        Args:
            required_tokens: List of tokens that must be preserved,
                including braces (e.g., ["{user_id}", "{context}"]).
                Defaults to empty list.
            repair_missing: If True, re-append missing required tokens.
                Defaults to True.
            escape_unauthorized: If True, escape new unauthorized tokens.
                Defaults to True.

        Note:
            Configuration determines which tokens are protected and which
            behaviors are enabled. Both repair and escape are enabled by default
            for maximum safety.
        """
        self.required_tokens = required_tokens or []
        self.repair_missing = repair_missing
        self.escape_unauthorized = escape_unauthorized
        self._token_pattern = re.compile(r"(?<!\{)\{(\w+(?::\w+)?(?:\?)?)\}(?!\})")

    def _extract_tokens(self, text: str) -> set[str]:
        r"""Extract token names from text using regex.

        Args:
            text: Text to extract tokens from.

        Returns:
            Set of token names (without braces), including full token content
            for prefixed and optional tokens (e.g., "app:settings", "name?").

        Note:
            Scans text using the regex pattern
            `(?<!\{)\{(\w+(?::\w+)?(?:\?)?)\}(?!\})`.

            This pattern matches:
            - Simple tokens: {name} → "name"
            - Prefixed tokens: {app:settings} → "app:settings"
            - Optional tokens: {name?} → "name?"
            - Combined: {app:config?} → "app:config?"

            The negative lookbehind `(?<!\{)` and lookahead `(?!\})` ensure
            already-escaped tokens like `{{token}}` are NOT matched.

            Artifact references (e.g., {artifact.name}) are not matched as they
            contain dots and have different semantics.
        """
        matches = self._token_pattern.findall(text)
        return set(matches)

    def get_validation_summary(
        self, original: str, mutated: str
    ) -> tuple[list[str], list[str]]:
        """Summarize missing token repairs and unauthorized token escapes.

        Args:
            original: The component_text before mutation (reference for tokens).
            mutated: The component_text after mutation (pre-validation).

        Returns:
            Tuple of (repaired_tokens, escaped_tokens) as token strings with braces.

        Examples:
            Summarize repairs and escapes before validation:

            ```python
            guard = StateGuard(required_tokens=["{user_id}"])
            repaired, escaped = guard.get_validation_summary(
                "Hello {user_id}",
                "Hello {user_id} {malicious}",
            )
            # repaired == []
            # escaped == ["{malicious}"]
            ```

        Note:
            Outputs what validate() would repair or escape given the
            current configuration and inputs, using the same token detection
            logic as validate(), without modifying the component_text.
        """
        original_tokens = self._extract_tokens(original)
        mutated_tokens = self._extract_tokens(mutated)
        required_token_names = {token.strip("{}") for token in self.required_tokens}

        repaired_tokens: list[str] = []
        if self.repair_missing:
            missing_required = (original_tokens & required_token_names) - mutated_tokens
            repaired_tokens = [f"{{{token}}}" for token in sorted(missing_required)]

        escaped_tokens: list[str] = []
        if self.escape_unauthorized:
            new_tokens = mutated_tokens - original_tokens
            unauthorized_tokens = new_tokens - required_token_names
            escaped_tokens = [f"{{{token}}}" for token in sorted(unauthorized_tokens)]

        return repaired_tokens, escaped_tokens

    def validate(self, original: str, mutated: str) -> str:
        r"""Validate and repair mutated component_text.

        Compares the original and mutated component_text to:
        1. Re-append missing required tokens (if `repair_missing=True`)
        2. Escape unauthorized new tokens (if `escape_unauthorized=True`)

        Args:
            original: The component_text before mutation (reference for tokens).
                Used to determine which tokens were present initially.
            mutated: The component_text after mutation (to be validated).
                This is the component_text that may have missing or unauthorized tokens.

        Returns:
            The mutated component_text with repairs and escapes applied.
            Missing required tokens are appended at the end with `\n\n{token}`.
            Unauthorized new tokens are escaped by doubling braces: `{token}` → `{{token}}`.

        Examples:
            Repair missing token:

            ```python
            guard = StateGuard(required_tokens=["{user_id}"])
            result = guard.validate("Hello {user_id}", "Hello")
            # result == "Hello\n\n{user_id}"
            ```

            Escape unauthorized token:

            ```python
            guard = StateGuard(required_tokens=["{user_id}"])
            result = guard.validate(
                "Process {user_id}", "Process {user_id} {malicious}"
            )
            # result == "Process {user_id} {{malicious}}"
            ```

        Note:
            Only tokens present in both the original component_text and the
            required_tokens list are eligible for repair. New tokens not in
            required_tokens are escaped by default.
        """
        result = mutated

        # Extract tokens from both component_text values
        original_tokens = self._extract_tokens(original)
        mutated_tokens = self._extract_tokens(mutated)

        # Normalize required_tokens (strip braces for comparison)
        required_token_names = {token.strip("{}") for token in self.required_tokens}

        # Repair missing tokens
        if self.repair_missing:
            # Find tokens in original AND required_tokens AND missing from mutated
            missing_required = (original_tokens & required_token_names) - mutated_tokens

            # Append missing tokens
            for token_name in sorted(missing_required):
                full_token = f"{{{token_name}}}"
                result += f"\n\n{full_token}"

        # Escape unauthorized new tokens
        if self.escape_unauthorized:
            # Find tokens that are new (in mutated but not in original)
            new_tokens = mutated_tokens - original_tokens

            # Only escape tokens that are NOT in required_tokens
            unauthorized_tokens = new_tokens - required_token_names

            # Escape each unauthorized token
            for token_name in unauthorized_tokens:
                full_token = f"{{{token_name}}}"
                escaped_token = f"{{{{{token_name}}}}}"
                result = result.replace(full_token, escaped_token)

        return result

__init__

__init__(
    required_tokens: list[str] | None = None,
    repair_missing: bool = True,
    escape_unauthorized: bool = True,
) -> None

Initialize StateGuard with configuration.

PARAMETER DESCRIPTION
required_tokens

List of tokens that must be preserved, including braces (e.g., ["{user_id}", "{context}"]). Defaults to empty list.

TYPE: list[str] | None DEFAULT: None

repair_missing

If True, re-append missing required tokens. Defaults to True.

TYPE: bool DEFAULT: True

escape_unauthorized

If True, escape new unauthorized tokens. Defaults to True.

TYPE: bool DEFAULT: True

Note

Configuration determines which tokens are protected and which behaviors are enabled. Both repair and escape are enabled by default for maximum safety.

Source code in src/gepa_adk/utils/state_guard.py
def __init__(
    self,
    required_tokens: list[str] | None = None,
    repair_missing: bool = True,
    escape_unauthorized: bool = True,
) -> None:
    """Initialize StateGuard with configuration.

    Args:
        required_tokens: List of tokens that must be preserved,
            including braces (e.g., ["{user_id}", "{context}"]).
            Defaults to empty list.
        repair_missing: If True, re-append missing required tokens.
            Defaults to True.
        escape_unauthorized: If True, escape new unauthorized tokens.
            Defaults to True.

    Note:
        Configuration determines which tokens are protected and which
        behaviors are enabled. Both repair and escape are enabled by default
        for maximum safety.
    """
    self.required_tokens = required_tokens or []
    self.repair_missing = repair_missing
    self.escape_unauthorized = escape_unauthorized
    self._token_pattern = re.compile(r"(?<!\{)\{(\w+(?::\w+)?(?:\?)?)\}(?!\})")

get_validation_summary

get_validation_summary(
    original: str, mutated: str
) -> tuple[list[str], list[str]]

Summarize missing token repairs and unauthorized token escapes.

PARAMETER DESCRIPTION
original

The component_text before mutation (reference for tokens).

TYPE: str

mutated

The component_text after mutation (pre-validation).

TYPE: str

RETURNS DESCRIPTION
tuple[list[str], list[str]]

Tuple of (repaired_tokens, escaped_tokens) as token strings with braces.

Examples:

Summarize repairs and escapes before validation:

guard = StateGuard(required_tokens=["{user_id}"])
repaired, escaped = guard.get_validation_summary(
    "Hello {user_id}",
    "Hello {user_id} {malicious}",
)
# repaired == []
# escaped == ["{malicious}"]
Note

Outputs what validate() would repair or escape given the current configuration and inputs, using the same token detection logic as validate(), without modifying the component_text.

Source code in src/gepa_adk/utils/state_guard.py
def get_validation_summary(
    self, original: str, mutated: str
) -> tuple[list[str], list[str]]:
    """Summarize missing token repairs and unauthorized token escapes.

    Args:
        original: The component_text before mutation (reference for tokens).
        mutated: The component_text after mutation (pre-validation).

    Returns:
        Tuple of (repaired_tokens, escaped_tokens) as token strings with braces.

    Examples:
        Summarize repairs and escapes before validation:

        ```python
        guard = StateGuard(required_tokens=["{user_id}"])
        repaired, escaped = guard.get_validation_summary(
            "Hello {user_id}",
            "Hello {user_id} {malicious}",
        )
        # repaired == []
        # escaped == ["{malicious}"]
        ```

    Note:
        Outputs what validate() would repair or escape given the
        current configuration and inputs, using the same token detection
        logic as validate(), without modifying the component_text.
    """
    original_tokens = self._extract_tokens(original)
    mutated_tokens = self._extract_tokens(mutated)
    required_token_names = {token.strip("{}") for token in self.required_tokens}

    repaired_tokens: list[str] = []
    if self.repair_missing:
        missing_required = (original_tokens & required_token_names) - mutated_tokens
        repaired_tokens = [f"{{{token}}}" for token in sorted(missing_required)]

    escaped_tokens: list[str] = []
    if self.escape_unauthorized:
        new_tokens = mutated_tokens - original_tokens
        unauthorized_tokens = new_tokens - required_token_names
        escaped_tokens = [f"{{{token}}}" for token in sorted(unauthorized_tokens)]

    return repaired_tokens, escaped_tokens

validate

validate(original: str, mutated: str) -> str

Validate and repair mutated component_text.

Compares the original and mutated component_text to:

1. Re-append missing required tokens (if repair_missing=True)
2. Escape unauthorized new tokens (if escape_unauthorized=True)

PARAMETER DESCRIPTION
original

The component_text before mutation (reference for tokens). Used to determine which tokens were present initially.

TYPE: str

mutated

The component_text after mutation (to be validated). This is the component_text that may have missing or unauthorized tokens.

TYPE: str

RETURNS DESCRIPTION
str

The mutated component_text with repairs and escapes applied. Missing required tokens are appended at the end with \n\n{token}. Unauthorized new tokens are escaped by doubling braces: {token} → {{token}}.

Examples:

Repair missing token:

guard = StateGuard(required_tokens=["{user_id}"])
result = guard.validate("Hello {user_id}", "Hello")
# result == "Hello\n\n{user_id}"

Escape unauthorized token:

guard = StateGuard(required_tokens=["{user_id}"])
result = guard.validate(
    "Process {user_id}", "Process {user_id} {malicious}"
)
# result == "Process {user_id} {{malicious}}"
Note

Only tokens present in both the original component_text and the required_tokens list are eligible for repair. New tokens not in required_tokens are escaped by default.
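
By contrast, a token that appears in the original but is not listed in required_tokens is neither repaired nor escaped; a minimal sketch:

guard = StateGuard(required_tokens=["{user_id}"])
# {context} is dropped by the mutation; it is not required, so no repair occurs
result = guard.validate("Hello {user_id} {context}", "Hello {user_id}")
# result == "Hello {user_id}"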

Source code in src/gepa_adk/utils/state_guard.py
def validate(self, original: str, mutated: str) -> str:
    r"""Validate and repair mutated component_text.

    Compares the original and mutated component_text to:
    1. Re-append missing required tokens (if `repair_missing=True`)
    2. Escape unauthorized new tokens (if `escape_unauthorized=True`)

    Args:
        original: The component_text before mutation (reference for tokens).
            Used to determine which tokens were present initially.
        mutated: The component_text after mutation (to be validated).
            This is the component_text that may have missing or unauthorized tokens.

    Returns:
        The mutated component_text with repairs and escapes applied.
        Missing required tokens are appended at the end with `\n\n{token}`.
        Unauthorized new tokens are escaped by doubling braces: `{token}` → `{{token}}`.

    Examples:
        Repair missing token:

        ```python
        guard = StateGuard(required_tokens=["{user_id}"])
        result = guard.validate("Hello {user_id}", "Hello")
        # result == "Hello\n\n{user_id}"
        ```

        Escape unauthorized token:

        ```python
        guard = StateGuard(required_tokens=["{user_id}"])
        result = guard.validate(
            "Process {user_id}", "Process {user_id} {malicious}"
        )
        # result == "Process {user_id} {{malicious}}"
        ```

    Note:
        Only tokens present in both the original component_text and the
        required_tokens list are eligible for repair. New tokens not in
        required_tokens are escaped by default.
    """
    result = mutated

    # Extract tokens from both component_text values
    original_tokens = self._extract_tokens(original)
    mutated_tokens = self._extract_tokens(mutated)

    # Normalize required_tokens (strip braces for comparison)
    required_token_names = {token.strip("{}") for token in self.required_tokens}

    # Repair missing tokens
    if self.repair_missing:
        # Find tokens in original AND required_tokens AND missing from mutated
        missing_required = (original_tokens & required_token_names) - mutated_tokens

        # Append missing tokens
        for token_name in sorted(missing_required):
            full_token = f"{{{token_name}}}"
            result += f"\n\n{full_token}"

    # Escape unauthorized new tokens
    if self.escape_unauthorized:
        # Find tokens that are new (in mutated but not in original)
        new_tokens = mutated_tokens - original_tokens

        # Only escape tokens that are NOT in required_tokens
        unauthorized_tokens = new_tokens - required_token_names

        # Escape each unauthorized token
        for token_name in unauthorized_tokens:
            full_token = f"{{{token_name}}}"
            escaped_token = f"{{{{{token_name}}}}}"
            result = result.replace(full_token, escaped_token)

    return result

deserialize_generate_config

deserialize_generate_config(
    yaml_text: str,
    existing: "GenerateContentConfig | None" = None,
) -> "GenerateContentConfig"

Parse YAML text into GenerateContentConfig, optionally merging with existing.

Parses the YAML text and creates a new GenerateContentConfig. If an existing config is provided, unspecified parameters retain existing values (merge).

PARAMETER DESCRIPTION
yaml_text

YAML string to parse. May be empty.

TYPE: str

existing

Optional existing config to merge with. If provided, unspecified parameters retain existing values.

TYPE: 'GenerateContentConfig | None' DEFAULT: None

RETURNS DESCRIPTION
'GenerateContentConfig'

New GenerateContentConfig instance.

RAISES DESCRIPTION
ConfigValidationError

If yaml_text is malformed YAML.

Examples:

# Parse standalone
config = deserialize_generate_config("temperature: 0.5")
assert config.temperature == 0.5

# Merge with existing
existing = GenerateContentConfig(temperature=0.7, top_p=0.9)
merged = deserialize_generate_config("temperature: 0.5", existing)
assert merged.temperature == 0.5
assert merged.top_p == 0.9  # Preserved from existing
Note

Does NOT validate parameter constraints - use validate_generate_config() separately. This allows inspection of invalid values before rejection.
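
One way to combine the two steps is to parse the YAML once for inspection, validate the resulting dict, and only then deserialize; a minimal sketch (the error output shown is illustrative):

import yaml

yaml_text = "temperature: 3.0"
parsed = yaml.safe_load(yaml_text) or {}
errors = validate_generate_config(parsed)
if errors:
    print(errors)  # e.g. a temperature out-of-range message
else:
    config = deserialize_generate_config(yaml_text)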

Source code in src/gepa_adk/utils/config_utils.py
def deserialize_generate_config(
    yaml_text: str,
    existing: "GenerateContentConfig | None" = None,
) -> "GenerateContentConfig":
    """Parse YAML text into GenerateContentConfig, optionally merging with existing.

    Parses the YAML text and creates a new GenerateContentConfig. If an existing
    config is provided, unspecified parameters retain existing values (merge).

    Args:
        yaml_text: YAML string to parse. May be empty.
        existing: Optional existing config to merge with. If provided,
            unspecified parameters retain existing values.

    Returns:
        New GenerateContentConfig instance.

    Raises:
        ConfigValidationError: If yaml_text is malformed YAML.

    Examples:
        ```python
        # Parse standalone
        config = deserialize_generate_config("temperature: 0.5")
        assert config.temperature == 0.5

        # Merge with existing
        existing = GenerateContentConfig(temperature=0.7, top_p=0.9)
        merged = deserialize_generate_config("temperature: 0.5", existing)
        assert merged.temperature == 0.5
        assert merged.top_p == 0.9  # Preserved from existing
        ```

    Note:
        Does NOT validate parameter constraints - use validate_generate_config()
        separately. This allows inspection of invalid values before rejection.
    """
    # Import here to avoid circular imports and keep ADK types in adapters scope
    from google.genai.types import GenerateContentConfig

    # Handle empty input
    if not yaml_text or not yaml_text.strip():
        if existing is not None:
            return existing
        return GenerateContentConfig()

    # Parse YAML
    try:
        parsed = yaml.safe_load(yaml_text)
    except yaml.YAMLError as e:
        raise ConfigValidationError(
            f"Invalid YAML syntax: {e}",
            errors=[str(e)],
        ) from e

    # Handle non-dict YAML (e.g., scalar value)
    if not isinstance(parsed, dict):
        raise ConfigValidationError(
            f"Expected YAML dict, got {type(parsed).__name__}",
            errors=[f"YAML must be a mapping, got {type(parsed).__name__}"],
        )

    # Build merged config dict
    merged_dict: dict[str, Any] = {}

    # Start with existing values if provided
    if existing is not None:
        try:
            existing_dict = existing.model_dump(exclude_none=True)
        except AttributeError:
            existing_dict = {
                k: getattr(existing, k, None)
                for k in EVOLVABLE_PARAMS
                if getattr(existing, k, None) is not None
            }
        # Only include evolvable params from existing
        merged_dict = {k: v for k, v in existing_dict.items() if k in EVOLVABLE_PARAMS}

    # Override with parsed values (evolvable params only)
    for key, value in parsed.items():
        if key in EVOLVABLE_PARAMS:
            merged_dict[key] = value

    logger.debug(
        "config_utils.deserialize",
        param_count=len(merged_dict),
        params=list(merged_dict.keys()),
        had_existing=existing is not None,
    )

    return GenerateContentConfig(**merged_dict)

serialize_generate_config

serialize_generate_config(
    config: "GenerateContentConfig | None",
) -> str

Convert GenerateContentConfig to YAML string with parameter descriptions.

Serializes only the evolvable parameters (temperature, top_p, top_k, max_output_tokens, presence_penalty, frequency_penalty) with YAML comments describing each parameter.

PARAMETER DESCRIPTION
config

The GenerateContentConfig instance to serialize. Returns empty string if None.

TYPE: 'GenerateContentConfig | None'

RETURNS DESCRIPTION
str

YAML string with parameter descriptions as comments. Empty string if config is None or has no evolvable parameters set.

Examples:

config = GenerateContentConfig(temperature=0.7, top_p=0.9)
yaml_text = serialize_generate_config(config)
# yaml_text contains:
# # temperature: Controls randomness (0.0=deterministic, 2.0=creative)
# temperature: 0.7
# # top_p: Nucleus sampling threshold (0.0-1.0)
# top_p: 0.9
Note

Output is parseable by yaml.safe_load() and includes only evolvable parameters that have non-None values.
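
Continuing the example above, the comment lines are ignored by the YAML parser, so the serialized text round-trips cleanly; a minimal sketch:

import yaml

yaml_text = serialize_generate_config(config)
parsed = yaml.safe_load(yaml_text)  # comments are skipped by the parser
# parsed == {"temperature": 0.7, "top_p": 0.9}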

Source code in src/gepa_adk/utils/config_utils.py
def serialize_generate_config(config: "GenerateContentConfig | None") -> str:
    """Convert GenerateContentConfig to YAML string with parameter descriptions.

    Serializes only the evolvable parameters (temperature, top_p, top_k,
    max_output_tokens, presence_penalty, frequency_penalty) with YAML
    comments describing each parameter.

    Args:
        config: The GenerateContentConfig instance to serialize.
            Returns empty string if None.

    Returns:
        YAML string with parameter descriptions as comments.
        Empty string if config is None or has no evolvable parameters set.

    Examples:
        ```python
        config = GenerateContentConfig(temperature=0.7, top_p=0.9)
        yaml_text = serialize_generate_config(config)
        # yaml_text contains:
        # # temperature: Controls randomness (0.0=deterministic, 2.0=creative)
        # temperature: 0.7
        # # top_p: Nucleus sampling threshold (0.0-1.0)
        # top_p: 0.9
        ```

    Note:
        Output is parseable by yaml.safe_load() and includes only evolvable
        parameters that have non-None values.
    """
    if config is None:
        return ""

    # Extract evolvable parameters using model_dump if available (Pydantic v2)
    try:
        config_dict = config.model_dump(exclude_none=True)
    except AttributeError:
        # Fallback for non-Pydantic objects
        config_dict = {
            k: getattr(config, k, None)
            for k in EVOLVABLE_PARAMS
            if getattr(config, k, None) is not None
        }

    # Filter to only evolvable parameters with non-None values
    evolvable_dict = {
        k: v for k, v in config_dict.items() if k in EVOLVABLE_PARAMS and v is not None
    }

    if not evolvable_dict:
        return ""

    # Build YAML with comments
    lines = ["# LLM Generation Parameters"]
    for param, description in EVOLVABLE_PARAMS.items():
        if param in evolvable_dict:
            lines.append(f"# {param}: {description}")
            lines.append(f"{param}: {evolvable_dict[param]}")

    result = "\n".join(lines)
    logger.debug(
        "config_utils.serialize",
        param_count=len(evolvable_dict),
        params=list(evolvable_dict.keys()),
    )
    return result

validate_generate_config

validate_generate_config(
    config_dict: dict[str, Any],
) -> list[str]

Validate config dict against known parameter constraints.

Checks each parameter value against its defined constraints:

- temperature: 0.0 to 2.0
- top_p: 0.0 to 1.0
- top_k: > 0
- max_output_tokens: > 0
- presence_penalty: -2.0 to 2.0
- frequency_penalty: -2.0 to 2.0

Unknown parameters trigger a warning log but do NOT cause errors (they may be model-specific parameters).

PARAMETER DESCRIPTION
config_dict

Dictionary of parameter name to value.

TYPE: dict[str, Any]

RETURNS DESCRIPTION
list[str]

List of validation error messages. Empty list means valid.

Examples:

# Valid config
errors = validate_generate_config({"temperature": 0.7, "top_p": 0.9})
assert errors == []

# Invalid config
errors = validate_generate_config({"temperature": 3.0})
assert len(errors) == 1
assert "temperature" in errors[0]

# Unknown param - no error, just warning
errors = validate_generate_config({"unknown_param": 42})
assert errors == []  # Warning logged, but no error
Note

Only validates known evolvable parameters. Unknown parameters are logged as warnings but accepted (may be model-specific).

Source code in src/gepa_adk/utils/config_utils.py
def validate_generate_config(config_dict: dict[str, Any]) -> list[str]:
    """Validate config dict against known parameter constraints.

    Checks each parameter value against its defined constraints:
    - temperature: 0.0 to 2.0
    - top_p: 0.0 to 1.0
    - top_k: > 0
    - max_output_tokens: > 0
    - presence_penalty: -2.0 to 2.0
    - frequency_penalty: -2.0 to 2.0

    Unknown parameters trigger a warning log but do NOT cause errors
    (they may be model-specific parameters).

    Args:
        config_dict: Dictionary of parameter name to value.

    Returns:
        List of validation error messages. Empty list means valid.

    Examples:
        ```python
        # Valid config
        errors = validate_generate_config({"temperature": 0.7, "top_p": 0.9})
        assert errors == []

        # Invalid config
        errors = validate_generate_config({"temperature": 3.0})
        assert len(errors) == 1
        assert "temperature" in errors[0]

        # Unknown param - no error, just warning
        errors = validate_generate_config({"unknown_param": 42})
        assert errors == []  # Warning logged, but no error
        ```

    Note:
        Only validates known evolvable parameters. Unknown parameters are
        logged as warnings but accepted (may be model-specific).
    """
    errors: list[str] = []

    for key, value in config_dict.items():
        if key not in EVOLVABLE_PARAMS:
            # Unknown parameter - log warning but don't reject
            logger.warning(
                "config_utils.validate.unknown_param",
                param=key,
                value=value,
            )
            continue

        # Skip None values
        if value is None:
            continue

        # Get validation rule
        rule = _VALIDATION_RULES.get(key)
        if rule is None:
            continue

        min_val, max_val, error_template = rule

        # Type check - must be numeric
        if not isinstance(value, (int, float)):
            errors.append(f"{key} must be a number, got {type(value).__name__}")
            continue

        # Range validation
        # For strictly positive params (top_k, max_output_tokens), check > 0 not >= 0
        if key in ("top_k", "max_output_tokens"):
            if value <= 0:
                errors.append(error_template.format(value=value))
        elif min_val is not None and value < min_val:
            errors.append(error_template.format(value=value))
        elif max_val is not None and value > max_val:
            errors.append(error_template.format(value=value))

    if errors:
        logger.debug(
            "config_utils.validate.failed",
            error_count=len(errors),
            errors=errors,
        )
    else:
        logger.debug(
            "config_utils.validate.success",
            param_count=len(config_dict),
        )

    return errors

extract_final_output

extract_final_output(
    events: list[Any], *, prefer_concatenated: bool = False
) -> str

Extract final output text from ADK event stream.

Extracts the final response text from ADK events, handling both event.actions.response_content (preferred) and event.content.parts (fallback) response sources. Filters out reasoning/thought content marked with part.thought=True.

PARAMETER DESCRIPTION
events

List of ADK Event objects from agent execution.

TYPE: list[Any]

prefer_concatenated

If True, concatenate all non-thought text parts from all final response events. If False (default), return only the last non-thought text part from the last final response event.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

Extracted output text as a string. Returns empty string if no valid output can be extracted (empty events, no final responses, all thought parts, or missing attributes).

Examples:

Basic extraction (default mode - returns LAST final response):

events = await runner.run_async(...)
output = extract_final_output(events)

Streaming/concatenation mode for CriticScorer:

events = await runner.run_async(...)
output = extract_final_output(events, prefer_concatenated=True)
Note

Scans events for is_final_response()=True, filters thought parts, skips empty/None text, and handles missing attributes gracefully. Response source priority: response_content > content.parts. Default mode returns LAST final response (for multi-agent pipelines).
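
To illustrate the two modes without a live agent run, hypothetical stand-in events can mimic the documented content.parts fallback; a minimal sketch, assuming missing attributes are handled as described (SimpleNamespace objects stand in for real ADK Events):

from types import SimpleNamespace

def make_event(text: str) -> SimpleNamespace:
    # Hypothetical stand-in for an ADK Event; real events carry much more
    part = SimpleNamespace(text=text, thought=False)
    event = SimpleNamespace(content=SimpleNamespace(parts=[part]))
    event.is_final_response = lambda: True
    return event

events = [make_event("Hello. "), make_event("Goodbye.")]
extract_final_output(events)                            # "Goodbye."
extract_final_output(events, prefer_concatenated=True)  # "Hello. Goodbye."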

Source code in src/gepa_adk/utils/events.py
def extract_final_output(
    events: list[Any],
    *,
    prefer_concatenated: bool = False,
) -> str:
    """Extract final output text from ADK event stream.

    Extracts the final response text from ADK events, handling both
    `event.actions.response_content` (preferred) and `event.content.parts`
    (fallback) response sources. Filters out reasoning/thought content
    marked with `part.thought=True`.

    Args:
        events: List of ADK Event objects from agent execution.
        prefer_concatenated: If True, concatenate all non-thought text parts
            from all final response events. If False (default), return only
            the last non-thought text part from the last final response event.

    Returns:
        Extracted output text as a string. Returns empty string if no valid
        output can be extracted (empty events, no final responses, all thought
        parts, or missing attributes).

    Examples:
        Basic extraction (default mode - returns LAST final response):

        ```python
        events = await runner.run_async(...)
        output = extract_final_output(events)
        ```

        Streaming/concatenation mode for CriticScorer:

        ```python
        events = await runner.run_async(...)
        output = extract_final_output(events, prefer_concatenated=True)
        ```

    Note:
        Scans events for is_final_response()=True, filters thought parts,
        skips empty/None text, and handles missing attributes gracefully.
        Response source priority: response_content > content.parts. Default
        mode returns LAST final response (for multi-agent pipelines).
    """
    if not events:
        return ""

    collected_parts: list[str] = []
    last_output: str = ""

    for event in events:
        # Skip non-final events
        if not hasattr(event, "is_final_response") or not event.is_final_response():
            continue

        # Try to extract text from this event
        event_text = _extract_text_from_event(event)

        if event_text:
            if prefer_concatenated:
                collected_parts.append(event_text)
            else:
                # Default mode: keep updating to get LAST final response
                last_output = event_text

    # For concatenated mode, join all collected parts
    if prefer_concatenated:
        return "".join(collected_parts)

    return last_output

extract_trajectory

extract_trajectory(
    events: list[Any],
    final_output: str = "",
    error: str | None = None,
    config: TrajectoryConfig | None = None,
) -> ADKTrajectory

Extract trajectory from ADK execution events with optional processing.

Extracts tool calls, state deltas, and token usage from ADK event stream, applying redaction and truncation based on configuration.

PARAMETER DESCRIPTION
events

List of ADK Event objects from agent execution.

TYPE: list[Any]

final_output

Final text response from the agent. Defaults to empty string.

TYPE: str DEFAULT: ''

error

Error message if execution failed. Defaults to None.

TYPE: str | None DEFAULT: None

config

Extraction configuration. If None, uses TrajectoryConfig defaults.

TYPE: TrajectoryConfig | None DEFAULT: None

RETURNS DESCRIPTION
ADKTrajectory

ADKTrajectory with extracted and processed data according to config.

Examples:

Basic extraction with defaults:

from google.adk.events import Event

events = [...]  # From ADK runner
trajectory = extract_trajectory(events, final_output="Response")

Custom configuration:

config = TrajectoryConfig(
    include_tool_calls=True,
    include_state_deltas=False,
    redact_sensitive=True,
    max_string_length=5000,
)
trajectory = extract_trajectory(events, config=config)

With error:

trajectory = extract_trajectory(
    events=[],
    final_output="",
    error="Agent timeout after 30s",
)
Note

Extraction follows this order:

1. Extract raw data from events (tool calls, state, tokens)
2. Apply redaction if config.redact_sensitive is True
3. Apply truncation if config.max_string_length is not None
4. Build immutable ADKTrajectory

Empty events list is valid and returns empty trajectory.

Source code in src/gepa_adk/utils/events.py
def extract_trajectory(
    events: list[Any],
    final_output: str = "",
    error: str | None = None,
    config: TrajectoryConfig | None = None,
) -> ADKTrajectory:
    """Extract trajectory from ADK execution events with optional processing.

    Extracts tool calls, state deltas, and token usage from ADK event stream,
    applying redaction and truncation based on configuration.

    Args:
        events: List of ADK Event objects from agent execution.
        final_output: Final text response from the agent. Defaults to empty string.
        error: Error message if execution failed. Defaults to None.
        config: Extraction configuration. If None, uses TrajectoryConfig defaults.

    Returns:
        ADKTrajectory with extracted and processed data according to config.

    Examples:
        Basic extraction with defaults:

        ```python
        from google.adk.events import Event

        events = [...]  # From ADK runner
        trajectory = extract_trajectory(events, final_output="Response")
        ```

        Custom configuration:

        ```python
        config = TrajectoryConfig(
            include_tool_calls=True,
            include_state_deltas=False,
            redact_sensitive=True,
            max_string_length=5000,
        )
        trajectory = extract_trajectory(events, config=config)
        ```

        With error:

        ```python
        trajectory = extract_trajectory(
            events=[],
            final_output="",
            error="Agent timeout after 30s",
        )
        ```

    Note:
        Extraction follows this order:
        1. Extract raw data from events (tool calls, state, tokens)
        2. Apply redaction if config.redact_sensitive is True
        3. Apply truncation if config.max_string_length is not None
        4. Build immutable ADKTrajectory

        Empty events list is valid and returns empty trajectory.
    """
    if config is None:
        config = TrajectoryConfig()

    logger.debug(
        "trajectory.extraction.start",
        event_count=len(events),
        config_include_tool_calls=config.include_tool_calls,
        config_include_state_deltas=config.include_state_deltas,
        config_include_token_usage=config.include_token_usage,
        config_redact_sensitive=config.redact_sensitive,
        config_max_string_length=config.max_string_length,
    )

    # Step 1: Extract raw data from events
    tool_calls_list: list[ToolCallRecord] = []
    if config.include_tool_calls:
        tool_calls_list = list(_extract_tool_calls(events))

    state_deltas_list: list[dict[str, Any]] = []
    if config.include_state_deltas:
        state_deltas_list = list(_extract_state_deltas(events))

    token_usage = None
    if config.include_token_usage:
        token_usage = _extract_token_usage(events)

    # Step 2: Apply redaction if configured
    if config.redact_sensitive and config.sensitive_keys:
        # Redact tool call arguments and results
        redacted_tool_calls = []
        for tc in tool_calls_list:
            redacted_tool_calls.append(
                ToolCallRecord(
                    name=tc.name,
                    arguments=_redact_sensitive(tc.arguments, config.sensitive_keys),
                    result=_redact_sensitive(tc.result, config.sensitive_keys),
                    timestamp=tc.timestamp,
                )
            )
        tool_calls_list = redacted_tool_calls

        # Redact state deltas
        state_deltas_list = [
            _redact_sensitive(delta, config.sensitive_keys)
            for delta in state_deltas_list
        ]

    # Step 3: Apply truncation if configured
    if config.max_string_length is not None:
        # Truncate tool call arguments and results
        truncated_tool_calls = []
        for tc in tool_calls_list:
            truncated_tool_calls.append(
                ToolCallRecord(
                    name=tc.name,
                    arguments=_truncate_strings(tc.arguments, config.max_string_length),
                    result=_truncate_strings(tc.result, config.max_string_length),
                    timestamp=tc.timestamp,
                )
            )
        tool_calls_list = truncated_tool_calls

        # Truncate state deltas
        state_deltas_list = [
            _truncate_strings(delta, config.max_string_length)
            for delta in state_deltas_list
        ]

    # Step 4: Build immutable trajectory
    logger.debug(
        "trajectory.extraction.complete",
        tool_calls_count=len(tool_calls_list),
        state_deltas_count=len(state_deltas_list),
        has_token_usage=token_usage is not None,
        has_error=error is not None,
    )

    return ADKTrajectory(
        tool_calls=tuple(tool_calls_list),
        state_deltas=tuple(state_deltas_list),
        token_usage=token_usage,
        final_output=final_output,
        error=error,
    )
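
The redaction and truncation helpers (`_redact_sensitive`, `_truncate_strings`) are private and not documented here. The sketch below shows one plausible recursive implementation consistent with the pipeline above; the `[REDACTED]` placeholder, the case-insensitive key match, and the `...` suffix are illustrative assumptions, not the library's actual behavior.

```python
from typing import Any

REDACTED = "[REDACTED]"  # assumed placeholder; the real helper may differ


def redact_sensitive(value: Any, sensitive_keys: tuple[str, ...]) -> Any:
    """Recursively replace values whose keys match a sensitive key."""
    if isinstance(value, dict):
        return {
            k: REDACTED
            if k.lower() in sensitive_keys
            else redact_sensitive(v, sensitive_keys)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact_sensitive(item, sensitive_keys) for item in value]
    return value


def truncate_strings(value: Any, max_length: int) -> Any:
    """Recursively truncate strings longer than max_length."""
    if isinstance(value, str):
        return value[:max_length] + "..." if len(value) > max_length else value
    if isinstance(value, dict):
        return {k: truncate_strings(v, max_length) for k, v in value.items()}
    if isinstance(value, list):
        return [truncate_strings(item, max_length) for item in value]
    return value
```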

deserialize_schema

deserialize_schema(schema_text: str) -> type[BaseModel]

Deserialize validated schema text to a Pydantic model class.

Convenience function that calls validate_schema_text and returns only the schema class. Use this when you only need the class and don't need the validation metadata.

PARAMETER DESCRIPTION
schema_text

Python source code defining a Pydantic model.

TYPE: str

RETURNS DESCRIPTION
type[BaseModel]

The deserialized Pydantic BaseModel subclass.

RAISES DESCRIPTION
SchemaValidationError

If validation fails.

Examples:

Deserialize schema text and create an instance:

from gepa_adk.utils.schema_utils import deserialize_schema

schema_text = '''
class EvolvedSchema(BaseModel):
    result: str
    confidence: float = Field(ge=0.0, le=1.0)
'''

EvolvedSchema = deserialize_schema(schema_text)
instance = EvolvedSchema(result="success", confidence=0.95)
Source code in src/gepa_adk/utils/schema_utils.py
def deserialize_schema(schema_text: str) -> type[BaseModel]:
    """Deserialize validated schema text to a Pydantic model class.

    Convenience function that calls validate_schema_text and returns only
    the schema class. Use this when you only need the class and don't need
    the validation metadata.

    Args:
        schema_text (str): Python source code defining a Pydantic model.

    Returns:
        The deserialized Pydantic BaseModel subclass.

    Raises:
        SchemaValidationError: If validation fails.

    Examples:
        Deserialize schema text and create an instance:

        ```python
        from gepa_adk.utils.schema_utils import deserialize_schema

        schema_text = '''
        class EvolvedSchema(BaseModel):
            result: str
            confidence: float = Field(ge=0.0, le=1.0)
        '''

        EvolvedSchema = deserialize_schema(schema_text)
        instance = EvolvedSchema(result="success", confidence=0.95)
        ```
    """
    logger.debug(
        "schema.deserialize.start",
        text_length=len(schema_text),
    )

    result = validate_schema_text(schema_text)

    logger.debug(
        "schema.deserialize.complete",
        class_name=result.class_name,
    )

    return result.schema_class
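
`deserialize_schema` propagates `SchemaValidationError` from `validate_schema_text`, so callers handling untrusted schema text typically wrap the call. A minimal sketch, assuming the exception class is importable from the same module:

```python
from gepa_adk.utils.schema_utils import (  # exception import path is an assumption
    SchemaValidationError,
    deserialize_schema,
)

bad_text = "class Broken(BaseModel:\n    pass"  # unbalanced parenthesis

try:
    deserialize_schema(bad_text)
except SchemaValidationError as exc:
    # Fails at the syntax stage; fall back or re-raise as needed.
    print(f"schema rejected: {exc}")
```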

serialize_pydantic_schema

serialize_pydantic_schema(
    schema_class: type[BaseModel],
) -> str

Serialize a Pydantic model class to Python source code.

Uses inspect.getsource() to retrieve the original source code definition of the schema class. This preserves Field() constraints, defaults, and docstrings.

PARAMETER DESCRIPTION
schema_class

The Pydantic BaseModel subclass to serialize.

TYPE: type[BaseModel]

RETURNS DESCRIPTION
str

Python source code string defining the class.

RAISES DESCRIPTION
TypeError

If schema_class is an instance rather than a class, or is not a BaseModel subclass.

OSError

If source code cannot be retrieved (e.g., dynamic class).

Examples:

Serialize a schema to source code:

from pydantic import BaseModel, Field
from gepa_adk.utils.schema_utils import serialize_pydantic_schema


class MySchema(BaseModel):
    name: str
    value: int = Field(ge=0)


text = serialize_pydantic_schema(MySchema)
# text contains the Python source code for MySchema
Source code in src/gepa_adk/utils/schema_utils.py
def serialize_pydantic_schema(schema_class: type[BaseModel]) -> str:
    """Serialize a Pydantic model class to Python source code.

    Uses inspect.getsource() to retrieve the original source code definition
    of the schema class. This preserves Field() constraints, defaults, and
    docstrings.

    Args:
        schema_class (type[BaseModel]): The Pydantic BaseModel subclass to serialize.

    Returns:
        Python source code string defining the class.

    Raises:
        TypeError: If schema_class is an instance rather than a class, or is not a BaseModel subclass.
        OSError: If source code cannot be retrieved (e.g., dynamic class).

    Examples:
        Serialize a schema to source code:

        ```python
        from pydantic import BaseModel, Field
        from gepa_adk.utils.schema_utils import serialize_pydantic_schema


        class MySchema(BaseModel):
            name: str
            value: int = Field(ge=0)


        text = serialize_pydantic_schema(MySchema)
        # text contains the Python source code for MySchema
        ```
    """
    # Type validation
    if not isinstance(schema_class, type):
        raise TypeError(
            f"Expected a class, got {type(schema_class).__name__} instance. "
            "Pass the class itself, not an instance."
        )

    if not issubclass(schema_class, BaseModel):
        raise TypeError(
            f"Expected a BaseModel subclass, got {schema_class.__name__}. "
            "Schema class must inherit from pydantic.BaseModel."
        )

    logger.debug(
        "schema.serialize.start",
        class_name=schema_class.__name__,
        field_count=len(schema_class.model_fields),
    )

    try:
        source = inspect.getsource(schema_class)
    except OSError as e:
        logger.warning(
            "schema.serialize.failed",
            class_name=schema_class.__name__,
            error=str(e),
        )
        raise

    logger.debug(
        "schema.serialize.complete",
        class_name=schema_class.__name__,
        source_length=len(source),
    )

    return source
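
Because `inspect.getsource()` needs the class definition to exist in a source file, models built at runtime cannot be serialized. A minimal sketch of that documented failure mode, using `pydantic.create_model`:

```python
from pydantic import create_model

from gepa_adk.utils.schema_utils import serialize_pydantic_schema

# A dynamically created class has no backing source text on disk.
Dynamic = create_model("Dynamic", name=(str, ...))

try:
    serialize_pydantic_schema(Dynamic)
except OSError:
    # inspect.getsource() cannot locate the class definition.
    print("dynamic classes cannot be serialized")
```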

validate_schema_text

validate_schema_text(
    schema_text: str,
    *,
    allowed_namespace: dict[str, Any] | None = None,
) -> SchemaValidationResult

Validate schema text and return the deserialized class.

Strips markdown code fences if present, then performs three-stage validation:

1. Syntax: Parse Python syntax with ast.parse()
2. Structure: Check for security violations and BaseModel inheritance
3. Execution: Execute in a controlled namespace and verify the class

PARAMETER DESCRIPTION
schema_text

Python source code defining a Pydantic model. May be wrapped in markdown code fences (```python...```).

TYPE: str

PARAMETER DESCRIPTION
allowed_namespace

Override default namespace. If None, uses SCHEMA_NAMESPACE.

TYPE: dict[str, Any] | None

RETURNS DESCRIPTION
SchemaValidationResult

SchemaValidationResult with the deserialized class and metadata.

RAISES DESCRIPTION
SchemaValidationError

If validation fails at any stage.

Examples:

Validate schema text and inspect results:

from gepa_adk.utils.schema_utils import validate_schema_text

schema_text = '''
class MySchema(BaseModel):
    name: str
    value: int
'''

result = validate_schema_text(schema_text)
assert result.class_name == "MySchema"
assert result.field_names == ("name", "value")
Source code in src/gepa_adk/utils/schema_utils.py
def validate_schema_text(
    schema_text: str,
    *,
    allowed_namespace: dict[str, Any] | None = None,
) -> SchemaValidationResult:
    """Validate schema text and return the deserialized class.

    Strips markdown code fences if present, then performs three-stage
    validation:

    1. **Syntax**: Parse Python syntax with ast.parse()
    2. **Structure**: Check for security violations and BaseModel inheritance
    3. **Execution**: Execute in a controlled namespace and verify the class

    Args:
        schema_text (str): Python source code defining a Pydantic model.
            May be wrapped in markdown code fences (` ```python...``` `).

    Other Parameters:
        allowed_namespace (dict[str, Any] | None): Override default namespace.
            If None, uses SCHEMA_NAMESPACE.

    Returns:
        SchemaValidationResult with the deserialized class and metadata.

    Raises:
        SchemaValidationError: If validation fails at any stage.

    Examples:
        Validate schema text and inspect results:

        ```python
        from gepa_adk.utils.schema_utils import validate_schema_text

        schema_text = '''
        class MySchema(BaseModel):
            name: str
            value: int
        '''

        result = validate_schema_text(schema_text)
        assert result.class_name == "MySchema"
        assert result.field_names == ("name", "value")
        ```
    """
    # Preprocess: strip markdown fences if present
    schema_text = _strip_markdown_fences(schema_text)

    logger.debug(
        "schema.validate.start",
        text_length=len(schema_text),
    )

    # Stage 1: Syntax validation
    tree = _validate_syntax(schema_text)

    # Stage 2: Structure validation
    class_name = _validate_structure(tree, schema_text)

    # Stage 3: Execution
    namespace = allowed_namespace if allowed_namespace is not None else SCHEMA_NAMESPACE
    schema_class = _execute_schema(schema_text, class_name, namespace)

    # Build result
    field_names = tuple(schema_class.model_fields.keys())
    result = SchemaValidationResult(
        schema_class=schema_class,
        class_name=class_name,
        field_count=len(field_names),
        field_names=field_names,
    )

    logger.debug(
        "schema.validate.complete",
        class_name=class_name,
        field_count=result.field_count,
        field_names=list(field_names),
    )

    return result
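
Because preprocessing strips markdown fences, raw LLM output can be passed straight through. A short sketch (the fence string is built programmatically only to avoid nesting backticks inside this example):

```python
from gepa_adk.utils.schema_utils import validate_schema_text

fence = "`" * 3
fenced = f"{fence}python\nclass Evolved(BaseModel):\n    answer: str\n{fence}"

# The fence is stripped before syntax validation, so this passes.
result = validate_schema_text(fenced)
assert result.class_name == "Evolved"
assert result.field_names == ("answer",)
```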