This document describes the current stable version of Celery (5.4). For development docs, go here.

Source code for celery.backends.database.models

"""Database models used by the SQLAlchemy result store backend."""
from datetime import datetime, timezone

import sqlalchemy as sa
from sqlalchemy.types import PickleType

from celery import states

from .session import ResultModelBase

__all__ = ('Task', 'TaskExtended', 'TaskSet')


[docs] class Task(ResultModelBase): """Task result/status.""" __tablename__ = 'celery_taskmeta' __table_args__ = {'sqlite_autoincrement': True} id = sa.Column(sa.Integer, sa.Sequence('task_id_sequence'), primary_key=True, autoincrement=True) task_id = sa.Column(sa.String(155), unique=True) status = sa.Column(sa.String(50), default=states.PENDING) result = sa.Column(PickleType, nullable=True) date_done = sa.Column(sa.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc), nullable=True) traceback = sa.Column(sa.Text, nullable=True) def __init__(self, task_id): self.task_id = task_id
[docs] def to_dict(self): return { 'task_id': self.task_id, 'status': self.status, 'result': self.result, 'traceback': self.traceback, 'date_done': self.date_done, }
def __repr__(self): return '<Task {0.task_id} state: {0.status}>'.format(self)
[docs] @classmethod def configure(cls, schema=None, name=None): cls.__table__.schema = schema cls.id.default.schema = schema cls.__table__.name = name or cls.__tablename__
[docs] class TaskExtended(Task): """For the extend result.""" __tablename__ = 'celery_taskmeta' __table_args__ = {'sqlite_autoincrement': True, 'extend_existing': True} name = sa.Column(sa.String(155), nullable=True) args = sa.Column(sa.LargeBinary, nullable=True) kwargs = sa.Column(sa.LargeBinary, nullable=True) worker = sa.Column(sa.String(155), nullable=True) retries = sa.Column(sa.Integer, nullable=True) queue = sa.Column(sa.String(155), nullable=True)
[docs] def to_dict(self): task_dict = super().to_dict() task_dict.update({ 'name': self.name, 'args': self.args, 'kwargs': self.kwargs, 'worker': self.worker, 'retries': self.retries, 'queue': self.queue, }) return task_dict
[docs] class TaskSet(ResultModelBase): """TaskSet result.""" __tablename__ = 'celery_tasksetmeta' __table_args__ = {'sqlite_autoincrement': True} id = sa.Column(sa.Integer, sa.Sequence('taskset_id_sequence'), autoincrement=True, primary_key=True) taskset_id = sa.Column(sa.String(155), unique=True) result = sa.Column(PickleType, nullable=True) date_done = sa.Column(sa.DateTime, default=datetime.now(timezone.utc), nullable=True) def __init__(self, taskset_id, result): self.taskset_id = taskset_id self.result = result
[docs] def to_dict(self): return { 'taskset_id': self.taskset_id, 'result': self.result, 'date_done': self.date_done, }
def __repr__(self): return f'<TaskSet: {self.taskset_id}>'
[docs] @classmethod def configure(cls, schema=None, name=None): cls.__table__.schema = schema cls.id.default.schema = schema cls.__table__.name = name or cls.__tablename__