
Events

events

Trajectory extraction and processing utilities.

This module provides functions for extracting, redacting, and truncating trajectory data from ADK agent execution events.

The main entry point is extract_trajectory, which orchestrates a three-stage pipeline:

  1. Extract raw data (tool calls, state deltas, token usage)
  2. Apply redaction to sensitive fields
  3. Apply truncation to oversized strings

Functions prefixed with an underscore are private helpers; extract_trajectory, extract_final_output, extract_output_from_state, and partition_events_by_agent form the public API. The extraction logic is designed to handle both real ADK Event objects and test mocks gracefully.
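Stages 2 and 3 of the pipeline can be pictured with the sketch below. The private helpers `_redact_sensitive` and `_truncate_strings` are not shown on this page, so `redact_sketch`, `truncate_sketch`, `process_sketch`, the `"[REDACTED]"` placeholder, and the `"...[truncated]"` suffix are all hypothetical stand-ins. The sketch assumes sensitive keys are matched exactly against dictionary keys at any nesting depth.

```python
from typing import Any

# Hypothetical marker strings; the real helpers may use different ones.
REDACTED = "[REDACTED]"
TRUNCATION_SUFFIX = "...[truncated]"


def redact_sketch(value: Any, sensitive_keys: frozenset[str]) -> Any:
    """Return a copy of value with sensitive dict keys masked at any depth."""
    if isinstance(value, dict):
        return {
            key: (REDACTED if key in sensitive_keys else redact_sketch(item, sensitive_keys))
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact_sketch(item, sensitive_keys) for item in value]
    return value  # scalars pass through unchanged


def truncate_sketch(value: Any, max_length: int) -> Any:
    """Return a copy of value with every string longer than max_length shortened."""
    if isinstance(value, str):
        if len(value) <= max_length:
            return value
        return value[:max_length] + TRUNCATION_SUFFIX
    if isinstance(value, dict):
        return {key: truncate_sketch(item, max_length) for key, item in value.items()}
    if isinstance(value, list):
        return [truncate_sketch(item, max_length) for item in value]
    return value


def process_sketch(
    raw: dict[str, Any], sensitive_keys: frozenset[str], max_length: int
) -> dict[str, Any]:
    """Redact first (stage 2), then truncate (stage 3), as documented above."""
    return truncate_sketch(redact_sketch(raw, sensitive_keys), max_length)


raw = {"query": "x" * 20, "auth": {"api_key": "secret-123"}}
processed = process_sketch(raw, frozenset({"api_key"}), max_length=10)
# processed == {"query": "xxxxxxxxxx...[truncated]", "auth": {"api_key": "[REDACTED]"}}
```

One plausible reason for running redaction before truncation is that a secret is replaced as a whole, so no prefix of it can survive into the truncated output. Both helpers build new containers, leaving the input untouched.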

ATTRIBUTE DESCRIPTION
extract_trajectory

Main trajectory extraction API with configurable redaction and truncation.

TYPE: function

Note

These utilities handle infrastructure concerns like data transformation and security (redaction), not domain logic. They consume domain models but don't define them.

extract_final_output

extract_final_output(
    events: list[Any], *, prefer_concatenated: bool = False
) -> str

Extract final output text from ADK event stream.

Extracts the final response text from ADK events, handling both event.actions.response_content (preferred) and event.content.parts (fallback) response sources. Filters out reasoning/thought content marked with part.thought=True.

PARAMETER DESCRIPTION
events

List of ADK Event objects from agent execution.

TYPE: list[Any]

prefer_concatenated

If True, concatenate all non-thought text parts from all final response events. If False (default), return only the last non-thought text part from the last final response event.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
str

Extracted output text as a string. Returns an empty string if no valid output can be extracted (empty events, no final responses, all thought parts, or missing attributes).

Examples:

Basic extraction (default mode - returns LAST final response):

events = [event async for event in runner.run_async(...)]
output = extract_final_output(events)

Streaming/concatenation mode for CriticScorer:

events = [event async for event in runner.run_async(...)]
output = extract_final_output(events, prefer_concatenated=True)
Note

Scans events for is_final_response()=True, filters thought parts, skips empty/None text, and handles missing attributes gracefully. Response source priority: response_content > content.parts. Default mode returns the LAST final response that yields non-empty text, which suits multi-agent pipelines where earlier agents also emit final responses.

Source code in src/gepa_adk/utils/events.py
def extract_final_output(
    events: list[Any],
    *,
    prefer_concatenated: bool = False,
) -> str:
    """Extract final output text from ADK event stream.

    Extracts the final response text from ADK events, handling both
    `event.actions.response_content` (preferred) and `event.content.parts`
    (fallback) response sources. Filters out reasoning/thought content
    marked with `part.thought=True`.

    Args:
        events: List of ADK Event objects from agent execution.
        prefer_concatenated: If True, concatenate all non-thought text parts
            from all final response events. If False (default), return only
            the last non-thought text part from the last final response event.

    Returns:
        Extracted output text as a string. Returns empty string if no valid
        output can be extracted (empty events, no final responses, all thought
        parts, or missing attributes).

    Examples:
        Basic extraction (default mode - returns LAST final response):

        ```python
        events = await runner.run_async(...)
        output = extract_final_output(events)
        ```

        Streaming/concatenation mode for CriticScorer:

        ```python
        events = await runner.run_async(...)
        output = extract_final_output(events, prefer_concatenated=True)
        ```

    Note:
        Scans events for is_final_response()=True, filters thought parts,
        skips empty/None text, and handles missing attributes gracefully.
        Response source priority: response_content > content.parts. Default
        mode returns LAST final response (for multi-agent pipelines).
    """
    if not events:
        return ""

    collected_parts: list[str] = []
    last_output: str = ""

    for event in events:
        # Skip non-final events
        if not hasattr(event, "is_final_response") or not event.is_final_response():
            continue

        # Try to extract text from this event
        event_text = _extract_text_from_event(event)

        if event_text:
            if prefer_concatenated:
                collected_parts.append(event_text)
            else:
                # Default mode: keep updating to get LAST final response
                last_output = event_text

    # For concatenated mode, join all collected parts
    if prefer_concatenated:
        return "".join(collected_parts)

    return last_output

extract_trajectory

extract_trajectory(
    events: list[Any],
    final_output: str = "",
    error: str | None = None,
    config: TrajectoryConfig | None = None,
) -> ADKTrajectory

Extract trajectory from ADK execution events with optional processing.

Extracts tool calls, state deltas, and token usage from ADK event stream, applying redaction and truncation based on configuration.

PARAMETER DESCRIPTION
events

List of ADK Event objects from agent execution.

TYPE: list[Any]

final_output

Final text response from the agent. Defaults to empty string.

TYPE: str DEFAULT: ''

error

Error message if execution failed. Defaults to None.

TYPE: str | None DEFAULT: None

config

Extraction configuration. If None, uses TrajectoryConfig defaults.

TYPE: TrajectoryConfig | None DEFAULT: None

RETURNS DESCRIPTION
ADKTrajectory

ADKTrajectory with extracted and processed data according to config.

Examples:

Basic extraction with defaults:

from google.adk.events import Event

events = [...]  # From ADK runner
trajectory = extract_trajectory(events, final_output="Response")

Custom configuration:

config = TrajectoryConfig(
    include_tool_calls=True,
    include_state_deltas=False,
    redact_sensitive=True,
    max_string_length=5000,
)
trajectory = extract_trajectory(events, config=config)

With error:

trajectory = extract_trajectory(
    events=[],
    final_output="",
    error="Agent timeout after 30s",
)
Note

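Extraction follows this order:

  1. Extract raw data from events (tool calls, state deltas, token usage), skipping any category disabled in config.
  2. Apply redaction if config.redact_sensitive is True and config.sensitive_keys is non-empty.
  3. Apply truncation if config.max_string_length is not None.
  4. Build the immutable ADKTrajectory.

Redaction and truncation apply only to tool call arguments and results and to state deltas; token_usage, final_output, and error are passed through unchanged.

An empty events list is valid and returns an empty trajectory.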

Source code in src/gepa_adk/utils/events.py
def extract_trajectory(
    events: list[Any],
    final_output: str = "",
    error: str | None = None,
    config: TrajectoryConfig | None = None,
) -> ADKTrajectory:
    """Extract trajectory from ADK execution events with optional processing.

    Extracts tool calls, state deltas, and token usage from ADK event stream,
    applying redaction and truncation based on configuration.

    Args:
        events: List of ADK Event objects from agent execution.
        final_output: Final text response from the agent. Defaults to empty string.
        error: Error message if execution failed. Defaults to None.
        config: Extraction configuration. If None, uses TrajectoryConfig defaults.

    Returns:
        ADKTrajectory with extracted and processed data according to config.

    Examples:
        Basic extraction with defaults:

        ```python
        from google.adk.events import Event

        events = [...]  # From ADK runner
        trajectory = extract_trajectory(events, final_output="Response")
        ```

        Custom configuration:

        ```python
        config = TrajectoryConfig(
            include_tool_calls=True,
            include_state_deltas=False,
            redact_sensitive=True,
            max_string_length=5000,
        )
        trajectory = extract_trajectory(events, config=config)
        ```

        With error:

        ```python
        trajectory = extract_trajectory(
            events=[],
            final_output="",
            error="Agent timeout after 30s",
        )
        ```

    Note:
        Extraction follows this order:
        1. Extract raw data from events (tool calls, state, tokens)
        2. Apply redaction if config.redact_sensitive is True
        3. Apply truncation if config.max_string_length is not None
        4. Build immutable ADKTrajectory

        Empty events list is valid and returns empty trajectory.
    """
    if config is None:
        config = TrajectoryConfig()

    logger.debug(
        "trajectory.extraction.start",
        event_count=len(events),
        config_include_tool_calls=config.include_tool_calls,
        config_include_state_deltas=config.include_state_deltas,
        config_include_token_usage=config.include_token_usage,
        config_redact_sensitive=config.redact_sensitive,
        config_max_string_length=config.max_string_length,
    )

    # Step 1: Extract raw data from events
    tool_calls_list: list[ToolCallRecord] = []
    if config.include_tool_calls:
        tool_calls_list = list(_extract_tool_calls(events))

    state_deltas_list: list[dict[str, Any]] = []
    if config.include_state_deltas:
        state_deltas_list = list(_extract_state_deltas(events))

    token_usage = None
    if config.include_token_usage:
        token_usage = _extract_token_usage(events)

    # Step 2: Apply redaction if configured
    if config.redact_sensitive and config.sensitive_keys:
        # Redact tool call arguments and results
        redacted_tool_calls = []
        for tc in tool_calls_list:
            redacted_tool_calls.append(
                ToolCallRecord(
                    name=tc.name,
                    arguments=_redact_sensitive(tc.arguments, config.sensitive_keys),
                    result=_redact_sensitive(tc.result, config.sensitive_keys),
                    timestamp=tc.timestamp,
                )
            )
        tool_calls_list = redacted_tool_calls

        # Redact state deltas
        state_deltas_list = [
            _redact_sensitive(delta, config.sensitive_keys)
            for delta in state_deltas_list
        ]

    # Step 3: Apply truncation if configured
    if config.max_string_length is not None:
        # Truncate tool call arguments and results
        truncated_tool_calls = []
        for tc in tool_calls_list:
            truncated_tool_calls.append(
                ToolCallRecord(
                    name=tc.name,
                    arguments=_truncate_strings(tc.arguments, config.max_string_length),
                    result=_truncate_strings(tc.result, config.max_string_length),
                    timestamp=tc.timestamp,
                )
            )
        tool_calls_list = truncated_tool_calls

        # Truncate state deltas
        state_deltas_list = [
            _truncate_strings(delta, config.max_string_length)
            for delta in state_deltas_list
        ]

    # Step 4: Build immutable trajectory
    logger.debug(
        "trajectory.extraction.complete",
        tool_calls_count=len(tool_calls_list),
        state_deltas_count=len(state_deltas_list),
        has_token_usage=token_usage is not None,
        has_error=error is not None,
    )

    return ADKTrajectory(
        tool_calls=tuple(tool_calls_list),
        state_deltas=tuple(state_deltas_list),
        token_usage=token_usage,
        final_output=final_output,
        error=error,
    )

extract_output_from_state

extract_output_from_state(
    session_state: dict[str, Any], output_key: str | None
) -> str | None

Extract output from session state using output_key.

A shared utility for extracting agent output from ADK session state using the output_key mechanism. Complements extract_final_output() for use when agents have output_key configured.

PARAMETER DESCRIPTION
session_state

ADK session state dictionary.

TYPE: dict[str, Any]

output_key

Key where agent stored its output, or None.

TYPE: str | None

RETURNS DESCRIPTION
str | None

Output string if output_key is set and the key holds a non-None value in state (non-string values are converted with str()), None otherwise. Callers should implement fallback logic when None is returned.

Examples:

Basic extraction:

state = {"proposed_component_text": "Be helpful and concise"}
result = extract_output_from_state(state, "proposed_component_text")
# result == "Be helpful and concise"

Missing key returns None:

state = {"other_key": "value"}
result = extract_output_from_state(state, "proposed_component_text")
# result is None

None output_key returns None:

state = {"proposed_component_text": "text"}
result = extract_output_from_state(state, None)
# result is None
Note

State-based extraction complements event-based extraction for ADK's output_key mechanism. Callers should implement fallback logic (e.g., extract_final_output) when this function returns None.

Source code in src/gepa_adk/utils/events.py
def extract_output_from_state(
    session_state: dict[str, Any],
    output_key: str | None,
) -> str | None:
    """Extract output from session state using output_key.

    A shared utility for extracting agent output from ADK session state using
    the output_key mechanism. Complements extract_final_output() for use when
    agents have output_key configured.

    Args:
        session_state: ADK session state dictionary.
        output_key: Key where agent stored its output, or None.

    Returns:
        Output string if found in state, None otherwise.
        Caller should implement fallback logic when None is returned.

    Examples:
        Basic extraction:

        ```python
        state = {"proposed_component_text": "Be helpful and concise"}
        result = extract_output_from_state(state, "proposed_component_text")
        # result == "Be helpful and concise"
        ```

        Missing key returns None:

        ```python
        state = {"other_key": "value"}
        result = extract_output_from_state(state, "proposed_component_text")
        # result is None
        ```

        None output_key returns None:

        ```python
        state = {"proposed_component_text": "text"}
        result = extract_output_from_state(state, None)
        # result is None
        ```

    Note:
        State-based extraction complements event-based extraction for ADK's
        output_key mechanism. Callers should implement fallback logic
        (e.g., extract_final_output) when this function returns None.
    """
    if not output_key:
        return None
    if output_key in session_state:
        value = session_state[output_key]
        if value is not None:
            return str(value)
    return None

partition_events_by_agent

partition_events_by_agent(
    events: list[Any],
) -> dict[str, list[Any]]

Partition ADK events by their originating agent.

Separates a mixed event stream (e.g., from SequentialAgent) into per-agent event lists based on the event.author field. Each agent's events can then be processed independently for trajectory extraction.

PARAMETER DESCRIPTION
events

List of ADK Event objects from multi-agent execution. Each event should have an author attribute identifying the agent that generated it.

TYPE: list[Any]

RETURNS DESCRIPTION
dict[str, list[Any]]

Dictionary mapping agent names to their respective event lists. Events with author='user' or a missing author are excluded. An empty dict is returned if no agent events are found.

Examples:

Basic partitioning from SequentialAgent:

# runner is an ADK Runner whose root agent is the SequentialAgent
events = [event async for event in runner.run_async(...)]
partitions = partition_events_by_agent(events)
# partitions == {
#     "generator": [Event(...), Event(...)],
#     "critic": [Event(...), Event(...)],
# }

Building per-agent trajectories:

partitions = partition_events_by_agent(events)
trajectories = {}
for agent_name, agent_events in partitions.items():
    trajectories[agent_name] = extract_trajectory(agent_events)
Note

Groups events into agent-specific lists by examining event.author, preserving their original order. User events are excluded because they represent input, not agent output.
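The grouping behavior can be checked against mock events with the sketch below. `partition_sketch` is a hypothetical re-expression of the listed logic using `collections.defaultdict` instead of the explicit `if author not in partitions` check, and the `SimpleNamespace` events are assumed stand-ins for ADK events.

```python
from collections import defaultdict
from types import SimpleNamespace
from typing import Any


def partition_sketch(events: list[Any]) -> dict[str, list[Any]]:
    """Group events by author, skipping 'user' and author-less events."""
    partitions: defaultdict[str, list[Any]] = defaultdict(list)
    for event in events:
        author = getattr(event, "author", None)
        if not author or author == "user":
            continue  # input events, or events with no usable author
        partitions[author].append(event)  # order within each agent is preserved
    # Return a plain dict so later lookups of unknown agents raise KeyError
    # instead of silently creating empty entries.
    return dict(partitions)


events = [
    SimpleNamespace(author="user", text="prompt"),
    SimpleNamespace(author="generator", text="draft 1"),
    SimpleNamespace(author="critic", text="score 0.4"),
    SimpleNamespace(author="generator", text="draft 2"),
    SimpleNamespace(text="no author"),
]
groups = partition_sketch(events)
print(list(groups))  # ['generator', 'critic']
print([e.text for e in groups["generator"]])  # ['draft 1', 'draft 2']
```

Agents appear in the order of their first event, since dictionaries preserve insertion order.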

Source code in src/gepa_adk/utils/events.py
def partition_events_by_agent(events: list[Any]) -> dict[str, list[Any]]:
    """Partition ADK events by their originating agent.

    Separates a mixed event stream (e.g., from SequentialAgent) into
    per-agent event lists based on the `event.author` field. Each agent's
    events can then be processed independently for trajectory extraction.

    Args:
        events: List of ADK Event objects from multi-agent execution.
            Each event should have an `author` attribute identifying
            the agent that generated it.

    Returns:
        Dictionary mapping agent names to their respective event lists.
        Events with author='user' or missing author are excluded.
        Empty dict returned if no agent events found.

    Examples:
        Basic partitioning from SequentialAgent:

        ```python
        events = await runner.run_async(sequential_agent, ...)
        partitions = partition_events_by_agent(events)
        # partitions == {
        #     "generator": [Event(...), Event(...)],
        #     "critic": [Event(...), Event(...)],
        # }
        ```

        Building per-agent trajectories:

        ```python
        partitions = partition_events_by_agent(events)
        trajectories = {}
        for agent_name, agent_events in partitions.items():
            trajectories[agent_name] = extract_trajectory(agent_events)
        ```

    Note:
        Sorts events into agent-specific lists by examining `event.author`.
        User events are excluded since they represent input, not agent output.
    """
    partitions: dict[str, list[Any]] = {}

    for event in events:
        author = getattr(event, "author", None)
        # Skip user events and events without author
        if not author or author == "user":
            continue

        if author not in partitions:
            partitions[author] = []
        partitions[author].append(event)

    return partitions