This document describes the current stable version of Celery (5.2). For development docs, go here.

Source code for celery.security.serialization

"""Secure serializer."""
from kombu.serialization import dumps, loads, registry
from kombu.utils.encoding import bytes_to_str, ensure_bytes, str_to_bytes

from celery.app.defaults import DEFAULT_SECURITY_DIGEST
from celery.utils.serialization import b64decode, b64encode

from .certificate import Certificate, FSCertStore
from .key import PrivateKey
from .utils import get_digest_algorithm, reraise_errors

__all__ = ('SecureSerializer', 'register_auth')


class SecureSerializer:
    """Signed serializer.

    Encodes data with an inner serializer, signs the encoded body and packs
    body, signature and signer id into a single base64 string.  On the way
    back the payload is unpacked and its signature verified against the
    signer's certificate before the body is decoded.

    Arguments:
        key: Signing key; must provide ``sign(body, digest)``.
            Required by :meth:`serialize`.
        cert: Own certificate; must provide ``get_id()``.
            Required by :meth:`serialize`.
        cert_store: Mapping from signer id to a certificate providing
            ``verify(body, signature, digest)`` and ``get_pubkey()``.
            Required by :meth:`deserialize`.
        digest: Digest specification, resolved with
            ``get_digest_algorithm``.
        serializer (str): Name of the inner serializer passed to
            ``dumps``.
    """

    def __init__(self, key=None, cert=None, cert_store=None,
                 digest=DEFAULT_SECURITY_DIGEST, serializer='json'):
        self._key = key
        self._cert = cert
        self._cert_store = cert_store
        self._digest = get_digest_algorithm(digest)
        self._serializer = serializer

    def serialize(self, data):
        """Serialize data structure into string.

        Arguments:
            data: Object to encode with the configured inner serializer.

        Returns:
            The packed, base64-encoded payload produced by :meth:`_pack`.

        Errors raised while encoding or signing are re-raised through
        ``reraise_errors`` with an ``Unable to serialize`` message.
        """
        # NOTE: assert statements are removed when Python runs with -O,
        # so these preconditions are only checked in non-optimized mode.
        assert self._key is not None
        assert self._cert is not None
        with reraise_errors('Unable to serialize: {0!r}', (Exception,)):
            content_type, content_encoding, body = dumps(
                bytes_to_str(data), serializer=self._serializer)
            # What we sign is the serialized body, not the body itself.
            # this way the receiver doesn't have to decode the contents
            # to verify the signature (and thus avoiding potential flaws
            # in the decoding step).
            body = ensure_bytes(body)
            return self._pack(body, content_type, content_encoding,
                              signature=self._key.sign(body, self._digest),
                              signer=self._cert.get_id())

    def deserialize(self, data):
        """Deserialize data structure from string.

        Arguments:
            data: Payload previously produced by :meth:`serialize`.

        Returns:
            The object decoded from the verified body.

        Errors raised while unpacking the payload or verifying its
        signature are re-raised through ``reraise_errors`` with an
        ``Unable to deserialize`` message.
        """
        assert self._cert_store is not None
        with reraise_errors('Unable to deserialize: {0!r}', (Exception,)):
            payload = self._unpack(data)
            signature, signer, body = (payload['signature'],
                                       payload['signer'],
                                       payload['body'])
            self._cert_store[signer].verify(body, signature, self._digest)
        # The body is only decoded once its signature has been verified.
        return loads(bytes_to_str(body), payload['content_type'],
                     payload['content_encoding'], force=True)

    def _pack(self, body, content_type, content_encoding, signer, signature,
              sep=str_to_bytes('\x00\x01')):
        """Join the message fields with ``sep`` and base64-encode the result.

        Field order on the wire is: signer, signature, content type,
        content encoding, body.
        """
        fields = sep.join(
            ensure_bytes(s) for s in [signer, signature, content_type,
                                      content_encoding, body]
        )
        return b64encode(fields)

    def _unpack(self, payload, sep=str_to_bytes('\x00\x01')):
        """Split a packed payload back into its fields.

        Arguments:
            payload: Base64 payload as produced by :meth:`_pack`.
            sep (bytes): Field separator used when packing.

        Returns:
            dict: ``signer`` and ``signature`` as bytes; ``content_type``,
            ``content_encoding`` and ``body`` as text.

        Raises:
            ValueError: If the separator is missing or fewer than three
                fields follow the signature.
        """
        raw_payload = b64decode(ensure_bytes(payload))
        first_sep = raw_payload.find(sep)
        if first_sep < 0:
            # find() returns -1 here; slicing with it would silently
            # produce a bogus signer id instead of failing clearly.
            raise ValueError('Malformed payload: field separator not found')

        signer = raw_payload[:first_sep]
        signer_cert = self._cert_store[signer]

        # shift 3 bits right to get signature length
        # 2048bit rsa key has a signature length of 256
        # 4096bit rsa key has a signature length of 512
        sig_len = signer_cert.get_pubkey().key_size >> 3
        sep_len = len(sep)
        # The signature is sliced out by length, so separator bytes that
        # occur inside it cannot be mistaken for a field boundary.
        signature_start_position = first_sep + sep_len
        signature_end_position = signature_start_position + sig_len
        signature = raw_payload[
            signature_start_position:signature_end_position
        ]

        # The body is the last field and may itself contain the separator,
        # so split at most twice to keep it in one piece.
        v = raw_payload[signature_end_position + sep_len:].split(sep, 2)
        if len(v) != 3:
            raise ValueError(
                'Malformed payload: expected content type, content '
                'encoding and body after the signature')

        return {
            'signer': signer,
            'signature': signature,
            'content_type': bytes_to_str(v[0]),
            'content_encoding': bytes_to_str(v[1]),
            'body': bytes_to_str(v[2]),
        }
def register_auth(key=None, cert=None, store=None,
                  digest=DEFAULT_SECURITY_DIGEST, serializer='json'):
    """Register security serializer.

    Builds a :class:`SecureSerializer` and registers its ``serialize`` and
    ``deserialize`` methods with the kombu serializer registry under the
    name ``auth``.

    Arguments:
        key: Wrapped in ``PrivateKey`` when truthy, otherwise passed
            through unchanged.
        cert: Wrapped in ``Certificate`` when truthy, otherwise passed
            through unchanged.
        store: Wrapped in ``FSCertStore`` when truthy, otherwise passed
            through unchanged.
        digest: Digest specification forwarded to the serializer.
        serializer (str): Name of the inner serializer.
    """
    private_key = PrivateKey(key) if key else key
    certificate = Certificate(cert) if cert else cert
    cert_store = FSCertStore(store) if store else store

    secure = SecureSerializer(
        private_key, certificate, cert_store, digest, serializer=serializer,
    )
    registry.register(
        'auth',
        secure.serialize,
        secure.deserialize,
        content_type='application/data',
        content_encoding='utf-8',
    )