Stoppers

stoppers

Stoppers for evolution termination conditions.

This subpackage provides concrete stopper implementations that terminate evolution when a condition is met, such as a timeout, an evaluation budget, a score threshold, the presence of a stop file, or a Unix signal.

ATTRIBUTE DESCRIPTION
CompositeStopper

Combine multiple stoppers with AND/OR logic.

TYPE: class

FileStopper

Stop evolution when a specified file exists.

TYPE: class

MaxEvaluationsStopper

Stop evolution after a maximum number of evaluations.

TYPE: class

ScoreThresholdStopper

Stop evolution when best score reaches threshold.

TYPE: class

SignalStopper

Stop evolution on Unix signals (SIGINT, SIGTERM).

TYPE: class

TimeoutStopper

Stop evolution after a specified timeout.

TYPE: class

Examples:

Using a timeout stopper:

from gepa_adk.adapters.stoppers import TimeoutStopper

stopper = TimeoutStopper(300.0)  # Stop after 5 minutes

Using a score threshold stopper:

from gepa_adk.adapters.stoppers import ScoreThresholdStopper

stopper = ScoreThresholdStopper(0.95)  # Stop at 95% accuracy

Using a signal stopper:

from gepa_adk.adapters.stoppers import SignalStopper

async with SignalStopper() as stopper:
    # Ctrl+C will gracefully stop evolution
    config = EvolutionConfig(stop_callbacks=[stopper])
    result = await engine.run(config)

Combining multiple stoppers:

from gepa_adk.adapters.stoppers import (
    CompositeStopper,
    ScoreThresholdStopper,
    TimeoutStopper,
)

# Stop after 5 minutes OR when score >= 0.95
composite = CompositeStopper(
    [TimeoutStopper(300), ScoreThresholdStopper(0.95)],
    mode="any",
)
Note

This subpackage contains adapter-layer implementations. All stoppers implement the StopperProtocol defined in the ports layer.
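
As a rough illustration of what implementing the protocol involves, the sketch below defines a custom stopper. It assumes that StopperProtocol amounts to "a callable that takes a StopperState and returns bool", which is what the `__call__` signatures on this page suggest. `StopperState` and `StopperProtocol` are local stand-ins here (the real ones live in gepa_adk), and `StagnationStopper` is a hypothetical class that is not part of the subpackage.

```python
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class StopperState:
    """Local stand-in; fields copied from the examples on this page."""

    iteration: int
    best_score: float
    stagnation_counter: int
    total_evaluations: int
    candidates_count: int
    elapsed_seconds: float


class StopperProtocol(Protocol):
    """Stand-in protocol (assumed shape): a callable from state to bool."""

    def __call__(self, state: StopperState) -> bool: ...


class StagnationStopper:
    """Hypothetical stopper: stop after `patience` iterations without improvement."""

    def __init__(self, patience: int) -> None:
        if patience <= 0:
            raise ValueError("patience must be positive")
        self.patience = patience

    def __call__(self, state: StopperState) -> bool:
        return state.stagnation_counter >= self.patience


# Structural typing: no inheritance is needed to satisfy the protocol.
stopper: StopperProtocol = StagnationStopper(patience=3)
state = StopperState(
    iteration=12,
    best_score=0.81,
    stagnation_counter=3,
    total_evaluations=60,
    candidates_count=4,
    elapsed_seconds=95.0,
)
should_stop = stopper(state)  # stagnation_counter (3) has reached patience (3)
```

Because the check is structural, a stopper written this way could presumably be placed in the same list as the built-in stoppers, including inside a CompositeStopper.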

CompositeStopper

Combine multiple stoppers with AND/OR logic.

A meta-stopper that composes multiple stoppers into a single stopping condition with configurable logic. Use mode='any' for OR semantics (stop if any stopper fires) or mode='all' for AND semantics (stop only if all stoppers fire).

ATTRIBUTE DESCRIPTION
stoppers

The sequence of stoppers to combine.

TYPE: list[StopperProtocol]

mode

Combination mode - 'any' for OR, 'all' for AND.

TYPE: Literal['any', 'all']

Examples:

Stop after 5 minutes OR when score reaches 95%:

from gepa_adk.adapters.stoppers import (
    CompositeStopper,
    ScoreThresholdStopper,
    TimeoutStopper,
)
from gepa_adk.domain.stopper import StopperState

composite = CompositeStopper(
    [TimeoutStopper(300), ScoreThresholdStopper(0.95)],
    mode="any",
)

state = StopperState(
    iteration=10,
    best_score=0.97,  # Exceeds threshold
    stagnation_counter=2,
    total_evaluations=50,
    candidates_count=3,
    elapsed_seconds=120.0,  # Below timeout
)

composite(state)  # Returns True (score threshold met)

Stop only when minimum time AND score threshold are both met:

composite = CompositeStopper(
    [TimeoutStopper(60), ScoreThresholdStopper(0.8)],
    mode="all",
)
Note

A composite stopper can contain other composite stoppers for arbitrarily complex stopping conditions like "(A OR B) AND (C OR D)".
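
The nesting described in the note can be sketched as follows. To keep the example runnable without gepa_adk installed, `CompositeStopper` is a trimmed stand-in that mirrors the source listing on this page, the state is a plain dict rather than a StopperState, and the four leaf conditions are hypothetical functions.

```python
from typing import Callable, Literal, Sequence

State = dict  # stand-in for StopperState in this sketch
Stopper = Callable[[State], bool]


class CompositeStopper:
    """Trimmed stand-in mirroring the class listed on this page."""

    def __init__(
        self,
        stoppers: Sequence[Stopper],
        mode: Literal["any", "all"] = "any",
    ) -> None:
        if not stoppers:
            raise ValueError("At least one stopper required")
        if mode not in ("any", "all"):
            raise ValueError(f"mode must be 'any' or 'all', got {mode!r}")
        self.stoppers = list(stoppers)
        self.mode = mode

    def __call__(self, state: State) -> bool:
        combine = any if self.mode == "any" else all
        return combine(stopper(state) for stopper in self.stoppers)


# Hypothetical leaf conditions A, B, C and D.
def timed_out(state: State) -> bool:
    return state["elapsed_seconds"] >= 300


def budget_spent(state: State) -> bool:
    return state["total_evaluations"] >= 1000


def good_score(state: State) -> bool:
    return state["best_score"] >= 0.9


def stagnated(state: State) -> bool:
    return state["stagnation_counter"] >= 5


# (A OR B) AND (C OR D)
nested = CompositeStopper(
    [
        CompositeStopper([timed_out, budget_spent], mode="any"),
        CompositeStopper([good_score, stagnated], mode="any"),
    ],
    mode="all",
)

state_a = {
    "elapsed_seconds": 320.0,
    "total_evaluations": 400,
    "best_score": 0.95,
    "stagnation_counter": 0,
}
state_b = {**state_a, "best_score": 0.6, "stagnation_counter": 1}

stop_a = nested(state_a)  # A holds and C holds, so both groups fire
stop_b = nested(state_b)  # A holds, but neither C nor D does
```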

Source code in src/gepa_adk/adapters/stoppers/composite.py
class CompositeStopper:
    """Combine multiple stoppers with AND/OR logic.

    A meta-stopper that composes multiple stoppers into a single stopping
    condition with configurable logic. Use mode='any' for OR semantics
    (stop if any stopper fires) or mode='all' for AND semantics (stop
    only if all stoppers fire).

    Attributes:
        stoppers (list[StopperProtocol]): The sequence of stoppers to combine.
        mode (Literal["any", "all"]): Combination mode - 'any' for OR, 'all' for AND.

    Examples:
        Stop after 5 minutes OR when score reaches 95%:

        ```python
        from gepa_adk.adapters.stoppers import (
            CompositeStopper,
            ScoreThresholdStopper,
            TimeoutStopper,
        )
        from gepa_adk.domain.stopper import StopperState

        composite = CompositeStopper(
            [TimeoutStopper(300), ScoreThresholdStopper(0.95)],
            mode="any",
        )

        state = StopperState(
            iteration=10,
            best_score=0.97,  # Exceeds threshold
            stagnation_counter=2,
            total_evaluations=50,
            candidates_count=3,
            elapsed_seconds=120.0,  # Below timeout
        )

        composite(state)  # Returns True (score threshold met)
        ```

        Stop only when minimum time AND score threshold are both met:

        ```python
        composite = CompositeStopper(
            [TimeoutStopper(60), ScoreThresholdStopper(0.8)],
            mode="all",
        )
        ```

    Note:
        A composite stopper can contain other composite stoppers for arbitrarily
        complex stopping conditions like "(A OR B) AND (C OR D)".
    """

    def __init__(
        self,
        stoppers: Sequence[StopperProtocol],
        mode: Literal["any", "all"] = "any",
    ) -> None:
        """Initialize composite stopper with child stoppers and mode.

        Args:
            stoppers: Sequence of stoppers to combine. Must contain at least
                one stopper. Each stopper must implement StopperProtocol.
            mode: Combination logic - 'any' (OR) or 'all' (AND). Defaults
                to 'any'.

        Raises:
            ValueError: If stoppers sequence is empty.
            ValueError: If mode is not 'any' or 'all'.

        Examples:
            ```python
            # OR logic - stop if either condition met
            composite = CompositeStopper(
                [TimeoutStopper(300), ScoreThresholdStopper(0.95)],
                mode="any",
            )

            # AND logic - stop only if both conditions met
            composite = CompositeStopper(
                [TimeoutStopper(60), ScoreThresholdStopper(0.8)],
                mode="all",
            )
            ```

        Note:
            Consider using 'any' mode for fail-safe conditions (timeout OR
            resource limit) and 'all' mode for minimum requirements.
        """
        if not stoppers:
            raise ValueError("At least one stopper required")
        if mode not in ("any", "all"):
            raise ValueError(f"mode must be 'any' or 'all', got {mode!r}")

        self.stoppers: list[StopperProtocol] = list(stoppers)
        self.mode: Literal["any", "all"] = mode

    def __call__(self, state: StopperState) -> bool:
        """Check if evolution should stop based on combined stopper logic.

        Args:
            state: Current evolution state snapshot passed to all child stoppers.

        Returns:
            For mode='any': True if any child stopper returns True.
            For mode='all': True only if all child stoppers return True.

        Examples:
            ```python
            composite = CompositeStopper(
                [TimeoutStopper(60), ScoreThresholdStopper(0.9)],
                mode="any",
            )

            # Below both thresholds
            state1 = StopperState(
                iteration=5,
                best_score=0.5,
                stagnation_counter=0,
                total_evaluations=25,
                candidates_count=1,
                elapsed_seconds=30.0,
            )
            composite(state1)  # False

            # Score threshold met
            state2 = StopperState(
                iteration=10,
                best_score=0.95,
                stagnation_counter=1,
                total_evaluations=50,
                candidates_count=2,
                elapsed_seconds=45.0,
            )
            composite(state2)  # True (any mode - score threshold met)
            ```

        Note:
            Often called after each iteration. For 'any' mode, evaluation
            short-circuits on first True. For 'all' mode, short-circuits on
            first False.
        """
        if self.mode == "any":
            return any(stopper(state) for stopper in self.stoppers)
        # mode == "all"
        return all(stopper(state) for stopper in self.stoppers)

    def __repr__(self) -> str:
        """Return string representation of the composite stopper.

        Returns:
            String showing the stoppers list and mode.

        Examples:
            ```python
            composite = CompositeStopper(
                [TimeoutStopper(60), ScoreThresholdStopper(0.9)],
                mode="any",
            )
            repr(composite)
            # "CompositeStopper([TimeoutStopper(...), ScoreThresholdStopper(...)], mode='any')"
            ```
        """
        return f"CompositeStopper({self.stoppers!r}, mode={self.mode!r})"

__init__

__init__(
    stoppers: Sequence[StopperProtocol],
    mode: Literal["any", "all"] = "any",
) -> None

Initialize composite stopper with child stoppers and mode.

PARAMETER DESCRIPTION
stoppers

Sequence of stoppers to combine. Must contain at least one stopper. Each stopper must implement StopperProtocol.

TYPE: Sequence[StopperProtocol]

mode

Combination logic - 'any' (OR) or 'all' (AND). Defaults to 'any'.

TYPE: Literal['any', 'all'] DEFAULT: 'any'

RAISES DESCRIPTION
ValueError

If stoppers sequence is empty.

ValueError

If mode is not 'any' or 'all'.

Examples:

# OR logic - stop if either condition met
composite = CompositeStopper(
    [TimeoutStopper(300), ScoreThresholdStopper(0.95)],
    mode="any",
)

# AND logic - stop only if both conditions met
composite = CompositeStopper(
    [TimeoutStopper(60), ScoreThresholdStopper(0.8)],
    mode="all",
)
Note

Consider using 'any' mode for fail-safe conditions (timeout OR resource limit) and 'all' mode for minimum requirements.

Source code in src/gepa_adk/adapters/stoppers/composite.py
def __init__(
    self,
    stoppers: Sequence[StopperProtocol],
    mode: Literal["any", "all"] = "any",
) -> None:
    """Initialize composite stopper with child stoppers and mode.

    Args:
        stoppers: Sequence of stoppers to combine. Must contain at least
            one stopper. Each stopper must implement StopperProtocol.
        mode: Combination logic - 'any' (OR) or 'all' (AND). Defaults
            to 'any'.

    Raises:
        ValueError: If stoppers sequence is empty.
        ValueError: If mode is not 'any' or 'all'.

    Examples:
        ```python
        # OR logic - stop if either condition met
        composite = CompositeStopper(
            [TimeoutStopper(300), ScoreThresholdStopper(0.95)],
            mode="any",
        )

        # AND logic - stop only if both conditions met
        composite = CompositeStopper(
            [TimeoutStopper(60), ScoreThresholdStopper(0.8)],
            mode="all",
        )
        ```

    Note:
        Consider using 'any' mode for fail-safe conditions (timeout OR
        resource limit) and 'all' mode for minimum requirements.
    """
    if not stoppers:
        raise ValueError("At least one stopper required")
    if mode not in ("any", "all"):
        raise ValueError(f"mode must be 'any' or 'all', got {mode!r}")

    self.stoppers: list[StopperProtocol] = list(stoppers)
    self.mode: Literal["any", "all"] = mode

__call__

__call__(state: StopperState) -> bool

Check if evolution should stop based on combined stopper logic.

PARAMETER DESCRIPTION
state

Current evolution state snapshot passed to all child stoppers.

TYPE: StopperState

RETURNS DESCRIPTION
bool

True if any child stopper returns True (mode='any'), or only if all child stoppers return True (mode='all').

Examples:

composite = CompositeStopper(
    [TimeoutStopper(60), ScoreThresholdStopper(0.9)],
    mode="any",
)

# Below both thresholds
state1 = StopperState(
    iteration=5,
    best_score=0.5,
    stagnation_counter=0,
    total_evaluations=25,
    candidates_count=1,
    elapsed_seconds=30.0,
)
composite(state1)  # False

# Score threshold met
state2 = StopperState(
    iteration=10,
    best_score=0.95,
    stagnation_counter=1,
    total_evaluations=50,
    candidates_count=2,
    elapsed_seconds=45.0,
)
composite(state2)  # True (any mode - score threshold met)
Note

Often called after each iteration. For 'any' mode, evaluation short-circuits on first True. For 'all' mode, short-circuits on first False.
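
The short-circuit behavior follows directly from Python's built-in `any()` and `all()` applied to a generator. The sketch below records which child stoppers actually run; `make_stopper` is a hypothetical helper for this illustration, and `None` stands in for the state because these stoppers ignore it.

```python
calls: list[str] = []


def make_stopper(name: str, result: bool):
    """Build a stopper that logs its own invocation and returns a fixed result."""

    def stopper(state: object) -> bool:
        calls.append(name)
        return result

    return stopper


fires = make_stopper("fires", True)
quiet = make_stopper("quiet", False)

# 'any' semantics: evaluation ends at the first True, so `quiet` never runs.
any_result = any(s(None) for s in [fires, quiet])
any_calls = list(calls)

calls.clear()

# 'all' semantics: evaluation ends at the first False, so `fires` never runs.
all_result = all(s(None) for s in [quiet, fires])
all_calls = list(calls)
```

One consequence is that order matters for stoppers with side effects: a child placed after the one that decides the result is not called in that round.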

Source code in src/gepa_adk/adapters/stoppers/composite.py
def __call__(self, state: StopperState) -> bool:
    """Check if evolution should stop based on combined stopper logic.

    Args:
        state: Current evolution state snapshot passed to all child stoppers.

    Returns:
        For mode='any': True if any child stopper returns True.
        For mode='all': True only if all child stoppers return True.

    Examples:
        ```python
        composite = CompositeStopper(
            [TimeoutStopper(60), ScoreThresholdStopper(0.9)],
            mode="any",
        )

        # Below both thresholds
        state1 = StopperState(
            iteration=5,
            best_score=0.5,
            stagnation_counter=0,
            total_evaluations=25,
            candidates_count=1,
            elapsed_seconds=30.0,
        )
        composite(state1)  # False

        # Score threshold met
        state2 = StopperState(
            iteration=10,
            best_score=0.95,
            stagnation_counter=1,
            total_evaluations=50,
            candidates_count=2,
            elapsed_seconds=45.0,
        )
        composite(state2)  # True (any mode - score threshold met)
        ```

    Note:
        Often called after each iteration. For 'any' mode, evaluation
        short-circuits on first True. For 'all' mode, short-circuits on
        first False.
    """
    if self.mode == "any":
        return any(stopper(state) for stopper in self.stoppers)
    # mode == "all"
    return all(stopper(state) for stopper in self.stoppers)

__repr__

__repr__() -> str

Return string representation of the composite stopper.

RETURNS DESCRIPTION
str

String showing the stoppers list and mode.

Examples:

composite = CompositeStopper(
    [TimeoutStopper(60), ScoreThresholdStopper(0.9)],
    mode="any",
)
repr(composite)
# "CompositeStopper([TimeoutStopper(...), ScoreThresholdStopper(...)], mode='any')"
Source code in src/gepa_adk/adapters/stoppers/composite.py
def __repr__(self) -> str:
    """Return string representation of the composite stopper.

    Returns:
        String showing the stoppers list and mode.

    Examples:
        ```python
        composite = CompositeStopper(
            [TimeoutStopper(60), ScoreThresholdStopper(0.9)],
            mode="any",
        )
        repr(composite)
        # "CompositeStopper([TimeoutStopper(...), ScoreThresholdStopper(...)], mode='any')"
        ```
    """
    return f"CompositeStopper({self.stoppers!r}, mode={self.mode!r})"

MaxEvaluationsStopper

Stop evolution after a maximum number of evaluations.

Useful for controlling API costs when evaluations are expensive. Checks the total_evaluations field from StopperState against the configured limit.

ATTRIBUTE DESCRIPTION
max_evaluations

Maximum number of evaluate() calls allowed.

TYPE: int

Examples:

Stop after 1000 evaluations:

stopper = MaxEvaluationsStopper(1000)

Check if evolution should stop:

state = StopperState(total_evaluations=1000, ...)
stopper(state)  # Returns True
Note

Any evaluation count at or above the limit triggers a stop. This handles the case where batch evaluations cause the count to exceed the exact limit.
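
The effect of the `>=` comparison with batched evaluations can be sketched with a small loop. `State` is a trimmed stand-in for StopperState carrying only the field this stopper reads, `MaxEvaluationsStopper` mirrors the source listing on this page, and the batch size of 8 is a made-up figure.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class State:
    """Trimmed stand-in for StopperState."""

    total_evaluations: int


class MaxEvaluationsStopper:
    """Stand-in mirroring the class listed on this page."""

    def __init__(self, max_evaluations: int) -> None:
        if max_evaluations <= 0:
            raise ValueError("max_evaluations must be positive")
        self.max_evaluations = max_evaluations

    def __call__(self, state: State) -> bool:
        return state.total_evaluations >= self.max_evaluations


BATCH_SIZE = 8  # hypothetical number of evaluations per iteration
stopper = MaxEvaluationsStopper(20)

total = 0
checkpoints: list[int] = []
while not stopper(State(total_evaluations=total)):
    total += BATCH_SIZE  # one iteration evaluates a whole batch
    checkpoints.append(total)

# The count goes 8, 16, 24: it never equals 20, yet the stopper still fires.
```

With an equality check the loop above would never terminate, which is why the limit is treated as a lower bound for stopping rather than an exact target.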

Source code in src/gepa_adk/adapters/stoppers/evaluations.py
class MaxEvaluationsStopper:
    """Stop evolution after maximum number of evaluations.

    Useful for controlling API costs when evaluations are expensive.
    Checks the total_evaluations field from StopperState against
    the configured limit.

    Attributes:
        max_evaluations (int): Maximum number of evaluate() calls allowed.

    Examples:
        Stop after 1000 evaluations:

        ```python
        stopper = MaxEvaluationsStopper(1000)
        ```

        Check if evolution should stop:

        ```python
        state = StopperState(total_evaluations=1000, ...)
        stopper(state)  # Returns True
        ```

    Note:
        Any evaluation count at or above the limit triggers a stop. This handles
        the case where batch evaluations cause the count to exceed the exact limit.
    """

    def __init__(self, max_evaluations: int) -> None:
        """Initialize the stopper with maximum evaluation count.

        Args:
            max_evaluations: Maximum number of evaluate() calls allowed.
                Must be a positive integer.

        Raises:
            ValueError: If max_evaluations is not positive.

        Note:
            Configure this value based on your API budget. Each evaluation
            typically corresponds to one model API call.
        """
        if max_evaluations <= 0:
            msg = "max_evaluations must be positive"
            raise ValueError(msg)
        self.max_evaluations = max_evaluations

    def __call__(self, state: StopperState) -> bool:
        """Check if evolution should stop based on evaluation count.

        Args:
            state: Current evolution state snapshot containing the
                total_evaluations count.

        Returns:
            True if total_evaluations >= max_evaluations, False otherwise.

        Note:
            Once this returns True, evolution should terminate to stay
            within the configured evaluation budget.
        """
        return state.total_evaluations >= self.max_evaluations

__init__

__init__(max_evaluations: int) -> None

Initialize the stopper with maximum evaluation count.

PARAMETER DESCRIPTION
max_evaluations

Maximum number of evaluate() calls allowed. Must be a positive integer.

TYPE: int

RAISES DESCRIPTION
ValueError

If max_evaluations is not positive.

Note

Configure this value based on your API budget. Each evaluation typically corresponds to one model API call.
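
One way to derive the limit from a budget is simple integer arithmetic, as in the sketch below. Every figure is hypothetical, and the one-call-per-evaluation assumption comes from the note above; amounts are kept in millionths of a dollar to avoid floating-point rounding.

```python
# All figures are hypothetical; substitute your provider's real pricing.
BUDGET_MICRO_USD = 5_000_000  # 5.00 USD
COST_PER_EVAL_MICRO_USD = 2_000  # 0.002 USD per evaluation (one API call)
SAFETY_MARGIN_PERCENT = 10  # keep some headroom for batch overshoot

usable_micro_usd = BUDGET_MICRO_USD * (100 - SAFETY_MARGIN_PERCENT) // 100
max_evaluations = usable_micro_usd // COST_PER_EVAL_MICRO_USD

# The result would then be passed to the stopper:
# stopper = MaxEvaluationsStopper(max_evaluations)
```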

Source code in src/gepa_adk/adapters/stoppers/evaluations.py
def __init__(self, max_evaluations: int) -> None:
    """Initialize the stopper with maximum evaluation count.

    Args:
        max_evaluations: Maximum number of evaluate() calls allowed.
            Must be a positive integer.

    Raises:
        ValueError: If max_evaluations is not positive.

    Note:
        Configure this value based on your API budget. Each evaluation
        typically corresponds to one model API call.
    """
    if max_evaluations <= 0:
        msg = "max_evaluations must be positive"
        raise ValueError(msg)
    self.max_evaluations = max_evaluations

__call__

__call__(state: StopperState) -> bool

Check if evolution should stop based on evaluation count.

PARAMETER DESCRIPTION
state

Current evolution state snapshot containing the total_evaluations count.

TYPE: StopperState

RETURNS DESCRIPTION
bool

True if total_evaluations >= max_evaluations, False otherwise.

Note

Once this returns True, evolution should terminate to stay within the configured evaluation budget.

Source code in src/gepa_adk/adapters/stoppers/evaluations.py
def __call__(self, state: StopperState) -> bool:
    """Check if evolution should stop based on evaluation count.

    Args:
        state: Current evolution state snapshot containing the
            total_evaluations count.

    Returns:
        True if total_evaluations >= max_evaluations, False otherwise.

    Note:
        Once this returns True, evolution should terminate to stay
        within the configured evaluation budget.
    """
    return state.total_evaluations >= self.max_evaluations

FileStopper

Stop evolution when a specified file exists.

Useful for external orchestration where CI/CD pipelines, job schedulers, or monitoring tools can signal graceful termination by creating a file.

ATTRIBUTE DESCRIPTION
stop_file_path

Path to the stop signal file.

TYPE: Path

remove_on_stop

If True, remove the file after triggering stop.

TYPE: bool

Examples:

Stop when file exists:

stopper = FileStopper("/tmp/gepa_stop")

Auto-remove file after triggering:

stopper = FileStopper("/tmp/gepa_stop", remove_on_stop=True)
Note

Any path that doesn't exist simply won't trigger a stop. Invalid paths are handled gracefully without raising errors.

Source code in src/gepa_adk/adapters/stoppers/file.py
class FileStopper:
    """Stop evolution when a specified file exists.

    Useful for external orchestration where CI/CD pipelines, job schedulers,
    or monitoring tools can signal graceful termination by creating a file.

    Attributes:
        stop_file_path (Path): Path to the stop signal file.
        remove_on_stop (bool): If True, remove the file after triggering stop.

    Examples:
        Stop when file exists:

        ```python
        stopper = FileStopper("/tmp/gepa_stop")
        ```

        Auto-remove file after triggering:

        ```python
        stopper = FileStopper("/tmp/gepa_stop", remove_on_stop=True)
        ```

    Note:
        Any path that doesn't exist simply won't trigger a stop. Invalid paths
        are handled gracefully without raising errors.
    """

    def __init__(
        self, stop_file_path: str | Path, remove_on_stop: bool = False
    ) -> None:
        """Initialize the stopper with a stop file path.

        Args:
            stop_file_path: Path to the stop signal file. Can be a string
                or Path object. Will be converted to Path internally.
            remove_on_stop: If True, automatically remove the stop file
                after triggering a stop. Defaults to False.

        Note:
            Configure the path based on your orchestration system. Common
            locations include /tmp/, /var/run/, or project-specific directories.
        """
        self.stop_file_path = Path(stop_file_path)
        self.remove_on_stop = remove_on_stop

    def __call__(self, state: StopperState) -> bool:
        """Check if evolution should stop based on file existence.

        Args:
            state: Current evolution state snapshot (not used for file-based
                stopping, but required by the StopperProtocol).

        Returns:
            True if the stop file exists, False otherwise.

        Note:
            Once this returns True, the stop file may be removed if
            remove_on_stop was enabled. Subsequent calls will return False.
        """
        if self.stop_file_path.exists():
            if self.remove_on_stop:
                self.stop_file_path.unlink(missing_ok=True)
            return True
        return False

    def remove_stop_file(self) -> None:
        """Manually remove the stop file.

        This is idempotent - safe to call even if the file doesn't exist.
        Useful for resetting the stop condition before starting a new
        evolution run.

        Note:
            Only call this when you explicitly want to remove the stop file.
            The file is automatically removed during __call__ if remove_on_stop=True.
        """
        self.stop_file_path.unlink(missing_ok=True)

__init__

__init__(
    stop_file_path: str | Path, remove_on_stop: bool = False
) -> None

Initialize the stopper with a stop file path.

PARAMETER DESCRIPTION
stop_file_path

Path to the stop signal file. Can be a string or Path object. Will be converted to Path internally.

TYPE: str | Path

remove_on_stop

If True, automatically remove the stop file after triggering a stop. Defaults to False.

TYPE: bool DEFAULT: False

Note

Configure the path based on your orchestration system. Common locations include /tmp/, /var/run/, or project-specific directories.

Source code in src/gepa_adk/adapters/stoppers/file.py
def __init__(
    self, stop_file_path: str | Path, remove_on_stop: bool = False
) -> None:
    """Initialize the stopper with a stop file path.

    Args:
        stop_file_path: Path to the stop signal file. Can be a string
            or Path object. Will be converted to Path internally.
        remove_on_stop: If True, automatically remove the stop file
            after triggering a stop. Defaults to False.

    Note:
        Configure the path based on your orchestration system. Common
        locations include /tmp/, /var/run/, or project-specific directories.
    """
    self.stop_file_path = Path(stop_file_path)
    self.remove_on_stop = remove_on_stop

__call__

__call__(state: StopperState) -> bool

Check if evolution should stop based on file existence.

PARAMETER DESCRIPTION
state

Current evolution state snapshot (not used for file-based stopping, but required by the StopperProtocol).

TYPE: StopperState

RETURNS DESCRIPTION
bool

True if the stop file exists, False otherwise.

Note

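If remove_on_stop is enabled, the stop file is deleted as soon as this returns True, so subsequent calls return False until the file is created again. With remove_on_stop disabled, calls keep returning True for as long as the file exists.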
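
The difference between the two remove_on_stop settings can be seen in a short sketch. `FileStopper` below is a stand-in that mirrors the source listing on this page so the example runs without gepa_adk, the stop file lives in a temporary directory, and `None` is passed as the state because this stopper ignores it.

```python
import tempfile
from pathlib import Path


class FileStopper:
    """Stand-in mirroring the class listed on this page."""

    def __init__(self, stop_file_path, remove_on_stop: bool = False) -> None:
        self.stop_file_path = Path(stop_file_path)
        self.remove_on_stop = remove_on_stop

    def __call__(self, state) -> bool:
        if self.stop_file_path.exists():
            if self.remove_on_stop:
                self.stop_file_path.unlink(missing_ok=True)
            return True
        return False


with tempfile.TemporaryDirectory() as tmp:
    stop_file = Path(tmp) / "gepa_stop"
    keep = FileStopper(stop_file)
    consume = FileStopper(stop_file, remove_on_stop=True)

    before = keep(None)  # no file yet

    stop_file.touch()  # what an external tool would do to request a stop

    keep_first = keep(None)  # file exists
    keep_second = keep(None)  # file is still there

    consume_first = consume(None)  # file exists, then gets deleted
    consume_second = consume(None)  # file is gone
    file_left = stop_file.exists()
```

Leaving the file in place is the safer choice when several runs should all observe the same stop request; auto-removal suits a single run that should start clean next time.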

Source code in src/gepa_adk/adapters/stoppers/file.py
def __call__(self, state: StopperState) -> bool:
    """Check if evolution should stop based on file existence.

    Args:
        state: Current evolution state snapshot (not used for file-based
            stopping, but required by the StopperProtocol).

    Returns:
        True if the stop file exists, False otherwise.

    Note:
        Once this returns True, the stop file may be removed if
        remove_on_stop was enabled. Subsequent calls will return False.
    """
    if self.stop_file_path.exists():
        if self.remove_on_stop:
            self.stop_file_path.unlink(missing_ok=True)
        return True
    return False

remove_stop_file

remove_stop_file() -> None

Manually remove the stop file.

This is idempotent - safe to call even if the file doesn't exist. Useful for resetting the stop condition before starting a new evolution run.

Note

Only call this when you explicitly want to remove the stop file. The file is automatically removed during __call__ if remove_on_stop=True.

Source code in src/gepa_adk/adapters/stoppers/file.py
def remove_stop_file(self) -> None:
    """Manually remove the stop file.

    This is idempotent - safe to call even if the file doesn't exist.
    Useful for resetting the stop condition before starting a new
    evolution run.

    Note:
        Only call this when you explicitly want to remove the stop file.
        The file is automatically removed during __call__ if remove_on_stop=True.
    """
    self.stop_file_path.unlink(missing_ok=True)

SignalStopper

Stop evolution on Unix signals (SIGINT, SIGTERM).

Handles Ctrl+C (SIGINT) and termination signals gracefully, allowing the current iteration to complete before returning results. Supports both async contexts (using asyncio signal handling) and sync contexts (using traditional signal handlers).

ATTRIBUTE DESCRIPTION
signals

Signals to handle.

TYPE: tuple[Signals, ...]

Examples:

Using as async context manager:

from gepa_adk.adapters.stoppers import SignalStopper

async with SignalStopper() as stopper:
    # stopper.setup() called automatically
    config = EvolutionConfig(stop_callbacks=[stopper])
    result = await engine.run(config)
    # stopper.cleanup() called automatically

Using with custom signals:

import signal

stopper = SignalStopper(signals=[signal.SIGINT])
stopper.setup()
try:
    # Only SIGINT will trigger stop
    pass
finally:
    stopper.cleanup()
Note

Always call cleanup() after evolution completes to restore original signal handlers. The context manager pattern handles this automatically.
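
The setup/cleanup pairing in a sync context can be sketched with the standard `signal` module alone. `FlagOnSignal` is a hypothetical, stripped-down class written for this illustration (it is not the SignalStopper implementation), and `signal.raise_signal` is used to simulate Ctrl+C. The sketch assumes it runs in the main thread, since Python only allows installing signal handlers there.

```python
import signal


class FlagOnSignal:
    """Hypothetical minimal version of the sync-context pattern."""

    def __init__(self, signals=(signal.SIGINT,)) -> None:
        self.signals = tuple(signals)
        self.stop_requested = False
        self._original = {}

    def setup(self) -> None:
        # signal.signal returns the handler that was installed before.
        for sig in self.signals:
            self._original[sig] = signal.signal(sig, self._handler)

    def _handler(self, signum, frame) -> None:
        # Only record the request; the main loop decides when to stop.
        self.stop_requested = True

    def cleanup(self) -> None:
        for sig, handler in self._original.items():
            signal.signal(sig, handler)
        self._original.clear()


previous = signal.getsignal(signal.SIGINT)

flag = FlagOnSignal()
flag.setup()
try:
    before = flag.stop_requested
    signal.raise_signal(signal.SIGINT)  # simulate Ctrl+C
    after = flag.stop_requested
finally:
    flag.cleanup()  # always restore, even if the run raised

restored = signal.getsignal(signal.SIGINT) is previous
```

Because the handler only sets a flag, the interrupted code keeps running until the flag is next checked, which matches the "finish the current iteration" behavior described above.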

Source code in src/gepa_adk/adapters/stoppers/signal.py
class SignalStopper:
    """Stop evolution on Unix signals (SIGINT, SIGTERM).

    Handles Ctrl+C (SIGINT) and termination signals gracefully, allowing the
    current iteration to complete before returning results. Supports both
    async contexts (using asyncio signal handling) and sync contexts (using
    traditional signal handlers).

    Attributes:
        signals (tuple[signal.Signals, ...]): Signals to handle.

    Examples:
        Using as async context manager:

        ```python
        from gepa_adk.adapters.stoppers import SignalStopper

        async with SignalStopper() as stopper:
            # stopper.setup() called automatically
            config = EvolutionConfig(stop_callbacks=[stopper])
            result = await engine.run(config)
            # stopper.cleanup() called automatically
        ```

        Using with custom signals:

        ```python
        import signal

        stopper = SignalStopper(signals=[signal.SIGINT])
        stopper.setup()
        try:
            # Only SIGINT will trigger stop
            pass
        finally:
            stopper.cleanup()
        ```

    Note:
        Always call cleanup() after evolution completes to restore original
        signal handlers. The context manager pattern handles this automatically.
    """

    def __init__(self, signals: Sequence[signal.Signals] | None = None) -> None:
        """Initialize signal stopper with signals to handle.

        Args:
            signals: Signals to handle. Defaults to [SIGINT, SIGTERM] which
                covers Ctrl+C and system termination requests.

        Examples:
            ```python
            # Default signals (SIGINT, SIGTERM)
            stopper = SignalStopper()

            # Custom signals
            stopper = SignalStopper(signals=[signal.SIGINT])
            ```

        Note:
            Call setup() before evolution starts and cleanup() after
            evolution completes to properly manage signal handlers.
        """
        if signals is None:
            self.signals: tuple[signal.Signals, ...] = (
                signal.SIGINT,
                signal.SIGTERM,
            )
        else:
            self.signals = tuple(signals)
        self._stop_requested: bool = False
        self._original_handlers: dict[signal.Signals, _SignalHandler] = {}
        self._loop: asyncio.AbstractEventLoop | None = None

    def setup(self) -> None:
        """Install signal handlers.

        Must be called before evolution starts. In async contexts, uses
        asyncio's signal handling. In sync contexts, uses traditional
        signal handlers.

        Examples:
            ```python
            stopper = SignalStopper()
            stopper.setup()
            try:
                # Run evolution
                pass
            finally:
                stopper.cleanup()
            ```

        Note:
            On platforms where certain signals are unavailable (like Windows
            for SIGTERM), those signals are silently skipped.
        """
        try:
            self._loop = asyncio.get_running_loop()
            for sig in self.signals:
                try:
                    self._loop.add_signal_handler(sig, self._handle_signal)
                except (OSError, ValueError, NotImplementedError):
                    pass  # Signal not available on platform
        except RuntimeError:
            # Not in async context, use traditional signal handling
            for sig in self.signals:
                try:
                    self._original_handlers[sig] = signal.signal(
                        sig, self._sync_handler
                    )
                except (OSError, ValueError):
                    pass  # Signal not available on platform

    def _handle_signal(self) -> None:
        """Handle signal in async context."""
        self._stop_requested = True

    def _sync_handler(
        self,
        signum: int,
        frame: FrameType | None,  # noqa: ARG002
    ) -> None:
        """Handle signal in sync context."""
        self._stop_requested = True

    def __call__(self, state: StopperState) -> bool:  # noqa: ARG002
        """Check if evolution should stop due to signal.

        Args:
            state: Current evolution state snapshot (not used, but required
                by StopperProtocol).

        Returns:
            True if a signal was received, False otherwise.

        Examples:
            ```python
            stopper = SignalStopper()
            stopper.setup()

            state = StopperState(
                iteration=5,
                best_score=0.8,
                stagnation_counter=0,
                total_evaluations=25,
                candidates_count=1,
                elapsed_seconds=30.0,
            )

            # Before signal
            stopper(state)  # False

            # After Ctrl+C
            # stopper(state)  # True
            ```

        Note:
            Often called after each iteration. Returns True as soon as
            a signal is received.
        """
        return self._stop_requested

    def cleanup(self) -> None:
        """Restore original signal handlers.

        Should be called after evolution completes. In sync contexts, restores
        the signal handlers that were in place before setup() was called; in
        async contexts, removes the handlers registered on the event loop.

        Examples:
            ```python
            stopper = SignalStopper()
            stopper.setup()
            try:
                # Run evolution
                pass
            finally:
                stopper.cleanup()  # Restore original handlers
            ```

        Note:
            On platforms where certain signals are unavailable, those
            signals are silently skipped during cleanup.
        """
        if self._loop is not None:
            for sig in self.signals:
                try:
                    self._loop.remove_signal_handler(sig)
                except (OSError, ValueError, NotImplementedError):
                    # Some signals may not be removable or supported by the event
                    # loop on all platforms; cleanup is best-effort.
                    pass
        else:
            for sig, handler in self._original_handlers.items():
                try:
                    signal.signal(sig, handler)
                except (OSError, ValueError):
                    # Some signals or states may prevent restoring previous
                    # handlers; cleanup is best-effort.
                    pass
        self._original_handlers.clear()
        self._loop = None

    async def __aenter__(self) -> SignalStopper:
        """Enter async context and install signal handlers.

        Returns:
            Self for use as stopper in evolution config.

        Examples:
            ```python
            async with SignalStopper() as stopper:
                config = EvolutionConfig(stop_callbacks=[stopper])
                result = await engine.run(config)
            ```

        Note:
            On entry, signal handlers are installed automatically.
        """
        self.setup()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: object,
    ) -> None:
        """Exit async context and restore signal handlers.

        Args:
            exc_type: Exception type if an exception was raised.
            exc_val: Exception value if an exception was raised.
            exc_tb: Exception traceback if an exception was raised.

        Note:
            Original signal handlers are restored automatically on exit,
            even if an exception was raised.
        """
        self.cleanup()

    def __enter__(self) -> SignalStopper:
        """Enter sync context and install signal handlers.

        Returns:
            Self for use as stopper in evolution config.

        Examples:
            ```python
            with SignalStopper() as stopper:
                config = EvolutionConfig(stop_callbacks=[stopper])
                result = engine.run(config)  # sync run
            ```

        Note:
            On entry, signal handlers are installed automatically.
        """
        self.setup()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: object,
    ) -> None:
        """Exit sync context and restore signal handlers.

        Args:
            exc_type: Exception type if an exception was raised.
            exc_val: Exception value if an exception was raised.
            exc_tb: Exception traceback if an exception was raised.

        Note:
            Original signal handlers are restored automatically on exit,
            even if an exception was raised.
        """
        self.cleanup()

__init__

__init__(signals: Sequence[Signals] | None = None) -> None

Initialize signal stopper with signals to handle.

PARAMETER DESCRIPTION
signals

Signals to handle. Defaults to [SIGINT, SIGTERM] which covers Ctrl+C and system termination requests.

TYPE: Sequence[Signals] | None DEFAULT: None

Examples:

import signal

# Default signals (SIGINT, SIGTERM)
stopper = SignalStopper()

# Custom signals
stopper = SignalStopper(signals=[signal.SIGINT])
Note

Call setup() before evolution starts and cleanup() after evolution completes to properly manage signal handlers.

Source code in src/gepa_adk/adapters/stoppers/signal.py
def __init__(self, signals: Sequence[signal.Signals] | None = None) -> None:
    """Initialize signal stopper with signals to handle.

    Args:
        signals: Signals to handle. Defaults to [SIGINT, SIGTERM] which
            covers Ctrl+C and system termination requests.

    Examples:
        ```python
        import signal

        # Default signals (SIGINT, SIGTERM)
        stopper = SignalStopper()

        # Custom signals
        stopper = SignalStopper(signals=[signal.SIGINT])
        ```

    Note:
        Call setup() before evolution starts and cleanup() after
        evolution completes to properly manage signal handlers.
    """
    if signals is None:
        self.signals: tuple[signal.Signals, ...] = (
            signal.SIGINT,
            signal.SIGTERM,
        )
    else:
        self.signals = tuple(signals)
    self._stop_requested: bool = False
    self._original_handlers: dict[signal.Signals, _SignalHandler] = {}
    self._loop: asyncio.AbstractEventLoop | None = None

setup

setup() -> None

Install signal handlers.

Must be called before evolution starts. In async contexts, uses asyncio's signal handling. In sync contexts, uses traditional signal handlers.

Examples:

stopper = SignalStopper()
stopper.setup()
try:
    # Run evolution
    pass
finally:
    stopper.cleanup()
Note

On platforms where certain signals are unavailable (like Windows for SIGTERM), those signals are silently skipped.
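
Which path setup() takes depends on whether `asyncio.get_running_loop()` raises `RuntimeError` at the moment of the call. The sketch below isolates that check; `handler_mode()` and `_inside_loop()` are hypothetical helpers for illustration and are not part of gepa_adk.

```python
import asyncio


def handler_mode() -> str:
    """Report which signal-handling path a setup() call would take here."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop: the traditional signal.signal() path applies.
        return "sync"
    # A loop is running: loop.add_signal_handler() is the path taken.
    return "async"


async def _inside_loop() -> str:
    return handler_mode()


mode_outside = handler_mode()
mode_inside = asyncio.run(_inside_loop())
print(mode_outside, mode_inside)  # sync async
```

One consequence worth keeping in mind: calling setup() from synchronous code before the event loop starts selects the sync path, even if the evolution later runs under asyncio. Also, asyncio event loops on Windows do not implement `add_signal_handler`, so on that platform the async path would skip every signal.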

Source code in src/gepa_adk/adapters/stoppers/signal.py
def setup(self) -> None:
    """Install signal handlers.

    Must be called before evolution starts. In async contexts, uses
    asyncio's signal handling. In sync contexts, uses traditional
    signal handlers.

    Examples:
        ```python
        stopper = SignalStopper()
        stopper.setup()
        try:
            # Run evolution
            pass
        finally:
            stopper.cleanup()
        ```

    Note:
        On platforms where certain signals are unavailable (like Windows
        for SIGTERM), those signals are silently skipped.
    """
    try:
        self._loop = asyncio.get_running_loop()
        for sig in self.signals:
            try:
                self._loop.add_signal_handler(sig, self._handle_signal)
            except (OSError, ValueError, NotImplementedError):
                pass  # Signal not available on platform
    except RuntimeError:
        # Not in async context, use traditional signal handling
        for sig in self.signals:
            try:
                self._original_handlers[sig] = signal.signal(
                    sig, self._sync_handler
                )
            except (OSError, ValueError):
                pass  # Signal not available on platform

__call__

__call__(state: StopperState) -> bool

Check if evolution should stop due to signal.

PARAMETER DESCRIPTION
state

Current evolution state snapshot (not used, but required by StopperProtocol).

TYPE: StopperState

RETURNS DESCRIPTION
bool

True if a signal was received, False otherwise.

Examples:

stopper = SignalStopper()
stopper.setup()

state = StopperState(
    iteration=5,
    best_score=0.8,
    stagnation_counter=0,
    total_evaluations=25,
    candidates_count=1,
    elapsed_seconds=30.0,
)

# Before signal
stopper(state)  # False

# After Ctrl+C
# stopper(state)  # True
Note

Often called after each iteration. Returns True as soon as a signal is received.
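
To make the polling behaviour concrete, here is a minimal sketch of a loop that consults a stopper after every iteration. `StopperState` is a stand-in dataclass with the fields used in the examples above, and `FlagStopper` and `run_loop` are hypothetical names; the real engine loop may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StopperState:
    """Stand-in with the fields used in the examples above."""

    iteration: int
    best_score: float
    stagnation_counter: int
    total_evaluations: int
    candidates_count: int
    elapsed_seconds: float


class FlagStopper:
    """Stand-in for SignalStopper: returns a flag and ignores the state."""

    def __init__(self) -> None:
        self.stop_requested = False

    def __call__(self, state: StopperState) -> bool:
        return self.stop_requested


def run_loop(stopper: FlagStopper, max_iterations: int = 10) -> int:
    """Hypothetical engine loop: poll the stopper after every iteration."""
    completed = 0
    for iteration in range(1, max_iterations + 1):
        completed = iteration  # one unit of evolution work would run here
        if iteration == 3:
            stopper.stop_requested = True  # simulate Ctrl+C arriving mid-run
        state = StopperState(iteration, 0.8, 0, iteration * 5, 1, iteration * 6.0)
        if stopper(state):
            break
    return completed


iterations_run = run_loop(FlagStopper())
print(iterations_run)  # 3
```

Under this assumed loop, the flag only takes effect at the next poll, so an iteration that is already in progress finishes before the run stops.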

Source code in src/gepa_adk/adapters/stoppers/signal.py
def __call__(self, state: StopperState) -> bool:  # noqa: ARG002
    """Check if evolution should stop due to signal.

    Args:
        state: Current evolution state snapshot (not used, but required
            by StopperProtocol).

    Returns:
        True if a signal was received, False otherwise.

    Examples:
        ```python
        stopper = SignalStopper()
        stopper.setup()

        state = StopperState(
            iteration=5,
            best_score=0.8,
            stagnation_counter=0,
            total_evaluations=25,
            candidates_count=1,
            elapsed_seconds=30.0,
        )

        # Before signal
        stopper(state)  # False

        # After Ctrl+C
        # stopper(state)  # True
        ```

    Note:
        Often called after each iteration. Returns True as soon as
        a signal is received.
    """
    return self._stop_requested

cleanup

cleanup() -> None

Restore original signal handlers.

Should be called after evolution completes. In sync contexts, restores the signal handlers that were in place before setup() was called; in async contexts, removes the handlers registered on the event loop.

Examples:

stopper = SignalStopper()
stopper.setup()
try:
    # Run evolution
    pass
finally:
    stopper.cleanup()  # Restore original handlers
Note

On platforms where certain signals are unavailable, those signals are silently skipped during cleanup.
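
The sync path relies on the fact that `signal.signal()` returns the handler it replaces. The following standalone sketch shows the same save-and-restore pattern using only the standard library; `on_sigint` and `received` are illustrative names, and the code must run in the main thread because `signal.signal()` requires it.

```python
import signal

received: list[int] = []


def on_sigint(signum, frame):
    """Record the signal instead of raising KeyboardInterrupt."""
    received.append(signum)


# signal.signal() returns the handler it replaces; keep it for cleanup.
previous = signal.signal(signal.SIGINT, on_sigint)
try:
    signal.raise_signal(signal.SIGINT)  # simulate Ctrl+C
finally:
    signal.signal(signal.SIGINT, previous)  # restore, as cleanup() does

stop_requested = bool(received)
restored = signal.getsignal(signal.SIGINT) is previous
print(stop_requested, restored)
```

The `try`/`finally` mirrors the usage shown above: the previous handler is put back even if the work in between raises.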

Source code in src/gepa_adk/adapters/stoppers/signal.py
def cleanup(self) -> None:
    """Restore original signal handlers.

    Should be called after evolution completes. In sync contexts, restores
    the signal handlers that were in place before setup() was called; in
    async contexts, removes the handlers registered on the event loop.

    Examples:
        ```python
        stopper = SignalStopper()
        stopper.setup()
        try:
            # Run evolution
            pass
        finally:
            stopper.cleanup()  # Restore original handlers
        ```

    Note:
        On platforms where certain signals are unavailable, those
        signals are silently skipped during cleanup.
    """
    if self._loop is not None:
        for sig in self.signals:
            try:
                self._loop.remove_signal_handler(sig)
            except (OSError, ValueError, NotImplementedError):
                # Some signals may not be removable or supported by the event
                # loop on all platforms; cleanup is best-effort.
                pass
    else:
        for sig, handler in self._original_handlers.items():
            try:
                signal.signal(sig, handler)
            except (OSError, ValueError):
                # Some signals or states may prevent restoring previous
                # handlers; cleanup is best-effort.
                pass
    self._original_handlers.clear()
    self._loop = None

__aenter__ async

__aenter__() -> SignalStopper

Enter async context and install signal handlers.

RETURNS DESCRIPTION
SignalStopper

Self for use as stopper in evolution config.

Examples:

async with SignalStopper() as stopper:
    config = EvolutionConfig(stop_callbacks=[stopper])
    result = await engine.run(config)
Note

On entry, signal handlers are installed automatically.

Source code in src/gepa_adk/adapters/stoppers/signal.py
async def __aenter__(self) -> SignalStopper:
    """Enter async context and install signal handlers.

    Returns:
        Self for use as stopper in evolution config.

    Examples:
        ```python
        async with SignalStopper() as stopper:
            config = EvolutionConfig(stop_callbacks=[stopper])
            result = await engine.run(config)
        ```

    Note:
        On entry, signal handlers are installed automatically.
    """
    self.setup()
    return self

__aexit__ async

__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None

Exit async context and restore signal handlers.

PARAMETER DESCRIPTION
exc_type

Exception type if an exception was raised.

TYPE: type[BaseException] | None

exc_val

Exception value if an exception was raised.

TYPE: BaseException | None

exc_tb

Exception traceback if an exception was raised.

TYPE: object

Note

Original signal handlers are restored automatically on exit, even if an exception was raised.
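
The guarantee comes from the async context manager protocol itself: `__aexit__` runs whether or not the body raises. A minimal sketch with a stand-in class (`TrackingStopper` is hypothetical and records calls instead of touching real signal handlers):

```python
import asyncio


class TrackingStopper:
    """Stand-in that records setup/cleanup calls instead of touching signals."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def __aenter__(self) -> "TrackingStopper":
        self.events.append("setup")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Returning None (falsy) lets the exception propagate after cleanup.
        self.events.append("cleanup")


async def failing_run() -> list[str]:
    stopper = TrackingStopper()
    try:
        async with stopper:
            raise RuntimeError("evolution failed")
    except RuntimeError:
        stopper.events.append("propagated")
    return stopper.events


events = asyncio.run(failing_run())
print(events)  # ['setup', 'cleanup', 'propagated']
```

Because `__aexit__` returns `None`, the exception is not swallowed; the caller still sees the failure after cleanup has run.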

Source code in src/gepa_adk/adapters/stoppers/signal.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None:
    """Exit async context and restore signal handlers.

    Args:
        exc_type: Exception type if an exception was raised.
        exc_val: Exception value if an exception was raised.
        exc_tb: Exception traceback if an exception was raised.

    Note:
        Original signal handlers are restored automatically on exit,
        even if an exception was raised.
    """
    self.cleanup()

__enter__

__enter__() -> SignalStopper

Enter sync context and install signal handlers.

RETURNS DESCRIPTION
SignalStopper

Self for use as stopper in evolution config.

Examples:

with SignalStopper() as stopper:
    config = EvolutionConfig(stop_callbacks=[stopper])
    result = engine.run(config)  # sync run
Note

On entry, signal handlers are installed automatically.

Source code in src/gepa_adk/adapters/stoppers/signal.py
def __enter__(self) -> SignalStopper:
    """Enter sync context and install signal handlers.

    Returns:
        Self for use as stopper in evolution config.

    Examples:
        ```python
        with SignalStopper() as stopper:
            config = EvolutionConfig(stop_callbacks=[stopper])
            result = engine.run(config)  # sync run
        ```

    Note:
        On entry, signal handlers are installed automatically.
    """
    self.setup()
    return self

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None

Exit sync context and restore signal handlers.

PARAMETER DESCRIPTION
exc_type

Exception type if an exception was raised.

TYPE: type[BaseException] | None

exc_val

Exception value if an exception was raised.

TYPE: BaseException | None

exc_tb

Exception traceback if an exception was raised.

TYPE: object

Note

Original signal handlers are restored automatically on exit, even if an exception was raised.

Source code in src/gepa_adk/adapters/stoppers/signal.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None:
    """Exit sync context and restore signal handlers.

    Args:
        exc_type: Exception type if an exception was raised.
        exc_val: Exception value if an exception was raised.
        exc_tb: Exception traceback if an exception was raised.

    Note:
        Original signal handlers are restored automatically on exit,
        even if an exception was raised.
    """
    self.cleanup()

ScoreThresholdStopper

Stop evolution when best score reaches threshold.

Terminates evolution when the best achieved score meets or exceeds the configured threshold. Useful for "early success" scenarios where continued evolution beyond a target performance is unnecessary.

ATTRIBUTE DESCRIPTION
threshold

Target score to achieve (evolution stops when best_score >= threshold).

TYPE: float

Examples:

Creating a 95% accuracy threshold:

from gepa_adk.adapters.stoppers import ScoreThresholdStopper
from gepa_adk.domain.stopper import StopperState

stopper = ScoreThresholdStopper(0.95)

state = StopperState(
    iteration=10,
    best_score=0.97,
    stagnation_counter=2,
    total_evaluations=50,
    candidates_count=3,
    elapsed_seconds=120.0,
)

stopper(state)  # Returns True (should stop)
Note

Any float value can be used as threshold, including negative numbers for domains where scores can be negative (e.g., a negated loss, so that higher is still better).
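
As an illustration of a negative threshold, the sketch below assumes scores are negated losses, so that a higher score is still better. `ThresholdCheck` is a stand-in that mirrors the documented rule and takes a bare score rather than a `StopperState` for brevity.

```python
class ThresholdCheck:
    """Stand-in mirroring the documented rule: best_score >= threshold."""

    def __init__(self, threshold: float) -> None:
        self.threshold = threshold

    def __call__(self, best_score: float) -> bool:
        return best_score >= self.threshold


# Scores are negated losses here, so "higher is better" still holds.
stop_at_loss_half = ThresholdCheck(-0.5)

still_too_lossy = stop_at_loss_half(-0.8)  # loss 0.8: keep evolving
good_enough = stop_at_loss_half(-0.4)  # loss 0.4: stop
print(still_too_lossy, good_enough)  # False True
```

Passing a raw (positive) loss with this comparison would invert the intent, since larger losses would then satisfy the threshold first.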

Source code in src/gepa_adk/adapters/stoppers/threshold.py
class ScoreThresholdStopper:
    """Stop evolution when best score reaches threshold.

    Terminates evolution when the best achieved score meets or exceeds the
    configured threshold. Useful for "early success" scenarios where continued
    evolution beyond a target performance is unnecessary.

    Attributes:
        threshold (float): Target score to achieve (evolution stops when
            best_score >= threshold).

    Examples:
        Creating a 95% accuracy threshold:

        ```python
        from gepa_adk.adapters.stoppers import ScoreThresholdStopper
        from gepa_adk.domain.stopper import StopperState

        stopper = ScoreThresholdStopper(0.95)

        state = StopperState(
            iteration=10,
            best_score=0.97,
            stagnation_counter=2,
            total_evaluations=50,
            candidates_count=3,
            elapsed_seconds=120.0,
        )

        stopper(state)  # Returns True (should stop)
        ```

    Note:
        Any float value can be used as threshold, including negative numbers
        for domains where scores can be negative (e.g., a negated loss, so
        that higher is still better).
    """

    def __init__(self, threshold: float) -> None:
        """Initialize threshold stopper with target score.

        Args:
            threshold: Target score to achieve. Evolution stops when
                best_score >= threshold. Can be any float value including
                negative numbers.

        Examples:
            ```python
            stopper = ScoreThresholdStopper(0.9)  # 90% target
            stopper = ScoreThresholdStopper(-0.5)  # For negative score domains
            ```

        Note:
            Unlike timeout values, threshold has no restrictions on sign
            or magnitude since score domains vary by application.
        """
        self.threshold = threshold

    def __call__(self, state: StopperState) -> bool:
        """Check if evolution should stop due to reaching threshold.

        Args:
            state: Current evolution state snapshot containing best_score.

        Returns:
            True if best score meets or exceeds threshold, False otherwise.

        Examples:
            ```python
            stopper = ScoreThresholdStopper(0.9)

            # Below threshold
            state1 = StopperState(
                iteration=5,
                best_score=0.85,
                stagnation_counter=0,
                total_evaluations=25,
                candidates_count=1,
                elapsed_seconds=30.0,
            )
            stopper(state1)  # False

            # At threshold
            state2 = StopperState(
                iteration=10,
                best_score=0.9,
                stagnation_counter=1,
                total_evaluations=50,
                candidates_count=2,
                elapsed_seconds=65.0,
            )
            stopper(state2)  # True
            ```

        Note:
            Often called after each iteration. Returns True as soon as
            the threshold is reached.
        """
        return state.best_score >= self.threshold

__init__

__init__(threshold: float) -> None

Initialize threshold stopper with target score.

PARAMETER DESCRIPTION
threshold

Target score to achieve. Evolution stops when best_score >= threshold. Can be any float value including negative numbers.

TYPE: float

Examples:

stopper = ScoreThresholdStopper(0.9)  # 90% target
stopper = ScoreThresholdStopper(-0.5)  # For negative score domains
Note

Unlike timeout values, threshold has no restrictions on sign or magnitude since score domains vary by application.

Source code in src/gepa_adk/adapters/stoppers/threshold.py
def __init__(self, threshold: float) -> None:
    """Initialize threshold stopper with target score.

    Args:
        threshold: Target score to achieve. Evolution stops when
            best_score >= threshold. Can be any float value including
            negative numbers.

    Examples:
        ```python
        stopper = ScoreThresholdStopper(0.9)  # 90% target
        stopper = ScoreThresholdStopper(-0.5)  # For negative score domains
        ```

    Note:
        Unlike timeout values, threshold has no restrictions on sign
        or magnitude since score domains vary by application.
    """
    self.threshold = threshold

__call__

__call__(state: StopperState) -> bool

Check if evolution should stop due to reaching threshold.

PARAMETER DESCRIPTION
state

Current evolution state snapshot containing best_score.

TYPE: StopperState

RETURNS DESCRIPTION
bool

True if best score meets or exceeds threshold, False otherwise.

Examples:

stopper = ScoreThresholdStopper(0.9)

# Below threshold
state1 = StopperState(
    iteration=5,
    best_score=0.85,
    stagnation_counter=0,
    total_evaluations=25,
    candidates_count=1,
    elapsed_seconds=30.0,
)
stopper(state1)  # False

# At threshold
state2 = StopperState(
    iteration=10,
    best_score=0.9,
    stagnation_counter=1,
    total_evaluations=50,
    candidates_count=2,
    elapsed_seconds=65.0,
)
stopper(state2)  # True
Note

Often called after each iteration. Returns True as soon as the threshold is reached.
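
Because the check is a plain `>=` on floats, a score that is mathematically equal to the threshold can fall just short of it after rounding. The sketch below shows one such case and a tolerance-aware comparison; the `tolerant` variant is a hypothetical workaround and is not part of ScoreThresholdStopper.

```python
import math

threshold = 0.9
best_score = 0.3 + 0.3 + 0.3  # e.g. three sub-scores summed

exact = best_score >= threshold
# Hypothetical tolerance-aware variant; not part of ScoreThresholdStopper.
tolerant = best_score >= threshold or math.isclose(best_score, threshold)
print(exact, tolerant)  # False True
```

If scores are aggregated from several floating-point terms, setting the threshold slightly below the nominal target is one way to avoid this edge case without changing the stopper.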

Source code in src/gepa_adk/adapters/stoppers/threshold.py
def __call__(self, state: StopperState) -> bool:
    """Check if evolution should stop due to reaching threshold.

    Args:
        state: Current evolution state snapshot containing best_score.

    Returns:
        True if best score meets or exceeds threshold, False otherwise.

    Examples:
        ```python
        stopper = ScoreThresholdStopper(0.9)

        # Below threshold
        state1 = StopperState(
            iteration=5,
            best_score=0.85,
            stagnation_counter=0,
            total_evaluations=25,
            candidates_count=1,
            elapsed_seconds=30.0,
        )
        stopper(state1)  # False

        # At threshold
        state2 = StopperState(
            iteration=10,
            best_score=0.9,
            stagnation_counter=1,
            total_evaluations=50,
            candidates_count=2,
            elapsed_seconds=65.0,
        )
        stopper(state2)  # True
        ```

    Note:
        Often called after each iteration. Returns True as soon as
        the threshold is reached.
    """
    return state.best_score >= self.threshold

TimeoutStopper

Stop evolution after a specified timeout.

Terminates evolution when wall-clock time reaches or exceeds the configured timeout duration. Useful for resource management and CI/CD pipelines.

ATTRIBUTE DESCRIPTION
timeout_seconds

Maximum wall-clock time for evolution, in seconds.

TYPE: float

Examples:

Creating a 5-minute timeout:

from gepa_adk.adapters.stoppers import TimeoutStopper
from gepa_adk.domain.stopper import StopperState

stopper = TimeoutStopper(300.0)  # 5 minutes

state = StopperState(
    iteration=10,
    best_score=0.8,
    stagnation_counter=2,
    total_evaluations=50,
    candidates_count=3,
    elapsed_seconds=400.0,  # Exceeds timeout
)

stopper(state)  # Returns True (should stop)
Note

All timeout values must be positive. Zero and negative values raise ValueError to prevent invalid configurations.
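
When the timeout comes from user input such as an environment variable or CLI flag, the `ValueError` can be handled at construction time. A minimal sketch, where `TimeoutCheck` is a stand-in reproducing the documented validation and `parse_timeout` is a hypothetical helper:

```python
class TimeoutCheck:
    """Stand-in reproducing the documented validation and comparison."""

    def __init__(self, timeout_seconds: float) -> None:
        if timeout_seconds <= 0:
            raise ValueError("timeout_seconds must be positive")
        self.timeout_seconds = timeout_seconds

    def __call__(self, elapsed_seconds: float) -> bool:
        return elapsed_seconds >= self.timeout_seconds


def parse_timeout(raw: str, default: float = 300.0) -> TimeoutCheck:
    """Hypothetical helper: fall back to a default on invalid input."""
    try:
        return TimeoutCheck(float(raw))
    except ValueError:
        # Covers non-numeric text as well as zero/negative values.
        return TimeoutCheck(default)


from_valid = parse_timeout("60").timeout_seconds
from_zero = parse_timeout("0").timeout_seconds
from_text = parse_timeout("soon").timeout_seconds
print(from_valid, from_zero, from_text)  # 60.0 300.0 300.0
```

Silently falling back to a default is only one option; re-raising with a clearer message may be preferable when a misconfigured timeout should fail fast.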

Source code in src/gepa_adk/adapters/stoppers/timeout.py
class TimeoutStopper:
    """Stop evolution after a specified timeout.

    Terminates evolution when wall-clock time reaches or exceeds the
    configured timeout duration. Useful for resource management and CI/CD
    pipelines.

    Attributes:
        timeout_seconds (float): Maximum wall-clock time for evolution, in
            seconds.

    Examples:
        Creating a 5-minute timeout:

        ```python
        from gepa_adk.adapters.stoppers import TimeoutStopper
        from gepa_adk.domain.stopper import StopperState

        stopper = TimeoutStopper(300.0)  # 5 minutes

        state = StopperState(
            iteration=10,
            best_score=0.8,
            stagnation_counter=2,
            total_evaluations=50,
            candidates_count=3,
            elapsed_seconds=400.0,  # Exceeds timeout
        )

        stopper(state)  # Returns True (should stop)
        ```

    Note:
        All timeout values must be positive. Zero and negative values
        raise ValueError to prevent invalid configurations.
    """

    def __init__(self, timeout_seconds: float) -> None:
        """Initialize timeout stopper with maximum duration.

        Args:
            timeout_seconds: Maximum wall-clock time for evolution in seconds.
                Must be a positive value.

        Raises:
            ValueError: If timeout_seconds is zero or negative.

        Examples:
            ```python
            stopper = TimeoutStopper(60.0)  # 1 minute
            stopper = TimeoutStopper(3600.0)  # 1 hour
            ```

        Note:
            Consider using reasonable timeouts for your use case. Very
            short timeouts may not allow sufficient evolution progress.
        """
        if timeout_seconds <= 0:
            raise ValueError("timeout_seconds must be positive")
        self.timeout_seconds = timeout_seconds

    def __call__(self, state: StopperState) -> bool:
        """Check if evolution should stop due to timeout.

        Args:
            state: Current evolution state snapshot containing elapsed_seconds.

        Returns:
            True if elapsed time meets or exceeds timeout, False otherwise.

        Examples:
            ```python
            stopper = TimeoutStopper(60.0)

            # Not yet timed out
            state1 = StopperState(
                iteration=5,
                best_score=0.5,
                stagnation_counter=0,
                total_evaluations=25,
                candidates_count=1,
                elapsed_seconds=30.0,
            )
            stopper(state1)  # False

            # Timed out
            state2 = StopperState(
                iteration=10,
                best_score=0.7,
                stagnation_counter=1,
                total_evaluations=50,
                candidates_count=2,
                elapsed_seconds=65.0,
            )
            stopper(state2)  # True
            ```

        Note:
            Often called after each iteration. Returns True as soon as
            the time limit is reached.
        """
        return state.elapsed_seconds >= self.timeout_seconds

__init__

__init__(timeout_seconds: float) -> None

Initialize timeout stopper with maximum duration.

PARAMETER DESCRIPTION
timeout_seconds

Maximum wall-clock time for evolution in seconds. Must be a positive value.

TYPE: float

RAISES DESCRIPTION
ValueError

If timeout_seconds is zero or negative.

Examples:

stopper = TimeoutStopper(60.0)  # 1 minute
stopper = TimeoutStopper(3600.0)  # 1 hour
Note

Consider using reasonable timeouts for your use case. Very short timeouts may not allow sufficient evolution progress.

Source code in src/gepa_adk/adapters/stoppers/timeout.py
def __init__(self, timeout_seconds: float) -> None:
    """Initialize timeout stopper with maximum duration.

    Args:
        timeout_seconds: Maximum wall-clock time for evolution in seconds.
            Must be a positive value.

    Raises:
        ValueError: If timeout_seconds is zero or negative.

    Examples:
        ```python
        stopper = TimeoutStopper(60.0)  # 1 minute
        stopper = TimeoutStopper(3600.0)  # 1 hour
        ```

    Note:
        Consider using reasonable timeouts for your use case. Very
        short timeouts may not allow sufficient evolution progress.
    """
    if timeout_seconds <= 0:
        raise ValueError("timeout_seconds must be positive")
    self.timeout_seconds = timeout_seconds

__call__

__call__(state: StopperState) -> bool

Check if evolution should stop due to timeout.

PARAMETER DESCRIPTION
state

Current evolution state snapshot containing elapsed_seconds.

TYPE: StopperState

RETURNS DESCRIPTION
bool

True if elapsed time meets or exceeds timeout, False otherwise.

Examples:

stopper = TimeoutStopper(60.0)

# Not yet timed out
state1 = StopperState(
    iteration=5,
    best_score=0.5,
    stagnation_counter=0,
    total_evaluations=25,
    candidates_count=1,
    elapsed_seconds=30.0,
)
stopper(state1)  # False

# Timed out
state2 = StopperState(
    iteration=10,
    best_score=0.7,
    stagnation_counter=1,
    total_evaluations=50,
    candidates_count=2,
    elapsed_seconds=65.0,
)
stopper(state2)  # True
Note

Often called after each iteration. Returns True as soon as the time limit is reached.

Source code in src/gepa_adk/adapters/stoppers/timeout.py
def __call__(self, state: StopperState) -> bool:
    """Check if evolution should stop due to timeout.

    Args:
        state: Current evolution state snapshot containing elapsed_seconds.

    Returns:
        True if elapsed time meets or exceeds timeout, False otherwise.

    Examples:
        ```python
        stopper = TimeoutStopper(60.0)

        # Not yet timed out
        state1 = StopperState(
            iteration=5,
            best_score=0.5,
            stagnation_counter=0,
            total_evaluations=25,
            candidates_count=1,
            elapsed_seconds=30.0,
        )
        stopper(state1)  # False

        # Timed out
        state2 = StopperState(
            iteration=10,
            best_score=0.7,
            stagnation_counter=1,
            total_evaluations=50,
            candidates_count=2,
            elapsed_seconds=65.0,
        )
        stopper(state2)  # True
        ```

    Note:
        Often called after each iteration. Returns True as soon as
        the time limit is reached.
    """
    return state.elapsed_seconds >= self.timeout_seconds