This document describes the current stable version of Kombu (5.3). For development docs, go here.

Source code for kombu.transport.azurestoragequeues

"""Azure Storage Queues transport module for kombu.

More information about Azure Storage Queues:
https://azure.microsoft.com/en-us/services/storage/queues/

Features
========
* Type: Virtual
* Supports Direct: *Unreviewed*
* Supports Topic: *Unreviewed*
* Supports Fanout: *Unreviewed*
* Supports Priority: *Unreviewed*
* Supports TTL: *Unreviewed*

Connection String
=================

Connection string has the following formats:

.. code-block::

    azurestoragequeues://<STORAGE_ACCOUNT_ACCESS_KEY>@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://<SAS_TOKEN>@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>

Note that if the access key for the storage account contains a forward slash
(``/``), it will have to be regenerated before it can be used in the connection
URL.

.. code-block::

    azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL>
    azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL>

If you wish to use an `Azure Managed Identity` you may use the
``DefaultAzureCredential`` format of the connection string which will use
``DefaultAzureCredential`` class in the azure-identity package. You may want to
read the `azure-identity documentation` for more information on how the
``DefaultAzureCredential`` works.

.. _azure-identity documentation:
https://learn.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python
.. _Azure Managed Identity:
https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview

Transport Options
=================

* ``queue_name_prefix``
"""

from __future__ import annotations

import string
from queue import Empty
from typing import Any, Optional

from azure.core.exceptions import ResourceExistsError

from kombu.utils.encoding import safe_str
from kombu.utils.json import dumps, loads
from kombu.utils.objects import cached_property

from . import virtual

try:
    from azure.storage.queue import QueueServiceClient
except ImportError:  # pragma: no cover
    QueueServiceClient = None

try:
    from azure.identity import (DefaultAzureCredential,
                                ManagedIdentityCredential)
except ImportError:
    DefaultAzureCredential = None
    ManagedIdentityCredential = None

# Azure storage queues allow only alphanumeric and dashes
# so, replace everything with a dash
CHARS_REPLACE_TABLE = {
    ord(c): 0x2d for c in string.punctuation
}


[docs] class Channel(virtual.Channel): """Azure Storage Queues channel.""" domain_format: str = 'kombu%(vhost)s' _queue_service: Optional[QueueServiceClient] = None _queue_name_cache: dict[Any, Any] = {} no_ack: bool = True _noack_queues: set[Any] = set() def __init__(self, *args, **kwargs): if QueueServiceClient is None: raise ImportError('Azure Storage Queues transport requires the ' 'azure-storage-queue library') super().__init__(*args, **kwargs) self._credential, self._url = Transport.parse_uri( self.conninfo.hostname ) for queue in self.queue_service.list_queues(): self._queue_name_cache[queue['name']] = queue
[docs] def basic_consume(self, queue, no_ack, *args, **kwargs): if no_ack: self._noack_queues.add(queue) return super().basic_consume(queue, no_ack, *args, **kwargs)
[docs] def entity_name(self, name, table=CHARS_REPLACE_TABLE) -> str: """Format AMQP queue name into a valid Azure Storage Queue name.""" return str(safe_str(name)).translate(table)
def _ensure_queue(self, queue): """Ensure a queue exists.""" queue = self.entity_name(self.queue_name_prefix + queue) try: q = self._queue_service.get_queue_client( queue=self._queue_name_cache[queue] ) except KeyError: try: q = self.queue_service.create_queue(queue) except ResourceExistsError: q = self._queue_service.get_queue_client(queue=queue) self._queue_name_cache[queue] = q.get_queue_properties() return q def _delete(self, queue, *args, **kwargs): """Delete queue by name.""" queue_name = self.entity_name(queue) self._queue_name_cache.pop(queue_name, None) self.queue_service.delete_queue(queue_name) def _put(self, queue, message, **kwargs): """Put message onto queue.""" q = self._ensure_queue(queue) encoded_message = dumps(message) q.send_message(encoded_message) def _get(self, queue, timeout=None): """Try to retrieve a single message off ``queue``.""" q = self._ensure_queue(queue) messages = q.receive_messages(messages_per_page=1, timeout=timeout) try: message = next(messages) except StopIteration: raise Empty() content = loads(message.content) q.delete_message(message=message) return content def _size(self, queue): """Return the number of messages in a queue.""" q = self._ensure_queue(queue) return q.get_queue_properties().approximate_message_count def _purge(self, queue): """Delete all current messages in a queue.""" q = self._ensure_queue(queue) n = self._size(q.queue_name) q.clear_messages() return n @property def queue_service(self) -> QueueServiceClient: if self._queue_service is None: self._queue_service = QueueServiceClient( account_url=self._url, credential=self._credential ) return self._queue_service @property def conninfo(self): return self.connection.client @property def transport_options(self): return self.connection.client.transport_options @cached_property def queue_name_prefix(self) -> str: return self.transport_options.get('queue_name_prefix', '')
[docs] class Transport(virtual.Transport): """Azure Storage Queues transport.""" Channel = Channel polling_interval: int = 1 default_port: Optional[int] = None can_parse_url: bool = True
[docs] @staticmethod def parse_uri(uri: str) -> tuple[str | dict, str]: # URL like: # azurestoragequeues://<STORAGE_ACCOUNT_ACCESS_KEY>@<STORAGE_ACCOUNT_URL> # azurestoragequeues://<SAS_TOKEN>@<STORAGE_ACCOUNT_URL> # azurestoragequeues://DefaultAzureCredential@<STORAGE_ACCOUNT_URL> # azurestoragequeues://ManagedIdentityCredential@<STORAGE_ACCOUNT_URL> # urllib parse does not work as the sas key could contain a slash # e.g.: azurestoragequeues://some/key@someurl try: # > 'some/key@url' uri = uri.replace('azurestoragequeues://', '') # > 'some/key', 'url' credential, url = uri.rsplit('@', 1) if "DefaultAzureCredential".lower() == credential.lower(): if DefaultAzureCredential is None: raise ImportError('Azure Storage Queues transport with a ' 'DefaultAzureCredential requires the ' 'azure-identity library') credential = DefaultAzureCredential() elif "ManagedIdentityCredential".lower() == credential.lower(): if ManagedIdentityCredential is None: raise ImportError('Azure Storage Queues transport with a ' 'ManagedIdentityCredential requires the ' 'azure-identity library') credential = ManagedIdentityCredential() elif "devstoreaccount1" in url and ".core.windows.net" not in url: # parse credential as a dict if Azurite is being used credential = { "account_name": "devstoreaccount1", "account_key": credential, } # Validate parameters assert all([credential, url]) except Exception: raise ValueError( 'Need a URI like ' 'azurestoragequeues://{SAS or access key}@{URL}, ' 'azurestoragequeues://DefaultAzureCredential@{URL}, ' ', or ' 'azurestoragequeues://ManagedIdentityCredential@{URL}' ) return credential, url
[docs] @classmethod def as_uri( cls, uri: str, include_password: bool = False, mask: str = "**" ) -> str: credential, url = cls.parse_uri(uri) return "azurestoragequeues://{}@{}".format( credential if include_password else mask, url )