This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.3.

Source code for kombu.transport.azurestoragequeues

"""Azure Storage Queues transport module for kombu.

More information about Azure Storage Queues:
https://azure.microsoft.com/en-us/services/storage/queues/

Features
========
* Type: Virtual
* Supports Direct: *Unreviewed*
* Supports Topic: *Unreviewed*
* Supports Fanout: *Unreviewed*
* Supports Priority: *Unreviewed*
* Supports TTL: *Unreviewed*

Connection String
=================

Connection string has the following formats:

.. code-block::

    azurestoragequeues://<STORAGE_ACCOUNT_ACCESS_KEY>@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://<SAS_TOKEN>@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>

Note that if the access key for the storage account contains a forward slash
(``/``), it will have to be regenerated before it can be used in the connection
URL.

.. code-block::

    azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>

If you wish to use an `Azure Managed Identity`_ you may use the
``DefaultAzureCredential`` format of the connection string, which will use the
``DefaultAzureCredential`` class in the azure-identity package. You may want to
read the `azure-identity documentation`_ for more information on how
``DefaultAzureCredential`` works.

.. _azure-identity documentation:
    https://learn.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python
.. _Azure Managed Identity:
    https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview

Transport Options
=================

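* ``queue_name_prefix``: string prepended to every queue name before the name
  is converted into a valid Azure Storage Queue name. Defaults to an empty
  string.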
"""

from __future__ import annotations

import string
from queue import Empty
from typing import Any

from azure.core.exceptions import ResourceExistsError

from kombu.utils.encoding import safe_str
from kombu.utils.json import dumps, loads
from kombu.utils.objects import cached_property

from . import virtual

try:
    from azure.storage.queue import QueueServiceClient
except ImportError:  # pragma: no cover
    QueueServiceClient = None

try:
    from azure.identity import (DefaultAzureCredential,
                                ManagedIdentityCredential)
except ImportError:
    DefaultAzureCredential = None
    ManagedIdentityCredential = None

# Azure Storage Queue names may contain only alphanumerics and dashes, so
# this ``str.translate`` table maps every ASCII punctuation character to a
# dash (0x2d).
CHARS_REPLACE_TABLE = dict.fromkeys(map(ord, string.punctuation), 0x2d)


class Channel(virtual.Channel):
    """Azure Storage Queues channel.

    Every AMQP queue name is mapped to an Azure Storage Queue name by
    prepending the ``queue_name_prefix`` transport option and replacing
    punctuation with dashes (see :meth:`entity_name`).
    """

    domain_format: str = 'kombu%(vhost)s'
    _queue_service: QueueServiceClient | None = None
    # NOTE: the two mutable containers below are class attributes and are
    # mutated in place, so they are shared by every Channel instance.
    _queue_name_cache: dict[Any, Any] = {}
    no_ack: bool = True
    _noack_queues: set[Any] = set()

    def __init__(self, *args, **kwargs):
        """Create the channel and record the queues the service lists.

        :raises ImportError: if the azure-storage-queue library is missing.
        """
        if QueueServiceClient is None:
            raise ImportError('Azure Storage Queues transport requires the '
                              'azure-storage-queue library')

        super().__init__(*args, **kwargs)

        self._credential, self._url = Transport.parse_uri(
            self.conninfo.hostname
        )

        # Seed the cache with whatever ``list_queues()`` returns so that
        # ``_ensure_queue`` does not try to create those queues again.
        for queue in self.queue_service.list_queues():
            self._queue_name_cache[queue['name']] = queue

    def basic_consume(self, queue, no_ack, *args, **kwargs):
        """Record ``queue`` as a no-ack queue if requested, then delegate."""
        if no_ack:
            self._noack_queues.add(queue)
        return super().basic_consume(queue, no_ack, *args, **kwargs)

    def entity_name(self, name, table=CHARS_REPLACE_TABLE) -> str:
        """Format AMQP queue name into a valid Azure Storage Queue name."""
        return str(safe_str(name)).translate(table)

    def _ensure_queue(self, queue):
        """Ensure a queue exists and return a client for it.

        ``queue`` is the AMQP queue name; the prefix and character
        replacement are applied here.
        """
        queue = self.entity_name(self.queue_name_prefix + queue)
        try:
            cached = self._queue_name_cache[queue]
        except KeyError:
            try:
                q = self.queue_service.create_queue(queue)
            except ResourceExistsError:
                # The queue exists but is not in the cache yet.
                q = self.queue_service.get_queue_client(queue=queue)

            self._queue_name_cache[queue] = q.get_queue_properties()
        else:
            # Go through the ``queue_service`` property rather than the raw
            # ``_queue_service`` attribute, which is ``None`` until the
            # property has been accessed.
            q = self.queue_service.get_queue_client(queue=cached)
        return q

    def _delete(self, queue, *args, **kwargs):
        """Delete queue by name."""
        # Apply the same prefix as ``_ensure_queue`` so that the queue that
        # gets deleted is the one messages were actually put on.
        queue_name = self.entity_name(self.queue_name_prefix + queue)
        self._queue_name_cache.pop(queue_name, None)
        self.queue_service.delete_queue(queue_name)

    def _put(self, queue, message, **kwargs):
        """Put message onto queue."""
        q = self._ensure_queue(queue)
        encoded_message = dumps(message)
        q.send_message(encoded_message)

    def _get(self, queue, timeout=None):
        """Try to retrieve a single message off ``queue``.

        The message is deleted from the queue as soon as its content has
        been decoded, before the content is returned.

        :raises queue.Empty: if no message was received.
        """
        q = self._ensure_queue(queue)

        messages = q.receive_messages(messages_per_page=1, timeout=timeout)
        try:
            message = next(messages)
        except StopIteration:
            # The StopIteration carries no useful information for callers.
            raise Empty() from None

        content = loads(message.content)

        q.delete_message(message=message)

        return content

    def _size(self, queue):
        """Return the number of messages in a queue."""
        q = self._ensure_queue(queue)
        return q.get_queue_properties().approximate_message_count

    def _purge(self, queue):
        """Delete all current messages in a queue.

        Returns the approximate message count read just before clearing.
        """
        q = self._ensure_queue(queue)
        # Read the count from the client we already hold.  Passing
        # ``q.queue_name`` back through ``_size`` would apply the queue name
        # prefix a second time and address a different queue.
        n = q.get_queue_properties().approximate_message_count
        q.clear_messages()
        return n

    @property
    def queue_service(self) -> QueueServiceClient:
        """Return the service client, creating it on first access."""
        if self._queue_service is None:
            self._queue_service = QueueServiceClient(
                account_url=self._url, credential=self._credential
            )

        return self._queue_service

    @property
    def conninfo(self):
        """Return the client object of the underlying connection."""
        return self.connection.client

    @property
    def transport_options(self):
        """Return the transport options of the connection's client."""
        return self.connection.client.transport_options

    @cached_property
    def queue_name_prefix(self) -> str:
        """Return the ``queue_name_prefix`` option (``''`` when unset)."""
        return self.transport_options.get('queue_name_prefix', '')


class Transport(virtual.Transport):
    """Azure Storage Queues transport."""

    Channel = Channel

    polling_interval: int = 1
    default_port: int | None = None
    can_parse_url: bool = True

    @staticmethod
    def parse_uri(uri: str) -> tuple[str | dict, str]:
        """Split a connection URI into ``(credential, account_url)``.

        Accepted forms (the ``azurestoragequeues://`` scheme is optional)::

            azurestoragequeues://<STORAGE_ACCOUNT_ACCESS_KEY>@<STORAGE_ACCOUNT_URL>
            azurestoragequeues://<SAS_TOKEN>@<STORAGE_ACCOUNT_URL>
            azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
            azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>

        :returns: a ``(credential, url)`` tuple.  ``credential`` is the raw
            key/token string, a ``DefaultAzureCredential`` or
            ``ManagedIdentityCredential`` instance when the credential part
            names one of those classes (case-insensitively), or a dict with
            ``account_name``/``account_key`` when the URL looks like an
            Azurite (``devstoreaccount1``) endpoint.
        :raises ValueError: if the URI cannot be split into a non-empty
            credential and a non-empty URL.
        :raises ImportError: if an azure-identity credential is requested
            but the azure-identity library is not installed.
        """
        usage = (
            'Need a URI like '
            'azurestoragequeues://{SAS or access key}@{URL}, '
            'azurestoragequeues://DefaultAzureCredential@{URL}'
            ', or '
            'azurestoragequeues://ManagedIdentityCredential@{URL}'
        )

        # urllib parse does not work as the sas key could contain a slash
        # e.g.: azurestoragequeues://some/key@someurl
        try:
            # > 'some/key@url'
            uri = uri.replace('azurestoragequeues://', '')
            # > 'some/key', 'url'
            credential, url = uri.rsplit('@', 1)
        except (AttributeError, TypeError, ValueError) as exc:
            # Not a string, or no '@' separator present.
            raise ValueError(usage) from exc

        # Explicit check instead of ``assert``, which is removed under -O.
        if not credential or not url:
            raise ValueError(usage)

        # The credential is resolved outside the ``try`` above so that the
        # ImportError raised for a missing azure-identity library reaches
        # the caller instead of being reported as a malformed URI.
        credential_kind = credential.lower()
        if credential_kind == "DefaultAzureCredential".lower():
            if DefaultAzureCredential is None:
                raise ImportError('Azure Storage Queues transport with a '
                                  'DefaultAzureCredential requires the '
                                  'azure-identity library')
            credential = DefaultAzureCredential()
        elif credential_kind == "ManagedIdentityCredential".lower():
            if ManagedIdentityCredential is None:
                raise ImportError('Azure Storage Queues transport with a '
                                  'ManagedIdentityCredential requires the '
                                  'azure-identity library')
            credential = ManagedIdentityCredential()
        elif "devstoreaccount1" in url and ".core.windows.net" not in url:
            # parse credential as a dict if Azurite is being used
            credential = {
                "account_name": "devstoreaccount1",
                "account_key": credential,
            }

        return credential, url

    @classmethod
    def as_uri(
        cls, uri: str, include_password: bool = False, mask: str = "**"
    ) -> str:
        """Return ``uri`` in display form, masking the credential by default.

        The credential part is replaced by ``mask`` unless
        ``include_password`` is true, in which case the credential value
        returned by :meth:`parse_uri` is formatted into the result.
        """
        credential, url = cls.parse_uri(uri)
        return "azurestoragequeues://{}@{}".format(
            credential if include_password else mask,
            url
        )