This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.3.
Azure Service Bus Transport - kombu.transport.azureservicebus
Azure Service Bus Message Queue transport module for kombu.
Note that the Shared Access Policy used to connect to Azure Service Bus requires Manage, Send and Listen claims since the broker will create new queues and delete old queues as required.
Note for Celery users whose programs do not terminate properly: the Azure
Service Bus SDK uses the Azure uAMQP library, which in turn creates some
threads. If the AzureServiceBus Connection is closed (e.g. via
Connection.release()), these threads are shut down properly, but Celery does
not appear to always close the connection, so the threads can be left
running. Because the uAMQP threads are not marked as daemon threads, they are
not killed when the main thread exits. Setting the uamqp_keep_alive_interval
transport option to 0 prevents the keep_alive thread from starting.
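As an illustration of the workaround above, the following is a minimal sketch of Celery settings that disable the keep_alive thread. It assumes a settings module in the style of celeryconfig.py; broker_url and broker_transport_options are standard Celery setting names, while the policy name, key and namespace in the URL are placeholders to be replaced.

```python
# Celery configuration sketch, e.g. the contents of a celeryconfig.py module.
# The policy name, key and namespace in the URL are placeholders, not real values.
broker_url = "azureservicebus://SAS_POLICY_NAME:SAS_KEY@SERVICE_BUSNAMESPACE"

# Celery hands these options to the Kombu transport.
# An interval of 0 prevents the uAMQP keep_alive thread from starting.
broker_transport_options = {
    "uamqp_keep_alive_interval": 0,
}
```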
More information about Azure Service Bus: https://azure.microsoft.com/en-us/services/service-bus/
Features
Type: Virtual
Supports Direct: Unreviewed
Supports Topic: Unreviewed
Supports Fanout: Unreviewed
Supports Priority: Unreviewed
Supports TTL: Unreviewed
Connection String
The connection string has one of the following formats:
azureservicebus://SAS_POLICY_NAME:SAS_KEY@SERVICE_BUSNAMESPACE
azureservicebus://DefaultAzureCredential@SERVICE_BUSNAMESPACE
azureservicebus://ManagedIdentityCredential@SERVICE_BUSNAMESPACE
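The formats above can be assembled with plain string formatting. The sketch below is illustrative only: the helper names sas_url and credential_url are hypothetical (they are not part of Kombu), and the example values are placeholders. It does not escape or validate the SAS key; whether special characters in a key need any treatment is not covered here.

```python
# Hypothetical helpers (not part of Kombu) that build the URL formats above.
CREDENTIAL_NAMES = ("DefaultAzureCredential", "ManagedIdentityCredential")


def sas_url(policy_name: str, sas_key: str, namespace: str) -> str:
    """Build a URL that authenticates with a Shared Access Policy."""
    return f"azureservicebus://{policy_name}:{sas_key}@{namespace}"


def credential_url(credential: str, namespace: str) -> str:
    """Build a URL that names one of the two documented credential types."""
    if credential not in CREDENTIAL_NAMES:
        raise ValueError(f"unsupported credential name: {credential!r}")
    return f"azureservicebus://{credential}@{namespace}"


# Placeholder values for illustration only.
print(sas_url("my-policy", "my-sas-key", "my-namespace"))
print(credential_url("DefaultAzureCredential", "my-namespace"))
```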
Transport Options
- queue_name_prefix - String prefix to prepend to queue names in a service bus namespace.
- wait_time_seconds - Number of seconds to wait to receive messages. Default 5
- peek_lock_seconds - Number of seconds the message is visible for before it is requeued and sent to another consumer. Default 60
- uamqp_keep_alive_interval - Interval in seconds the Azure uAMQP library should send keepalive messages. Default 30
- retry_total - Azure SDK retry total. Default 3
- retry_backoff_factor - Azure SDK exponential backoff factor. Default 0.8
- retry_backoff_max - Azure SDK retry total time. Default 120
- use_lock_renewal - Enable Azure SDK AutoLockRenewer to keep message locks alive while a worker is processing. Only effective when receive mode is PEEK_LOCK (the default). Default False.
- max_lock_renewal_duration - Time in seconds that locks registered to the renewer should be maintained for. Default 3600 (1 hour).
Added in version 5.7.0: use_lock_renewal and max_lock_renewal_duration transport options.
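To show how these options fit together, here is a minimal sketch that starts from the documented defaults and overrides a few of them. The with_overrides helper and the "myapp-" prefix are hypothetical and exist only for this example; the option names and default values are the ones listed above.

```python
# Documented defaults; queue_name_prefix has no default listed, so it is omitted.
DEFAULTS = {
    "wait_time_seconds": 5,
    "peek_lock_seconds": 60,
    "uamqp_keep_alive_interval": 30,
    "retry_total": 3,
    "retry_backoff_factor": 0.8,
    "retry_backoff_max": 120,
    "use_lock_renewal": False,
    "max_lock_renewal_duration": 3600,
}


def with_overrides(**overrides):
    """Hypothetical helper: merge overrides into the defaults, rejecting typos."""
    unknown = set(overrides) - set(DEFAULTS) - {"queue_name_prefix"}
    if unknown:
        raise KeyError(f"unknown transport options: {sorted(unknown)}")
    return {**DEFAULTS, **overrides}


# Enable lock renewal for tasks that may run up to 30 minutes.
transport_options = with_overrides(
    queue_name_prefix="myapp-",  # hypothetical prefix
    use_lock_renewal=True,
    max_lock_renewal_duration=1800,
)
```

A dictionary like this is typically passed as the transport_options argument of kombu.Connection, or assigned to Celery's broker_transport_options setting.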
Transport
- class kombu.transport.azureservicebus.Transport(client, **kwargs)
  Azure Service Bus transport.
  - class Channel(*args, **kwargs)
    Azure Service Bus channel.
    - basic_cancel(consumer_tag)
      Cancel consumer by consumer tag.
    - basic_consume(queue, no_ack, callback, consumer_tag, *args, **kwargs)
      Consume from queue.
    - property conninfo
    - entity_name(name: str, table: dict[int, int] | None = None) -> str
      Format AMQP queue name into a valid ServiceBus queue name.
    - property queue_mgmt_service: ServiceBusAdministrationClient
    - property queue_service: ServiceBusClient
    - property transport_options
  - classmethod as_uri(uri: str, include_password=False, mask='**') -> str
    Customise the display format of the URI.
  - can_parse_url = True
    Set to True if Connection should pass the URL unmodified.
  - default_port = None
    Port number used when no port is specified.
  - polling_interval = 1
    Time to sleep between unsuccessful polls.
Channel
- class kombu.transport.azureservicebus.Channel(*args, **kwargs)
  Azure Service Bus channel.
  - property conninfo
  - entity_name(name: str, table: dict[int, int] | None = None) -> str
    Format AMQP queue name into a valid ServiceBus queue name.
  - property queue_mgmt_service: ServiceBusAdministrationClient
  - property queue_service: ServiceBusClient
  - property transport_options
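The table parameter of entity_name has type dict[int, int], which is the shape accepted by Python's str.translate. The sketch below illustrates that kind of character replacement under stated assumptions: the function entity_name_sketch and the table HYPOTHETICAL_TABLE are invented for this example and are not Kombu's implementation or its actual replacement table.

```python
from __future__ import annotations

import string

# Hypothetical table: map every ASCII punctuation character except
# "_", "-" and "." to an underscore. Kombu's real table may differ.
HYPOTHETICAL_TABLE: dict[int, int] = {
    ord(char): ord("_") for char in string.punctuation if char not in "_-."
}


def entity_name_sketch(name: str, table: dict[int, int] | None = None) -> str:
    """Replace characters in a queue name using a code-point mapping."""
    return name.translate(table or HYPOTHETICAL_TABLE)


print(entity_name_sketch("celery@worker/1"))  # prints "celery_worker_1"
```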