This document describes the current stable version of Kombu (4.2). For development docs, go here.
Examples¶
Hello World Example¶
The example below uses the Simple Interface to send a helloworld message through a message broker (RabbitMQ) and print the received message.
hello_publisher.py
:
from __future__ import absolute_import, unicode_literals
import datetime
from kombu import Connection
# Publish one timestamped greeting to the queue named 'simple_queue'.
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    simple_queue = conn.SimpleQueue('simple_queue')
    try:
        message = 'helloworld, sent at {0}'.format(datetime.datetime.today())
        simple_queue.put(message)
        print('Sent: {0}'.format(message))
    finally:
        # Close the queue even when publishing or printing fails.
        simple_queue.close()
hello_consumer.py
:
from __future__ import absolute_import, unicode_literals, print_function
from kombu import Connection # noqa
# Fetch one message from the queue named 'simple_queue', print its payload
# and acknowledge it.
with Connection('amqp://guest:guest@localhost:5672//') as conn:
    simple_queue = conn.SimpleQueue('simple_queue')
    try:
        # Blocking get with timeout=1; if this call, the print or the ack
        # raises, the ``finally`` clause still closes the queue.
        message = simple_queue.get(block=True, timeout=1)
        print('Received: {0}'.format(message.payload))
        message.ack()
    finally:
        simple_queue.close()
Task Queue Example¶
A very simple task queue that serializes tasks with pickle and offers primitive support for priorities by using a separate queue for each priority level.
queues.py
:
from __future__ import absolute_import, unicode_literals
from kombu import Exchange, Queue
# Single direct exchange that every priority queue is bound to.
task_exchange = Exchange(name='tasks', type='direct')

# One queue per priority level; each queue's routing key equals its name.
task_queues = [
    Queue(name='hipri', exchange=task_exchange, routing_key='hipri'),
    Queue(name='midpri', exchange=task_exchange, routing_key='midpri'),
    Queue(name='lopri', exchange=task_exchange, routing_key='lopri'),
]
worker.py
:
from __future__ import absolute_import, unicode_literals
from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu.utils.functional import reprcall
from .queues import task_queues
# Module-level logger, named after this module; used by Worker.process_task.
logger = get_logger(__name__)
class Worker(ConsumerMixin):
    """Consume task messages from ``task_queues`` and run them.

    A task message body is expected to be a mapping with the keys
    ``'fun'`` (a callable), ``'args'`` (positional arguments) and
    ``'kwargs'`` (keyword arguments).
    """

    def __init__(self, connection):
        """Keep ``connection`` on the instance as ``self.connection``."""
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return one consumer covering every queue in ``task_queues``.

        :param Consumer: consumer factory; called here with ``queues``,
            ``accept`` and ``callbacks`` keyword arguments.
        :param channel: not used by this implementation.
        :returns: a one-element list holding the created consumer.
        """
        # NOTE(security): accepting 'pickle' means message bodies get
        # unpickled, and unpickling can execute arbitrary code.  Only do
        # this when every publisher on the broker is trusted.
        return [Consumer(queues=task_queues,
                         accept=['pickle', 'json'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        """Run the task described by ``body``, then acknowledge ``message``.

        Any ``Exception`` raised while unpacking the payload or running
        the task is logged, and the message is acknowledged regardless.

        :param body: mapping with the keys ``'fun'``, ``'args'`` and
            ``'kwargs'``.
        :param message: the received message; ``ack()`` is called on it.
        """
        try:
            # Unpack inside the ``try`` so that a malformed payload (for
            # example a missing key) is logged and acknowledged like any
            # other failing task, instead of escaping from the callback
            # with the message left unacknowledged.
            fun = body['fun']
            args = body['args']
            kwargs = body['kwargs']
            logger.info('Got task: %s', reprcall(fun.__name__, args, kwargs))
            fun(*args, **kwargs)
        except Exception as exc:
            logger.error('task raised exception: %r', exc)
        message.ack()
if __name__ == '__main__':
    from kombu.utils.debug import setup_logging
    from kombu import Connection

    # Configure the root logger (the '' entry) at INFO level.
    setup_logging(loglevel='INFO', loggers=[''])

    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        try:
            # Runs until interrupted from the keyboard.
            Worker(conn).run()
        except KeyboardInterrupt:
            print('bye bye')
tasks.py
:
from __future__ import absolute_import, unicode_literals
def hello_task(who='world'):
    """Print a greeting addressed to ``who`` (``'world'`` by default)."""
    greeting = 'Hello {0}'.format(who)
    print(greeting)
client.py
:
from __future__ import absolute_import, unicode_literals
from kombu.pools import producers
from .queues import task_exchange
# Maps the priority names accepted by ``send_as_task`` to routing keys.
priority_to_routing_key = dict((
    ('high', 'hipri'),
    ('mid', 'midpri'),
    ('low', 'lopri'),
))
def send_as_task(connection, fun, args=(), kwargs=None, priority='mid'):
    """Publish a task message describing a call to ``fun``.

    :param connection: key used to look up a producer pool in ``producers``.
    :param fun: callable to run; stored in the payload under ``'fun'``.
    :param args: positional arguments for ``fun``.
    :param kwargs: keyword arguments for ``fun``; ``None`` (the default)
        means no keyword arguments.
    :param priority: a key of ``priority_to_routing_key``
        (``'high'``, ``'mid'`` or ``'low'``).
    :raises KeyError: if ``priority`` is not in ``priority_to_routing_key``.
    """
    # Build a fresh dict per call rather than sharing one mutable default
    # object between every call that omits ``kwargs``.
    if kwargs is None:
        kwargs = {}
    payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
    routing_key = priority_to_routing_key[priority]

    # NOTE(security): the payload is pickled, so whoever consumes it must
    # unpickle it; unpickling data from an untrusted publisher can execute
    # arbitrary code.
    with producers[connection].acquire(block=True) as producer:
        producer.publish(payload,
                         serializer='pickle',
                         compression='bzip2',
                         exchange=task_exchange,
                         declare=[task_exchange],
                         routing_key=routing_key)
if __name__ == '__main__':
    from .tasks import hello_task
    from kombu import Connection

    # Send ``hello_task('Kombu')`` as a high-priority task.
    broker_url = 'amqp://guest:guest@localhost:5672//'
    send_as_task(
        Connection(broker_url),
        fun=hello_task,
        args=('Kombu',),
        kwargs={},
        priority='high',
    )