WedX - журнал о программировании и компьютерных науках

Какой более чистый способ мутировать Dask Dataframe?

Мне нужно начать работать с некоторыми наборами данных, превышающими размер памяти, а это означает, что мне нужно быстро и в спешке ознакомиться с Dask. Пока это не плохо, но я только что столкнулся с проблемой, которую, я думаю, я в значительной степени решил, но это не красиво, и я хотел посмотреть, есть ли лучший способ решить эту проблему.

Проблема: у меня есть данные временного ряда, хранящиеся в DataFrame. К каждому столбцу (вектору) должна быть применена функция. Функция возвращает 3 дополнительных вектора, которые я хотел бы добавить к исходному кадру данных.

Код: Первая часть приведенного ниже кода — это мое решение в обычных Pandas. Вторая половина — это то, что я сделал, чтобы заставить его работать в Dask.

import numpy as np
import pandas as pd
import os
import dask
import datetime
from dask import delayed
from dask import visualize
import pandas as pd
import dask.dataframe as dd

#### Helper functions

def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence):
    '''
    iterative smoothed z-score algorithm
    Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
    '''
    import numpy as np

    labels = np.zeros(len(x))
    filtered_y = np.array(x)
    avg_filter = np.zeros(len(x))
    std_filter = np.zeros(len(x))
    var_filter = np.zeros(len(x))

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])
    for i in range(lag, len(x)):
        if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
            if x[i] > avg_filter[i - 1]:
                labels[i] = 1
            else:
                labels[i] = -1
            filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
        else:
            labels[i] = 0
            filtered_y[i] = x[i]
        # update avg, var, std
        avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
        var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
            filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
        std_filter[i] = np.sqrt(var_filter[i])


    return [labels, avg_filter, std_filter]


def make_example_data():
    # Make example data
    y = np.array(
        [1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9,
         1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3,
         2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1])
    # simulate data stored in individual files
    df = pd.DataFrame(
        {
            "Time": np.arange(len(y)),
            "y1": y,
            "y2": y * 2,
            "y3": y ** 2,
            "yn": y ** (y)
        }
    )

    bigdf = pd.DataFrame()
    for i in range(10):
        _df = df
        # create my partitioning column
        _df["session"] = "S0" + str(i)
        bigdf = pd.concat([bigdf, _df], axis=0)
    # return a normal dataframe that looks similar to a dask dataframe
    return bigdf

# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0


############# Normal Pandas Solution ########################

bigdf = make_example_data()
results_df = pd.DataFrame()
columns = list(bigdf.columns)
columns.remove("Time")
columns.remove("session")
for col in columns:
    res1 = bigdf.groupby("session")[col].apply(peak_detection_smoothed_zscore_v2, lag, threshold, influence)
    res1 = pd.concat([pd.DataFrame(a).T for a in res1])
    res1.columns = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
    results_df = pd.concat([results_df, res1], axis=1)

pd_results = pd.concat([bigdf, results_df], axis=1)

############### Dask Solution ############################
bigdf = make_example_data()
ddf = dd.from_pandas(bigdf, npartitions=10)


columns = list(ddf.columns)
# remove columns that don't have the function applied to them
columns.remove("Time")
columns.remove("session")

# get all the different sessions
sessions = ddf.groupby("session").count().compute().index.tolist()

# column names that get returned by my function
returns = ["_Signal", "_meanFilter", "_stdFilter"]

# list to hold example series for meta data
rcols = []
for col in columns:
    for r in returns:
        s = pd.Series([])
        s.name = col + r
        rcols.append(s)

results = pd.DataFrame(rcols).T
results = dd.from_pandas(results, npartitions=len(sessions))

for session in sessions:
    sess_df = ddf[ddf["session"] == session].compute()
    # making a dask df to store the results in
    sess_results = dd.from_pandas(sess_df, npartitions=1)

    for col in columns:
        # returns a list of 3 lists
        res = peak_detection_smoothed_zscore_v2(sess_df[col], lag, threshold, influence)
        # turn 3 lists into a dataframe of 3 columns
        res = pd.concat([pd.DataFrame(a).T for a in res]).T
        _cols = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
        res.columns = _cols
        # do this iteratively cause I can't figure out how to do it in a single line
        for cc in _cols:
            sess_results[cc] = res[cc]
        # NOTE: If memory is a problem could probably throw this to disk here

    # append session results to main results
    results = results.append(sess_results)

dd_results = results.compute()

print("Are my Dask results the same as my Pandas results?", dd_results.shape == pd_results.shape)

Вопросы:

  • Я ищу лучшее возможное решение. Как видите, код Dask намного длиннее и сложнее. Есть ли способ сделать его менее грязным? Может быть, покончить с forloops?

  • Еще одна проблема, которую я предвижу, заключается в том, что если у меня есть раздел Dask, который достаточно мал, чтобы поместиться в памяти. Что произойдет, если я создам еще 3 вектора одинаковой длины? Моя система умирает?

  • Если на самом деле нет способа очистить вещи. По крайней мере, делаю ли я что-то максимально эффективно?

Спасибо

27.09.2018

Ответы:


1

После работы над этой проблемой почти неделю я думаю, что у меня есть решение. Это не так лаконично, как хотелось бы, но избавляет от необходимости загружать слишком много в память за один раз. Я не на 100% уверен, масштабировал ли я это только на своем ноутбуке, будет ли он распределять задачи на другие рабочие узлы или нет.

В итоге я переместил свои данные из файлов перьев в ctables bcolz. Это позволило мне изменить фреймы данных/ctables без хлопот, которые ввел Dask. И я почти уверен, что мне не нужно беспокоиться о нехватке памяти на моем компьютере.

import bcolz
import numpy as np
import pandas as pd
import os
import dask
import datetime
from dask import delayed
from dask import visualize
import pandas as pd
import dask.dataframe as dd
from copy import copy


def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence, lst=True):
    '''
    iterative smoothed z-score algorithm
    Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
    '''
    import numpy as np

    labels = np.zeros(len(x))
    filtered_y = np.array(x)
    avg_filter = np.zeros(len(x))
    std_filter = np.zeros(len(x))
    var_filter = np.zeros(len(x))

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])
    for i in range(lag, len(x)):
        if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
            if x[i] > avg_filter[i - 1]:
                labels[i] = 1
            else:
                labels[i] = -1
            filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
        else:
            labels[i] = 0
            filtered_y[i] = x[i]
        # update avg, var, std
        avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
        var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
                filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
        std_filter[i] = np.sqrt(var_filter[i])

    return [labels, avg_filter, std_filter]


def make_example_data():
    # Make example data
    y = np.array(
        [1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9,
         1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3,
         2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1])
    # simulate data stored in individual files
    df = pd.DataFrame(
        {
            "Time": np.arange(len(y)),
            "y1": y,
            "y2": y * 2,
            "y3": y ** 2,
            "yn": y ** (y)
        }
    )

    bigdf = pd.DataFrame()
    for i in range(10):
        _df = df
        # create my partitioning column
        _df["session"] = "S0" + str(i)
        bigdf = pd.concat([bigdf, _df], axis=0)
    # return a normal dataframe that looks similar to a dask dataframe
    return bigdf

def ctable_append(cts):
    """
    A function to append multiple ctables and clean up the disk entries along the 0 axis
    similar to pd.concat([df1, df2], axis=0)


    :param cts: a string containing the root directory path or a list of ctables
    :return: ctable
    """
    import shutil

    ctables = []
    first = True

    # check if we are getting a list or a root dir
    if type(cts) == str:
        cts = bcolz.walk(cts)

    for ct in cts:
        if first is True:
            ct1 = ct
        else:
            ct1.append(ct)
            shutil.rmtree(ct.rootdir)
        first = False

    return ct1

# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0

bigdf = make_example_data()
results_df = pd.DataFrame()
columns = list(bigdf.columns)
columns.remove("Time")
columns.remove("session")
for col in columns:
    res1 = bigdf.groupby("session")[col].apply(peak_detection_smoothed_zscore_v2, lag, threshold, influence)
    res1 = pd.concat([pd.DataFrame(a).T for a in res1])
    res1.columns = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
    results_df = pd.concat([results_df, res1], axis=1)

pd_results = pd.concat([bigdf, results_df], axis=1)

bigdf = make_example_data()
sessions = list(set(bigdf['session']))
root_dir = os.path.join(os.getcwd(), 'example_data')

# breaking this example dataset out into something a little more like my real dataset
for session in sessions:
    sdf = bigdf[bigdf['session'] == session]
    sess_dir = os.path.join(root_dir, session)
    bcolz.ctable.fromdataframe(sdf, rootdir=sess_dir)

dnapply_cols = [
    'session',
    'Time'
]  # columns that are not signals to find peaks in

lazy_apply = []
# apply my function to all the data.. making the extra columns
# don't think that Dask is really needed here as I'm not sure if it actually distributes the tasks
# when I ran this on a lot more data I only had one maybe two cores doing anything.
# this could have been because of the cost of memory but my ram didn't really go beyond being
# half used.
for ct in bcolz.walk(root_dir):
    for column in ct.cols.names:
        if column not in dnapply_cols:
            #             signal, mean_filter, std_filter = delayed(peak_detection_smoothed_zscore_v2)(ct[column], lag, threshold, influence)
            res = delayed(peak_detection_smoothed_zscore_v2)(ct[column], lag, threshold, influence)
            lazy_apply.append(delayed(ct.addcol)(res[0], name=column + "_Signal"))
            lazy_apply.append(delayed(ct.addcol)(res[1], name=column + "_meanFilter"))
            lazy_apply.append(delayed(ct.addcol)(res[2], name=column + "_stdFilter"))

dask.compute(*lazy_apply)

# combine all ctables into a single ctable

ct1 = ctable_append(root_dir)
dd_results = dd.from_bcolz(ct1, chunksize=74)  # chose a chunk size of 74 cause thats about how long each session df was
print(dd_results.head(), dd_results.compute().shape, pd_results.shape)
print("Are my Dask results the same as my Pandas results?", dd_results.compute().shape == pd_results.shape)
01.10.2018
  • Я не знаю Dash, но алгоритму обнаружения пиков нужны только последние lag элементы для правильной работы. Мой исходный алгоритм перебирает все точки данных каждый раз, когда он вызывается, что крайне неэффективно (я также предостерегаю от этого). Поэтому в идеале вы хотите перебирать свои данные с помощью движущегося окна, загружая только lag элементов в память, обновляя алгоритм, удаляя данные из памяти и переходя к следующему окну (+1 новое наблюдение). Это должен быть самый эффективный способ сделать это с точки зрения памяти. 04.10.2018
  • хороший момент ... Я в основном просто использовал функцию в качестве удобного примера, когда пытался ознакомиться с вещами, и я думаю, что мой возможный вариант использования, вероятно, будет делать что-то вроде обнаружения пиков и, возможно, некоторые другие вещи. Я еще не знаю.. все еще жду, чтобы получить данные.. 06.10.2018
  • Новые материалы

    Я хотел выучить язык программирования MVC4, но не мог выучить его раньше, потому что это выглядит сложно…
    Просто начните и учитесь самостоятельно Я хотел выучить язык программирования MVC4, но не мог выучить его раньше, потому что он кажется мне сложным, и я бросил его. Это в основном инструмент..

    Лицензии с открытым исходным кодом: руководство для разработчиков и создателей
    В динамичном мире разработки программного обеспечения открытый исходный код стал мощной парадигмой, способствующей сотрудничеству, инновациям и прогрессу, движимому сообществом. В основе..

    Объяснение документов 02: BERT
    BERT представил двухступенчатую структуру обучения: предварительное обучение и тонкая настройка. Во время предварительного обучения модель обучается на неразмеченных данных с помощью..

    Как проанализировать работу вашего классификатора?
    Не всегда просто знать, какие показатели использовать С развитием глубокого обучения все больше и больше людей учатся обучать свой первый классификатор. Но как только вы закончите..

    Работа с цепями Маркова, часть 4 (Машинное обучение)
    Нелинейные цепи Маркова с агрегатором и их приложения (arXiv) Автор : Бар Лайт Аннотация: Изучаются свойства подкласса случайных процессов, называемых дискретными нелинейными цепями Маркова..

    Crazy Laravel Livewire упростил мне создание электронной коммерции (панель администратора и API) [Часть 3]
    Как вы сегодня, ребята? В этой части мы создадим CRUD для данных о продукте. Думаю, в этой части я не буду слишком много делиться теорией, но чаще буду делиться своим кодом. Потому что..

    Использование машинного обучения и Python для классификации 1000 сезонов новичков MLB Hitter
    Чему может научиться машина, глядя на сезоны новичков 1000 игроков MLB? Это то, что исследует это приложение. В этом процессе мы будем использовать неконтролируемое обучение, чтобы..


    Для любых предложений по сайту: [email protected]