WedX - журнал о программировании и компьютерных науках

Celery: получена незарегистрированная задача типа ‹AsyncResult: [хэш]›

Во всех подобных проблемах, которые я видел в stackOverflow:

в ошибке указано имя незарегистрированной задачи. У меня другая проблема. Имя задачи не отображается, а отображается Received unregistered task of type <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>, что приводит к KeyError: <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>.

Вот моя трассировка:

KeyError: <AsyncResult: 4aca05f8-14c6-4a25-988a-ff605a27871d>
[2016-06-15 14:11:46,016: ERROR/MainProcess] Received unregistered task of type <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?

The full contents of the message body was:
{'utc': True, 'chord': None, 'args': [], 'retries': 0, 'expires': None, 'task': <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>, 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': 'a6e8d1c0-c75b-471e-b21f-af8492592aeb', 'kwargs': {}, 'eta': None, 'id': '0dffed5f-3090-417c-a9ec-c99e11bc9579'} (568b)
Traceback (most recent call last):
File "/Users/me/Developer/virtualenvironments/project_name/lib/python2.7/site-packages/celery/worker/consumer.py", line 456, in on_task_received
strategies[name](message, body,
KeyError: <AsyncResult: e8018fcb-cd15-4dca-ae6d-6eb906055f13>

Мое приложение celery включает файл, в котором у меня только 3 задачи:

приложение/celery_app.py:

celery_app = Celery('app',
         broker='amqp://ip',  # RabbitMQ
         backend='redis://ip', #Redis
         include=['app.tasks.assets'])

celery_app.conf.update(
CELERY_DEFAULT_QUEUE = 'local_testing',
CELERY_TASK_RESULT_EXPIRES=86400,  # 24 hours
CELERY_ROUTES={
    'app.tasks.assets.list_assets': {'queue': 'gatherAPI'},
    'app.tasks.assets.massage_assets': {'queue':'computation'},
    'app.tasks.assets.save_assets': {'queue':'database_writes'},
}

)

приложение/задачи/assets.py:

from __future__ import absolute_import
from celery import current_app

@current_app.task(name='app.tasks.assets.list_assets')
def list_assets(*args, **kwargs):
    print "list assets"

@current_app.task(name='app.tasks.assets.massage_assets')
def massage_assets(assets):
    print "massaging assets"

@current_app.task(name='app.tasks.assets.save_assets', ignore_result=True)
def save_assets(assets):
    print "saving assets..."

Эти ошибки возникают только в очередях "сельдерей" (которым я не пользуюсь) и "local_testing".

Соответствующие очереди для всех этих задач распечатываются и работают как положено, но каким-то образом очереди с именами "celery" и "local_testing" заполняются (одинаковый размер очереди) и не выдают ничего, кроме этой трассировки. снова и снова.

Вот как я называю задачи...

приложение/процессы/processes.py:

from celery import group
class Process(object):
    def run_process(self, resource_generator, chain_signature):
        tasks = []
        for resources in resource_generator:
            tasks.append(chain_signature(resources))
        group(tasks)()

приложение/процессы/assets.py:

from __future__ import absolute_import

from app.processes.processes import Process
from app.indexes.asset import AssetIndex
from app.tasks.assets import *

class AssetProcess(Process):
    def run(self):
        Process.run_process(self,
                            resource_generator=AssetIndex.asset_generator(),
                            chain_signature=(
                                list_assets.s() | 
                                massage_assets.s() | 
                                save_assets.s()))

Опять же, очередь по умолчанию установлена ​​​​на «local_testing», поэтому я не уверен, как что-то передается в очередь «celery». Трассировка, которую я получаю, также довольно бесполезна.

Я запускаю celery worker (с очередью «celery» или с очередью local_testing (-Q local_testing)) из каталога выше app/, например так:

celery -A app.celery_app worker -l info -n worker3.%h

Любая помощь приветствуется.

Ваше здоровье!


Ответы:


1

Я определил проблему, и это от использования группы.

При передаче подписи цепочки в качестве аргумента она автоматически применяется асинхронно. Используя группу, я группирую объект asyncResult, что не имеет никакого смысла. Я изменил исполнение таким образом:

def run_process(self, resource_generator, chain_signature):
    for resources in resource_generator:
        chain_signature(resources)

Это эффективно делает то, что я хотел в любом случае.

Ваше здоровье

16.06.2016
Новые материалы

Как проанализировать работу вашего классификатора?
Не всегда просто знать, какие показатели использовать С развитием глубокого обучения все больше и больше людей учатся обучать свой первый классификатор. Но как только вы закончите..

Работа с цепями Маркова, часть 4 (Машинное обучение)
Нелинейные цепи Маркова с агрегатором и их приложения (arXiv) Автор : Бар Лайт Аннотация: Изучаются свойства подкласса случайных процессов, называемых дискретными нелинейными цепями Маркова..

Crazy Laravel Livewire упростил мне создание электронной коммерции (панель администратора и API) [Часть 3]
Как вы сегодня, ребята? В этой части мы создадим CRUD для данных о продукте. Думаю, в этой части я не буду слишком много делиться теорией, но чаще буду делиться своим кодом. Потому что..

Использование машинного обучения и Python для классификации 1000 сезонов новичков MLB Hitter
Чему может научиться машина, глядя на сезоны новичков 1000 игроков MLB? Это то, что исследует это приложение. В этом процессе мы будем использовать неконтролируемое обучение, чтобы..

Учебные заметки: создание моего первого пакета Node.js
Это мои обучающие заметки, когда я научился создавать свой самый первый пакет Node.js, распространяемый через npm. Оглавление Глоссарий I. Новый пакет 1.1 советы по инициализации..

Забудьте о Matplotlib: улучшите визуализацию данных с помощью умопомрачительных функций Seaborn!
Примечание. Эта запись в блоге предполагает базовое знакомство с Python и концепциями анализа данных. Привет, энтузиасты данных! Добро пожаловать в мой блог, где я расскажу о невероятных..

ИИ в аэрокосмической отрасли
Каждый полет – это шаг вперед к великой мечте. Чтобы это происходило в их собственном темпе, необходима команда астронавтов для погони за космосом и команда технического обслуживания..


Для любых предложений по сайту: [email protected]