
hybrid_setup

Release: 1.0

Date: May 16, 2024

Description

The purpose of this example is to demonstrate a more complex setup with multiple components. The example uses two brokers with the failover feature, a result backend, and multiple workers with different pools and Celery versions. One worker uses gevent with the latest Celery release on the default queue, while the other uses prefork with Celery 4 and its own queue.

It uses the following canvas workflow to exercise both workers: the group and the final identity task run on the default queue (the gevent worker), while the noop step is routed to the legacy queue (the Celery 4 worker):

canvas = (
    group(
        identity.si("Hello, "),
        identity.si("world!"),
    )
    | noop.s().set(queue="legacy")
    | identity.si("Done!")
)

Highlights

  1. No default components.

  2. Session broker and backend components.
    • Shared between tests, but not between pytest-xdist sessions.

    • Only the workers are created again for each test case.

  3. Injects the tasks and signal handler modules into all workers.

This example is based on:

Breakdown

File Structure

The following tree lists the relevant files in the project.

hybrid_setup/
├── requirements.txt
└── tests/
    ├── conftest.py
    ├── test_hybrid_setup.py
    └── vendors/
        ├── __init__.py
        ├── memcached.py
        ├── rabbitmq.py
        └── workers/
            ├── __init__.py
            ├── gevent.Dockerfile
            ├── gevent.py
            ├── legacy.Dockerfile
            ├── legacy.py
            ├── signals.py
            └── tasks.py

requirements.txt

Take a look at the requirements file for this example:

examples.hybrid_setup.requirements.txt
pytest>=7.4.4
pytest-xdist>=3.5.0
pytest-subtests>=0.11.0
pytest-rerunfailures>=14.0
celery[gevent]
pytest-celery[all]@git+https://github.com/celery/pytest-celery.git

Note that gevent can be installed independently of the celery package.

conftest.py

The conftest.py file is used to aggregate each individual configuration. To understand how it works, we’ll break the file down into three parts.

  1. Creating the docker network for the components.

  2. Configuring the broker, backend and workers for the setup.

  3. Injecting the tasks and signal handlers modules.

examples.hybrid_setup.tests.conftest.py

import pytest
from pytest_docker_tools import network

from pytest_celery import CeleryBackendCluster
from pytest_celery import CeleryBrokerCluster
from pytest_celery import CeleryTestWorker
from pytest_celery import CeleryWorkerCluster
from pytest_celery import MemcachedTestBackend
from pytest_celery import RabbitMQTestBroker

hybrid_setup_example_network = network(scope="session")


@pytest.fixture
def celery_broker_cluster(
    session_rabbitmq_broker: RabbitMQTestBroker,
    session_failover_broker: RabbitMQTestBroker,
) -> CeleryBrokerCluster:
    """This is like setting broker_url to
    "session_rabbitmq_broker;session_failover_broker"."""
    cluster = CeleryBrokerCluster(
        session_rabbitmq_broker,
        session_failover_broker,
    )
    yield cluster
    cluster.teardown()


@pytest.fixture
def celery_backend_cluster(session_memcached_backend: MemcachedTestBackend) -> CeleryBackendCluster:
    cluster = CeleryBackendCluster(session_memcached_backend)
    yield cluster
    cluster.teardown()


@pytest.fixture
def celery_worker_cluster(
    gevent_worker: CeleryTestWorker,
    legacy_worker: CeleryTestWorker,
) -> CeleryWorkerCluster:
    cluster = CeleryWorkerCluster(gevent_worker, legacy_worker)
    yield cluster
    cluster.teardown()


@pytest.fixture
def default_worker_tasks(default_worker_tasks: set) -> set:
    from tests.vendors.workers import tasks

    default_worker_tasks.add(tasks)
    return default_worker_tasks


@pytest.fixture
def default_worker_signals(default_worker_signals: set) -> set:
    from tests.vendors.workers import signals

    default_worker_signals.add(signals)
    return default_worker_signals
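
As the docstring of the broker cluster fixture hints, pairing the main and failover brokers is roughly equivalent to handing Celery a semicolon-separated list of broker URLs. A minimal sketch of that equivalent configuration (the hostnames and credentials below are made up for illustration):

from celery import Celery

app = Celery("hybrid_setup_example")
# If the first broker becomes unreachable, the connection retries with the next URL.
app.conf.broker_url = (
    "amqp://guest:guest@main-rabbitmq:5672//"
    ";amqp://guest:guest@failover-rabbitmq:5672//"
)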

test_hybrid_setup.py

Every test case that uses the celery_setup fixture will run its scenario on the setup that was configured in the conftest.py file.

For this example, we have the following test cases.

examples.hybrid_setup.tests.test_hybrid_setup.py
    def test_ping(self, celery_setup: CeleryTestSetup):
        assert ping.s().delay().get(timeout=RESULT_TIMEOUT) == "pong"

    def test_job(self, celery_setup: CeleryTestSetup):
        assert job.s().delay().get(timeout=RESULT_TIMEOUT) == "Done!"

    def test_signal(self, celery_setup: CeleryTestSetup):
        celery_setup.worker.assert_log_exists("Worker init handler called!")

    def test_failover(
        self,
        celery_setup: CeleryTestSetup,
        gevent_worker: CeleryTestWorker,
        legacy_worker: CeleryTestWorker,
        session_failover_broker: RabbitMQTestBroker,
        subtests: SubTests,
    ):
        with subtests.test(msg="Kill the main broker"):
            celery_setup.broker.kill()

        with subtests.test(msg="Manually assert the workers"):
            gevent_worker.assert_log_exists("Will retry using next failover.")
            legacy_worker.assert_log_exists("Will retry using next failover.")

        with subtests.test(msg="Use the celery setup to assert the workers"):
            worker: CeleryTestWorker
            for worker in celery_setup.worker_cluster:
                log = f"Connected to amqp://guest:**@{session_failover_broker.hostname()}:5672//"
                worker.assert_log_exists(log)

        with subtests.test(msg="Verify that the workers are still working (publish tasks)"):
            assert job.s().delay().get(timeout=RESULT_TIMEOUT) == "Done!"

Tip

The components themselves can be used in test cases to easily access their attributes and methods, as shown in the failover test case. When used without the celery_setup fixture, the components run independently and might not be aware of each other.
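
For instance, a hypothetical test could request a component fixture on its own; this snippet is only an illustration and is not part of the example’s test suite:

def test_broker_is_up(session_rabbitmq_broker: RabbitMQTestBroker):
    # No celery_setup and no workers here; the broker container simply runs on its own.
    assert session_rabbitmq_broker.hostname()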

rabbitmq.py and memcached.py

The brokers and the result backend are defined as independent components that are wired into the setup via the conftest.py file. They define session-scoped fixtures and integrate using the matching node classes.

Main and Failover Brokers

examples.hybrid_setup.tests.vendors.rabbitmq.py
import pytest
from pytest_docker_tools import container
from pytest_docker_tools import fetch

from pytest_celery import RABBITMQ_CONTAINER_TIMEOUT
from pytest_celery import RABBITMQ_ENV
from pytest_celery import RABBITMQ_IMAGE
from pytest_celery import RABBITMQ_PORTS
from pytest_celery import RabbitMQContainer
from pytest_celery import RabbitMQTestBroker

rabbitmq_image = fetch(repository=RABBITMQ_IMAGE)

rabbitmq_test_container = container(
    # name="Main RabbitMQ Broker (session)",  # Optional | Incompatible with parallel execution
    image="{rabbitmq_image.id}",
    scope="session",
    ports=RABBITMQ_PORTS,
    environment=RABBITMQ_ENV,
    network="{hybrid_setup_example_network.name}",
    wrapper_class=RabbitMQContainer,
    timeout=RABBITMQ_CONTAINER_TIMEOUT,
)


@pytest.fixture
def session_rabbitmq_broker(rabbitmq_test_container: RabbitMQContainer) -> RabbitMQTestBroker:
    broker = RabbitMQTestBroker(rabbitmq_test_container)
    yield broker
    broker.teardown()


failover_test_container = container(
    # name="Failover RabbitMQ Broker (session)",  # Optional | Incompatible with parallel execution
    image="{rabbitmq_image.id}",
    scope="session",
    ports=RABBITMQ_PORTS,
    environment=RABBITMQ_ENV,
    network="{hybrid_setup_example_network.name}",
    wrapper_class=RabbitMQContainer,
    timeout=RABBITMQ_CONTAINER_TIMEOUT,
)


@pytest.fixture
def session_failover_broker(failover_test_container: RabbitMQContainer) -> RabbitMQTestBroker:
    broker = RabbitMQTestBroker(failover_test_container)
    yield broker
    broker.teardown()

Result Backend

examples.hybrid_setup.tests.vendors.memcached.py
import pytest
from pytest_docker_tools import container
from pytest_docker_tools import fetch

from pytest_celery import MEMCACHED_CONTAINER_TIMEOUT
from pytest_celery import MEMCACHED_ENV
from pytest_celery import MEMCACHED_IMAGE
from pytest_celery import MEMCACHED_PORTS
from pytest_celery import MemcachedContainer
from pytest_celery import MemcachedTestBackend

memcached_image = fetch(repository=MEMCACHED_IMAGE)
memcached_test_container = container(
    # name="Memcached-Session-Backend",  # Optional | Incompatible with parallel execution
    image="{memcached_image.id}",
    scope="session",
    ports=MEMCACHED_PORTS,
    environment=MEMCACHED_ENV,
    network="{hybrid_setup_example_network.name}",
    wrapper_class=MemcachedContainer,
    timeout=MEMCACHED_CONTAINER_TIMEOUT,
)


@pytest.fixture
def session_memcached_backend(memcached_test_container: MemcachedContainer) -> MemcachedTestBackend:
    backend = MemcachedTestBackend(memcached_test_container)
    yield backend
    backend.teardown()

gevent.py and gevent.Dockerfile

These files are taken from the test_gevent_pool.py example with one simple change.

RUN pip install "celery[gevent]" "pytest-celery[all]==1.0.0b4"

The Dockerfile doesn’t use the requirements file, but instead installs the packages directly.

examples.hybrid_setup.tests.vendors.workers.gevent.Dockerfile
FROM python:3.11-bookworm

# Create a user to run the worker
RUN adduser --disabled-password --gecos "" test_user

# Install system dependencies
RUN apt-get update && apt-get install -y build-essential git libevent-dev

# Set arguments
ARG CELERY_LOG_LEVEL=INFO
ARG CELERY_WORKER_NAME=my_worker
ARG CELERY_WORKER_QUEUE=celery
ENV LOG_LEVEL=$CELERY_LOG_LEVEL
ENV WORKER_NAME=$CELERY_WORKER_NAME
ENV WORKER_QUEUE=$CELERY_WORKER_QUEUE

# Install packages
RUN pip install --no-cache-dir --upgrade pip
RUN pip install "celery[gevent]" "pytest-celery[all]==1.0.0b4"

# The workdir must be /app
WORKDIR /app

# Switch to the test_user
USER test_user

# Start the celery worker
CMD celery -A app worker --loglevel=$LOG_LEVEL -n $WORKER_NAME@%h -Q $WORKER_QUEUE

Note

The test_gevent_pool.py example defines everything in the test file. Here, the worker is defined in its own gevent.py module instead.

examples.hybrid_setup.tests.vendors.workers.gevent.py
import pytest
from celery import Celery
from pytest_docker_tools import build
from pytest_docker_tools import container
from pytest_docker_tools import fxtr

from pytest_celery import CeleryTestWorker
from pytest_celery import CeleryWorkerContainer
from pytest_celery import defaults


class GeventWorkerContainer(CeleryWorkerContainer):
    @classmethod
    def command(cls, *args: str) -> list[str]:
        # Run the worker with the gevent pool and a concurrency of 1000
        return super().command("-P", "gevent", "-c", "1000")


gevent_worker_image = build(
    path=".",
    dockerfile="tests/vendors/workers/gevent.Dockerfile",
    tag="pytest-celery/examples/hybrid_setup:gevent",
    buildargs=GeventWorkerContainer.buildargs(),
)


gevent_worker_container = container(
    image="{gevent_worker_image.id}",
    environment=fxtr("default_worker_env"),
    network="{hybrid_setup_example_network.name}",
    volumes={"{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME},
    wrapper_class=GeventWorkerContainer,
    timeout=defaults.DEFAULT_WORKER_CONTAINER_TIMEOUT,
    command=GeventWorkerContainer.command(),
)


@pytest.fixture
def gevent_worker(gevent_worker_container: GeventWorkerContainer, celery_setup_app: Celery) -> CeleryTestWorker:
    worker = CeleryTestWorker(gevent_worker_container, app=celery_setup_app)
    yield worker
    worker.teardown()

legacy.py and legacy.Dockerfile

The “legacy” worker is basically a Celery 4 worker with the prefork pool. Much like the gevent worker, it gets its own Dockerfile and worker module.

examples.hybrid_setup.tests.vendors.workers.legacy.Dockerfile
# Celery 4 does not support >3.10-bookworm
FROM python:3.10-bookworm

# Create a user to run the worker
RUN adduser --disabled-password --gecos "" test_user

# Install system dependencies
RUN apt-get update && apt-get install -y build-essential git

# Set arguments
ARG CELERY_LOG_LEVEL=INFO
ARG CELERY_WORKER_NAME=my_worker
ARG CELERY_WORKER_QUEUE=celery
ENV LOG_LEVEL=$CELERY_LOG_LEVEL
ENV WORKER_NAME=$CELERY_WORKER_NAME
ENV WORKER_QUEUE=$CELERY_WORKER_QUEUE

# Install packages
RUN pip install --no-cache-dir --upgrade pip
RUN pip install celery==4.4.7 "pytest-celery[all]==1.0.0b4"

# The workdir must be /app
WORKDIR /app

# Switch to the test_user
USER test_user

# Start the celery worker
CMD celery -A app worker --loglevel=$LOG_LEVEL -n $WORKER_NAME@%h -Q $WORKER_QUEUE
examples.hybrid_setup.tests.vendors.workers.legacy.py
import pytest
from celery import Celery
from pytest_docker_tools import build
from pytest_docker_tools import container
from pytest_docker_tools import fxtr

from pytest_celery import CeleryTestWorker
from pytest_celery import CeleryWorkerContainer
from pytest_celery import defaults


class LegacyWorkerContainer(CeleryWorkerContainer):
    @classmethod
    def version(cls) -> str:
        # The Celery version installed in this worker's image
        return "4.4.7"

    @classmethod
    def worker_queue(cls) -> str:
        # The queue this worker consumes from (see the canvas in the Description)
        return "legacy"


legacy_worker_image = build(
    path=".",
    dockerfile="tests/vendors/workers/legacy.Dockerfile",
    tag="pytest-celery/examples/hybrid_setup:legacy",
    buildargs=LegacyWorkerContainer.buildargs(),
)


legacy_worker_container = container(
    image="{legacy_worker_image.id}",
    environment=fxtr("default_worker_env"),
    network="{hybrid_setup_example_network.name}",
    volumes={"{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME},
    wrapper_class=LegacyWorkerContainer,
    timeout=defaults.DEFAULT_WORKER_CONTAINER_TIMEOUT,
    command=LegacyWorkerContainer.command(),
)


@pytest.fixture
def legacy_worker(legacy_worker_container: LegacyWorkerContainer, celery_setup_app: Celery) -> CeleryTestWorker:
    worker = CeleryTestWorker(legacy_worker_container, app=celery_setup_app)
    yield worker
    worker.teardown()

Tip

Check all of the configurations above again and notice the usage of hybrid_setup_example_network: both session and non-session fixtures share the same session-scoped docker network.
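
For reference, these are the two sides of that wiring, both taken from the files above: the session-scoped network fixture declared in conftest.py, and the network reference used by each container fixture.

# conftest.py
hybrid_setup_example_network = network(scope="session")

# e.g. gevent.py -- the brokers, backend and legacy worker use the same reference
gevent_worker_container = container(
    image="{gevent_worker_image.id}",
    network="{hybrid_setup_example_network.name}",
    # ...
)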

tasks.py and signals.py

The tasks and signal handlers are injected into the workers via the conftest.py file, as described in the documentation:

  1. How to add tasks.

  2. How to connect signal handlers.

The files themselves are very simple:

examples.hybrid_setup.tests.vendors.workers.tasks.py
import celery.utils
from celery import shared_task
from celery.canvas import group

from pytest_celery import RESULT_TIMEOUT


@shared_task
def noop(*args, **kwargs) -> None:
    return celery.utils.noop(*args, **kwargs)


@shared_task
def identity(x):
    return x


@shared_task
def job() -> str:
    canvas = (
        group(
            identity.si("Hello, "),
            identity.si("world!"),
        )
        | noop.s().set(queue="legacy")
        | identity.si("Done!")
    )
    return canvas.delay().get(timeout=RESULT_TIMEOUT)
examples.hybrid_setup.tests.vendors.workers.signals.py
from __future__ import annotations

from celery.signals import worker_init


@worker_init.connect
def worker_init_handler(sender, **kwargs):
    print("Worker init handler called!")

And again, from conftest.py:

@pytest.fixture
def default_worker_tasks(default_worker_tasks: set) -> set:
    from tests.vendors.workers import tasks

    default_worker_tasks.add(tasks)
    return default_worker_tasks


@pytest.fixture
def default_worker_signals(default_worker_signals: set) -> set:
    from tests.vendors.workers import signals

    default_worker_signals.add(signals)
    return default_worker_signals

Note

The tasks and signals are being injected into the workers that use the default volume with:

volumes={"{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME},

Both our workers are using the default volume, so we only need to inject the tasks and signals once.