This document is for pytest_celery's development version, which can be significantly different from previous releases. Get the stable docs here: 1.1.
myworker¶
- Release:
1.2
- Date:
Feb 21, 2025
Description¶
This example project demonstrates how to inject a custom Celery worker into the testing environment.
Breakdown¶
File Structure¶
The following diagram lists the relevant files in the project.
myworker/
├── tests/
│ ├── myworker/
│ │ └── __init__.py
│ │ └── Dockerfile.py
│ │ └── myworker.py
│ ├── __init__.py
│ ├── conftest.py
│ └── test_myworker.py
└── requirements.txt
tests/myworker¶
The tests/myworker
package contains the custom Celery worker that will be injected into the testing environment.
It uses a simple Dockerfile to build the latest Celery version from git.
Dockerfile¶
FROM python:3.11-bookworm
test_user
is created to run the worker.
# Create a user to run the worker
RUN adduser --disabled-password --gecos "" test_user
# Install system dependencies
RUN apt-get update && apt-get install -y build-essential git
CELERY_LOG_LEVEL
, CELERY_WORKER_NAME
and CELERY_WORKER_QUEUE
are set as build arguments.
These will be used to configure the worker for the tests.
# Set arguments
ARG CELERY_LOG_LEVEL=INFO
ARG CELERY_WORKER_NAME=my_worker
ARG CELERY_WORKER_QUEUE=celery
ENV LOG_LEVEL=$CELERY_LOG_LEVEL
ENV WORKER_NAME=$CELERY_WORKER_NAME
ENV WORKER_QUEUE=$CELERY_WORKER_QUEUE
EXPOSE 5678
/src
is arbitrarily chosen as the working directory to install Celery from.
# Install packages
WORKDIR /src
COPY --chown=test_user:test_user requirements.txt .
RUN pip install --no-cache-dir --upgrade pip
RUN pip install -r ./requirements.txt
RUN git clone https://github.com/celery/celery.git
WORKDIR /src/celery
RUN pip install -e .
/app
is used internally by the pytest-celery plugin to inject code into the Celery worker at runtime.
# The workdir must be /app
WORKDIR /app
# Switch to the test_user
USER test_user
CMD
is set to allow standalone execution of the worker outside of the testing environment.
It is also useful for the injection of the worker as it removes the need to programmatically set the command.
# Start the celery worker
CMD celery -A app worker --loglevel=$LOG_LEVEL -n $WORKER_NAME@%h -Q $WORKER_QUEUE
myworker.py¶
The MyWorkerContainer
class is used to configure the worker container and acts as the interface
to the container instance.
class MyWorkerContainer(CeleryWorkerContainer):
@property
def client(self) -> Any:
return self
@classmethod
def version(cls) -> str:
return "Celery main branch"
@classmethod
def log_level(cls) -> str:
return "INFO"
@classmethod
def worker_name(cls) -> str:
return "my_worker"
@classmethod
def worker_queue(cls) -> str:
return "myworker"
Tip
Add the following implementation to enable debugpy for the worker container.
@classmethod
def ports(cls) -> dict | None:
return WORKER_DEBUGPY_PORTS
@classmethod
def command(cls, *args: str, **kwargs: dict) -> list[str]:
return super().command(*args, debugpy=True, wait_for_client=True, **kwargs)
The WORKER_DEBUGPY_PORTS
can be imported from the plugin.
Next, we build our worker image using the build and container fixtures.
Notice we use default fixtures for other configuration options, notably the network and volume, which allows the plugin to manage the lifecycle of these resources automatically.
These fixtures may be overridden if required.
myworker_image = build(
path=".",
dockerfile="tests/myworker/Dockerfile",
tag="pytest-celery/myworker:example",
buildargs=MyWorkerContainer.buildargs(),
)
myworker_container = container(
image="{myworker_image.id}",
ports=MyWorkerContainer.ports(),
environment=fxtr("default_worker_env"),
network="{default_pytest_celery_network.name}",
volumes={"{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME},
wrapper_class=MyWorkerContainer,
timeout=defaults.DEFAULT_WORKER_CONTAINER_TIMEOUT,
command=MyWorkerContainer.command(),
)
Lastly, we wrap the container in a fixture to allow it to be injected into the test environment
using the CeleryTestWorker
to represent the worker component.
@pytest.fixture
def myworker_worker(myworker_container: MyWorkerContainer, celery_setup_app: Celery) -> CeleryTestWorker:
worker = CeleryTestWorker(myworker_container, app=celery_setup_app)
yield worker
worker.teardown()
test_myworker.py¶
To inject the worker into this test suite, we hook into the celery_worker_cluster
fixture
and add the worker to the cluster, alongside the default built-in worker.
@pytest.fixture
def celery_worker_cluster(
celery_worker: CeleryTestWorker,
myworker_worker: CeleryTestWorker,
) -> CeleryWorkerCluster:
cluster = CeleryWorkerCluster(celery_worker, myworker_worker)
yield cluster
cluster.teardown()
The default worker can also be fully replaced:
@pytest.fixture
def celery_worker_cluster(
myworker_worker: CeleryTestWorker,
) -> CeleryWorkerCluster:
cluster = CeleryWorkerCluster(myworker_worker)
yield cluster
cluster.teardown()
And all that’s left is the test itself, which is a simple ping
test for each worker node in the cluster.
def test_ping(celery_setup: CeleryTestSetup):
worker: CeleryTestWorker
for worker in celery_setup.worker_cluster:
sig: Signature = ping.s()
res: AsyncResult = sig.apply_async(queue=worker.worker_queue)
assert res.get(timeout=RESULT_TIMEOUT) == "pong"