WedX - журнал о программировании и компьютерных науках

Как написать набор данных в тему Kafka?

Я использую Spark 2.1.0 и Kafka 0.9.0.

Я пытаюсь передать результат пакетного задания искры в kafka. Предполагается, что задание выполняется каждый час, но не в потоковом режиме.

В поисках ответа в сети я смог найти только интеграцию kafka с потоковой передачей Spark и ничего об интеграции с пакетным заданием.

Кто-нибудь знает, возможна ли такая вещь?

Спасибо

ОБНОВИТЬ :

Как упоминалось пользователем 8371915, я пытался следовать тому, что было сделано в Запись вывода пакетных запросов в Kafka.

Я использовал искровую оболочку:

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

Вот простой код, который я пробовал:

val df = Seq(("Rey", "23"), ("John", "44")).toDF("key", "value")
val newdf = df.select(to_json(struct(df.columns.map(column):_*)).alias("value"))
newdf.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "alerts").save()

Но я получаю сообщение об ошибке:

java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:497)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 50 elided

Есть идеи, с чем это связано?

Спасибо


Ответы:


1

tl;dr Вы используете устаревшую версию Spark. Запись включена в 2.2 и более поздних версиях.

По умолчанию вы можете использовать коннектор Kafka SQL (тот же, что и для Structured Streaming). Включают

  • spark-sql-kafka в ваших зависимостях.
  • Преобразование данных в DataFrame, содержащих не менее value столбца типа StringType или BinaryType.
  • Запись данных в Кафку:

    df   
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", server)
      .save()
    

Подробности см. в документации по структурированной потоковой передаче (начиная с Написание вывод пакетных запросов в Kafka).

06.04.2018

2

Если у вас есть кадр данных и вы хотите записать его в тему кафки, вам нужно сначала преобразовать столбцы в столбец значений, содержащий данные в формате json. В скале это

import org.apache.spark.sql.functions._

val kafkaServer: String = "localhost:9092"
val topicSampleName: String = "kafkatopic"

df.select(to_json(struct("*")).as("value"))
  .selectExpr("CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServer)
  .option("topic", topicSampleName)
  .save()
09.03.2021
  • Это ответ, который я ищу. Как преобразовать фрейм данных, состоящий из нескольких столбцов, в фрейм данных, состоящий из одного столбца с именем value. Спасибо 02.04.2021

  • 3

    Для этой ошибки java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider не позволяет создавать таблицу как выбранную. в scala.sys.package$.error(package.scala:27)

    Я думаю, вам нужно разобрать сообщение на пару значений ключа. Ваш фрейм данных должен иметь столбец значений.

    Скажем, если у вас есть фрейм данных со student_id, scores.

    df.show()
    >> student_id | scores
        1         |  99.00
        2         |  98.00
    

    то вы должны изменить свой фрейм данных на

    value
    {"student_id":1,"score":99.00}
    {"student_id":2,"score":98.00}
    

    Для преобразования вы можете использовать аналогичный код, подобный этому

    df.select(to_json(struct($"student_id",$"score")).alias("value"))
    
    15.08.2019
    Новые материалы

    Объяснение документов 02: BERT
    BERT представил двухступенчатую структуру обучения: предварительное обучение и тонкая настройка. Во время предварительного обучения модель обучается на неразмеченных данных с помощью..

    Как проанализировать работу вашего классификатора?
    Не всегда просто знать, какие показатели использовать С развитием глубокого обучения все больше и больше людей учатся обучать свой первый классификатор. Но как только вы закончите..

    Работа с цепями Маркова, часть 4 (Машинное обучение)
    Нелинейные цепи Маркова с агрегатором и их приложения (arXiv) Автор : Бар Лайт Аннотация: Изучаются свойства подкласса случайных процессов, называемых дискретными нелинейными цепями Маркова..

    Crazy Laravel Livewire упростил мне создание электронной коммерции (панель администратора и API) [Часть 3]
    Как вы сегодня, ребята? В этой части мы создадим CRUD для данных о продукте. Думаю, в этой части я не буду слишком много делиться теорией, но чаще буду делиться своим кодом. Потому что..

    Использование машинного обучения и Python для классификации 1000 сезонов новичков MLB Hitter
    Чему может научиться машина, глядя на сезоны новичков 1000 игроков MLB? Это то, что исследует это приложение. В этом процессе мы будем использовать неконтролируемое обучение, чтобы..

    Учебные заметки: создание моего первого пакета Node.js
    Это мои обучающие заметки, когда я научился создавать свой самый первый пакет Node.js, распространяемый через npm. Оглавление Глоссарий I. Новый пакет 1.1 советы по инициализации..

    Забудьте о Matplotlib: улучшите визуализацию данных с помощью умопомрачительных функций Seaborn!
    Примечание. Эта запись в блоге предполагает базовое знакомство с Python и концепциями анализа данных. Привет, энтузиасты данных! Добро пожаловать в мой блог, где я расскажу о невероятных..


    Для любых предложений по сайту: [email protected]