This document describes the current stable version of Celery (5.0). For development docs, go here.

Source code for celery.backends.azureblockblob

"""The Azure Storage Block Blob backend for Celery."""
from kombu.utils import cached_property
from kombu.utils.encoding import bytes_to_str

from celery.exceptions import ImproperlyConfigured
from celery.utils.log import get_logger

from .base import KeyValueStoreBackend

try:
    from azure import storage as azurestorage
    from azure.common import AzureMissingResourceHttpError
    from azure.storage.blob import BlockBlobService
    from azure.storage.common.retry import ExponentialRetry
except ImportError:  # pragma: no cover
    azurestorage = BlockBlobService = ExponentialRetry = \
        AzureMissingResourceHttpError = None  # noqa

__all__ = ("AzureBlockBlobBackend",)

LOGGER = get_logger(__name__)


[docs]class AzureBlockBlobBackend(KeyValueStoreBackend): """Azure Storage Block Blob backend for Celery.""" def __init__(self, url=None, container_name=None, retry_initial_backoff_sec=None, retry_increment_base=None, retry_max_attempts=None, *args, **kwargs): super().__init__(*args, **kwargs) if azurestorage is None: raise ImproperlyConfigured( "You need to install the azure-storage library to use the " "AzureBlockBlob backend") conf = self.app.conf self._connection_string = self._parse_url(url) self._container_name = ( container_name or conf["azureblockblob_container_name"]) self._retry_initial_backoff_sec = ( retry_initial_backoff_sec or conf["azureblockblob_retry_initial_backoff_sec"]) self._retry_increment_base = ( retry_increment_base or conf["azureblockblob_retry_increment_base"]) self._retry_max_attempts = ( retry_max_attempts or conf["azureblockblob_retry_max_attempts"]) @classmethod def _parse_url(cls, url, prefix="azureblockblob://"): connection_string = url[len(prefix):] if not connection_string: raise ImproperlyConfigured("Invalid URL") return connection_string @cached_property def _client(self): """Return the Azure Storage Block Blob service. If this is the first call to the property, the client is created and the container is created if it doesn't yet exist. """ client = BlockBlobService(connection_string=self._connection_string) created = client.create_container( container_name=self._container_name, fail_on_exist=False) if created: LOGGER.info("Created Azure Blob Storage container %s", self._container_name) client.retry = ExponentialRetry( initial_backoff=self._retry_initial_backoff_sec, increment_base=self._retry_increment_base, max_attempts=self._retry_max_attempts).retry return client
[docs] def get(self, key): """Read the value stored at the given key. Args: key: The key for which to read the value. """ key = bytes_to_str(key) LOGGER.debug("Getting Azure Block Blob %s/%s", self._container_name, key) try: return self._client.get_blob_to_text( self._container_name, key).content except AzureMissingResourceHttpError: return None
[docs] def set(self, key, value): """Store a value for a given key. Args: key: The key at which to store the value. value: The value to store. """ key = bytes_to_str(key) LOGGER.debug("Creating Azure Block Blob at %s/%s", self._container_name, key) return self._client.create_blob_from_text( self._container_name, key, value)
[docs] def mget(self, keys): """Read all the values for the provided keys. Args: keys: The list of keys to read. """ return [self.get(key) for key in keys]
[docs] def delete(self, key): """Delete the value at a given key. Args: key: The key of the value to delete. """ key = bytes_to_str(key) LOGGER.debug("Deleting Azure Block Blob at %s/%s", self._container_name, key) self._client.delete_blob(self._container_name, key)