В этой серии блогов мы рассмотрим операционализацию модели машинного обучения с использованием бессерверной инфраструктуры AWS. Цель состоит в том, чтобы построить конвейер машинного обучения, который обучает и настраивает модель перед созданием конечной точки модели, которую можно использовать для получения прогнозов.

В рамках этой серии основное внимание уделяется максимально возможной автоматизации, чтобы уменьшить количество человеческих ошибок и предоставить эффективный способ операционализации модели машинного обучения с использованием AWS. Хотя мы рассмотрим конвейер в AWS, аналогичный конвейер можно построить на любой облачной платформе (включая платформу Google Cloud, Azure и т. д.), поскольку они имеют очень похожие сервисы.

Для демонстрационных целей мы будем использовать следующие сервисы в A.W.S:

  1. S3
  2. Соцсети
  3. SQS
  4. Лямбда-функции
  5. Обучение мудрецов
  6. Оптимизация гиперпараметров Sagemaker
  7. Конечная точка Sagemaker
  8. Шлюз API Amazon
  9. ИАМ

Кроме того, мы будем использовать Terraform для создания инфраструктуры в виде кода. Это помогает снизить вероятность ошибок при перемещении модели, скажем, от разработки к производству.

Мы рассмотрим назначение каждой услуги по мере продвижения в блоге, чтобы не чувствовать себя перегруженными обилием сухой информации.

Схема архитектуры

Мы будем работать над следующей архитектурой:

Без дальнейших церемоний, давайте начнем.

Инфраструктура

Личное примечание. Как специалист по данным, одной из областей, о которых я никогда не беспокоился, была инфраструктура. Мой опыт был в основном связан с ноутбуками Jupyter, что было хорошо, однако у меня был недостаток, заключающийся в том, что я не мог ввести в действие созданную мной модель. Вскоре я начал понимать важность наличия эффективной инфраструктуры, которая не только помогла мне понять мир ML Ops (операций машинного обучения), но и сделала меня лучшим специалистом по данным, помогая мне избегать нереалистичных моделей, которые казались справедливыми в ноутбуках Jupyter. .

Итак, давайте начнем с создания инфраструктуры, необходимой для конвейера. Мы будем использовать Terraform — программный инструмент инфраструктура как код (IaaC) с открытым исходным кодом.

Почему инфраструктура — это код?

Справедливо задаться вопросом, зачем использовать инфраструктуру как код, когда мы могли бы использовать сервисы AWS с помощью консоли? Причина проста,

  • Во-первых, допустим, мы разрабатываем конвейер в среде разработки. Когда конвейер нас устраивает, мы перемещаем конвейер в тестовую, а затем в производственную среду. Представьте, что мы используем сотни сервисов, тогда нам придется заново создавать все эти сервисы вручную сначала в тестовой среде, а затем в среде разработки. Это утомительная и трудоемкая задача.
  • Во-вторых, с сотнями сервисов, которые необходимо создавать несколько раз, существует большая вероятность ошибок, которые могут возникнуть, и их будет очень сложно отладить, если делать это вручную.
  • Наконец, если мы создаем сервисы вручную, мы не можем использовать методы контроля версий, например, с помощью Git. Инфраструктура как код (Iaac) весьма полезна в этом аспекте.

Чтобы узнать о том, почему Iaac, нажмите здесь.

Примечание. Мы используем terraform для реализации инфраструктуры, однако это не единственный инструмент на рынке. Поскольку мы используем AWS, мы могли бы использовать AWS Cloudformation — еще один сервис IaaC от AWS. Мне лично нравится использовать terraform из-за его возможностей интеграции с несколькими облаками.

Начнем строить инфраструктуру.

Основываясь на приведенной выше архитектуре, нам нужно создать следующее

  1. Корзина S3, куда могут попасть данные
  2. Раздел SNS, в котором корзина s3 отправляет уведомление о событии каждый раз, когда файл попадает в s3
  3. Очередь SQS, которая получает сообщение от SNS и отправляет событие в функцию Lambda.

Для простоты блога мы просто рассмотрим первую комбинацию S3, SNS, SQS и лямбда-функции, которая отвечает за преобразование данных, когда данные попадают в корзину S3. Другие комбинации для вызова модели и создания конечной точки модели могут применяться аналогичным образом.

Во-первых, мы загружаем поставщика terraform, который используется для взаимодействия с облачным сервисом. Поскольку мы взаимодействуем с AWS, мы можем загрузить его, используя следующий код.

######################################################################
# Terraform Provider
######################################################################
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = "4.5.0"
}
}
}
provider "aws" {
region = "ap-southeast-2"
shared_credentials_files = ["/Users/vachananand/.aws/credentials"]
profile = "VachanAnand"
}
view raw main.tf hosted with ❤ by GitHub

Далее нам нужно создать файл, содержащий все переменные, которые мы могли бы использовать в файле terraform для легкого доступа. Переменные имеют 3 поля

  1. type : тип данных переменной.
  2. default : значение переменной по умолчанию, которое будет использоваться в конфигурации terraform.
  3. описание : описание переменной.
######################################################################
# TAGS
######################################################################
variable environment {
type = string
default = "dev"
}
variable creator {
type = string
default = "vachan"
}
variable type {
type = string
default = "ml-pipeline"
}
######################################################################
# LANDING VARIABLES
######################################################################
variable s3_bucket_name {
type = string
default = "ml-pipeline"
description = "S3 bucket name for machine learning pipeline demo"
}
variable sns_landing_name {
type = string
default = "sns-landing"
description = "Name of SNS used when data lands in s3"
}
variable sqs_landing_name {
type = string
default = "sqs-landing"
description = "Name of SQS used when data lands in s3"
}
variable sqs_dlq_landing_name {
type = string
default = "sqs-dlq-landing"
description = "Name of SQS dead letter queue used when data lands in s3"
}
variable lambda_landing_name {
type = string
default = "lambda-landing"
}
variable iam_lambda_landing_name {
type = string
default = "iam-lambda-landing"
}
variable iam_policy_lambda_landing_name {
type = string
default = "iam-policy-lambda-landing"
}
variable iam_policy_lambda_landing_logs_name {
type = string
default = "iam-policy-lambda-landing-logs"
}
view raw variables.tf hosted with ❤ by GitHub

Затем мы добавляем еще один файл с именем terraform.tfvars, который тесно связан с упомянутым выше файлом переменных. Цель файла — заменить значения всех переменных в одном месте, которые можно использовать во всем файле конфигурации.

######################################################################
# TAGS
######################################################################
environment="dev"
creator="Vachan-terraform"
type="ml-pipeline"
######################################################################
# LANDING
######################################################################
s3_bucket_name="ml-pipeline-demo"
sqs_landing_name="sqs-landing-demo"
sqs_dlq_landing_name="sqs-dlq-landing-demo"
sns_landing_name="sns-landing-demo"
lambda_landing_name="lambda-landing-demo"
iam_lambda_landing_name="iam-lambda-landing-demo"
iam_policy_lambda_landing_name="iam-policy-lambda-landing-demo"
iam_policy_lambda_landing_logs_name="iam-policy-lambda-landing-logs-demo"

Теперь мы, наконец, начинаем создавать инфраструктуру.

Шаг 1. Создайте сегмент S3.

######################################################################
# S3
######################################################################
resource "aws_s3_bucket" "s3_ml_pipeline" {
bucket = var.s3_bucket_name
tags = {
environment = var.environment
creator = var.creator
type = var.type
}
}
resource "aws_s3_bucket_notification" "s3_landing_notification" {
bucket = aws_s3_bucket.s3_ml_pipeline.id
topic {
topic_arn = aws_sns_topic.sns_landing_topic.arn
events = ["s3:ObjectCreated:*"]
}
}
view raw s3.tf hosted with ❤ by GitHub

Приведенный выше код создает 2 вещи. Во-первых, ведро S3, куда будут попадать наши данные для обучения модели, а во-вторых, уведомление s3, которое позволяет нашему ведру s3 отправлять уведомление о событии в тему SNS.

Шаг 2. Создайте тему SNS

data "aws_iam_policy_document" "sns_policy_landing" {
policy_id = "__default_policy_ID"
statement {
actions = [
"SNS:Subscribe",
"SNS:SetTopicAttributes",
"SNS:RemovePermission",
"SNS:Receive",
"SNS:Publish",
"SNS:ListSubscriptionsByTopic",
"SNS:GetTopicAttributes",
"SNS:DeleteTopic",
"SNS:AddPermission"
]
effect = "Allow"
principals {
type = "AWS"
identifiers = ["*"]
}
resources = [ aws_sns_topic.sns_landing_topic.arn ]
condition {
test = "ArnLike"
variable = "aws:SourceArn"
values = [ aws_s3_bucket.s3_ml_pipeline.arn ]
}
sid = "__default_statement_ID"
}
}
######################################################################
# SNS
######################################################################
resource "aws_sns_topic" "sns_landing_topic" {
name = var.sns_landing_name
tags = {
environment = var.environment
creator = var.creator
type = var.type
}
}
resource "aws_sns_topic_policy" "sns_landing_topic_policy" {
arn = aws_sns_topic.sns_landing_topic.arn
policy = data.aws_iam_policy_document.sns_policy_landing.json
}
resource "aws_sns_topic_subscription" "user_updates_sqs_target" {
topic_arn = aws_sns_topic.sns_landing_topic.arn
protocol = "sqs"
endpoint = aws_sqs_queue.sqs_landing.arn
}
view raw sns.tf hosted with ❤ by GitHub

В приведенном выше коде мы делаем следующее:

  1. Создайте тему SNS для нашего модельного пайплайна
  2. Создайте политику раздела, которая позволяет S3 публиковать уведомление о событии в SNS.
  3. Создайте подписку SNS, которая отправляет сообщение из SNS в SQS.

Шаг 3. Создайте очередь SQS

######################################################################
# SQS LANDING
######################################################################
resource "aws_sqs_queue" "sqs_landing" {
name = var.sqs_landing_name
redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.sqs_dlq_landing.arn
maxReceiveCount = 5
})
tags = {
environment = var.environment
creator = var.creator
type = var.type
}
}
resource "aws_sqs_queue" "sqs_dlq_landing" {
name = var.sqs_dlq_landing_name
tags = {
environment = var.environment
creator = var.creator
type = var.type
}
}
resource "aws_sqs_queue_policy" "sqs_landing_policy" {
queue_url = aws_sqs_queue.sqs_landing.id
policy = jsonencode({
Version = "2012-10-17",
Id = "sqspolicy",
Statement = [
{
Sid = "First",
Effect = "Allow",
Principal = "*",
Action = "sqs:SendMessage",
Resource = "${aws_sqs_queue.sqs_landing.arn}",
Condition = {
ArnEquals = {
"aws:SourceArn" = "${aws_sns_topic.sns_landing_topic.arn}"
}
}
}
]
})
}
view raw sqs.tf hosted with ❤ by GitHub

Приведенный выше код делает следующее:

  1. Создайте очередь SQS для получения сообщения от SNS.
  2. Создайте очередь недоставленных сообщений, чтобы переместить в нее сообщения, которые не могут быть обработаны, для последующего изучения.
  3. Прикрепите политику к SQS, чтобы он мог получать сообщения из темы SNS.

Шаг 4. Создайте лямбда-функцию

data "archive_file" "lambda_zip"{
type = "zip"
source_file = "${path.module}/lambda/landing/main.py"
output_path = "${path.module}/lambda/landing/main.zip"
}
######################################################################
# LAMBDA FUNCTIONS
######################################################################
resource "aws_lambda_function" "lambda_landing" {
function_name = var.lambda_landing_name
filename = "${path.module}/lambda/landing/main.zip"
role = aws_iam_role.iam_lambda_landing.arn
handler = "main.lambda_handler"
runtime = "python3.8"
tags = {
environment = var.environment
creator = var.creator
type = var.type
}
}
resource "aws_lambda_event_source_mapping" "lambda_event_mapping" {
event_source_arn = aws_sqs_queue.sqs_landing.arn
function_name = aws_lambda_function.lambda_landing.arn
}
view raw lambda.tf hosted with ❤ by GitHub

Приведенный выше код выполняет следующие задачи

  1. Создайте лямбда-функцию для выполнения преобразований данных
  2. Создайте сопоставление между SQS и функцией для получения сообщения.

Шаг 5. Создайте роль и политику IAM

######################################################################
# IAM
######################################################################
resource "aws_iam_role" "iam_lambda_landing" {
name = var.iam_lambda_landing_name
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Action = "sts:AssumeRole",
Principal = {
Service = "lambda.amazonaws.com"
},
Effect = "Allow",
Sid = ""
}
]
})
tags = {
environment = var.environment
creator = var.creator
type = var.type
}
}
resource "aws_iam_role_policy" "iam_policy_lambda_landing" {
name = var.iam_policy_lambda_landing_name
role = aws_iam_role.iam_lambda_landing.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = [
"sqs:ChangeMessageVisibility",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage",
],
Effect = "Allow"
Resource = "*"
},
]
})
}
resource "aws_iam_role_policy" "iam_policy_lambda_landing_logs" {
name = var.iam_policy_lambda_landing_logs_name
role = aws_iam_role.iam_lambda_landing.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
Effect = "Allow"
Resource = "*"
},
]
})
}
view raw iam.tf hosted with ❤ by GitHub

Блок кода выше делает следующее

  1. Создайте роль IAM для лямбда-функции
  2. Прикрепите политику SQS к роли, чтобы управлять сообщениями от SQS.
  3. Прикрепите политику SQS к роли для управления журналами.

Репозиторий кода:



Использованная литература: