This document describes the current stable version of pytest_celery (1.0). For the development version, see the latest development documentation.
worker_pool
- Release: 1.0
- Date: May 16, 2024
Description
This example project demonstrates how to run the Celery worker with a non-default worker pool. It uses two different methods: building a custom worker image for the gevent pool, and reconfiguring the built-in worker for the solo pool.
The following guide explains each method and how it is used.
Tip
Read the myworker example before continuing with this one.
Breakdown
File Structure
The following diagram lists the relevant files in the project.
worker_pool/
├── tests/
│   ├── __init__.py
│   ├── test_gevent_pool.py
│   └── test_solo_pool.py
├── Dockerfile
├── tasks.py
└── requirements.txt
Dockerfile
To use the gevent pool, we create our own image using a Dockerfile similar to the one in the myworker example. The purpose of this image is to ensure that the gevent dependency is installed. The Dockerfile is followed by the requirements.txt it installs, which pulls in celery[gevent].
# Worker image for the gevent pool example, modelled on the myworker image.
# gevent itself is installed from requirements.txt (celery[gevent]).
FROM python:3.11-bookworm
# Create a user to run the worker
RUN adduser --disabled-password --gecos "" test_user
# Install system dependencies
# git is needed by pip for the git+https requirement in requirements.txt.
# build-essential and libevent-dev are presumably there so gevent can be
# compiled if no prebuilt wheel matches. NOTE(review): confirm.
RUN apt-get update && apt-get install -y build-essential git libevent-dev
# Set arguments
# Build-time defaults. They are presumably overridden by the buildargs the
# test suite passes to build(). NOTE(review): confirm the keys match.
ARG CELERY_LOG_LEVEL=INFO
ARG CELERY_WORKER_NAME=my_worker
ARG CELERY_WORKER_QUEUE=celery
# Re-export the build args as runtime environment variables. ARG values are
# not available inside the running container, and the CMD below needs them.
ENV LOG_LEVEL=$CELERY_LOG_LEVEL
ENV WORKER_NAME=$CELERY_WORKER_NAME
ENV WORKER_QUEUE=$CELERY_WORKER_QUEUE
# NOTE(review): 5678 is the conventional debugpy port; presumably exposed for
# remote debugging. Confirm.
EXPOSE 5678
# Install packages
# This is copied before WORKDIR is set, so requirements.txt lands in the base
# image's default working directory rather than in /app.
COPY --chown=test_user:test_user requirements.txt .
RUN pip install --no-cache-dir --upgrade pip
RUN pip install -r ./requirements.txt
# The workdir must be /app
# NOTE(review): presumably because the worker code (the `app` module loaded
# by `-A app`) is provided in /app at runtime, e.g. via the mounted worker
# volume. Confirm.
WORKDIR /app
# Switch to the test_user
USER test_user
# Start the celery worker
# Shell form (not exec form) is used so that $LOG_LEVEL, $WORKER_NAME and
# $WORKER_QUEUE are expanded by the shell at container start. The test
# fixture replaces this CMD with its own `command=` argument.
CMD celery -A app worker --loglevel=$LOG_LEVEL -n $WORKER_NAME@%h -Q $WORKER_QUEUE
# requirements.txt: installed into the gevent worker image by the Dockerfile.
# Test tooling
pytest>=7.4.4
pytest-xdist>=3.5.0
pytest-subtests>=0.11.0
pytest-rerunfailures>=14.0
# The gevent extra provides the dependency needed to run the worker with -P gevent.
celery[gevent]
# Installed from the repository's default branch (no ref pinned), which is why
# the image needs git. NOTE(review): consider pinning a tag or commit for
# reproducible builds.
pytest-celery[all]@git+https://github.com/celery/pytest-celery.git
tasks.py
Our tasks module uses the example task from the Celery gevent example.
# Based on https://github.com/celery/celery/blob/main/examples/gevent/tasks.py
import requests
from celery import shared_task
@shared_task(ignore_result=True)
def urlopen(url, timeout=10.0):
    """Fetch ``url`` and report the outcome on stdout.

    Based on the Celery gevent example task. Results are ignored
    (``ignore_result=True``), so the printed messages are the observable
    signal; the tests look for them in the worker log.

    :param url: URL to request with ``requests.get``.
    :param timeout: Seconds to wait for the server before giving up. Without
        a timeout ``requests`` can block indefinitely, which would stall the
        greenlet (and any test waiting on it) forever. A timeout raises
        ``requests.exceptions.Timeout``, a ``RequestException`` subclass, so
        the handler below reports it like any other failure.
    :returns: ``(url, 1)`` on success, ``(url, 0)`` if the request raised.
    """
    print(f"Opening: {url}")
    try:
        requests.get(url, timeout=timeout)
    except requests.exceptions.RequestException as exc:
        print(f"Exception for {url}: {exc!r}")
        return url, 0
    print(f"Done with: {url}")
    return url, 1
test_gevent_pool.py
To add a new gevent worker, we create a new CeleryWorkerContainer subclass to configure the worker with the gevent pool.
from __future__ import annotations
import pytest
import tasks
from celery import Celery
from celery.canvas import Signature
from celery.canvas import group
from celery.result import AsyncResult
from pytest_docker_tools import build
from pytest_docker_tools import container
from pytest_docker_tools import fxtr
from pytest_celery import RESULT_TIMEOUT
from pytest_celery import CeleryTestSetup
from pytest_celery import CeleryTestWorker
from pytest_celery import CeleryWorkerCluster
from pytest_celery import CeleryWorkerContainer
from pytest_celery import defaults
from pytest_celery import ping
class GeventWorkerContainer(CeleryWorkerContainer):
    """Worker container that starts Celery with the gevent pool.

    It uses a concurrency of 1000. ``TestGeventPool.test_celery_banner``
    checks the worker banner for exactly this value.
    """

    @classmethod
    def command(cls, *args: str) -> list[str]:
        """Return the base worker command with the gevent pool options added.

        Any extra ``args`` are appended after the pool options instead of
        being silently discarded.
        """
        return super().command("-P", "gevent", "-c", "1000", *args)
# Build the gevent worker image from the Dockerfile in the current directory.
# The build args come from the container class.
# NOTE(review): presumably they override the Dockerfile's ARG defaults. Confirm.
gevent_worker_image = build(
    path=".",
    dockerfile="Dockerfile",
    tag="pytest-celery/examples/worker_pool:gevent",
    buildargs=GeventWorkerContainer.buildargs(),
)
# Container fixture that runs the image above, wrapped in GeventWorkerContainer.
# The "{...}" strings are pytest_docker_tools templates, and fxtr(...) looks up
# fixtures by name. Both refer to other fixtures (gevent_worker_image,
# default_pytest_celery_network, default_worker_volume, default_worker_ports,
# default_worker_env), so those names must not change.
gevent_worker_container = container(
    image="{gevent_worker_image.id}",
    ports=fxtr("default_worker_ports"),
    environment=fxtr("default_worker_env"),
    network="{default_pytest_celery_network.name}",
    volumes={"{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME},
    wrapper_class=GeventWorkerContainer,
    timeout=defaults.DEFAULT_WORKER_CONTAINER_TIMEOUT,
    # Replaces the image's CMD with the gevent pool command line.
    command=GeventWorkerContainer.command(),
)
@pytest.fixture
def gevent_worker(gevent_worker_container: GeventWorkerContainer, celery_setup_app: Celery) -> CeleryTestWorker:
    """Provide the gevent container as a CeleryTestWorker bound to the setup app.

    This is a yield fixture: the worker is torn down after the requesting
    test finishes.
    """
    test_worker = CeleryTestWorker(gevent_worker_container, app=celery_setup_app)
    yield test_worker
    # Cleanup phase; runs once the test using this fixture is done.
    test_worker.teardown()
@pytest.fixture
def celery_worker_cluster(gevent_worker: CeleryTestWorker) -> CeleryWorkerCluster:
    """Override the worker cluster so it contains only the gevent worker.

    Presumably this is what makes ``celery_setup`` (and ``celery_setup.worker``)
    use the gevent worker in the tests below. The cluster is torn down after
    the requesting test finishes.
    """
    worker_cluster = CeleryWorkerCluster(gevent_worker)
    yield worker_cluster
    # Cleanup phase; runs once the test using this fixture is done.
    worker_cluster.teardown()
@pytest.fixture
def default_worker_tasks(default_worker_tasks: set) -> set:
    """Register this example's ``tasks`` module alongside the default task modules.

    This returns a new set instead of adding to the one injected by the
    overridden parent fixture. The parent's value is therefore never modified
    in place, whatever that fixture's scope is (its scope is not visible
    here).
    """
    return default_worker_tasks | {tasks}
Then we can use it in our tests.
class TestGeventPool:
    """Tests that run against the gevent worker configured in this module."""

    def test_celery_banner(self, gevent_worker: CeleryTestWorker):
        # The banner reports the concurrency and pool chosen by
        # GeventWorkerContainer.command().
        gevent_worker.assert_log_exists("concurrency: 1000 (gevent)")

    def test_ping(self, celery_setup: CeleryTestSetup):
        sig: Signature = ping.s()
        res: AsyncResult = sig.apply_async()
        assert res.get(timeout=RESULT_TIMEOUT) == "pong"

    def test_celery_gevent_example(self, celery_setup: CeleryTestSetup):
        """Based on https://github.com/celery/celery/tree/main/examples/gevent"""
        urls = [
            "https://github.com/celery",
            "https://github.com/celery/celery",
            "https://github.com/celery/pytest-celery",
        ]
        group(tasks.urlopen.s(url) for url in urls).apply_async()
        # urlopen ignores its results, so there is nothing to .get(). Wait for
        # its completion log lines instead. Otherwise the negative check below
        # could run before any request has finished and pass vacuously.
        # NOTE: the first URL is a prefix of the others, so its check can also
        # be satisfied by their log lines.
        for url in urls:
            celery_setup.worker.assert_log_exists(f"Done with: {url}")
        celery_setup.worker.assert_log_does_not_exist("Exception for")
test_solo_pool.py
The solo pool example, on the other hand, reconfigures the default built-in Celery worker, because the solo pool does not require any additional dependencies.
from __future__ import annotations
import pytest
from celery.canvas import Signature
from celery.result import AsyncResult
from pytest_celery import RESULT_TIMEOUT
from pytest_celery import CeleryTestSetup
from pytest_celery import CeleryTestWorker
from pytest_celery import CeleryWorkerContainer
from pytest_celery import ping
class SoloPoolWorker(CeleryWorkerContainer):
    """Built-in worker container reconfigured to start Celery with the solo pool."""

    @classmethod
    def command(cls, *args: str) -> list[str]:
        """Return the base worker command with ``-P solo`` added.

        Any extra ``args`` are appended after the pool option instead of
        being silently discarded.
        """
        return super().command("-P", "solo", *args)
@pytest.fixture
def default_worker_container_cls() -> type[CeleryWorkerContainer]:
    """Swap in the solo-pool container class for the built-in worker.

    This is the function-scoped override; a session-scoped counterpart
    follows.
    """
    container_cls = SoloPoolWorker
    return container_cls
@pytest.fixture(scope="session")
def default_worker_container_session_cls() -> type[CeleryWorkerContainer]:
    """Swap in the solo-pool container class for the session-scoped built-in worker.

    The session scope presumably matches the fixture being overridden.
    NOTE(review): confirm.
    """
    container_cls = SoloPoolWorker
    return container_cls
class TestSoloPool:
    """Tests that run against the built-in worker reconfigured with the solo pool."""

    def test_celery_banner(self, celery_worker: CeleryTestWorker):
        # The worker banner prints the pool name in parentheses after the
        # concurrency (cf. "concurrency: 1000 (gevent)" in the gevent example).
        # Matching "(solo)" is stricter than the bare word "solo", which could
        # appear elsewhere in the log.
        celery_worker.assert_log_exists("(solo)")

    def test_ping(self, celery_setup: CeleryTestSetup):
        sig: Signature = ping.s()
        res: AsyncResult = sig.apply_async()
        assert res.get(timeout=RESULT_TIMEOUT) == "pong"