This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.3.

Examples

Hello World Example

The example below uses the Simple Interface to send a helloworld message through a message broker (RabbitMQ) and print the received message.

hello_publisher.py:

from __future__ import annotations

import datetime

from kombu import Connection

# Both the connection and the simple queue are used as context managers,
# so the queue is closed even if publishing raises.
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    with conn.SimpleQueue('simple_queue') as simple_queue:
        message = f'helloworld, sent at {datetime.datetime.today()}'
        simple_queue.put(message)
        print(f'Sent: {message}')

hello_consumer.py:

from __future__ import annotations

from kombu import Connection

# The queue is used as a context manager so that it is closed even when
# no message arrives within the timeout and ``get`` raises.
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    with conn.SimpleQueue('simple_queue') as simple_queue:
        # Block for at most one second waiting for a message.
        message = simple_queue.get(block=True, timeout=1)
        print(f'Received: {message.payload}')
        # Acknowledge only after the payload has been handled.
        message.ack()

Task Queue Example

A very simple task queue using pickle, with primitive support for priorities implemented with separate queues.

queues.py:

from __future__ import annotations

from kombu import Exchange, Queue

# A single direct exchange carries every task; the routing key selects
# the priority queue.
task_exchange = Exchange('tasks', type='direct')

# One queue per priority level, each bound with a routing key equal to
# its own name.
task_queues = [
    Queue(name, task_exchange, routing_key=name)
    for name in ('hipri', 'midpri', 'lopri')
]

worker.py:

from __future__ import annotations

from kombu.log import get_logger
from kombu.mixins import ConsumerMixin
from kombu.utils.functional import reprcall

from .queues import task_queues

logger = get_logger(__name__)


class Worker(ConsumerMixin):
    """Consumer that executes the tasks received on ``task_queues``.

    Each message body is read as a mapping with the keys ``'fun'``
    (the callable to run), ``'args'`` and ``'kwargs'``.
    """

    def __init__(self, connection):
        """Store *connection* on the instance."""
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return one consumer covering all task queues.

        Incoming messages are handed to :meth:`process_task`.
        """
        # NOTE: 'pickle' is accepted here and unpickling can execute
        # arbitrary code -- only consume from a broker whose publishers
        # you trust.
        task_consumer = Consumer(
            queues=task_queues,
            accept=['pickle', 'json'],
            callbacks=[self.process_task],
        )
        return [task_consumer]

    def process_task(self, body, message):
        """Run the task described by *body*, then acknowledge *message*.

        An ``Exception`` raised by the task is logged instead of being
        propagated, so the message is acknowledged in that case too.
        """
        task = body['fun']
        positional, keyword = body['args'], body['kwargs']
        logger.info(
            'Got task: %s', reprcall(task.__name__, positional, keyword))
        try:
            task(*positional, **keyword)
        except Exception as exc:
            logger.error('task raised exception: %r', exc)
        message.ack()


if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging

    # Configure the root logger ('') at INFO level.
    setup_logging(loglevel='INFO', loggers=[''])

    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        try:
            Worker(conn).run()
        except KeyboardInterrupt:
            # Ctrl-C stops the worker; say goodbye instead of printing a
            # traceback.
            print('bye bye')

tasks.py:

from __future__ import annotations


def hello_task(who='world'):
    """Print a greeting addressed to *who*."""
    greeting = f'Hello {who}'
    print(greeting)

client.py:

from __future__ import annotations

from kombu.pools import producers

from .queues import task_exchange

# Maps the priority names accepted by ``send_as_task`` to routing keys.
priority_to_routing_key = dict(
    high='hipri',
    mid='midpri',
    low='lopri',
)


def send_as_task(connection, fun, args=(), kwargs=None, priority='mid'):
    """Publish a call to *fun* as a task message.

    :param connection: Connection whose producer pool is used to publish.
    :param fun: Callable the worker should run.
    :param args: Positional arguments for *fun*.
    :param kwargs: Keyword arguments for *fun*; ``None`` (the default)
        means no keyword arguments.
    :param priority: A key of ``priority_to_routing_key``
        (``'high'``, ``'mid'`` or ``'low'``).
    :raises KeyError: If *priority* is not a known priority name.
    """
    # Build a fresh dict per call rather than sharing one mutable default
    # object between every invocation.
    if kwargs is None:
        kwargs = {}
    payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
    routing_key = priority_to_routing_key[priority]

    # NOTE: the payload is serialized with pickle. Functions are pickled
    # by reference, so *fun* must be importable on the consuming side, and
    # pickle should only be used between parties that trust each other.
    with producers[connection].acquire(block=True) as producer:
        producer.publish(payload,
                         serializer='pickle',
                         compression='bzip2',
                         exchange=task_exchange,
                         declare=[task_exchange],
                         routing_key=routing_key)


if __name__ == '__main__':
    from kombu import Connection

    from .tasks import hello_task

    # Queue a high-priority ``hello_task('Kombu')`` call.
    connection = Connection('amqp://guest:guest@localhost:5672//')
    send_as_task(
        connection,
        fun=hello_task,
        args=('Kombu',),
        kwargs={},
        priority='high',
    )

Native Delayed Delivery

This example demonstrates how to declare native delayed delivery queues and exchanges and publish a message using the native delayed delivery mechanism.

delayed_infra.py:

from __future__ import annotations

from examples.experimental.async_consume import queue
from kombu import Connection, Exchange, Queue
from kombu.transport.native_delayed_delivery import (
    bind_queue_to_native_delayed_delivery_exchange, calculate_routing_key,
    declare_native_delayed_delivery_exchanges_and_queues, level_name)

with Connection('amqp://guest:guest@localhost:5672//') as connection:
    # Declare the delayed-delivery exchanges and queues, passing 'quorum'
    # as the queue type.
    declare_native_delayed_delivery_exchanges_and_queues(connection, 'quorum')

    destination_exchange = Exchange(
        'destination', type='topic')
    destination_queue = Queue("destination", exchange=destination_exchange)
    # Bind the queue declared just above -- not the unrelated ``queue``
    # object imported from the async_consume example.
    bind_queue_to_native_delayed_delivery_exchange(
        connection, destination_queue)

    channel = connection.channel()
    with connection.Producer(channel=channel) as producer:
        # Routing key computed from a countdown of 30 and the final
        # routing key 'destination'.
        routing_key = calculate_routing_key(30, 'destination')
        # Publish to the delayed-delivery exchange named by level_name(27).
        producer.publish(
            "delayed msg",
            routing_key=routing_key,
            exchange=level_name(27)
        )