авторы Варун Сехри, Минакши Джиндал, Бурак Баджиоглу

Введение

В Netflix для наилучшего продвижения и рекомендации контента пользователям существует множество команд Media Algorithm, которые работают рука об руку с создателями и редакторами контента. Некоторые из этих алгоритмов направлены на улучшение различных ручных рабочих процессов, чтобы мы показывали пользователю персонализированное рекламное изображение, трейлер или шоу.

Эти алгоритмы машинного обучения, ориентированные на медиа, а также другие команды генерируют много данных из медиафайлов, которые мы описали в нашем предыдущем блоге, которые хранятся в виде аннотаций в Marken. Мы разработали уникальную концепцию под названием Операции с аннотациями, которая позволяет командам создавать конвейеры данных и легко писать аннотации, не беспокоясь о шаблонах доступа к своим данным из разных приложений.

Цели

Давайте возьмем пример использования идентификации объектов (например, деревьев, автомобилей и т. д.) в видеофайле. Как описано на картинке выше

  • При первом запуске алгоритм определил 500 объектов в конкретном видеофайле. Эти 500 объектов были сохранены как аннотации определенного типа схемы, скажем, Объектов, в Маркене.
  • Команда Algorithm улучшила свой алгоритм. Теперь, когда мы повторно запустили алгоритм для того же видеофайла, он создал 600 аннотаций объектов типа схемы и сохранил их в нашем сервисе.

Обратите внимание, что мы не можем обновить аннотации из предыдущих запусков, потому что мы не знаем, сколько аннотаций приведет к запуску нового алгоритма. Также нам очень дорого отслеживать, какую аннотацию нужно обновить.

Цель состоит в том, что когда потребитель приходит и ищет аннотации типа Objects для данного видеофайла, должно произойти следующее.

  • Перед запуском Algo 1, если они будут искать, они ничего не найдут.
  • После завершения запуска Algo 1 запрос должен найти первый набор из 500 аннотаций.
  • В то время, когда Algo run 2 создавал набор из 600 аннотаций, поиск клиентов все еще должен возвращать более старые 500 аннотаций.
  • Когда все 600 аннотаций будут успешно созданы, они должны заменить старый набор из 500.
  • Итак, теперь, когда клиенты ищут аннотации для объектов, они должны получить 600 аннотаций.

Вам это что-то напоминает? Это кажется очень похожим (не совсем таким же) на распределенную транзакцию.

Как правило, запуск алгоритма может иметь 2k-5k аннотаций. Есть много наивных решений, возможных для этой проблемы, например:

  • Напишите разные прогоны в разных базах данных. Это явно очень дорого.
  • Алгоритм записи работает с файлами. Но мы не можем выполнять поиск или представлять извлечение файлов с малой задержкой.
  • И т. д.

Вместо этого наша задача состояла в том, чтобы реализовать эту функцию поверх баз данных Cassandra и ElasticSearch, потому что это то, что использует Marken. Решение, которое мы представляем в этом блоге, не ограничивается аннотациями и может использоваться для любого другого домена, который также использует ES и Cassandra.

Архитектура Маркен

Схема архитектуры Маркена выглядит следующим образом. Мы отсылаем читателя к нашей предыдущей статье в блоге за подробностями. Мы используем Cassandra в качестве источника достоверной информации, где мы храним аннотации, пока мы индексируем аннотации в ElasticSearch, чтобы обеспечить богатые функции поиска.

Наша цель состояла в том, чтобы помочь командам Netflix создавать конвейеры данных, не задумываясь о том, как эти данные доступны читателям или клиентским командам. Точно так же командам клиентов не нужно беспокоиться о том, когда и как записываются данные. Это то, что мы называем отделением потоков производителей от клиентов данных.

Жизненный цикл фильма проходит множество творческих стадий. У нас есть много временных файлов, которые доставляются до того, как мы доберемся до финального файла фильма. Точно так же фильм имеет много разных языков, и на каждом из этих языков могут быть доставлены разные файлы. Команды обычно хотят запускать алгоритмы и создавать аннотации, используя все эти медиафайлы.

Поскольку алгоритмы могут выполняться с различными вариантами создания и доставки медиафайлов, мы можем упростить выполнение алгоритма следующим образом.

  • Тип схемы аннотации — определяет схему для аннотации, созданной алгоритмом.
  • Версия схемы аннотации — определяет версию схемы аннотации, сгенерированной алгоритмом.
  • PivotId — уникальный строковый идентификатор, идентифицирующий файл или метод, используемый для создания аннотаций. Это может быть SHA-хэш файла или просто идентификационный номер фильма.

Приведенное выше мы можем описать модель данных для операции аннотации следующим образом.

{
  "annotationOperationKeys": [
    {
      "annotationType": "string",   ❶
      "annotationTypeVersion": “integer”,
      "pivotId": "string",
      "operationNumber": “integer”    ❷
    }
  ],
  "id": "UUID",
  "operationStatus": "STARTED",   ❸
  "isActive": true   ❹
}
  1. Мы уже объяснили AnnotationType, AnnotationTypeVersion и PivotId выше.
  2. OperationNumber — это автоматически увеличивающийся номер для каждой новой операции.
  3. OperationStatus — Операция проходит через три фазы: Started, Finished и Canceled.
  4. IsActive — Активны ли операция и связанные с ней аннотации и доступны ли они для поиска.

Как вы можете видеть из модели данных, производитель аннотации должен выбрать AnnotationOperationKey, который позволяет им определить, как они хотят использовать аннотации UPSERT в AnnotationOperation. Внутри AnnotationOperationKey важным полем является pivotId и способ его создания.

Таблицы Cassandra

Наш источник правды для всех объектов в Маркене в Кассандре. Для хранения операций аннотации у нас есть следующие основные таблицы.

  • AnnotationOperationById — хранит AnnotationOperations
  • AnnotationIdByAnnotationOperationId — хранит идентификаторы всех аннотаций в операции.

Поскольку Cassandra — это NoSql, у нас есть больше таблиц, которые помогают нам создавать обратные индексы и запускать административные задания, чтобы мы могли сканировать все операции с аннотациями, когда это необходимо.

Эластичный поиск

Каждая аннотация в Marken также индексируется в ElasticSearch для поддержки различных поисков. Чтобы записать связь между аннотацией и операцией, мы также индексируем два поля.

  • annotationOperationId — идентификатор операции, которой принадлежит эта аннотация.
  • isAnnotationOperationActive — находится ли операция в активном состоянии.

API

Мы предоставляем нашим пользователям три API. В следующих разделах мы описываем API и управление состоянием в API.

Начать операцию аннотации

Когда вызывается этот API, мы сохраняем операцию с ее OperationKey (кортеж из annotationType, annotationType Version и pivotId) в нашей базе данных. Эта новая операция помечена как находящаяся в состоянии НАЧАЛО. Мы храним все OperationID, которые находятся в состоянии STARTED, в распределенном кеше (EVCache) для быстрого доступа во время поиска.

UpsertAnnotationsInOperation

Пользователи вызывают этот API для добавления аннотаций в операцию. Они передают аннотации вместе с OperationID. Мы сохраняем аннотации, а также записываем взаимосвязь между идентификаторами аннотаций и идентификатором операции в Cassandra. На этом этапе операции находятся в состоянии isAnnotationOperationActive = ACTIVE и operationStatus = STARTED.

Обратите внимание, что обычно за один запуск операции может быть создано от 2 до 5 тысяч аннотаций. Клиенты могут вызывать этот API с разных машин или потоков для быстрого обновления.

Завершить операцию аннотации

Как только аннотации были созданы в операции, клиенты вызывают FinishAnnotationOperation, которая изменяется после

  • Помечает текущую операцию (скажем, с ID2) как OperationStatus = FINISHED и isAnnotationOperationActive=ACTIVE.
  • Мы удаляем ID2 из Memcache, так как он не находится в состоянии STARTED.
  • Любая предыдущая операция (скажем, с ID1), которая была АКТИВНОЙ, теперь помечается как AnnotationOperationActive=FALSE в Cassandra.
  • Наконец, мы вызываем updateByQuery API в ElasticSearch. Этот API находит все документы Elasticsearch с ID1 и помечает isAnnotationOperationActive=FALSE.

API поиска

Это ключевая часть для наших читателей. Когда клиент вызывает наш поисковый API, мы должны исключить

  • любые аннотации из операций isAnnotationOperationActive=FALSE или
  • для которых операции с аннотациями в настоящее время находятся в состоянии НАЧАЛО. Мы делаем это, исключая следующее из всех запросов в нашей системе.

Для достижения выше

  1. Мы добавляем фильтр в наш запрос ES, чтобы исключить значение isAnnotationOperationStatus FALSE.
  2. Мы запрашиваем EVCache, чтобы узнать все операции, которые находятся в состоянии STARTED. Затем мы исключаем все аннотации с annotationId, найденные в memcache. Использование memcache позволяет нам поддерживать низкие задержки для нашего поиска (большинство наших запросов менее 100 мс).

Обработка ошибок

Cassandra — наш источник правды, поэтому, если происходит ошибка, мы теряем вызов клиента. Однако, как только мы зафиксируем Cassandra, мы должны обработать ошибки Elasticsearch. По нашему опыту, все ошибки происходят, когда в базе данных Elasticsearch возникают проблемы. В приведенном выше случае мы создали логику повтора для вызовов updateByQuery в ElasticSearch. Если вызов завершается ошибкой, мы отправляем сообщение в SQS, чтобы через некоторое время повторить попытку в автоматическом режиме.

Будущая работа

В ближайшем будущем мы хотим написать единый API высокого уровня абстракции, который наши клиенты смогут вызывать вместо вызова трех API. Например, они могут хранить аннотации в хранилище больших двоичных объектов, таком как S3, и предоставлять нам ссылку на файл как часть единого API.