Как использовать функции, запускаемые по очереди, и почему

Пошаговое руководство о том, как эффективно обрабатывать несколько элементов

Функции Azure, служба, позволяющая запускать бессерверный код, бывают разных форм: они могут запускаться по протоколу HTTP - вы можете вызывать их напрямую для выполнения некоторого кода; blob-triggered - они запускаются, когда новый большой двоичный объект загружается в хранилище, или, среди прочего, запускаются по очереди, что мы и собираемся здесь обсудить. Если вы не знакомы с функциями Azure, сначала прочтите это, а если вы хотите перейти к реализации функции Azure Python, обратитесь к моей последней статье.

Функция, запускаемая с помощью очереди, выполняет код, когда новое сообщение поступает в очередь, структуру данных, из которой вы можете извлекать элементы в том порядке, в котором они поступили (мы называем этот принцип упорядочения FIFO: первым вошел, первым out). Но зачем вам помещать сообщения в очередь?

Что ж, вы обычно делаете это, когда у вас есть несколько элементов для обработки, когда эти элементы могут обрабатываться независимо и когда обработка их всех одновременно может занять слишком много времени.

Функции Azure в плане потребления имеют ограниченный срок службы: они представляют собой небольшие фрагменты кода, которые вы можете вызывать в любое время. Они могут работать не более 10 минут, а если они запускаются по протоколу HTTP, вы получите ответ в течение 230 секунд. Если код, который вы выполняете, занимает слишком много времени для выполнения, функция будет отключена по тайм-ауту.

Поэтому, когда вам нужно повторить одну и ту же задачу для нескольких разных элементов, вы можете использовать очереди для независимого выполнения этой задачи для каждого сообщения очереди. Функции Azure также являются масштабируемыми: если вы имеете дело с несколькими сообщениями в очереди, Azure будет запускать несколько экземпляров приложения-функции, чтобы различные сообщения могли обрабатываться параллельно.

Подводя итог, вам следует подумать об использовании функций, запускаемых с помощью очереди, когда вы сталкиваетесь с этим типом использования:

  • Есть несколько предметов для обработки
  • Обработка каждого элемента занимает время, а обработка всех элементов занимает более 230 секунд.
  • Обработка каждого элемента требует одного и того же кода и может выполняться независимо
  • Обработка каждого элемента занимает менее 10 минут (если вы хотите использовать план потребления).

В этом руководстве мы увидим, как реализовать следующий сценарий:

Мы собираемся запустить весь конвейер обработки с помощью функции, запускаемой HTTP, которую вы можете вызывать в любое время и из любого места. Единственная цель этой функции - поместить сообщения в очередь, и на каждый элемент будет приходиться обрабатывать одно сообщение. Затем каждый элемент будет подбираться и обрабатываться функцией, запускаемой очередью, и мы будем отслеживать статус каждого элемента в таблице.

В качестве примера мы возьмем список текстовых документов из хранилища BLOB-объектов в качестве входных данных и обработаем эти документы, чтобы перевернуть текст перед сохранением обновленной версии.

Цель этого руководства - сосредоточить внимание на функциях очереди, а не на самой задаче обработки, поэтому не стесняйтесь заменять ее любым вариантом использования. Я просто подумал, что было бы полезно привести пример с извлечением данных из хранилища BLOB-объектов, потому что это довольно распространенный вариант использования.

Настраивать

Первое, что вам понадобится - это неудивительно - учетная запись Azure. Если у вас его нет, вы можете создать его бесплатно здесь.

Затем вы хотите создать учетную запись хранения, в которой будут размещаться ваша очередь, таблица и файлы.

На портале Azure создайте новую учетную запись хранения с настройками по умолчанию, затем создайте в этой учетной записи хранения новый контейнер testdocs, новую очередь processing и новую таблицу статус.

Мы собираемся хранить некоторые файлы для обработки в контейнере. Вы можете использовать эти здесь или другие файлы, которые вы создали сами.

Разработка

1. Создайте приложение-функцию.

В этом руководстве я покажу образцы кода Python, но вы можете выбрать язык по своему усмотрению, поскольку процесс остается неизменным.

Во-первых, убедитесь, что у вас есть подходящая среда для работы с функциями Azure. Вы можете обратиться к моему последнему сообщению о функциях Azure, если вам понадобится руководство.

Чтобы создать приложение-функцию, выполните следующую команду:

func init

Затем создайте 2 новые функции: одну запускаемую HTTP и одну запускаемую очередь.

Для этого запустите

func new 

и когда вы это видите:

Сначала выберите триггер HTTP и выберите имя - я выбрал «TriggerProcessing».

Затем повторите, но на этот раз выберите Триггер очереди (также называемый «Триггер хранилища очереди Azure») - я назвал свой «ProcessDoc».

2. Обновите настройки

Теперь давайте откроем local.settings.json и добавим параметр QueueConnectionString со строкой подключения к вашей учетной записи хранения. Вы можете найти свою строку подключения на портале, перейдя на панель ключей доступа:

Это позволит нам установить связь между очередью в нашей учетной записи хранения и нашей функцией очереди.

Мы также хотим добавить переменную STORAGE_CONTAINER_NAME, которая будет указывать имя контейнера, в который мы загрузили наши файлы, и переменную STORAGE_TABLE_NAME, содержащую имя таблицы, которую мы используйте для отслеживания статуса.

Ваш файл local.settings.json теперь должен выглядеть так:

3. Привяжите функции к очереди.

Теперь, когда мы добавили строку подключения в наши настройки, мы можем обновить привязки функций, чтобы связать наши 2 функции с очередью.

Если вы используете python или node, сделайте следующее:

Откройте файл function.json в папке ProcessDoc (функция, запускаемая по очереди), обновите значение соединения в привязках с помощью «QueueConnectionString», а имя очереди с «обработка ”

Также откройте файл function.json для TriggerProcessing (HTTP-триггерная функция) и обновите привязки, чтобы добавить выходное соединение в очередь:

Нам также необходимо обновить основной метод функции TriggerProcessing, чтобы включить это сообщение:

def main(req: func.HttpRequest, msg: func.Out[func.QueueMessage]) -> func.HttpResponse:

Если вы работаете в .NET, процесс немного отличается: вам нужно обновлять привязки непосредственно в файлах функций.

Для ProcessDoc вы можете обновить имя по умолчанию и строку подключения, а для TriggerProcessing вы можете добавить эту строку:

[return: Queue(“processing”, Connection = “QueueConnectionString”)]

Под названием функции.

Теперь у вас должно получиться что-то вроде этого:

Отлично, теперь две наши функции готовы отправлять и получать сообщения из нашей очереди обработки.

Вы можете проверить, все ли на месте, запустив приложение-функцию. Для этого запустите

func host start

из корневой папки. Вы также можете протестировать конечную точку, запускаемую HTTP, с помощью такого инструмента, как Postman.

4. Записывать сообщения в очередь.

Откройте файл функции для TriggerProcessing (в папке TriggerProcessing, __init.py__ для python или index.js для node и TriggerProcessing.cs для .NET).

Там уже есть несколько строк кода: это образец кода для функции, запускаемой HTTP. Избавьтесь от этого и замените на это:

Здесь мы получаем documentList из тела запроса и помещаем каждый элемент списка в очередь. Мы можем использовать объект msg благодаря привязке, которую мы добавили ранее.

Вы можете поместить одно сообщение в очередь:

msg.set("Message")

Или добавляйте по несколько, передавая список:

msg.set(["Element1", "Element2", ...])

Поскольку documentList уже должен быть списком, мы можем передать его как есть.

Давайте попробуем отправить POST-запрос нашей функции (сначала убедитесь, что она запущена). Вы можете использовать Почтальон или любой другой инструмент, который вам нравится.

curl — location — request POST 'https://localhost:7071/api/TriggerProcessing' \
— header 'Content-Type: application/json' \
— data-raw '{
"documentList": ["sample_file1.txt", "sample_file2.txt", "sample_file3.txt", "sample_file4.txt", "sample_file5.txt"]
}'

Если вы посмотрите на терминал, в котором работает ваша функция, вы увидите кучу сообщений

Python queue trigger function processed a queue item: […]

Значит, уже работает!

Теперь осталось только обработать документы, а не просто зарегистрировать какое-то сообщение.

5. Обработка элементов очереди

ProcessDoc уже получает сообщения благодаря привязке, которую мы сделали на шаге 3. Если вы хотите быть уверены в этом, замените сообщение журнала следующим образом:

logging.info(f"Processing queue item: {msg.get_body().decode('utf-8')}")

Если вы снова запустите TriggerProcessing, вы увидите в консоли такие журналы:

Processing queue item: sample_file1.txt

Поскольку мы отправили список имен файлов в очередь с помощью TriggerProcessing, каждый элемент очереди представляет собой новое сообщение, содержащее имя файла. Мы можем получить доступ к содержанию сообщений с помощью

msg.get_body()

Теперь, когда мы знаем, какой файл мы хотим обработать, нам нужно сделать 3 вещи:

  1. Получение файла из хранилища
  2. Обработка файла (обращение текста)
  3. Сохранение измененной версии файла

Для этого нам нужно установить библиотеку для взаимодействия с хранилищем BLOB-объектов Azure: запустите

pip install azure-storage-blob

Чтобы наш код оставался чистым, мы собираемся написать код для выполнения этих задач в отдельных файлах. Я создаю папку utils на корневом уровне, содержащую 2 новых файла: storage_helpers.py и processing.py.

Вставьте это в storage_helpers.py

Это все, что нам понадобится для взаимодействия с большими двоичными объектами в хранилище Azure.

Теперь давайте обновим нашу функцию ProcessDoc, чтобы получить файл, соответствующий сообщению очереди, из хранилища. Замените текущий основной метод следующим:

file_name = msg.get_body().decode('utf-8')
logging.info(f"Processing queue item: {file_name}…")
# Getting settings
STORAGE_CONNECTION_STRING = os.getenv("QueueConnectionString")
CONTAINER_NAME = os.getenv("STORAGE_CONTAINER_NAME")
# Getting file from storage
file_path = storage_helpers.download_blob(CONTAINER_NAME, file_name, STORAGE_CONNECTION_STRING)

Запустите процесс снова - вы должны увидеть файлы в папке загрузок: это означает, что мы выполнили 1-й шаг, получив файлы из хранилища.

Взгляните также на журналы, вы увидите, что каждое сообщение обрабатывается независимо.

Теперь займемся вторым шагом - обработкой файлов.

В processing.py вставьте этот код:

Попробуйте запустить все это еще раз и просмотрите журналы, чтобы увидеть обработанный контент (перевернутый текст). Ура, работает!

Все, что осталось сделать, это сохранить обновленные файлы обратно в хранилище. У нас уже есть готовая функция upload_blob, поэтому давайте просто добавим ее в основную:

# Saving processed file to storage
if processed_doc != None:
    new_file_name = 'processed_' + file_name
    storage_helpers.upload_blob(CONTAINER_NAME, new_file_name,       processed_doc, STORAGE_CONNECTION_STRING)
    # Deleting local copy
    os.remove(file_path)
    logging.info(f'Done processing {file_name}.')

Это последний файл функции ProcessDoc (с обработкой ошибок):

Отлично, мы закончили. Вы можете убедиться, что все работает нормально, посмотрев на контейнер для хранения - там должны быть обновленные файлы. Вы можете использовать обозреватель хранилища на портале, чтобы загрузить новые файлы и убедиться, что они содержат то, что вы ожидали.

6. Обновите статус

Мы смогли проверить правильность работы функций, посмотрев на новые файлы, загруженные в контейнер для хранения. Однако это не идеально: мы предпочли бы иметь возможность сразу проверить, правильно ли были обработаны наши файлы. Кроме того, вы можете запустить обработку, она может дать сбой, и вы не обязательно заметите, были ли у вас уже файлы в контейнере, созданные во время предыдущего запуска.

Вот почему мы хотим использовать таблицу для отслеживания статуса файлов. Как только мы что-то сделаем, мы обновим статус текущего файла - таким образом, если что-то выйдет из строя, это будет отражено в этой таблице.

Давайте сначала добавим в storage_helpers.py несколько функций, связанных с таблицами:

Большой. Теперь нам просто нужно обновить статус файлов в разных точках основного метода:

Попробуйте - если все работает как надо, вы должны увидеть в таблице все файлы со статусом «Готово». Наш код работает так быстро, что трудно определить статус в состоянии «новый» или что-то еще, но в реальной жизни, если у вас была задача, выполнение которой занимало довольно много времени, вы могли бы следить за прогрессом из этой таблицы. .

Следующие шаги

Поздравляю! Вы узнали, как использовать функции очереди Azure и почему вам это нужно. Если вам нужно выполнить несколько независимых элементов, вы можете напрямую применить то, что узнали здесь. И в качестве бонуса теперь вы также знаете, как работать с хранилищем Azure для извлечения и загружать файлы, а также отслеживать события.

Если вам нужен совет о том, как опубликовать приложение-функцию, ознакомьтесь с последней частью моей последней статьи.

Не забудьте обновить настройки вашего размещенного приложения, включив в них переменные среды, которые вы определили локально.

Вы также можете найти образец кода для приложения-функции, созданного в этом руководстве, здесь.

Спасибо за прочтение!