В этом блоге мы увидим, как читать файлы Avro с помощью Flink.

Прежде чем читать файлы, давайте рассмотрим Flink.

Существует два типа обработки — пакетная и в режиме реального времени.

  • Пакетная обработка:обработка на основе данных, собранных за определенный период времени.
  • Обработка в реальном времени:обработка на основе немедленных данных для мгновенного результата.

Обработка в режиме реального времени пользуется спросом, и Apache Flink является инструментом обработки в реальном времени.

Некоторые из функций flink включают в себя:

  • Высокая скорость
  • Поддержка scala и java
  • Малая задержка
  • Отказоустойчивость
  • Масштабируемость

Давайте начнем.

Шаг 1:

Добавьте необходимые зависимости в build.sbt:

name := "flink-demo"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(

"org.apache.flink" %% "flink-scala" % "1.10.0",

"org.apache.flink" % "flink-avro" % "1.10.0",

"org.apache.flink" %% "flink-streaming-scala" % "1.10.0"

)

Шаг 2:

Следующим шагом является создание указателя на среду, в которой работает эта программа. В искре это похоже на контекст искры.

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

Шаг 3.

Установка параллелизма x здесь приведет к тому, что все операторы (такие как соединение, сопоставление, уменьшение) будут выполняться с параллельным экземпляром x.

Я использую 1, так как это демонстрационное приложение.

env.setParallelism(1)

Шаг 4:

Определение формата ввода.

public AvroInputFormat(Path filePath, Class type)

Он принимает два параметра. Первый — это путь к файлу Avro, а второй — тип класса. Мы будем читать файл как Generic Record.

Позже, если мы захотим, мы можем привести его к определенному типу, используя классы case.

val avroInputFormat = new AvroInputFormat[GenericRecord](new org.apache.flink.core.fs.Path("path to avro file"), classOf[GenericRecord])

Шаг 5.

Создание потока входных данных с определенным форматом ввода.

def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T]

Функция createInput принимает формат ввода в качестве параметра, мы будем отправлять ей avroInputFormat. Также требуется TypeInformation.

implicit val typeInfo: TypeInformation[GenericRecord] = TypeInformation.of(classOf[GenericRecord])

val recordStream: scala.DataStream[GenericRecord] = env.createInput(avroInputFormat)

Шаг 6:

Давайте напечатаем данные, которые мы будем читать из файлов Avro. Функция печати будет действовать как приемник.

recordStream.print()

Шаг 7:

Потоки ленивы. Давайте теперь запустим выполнение программы с помощью execute.

env.execute(jobName = "flink-avro-demo")

Чтобы загрузить полный код, посетите flink-avro-demo

Спасибо за чтение.

Заключение:
Надеюсь, после прочтения этого блога вы сможете понять, как мы читаем файлы Avro с помощью Flink.