Советы и рекомендации по обработке данных JSON в Databricks с помощью PySpark

В простом случае JSON легко обрабатывается в Databricks. Вы можете прочитать файл объектов JSON непосредственно в DataFrame или таблице, а Databricks знает, как анализировать JSON в отдельных полях. Но, как и в большинстве вещей, связанных с программным обеспечением, здесь есть недостатки и вариации. В этой статье показано, как справляться с наиболее распространенными ситуациями, и приведены подробные примеры кодирования.

Моим вариантом использования были медицинские данные HL7, которые были переведены в JSON, но описанные здесь методы применимы к любым данным JSON. Рассматриваются три формата:

  • Текстовый файл, содержащий полные объекты JSON, по одному в строке. Это типично при загрузке файлов JSON в таблицы Databricks.
  • Текстовый файл, содержащий различные поля (столбцы) данных, один из которых является объектом JSON. Это часто можно увидеть в компьютерных журналах, где есть метаданные в виде простого текста, за которыми следует более подробная информация в строке JSON.
  • Вариант вышеприведенного, где поле JSON представляет собой массив объектов.

Для получения каждого из этих типов входных данных в Databricks требуются разные методы.

Предположения…

  • У вас есть доступ к Databricks и вы знаете основные операции. Если вы только начинаете, попробуйте бесплатную Community Edition.
  • Я беру спецификацию JSON с json.org и проверяю правильность объектов JSON с jsonlint.com.
  • Я игнорирую «голый JSON», такой как [1,2,3] и «привет». По спецификации это полные, действительные объекты JSON, но я считаю их плохой формой, поскольку поля не имеют имен, поэтому их трудно использовать в дальнейшем. (И Databricks может просто обрабатывать их как обычные поля ввода, не заботясь о том, что они являются действительными JSON.)

Полный блокнот Databricks со всем этим кодом находится по адресу https://github.com/ChuckConnell/articles/blob/master/json_tricks.dbc. Скопируйте URL-адрес, а затем в Databricks выполните Workspace/Import/URL.

Хотя этот код был разработан в Databricks, он также должен работать в собственном Apache Spark с установленным PySpark, хотя я не тестировал его там.

Стандартные объекты JSON

Стандартные текстовые файлы JSON выглядят так:

{ "Text1":"hello", "Text2":"goodbye", "Num1":5, "Array1":[7,8,9] }
{ "Text1":"this", "Text2":"that", "Num1":6.6, "Array1":[77,88,99] }
{ "Text1":"yes", "Text2":"no", "Num1":-0.03, "Array1":[555,444,222] }

Чтобы прочитать этот файл в DataFrame, используйте стандартный импорт JSON, который выводит схему из предоставленных имен полей и элементов данных.

test1DF = spark.read.json("/tmp/test1.json")

Результирующий DataFrame имеет столбцы, которые соответствуют тегам JSON, и типы данных разумно выведены. (Вывод типов не идеален, особенно для целых чисел, чисел с плавающей запятой и логических значений.) Теперь вы можете читать столбцы DataFrame, используя только их простые имена; весь синтаксис JSON исчез.

Текстовый файл с полем JSON

Текстовый файл с некоторыми обычными полями и одним JSON-полем выглядит так:

Text1|Text2|Num1|JSON1
hello | goodbye | 5 | {"Sub1":"john", "Sub2":3}
this | that | 6.6 | {"Sub1":"betty", "Sub2":4}
yes | no | -0.03 | {"Sub1":"bobby", "Sub2":5}

Первая строка содержит имена полей, что является стандартом для текстовых файлов данных. Я использую вертикальную черту для разделения полей, чтобы избежать путаницы с запятыми, которые являются частью синтаксиса JSON. (Возможно, вы захотите сделать то же самое, так как синтаксический анализатор текста Databricks испытывает затруднения с синтаксисом escape для встроенных запятых и кавычек.)

Чтобы импортировать такие файлы, используйте двухэтапный процесс, сначала читая поле JSON как текст. Обратите внимание, что я использую «inferSchema», потому что размер файла небольшой; для больших файлов вы должны использовать .schema(my_schema), который работает быстрее.

test2DF = spark.read\
.option("inferSchema", True)\
.option("header", True)\
.option("delimiter", "|")\
.csv("/tmp/test2.txt")

Обычные поля теперь корректны, а поле JSON представляет собой одну текстовую строку.

Нам нужно изменить строку JSON на правильную структуру, чтобы мы могли получить доступ к ее частям.

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define the schema of the JSON string.
schema = StructType([
  StructField("Sub1", StringType()), 
  StructField("Sub2", IntegerType()
  )
])
# Use the schema to change the JSON string into a struct, overwriting the JSON string.
test2DF = test2DF.withColumn("JSON1", from_json(col("JSON1"), schema))
# Make a separate column from one of the struct fields.
test2DF = test2DF.withColumn("JSON1_Sub2", col("JSON1.Sub2"))

Теперь у нас есть то, что мы хотим — не-JSON-поля в том виде, в каком они были, поле JSON как реальная структура и пример извлечения одного элемента JSON.

Текстовый файл с полем массива JSON

Текстовый файл с полем, представляющим собой массив объектов JSON, выглядит так:

Text1|Text2|Num1|JSON1
hello | goodbye | 5 | [{"Sub1":"stop", "Sub2":3}, {"Sub1":"go", "Sub2":6}]
this | that | 6.6 | [{"Sub1":"eggs", "Sub2":4}, {"Sub1":"bacon", "Sub2":8}]
yes | no | -0.03 | [{"Sub1":"apple", "Sub2":5}, {"Sub1":"pear", "Sub2":10}]

Я предполагаю, что каждый объект JSON в массиве имеет одинаковую структуру. Это, конечно, не обязательно, но вероятно для объектов, которые логически связаны и находятся в одном массиве.

Для этого типа ввода JSON начните таким же образом, читая обычные поля в их столбцы и JSON как обычное текстовое поле.

test3DF = spark.read\
.option("inferSchema", True)\
.option("header", True)\
.option("delimiter", "|")\
.csv("/tmp/test3.txt")

Затем измените строку JSON на реальный массив структур с помощью определяемой пользователем функции (UDF). Спасибо https://kontext.tech/column/spark/284/pyspark-convert-json-string-column-to-array-of-object-structtype-in-data-frame за этот трюк с кодированием.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import *
import json
# Schema for the array of JSON objects.
json_array_schema = ArrayType(
  StructType([
    StructField('Sub1', StringType(), nullable=False), 
    StructField('Sub2', IntegerType(), nullable=False)
  ])
)
# Create function to parse JSON using standard Python json library.
def parse_json(array_str):
  json_obj = json.loads(array_str)
  for item in json_obj:
    yield (item['Sub1'], item['Sub2'])
# Create a UDF, whose return type is the JSON schema defined above.    
parse_json_udf = udf(lambda str: parse_json(str), json_array_schema)
  
# Use the UDF to change the JSON string into a true array of structs.
test3DF = test3DF.withColumn("JSON1arr", parse_json_udf((col("JSON1"))))
# We don't need to JSON text anymore.
test3DF = test3DF.drop("JSON1")

Массив структур полезен, но часто полезно «денормировать» и поместить каждый объект JSON в отдельную строку.

from pyspark.sql.functions import col, explode
test3DF = test3DF.withColumn("JSON1obj", explode(col("JSON1arr")))
# The column with the array is now redundant.
test3DF = test3DF.drop("JSON1arr")

Обратите внимание, что поля, отличные от JSON, теперь дублируются в нескольких строках с одним объектом JSON в каждой строке.

И мы можем собирать данные из одного конкретного поля JSON по всем массивам, что теперь намного проще для разнесенного массива.

display (test3DF.select("JSON1obj.Sub1"))

Заключение

JSON — это размеченный текстовый формат. Это удобочитаемый файл, содержащий имена, значения, двоеточия, фигурные скобки и различные другие синтаксические элементы. PySpark DataFrames, с другой стороны, представляет собой двоичную структуру с видимыми данными и метаданными (тип, массивы, подструктуры), встроенными в DataFrame. Инструкции выше помогли вам перевести первое во второе.

Для получения дополнительной информации см.: