Во всех подобных проблемах, которые я видел в stackOverflow:
- Celery Получена незарегистрированная задача типа (пример запуска)
- получение ошибки "Получено незарегистрированное задание типа mytasks.add" а>
- Получено незарегистрированное задание для сельдерея
- https://serverfault.com/questions/416888/celery-daemon-receives-unregistered-tasks< /а>
- https://github.com/duointeractive/sea-cucumber/issues/15
в ошибке указано имя незарегистрированной задачи. У меня другая проблема. Имя задачи не отображается, а отображается Received unregistered task of type <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>
, что приводит к KeyError: <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>
.
Вот моя трассировка:
KeyError: <AsyncResult: 4aca05f8-14c6-4a25-988a-ff605a27871d>
[2016-06-15 14:11:46,016: ERROR/MainProcess] Received unregistered task of type <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
The full contents of the message body was:
{'utc': True, 'chord': None, 'args': [], 'retries': 0, 'expires': None, 'task': <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>, 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': 'a6e8d1c0-c75b-471e-b21f-af8492592aeb', 'kwargs': {}, 'eta': None, 'id': '0dffed5f-3090-417c-a9ec-c99e11bc9579'} (568b)
Traceback (most recent call last):
File "/Users/me/Developer/virtualenvironments/project_name/lib/python2.7/site-packages/celery/worker/consumer.py", line 456, in on_task_received
strategies[name](message, body,
KeyError: <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>
Мое приложение celery включает файл, в котором у меня только 3 задачи:
приложение/celery_app.py:
celery_app = Celery('app',
broker='amqp://ip', # RabbitMQ
backend='redis://ip', #Redis
include=['app.tasks.assets'])
celery_app.conf.update(
CELERY_DEFAULT_QUEUE = 'local_testing',
CELERY_TASK_RESULT_EXPIRES=86400, # 24 hours
CELERY_ROUTES={
'app.tasks.assets.list_assets': {'queue': 'gatherAPI'},
'app.tasks.assets.massage_assets': {'queue':'computation'},
'app.tasks.assets.save_assets': {'queue':'database_writes'},
}
)
приложение/задачи/assets.py:
from __future__ import absolute_import
from celery import current_app
@current_app.task(name='app.tasks.assets.list_assets')
def list_assets(*args, **kwargs):
print "list assets"
@current_app.task(name='app.tasks.assets.massage_assets')
def massage_assets(assets):
print "massaging assets"
@current_app.task(name='app.tasks.assets.save_assets', ignore_result=True)
def save_assets(assets):
print "saving assets..."
Эти ошибки возникают только в очередях "сельдерей" (которым я не пользуюсь) и "local_testing".
Соответствующие очереди для всех этих задач распечатываются и работают как положено, но каким-то образом очереди с именами "celery" и "local_testing" заполняются (одинаковый размер очереди) и не выдают ничего, кроме этой трассировки. снова и снова.
Вот как я называю задачи...
приложение/процессы/processes.py:
from celery import group
class Process(object):
def run_process(self, resource_generator, chain_signature):
tasks = []
for resources in resource_generator:
tasks.append(chain_signature(resources))
group(tasks)()
приложение/процессы/assets.py:
from __future__ import absolute_import
from app.processes.processes import Process
from app.indexes.asset import AssetIndex
from app.tasks.assets import *
class AssetProcess(Process):
def run(self):
Process.run_process(self,
resource_generator=AssetIndex.asset_generator(),
chain_signature=(
list_assets.s() |
massage_assets.s() |
save_assets.s()))
Опять же, очередь по умолчанию установлена на «local_testing», поэтому я не уверен, как что-то передается в очередь «celery». Трассировка, которую я получаю, также довольно бесполезна.
Я запускаю celery worker (с очередью «celery» или с очередью local_testing (-Q local_testing)) из каталога выше app/, например так:
celery -A app.celery_app worker -l info -n worker3.%h
Любая помощь приветствуется.
Ваше здоровье!