В этой серии блогов мы рассмотрим операционализацию модели машинного обучения с использованием бессерверной инфраструктуры AWS. Цель состоит в том, чтобы построить конвейер машинного обучения, который обучает и настраивает модель перед созданием конечной точки модели, которую можно использовать для получения прогнозов.
В рамках этой серии основное внимание уделяется максимально возможной автоматизации, чтобы уменьшить количество человеческих ошибок и предоставить эффективный способ операционализации модели машинного обучения с использованием AWS. Хотя мы рассмотрим конвейер в AWS, аналогичный конвейер можно построить на любой облачной платформе (включая платформу Google Cloud, Azure и т. д.), поскольку они имеют очень похожие сервисы.
Для демонстрационных целей мы будем использовать следующие сервисы в A.W.S:
- S3
- Соцсети
- SQS
- Лямбда-функции
- Обучение мудрецов
- Оптимизация гиперпараметров Sagemaker
- Конечная точка Sagemaker
- Шлюз API Amazon
- ИАМ
Кроме того, мы будем использовать Terraform для создания инфраструктуры в виде кода. Это помогает снизить вероятность ошибок при перемещении модели, скажем, от разработки к производству.
Мы рассмотрим назначение каждой услуги по мере продвижения в блоге, чтобы не чувствовать себя перегруженными обилием сухой информации.
Схема архитектуры
Мы будем работать над следующей архитектурой:
Без дальнейших церемоний, давайте начнем.
Инфраструктура
Личное примечание. Как специалист по данным, одной из областей, о которых я никогда не беспокоился, была инфраструктура. Мой опыт был в основном связан с ноутбуками Jupyter, что было хорошо, однако у меня был недостаток, заключающийся в том, что я не мог ввести в действие созданную мной модель. Вскоре я начал понимать важность наличия эффективной инфраструктуры, которая не только помогла мне понять мир ML Ops (операций машинного обучения), но и сделала меня лучшим специалистом по данным, помогая мне избегать нереалистичных моделей, которые казались справедливыми в ноутбуках Jupyter. .
Итак, давайте начнем с создания инфраструктуры, необходимой для конвейера. Мы будем использовать Terraform — программный инструмент инфраструктура как код (IaaC) с открытым исходным кодом.
Почему инфраструктура — это код?
Справедливо задаться вопросом, зачем использовать инфраструктуру как код, когда мы могли бы использовать сервисы AWS с помощью консоли? Причина проста,
- Во-первых, допустим, мы разрабатываем конвейер в среде разработки. Когда конвейер нас устраивает, мы перемещаем конвейер в тестовую, а затем в производственную среду. Представьте, что мы используем сотни сервисов, тогда нам придется заново создавать все эти сервисы вручную сначала в тестовой среде, а затем в среде разработки. Это утомительная и трудоемкая задача.
- Во-вторых, с сотнями сервисов, которые необходимо создавать несколько раз, существует большая вероятность ошибок, которые могут возникнуть, и их будет очень сложно отладить, если делать это вручную.
- Наконец, если мы создаем сервисы вручную, мы не можем использовать методы контроля версий, например, с помощью Git. Инфраструктура как код (Iaac) весьма полезна в этом аспекте.
Чтобы узнать о том, почему Iaac, нажмите здесь.
Примечание. Мы используем terraform для реализации инфраструктуры, однако это не единственный инструмент на рынке. Поскольку мы используем AWS, мы могли бы использовать AWS Cloudformation — еще один сервис IaaC от AWS. Мне лично нравится использовать terraform из-за его возможностей интеграции с несколькими облаками.
Начнем строить инфраструктуру.
Основываясь на приведенной выше архитектуре, нам нужно создать следующее
- Корзина S3, куда могут попасть данные
- Раздел SNS, в котором корзина s3 отправляет уведомление о событии каждый раз, когда файл попадает в s3
- Очередь SQS, которая получает сообщение от SNS и отправляет событие в функцию Lambda.
Для простоты блога мы просто рассмотрим первую комбинацию S3, SNS, SQS и лямбда-функции, которая отвечает за преобразование данных, когда данные попадают в корзину S3. Другие комбинации для вызова модели и создания конечной точки модели могут применяться аналогичным образом.
Во-первых, мы загружаем поставщика terraform, который используется для взаимодействия с облачным сервисом. Поскольку мы взаимодействуем с AWS, мы можем загрузить его, используя следующий код.
###################################################################### | |
# Terraform Provider | |
###################################################################### | |
terraform { | |
required_providers { | |
aws = { | |
source = "hashicorp/aws" | |
version = "4.5.0" | |
} | |
} | |
} | |
provider "aws" { | |
region = "ap-southeast-2" | |
shared_credentials_files = ["/Users/vachananand/.aws/credentials"] | |
profile = "VachanAnand" | |
} |
Далее нам нужно создать файл, содержащий все переменные, которые мы могли бы использовать в файле terraform для легкого доступа. Переменные имеют 3 поля
- type : тип данных переменной.
- default : значение переменной по умолчанию, которое будет использоваться в конфигурации terraform.
- описание : описание переменной.
###################################################################### | |
# TAGS | |
###################################################################### | |
variable environment { | |
type = string | |
default = "dev" | |
} | |
variable creator { | |
type = string | |
default = "vachan" | |
} | |
variable type { | |
type = string | |
default = "ml-pipeline" | |
} | |
###################################################################### | |
# LANDING VARIABLES | |
###################################################################### | |
variable s3_bucket_name { | |
type = string | |
default = "ml-pipeline" | |
description = "S3 bucket name for machine learning pipeline demo" | |
} | |
variable sns_landing_name { | |
type = string | |
default = "sns-landing" | |
description = "Name of SNS used when data lands in s3" | |
} | |
variable sqs_landing_name { | |
type = string | |
default = "sqs-landing" | |
description = "Name of SQS used when data lands in s3" | |
} | |
variable sqs_dlq_landing_name { | |
type = string | |
default = "sqs-dlq-landing" | |
description = "Name of SQS dead letter queue used when data lands in s3" | |
} | |
variable lambda_landing_name { | |
type = string | |
default = "lambda-landing" | |
} | |
variable iam_lambda_landing_name { | |
type = string | |
default = "iam-lambda-landing" | |
} | |
variable iam_policy_lambda_landing_name { | |
type = string | |
default = "iam-policy-lambda-landing" | |
} | |
variable iam_policy_lambda_landing_logs_name { | |
type = string | |
default = "iam-policy-lambda-landing-logs" | |
} |
Затем мы добавляем еще один файл с именем terraform.tfvars, который тесно связан с упомянутым выше файлом переменных. Цель файла — заменить значения всех переменных в одном месте, которые можно использовать во всем файле конфигурации.
###################################################################### | |
# TAGS | |
###################################################################### | |
environment="dev" | |
creator="Vachan-terraform" | |
type="ml-pipeline" | |
###################################################################### | |
# LANDING | |
###################################################################### | |
s3_bucket_name="ml-pipeline-demo" | |
sqs_landing_name="sqs-landing-demo" | |
sqs_dlq_landing_name="sqs-dlq-landing-demo" | |
sns_landing_name="sns-landing-demo" | |
lambda_landing_name="lambda-landing-demo" | |
iam_lambda_landing_name="iam-lambda-landing-demo" | |
iam_policy_lambda_landing_name="iam-policy-lambda-landing-demo" | |
iam_policy_lambda_landing_logs_name="iam-policy-lambda-landing-logs-demo" |
Теперь мы, наконец, начинаем создавать инфраструктуру.
Шаг 1. Создайте сегмент S3.
###################################################################### | |
# S3 | |
###################################################################### | |
resource "aws_s3_bucket" "s3_ml_pipeline" { | |
bucket = var.s3_bucket_name | |
tags = { | |
environment = var.environment | |
creator = var.creator | |
type = var.type | |
} | |
} | |
resource "aws_s3_bucket_notification" "s3_landing_notification" { | |
bucket = aws_s3_bucket.s3_ml_pipeline.id | |
topic { | |
topic_arn = aws_sns_topic.sns_landing_topic.arn | |
events = ["s3:ObjectCreated:*"] | |
} | |
} |
Приведенный выше код создает 2 вещи. Во-первых, ведро S3, куда будут попадать наши данные для обучения модели, а во-вторых, уведомление s3, которое позволяет нашему ведру s3 отправлять уведомление о событии в тему SNS.
Шаг 2. Создайте тему SNS
data "aws_iam_policy_document" "sns_policy_landing" { | |
policy_id = "__default_policy_ID" | |
statement { | |
actions = [ | |
"SNS:Subscribe", | |
"SNS:SetTopicAttributes", | |
"SNS:RemovePermission", | |
"SNS:Receive", | |
"SNS:Publish", | |
"SNS:ListSubscriptionsByTopic", | |
"SNS:GetTopicAttributes", | |
"SNS:DeleteTopic", | |
"SNS:AddPermission" | |
] | |
effect = "Allow" | |
principals { | |
type = "AWS" | |
identifiers = ["*"] | |
} | |
resources = [ aws_sns_topic.sns_landing_topic.arn ] | |
condition { | |
test = "ArnLike" | |
variable = "aws:SourceArn" | |
values = [ aws_s3_bucket.s3_ml_pipeline.arn ] | |
} | |
sid = "__default_statement_ID" | |
} | |
} | |
###################################################################### | |
# SNS | |
###################################################################### | |
resource "aws_sns_topic" "sns_landing_topic" { | |
name = var.sns_landing_name | |
tags = { | |
environment = var.environment | |
creator = var.creator | |
type = var.type | |
} | |
} | |
resource "aws_sns_topic_policy" "sns_landing_topic_policy" { | |
arn = aws_sns_topic.sns_landing_topic.arn | |
policy = data.aws_iam_policy_document.sns_policy_landing.json | |
} | |
resource "aws_sns_topic_subscription" "user_updates_sqs_target" { | |
topic_arn = aws_sns_topic.sns_landing_topic.arn | |
protocol = "sqs" | |
endpoint = aws_sqs_queue.sqs_landing.arn | |
} |
В приведенном выше коде мы делаем следующее:
- Создайте тему SNS для нашего модельного пайплайна
- Создайте политику раздела, которая позволяет S3 публиковать уведомление о событии в SNS.
- Создайте подписку SNS, которая отправляет сообщение из SNS в SQS.
Шаг 3. Создайте очередь SQS
###################################################################### | |
# SQS LANDING | |
###################################################################### | |
resource "aws_sqs_queue" "sqs_landing" { | |
name = var.sqs_landing_name | |
redrive_policy = jsonencode({ | |
deadLetterTargetArn = aws_sqs_queue.sqs_dlq_landing.arn | |
maxReceiveCount = 5 | |
}) | |
tags = { | |
environment = var.environment | |
creator = var.creator | |
type = var.type | |
} | |
} | |
resource "aws_sqs_queue" "sqs_dlq_landing" { | |
name = var.sqs_dlq_landing_name | |
tags = { | |
environment = var.environment | |
creator = var.creator | |
type = var.type | |
} | |
} | |
resource "aws_sqs_queue_policy" "sqs_landing_policy" { | |
queue_url = aws_sqs_queue.sqs_landing.id | |
policy = jsonencode({ | |
Version = "2012-10-17", | |
Id = "sqspolicy", | |
Statement = [ | |
{ | |
Sid = "First", | |
Effect = "Allow", | |
Principal = "*", | |
Action = "sqs:SendMessage", | |
Resource = "${aws_sqs_queue.sqs_landing.arn}", | |
Condition = { | |
ArnEquals = { | |
"aws:SourceArn" = "${aws_sns_topic.sns_landing_topic.arn}" | |
} | |
} | |
} | |
] | |
}) | |
} |
Приведенный выше код делает следующее:
- Создайте очередь SQS для получения сообщения от SNS.
- Создайте очередь недоставленных сообщений, чтобы переместить в нее сообщения, которые не могут быть обработаны, для последующего изучения.
- Прикрепите политику к SQS, чтобы он мог получать сообщения из темы SNS.
Шаг 4. Создайте лямбда-функцию
data "archive_file" "lambda_zip"{ | |
type = "zip" | |
source_file = "${path.module}/lambda/landing/main.py" | |
output_path = "${path.module}/lambda/landing/main.zip" | |
} | |
###################################################################### | |
# LAMBDA FUNCTIONS | |
###################################################################### | |
resource "aws_lambda_function" "lambda_landing" { | |
function_name = var.lambda_landing_name | |
filename = "${path.module}/lambda/landing/main.zip" | |
role = aws_iam_role.iam_lambda_landing.arn | |
handler = "main.lambda_handler" | |
runtime = "python3.8" | |
tags = { | |
environment = var.environment | |
creator = var.creator | |
type = var.type | |
} | |
} | |
resource "aws_lambda_event_source_mapping" "lambda_event_mapping" { | |
event_source_arn = aws_sqs_queue.sqs_landing.arn | |
function_name = aws_lambda_function.lambda_landing.arn | |
} |
Приведенный выше код выполняет следующие задачи
- Создайте лямбда-функцию для выполнения преобразований данных
- Создайте сопоставление между SQS и функцией для получения сообщения.
Шаг 5. Создайте роль и политику IAM
###################################################################### | |
# IAM | |
###################################################################### | |
resource "aws_iam_role" "iam_lambda_landing" { | |
name = var.iam_lambda_landing_name | |
assume_role_policy = jsonencode({ | |
Version = "2012-10-17", | |
Statement = [ | |
{ | |
Action = "sts:AssumeRole", | |
Principal = { | |
Service = "lambda.amazonaws.com" | |
}, | |
Effect = "Allow", | |
Sid = "" | |
} | |
] | |
}) | |
tags = { | |
environment = var.environment | |
creator = var.creator | |
type = var.type | |
} | |
} | |
resource "aws_iam_role_policy" "iam_policy_lambda_landing" { | |
name = var.iam_policy_lambda_landing_name | |
role = aws_iam_role.iam_lambda_landing.id | |
policy = jsonencode({ | |
Version = "2012-10-17" | |
Statement = [ | |
{ | |
Action = [ | |
"sqs:ChangeMessageVisibility", | |
"sqs:DeleteMessage", | |
"sqs:GetQueueAttributes", | |
"sqs:ReceiveMessage", | |
], | |
Effect = "Allow" | |
Resource = "*" | |
}, | |
] | |
}) | |
} | |
resource "aws_iam_role_policy" "iam_policy_lambda_landing_logs" { | |
name = var.iam_policy_lambda_landing_logs_name | |
role = aws_iam_role.iam_lambda_landing.id | |
policy = jsonencode({ | |
Version = "2012-10-17" | |
Statement = [ | |
{ | |
Action = [ | |
"logs:CreateLogGroup", | |
"logs:CreateLogStream", | |
"logs:PutLogEvents" | |
], | |
Effect = "Allow" | |
Resource = "*" | |
}, | |
] | |
}) | |
} | |
Блок кода выше делает следующее
- Создайте роль IAM для лямбда-функции
- Прикрепите политику SQS к роли, чтобы управлять сообщениями от SQS.
- Прикрепите политику SQS к роли для управления журналами.