How to connect signal handlers¶
- Release: 1.1
- Date: Sep 30, 2024
Signal handlers may be defined on the publisher side, on the consumer side, or on both. On the publisher side, they can be connected inside the scope of the test function using the standard Celery API, as demonstrated by the tests at the end of this guide. On the consumer side, they can be connected using injected signal handlers modules, which we’ll cover in this guide.
The plugin uses its Code Generation mechanism to inject signal handlers modules into the worker container. The available signal handlers can be configured differently for each test case using the Fixture availability feature of pytest.
This guide will teach you how to utilize this mechanism to connect signal handlers to your Celery workers in your test cases.
Note
If you already understand how the initialization pipeline works, you can skip to the Signal handlers modules injection section.
Worker Pipeline Breakdown¶
Added in version 1.0.0.
Each worker component is built using a pipeline of fixtures, where each fixture controls a different layer and is responsible for preparing the worker for the test case. Let’s see how our Built-in Celery Worker is built to understand each step in the pipeline.
Initialization Pipeline¶
The worker component is initialized using a container and a node that is responsible for integrating the component with the test case.
The component’s node creation fixture receives the worker container during its initialization.
@pytest.fixture
def celery_setup_worker(
    ...
    default_worker_container: CeleryWorkerContainer,
    default_worker_app: Celery,
    ...
) -> CeleryTestWorker:
Container¶
The default worker container receives its configuration from the default worker fixtures. Each fixture is responsible for a different layer of the initialization procedure.
default_worker_container = container(
    image="{celery_base_worker_image.id}",
    ports=fxtr("default_worker_ports"),
    environment=fxtr("default_worker_env"),
    network="{default_pytest_celery_network.name}",
    volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
    wrapper_class=CeleryWorkerContainer,
    timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT,
    command=fxtr("default_worker_command"),
)
Image¶
The image is built using the built-in Dockerfile and is provided to the container using the following fixture.
celery_base_worker_image = build(
    path=WORKER_DOCKERFILE_ROOTDIR,
    tag="pytest-celery/components/worker:default",
    buildargs={
        "CELERY_VERSION": fxtr("default_worker_celery_version"),
        "CELERY_LOG_LEVEL": fxtr("default_worker_celery_log_level"),
        "CELERY_WORKER_NAME": fxtr("default_worker_celery_worker_name"),
        "CELERY_WORKER_QUEUE": fxtr("default_worker_celery_worker_queue"),
    },
)
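The build arguments can be customized by overriding the fixtures referenced above. For example, a minimal sketch that pins the Celery version the image is built against (the pinned version is an arbitrary example):

import pytest


@pytest.fixture
def default_worker_celery_version() -> str:
    # Arbitrary example: build the worker image against a specific
    # Celery release instead of the plugin's default.
    return "5.4.0"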
Environment¶
Environment variables are provided to the worker container during initialization using the Vendor Class.
By default, the worker receives the broker and result backend configurations from environment variables via the celery_worker_cluster_config fixture, which is initialized from the celery_broker_cluster_config and celery_backend_cluster_config fixtures according to the configured broker and backend clusters.
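Additional environment variables can be layered on top of the cluster configuration by overriding the default_worker_env fixture used by the container fixture above. A minimal sketch, assuming the fixture returns a dict of environment variables (MY_CUSTOM_VAR is a hypothetical example):

import pytest


@pytest.fixture
def default_worker_env(default_worker_env: dict) -> dict:
    # Hypothetical example: the defaults already contain the broker
    # and result backend URLs; we only add one extra variable.
    default_worker_env.update({"MY_CUSTOM_VAR": "42"})
    return default_worker_env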
Network¶
The worker uses the default network, created fresh for each test case, to communicate with the other components. This network isolation allows multiple setups to run in parallel without interfering with each other.
Volumes¶
The plugin provides a special volume designed to give improved testing control over the worker component’s initialization and functionality. To actually install the pytest-celery plugin inside the worker component, the worker container needs to use this default volume.
default_worker_container = container(
    ...
    volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
    ...
)
This will use the following binding to mount the plugin volume into the worker container.
WORKER_VOLUME = {
    "bind": "/app",
    "mode": "rw",
}
Note
The default volume may be replaced or reconfigured if needed by providing your own volume configuration dict to the worker container.
More volumes can be added to the worker container to accommodate more complex testing scenarios, or to provide additional configuration options to the worker component. For example, the current project can be added as a mounted volume alongside the default volume to provide the worker with the project code and configuration.
volumes={
    "{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME,
    os.path.abspath(os.getcwd()): {
        "bind": "/target/path/in/worker",
        "mode": "rw",
    },
},
Tip
When debugging with VSCode, the bind value is what should be set for the remoteRoot in the launch.json configuration.
Wrapper Class¶
The wrapper_class is responsible for providing the configuration class that will be used to initialize the worker container instance. The wrapper_class must be a subclass of CeleryWorkerContainer. See more: Fixture wrappers.
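For example, a minimal sketch of a custom wrapper class; the overridden class methods mirror the image build arguments shown earlier, and the values are arbitrary examples:

from pytest_celery import CeleryWorkerContainer


class MyWorkerContainer(CeleryWorkerContainer):
    # Arbitrary example values; these feed the defaults used by the
    # worker fixtures (compare with the buildargs above).
    @classmethod
    def version(cls) -> str:
        return "5.4.0"

    @classmethod
    def worker_queue(cls) -> str:
        return "my_queue"

It can then be plugged into the container fixture via wrapper_class=MyWorkerContainer.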
Timeout¶
The timeout defines the time pytest will wait for the worker container to be ready before raising a timeout exception.
By default, the timeout is set to accommodate parallel test runs and to provide a reasonable time for the worker to be ready in most cases. Feel free to experiment and adjust the timeout according to your needs, or use DEFAULT_WORKER_CONTAINER_TIMEOUT to apply the default timeout.
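To change it, redefine the container fixture in your conftest.py with a different timeout. A sketch reusing the default arguments shown earlier (the 300-second value is an arbitrary example):

from pytest_celery import CeleryWorkerContainer, defaults
from pytest_docker_tools import container, fxtr

default_worker_container = container(
    image="{celery_base_worker_image.id}",
    ports=fxtr("default_worker_ports"),
    environment=fxtr("default_worker_env"),
    network="{default_pytest_celery_network.name}",
    volumes={"{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME},
    wrapper_class=CeleryWorkerContainer,
    timeout=300,  # arbitrary example: wait up to 5 minutes for readiness
    command=fxtr("default_worker_command"),
)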
Command¶
The command field allows overriding the worker container CMD instruction, instead of using the CMD defined in the Dockerfile, via the default_worker_command fixture. If the CMD instruction is provided in the Dockerfile, the command field can be omitted.
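A minimal sketch of overriding the command; it assumes the fixture returns the command as a list of arguments, and the extra flag is only an example:

import pytest


@pytest.fixture
def default_worker_command(default_worker_command: list) -> list:
    # Assumption: the default command is a list of CLI arguments.
    # --without-heartbeat is just an example of an extra Celery flag.
    return default_worker_command + ["--without-heartbeat"]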
Sequence Diagram¶
The following sequence diagram illustrates the worker component initialization pipeline described above.
Configuration Pipeline¶
The worker uses the default_worker_initial_content fixture to provide the worker with the initial content that will be used to configure the worker component’s container volume.
@pytest.fixture
def default_worker_initial_content(
    ...
    default_worker_app_module: ModuleType,
    default_worker_utils_module: ModuleType,
    default_worker_tasks: set,
    default_worker_signals: set,
    default_worker_app: Celery,
    ...
) -> dict:
It relies on the default worker fixtures so that every part of the volume can be configured using the standard pytest fixtures mechanism, without hooking into the default_worker_initial_content fixture directly.
The volume initialization integrates into the initialization pipeline by injecting worker configurations and files into the worker container to control the Celery app instance and provide enhanced testing capabilities.
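Custom tasks, for example, can be injected the same way signal handlers are (covered next), by overriding the default_worker_tasks fixture from the signature above. A minimal sketch, where tests.tasks is a hypothetical module containing your task definitions:

import pytest


@pytest.fixture
def default_worker_tasks(default_worker_tasks: set) -> set:
    from tests import tasks  # hypothetical module with your Celery tasks

    default_worker_tasks.add(tasks)
    return default_worker_tasks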
Signal handlers modules injection¶
Added in version 1.0.0.
To add your own signal handlers, use the default_worker_signals fixture.
@pytest.fixture
def default_worker_signals(default_worker_signals: set) -> set:
    from tests import signals

    default_worker_signals.add(signals)
    return default_worker_signals
For example, we can review the plugin’s tests to see how the signal handlers are connected.
signals.py¶
This module contains the signal handlers that we want to connect on the consumer side.
from celery.signals import worker_init
from celery.signals import worker_process_init
from celery.signals import worker_process_shutdown
from celery.signals import worker_ready
from celery.signals import worker_shutdown


@worker_init.connect
def worker_init_handler(sender, **kwargs):  # type: ignore
    print("worker_init_handler")


@worker_process_init.connect
def worker_process_init_handler(sender, **kwargs):  # type: ignore
    print("worker_process_init_handler")


@worker_process_shutdown.connect
def worker_process_shutdown_handler(sender, pid, exitcode, **kwargs):  # type: ignore
    print("worker_process_shutdown_handler")


@worker_ready.connect
def worker_ready_handler(sender, **kwargs):  # type: ignore
    print("worker_ready_handler")


@worker_shutdown.connect
def worker_shutdown_handler(sender, **kwargs):  # type: ignore
    print("worker_shutdown_handler")
test_signals.py¶
These tests demonstrate how to query the output of the signal handlers that were injected into the worker container alongside inline signal handlers connected on the publisher side.
@pytest.mark.parametrize(
    "log, control",
    [
        ("worker_init_handler", None),
        ("worker_process_init_handler", None),
        ("worker_ready_handler", None),
        ("worker_process_shutdown_handler", "shutdown"),
        ("worker_shutdown_handler", "shutdown"),
    ],
)
def test_sanity(self, celery_setup: CeleryTestSetup, log: str, control: str):
    if isinstance(celery_setup.broker, LocalstackTestBroker) and control == "shutdown":
        pytest.xfail("Potential real bug where shutdown signal isn't called with SQS broker")
    if control:
        celery_setup.app.control.broadcast(control)
    celery_setup.worker.assert_log_exists(log)
def test_before_task_publish(self, celery_setup: CeleryTestSetup):
    @before_task_publish.connect
    def before_task_publish_handler(*args, **kwargs):
        nonlocal signal_was_called
        signal_was_called = True

    signal_was_called = False
    noop.s().apply_async(queue=celery_setup.worker.worker_queue)
    assert signal_was_called is True
def test_after_task_publish(self, celery_setup: CeleryTestSetup):
    @after_task_publish.connect
    def after_task_publish_handler(*args, **kwargs):
        nonlocal signal_was_called
        signal_was_called = True

    signal_was_called = False
    noop.s().apply_async(queue=celery_setup.worker.worker_queue)
    assert signal_was_called is True