This document describes the current stable version of pytest_celery (1.3).
How to connect signal handlers¶
- Release: 1.3
- Date: Mar 24, 2026
Signal handlers may be defined on the publisher side, the consumer side, or both. When defined on the publisher side, they can be connected inside the scope of the test function using the standard Celery API. When defined on the consumer side, they are connected using injected signal handlers modules, which we’ll cover in this guide.
The plugin uses its Code Generation mechanism to inject signal handlers modules into the worker container. The available signal handlers can be configured differently for each test case using the Fixture availability feature of pytest.
This guide will teach you how to utilize this mechanism to connect signal handlers to your Celery workers in your test cases.
Note
If you already understand how the initialization pipeline works, you can skip to the Signal handlers modules injection section.
Worker Pipeline Breakdown¶
Added in version 1.0.0.
Each worker component is built using a pipeline of fixtures. Each fixture controls one layer and is responsible for preparing the worker for the test case. Let’s see how our Built-in Celery Worker is built to understand each step in the pipeline.
Initialization Pipeline¶
The worker component is initialized using a container and a node that is responsible for integrating the component with the test case.
The component’s node creation fixture receives the worker container during its initialization.
@pytest.fixture
def celery_setup_worker(
    ...
    default_worker_container: CeleryWorkerContainer,
    default_worker_app: Celery,
    ...
) -> CeleryTestWorker:
Container¶
The default worker container receives its configuration from the default worker fixtures.
Each fixture is responsible for a different layer of the initialization procedure.
default_worker_container = container(
    image="{celery_base_worker_image.id}",
    ports=fxtr("default_worker_ports"),
    environment=fxtr("default_worker_env"),
    network="{default_pytest_celery_network.name}",
    volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
    wrapper_class=CeleryWorkerContainer,
    timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT,
    command=fxtr("default_worker_command"),
)
Image¶
The image is built using the built-in Dockerfile and is provided to the container using the following fixture.
celery_base_worker_image = build(
    path=WORKER_DOCKERFILE_ROOTDIR,
    tag="pytest-celery/components/worker:default",
    buildargs={
        "CELERY_VERSION": fxtr("default_worker_celery_version"),
        "CELERY_LOG_LEVEL": fxtr("default_worker_celery_log_level"),
        "CELERY_WORKER_NAME": fxtr("default_worker_celery_worker_name"),
        "CELERY_WORKER_QUEUE": fxtr("default_worker_celery_worker_queue"),
    },
)
Environment¶
Environment variables are provided to the worker container during initialization using the Vendor Class.
By default, the worker receives the broker and result backend configurations through environment variables.
These come from the celery_worker_cluster_config fixture,
which is initialized from the celery_broker_cluster_config and
celery_backend_cluster_config fixtures,
so the worker's configuration matches the configured broker and backend clusters.
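As a rough illustration of the flow described above, the sketch below merges a broker configuration and a backend configuration into one worker cluster configuration, then flattens it into environment variables. The function names, the `urls` key, and the `CELERY_BROKER_URL` / `CELERY_RESULT_BACKEND` variable names are assumptions made for this example. The plugin's own logic lives in the vendor class and may differ.

```python
def worker_cluster_config(broker_config: dict, backend_config: dict) -> dict:
    """Combine the broker and backend cluster configs (hypothetical shape)."""
    return {
        "celery_broker_cluster_config": broker_config,
        "celery_backend_cluster_config": backend_config,
    }


def worker_env(cluster_config: dict) -> dict:
    """Flatten a cluster config into environment variables for the container."""
    env = {}
    broker = cluster_config.get("celery_broker_cluster_config")
    backend = cluster_config.get("celery_backend_cluster_config")
    if broker:
        # Several URLs are joined into one semicolon-separated string.
        env["CELERY_BROKER_URL"] = ";".join(broker["urls"])
    if backend:
        env["CELERY_RESULT_BACKEND"] = ";".join(backend["urls"])
    return env


# Example values are placeholders, not URLs produced by the plugin.
config = worker_cluster_config(
    {"urls": ["redis://broker:6379/0"]},
    {"urls": ["redis://backend:6379/0"]},
)
env = worker_env(config)
```

Keeping the merge step separate from the flattening step mirrors the text: one fixture aggregates the cluster configurations, and the container only sees the resulting environment.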
Network¶
The worker will use the default network that will be created for each test case to allow the worker component to communicate with the other components.
The network isolation allows multiple setups to run in parallel without interfering with each other.
Volumes¶
The plugin provides a special volume that is designed to provide improved testing control over the worker component initialization and functionality.
To install the pytest-celery plugin inside the worker component, the worker container must use the default volume.
default_worker_container = container(
    ...
    volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
    ...
)
This will use the following binding to mount the plugin volume into the worker container.
WORKER_VOLUME = {
    "bind": "/app",
    "mode": "rw",
}
Note
The default volume may be replaced or reconfigured if needed, by providing your own volume configuration dict to the worker container.
More volumes can be added to the worker container to accommodate more complex testing scenarios, or to provide additional configuration options to the worker component. For example, the current project can be added as a mounted volume alongside the default volume to provide the worker with the project code and configuration.
volumes={
    "{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME,
    os.path.abspath(os.getcwd()): {
        "bind": "/target/path/in/worker",
        "mode": "rw",
    },
},
Tip
When Debugging with VSCode, the bind value is what should be set for the remoteRoot in the launch.json configuration.
Wrapper Class¶
The wrapper_class is responsible for providing the configuration class that will be used to
initialize the worker container instance.
The wrapper_class must be a subclass of CeleryWorkerContainer.
See more: Fixture wrappers.
Timeout¶
The timeout defines the time pytest will wait for the worker container to be ready before raising a timeout exception.
By default, the timeout is set to accommodate parallel test runs and to provide a reasonable
time for the worker to be ready in most cases. Feel free to experiment and adjust the timeout
according to your needs, or use DEFAULT_WORKER_CONTAINER_TIMEOUT to apply the default timeout.
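To make the role of the timeout concrete, here is a minimal polling sketch. It is not the plugin's implementation: `wait_until_ready` and its parameters are hypothetical names, and the readiness check is a stand-in for whatever the container wrapper actually tests.

```python
import time
from typing import Callable


def wait_until_ready(
    is_ready: Callable[[], bool],
    timeout: float,
    interval: float = 0.1,
) -> None:
    """Poll is_ready() until it returns True or `timeout` seconds have passed."""
    deadline = time.monotonic() + timeout
    while not is_ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"worker not ready after {timeout} seconds")
        time.sleep(interval)


# Usage: a stand-in readiness check that succeeds on the third poll.
polls = iter([False, False, True])
wait_until_ready(lambda: next(polls), timeout=5.0, interval=0.01)
```

A larger timeout only costs time when the worker fails to start, which is why a generous default is a sensible choice for parallel runs.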
Command¶
The command field overrides the CMD instruction defined in the Dockerfile
with the value of the default_worker_command fixture.
If the CMD instruction is provided in the Dockerfile, the command field can be omitted.
Sequence Diagram¶
The following diagram describes the worker component initialization pipeline described above.
sequenceDiagram
autonumber
participant WCI as Worker Component Initialization
participant CF as Container Fixture
participant DF as Dockerfile
participant EV as Environment Variables
participant Net as Network
participant Vol as Volumes
participant PV as Plugin Volume
participant TO as Timeout
participant Cmd as Command
participant WC as Wrapper Class
participant CWCC as CeleryWorkerContainer Class
participant NF as Node Fixture
WCI->>CF: Initiates
CF->>DF: Builds Image From
CF->>EV: Sets
CF->>Net: Connects to
CF->>Vol: Mounts
Vol->>PV: Includes
CF->>TO: Sets
CF->>Cmd: Sets
CF->>WC: Manages with
WC->>CWCC: Inherits from
CF->>WCI: Create Worker Container
WCI->>NF: Integrates the container into its node
NF->>WCI: Node Ready, worker initialization completed
Configuration Pipeline¶
The worker uses the default_worker_initial_content fixture
to provide the worker with the initial content that will be used to configure the worker component’s container volume.
@pytest.fixture
def default_worker_initial_content(
    ...
    default_worker_app_module: ModuleType,
    default_worker_utils_module: ModuleType,
    default_worker_tasks: set,
    default_worker_signals: set,
    default_worker_app: Celery,
    ...
) -> dict:
It is composed from the default worker fixtures, so every part of the volume
can be configured by overriding the relevant fixture
with the standard pytest fixtures mechanism,
without hooking into the default_worker_initial_content fixture directly.
The volume initialization integrates into the initialization pipeline by injecting worker configurations and files into the worker container to control the Celery app instance and provide enhanced testing capabilities.
sequenceDiagram
autonumber
participant WCI as Worker Component<br>Initialization
participant CF as Container Fixture
participant V as Volumes
participant DCI as Default Configuration Injection
participant WN as Worker Node
WCI->>CF: Initializes Container
CF->>V: Prepares Volumes
V->>DCI: Injects<br>default_worker_app_module,<br>default_worker_utils_module,<br>default_worker_tasks,<br>default_worker_signals,<br>default_worker_app
DCI->>CF: Finishes Volume<br>Configuration
CF->>WN: Finalizes Worker<br>Container Initialization
WN->>WCI: Add container to node,<br>Worker Component Initialization Completed
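The code generation idea behind the configuration pipeline can be sketched as follows: turn the sets of task and signal modules into import lines and prepend them to the app module's source before it is written into the volume. Everything here is an assumption made for illustration, including the helper names, the `app.py` file name, and the plain `import` style. The plugin's generated code may look different.

```python
import types


def render_imports(modules: set) -> str:
    """Generate one import line per module, sorted for a stable result."""
    names = sorted(module.__name__ for module in modules)
    return "\n".join(f"import {name}" for name in names)


def initial_content(app_source: str, tasks: set, signals: set) -> dict:
    """Map file names in the volume to the bytes that should be written there."""
    # Skip empty import groups so no stray blank lines are generated.
    header = "\n".join(filter(None, [render_imports(tasks), render_imports(signals)]))
    return {"app.py": f"{header}\n\n{app_source}".encode()}


# Stand-in modules; in a real test these would be imported from the project.
tasks_module = types.ModuleType("tests.tasks")
signals_module = types.ModuleType("tests.signals")

content = initial_content("app = None", {tasks_module}, {signals_module})
print(content["app.py"].decode())
```

Because the content is a plain dict built from fixture values, each test can change what ends up in the container simply by changing the sets it passes in.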
Signal handlers modules injection¶
Added in version 1.0.0.
To add your own signal handlers, use the default_worker_signals fixture.
@pytest.fixture
def default_worker_signals(default_worker_signals: set) -> set:
    from tests import signals

    default_worker_signals.add(signals)
    return default_worker_signals
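Handlers declared with a `connect` decorator register themselves when their module is imported, which is why handing a module to the worker can be enough to connect them. The sketch below shows that import-time registration pattern with a small stand-in `Signal` class. It is an illustration only, not Celery's implementation, and its `send` return value is simplified.

```python
class Signal:
    """Minimal stand-in for a Celery signal (illustration only)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.receivers = []

    def connect(self, func):
        # Used as a decorator: registers the handler and returns it unchanged.
        self.receivers.append(func)
        return func

    def send(self, sender, **kwargs):
        # Call every connected handler and collect the results.
        return [receiver(sender=sender, **kwargs) for receiver in self.receivers]


worker_ready = Signal("worker_ready")


# This part plays the role of an injected signals module: the decorator runs
# as soon as the module is imported, so no explicit registration call is needed.
@worker_ready.connect
def worker_ready_handler(sender, **kwargs):
    return f"worker_ready_handler from {sender}"


results = worker_ready.send(sender="worker-1")
```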
For example, we can review the plugin’s tests to see how the signal handlers are connected.
signals.py¶
This module contains the signal handlers that we want to connect on the consumer side.
from celery.signals import worker_init
from celery.signals import worker_process_init
from celery.signals import worker_process_shutdown
from celery.signals import worker_ready
from celery.signals import worker_shutdown


@worker_init.connect
def worker_init_handler(sender, **kwargs):  # type: ignore
    print("worker_init_handler")


@worker_process_init.connect
def worker_process_init_handler(sender, **kwargs):  # type: ignore
    print("worker_process_init_handler")


@worker_process_shutdown.connect
def worker_process_shutdown_handler(sender, pid, exitcode, **kwargs):  # type: ignore
    print("worker_process_shutdown_handler")


@worker_ready.connect
def worker_ready_handler(sender, **kwargs):  # type: ignore
    print("worker_ready_handler")


@worker_shutdown.connect
def worker_shutdown_handler(sender, **kwargs):  # type: ignore
    print("worker_shutdown_handler")
test_signals.py¶
These tests demonstrate how to query the output of the signal handlers that were injected into the worker container, alongside tests that connect inline signal handlers on the publisher side.
@pytest.mark.parametrize(
    "log, control",
    [
        ("worker_init_handler", None),
        ("worker_process_init_handler", None),
        ("worker_ready_handler", None),
        ("worker_process_shutdown_handler", "shutdown"),
        ("worker_shutdown_handler", "shutdown"),
    ],
)
def test_sanity(self, celery_setup: CeleryTestSetup, log: str, control: str):
    if isinstance(celery_setup.broker, LocalstackTestBroker) and control == "shutdown":
        pytest.xfail("Potential real bug where shutdown signal isn't called with SQS broker")

    if control:
        celery_setup.app.control.broadcast(control)
    celery_setup.worker.assert_log_exists(log)

def test_before_task_publish(self, celery_setup: CeleryTestSetup):
    @before_task_publish.connect
    def before_task_publish_handler(*args, **kwargs):
        nonlocal signal_was_called
        signal_was_called = True

    signal_was_called = False
    noop.s().apply_async(queue=celery_setup.worker.worker_queue)
    assert signal_was_called is True

def test_after_task_publish(self, celery_setup: CeleryTestSetup):
    @after_task_publish.connect
    def after_task_publish_handler(*args, **kwargs):
        nonlocal signal_was_called
        signal_was_called = True

    signal_was_called = False
    noop.s().apply_async(queue=celery_setup.worker.worker_queue)
    assert signal_was_called is True