Я использую Spark 2.1.0 и Kafka 0.9.0.
Я пытаюсь передать результат пакетного задания искры в kafka. Предполагается, что задание выполняется каждый час, но не в потоковом режиме.
В поисках ответа в сети я смог найти только интеграцию kafka с потоковой передачей Spark и ничего об интеграции с пакетным заданием.
Кто-нибудь знает, возможна ли такая вещь?
Спасибо
ОБНОВИТЬ :
Как упоминалось пользователем 8371915, я пытался следовать тому, что было сделано в Запись вывода пакетных запросов в Kafka.
Я использовал искровую оболочку:
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
Вот простой код, который я пробовал:
val df = Seq(("Rey", "23"), ("John", "44")).toDF("key", "value")
val newdf = df.select(to_json(struct(df.columns.map(column):_*)).alias("value"))
newdf.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "alerts").save()
Но я получаю сообщение об ошибке:
java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:497)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 50 elided
Есть идеи, с чем это связано?
Спасибо