This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.3.

Azure Service Bus Transport - kombu.transport.azureservicebus

Azure Service Bus Message Queue transport module for kombu.

Note that the Shared Access Policy used to connect to Azure Service Bus requires Manage, Send and Listen claims since the broker will create new queues and delete old queues as required.

Note for Celery users whose programs do not terminate properly: the Azure Service Bus SDK uses the Azure uAMQP library, which in turn creates some threads. When the AzureServiceBus Channel is closed, these threads are shut down properly, but there appear to be times when Celery does not close the channel, so the threads are left running. Because the uAMQP threads are not marked as daemon threads, they are not killed when the main thread exits. Setting the uamqp_keep_alive_interval transport option to 0 prevents the keep_alive thread from starting.
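For Celery, one way to apply this setting is through the broker transport options. The following is a minimal sketch of a Celery configuration module. `broker_url` and `broker_transport_options` are Celery's standard setting names; the policy name, key and namespace are placeholders, not real values.

```python
# celeryconfig.py (sketch; credentials and namespace are placeholders)

# Connection string in the SAS policy format used by this transport.
broker_url = "azureservicebus://SAS_POLICY_NAME:SAS_KEY@SERVICE_BUSNAMESPACE"

# A value of 0 keeps the uAMQP keep_alive thread from starting, so no
# non-daemon keep-alive thread is left behind when the main thread exits.
broker_transport_options = {
    "uamqp_keep_alive_interval": 0,
}
```

Disabling the keep-alive thread trades idle-connection keepalives for a clean shutdown, so it is mainly worth trying when processes are observed to hang on exit.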

More information about Azure Service Bus: https://azure.microsoft.com/en-us/services/service-bus/

Features

  • Type: Virtual

  • Supports Direct: Unreviewed

  • Supports Topic: Unreviewed

  • Supports Fanout: Unreviewed

  • Supports Priority: Unreviewed

  • Supports TTL: Unreviewed

Connection String

Connection string has the following formats:

azureservicebus://SAS_POLICY_NAME:SAS_KEY@SERVICE_BUSNAMESPACE
azureservicebus://DefaultAzureCredential@SERVICE_BUSNAMESPACE
azureservicebus://ManagedIdentityCredential@SERVICE_BUSNAMESPACE
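The three formats can be assembled with plain string formatting. The helpers below are a hypothetical sketch (`sas_url` and `credential_url` are not part of kombu), and the example values are made up. The SAS key is inserted verbatim; whether a given key needs any escaping is not stated here, so verify against your kombu version.

```python
SCHEME = "azureservicebus://"

# Credential class names accepted in place of a SAS policy/key pair.
CREDENTIAL_NAMES = ("DefaultAzureCredential", "ManagedIdentityCredential")


def sas_url(policy_name: str, sas_key: str, namespace: str) -> str:
    """Build a URL of the form SAS_POLICY_NAME:SAS_KEY@NAMESPACE."""
    return f"{SCHEME}{policy_name}:{sas_key}@{namespace}"


def credential_url(credential_name: str, namespace: str) -> str:
    """Build a URL that names an Azure credential class instead of a key."""
    if credential_name not in CREDENTIAL_NAMES:
        raise ValueError(f"unsupported credential: {credential_name!r}")
    return f"{SCHEME}{credential_name}@{namespace}"


if __name__ == "__main__":
    print(sas_url("ExamplePolicy", "example-key", "example-namespace"))
    print(credential_url("DefaultAzureCredential", "example-namespace"))
```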

Transport Options

  • queue_name_prefix - String prefix to prepend to queue names in a service bus namespace.

  • wait_time_seconds - Number of seconds to wait to receive messages. Default 5

  • peek_lock_seconds - Number of seconds the message is visible for before it is requeued and sent to another consumer. Default 60

  • uamqp_keep_alive_interval - Interval in seconds the Azure uAMQP library should send keepalive messages. Default 30

  • retry_total - Azure SDK retry total. Default 3

  • retry_backoff_factor - Azure SDK exponential backoff factor. Default 0.8

  • retry_backoff_max - Azure SDK maximum backoff time in seconds. Default 120
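As an illustration, the options above can be collected in a dictionary and passed to `kombu.Connection` through its `transport_options` argument. This is a sketch under assumptions: the prefix `"myapp-"` and the helper `open_connection` are hypothetical, the other values repeat the documented defaults, and the import is deferred so the snippet can be read without kombu and the Azure SDK installed.

```python
# Transport options with the documented defaults spelled out explicitly.
TRANSPORT_OPTIONS = {
    "queue_name_prefix": "myapp-",    # hypothetical prefix
    "wait_time_seconds": 5,           # seconds to wait when receiving
    "peek_lock_seconds": 60,          # lock duration before redelivery
    "uamqp_keep_alive_interval": 30,  # seconds between keepalives
    "retry_total": 3,
    "retry_backoff_factor": 0.8,
    "retry_backoff_max": 120,
}


def open_connection(url: str):
    """Create a kombu Connection for `url` using the options above."""
    # Imported lazily: requires kombu and the Azure Service Bus SDK.
    from kombu import Connection

    return Connection(url, transport_options=TRANSPORT_OPTIONS)
```

Options left out of the dictionary fall back to the defaults listed above, so in practice only the values that differ need to be supplied.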

Transport

class kombu.transport.azureservicebus.Transport(client, **kwargs)

Azure Service Bus transport.

class Channel(*args, **kwargs)

Azure Service Bus channel.

basic_ack(delivery_tag: str, multiple: bool = False) → None

Acknowledge message.

basic_cancel(consumer_tag)

Cancel consumer by consumer tag.

basic_consume(queue, no_ack, *args, **kwargs)

Consume from queue.

close() → None

Close channel.

Cancel all consumers, and requeue unacked messages.

property conninfo
default_peek_lock_seconds: int = 60
default_retry_backoff_factor: float = 0.8
default_retry_backoff_max: int = 120
default_retry_total: int = 3
default_uamqp_keep_alive_interval: int = 30
default_wait_time_seconds: int = 5
domain_format: str = 'kombu%(vhost)s'
entity_name(name: str, table: dict[int, int] | None = None) → str

Format AMQP queue name into a valid ServiceBus queue name.

property peek_lock_seconds: int
property queue_mgmt_service: ServiceBusAdministrationClient
property queue_name_prefix: str
property queue_service: ServiceBusClient
property retry_backoff_factor: float
property retry_backoff_max: int
property retry_total: int
property transport_options
property uamqp_keep_alive_interval: int
property wait_time_seconds: int
classmethod as_uri(uri: str, include_password=False, mask='**') → str

Customise the display format of the URI.

can_parse_url = True

Set to True if Connection should pass the URL unmodified.

default_port = None

Port number used when no port is specified.

static parse_uri(uri: str) → tuple[str, str | DefaultAzureCredential | ManagedIdentityCredential]
polling_interval = 1

Time to sleep between unsuccessful polls.
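To illustrate what `as_uri` with `include_password` and `mask` is meant to achieve, here is a simplified standalone sketch of masking the SAS key in a connection URL. It is not kombu's implementation: `split_sas_uri` and `masked_uri` are hypothetical helpers, and the splitting rules (last `@` separates the namespace, first `:` separates policy from key) are assumptions chosen so that keys containing `/` or `:` survive.

```python
PREFIX = "azureservicebus://"


def split_sas_uri(uri: str) -> tuple[str, str, str]:
    """Split a URI into (policy_or_credential, key, namespace)."""
    rest = uri[len(PREFIX):] if uri.startswith(PREFIX) else uri
    # The namespace follows the last '@'.
    credentials, namespace = rest.rsplit("@", 1)
    # The key follows the first ':'; it is empty for credential-class URLs.
    policy, _, key = credentials.partition(":")
    return policy, key, namespace


def masked_uri(uri: str, include_password: bool = False, mask: str = "**") -> str:
    """Return the URI with the SAS key replaced by `mask` unless requested."""
    policy, key, namespace = split_sas_uri(uri)
    if not key:
        # Credential-class form: there is no secret to hide.
        return f"{PREFIX}{policy}@{namespace}"
    shown = key if include_password else mask
    return f"{PREFIX}{policy}:{shown}@{namespace}"


if __name__ == "__main__":
    print(masked_uri("azureservicebus://ExamplePolicy:some/key@example-ns"))
```

Masking by default keeps secrets out of logs and error messages, while `include_password=True` remains available when the full URL is needed.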

Channel

class kombu.transport.azureservicebus.Channel(*args, **kwargs)

Azure Service Bus channel.

basic_ack(delivery_tag: str, multiple: bool = False) → None

Acknowledge message.

basic_cancel(consumer_tag)

Cancel consumer by consumer tag.

basic_consume(queue, no_ack, *args, **kwargs)

Consume from queue.

close() → None

Close channel.

Cancel all consumers, and requeue unacked messages.

property conninfo
default_peek_lock_seconds: int = 60
default_retry_backoff_factor: float = 0.8
default_retry_backoff_max: int = 120
default_retry_total: int = 3
default_uamqp_keep_alive_interval: int = 30
default_wait_time_seconds: int = 5
domain_format: str = 'kombu%(vhost)s'
entity_name(name: str, table: dict[int, int] | None = None) → str

Format AMQP queue name into a valid ServiceBus queue name.

property peek_lock_seconds: int
property queue_mgmt_service: ServiceBusAdministrationClient
property queue_name_prefix: str
property queue_service: ServiceBusClient
property retry_backoff_factor: float
property retry_backoff_max: int
property retry_total: int
property transport_options
property uamqp_keep_alive_interval: int
property wait_time_seconds: int