This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.3.
Examples¶
Hello World Example¶
The example below uses the Simple Interface to send a helloworld message through a message broker (RabbitMQ) and print the received message.
hello_publisher.py
:
from __future__ import annotations
import datetime
from kombu import Connection
# Publish one timestamped greeting to 'simple_queue' on the local broker.
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    simple_queue = conn.SimpleQueue('simple_queue')
    try:
        message = f'helloworld, sent at {datetime.datetime.today()}'
        simple_queue.put(message)
        print(f'Sent: {message}')
    finally:
        # Close the queue even when publishing fails.
        simple_queue.close()
hello_consumer.py
:
from __future__ import annotations
from kombu import Connection
# Fetch one message from 'simple_queue', print its payload and acknowledge it.
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    simple_queue = conn.SimpleQueue('simple_queue')
    try:
        # Blocks for at most one second; get() raises the queue's Empty
        # exception when no message arrives in time.
        message = simple_queue.get(block=True, timeout=1)
        print(f'Received: {message.payload}')
        message.ack()
    finally:
        # Close the queue even when no message was received.
        simple_queue.close()
Task Queue Example¶
A very simple task queue using pickle, with primitive support for priorities implemented with separate queues.
queues.py
:
from __future__ import annotations
from kombu import Exchange, Queue
# Direct exchange shared by every priority queue.
task_exchange = Exchange('tasks', type='direct')

# One queue per priority level; each queue's routing key equals its name.
task_queues = [
    Queue(name, task_exchange, routing_key=name)
    for name in ('hipri', 'midpri', 'lopri')
]
worker.py
:
from __future__ import annotations
from kombu.log import get_logger
from kombu.mixins import ConsumerMixin
from kombu.utils.functional import reprcall
from .queues import task_queues
logger = get_logger(__name__)


class Worker(ConsumerMixin):
    """Consumer that executes the tasks arriving on ``task_queues``."""

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return a single consumer that covers every task queue."""
        # NOTE(review): 'pickle' content is accepted here; unpickling is only
        # safe when every publisher is trusted.
        task_consumer = Consumer(
            queues=task_queues,
            accept=['pickle', 'json'],
            callbacks=[self.process_task],
        )
        return [task_consumer]

    def process_task(self, body, message):
        """Run the callable described by ``body``, then ack ``message``.

        ``body`` is indexed with the keys ``'fun'``, ``'args'`` and
        ``'kwargs'``.
        """
        task, positional, named = body['fun'], body['args'], body['kwargs']
        logger.info('Got task: %s', reprcall(task.__name__, positional, named))
        try:
            task(*positional, **named)
        except Exception as exc:
            logger.error('task raised exception: %r', exc)
        # Acknowledge whether or not the task succeeded.
        message.ack()
if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging

    # Configure the root logger (named '') at INFO level.
    setup_logging(loglevel='INFO', loggers=[''])

    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        try:
            Worker(conn).run()
        except KeyboardInterrupt:
            # Say goodbye instead of showing a traceback on Ctrl-C.
            print('bye bye')
tasks.py
:
from __future__ import annotations
def hello_task(who='world'):
    """Print a greeting addressed to ``who``."""
    greeting = f'Hello {who}'
    print(greeting)
client.py
:
from __future__ import annotations
from kombu.pools import producers
from .queues import task_exchange
# Maps a task priority name to the routing key used when publishing.
priority_to_routing_key = dict(
    high='hipri',
    mid='midpri',
    low='lopri',
)
def send_as_task(connection, fun, args=(), kwargs=None, priority='mid'):
    """Publish a call to ``fun`` as a pickled task message.

    :param connection: connection used to select a producer pool.
    :param fun: callable to be invoked by the consumer of the message.
    :param args: positional arguments for ``fun``.
    :param kwargs: keyword arguments for ``fun``; ``None`` (the default)
        means no keyword arguments.
    :param priority: a key of ``priority_to_routing_key``.
    :raises KeyError: if ``priority`` is not a known priority name.
    """
    # Build a fresh dict per call instead of sharing one mutable default
    # object between every invocation.
    if kwargs is None:
        kwargs = {}
    payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
    routing_key = priority_to_routing_key[priority]

    with producers[connection].acquire(block=True) as producer:
        producer.publish(payload,
                         serializer='pickle',
                         compression='bzip2',
                         exchange=task_exchange,
                         declare=[task_exchange],
                         routing_key=routing_key)
if __name__ == '__main__':
    from kombu import Connection

    from .tasks import hello_task

    # Send hello_task('Kombu') with the 'high' priority routing key.
    connection = Connection('amqp://guest:guest@localhost:5672//')
    send_as_task(
        connection,
        fun=hello_task,
        args=('Kombu',),
        kwargs={},
        priority='high',
    )
Native Delayed Delivery¶
This example demonstrates how to declare native delayed delivery queues and exchanges and publish a message using the native delayed delivery mechanism.
delayed_infra.py
:
from __future__ import annotations
from examples.experimental.async_consume import queue
from kombu import Connection, Exchange, Queue
from kombu.transport.native_delayed_delivery import (
bind_queue_to_native_delayed_delivery_exchange, calculate_routing_key,
declare_native_delayed_delivery_exchanges_and_queues, level_name)
with Connection('amqp://guest:guest@localhost:5672//') as connection:
    # Declare the native delayed delivery exchanges and queues
    # ('quorum' is passed as the queue type).
    declare_native_delayed_delivery_exchanges_and_queues(connection, 'quorum')

    destination_exchange = Exchange('destination', type='topic')
    destination_queue = Queue("destination", exchange=destination_exchange)
    # Bind the destination queue created above, not the unrelated ``queue``
    # object imported from another example module.
    bind_queue_to_native_delayed_delivery_exchange(
        connection, destination_queue)

    channel = connection.channel()
    with connection.Producer(channel=channel) as producer:
        # Routing key for a delay of 30 addressed to 'destination'
        # (presumably seconds; confirm against calculate_routing_key).
        routing_key = calculate_routing_key(30, 'destination')
        producer.publish(
            "delayed msg",
            routing_key=routing_key,
            exchange=level_name(27)
        )