This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.3.

Apache QPid Transport - kombu.transport.qpid

Qpid Transport module for kombu.

Qpid transport using qpid-python as the client and qpid-tools for broker management.

The use this transport you must install the necessary dependencies. These dependencies are available via PyPI and can be installed using the pip command:

$ pip install kombu[qpid]

or to install the requirements manually:

$ pip install qpid-tools qpid-python

Python 3 and PyPy Limitations

The Qpid transport does not support Python 3 or PyPy environments due to underlying dependencies not being compatible. This version is tested and works with with Python 2.7.

Features

  • Type: Native

  • Supports Direct: Yes

  • Supports Topic: Yes

  • Supports Fanout: Yes

  • Supports Priority: Yes

  • Supports TTL: Yes

Authentication

This transport supports SASL authentication with the Qpid broker. Normally, SASL mechanisms are negotiated from a client list and a server list of possible mechanisms, but in practice, different SASL client libraries give different behaviors. These different behaviors cause the expected SASL mechanism to not be selected in many cases. As such, this transport restricts the mechanism types based on Kombu’s configuration according to the following table.

Broker String

SASL Mechanism

qpid://hostname/

ANONYMOUS

qpid://username:password@hostname/

PLAIN

see instructions below

EXTERNAL

The user can override the above SASL selection behaviors and specify the SASL string using the login_method argument to the Connection object. The string can be a single SASL mechanism or a space separated list of SASL mechanisms. If you are using Celery with Kombu, this can be accomplished by setting the BROKER_LOGIN_METHOD Celery option.

Note

While using SSL, Qpid users may want to override the SASL mechanism to use EXTERNAL. In that case, Qpid requires a username to be presented that matches the CN of the SSL client certificate. Ensure that the broker string contains the corresponding username. For example, if the client certificate has CN=asdf and the client connects to example.com on port 5671, the broker string should be:

qpid://asdf@example.com:5671/

Transport Options

The transport_options argument to the Connection object are passed directly to the qpid.messaging.endpoints.Connection as keyword arguments. These options override and replace any other default or specified values. If using Celery, this can be accomplished by setting the BROKER_TRANSPORT_OPTIONS Celery option.

Transport

class kombu.transport.qpid.Transport(*args, **kwargs)[source]

Kombu native transport for a Qpid broker.

Provide a native transport for Kombu that allows consumers and producers to read and write messages to/from a broker. This Transport is capable of supporting both synchronous and asynchronous reading. All writes are synchronous through the Channel objects that support this Transport.

Asynchronous reads are done using a call to drain_events(), which synchronously reads messages that were fetched asynchronously, and then handles them through calls to the callback handlers maintained on the Connection object.

The Transport also provides methods to establish and close a connection to the broker. This Transport establishes a factory-like pattern that allows for singleton pattern to consolidate all Connections into a single one.

The Transport can create Channel objects to communicate with the broker with using the create_channel() method.

The Transport identifies recoverable connection errors and recoverable channel errors according to the Kombu 3.0 interface. These exception are listed as tuples and store in the Transport class attribute recoverable_connection_errors and recoverable_channel_errors respectively. Any exception raised that is not a member of one of these tuples is considered non-recoverable. This allows Kombu support for automatic retry of certain operations to function correctly.

For backwards compatibility to the pre Kombu 3.0 exception interface, the recoverable errors are also listed as connection_errors and channel_errors.

class Connection(**connection_options)

Qpid Connection.

Encapsulate a connection object for the Transport.

Parameters:
  • host – The host that connections should connect to.

  • port – The port that connection should connect to.

  • username – The username that connections should connect with. Optional.

  • password – The password that connections should connect with. Optional but requires a username.

  • transport – The transport type that connections should use. Either ‘tcp’, or ‘ssl’ are expected as values.

  • timeout – the timeout used when a Connection connects to the broker.

  • sasl_mechanisms – The sasl authentication mechanism type to use. refer to SASL documentation for an explanation of valid values.

Note

qpid.messaging has an AuthenticationFailure exception type, but instead raises a ConnectionError with a message that indicates an authentication failure occurred in those situations. ConnectionError is listed as a recoverable error type, so kombu will attempt to retry if a ConnectionError is raised. Retrying the operation without adjusting the credentials is not correct, so this method specifically checks for a ConnectionError that indicates an Authentication Failure occurred. In those situations, the error type is mutated while preserving the original message and raised so kombu will allow the exception to not be considered recoverable.

A connection object is created by a Transport during a call to establish_connection(). The Transport passes in connection options as keywords that should be used for any connections created. Each Transport creates exactly one Connection.

A Connection object maintains a reference to a Connection which can be accessed through a bound getter method named get_qpid_connection() method. Each Channel uses a the Connection for each BrokerAgent, and the Transport maintains a session for all senders and receivers.

The Connection object is also responsible for maintaining the dictionary of references to callbacks that should be called when messages are received. These callbacks are saved in _callbacks, and keyed on the queue name associated with the received message. The _callbacks are setup in Channel.basic_consume(), removed in Channel.basic_cancel(), and called in Transport.drain_events().

The following keys are expected to be passed in as keyword arguments at a minimum:

All keyword arguments are collected into the connection_options dict and passed directly through to qpid.messaging.endpoints.Connection.establish().

class Channel(connection, transport)

Supports broker configuration and messaging send and receive.

Parameters:

A channel object is designed to have method-parity with a Channel as defined in AMQP 0-10 and earlier, which allows for the following broker actions:

  • exchange declare and delete

  • queue declare and delete

  • queue bind and unbind operations

  • queue length and purge operations

  • sending/receiving/rejecting messages

  • structuring, encoding, and decoding messages

  • supports synchronous and asynchronous reads

  • reading state about the exchange, queues, and bindings

Channels are designed to all share a single TCP connection with a broker, but provide a level of isolated communication with the broker while benefiting from a shared TCP connection. The Channel is given its Connection object by the Transport that instantiates the channel.

This channel inherits from StdChannel, which makes this a ‘native’ channel versus a ‘virtual’ channel which would inherit from kombu.transports.virtual.

Messages sent using this channel are assigned a delivery_tag. The delivery_tag is generated for a message as they are prepared for sending by basic_publish(). The delivery_tag is unique per channel instance. The delivery_tag has no meaningful context in other objects, and is only maintained in the memory of this object, and the underlying QoS object that provides support.

Each channel object instantiates exactly one QoS object for prefetch limiting, and asynchronous ACKing. The QoS object is lazily instantiated through a property method qos(). The QoS object is a supporting object that should not be accessed directly except by the channel itself.

Synchronous reads on a queue are done using a call to basic_get() which uses _get() to perform the reading. These methods read immediately and do not accept any form of timeout. basic_get() reads synchronously and ACKs messages before returning them. ACKing is done in all cases, because an application that reads messages using qpid.messaging, but does not ACK them will experience a memory leak. The no_ack argument to basic_get() does not affect ACKing functionality.

Asynchronous reads on a queue are done by starting a consumer using basic_consume(). Each call to basic_consume() will cause a Receiver to be created on the Session started by the :class: Transport. The receiver will asynchronously read using qpid.messaging, and prefetch messages before the call to Transport.basic_drain() occurs. The prefetch_count value of the QoS object is the capacity value of the new receiver. The new receiver capacity must always be at least 1, otherwise none of the receivers will appear to be ready for reading, and will never be read from.

Each call to basic_consume() creates a consumer, which is given a consumer tag that is identified by the caller of basic_consume(). Already started consumers can be cancelled using by their consumer_tag using basic_cancel(). Cancellation of a consumer causes the Receiver object to be closed.

Asynchronous message ACKing is supported through basic_ack(), and is referenced by delivery_tag. The Channel object uses its QoS object to perform the message ACKing.

class Message(payload, channel=None, **kwargs)

Message object.

accept
body
channel
content_encoding
content_type
delivery_info
delivery_tag
headers
properties
serializable()
class QoS(session, prefetch_count=1)

A helper object for message prefetch and ACKing purposes.

Keyword Arguments:

prefetch_count – Initial prefetch count, hard set to 1.

NOTE: prefetch_count is currently hard set to 1, and needs to be improved

This object is instantiated 1-for-1 with a Channel instance. QoS allows prefetch_count to be set to the number of outstanding messages the corresponding Channel should be allowed to prefetch. Setting prefetch_count to 0 disables prefetch limits, and the object can hold an arbitrary number of messages.

Messages are added using append(), which are held until they are ACKed asynchronously through a call to ack(). Messages that are received, but not ACKed will not be delivered by the broker to another consumer until an ACK is received, or the session is closed. Messages are referred to using delivery_tag, which are unique per Channel. Delivery tags are managed outside of this object and are passed in with a message to append(). Un-ACKed messages can be looked up from QoS using get() and can be rejected and forgotten using reject().

ack(delivery_tag)

Acknowledge a message by delivery_tag.

Called asynchronously once the message has been handled and can be forgotten by the broker.

Parameters:

delivery_tag (uuid.UUID) – the delivery tag associated with the message to be acknowledged.

append(message, delivery_tag)

Append message to the list of un-ACKed messages.

Add a message, referenced by the delivery_tag, for ACKing, rejecting, or getting later. Messages are saved into a dict by delivery_tag.

Parameters:
  • message (qpid.messaging.Message) – A received message that has not yet been ACKed.

  • delivery_tag (uuid.UUID) – A UUID to refer to this message by upon receipt.

can_consume()

Return True if the Channel can consume more messages.

Used to ensure the client adheres to currently active prefetch limits.

Returns:

True, if this QoS object can accept more messages without violating the prefetch_count. If prefetch_count is 0, can_consume will always return True.

Return type:

bool

can_consume_max_estimate()

Return the remaining message capacity.

Returns an estimated number of outstanding messages that a kombu.transport.qpid.Channel can accept without exceeding prefetch_count. If prefetch_count is 0, then this method returns 1.

Returns:

The number of estimated messages that can be fetched without violating the prefetch_count.

Return type:

int

get(delivery_tag)

Get an un-ACKed message by delivery_tag.

If called with an invalid delivery_tag a KeyError is raised.

Parameters:

delivery_tag (uuid.UUID) – The delivery tag associated with the message to be returned.

Returns:

An un-ACKed message that is looked up by delivery_tag.

Return type:

qpid.messaging.Message

reject(delivery_tag, requeue=False)

Reject a message by delivery_tag.

Explicitly notify the broker that the channel associated with this QoS object is rejecting the message that was previously delivered.

If requeue is False, then the message is not requeued for delivery to another consumer. If requeue is True, then the message is requeued for delivery to another consumer.

Parameters:

delivery_tag (uuid.UUID) – The delivery tag associated with the message to be rejected.

Keyword Arguments:

requeue – If True, the broker will be notified to requeue the message. If False, the broker will be told to drop the message entirely. In both cases, the message will be removed from this object.

basic_ack(delivery_tag, multiple=False)

Acknowledge a message by delivery_tag.

Acknowledges a message referenced by delivery_tag. Messages can only be ACKed using basic_ack() if they were acquired using basic_consume(). This is the ACKing portion of the asynchronous read behavior.

Internally, this method uses the QoS object, which stores messages and is responsible for the ACKing.

Parameters:
  • delivery_tag (uuid.UUID) – The delivery tag associated with the message to be acknowledged.

  • multiple (bool) – not implemented. If set to True an AssertionError is raised.

basic_cancel(consumer_tag)

Cancel consumer by consumer tag.

Request the consumer stops reading messages from its queue. The consumer is a Receiver, and it is closed using close().

This method also cleans up all lingering references of the consumer.

Parameters:

consumer_tag (an immutable object) – The tag which refers to the consumer to be cancelled. Originally specified when the consumer was created as a parameter to basic_consume().

basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)

Start an asynchronous consumer that reads from a queue.

This method starts a consumer of type Receiver using the Session created and referenced by the Transport that reads messages from a queue specified by name until stopped by a call to basic_cancel().

Messages are available later through a synchronous call to Transport.drain_events(), which will drain from the consumer started by this method. Transport.drain_events() is synchronous, but the receiving of messages over the network occurs asynchronously, so it should still perform well. Transport.drain_events() calls the callback provided here with the Message of type self.Message.

Each consumer is referenced by a consumer_tag, which is provided by the caller of this method.

This method sets up the callback onto the self.connection object in a dict keyed by queue name. drain_events() is responsible for calling that callback upon message receipt.

All messages that are received are added to the QoS object to be saved for asynchronous ACKing later after the message has been handled by the caller of drain_events(). Messages can be ACKed after being received through a call to basic_ack().

If no_ack is True, The no_ack flag indicates that the receiver of the message will not call basic_ack() later. Since the message will not be ACKed later, it is ACKed immediately.

basic_consume() transforms the message object type prior to calling the callback. Initially the message comes in as a qpid.messaging.Message. This method unpacks the payload of the qpid.messaging.Message and creates a new object of type self.Message.

This method wraps the user delivered callback in a runtime-built function which provides the type transformation from qpid.messaging.Message to Message, and adds the message to the associated QoS object for asynchronous ACKing if necessary.

Parameters:
  • queue (str) – The name of the queue to consume messages from

  • no_ack (bool) – If True, then messages will not be saved for ACKing later, but will be ACKed immediately. If False, then messages will be saved for ACKing later with a call to basic_ack().

  • callback (a callable object) – a callable that will be called when messages arrive on the queue.

  • consumer_tag (an immutable object) – a tag to reference the created consumer by. This consumer_tag is needed to cancel the consumer.

basic_get(queue, no_ack=False, **kwargs)

Non-blocking single message get and ACK from a queue by name.

Internally this method uses _get() to fetch the message. If an Empty exception is raised by _get(), this method silences it and returns None. If _get() does return a message, that message is ACKed. The no_ack parameter has no effect on ACKing behavior, and all messages are ACKed in all cases. This method never adds fetched Messages to the internal QoS object for asynchronous ACKing.

This method converts the object type of the method as it passes through. Fetching from the broker, _get() returns a qpid.messaging.Message, but this method takes the payload of the qpid.messaging.Message and instantiates a Message object with the payload based on the class setting of self.Message.

Parameters:

queue (str) – The queue name to fetch a message from.

Keyword Arguments:

no_ack – The no_ack parameter has no effect on the ACK behavior of this method. Un-ACKed messages create a memory leak in qpid.messaging, and need to be ACKed in all cases.

Returns:

The received message.

Return type:

Message

basic_publish(message, exchange, routing_key, **kwargs)

Publish message onto an exchange using a routing key.

Publish a message onto an exchange specified by name using a routing key specified by routing_key. Prepares the message in the following ways before sending:

  • encodes the body using encode_body()

  • wraps the body as a buffer object, so that

    qpid.messaging.endpoints.Sender uses a content type that can support arbitrarily large messages.

  • sets delivery_tag to a random uuid.UUID

  • sets the exchange and routing_key info as delivery_info

Internally uses _put() to send the message synchronously. This message is typically called by kombu.messaging.Producer._publish as the final step in message publication.

Parameters:
  • message (dict) – A dict containing key value pairs with the message data. A valid message dict can be generated using the prepare_message() method.

  • exchange (str) – The name of the exchange to submit this message onto.

  • routing_key (str) – The routing key to be used as the message is submitted onto the exchange.

basic_qos(prefetch_count, *args)

Change QoS settings for this Channel.

Set the number of un-acknowledged messages this Channel can fetch and hold. The prefetch_value is also used as the capacity for any new Receiver objects.

Currently, this value is hard coded to 1.

Parameters:

prefetch_count (int) – Not used. This method is hard-coded to 1.

basic_reject(delivery_tag, requeue=False)

Reject a message by delivery_tag.

Rejects a message that has been received by the Channel, but not yet acknowledged. Messages are referenced by their delivery_tag.

If requeue is False, the rejected message will be dropped by the broker and not delivered to any other consumers. If requeue is True, then the rejected message will be requeued for delivery to another consumer, potentially to the same consumer who rejected the message previously.

Parameters:

delivery_tag (uuid.UUID) – The delivery tag associated with the message to be rejected.

Keyword Arguments:

requeue – If False, the rejected message will be dropped by the broker and not delivered to any other consumers. If True, then the rejected message will be requeued for delivery to another consumer, potentially to the same consumer who rejected the message previously.

body_encoding = 'base64'

Default body encoding. NOTE: transport_options['body_encoding'] will override this value.

close()

Cancel all associated messages and close the Channel.

This cancels all consumers by calling basic_cancel() for each known consumer_tag. It also closes the self._broker sessions. Closing the sessions implicitly causes all outstanding, un-ACKed messages to be considered undelivered by the broker.

codecs = {'base64': <kombu.transport.virtual.base.Base64 object>}

Binary <-> ASCII codecs.

decode_body(body, encoding=None)

Decode a body using an optionally specified encoding.

The encoding can be specified by name, and is looked up in self.codecs. self.codecs uses strings as its keys which specify the name of the encoding, and then the value is an instantiated object that can provide encoding/decoding of that type through encode and decode methods.

Parameters:

body (str) – The body to be encoded.

Keyword Arguments:

encoding – The encoding type to be used. Must be a supported codec listed in self.codecs.

Returns:

If encoding is specified, the decoded body is returned. If encoding is not specified, the body is returned unchanged.

Return type:

str

encode_body(body, encoding=None)

Encode a body using an optionally specified encoding.

The encoding can be specified by name, and is looked up in self.codecs. self.codecs uses strings as its keys which specify the name of the encoding, and then the value is an instantiated object that can provide encoding/decoding of that type through encode and decode methods.

Parameters:

body (str) – The body to be encoded.

Keyword Arguments:

encoding – The encoding type to be used. Must be a supported codec listed in self.codecs.

Returns:

If encoding is specified, return a tuple with the first position being the encoded body, and the second position the encoding used. If encoding is not specified, the body is passed through unchanged.

Return type:

tuple

exchange_declare(exchange='', type='direct', durable=False, **kwargs)

Create a new exchange.

Create an exchange of a specific type, and optionally have the exchange be durable. If an exchange of the requested name already exists, no action is taken and no exceptions are raised. Durable exchanges will survive a broker restart, non-durable exchanges will not.

Exchanges provide behaviors based on their type. The expected behaviors are those defined in the AMQP 0-10 and prior specifications including ‘direct’, ‘topic’, and ‘fanout’ functionality.

Keyword Arguments:
  • type – The exchange type. Valid values include ‘direct’, ‘topic’, and ‘fanout’.

  • exchange – The name of the exchange to be created. If no exchange is specified, then a blank string will be used as the name.

  • durable – True if the exchange should be durable, or False otherwise.

exchange_delete(exchange_name, **kwargs)

Delete an exchange specified by name.

Parameters:

exchange_name (str) – The name of the exchange to be deleted.

prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)

Prepare message data for sending.

This message is typically called by kombu.messaging.Producer._publish() as a preparation step in message publication.

Parameters:

body (str) – The body of the message

Keyword Arguments:
  • priority – A number between 0 and 9 that sets the priority of the message.

  • content_type – The content_type the message body should be treated as. If this is unset, the qpid.messaging.endpoints.Sender object tries to autodetect the content_type from the body.

  • content_encoding – The content_encoding the message body is encoded as.

  • headers – Additional Message headers that should be set. Passed in as a key-value pair.

  • properties – Message properties to be set on the message.

Returns:

Returns a dict object that encapsulates message attributes. See parameters for more details on attributes that can be set.

Return type:

dict

property qos

QoS manager for this channel.

Lazily instantiates an object of type QoS upon access to the self.qos attribute.

Returns:

An already existing, or newly created QoS object

Return type:

QoS

queue_bind(queue, exchange, routing_key, **kwargs)

Bind a queue to an exchange with a bind key.

Bind a queue specified by name, to an exchange specified by name, with a specific bind key. The queue and exchange must already exist on the broker for the bind to complete successfully. Queues may be bound to exchanges multiple times with different keys.

Parameters:
  • queue (str) – The name of the queue to be bound.

  • exchange (str) – The name of the exchange that the queue should be bound to.

  • routing_key (str) – The bind key that the specified queue should bind to the specified exchange with.

queue_declare(queue, passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None)

Create a new queue specified by name.

If the queue already exists, no change is made to the queue, and the return value returns information about the existing queue.

The queue name is required and specified as the first argument.

If passive is True, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state. Default is False.

If durable is True, the queue will be durable. Durable queues remain active when a server restarts. Non-durable queues ( transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue. Default is False.

If exclusive is True, the queue will be exclusive. Exclusive queues may only be consumed by the current connection. Setting the ‘exclusive’ flag always implies ‘auto-delete’. Default is False.

If auto_delete is True, the queue is deleted when all consumers have finished using it. The last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won’t be deleted. Default is True.

The nowait parameter is unused. It was part of the 0-9-1 protocol, but this AMQP client implements 0-10 which removed the nowait option.

The arguments parameter is a set of arguments for the declaration of the queue. Arguments are passed as a dict or None. This field is ignored if passive is True. Default is None.

This method returns a namedtuple with the name ‘queue_declare_ok_t’ and the queue name as ‘queue’, message count on the queue as ‘message_count’, and the number of active consumers as ‘consumer_count’. The named tuple values are ordered as queue, message_count, and consumer_count respectively.

Due to Celery’s non-ACKing of events, a ring policy is set on any queue that starts with the string ‘celeryev’ or ends with the string ‘pidbox’. These are celery event queues, and Celery does not ack them, causing the messages to build-up. Eventually Qpid stops serving messages unless the ‘ring’ policy is set, at which point the buffer backing the queue becomes circular.

Parameters:
  • queue (str) – The name of the queue to be created.

  • passive (bool) – If True, the sever will not create the queue.

  • durable (bool) – If True, the queue will be durable.

  • exclusive (bool) – If True, the queue will be exclusive.

  • auto_delete (bool) – If True, the queue is deleted when all consumers have finished using it.

  • nowait (bool) – This parameter is unused since the 0-10 specification does not include it.

  • arguments (dict or None) – A set of arguments for the declaration of the queue.

Returns:

A named tuple representing the declared queue as a named tuple. The tuple values are ordered as queue, message count, and the active consumer count.

Return type:

namedtuple

queue_delete(queue, if_unused=False, if_empty=False, **kwargs)

Delete a queue by name.

Delete a queue specified by name. Using the if_unused keyword argument, the delete can only occur if there are 0 consumers bound to it. Using the if_empty keyword argument, the delete can only occur if there are 0 messages in the queue.

Parameters:

queue (str) – The name of the queue to be deleted.

Keyword Arguments:
  • if_unused – If True, delete only if the queue has 0 consumers. If False, delete a queue even with consumers bound to it.

  • if_empty – If True, only delete the queue if it is empty. If False, delete the queue if it is empty or not.

queue_purge(queue, **kwargs)

Remove all undelivered messages from queue.

Purge all undelivered messages from a queue specified by name. If the queue does not exist an exception is raised. The queue message depth is first checked, and then the broker is asked to purge that number of messages. The integer number of messages requested to be purged is returned. The actual number of messages purged may be different than the requested number of messages to purge.

Sometimes delivered messages are asked to be purged, but are not. This case fails silently, which is the correct behavior when a message that has been delivered to a different consumer, who has not ACKed the message, and still has an active session with the broker. Messages in that case are not safe for purging and will be retained by the broker. The client is unable to change this delivery behavior.

Internally, this method relies on _purge().

Parameters:

queue (str) – The name of the queue which should have all messages removed.

Returns:

The number of messages requested to be purged.

Return type:

int

Raises:

qpid.messaging.exceptions.NotFound if the queue being purged cannot be found.

queue_unbind(queue, exchange, routing_key, **kwargs)

Unbind a queue from an exchange with a given bind key.

Unbind a queue specified by name, from an exchange specified by name, that is already bound with a bind key. The queue and exchange must already exist on the broker, and bound with the bind key for the operation to complete successfully. Queues may be bound to exchanges multiple times with different keys, thus the bind key is a required field to unbind in an explicit way.

Parameters:
  • queue (str) – The name of the queue to be unbound.

  • exchange (str) – The name of the exchange that the queue should be unbound from.

  • routing_key (str) – The existing bind key between the specified queue and a specified exchange that should be unbound.

typeof(exchange, default='direct')

Get the exchange type.

Lookup and return the exchange type for an exchange specified by name. Exchange types are expected to be ‘direct’, ‘topic’, and ‘fanout’, which correspond with exchange functionality as specified in AMQP 0-10 and earlier. If the exchange cannot be found, the default exchange type is returned.

Parameters:

exchange (str) – The exchange to have its type lookup up.

Keyword Arguments:

default – The type of exchange to assume if the exchange does not exist.

Returns:

The exchange type either ‘direct’, ‘topic’, or ‘fanout’.

Return type:

str

close()

Close the connection.

Closing the connection will close all associated session, senders, or receivers used by the Connection.

close_channel(channel)

Close a Channel.

Close a channel specified by a reference to the Channel object.

Parameters:

channel (Channel.) – Channel that should be closed.

get_qpid_connection()

Return the existing connection (singleton).

Returns:

The existing qpid.messaging.Connection

Return type:

qpid.messaging.endpoints.Connection

channel_errors = (None,)

Tuple of errors that can happen due to channel/method failure.

close_connection(connection)[source]

Close the Connection object.

Parameters:

connection (kombu.transport.qpid.Connection) – The Connection that should be closed.

connection_errors = (None, <class 'OSError'>)

Tuple of errors that can happen due to connection failure.

create_channel(connection)[source]

Create and return a Channel.

Creates a new channel, and appends the channel to the list of channels known by the Connection. Once the new channel is created, it is returned.

Parameters:

connection (kombu.transport.qpid.Connection) – The connection that should support the new Channel.

Returns:

The new Channel that is made.

Return type:

kombu.transport.qpid.Channel.

property default_connection_params

Return a dict with default connection parameters.

These connection parameters will be used whenever the creator of Transport does not specify a required parameter.

Returns:

A dict containing the default parameters.

Return type:

dict

drain_events(connection, timeout=0, **kwargs)[source]

Handle and call callbacks for all ready Transport messages.

Drains all events that are ready from all Receiver that are asynchronously fetching messages.

For each drained message, the message is called to the appropriate callback. Callbacks are organized by queue name.

Parameters:

connection (kombu.transport.qpid.Connection) – The Connection that contains the callbacks, indexed by queue name, which will be called by this method.

Keyword Arguments:

timeout – The timeout that limits how long this method will run for. The timeout could interrupt a blocking read that is waiting for a new message, or cause this method to return before all messages are drained. Defaults to 0.

driver_name = 'qpid'

Name of driver library (e.g. ‘py-amqp’, ‘redis’).

driver_type = 'qpid'

Type of driver, can be used to separate transports using the AMQP protocol (driver_type: ‘amqp’), Redis (driver_type: ‘redis’), etc…

establish_connection()[source]

Establish a Connection object.

Determines the correct options to use when creating any connections needed by this Transport, and create a Connection object which saves those values for connections generated as they are needed. The options are a mixture of what is passed in through the creator of the Transport, and the defaults provided by default_connection_params(). Options cover broker network settings, timeout behaviors, authentication, and identity verification settings.

This method also creates and stores a Session using the Connection created by this method. The Session is stored on self.

Returns:

The created Connection object is returned.

Return type:

Connection

implements = {'asynchronous': True, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}
on_readable(connection, loop)[source]

Handle any messages associated with this Transport.

This method clears a single message from the externally monitored file descriptor by issuing a read call to the self.r file descriptor which removes a single ‘0’ character that was placed into the pipe by the Qpid session message callback handler. Once a ‘0’ is read, all available events are drained through a call to drain_events().

The file descriptor self.r is modified to be non-blocking, ensuring that an accidental call to this method when no more messages will not cause indefinite blocking.

Nothing is expected to be returned from drain_events() because drain_events() handles messages by calling callbacks that are maintained on the Connection object. When drain_events() returns, all associated messages have been handled.

This method calls drain_events() which reads as many messages as are available for this Transport, and then returns. It blocks in the sense that reading and handling a large number of messages may take time, but it does not block waiting for a new message to arrive. When drain_events() is called a timeout is not specified, which causes this behavior.

One interesting behavior of note is where multiple messages are ready, and this method removes a single ‘0’ character from self.r, but drain_events() may handle an arbitrary amount of messages. In that case, extra ‘0’ characters may be left on self.r to be read, where messages corresponding with those ‘0’ characters have already been handled. The external epoll loop will incorrectly think additional data is ready for reading, and will call on_readable unnecessarily, once for each ‘0’ to be read. Additional calls to on_readable() produce no negative side effects, and will eventually clear out the symbols from the self.r file descriptor. If new messages show up during this draining period, they will also be properly handled.

Parameters:
  • connection (kombu.transport.qpid.Connection) – The connection associated with the readable events, which contains the callbacks that need to be called for the readable objects.

  • loop (kombu.asynchronous.Hub) – The asynchronous loop object that contains epoll like functionality.

polling_interval = None
recoverable_channel_errors = (None,)
recoverable_connection_errors = (None, <class 'OSError'>)
register_with_event_loop(connection, loop)[source]

Register a file descriptor and callback with the loop.

Register the callback self.on_readable to be called when an external epoll loop sees that the file descriptor registered is ready for reading. The file descriptor is created by this Transport, and is written to when a message is available.

Because supports_ev == True, Celery expects to call this method to give the Transport an opportunity to register a read file descriptor for external monitoring by celery using an Event I/O notification mechanism such as epoll. A callback is also registered that is to be called once the external epoll loop is ready to handle the epoll event associated with messages that are ready to be handled for this Transport.

The registration call is made exactly once per Transport after the Transport is instantiated.

Parameters:
verify_runtime_environment()[source]

Verify that the runtime environment is acceptable.

This method is called as part of __init__ and raises a RuntimeError in Python3 or PyPI environments. This module is not compatible with Python3 or PyPI. The RuntimeError identifies this to the user up front along with suggesting Python 2.6+ be used instead.

This method also checks that the dependencies qpidtoollibs and qpid.messaging are installed. If either one is not installed a RuntimeError is raised.

Raises:

RuntimeError if the runtime environment is not acceptable.

Connection

class kombu.transport.qpid.Connection(**connection_options)[source]

Qpid Connection.

Encapsulate a connection object for the Transport.

Parameters:
  • host – The host that connections should connect to.

  • port – The port that connection should connect to.

  • username – The username that connections should connect with. Optional.

  • password – The password that connections should connect with. Optional but requires a username.

  • transport – The transport type that connections should use. Either ‘tcp’, or ‘ssl’ are expected as values.

  • timeout – the timeout used when a Connection connects to the broker.

  • sasl_mechanisms – The sasl authentication mechanism type to use. refer to SASL documentation for an explanation of valid values.

Note

qpid.messaging has an AuthenticationFailure exception type, but instead raises a ConnectionError with a message that indicates an authentication failure occurred in those situations. ConnectionError is listed as a recoverable error type, so kombu will attempt to retry if a ConnectionError is raised. Retrying the operation without adjusting the credentials is not correct, so this method specifically checks for a ConnectionError that indicates an Authentication Failure occurred. In those situations, the error type is mutated while preserving the original message and raised so kombu will allow the exception to not be considered recoverable.

A connection object is created by a Transport during a call to establish_connection(). The Transport passes in connection options as keywords that should be used for any connections created. Each Transport creates exactly one Connection.

A Connection object maintains a reference to a Connection which can be accessed through a bound getter method named get_qpid_connection() method. Each Channel uses a the Connection for each BrokerAgent, and the Transport maintains a session for all senders and receivers.

The Connection object is also responsible for maintaining the dictionary of references to callbacks that should be called when messages are received. These callbacks are saved in _callbacks, and keyed on the queue name associated with the received message. The _callbacks are setup in Channel.basic_consume(), removed in Channel.basic_cancel(), and called in Transport.drain_events().

The following keys are expected to be passed in as keyword arguments at a minimum:

All keyword arguments are collected into the connection_options dict and passed directly through to qpid.messaging.endpoints.Connection.establish().

class Channel(connection, transport)

Supports broker configuration and messaging send and receive.

Parameters:

A channel object is designed to have method-parity with a Channel as defined in AMQP 0-10 and earlier, which allows for the following broker actions:

  • exchange declare and delete

  • queue declare and delete

  • queue bind and unbind operations

  • queue length and purge operations

  • sending/receiving/rejecting messages

  • structuring, encoding, and decoding messages

  • supports synchronous and asynchronous reads

  • reading state about the exchange, queues, and bindings

Channels are designed to all share a single TCP connection with a broker, but provide a level of isolated communication with the broker while benefiting from a shared TCP connection. The Channel is given its Connection object by the Transport that instantiates the channel.

This channel inherits from StdChannel, which makes this a ‘native’ channel versus a ‘virtual’ channel which would inherit from kombu.transports.virtual.

Messages sent using this channel are assigned a delivery_tag. The delivery_tag is generated for a message as they are prepared for sending by basic_publish(). The delivery_tag is unique per channel instance. The delivery_tag has no meaningful context in other objects, and is only maintained in the memory of this object, and the underlying QoS object that provides support.

Each channel object instantiates exactly one QoS object for prefetch limiting, and asynchronous ACKing. The QoS object is lazily instantiated through a property method qos(). The QoS object is a supporting object that should not be accessed directly except by the channel itself.

Synchronous reads on a queue are done using a call to basic_get() which uses _get() to perform the reading. These methods read immediately and do not accept any form of timeout. basic_get() reads synchronously and ACKs messages before returning them. ACKing is done in all cases, because an application that reads messages using qpid.messaging, but does not ACK them will experience a memory leak. The no_ack argument to basic_get() does not affect ACKing functionality.

Asynchronous reads on a queue are done by starting a consumer using basic_consume(). Each call to basic_consume() will cause a Receiver to be created on the Session started by the :class: Transport. The receiver will asynchronously read using qpid.messaging, and prefetch messages before the call to Transport.basic_drain() occurs. The prefetch_count value of the QoS object is the capacity value of the new receiver. The new receiver capacity must always be at least 1, otherwise none of the receivers will appear to be ready for reading, and will never be read from.

Each call to basic_consume() creates a consumer, which is given a consumer tag that is identified by the caller of basic_consume(). Already started consumers can be cancelled using by their consumer_tag using basic_cancel(). Cancellation of a consumer causes the Receiver object to be closed.

Asynchronous message ACKing is supported through basic_ack(), and is referenced by delivery_tag. The Channel object uses its QoS object to perform the message ACKing.

class Message(payload, channel=None, **kwargs)

Message object.

accept
body
channel
content_encoding
content_type
delivery_info
delivery_tag
headers
properties
serializable()
class QoS(session, prefetch_count=1)

A helper object for message prefetch and ACKing purposes.

Keyword Arguments:

prefetch_count – Initial prefetch count, hard set to 1.

NOTE: prefetch_count is currently hard set to 1, and needs to be improved

This object is instantiated 1-for-1 with a Channel instance. QoS allows prefetch_count to be set to the number of outstanding messages the corresponding Channel should be allowed to prefetch. Setting prefetch_count to 0 disables prefetch limits, and the object can hold an arbitrary number of messages.

Messages are added using append(), which are held until they are ACKed asynchronously through a call to ack(). Messages that are received, but not ACKed will not be delivered by the broker to another consumer until an ACK is received, or the session is closed. Messages are referred to using delivery_tag, which are unique per Channel. Delivery tags are managed outside of this object and are passed in with a message to append(). Un-ACKed messages can be looked up from QoS using get() and can be rejected and forgotten using reject().

ack(delivery_tag)

Acknowledge a message by delivery_tag.

Called asynchronously once the message has been handled and can be forgotten by the broker.

Parameters:

delivery_tag (uuid.UUID) – the delivery tag associated with the message to be acknowledged.

append(message, delivery_tag)

Append message to the list of un-ACKed messages.

Add a message, referenced by the delivery_tag, for ACKing, rejecting, or getting later. Messages are saved into a dict by delivery_tag.

Parameters:
  • message (qpid.messaging.Message) – A received message that has not yet been ACKed.

  • delivery_tag (uuid.UUID) – A UUID to refer to this message by upon receipt.

can_consume()

Return True if the Channel can consume more messages.

Used to ensure the client adheres to currently active prefetch limits.

Returns:

True, if this QoS object can accept more messages without violating the prefetch_count. If prefetch_count is 0, can_consume will always return True.

Return type:

bool

can_consume_max_estimate()

Return the remaining message capacity.

Returns an estimated number of outstanding messages that a kombu.transport.qpid.Channel can accept without exceeding prefetch_count. If prefetch_count is 0, then this method returns 1.

Returns:

The number of estimated messages that can be fetched without violating the prefetch_count.

Return type:

int

get(delivery_tag)

Get an un-ACKed message by delivery_tag.

If called with an invalid delivery_tag a KeyError is raised.

Parameters:

delivery_tag (uuid.UUID) – The delivery tag associated with the message to be returned.

Returns:

An un-ACKed message that is looked up by delivery_tag.

Return type:

qpid.messaging.Message

reject(delivery_tag, requeue=False)

Reject a message by delivery_tag.

Explicitly notify the broker that the channel associated with this QoS object is rejecting the message that was previously delivered.

If requeue is False, then the message is not requeued for delivery to another consumer. If requeue is True, then the message is requeued for delivery to another consumer.

Parameters:

delivery_tag (uuid.UUID) – The delivery tag associated with the message to be rejected.

Keyword Arguments:

requeue – If True, the broker will be notified to requeue the message. If False, the broker will be told to drop the message entirely. In both cases, the message will be removed from this object.

basic_ack(delivery_tag, multiple=False)

Acknowledge a message by delivery_tag.

Acknowledges a message referenced by delivery_tag. Messages can only be ACKed using basic_ack() if they were acquired using basic_consume(). This is the ACKing portion of the asynchronous read behavior.

Internally, this method uses the QoS object, which stores messages and is responsible for the ACKing.

Parameters:
  • delivery_tag (uuid.UUID) – The delivery tag associated with the message to be acknowledged.

  • multiple (bool) – not implemented. If set to True an AssertionError is raised.

basic_cancel(consumer_tag)

Cancel consumer by consumer tag.

Request the consumer stops reading messages from its queue. The consumer is a Receiver, and it is closed using close().

This method also cleans up all lingering references of the consumer.

Parameters:

consumer_tag (an immutable object) – The tag which refers to the consumer to be cancelled. Originally specified when the consumer was created as a parameter to basic_consume().

basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)

Start an asynchronous consumer that reads from a queue.

This method starts a consumer of type Receiver using the Session created and referenced by the Transport that reads messages from a queue specified by name until stopped by a call to basic_cancel().

Messages are available later through a synchronous call to Transport.drain_events(), which will drain from the consumer started by this method. Transport.drain_events() is synchronous, but the receiving of messages over the network occurs asynchronously, so it should still perform well. Transport.drain_events() calls the callback provided here with the Message of type self.Message.

Each consumer is referenced by a consumer_tag, which is provided by the caller of this method.

This method sets up the callback onto the self.connection object in a dict keyed by queue name. drain_events() is responsible for calling that callback upon message receipt.

All messages that are received are added to the QoS object to be saved for asynchronous ACKing later after the message has been handled by the caller of drain_events(). Messages can be ACKed after being received through a call to basic_ack().

If no_ack is True, The no_ack flag indicates that the receiver of the message will not call basic_ack() later. Since the message will not be ACKed later, it is ACKed immediately.

basic_consume() transforms the message object type prior to calling the callback. Initially the message comes in as a qpid.messaging.Message. This method unpacks the payload of the qpid.messaging.Message and creates a new object of type self.Message.

This method wraps the user delivered callback in a runtime-built function which provides the type transformation from qpid.messaging.Message to Message, and adds the message to the associated QoS object for asynchronous ACKing if necessary.

Parameters:
  • queue (str) – The name of the queue to consume messages from

  • no_ack (bool) – If True, then messages will not be saved for ACKing later, but will be ACKed immediately. If False, then messages will be saved for ACKing later with a call to basic_ack().

  • callback (a callable object) – a callable that will be called when messages arrive on the queue.

  • consumer_tag (an immutable object) – a tag to reference the created consumer by. This consumer_tag is needed to cancel the consumer.

basic_get(queue, no_ack=False, **kwargs)

Non-blocking single message get and ACK from a queue by name.

Internally this method uses _get() to fetch the message. If an Empty exception is raised by _get(), this method silences it and returns None. If _get() does return a message, that message is ACKed. The no_ack parameter has no effect on ACKing behavior, and all messages are ACKed in all cases. This method never adds fetched Messages to the internal QoS object for asynchronous ACKing.

This method converts the object type of the method as it passes through. Fetching from the broker, _get() returns a qpid.messaging.Message, but this method takes the payload of the qpid.messaging.Message and instantiates a Message object with the payload based on the class setting of self.Message.

Parameters:

queue (str) – The queue name to fetch a message from.

Keyword Arguments:

no_ack – The no_ack parameter has no effect on the ACK behavior of this method. Un-ACKed messages create a memory leak in qpid.messaging, and need to be ACKed in all cases.

Returns:

The received message.

Return type:

Message

basic_publish(message, exchange, routing_key, **kwargs)

Publish message onto an exchange using a routing key.

Publish a message onto an exchange specified by name using a routing key specified by routing_key. Prepares the message in the following ways before sending:

  • encodes the body using encode_body()

  • wraps the body as a buffer object, so that

    qpid.messaging.endpoints.Sender uses a content type that can support arbitrarily large messages.

  • sets delivery_tag to a random uuid.UUID

  • sets the exchange and routing_key info as delivery_info

Internally uses _put() to send the message synchronously. This message is typically called by kombu.messaging.Producer._publish as the final step in message publication.

Parameters:
  • message (dict) – A dict containing key value pairs with the message data. A valid message dict can be generated using the prepare_message() method.

  • exchange (str) – The name of the exchange to submit this message onto.

  • routing_key (str) – The routing key to be used as the message is submitted onto the exchange.

basic_qos(prefetch_count, *args)

Change QoS settings for this Channel.

Set the number of un-acknowledged messages this Channel can fetch and hold. The prefetch_value is also used as the capacity for any new Receiver objects.

Currently, this value is hard coded to 1.

Parameters:

prefetch_count (int) – Not used. This method is hard-coded to 1.

basic_reject(delivery_tag, requeue=False)

Reject a message by delivery_tag.

Rejects a message that has been received by the Channel, but not yet acknowledged. Messages are referenced by their delivery_tag.

If requeue is False, the rejected message will be dropped by the broker and not delivered to any other consumers. If requeue is True, then the rejected message will be requeued for delivery to another consumer, potentially to the same consumer who rejected the message previously.

Parameters:

delivery_tag (uuid.UUID) – The delivery tag associated with the message to be rejected.

Keyword Arguments:

requeue – If False, the rejected message will be dropped by the broker and not delivered to any other consumers. If True, then the rejected message will be requeued for delivery to another consumer, potentially to the same consumer who rejected the message previously.

body_encoding = 'base64'

Default body encoding. NOTE: transport_options['body_encoding'] will override this value.

close()

Cancel all associated messages and close the Channel.

This cancels all consumers by calling basic_cancel() for each known consumer_tag. It also closes the self._broker sessions. Closing the sessions implicitly causes all outstanding, un-ACKed messages to be considered undelivered by the broker.

codecs = {'base64': <kombu.transport.virtual.base.Base64 object>}

Binary <-> ASCII codecs.

decode_body(body, encoding=None)

Decode a body using an optionally specified encoding.

The encoding can be specified by name, and is looked up in self.codecs. self.codecs uses strings as its keys which specify the name of the encoding, and then the value is an instantiated object that can provide encoding/decoding of that type through encode and decode methods.

Parameters:

body (str) – The body to be encoded.

Keyword Arguments:

encoding – The encoding type to be used. Must be a supported codec listed in self.codecs.

Returns:

If encoding is specified, the decoded body is returned. If encoding is not specified, the body is returned unchanged.

Return type:

str

encode_body(body, encoding=None)

Encode a body using an optionally specified encoding.

The encoding can be specified by name, and is looked up in self.codecs. self.codecs uses strings as its keys which specify the name of the encoding, and then the value is an instantiated object that can provide encoding/decoding of that type through encode and decode methods.

Parameters:

body (str) – The body to be encoded.

Keyword Arguments:

encoding – The encoding type to be used. Must be a supported codec listed in self.codecs.

Returns:

If encoding is specified, return a tuple with the first position being the encoded body, and the second position the encoding used. If encoding is not specified, the body is passed through unchanged.

Return type:

tuple

exchange_declare(exchange='', type='direct', durable=False, **kwargs)

Create a new exchange.

Create an exchange of a specific type, and optionally have the exchange be durable. If an exchange of the requested name already exists, no action is taken and no exceptions are raised. Durable exchanges will survive a broker restart, non-durable exchanges will not.

Exchanges provide behaviors based on their type. The expected behaviors are those defined in the AMQP 0-10 and prior specifications including ‘direct’, ‘topic’, and ‘fanout’ functionality.

Keyword Arguments:
  • type – The exchange type. Valid values include ‘direct’, ‘topic’, and ‘fanout’.

  • exchange – The name of the exchange to be created. If no exchange is specified, then a blank string will be used as the name.

  • durable – True if the exchange should be durable, or False otherwise.

exchange_delete(exchange_name, **kwargs)

Delete an exchange specified by name.

Parameters:

exchange_name (str) – The name of the exchange to be deleted.

prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)

Prepare message data for sending.

This message is typically called by kombu.messaging.Producer._publish() as a preparation step in message publication.

Parameters:

body (str) – The body of the message

Keyword Arguments:
  • priority – A number between 0 and 9 that sets the priority of the message.

  • content_type – The content_type the message body should be treated as. If this is unset, the qpid.messaging.endpoints.Sender object tries to autodetect the content_type from the body.

  • content_encoding – The content_encoding the message body is encoded as.

  • headers – Additional Message headers that should be set. Passed in as a key-value pair.

  • properties – Message properties to be set on the message.

Returns:

Returns a dict object that encapsulates message attributes. See parameters for more details on attributes that can be set.

Return type:

dict

property qos

QoS manager for this channel.

Lazily instantiates an object of type QoS upon access to the self.qos attribute.

Returns:

An already existing, or newly created QoS object

Return type:

QoS

queue_bind(queue, exchange, routing_key, **kwargs)

Bind a queue to an exchange with a bind key.

Bind a queue specified by name, to an exchange specified by name, with a specific bind key. The queue and exchange must already exist on the broker for the bind to complete successfully. Queues may be bound to exchanges multiple times with different keys.

Parameters:
  • queue (str) – The name of the queue to be bound.

  • exchange (str) – The name of the exchange that the queue should be bound to.

  • routing_key (str) – The bind key that the specified queue should bind to the specified exchange with.

queue_declare(queue, passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None)

Create a new queue specified by name.

If the queue already exists, no change is made to the queue, and the return value returns information about the existing queue.

The queue name is required and specified as the first argument.

If passive is True, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state. Default is False.

If durable is True, the queue will be durable. Durable queues remain active when a server restarts. Non-durable queues ( transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue. Default is False.

If exclusive is True, the queue will be exclusive. Exclusive queues may only be consumed by the current connection. Setting the ‘exclusive’ flag always implies ‘auto-delete’. Default is False.

If auto_delete is True, the queue is deleted when all consumers have finished using it. The last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won’t be deleted. Default is True.

The nowait parameter is unused. It was part of the 0-9-1 protocol, but this AMQP client implements 0-10 which removed the nowait option.

The arguments parameter is a set of arguments for the declaration of the queue. Arguments are passed as a dict or None. This field is ignored if passive is True. Default is None.

This method returns a namedtuple with the name ‘queue_declare_ok_t’ and the queue name as ‘queue’, message count on the queue as ‘message_count’, and the number of active consumers as ‘consumer_count’. The named tuple values are ordered as queue, message_count, and consumer_count respectively.

Due to Celery’s non-ACKing of events, a ring policy is set on any queue that starts with the string ‘celeryev’ or ends with the string ‘pidbox’. These are celery event queues, and Celery does not ack them, causing the messages to build-up. Eventually Qpid stops serving messages unless the ‘ring’ policy is set, at which point the buffer backing the queue becomes circular.

Parameters:
  • queue (str) – The name of the queue to be created.

  • passive (bool) – If True, the sever will not create the queue.

  • durable (bool) – If True, the queue will be durable.

  • exclusive (bool) – If True, the queue will be exclusive.

  • auto_delete (bool) – If True, the queue is deleted when all consumers have finished using it.

  • nowait (bool) – This parameter is unused since the 0-10 specification does not include it.

  • arguments (dict or None) – A set of arguments for the declaration of the queue.

Returns:

A named tuple representing the declared queue as a named tuple. The tuple values are ordered as queue, message count, and the active consumer count.

Return type:

namedtuple

queue_delete(queue, if_unused=False, if_empty=False, **kwargs)

Delete a queue by name.

Delete a queue specified by name. Using the if_unused keyword argument, the delete can only occur if there are 0 consumers bound to it. Using the if_empty keyword argument, the delete can only occur if there are 0 messages in the queue.

Parameters:

queue (str) – The name of the queue to be deleted.

Keyword Arguments:
  • if_unused – If True, delete only if the queue has 0 consumers. If False, delete a queue even with consumers bound to it.

  • if_empty – If True, only delete the queue if it is empty. If False, delete the queue if it is empty or not.

queue_purge(queue, **kwargs)

Remove all undelivered messages from queue.

Purge all undelivered messages from a queue specified by name. If the queue does not exist an exception is raised. The queue message depth is first checked, and then the broker is asked to purge that number of messages. The integer number of messages requested to be purged is returned. The actual number of messages purged may be different than the requested number of messages to purge.

Sometimes delivered messages are asked to be purged, but are not. This case fails silently, which is the correct behavior when a message that has been delivered to a different consumer, who has not ACKed the message, and still has an active session with the broker. Messages in that case are not safe for purging and will be retained by the broker. The client is unable to change this delivery behavior.

Internally, this method relies on _purge().

Parameters:

queue (str) – The name of the queue which should have all messages removed.

Returns:

The number of messages requested to be purged.

Return type:

int

Raises:

qpid.messaging.exceptions.NotFound if the queue being purged cannot be found.

queue_unbind(queue, exchange, routing_key, **kwargs)

Unbind a queue from an exchange with a given bind key.

Unbind a queue specified by name, from an exchange specified by name, that is already bound with a bind key. The queue and exchange must already exist on the broker, and bound with the bind key for the operation to complete successfully. Queues may be bound to exchanges multiple times with different keys, thus the bind key is a required field to unbind in an explicit way.

Parameters:
  • queue (str) – The name of the queue to be unbound.

  • exchange (str) – The name of the exchange that the queue should be unbound from.

  • routing_key (str) – The existing bind key between the specified queue and a specified exchange that should be unbound.

typeof(exchange, default='direct')

Get the exchange type.

Lookup and return the exchange type for an exchange specified by name. Exchange types are expected to be ‘direct’, ‘topic’, and ‘fanout’, which correspond with exchange functionality as specified in AMQP 0-10 and earlier. If the exchange cannot be found, the default exchange type is returned.

Parameters:

exchange (str) – The exchange to have its type lookup up.

Keyword Arguments:

default – The type of exchange to assume if the exchange does not exist.

Returns:

The exchange type either ‘direct’, ‘topic’, or ‘fanout’.

Return type:

str

close()[source]

Close the connection.

Closing the connection will close all associated session, senders, or receivers used by the Connection.

close_channel(channel)[source]

Close a Channel.

Close a channel specified by a reference to the Channel object.

Parameters:

channel (Channel.) – Channel that should be closed.

get_qpid_connection()[source]

Return the existing connection (singleton).

Returns:

The existing qpid.messaging.Connection

Return type:

qpid.messaging.endpoints.Connection

Channel

class kombu.transport.qpid.Channel(connection, transport)[source]

Supports broker configuration and messaging send and receive.

Parameters:

A channel object is designed to have method-parity with a Channel as defined in AMQP 0-10 and earlier, which allows for the following broker actions:

  • exchange declare and delete

  • queue declare and delete

  • queue bind and unbind operations

  • queue length and purge operations

  • sending/receiving/rejecting messages

  • structuring, encoding, and decoding messages

  • supports synchronous and asynchronous reads

  • reading state about the exchange, queues, and bindings

Channels are designed to all share a single TCP connection with a broker, but provide a level of isolated communication with the broker while benefiting from a shared TCP connection. The Channel is given its Connection object by the Transport that instantiates the channel.

This channel inherits from StdChannel, which makes this a ‘native’ channel versus a ‘virtual’ channel which would inherit from kombu.transports.virtual.

Messages sent using this channel are assigned a delivery_tag. The delivery_tag is generated for a message as they are prepared for sending by basic_publish(). The delivery_tag is unique per channel instance. The delivery_tag has no meaningful context in other objects, and is only maintained in the memory of this object, and the underlying QoS object that provides support.

Each channel object instantiates exactly one QoS object for prefetch limiting, and asynchronous ACKing. The QoS object is lazily instantiated through a property method qos(). The QoS object is a supporting object that should not be accessed directly except by the channel itself.

Synchronous reads on a queue are done using a call to basic_get() which uses _get() to perform the reading. These methods read immediately and do not accept any form of timeout. basic_get() reads synchronously and ACKs messages before returning them. ACKing is done in all cases, because an application that reads messages using qpid.messaging, but does not ACK them will experience a memory leak. The no_ack argument to basic_get() does not affect ACKing functionality.

Asynchronous reads on a queue are done by starting a consumer using basic_consume(). Each call to basic_consume() will cause a Receiver to be created on the Session started by the :class: Transport. The receiver will asynchronously read using qpid.messaging, and prefetch messages before the call to Transport.basic_drain() occurs. The prefetch_count value of the QoS object is the capacity value of the new receiver. The new receiver capacity must always be at least 1, otherwise none of the receivers will appear to be ready for reading, and will never be read from.

Each call to basic_consume() creates a consumer, which is given a consumer tag that is identified by the caller of basic_consume(). Already started consumers can be cancelled using by their consumer_tag using basic_cancel(). Cancellation of a consumer causes the Receiver object to be closed.

Asynchronous message ACKing is supported through basic_ack(), and is referenced by delivery_tag. The Channel object uses its QoS object to perform the message ACKing.

class Message(payload, channel=None, **kwargs)

message class used.

accept
body
channel
content_encoding
content_type
delivery_info
delivery_tag
headers
properties
serializable()
class QoS(session, prefetch_count=1)

A class reference that will be instantiated using the qos property.

ack(delivery_tag)

Acknowledge a message by delivery_tag.

Called asynchronously once the message has been handled and can be forgotten by the broker.

Parameters:

delivery_tag (uuid.UUID) – the delivery tag associated with the message to be acknowledged.

append(message, delivery_tag)

Append message to the list of un-ACKed messages.

Add a message, referenced by the delivery_tag, for ACKing, rejecting, or getting later. Messages are saved into a dict by delivery_tag.

Parameters:
  • message (qpid.messaging.Message) – A received message that has not yet been ACKed.

  • delivery_tag (uuid.UUID) – A UUID to refer to this message by upon receipt.

can_consume()

Return True if the Channel can consume more messages.

Used to ensure the client adheres to currently active prefetch limits.

Returns:

True, if this QoS object can accept more messages without violating the prefetch_count. If prefetch_count is 0, can_consume will always return True.

Return type:

bool

can_consume_max_estimate()

Return the remaining message capacity.

Returns an estimated number of outstanding messages that a kombu.transport.qpid.Channel can accept without exceeding prefetch_count. If prefetch_count is 0, then this method returns 1.

Returns:

The number of estimated messages that can be fetched without violating the prefetch_count.

Return type:

int

get(delivery_tag)

Get an un-ACKed message by delivery_tag.

If called with an invalid delivery_tag a KeyError is raised.

Parameters:

delivery_tag (uuid.UUID) – The delivery tag associated with the message to be returned.

Returns:

An un-ACKed message that is looked up by delivery_tag.

Return type:

qpid.messaging.Message

reject(delivery_tag, requeue=False)

Reject a message by delivery_tag.

Explicitly notify the broker that the channel associated with this QoS object is rejecting the message that was previously delivered.

If requeue is False, then the message is not requeued for delivery to another consumer. If requeue is True, then the message is requeued for delivery to another consumer.

Parameters:

delivery_tag (uuid.UUID) – The delivery tag associated with the message to be rejected.

Keyword Arguments:

requeue – If True, the broker will be notified to requeue the message. If False, the broker will be told to drop the message entirely. In both cases, the message will be removed from this object.

basic_ack(delivery_tag, multiple=False)[source]

Acknowledge a message by delivery_tag.

Acknowledges a message referenced by delivery_tag. Messages can only be ACKed using basic_ack() if they were acquired using basic_consume(). This is the ACKing portion of the asynchronous read behavior.

Internally, this method uses the QoS object, which stores messages and is responsible for the ACKing.

Parameters:
  • delivery_tag (uuid.UUID) – The delivery tag associated with the message to be acknowledged.

  • multiple (bool) – not implemented. If set to True an AssertionError is raised.

basic_cancel(consumer_tag)[source]

Cancel consumer by consumer tag.

Request the consumer stops reading messages from its queue. The consumer is a Receiver, and it is closed using close().

This method also cleans up all lingering references of the consumer.

Parameters:

consumer_tag (an immutable object) – The tag which refers to the consumer to be cancelled. Originally specified when the consumer was created as a parameter to basic_consume().

basic_consume(queue, no_ack, callback, consumer_tag, **kwargs)[source]

Start an asynchronous consumer that reads from a queue.

This method starts a consumer of type Receiver using the Session created and referenced by the Transport that reads messages from a queue specified by name until stopped by a call to basic_cancel().

Messages are available later through a synchronous call to Transport.drain_events(), which will drain from the consumer started by this method. Transport.drain_events() is synchronous, but the receiving of messages over the network occurs asynchronously, so it should still perform well. Transport.drain_events() calls the callback provided here with the Message of type self.Message.

Each consumer is referenced by a consumer_tag, which is provided by the caller of this method.

This method sets up the callback onto the self.connection object in a dict keyed by queue name. drain_events() is responsible for calling that callback upon message receipt.

All messages that are received are added to the QoS object to be saved for asynchronous ACKing later after the message has been handled by the caller of drain_events(). Messages can be ACKed after being received through a call to basic_ack().

If no_ack is True, The no_ack flag indicates that the receiver of the message will not call basic_ack() later. Since the message will not be ACKed later, it is ACKed immediately.

basic_consume() transforms the message object type prior to calling the callback. Initially the message comes in as a qpid.messaging.Message. This method unpacks the payload of the qpid.messaging.Message and creates a new object of type self.Message.

This method wraps the user delivered callback in a runtime-built function which provides the type transformation from qpid.messaging.Message to Message, and adds the message to the associated QoS object for asynchronous ACKing if necessary.

Parameters:
  • queue (str) – The name of the queue to consume messages from

  • no_ack (bool) – If True, then messages will not be saved for ACKing later, but will be ACKed immediately. If False, then messages will be saved for ACKing later with a call to basic_ack().

  • callback (a callable object) – a callable that will be called when messages arrive on the queue.

  • consumer_tag (an immutable object) – a tag to reference the created consumer by. This consumer_tag is needed to cancel the consumer.

basic_get(queue, no_ack=False, **kwargs)[source]

Non-blocking single message get and ACK from a queue by name.

Internally this method uses _get() to fetch the message. If an Empty exception is raised by _get(), this method silences it and returns None. If _get() does return a message, that message is ACKed. The no_ack parameter has no effect on ACKing behavior, and all messages are ACKed in all cases. This method never adds fetched Messages to the internal QoS object for asynchronous ACKing.

This method converts the object type of the method as it passes through. Fetching from the broker, _get() returns a qpid.messaging.Message, but this method takes the payload of the qpid.messaging.Message and instantiates a Message object with the payload based on the class setting of self.Message.

Parameters:

queue (str) – The queue name to fetch a message from.

Keyword Arguments:

no_ack – The no_ack parameter has no effect on the ACK behavior of this method. Un-ACKed messages create a memory leak in qpid.messaging, and need to be ACKed in all cases.

Returns:

The received message.

Return type:

Message

basic_publish(message, exchange, routing_key, **kwargs)[source]

Publish message onto an exchange using a routing key.

Publish a message onto an exchange specified by name using a routing key specified by routing_key. Prepares the message in the following ways before sending:

  • encodes the body using encode_body()

  • wraps the body as a buffer object, so that

    qpid.messaging.endpoints.Sender uses a content type that can support arbitrarily large messages.

  • sets delivery_tag to a random uuid.UUID

  • sets the exchange and routing_key info as delivery_info

Internally uses _put() to send the message synchronously. This message is typically called by kombu.messaging.Producer._publish as the final step in message publication.

Parameters:
  • message (dict) – A dict containing key value pairs with the message data. A valid message dict can be generated using the prepare_message() method.

  • exchange (str) – The name of the exchange to submit this message onto.

  • routing_key (str) – The routing key to be used as the message is submitted onto the exchange.

basic_qos(prefetch_count, *args)[source]

Change QoS settings for this Channel.

Set the number of un-acknowledged messages this Channel can fetch and hold. The prefetch_value is also used as the capacity for any new Receiver objects.

Currently, this value is hard coded to 1.

Parameters:

prefetch_count (int) – Not used. This method is hard-coded to 1.

basic_reject(delivery_tag, requeue=False)[source]

Reject a message by delivery_tag.

Rejects a message that has been received by the Channel, but not yet acknowledged. Messages are referenced by their delivery_tag.

If requeue is False, the rejected message will be dropped by the broker and not delivered to any other consumers. If requeue is True, then the rejected message will be requeued for delivery to another consumer, potentially to the same consumer who rejected the message previously.

Parameters:

delivery_tag (uuid.UUID) – The delivery tag associated with the message to be rejected.

Keyword Arguments:

requeue – If False, the rejected message will be dropped by the broker and not delivered to any other consumers. If True, then the rejected message will be requeued for delivery to another consumer, potentially to the same consumer who rejected the message previously.

body_encoding = 'base64'

Default body encoding. NOTE: transport_options['body_encoding'] will override this value.

close()[source]

Cancel all associated messages and close the Channel.

This cancels all consumers by calling basic_cancel() for each known consumer_tag. It also closes the self._broker sessions. Closing the sessions implicitly causes all outstanding, un-ACKed messages to be considered undelivered by the broker.

codecs = {'base64': <kombu.transport.virtual.base.Base64 object>}

Binary <-> ASCII codecs.

decode_body(body, encoding=None)[source]

Decode a body using an optionally specified encoding.

The encoding can be specified by name, and is looked up in self.codecs. self.codecs uses strings as its keys which specify the name of the encoding, and then the value is an instantiated object that can provide encoding/decoding of that type through encode and decode methods.

Parameters:

body (str) – The body to be encoded.

Keyword Arguments:

encoding – The encoding type to be used. Must be a supported codec listed in self.codecs.

Returns:

If encoding is specified, the decoded body is returned. If encoding is not specified, the body is returned unchanged.

Return type:

str

encode_body(body, encoding=None)[source]

Encode a body using an optionally specified encoding.

The encoding can be specified by name, and is looked up in self.codecs. self.codecs uses strings as its keys which specify the name of the encoding, and then the value is an instantiated object that can provide encoding/decoding of that type through encode and decode methods.

Parameters:

body (str) – The body to be encoded.

Keyword Arguments:

encoding – The encoding type to be used. Must be a supported codec listed in self.codecs.

Returns:

If encoding is specified, return a tuple with the first position being the encoded body, and the second position the encoding used. If encoding is not specified, the body is passed through unchanged.

Return type:

tuple

exchange_declare(exchange='', type='direct', durable=False, **kwargs)[source]

Create a new exchange.

Create an exchange of a specific type, and optionally have the exchange be durable. If an exchange of the requested name already exists, no action is taken and no exceptions are raised. Durable exchanges will survive a broker restart, non-durable exchanges will not.

Exchanges provide behaviors based on their type. The expected behaviors are those defined in the AMQP 0-10 and prior specifications including ‘direct’, ‘topic’, and ‘fanout’ functionality.

Keyword Arguments:
  • type – The exchange type. Valid values include ‘direct’, ‘topic’, and ‘fanout’.

  • exchange – The name of the exchange to be created. If no exchange is specified, then a blank string will be used as the name.

  • durable – True if the exchange should be durable, or False otherwise.

exchange_delete(exchange_name, **kwargs)[source]

Delete an exchange specified by name.

Parameters:

exchange_name (str) – The name of the exchange to be deleted.

prepare_message(body, priority=None, content_type=None, content_encoding=None, headers=None, properties=None)[source]

Prepare message data for sending.

This message is typically called by kombu.messaging.Producer._publish() as a preparation step in message publication.

Parameters:

body (str) – The body of the message

Keyword Arguments:
  • priority – A number between 0 and 9 that sets the priority of the message.

  • content_type – The content_type the message body should be treated as. If this is unset, the qpid.messaging.endpoints.Sender object tries to autodetect the content_type from the body.

  • content_encoding – The content_encoding the message body is encoded as.

  • headers – Additional Message headers that should be set. Passed in as a key-value pair.

  • properties – Message properties to be set on the message.

Returns:

Returns a dict object that encapsulates message attributes. See parameters for more details on attributes that can be set.

Return type:

dict

property qos

QoS manager for this channel.

Lazily instantiates an object of type QoS upon access to the self.qos attribute.

Returns:

An already existing, or newly created QoS object

Return type:

QoS

queue_bind(queue, exchange, routing_key, **kwargs)[source]

Bind a queue to an exchange with a bind key.

Bind a queue specified by name, to an exchange specified by name, with a specific bind key. The queue and exchange must already exist on the broker for the bind to complete successfully. Queues may be bound to exchanges multiple times with different keys.

Parameters:
  • queue (str) – The name of the queue to be bound.

  • exchange (str) – The name of the exchange that the queue should be bound to.

  • routing_key (str) – The bind key that the specified queue should bind to the specified exchange with.

queue_declare(queue, passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None)[source]

Create a new queue specified by name.

If the queue already exists, no change is made to the queue, and the return value returns information about the existing queue.

The queue name is required and specified as the first argument.

If passive is True, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state. Default is False.

If durable is True, the queue will be durable. Durable queues remain active when a server restarts. Non-durable queues ( transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue. Default is False.

If exclusive is True, the queue will be exclusive. Exclusive queues may only be consumed by the current connection. Setting the ‘exclusive’ flag always implies ‘auto-delete’. Default is False.

If auto_delete is True, the queue is deleted when all consumers have finished using it. The last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won’t be deleted. Default is True.

The nowait parameter is unused. It was part of the 0-9-1 protocol, but this AMQP client implements 0-10 which removed the nowait option.

The arguments parameter is a set of arguments for the declaration of the queue. Arguments are passed as a dict or None. This field is ignored if passive is True. Default is None.

This method returns a namedtuple with the name ‘queue_declare_ok_t’ and the queue name as ‘queue’, message count on the queue as ‘message_count’, and the number of active consumers as ‘consumer_count’. The named tuple values are ordered as queue, message_count, and consumer_count respectively.

Due to Celery’s non-ACKing of events, a ring policy is set on any queue that starts with the string ‘celeryev’ or ends with the string ‘pidbox’. These are celery event queues, and Celery does not ack them, causing the messages to build-up. Eventually Qpid stops serving messages unless the ‘ring’ policy is set, at which point the buffer backing the queue becomes circular.

Parameters:
  • queue (str) – The name of the queue to be created.

  • passive (bool) – If True, the sever will not create the queue.

  • durable (bool) – If True, the queue will be durable.

  • exclusive (bool) – If True, the queue will be exclusive.

  • auto_delete (bool) – If True, the queue is deleted when all consumers have finished using it.

  • nowait (bool) – This parameter is unused since the 0-10 specification does not include it.

  • arguments (dict or None) – A set of arguments for the declaration of the queue.

Returns:

A named tuple representing the declared queue as a named tuple. The tuple values are ordered as queue, message count, and the active consumer count.

Return type:

namedtuple

queue_delete(queue, if_unused=False, if_empty=False, **kwargs)[source]

Delete a queue by name.

Delete a queue specified by name. Using the if_unused keyword argument, the delete can only occur if there are 0 consumers bound to it. Using the if_empty keyword argument, the delete can only occur if there are 0 messages in the queue.

Parameters:

queue (str) – The name of the queue to be deleted.

Keyword Arguments:
  • if_unused – If True, delete only if the queue has 0 consumers. If False, delete a queue even with consumers bound to it.

  • if_empty – If True, only delete the queue if it is empty. If False, delete the queue if it is empty or not.

queue_purge(queue, **kwargs)[source]

Remove all undelivered messages from queue.

Purge all undelivered messages from a queue specified by name. If the queue does not exist an exception is raised. The queue message depth is first checked, and then the broker is asked to purge that number of messages. The integer number of messages requested to be purged is returned. The actual number of messages purged may be different than the requested number of messages to purge.

Sometimes delivered messages are asked to be purged, but are not. This case fails silently, which is the correct behavior when a message that has been delivered to a different consumer, who has not ACKed the message, and still has an active session with the broker. Messages in that case are not safe for purging and will be retained by the broker. The client is unable to change this delivery behavior.

Internally, this method relies on _purge().

Parameters:

queue (str) – The name of the queue which should have all messages removed.

Returns:

The number of messages requested to be purged.

Return type:

int

Raises:

qpid.messaging.exceptions.NotFound if the queue being purged cannot be found.

queue_unbind(queue, exchange, routing_key, **kwargs)[source]

Unbind a queue from an exchange with a given bind key.

Unbind a queue specified by name, from an exchange specified by name, that is already bound with a bind key. The queue and exchange must already exist on the broker, and bound with the bind key for the operation to complete successfully. Queues may be bound to exchanges multiple times with different keys, thus the bind key is a required field to unbind in an explicit way.

Parameters:
  • queue (str) – The name of the queue to be unbound.

  • exchange (str) – The name of the exchange that the queue should be unbound from.

  • routing_key (str) – The existing bind key between the specified queue and a specified exchange that should be unbound.

typeof(exchange, default='direct')[source]

Get the exchange type.

Lookup and return the exchange type for an exchange specified by name. Exchange types are expected to be ‘direct’, ‘topic’, and ‘fanout’, which correspond with exchange functionality as specified in AMQP 0-10 and earlier. If the exchange cannot be found, the default exchange type is returned.

Parameters:

exchange (str) – The exchange to have its type lookup up.

Keyword Arguments:

default – The type of exchange to assume if the exchange does not exist.

Returns:

The exchange type either ‘direct’, ‘topic’, or ‘fanout’.

Return type:

str

Message

class kombu.transport.qpid.Message(payload, channel=None, **kwargs)[source]

Message object.

accept
body
channel
content_encoding
content_type
delivery_info
delivery_tag
headers
properties
serializable()[source]