This document is for pytest_celery's development version, which can be significantly different from previous releases. Get the stable docs here: 1.0.
Vendors¶
- Release: 1.1
- Date: Sep 30, 2024
Built-in Brokers and Backends¶
The plugin comes with support for several brokers and backends out of the box. This page lists the supported vendors and their status.
Brokers¶
Name | Status | Enabled
RabbitMQ | Stable | Yes
Redis | Stable | Yes
Localstack (SQS) | Beta | No
Backends¶
Name | Status | Enabled
Redis | Stable | Yes
Memcache | Experimental | No
Experimental or Beta status means the vendor may be functional but is not confirmed to be production ready.
Enabled means that the vendor is automatically added to the Test Setup Matrix when running the test suite, provided the vendor's dependencies are installed.
Warning
Enabling a new vendor automatically adds it globally to every test suite that relies on the default vendor detection. Be careful when enabling new vendors, and make sure they are stable and production ready.
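As a rough illustration of what being added to the matrix implies, the sketch below builds every broker/backend pair from the vendors marked as enabled in the tables above. The `BROKERS` and `BACKENDS` dictionaries and the `setup_matrix` helper are hypothetical names made up for this example, not part of the plugin's API, and the sketch ignores the dependency check.

```python
from itertools import product

# Enabled flags copied from the tables above; the dict layout is illustrative only.
BROKERS = {"rabbitmq": True, "redis": True, "localstack": False}
BACKENDS = {"redis": True, "memcached": False}


def setup_matrix(brokers: dict[str, bool], backends: dict[str, bool]) -> list[tuple[str, str]]:
    """Return every (broker, backend) pair built from the enabled vendors."""
    enabled_brokers = [name for name, enabled in brokers.items() if enabled]
    enabled_backends = [name for name, enabled in backends.items() if enabled]
    return list(product(enabled_brokers, enabled_backends))


default_matrix = setup_matrix(BROKERS, BACKENDS)
print(default_matrix)

# Enabling one more broker adds a combination for every enabled backend.
with_sqs = setup_matrix({**BROKERS, "localstack": True}, BACKENDS)
print(len(default_matrix), len(with_sqs))
```

This is why the warning matters: each newly enabled vendor multiplies the number of setups that every matrix-driven test runs against.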
Built-in Celery Worker¶
The plugin provides a built-in Celery worker that can be used to run tests against. It uses the latest stable version of Celery and can be used, replaced or extended by the user.
The Dockerfile is published with the source code and can be found using WORKER_DOCKERFILE_ROOTDIR.
FROM python:3.10-slim-buster
# Create a user to run the worker
RUN adduser --disabled-password --gecos "" test_user
# Install system dependencies
RUN apt-get update && apt-get install -y build-essential \
    git \
    wget \
    make \
    curl \
    apt-utils \
    debconf \
    lsb-release \
    libmemcached-dev \
    libffi-dev \
    ca-certificates \
    pypy3 \
    pypy3-lib \
    sudo
# Set arguments
ARG CELERY_VERSION=""
ARG CELERY_LOG_LEVEL=INFO
ARG CELERY_WORKER_NAME=celery_test_worker
ARG CELERY_WORKER_QUEUE=celery
ENV WORKER_VERSION=$CELERY_VERSION
ENV LOG_LEVEL=$CELERY_LOG_LEVEL
ENV WORKER_NAME=$CELERY_WORKER_NAME
ENV WORKER_QUEUE=$CELERY_WORKER_QUEUE
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
EXPOSE 5678
# Install Python dependencies
RUN pip install --no-cache-dir --upgrade \
    pip \
    celery[redis,pymemcache,gevent]${WORKER_VERSION:+==$WORKER_VERSION} \
    pytest-celery[sqs]@git+https://github.com/celery/pytest-celery.git
# The workdir must be /app
WORKDIR /app
COPY content/ .
# Switch to the test_user
USER test_user
# Start the celery worker
CMD celery -A app worker --loglevel=$LOG_LEVEL -n $WORKER_NAME@%h -Q $WORKER_QUEUE
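The `${WORKER_VERSION:+==$WORKER_VERSION}` expression in the `pip install` step is shell parameter expansion: it yields `==<version>` only when `WORKER_VERSION` is set and non-empty, so an empty `CELERY_VERSION` build argument leaves the Celery requirement unpinned. The Python sketch below mirrors that rule. `celery_requirement` is a hypothetical helper written for this illustration, and the version number is only an example.

```python
def celery_requirement(worker_version: str = "") -> str:
    """Mimic `${WORKER_VERSION:+==$WORKER_VERSION}` from the Dockerfile."""
    base = "celery[redis,pymemcache,gevent]"
    # `${VAR:+word}` expands to `word` only if VAR is set and not empty.
    pin = f"=={worker_version}" if worker_version else ""
    return base + pin


print(celery_requirement())         # unpinned requirement
print(celery_requirement("5.4.0"))  # pinned to an example version
```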
Localstack (SQS) Broker¶
To use the Localstack broker, you will need to add additional configuration to the test setup.
You may add this to conftest.py to configure the Localstack broker.
import os

import pytest
from celery import Celery

from pytest_celery import LOCALSTACK_CREDS


@pytest.fixture
def default_worker_env(default_worker_env: dict) -> dict:
    default_worker_env.update(LOCALSTACK_CREDS)
    return default_worker_env


@pytest.fixture(scope="session", autouse=True)
def set_aws_credentials():
    os.environ.update(LOCALSTACK_CREDS)


@pytest.fixture
def default_worker_app(default_worker_app: Celery) -> Celery:
    app = default_worker_app
    if app.conf.broker_url and app.conf.broker_url.startswith("sqs"):
        app.conf.broker_transport_options["region"] = LOCALSTACK_CREDS["AWS_DEFAULT_REGION"]
    return app
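The `default_worker_app` override only touches the transport options when the broker URL uses the `sqs` scheme, so the same conftest.py remains harmless for RabbitMQ and Redis setups. The standalone sketch below replays that guard against a stand-in configuration object. `FAKE_CREDS` and `apply_region` are hypothetical, and the region value and URLs are placeholders, since the actual contents of `LOCALSTACK_CREDS` are not shown on this page.

```python
from types import SimpleNamespace

# Placeholder credentials; the real LOCALSTACK_CREDS values may differ.
FAKE_CREDS = {"AWS_DEFAULT_REGION": "us-east-1"}


def apply_region(conf: SimpleNamespace, creds: dict) -> SimpleNamespace:
    """Set the SQS region only when the broker URL uses the sqs scheme."""
    if conf.broker_url and conf.broker_url.startswith("sqs"):
        conf.broker_transport_options["region"] = creds["AWS_DEFAULT_REGION"]
    return conf


sqs_conf = apply_region(
    SimpleNamespace(broker_url="sqs://localhost:4566", broker_transport_options={}),
    FAKE_CREDS,
)
redis_conf = apply_region(
    SimpleNamespace(broker_url="redis://localhost:6379/0", broker_transport_options={}),
    FAKE_CREDS,
)
print(sqs_conf.broker_transport_options)
print(redis_conf.broker_transport_options)
```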
To enable the Localstack broker in the default Test Setup Matrix, add the following configuration to conftest.py.
import pytest

from pytest_celery import ALL_CELERY_BROKERS
from pytest_celery import CELERY_LOCALSTACK_BROKER
from pytest_celery import CeleryTestBroker
from pytest_celery import _is_vendor_installed

# Only add the Localstack broker when its dependencies are available.
if _is_vendor_installed("localstack"):
    ALL_CELERY_BROKERS.add(CELERY_LOCALSTACK_BROKER)


@pytest.fixture(params=ALL_CELERY_BROKERS)
def celery_broker(request: pytest.FixtureRequest) -> CeleryTestBroker:  # type: ignore
    broker: CeleryTestBroker = request.getfixturevalue(request.param)
    yield broker
    broker.teardown()
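The pattern above is "add the vendor to the parameter set only if its dependencies can be found". How `_is_vendor_installed` performs that check is not described on this page, so the sketch below shows one plausible approach based on `importlib.util.find_spec`. `is_module_available`, `maybe_enable`, and the fixture-name strings are all hypothetical stand-ins.

```python
from importlib.util import find_spec


def is_module_available(module_name: str) -> bool:
    """Return True if a top-level module can be located without importing it."""
    return find_spec(module_name) is not None


def maybe_enable(brokers: set[str], fixture_name: str, module_name: str) -> set[str]:
    """Add fixture_name to brokers only when module_name is importable."""
    if is_module_available(module_name):
        brokers.add(fixture_name)
    return brokers


# Placeholder fixture names standing in for the plugin's broker constants.
brokers = {"broker_a", "broker_b"}

# "json" is in the standard library, so this vendor gets enabled.
maybe_enable(brokers, "broker_c", "json")
# A module that does not exist leaves the set unchanged.
maybe_enable(brokers, "broker_d", "no_such_vendor_module_xyz")
print(sorted(brokers))
```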
Custom Vendors¶
Injected brokers, backends and workers can extend the built-in ones or provide completely new ones. The plugin provides a set of base classes that can be used to implement custom vendors.
Custom Broker¶
- class pytest_celery.api.broker.CeleryTestBroker(container: CeleryTestContainer, app: Celery = None)
  Bases: CeleryTestNode
  This is a specialized node type for handling Celery broker nodes. It is used to encapsulate a broker instance.
  Responsibility Scope: Handling broker-specific requirements and configuration.
- property app: Celery
  Celery app for the node, if available.
- assert_log_does_not_exist(log: str, message: str = '', timeout: int = 1) -> None
  Assert that a log does not exist in the container.
- assert_log_exists(log: str, message: str = '', timeout: int = 60) -> None
  Assert that a log exists in the container.
- config(*args: tuple, **kwargs: dict) -> dict
  Compile the configurations required for Celery from this node.
- property container: CeleryTestContainer
  Underlying container for the node.
- classmethod default_config() -> dict
  Default node configurations if not overridden by the user.
- hostname() -> str
  Get the hostname of this node.
- kill(signal: str | int = 'SIGKILL', reload_container: bool = True) -> None
  Kill the underlying container.
- logs() -> str
  Get the logs of the underlying container.
- name() -> str
  Get the name of this node.
- ready() -> bool
  Wait until the node is ready, or raise an exception if it fails to boot up.
- restart(reload_container: bool = True, force: bool = False) -> None
  Override of the restart method that updates the app broker URL with the new container values.
- teardown() -> None
  Tear down the node.
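The different default timeouts (60 seconds for `assert_log_exists`, 1 second for `assert_log_does_not_exist`) suggest that the positive check waits for a log line to appear while the negative check only watches briefly. The sketch below illustrates that polling idea with a fake log source. It is not the plugin's implementation, and `wait_for_text` and `fake_logs` are hypothetical names.

```python
import time
from typing import Callable


def wait_for_text(
    read_logs: Callable[[], str],
    text: str,
    timeout: float,
    interval: float = 0.05,
) -> bool:
    """Poll read_logs() until text appears or timeout seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if text in read_logs():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Fake log source that grows by one line on each read, then stays constant.
snapshots = iter(["booting", "booting\nconnected", "booting\nconnected\nready"])
state = {"logs": ""}


def fake_logs() -> str:
    state["logs"] = next(snapshots, state["logs"])
    return state["logs"]


found = wait_for_text(fake_logs, "ready", timeout=2)
missing = wait_for_text(fake_logs, "error", timeout=0.2)
print(found, missing)
```

A short timeout keeps negative checks cheap, at the cost of missing a line that shows up later.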
Custom Backend¶
- class pytest_celery.api.backend.CeleryTestBackend(container: CeleryTestContainer, app: Celery = None)
  Bases: CeleryTestNode
  This is a specialized node type for handling Celery backend nodes. It is used to encapsulate a backend instance.
  Responsibility Scope: Handling backend-specific requirements and configuration.
- property app: Celery
  Celery app for the node, if available.
- assert_log_does_not_exist(log: str, message: str = '', timeout: int = 1) -> None
  Assert that a log does not exist in the container.
- assert_log_exists(log: str, message: str = '', timeout: int = 60) -> None
  Assert that a log exists in the container.
- config(*args: tuple, **kwargs: dict) -> dict
  Compile the configurations required for Celery from this node.
- property container: CeleryTestContainer
  Underlying container for the node.
- classmethod default_config() -> dict
  Default node configurations if not overridden by the user.
- hostname() -> str
  Get the hostname of this node.
- kill(signal: str | int = 'SIGKILL', reload_container: bool = True) -> None
  Kill the underlying container.
- logs() -> str
  Get the logs of the underlying container.
- name() -> str
  Get the name of this node.
- ready() -> bool
  Wait until the node is ready, or raise an exception if it fails to boot up.
- restart(reload_container: bool = True, force: bool = False) -> None
  Override of the restart method that updates the app result backend with the new container values.
- teardown() -> None
  Tear down the node.
Custom Worker¶
- class pytest_celery.api.worker.CeleryTestWorker(container: CeleryTestContainer, app: Celery)
  Bases: CeleryTestNode
  This is a specialized node type for handling Celery worker nodes. It is used to encapsulate a worker instance.
  Responsibility Scope: Managing a Celery worker.
- property app: Celery
  Celery app for the node, if available.
- assert_log_does_not_exist(log: str, message: str = '', timeout: int = 1) -> None
  Assert that a log does not exist in the container.
- assert_log_exists(log: str, message: str = '', timeout: int = 60) -> None
  Assert that a log exists in the container.
- config(*args: tuple, **kwargs: dict) -> dict
  Compile the configurations required for Celery from this node.
- property container: CeleryTestContainer
  Underlying container for the node.
- classmethod default_config() -> dict
  Default node configurations if not overridden by the user.
- get_running_processes_info(columns: list[str] | None = None, filters: dict[str, str] | None = None) -> list[dict]
  Get running processes info on the container of this node.
  Possible columns: pid, name, username, cmdline, cpu_percent, memory_percent, create_time.
- kill(signal: str | int = 'SIGKILL', reload_container: bool = True) -> None
  Kill the underlying container.
- property log_level: str
  Celery log level of this worker node.
- logs() -> str
  Get the logs of the underlying container.
- name() -> str
  Get the name of this node.
- ready() -> bool
  Wait until the node is ready, or raise an exception if it fails to boot up.
- teardown() -> None
  Tear down the node.
- property version: str
  Celery version of this worker node.
- wait_for_log(log: str, message: str = '', timeout: int = 60) -> None
  Wait for a log to appear in the container.
- property worker_name: str
  Celery test worker node name.
- property worker_queue: str
  Celery queue for this worker node.
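To give a feel for the `columns` and `filters` arguments of `get_running_processes_info`, the sketch below applies them to a hand-written list of process dictionaries. The exact matching rules used by the plugin are not documented here, so the exact-match filtering in `select_processes` is an assumption, and both the helper and the sample data are hypothetical.

```python
from typing import Optional


def select_processes(
    processes: list[dict],
    columns: Optional[list[str]] = None,
    filters: Optional[dict[str, str]] = None,
) -> list[dict]:
    """Keep processes matching every filter, then project the requested columns."""
    filters = filters or {}
    matching = [
        proc
        for proc in processes
        if all(str(proc.get(key)) == value for key, value in filters.items())
    ]
    if columns is None:
        return matching
    return [{col: proc[col] for col in columns if col in proc} for proc in matching]


# Hand-written sample data using column names from the list above.
sample = [
    {"pid": 1, "name": "celery", "username": "test_user"},
    {"pid": 7, "name": "sh", "username": "root"},
]

result = select_processes(sample, columns=["pid", "name"], filters={"name": "celery"})
print(result)
```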
Vendor Class¶
The Vendor Class is an optional mechanism for OOP-style configuration of the plugin’s vendors. It allows registering a class that defines how the vendor behaves and is configured.
The vendor class represents the vendor’s container class that is used automatically by the plugin.
The following diagram shows the relationship between the vendor class and the vendor’s infrastructure.
Use Cases¶
Warning
The vendor class is used only to override the built-in vendors’ containers.
Registering a Vendor Class¶
The plugin uses the vendor class to implement the default fixtures of the vendor. To override it, create your own vendor class and subclass the matching built-in vendor class to include the built-in fixtures implementation.
Worker Example¶
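from typing import Any

import celery
from pytest_celery import CeleryWorkerContainer


class MyWorkerContainer(CeleryWorkerContainer):
    @property
    def client(self) -> Any:
        return self

    @classmethod
    def version(cls) -> str:
        # Use the Celery version installed in the test environment.
        return celery.__version__

    @classmethod
    def log_level(cls) -> str:
        return "INFO"

    @classmethod
    def worker_name(cls) -> str:
        return "my_tests_worker"

    @classmethod
    def worker_queue(cls) -> str:
        return "my_tests_queue"

    def post_initialization_logic(self) -> None:
        # Custom API added on top of the built-in container.
        pass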
And then, register it using the matching default fixture.
@pytest.fixture
def default_worker_container_cls() -> Type[CeleryWorkerContainer]:
    return MyWorkerContainer
Warning
The worker vendor requires another fixture to be registered to allow configuring the worker before it gets built.
@pytest.fixture(scope="session")
def default_worker_container_session_cls() -> Type[CeleryWorkerContainer]:
    return MyWorkerContainer
There’s no session vendor class for other vendors.
- For RabbitMQ Broker use default_rabbitmq_broker_cls.
- For Redis Broker use default_redis_broker_cls.
- For SQS Broker use default_localstack_broker_cls.
- For Redis Backend use default_redis_backend_cls.
- For Memcache Backend use default_memcached_backend_cls.
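One way to picture why the worker class must be known before the image is built: the class methods in the worker example line up with the `ARG` names in the Dockerfile shown earlier (`CELERY_VERSION`, `CELERY_LOG_LEVEL`, `CELERY_WORKER_NAME`, `CELERY_WORKER_QUEUE`). The sketch below models that mapping with stand-in classes. `WorkerContainerSketch`, `MyWorkerSketch`, and `build_args` are hypothetical, and whether the plugin wires the values in exactly this way is an assumption.

```python
class WorkerContainerSketch:
    """Stand-in for a worker container base class (not the plugin's class)."""

    @classmethod
    def version(cls) -> str:
        return ""  # empty means "do not pin Celery" in the Dockerfile

    @classmethod
    def log_level(cls) -> str:
        return "INFO"

    @classmethod
    def worker_name(cls) -> str:
        return "celery_test_worker"

    @classmethod
    def worker_queue(cls) -> str:
        return "celery"

    @classmethod
    def build_args(cls) -> dict[str, str]:
        # ARG names taken from the Dockerfile; the mapping itself is assumed.
        return {
            "CELERY_VERSION": cls.version(),
            "CELERY_LOG_LEVEL": cls.log_level(),
            "CELERY_WORKER_NAME": cls.worker_name(),
            "CELERY_WORKER_QUEUE": cls.worker_queue(),
        }


class MyWorkerSketch(WorkerContainerSketch):
    """Overrides only the values that differ from the defaults."""

    @classmethod
    def worker_name(cls) -> str:
        return "my_tests_worker"

    @classmethod
    def worker_queue(cls) -> str:
        return "my_tests_queue"


args = MyWorkerSketch.build_args()
print(args)
```

Because every value comes from a class method, a subclass can change the build inputs without an instance existing yet.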
Accessing the Vendor Class¶
Once a vendor class has been registered, it can be accessed using the Test Setup. Any additional API added to the class can be accessed as well.
For example,
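from pytest_celery import CeleryTestSetup


def test_accessing_post_initialization_logic(celery_setup: CeleryTestSetup):
    # The vendor class is the worker node's underlying container.
    worker: MyWorkerContainer = celery_setup.worker.container
    worker.post_initialization_logic()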