This document describes the current stable version of Celery (3.1). For development docs, go here.
Task Message Protocol v2 (Draft Spec.)¶
Notes¶
Support for multiple languages via the
lang
header.Worker may redirect the message to a worker that supports the language.
Metadata moved to headers.
This means that workers/intermediates can inspect the message and make decisions based on the headers without decoding the payload (which may be language specific, e.g. serialized by the Python specific pickle serializer).
Body is only for language specific data.
- Python stores args/kwargs in body.
- If a message uses raw encoding then the raw data will be passed as a single argument to the function.
- Java/C, etc. can use a thrift/protobuf document as the body
Dispatches to actor based on
c_type
,c_meth
headersc_meth
is unused by python, but may be used in the future to specify class+method pairs.Chain gains a dedicated field.
Reducing the chain into a recursive
callbacks
argument causes problems when the recursion limit is exceeded.This is fixed in the new message protocol by specifying a list of signatures, each task will then pop a task off the list when sending the next message:
execute_task(message) chain = message.headers['chain'] if chain: sig = maybe_signature(chain.pop()) sig.apply_async(chain=chain)
correlation_id
replacestask_id
field.c_shadow
lets you specify a different name for logs, monitors can be used for e.g. meta tasks that calls any function:from celery.utils.imports import qualname class PickleTask(Task): abstract = True def unpack_args(self, fun, args=()): return fun, args def apply_async(self, args, kwargs, **options): fun, real_args = self.unpack_args(*args) return super(PickleTask, self).apply_async( (fun, real_args, kwargs), shadow=qualname(fun), **options ) @app.task(base=PickleTask) def call(fun, args, kwargs): return fun(*args, **kwargs)
Undecided¶
May consider moving callbacks/errbacks/chain into body.
Will huge lists in headers cause overhead? The downside of keeping them in the body is that intermediates won’t be able to introspect these values.
Definition¶
# protocol v2 implies UTC=True
# 'class' header existing means protocol is v2
properties = {
'correlation_id': (uuid)task_id,
'content_type': (string)mime,
'content_encoding': (string)encoding,
# optional
'reply_to': (string)queue_or_url,
}
headers = {
'lang': (string)'py'
'c_type': (string)task,
# optional
'c_meth': (string)unused,
'c_shadow': (string)replace_name,
'eta': (iso8601)eta,
'expires'; (iso8601)expires,
'callbacks': (list)Signature,
'errbacks': (list)Signature,
'chain': (list)Signature, # non-recursive, reversed list of signatures
'group': (uuid)group_id,
'chord': (uuid)chord_id,
'retries': (int)retries,
'timelimit': (tuple)(soft, hard),
}
body = (args, kwargs)
Example¶
# chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8
task_id = uuid()
basic_publish(
message=json.dumps([[2, 2], {}]),
application_headers={
'lang': 'py',
'c_type': 'proj.tasks.add',
'chain': [
# reversed chain list
{'task': 'proj.tasks.add', 'args': (8, )},
{'task': 'proj.tasks.add', 'args': (4, )},
]
}
properties={
'correlation_id': task_id,
'content_type': 'application/json',
'content_encoding': 'utf-8',
}
)