Советы и рекомендации по обработке данных JSON в Databricks с помощью PySpark
В простом случае JSON легко обрабатывается в Databricks. Вы можете прочитать файл объектов JSON непосредственно в DataFrame или таблице, а Databricks знает, как анализировать JSON в отдельных полях. Но, как и в большинстве вещей, связанных с программным обеспечением, здесь есть недостатки и вариации. В этой статье показано, как справляться с наиболее распространенными ситуациями, и приведены подробные примеры кодирования.
Моим вариантом использования были медицинские данные HL7, которые были переведены в JSON, но описанные здесь методы применимы к любым данным JSON. Рассматриваются три формата:
- Текстовый файл, содержащий полные объекты JSON, по одному в строке. Это типично при загрузке файлов JSON в таблицы Databricks.
- Текстовый файл, содержащий различные поля (столбцы) данных, один из которых является объектом JSON. Это часто можно увидеть в компьютерных журналах, где есть метаданные в виде простого текста, за которыми следует более подробная информация в строке JSON.
- Вариант вышеприведенного, где поле JSON представляет собой массив объектов.
Для получения каждого из этих типов входных данных в Databricks требуются разные методы.
Предположения…
- У вас есть доступ к Databricks и вы знаете основные операции. Если вы только начинаете, попробуйте бесплатную Community Edition.
- Я беру спецификацию JSON с json.org и проверяю правильность объектов JSON с jsonlint.com.
- Я игнорирую «голый JSON», такой как [1,2,3] и «привет». По спецификации это полные, действительные объекты JSON, но я считаю их плохой формой, поскольку поля не имеют имен, поэтому их трудно использовать в дальнейшем. (И Databricks может просто обрабатывать их как обычные поля ввода, не заботясь о том, что они являются действительными JSON.)
Полный блокнот Databricks со всем этим кодом находится по адресу https://github.com/ChuckConnell/articles/blob/master/json_tricks.dbc. Скопируйте URL-адрес, а затем в Databricks выполните Workspace/Import/URL.
Хотя этот код был разработан в Databricks, он также должен работать в собственном Apache Spark с установленным PySpark, хотя я не тестировал его там.
Стандартные объекты JSON
Стандартные текстовые файлы JSON выглядят так:
{ "Text1":"hello", "Text2":"goodbye", "Num1":5, "Array1":[7,8,9] } { "Text1":"this", "Text2":"that", "Num1":6.6, "Array1":[77,88,99] } { "Text1":"yes", "Text2":"no", "Num1":-0.03, "Array1":[555,444,222] }
Чтобы прочитать этот файл в DataFrame, используйте стандартный импорт JSON, который выводит схему из предоставленных имен полей и элементов данных.
test1DF = spark.read.json("/tmp/test1.json")
Результирующий DataFrame имеет столбцы, которые соответствуют тегам JSON, и типы данных разумно выведены. (Вывод типов не идеален, особенно для целых чисел, чисел с плавающей запятой и логических значений.) Теперь вы можете читать столбцы DataFrame, используя только их простые имена; весь синтаксис JSON исчез.
Текстовый файл с полем JSON
Текстовый файл с некоторыми обычными полями и одним JSON-полем выглядит так:
Text1|Text2|Num1|JSON1 hello | goodbye | 5 | {"Sub1":"john", "Sub2":3} this | that | 6.6 | {"Sub1":"betty", "Sub2":4} yes | no | -0.03 | {"Sub1":"bobby", "Sub2":5}
Первая строка содержит имена полей, что является стандартом для текстовых файлов данных. Я использую вертикальную черту для разделения полей, чтобы избежать путаницы с запятыми, которые являются частью синтаксиса JSON. (Возможно, вы захотите сделать то же самое, так как синтаксический анализатор текста Databricks испытывает затруднения с синтаксисом escape для встроенных запятых и кавычек.)
Чтобы импортировать такие файлы, используйте двухэтапный процесс, сначала читая поле JSON как текст. Обратите внимание, что я использую «inferSchema», потому что размер файла небольшой; для больших файлов вы должны использовать .schema(my_schema), который работает быстрее.
test2DF = spark.read\ .option("inferSchema", True)\ .option("header", True)\ .option("delimiter", "|")\ .csv("/tmp/test2.txt")
Обычные поля теперь корректны, а поле JSON представляет собой одну текстовую строку.
Нам нужно изменить строку JSON на правильную структуру, чтобы мы могли получить доступ к ее частям.
from pyspark.sql.functions import from_json, col from pyspark.sql.types import StructType, StructField, StringType, IntegerType # Define the schema of the JSON string. schema = StructType([ StructField("Sub1", StringType()), StructField("Sub2", IntegerType() ) ]) # Use the schema to change the JSON string into a struct, overwriting the JSON string. test2DF = test2DF.withColumn("JSON1", from_json(col("JSON1"), schema)) # Make a separate column from one of the struct fields. test2DF = test2DF.withColumn("JSON1_Sub2", col("JSON1.Sub2"))
Теперь у нас есть то, что мы хотим — не-JSON-поля в том виде, в каком они были, поле JSON как реальная структура и пример извлечения одного элемента JSON.
Текстовый файл с полем массива JSON
Текстовый файл с полем, представляющим собой массив объектов JSON, выглядит так:
Text1|Text2|Num1|JSON1 hello | goodbye | 5 | [{"Sub1":"stop", "Sub2":3}, {"Sub1":"go", "Sub2":6}] this | that | 6.6 | [{"Sub1":"eggs", "Sub2":4}, {"Sub1":"bacon", "Sub2":8}] yes | no | -0.03 | [{"Sub1":"apple", "Sub2":5}, {"Sub1":"pear", "Sub2":10}]
Я предполагаю, что каждый объект JSON в массиве имеет одинаковую структуру. Это, конечно, не обязательно, но вероятно для объектов, которые логически связаны и находятся в одном массиве.
Для этого типа ввода JSON начните таким же образом, читая обычные поля в их столбцы и JSON как обычное текстовое поле.
test3DF = spark.read\ .option("inferSchema", True)\ .option("header", True)\ .option("delimiter", "|")\ .csv("/tmp/test3.txt")
Затем измените строку JSON на реальный массив структур с помощью определяемой пользователем функции (UDF). Спасибо https://kontext.tech/column/spark/284/pyspark-convert-json-string-column-to-array-of-object-structtype-in-data-frame за этот трюк с кодированием.
from pyspark.sql.functions import col, udf from pyspark.sql.types import * import json # Schema for the array of JSON objects. json_array_schema = ArrayType( StructType([ StructField('Sub1', StringType(), nullable=False), StructField('Sub2', IntegerType(), nullable=False) ]) ) # Create function to parse JSON using standard Python json library. def parse_json(array_str): json_obj = json.loads(array_str) for item in json_obj: yield (item['Sub1'], item['Sub2']) # Create a UDF, whose return type is the JSON schema defined above. parse_json_udf = udf(lambda str: parse_json(str), json_array_schema) # Use the UDF to change the JSON string into a true array of structs. test3DF = test3DF.withColumn("JSON1arr", parse_json_udf((col("JSON1")))) # We don't need to JSON text anymore. test3DF = test3DF.drop("JSON1")
Массив структур полезен, но часто полезно «денормировать» и поместить каждый объект JSON в отдельную строку.
from pyspark.sql.functions import col, explode test3DF = test3DF.withColumn("JSON1obj", explode(col("JSON1arr"))) # The column with the array is now redundant. test3DF = test3DF.drop("JSON1arr")
Обратите внимание, что поля, отличные от JSON, теперь дублируются в нескольких строках с одним объектом JSON в каждой строке.
И мы можем собирать данные из одного конкретного поля JSON по всем массивам, что теперь намного проще для разнесенного массива.
display (test3DF.select("JSON1obj.Sub1"))
Заключение
JSON — это размеченный текстовый формат. Это удобочитаемый файл, содержащий имена, значения, двоеточия, фигурные скобки и различные другие синтаксические элементы. PySpark DataFrames, с другой стороны, представляет собой двоичную структуру с видимыми данными и метаданными (тип, массивы, подструктуры), встроенными в DataFrame. Инструкции выше помогли вам перевести первое во второе.
Для получения дополнительной информации см.: