This document describes the current stable version of pytest_celery (1.0). For development docs, go here.

Source code for pytest_celery.vendors.redis.backend.api

"""The pytest-celery plugin provides a set of built-in components called
:ref:`vendors`.

This module is part of the Redis Backend vendor.
"""

from __future__ import annotations

import gc

from pytest_celery.api.backend import CeleryTestBackend


[docs] class RedisTestBackend(CeleryTestBackend):
[docs] def teardown(self) -> None: # When a test that has a AsyncResult object is finished # there's a race condition between the AsyncResult object # and the Redis container. The AsyncResult object tries # to release the connection but the Redis container has already # exited. This causes a warning to be logged. To avoid this # warning to our best effort we force a garbage collection here. gc.collect(1) super().teardown()