Skip to content

Fernet

fernet

Fernet symmetric encryption backend.

Implements the EncryptionBackend protocol using cryptography.fernet.Fernet for authenticated symmetric encryption (AES-128-CBC + HMAC-SHA256). Provides both async and synchronous encrypt/decrypt methods plus a backend_id property for envelope identification.

Keys can be provided as strings, bytes, or valid Fernet keys. Arbitrary input is derived into a valid Fernet key via a two-phase key derivation scheme:

  1. Extract (init time): PBKDF2-HMAC-SHA256 with 600,000 iterations stretches the passphrase into a 32-byte master key.
  2. Expand (per operation): HKDF-SHA256 with a fresh 16-byte random salt derives a unique per-operation Fernet key from the master key.

This provides per-operation key diversification for passphrase-derived keys, hardening against precomputation attacks on the derived key.

Pre-generated Fernet keys (via Fernet.generate_key()) bypass derivation entirely and are used directly with no salt marker and no format change.

Examples:

Basic usage:

from adk_secure_sessions.backends.fernet import FernetBackend

backend = FernetBackend(key="my-secret-passphrase")
ciphertext = await backend.encrypt(b"hello")
plaintext = await backend.decrypt(ciphertext)
See Also

adk_secure_sessions.protocols: EncryptionBackend protocol definition.

FernetBackend

Fernet encryption backend conforming to EncryptionBackend.

Accepts a key as str or bytes. If the key is a valid base64url-encoded 32-byte Fernet key, it is used directly (no derivation, no salt marker). Otherwise, a two-phase key derivation scheme is used:

  • Init: PBKDF2-HMAC-SHA256 (600,000 iterations) derives a master key from the passphrase.
  • Per-operation: HKDF-SHA256 with a fresh random salt expands the master key into a unique per-operation Fernet key.

Backward compatibility: data encrypted with pre-3.2 fixed-salt derivation (480,000 iterations) is detected via marker byte and decrypted transparently.

ATTRIBUTE DESCRIPTION
_passphrase_key

Master key from PBKDF2 at 600k iterations (passphrase mode only, None in direct-key mode).

TYPE: bytes | None

_legacy_fernet

Fernet instance for direct-key mode or legacy (480k iterations) backward-compatible decryption.

TYPE: Fernet

_is_passphrase_mode

Whether the backend was initialized with a passphrase (True) or a pre-generated Fernet key (False).

TYPE: bool

Examples:

Initialize with a passphrase:

backend = FernetBackend(key="my-secret")

Initialize with a pre-generated Fernet key:

from cryptography.fernet import Fernet

key = Fernet.generate_key()
backend = FernetBackend(key=key)
Source code in src/adk_secure_sessions/backends/fernet.py
class FernetBackend:
    """Fernet encryption backend conforming to ``EncryptionBackend``.

    Accepts a key as ``str`` or ``bytes``. If the key is a valid
    base64url-encoded 32-byte Fernet key, it is used directly (no
    derivation, no salt marker). Otherwise, a two-phase key derivation
    scheme is used:

    - **Init**: PBKDF2-HMAC-SHA256 (600,000 iterations) derives a master
      key from the passphrase.
    - **Per-operation**: HKDF-SHA256 with a fresh random salt expands
      the master key into a unique per-operation Fernet key.

    Backward compatibility: data encrypted with pre-3.2 fixed-salt
    derivation (480,000 iterations) is detected via marker byte and
    decrypted transparently.

    Attributes:
        _passphrase_key (bytes | None): Master key from PBKDF2 at 600k
            iterations (passphrase mode only, ``None`` in direct-key mode).
        _legacy_fernet (Fernet): Fernet instance for direct-key mode or
            legacy (480k iterations) backward-compatible decryption.
        _is_passphrase_mode (bool): Whether the backend was initialized
            with a passphrase (True) or a pre-generated Fernet key (False).

    Examples:
        Initialize with a passphrase:

        ```python
        backend = FernetBackend(key="my-secret")
        ```

        Initialize with a pre-generated Fernet key:

        ```python
        from cryptography.fernet import Fernet

        key = Fernet.generate_key()
        backend = FernetBackend(key=key)
        ```
    """

    def __init__(self, key: str | bytes) -> None:
        """Initialize FernetBackend with the given key.

        Args:
            key: Encryption key as ``str`` or ``bytes``. A valid Fernet
                key is used directly; arbitrary input is derived via
                two-phase PBKDF2 + HKDF.

        Raises:
            ConfigurationError: If *key* is not ``str`` or ``bytes``,
                or if *key* is empty.
        """
        if not isinstance(key, (str, bytes)):
            msg = f"key must be str or bytes, got {type(key).__name__}"
            raise ConfigurationError(msg)

        if isinstance(key, str):
            key = key.encode()

        if not key:
            msg = "key must not be empty"
            raise ConfigurationError(msg)

        if self._is_valid_fernet_key(key):
            self._passphrase_key: bytes | None = None
            self._legacy_fernet = Fernet(key)
            self._is_passphrase_mode = False
        else:
            self._passphrase_key = self._derive_master_key(key, _PBKDF2_ITERATIONS)
            legacy_raw = self._derive_master_key(key, _PBKDF2_ITERATIONS_LEGACY)
            self._legacy_fernet = Fernet(base64.urlsafe_b64encode(legacy_raw))
            self._is_passphrase_mode = True

    @property
    def backend_id(self) -> int:
        """Unique backend identifier for the envelope header.

        Returns:
            ``BACKEND_FERNET`` (``0x01``).
        """
        return BACKEND_FERNET

    def sync_encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt plaintext bytes synchronously.

        In passphrase mode, generates a fresh 16-byte random salt,
        derives a per-operation Fernet key via HKDF, and returns
        ``SALT_MARKER || salt || fernet_token``. In direct-key mode,
        returns a standard Fernet token.

        Args:
            plaintext: Raw bytes to encrypt.

        Returns:
            Encrypted ciphertext as bytes.

        Raises:
            TypeError: If *plaintext* is not ``bytes``.

        Examples:
            ```python
            ciphertext = backend.sync_encrypt(b"hello")
            ```
        """
        if not isinstance(plaintext, bytes):
            msg = f"plaintext must be bytes, got {type(plaintext).__name__}"
            raise TypeError(msg)

        if not self._is_passphrase_mode:
            return self._legacy_fernet.encrypt(plaintext)

        salt = os.urandom(_SALT_LENGTH)
        per_op_key = self._derive_per_op_key(salt)
        token = Fernet(per_op_key).encrypt(plaintext)
        return _SALT_MARKER + salt + token

    def sync_decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt ciphertext bytes synchronously.

        Detects the ciphertext format via the first byte:

        - ``0x01``: new salted format — extracts salt, derives per-op
          key via HKDF, decrypts the Fernet token.
        - ``>= 0x2B``: legacy Fernet token — decrypts with the legacy
          Fernet instance.
        - Otherwise: raises ``DecryptionError``.

        Args:
            ciphertext: Encrypted bytes to decrypt.

        Returns:
            Decrypted plaintext as bytes.

        Raises:
            TypeError: If *ciphertext* is not ``bytes``.
            DecryptionError: If decryption fails due to wrong key,
                tampered ciphertext, or malformed input.

        Examples:
            ```python
            plaintext = backend.sync_decrypt(ciphertext)
            ```
        """
        if not isinstance(ciphertext, bytes):
            msg = f"ciphertext must be bytes, got {type(ciphertext).__name__}"
            raise TypeError(msg)

        if not ciphertext:
            msg = _DECRYPT_FAILED_MSG
            raise DecryptionError(msg)

        if ciphertext[0:1] == _SALT_MARKER and len(ciphertext) >= _MIN_SALTED_LENGTH:
            return self._decrypt_salted(ciphertext)

        if ciphertext[0] >= 0x2B:
            return self._decrypt_legacy(ciphertext)

        msg = "Decryption failed: invalid ciphertext format"
        raise DecryptionError(msg)

    async def encrypt(self, plaintext: bytes) -> bytes:
        """Encrypt plaintext bytes asynchronously.

        Delegates to ``sync_encrypt`` via ``asyncio.to_thread()``.

        Args:
            plaintext: Raw bytes to encrypt.

        Returns:
            Encrypted ciphertext as bytes.

        Raises:
            TypeError: If *plaintext* is not ``bytes``.

        Examples:
            ```python
            ciphertext = await backend.encrypt(b"hello")
            ```
        """
        if not isinstance(plaintext, bytes):
            msg = f"plaintext must be bytes, got {type(plaintext).__name__}"
            raise TypeError(msg)

        return await asyncio.to_thread(self.sync_encrypt, plaintext)

    async def decrypt(self, ciphertext: bytes) -> bytes:
        """Decrypt ciphertext bytes asynchronously.

        Delegates to ``sync_decrypt`` via ``asyncio.to_thread()``.

        Args:
            ciphertext: Encrypted bytes to decrypt.

        Returns:
            Decrypted plaintext as bytes.

        Raises:
            TypeError: If *ciphertext* is not ``bytes``.
            DecryptionError: If decryption fails due to wrong key,
                tampered ciphertext, or malformed input.

        Examples:
            ```python
            plaintext = await backend.decrypt(ciphertext)
            ```
        """
        if not isinstance(ciphertext, bytes):
            msg = f"ciphertext must be bytes, got {type(ciphertext).__name__}"
            raise TypeError(msg)

        return await asyncio.to_thread(self.sync_decrypt, ciphertext)

    def _derive_per_op_key(self, salt: bytes) -> bytes:
        """Derive a per-operation Fernet key via HKDF-SHA256.

        Validates that a master key exists, then expands it with the
        given salt to produce a unique Fernet key.

        Args:
            salt: Random salt for HKDF domain separation.

        Returns:
            A valid base64url-encoded 32-byte Fernet key.

        Raises:
            DecryptionError: If no master key is available (direct-key mode).
        """
        master = self._passphrase_key
        if master is None:
            msg = _DECRYPT_FAILED_MSG
            raise DecryptionError(msg)

        raw = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            info=_HKDF_INFO,
        ).derive(master)
        return base64.urlsafe_b64encode(raw)

    def _decrypt_salted(self, ciphertext: bytes) -> bytes:
        """Decrypt new-format salted ciphertext.

        Extracts the 16-byte salt, derives a per-op key via HKDF,
        and decrypts the remaining Fernet token.

        Args:
            ciphertext: Full ciphertext including marker and salt.

        Returns:
            Decrypted plaintext.

        Raises:
            DecryptionError: If decryption fails.
        """
        salt = ciphertext[1 : 1 + _SALT_LENGTH]
        token = ciphertext[1 + _SALT_LENGTH :]

        if self._passphrase_key is None:
            msg = _DECRYPT_FAILED_MSG
            raise DecryptionError(msg)

        per_op_key = self._derive_per_op_key(salt)
        try:
            return Fernet(per_op_key).decrypt(token)
        except InvalidToken:
            msg = _DECRYPT_FAILED_MSG
            raise DecryptionError(msg) from None

    def _decrypt_legacy(self, ciphertext: bytes) -> bytes:
        """Decrypt legacy Fernet token (pre-3.2 or direct-key mode).

        Delegates to ``self._legacy_fernet`` which uses the 480k-iteration
        PBKDF2-derived key (passphrase mode) or direct Fernet key.

        Args:
            ciphertext: Legacy Fernet token bytes.

        Returns:
            Decrypted plaintext.

        Raises:
            DecryptionError: If decryption fails.
        """
        try:
            return self._legacy_fernet.decrypt(ciphertext)
        except InvalidToken:
            msg = _DECRYPT_FAILED_MSG
            raise DecryptionError(msg) from None

    @staticmethod
    def _is_valid_fernet_key(key_bytes: bytes) -> bool:
        """Check whether key_bytes is a valid Fernet key.

        Attempts to construct a ``Fernet`` instance. Returns ``False``
        on ``ValueError`` (wrong key length) or ``binascii.Error``
        (non-base64 input).

        Args:
            key_bytes: Raw key material.

        Returns:
            True if the bytes are a valid Fernet key.
        """
        try:
            Fernet(key_bytes)
            return True
        except (ValueError, binascii.Error):
            return False

    @staticmethod
    def _derive_master_key(passphrase: bytes, iterations: int) -> bytes:
        """Derive a 32-byte master key via PBKDF2-HMAC-SHA256.

        Args:
            passphrase: Raw passphrase bytes.
            iterations: Number of PBKDF2 iterations.

        Returns:
            32-byte raw derived key.
        """
        return hashlib.pbkdf2_hmac(
            "sha256",
            passphrase,
            _PBKDF2_SALT,
            iterations,
        )

backend_id property

backend_id: int

Unique backend identifier for the envelope header.

RETURNS DESCRIPTION
int

BACKEND_FERNET (0x01).

__init__

__init__(key: str | bytes) -> None

Initialize FernetBackend with the given key.

PARAMETER DESCRIPTION
key

Encryption key as str or bytes. A valid Fernet key is used directly; arbitrary input is derived via two-phase PBKDF2 + HKDF.

TYPE: str | bytes

RAISES DESCRIPTION
ConfigurationError

If key is not str or bytes, or if key is empty.

Source code in src/adk_secure_sessions/backends/fernet.py
def __init__(self, key: str | bytes) -> None:
    """Initialize FernetBackend with the given key.

    Args:
        key: Encryption key as ``str`` or ``bytes``. A valid Fernet
            key is used directly; arbitrary input is derived via
            two-phase PBKDF2 + HKDF.

    Raises:
        ConfigurationError: If *key* is not ``str`` or ``bytes``,
            or if *key* is empty.
    """
    if not isinstance(key, (str, bytes)):
        msg = f"key must be str or bytes, got {type(key).__name__}"
        raise ConfigurationError(msg)

    if isinstance(key, str):
        key = key.encode()

    if not key:
        msg = "key must not be empty"
        raise ConfigurationError(msg)

    if self._is_valid_fernet_key(key):
        self._passphrase_key: bytes | None = None
        self._legacy_fernet = Fernet(key)
        self._is_passphrase_mode = False
    else:
        self._passphrase_key = self._derive_master_key(key, _PBKDF2_ITERATIONS)
        legacy_raw = self._derive_master_key(key, _PBKDF2_ITERATIONS_LEGACY)
        self._legacy_fernet = Fernet(base64.urlsafe_b64encode(legacy_raw))
        self._is_passphrase_mode = True

sync_encrypt

sync_encrypt(plaintext: bytes) -> bytes

Encrypt plaintext bytes synchronously.

In passphrase mode, generates a fresh 16-byte random salt, derives a per-operation Fernet key via HKDF, and returns SALT_MARKER || salt || fernet_token. In direct-key mode, returns a standard Fernet token.

PARAMETER DESCRIPTION
plaintext

Raw bytes to encrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Encrypted ciphertext as bytes.

RAISES DESCRIPTION
TypeError

If plaintext is not bytes.

Examples:

ciphertext = backend.sync_encrypt(b"hello")
Source code in src/adk_secure_sessions/backends/fernet.py
def sync_encrypt(self, plaintext: bytes) -> bytes:
    """Encrypt plaintext bytes synchronously.

    In passphrase mode, generates a fresh 16-byte random salt,
    derives a per-operation Fernet key via HKDF, and returns
    ``SALT_MARKER || salt || fernet_token``. In direct-key mode,
    returns a standard Fernet token.

    Args:
        plaintext: Raw bytes to encrypt.

    Returns:
        Encrypted ciphertext as bytes.

    Raises:
        TypeError: If *plaintext* is not ``bytes``.

    Examples:
        ```python
        ciphertext = backend.sync_encrypt(b"hello")
        ```
    """
    if not isinstance(plaintext, bytes):
        msg = f"plaintext must be bytes, got {type(plaintext).__name__}"
        raise TypeError(msg)

    if not self._is_passphrase_mode:
        return self._legacy_fernet.encrypt(plaintext)

    salt = os.urandom(_SALT_LENGTH)
    per_op_key = self._derive_per_op_key(salt)
    token = Fernet(per_op_key).encrypt(plaintext)
    return _SALT_MARKER + salt + token

sync_decrypt

sync_decrypt(ciphertext: bytes) -> bytes

Decrypt ciphertext bytes synchronously.

Detects the ciphertext format via the first byte:

  • 0x01: new salted format — extracts salt, derives per-op key via HKDF, decrypts the Fernet token.
  • >= 0x2B: legacy Fernet token — decrypts with the legacy Fernet instance.
  • Otherwise: raises DecryptionError.
PARAMETER DESCRIPTION
ciphertext

Encrypted bytes to decrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Decrypted plaintext as bytes.

RAISES DESCRIPTION
TypeError

If ciphertext is not bytes.

DecryptionError

If decryption fails due to wrong key, tampered ciphertext, or malformed input.

Examples:

plaintext = backend.sync_decrypt(ciphertext)
Source code in src/adk_secure_sessions/backends/fernet.py
def sync_decrypt(self, ciphertext: bytes) -> bytes:
    """Decrypt ciphertext bytes synchronously.

    Detects the ciphertext format via the first byte:

    - ``0x01``: new salted format — extracts salt, derives per-op
      key via HKDF, decrypts the Fernet token.
    - ``>= 0x2B``: legacy Fernet token — decrypts with the legacy
      Fernet instance.
    - Otherwise: raises ``DecryptionError``.

    Args:
        ciphertext: Encrypted bytes to decrypt.

    Returns:
        Decrypted plaintext as bytes.

    Raises:
        TypeError: If *ciphertext* is not ``bytes``.
        DecryptionError: If decryption fails due to wrong key,
            tampered ciphertext, or malformed input.

    Examples:
        ```python
        plaintext = backend.sync_decrypt(ciphertext)
        ```
    """
    if not isinstance(ciphertext, bytes):
        msg = f"ciphertext must be bytes, got {type(ciphertext).__name__}"
        raise TypeError(msg)

    if not ciphertext:
        msg = _DECRYPT_FAILED_MSG
        raise DecryptionError(msg)

    if ciphertext[0:1] == _SALT_MARKER and len(ciphertext) >= _MIN_SALTED_LENGTH:
        return self._decrypt_salted(ciphertext)

    if ciphertext[0] >= 0x2B:
        return self._decrypt_legacy(ciphertext)

    msg = "Decryption failed: invalid ciphertext format"
    raise DecryptionError(msg)

encrypt async

encrypt(plaintext: bytes) -> bytes

Encrypt plaintext bytes asynchronously.

Delegates to sync_encrypt via asyncio.to_thread().

PARAMETER DESCRIPTION
plaintext

Raw bytes to encrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Encrypted ciphertext as bytes.

RAISES DESCRIPTION
TypeError

If plaintext is not bytes.

Examples:

ciphertext = await backend.encrypt(b"hello")
Source code in src/adk_secure_sessions/backends/fernet.py
async def encrypt(self, plaintext: bytes) -> bytes:
    """Encrypt plaintext bytes asynchronously.

    Delegates to ``sync_encrypt`` via ``asyncio.to_thread()``.

    Args:
        plaintext: Raw bytes to encrypt.

    Returns:
        Encrypted ciphertext as bytes.

    Raises:
        TypeError: If *plaintext* is not ``bytes``.

    Examples:
        ```python
        ciphertext = await backend.encrypt(b"hello")
        ```
    """
    if not isinstance(plaintext, bytes):
        msg = f"plaintext must be bytes, got {type(plaintext).__name__}"
        raise TypeError(msg)

    return await asyncio.to_thread(self.sync_encrypt, plaintext)

decrypt async

decrypt(ciphertext: bytes) -> bytes

Decrypt ciphertext bytes asynchronously.

Delegates to sync_decrypt via asyncio.to_thread().

PARAMETER DESCRIPTION
ciphertext

Encrypted bytes to decrypt.

TYPE: bytes

RETURNS DESCRIPTION
bytes

Decrypted plaintext as bytes.

RAISES DESCRIPTION
TypeError

If ciphertext is not bytes.

DecryptionError

If decryption fails due to wrong key, tampered ciphertext, or malformed input.

Examples:

plaintext = await backend.decrypt(ciphertext)
Source code in src/adk_secure_sessions/backends/fernet.py
async def decrypt(self, ciphertext: bytes) -> bytes:
    """Decrypt ciphertext bytes asynchronously.

    Delegates to ``sync_decrypt`` via ``asyncio.to_thread()``.

    Args:
        ciphertext: Encrypted bytes to decrypt.

    Returns:
        Decrypted plaintext as bytes.

    Raises:
        TypeError: If *ciphertext* is not ``bytes``.
        DecryptionError: If decryption fails due to wrong key,
            tampered ciphertext, or malformed input.

    Examples:
        ```python
        plaintext = await backend.decrypt(ciphertext)
        ```
    """
    if not isinstance(ciphertext, bytes):
        msg = f"ciphertext must be bytes, got {type(ciphertext).__name__}"
        raise TypeError(msg)

    return await asyncio.to_thread(self.sync_decrypt, ciphertext)