В этой статье мы увидим, как мы можем использовать Nlphose вместе с Pyspark для выполнения конвейера NLP и собрать информацию о знаменитом путешествии из книги Жюля Верна Вокруг света за 80 дней. Вот ссылка на ⬇️ Блокнот Pyspark, использованный в этой статье.

Из моего личного опыта я обнаружил, что интеллектуальный анализ данных из неструктурированных данных требует использования нескольких методов. Не существует единой модели или библиотеки, которая обычно предлагала бы все, что вам нужно. Часто вам может понадобиться использовать компоненты, написанные на разных языках программирования/фреймворках. Здесь на помощь приходит мой проект с открытым исходным кодом Nlphose. Nlphose позволяет за считанные секунды создавать сложные конвейеры NLP для обработки статических файлов или потокового текста с помощью набора простых инструментов командной строки. Вы можете выполнять несколько операций с текстом, таких как NER, анализ настроений, фрагментация, идентификация языка, вопросы и ответы, 0-выстрел Классификация и многое другое, выполнив одну команду в терминале. Spark — широко используемый инструмент обработки больших данных, который можно использовать для распараллеливания рабочих нагрузок.

Nlphose основан на философии инструментов Unix. Идея состоит в том, чтобы создать простые инструменты, которые могут работать вместе для выполнения нескольких задач. Сценарии Nlphose полагаются на стандартные файловые потоки для чтения и записи данных и могут объединяться для создания сложных конвейеров обработки естественного языка. Каждый скрипт читает JSON из Стандартного ввода и записывает в Стандартный вывод. Все сценарии ожидают, что JSON будет закодирован в формате, совместимом с nlphose, и выходные данные также должны быть совместимы с nlphose. Более подробно об архитектуре Nlphose вы можете прочитать здесь.

Чем отличается Nlphose?

Nlphose изначально не поддерживает Spark/Pyspark, как SparkML. Nlphose в Pyspark полагается на Docker, установленный на всех узлах кластера Spark. Это легко сделать при создании нового кластера в Google Dataproc (и должно быть таким же для любого другого дистрибутива Spark). Помимо Docker, Nlphose не требует установки каких-либо других зависимостей на рабочие узлы кластера Spark. Мы используем класс PipeLineExecutor, как описано ниже, для выполнения конвейера Nlphose из Pyspark.

Под капотом этот класс использует модуль подпроцесс для создания нового контейнера докеров для задачи Spark и запуска конвейера Nlphose. Ввод/вывод осуществляется с использованием stdout и stdin. Это звучит очень просто, но именно так я создаю Nlphose. Никогда не предполагалось поддерживать какую-либо конкретную вычислительную среду или библиотеку. Вы можете запустить Nlphose разными способами, один из которых описан здесь.

Давай начнем

Сначала мы устанавливаем пакет, который мы будем использовать позже для построения визуализации.

!pip установить количество слов

Приведенная ниже команда загружает электронную книгу Вокруг света за 80 дней с gutenber.org и использует утилиту, предоставленную Nlphose, для сегментации одного текстового файла в json с разделителями строк. Каждый объект json имеет атрибуты id и text.

!docker run code2k13/nlphose:latest \
/bin/bash -c “wget https://www.gutenberg.org/files/103/103-0.txt && ./file2json.py 103–0.txt -n 2” > ebook.json

Мы удаляем все docker-контейнеры, которые нам больше не нужны.

!docker system prune -f

Давайте импортируем все необходимые библиотеки. Прочитайте файл json, который мы создали ранее, и преобразуйте его во фрейм данных pandas. Затем мы добавляем столбец «group_id», который присваивает строкам случайный идентификатор группы от 0 до 3. Как только это будет сделано, мы создадим новый фрейм данных PySpark и отобразим некоторые результаты.

import pandas as pd
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import udf
df_pd = pd.read_json(“ebook.json”,lines=True) 
df_pd[‘group_id’] = [i for i in range(0,3)]*347
df= spark.createDataFrame(df_pd)
df= spark.createDataFrame(df_pd)
+ — — — — -+ — — — — — — — — — — + — — — — — — — — — — + — — — — +
|file_name| id| text|group_id|
+ — — — — -+ — — — — — — — — — — + — — — — — — — — — — + — — — — +
|103–0.txt|2bbbfe64–7c1e-11e…| The Project Gute…| 0|
|103–0.txt|2bbea7ea-7c1e-11e…| Title: Around th…| 1|
|103–0.txt|2bbf2eb8–7c1e-11e…|IN WHICH PHILEAS …| 2|
|103–0.txt|2bbfdbd8–7c1e-11e…| Certainly an Eng…| 0|
|103–0.txt|2bbff29e-7c1e-11e…| Phileas Fogg was…| 1|
|103–0.txt|2bc00734–7c1e-11e…| The way in which…| 2|
|103–0.txt|2bc02570–7c1e-11e…| He was recommend…| 0|
|103–0.txt|2bc095f0–7c1e-11e…| Was Phileas Fogg…| 1|
|103–0.txt|2bc0ed20–7c1e-11e…| Had he travelled…| 2|
|103–0.txt|2bc159d6–7c1e-11e…| It was at least …| 0|
|103–0.txt|2bc1a3be-7c1e-11e…| Phileas Fogg was…| 1|
|103–0.txt|2bc2a2aa-7c1e-11e…|He breakfasted an…| 2|
|103–0.txt|2bc2c280–7c1e-11e…| If to live in th…| 0|
|103–0.txt|2bc30b3c-7c1e-11e…| The mansion in S…| 1|
|103–0.txt|2bc34dd6–7c1e-11e…| Phileas Fogg was…| 2|
|103–0.txt|2bc35f88–7c1e-11e…|Fogg would, accor…| 0|
|103–0.txt|2bc3772a-7c1e-11e…| A rap at this mo…| 1|
|103–0.txt|2bc3818e-7c1e-11e…| “The new servant…| 2|
|103–0.txt|2bc38e0e-7c1e-11e…| A young man of t…| 0|
|103–0.txt|2bc45c6c-7c1e-11e…| “You are a Frenc…| 1|
+ — — — — -+ — — — — — — — — — — + — — — — — — — — — — + — — — — +

Запуск конвейера nlphose с помощью Pyspark

Как обсуждалось ранее, Nlphose не имеет встроенной интеграции с Pyspark/Spark. Итак, мы создаем класс под названием «PipeLineExecutor», который запускает док-контейнер и выполняет команду Nlphose. Этот класс взаимодействует с контейнером докера, используя «stdin» и «stdout». Наконец, когда контейнер docker завершает выполнение, мы выполняем «docker system prune -f», чтобы очистить все неиспользуемые контейнеры. Метод «execute_pipeline» записывает данные из фрейма данных в стандартный ввод (построчно), считывает вывод из stdout и возвращает фрейм данных, созданный на основе вывода.

import subprocess
import pandas as pd
import json

class PipeLineExecutor:
  def __init__(self, nlphose_command,data,id_column='id',text_column='text'):
    self.nlphose_command = nlphose_command
    self.id_column = id_column
    self.text_column = text_column
    self.data = data

  def execute_pipeline(self):
    try:
     prune_proc = subprocess.Popen(["docker system prune -f"],shell=True)
     prune_proc.communicate()
        
     proc = subprocess.Popen([self.nlphose_command],shell=True,stdout=subprocess.PIPE, stdin=subprocess.PIPE,stderr=subprocess.PIPE)
     for idx,row in self.data.iterrows():       
        proc.stdin.write(bytes(json.dumps({"id":row[self.id_column],"text":row[self.text_column]}),"utf8"))
        proc.stdin.write(b"\n")
        proc.stdin.flush()
    
     output,error = proc.communicate()
     output_str = str(output,'utf-8')
     output_str = output_str
     data = output_str.split("\n")    
     data = [d for d in data if len(d) > 2]
    finally:
        prune_proc = subprocess.Popen(["docker system prune -f"],shell=True)
        prune_proc.communicate()
    return pd.DataFrame(data)

Команда Nlphose

Следующая команда делает несколько вещей:

command = '''
docker run -a stdin -a stdout -a stderr -i code2k13/nlphose:latest /bin/bash -c “./entity.py |\
./xformer.py — pipeline question-answering — param ‘what did they carry?’ 
“ 
'''

Приведенная ниже функция форматирует данные, возвращаемые задачей PipelineExecutor. Фрейм данных, возвращаемый «PipelineExecutor.execute_pipeline», имеет строковый столбец, содержащий выходные данные команды Nlphose. Каждая строка в кадре данных представляет строку/документ, выводимый командой Nlphose.

def get_answer(row):
  try:
    x =  json.loads(row[0],strict=False)
    row['json_obj'] = json.dumps(x)
    if x['xfrmr_question_answering']['score'] > 0.80:
        row['id'] =  str(x['id'])
        row['answer'] = x['xfrmr_question_answering']['answer']        
    else:
        row['id'] = str(x['id'])
        row['answer'] = None
        
  except Exception as e:
    row['id'] = None
    row['answer'] = "ERROR " + str(e) #.message
    row['json_obj'] = None
    
  return row

Приведенная ниже функция создает объект «PipeLineExecutor», передает ему данные, а затем вызывает метод «execute_pipeline» для объекта. Затем он использует метод «get_answer» для форматирования вывода метода «execute_pipeline».

def run_pipeline(data):
  nlphose_executor = PipeLineExecutor(command,data,"id","text")
  result = nlphose_executor.execute_pipeline()
  result =  result.apply(get_answer,axis=1)     
  return  result[["id","answer","json_obj"]]

Масштабирование пайплайна с помощью PySpark

Мы используем applyInPandas из PySpark для распараллеливания и обработки текста в масштабе. PySpark автоматически обрабатывает масштабирование конвейера Nlphose в кластере Spark. Метод run_pipeline вызывается для каждой группы входных данных. Важно иметь соответствующее количество групп в зависимости от количества узлов, чтобы эффективно обрабатывать данные в кластере Spark.

output = df.groupby(“group_id”).applyInPandas(run_pipeline, schema=”id string,answer string,json_obj string”)
output.cache()

Визуализация наших выводов

Как только мы закончим выполнение конвейера nlphose, мы приступим к визуализации наших результатов. Я создал две визуализации:

  • Карта с указанием мест, упомянутых в книге.
  • Облако слов обо всех важных предметах, которые персонажи взяли с собой в путешествие.

Нанесение наиболее часто встречающихся мест из книги на карту мира

Приведенный ниже код извлекает информацию о широте и долготе из конвейера Nlphose и создает список наиболее распространенных местоположений.

💡 Примечание. При извлечении объектов Nlphose автоматически угадываются координаты хорошо известных местоположений с использованием подхода на основе словаря

def get_latlon2(data):
        json_obj = json.loads(data)
        if 'entities' in json_obj.keys():
            for e in json_obj['entities']:
                if e['label'] == 'GPE' and 'cords' in e.keys():
                    return json.dumps({'data':[e['entity'],e['cords']['lat'],e['cords']['lon']]})
        return None
        
get_latlon_udf2 = udf(get_latlon2)  
df_locations = output.withColumn("locations",get_latlon_udf2(output["json_obj"]))
top_locations = df_locations.filter("`locations` != 'null'").groupby("locations").count().sort(desc("count")).filter("`count` >= 1")
top_locations.cache() 
top_locations.show()

Затем мы используем пакет «geopandas», чтобы нанести эти места на карту мира. Прежде чем мы это сделаем, нам нужно будет преобразовать наш фрейм данных в формат, понятный «геопандам». Это делается путем применения функции add_lat_long.

def add_lat_long(row):
     obj =  json.loads(row[0])["data"]
     row["lat"] = obj[1]
     row["lon"] = obj[2]
     return row
import geopandas

df_locations = top_locations.toPandas()
df_locations = df_locations.apply(add_lat_long,axis=1)

gdf = geopandas.GeoDataFrame(df_locations, geometry=geopandas.points_from_xy(df_locations.lon, df_locations.lat))
world = geopandas.read_file(geopandas.datasets.get_path('naturalearth_lowres'))
ax = world.plot(color=(25/255,211/255,243/255) ,edgecolor=(25/255,211/255,243/255),
                    linewidth=0.4,edgecolors='none',figsize=(15, 15))
ax.axis('off')   
gdf.plot(ax=ax,alpha=0.5,marker=".",markersize=df_locations['count']*100,color='seagreen')

Если вы знакомы с этой книгой, то поймете, что мы почти наметили фактический маршрут, пройденный Фоггом в его знаменитом путешествии.

Для справки, ниже показано изображение фактического маршрута, взятого им из Википедии.

Карта «Вокруг света за восемьдесят дней»

Роке, CC BY-SA 3.0, через Wikimedia Commons

Создание облака слов из предметов, которые Фогг возил в своем путешествии.

Приведенный ниже код находит наиболее распространенные предметы, которые несут путешественники, используя «извлекающие ответы на вопросы» и создает облако слов.

from wordcloud import WordCloud, STOPWORDS
import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
figure(figsize=(12, 6), dpi=120)
wordcloud = WordCloud(background_color=’white’,width=1024,height=500).generate(‘ ‘.join(output.filter(“`answer` != ‘null’”).toPandas()[‘answer’].tolist()))
plt.imshow(wordcloud)
plt.axis(“off”)
plt.show()

Заключение

Одна из причин, по которой PySpark является любимым инструментом специалистов по данным и специалистов по машинному обучению, заключается в том, что работать с фреймами данных очень удобно. В этой статье показано, как мы можем запустить Nlphose в кластере Spark с помощью PySpark. Используя подход, описанный в этой статье, мы можем очень легко встроить конвейер Nlphose в наши конвейеры обработки данных. Надеюсь, вам понравилась эта статья. Отзывы и комментарии всегда приветствуются, спасибо!