This document describes the current stable version of Celery (5.1). For development docs, go here.

Source code for celery.contrib.testing.mocks

"""Useful mocks for unit testing."""
import numbers
from datetime import datetime, timedelta

try:
    from case import Mock
except ImportError:
    from unittest.mock import Mock


[docs]def TaskMessage( name, # type: str id=None, # type: str args=(), # type: Sequence kwargs=None, # type: Mapping callbacks=None, # type: Sequence[Signature] errbacks=None, # type: Sequence[Signature] chain=None, # type: Sequence[Signature] shadow=None, # type: str utc=None, # type: bool **options # type: Any ): # type: (...) -> Any """Create task message in protocol 2 format.""" kwargs = {} if not kwargs else kwargs from kombu.serialization import dumps from celery import uuid id = id or uuid() message = Mock(name=f'TaskMessage-{id}') message.headers = { 'id': id, 'task': name, 'shadow': shadow, } embed = {'callbacks': callbacks, 'errbacks': errbacks, 'chain': chain} message.headers.update(options) message.content_type, message.content_encoding, message.body = dumps( (args, kwargs, embed), serializer='json', ) message.payload = (args, kwargs, embed) return message
[docs]def TaskMessage1( name, # type: str id=None, # type: str args=(), # type: Sequence kwargs=None, # type: Mapping callbacks=None, # type: Sequence[Signature] errbacks=None, # type: Sequence[Signature] chain=None, # type: Squence[Signature] **options # type: Any ): # type: (...) -> Any """Create task message in protocol 1 format.""" kwargs = {} if not kwargs else kwargs from kombu.serialization import dumps from celery import uuid id = id or uuid() message = Mock(name=f'TaskMessage-{id}') message.headers = {} message.payload = { 'task': name, 'id': id, 'args': args, 'kwargs': kwargs, 'callbacks': callbacks, 'errbacks': errbacks, } message.payload.update(options) message.content_type, message.content_encoding, message.body = dumps( message.payload, ) return message
[docs]def task_message_from_sig(app, sig, utc=True, TaskMessage=TaskMessage): # type: (Celery, Signature, bool, Any) -> Any """Create task message from :class:`celery.Signature`. Example: >>> m = task_message_from_sig(app, add.s(2, 2)) >>> amqp_client.basic_publish(m, exchange='ex', routing_key='rkey') """ sig.freeze() callbacks = sig.options.pop('link', None) errbacks = sig.options.pop('link_error', None) countdown = sig.options.pop('countdown', None) if countdown: eta = app.now() + timedelta(seconds=countdown) else: eta = sig.options.pop('eta', None) if eta and isinstance(eta, datetime): eta = eta.isoformat() expires = sig.options.pop('expires', None) if expires and isinstance(expires, numbers.Real): expires = app.now() + timedelta(seconds=expires) if expires and isinstance(expires, datetime): expires = expires.isoformat() return TaskMessage( sig.task, id=sig.id, args=sig.args, kwargs=sig.kwargs, callbacks=[dict(s) for s in callbacks] if callbacks else None, errbacks=[dict(s) for s in errbacks] if errbacks else None, eta=eta, expires=expires, utc=utc, **sig.options )