This document describes the current stable version of Kombu (5.3).

Automatic Failover

Automatic failover lets an application connect to a clustered broker. An application that uses automatic failover can connect to a healthy node automatically and react when a node in the cluster fails unexpectedly.

Connection failover

Connection accepts multiple broker URLs, separated by semicolons. When connecting, kombu automatically picks a healthy node from the list. In the example below, kombu connects to the healthy broker:

>>> conn = Connection(
...     'amqp://;'
... )
>>> conn.connect()
>>> conn
<Connection: amqp://guest:** at 0x6fffff751710>
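
The selection step can be pictured with a small standard-library sketch. It is not Kombu's implementation: pick_first_healthy and the is_healthy callback are hypothetical names introduced for illustration, and the broker1/broker2 URLs are placeholders.

```python
def pick_first_healthy(url_string, is_healthy):
    """Return the first URL in a ';'-separated list that passes is_healthy."""
    candidates = [u for u in url_string.split(';') if u]
    for url in candidates:
        if is_healthy(url):
            return url
    raise ConnectionError(f'no healthy broker among {candidates!r}')


# Pretend broker1 is down and broker2 is up (assumed state, for the demo only).
down = {'amqp://broker1:5672'}
chosen = pick_first_healthy(
    'amqp://broker1:5672;amqp://broker2:5672',
    is_healthy=lambda url: url not in down,
)
print(chosen)
```

A real connection attempt replaces the is_healthy check; the point is only that the caller supplies one string and ends up on whichever node answers.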

Connection also accepts a failover_strategy parameter, which defines the order in which the nodes are tried:

>>> Connection(
...     'amqp://;amqp://',
...     failover_strategy='round-robin'
... )

The available failover strategies are defined in the kombu.connection module:

>>> import kombu
>>> kombu.connection.failover_strategies
{'round-robin': <class 'itertools.cycle'>, 'shuffle': <function shufflecycle at 0x6fffff8547a0>}
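
To see how the two strategies differ, the following standard-library sketch iterates over a host list both ways. The round-robin case uses itertools.cycle, as the output above shows. The shuffle_cycle generator is only an assumed approximation of a shuffling strategy, not Kombu's own function, and the host names are placeholders.

```python
import itertools
import random


def shuffle_cycle(hosts, rng=random):
    """Endlessly yield hosts, reshuffling the list before each pick."""
    hosts = list(hosts)
    while hosts:
        rng.shuffle(hosts)
        yield hosts[0]


hosts = ['broker1', 'broker2', 'broker3']

# Round-robin: always the same order, wrapping around at the end.
first_five = list(itertools.islice(itertools.cycle(hosts), 5))
print(first_five)  # ['broker1', 'broker2', 'broker3', 'broker1', 'broker2']

# Shuffle: each pick is a random host, so the order is not predictable.
shuffled_five = list(itertools.islice(shuffle_cycle(hosts), 5))
print(shuffled_five)
```

Round-robin gives a predictable retry order, while shuffling spreads reconnecting clients across the nodes.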

Connection failover only covers failures that occur while calling the connect() method of Connection.

Operation failover

Passing multiple connection strings to Connection solves the problem of a broker being unavailable when a new connection is created. In practice, however, connections are long-lived, so a broker can fail during the lifetime of a connection. This scenario requires retrying the operation that was executed against the broker. Retrying ensures that a failed operation triggers a new connection to a healthy broker and is then executed again.

Operation failover is implemented by the ensure() method, which tries to execute a function. When contacting the broker fails, it re-establishes the underlying connection and executes the function again. The following example ensures that the publish() method is re-executed when an error occurs:

>>> import logging
>>> from kombu import Connection, Producer
>>> logger = logging.getLogger(__name__)
>>> conn = Connection('amqp://')
>>> producer = Producer(conn)
>>> def errback(exc, interval):
...     logger.error('Error: %r', exc, exc_info=1)
...     logger.info('Retry in %s seconds.', interval)
>>> publish = conn.ensure(producer, producer.publish,
...                       errback=errback, max_retries=3)
>>> publish({'hello': 'world'}, routing_key='dest')
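
The retry-and-reconnect idea behind ensure() can be modelled without a broker. The sketch below is a simplified stand-in, not Kombu's code: ensure_sketch, flaky_publish and reconnect are hypothetical names, and a plain ConnectionError stands in for whatever errors a real transport raises.

```python
def ensure_sketch(reconnect, fun, max_retries=3, errback=None):
    """Wrap fun so that a ConnectionError triggers reconnect() and a retry."""
    def wrapped(*args, **kwargs):
        for attempt in range(max_retries + 1):
            try:
                return fun(*args, **kwargs)
            except ConnectionError as exc:
                if attempt == max_retries:
                    raise  # retries exhausted, surface the error
                if errback is not None:
                    errback(exc, attempt)
                reconnect()
    return wrapped


# Toy operation that fails twice before succeeding.
calls = {'publish': 0, 'reconnect': 0}


def flaky_publish(body):
    calls['publish'] += 1
    if calls['publish'] < 3:
        raise ConnectionError('broker went away')
    return f'published {body!r}'


def reconnect():
    calls['reconnect'] += 1


publish_with_retry = ensure_sketch(reconnect, flaky_publish, max_retries=3)
result = publish_with_retry({'hello': 'world'})
print(result, calls)
```

The caller sees a single successful call; the two failures and reconnects happen inside the wrapper.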

Some methods accept a channel as a parameter, e.g. declare(). Because the channel is passed in as an argument, it is not refreshed automatically during failover, so retrying the call fails. In these scenarios use autoretry(), which passes the channel automatically and refreshes it during failover:

>>> import kombu
>>> conn = kombu.Connection('amqp://broker1:5672;amqp://broker2:5672')
>>> conn.connect()
>>> q = kombu.Queue('test_queue')

>>> declare = conn.autoretry(q.declare)
>>> declare()
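
Why a fresh channel matters can be shown with a toy model. Everything here is hypothetical and independent of Kombu: ToyConnection, autoretry_sketch and toy_declare are made-up names, and channels are plain strings. In this sketch the wrapper also returns the channel it ended up using, so the caller can see which one succeeded.

```python
class ToyConnection:
    """Stand-in for a connection whose old channels die when the broker fails."""

    def __init__(self):
        self.generation = 0

    def channel(self):
        return f'channel-{self.generation}'

    def reconnect(self):
        self.generation += 1


def autoretry_sketch(conn, fun, max_retries=3):
    """Call fun(channel=<fresh channel>), reconnecting on ConnectionError."""
    def wrapped(*args, **kwargs):
        for attempt in range(max_retries + 1):
            channel = conn.channel()  # re-acquired on every attempt
            try:
                return fun(*args, channel=channel, **kwargs), channel
            except ConnectionError:
                if attempt == max_retries:
                    raise
                conn.reconnect()
    return wrapped


def toy_declare(channel):
    # Channels from the first connection are treated as dead.
    if channel == 'channel-0':
        raise ConnectionError('stale channel')
    return 'declared'


toy_conn = ToyConnection()
retval, used_channel = autoretry_sketch(toy_conn, toy_declare)()
print(retval, used_channel)
```

Had the wrapper kept passing 'channel-0', every retry would have failed the same way, which is the problem described above.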


As mentioned above, publish() can be given automatic failover with ensure(). It also accepts a retry parameter as a shortcut for retrying. The following example retries publishing when an error occurs:

>>> from kombu import Connection, Producer
>>> with Connection('amqp://broker1:5672;amqp://broker2:5672') as conn:
...     with conn.channel() as channel:
...         producer = Producer(channel)
...         producer.publish(
...             {'hello': 'world'}, routing_key='queue', retry=True
...         )


A consumer with failover functionality can be implemented using the following function:

>>> def consume():
...     while True:
...         try:
...             conn.drain_events(timeout=1)
...         except socket.timeout:
...             pass

This function drains events in an infinite loop, using a timeout so that the connection does not block forever on an unavailable broker. A consumer with failover is then implemented by wrapping the consume function with the ensure() method:

>>> consume = conn.ensure(conn, consume)
>>> consume()

The full example implementing a consumer with failover is as follows:

>>> from kombu import Connection, Consumer, Queue
>>> import socket

>>> def callback(body, message):
...     print(body)
...     message.ack()

>>> queue = Queue('queue', routing_key='queue')
>>> with Connection('amqp://broker1:5672;amqp://broker2:5672') as conn:
...     def consume():
...         while True:
...             try:
...                 conn.drain_events(timeout=1)
...             except socket.timeout:
...                 pass
...     with conn.channel() as channel:
...         consumer = Consumer(channel, queue)
...         consumer.register_callback(callback)
...         with consumer:
...             # Wrap once, outside the loop, so the wrapper is not re-wrapped.
...             consume_with_failover = conn.ensure(conn, consume)
...             while True:
...                 consume_with_failover()

When implementing a consumer as a ConsumerMixin, failover is added by wrapping the consume() method with ensure():

>>> from kombu import Connection, Queue
>>> from kombu.mixins import ConsumerMixin

>>> class C(ConsumerMixin):
...     def __init__(self, connection):
...         self.connection = connection
...     def get_consumers(self, Consumer, channel):
...         return [
...             Consumer(
...                  [Queue('queue', routing_key='queue')],
...                  callbacks=[self.on_message], accept=['json']
...             ),
...         ]
...     def on_message(self, body, message):
...         print('RECEIVED MESSAGE: {0!r}'.format(body))
...         message.ack()
...     def consume(self, *args, **kwargs):
...         consume = self.connection.ensure(self.connection, super().consume)
...         return consume(*args, **kwargs)

>>> with Connection('amqp://broker1:5672;amqp://broker2:5672') as conn:
...     C(conn).run()