This document describes the current stable version of Celery (5.5). For development docs, go here.

celery.contrib.testing.mocks

API Reference

Useful mocks for unit testing.

celery.contrib.testing.mocks.ContextMock(*args, **kwargs)[source]

Mock that mocks with statement contexts.

celery.contrib.testing.mocks.TaskMessage(name: str, id: str | None = None, args: Sequence = (), kwargs: Mapping | None = None, callbacks: Sequence[Signature] | None = None, errbacks: Sequence[Signature] | None = None, chain: Sequence[Signature] | None = None, shadow: str | None = None, utc: bool | None = None, **options: Any) Any[source]

Create task message in protocol 2 format.

celery.contrib.testing.mocks.TaskMessage1(name: str, id: str | None = None, args: Sequence = (), kwargs: Mapping | None = None, callbacks: Sequence[Signature] | None = None, errbacks: Sequence[Signature] | None = None, chain: Sequence[Signature] | None = None, **options: Any) Any[source]

Create task message in protocol 1 format.

celery.contrib.testing.mocks.task_message_from_sig(app: ~celery.app.base.Celery, sig: ~celery.canvas.Signature, utc: bool = True, TaskMessage: ~typing.Any = <function TaskMessage>) Any[source]

Create task message from celery.Signature.

Example

>>> m = task_message_from_sig(app, add.s(2, 2))
>>> amqp_client.basic_publish(m, exchange='ex', routing_key='rkey')