
amqp - Python AMQP low-level client library

Version: 5.1.0

Web: https://amqp.readthedocs.io/

Download: https://pypi.org/project/amqp/

Source: http://github.com/celery/py-amqp/

Keywords: amqp, rabbitmq

About

This is a fork of amqplib, which was originally written by Barry Pederson. It is maintained by the Celery project and used by kombu as a pure-Python alternative when librabbitmq is not available.

This library should be API compatible with librabbitmq.

Differences from amqplib

  • Supports draining events from multiple channels (Connection.drain_events)

  • Support for timeouts

  • Channels are restored after channel error, instead of having to close the connection.

  • Support for heartbeats

    • Connection.heartbeat_tick(rate=2) must be called at regular intervals (half of the heartbeat value if rate is 2).

    • Or some other scheme by using Connection.send_heartbeat.

  • Supports RabbitMQ extensions:
    • Consumer Cancel Notifications
      • by default a cancel results in ChannelError being raised

      • but not if an on_cancel callback is passed to basic_consume.

    • Publisher confirms
      • Channel.confirm_select() enables publisher confirms.

      • Channel.events['basic_ack'].append(my_callback) adds a callback to be called when a message is confirmed. This callback is then called with the signature (delivery_tag, multiple).

    • Exchange-to-exchange bindings: exchange_bind / exchange_unbind.

  • Support for basic_return

  • Uses AMQP 0-9-1 instead of 0-8.
    • Channel.access_request and ticket arguments to methods removed.

    • Supports the arguments argument to basic_consume.

    • internal argument to exchange_declare removed.

    • auto_delete argument to exchange_declare deprecated

    • insist argument to Connection removed.

    • Channel.alerts has been removed.

    • Support for Channel.basic_recover_async.

    • Channel.basic_recover deprecated.

  • Exceptions renamed to have idiomatic names:
    • AMQPException -> AMQPError

    • AMQPConnectionException -> ConnectionError

    • AMQPChannelException -> ChannelError

    • Connection.known_hosts removed.

    • Connection no longer supports redirects.

    • exchange argument to queue_bind can now be empty to use the “default exchange”.

  • Adds Connection.is_alive that tries to detect whether the connection can still be used.

  • Adds Connection.connection_errors and .channel_errors, a list of recoverable errors.

  • Exposes the underlying socket as Connection.sock.

  • Adds Channel.no_ack_consumers to keep track of consumer tags that set the no_ack flag.

  • Slightly better at error recovery
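The heartbeat rule in the list above (call Connection.heartbeat_tick(rate=2) at half the heartbeat value) can be sketched as a small loop. This is a minimal sketch and not part of py-amqp: tick_interval and run_heartbeats are hypothetical helpers, and connection is assumed to be an already connected Connection that was created with a non-zero heartbeat.

```python
import time


def tick_interval(heartbeat, rate=2):
    """Seconds between heartbeat_tick() calls for a given heartbeat value."""
    if heartbeat <= 0 or rate <= 0:
        raise ValueError("heartbeat and rate must be positive")
    return heartbeat / rate


def run_heartbeats(connection, heartbeat, rate=2, ticks=None, sleep=time.sleep):
    """Call connection.heartbeat_tick(rate=rate) at regular intervals.

    `ticks` limits the number of calls (None means loop forever); `sleep`
    is injectable so the loop can be exercised without real delays.
    """
    interval = tick_interval(heartbeat, rate)
    done = 0
    while ticks is None or done < ticks:
        connection.heartbeat_tick(rate=rate)
        sleep(interval)
        done += 1
    return done
```

In a real application this loop would probably share a thread or event loop with the code that drains events, rather than blocking on its own.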

Further

Contents

API Reference

Release: 5.1

Date: Mar 06, 2022

amqp.connection

AMQP Connections.

class amqp.connection.Connection(host='localhost:5672', userid='guest', password='guest', login_method=None, login_response=None, authentication=(), virtual_host='/', locale='en_US', client_properties=None, ssl=False, connect_timeout=None, channel_max=None, frame_max=None, heartbeat=0, on_open=None, on_blocked=None, on_unblocked=None, confirm_publish=False, on_tune_ok=None, read_timeout=None, write_timeout=None, socket_settings=None, frame_handler=<function frame_handler>, frame_writer=<function frame_writer>, **kwargs)[source]

AMQP Connection.

The connection class provides methods for a client to establish a network connection to a server, and for both peers to operate the connection thereafter.

GRAMMAR:

connection          = open-connection *use-connection close-connection
open-connection     = C:protocol-header
                      S:START C:START-OK
                      *challenge
                      S:TUNE C:TUNE-OK
                      C:OPEN S:OPEN-OK
challenge           = S:SECURE C:SECURE-OK
use-connection      = *channel
close-connection    = C:CLOSE S:CLOSE-OK
                    / S:CLOSE C:CLOSE-OK

Create a connection to the specified host, which should be given as ‘host[:port]’, such as ‘localhost’ or ‘1.2.3.4:5672’. The host defaults to ‘localhost’; if no port is specified, 5672 is used.

Authentication can be controlled by passing one or more amqp.sasl.SASL instances as the authentication parameter, or by setting the login_method string to one of the supported methods: ‘GSSAPI’, ‘EXTERNAL’, ‘AMQPLAIN’, or ‘PLAIN’. Otherwise authentication will be performed using any supported method preferred by the server. Userid and password apply to AMQPLAIN and PLAIN authentication, whereas with GSSAPI only userid is used, as the client name. For EXTERNAL authentication both userid and password are ignored.

The ‘ssl’ parameter may be simply True/False, or a dictionary of options to pass to ssl.SSLContext, such as requiring certain certificates. For details, refer to the ssl parameter of SSLTransport.

The “socket_settings” parameter is a dictionary defining tcp settings which will be applied as socket options.

When “confirm_publish” is set to True, the channel is put into confirm mode. In this mode, each published message is confirmed using the Publisher Confirms RabbitMQ extension.
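The ‘host[:port]’ convention and the constructor parameters above can be sketched as follows. This is an illustration under stated assumptions, not the library's own parsing code: split_host_port and open_connection are hypothetical helpers, IPv6 literals are not handled, and the heartbeat and timeout values are arbitrary.

```python
def split_host_port(host, default_port=5672):
    """Split 'host[:port]' into (hostname, port).

    Illustrative only: plain hostnames and IPv4 addresses, no IPv6 literals.
    """
    name, sep, port = host.partition(":")
    if not name:
        raise ValueError(f"missing host name in {host!r}")
    return name, int(port) if sep else default_port


def open_connection(host="localhost:5672", userid="guest", password="guest"):
    """Connect with the parameters described above (needs a reachable broker)."""
    import amqp  # imported lazily so split_host_port works without py-amqp

    connection = amqp.Connection(
        host=host,
        userid=userid,
        password=password,
        virtual_host="/",
        heartbeat=60,          # arbitrary example value, in seconds
        connect_timeout=5,     # arbitrary example value, in seconds
        confirm_publish=True,  # put channels into confirm mode
    )
    connection.connect()
    return connection
```

Calling open_connection() only makes sense with a broker listening at the given address; split_host_port can be tried on its own.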

class Channel(connection, channel_id=None, auto_decode=True, on_open=None)

AMQP Channel.

The channel class provides methods for a client to establish a virtual connection - a channel - to a server and for both peers to operate the virtual connection thereafter.

GRAMMAR:

channel             = open-channel *use-channel close-channel
open-channel        = C:OPEN S:OPEN-OK
use-channel         = C:FLOW S:FLOW-OK
                    / S:FLOW C:FLOW-OK
                    / functional-class
close-channel       = C:CLOSE S:CLOSE-OK
                    / S:CLOSE C:CLOSE-OK

Create a channel bound to a connection and using the specified numeric channel_id, and open on the server.

The ‘auto_decode’ parameter (defaults to True), indicates whether the library should attempt to decode the body of Messages to a Unicode string if there’s a ‘content_encoding’ property for the message. If there’s no ‘content_encoding’ property, or the decode raises an Exception, the message body is left as plain bytes.

auto_decode
basic_ack(delivery_tag, multiple=False, argsig='Lb')

Acknowledge one or more messages.

This method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. The client can ask to confirm a single message or a set of messages up to and including a specific message.

PARAMETERS:

delivery_tag: longlong

server-assigned delivery tag

The server-assigned and channel-specific delivery tag

RULE:

The delivery tag is valid only within the channel from which the message was received. I.e. a client MUST NOT receive a message on one channel and then acknowledge it on another.

RULE:

The server MUST NOT use a zero value for delivery tags. Zero is reserved for client use, meaning “all messages so far received”.

multiple: boolean

acknowledge multiple messages

If set to True, the delivery tag is treated as “up to and including”, so that the client can acknowledge multiple messages with a single method. If set to False, the delivery tag refers to a single message. If the multiple field is True, and the delivery tag is zero, tells the server to acknowledge all outstanding messages.

RULE:

The server MUST validate that a non-zero delivery-tag refers to a delivered message, and raise a channel exception if this is not the case.
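The effect of the multiple flag can be illustrated with a small batching helper. This is a minimal sketch: BatchAcker is a hypothetical class, channel is assumed to be an open Channel, and each message is assumed to expose its delivery tag as message.delivery_tag.

```python
class BatchAcker:
    """Acknowledge deliveries in batches using multiple=True."""

    def __init__(self, channel, batch_size=10):
        self.channel = channel
        self.batch_size = batch_size
        self.unacked = 0

    def on_message(self, message):
        """Count a processed message; ack the whole batch when it is full."""
        self.unacked += 1
        if self.unacked >= self.batch_size:
            # One ack covers every delivery up to and including this tag.
            self.channel.basic_ack(message.delivery_tag, multiple=True)
            self.unacked = 0
```

Batching reduces the number of ack frames, at the cost of more redeliveries if the client dies with a partially filled batch.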

basic_cancel(consumer_tag, nowait=False, argsig='sb')

End a queue consumer.

This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply.

RULE:

If the queue no longer exists when the client sends a cancel command, or the consumer has been cancelled for other reasons, this command has no effect.

PARAMETERS:

consumer_tag: shortstr

consumer tag

Identifier for the consumer, valid within the current connection.

RULE:

The consumer tag is valid only within the channel from which the consumer was created. I.e. a client MUST NOT create a consumer in one channel and then use it in another.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

basic_consume(queue='', consumer_tag='', no_local=False, no_ack=False, exclusive=False, nowait=False, callback=None, arguments=None, on_cancel=None, argsig='BssbbbbF')

Start a queue consumer.

This method asks the server to start a “consumer”, which is a transient request for messages from a specific queue. Consumers last as long as the channel they were created on, or until the client cancels them.

RULE:

The server SHOULD support at least 16 consumers per queue, unless the queue was declared as private, and ideally, impose no limit except as defined by available resources.

PARAMETERS:

queue: shortstr

Specifies the name of the queue to consume from. If the queue name is null, refers to the current queue for the channel, which is the last declared queue.

RULE:

If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).

consumer_tag: shortstr

Specifies the identifier for the consumer. The consumer tag is local to a connection, so two clients can use the same consumer tags. If this field is empty the server will generate a unique tag.

RULE:

The tag MUST NOT refer to an existing consumer. If the client attempts to create two consumers with the same non-empty tag the server MUST raise a connection exception with reply code 530 (not allowed).

no_local: boolean

do not deliver own messages

If the no-local field is set the server will not send messages to the client that published them.

no_ack: boolean

no acknowledgment needed

If this field is set the server does not expect acknowledgments for messages. That is, when a message is delivered to the client the server automatically and silently acknowledges it on behalf of the client. This functionality increases performance but at the cost of reliability. Messages can get lost if a client dies before it can deliver them to the application.

exclusive: boolean

request exclusive access

Request exclusive consumer access, meaning only this consumer can access the queue.

RULE:

If the server cannot grant exclusive access to the queue when asked, because there are other consumers active, it MUST raise a channel exception with return code 403 (access refused).

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

callback: Python callable

function/method called with each delivered message

For each message delivered by the broker, the callable will be called with a Message object as the single argument. If no callable is specified, messages are quietly discarded; no_ack should probably be set to True in that case.
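A consumer with manual acknowledgments and a cancel callback might be registered as follows. This is a sketch under assumptions: start_consumer is a hypothetical helper, channel is an open Channel, and the on_cancel callback is assumed to receive the consumer tag as its single argument.

```python
def start_consumer(channel, queue, handler, cancelled):
    """Register a manual-ack consumer on `queue`.

    `handler` receives each message body; `cancelled` is a list that
    collects consumer tags cancelled by the broker.
    """

    def on_message(message):
        handler(message.body)
        # Ack only after the handler returned without raising.
        channel.basic_ack(message.delivery_tag)

    def on_cancel(consumer_tag):
        cancelled.append(consumer_tag)

    return channel.basic_consume(
        queue=queue,
        no_ack=False,
        callback=on_message,
        on_cancel=on_cancel,
    )
```

Deliveries only reach the callback while the application drains events on the connection (Connection.drain_events).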

basic_get(queue='', no_ack=False, argsig='Bsb')

Direct access to a queue.

This method provides a direct access to the messages in a queue using a synchronous dialogue that is designed for specific types of application where synchronous functionality is more important than performance.

PARAMETERS:

queue: shortstr

Specifies the name of the queue to consume from. If the queue name is null, refers to the current queue for the channel, which is the last declared queue.

RULE:

If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).

no_ack: boolean

no acknowledgment needed

If this field is set the server does not expect acknowledgments for messages. That is, when a message is delivered to the client the server automatically and silently acknowledges it on behalf of the client. This functionality increases performance but at the cost of reliability. Messages can get lost if a client dies before it can deliver them to the application.

Non-blocking; returns an amqp.basic_message.Message object, or None if the queue is empty.
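Because basic_get returns None on an empty queue, a queue can be emptied with a simple polling loop. This is a minimal sketch: drain_queue is a hypothetical helper, and no_ack=True is chosen here only to keep the example short (messages are then lost if the client fails while processing them).

```python
def drain_queue(channel, queue, limit=100):
    """Fetch up to `limit` message bodies with basic_get, stopping when empty."""
    bodies = []
    while len(bodies) < limit:
        message = channel.basic_get(queue=queue, no_ack=True)
        if message is None:  # queue is empty
            break
        bodies.append(message.body)
    return bodies
```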

basic_publish(msg, exchange='', routing_key='', mandatory=False, immediate=False, timeout=None, confirm_timeout=None, argsig='Bssbb')

Publish a message.

This method publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.

When the channel is in confirm mode (that is, when the Connection parameter confirm_publish is set to True), each message is confirmed. If the broker rejects a published message (e.g. due to internal broker constraints), a MessageNacked exception is raised. Set confirm_timeout to wait at most confirm_timeout seconds for the message to be confirmed.

PARAMETERS:

exchange: shortstr

Specifies the name of the exchange to publish to. The exchange name can be empty, meaning the default exchange. If the exchange name is specified, and that exchange does not exist, the server will raise a channel exception.

RULE:

The server MUST accept a blank exchange name to mean the default exchange.

RULE:

The exchange MAY refuse basic content in which case it MUST raise a channel exception with reply code 540 (not implemented).

routing_key: shortstr

Message routing key

Specifies the routing key for the message. The routing key is used for routing messages depending on the exchange configuration.

mandatory: boolean

indicate mandatory routing

This flag tells the server how to react if the message cannot be routed to a queue. If this flag is True, the server will return an unroutable message with a Return method. If this flag is False, the server silently drops the message.

RULE:

The server SHOULD implement the mandatory flag.

immediate: boolean

request immediate delivery

This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed.

RULE:

The server SHOULD implement the immediate flag.

timeout: short

timeout for publish

Set timeout to wait at most timeout seconds for the message to be published.

confirm_timeout: short

confirm_timeout for publish in confirm mode

When the channel is in confirm mode, set confirm_timeout to wait at most confirm_timeout seconds for the message to be confirmed.
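Publishing in confirm mode with a bounded number of retries could look like the sketch below. The helper publish_with_retry is hypothetical; msg is assumed to be a message object accepted by basic_publish, and nack_error is assumed to be the MessageNacked exception class mentioned above, passed in by the caller.

```python
def publish_with_retry(channel, msg, routing_key, nack_error,
                       exchange="", attempts=3, confirm_timeout=5.0):
    """Publish `msg`, retrying when the broker rejects it.

    Returns the number of attempts used; re-raises after the last failure.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            channel.basic_publish(
                msg,
                exchange=exchange,
                routing_key=routing_key,
                mandatory=True,
                confirm_timeout=confirm_timeout,
            )
            return attempt
        except nack_error:
            if attempt == attempts:
                raise
```

Retrying immediately is the simplest policy; a real publisher would likely add a delay between attempts.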

basic_publish_confirm(*args, **kwargs)
basic_qos(prefetch_size, prefetch_count, a_global, argsig='lBb')

Specify quality of service.

This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a qos method always depend on the content class semantics. Though the qos method could in principle apply to both peers, it is currently meaningful only for the server.

PARAMETERS:

prefetch_size: long

prefetch window in octets

The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement. This field specifies the prefetch window size in octets. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). May be set to zero, meaning “no specific limit”, although other prefetch limits may still apply. The prefetch-size is ignored if the no-ack option is set.

RULE:

The server MUST ignore this setting when the client is not processing any messages - i.e. the prefetch size does not limit the transfer of single messages to a client, only the sending in advance of more messages while the client still has one or more unacknowledged messages.

prefetch_count: short

prefetch window in messages

Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. The prefetch-count is ignored if the no-ack option is set.

RULE:

The server MAY send less data in advance than allowed by the client’s specified prefetch windows but it MUST NOT send more.

a_global: boolean

Defines the scope of the QoS setting. The semantics of this parameter differ between the AMQP 0-9-1 standard and the RabbitMQ broker:

MEANING IN AMQP 0-9-1:

False: shared across all consumers on the channel

True: shared across all consumers on the connection

MEANING IN RABBITMQ:

False: applied separately to each new consumer on the channel

True: shared across all consumers on the channel
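A common use of basic_qos is to cap the number of unacknowledged messages by count only. The following is a minimal sketch: limit_in_flight is a hypothetical helper and channel is assumed to be an open Channel.

```python
def limit_in_flight(channel, count, share_across_channel=False):
    """Cap unacknowledged deliveries at `count` messages.

    prefetch_size=0 means no limit in octets. With RabbitMQ,
    a_global=False applies the limit to each new consumer and
    a_global=True shares it across the channel, as described above.
    """
    if count < 0:
        raise ValueError("count must be non-negative")
    channel.basic_qos(
        prefetch_size=0,
        prefetch_count=count,
        a_global=share_across_channel,
    )
```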

basic_recover(requeue=False)

Redeliver unacknowledged messages.

This method asks the broker to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method is only allowed on non-transacted channels.

RULE:

The server MUST set the redelivered flag on all messages that are resent.

RULE:

The server MUST raise a channel exception if this is called on a transacted channel.

PARAMETERS:

requeue: boolean

requeue the message

If this field is False, the message will be redelivered to the original recipient. If this field is True, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.

basic_recover_async(requeue=False)
basic_reject(delivery_tag, requeue, argsig='Lb')

Reject an incoming message.

This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

RULE:

The server SHOULD be capable of accepting and processing the Reject method while sending message content with a Deliver or Get-Ok method. I.e. the server should read and process incoming methods while sending output frames. To cancel a partially-sent content, the server sends a content body frame of size 1 (i.e. with no data except the frame-end octet).

RULE:

The server SHOULD interpret this method as meaning that the client is unable to process the message at this time.

RULE:

A client MUST NOT use this method as a means of selecting messages to process. A rejected message MAY be discarded or dead-lettered, not necessarily passed to another client.

PARAMETERS:

delivery_tag: longlong

server-assigned delivery tag

The server-assigned and channel-specific delivery tag

RULE:

The delivery tag is valid only within the channel from which the message was received. I.e. a client MUST NOT receive a message on one channel and then acknowledge it on another.

RULE:

The server MUST NOT use a zero value for delivery tags. Zero is reserved for client use, meaning “all messages so far received”.

requeue: boolean

requeue the message

If this field is False, the message will be discarded. If this field is True, the server will attempt to requeue the message.

RULE:

The server MUST NOT deliver the message to the same client within the context of the current channel. The recommended strategy is to attempt to deliver the message to an alternative consumer, and if that is not possible, to move the message to a dead-letter queue. The server MAY use more sophisticated tracking to hold the message on the queue and redeliver it to the same client at a later stage.
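Rejecting a message when processing fails, and acknowledging it otherwise, can be sketched as below. The helper handle_delivery is hypothetical; channel is assumed to be the Channel the message was received on, since delivery tags are only valid there.

```python
def handle_delivery(channel, message, handler, requeue=False):
    """Ack the message if `handler` succeeds, otherwise reject it.

    Returns True when the message was acknowledged.
    """
    try:
        handler(message.body)
    except Exception:
        # Processing failed: hand the message back to the broker.
        channel.basic_reject(message.delivery_tag, requeue=requeue)
        return False
    channel.basic_ack(message.delivery_tag)
    return True
```

With requeue=False the broker discards or dead-letters the message; requeue=True risks a redelivery loop if the failure is permanent.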

channel_id
close(reply_code=0, reply_text='', method_sig=(0, 0), argsig='BsBB')

Request a channel close.

This method indicates that the sender wants to close the channel. This may be due to internal conditions (e.g. a forced shut-down) or due to an error handling a specific method, i.e. an exception. When a close is due to an exception, the sender provides the class and method id of the method which caused the exception.

RULE:

After sending this method any received method except Channel.Close-OK MUST be discarded.

RULE:

The peer sending this method MAY use a counter or timeout to detect failure of the other peer to respond correctly with Channel.Close-OK.

PARAMETERS:

reply_code: short

The reply code. The AMQ reply codes are defined in AMQ RFC 011.

reply_text: shortstr

The localised reply text. This text can be logged as an aid to resolving issues.

class_id: short

failing method class

When the close is provoked by a method exception, this is the class of the method.

method_id: short

failing method ID

When the close is provoked by a method exception, this is the ID of the method.

collect()

Tear down this object.

Best called after we’ve agreed to close with the server.

confirm_select(nowait=False)

Enable publisher confirms for this channel.

Note: This is a RabbitMQ extension.

Cannot be used if the channel is in transactional mode.

Parameters

nowait – If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
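Once confirms are enabled, a publisher usually needs to know which messages are still unconfirmed. The sketch below tracks that using the (delivery_tag, multiple) callback signature described earlier in this document. ConfirmTracker is a hypothetical class; the caller is assumed to number its own publishes on the channel starting from 1 after confirm_select(), and to register on_ack as the confirmation callback.

```python
class ConfirmTracker:
    """Track delivery tags that the broker has not confirmed yet."""

    def __init__(self):
        self.pending = set()

    def published(self, delivery_tag):
        """Record a tag right after publishing the message."""
        self.pending.add(delivery_tag)

    def on_ack(self, delivery_tag, multiple):
        """Confirmation callback with the (delivery_tag, multiple) signature."""
        if multiple:
            # Confirms every tag up to and including delivery_tag.
            self.pending = {t for t in self.pending if t > delivery_tag}
        else:
            self.pending.discard(delivery_tag)
```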

connection
exchange_bind(destination, source='', routing_key='', nowait=False, arguments=None, argsig='BsssbF')

Bind an exchange to an exchange.

RULE:

A server MUST allow and ignore duplicate bindings - that is, two or more bind methods for a specific exchange, with identical arguments - without treating these as an error.

RULE:

A server MUST allow cycles of exchange bindings to be created including allowing an exchange to be bound to itself.

RULE:

A server MUST NOT deliver the same message more than once to a destination exchange, even if the topology of exchanges and bindings results in multiple (even infinite) routes to that exchange.

PARAMETERS:

reserved-1: short

destination: shortstr

Specifies the name of the destination exchange to bind.

RULE:

A client MUST NOT be allowed to bind a non-existent destination exchange.

RULE:

The server MUST accept a blank exchange name to mean the default exchange.

source: shortstr

Specifies the name of the source exchange to bind.

RULE:

A client MUST NOT be allowed to bind a non-existent source exchange.

RULE:

The server MUST accept a blank exchange name to mean the default exchange.

routing-key: shortstr

Specifies the routing key for the binding. The routing key is used for routing messages depending on the exchange configuration. Not all exchanges use a routing key - refer to the specific exchange documentation.

no-wait: bit

arguments: table

A set of arguments for the binding. The syntax and semantics of these arguments depends on the exchange class.
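Exchange-to-exchange bindings make it possible to funnel several source exchanges into one destination. The following is a minimal sketch: fan_in is a hypothetical helper, and all exchanges are assumed to have been declared already, since binding a non-existent exchange is not allowed.

```python
def fan_in(channel, destination, sources, routing_key=""):
    """Bind every exchange in `sources` to `destination`.

    Messages routed by a source exchange with a matching key are
    forwarded to `destination`. Returns the number of bindings made.
    """
    bound = 0
    for source in sources:
        channel.exchange_bind(
            destination, source=source, routing_key=routing_key,
        )
        bound += 1
    return bound
```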

exchange_declare(exchange, type, passive=False, durable=False, auto_delete=True, nowait=False, arguments=None, argsig='BssbbbbbF')

Declare exchange, create if needed.

This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.

RULE:

The server SHOULD support a minimum of 16 exchanges per virtual host and ideally, impose no limit except as defined by available resources.

PARAMETERS:

exchange: shortstr

RULE:

Exchange names starting with “amq.” are reserved for predeclared and standardised exchanges. If the client attempts to create an exchange starting with “amq.”, the server MUST raise a channel exception with reply code 403 (access refused).

type: shortstr

exchange type

Each exchange belongs to one of a set of exchange types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. It is not valid or meaningful to attempt to change the type of an existing exchange.

RULE:

If the exchange already exists with a different type, the server MUST raise a connection exception with a reply code 507 (not allowed).

RULE:

If the server does not support the requested exchange type it MUST raise a connection exception with a reply code 503 (command invalid).

passive: boolean

do not create exchange

If set, the server will not create the exchange. The client can use this to check whether an exchange exists without modifying the server state.

RULE:

If set, and the exchange does not already exist, the server MUST raise a channel exception with reply code 404 (not found).

durable: boolean

request a durable exchange

If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.

RULE:

The server MUST support both durable and transient exchanges.

RULE:

The server MUST ignore the durable field if the exchange already exists.

auto_delete: boolean

auto-delete when unused

If set, the exchange is deleted when all queues have finished using it.

RULE:

The server SHOULD allow for a reasonable delay between the point when it determines that an exchange is not being used (or no longer used), and the point when it deletes the exchange. At the least it must allow a client to create an exchange and then bind a queue to it, with a small but non-zero delay between these two actions.

RULE:

The server MUST ignore the auto-delete field if the exchange already exists.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

arguments: table

arguments for declaration

A set of arguments for the declaration. The syntax and semantics of these arguments depends on the server implementation. This field is ignored if passive is True.
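Since auto_delete defaults to True in the signature above, a long-lived exchange has to override it explicitly. This is a minimal sketch: declare_durable_exchange is a hypothetical helper and channel is assumed to be an open Channel.

```python
def declare_durable_exchange(channel, name, exchange_type="direct",
                             arguments=None):
    """Declare an exchange that survives broker restarts and idle periods."""
    channel.exchange_declare(
        name,
        exchange_type,
        durable=True,       # keep the exchange across server restarts
        auto_delete=False,  # override the default of True
        arguments=arguments,
    )
    return name
```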

exchange_delete(exchange, if_unused=False, nowait=False, argsig='Bsbb')

Delete an exchange.

This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are cancelled.

PARAMETERS:

exchange: shortstr

RULE:

The exchange MUST exist. Attempting to delete a non-existing exchange causes a channel exception.

if_unused: boolean

delete only if unused

If set, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but raises a channel exception instead.

RULE:

If set, the server SHOULD delete the exchange but only if it has no queue bindings.

RULE:

If set, the server SHOULD raise a channel exception if the exchange is in use.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

exchange_unbind(destination, source='', routing_key='', nowait=False, arguments=None, argsig='BsssbF')

Unbind an exchange from an exchange.

RULE:

If an unbind fails, the server MUST raise a connection exception.

PARAMETERS:

reserved-1: short

destination: shortstr

Specifies the name of the destination exchange to unbind.

RULE:

The client MUST NOT attempt to unbind an exchange that does not exist from an exchange.

RULE:

The server MUST accept a blank exchange name to mean the default exchange.

source: shortstr

Specifies the name of the source exchange to unbind.

RULE:

The client MUST NOT attempt to unbind an exchange from an exchange that does not exist.

RULE:

The server MUST accept a blank exchange name to mean the default exchange.

routing-key: shortstr

Specifies the routing key of the binding to unbind.

no-wait: bit

arguments: table

Specifies the arguments of the binding to unbind.

flow(active)

Enable/disable flow from peer.

This method asks the peer to pause or restart the flow of content data. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. The peer that receives a request to stop sending content should finish sending the current content, if any, and then wait until it receives a Flow restart method.

RULE:

When a new channel is opened, it is active. Some applications assume that channels are inactive until started. To emulate this behaviour a client MAY open the channel, then pause it.

RULE:

When sending content data in multiple frames, a peer SHOULD monitor the channel for incoming methods and respond to a Channel.Flow as rapidly as possible.

RULE:

A peer MAY use the Channel.Flow method to throttle incoming content data for internal reasons, for example, when exchanging data over a slower connection.

RULE:

The peer that requests a Channel.Flow method MAY disconnect and/or ban a peer that does not respect the request.

PARAMETERS:

active: boolean

start/stop content frames

If True, the peer starts sending content frames. If False, the peer stops sending content frames.

is_closing
method_queue
open()

Open a channel for use.

This method opens a virtual connection (a channel).

RULE:

This method MUST NOT be called when the channel is already open.

PARAMETERS:

out_of_band: shortstr (DEPRECATED)

out-of-band settings

Configures out-of-band transfers on this channel. The syntax and meaning of this field will be formally defined at a later date.

queue_bind(queue, exchange='', routing_key='', nowait=False, arguments=None, argsig='BsssbF')

Bind queue to an exchange.

This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a dest exchange and subscription queues are bound to a dest_wild exchange.

RULE:

A server MUST allow and ignore duplicate bindings - that is, two or more bind methods for a specific queue, with identical arguments - without treating these as an error.

RULE:

If a bind fails, the server MUST raise a connection exception.

RULE:

The server MUST NOT allow a durable queue to bind to a transient exchange. If the client attempts this the server MUST raise a channel exception.

RULE:

Bindings for durable queues are automatically durable and the server SHOULD restore such bindings after a server restart.

RULE:

The server SHOULD support at least 4 bindings per queue, and ideally, impose no limit except as defined by available resources.

PARAMETERS:

queue: shortstr

Specifies the name of the queue to bind. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue.

RULE:

If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).

RULE:

If the queue does not exist the server MUST raise a channel exception with reply code 404 (not found).

exchange: shortstr

The name of the exchange to bind to.

RULE:

If the exchange does not exist the server MUST raise a channel exception with reply code 404 (not found).

routing_key: shortstr

message routing key

Specifies the routing key for the binding. The routing key is used for routing messages depending on the exchange configuration. Not all exchanges use a routing key - refer to the specific exchange documentation. If the routing key is empty and the queue name is empty, the routing key will be the current queue for the channel, which is the last declared queue.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

arguments: table

arguments for binding

A set of arguments for the binding. The syntax and semantics of these arguments depend on the exchange class.
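As an illustration of the queue_bind signature above, the following sketch binds one queue under several routing keys. It assumes `channel` is an already-open Channel; the helper name bind_routing_keys is hypothetical and not part of py-amqp.

```python
def bind_routing_keys(channel, queue, exchange, routing_keys):
    """Bind `queue` to `exchange` once per routing key.

    `channel` is assumed to expose queue_bind() with the signature
    documented above. The server ignores duplicate bindings, so
    calling this helper twice with the same keys is harmless.
    """
    bound = []
    for key in routing_keys:
        channel.queue_bind(queue, exchange=exchange, routing_key=key)
        bound.append(key)
    return bound
```

Each call waits for the server's reply because nowait is left at its default of False.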

queue_declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None, argsig='BsbbbbbF')

Declare queue, create if needed.

This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.

RULE:

The server MUST create a default binding for a newly-created queue to the default exchange, which is an exchange of type ‘direct’.

RULE:

The server SHOULD support a minimum of 256 queues per virtual host and ideally, impose no limit except as defined by available resources.

PARAMETERS:

queue: shortstr

RULE:

The queue name MAY be empty, in which case the server MUST create a new queue with a unique generated name and return this to the client in the Declare-Ok method.

RULE:

Queue names starting with “amq.” are reserved for predeclared and standardised server queues. If the queue name starts with “amq.” and the passive option is False, the server MUST raise a connection exception with reply code 403 (access refused).

passive: boolean

do not create queue

If set, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state.

RULE:

If set, and the queue does not already exist, the server MUST respond with a reply code 404 (not found) and raise a channel exception.

durable: boolean

request a durable queue

If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue.

RULE:

The server MUST recreate the durable queue after a restart.

RULE:

The server MUST support both durable and transient queues.

RULE:

The server MUST ignore the durable field if the queue already exists.

exclusive: boolean

request an exclusive queue

Exclusive queues may only be consumed from by the current connection. Setting the ‘exclusive’ flag always implies ‘auto-delete’.

RULE:

The server MUST support both exclusive (private) and non-exclusive (shared) queues.

RULE:

The server MUST raise a channel exception if ‘exclusive’ is specified and the queue already exists and is owned by a different connection.

auto_delete: boolean

auto-delete queue when unused

If set, the queue is deleted when all consumers have finished using it. The last consumer can be cancelled either explicitly or because its channel is closed. If the queue never had a consumer, it won’t be deleted.

RULE:

The server SHOULD allow for a reasonable delay between the point when it determines that a queue is not being used (or no longer used), and the point when it deletes the queue. At the least it must allow a client to create a queue and then create a consumer to read from it, with a small but non-zero delay between these two actions. The server should equally allow for clients that may be disconnected prematurely, and wish to re-consume from the same queue without losing messages. We would recommend a configurable timeout, with a suitable default value being one minute.

RULE:

The server MUST ignore the auto-delete field if the queue already exists.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

arguments: table

arguments for declaration

A set of arguments for the declaration. The syntax and semantics of these arguments depend on the server implementation. This field is ignored if passive is True.

Returns a tuple containing 3 items:

the name of the queue (essential for automatically-named queues), message count and consumer count
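The 3-item return value can be unpacked directly. The sketch below shows two common uses, assuming `channel` is an open Channel; both helper names are hypothetical.

```python
def declare_reply_queue(channel):
    """Declare a server-named, exclusive queue and return its name.

    An empty queue name asks the server to generate one; the generated
    name is only available from the first item of the returned tuple.
    """
    name, _message_count, _consumer_count = channel.queue_declare(
        queue='', exclusive=True, auto_delete=True,
    )
    return name


def queue_depth(channel, queue):
    """Return the message count of an existing queue without creating it.

    passive=True only checks the queue; a missing queue results in a
    channel exception (404) rather than a new queue.
    """
    _name, message_count, _consumer_count = channel.queue_declare(
        queue=queue, passive=True,
    )
    return message_count
```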

queue_delete(queue='', if_unused=False, if_empty=False, nowait=False, argsig='Bsbbb')

Delete a queue.

This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.

RULE:

The server SHOULD use a dead-letter queue to hold messages that were pending on a deleted queue, and MAY provide facilities for a system administrator to move these messages back to an active queue.

PARAMETERS:

queue: shortstr

Specifies the name of the queue to delete. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue.

RULE:

If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).

RULE:

The queue must exist. Attempting to delete a non-existing queue causes a channel exception.

if_unused: boolean

delete only if unused

If set, the server will only delete the queue if it has no consumers. If the queue has consumers the server does not delete it but raises a channel exception instead.

RULE:

The server MUST respect the if-unused flag when deleting a queue.

if_empty: boolean

delete only if empty

If set, the server will only delete the queue if it has no messages. If the queue is not empty the server raises a channel exception.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

If nowait is False, returns the number of deleted messages.
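A conditional delete might be wrapped as follows. This is a sketch under two assumptions: `channel.connection` is the owning connection, and its channel_errors tuple (listed in this reference) covers the exception raised when the server refuses. The helper name delete_if_idle is hypothetical.

```python
def delete_if_idle(channel, queue):
    """Delete `queue` only when it has no consumers and no messages.

    Returns the number of deleted messages reported by the server, or
    None when the server raises a channel exception instead (for
    example because the queue is still in use or does not exist).
    """
    try:
        return channel.queue_delete(queue, if_unused=True, if_empty=True)
    except channel.connection.channel_errors:
        return None
```

Whether the channel can be reused after the exception depends on the error; callers that need certainty may prefer to open a fresh channel.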

queue_purge(queue='', nowait=False, argsig='Bsb')

Purge a queue.

This method removes all messages from a queue. It does not cancel consumers. Purged messages are deleted without any formal “undo” mechanism.

RULE:

A call to purge MUST result in an empty queue.

RULE:

On transacted channels the server MUST NOT purge messages that have already been sent to a client but not yet acknowledged.

RULE:

The server MAY implement a purge queue or log that allows system administrators to recover accidentally-purged messages. The server SHOULD NOT keep purged messages in the same storage spaces as the live messages since the volumes of purged messages may get very large.

PARAMETERS:

queue: shortstr

Specifies the name of the queue to purge. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue.

RULE:

If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).

RULE:

The queue must exist. Attempting to purge a non-existing queue causes a channel exception.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

If nowait is False, returns the number of purged messages.

queue_unbind(queue, exchange, routing_key='', nowait=False, arguments=None, argsig='BsssF')

Unbind a queue from an exchange.

This method unbinds a queue from an exchange.

RULE:

If an unbind fails, the server MUST raise a connection exception.

PARAMETERS:

queue: shortstr

Specifies the name of the queue to unbind.

RULE:

The client MUST either specify a queue name or have previously declared a queue on the same channel.

RULE:

The client MUST NOT attempt to unbind a queue that does not exist.

exchange: shortstr

The name of the exchange to unbind from.

RULE:

The client MUST NOT attempt to unbind a queue from an exchange that does not exist.

RULE:

The server MUST accept a blank exchange name to mean the default exchange.

routing_key: shortstr

routing key of binding

Specifies the routing key of the binding to unbind.

arguments: table

arguments of binding

Specifies the arguments of the binding to unbind.

then(on_success, on_error=None)
tx_commit()

Commit the current transaction.

This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit.

tx_rollback()

Abandon the current transaction.

This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback.

tx_select()

Select standard transaction mode.

This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.
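The three tx_* methods combine naturally into a commit-or-rollback wrapper. The context manager below is a sketch with a hypothetical name, assuming `channel` is an open Channel on which tx_select() has already been called.

```python
from contextlib import contextmanager


@contextmanager
def transaction(channel):
    """Commit on success, roll back if the block raises.

    Assumes channel.tx_select() was called once on this channel
    beforehand, as required before tx_commit() or tx_rollback().
    """
    try:
        yield channel
    except Exception:
        # Abandon everything published or acknowledged in the block.
        channel.tx_rollback()
        raise
    else:
        channel.tx_commit()
```

Because a new transaction starts immediately after either call, the same channel can be passed to transaction() repeatedly.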

Transport(host, connect_timeout, ssl=False, read_timeout=None, write_timeout=None, socket_settings=None, **kwargs)[source]
auto_decode
blocking_read(timeout=None)[source]
bytes_recv = 0

Number of successful reads from socket.

bytes_sent = 0

Number of successful writes to socket.

channel(channel_id=None, callback=None)[source]

Create new channel.

Fetch a Channel object identified by the numeric channel_id, or create that object if it doesn’t already exist.

channel_errors = (<class 'amqp.exceptions.ChannelError'>,)
channel_id
client_heartbeat = None

Original heartbeat interval value proposed by client.

close(reply_code=0, reply_text='', method_sig=(0, 0), argsig='BsBB')[source]

Request a connection close.

This method indicates that the sender wants to close the connection. This may be due to internal conditions (e.g. a forced shut-down) or due to an error handling a specific method, i.e. an exception. When a close is due to an exception, the sender provides the class and method id of the method which caused the exception.

RULE:

After sending this method any received method except the Close-OK method MUST be discarded.

RULE:

The peer sending this method MAY use a counter or timeout to detect failure of the other peer to respond correctly with the Close-OK method.

RULE:

When a server receives the Close method from a client it MUST delete all server-side resources associated with the client’s context. A client CANNOT reconnect to a context after sending or receiving a Close method.

PARAMETERS:

reply_code: short

The reply code. The AMQ reply codes are defined in AMQ RFC 011.

reply_text: shortstr

The localised reply text. This text can be logged as an aid to resolving issues.

class_id: short

failing method class

When the close is provoked by a method exception, this is the class of the method.

method_id: short

failing method ID

When the close is provoked by a method exception, this is the ID of the method.

collect()[source]
connect(callback=None)[source]
property connected
connection
connection_errors = (<class 'amqp.exceptions.ConnectionError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>)
drain_events(timeout=None)[source]
property frame_writer
heartbeat = None

Final heartbeat interval value (in float seconds) after negotiation.

heartbeat_tick(rate=2)[source]

Send heartbeat packets if necessary.

Raises:

amqp.exceptions.ConnectionForced: if none have been received recently.

Note:

This should be called frequently, on the order of once per second.

Keyword Arguments:

rate (int): Previously used, but ignored now.
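One way to call heartbeat_tick roughly once per second is to interleave it with short drain_events waits. The loop below is a sketch with a hypothetical name; it assumes drain_events raises socket.timeout when nothing arrives within the timeout.

```python
import socket


def pump_events(connection, ticks, interval=1.0):
    """Alternate heartbeat checks with short waits for inbound frames.

    Returns the number of iterations in which drain_events() handled
    an event instead of timing out.
    """
    handled = 0
    for _ in range(ticks):
        connection.heartbeat_tick()
        try:
            connection.drain_events(timeout=interval)
        except socket.timeout:
            # Nothing arrived within `interval`; tick again.
            continue
        handled += 1
    return handled
```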

is_alive()[source]
is_closing
last_heartbeat_received = 0

Time of last heartbeat received (in monotonic time, if available).

last_heartbeat_sent = 0

Time of last heartbeat sent (in monotonic time, if available).

library_properties = {'product': 'py-amqp', 'product_version': '5.1.0'}

These are sent to the server to announce what features we support, type of client etc.

method_queue
negotiate_capabilities = {'authentication_failure_close': True, 'connection.blocked': True, 'consumer_cancel_notify': True}

Mapping of protocol extensions to enable. The server will report these in server_properties[capabilities], and if a key in this map is present the client will tell the server to either enable or disable the capability depending on the value set in this map. For example with:

negotiate_capabilities = {
    'consumer_cancel_notify': True,
}

The client will enable this capability if the server reports support for it, but if the value is False the client will disable the capability.
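The selection rule described above can be sketched as a pure function. This illustrates the rule only and is not the library's own code; the function name is hypothetical.

```python
def select_capabilities(negotiate_capabilities, server_capabilities):
    """Return the capability flags a client would send to the server.

    Only capabilities the server reports as supported are included;
    the value from `negotiate_capabilities` decides whether the
    client asks to enable (True) or disable (False) each one.
    """
    return {
        name: enabled
        for name, enabled in negotiate_capabilities.items()
        if server_capabilities.get(name)
    }
```

A capability the server does not report is left out entirely, whatever the client's preference.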

property on_inbound_frame
on_inbound_method(channel_id, method_sig, payload, content)[source]
prev_recv = None

Number of bytes received from socket at the last heartbeat check.

prev_sent = None

Number of bytes sent to socket at the last heartbeat check.

recoverable_channel_errors = (<class 'amqp.exceptions.RecoverableChannelError'>,)
recoverable_connection_errors = (<class 'amqp.exceptions.RecoverableConnectionError'>, <class 'OSError'>, <class 'OSError'>, <class 'OSError'>)
send_heartbeat()[source]
property server_capabilities
server_heartbeat = None

Original heartbeat interval proposed by server.

property sock
then(on_success, on_error=None)[source]
property transport

amqp.channel

AMQP Channels.

class amqp.channel.Channel(connection, channel_id=None, auto_decode=True, on_open=None)[source]

AMQP Channel.

The channel class provides methods for a client to establish a virtual connection - a channel - to a server and for both peers to operate the virtual connection thereafter.

GRAMMAR:

channel             = open-channel *use-channel close-channel
open-channel        = C:OPEN S:OPEN-OK
use-channel         = C:FLOW S:FLOW-OK
                    / S:FLOW C:FLOW-OK
                    / functional-class
close-channel       = C:CLOSE S:CLOSE-OK
                    / S:CLOSE C:CLOSE-OK

Create a channel bound to a connection and using the specified numeric channel_id, and open on the server.

The ‘auto_decode’ parameter (defaults to True) indicates whether the library should attempt to decode the body of Messages to a Unicode string if there’s a ‘content_encoding’ property for the message. If there’s no ‘content_encoding’ property, or the decode raises an Exception, the message body is left as plain bytes.
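The decoding rule can be illustrated with a small standalone function. This sketches the described behaviour and is not the library's implementation; the name decode_body is hypothetical.

```python
def decode_body(body, content_encoding=None):
    """Decode `body` to str when an encoding is given and decoding works.

    Without an encoding, or when decoding raises, the body is returned
    unchanged as bytes.
    """
    if not content_encoding or not isinstance(body, bytes):
        return body
    try:
        return body.decode(content_encoding)
    except Exception:
        # Unknown codec or invalid byte sequence: keep the raw bytes.
        return body
```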

auto_decode
basic_ack(delivery_tag, multiple=False, argsig='Lb')[source]

Acknowledge one or more messages.

This method acknowledges one or more messages delivered via the Deliver or Get-Ok methods. The client can ask to confirm a single message or a set of messages up to and including a specific message.

PARAMETERS:

delivery_tag: longlong

server-assigned delivery tag

The server-assigned and channel-specific delivery tag

RULE:

The delivery tag is valid only within the channel from which the message was received. I.e. a client MUST NOT receive a message on one channel and then acknowledge it on another.

RULE:

The server MUST NOT use a zero value for delivery tags. Zero is reserved for client use, meaning “all messages so far received”.

multiple: boolean

acknowledge multiple messages

If set to True, the delivery tag is treated as “up to and including”, so that the client can acknowledge multiple messages with a single method. If set to False, the delivery tag refers to a single message. If the multiple field is True, and the delivery tag is zero, tells the server to acknowledge all outstanding messages.

RULE:

The server MUST validate that a non-zero delivery-tag refers to a delivered message, and raise a channel exception if this is not the case.
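The multiple flag allows a batch of deliveries to be acknowledged with one call. The helper below is a sketch with a hypothetical name, assuming the tags were all received on `channel`.

```python
def ack_through(channel, delivery_tags):
    """Acknowledge every tag in `delivery_tags` with a single basic_ack.

    With multiple=True the server treats the tag as "up to and
    including", so any older unacknowledged delivery on this channel
    is acknowledged as well. Returns the tag that was sent, or None
    when there was nothing to acknowledge.
    """
    tags = list(delivery_tags)
    if not tags:
        return None
    last = max(tags)
    channel.basic_ack(last, multiple=True)
    return last
```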

basic_cancel(consumer_tag, nowait=False, argsig='sb')[source]

End a queue consumer.

This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply.

RULE:

If the queue no longer exists when the client sends a cancel command, or the consumer has been cancelled for other reasons, this command has no effect.

PARAMETERS:

consumer_tag: shortstr

consumer tag

Identifier for the consumer, valid within the current connection.

RULE:

The consumer tag is valid only within the channel from which the consumer was created. I.e. a client MUST NOT create a consumer in one channel and then use it in another.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

basic_consume(queue='', consumer_tag='', no_local=False, no_ack=False, exclusive=False, nowait=False, callback=None, arguments=None, on_cancel=None, argsig='BssbbbbF')[source]

Start a queue consumer.

This method asks the server to start a “consumer”, which is a transient request for messages from a specific queue. Consumers last as long as the channel they were created on, or until the client cancels them.

RULE:

The server SHOULD support at least 16 consumers per queue, unless the queue was declared as private, and ideally, impose no limit except as defined by available resources.

PARAMETERS:

queue: shortstr

Specifies the name of the queue to consume from. If the queue name is null, refers to the current queue for the channel, which is the last declared queue.

RULE:

If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).

consumer_tag: shortstr

Specifies the identifier for the consumer. The consumer tag is local to a connection, so two clients can use the same consumer tags. If this field is empty the server will generate a unique tag.

RULE:

The tag MUST NOT refer to an existing consumer. If the client attempts to create two consumers with the same non-empty tag the server MUST raise a connection exception with reply code 530 (not allowed).

no_local: boolean

do not deliver own messages

If the no-local field is set the server will not send messages to the client that published them.

no_ack: boolean

no acknowledgment needed

If this field is set the server does not expect acknowledgments for messages. That is, when a message is delivered to the client the server automatically and silently acknowledges it on behalf of the client. This functionality increases performance but at the cost of reliability. Messages can get lost if a client dies before it can deliver them to the application.

exclusive: boolean

request exclusive access

Request exclusive consumer access, meaning only this consumer can access the queue.

RULE:

If the server cannot grant exclusive access to the queue when asked - because there are other consumers active - it MUST raise a channel exception with return code 403 (access refused).

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

callback: Python callable

function/method called with each delivered message

For each message delivered by the broker, the callable will be called with a Message object as the single argument. If no callable is specified, messages are quietly discarded; no_ack should probably be set to True in that case.
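A consumer callback typically processes the message and then acknowledges it. The sketch below assumes the Message object exposes `body` and `delivery_tag` attributes; the helper name start_collecting is hypothetical.

```python
def start_collecting(channel, queue, sink):
    """Register a consumer that appends each message body to `sink`.

    Returns whatever basic_consume() returns. The callback receives a
    single Message argument and acknowledges it after storing the body.
    """
    def on_message(message):
        sink.append(message.body)
        channel.basic_ack(message.delivery_tag)

    return channel.basic_consume(queue=queue, callback=on_message)
```

The callback only runs while the connection is processing inbound frames, for example inside Connection.drain_events().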

basic_get(queue='', no_ack=False, argsig='Bsb')[source]

Direct access to a queue.

This method provides a direct access to the messages in a queue using a synchronous dialogue that is designed for specific types of application where synchronous functionality is more important than performance.

PARAMETERS:

queue: shortstr

Specifies the name of the queue to consume from. If the queue name is null, refers to the current queue for the channel, which is the last declared queue.

RULE:

If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).

no_ack: boolean

no acknowledgment needed

If this field is set the server does not expect acknowledgments for messages. That is, when a message is delivered to the client the server automatically and silently acknowledges it on behalf of the client. This functionality increases performance but at the cost of reliability. Messages can get lost if a client dies before it can deliver them to the application.

Non-blocking, returns an amqp.basic_message.Message object, or None if queue is empty.
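Because basic_get returns None on an empty queue, a polling loop can stop on that value. The helper below is a sketch with a hypothetical name, assuming `channel` is an open Channel.

```python
def fetch_available(channel, queue, limit=100):
    """Collect up to `limit` messages already waiting on `queue`.

    basic_get() returns None once the queue is empty, which ends the
    loop. no_ack=True means the server does not expect acknowledgments,
    so a crash after fetching loses the fetched messages.
    """
    messages = []
    while len(messages) < limit:
        message = channel.basic_get(queue, no_ack=True)
        if message is None:
            break
        messages.append(message)
    return messages
```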

basic_publish(msg, exchange='', routing_key='', mandatory=False, immediate=False, timeout=None, confirm_timeout=None, argsig='Bssbb')

Publish a message.

This method publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.

When the channel is in confirm mode (that is, when the Connection parameter confirm_publish is set to True), each message is confirmed. If the broker rejects a published message (e.g. due to internal broker constraints), a MessageNacked exception is raised. Set confirm_timeout to wait at most confirm_timeout seconds for the message to be confirmed.

PARAMETERS:

exchange: shortstr

Specifies the name of the exchange to publish to. The exchange name can be empty, meaning the default exchange. If the exchange name is specified, and that exchange does not exist, the server will raise a channel exception.

RULE:

The server MUST accept a blank exchange name to mean the default exchange.

RULE:

The exchange MAY refuse basic content in which case it MUST raise a channel exception with reply code 540 (not implemented).

routing_key: shortstr

Message routing key

Specifies the routing key for the message. The routing key is used for routing messages depending on the exchange configuration.

mandatory: boolean

indicate mandatory routing

This flag tells the server how to react if the message cannot be routed to a queue. If this flag is True, the server will return an unroutable message with a Return method. If this flag is False, the server silently drops the message.

RULE:

The server SHOULD implement the mandatory flag.

immediate: boolean

request immediate delivery

This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed.

RULE:

The server SHOULD implement the immediate flag.

timeout: short

timeout for publish

Set timeout to wait at most timeout seconds for the message to be published.

confirm_timeout: short

confirm_timeout for publish in confirm mode

When the channel is in confirm mode, set confirm_timeout to wait at most confirm_timeout seconds for the message to be confirmed.

basic_publish_confirm(*args, **kwargs)[source]
basic_qos(prefetch_size, prefetch_count, a_global, argsig='lBb')[source]

Specify quality of service.

This method requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a qos method always depend on the content class semantics. Though the qos method could in principle apply to both peers, it is currently meaningful only for the server.

PARAMETERS:

prefetch_size: long

prefetch window in octets

The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement. This field specifies the prefetch window size in octets. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). May be set to zero, meaning “no specific limit”, although other prefetch limits may still apply. The prefetch-size is ignored if the no-ack option is set.

RULE:

The server MUST ignore this setting when the client is not processing any messages - i.e. the prefetch size does not limit the transfer of single messages to a client, only the sending in advance of more messages while the client still has one or more unacknowledged messages.

prefetch_count: short

prefetch window in messages

Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. The prefetch-count is ignored if the no-ack option is set.

RULE:

The server MAY send less data in advance than allowed by the client’s specified prefetch windows but it MUST NOT send more.

a_global: boolean

Defines the scope of QoS. The semantics of this parameter differ between the AMQP 0-9-1 standard and the RabbitMQ broker:

MEANING IN AMQP 0-9-1:

False: shared across all consumers on the channel

True: shared across all consumers on the connection

MEANING IN RABBITMQ:

False: applied separately to each new consumer on the channel

True: shared across all consumers on the channel
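A message-count limit with no byte limit is a common configuration. The wrapper below is a sketch with a hypothetical name, assuming `channel` is an open Channel.

```python
def limit_unacked(channel, prefetch_count, shared=False):
    """Cap the number of unacknowledged messages sent in advance.

    prefetch_size=0 means no byte-based limit. `shared` is passed as
    a_global, so its scope depends on the broker as described above.
    """
    channel.basic_qos(0, prefetch_count, shared)
    return prefetch_count
```

Prefetch limits have no effect on consumers started with no_ack set, since nothing is ever left unacknowledged.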

basic_recover(requeue=False)[source]

Redeliver unacknowledged messages.

This method asks the broker to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered. This method is only allowed on non-transacted channels.

RULE:

The server MUST set the redelivered flag on all messages that are resent.

RULE:

The server MUST raise a channel exception if this is called on a transacted channel.

PARAMETERS:

requeue: boolean

requeue the message

If this field is False, the message will be redelivered to the original recipient. If this field is True, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.

basic_recover_async(requeue=False)[source]
basic_reject(delivery_tag, requeue, argsig='Lb')[source]

Reject an incoming message.

This method allows a client to reject a message. It can be used to interrupt and cancel large incoming messages, or return untreatable messages to their original queue.

RULE:

The server SHOULD be capable of accepting and processing the Reject method while sending message content with a Deliver or Get-Ok method. I.e. the server should read and process incoming methods while sending output frames. To cancel a partially-sent content, the server sends a content body frame of size 1 (i.e. with no data except the frame-end octet).

RULE:

The server SHOULD interpret this method as meaning that the client is unable to process the message at this time.

RULE:

A client MUST NOT use this method as a means of selecting messages to process. A rejected message MAY be discarded or dead-lettered, not necessarily passed to another client.

PARAMETERS:

delivery_tag: longlong

server-assigned delivery tag

The server-assigned and channel-specific delivery tag

RULE:

The delivery tag is valid only within the channel from which the message was received. I.e. a client MUST NOT receive a message on one channel and then acknowledge it on another.

RULE:

The server MUST NOT use a zero value for delivery tags. Zero is reserved for client use, meaning “all messages so far received”.

requeue: boolean

requeue the message

If this field is False, the message will be discarded. If this field is True, the server will attempt to requeue the message.

RULE:

The server MUST NOT deliver the message to the same client within the context of the current channel. The recommended strategy is to attempt to deliver the message to an alternative consumer, and if that is not possible, to move the message to a dead-letter queue. The server MAY use more sophisticated tracking to hold the message on the queue and redeliver it to the same client at a later stage.
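Rejection is usually tied to a processing failure. The wrapper below is a sketch with a hypothetical name; it assumes the Message object exposes a `delivery_tag` attribute and that `handler` is application code.

```python
def make_safe_callback(channel, handler, requeue=False):
    """Wrap `handler` so each message is acked or rejected exactly once.

    If handler(message) raises, the message is rejected with the given
    `requeue` flag; otherwise it is acknowledged.
    """
    def on_message(message):
        try:
            handler(message)
        except Exception:
            channel.basic_reject(message.delivery_tag, requeue)
        else:
            channel.basic_ack(message.delivery_tag)

    return on_message
```

The returned function can be passed as the callback argument of basic_consume.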

channel_id
close(reply_code=0, reply_text='', method_sig=(0, 0), argsig='BsBB')[source]

Request a channel close.

This method indicates that the sender wants to close the channel. This may be due to internal conditions (e.g. a forced shut-down) or due to an error handling a specific method, i.e. an exception. When a close is due to an exception, the sender provides the class and method id of the method which caused the exception.

RULE:

After sending this method any received method except Channel.Close-OK MUST be discarded.

RULE:

The peer sending this method MAY use a counter or timeout to detect failure of the other peer to respond correctly with Channel.Close-OK.

PARAMETERS:

reply_code: short

The reply code. The AMQ reply codes are defined in AMQ RFC 011.

reply_text: shortstr

The localised reply text. This text can be logged as an aid to resolving issues.

class_id: short

failing method class

When the close is provoked by a method exception, this is the class of the method.

method_id: short

failing method ID

When the close is provoked by a method exception, this is the ID of the method.

collect()[source]

Tear down this object.

Best called after we’ve agreed to close with the server.

confirm_select(nowait=False)[source]

Enable publisher confirms for this channel.

Note: This is a RabbitMQ extension.

Cannot be used if the channel is in transactional mode.

Parameters

nowait – If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.
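Confirm callbacks registered through Channel.events['basic_ack'] receive (delivery_tag, multiple). The class below is a sketch of how an application might track outstanding confirms with that signature; the class and its method names are hypothetical, and it assumes the application records each delivery tag itself as it publishes.

```python
class ConfirmTracker:
    """Track published delivery tags until the broker confirms them."""

    def __init__(self):
        self.pending = set()

    def published(self, delivery_tag):
        """Record a tag for a message that was just published."""
        self.pending.add(delivery_tag)

    def on_ack(self, delivery_tag, multiple):
        """Callback with the (delivery_tag, multiple) signature."""
        if multiple:
            # Everything up to and including delivery_tag is confirmed.
            self.pending = {t for t in self.pending if t > delivery_tag}
        else:
            self.pending.discard(delivery_tag)
```

After channel.confirm_select(), the tracker could be attached with channel.events['basic_ack'].append(tracker.on_ack).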

connection
exchange_bind(destination, source='', routing_key='', nowait=False, arguments=None, argsig='BsssbF')[source]

Bind an exchange to an exchange.

RULE:

A server MUST allow and ignore duplicate bindings - that is, two or more bind methods for a specific exchange, with identical arguments - without treating these as an error.

RULE:

A server MUST allow cycles of exchange bindings to be created including allowing an exchange to be bound to itself.

RULE:

A server MUST NOT deliver the same message more than once to a destination exchange, even if the topology of exchanges and bindings results in multiple (even infinite) routes to that exchange.

PARAMETERS:

reserved-1: short

destination: shortstr

Specifies the name of the destination exchange to bind.

RULE:

A client MUST NOT be allowed to bind a non-existent destination exchange.

RULE:

The server MUST accept a blank exchange name to mean the default exchange.

source: shortstr

Specifies the name of the source exchange to bind.

RULE:

A client MUST NOT be allowed to bind a non-existent source exchange.

RULE:

The server MUST accept a blank exchange name to mean the default exchange.

routing-key: shortstr

Specifies the routing key for the binding. The routing key is used for routing messages depending on the exchange configuration. Not all exchanges use a routing key - refer to the specific exchange documentation.

no-wait: bit

arguments: table

A set of arguments for the binding. The syntax and semantics of these arguments depend on the exchange class.

exchange_declare(exchange, type, passive=False, durable=False, auto_delete=True, nowait=False, arguments=None, argsig='BssbbbbbF')[source]

Declare exchange, create if needed.

This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.

RULE:

The server SHOULD support a minimum of 16 exchanges per virtual host and ideally, impose no limit except as defined by available resources.

PARAMETERS:

exchange: shortstr

RULE:

Exchange names starting with “amq.” are reserved for predeclared and standardised exchanges. If the client attempts to create an exchange starting with “amq.”, the server MUST raise a channel exception with reply code 403 (access refused).

type: shortstr

exchange type

Each exchange belongs to one of a set of exchange types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. It is not valid or meaningful to attempt to change the type of an existing exchange.

RULE:

If the exchange already exists with a different type, the server MUST raise a connection exception with a reply code 507 (not allowed).

RULE:

If the server does not support the requested exchange type it MUST raise a connection exception with a reply code 503 (command invalid).

passive: boolean

do not create exchange

If set, the server will not create the exchange. The client can use this to check whether an exchange exists without modifying the server state.

RULE:

If set, and the exchange does not already exist, the server MUST raise a channel exception with reply code 404 (not found).

durable: boolean

request a durable exchange

If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.

RULE:

The server MUST support both durable and transient exchanges.

RULE:

The server MUST ignore the durable field if the exchange already exists.

auto_delete: boolean

auto-delete when unused

If set, the exchange is deleted when all queues have finished using it.

RULE:

The server SHOULD allow for a reasonable delay between the point when it determines that an exchange is not being used (or no longer used), and the point when it deletes the exchange. At the least it must allow a client to create an exchange and then bind a queue to it, with a small but non-zero delay between these two actions.

RULE:

The server MUST ignore the auto-delete field if the exchange already exists.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

arguments: table

arguments for declaration

A set of arguments for the declaration. The syntax and semantics of these arguments depends on the server implementation. This field is ignored if passive is True.
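As a minimal sketch of how the parameters above combine, the helper below declares a durable exchange that is not auto-deleted. `declare_durable_exchange` is a hypothetical name; `channel` is assumed to be any object exposing `exchange_declare()` with the signature documented above (for example a channel obtained from an open connection).

```python
def declare_durable_exchange(channel, name, exchange_type='direct'):
    """Declare a durable exchange that is not auto-deleted.

    `channel` is assumed to expose exchange_declare() as documented above.
    """
    if name.startswith('amq.'):
        # Reserved prefix: per the rule above the server would refuse
        # this with reply code 403, so fail early on the client side.
        raise ValueError('exchange names starting with "amq." are reserved')
    channel.exchange_declare(
        name, exchange_type,
        durable=True,        # keep the exchange across a server restart
        auto_delete=False,   # the documented default is True
    )
    return name
```

Passing `auto_delete=False` explicitly matters here because the documented default is `True`.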

exchange_delete(exchange, if_unused=False, nowait=False, argsig='Bsbb')[source]

Delete an exchange.

This method deletes an exchange. When an exchange is deleted all queue bindings on the exchange are cancelled.

PARAMETERS:

exchange: shortstr

RULE:

The exchange MUST exist. Attempting to delete a non-existing exchange causes a channel exception.

if_unused: boolean

delete only if unused

If set, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but raises a channel exception instead.

RULE:

If set, the server SHOULD delete the exchange but only if it has no queue bindings.

RULE:

If set, the server SHOULD raise a channel exception if the exchange is in use.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

exchange_unbind(destination, source='', routing_key='', nowait=False, arguments=None, argsig='BsssbF')[source]

Unbind an exchange from an exchange.

RULE:

If an unbind fails, the server MUST raise a connection exception.

PARAMETERS:

reserved-1: short

destination: shortstr

Specifies the name of the destination exchange to unbind.

RULE:

The client MUST NOT attempt to unbind an exchange that does not exist from an exchange.

RULE:

The server MUST accept a blank exchange name to mean the default exchange.

source: shortstr

Specifies the name of the source exchange to unbind.

RULE:

The client MUST NOT attempt to unbind an exchange from an exchange that does not exist.

RULE:

The server MUST accept a blank exchange name to mean the default exchange.

routing-key: shortstr

Specifies the routing key of the binding to unbind.

no-wait: bit

arguments: table

Specifies the arguments of the binding to unbind.

flow(active)[source]

Enable/disable flow from peer.

This method asks the peer to pause or restart the flow of content data. This is a simple flow-control mechanism that a peer can use to avoid overflowing its queues or otherwise finding itself receiving more messages than it can process. Note that this method is not intended for window control. The peer that receives a request to stop sending content should finish sending the current content, if any, and then wait until it receives a Flow restart method.

RULE:

When a new channel is opened, it is active. Some applications assume that channels are inactive until started. To emulate this behaviour a client MAY open the channel, then pause it.

RULE:

When sending content data in multiple frames, a peer SHOULD monitor the channel for incoming methods and respond to a Channel.Flow as rapidly as possible.

RULE:

A peer MAY use the Channel.Flow method to throttle incoming content data for internal reasons, for example, when exchanging data over a slower connection.

RULE:

The peer that requests a Channel.Flow method MAY disconnect and/or ban a peer that does not respect the request.

PARAMETERS:

active: boolean

start/stop content frames

If True, the peer starts sending content frames. If False, the peer stops sending content frames.
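One way a sending peer might honour a flow request is to buffer outgoing content while the channel is paused and drain the buffer on restart. The sketch below is illustrative only: `FlowAwareSender`, `on_flow` and `publish` are hypothetical names and are not part of this library.

```python
from collections import deque


class FlowAwareSender:
    """Buffers outgoing messages while the peer has paused the flow (hypothetical)."""

    def __init__(self, send):
        self._send = send        # callable that actually transmits one message
        self._active = True      # a newly opened channel is active
        self._backlog = deque()

    def on_flow(self, active):
        """Handle a flow request: pause on False, resume and drain on True."""
        self._active = bool(active)
        if self._active:
            while self._backlog:
                self._send(self._backlog.popleft())

    def publish(self, message):
        if self._active:
            self._send(message)
        else:
            self._backlog.append(message)   # held until flow restarts
```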

is_closing
method_queue
open()[source]

Open a channel for use.

This method opens a virtual connection (a channel).

RULE:

This method MUST NOT be called when the channel is already open.

PARAMETERS:

out_of_band: shortstr (DEPRECATED)

out-of-band settings

Configures out-of-band transfers on this channel. The syntax and meaning of this field will be formally defined at a later date.

queue_bind(queue, exchange='', routing_key='', nowait=False, arguments=None, argsig='BsssbF')[source]

Bind queue to an exchange.

This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a dest exchange and subscription queues are bound to a dest_wild exchange.

RULE:

A server MUST allow and ignore duplicate bindings - that is, two or more bind methods for a specific queue, with identical arguments - without treating these as an error.

RULE:

If a bind fails, the server MUST raise a connection exception.

RULE:

The server MUST NOT allow a durable queue to bind to a transient exchange. If the client attempts this the server MUST raise a channel exception.

RULE:

Bindings for durable queues are automatically durable and the server SHOULD restore such bindings after a server restart.

RULE:

The server SHOULD support at least 4 bindings per queue, and ideally, impose no limit except as defined by available resources.

PARAMETERS:

queue: shortstr

Specifies the name of the queue to bind. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue.

RULE:

If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).

RULE:

If the queue does not exist the server MUST raise a channel exception with reply code 404 (not found).

exchange: shortstr

The name of the exchange to bind to.

RULE:

If the exchange does not exist the server MUST raise a channel exception with reply code 404 (not found).

routing_key: shortstr

message routing key

Specifies the routing key for the binding. The routing key is used for routing messages depending on the exchange configuration. Not all exchanges use a routing key - refer to the specific exchange documentation. If the routing key is empty and the queue name is empty, the routing key will be the current queue for the channel, which is the last declared queue.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

arguments: table

arguments for binding

A set of arguments for the binding. The syntax and semantics of these arguments depends on the exchange class.

queue_declare(queue='', passive=False, durable=False, exclusive=False, auto_delete=True, nowait=False, arguments=None, argsig='BsbbbbbF')[source]

Declare queue, create if needed.

This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.

RULE:

The server MUST create a default binding for a newly-created queue to the default exchange, which is an exchange of type ‘direct’.

RULE:

The server SHOULD support a minimum of 256 queues per virtual host and ideally, impose no limit except as defined by available resources.

PARAMETERS:

queue: shortstr

RULE:

The queue name MAY be empty, in which case the server MUST create a new queue with a unique generated name and return this to the client in the Declare-Ok method.

RULE:

Queue names starting with “amq.” are reserved for predeclared and standardised server queues. If the queue name starts with “amq.” and the passive option is False, the server MUST raise a connection exception with reply code 403 (access refused).

passive: boolean

do not create queue

If set, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state.

RULE:

If set, and the queue does not already exist, the server MUST respond with a reply code 404 (not found) and raise a channel exception.

durable: boolean

request a durable queue

If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue.

RULE:

The server MUST recreate the durable queue after a restart.

RULE:

The server MUST support both durable and transient queues.

RULE:

The server MUST ignore the durable field if the queue already exists.

exclusive: boolean

request an exclusive queue

Exclusive queues may only be consumed from by the current connection. Setting the ‘exclusive’ flag always implies ‘auto-delete’.

RULE:

The server MUST support both exclusive (private) and non-exclusive (shared) queues.

RULE:

The server MUST raise a channel exception if ‘exclusive’ is specified and the queue already exists and is owned by a different connection.

auto_delete: boolean

auto-delete queue when unused

If set, the queue is deleted when all consumers have finished using it. Last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won’t be deleted.

RULE:

The server SHOULD allow for a reasonable delay between the point when it determines that a queue is not being used (or no longer used), and the point when it deletes the queue. At the least it must allow a client to create a queue and then create a consumer to read from it, with a small but non-zero delay between these two actions. The server should equally allow for clients that may be disconnected prematurely, and wish to re-consume from the same queue without losing messages. We would recommend a configurable timeout, with a suitable default value being one minute.

RULE:

The server MUST ignore the auto-delete field if the queue already exists.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

arguments: table

arguments for declaration

A set of arguments for the declaration. The syntax and semantics of these arguments depends on the server implementation. This field is ignored if passive is True.

Returns a tuple containing 3 items:

the name of the queue (essential for automatically-named queues), message count and consumer count
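The returned tuple matters most for server-named queues, because the generated name is needed for every later call. The sketch below declares a queue and binds it, assuming `channel` exposes `queue_declare()` and `queue_bind()` as documented above; `declare_and_bind` is a hypothetical helper name.

```python
def declare_and_bind(channel, exchange, routing_key, queue=''):
    """Declare a queue and bind it to `exchange`.

    With an empty `queue` the server generates the name, so the name
    returned by queue_declare() is the one used for the binding.
    """
    name, message_count, consumer_count = channel.queue_declare(
        queue=queue,
        durable=False,
        exclusive=True,      # private to this connection
        auto_delete=True,
    )
    channel.queue_bind(name, exchange=exchange, routing_key=routing_key)
    return name, message_count, consumer_count
```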

queue_delete(queue='', if_unused=False, if_empty=False, nowait=False, argsig='Bsbbb')[source]

Delete a queue.

This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled.

RULE:

The server SHOULD use a dead-letter queue to hold messages that were pending on a deleted queue, and MAY provide facilities for a system administrator to move these messages back to an active queue.

PARAMETERS:

queue: shortstr

Specifies the name of the queue to delete. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue.

RULE:

If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).

RULE:

The queue must exist. Attempting to delete a non-existing queue causes a channel exception.

if_unused: boolean

delete only if unused

If set, the server will only delete the queue if it has no consumers. If the queue has consumers the server does not delete it but raises a channel exception instead.

RULE:

The server MUST respect the if-unused flag when deleting a queue.

if_empty: boolean

delete only if empty

If set, the server will only delete the queue if it has no messages. If the queue is not empty the server raises a channel exception.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

If nowait is False, returns the number of deleted messages.

queue_purge(queue='', nowait=False, argsig='Bsb')[source]

Purge a queue.

This method removes all messages from a queue. It does not cancel consumers. Purged messages are deleted without any formal “undo” mechanism.

RULE:

A call to purge MUST result in an empty queue.

RULE:

On transacted channels the server MUST NOT purge messages that have already been sent to a client but not yet acknowledged.

RULE:

The server MAY implement a purge queue or log that allows system administrators to recover accidentally-purged messages. The server SHOULD NOT keep purged messages in the same storage spaces as the live messages since the volumes of purged messages may get very large.

PARAMETERS:

queue: shortstr

Specifies the name of the queue to purge. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue.

RULE:

If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).

RULE:

The queue must exist. Attempting to purge a non-existing queue causes a channel exception.

nowait: boolean

do not send a reply method

If set, the server will not respond to the method. The client should not wait for a reply method. If the server could not complete the method it will raise a channel or connection exception.

If nowait is False, returns the number of purged messages.

queue_unbind(queue, exchange, routing_key='', nowait=False, arguments=None, argsig='BsssF')[source]

Unbind a queue from an exchange.

This method unbinds a queue from an exchange.

RULE:

If an unbind fails, the server MUST raise a connection exception.

PARAMETERS:

queue: shortstr

Specifies the name of the queue to unbind.

RULE:

The client MUST either specify a queue name or have previously declared a queue on the same channel.

RULE:

The client MUST NOT attempt to unbind a queue that does not exist.

exchange: shortstr

The name of the exchange to unbind from.

RULE:

The client MUST NOT attempt to unbind a queue from an exchange that does not exist.

RULE:

The server MUST accept a blank exchange name to mean the default exchange.

routing_key: shortstr

routing key of binding

Specifies the routing key of the binding to unbind.

arguments: table

arguments of binding

Specifies the arguments of the binding to unbind.

then(on_success, on_error=None)[source]
tx_commit()[source]

Commit the current transaction.

This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit.

tx_rollback()[source]

Abandon the current transaction.

This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback.

tx_select()[source]

Select standard transaction mode.

This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.
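The three transaction methods are typically used together. The following is a minimal sketch, assuming `channel` exposes `tx_select()`, `tx_commit()` and `tx_rollback()` as documented above; `publish_transactionally` and the `publish_all` callable are hypothetical.

```python
def publish_transactionally(channel, publish_all):
    """Run `publish_all(channel)` inside a transaction.

    Commits if the callable returns normally; rolls back and re-raises
    if it raises.
    """
    channel.tx_select()           # required at least once before commit/rollback
    try:
        publish_all(channel)
    except Exception:
        channel.tx_rollback()     # abandon everything published so far
        raise
    channel.tx_commit()           # a new transaction starts immediately after
```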

amqp.basic_message

AMQP Messages.

class amqp.basic_message.Message(body='', children=None, channel=None, **properties)[source]

A Message for use with the Channel.basic_* methods.

Expected arg types

body: string

children: (not supported)

Keyword properties may include:

content_type: shortstr

MIME content type

content_encoding: shortstr

MIME content encoding

application_headers: table

Message header field table, a dict with string keys, and string | int | Decimal | datetime | dict values.

delivery_mode: octet

Non-persistent (1) or persistent (2)

priority: octet

The message priority, 0 to 9

correlation_id: shortstr

The application correlation identifier

reply_to: shortstr

The destination to reply to

expiration: shortstr

Message expiration specification

message_id: shortstr

The application message identifier

timestamp: unsigned long

The message timestamp

type: shortstr

The message type name

user_id: shortstr

The creating user id

app_id: shortstr

The creating application id

cluster_id: shortstr

Intra-cluster routing identifier

Unicode bodies are encoded according to the ‘content_encoding’ argument. If that’s None, it’s set to ‘UTF-8’ automatically.

Example:

from amqp import Message

msg = Message('hello world',
              content_type='text/plain',
              application_headers={'foo': 7})
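The body-encoding rule stated above (use ‘content_encoding’, falling back to ‘UTF-8’ when it is None) can be sketched in a few lines. This is an illustration of the documented behaviour, not the library's own code; `encode_body` is a hypothetical name, and passing bytes bodies through unchanged is an assumption.

```python
def encode_body(body, content_encoding=None):
    """Return (bytes_body, content_encoding) following the rule above."""
    if isinstance(body, bytes):
        # Assumption: bytes bodies need no encoding and are left as-is.
        return body, content_encoding
    if content_encoding is None:
        content_encoding = 'UTF-8'    # documented default for unicode bodies
    return body.encode(content_encoding), content_encoding
```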
CLASS_ID = 60
PROPERTIES = [('content_type', 's'), ('content_encoding', 's'), ('application_headers', 'F'), ('delivery_mode', 'o'), ('priority', 'o'), ('correlation_id', 's'), ('reply_to', 's'), ('expiration', 's'), ('message_id', 's'), ('timestamp', 'L'), ('type', 's'), ('user_id', 's'), ('app_id', 's'), ('cluster_id', 's')]

Instances of this class have these attributes, which are passed back and forth as message properties between client and server.

body
channel
delivery_info

set by basic_consume/basic_get

property delivery_tag
property headers

amqp.exceptions

Exceptions used by amqp.

exception amqp.exceptions.AMQPDeprecationWarning[source]

Warning for deprecated things.

exception amqp.exceptions.AMQPError(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

Base class for all AMQP exceptions.

code = 0
property method
exception amqp.exceptions.AMQPNotImplementedError(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Not Implemented Error.

code = 540
exception amqp.exceptions.AccessRefused(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Access Refused Error.

code = 403
exception amqp.exceptions.ChannelError(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Channel Error.

exception amqp.exceptions.ChannelNotOpen(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Channel Not Open Error.

code = 504
exception amqp.exceptions.ConnectionError(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Connection Error.

exception amqp.exceptions.ConnectionForced(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Connection Forced Error.

code = 320
exception amqp.exceptions.ConsumerCancelled(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Consumer Cancelled Predicate.

exception amqp.exceptions.ContentTooLarge(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Content Too Large Error.

code = 311
exception amqp.exceptions.FrameError(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Frame Error.

code = 501
exception amqp.exceptions.FrameSyntaxError(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Frame Syntax Error.

code = 502
exception amqp.exceptions.InternalError(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Internal Error.

code = 541
exception amqp.exceptions.InvalidCommand(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Invalid Command Error.

code = 503
exception amqp.exceptions.InvalidPath(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Invalid Path Error.

code = 402
exception amqp.exceptions.IrrecoverableChannelError(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

Exception class for irrecoverable channel errors.

exception amqp.exceptions.IrrecoverableConnectionError(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

Exception class for irrecoverable connection errors.

exception amqp.exceptions.MessageNacked[source]

Message was nacked by broker.

exception amqp.exceptions.NoConsumers(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP No Consumers Error.

code = 313
exception amqp.exceptions.NotAllowed(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Not Allowed Error.

code = 530
exception amqp.exceptions.NotFound(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Not Found Error.

code = 404
exception amqp.exceptions.PreconditionFailed(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Precondition Failed Error.

code = 406
exception amqp.exceptions.RecoverableChannelError(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

Exception class for recoverable channel errors.

exception amqp.exceptions.RecoverableConnectionError(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

Exception class for recoverable connection errors.

exception amqp.exceptions.ResourceError(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Resource Error.

code = 506
exception amqp.exceptions.ResourceLocked(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Resource Locked Error.

code = 405
exception amqp.exceptions.UnexpectedFrame(reply_text=None, method_sig=None, method_name=None, reply_code=None)[source]

AMQP Unexpected Frame.

code = 505
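For quick reference, the reply codes listed in this section can be collected into a lookup table. The sketch below uses plain strings so that it runs without the library installed; `REPLY_CODE_TO_EXCEPTION` and `exception_name` are hypothetical names, and only the codes and class names shown above are included.

```python
# Reply codes and exception class names as listed in this section.
REPLY_CODE_TO_EXCEPTION = {
    311: 'ContentTooLarge',
    313: 'NoConsumers',
    320: 'ConnectionForced',
    402: 'InvalidPath',
    403: 'AccessRefused',
    404: 'NotFound',
    405: 'ResourceLocked',
    406: 'PreconditionFailed',
    501: 'FrameError',
    502: 'FrameSyntaxError',
    503: 'InvalidCommand',
    504: 'ChannelNotOpen',
    505: 'UnexpectedFrame',
    506: 'ResourceError',
    530: 'NotAllowed',
    540: 'AMQPNotImplementedError',
    541: 'InternalError',
}


def exception_name(reply_code, default='AMQPError'):
    """Return the documented exception class name for a reply code."""
    return REPLY_CODE_TO_EXCEPTION.get(reply_code, default)
```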

amqp.abstract_channel

Code common to Connection and Channel objects.

class amqp.abstract_channel.AbstractChannel(connection, channel_id)[source]

Superclass for Connection and Channel.

The connection is treated as channel 0, then comes user-created channel objects.

The subclasses must have a _METHOD_MAP class property, mapping between AMQP method signatures and Python methods.

auto_decode
channel_id
close()[source]

Close this Channel or Connection.

connection
dispatch_method(method_sig, payload, content)[source]
is_closing
method_queue
send_method(sig, format=None, args=None, content=None, wait=None, callback=None, returns_tuple=False)[source]
wait(method, callback=None, timeout=None, returns_tuple=False)[source]

amqp.transport

Transport implementation.

class amqp.transport._AbstractTransport(host, connect_timeout=None, read_timeout=None, write_timeout=None, socket_settings=None, raise_on_initial_eintr=True, **kwargs)[source]

Common superclass for TCP and SSL transports.

PARAMETERS:

host: str

Broker address in format HOSTNAME:PORT.

connect_timeout: int

Timeout of creating new connection.

read_timeout: int

sets SO_RCVTIMEO parameter of socket.

write_timeout: int

sets SO_SNDTIMEO parameter of socket.

socket_settings: dict

dictionary containing optname and optval passed to setsockopt(2).

raise_on_initial_eintr: bool

when True, socket.timeout is raised if an exception is received during the first read. See _read() for details.

close()[source]
connect()[source]
connect_timeout
connection
having_timeout(timeout)[source]
host
port
raise_on_initial_eintr
read_frame(unpack=<built-in function unpack>)[source]

Parse AMQP frame.

Frame has following format:

0      1         3         7                   size+7      size+8
+------+---------+---------+   +-------------+   +-----------+
| type | channel |  size   |   |   payload   |   | frame-end |
+------+---------+---------+   +-------------+   +-----------+
 octet    short     long        'size' octets        octet
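The layout above maps directly onto a 7-byte big-endian header (`type`, `channel`, `size`) followed by the payload and a single frame-end octet. The sketch below parses and builds such a frame with the standard `struct` module. It is not the library's `read_frame` implementation; `parse_frame` and `build_frame` are hypothetical names, and the frame-end value 0xCE is the one defined by AMQP 0-9-1.

```python
import struct

FRAME_END = 0xCE  # frame-end octet defined by AMQP 0-9-1


def parse_frame(data):
    """Split one complete frame into (frame_type, channel, payload)."""
    # octet + short + long = 7 header bytes, big-endian.
    frame_type, channel, size = struct.unpack_from('>BHI', data, 0)
    if len(data) < size + 8:
        raise ValueError('incomplete frame')
    if data[size + 7] != FRAME_END:
        raise ValueError('bad frame-end octet')
    return frame_type, channel, data[7:7 + size]


def build_frame(frame_type, channel, payload):
    """Inverse of parse_frame()."""
    header = struct.pack('>BHI', frame_type, channel, len(payload))
    return header + payload + bytes([FRAME_END])
```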
read_timeout
sock
socket_settings
write(s)[source]
write_timeout
class amqp.transport.SSLTransport(host, connect_timeout=None, ssl=None, **kwargs)[source]

Transport that works over SSL.

PARAMETERS:

host: str

Broker address in format HOSTNAME:PORT.

connect_timeout: int

Timeout of creating new connection.

ssl: bool|dict

parameters of TLS subsystem.
  • when ssl is not dictionary, defaults of TLS are used

  • otherwise:
    • if ssl dictionary contains context key, _wrap_context is used for wrapping socket. context is a dictionary passed to _wrap_context as context parameter. All others items from ssl argument are passed as sslopts.

    • if ssl dictionary does not contain context key, _wrap_socket_sni is used for wrapping socket. All items in ssl argument are passed to _wrap_socket_sni as parameters.

kwargs:

additional arguments of _AbstractTransport class
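The dispatch rules for the ssl parameter can be summarised as a small decision function. The sketch below only mirrors the description above and does not touch any socket; `describe_ssl_wrapping` is a hypothetical name, and the returned labels are plain strings rather than the real methods.

```python
def describe_ssl_wrapping(ssl):
    """Return (wrapper_name, arguments) chosen for a given `ssl` value."""
    if not isinstance(ssl, dict):
        # Not a dictionary: TLS defaults are used.
        return ('defaults', {})
    opts = dict(ssl)                 # copy so the caller's dict is not mutated
    if 'context' in opts:
        context = opts.pop('context')
        # 'context' goes in as the context parameter, the rest as sslopts.
        return ('_wrap_context', {'context': context, 'sslopts': opts})
    # No 'context' key: every item is passed on as a parameter.
    return ('_wrap_socket_sni', opts)
```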

_wrap_context(sock, sslopts, check_hostname=None, **ctx_options)[source]

Wrap socket without SNI headers.

PARAMETERS:

sock: socket.socket

Socket to be wrapped.

sslopts: dict

check_hostname

Whether to match the peer cert’s hostname. See ssl.SSLContext.check_hostname for details.

ctx_options

Parameters of ssl.create_default_context.

_wrap_socket_sni(sock, keyfile=None, certfile=None, server_side=False, cert_reqs=None, ca_certs=None, do_handshake_on_connect=False, suppress_ragged_eofs=True, server_hostname=None, ciphers=None, ssl_version=None)[source]

Socket wrap with SNI headers.

stdlib ssl.SSLContext.wrap_socket method augmented with support for setting the server_hostname field required for SNI hostname header.

PARAMETERS:

sock: socket.socket

Socket to be wrapped.

keyfile: str

Path to the private key

certfile: str

Path to the certificate

server_side: bool

Identifies whether server-side or client-side behavior is desired from this socket. See wrap_socket for details.

cert_reqs: ssl.VerifyMode

When set to other than ssl.CERT_NONE, the peer's certificate is checked. Possible values are ssl.CERT_NONE, ssl.CERT_OPTIONAL and ssl.CERT_REQUIRED.

ca_certs: str

Path to “certification authority” (CA) certificates used to validate other peers’ certificates when cert_reqs is other than ssl.CERT_NONE.

do_handshake_on_connect: bool

Specifies whether to do the SSL handshake automatically. See wrap_socket for details.

suppress_ragged_eofs: bool

See wrap_socket for details.

server_hostname: str

Specifies the hostname of the service which we are connecting to. See wrap_socket for details.

ciphers: str

Available ciphers for sockets created with this context. See ssl.SSLContext.set_ciphers

ssl_version:

Protocol of the SSL Context. The value is one of ssl.PROTOCOL_* constants.

sslopts
class amqp.transport.TCPTransport(host, connect_timeout=None, read_timeout=None, write_timeout=None, socket_settings=None, raise_on_initial_eintr=True, **kwargs)[source]

Transport that deals directly with TCP socket.

All parameters are those of the _AbstractTransport class.

connect_timeout
connection
host
port
raise_on_initial_eintr
read_timeout
sock
socket_settings
write_timeout
class amqp.transport.Transport(host, connect_timeout=None, ssl=False, **kwargs)[source]

Create transport.

Given a few parameters from the Connection constructor, select and create a subclass of _AbstractTransport.

PARAMETERS:

host: str

Broker address in format HOSTNAME:PORT.

connect_timeout: int

Timeout of creating new connection.

ssl: bool|dict

If set, SSLTransport is used and ssl parameter is passed to it. Otherwise TCPTransport is used.

kwargs:

additional arguments of _AbstractTransport class
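The selection described above, together with the HOSTNAME:PORT address format, can be sketched as follows. Both helpers are hypothetical and simplified: `split_host` does not handle bracketed IPv6 addresses, and the fallback port 5672 is the standard AMQP port, assumed here for illustration.

```python
def split_host(host, default_port=5672):
    """Split 'HOSTNAME:PORT' into (hostname, port)."""
    hostname, sep, port = host.rpartition(':')
    if not sep:
        # No colon present: the whole string is the hostname.
        return host, default_port
    return hostname, int(port)


def transport_name(ssl=False):
    """Name of the transport class the rule above would select."""
    return 'SSLTransport' if ssl else 'TCPTransport'
```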

amqp.method_framing

Convert between frames and higher-level AMQP methods.

amqp.method_framing.frame_handler(connection, callback, unpack_from=<built-in function unpack_from>, content_methods=frozenset({(60, 50), (60, 60), (60, 71)}))[source]

Create closure that reads frames.
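The default `content_methods` set contains three `(class_id, method_id)` pairs that correspond to entries in the `amqp.spec.Basic` listing. The sketch below names them and shows a membership check; the interpretation that these are the methods followed by content (header and body) frames is an assumption based on the parameter name, and `carries_content` is a hypothetical helper.

```python
# (class_id, method_id) pairs taken from the amqp.spec.Basic listing.
BASIC_METHODS = {
    (60, 50): 'Basic.Return',
    (60, 60): 'Basic.Deliver',
    (60, 71): 'Basic.GetOk',
    (60, 80): 'Basic.Ack',
}

# Default value of the content_methods parameter shown above.
CONTENT_METHODS = frozenset({(60, 50), (60, 60), (60, 71)})


def carries_content(method_sig):
    """True if the method signature is in the default content_methods set."""
    return method_sig in CONTENT_METHODS
```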

amqp.method_framing.frame_writer(connection, transport, pack=<built-in function pack>, pack_into=<built-in function pack_into>, range=<class 'range'>, len=<built-in function len>, bytes=<class 'bytes'>, str_to_bytes=<function str_to_bytes>, text_t=<class 'str'>)[source]

Create closure that writes frames.

amqp.platform

Platform compatibility.

amqp.protocol

Protocol data.

class amqp.protocol.basic_return_t(reply_code, reply_text, exchange, routing_key, message)
property exchange

Alias for field number 2

property message

Alias for field number 4

property reply_code

Alias for field number 0

property reply_text

Alias for field number 1

property routing_key

Alias for field number 3

class amqp.protocol.queue_declare_ok_t(queue, message_count, consumer_count)
property consumer_count

Alias for field number 2

property message_count

Alias for field number 1

property queue

Alias for field number 0

amqp.sasl

SASL mechanisms for AMQP authentication.

class amqp.sasl.AMQPLAIN(username, password)[source]

AMQPLAIN SASL authentication mechanism.

This is a non-standard mechanism used by AMQP servers.

mechanism = b'AMQPLAIN'
password
start(connection)[source]

Return the first response to a SASL challenge as a bytes object.

username
class amqp.sasl.EXTERNAL[source]

EXTERNAL SASL mechanism.

Enables external authentication, i.e. not handled through this protocol. Only passes ‘EXTERNAL’ as authentication mechanism, but no further authentication data.

mechanism = b'EXTERNAL'
start(connection)[source]

Return the first response to a SASL challenge as a bytes object.

amqp.sasl.GSSAPI

alias of amqp.sasl._get_gssapi_mechanism.<locals>.FakeGSSAPI

class amqp.sasl.PLAIN(username, password)[source]

PLAIN SASL authentication mechanism.

See https://tools.ietf.org/html/rfc4616 for details

mechanism = b'PLAIN'
password
start(connection)[source]

Return the first response to a SASL challenge as a bytes object.

username
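RFC 4616 defines the PLAIN message as an optional authorization identity, the authentication identity and the password, separated by NUL octets. The sketch below builds that message with the standard library only; it illustrates the wire format and is not the body of `PLAIN.start()`. `plain_initial_response` is a hypothetical name.

```python
def plain_initial_response(username, password, authzid=''):
    """Build the RFC 4616 message: [authzid] NUL authcid NUL passwd."""
    parts = (authzid, username, password)
    return b'\0'.join(part.encode('utf-8') for part in parts)
```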
class amqp.sasl.RAW(mechanism, response)[source]

A generic custom SASL mechanism.

This mechanism takes a mechanism name and response to send to the server, so can be used for simple custom authentication schemes.

mechanism = None
start(connection)[source]

Return the first response to a SASL challenge as a bytes object.

class amqp.sasl.SASL[source]

The base class for all amqp SASL authentication mechanisms.

You should sub-class this if you’re implementing your own authentication.

property mechanism

Return a bytes object containing the SASL mechanism name.

start(connection)[source]

Return the first response to a SASL challenge as a bytes object.

amqp.serialization

Convert between bytestreams and higher-level AMQP types.

2007-11-05 Barry Pederson <bp@barryp.org>

class amqp.serialization.GenericContent(frame_method=None, frame_args=None, **props)[source]

Abstract base class for AMQP content.

Subclasses should override the PROPERTIES attribute.

CLASS_ID = None
PROPERTIES = [('dummy', 's')]
body_received
body_size
frame_args
frame_method
inbound_body(buf)[source]
inbound_header(buf, offset=0)[source]
properties
ready
amqp.serialization.decode_properties_basic(buf, offset)[source]

Decode basic properties.

amqp.serialization.dumps(format, values)[source]

Serialize AMQP arguments.

Notes:

bit = b, octet = o, short = B, long = l, long long = L, shortstr = s, longstr = S, byte array = x, table = F, array = A

amqp.serialization.loads(format, buf, offset)[source]

Deserialize amqp format.

bit = b, octet = o, short = B, long = l, long long = L, float = f, shortstr = s, longstr = S, table = F, array = A, timestamp = T
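To make the format codes above more concrete, here is a minimal standalone sketch of how a format string can drive encoding and decoding. It is not the library's implementation: pack_args and unpack_args are hypothetical helpers, only a subset of codes (o, B, l, L, s, S) is handled, and integers are assumed to be unsigned and big-endian as in the AMQP 0-9-1 wire format.

```python
import struct

# Fixed-width integer codes mapped to big-endian struct formats (assumed unsigned).
_INT_FORMATS = {"o": ">B", "B": ">H", "l": ">I", "L": ">Q"}
# String codes mapped to the struct format of their length prefix.
_STR_PREFIXES = {"s": ">B", "S": ">I"}


def pack_args(fmt: str, values: list) -> bytes:
    """Encode values according to a format string such as 'Bs'."""
    out = bytearray()
    for code, value in zip(fmt, values):
        if code in _INT_FORMATS:
            out += struct.pack(_INT_FORMATS[code], value)
        elif code in _STR_PREFIXES:
            data = value.encode("utf-8")
            # The length prefix counts encoded bytes, not characters.
            out += struct.pack(_STR_PREFIXES[code], len(data)) + data
        else:
            raise ValueError(f"unsupported format code: {code!r}")
    return bytes(out)


def unpack_args(fmt: str, buf: bytes, offset: int = 0):
    """Decode values and return them with the offset after the last one."""
    values = []
    for code in fmt:
        if code in _INT_FORMATS:
            (value,) = struct.unpack_from(_INT_FORMATS[code], buf, offset)
            offset += struct.calcsize(_INT_FORMATS[code])
        elif code in _STR_PREFIXES:
            (length,) = struct.unpack_from(_STR_PREFIXES[code], buf, offset)
            offset += struct.calcsize(_STR_PREFIXES[code])
            value = buf[offset:offset + length].decode("utf-8")
            offset += length
        else:
            raise ValueError(f"unsupported format code: {code!r}")
        values.append(value)
    return values, offset


encoded = pack_args("Bs", [10, "hello"])
decoded, end = unpack_args("Bs", encoded)
print(encoded, decoded, end)
```

Returning the final offset alongside the values lets a caller continue parsing the same buffer from where the previous call stopped.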

amqp.spec

AMQP Spec.

class amqp.spec.Basic[source]

AMQ Basic class.

Ack = (60, 80)
CLASS_ID = 60
Cancel = (60, 30)
CancelOk = (60, 31)
Consume = (60, 20)
ConsumeOk = (60, 21)
Deliver = (60, 60)
Get = (60, 70)
GetEmpty = (60, 72)
GetOk = (60, 71)
Nack = (60, 120)
Publish = (60, 40)
Qos = (60, 10)
QosOk = (60, 11)
Recover = (60, 110)
RecoverAsync = (60, 100)
RecoverOk = (60, 111)
Reject = (60, 90)
Return = (60, 50)
class amqp.spec.Channel[source]

AMQ Channel class.

CLASS_ID = 20
Close = (20, 40)
CloseOk = (20, 41)
Flow = (20, 20)
FlowOk = (20, 21)
Open = (20, 10)
OpenOk = (20, 11)
class amqp.spec.Confirm[source]

AMQ Confirm class.

CLASS_ID = 85
Select = (85, 10)
SelectOk = (85, 11)
class amqp.spec.Connection[source]

AMQ Connection class.

Blocked = (10, 60)
CLASS_ID = 10
Close = (10, 50)
CloseOk = (10, 51)
Open = (10, 40)
OpenOk = (10, 41)
Secure = (10, 20)
SecureOk = (10, 21)
Start = (10, 10)
StartOk = (10, 11)
Tune = (10, 30)
TuneOk = (10, 31)
Unblocked = (10, 61)
class amqp.spec.Exchange[source]

AMQ Exchange class.

Bind = (40, 30)
BindOk = (40, 31)
CLASS_ID = 40
Declare = (40, 10)
DeclareOk = (40, 11)
Delete = (40, 20)
DeleteOk = (40, 21)
Unbind = (40, 40)
UnbindOk = (40, 51)
class amqp.spec.Queue[source]

AMQ Queue class.

Bind = (50, 20)
BindOk = (50, 21)
CLASS_ID = 50
Declare = (50, 10)
DeclareOk = (50, 11)
Delete = (50, 40)
DeleteOk = (50, 41)
Purge = (50, 30)
PurgeOk = (50, 31)
Unbind = (50, 50)
UnbindOk = (50, 51)
class amqp.spec.Tx[source]

AMQ Tx class.

CLASS_ID = 90
Commit = (90, 20)
CommitOk = (90, 21)
Rollback = (90, 30)
RollbackOk = (90, 31)
Select = (90, 10)
SelectOk = (90, 11)
amqp.spec.method(method_sig, args=None, content=False)[source]

Create amqp method specification tuple.

class amqp.spec.method_t(method_sig, args, content)
property args

Alias for field number 1

property content

Alias for field number 2

property method_sig

Alias for field number 0
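The field layout of method_t can be pictured with a plain namedtuple. The sketch below is a stand-in written for illustration, not the code in amqp.spec; the (60, 80) signature is the Basic.Ack value listed above, and the argument format 'Lb' (a long long delivery tag plus a bit) is an assumption based on the AMQP 0-9-1 definition of basic.ack.

```python
from collections import namedtuple

# Stand-in mirroring the documented field order: method_sig, args, content.
method_t = namedtuple("method_t", ("method_sig", "args", "content"))


def method(method_sig, args=None, content=False):
    """Create a method specification tuple (illustrative re-implementation)."""
    return method_t(method_sig, args, content)


BASIC_ACK = (60, 80)  # (class id, method id), as listed for Basic.Ack
ack_spec = method(BASIC_ACK, "Lb")

class_id, method_id = ack_spec.method_sig
print(class_id, method_id, ack_spec.args, ack_spec.content)
```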

amqp.utils

Compatibility utilities.

amqp.utils.bytes_to_str(s)[source]

Convert bytes to str.

amqp.utils.coro(gen)[source]

Decorator to mark generator as a co-routine.

amqp.utils.get_logger(logger)[source]

Get logger by name.

amqp.utils.set_cloexec(fd, cloexec)[source]

Set flag to close fd after exec.

amqp.utils.str_to_bytes(s)[source]

Convert str to bytes.

Changes

py-amqp is a fork of amqplib used by Kombu, containing additional features and improvements. The previous amqplib changelog is here: http://code.google.com/p/py-amqplib/source/browse/CHANGES

5.1.0

release-date

2022-03-06 10:05 A.M. UTC+6:00

release-by

Asif Saif Uddin

  • Improve performance of _get_free_channel_id, fix channel max bug (#385).

  • Document memoryview usage, minor frame_writer.write_frame refactor (#384).

  • Start dropping python 3.6 (#387).

  • Added experimental __slots__ to some classes (#368)

  • Relaxed vine version for upcoming release.

  • Upgraded to pytest 7 (#388).

5.0.9

release-date

2021-12-20 11:00 A.M. UTC+6:00

release-by

Asif Saif Uddin

  • Append to _used_channel_ids in _used_channel_ids

5.0.8

release-date

2021-12-19 11:15 A.M. UTC+6:00

release-by

Asif Saif Uddin

  • Reduce memory usage of Connection (#377)

  • Add additional error handling around code where an OSError may be raised on failed connections. Fixes (#378)

5.0.7

release-date

2021-12-13 15:45 P.M. UTC+6:00

release-by

Asif Saif Uddin

  • Remove dependency on case

  • Bugfix: not closing socket after server disconnect

5.0.6

release-date

2021-04-01 10:45 A.M. UTC+6:00

release-by

Asif Saif Uddin

  • Change the order in which context.check_hostname and context.verify_mode get set in SSLTransport._wrap_socket_sni. Fixes bug introduced in 5.0.3 where setting context.verify_mode = ssl.CERT_NONE would raise “ValueError: Cannot set verify_mode to CERT_NONE when check_hostname is enabled.” Setting context.check_hostname prior to setting context.verify_mode resolves the issue.

  • Remove TCP_USER_TIMEOUT option for Solaris (#355)

  • Pass long_description to setup() (#353)

  • Fix for tox-docker 2.0

  • Moved to GitHub actions CI (#359)
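The ordering constraint behind the check_hostname / verify_mode fix in this release can be reproduced with the standard library alone. This is a minimal sketch of the stdlib behavior, not the code in SSLTransport._wrap_socket_sni.

```python
import ssl

ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

# PROTOCOL_TLS_CLIENT enables hostname checking by default, so relaxing
# verify_mode first is rejected by the ssl module.
error = None
try:
    ctx.verify_mode = ssl.CERT_NONE
except ValueError as exc:
    error = str(exc)

# Disabling hostname checking first makes CERT_NONE acceptable.
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE

print(error)
```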

5.0.5

release-date

2021-01-28 4:30 P.M UTC+6:00

release-by

Asif Saif Uddin

  • Removed mistakenly introduced code which was causing import errors

5.0.4

release-date

2021-01-28 2:30 P.M UTC+6:00

release-by

Asif Saif Uddin

  • Add missing load_default_certs() call to fix a regression in v5.0.3 release. (#350)

5.0.3

release-date

2021-01-19 9:00 P.M UTC+6:00

release-by

Asif Saif Uddin

  • Change the default value of ssl_version to None. When not set, the proper value between ssl.PROTOCOL_TLS_CLIENT and ssl.PROTOCOL_TLS_SERVER will be selected based on the param server_side in order to create a TLS Context object with better defaults that fit the desired connection side.

  • Change the default value of cert_reqs to None. The default value of ctx.verify_mode is ssl.CERT_NONE, but when ssl.PROTOCOL_TLS_CLIENT is used, ctx.verify_mode defaults to ssl.CERT_REQUIRED.

  • Fix context.check_hostname logic. Checking the hostname depends on having support of the SNI TLS extension and being provided with a server_hostname value. Another important thing to mention is that enabling hostname checking automatically sets verify_mode from ssl.CERT_NONE to ssl.CERT_REQUIRED in the stdlib ssl and it cannot be set back to ssl.CERT_NONE as long as hostname checking is enabled.

  • Refactor the SNI tests to test one thing at a time and remove some tests that were being repeated over and over.
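The defaults these 5.0.3 entries rely on can be inspected directly in the standard library. The sketch below only shows stdlib behavior (Python 3.7 or later is assumed for the automatic verify_mode change); it does not reproduce how py-amqp builds its context.

```python
import ssl

client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

# Client contexts verify certificates and hostnames by default;
# server contexts do neither.
client_defaults = (client_ctx.check_hostname, client_ctx.verify_mode)
server_defaults = (server_ctx.check_hostname, server_ctx.verify_mode)

# Start from a context with verification switched off...
relaxed = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
relaxed.check_hostname = False
relaxed.verify_mode = ssl.CERT_NONE

# ...then enable hostname checking: verify_mode is raised to CERT_REQUIRED.
relaxed.check_hostname = True

print(client_defaults, server_defaults, relaxed.verify_mode)
```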

5.0.2

release-date

2020-11-08 6:50 P.M UTC+3:00

release-by

Omer Katz

  • Wheels are no longer universal.

    Contributed by Omer Katz

  • Added debug representation to Connection and *Transport classes

    Contributed by Matus Valo

  • Reintroduce ca_certs and ciphers parameters of SSLTransport._wrap_socket_sni()

    This fixes an issue introduced in commit 53d6777.

    Contributed by Matus Valo

  • Fix infinite wait when using confirm_publish

    Contributed by Omer Katz & RezaSi

5.0.1

release-date

2020-09-06 6:10 P.M UTC+3:00

release-by

Omer Katz

  • Require vine 5.0.0.

    Contributed by Omer Katz

5.0.0

release-date

2020-09-03 3:20 P.M UTC+3:00

release-by

Omer Katz

  • Stop using the deprecated method ssl.wrap_socket.

    Contributed by Hervé Beraud

5.0.0b1

release-date

2020-09-01 6:20 P.M UTC+3:00

release-by

Omer Katz

  • Dropped Python 3.5 support.

    Contributed by Omer Katz

  • Removed additional compatibility code.

    Contributed by Omer Katz

5.0.0a1

release-date

2019-04-01 4:30 P.M UTC+3:00

release-by

Omer Katz

  • Dropped Python 2.x support.

    Contributed by Omer Katz

  • Dropped Python 3.4 support.

    Contributed by Omer Katz

  • Depend on vine 5.0.0a1.

    Contributed by Omer Katz

Code Cleanups & Improvements:

  • Omer Katz

2.6.1

release-date

2020-07-31 10.30 P.M UTC+6:00

release-by

Asif Saif Uddin

  • Fix buffer overflow in frame_writer after frame_max is increased. frame_writer allocates a bytearray on initialization with a length based on the connection.frame_max value. If connection.frame_max is changed to a larger value, this causes an error like pack_into requires a buffer of at least 408736 bytes.
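The failure mode is easy to reproduce with struct.pack_into and a preallocated bytearray. The sketch below is an illustration under simplified assumptions (tiny sizes, a bare payload instead of a real frame) and shows one possible remedy, growing the buffer on demand; it is not necessarily how frame_writer was fixed.

```python
import struct

buf = bytearray(16)    # sized for a small initial frame_max
payload = b"x" * 32    # a frame that only fits after frame_max grows

error = None
try:
    struct.pack_into(f">{len(payload)}s", buf, 0, payload)
except struct.error as exc:
    # The buffer was allocated before the limit was raised.
    error = str(exc)

# One possible remedy: reallocate when the requested size exceeds the buffer.
if len(buf) < len(payload):
    buf = bytearray(len(payload))
struct.pack_into(f">{len(payload)}s", buf, 0, payload)

print(error)
```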

2.6.0

release-date

2020-06-01 12.00 P.M UTC+6:00

release-by

Asif Saif Uddin

  • Implement speedups in cython (#311)

  • Updated some tests & code improvements

  • Separate logger for Connection.heartbeat_tick method

  • Cython generic content (#315)

  • Improve documentation of the a_global parameter of the basic_qos() method.

  • Fix saving partial read buffer on windows during socket timeout. (#321)

  • Fix deserialization of long string field values that are not utf-8.

  • Added simple cythonization of abstract_channel.py

  • Speedups of serialization.py are more restrictive

2.5.2

release-date

2019-09-30 19.00 P.M UTC+6:00

release-by

Asif Saif Uddin

  • Fixed a channel issue against a connection already closed

  • Updated some tests & code improvements

2.5.1

release-date

2019-08-14 22.00 P.M UTC+6:00

release-by

Asif Saif Uddin

  • Ignore all methods except Close and Close-OK when channel/connection is closing

  • Fix faulty SSL SNI initiation parameters (#283)

  • Undeprecate auto_delete flag for exchanges. (#287)

  • Improved tests and testing environments

2.5.0

release-date

2019-05-30 17.30 P.M UTC+6:00

release-by

Asif Saif Uddin

  • Drop Python 3.4

  • Add new platform

  • Numerous bug fixes

2.4.2

release-date

2019-03-03 10:45 P.M UTC+2:00

release-by

Omer Katz

  • Added support for the Cygwin platform

    Contributed by Matus Valo

  • Correct offset incrementation when parsing bitmaps.

    Contributed by Allan Simon & Omer Katz

  • Consecutive bitmaps are now parsed correctly.

    Previously the bit counter was reset with every bit. We now reset it once per 8 bits, when we consume the next byte.

    Contributed by Omer Katz
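The bitmap fix is easier to follow with a small decoder. In AMQP 0-9-1, consecutive bit fields are packed into octets starting from the low bit, so the per-byte counter should only reset when a new byte is consumed. The read_bits helper below is a hypothetical sketch of that rule, not the parser in amqp.serialization.

```python
def read_bits(buf: bytes, offset: int, count: int):
    """Read `count` consecutive bit fields starting at `offset`."""
    values = []
    bits = 0
    bitcount = 0  # unread bits remaining in the current byte
    for _ in range(count):
        if bitcount == 0:
            # Consume the next byte only once per 8 bits.
            bits = buf[offset]
            offset += 1
            bitcount = 8
        values.append(bool(bits & 1))  # low bit first
        bits >>= 1
        bitcount -= 1
    return values, offset


flags, next_offset = read_bits(b"\x05", 0, 3)
print(flags, next_offset)
```

Resetting the counter on every bit instead would consume one byte per flag and shift every later field in the frame.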

Code Cleanups & Improvements:

  • Patrick Cloke

  • Matus Valo

  • Jeremiah Cooper

  • Omer Katz

Test Coverage & CI Improvements:

  • Matus Valo

  • Omer Katz

  • Jeremiah Cooper

2.4.1

release-date

2018-04-02 9:00 A.M UTC+2

release-by

Omer Katz

  • To avoid breaking the API, basic_consume() now returns the consumer tag instead of a tuple when nowait is True.

    Fix contributed by Matus Valo

  • Fix crash in basic_publish when broker does not support connection.blocked capability.

    Fix contributed by Matus Valo

  • read_frame() is now Python 3 compatible for large payloads.

    Fix contributed by Antonio Ojea

  • Support float read_timeout/write_timeout.

    Fix contributed by @cadl

  • Always treat SSLError timeouts as socket timeouts.

    Fix contributed by Dirk Mueller and Antonio Ojea

  • Treat EWOULDBLOCK as timeout.

    This fixes a regression on Windows from 2.4.0.

    Fix contributed by Lucian Petrut

Test Coverage & CI Improvements:

  • Matus Valo

  • Antonio Ojea

2.4.0

release-date

2018-13-01 1:00 P.M UTC+2

release-by

Omer Katz

  • Fix inconsistent frame_handler return value.

    The function returned by frame_handler is meant to return True once the complete message is received and the callback is called, False otherwise.

    This fixes the return value for messages with a body split across multiple frames, and heartbeat frames.

    Fix contributed by @evanunderscore

  • Don’t default content_encoding to utf-8 for bytes.

    This is not an acceptable default as the content may not be valid utf-8, and even if it is, the producer likely does not expect the message to be decoded by the consumer.

    Fix contributed by @evanunderscore

  • Fix encoding of messages with multibyte characters.

    Body length was previously calculated using string length, which may be less than the length of the encoded body when it contains multibyte sequences. This caused the body of the frame to be truncated.

    Fix contributed by @evanunderscore

  • Respect content_encoding when encoding messages.

    Previously the content_encoding was ignored and messages were always encoded as utf-8. This caused messages to be incorrectly decoded if content_encoding is properly respected when decoding.

    Fix contributed by @evanunderscore

  • Fix AMQP protocol header for AMQP 0-9-1.

    Previously it was set to a different value for unknown reasons.

    Fix contributed by Carl Hörberg

  • Add support for Python 3.7.

    Replace direct SSLSocket instantiation with wrap_socket. Added Python 3.7 to CI.

    Fix contributed by Omer Katz and @avborhanian

  • Add support for field type “x” (byte array).

    Fix contributed by Davis Kirkendall

  • If there is an exception raised on Connection.connect or Connection.close, ensure that the underlying transport socket is closed.

    Adjust exception message on connection errors as well.

    Fix contributed by @tomc797

  • TCP_USER_TIMEOUT has to be excluded from KNOWN_TCP_OPTS on BSD platforms.

    Fix contributed by George Tantiras

  • Handle negative acknowledgments.

    Fix contributed by Matus Valo

  • Added integration tests.

    Fix contributed by Matus Valo

  • Fix basic_consume() with no consumer_tag provided.

    Fix contributed by Matus Valo

  • Improved empty AMQPError string representation.

    Fix contributed by Matus Valo

  • Drain events before publish.

    This is needed to capture out of memory messages for clients that only publish. Otherwise on_blocked is never called.

    Fix contributed by Jelte Fennema and Matus Valo

  • Don’t revive channel when connection is closing.

    When connection is closing don’t raise error when Channel.Close method is received.

    Fix contributed by Matus Valo
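The multibyte body-length fix listed above for 2.4.0 comes down to the difference between character count and encoded byte count. A minimal sketch with an arbitrary example string:

```python
# "naive cafe" with accents plus a hot-beverage symbol, written with escapes.
body = "na\u00efve caf\u00e9 \u2615"
encoded = body.encode("utf-8")

char_count = len(body)      # what the old length calculation used
byte_count = len(encoded)   # what actually goes on the wire

# Sizing the frame by characters would cut off the trailing bytes.
truncated = encoded[:char_count]

print(char_count, byte_count)
```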

2.3.2

release-date

2018-05-29 15:30 P.M UTC+3

release-by

Omer Katz

  • Fix a regression that occurs when running amqp on OSX.

    TCP_USER_TIMEOUT is not available when running on OSX. We now remove it from the set of known TCP options.

    Fix contributed by Ofer Horowitz

2.3.1

release-date

2018-05-28 16:30 P.M UTC+3

release-by

Omer Katz

  • Fix a regression that occurs when running amqp under Python 2.7.

    #182 mistakenly changed a type check from unicode to string_t, which is str in Python 2.7. text_t should have been used instead. This is now fixed and the tests have been adjusted to ensure this never regresses again.

    Fix contributed by Omer Katz

2.3.0

release-date

2018-05-27 16:30 P.M UTC+3

release-by

Omer Katz

  • Cleanup TCP configurations across platforms and unified defaults.

    Fix contributed by Dan Chowdhury

  • Fix for TypeError when setting socket options.

    Fix contributed by Matthias Erll

  • Ensure that all call sites for decoding bytes to str allow surrogates, as the encoding mechanism now supports.

    Fix contributed by Stephen Hatch

  • Don’t send AAAA DNS request when domain resolved to IPv4 address.

    Fix contributed by Ihar Hrachyshka & Omer Katz

  • Support for EXTERNAL authentication and specific login_method.

    Fix contributed by Matthias Erll

  • If the old python-gssapi library is installed the gssapi module will be available. We now ensure that we only use the new gssapi library.

    Fix contributed by Jacopo Notarstefano

Code Cleanups & Test Coverage:

2.2.2

release-date

2017-09-14 09:00 A.M UTC+2

release-by

Omer Katz

  • Sending empty messages no longer hangs. Instead, an empty message is sent correctly (addresses #151).

    Fix contributed by Christian Blades

  • Fixed compatibility issues in UTF-8 encoding behavior between Py2/Py3 (#164)

    Fix contributed by Tyler James Harden

2.2.1

release-date

2017-07-14 09:00 A.M UTC+2

release-by

Omer Katz

  • Fix implicit conversion from bytes to string on the connection object. (Issue #155)

    This issue has caused Celery to crash on connection to RabbitMQ.

    Fix contributed by Omer Katz

2.2.0

release-date

2017-07-12 10:00 A.M UTC+2

release-by

Ask Solem

  • Fix random delays in task execution.

    This is a bug that caused performance issues due to polling timeouts that occur when receiving incomplete AMQP frames. (Issues #3978 #3737 #3814)

    Fix contributed by Robert Kopaczewski

  • Calling conn.collect() multiple times will no longer raise an AttributeError when no channels exist.

    Fix contributed by Gord Chung

  • Fix compatibility code for Python 2.7.6.

    Fix contributed by Jonathan Schuff

  • When running in Windows, py-amqp will no longer use the unsupported TCP option TCP_MAXSEG.

    Fix contributed by Tony Breeds

  • Added support for setting the SNI hostname header.

    The SSL protocol version is now set to SSLv23

    Contributed by Dhananjay Sathe

  • Authentication mechanisms were refactored to be more modular. GSSAPI authentication is now supported.

    Contributed by Alexander Dutton

  • Do not reconnect on collect.

    Fix contributed by Gord Chung

2.1.4

release-date

2016-12-14 03:40 P.M PST

release-by

Ask Solem

  • Removes byte string comparison warnings when running under python -b.

    Fix contributed by Jon Dufresne.

  • Linux version parsing broke when the version included a ‘+’ character (Issue #119).

  • Now sets default TCP settings for platforms that support them (e.g. Linux).

    Constant            Value
    TCP_KEEPIDLE        60
    TCP_KEEPINTVL       10
    TCP_KEEPCNT         9
    TCP_USER_TIMEOUT    1000 (1s)

    This helps detect a closed socket earlier, which is very important in failover and load balancing scenarios.
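For readers who want to see what such defaults mean at the socket level, here is a rough standard-library sketch. The apply_tcp_defaults helper and the TCP_DEFAULTS mapping are hypothetical names; the code skips any option the running platform does not expose and is not py-amqp's own transport code.

```python
import socket

# Values taken from the defaults listed above.
TCP_DEFAULTS = {
    "TCP_KEEPIDLE": 60,
    "TCP_KEEPINTVL": 10,
    "TCP_KEEPCNT": 9,
    "TCP_USER_TIMEOUT": 1000,  # milliseconds
}


def apply_tcp_defaults(sock: socket.socket) -> dict:
    """Apply the options this platform supports; return what was set."""
    applied = {}
    # Keepalive probes only run when SO_KEEPALIVE is enabled.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    for name, value in TCP_DEFAULTS.items():
        option = getattr(socket, name, None)
        if option is None:
            continue  # constant not defined on this platform
        try:
            sock.setsockopt(socket.IPPROTO_TCP, option, value)
        except OSError:
            continue  # defined but rejected by the operating system
        applied[name] = value
    return applied


sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    applied = apply_tcp_defaults(sock)
finally:
    sock.close()
print(applied)
```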

2.1.3

release-date

2016-12-07 06:00 P.M PST

release-by

Ask Solem

  • Fixes compatibility with Python 2.7.5 and below (Issue #107).

2.1.2

release-date

2016-12-07 02:00 P.M PST

  • Linux: Now sets the TCP_USER_TIMEOUT flag if available for better failed connection detection.

    Contributed by Jelte Fennema.

    The timeout is set to the connect_timeout value by default, but can also be specified by using the socket_settings argument to Connection:

    from amqp import Connection
    from amqp.platform import TCP_USER_TIMEOUT
    
    conn = Connection(socket_settings={
        TCP_USER_TIMEOUT: int(60 * 1000),  # 60 seconds, in ms.
    })
    

    When using Kombu this can be specified as part of the transport_options:

    from amqp.platform import TCP_USER_TIMEOUT
    from kombu import Connection
    
    conn = Connection(transport_options={
        'socket_settings': {
            TCP_USER_TIMEOUT: int(60 * 1000),  # 60 seconds, in ms.
        },
    })
    

    Or when using Celery it can be specified using the broker_transport_options setting:

    from amqp.platform import TCP_USER_TIMEOUT
    from celery import Celery
    
    app = Celery()
    app.conf.update(
        broker_transport_options={
            'socket_settings': {
                TCP_USER_TIMEOUT: int(60 * 1000),  # 60 seconds, in ms.
            },
        }
    )
    
  • Python compatibility: Fixed compatibility when using the python -b flag.

    Fix contributed by Jon Dufresne.

2.1.1

release-date

2016-10-13 06:36 P.M PDT

release-by

Ask Solem

  • Requirements

    • Now depends on Vine 1.1.3.

  • Frame writer: Account for overhead when calculating frame size.

    The client would crash if the message was within a certain size.

  • Fixed struct unicode problems (#108)

    • Standardize pack invocations on bytestrings.

    • Leave some literals as strings to enable interpolation.

    • Fix flake8 fail.

    Fix contributed by Brendan Smithyman.

2.1.0

release-date

2016-09-07 04:23 P.M PDT

release-by

Ask Solem

  • Requirements

    • Now depends on Vine 1.1.2.

  • Now licensed under the BSD license!

    Thanks to Barry Pederson for approving the license change, which unifies the license used across all projects in the Celery organization.

  • Datetimes in method frame arguments are now handled properly.

  • Fixed compatibility with Python <= 2.7.6

  • Frame_writer is no longer a generator, which should solve a rare “generator already executing” error (Issue #103).

2.0.3

release-date

2016-07-11 08:00 P.M PDT

release-by

Ask Solem

  • SSLTransport: Fixed crash “no attribute sslopts” when ssl=True (Issue #100).

  • Fixed incompatible argument spec for Connection.Close (Issue #45).

    This caused the RabbitMQ server to raise an exception (INTERNAL ERROR).

  • Transport: No longer implements __del__ to make sure gc can collect connections.

    It’s the responsibility of the caller to close connections; this was simply a relic from the amqplib library.

2.0.2

release-date

2016-06-10 5:40 P.M PDT

release-by

Ask Solem

  • Python 3: Installation requirements ended up being a generator and crashed setup.py.

    Fix contributed by ChangBo Guo(gcb).

  • Python <= 2.7.7: struct.pack arguments cannot be unicode

    Fix contributed by Alan Justino and Xin Li.

  • Python 3.4: Fixed use of bytes % int.

    Fix contributed by Alan Justino.

  • Connection/Transport: Fixed handling of default port.

    Fix contributed by Quentin Pradet.

2.0.1

release-date

2016-05-31 6:20 P.M PDT

release-by

Ask Solem

  • Adds backward compatibility layer for the 1.4 API.

    Using the connection without calling .connect() first will now work, but a warning is emitted and the behavior is deprecated and will be removed in version 2.2.

  • Fixes kombu 3.0/celery 3.1 compatibility (Issue #88).

    Fix contributed by Bas ten Berge.

  • Fixed compatibility with Python 2.7.3 (Issue #85)

    Fix contributed by Bas ten Berge.

  • Fixed bug where calling drain_events() with a timeout of 0 would actually block until a frame is received.

  • Documentation moved to http://amqp.readthedocs.io (Issue #89).

    See https://blog.readthedocs.com/securing-subdomains/ for the reasoning behind this change.

    Fix contributed by Adam Chainz.

2.0.0

release-date

2016-05-26 1:44 P.M PDT

release-by

Ask Solem

  • No longer supports Python 2.6

  • You must now call Connection.connect() to establish the connection.

    The Connection constructor no longer has side effects, so you have to explicitly call connect first.

  • Library rewritten to anticipate async changes.

  • Connection now exposes underlying socket options.

    This change allows arbitrary TCP socket options to be set when the transport is created.

    Those values can be set by passing a dictionary where each key is the name of the parameter to set. The names of the keys are the ones reported above.

    Contributed by Andrea Rosa, Dallas Marlow and Rongze Zhu.

  • Additional logging for heartbeats.

    Contributed by Davanum Srinivas, and Dmitry Mescheryakov.

  • SSL: Fixes issue with remote connection hanging

    Fix contributed by Adrien Guinet.

  • SSL: ssl dict argument now supports the check_hostname key (Issue #63).

    Contributed by Vic Kumar.

  • Contributions by:

    Adrien Guinet, Andrea Rosa, Artyom Koval, Corey Farwell, Craig Jellick, Dallas Marlow, Davanum Srinivas, Federico Ficarelli, Jared Lewis, Rémy Greinhofer, Rongze Zhu, Yury Selivanov, Vic Kumar, Vladimir Bolshakov, @lezeroq

1.4.9

release-date

2016-01-08 5:50 P.M PST

release-by

Ask Solem

  • Fixes compatibility with Linux/macOS instances where the ctypes module does not exist.

    Fix contributed by Jared Lewis.

1.4.8

release-date

2015-12-07 12:25 A.M

release-by

Ask Solem

  • abstract_channel.wait now accepts a float timeout parameter expressed in seconds

    Contributed by Goir.

1.4.7

release-date

2015-10-02 05:30 P.M PDT

release-by

Ask Solem

  • Fixed libSystem error on macOS 10.11 (El Capitan)

    Fix contributed by Eric Wang.

  • channel.basic_publish now raises amqp.exceptions.NotConfirmed on basic.nack.

  • AMQP timestamps received are now converted from GMT instead of local time (Issue #67).

  • Wheel package installation now supported by both Python 2 and Python 3.

    Fix contributed by Rémy Greinhofer.

1.4.6

release-date

2014-08-11 06:00 P.M UTC

release-by

Ask Solem

  • Now keeps buffer when socket times out.

    Fix contributed by Artyom Koval.

  • Adds Connection.Transport attribute that can be used to specify a different transport implementation.

    Contributed by Yury Selivanov.

1.4.5

release-date

2014-04-15 09:00 P.M UTC

release-by

Ask Solem

  • Can now deserialize more AMQP types.

    Now handles types short string, short short int, short short unsigned int, short int, short unsigned int, long unsigned int, long long int, long long unsigned int and float which for some reason was missing, even in the original amqplib module.

  • SSL: Workaround for Python SSL bug.

    A bug in the python socket library causes ssl.read/write() on a closed socket to raise AttributeError instead of IOError.

    Fix contributed by Craig Jellick.

  • Transport.__del__ now handles errors occurring at late interpreter shutdown (Issue #36).

1.4.4

release-date

2014-03-03 04:00 P.M UTC

release-by

Ask Solem

  • SSL transport accidentally disconnected after read timeout.

    Fix contributed by Craig Jellick.

1.4.3

release-date

2014-02-09 03:00 P.M UTC

release-by

Ask Solem

  • Fixed bug where more data was requested from the socket than was actually needed.

    Contributed by Ionel Cristian Mărieș.

1.4.2

release-date

2014-01-23 05:00 P.M UTC

  • Heartbeat negotiation would use heartbeat value from server even if heartbeat disabled (Issue #31).

1.4.1

release-date

2014-01-14 09:30 P.M UTC

release-by

Ask Solem

  • Fixed error occurring when heartbeats disabled.

1.4.0

release-date

2014-01-13 03:00 P.M UTC

release-by

Ask Solem

  • Heartbeat implementation improved (Issue #6).

    The new heartbeat behavior is the same approach as taken by the RabbitMQ java library.

    This also means that clients should preferably call the heartbeat_tick method more frequently (like every second) instead of using the old rate argument (which is now ignored).

    • Heartbeat interval is negotiated with the server.

    • Some delay is allowed if the heartbeat is late.

    • Monotonic time is used to keep track of the heartbeat instead of relying on the caller to call the checking function at the right time.

    Contributed by Dustin J. Mitchell.

  • NoneType is now supported in tables and arrays.

    Contributed by Dominik Fässler.

  • SSLTransport: Now handles ENOENT.

    Fix contributed by Adrien Guinet.
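The heartbeat behavior described for 1.4.0 (frequent ticks, a monotonic clock, some tolerance for late heartbeats) can be sketched as follows. HeartbeatMonitor is a hypothetical class written for illustration, and the two-interval grace period is an assumption, not a statement of the library's exact rule.

```python
import time


class HeartbeatMonitor:
    """Track when to send a heartbeat and whether the peer has gone quiet."""

    def __init__(self, interval, clock=time.monotonic):
        self.interval = interval
        self.clock = clock  # monotonic, so wall-clock jumps do not matter
        self.last_sent = clock()
        self.last_received = clock()

    def on_frame_received(self):
        # Any inbound traffic counts as a sign of life.
        self.last_received = self.clock()

    def tick(self):
        """Return (should_send, peer_is_late); safe to call every second."""
        now = self.clock()
        should_send = now - self.last_sent >= self.interval
        if should_send:
            self.last_sent = now
        # Assumed grace period: tolerate up to two silent intervals.
        peer_is_late = now - self.last_received > 2 * self.interval
        return should_send, peer_is_late


# Drive the monitor with a fake clock to keep the example deterministic.
fake_now = [0.0]
monitor = HeartbeatMonitor(10, clock=lambda: fake_now[0])
first = monitor.tick()
fake_now[0] = 10.0
second = monitor.tick()
fake_now[0] = 21.0
third = monitor.tick()
print(first, second, third)
```

Because the decision depends on elapsed monotonic time rather than on how often tick() runs, calling it more frequently than the interval is harmless.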

1.3.3

release-date

2013-11-11 03:30 P.M UTC

release-by

Ask Solem

  • SSLTransport: Now keeps read buffer if an exception is raised (Issue #26).

    Fix contributed by Tommie Gannert.

1.3.2

release-date

2013-10-29 02:00 P.M UTC

release-by

Ask Solem

  • Message.channel is now a channel object (not the channel id).

  • Bug in previous version caused the socket to be flagged as disconnected at EAGAIN/EINTR.

1.3.1

release-date

2013-10-24 04:00 P.M UTC

release-by

Ask Solem

  • Now implements Connection.connected (Issue #22).

  • Fixed bug where str(AMQPError) did not return string.

1.3.0

release-date

2013-09-04 02:39 P.M UTC

release-by

Ask Solem

  • Now sets Message.channel on delivery (Issue #12)

    amqplib used to make the channel object available as Message.delivery_info['channel'], but this was removed in py-amqp. librabbitmq sets Message.channel, which is a more reasonable solution in our opinion as that keeps the delivery info intact.

  • New option to wait for publish confirmations (Issue #3)

    There is now a new Connection option, confirm_publish, that will force any basic_publish call to wait for confirmation.

    Enabling publisher confirms like this degrades performance considerably, but can be suitable for some applications and now it’s possible by configuration.

  • queue_declare now returns a named tuple of type queue_declare_ok_t.

    Supporting fields: queue, message_count, and consumer_count.

  • Contents of Channel.returned_messages are now named tuples.

    Supporting fields: reply_code, reply_text, exchange, routing_key, and message.

  • Sockets now set to close on exec using the FD_CLOEXEC flag.

    Currently only supported on platforms supporting this flag, which does not include Windows.

    Contributed by Tommie Gannert.

1.2.1

release-date

2013-08-16 05:30 P.M UTC

release-by

Ask Solem

  • Adds promise type: amqp.utils.promise()

  • Merges fixes from 1.0.x

1.2.0

release-date

2012-11-12 04:00 P.M UTC

release-by

Ask Solem

  • New exception hierarchy:

    • AMQPError
      • ConnectionError
        • RecoverableConnectionError
          • ConsumerCancelled

          • ConnectionForced

          • ResourceError

        • IrrecoverableConnectionError
          • ChannelNotOpen

          • FrameError

          • FrameSyntaxError

          • InvalidCommand

          • InvalidPath

          • NotAllowed

          • UnexpectedFrame

          • AMQPNotImplementedError

          • InternalError

      • ChannelError
        • RecoverableChannelError
          • ContentTooLarge

          • NoConsumers

          • ResourceLocked

        • IrrecoverableChannelError
          • AccessRefused

          • NotFound

          • PreconditionFailed
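One practical consequence of this hierarchy is that callers can decide whether to retry by catching the recoverable base classes. The sketch below redefines a small subset of the tree as stand-in classes so that it runs without the library; classify is a hypothetical helper, and real code would import the exceptions from the amqp package instead.

```python
class AMQPError(Exception):
    pass


# Note: this name shadows the builtin ConnectionError inside this module.
class ConnectionError(AMQPError):
    pass


class ChannelError(AMQPError):
    pass


class RecoverableConnectionError(ConnectionError):
    pass


class IrrecoverableConnectionError(ConnectionError):
    pass


class RecoverableChannelError(ChannelError):
    pass


class IrrecoverableChannelError(ChannelError):
    pass


class ConnectionForced(RecoverableConnectionError):
    pass


class ResourceLocked(RecoverableChannelError):
    pass


class NotFound(IrrecoverableChannelError):
    pass


def classify(exc: Exception) -> str:
    """Map an exception to a coarse handling strategy."""
    if isinstance(exc, (RecoverableConnectionError, RecoverableChannelError)):
        return "retry"
    if isinstance(exc, AMQPError):
        return "give up"
    return "not an AMQP error"


print(classify(ConnectionForced()), classify(NotFound()), classify(KeyError()))
```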

1.1.0

release-date

2013-11-08 10:36 P.M UTC

release-by

Ask Solem

  • No longer supports Python 2.5

  • Fixed receiving of float table values.

  • Now Supports Python 3 and Python 2.6+ in the same source code.

  • Python 3 related fixes.

1.0.13

release-date

2013-07-31 04:00 P.M BST

release-by

Ask Solem

  • Fixed problems with the SSL transport (Issue #15).

    Fix contributed by Adrien Guinet.

  • Small optimizations

1.0.12

release-date

2013-06-25 02:00 P.M BST

release-by

Ask Solem

  • Fixed another Python 3 compatibility problem.

1.0.11

release-date

2013-04-11 06:00 P.M BST

release-by

Ask Solem

  • Fixed Python 3 incompatibility in amqp/transport.py.

1.0.10

release-date

2013-03-21 03:30 P.M UTC

release-by

Ask Solem

  • Fixed Python 3 incompatibility in amqp/serialization.py. (Issue #11).

1.0.9

release-date

2013-03-08 10:40 A.M UTC

release-by

Ask Solem

  • Publisher ack callbacks should now work after typo fix (Issue #9).

  • channel(explicit_id) will now claim that id from the array of unused channel ids.

  • Fixes Jython compatibility.

1.0.8

release-date

2013-02-08 01:00 P.M UTC

release-by

Ask Solem

  • Fixed SyntaxError on Python 2.5

1.0.7

release-date

2013-02-08 01:00 P.M UTC

release-by

Ask Solem

  • Workaround for bug on some Python 2.5 installations where (2**32) is 0.

  • Can now serialize the ARRAY type.

    Contributed by Adam Wentz.

  • Fixed tuple format bug in exception (Issue #4).

1.0.6

release-date

2012-11-29 01:14 P.M UTC

release-by

Ask Solem

  • Channel.close is now ignored if the connection attribute is None.

1.0.5

release-date

2012-11-21 04:00 P.M UTC

release-by

Ask Solem

  • Channel.basic_cancel is now ignored if the channel was already closed.

  • Channel.events is now a dict of sets:

    >>> channel.events['basic_return'].add(on_basic_return)
    >>> channel.events['basic_return'].discard(on_basic_return)
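    The practical effect of a dict of sets is that registering the same callback twice is harmless and removing an unregistered one does not raise. A minimal stand-in using collections.defaultdict (the events name here is a local variable, not a real channel attribute):

```python
from collections import defaultdict

# Stand-in for the channel's event registry: event name -> set of callbacks.
events = defaultdict(set)


def on_basic_return(*args):
    print("message returned:", args)


events["basic_return"].add(on_basic_return)
events["basic_return"].add(on_basic_return)   # duplicate is ignored
registered = len(events["basic_return"])

events["basic_return"].discard(on_basic_return)
events["basic_return"].discard(on_basic_return)  # no error when absent
remaining = len(events["basic_return"])

print(registered, remaining)
```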
    

1.0.4

release-date

2012-11-13 04:00 P.M UTC

release-by

Ask Solem

  • Fixes Python 2.5 support

1.0.3

release-date

2012-11-12 04:00 P.M UTC

release-by

Ask Solem

  • Now can also handle float in headers/tables when receiving messages.

  • Now uses array.array to keep track of unused channel ids.

  • The METHOD_NAME_MAP has been updated for amqp/0.9.1 and Rabbit extensions.

  • Removed a bunch of accidentally included images.

1.0.2

release-date

2012-11-06 05:00 P.M UTC

release-by

Ask Solem

  • Now supports float values in headers/tables.

1.0.1

release-date

2012-11-05 01:00 P.M UTC

release-by

Ask Solem

  • Connection errors no longer include AttributeError.

  • Fixed problem with using the SSL transport in a non-blocking context.

    Fix contributed by Mher Movsisyan.

1.0.0

release-date

2012-11-05 01:00 P.M UTC

release-by

Ask Solem

  • Channels are now restored on channel error, so that the connection does not have to be closed.

Version 0.9.4

  • Adds support for exchange_bind and exchange_unbind.

    Contributed by Rumyana Neykova

  • Fixed bugs in funtests and demo scripts.

    Contributed by Rumyana Neykova

Version 0.9.3

  • Fixed bug that could cause the consumer to crash when reading large message payloads asynchronously.

  • Serialization error messages now include the invalid value.

Version 0.9.2

  • Consumer cancel notification support was broken (Issue #1)

    Fix contributed by Andrew Grangaard

Version 0.9.1

  • Supports draining events from multiple channels (Connection.drain_events)

  • Support for timeouts

  • Support for heartbeats
    • Connection.heartbeat_tick(rate=2) must be called at regular intervals (half of the heartbeat value if rate is 2).

    • Or some other scheme by using Connection.send_heartbeat.

  • Supports RabbitMQ extensions:
    • Consumer Cancel Notifications
      • by default a cancel results in ChannelError being raised

      • but not if an on_cancel callback is passed to basic_consume.

    • Publisher confirms
      • Channel.confirm_select() enables publisher confirms.

      • Channel.events['basic_ack'].append(my_callback) adds a callback to be called when a message is confirmed. This callback is then called with the signature (delivery_tag, multiple).

  • Support for basic_return

  • Uses AMQP 0-9-1 instead of 0-8.
    • Channel.access_request and ticket arguments to methods removed.

    • Supports the arguments argument to basic_consume.

    • internal argument to exchange_declare removed.

    • auto_delete argument to exchange_declare deprecated

    • insist argument to Connection removed.

    • Channel.alerts has been removed.

    • Support for Channel.basic_recover_async.

    • Channel.basic_recover deprecated.

  • Exceptions renamed to have idiomatic names:
    • AMQPException -> AMQPError

    • AMQPConnectionException -> ConnectionError

    • AMQPChannelException -> ChannelError

  • Connection.known_hosts removed.

  • Connection no longer supports redirects.

  • exchange argument to queue_bind can now be empty to use the “default exchange”.

  • Adds Connection.is_alive that tries to detect whether the connection can still be used.

  • Adds Connection.connection_errors and .channel_errors, a list of recoverable errors.

  • Exposes the underlying socket as Connection.sock.

  • Adds Channel.no_ack_consumers to keep track of consumer tags that set the no_ack flag.

  • Slightly better at error recovery
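
The (delivery_tag, multiple) signature of the publisher-confirm callback listed above follows basic.ack semantics: when multiple is true, the broker is confirming every outstanding message up to and including delivery_tag. A minimal sketch of a callback that tracks unconfirmed messages might look like the following. The ConfirmTracker class and its method names are hypothetical and not part of py-amqp; only the callback signature comes from the notes above, and the assumption that delivery tags start at 1 after confirm mode is enabled reflects RabbitMQ's documented behavior.

```python
class ConfirmTracker:
    """Hypothetical helper that remembers unconfirmed delivery tags."""

    def __init__(self):
        # Assumption: the first message published after enabling
        # confirms gets delivery tag 1, and tags increase by one.
        self.next_tag = 1
        self.unconfirmed = set()

    def published(self):
        # Call once per published message to record its tag.
        tag = self.next_tag
        self.unconfirmed.add(tag)
        self.next_tag += 1
        return tag

    def on_ack(self, delivery_tag, multiple):
        # Matches the (delivery_tag, multiple) callback signature.
        if multiple:
            # Everything up to and including delivery_tag is confirmed.
            self.unconfirmed = {
                tag for tag in self.unconfirmed if tag > delivery_tag
            }
        else:
            self.unconfirmed.discard(delivery_tag)


tracker = ConfirmTracker()
for _ in range(5):
    tracker.published()        # tags 1..5 are now outstanding

tracker.on_ack(3, True)        # confirms tags 1, 2 and 3
tracker.on_ack(5, False)       # confirms tag 5 only
```

On a real channel, such a callback would be registered on Channel.events['basic_ack'] after calling Channel.confirm_select(), as described in the publisher confirms entry.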

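For the heartbeat entry above, the time between heartbeat_tick calls is the heartbeat value divided by rate. The sketch below shows that arithmetic together with a simple polling loop. FakeConnection is a hypothetical stand-in so the example runs without a broker, and the loop structure, tick_interval and run_heartbeats are assumptions for illustration; only the heartbeat_tick(rate=...) call is taken from the notes above.

```python
import time


def tick_interval(heartbeat, rate=2):
    # Seconds between ticks: half the heartbeat value when rate is 2.
    return heartbeat / float(rate)


class FakeConnection:
    """Hypothetical stand-in for a connection; it only counts ticks."""

    def __init__(self):
        self.ticks = 0

    def heartbeat_tick(self, rate=2):
        self.ticks += 1


def run_heartbeats(connection, heartbeat, rate=2, iterations=3,
                   sleep=time.sleep):
    # Call heartbeat_tick at regular intervals. The sleep function is
    # injectable so the loop can be exercised without waiting.
    interval = tick_interval(heartbeat, rate)
    for _ in range(iterations):
        connection.heartbeat_tick(rate=rate)
        sleep(interval)
    return interval


conn = FakeConnection()
slept = []
interval = run_heartbeats(conn, heartbeat=10, rate=2, sleep=slept.append)
```
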
Indices and tables