В этом блоге мы увидим, как читать файлы Avro с помощью Flink.
Прежде чем читать файлы, давайте рассмотрим Flink.
Существует два типа обработки — пакетная и в режиме реального времени.
- Пакетная обработка:обработка на основе данных, собранных за определенный период времени.
- Обработка в реальном времени:обработка на основе немедленных данных для мгновенного результата.
Обработка в режиме реального времени пользуется спросом, и Apache Flink является инструментом обработки в реальном времени.
Некоторые из функций flink включают в себя:
- Высокая скорость
- Поддержка scala и java
- Малая задержка
- Отказоустойчивость
- Масштабируемость
Давайте начнем.
Шаг 1:
Добавьте необходимые зависимости в build.sbt:
name := "flink-demo" version := "0.1" scalaVersion := "2.12.8" libraryDependencies ++= Seq( "org.apache.flink" %% "flink-scala" % "1.10.0", "org.apache.flink" % "flink-avro" % "1.10.0", "org.apache.flink" %% "flink-streaming-scala" % "1.10.0" )
Шаг 2:
Следующим шагом является создание указателя на среду, в которой работает эта программа. В искре это похоже на контекст искры.
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
Шаг 3.
Установка параллелизма x здесь приведет к тому, что все операторы (такие как соединение, сопоставление, уменьшение) будут выполняться с параллельным экземпляром x.
Я использую 1, так как это демонстрационное приложение.
env.setParallelism(1)
Шаг 4:
Определение формата ввода.
public AvroInputFormat(Path filePath, Class type)
Он принимает два параметра. Первый — это путь к файлу Avro, а второй — тип класса. Мы будем читать файл как Generic Record.
Позже, если мы захотим, мы можем привести его к определенному типу, используя классы case.
val avroInputFormat = new AvroInputFormat[GenericRecord](new org.apache.flink.core.fs.Path("path to avro file"), classOf[GenericRecord])
Шаг 5.
Создание потока входных данных с определенным форматом ввода.
def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T]
Функция createInput принимает формат ввода в качестве параметра, мы будем отправлять ей avroInputFormat. Также требуется TypeInformation.
implicit val typeInfo: TypeInformation[GenericRecord] = TypeInformation.of(classOf[GenericRecord]) val recordStream: scala.DataStream[GenericRecord] = env.createInput(avroInputFormat)
Шаг 6:
Давайте напечатаем данные, которые мы будем читать из файлов Avro. Функция печати будет действовать как приемник.
recordStream.print()
Шаг 7:
Потоки ленивы. Давайте теперь запустим выполнение программы с помощью execute.
env.execute(jobName = "flink-avro-demo")
Чтобы загрузить полный код, посетите flink-avro-demo
Спасибо за чтение.
Заключение:
Надеюсь, после прочтения этого блога вы сможете понять, как мы читаем файлы Avro с помощью Flink.