This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.3.

Connections and transports

Basics

To send and receive messages you need a transport and a connection. There are several transports to choose from (amqp, librabbitmq, redis, qpid, in-memory, etc.), and you can even create your own. The default transport is amqp.

Create a connection using the default transport:

>>> from kombu import Connection
>>> connection = Connection('amqp://guest:guest@localhost:5672//')

The connection is not established at this point; it is established lazily, when first needed. To establish it explicitly, call the connect() method:

>>> connection.connect()

You can also check whether the connection is established:

>>> connection.connected
True

Connections must always be closed after use:

>>> connection.close()

Best practice, however, is to release the connection instead. Releasing returns the resource to the pool if the connection belongs to a connection pool, and closes the connection otherwise. It also makes a later transition to connection pools easier:

>>> connection.release()

The connection can also be used as a context manager, and you are encouraged to do so, as it makes it harder to forget to release open resources:

with Connection() as connection:
    # work with connection
    pass

Debug Logs

Kombu exposes several environment variables that control debug logging for connections and channels. This is useful when you want to debug Kombu or contribute to the project.

If KOMBU_LOG_CONNECTION is set to 1, debug logs are enabled for connections.

If KOMBU_LOG_CHANNEL is set to 1, debug logs are enabled for channels.

If KOMBU_LOG_DEBUG is set to 1, debug logs are enabled for both connections and channels.

Celery with SQS

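The SQS broker URL does not include queue_name_prefix by default, so the prefix has to be passed through the broker transport options. The following snippet does this for a Celery application created from a Flask-style app object:

from celery import Celery


def make_celery(app):
    celery = Celery(
        app.import_name,
        broker="sqs://",
        broker_transport_options={
            # Replace the placeholders with your environment and service names.
            "queue_name_prefix": "{SERVICE_ENV}-{SERVICE_NAME}-"
        },
    )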
    task_base = celery.Task

    class ContextTask(task_base):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return task_base.__call__(self, *args, **kwargs)

    celery.Task = ContextTask

    return celery
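
The prefix in the snippet above contains the placeholders {SERVICE_ENV} and {SERVICE_NAME}, which are not substituted as written. One possible way to fill them in is sketched below. It assumes the values come from environment variables of the same names; the helper name sqs_transport_options and the fallback values are hypothetical.

```python
import os


def sqs_transport_options(environ=os.environ):
    """Build broker_transport_options with a per-service queue prefix."""
    # SERVICE_ENV and SERVICE_NAME are assumed variable names, and the
    # fallback values are placeholders for illustration only.
    service_env = environ.get('SERVICE_ENV', 'dev')
    service_name = environ.get('SERVICE_NAME', 'myservice')
    return {'queue_name_prefix': f'{service_env}-{service_name}-'}


# Example with explicit values instead of the real environment.
options = sqs_transport_options(
    {'SERVICE_ENV': 'staging', 'SERVICE_NAME': 'billing'}
)
```

The resulting dict can be passed as broker_transport_options when creating the Celery application.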

URLs

Connection parameters can be provided as a URL in the format:

transport://userid:password@hostname:port/virtual_host

All of these are valid URLs:

# Specifies using the amqp transport only, default values
# are taken from the keyword arguments.
amqp://

# Using Redis
redis://localhost:6379/

# Using Redis over a Unix socket
redis+socket:///tmp/redis.sock

# Using Redis sentinel
sentinel://sentinel1:26379;sentinel://sentinel2:26379

# Using Qpid
qpid://localhost/

# Using virtual host '/foo'
amqp://localhost//foo

# Using virtual host 'foo'
amqp://localhost/foo

# Using Pyro with name server running on 'localhost'
pyro://localhost/kombu.broker

The query part of the URL can also be used to set options, e.g.:

amqp://localhost/myvhost?ssl=1

See Keyword arguments for a list of supported options.

A connection without options uses the default connection settings: host localhost, the default port, user name guest, password guest, and virtual host “/”. A connection without arguments is the same as:

>>> Connection('amqp://guest:guest@localhost:5672//')

The default port is transport-specific; for AMQP it is 5672.

Other fields may also have a different meaning depending on the transport used. For example, the Redis transport uses the virtual_host argument as the Redis database number.

Keyword arguments

The Connection class supports the following additional keyword arguments:

hostname:

Default host name if not provided in the URL.

userid:

Default user name if not provided in the URL.

password:

Default password if not provided in the URL.

virtual_host:

Default virtual host if not provided in the URL.

port:

Default port if not provided in the URL.

transport:

Default transport if not provided in the URL. Can be a string specifying the path to the class (e.g. kombu.transport.pyamqp:Transport), or one of the aliases: pyamqp, librabbitmq, redis, qpid, memory, and so on.

ssl:

Use SSL to connect to the server. Default is False. Only supported by the amqp and qpid transports.

insist:

Insist on connecting to a server. No longer supported; a relic from AMQP 0.8.

connect_timeout:

Timeout in seconds for connecting to the server. May not be supported by the specified transport.

transport_options:

A dict of additional connection arguments to pass to alternate kombu channel implementations. Consult the transport documentation for available options.

AMQP Transports

There are four transports available for AMQP use.

  1. pyamqp uses the pure Python library amqp, automatically installed with Kombu.

  2. librabbitmq uses the high performance transport written in C. This requires the librabbitmq Python package to be installed, which automatically compiles the C library.

  3. amqp tries to use librabbitmq but falls back to pyamqp.

  4. qpid uses the pure Python library qpid.messaging, automatically installed with Kombu. The Qpid library uses AMQP, but uses custom extensions specifically supported by the Apache Qpid Broker.

For the highest performance, you should install the librabbitmq package. To ensure librabbitmq is used, you can explicitly specify it in the transport URL, or use amqp to get the fallback behavior.

Transport Comparison

Client      Type      Direct   Topic     Fanout          Priority
---------   -------   ------   -------   -------------   --------
amqp        Native    Yes      Yes       Yes             Yes [3]
qpid        Native    Yes      Yes       Yes             No
redis       Virtual   Yes      Yes       Yes (PUB/SUB)   Yes
SQS         Virtual   Yes      Yes [1]   Yes [2]         No
zookeeper   Virtual   Yes      Yes [1]   No              Yes
in-memory   Virtual   Yes      Yes [1]   No              No
SLMQ        Virtual   Yes      Yes [1]   No              No

Transport Options

py-amqp

read_timeout:

Timeout for reading data from RabbitMQ.

write_timeout:

Timeout for writing data to RabbitMQ.