Skip to content

Full Module Index

adk_secure_sessions

Encrypted session storage for Google ADK.

Provides field-level encryption for ADK session data via pluggable encryption backends that conform to the EncryptionBackend protocol.

ATTRIBUTE DESCRIPTION
EncryptionBackend

Protocol defining the encrypt/decrypt contract.

TYPE: Protocol

AesGcmBackend

AES-256-GCM authenticated encryption backend.

TYPE: Protocol

FernetBackend

Fernet symmetric encryption backend.

TYPE: Protocol

EncryptedSessionService

Encrypted session service wrapping DatabaseSessionService with transparent encryption.

TYPE: Protocol

SecureSessionError

Base exception for all library errors.

TYPE: Protocol

EncryptionError

Raised when encryption fails.

TYPE: Protocol

DecryptionError

Raised when decryption fails.

TYPE: Protocol

SerializationError

Raised when data cannot be serialized to JSON.

TYPE: Protocol

ConfigurationError

Raised when the service is misconfigured at startup.

TYPE: Protocol

encrypt_session

Serialize a dict to an encrypted envelope.

TYPE: bytes

decrypt_session

Decrypt an envelope back to a dict.

TYPE: dict[str, Any]

encrypt_json

Encrypt a JSON string into an envelope.

TYPE: bytes

decrypt_json

Decrypt an envelope back to a JSON string.

TYPE: str

BACKEND_AES_GCM

Backend identifier for AES-256-GCM encryption.

TYPE: int

BACKEND_FERNET

Backend identifier for Fernet encryption.

TYPE: int

ENVELOPE_VERSION_1

Current envelope format version byte.

TYPE: int

Examples:

Encrypt and decrypt session state:

from adk_secure_sessions import (
    FernetBackend,
    encrypt_session,
    decrypt_session,
    BACKEND_FERNET,
)

backend = FernetBackend("my-secret-passphrase")
envelope = await encrypt_session({"ssn": "123-45-6789"}, backend, BACKEND_FERNET)
state = await decrypt_session(envelope, backend)
See Also

adk_secure_sessions.protocols: Full protocol definition and known limitations.

BACKEND_AES_GCM module-attribute

BACKEND_AES_GCM: int = 2

Backend identifier for AES-256-GCM encryption.

BACKEND_FERNET module-attribute

BACKEND_FERNET: int = 1

Backend identifier for Fernet encryption.

ENVELOPE_VERSION_1 module-attribute

ENVELOPE_VERSION_1: int = 1

Current envelope format version byte.

AesGcmBackend

AES-256-GCM encryption backend conforming to EncryptionBackend.

Accepts a key as exactly 32 bytes (256 bits). Use AESGCM.generate_key(bit_length=256) to generate a valid key.

ATTRIBUTE DESCRIPTION
_aesgcm

Internal AESGCM instance for encrypt/decrypt operations.

TYPE: AESGCM

Examples:

Initialize with a generated key:

from cryptography.hazmat.primitives.ciphers.aead import AESGCM

key = AESGCM.generate_key(bit_length=256)
backend = AesGcmBackend(key=key)
Source code in src/adk_secure_sessions/backends/aes_gcm.py
class AesGcmBackend:
    """AES-256-GCM encryption backend conforming to ``EncryptionBackend``.

    Accepts a key as exactly 32 bytes (256 bits). Use
    ``AESGCM.generate_key(bit_length=256)`` to generate a valid key.

    Attributes:
        _aesgcm (AESGCM): Internal AESGCM instance for encrypt/decrypt
            operations.

    Examples:
        Initialize with a generated key:

        ```python
        from cryptography.hazmat.primitives.ciphers.aead import AESGCM

        key = AESGCM.generate_key(bit_length=256)
        backend = AesGcmBackend(key=key)
        ```
    """

    def __init__(self, key: bytes) -> None:
        """Initialize AesGcmBackend with the given key.

        Args:
            key: Encryption key as exactly 32 bytes (256 bits).

        Raises:
            ConfigurationError: If *key* is not ``bytes`` or is not
                exactly 32 bytes long.
        """
        if not isinstance(key, bytes):
            msg = f"key must be bytes, got {type(key).__name__}"
            raise ConfigurationError(msg)

        if len(key) != _KEY_LENGTH:
            msg = f"key must be exactly {_KEY_LENGTH} bytes, got {len(key)}"
            raise ConfigurationError(msg)

        self._aesgcm = AESGCM(key)

    @property
    def backend_id(self) -> int:
        """Unique backend identifier for the envelope header.

        Returns:
            ``BACKEND_AES_GCM`` (``0x02``).
        """
        return BACKEND_AES_GCM

    def sync_encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt plaintext bytes synchronously.

        Generates a fresh 12-byte random nonce per call. Returns
        ``nonce || ciphertext + tag``.

        Args:
            plaintext: Raw bytes to encrypt.

        Returns:
            Encrypted bytes as ``nonce (12) || ciphertext + tag``.

        Raises:
            TypeError: If *plaintext* is not ``bytes``.

        Examples:
            ```python
            ciphertext = backend.sync_encrypt(b"hello")
            ```
        """
        if not isinstance(plaintext, bytes):
            msg = f"plaintext must be bytes, got {type(plaintext).__name__}"
            raise TypeError(msg)

        nonce = os.urandom(_NONCE_LENGTH)
        ciphertext_and_tag = self._aesgcm.encrypt(nonce, plaintext, None)
        return nonce + ciphertext_and_tag

    def sync_decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt ciphertext bytes synchronously.

        Expects ``nonce (12 bytes) || ciphertext + tag``.

        Args:
            ciphertext: Encrypted bytes to decrypt.

        Returns:
            Decrypted plaintext as bytes.

        Raises:
            TypeError: If *ciphertext* is not ``bytes``.
            DecryptionError: If decryption fails due to wrong key,
                tampered ciphertext, or malformed input.

        Examples:
            ```python
            plaintext = backend.sync_decrypt(ciphertext)
            ```
        """
        if not isinstance(ciphertext, bytes):
            msg = f"ciphertext must be bytes, got {type(ciphertext).__name__}"
            raise TypeError(msg)

        if len(ciphertext) < _MIN_CIPHERTEXT_LENGTH:
            msg = "Decryption failed: ciphertext too short"
            raise DecryptionError(msg)

        nonce = ciphertext[:_NONCE_LENGTH]
        ct_and_tag = ciphertext[_NONCE_LENGTH:]

        try:
            return self._aesgcm.decrypt(nonce, ct_and_tag, None)
        except InvalidTag:
            msg = "Decryption failed: invalid tag or wrong key"
            raise DecryptionError(msg) from None

    async def encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt plaintext bytes asynchronously.

        Args:
            plaintext: Raw bytes to encrypt.

        Returns:
            Encrypted bytes as ``nonce (12) || ciphertext + tag``.

        Raises:
            TypeError: If *plaintext* is not ``bytes``.

        Examples:
            ```python
            ciphertext = await backend.encrypt(b"hello")
            ```
        """
        if not isinstance(plaintext, bytes):
            msg = f"plaintext must be bytes, got {type(plaintext).__name__}"
            raise TypeError(msg)

        return await asyncio.to_thread(self.sync_encrypt, plaintext)

    async def decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt ciphertext bytes asynchronously.

        Args:
            ciphertext: Encrypted bytes to decrypt.

        Returns:
            Decrypted plaintext as bytes.

        Raises:
            TypeError: If *ciphertext* is not ``bytes``.
            DecryptionError: If decryption fails due to wrong key,
                tampered ciphertext, or malformed input.

        Examples:
            ```python
            plaintext = await backend.decrypt(ciphertext)
            ```
        """
        if not isinstance(ciphertext, bytes):
            msg = f"ciphertext must be bytes, got {type(ciphertext).__name__}"
            raise TypeError(msg)

        return await asyncio.to_thread(self.sync_decrypt, ciphertext)

backend_id property

backend_id: int

Unique backend identifier for the envelope header.

RETURNS DESCRIPTION
int

BACKEND_AES_GCM (0x02).

__init__

__init__(key: bytes) -> None

Initialize AesGcmBackend with the given key.

PARAMETER DESCRIPTION
key

Encryption key as exactly 32 bytes (256 bits).

TYPE: bytes

RAISES DESCRIPTION
ConfigurationError

If key is not bytes or is not exactly 32 bytes long.

Source code in src/adk_secure_sessions/backends/aes_gcm.py
def __init__(self, key: bytes) -> None:
    """Initialize AesGcmBackend with the given key.

    Args:
        key: Encryption key as exactly 32 bytes (256 bits).

    Raises:
        ConfigurationError: If *key* is not ``bytes`` or is not
            exactly 32 bytes long.
    """
    if not isinstance(key, bytes):
        msg = f"key must be bytes, got {type(key).__name__}"
        raise ConfigurationError(msg)

    if len(key) != _KEY_LENGTH:
        msg = f"key must be exactly {_KEY_LENGTH} bytes, got {len(key)}"
        raise ConfigurationError(msg)

    self._aesgcm = AESGCM(key)

sync_encrypt

sync_encrypt(plaintext: bytes) -> bytes

Encrypt plaintext bytes synchronously.

Generates a fresh 12-byte random nonce per call. Returns nonce || ciphertext + tag.

PARAMETER DESCRIPTION
plaintext

Raw bytes to encrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Encrypted bytes as nonce (12) || ciphertext + tag.

RAISES DESCRIPTION
TypeError

If plaintext is not bytes.

Examples:

ciphertext = backend.sync_encrypt(b"hello")
Source code in src/adk_secure_sessions/backends/aes_gcm.py
def sync_encrypt(self, plaintext: bytes) -> bytes:
    """Encrypt plaintext bytes synchronously.

    Generates a fresh 12-byte random nonce per call. Returns
    ``nonce || ciphertext + tag``.

    Args:
        plaintext: Raw bytes to encrypt.

    Returns:
        Encrypted bytes as ``nonce (12) || ciphertext + tag``.

    Raises:
        TypeError: If *plaintext* is not ``bytes``.

    Examples:
        ```python
        ciphertext = backend.sync_encrypt(b"hello")
        ```
    """
    if not isinstance(plaintext, bytes):
        msg = f"plaintext must be bytes, got {type(plaintext).__name__}"
        raise TypeError(msg)

    nonce = os.urandom(_NONCE_LENGTH)
    ciphertext_and_tag = self._aesgcm.encrypt(nonce, plaintext, None)
    return nonce + ciphertext_and_tag

sync_decrypt

sync_decrypt(ciphertext: bytes) -> bytes

Decrypt ciphertext bytes synchronously.

Expects nonce (12 bytes) || ciphertext + tag.

PARAMETER DESCRIPTION
ciphertext

Encrypted bytes to decrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Decrypted plaintext as bytes.

RAISES DESCRIPTION
TypeError

If ciphertext is not bytes.

DecryptionError

If decryption fails due to wrong key, tampered ciphertext, or malformed input.

Examples:

plaintext = backend.sync_decrypt(ciphertext)
Source code in src/adk_secure_sessions/backends/aes_gcm.py
def sync_decrypt(self, ciphertext: bytes) -> bytes:
    """Decrypt ciphertext bytes synchronously.

    Expects ``nonce (12 bytes) || ciphertext + tag``.

    Args:
        ciphertext: Encrypted bytes to decrypt.

    Returns:
        Decrypted plaintext as bytes.

    Raises:
        TypeError: If *ciphertext* is not ``bytes``.
        DecryptionError: If decryption fails due to wrong key,
            tampered ciphertext, or malformed input.

    Examples:
        ```python
        plaintext = backend.sync_decrypt(ciphertext)
        ```
    """
    if not isinstance(ciphertext, bytes):
        msg = f"ciphertext must be bytes, got {type(ciphertext).__name__}"
        raise TypeError(msg)

    if len(ciphertext) < _MIN_CIPHERTEXT_LENGTH:
        msg = "Decryption failed: ciphertext too short"
        raise DecryptionError(msg)

    nonce = ciphertext[:_NONCE_LENGTH]
    ct_and_tag = ciphertext[_NONCE_LENGTH:]

    try:
        return self._aesgcm.decrypt(nonce, ct_and_tag, None)
    except InvalidTag:
        msg = "Decryption failed: invalid tag or wrong key"
        raise DecryptionError(msg) from None

encrypt async

encrypt(plaintext: bytes) -> bytes

Encrypt plaintext bytes asynchronously.

PARAMETER DESCRIPTION
plaintext

Raw bytes to encrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Encrypted bytes as nonce (12) || ciphertext + tag.

RAISES DESCRIPTION
TypeError

If plaintext is not bytes.

Examples:

ciphertext = await backend.encrypt(b"hello")
Source code in src/adk_secure_sessions/backends/aes_gcm.py
async def encrypt(self, plaintext: bytes) -> bytes:
    """Encrypt plaintext bytes asynchronously.

    Args:
        plaintext: Raw bytes to encrypt.

    Returns:
        Encrypted bytes as ``nonce (12) || ciphertext + tag``.

    Raises:
        TypeError: If *plaintext* is not ``bytes``.

    Examples:
        ```python
        ciphertext = await backend.encrypt(b"hello")
        ```
    """
    if not isinstance(plaintext, bytes):
        msg = f"plaintext must be bytes, got {type(plaintext).__name__}"
        raise TypeError(msg)

    return await asyncio.to_thread(self.sync_encrypt, plaintext)

decrypt async

decrypt(ciphertext: bytes) -> bytes

Decrypt ciphertext bytes asynchronously.

PARAMETER DESCRIPTION
ciphertext

Encrypted bytes to decrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Decrypted plaintext as bytes.

RAISES DESCRIPTION
TypeError

If ciphertext is not bytes.

DecryptionError

If decryption fails due to wrong key, tampered ciphertext, or malformed input.

Examples:

plaintext = await backend.decrypt(ciphertext)
Source code in src/adk_secure_sessions/backends/aes_gcm.py
async def decrypt(self, ciphertext: bytes) -> bytes:
    """Decrypt ciphertext bytes asynchronously.

    Args:
        ciphertext: Encrypted bytes to decrypt.

    Returns:
        Decrypted plaintext as bytes.

    Raises:
        TypeError: If *ciphertext* is not ``bytes``.
        DecryptionError: If decryption fails due to wrong key,
            tampered ciphertext, or malformed input.

    Examples:
        ```python
        plaintext = await backend.decrypt(ciphertext)
        ```
    """
    if not isinstance(ciphertext, bytes):
        msg = f"ciphertext must be bytes, got {type(ciphertext).__name__}"
        raise TypeError(msg)

    return await asyncio.to_thread(self.sync_decrypt, ciphertext)

FernetBackend

Fernet encryption backend conforming to EncryptionBackend.

Accepts a key as str or bytes. If the key is a valid base64url-encoded 32-byte Fernet key, it is used directly (no derivation, no salt marker). Otherwise, a two-phase key derivation scheme is used:

  • Init: PBKDF2-HMAC-SHA256 (600,000 iterations) derives a master key from the passphrase.
  • Per-operation: HKDF-SHA256 with a fresh random salt expands the master key into a unique per-operation Fernet key.

Backward compatibility: data encrypted with pre-3.2 fixed-salt derivation (480,000 iterations) is detected via marker byte and decrypted transparently.

ATTRIBUTE DESCRIPTION
_passphrase_key

Master key from PBKDF2 at 600k iterations (passphrase mode only, None in direct-key mode).

TYPE: bytes | None

_legacy_fernet

Fernet instance for direct-key mode or legacy (480k iterations) backward-compatible decryption.

TYPE: Fernet

_is_passphrase_mode

Whether the backend was initialized with a passphrase (True) or a pre-generated Fernet key (False).

TYPE: bool

Examples:

Initialize with a passphrase:

backend = FernetBackend(key="my-secret")

Initialize with a pre-generated Fernet key:

from cryptography.fernet import Fernet

key = Fernet.generate_key()
backend = FernetBackend(key=key)
Source code in src/adk_secure_sessions/backends/fernet.py
class FernetBackend:
    """Fernet encryption backend conforming to ``EncryptionBackend``.

    Accepts a key as ``str`` or ``bytes``. If the key is a valid
    base64url-encoded 32-byte Fernet key, it is used directly (no
    derivation, no salt marker). Otherwise, a two-phase key derivation
    scheme is used:

    - **Init**: PBKDF2-HMAC-SHA256 (600,000 iterations) derives a master
      key from the passphrase.
    - **Per-operation**: HKDF-SHA256 with a fresh random salt expands
      the master key into a unique per-operation Fernet key.

    Backward compatibility: data encrypted with pre-3.2 fixed-salt
    derivation (480,000 iterations) is detected via marker byte and
    decrypted transparently.

    Attributes:
        _passphrase_key (bytes | None): Master key from PBKDF2 at 600k
            iterations (passphrase mode only, ``None`` in direct-key mode).
        _legacy_fernet (Fernet): Fernet instance for direct-key mode or
            legacy (480k iterations) backward-compatible decryption.
        _is_passphrase_mode (bool): Whether the backend was initialized
            with a passphrase (True) or a pre-generated Fernet key (False).

    Examples:
        Initialize with a passphrase:

        ```python
        backend = FernetBackend(key="my-secret")
        ```

        Initialize with a pre-generated Fernet key:

        ```python
        from cryptography.fernet import Fernet

        key = Fernet.generate_key()
        backend = FernetBackend(key=key)
        ```
    """

    def __init__(self, key: str | bytes) -> None:
        """Initialize FernetBackend with the given key.

        Args:
            key: Encryption key as ``str`` or ``bytes``. A valid Fernet
                key is used directly; arbitrary input is derived via
                two-phase PBKDF2 + HKDF.

        Raises:
            ConfigurationError: If *key* is not ``str`` or ``bytes``,
                or if *key* is empty.
        """
        if not isinstance(key, (str, bytes)):
            msg = f"key must be str or bytes, got {type(key).__name__}"
            raise ConfigurationError(msg)

        if isinstance(key, str):
            key = key.encode()

        if not key:
            msg = "key must not be empty"
            raise ConfigurationError(msg)

        if self._is_valid_fernet_key(key):
            self._passphrase_key: bytes | None = None
            self._legacy_fernet = Fernet(key)
            self._is_passphrase_mode = False
        else:
            self._passphrase_key = self._derive_master_key(key, _PBKDF2_ITERATIONS)
            legacy_raw = self._derive_master_key(key, _PBKDF2_ITERATIONS_LEGACY)
            self._legacy_fernet = Fernet(base64.urlsafe_b64encode(legacy_raw))
            self._is_passphrase_mode = True

    @property
    def backend_id(self) -> int:
        """Unique backend identifier for the envelope header.

        Returns:
            ``BACKEND_FERNET`` (``0x01``).
        """
        return BACKEND_FERNET

    def sync_encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt plaintext bytes synchronously.

        In passphrase mode, generates a fresh 16-byte random salt,
        derives a per-operation Fernet key via HKDF, and returns
        ``SALT_MARKER || salt || fernet_token``. In direct-key mode,
        returns a standard Fernet token.

        Args:
            plaintext: Raw bytes to encrypt.

        Returns:
            Encrypted ciphertext as bytes.

        Raises:
            TypeError: If *plaintext* is not ``bytes``.

        Examples:
            ```python
            ciphertext = backend.sync_encrypt(b"hello")
            ```
        """
        if not isinstance(plaintext, bytes):
            msg = f"plaintext must be bytes, got {type(plaintext).__name__}"
            raise TypeError(msg)

        if not self._is_passphrase_mode:
            return self._legacy_fernet.encrypt(plaintext)

        salt = os.urandom(_SALT_LENGTH)
        per_op_key = self._derive_per_op_key(salt)
        token = Fernet(per_op_key).encrypt(plaintext)
        return _SALT_MARKER + salt + token

    def sync_decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt ciphertext bytes synchronously.

        Detects the ciphertext format via the first byte:

        - ``0x01``: new salted format — extracts salt, derives per-op
          key via HKDF, decrypts the Fernet token.
        - ``>= 0x2B``: legacy Fernet token — decrypts with the legacy
          Fernet instance.
        - Otherwise: raises ``DecryptionError``.

        Args:
            ciphertext: Encrypted bytes to decrypt.

        Returns:
            Decrypted plaintext as bytes.

        Raises:
            TypeError: If *ciphertext* is not ``bytes``.
            DecryptionError: If decryption fails due to wrong key,
                tampered ciphertext, or malformed input.

        Examples:
            ```python
            plaintext = backend.sync_decrypt(ciphertext)
            ```
        """
        if not isinstance(ciphertext, bytes):
            msg = f"ciphertext must be bytes, got {type(ciphertext).__name__}"
            raise TypeError(msg)

        if not ciphertext:
            msg = _DECRYPT_FAILED_MSG
            raise DecryptionError(msg)

        if ciphertext[0:1] == _SALT_MARKER and len(ciphertext) >= _MIN_SALTED_LENGTH:
            return self._decrypt_salted(ciphertext)

        if ciphertext[0] >= 0x2B:
            return self._decrypt_legacy(ciphertext)

        msg = "Decryption failed: invalid ciphertext format"
        raise DecryptionError(msg)

    async def encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt plaintext bytes asynchronously.

        Delegates to ``sync_encrypt`` via ``asyncio.to_thread()``.

        Args:
            plaintext: Raw bytes to encrypt.

        Returns:
            Encrypted ciphertext as bytes.

        Raises:
            TypeError: If *plaintext* is not ``bytes``.

        Examples:
            ```python
            ciphertext = await backend.encrypt(b"hello")
            ```
        """
        if not isinstance(plaintext, bytes):
            msg = f"plaintext must be bytes, got {type(plaintext).__name__}"
            raise TypeError(msg)

        return await asyncio.to_thread(self.sync_encrypt, plaintext)

    async def decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt ciphertext bytes asynchronously.

        Delegates to ``sync_decrypt`` via ``asyncio.to_thread()``.

        Args:
            ciphertext: Encrypted bytes to decrypt.

        Returns:
            Decrypted plaintext as bytes.

        Raises:
            TypeError: If *ciphertext* is not ``bytes``.
            DecryptionError: If decryption fails due to wrong key,
                tampered ciphertext, or malformed input.

        Examples:
            ```python
            plaintext = await backend.decrypt(ciphertext)
            ```
        """
        if not isinstance(ciphertext, bytes):
            msg = f"ciphertext must be bytes, got {type(ciphertext).__name__}"
            raise TypeError(msg)

        return await asyncio.to_thread(self.sync_decrypt, ciphertext)

    def _derive_per_op_key(self, salt: bytes) -> bytes:
        """Derive a per-operation Fernet key via HKDF-SHA256.

        Validates that a master key exists, then expands it with the
        given salt to produce a unique Fernet key.

        Args:
            salt: Random salt for HKDF domain separation.

        Returns:
            A valid base64url-encoded 32-byte Fernet key.

        Raises:
            DecryptionError: If no master key is available (direct-key mode).
        """
        master = self._passphrase_key
        if master is None:
            msg = _DECRYPT_FAILED_MSG
            raise DecryptionError(msg)

        raw = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            info=_HKDF_INFO,
        ).derive(master)
        return base64.urlsafe_b64encode(raw)

    def _decrypt_salted(self, ciphertext: bytes) -> bytes:
        """Decrypt new-format salted ciphertext.

        Extracts the 16-byte salt, derives a per-op key via HKDF,
        and decrypts the remaining Fernet token.

        Args:
            ciphertext: Full ciphertext including marker and salt.

        Returns:
            Decrypted plaintext.

        Raises:
            DecryptionError: If decryption fails.
        """
        salt = ciphertext[1 : 1 + _SALT_LENGTH]
        token = ciphertext[1 + _SALT_LENGTH :]

        if self._passphrase_key is None:
            msg = _DECRYPT_FAILED_MSG
            raise DecryptionError(msg)

        per_op_key = self._derive_per_op_key(salt)
        try:
            return Fernet(per_op_key).decrypt(token)
        except InvalidToken:
            msg = _DECRYPT_FAILED_MSG
            raise DecryptionError(msg) from None

    def _decrypt_legacy(self, ciphertext: bytes) -> bytes:
        """Decrypt legacy Fernet token (pre-3.2 or direct-key mode).

        Delegates to ``self._legacy_fernet`` which uses the 480k-iteration
        PBKDF2-derived key (passphrase mode) or direct Fernet key.

        Args:
            ciphertext: Legacy Fernet token bytes.

        Returns:
            Decrypted plaintext.

        Raises:
            DecryptionError: If decryption fails.
        """
        try:
            return self._legacy_fernet.decrypt(ciphertext)
        except InvalidToken:
            msg = _DECRYPT_FAILED_MSG
            raise DecryptionError(msg) from None

    @staticmethod
    def _is_valid_fernet_key(key_bytes: bytes) -> bool:
        """Check whether key_bytes is a valid Fernet key.

        Attempts to construct a ``Fernet`` instance. Returns ``False``
        on ``ValueError`` (wrong key length) or ``binascii.Error``
        (non-base64 input).

        Args:
            key_bytes: Raw key material.

        Returns:
            True if the bytes are a valid Fernet key.
        """
        try:
            Fernet(key_bytes)
            return True
        except (ValueError, binascii.Error):
            return False

    @staticmethod
    def _derive_master_key(passphrase: bytes, iterations: int) -> bytes:
        """Derive a 32-byte master key via PBKDF2-HMAC-SHA256.

        Args:
            passphrase: Raw passphrase bytes.
            iterations: Number of PBKDF2 iterations.

        Returns:
            32-byte raw derived key.
        """
        return hashlib.pbkdf2_hmac(
            "sha256",
            passphrase,
            _PBKDF2_SALT,
            iterations,
        )

backend_id property

backend_id: int

Unique backend identifier for the envelope header.

RETURNS DESCRIPTION
int

BACKEND_FERNET (0x01).

__init__

__init__(key: str | bytes) -> None

Initialize FernetBackend with the given key.

PARAMETER DESCRIPTION
key

Encryption key as str or bytes. A valid Fernet key is used directly; arbitrary input is derived via two-phase PBKDF2 + HKDF.

TYPE: str | bytes

RAISES DESCRIPTION
ConfigurationError

If key is not str or bytes, or if key is empty.

Source code in src/adk_secure_sessions/backends/fernet.py
def __init__(self, key: str | bytes) -> None:
    """Initialize FernetBackend with the given key.

    Args:
        key: Encryption key as ``str`` or ``bytes``. A valid Fernet
            key is used directly; arbitrary input is derived via
            two-phase PBKDF2 + HKDF.

    Raises:
        ConfigurationError: If *key* is not ``str`` or ``bytes``,
            or if *key* is empty.
    """
    if not isinstance(key, (str, bytes)):
        msg = f"key must be str or bytes, got {type(key).__name__}"
        raise ConfigurationError(msg)

    if isinstance(key, str):
        key = key.encode()

    if not key:
        msg = "key must not be empty"
        raise ConfigurationError(msg)

    if self._is_valid_fernet_key(key):
        self._passphrase_key: bytes | None = None
        self._legacy_fernet = Fernet(key)
        self._is_passphrase_mode = False
    else:
        self._passphrase_key = self._derive_master_key(key, _PBKDF2_ITERATIONS)
        legacy_raw = self._derive_master_key(key, _PBKDF2_ITERATIONS_LEGACY)
        self._legacy_fernet = Fernet(base64.urlsafe_b64encode(legacy_raw))
        self._is_passphrase_mode = True

sync_encrypt

sync_encrypt(plaintext: bytes) -> bytes

Encrypt plaintext bytes synchronously.

In passphrase mode, generates a fresh 16-byte random salt, derives a per-operation Fernet key via HKDF, and returns SALT_MARKER || salt || fernet_token. In direct-key mode, returns a standard Fernet token.

PARAMETER DESCRIPTION
plaintext

Raw bytes to encrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Encrypted ciphertext as bytes.

RAISES DESCRIPTION
TypeError

If plaintext is not bytes.

Examples:

ciphertext = backend.sync_encrypt(b"hello")
Source code in src/adk_secure_sessions/backends/fernet.py
def sync_encrypt(self, plaintext: bytes) -> bytes:
    """Encrypt plaintext bytes synchronously.

    In passphrase mode, generates a fresh 16-byte random salt,
    derives a per-operation Fernet key via HKDF, and returns
    ``SALT_MARKER || salt || fernet_token``. In direct-key mode,
    returns a standard Fernet token.

    Args:
        plaintext: Raw bytes to encrypt.

    Returns:
        Encrypted ciphertext as bytes.

    Raises:
        TypeError: If *plaintext* is not ``bytes``.

    Examples:
        ```python
        ciphertext = backend.sync_encrypt(b"hello")
        ```
    """
    if not isinstance(plaintext, bytes):
        msg = f"plaintext must be bytes, got {type(plaintext).__name__}"
        raise TypeError(msg)

    if not self._is_passphrase_mode:
        return self._legacy_fernet.encrypt(plaintext)

    salt = os.urandom(_SALT_LENGTH)
    per_op_key = self._derive_per_op_key(salt)
    token = Fernet(per_op_key).encrypt(plaintext)
    return _SALT_MARKER + salt + token

sync_decrypt

sync_decrypt(ciphertext: bytes) -> bytes

Decrypt ciphertext bytes synchronously.

Detects the ciphertext format via the first byte:

  • 0x01: new salted format — extracts salt, derives per-op key via HKDF, decrypts the Fernet token.
  • >= 0x2B: legacy Fernet token — decrypts with the legacy Fernet instance.
  • Otherwise: raises DecryptionError.
PARAMETER DESCRIPTION
ciphertext

Encrypted bytes to decrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Decrypted plaintext as bytes.

RAISES DESCRIPTION
TypeError

If ciphertext is not bytes.

DecryptionError

If decryption fails due to wrong key, tampered ciphertext, or malformed input.

Examples:

plaintext = backend.sync_decrypt(ciphertext)
Source code in src/adk_secure_sessions/backends/fernet.py
def sync_decrypt(self, ciphertext: bytes) -> bytes:
    """Decrypt ciphertext bytes synchronously.

    Detects the ciphertext format via the first byte:

    - ``0x01``: new salted format — extracts salt, derives per-op
      key via HKDF, decrypts the Fernet token.
    - ``>= 0x2B``: legacy Fernet token — decrypts with the legacy
      Fernet instance.
    - Otherwise: raises ``DecryptionError``.

    Args:
        ciphertext: Encrypted bytes to decrypt.

    Returns:
        Decrypted plaintext as bytes.

    Raises:
        TypeError: If *ciphertext* is not ``bytes``.
        DecryptionError: If decryption fails due to wrong key,
            tampered ciphertext, or malformed input.

    Examples:
        ```python
        plaintext = backend.sync_decrypt(ciphertext)
        ```
    """
    if not isinstance(ciphertext, bytes):
        msg = f"ciphertext must be bytes, got {type(ciphertext).__name__}"
        raise TypeError(msg)

    if not ciphertext:
        msg = _DECRYPT_FAILED_MSG
        raise DecryptionError(msg)

    if ciphertext[0:1] == _SALT_MARKER and len(ciphertext) >= _MIN_SALTED_LENGTH:
        return self._decrypt_salted(ciphertext)

    if ciphertext[0] >= 0x2B:
        return self._decrypt_legacy(ciphertext)

    msg = "Decryption failed: invalid ciphertext format"
    raise DecryptionError(msg)

encrypt async

encrypt(plaintext: bytes) -> bytes

Encrypt plaintext bytes asynchronously.

Delegates to sync_encrypt via asyncio.to_thread().

PARAMETER DESCRIPTION
plaintext

Raw bytes to encrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Encrypted ciphertext as bytes.

RAISES DESCRIPTION
TypeError

If plaintext is not bytes.

Examples:

ciphertext = await backend.encrypt(b"hello")
Source code in src/adk_secure_sessions/backends/fernet.py
async def encrypt(self, plaintext: bytes) -> bytes:
    """Encrypt plaintext bytes asynchronously.

    Delegates to ``sync_encrypt`` via ``asyncio.to_thread()``.

    Args:
        plaintext: Raw bytes to encrypt.

    Returns:
        Encrypted ciphertext as bytes.

    Raises:
        TypeError: If *plaintext* is not ``bytes``.

    Examples:
        ```python
        ciphertext = await backend.encrypt(b"hello")
        ```
    """
    if not isinstance(plaintext, bytes):
        msg = f"plaintext must be bytes, got {type(plaintext).__name__}"
        raise TypeError(msg)

    return await asyncio.to_thread(self.sync_encrypt, plaintext)

decrypt async

decrypt(ciphertext: bytes) -> bytes

Decrypt ciphertext bytes asynchronously.

Delegates to sync_decrypt via asyncio.to_thread().

PARAMETER DESCRIPTION
ciphertext

Encrypted bytes to decrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Decrypted plaintext as bytes.

RAISES DESCRIPTION
TypeError

If ciphertext is not bytes.

DecryptionError

If decryption fails due to wrong key, tampered ciphertext, or malformed input.

Examples:

plaintext = await backend.decrypt(ciphertext)
Source code in src/adk_secure_sessions/backends/fernet.py
async def decrypt(self, ciphertext: bytes) -> bytes:
    """Decrypt ciphertext bytes asynchronously.

    Delegates to ``sync_decrypt`` via ``asyncio.to_thread()``.

    Args:
        ciphertext: Encrypted bytes to decrypt.

    Returns:
        Decrypted plaintext as bytes.

    Raises:
        TypeError: If *ciphertext* is not ``bytes``.
        DecryptionError: If decryption fails due to wrong key,
            tampered ciphertext, or malformed input.

    Examples:
        ```python
        plaintext = await backend.decrypt(ciphertext)
        ```
    """
    if not isinstance(ciphertext, bytes):
        msg = f"ciphertext must be bytes, got {type(ciphertext).__name__}"
        raise TypeError(msg)

    return await asyncio.to_thread(self.sync_decrypt, ciphertext)

ConfigurationError

Bases: SecureSessionError


              flowchart TD
              adk_secure_sessions.ConfigurationError[ConfigurationError]
              adk_secure_sessions.exceptions.SecureSessionError[SecureSessionError]

                              adk_secure_sessions.exceptions.SecureSessionError --> adk_secure_sessions.ConfigurationError
                


              click adk_secure_sessions.ConfigurationError href "" "adk_secure_sessions.ConfigurationError"
              click adk_secure_sessions.exceptions.SecureSessionError href "" "adk_secure_sessions.exceptions.SecureSessionError"
            

Raised when the service is misconfigured at startup.

Covers invalid encryption keys, backends that do not conform to the EncryptionBackend protocol, invalid backend IDs, empty database paths, and database connection failures. Error messages never include key material or other sensitive data.

Examples:

Handle configuration failures at startup:

try:
    service = EncryptedSessionService(
        db_url="sqlite+aiosqlite:///sessions.db",
        backend=my_backend,
    )
except ConfigurationError as exc:
    log.error("Service misconfigured: %s", exc)
Source code in src/adk_secure_sessions/exceptions.py
class ConfigurationError(SecureSessionError):
    """Raised when the service is misconfigured at startup.

    Covers invalid encryption keys, backends that do not conform to the
    ``EncryptionBackend`` protocol, invalid backend IDs, empty database
    paths, and database connection failures. Error messages never include
    key material or other sensitive data.

    Examples:
        Handle configuration failures at startup:

        ```python
        try:
            service = EncryptedSessionService(
                db_url="sqlite+aiosqlite:///sessions.db",
                backend=my_backend,
            )
        except ConfigurationError as exc:
            log.error("Service misconfigured: %s", exc)
        ```
    """

DecryptionError

Bases: SecureSessionError, DontWrapMixin


              flowchart TD
              adk_secure_sessions.DecryptionError[DecryptionError]
              adk_secure_sessions.exceptions.SecureSessionError[SecureSessionError]

                              adk_secure_sessions.exceptions.SecureSessionError --> adk_secure_sessions.DecryptionError
                


              click adk_secure_sessions.DecryptionError href "" "adk_secure_sessions.DecryptionError"
              click adk_secure_sessions.exceptions.SecureSessionError href "" "adk_secure_sessions.exceptions.SecureSessionError"
            

Raised when decryption fails.

Inherits DontWrapMixin so SQLAlchemy propagates this directly instead of wrapping it in StatementError.

Possible causes include a wrong key, tampered ciphertext, or malformed input. Error messages intentionally exclude key material, ciphertext, and plaintext to prevent information leakage.

Examples:

Handle decryption failures specifically:

try:
    plaintext = await backend.decrypt(ciphertext)
except DecryptionError:
    log.error("Decryption failed, check key")
Source code in src/adk_secure_sessions/exceptions.py
class DecryptionError(SecureSessionError, DontWrapMixin):
    """Raised when decryption fails.

    Inherits ``DontWrapMixin`` so SQLAlchemy propagates this directly
    instead of wrapping it in ``StatementError``.

    Possible causes include a wrong key, tampered ciphertext, or
    malformed input. Error messages intentionally exclude key material,
    ciphertext, and plaintext to prevent information leakage.

    Examples:
        Handle decryption failures specifically:

        ```python
        try:
            plaintext = await backend.decrypt(ciphertext)
        except DecryptionError:
            log.error("Decryption failed, check key")
        ```
    """

EncryptionError

Bases: SecureSessionError, DontWrapMixin


              flowchart TD
              adk_secure_sessions.EncryptionError[EncryptionError]
              adk_secure_sessions.exceptions.SecureSessionError[SecureSessionError]

                              adk_secure_sessions.exceptions.SecureSessionError --> adk_secure_sessions.EncryptionError
                


              click adk_secure_sessions.EncryptionError href "" "adk_secure_sessions.EncryptionError"
              click adk_secure_sessions.exceptions.SecureSessionError href "" "adk_secure_sessions.exceptions.SecureSessionError"
            

Raised when encryption fails.

Inherits DontWrapMixin so SQLAlchemy propagates this directly instead of wrapping it in StatementError.

Possible causes include invalid plaintext input or backend-specific errors. Error messages intentionally exclude key material, plaintext, and ciphertext to prevent information leakage.

Examples:

Handle encryption failures specifically:

try:
    ciphertext = await backend.encrypt(plaintext)
except EncryptionError:
    log.error("Encryption failed")
Source code in src/adk_secure_sessions/exceptions.py
class EncryptionError(SecureSessionError, DontWrapMixin):
    """Raised when encryption fails.

    Inherits ``DontWrapMixin`` so SQLAlchemy propagates this directly
    instead of wrapping it in ``StatementError``.

    Possible causes include invalid plaintext input or backend-specific
    errors. Error messages intentionally exclude key material, plaintext,
    and ciphertext to prevent information leakage.

    Examples:
        Handle encryption failures specifically:

        ```python
        try:
            ciphertext = await backend.encrypt(plaintext)
        except EncryptionError:
            log.error("Encryption failed")
        ```
    """

SecureSessionError

Bases: Exception


              flowchart TD
              adk_secure_sessions.SecureSessionError[SecureSessionError]

              

              click adk_secure_sessions.SecureSessionError href "" "adk_secure_sessions.SecureSessionError"
            

Base exception for all adk-secure-sessions errors.

All library-specific exceptions inherit from this class so callers can use a single except SecureSessionError clause to handle any failure originating from this package.

Examples:

Catch any library error regardless of type:

try:
    await backend.encrypt(plaintext)
except SecureSessionError:
    log.error("adk-secure-sessions operation failed")
Source code in src/adk_secure_sessions/exceptions.py
class SecureSessionError(Exception):
    """Base exception for all adk-secure-sessions errors.

    All library-specific exceptions inherit from this class so callers
    can use a single ``except SecureSessionError`` clause to handle any
    failure originating from this package.

    Examples:
        Catch any library error regardless of type:

        ```python
        try:
            await backend.encrypt(plaintext)
        except SecureSessionError:
            log.error("adk-secure-sessions operation failed")
        ```
    """

SerializationError

Bases: SecureSessionError, DontWrapMixin


              flowchart TD
              adk_secure_sessions.SerializationError[SerializationError]
              adk_secure_sessions.exceptions.SecureSessionError[SecureSessionError]

                              adk_secure_sessions.exceptions.SecureSessionError --> adk_secure_sessions.SerializationError
                


              click adk_secure_sessions.SerializationError href "" "adk_secure_sessions.SerializationError"
              click adk_secure_sessions.exceptions.SecureSessionError href "" "adk_secure_sessions.exceptions.SecureSessionError"
            

Raised when data cannot be serialized to JSON.

Inherits DontWrapMixin so SQLAlchemy propagates this directly instead of wrapping it in StatementError.

This indicates a caller bug — the input contains types that are not JSON-serializable (e.g., datetime, custom objects). This is distinct from encryption/decryption failures which indicate configuration or data integrity issues.

Examples:

Handle serialization failures:

try:
    envelope = await encrypt_session(data, backend, backend_id)
except SerializationError:
    log.error("Data contains non-JSON-serializable values")
Source code in src/adk_secure_sessions/exceptions.py
class SerializationError(SecureSessionError, DontWrapMixin):
    """Raised when data cannot be serialized to JSON.

    Inherits ``DontWrapMixin`` so SQLAlchemy propagates this directly
    instead of wrapping it in ``StatementError``.

    This indicates a caller bug — the input contains types that are not
    JSON-serializable (e.g., ``datetime``, custom objects). This is
    distinct from encryption/decryption failures which indicate
    configuration or data integrity issues.

    Examples:
        Handle serialization failures:

        ```python
        try:
            envelope = await encrypt_session(data, backend, backend_id)
        except SerializationError:
            log.error("Data contains non-JSON-serializable values")
        ```
    """

EncryptionBackend

Bases: Protocol


              flowchart TD
              adk_secure_sessions.EncryptionBackend[EncryptionBackend]

              

              click adk_secure_sessions.EncryptionBackend href "" "adk_secure_sessions.EncryptionBackend"
            

Contract for all encryption backends.

Implementors provide:

  • backend_id — read-only property returning the unique envelope backend identifier byte.
  • sync_encrypt / sync_decrypt — synchronous methods used by SQLAlchemy TypeDecorators in sync contexts.
  • encrypt / decrypt — async wrappers for use in async application code.

The session service uses this protocol for runtime validation at initialization and static type checkers verify conformance at analysis time.

This is a typing.Protocol — not an abstract base class. Conforming classes do not need to inherit from or import this protocol.

Known limitations of @runtime_checkable:

  • isinstance() checks verify method existence only. It does not validate parameter types, return types, or whether methods are coroutines. Use a static type checker (mypy, pyright) for full signature validation.

Examples:

Define a minimal conforming backend:

from adk_secure_sessions.protocols import EncryptionBackend


class MyBackend:
    @property
    def backend_id(self) -> int:
        return 0x10

    def sync_encrypt(self, plaintext: bytes) -> bytes:
        return plaintext  # replace with real encryption

    def sync_decrypt(self, ciphertext: bytes) -> bytes:
        return ciphertext  # replace with real decryption

    async def encrypt(self, plaintext: bytes) -> bytes:
        return self.sync_encrypt(plaintext)

    async def decrypt(self, ciphertext: bytes) -> bytes:
        return self.sync_decrypt(ciphertext)


assert isinstance(MyBackend(), EncryptionBackend)
Source code in src/adk_secure_sessions/protocols.py
@runtime_checkable
class EncryptionBackend(Protocol):
    """Contract for all encryption backends.

    Implementors provide:

    * ``backend_id`` — read-only property returning the unique
      envelope backend identifier byte.
    * ``sync_encrypt`` / ``sync_decrypt`` — synchronous methods
      used by SQLAlchemy TypeDecorators in sync contexts.
    * ``encrypt`` / ``decrypt`` — async wrappers for use in
      async application code.

    The session service uses this protocol for runtime validation
    at initialization and static type checkers verify conformance
    at analysis time.

    This is a ``typing.Protocol`` — not an abstract base class.
    Conforming classes do not need to inherit from or import this
    protocol.

    Known limitations of ``@runtime_checkable``:

    * ``isinstance()`` checks verify method **existence** only. It
      does not validate parameter types, return types, or whether
      methods are coroutines. Use a static type checker (mypy,
      pyright) for full signature validation.

    Examples:
        Define a minimal conforming backend:

        ```python
        from adk_secure_sessions.protocols import EncryptionBackend


        class MyBackend:
            @property
            def backend_id(self) -> int:
                return 0x10

            def sync_encrypt(self, plaintext: bytes) -> bytes:
                return plaintext  # replace with real encryption

            def sync_decrypt(self, ciphertext: bytes) -> bytes:
                return ciphertext  # replace with real decryption

            async def encrypt(self, plaintext: bytes) -> bytes:
                return self.sync_encrypt(plaintext)

            async def decrypt(self, ciphertext: bytes) -> bytes:
                return self.sync_decrypt(ciphertext)


        assert isinstance(MyBackend(), EncryptionBackend)
        ```
    """

    @property
    def backend_id(self) -> int:
        """Unique backend identifier byte for the envelope header.

        Returns:
            Integer backend ID (e.g., ``0x01`` for Fernet,
            ``0x02`` for AES-GCM).

        Examples:
            ```python
            assert backend.backend_id == 0x01
            ```
        """
        ...

    def sync_encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt plaintext bytes synchronously.

        Used by SQLAlchemy TypeDecorators that operate in sync
        contexts. The async ``encrypt`` method should delegate
        to this via ``asyncio.to_thread()``.

        Args:
            plaintext: Raw bytes to encrypt.

        Returns:
            Encrypted ciphertext as bytes.

        Examples:
            ```python
            ciphertext = backend.sync_encrypt(b"hello")
            ```
        """
        ...

    def sync_decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt ciphertext bytes synchronously.

        Used by SQLAlchemy TypeDecorators that operate in sync
        contexts. The async ``decrypt`` method should delegate
        to this via ``asyncio.to_thread()``.

        Args:
            ciphertext: Encrypted bytes to decrypt.

        Returns:
            Decrypted plaintext as bytes.

        Examples:
            ```python
            plaintext = backend.sync_decrypt(ciphertext)
            ```
        """
        ...

    async def encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt plaintext bytes asynchronously.

        Args:
            plaintext: Raw bytes to encrypt.

        Returns:
            Encrypted ciphertext as bytes.

        Examples:
            ```python
            ciphertext = await backend.encrypt(b"hello")
            ```
        """
        ...

    async def decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt ciphertext bytes asynchronously.

        Args:
            ciphertext: Encrypted bytes to decrypt.

        Returns:
            Decrypted plaintext as bytes.

        Examples:
            ```python
            plaintext = await backend.decrypt(ciphertext)
            ```
        """
        ...

backend_id property

backend_id: int

Unique backend identifier byte for the envelope header.

RETURNS DESCRIPTION
int

Integer backend ID (e.g., 0x01 for Fernet,

int

0x02 for AES-GCM).

Examples:

assert backend.backend_id == 0x01

sync_encrypt

sync_encrypt(plaintext: bytes) -> bytes

Encrypt plaintext bytes synchronously.

Used by SQLAlchemy TypeDecorators that operate in sync contexts. The async encrypt method should delegate to this via asyncio.to_thread().

PARAMETER DESCRIPTION
plaintext

Raw bytes to encrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Encrypted ciphertext as bytes.

Examples:

ciphertext = backend.sync_encrypt(b"hello")
Source code in src/adk_secure_sessions/protocols.py
def sync_encrypt(self, plaintext: bytes) -> bytes:
    """Encrypt plaintext bytes synchronously.

    Used by SQLAlchemy TypeDecorators that operate in sync
    contexts. The async ``encrypt`` method should delegate
    to this via ``asyncio.to_thread()``.

    Args:
        plaintext: Raw bytes to encrypt.

    Returns:
        Encrypted ciphertext as bytes.

    Examples:
        ```python
        ciphertext = backend.sync_encrypt(b"hello")
        ```
    """
    ...

sync_decrypt

sync_decrypt(ciphertext: bytes) -> bytes

Decrypt ciphertext bytes synchronously.

Used by SQLAlchemy TypeDecorators that operate in sync contexts. The async decrypt method should delegate to this via asyncio.to_thread().

PARAMETER DESCRIPTION
ciphertext

Encrypted bytes to decrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Decrypted plaintext as bytes.

Examples:

plaintext = backend.sync_decrypt(ciphertext)
Source code in src/adk_secure_sessions/protocols.py
def sync_decrypt(self, ciphertext: bytes) -> bytes:
    """Decrypt ciphertext bytes synchronously.

    Used by SQLAlchemy TypeDecorators that operate in sync
    contexts. The async ``decrypt`` method should delegate
    to this via ``asyncio.to_thread()``.

    Args:
        ciphertext: Encrypted bytes to decrypt.

    Returns:
        Decrypted plaintext as bytes.

    Examples:
        ```python
        plaintext = backend.sync_decrypt(ciphertext)
        ```
    """
    ...

encrypt async

encrypt(plaintext: bytes) -> bytes

Encrypt plaintext bytes asynchronously.

PARAMETER DESCRIPTION
plaintext

Raw bytes to encrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Encrypted ciphertext as bytes.

Examples:

ciphertext = await backend.encrypt(b"hello")
Source code in src/adk_secure_sessions/protocols.py
async def encrypt(self, plaintext: bytes) -> bytes:
    """Encrypt plaintext bytes asynchronously.

    Args:
        plaintext: Raw bytes to encrypt.

    Returns:
        Encrypted ciphertext as bytes.

    Examples:
        ```python
        ciphertext = await backend.encrypt(b"hello")
        ```
    """
    ...

decrypt async

decrypt(ciphertext: bytes) -> bytes

Decrypt ciphertext bytes asynchronously.

PARAMETER DESCRIPTION
ciphertext

Encrypted bytes to decrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Decrypted plaintext as bytes.

Examples:

plaintext = await backend.decrypt(ciphertext)
Source code in src/adk_secure_sessions/protocols.py
async def decrypt(self, ciphertext: bytes) -> bytes:
    """Decrypt ciphertext bytes asynchronously.

    Args:
        ciphertext: Encrypted bytes to decrypt.

    Returns:
        Decrypted plaintext as bytes.

    Examples:
        ```python
        plaintext = await backend.decrypt(ciphertext)
        ```
    """
    ...

EncryptedSessionService

Bases: DatabaseSessionService


              flowchart TD
              adk_secure_sessions.EncryptedSessionService[EncryptedSessionService]

              

              click adk_secure_sessions.EncryptedSessionService href "" "adk_secure_sessions.EncryptedSessionService"
            

Encrypted session service wrapping DatabaseSessionService.

Subclasses ADK's DatabaseSessionService to inject encrypted SQLAlchemy models via _get_schema_classes() and _prepare_tables(). All CRUD methods (create_session, get_session, list_sessions, delete_session, append_event) are inherited without modification.

Supports multiple encryption backends for incremental migration. The backend parameter is the primary backend used for new writes. The additional_backends parameter provides legacy decrypt capability. Backends are fixed after construction — they cannot be added or removed post-init.

ATTRIBUTE DESCRIPTION
db_engine

The SQLAlchemy async engine (inherited).

TYPE: AsyncEngine

Examples:

Create a service with SQLite:

from adk_secure_sessions import FernetBackend, EncryptedSessionService

backend = FernetBackend("my-secret-passphrase")
service = EncryptedSessionService(
    db_url="sqlite+aiosqlite:///sessions.db",
    backend=backend,
)
session = await service.create_session(
    app_name="my-agent",
    user_id="user-123",
    state={"secret": "sensitive-data"},
)

Multi-backend migration (new writes use AES-GCM, legacy Fernet sessions remain readable):

from adk_secure_sessions import (
    AesGcmBackend,
    FernetBackend,
    EncryptedSessionService,
)

from cryptography.hazmat.primitives.ciphers.aead import AESGCM

fernet = FernetBackend("old-passphrase")
aes_gcm = AesGcmBackend(key=AESGCM.generate_key(bit_length=256))
service = EncryptedSessionService(
    db_url="sqlite+aiosqlite:///sessions.db",
    backend=aes_gcm,
    additional_backends=[fernet],
)
Source code in src/adk_secure_sessions/services/encrypted_session.py
class EncryptedSessionService(DatabaseSessionService):
    """Encrypted session service wrapping DatabaseSessionService.

    Subclasses ADK's ``DatabaseSessionService`` to inject encrypted
    SQLAlchemy models via ``_get_schema_classes()`` and
    ``_prepare_tables()``. All CRUD methods (``create_session``,
    ``get_session``, ``list_sessions``, ``delete_session``,
    ``append_event``) are inherited without modification.

    Supports multiple encryption backends for incremental migration.
    The ``backend`` parameter is the primary backend used for new writes.
    The ``additional_backends`` parameter provides legacy decrypt
    capability. Backends are fixed after construction — they cannot be
    added or removed post-init.

    Attributes:
        db_engine (AsyncEngine): The SQLAlchemy async engine (inherited).

    Examples:
        Create a service with SQLite:

        ```python
        from adk_secure_sessions import FernetBackend, EncryptedSessionService

        backend = FernetBackend("my-secret-passphrase")
        service = EncryptedSessionService(
            db_url="sqlite+aiosqlite:///sessions.db",
            backend=backend,
        )
        session = await service.create_session(
            app_name="my-agent",
            user_id="user-123",
            state={"secret": "sensitive-data"},
        )
        ```

        Multi-backend migration (new writes use AES-GCM, legacy Fernet
        sessions remain readable):

        ```python
        from adk_secure_sessions import (
            AesGcmBackend,
            FernetBackend,
            EncryptedSessionService,
        )

        from cryptography.hazmat.primitives.ciphers.aead import AESGCM

        fernet = FernetBackend("old-passphrase")
        aes_gcm = AesGcmBackend(key=AESGCM.generate_key(bit_length=256))
        service = EncryptedSessionService(
            db_url="sqlite+aiosqlite:///sessions.db",
            backend=aes_gcm,
            additional_backends=[fernet],
        )
        ```
    """

    def __init__(
        self,
        db_url: str,
        backend: EncryptionBackend,
        additional_backends: Sequence[EncryptionBackend] = (),
        **kwargs: Any,
    ) -> None:
        """Initialize the encrypted session service.

        Uses ``backend.sync_encrypt``, ``backend.sync_decrypt``, and
        ``backend.backend_id`` from the protocol to configure the
        ``EncryptedJSON`` TypeDecorator. When ``additional_backends``
        are provided, their decrypt functions are included in the
        dispatch map for reading legacy-encrypted data.

        Backends are fixed after construction. The ``cache_ok = True``
        on ``EncryptedJSON`` means SQLAlchemy may cache the type
        instance — post-init mutation would be a correctness bug.

        Args:
            db_url: SQLAlchemy connection string (e.g.,
                ``"sqlite+aiosqlite:///sessions.db"``).
            backend: Primary encryption backend. Used for all new writes
                and included in the decrypt dispatch map.
            additional_backends: Extra backends for decrypt-only
                dispatch. Each must conform to ``EncryptionBackend``
                and have a unique ``backend_id``.

        Other Parameters:
            **kwargs: Additional keyword arguments passed to
                ``DatabaseSessionService.__init__``.

        Raises:
            ConfigurationError: If *backend* or any entry in
                *additional_backends* does not conform to
                ``EncryptionBackend``, or if duplicate ``backend_id``
                values are detected.
        """
        if not isinstance(backend, EncryptionBackend):
            msg = (
                f"backend must conform to EncryptionBackend protocol, "
                f"got {type(backend).__name__}"
            )
            raise ConfigurationError(msg)

        for extra in additional_backends:
            if not isinstance(extra, EncryptionBackend):
                msg = (
                    f"additional_backends entries must conform to "
                    f"EncryptionBackend protocol, got {type(extra).__name__}"
                )
                raise ConfigurationError(msg)

        all_backends = [backend, *additional_backends]
        seen_ids: set[int] = set()
        for b in all_backends:
            if b.backend_id in seen_ids:
                msg = (
                    f"Duplicate backend_id {b.backend_id:#04x} — each "
                    f"backend must have a unique backend_id"
                )
                raise ConfigurationError(msg)
            seen_ids.add(b.backend_id)

        decrypt_dispatch = {b.backend_id: b.sync_decrypt for b in all_backends}

        self._encrypted_json = EncryptedJSON(
            encrypt_fn=backend.sync_encrypt,
            backend_id=backend.backend_id,
            decrypt_dispatch=decrypt_dispatch,
        )
        self._encrypted_base, self._encrypted_schema = create_encrypted_models(
            self._encrypted_json
        )

        super().__init__(db_url=db_url, **kwargs)

    def _get_schema_classes(self) -> _EncryptedSchemaClasses:  # type: ignore[override]
        """Return encrypted model classes for CRUD operations.

        Returns:
            Duck-typed schema classes with encrypted models.
        """
        return self._encrypted_schema

    async def _prepare_tables(self) -> None:
        """Create encrypted tables using custom DeclarativeBase metadata.

        Overrides the parent method to use our encrypted model metadata
        instead of ADK's built-in schema.
        """
        if self._tables_created:
            return

        async with self._table_creation_lock:
            if self._tables_created:
                return

            async with self.db_engine.begin() as conn:
                await conn.run_sync(self._encrypted_base.metadata.create_all)

            self._tables_created = True

__init__

__init__(
    db_url: str,
    backend: EncryptionBackend,
    additional_backends: Sequence[EncryptionBackend] = (),
    **kwargs: Any,
) -> None

Initialize the encrypted session service.

Uses backend.sync_encrypt, backend.sync_decrypt, and backend.backend_id from the protocol to configure the EncryptedJSON TypeDecorator. When additional_backends are provided, their decrypt functions are included in the dispatch map for reading legacy-encrypted data.

Backends are fixed after construction. The cache_ok = True on EncryptedJSON means SQLAlchemy may cache the type instance — post-init mutation would be a correctness bug.

PARAMETER DESCRIPTION
db_url

SQLAlchemy connection string (e.g., "sqlite+aiosqlite:///sessions.db").

TYPE: str

backend

Primary encryption backend. Used for all new writes and included in the decrypt dispatch map.

TYPE: EncryptionBackend

additional_backends

Extra backends for decrypt-only dispatch. Each must conform to EncryptionBackend and have a unique backend_id.

TYPE: Sequence[EncryptionBackend] DEFAULT: ()

PARAMETER DESCRIPTION
**kwargs

Additional keyword arguments passed to DatabaseSessionService.__init__.

TYPE: Any

RAISES DESCRIPTION
ConfigurationError

If backend or any entry in additional_backends does not conform to EncryptionBackend, or if duplicate backend_id values are detected.

Source code in src/adk_secure_sessions/services/encrypted_session.py
def __init__(
    self,
    db_url: str,
    backend: EncryptionBackend,
    additional_backends: Sequence[EncryptionBackend] = (),
    **kwargs: Any,
) -> None:
    """Initialize the encrypted session service.

    Uses ``backend.sync_encrypt``, ``backend.sync_decrypt``, and
    ``backend.backend_id`` from the protocol to configure the
    ``EncryptedJSON`` TypeDecorator. When ``additional_backends``
    are provided, their decrypt functions are included in the
    dispatch map for reading legacy-encrypted data.

    Backends are fixed after construction. The ``cache_ok = True``
    on ``EncryptedJSON`` means SQLAlchemy may cache the type
    instance — post-init mutation would be a correctness bug.

    Args:
        db_url: SQLAlchemy connection string (e.g.,
            ``"sqlite+aiosqlite:///sessions.db"``).
        backend: Primary encryption backend. Used for all new writes
            and included in the decrypt dispatch map.
        additional_backends: Extra backends for decrypt-only
            dispatch. Each must conform to ``EncryptionBackend``
            and have a unique ``backend_id``.

    Other Parameters:
        **kwargs: Additional keyword arguments passed to
            ``DatabaseSessionService.__init__``.

    Raises:
        ConfigurationError: If *backend* or any entry in
            *additional_backends* does not conform to
            ``EncryptionBackend``, or if duplicate ``backend_id``
            values are detected.
    """
    if not isinstance(backend, EncryptionBackend):
        msg = (
            f"backend must conform to EncryptionBackend protocol, "
            f"got {type(backend).__name__}"
        )
        raise ConfigurationError(msg)

    for extra in additional_backends:
        if not isinstance(extra, EncryptionBackend):
            msg = (
                f"additional_backends entries must conform to "
                f"EncryptionBackend protocol, got {type(extra).__name__}"
            )
            raise ConfigurationError(msg)

    all_backends = [backend, *additional_backends]
    seen_ids: set[int] = set()
    for b in all_backends:
        if b.backend_id in seen_ids:
            msg = (
                f"Duplicate backend_id {b.backend_id:#04x} — each "
                f"backend must have a unique backend_id"
            )
            raise ConfigurationError(msg)
        seen_ids.add(b.backend_id)

    decrypt_dispatch = {b.backend_id: b.sync_decrypt for b in all_backends}

    self._encrypted_json = EncryptedJSON(
        encrypt_fn=backend.sync_encrypt,
        backend_id=backend.backend_id,
        decrypt_dispatch=decrypt_dispatch,
    )
    self._encrypted_base, self._encrypted_schema = create_encrypted_models(
        self._encrypted_json
    )

    super().__init__(db_url=db_url, **kwargs)

decrypt_json async

decrypt_json(
    envelope: bytes, backend: EncryptionBackend
) -> str

Decrypt an encrypted envelope back to a JSON string.

PARAMETER DESCRIPTION
envelope

Encrypted envelope bytes (>= 3 bytes).

TYPE: bytes

backend

Any EncryptionBackend-conformant object.

TYPE: EncryptionBackend

RETURNS DESCRIPTION
str

Original JSON string.

RAISES DESCRIPTION
DecryptionError

If envelope is invalid, tampered, backend fails, or decrypted bytes are not valid UTF-8.

Examples:

json_str = await decrypt_json(envelope, backend)
Source code in src/adk_secure_sessions/serialization.py
async def decrypt_json(
    envelope: bytes,
    backend: EncryptionBackend,
) -> str:
    """Decrypt an encrypted envelope back to a JSON string.

    Args:
        envelope: Encrypted envelope bytes (>= 3 bytes).
        backend: Any ``EncryptionBackend``-conformant object.

    Returns:
        Original JSON string.

    Raises:
        DecryptionError: If envelope is invalid, tampered, backend fails,
            or decrypted bytes are not valid UTF-8.

    Examples:
        ```python
        json_str = await decrypt_json(envelope, backend)
        ```
    """
    _version, _backend_id, ciphertext = _parse_envelope(envelope)
    plaintext = await backend.decrypt(ciphertext)
    try:
        return plaintext.decode("utf-8")
    except UnicodeDecodeError as exc:
        msg = "Failed to decode decrypted data as UTF-8"
        raise DecryptionError(msg) from exc

decrypt_session async

decrypt_session(
    envelope: bytes, backend: EncryptionBackend
) -> dict[str, Any]

Decrypt an encrypted envelope back to a session state dict.

PARAMETER DESCRIPTION
envelope

Encrypted envelope bytes (>= 3 bytes).

TYPE: bytes

backend

Any EncryptionBackend-conformant object.

TYPE: EncryptionBackend

RETURNS DESCRIPTION
dict[str, Any]

Original Python dictionary.

RAISES DESCRIPTION
DecryptionError

If envelope is invalid, tampered, or backend fails.

SerializationError

If decrypted bytes are not valid JSON.

Examples:

state = await decrypt_session(envelope, backend)
Source code in src/adk_secure_sessions/serialization.py
async def decrypt_session(
    envelope: bytes,
    backend: EncryptionBackend,
) -> dict[str, Any]:
    """Decrypt an encrypted envelope back to a session state dict.

    Args:
        envelope: Encrypted envelope bytes (>= 3 bytes).
        backend: Any ``EncryptionBackend``-conformant object.

    Returns:
        Original Python dictionary.

    Raises:
        DecryptionError: If envelope is invalid, tampered, or backend fails.
        SerializationError: If decrypted bytes are not valid JSON.

    Examples:
        ```python
        state = await decrypt_session(envelope, backend)
        ```
    """
    _version, _backend_id, ciphertext = _parse_envelope(envelope)
    plaintext = await backend.decrypt(ciphertext)
    try:
        data = json.loads(plaintext)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        msg = "Failed to deserialize decrypted data from JSON"
        raise SerializationError(msg) from exc
    return data

encrypt_json async

encrypt_json(
    json_str: str,
    backend: EncryptionBackend,
    backend_id: int,
) -> bytes

Encrypt a pre-serialized JSON string into an encrypted envelope.

PARAMETER DESCRIPTION
json_str

Valid JSON string (e.g., from model_dump_json()).

TYPE: str

backend

Any EncryptionBackend-conformant object.

TYPE: EncryptionBackend

backend_id

Integer identifying the backend.

TYPE: int

RETURNS DESCRIPTION
bytes

Encrypted envelope bytes: [version][backend_id][ciphertext].

Examples:

envelope = await encrypt_json(event.model_dump_json(), backend, BACKEND_FERNET)
Source code in src/adk_secure_sessions/serialization.py
async def encrypt_json(
    json_str: str,
    backend: EncryptionBackend,
    backend_id: int,
) -> bytes:
    """Encrypt a pre-serialized JSON string into an encrypted envelope.

    Args:
        json_str: Valid JSON string (e.g., from ``model_dump_json()``).
        backend: Any ``EncryptionBackend``-conformant object.
        backend_id: Integer identifying the backend.

    Returns:
        Encrypted envelope bytes: ``[version][backend_id][ciphertext]``.

    Examples:
        ```python
        envelope = await encrypt_json(event.model_dump_json(), backend, BACKEND_FERNET)
        ```
    """
    plaintext = json_str.encode("utf-8")
    ciphertext = await backend.encrypt(plaintext)
    return _build_envelope(ENVELOPE_VERSION_1, backend_id, ciphertext)

encrypt_session async

encrypt_session(
    data: dict[str, Any],
    backend: EncryptionBackend,
    backend_id: int,
) -> bytes

Serialize a session state dict to an encrypted envelope.

PARAMETER DESCRIPTION
data

JSON-serializable Python dictionary.

TYPE: dict[str, Any]

backend

Any EncryptionBackend-conformant object.

TYPE: EncryptionBackend

backend_id

Integer identifying the backend.

TYPE: int

RETURNS DESCRIPTION
bytes

Encrypted envelope bytes: [version][backend_id][ciphertext].

RAISES DESCRIPTION
SerializationError

If data cannot be serialized to JSON.

Examples:

envelope = await encrypt_session(
    {"ssn": "123-45-6789"}, backend, BACKEND_FERNET
)
Source code in src/adk_secure_sessions/serialization.py
async def encrypt_session(
    data: dict[str, Any],
    backend: EncryptionBackend,
    backend_id: int,
) -> bytes:
    """Serialize a session state dict to an encrypted envelope.

    Args:
        data: JSON-serializable Python dictionary.
        backend: Any ``EncryptionBackend``-conformant object.
        backend_id: Integer identifying the backend.

    Returns:
        Encrypted envelope bytes: ``[version][backend_id][ciphertext]``.

    Raises:
        SerializationError: If *data* cannot be serialized to JSON.

    Examples:
        ```python
        envelope = await encrypt_session(
            {"ssn": "123-45-6789"}, backend, BACKEND_FERNET
        )
        ```
    """
    try:
        plaintext = json.dumps(data).encode()
    except (TypeError, ValueError) as exc:
        msg = "Failed to serialize session data to JSON"
        raise SerializationError(msg) from exc
    ciphertext = await backend.encrypt(plaintext)
    return _build_envelope(ENVELOPE_VERSION_1, backend_id, ciphertext)