WedX - журнал о программировании и компьютерных науках

Сохранение смещения сообщения в Kafka с помощью KafkaUtils.createDirectStream

Как сохранить смещение сообщения в Kafka, если я использую KafkaUtils.createDirectStream для чтения сообщений. Kafka теряет значение смещения каждый раз, когда приложение выходит из строя. Затем оно считывает значение, указанное в auto.offset.reset (которое является последним), и не может читать сообщения в интервале остановки-запуска приложения.


Ответы:


1

Вы можете избежать этого, зафиксировав смещение вручную. Установите enable.auto.commit как false, а затем используйте приведенный ниже код, чтобы зафиксировать смещение в kafka после успешной операции.

  var offsetRanges = Array[OffsetRange]()

          val valueStream = stream.transform {
            rdd =>
              offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
              rdd
          }.map(_.value())
//operation
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

Вы также можете прочитать этот документ, который даст вам хорошее представление об управлении смещением https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/

14.01.2019
  • Спасибо за ответ, Риши. Я реализовал вышеуказанное в своем коде, но я получаю Вызвано: java.io.NotSerializableException: объект org.apache.spark.streaming.kafka010.DirectKafkaInputDStream сериализуется, возможно, как часть закрытия операции RDD. Это связано с тем, что на объект DStream ссылаются внутри замыкания. Пожалуйста, перепишите операцию RDD внутри этого DStream, чтобы избежать этого. Это было сделано, чтобы избежать раздувания задач Spark ненужными объектами. 27.01.2019
  • Могу ли я использовать CommitSync в моем случае, если я использую KafkaUtils.createDirectStream для чтения сообщений? 07.02.2019
  • Новые материалы

    Объяснение документов 02: BERT
    BERT представил двухступенчатую структуру обучения: предварительное обучение и тонкая настройка. Во время предварительного обучения модель обучается на неразмеченных данных с помощью..

    Как проанализировать работу вашего классификатора?
    Не всегда просто знать, какие показатели использовать С развитием глубокого обучения все больше и больше людей учатся обучать свой первый классификатор. Но как только вы закончите..

    Работа с цепями Маркова, часть 4 (Машинное обучение)
    Нелинейные цепи Маркова с агрегатором и их приложения (arXiv) Автор : Бар Лайт Аннотация: Изучаются свойства подкласса случайных процессов, называемых дискретными нелинейными цепями Маркова..

    Crazy Laravel Livewire упростил мне создание электронной коммерции (панель администратора и API) [Часть 3]
    Как вы сегодня, ребята? В этой части мы создадим CRUD для данных о продукте. Думаю, в этой части я не буду слишком много делиться теорией, но чаще буду делиться своим кодом. Потому что..

    Использование машинного обучения и Python для классификации 1000 сезонов новичков MLB Hitter
    Чему может научиться машина, глядя на сезоны новичков 1000 игроков MLB? Это то, что исследует это приложение. В этом процессе мы будем использовать неконтролируемое обучение, чтобы..

    Учебные заметки: создание моего первого пакета Node.js
    Это мои обучающие заметки, когда я научился создавать свой самый первый пакет Node.js, распространяемый через npm. Оглавление Глоссарий I. Новый пакет 1.1 советы по инициализации..

    Забудьте о Matplotlib: улучшите визуализацию данных с помощью умопомрачительных функций Seaborn!
    Примечание. Эта запись в блоге предполагает базовое знакомство с Python и концепциями анализа данных. Привет, энтузиасты данных! Добро пожаловать в мой блог, где я расскажу о невероятных..


    Для любых предложений по сайту: [email protected]