Sep 30, 2024

Built-in Brokers and Backends

The plugin comes with support for several brokers and backends out of the box. This page lists the supported vendors and their status.











Localstack (SQS)













Experimental or Beta status means it may be functional but are not confirmed to be production ready.

Enabled means that it is automatically added to the test Test Setup Matrix when running the test suite if the vendor dependencies are installed.


Enabling a new vendor will automatically add it globally to every test suite that relies on the default vendors detection. Be careful when enabling new vendors and make sure they are stable and production ready.

Built-in Celery Worker

The plugin provides a built-in Celery worker that can be used to run tests against. It uses the latest stable version of Celery and can be used, replaced or extended by the user.

The Dockerfile is published with the source code and can be found using WORKER_DOCKERFILE_ROOTDIR.

FROM python:3.10-slim-buster

# Create a user to run the worker
RUN adduser --disabled-password --gecos "" test_user

# Install system dependencies
RUN apt-get update && apt-get install -y build-essential \
    git \
    wget \
    make \
    curl \
    apt-utils \
    debconf \
    lsb-release \
    libmemcached-dev \
    libffi-dev \
    ca-certificates \
    pypy3 \
    pypy3-lib \

# Set arguments
ARG CELERY_WORKER_NAME=celery_test_worker



# Install Python dependencies
RUN pip install --no-cache-dir --upgrade \
    pip \
    celery[redis,pymemcache,gevent]${WORKER_VERSION:+==$WORKER_VERSION} \

# The workdir must be /app

COPY content/ .

# Switch to the test_user
USER test_user

# Start the celery worker
CMD celery -A app worker --loglevel=$LOG_LEVEL -n $WORKER_NAME@%h -Q $WORKER_QUEUE

Localstack (SQS) Broker

To use the Localstack broker, you will need add additional configuration to the test setup.

You may add this to to configure the Localstack broker.

import os

import pytest
from celery import Celery

from pytest_celery import LOCALSTACK_CREDS

def default_worker_env(default_worker_env: dict) -> dict:
    return default_worker_env

@pytest.fixture(scope="session", autouse=True)
def set_aws_credentials():

def default_worker_app(default_worker_app: Celery) -> Celery:
    app = default_worker_app
    if app.conf.broker_url and app.conf.broker_url.startswith("sqs"):
        app.conf.broker_transport_options["region"] = LOCALSTACK_CREDS["AWS_DEFAULT_REGION"]
    return app

And to enable the Localstack broker in the default Test Setup Matrix, add the following configuration to

from pytest_celery import ALL_CELERY_BROKERS
from pytest_celery import CELERY_LOCALSTACK_BROKER
from pytest_celery import CeleryTestBroker
from pytest_celery import _is_vendor_installed

if _is_vendor_installed("localstack"):

def celery_broker(request: pytest.FixtureRequest) -> CeleryTestBroker:  # type: ignore
    broker: CeleryTestBroker = request.getfixturevalue(request.param)
    yield broker

Custom Vendors

Injected brokers, backends and workers can extend the built-in ones or provide completely new ones. The plugin provides a set of base classes that can be used to implement custom vendors.

Custom Broker

class CeleryTestContainer, app: Celery = None)[source]

Bases: CeleryTestNode

This is specialized node type for handling celery brokers nodes. It is used to encapsulate a broker instance.

Responsibility Scope:

Handling broker specific requirements and configuration.

property app: Celery

Celery app for the node if available.

assert_log_does_not_exist(log: str, message: str = '', timeout: int = 1) None

Assert that a log does not exist in the container.

  • log (str) – Log to assert.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to 1.

assert_log_exists(log: str, message: str = '', timeout: int = 60) None

Assert that a log exists in the container.

  • log (str) – Log to assert.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to RESULT_TIMEOUT.

config(*args: tuple, **kwargs: dict) dict

Compile the configurations required for Celery from this node.

property container: CeleryTestContainer

Underlying container for the node.

classmethod default_config() dict[source]

Default node configurations if not overridden by the user.

hostname() str

Get the hostname of this node.

kill(signal: str | int = 'SIGKILL', reload_container: bool = True) None

Kill the underlying container.

  • signal (str | int, optional) – Signal to send to the container. Defaults to “SIGKILL”.

  • reload_container (bool, optional) – Reload the container object after killing it. Defaults to True.

logs() str

Get the logs of the underlying container.

name() str

Get the name of this node.

ready() bool

Waits until the node is ready or raise an exception if it fails to boot up.

restart(reload_container: bool = True, force: bool = False) None[source]

Override restart method to update the app broker url with new container values.

teardown() None

Teardown the node.

wait_for_log(log: str, message: str = '', timeout: int = 60) None

Wait for a log to appear in the container.

  • log (str) – Log to wait for.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to RESULT_TIMEOUT.

Custom Backend

class pytest_celery.api.backend.CeleryTestBackend(container: CeleryTestContainer, app: Celery = None)[source]

Bases: CeleryTestNode

This is specialized node type for handling celery backends nodes. It is used to encapsulate a backend instance.

Responsibility Scope:

Handling backend specific requirements and configuration.

property app: Celery

Celery app for the node if available.

assert_log_does_not_exist(log: str, message: str = '', timeout: int = 1) None

Assert that a log does not exist in the container.

  • log (str) – Log to assert.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to 1.

assert_log_exists(log: str, message: str = '', timeout: int = 60) None

Assert that a log exists in the container.

  • log (str) – Log to assert.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to RESULT_TIMEOUT.

config(*args: tuple, **kwargs: dict) dict

Compile the configurations required for Celery from this node.

property container: CeleryTestContainer

Underlying container for the node.

classmethod default_config() dict[source]

Default node configurations if not overridden by the user.

hostname() str

Get the hostname of this node.

kill(signal: str | int = 'SIGKILL', reload_container: bool = True) None

Kill the underlying container.

  • signal (str | int, optional) – Signal to send to the container. Defaults to “SIGKILL”.

  • reload_container (bool, optional) – Reload the container object after killing it. Defaults to True.

logs() str

Get the logs of the underlying container.

name() str

Get the name of this node.

ready() bool

Waits until the node is ready or raise an exception if it fails to boot up.

restart(reload_container: bool = True, force: bool = False) None[source]

Override restart method to update the app result backend with new container values.

teardown() None

Teardown the node.

wait_for_log(log: str, message: str = '', timeout: int = 60) None

Wait for a log to appear in the container.

  • log (str) – Log to wait for.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to RESULT_TIMEOUT.

Custom Worker

class pytest_celery.api.worker.CeleryTestWorker(container: CeleryTestContainer, app: Celery)[source]

Bases: CeleryTestNode

This is specialized node type for handling celery worker nodes. It is used to encapsulate a worker instance.

Responsibility Scope:

Managing a celery worker.

property app: Celery

Celery app for the node if available.

assert_log_does_not_exist(log: str, message: str = '', timeout: int = 1) None

Assert that a log does not exist in the container.

  • log (str) – Log to assert.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to 1.

assert_log_exists(log: str, message: str = '', timeout: int = 60) None

Assert that a log exists in the container.

  • log (str) – Log to assert.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to RESULT_TIMEOUT.

config(*args: tuple, **kwargs: dict) dict

Compile the configurations required for Celery from this node.

property container: CeleryTestContainer

Underlying container for the node.

classmethod default_config() dict

Default node configurations if not overridden by the user.

get_running_processes_info(columns: list[str] | None = None, filters: dict[str, str] | None = None) list[dict][source]

Get running processes info on the container of this node.

Possible columns:
  • pid

  • name

  • username

  • cmdline

  • cpu_percent

  • memory_percent

  • create_time

  • columns (list[str] | None, optional) – Columns to query. Defaults to None (all).

  • filters (dict[str, str] | None, optional) – Filters to apply. Defaults to None.


RuntimeError – If the command fails.


List of processes info per requested columns.

Return type:


hostname() str[source]

Hostname of the worker node.

kill(signal: str | int = 'SIGKILL', reload_container: bool = True) None

Kill the underlying container.

  • signal (str | int, optional) – Signal to send to the container. Defaults to “SIGKILL”.

  • reload_container (bool, optional) – Reload the container object after killing it. Defaults to True.

property log_level: str

Celery log level of this worker node.

logs() str

Get the logs of the underlying container.

name() str

Get the name of this node.

ready() bool

Waits until the node is ready or raise an exception if it fails to boot up.

restart(reload_container: bool = True, force: bool = False) None

Restart the underlying container.

  • reload_container (bool, optional) – Reload the container object after restarting it. Defaults to True.

  • force (bool, optional) – Kill the container before restarting it. Defaults to False.

teardown() None

Teardown the node.

property version: str

Celery version of this worker node.

wait_for_log(log: str, message: str = '', timeout: int = 60) None

Wait for a log to appear in the container.

  • log (str) – Log to wait for.

  • message (str, optional) – Message to display while waiting. Defaults to “”.

  • timeout (int, optional) – Timeout in seconds. Defaults to RESULT_TIMEOUT.

property worker_name: str

Celery test worker node name.

property worker_queue: str

Celery queue for this worker node.

Vendor Class

The Vendor Class is an optional mechanism for OOP style configuration of the plugin’s vendors. It allows registering a class that defines how does the vendor behave and configured.

The vendor class represents the vendor’s container class that is used automatically by the plugin.

The following diagram shows the relationship between the vendor class and the vendor’s infrastructure.

graph TD; Vendor[Vendor] --> BrokerComponent[Broker Component] Vendor --> BackendComponent[Backend Component] Vendor --> WorkerComponent[Worker Component] BrokerComponent --> Comp BackendComponent --> Comp WorkerComponent --> Comp subgraph Comp["Component"] Node[Node] --> Container[Container] end Comp --> DefaultFixtures[Default Fixtures] Comp --> VendorClass[Vendor Class] VendorClass -. "You are here" .-> VendorClass DefaultFixtures <.-> VendorClass style Vendor fill:#f9f,stroke:#333,stroke-width:4px style Comp fill:#ddf,stroke:#333,stroke-width:2px style Node fill:#eeffdd,stroke:#333 style Container fill:#ffffee,stroke:#333 style VendorClass fill:#ffeedd,stroke:#333

Use Cases


It is used only to override the built-in vendors containers.

Registering a Vendor Class

The plugin uses the vendor class to implement the default fixtures of the vendor. To override it, create your own vendor class and subclass the matching built-in vendor class to include the built-in fixtures implementation.

Worker Example
class MyWorkerContainer(CeleryWorkerContainer):
    def client(self) -> Any:
        return self

    def version(cls) -> str:
        return celery.__version__

    def log_level(cls) -> str:
        return "INFO"

    def worker_name(cls) -> str:
        return "my_tests_worker"

    def worker_queue(cls) -> str:
        return "my_tests_queue"

    def post_initialization_logic(self) -> None:

And then, register it using the matching default fixture.

def default_worker_container_cls() -> Type[CeleryWorkerContainer]:
    return MyWorkerContainer


The worker vendor requires another fixture to be registered to allow configuring the worker before it gets built.

def default_worker_container_session_cls() -> Type[CeleryWorkerContainer]:
    return MyWorkerContainer

There’s no session vendor class for other vendors.

Accessing the Vendor Class

Once a vendor class has been registered, it can be accessed using the Test Setup. Any additional API added to the class can be accessed as well.

For example,

def test_accessing_post_initialization_logic(celery_setup: CeleryTestSetup):
    worker: MyWorkerContainer = celery_setup.worker