This document is for pytest_celery's development version, which can be significantly different from previous releases. Get the stable docs here: 1.1.

How to inject your own utility functions

Release:

1.2

Date:

Feb 23, 2025

The plugin injects a special utils.py module into the worker component to provide enhanced testing capabilities over the Celery worker component. The module contains API that is accessible using the CeleryTestWorker API.

This guide will teach you how to inject your own utility functions into the worker component using this mechanism.

Note

If you already understand how the initialization pipeline works, you can skip to the Custom Utility Functions section.

Worker Pipeline Breakdown

Added in version 1.0.0.

Each worker component is built using a pipeline of fixtures that control each layer and is responsible for preparing the worker for the test case. Let’s see how our Built-in Celery Worker is built to understand each step in the pipeline.

Initialization Pipeline

The worker component is initialized using a container and a node that is responsible for integrating the component with the test case.

The component’s node creation fixture receives the worker container during its initialization.

@pytest.fixture
def celery_setup_worker(
    ...
    default_worker_container: CeleryWorkerContainer,
    default_worker_app: Celery,
    ...
) -> CeleryTestWorker:

Container

The default worker container receives its configuration from the default worker fixtures. Each fixture is responsible for a different layer of the initialization procedure.

default_worker_container = container(
    image="{celery_base_worker_image.id}",
    ports=fxtr("default_worker_ports"),
    environment=fxtr("default_worker_env"),
    network="{default_pytest_celery_network.name}",
    volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
    wrapper_class=CeleryWorkerContainer,
    timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT,
    command=fxtr("default_worker_command"),
)
Image

The image is built using the built-in Dockerfile and is provided to the container using the following fixture.

celery_base_worker_image = build(
    path=WORKER_DOCKERFILE_ROOTDIR,
    tag="pytest-celery/components/worker:default",
    buildargs={
        "CELERY_VERSION": fxtr("default_worker_celery_version"),
        "CELERY_LOG_LEVEL": fxtr("default_worker_celery_log_level"),
        "CELERY_WORKER_NAME": fxtr("default_worker_celery_worker_name"),
        "CELERY_WORKER_QUEUE": fxtr("default_worker_celery_worker_queue"),
    },
)
Environment

Environment variables are provided to the worker container during initialization using the Vendor Class.

The worker receives the broker and result backend configurations from the environment variables by default using the celery_worker_cluster_config fixture, which is initialized using celery_broker_cluster_config and celery_backend_cluster_config fixtures, to provide the worker with the broker and result backend configurations according to the configured broker and backend clusters.

Network

The worker will use the default network that will be created for each test case to allow the worker component to communicate with the other components.

The network isolation allows multiple setups to run in parallel without interfering with each other.

Volumes

The plugin provides a special volume that is designed to provide improved testing control over the worker component initialization and functionality.

To practically install the pytest-celery plugin inside the worker component, the worker container needs to be using the default volume.

default_worker_container = container(
    ...
    volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
    ...
)

This will use the following binding to mount the plugin volume into the worker container.

WORKER_VOLUME = {
    "bind": "/app",
    "mode": "rw",
}

Note

The default volume may be replaced or reconfigured if needed, by providing your own volume configuration dict to the worker container.

More volumes can be added to the worker container to accommodate more complex testing scenarios, or to provide additional configuration options to the worker component. For example, the current project can be added as a mounted volume alongside the default volume to provide the worker with the project code and configuration.

volumes={
    "{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME,
    os.path.abspath(os.getcwd()): {
        "bind": "/target/path/in/worker",
        "mode": "rw",
    },
},

Tip

When Debugging with VSCode, the bind value is what should be set for the remoteRoot in the launch.json configuration.

Wrapper Class

The wrapper_class is responsible for providing the configuration class that will be used to initialize the worker container instance.

The wrapper_class must be a subclass of CeleryWorkerContainer.

See more: Fixture wrappers.

Timeout

The timeout defines the time pytest will wait for the worker container to be ready before raising a timeout exception.

By default, the timeout is set to accommodate parallel test runs and to provide a reasonable time for the worker to be ready in most cases. Feel free to experiment and adjust the timeout according to your needs, or use DEFAULT_WORKER_CONTAINER_TIMEOUT to apply the default timeout.

Command

The command field allows to override the worker container CMD instruction instead of the CMD defined in the Dockerfile using the default_worker_command fixture.

If the CMD instruction is provided in the Dockerfile, the command field can be omitted.

Sequence Diagram

The following diagram describes the worker component initialization pipeline described above.

        sequenceDiagram
  autonumber
  participant WCI as Worker Component Initialization
  participant CF as Container Fixture
  participant DF as Dockerfile
  participant EV as Environment Variables
  participant Net as Network
  participant Vol as Volumes
  participant PV as Plugin Volume
  participant TO as Timeout
  participant Cmd as Command
  participant WC as Wrapper Class
  participant CWCC as CeleryWorkerContainer Class
  participant NF as Node Fixture

  WCI->>CF: Initiates
  CF->>DF: Builds Image From
  CF->>EV: Sets
  CF->>Net: Connects to
  CF->>Vol: Mounts
  Vol->>PV: Includes
  CF->>TO: Sets
  CF->>Cmd: Sets
  CF->>WC: Manages with
  WC->>CWCC: Inherits from
  CF->>WCI: Create Worker Container
  WCI->>NF: Integrates the container into its node
  NF->>WCI: Node Ready, worker initialization completed
    

Configuration Pipeline

The worker uses the default_worker_initial_content fixture to provide the worker with the initial content that will be used to configure the worker component’s container volume.

@pytest.fixture
def default_worker_initial_content(
    ...
    default_worker_app_module: ModuleType,
    default_worker_utils_module: ModuleType,
    default_worker_tasks: set,
    default_worker_signals: set,
    default_worker_app: Celery,
    ...
) -> dict:

It uses the default worker fixtures to allow configuring every part of the volume using the standard pytest fixtures mechanism by itself without hooking into the default_worker_initial_content fixture directly.

The volume initialization integrates into the initialization pipeline by injecting worker configurations and files into the worker container to control the Celery app instance and provide enhanced testing capabilities.

        sequenceDiagram
  autonumber
  participant WCI as Worker Component<br>Initialization
  participant CF as Container Fixture
  participant V as Volumes
  participant DCI as Default Configuration Injection
  participant WN as Worker Node

  WCI->>CF: Initializes Container
  CF->>V: Prepares Volumes
  V->>DCI: Injects<br>default_worker_app_module,<br>default_worker_utils_module,<br>default_worker_tasks,<br>default_worker_signals,<br>default_worker_app
  DCI->>CF: Finishes Volume<br>Configuration
  CF->>WN: Finalizes Worker<br>Container Initialization
  WN->>WCI: Add container to node,<br>Worker Component Initialization Completed
    

Custom Utility Functions

Added in version 1.0.0.

To configure your own module, use the default_worker_utils_module fixture.

@pytest.fixture
def default_worker_utils_module() -> ModuleType:
    from tests import myutils

    return myutils

This will inject the myutils module into the worker component, instead of the default module, allowing you to access your own utility functions.

Warning

The module must provide all of the existing API in the utils.py module, otherwise the worker component will not function correctly (when based off of CeleryTestWorker).

For reference, the default utils.py module is defined as follows:

pytest_celery.vendors.worker.content.utils.py
"""The pytest-celery plugin provides a set of built-in components called
:ref:`vendors`.

This module is part of the :ref:`built-in-worker` vendor.
"""

from __future__ import annotations

import json

import psutil


def get_running_processes_info(columns: list[str] | None = None) -> str:
    """Get information about running processes using psutil."""
    if not columns:
        columns = [
            "pid",
            "name",
            "username",
            "cmdline",
            "cpu_percent",
            "memory_percent",
            "create_time",
        ]
    processes = [proc.info for proc in psutil.process_iter(columns)]
    return json.dumps(processes)

Tip

Check out the myutils example for a demonstration of how to use this feature.