Source code for celery.security.certificate
# -*- coding: utf-8 -*-
"""X.509 certificates."""
from __future__ import absolute_import, unicode_literals
import glob
import os
from kombu.utils.encoding import bytes_to_str
from celery.exceptions import SecurityError
from celery.five import values
from .utils import crypto, reraise_errors
__all__ = ('Certificate', 'CertStore', 'FSCertStore')
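class Certificate(object):
    """X.509 certificate.

    Wraps a PEM-encoded certificate parsed through the ``crypto`` module
    imported from ``.utils`` (presumably pyOpenSSL's ``OpenSSL.crypto``;
    confirm there).

    This class only parses a certificate and checks signatures against it.
    It does not validate a chain of trust, so whether a certificate is
    trusted is decided by whoever puts it into a certificate store.
    """

    def __init__(self, cert):
        """Parse *cert*, a PEM-encoded certificate.

        Errors raised while parsing go through ``reraise_errors`` with the
        'Invalid certificate' message.  That helper is defined in ``.utils``
        and not shown here; it presumably raises SecurityError (confirm).
        """
        # Guard against a missing crypto backend.  NOTE: ``assert`` is
        # stripped under ``python -O``; the call below would then fail with
        # AttributeError on None rather than AssertionError.
        assert crypto is not None
        with reraise_errors('Invalid certificate: {0!r}'):
            self._cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert)

    def has_expired(self):
        """Return whether the underlying certificate reports itself expired.

        Delegates to the parsed certificate object.  NOTE(review): this
        looks like an end-of-validity check only; a certificate that is
        not valid *yet* is presumably not rejected here -- confirm.
        """
        return self._cert.has_expired()

    def get_serial_number(self):
        """Return the certificate's serial number."""
        return bytes_to_str(self._cert.get_serial_number())

    def get_issuer(self):
        """Return issuer (CA) as a string.

        Only the *values* of the issuer name components are joined (with a
        single space); the attribute names are dropped.  Two different
        issuer names can therefore render to the same string.
        """
        return ' '.join(bytes_to_str(x[1]) for x in
                        self._cert.get_issuer().get_components())

    def get_id(self):
        """Return '<issuer> <serial number>', the key stores index by."""
        return '{0} {1}'.format(self.get_issuer(), self.get_serial_number())

    def verify(self, data, signature, digest):
        """Verify signature for string containing data.

        Returns None when the signature checks out.  Failure is reported
        only by an exception (re-raised with the 'Bad signature' message),
        so callers must not test the return value for truth.

        *digest* is passed straight to ``crypto.verify``; no minimum hash
        strength is enforced here, and expiry is not re-checked at
        verification time.
        """
        with reraise_errors('Bad signature: {0!r}'):
            crypto.verify(self._cert, signature, data, digest)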
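class CertStore(object):
    """Base class for certificate stores.

    Keeps an in-memory mapping from certificate id to certificate object.
    Membership in this mapping is the only trust decision the store makes:
    nothing here validates a chain or checks expiry.
    """

    def __init__(self):
        self._certs = {}

    def itercerts(self):
        """Return certificate iterator."""
        for c in values(self._certs):
            yield c

    def __getitem__(self, id):
        """Get certificate by id.

        Raises SecurityError when no certificate is stored under *id*.
        """
        try:
            return self._certs[bytes_to_str(id)]
        except KeyError:
            raise SecurityError('Unknown certificate: {0!r}'.format(id))

    def add_cert(self, cert):
        """Register *cert* under its id.

        Raises SecurityError when a certificate with the same id is already
        present, so a later certificate cannot silently replace an earlier
        one.  No expiry check is made here.
        """
        cert_id = bytes_to_str(cert.get_id())
        if cert_id in self._certs:
            # Report the colliding certificate id.  There is no local named
            # ``id`` in this method, so formatting ``id`` would print the
            # builtin function instead of the certificate.
            raise SecurityError(
                'Duplicate certificate: {0!r}'.format(cert_id))
        self._certs[cert_id] = cert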
class FSCertStore(CertStore):
    """Certificate store loaded from files on the local file system.

    Every file matched by the path is parsed and added to the store, so
    write access to that location amounts to the ability to add a trusted
    signer.  Loading fails closed: one expired, unparsable or duplicate
    certificate aborts construction of the whole store.
    """

    def __init__(self, path):
        """Load all certificates found at *path*.

        *path* is either a directory (every entry in it is loaded) or a
        glob pattern.  Raises SecurityError for an expired certificate.
        Expiry is checked once, at load time only.
        """
        CertStore.__init__(self)
        # A directory means "everything inside it"; anything else is used
        # as a glob pattern as given.
        pattern = os.path.join(path, '*') if os.path.isdir(path) else path
        for filename in glob.glob(pattern):
            with open(filename) as handle:
                pem = handle.read()
            cert = Certificate(pem)
            if cert.has_expired():
                raise SecurityError(
                    'Expired certificate: {0!r}'.format(cert.get_id()))
            self.add_cert(cert)