Неблокирующая обработка ошибочных сообщений в Kafka с RetryableTopic в приложении Spring Boot

Иногда, когда мы обрабатываем сообщения из топиков Kafka, могут возникать ошибки. Например, потребительские службы или другая инфраструктура могут быть недоступны. Мы хотим убедиться, что не теряем никаких данных, и пытаемся обработать неудавшиеся сообщения.

Поведение обработки сбоев Kafka по умолчанию повторяет попытки обработки сообщений бесконечно. Это невыгодно, потому что некоторые фатальные ошибки нельзя исправить, и мы не должны их перерабатывать.

Вместо того, чтобы полагаться на реализацию по умолчанию, мы можем использовать аннотацию RetryableTopic для настройки более надежной стратегии обработки ошибочных сообщений. Например, мы можем отправить неудавшееся сообщение в Очередь недоставленных писем, ограничить количество повторных попыток, определить таймаут, исключить переобработку фатального исключения и т. д.

В этом руководстве я покажу вам, как реализовать RetryableTopic в приложении Spring Boot за несколько простых шагов.

Давайте начнем!

RetryableTopic Введение и преимущества

Во-первых, давайте поймем разницу между блокирующими и неблокирующими повторными попытками сообщения. Допустим, вы настроили @Bean в своей конфигурации Kafka, чтобы попытаться повторно обработать ошибочные сообщения N раз. Например, рассмотрим следующий фрагмент кода:

Потребитель постоянно пытается повторно обработать ошибочные сообщения в режиме реального времени. Основная тема будет заблокирована. Здесь у нас есть стратегия FixedBackOff с 3 попытками с интервалом восстановления 5 секунд.

Если все повторные попытки не увенчались успехом, сообщение отправляется в очередь недоставленных сообщений (DLT). До тех пор все остальные входящие сообщения будут блокироваться до тех пор, пока не будут обработаны предыдущие.

Это может быть опасно, особенно если интервал повтора слишком велик.

Вот как мы можем улучшить ситуацию, используя RetryableTopic:

  • Основная тема не блокируется, и другие сообщения могут быть обработаны.
  • Неудачные сообщения отправляются в темы повторных попыток с отметкой времени отсрочки.
  • Если сообщение, в котором произошла ошибка, не может быть обработано, оно отправляется в следующую тему повторной попытки.
  • Если обработка завершается сбоем для всех тем повторных попыток, сообщение перенаправляется в DLT.
  • Сообщения из DLT можно повторить, отправив их обратно в тему с первой повторной попыткой.

Подготовить проект

Мы будем создавать сообщения с помощью HTTP-запроса Get и потреблять сообщения с помощью Kafka. Слушатель Kafka будет использовать аннотацию RetryableTopic.

Я создал скелет проекта Spring Boot через https://start.spring.io/ для этой демонстрации.

Добавьте зависимости

Я использую Maven в качестве инструмента сборки. Нам нужны следующие зависимости в pom.xml:

  • Зависимость spring-boot-starter-web позволяет создавать веб-приложения.
  • Зависимость spring-kafka используется для операций Kafka.
  • Зависимость lombok исключает использование стандартного кода.

Подготовьте инфраструктуру

Чтобы запустить Kafka локально, давайте создадим файл docker-compose.yml:

  • У нас есть два сервиса — zookeeper, нужен kafka.

Настройте свойства Кафки

Настроим application.yml:

  • Название темы, которую мы будем слушать, называется my-topic.
  • Группа потребителей Kafka называется my-group.
  • Мы настроили свойства KafkaProducer и KafkaConsumer для сериализации и десериализации.
  • Мы используем порт 29092 для загрузочного сервера Kafka, который мы определили в файле docker-compose.yml .

Добавить прослушиватель Kafka

Давайте создадим прослушиватель Kafka:

  • Мы используем аннотацию @ Component для регистрации Bean-компонента в приложении Spring Boot.
  • У нас есть метод handleMessage(), в котором мы определяем слушателя Kafka и используем метод the@RetryableTopic.
  • Когда все повторные попытки исчерпаны, сообщение пересылается методу handleDlt(), указанному аннотацией @DltHandler. Имя темы DLT по умолчанию будет my-topic-dlt.
  • Обратите внимание, что часть throw new RuntimeException("Test exception") необходима для целей тестирования.

Вот некоторые из @RetryableTopic's essential свойств:

  • Свойство attempts определяет, сколько повторных попыток мы хотим иметь. В этом случае у нас будет 4 попытки плюс 1 для исходной темы.
  • Приложение автоматически создаст темы с суффиксами, названными в соответствии со значением индекса. Например, my-topic-retry-1. Стратегия именования определяется свойством topicSuffixingStrategy.
  • Свойство backoff указывает приложению повторить попытку отправки ошибочных сообщений через 1 секунду. У нас есть множитель 2.0. Это означает, что вторая попытка произойдет через 2 секунды, третья через 4 секунды и так далее.
  • Свойство exclude позволяет нам указать, какие исключения мы не хотим повторять. Например, рекомендуется игнорировать фатальные исключения, такие как DeserializationException. Полный список неустранимых сбоев смотрите в документации.

RetryableTopic предоставляет другие мощные возможности. Если вы хотите ознакомиться с ними, посетите документацию.

Добавить контроллер отдыха

Для простоты давайте добавим RestController для создания сообщений в тему:

  • Метод produceMessage() будет отправлять сообщения в нашу тему Kafka.
  • По умолчанию kafkaTemplate подключается автоматически. Конечно, мы могли бы настроить свой собственный Bean, если нам нужна нестандартная реализация.

Создайте основное приложение

Основное приложение выглядит так:

Протестируйте приложение

Теперь пришло время для тестирования!

  1. Запустите локальную инфраструктуру, запустив:
docker-compose up

2. Запустить основной класс - KafkaErrorHandlingApplication.java.

3. Отправьте тестовое сообщение через контроллер. Например:

GET https://localhost:8090/produce/hello

Вы должны увидеть в консоли приложения, что слушатель получает сообщения. Поскольку мы бросаем RuntimeException, сообщение будет перенаправлено в темы повторных попыток и, наконец, в DLT.

Лог выглядит так:

Идеальный! Проверьте отметку времени на снимке экрана, чтобы увидеть, что повторные попытки происходят в соответствии с настройками политики BackOff.

Кроме того, DLT работает должным образом:

Заключение

В этом руководстве вы узнали, как использовать аннотацию RetryableTopic для реализации неблокирующей обработки ошибочных сообщений. Мы видели некоторые примеры свойств конфигурации.

Эта аннотация предоставляет нам надежное решение для обработки ошибок. Однако имейте в виду, что в настоящее время он имеет несколько ограничений в соответствии с официальными документами:

Используя эту стратегию, вы теряете гарантии порядка Kafka для этой темы.

В настоящее время эта функция не поддерживает аннотации уровня класса @KafkaListener.

Вы можете найти ссылку на полный исходный код этой демонстрации в разделе Ссылки ниже.

Я надеюсь, что вы узнали что-то новое из этого поста. Если вам понравился этот урок, вам также могут понравиться другие мои статьи, связанные с Kafka:





Спасибо за чтение и удачного кодирования!

Рекомендации