Мне нужно начать работать с некоторыми наборами данных, превышающими размер памяти, а это означает, что мне нужно быстро и в спешке ознакомиться с Dask. Пока это не плохо, но я только что столкнулся с проблемой, которую, я думаю, я в значительной степени решил, но это не красиво, и я хотел посмотреть, есть ли лучший способ решить эту проблему.
Проблема: у меня есть данные временного ряда, хранящиеся в DataFrame. К каждому столбцу (вектору) должна быть применена функция. Функция возвращает 3 дополнительных вектора, которые я хотел бы добавить к исходному кадру данных.
Код: Первая часть приведенного ниже кода — это мое решение в обычных Pandas. Вторая половина — это то, что я сделал, чтобы заставить его работать в Dask.
import numpy as np
import pandas as pd
import os
import dask
import datetime
from dask import delayed
from dask import visualize
import pandas as pd
import dask.dataframe as dd
#### Helper functions
def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence):
    '''
    iterative smoothed z-score algorithm
    Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
    '''
    import numpy as np
    labels = np.zeros(len(x))
    filtered_y = np.array(x)
    avg_filter = np.zeros(len(x))
    std_filter = np.zeros(len(x))
    var_filter = np.zeros(len(x))
    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])
    for i in range(lag, len(x)):
        if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
            if x[i] > avg_filter[i - 1]:
                labels[i] = 1
            else:
                labels[i] = -1
            filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
        else:
            labels[i] = 0
            filtered_y[i] = x[i]
        # update avg, var, std
        avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
        var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
            filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
        std_filter[i] = np.sqrt(var_filter[i])
    return [labels, avg_filter, std_filter]
def make_example_data():
    # Make example data
    y = np.array(
        [1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9,
         1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3,
         2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1])
    # simulate data stored in individual files
    df = pd.DataFrame(
        {
            "Time": np.arange(len(y)),
            "y1": y,
            "y2": y * 2,
            "y3": y ** 2,
            "yn": y ** (y)
        }
    )
    bigdf = pd.DataFrame()
    for i in range(10):
        _df = df
        # create my partitioning column
        _df["session"] = "S0" + str(i)
        bigdf = pd.concat([bigdf, _df], axis=0)
    # return a normal dataframe that looks similar to a dask dataframe
    return bigdf
# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0
############# Normal Pandas Solution ########################
bigdf = make_example_data()
results_df = pd.DataFrame()
columns = list(bigdf.columns)
columns.remove("Time")
columns.remove("session")
for col in columns:
    res1 = bigdf.groupby("session")[col].apply(peak_detection_smoothed_zscore_v2, lag, threshold, influence)
    res1 = pd.concat([pd.DataFrame(a).T for a in res1])
    res1.columns = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
    results_df = pd.concat([results_df, res1], axis=1)
pd_results = pd.concat([bigdf, results_df], axis=1)
############### Dask Solution ############################
bigdf = make_example_data()
ddf = dd.from_pandas(bigdf, npartitions=10)
columns = list(ddf.columns)
# remove columns that don't have the function applied to them
columns.remove("Time")
columns.remove("session")
# get all the different sessions
sessions = ddf.groupby("session").count().compute().index.tolist()
# column names that get returned by my function
returns = ["_Signal", "_meanFilter", "_stdFilter"]
# list to hold example series for meta data
rcols = []
for col in columns:
    for r in returns:
        s = pd.Series([])
        s.name = col + r
        rcols.append(s)
results = pd.DataFrame(rcols).T
results = dd.from_pandas(results, npartitions=len(sessions))
for session in sessions:
    sess_df = ddf[ddf["session"] == session].compute()
    # making a dask df to store the results in
    sess_results = dd.from_pandas(sess_df, npartitions=1)
    for col in columns:
        # returns a list of 3 lists
        res = peak_detection_smoothed_zscore_v2(sess_df[col], lag, threshold, influence)
        # turn 3 lists into a dataframe of 3 columns
        res = pd.concat([pd.DataFrame(a).T for a in res]).T
        _cols = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
        res.columns = _cols
        # do this iteratively cause I can't figure out how to do it in a single line
        for cc in _cols:
            sess_results[cc] = res[cc]
        # NOTE: If memory is a problem could probably throw this to disk here
    # append session results to main results
    results = results.append(sess_results)
dd_results = results.compute()
print("Are my Dask results the same as my Pandas results?", dd_results.shape == pd_results.shape)
Вопросы:
- Я ищу лучшее возможное решение. Как видите, код Dask намного длиннее и сложнее. Есть ли способ сделать его менее грязным? Может быть, покончить с forloops? 
- Еще одна проблема, которую я предвижу, заключается в том, что если у меня есть раздел Dask, который достаточно мал, чтобы поместиться в памяти. Что произойдет, если я создам еще 3 вектора одинаковой длины? Моя система умирает? 
- Если на самом деле нет способа очистить вещи. По крайней мере, делаю ли я что-то максимально эффективно? 
Спасибо
 
                                                                     
                                                                     
                                                                    
lagэлементы для правильной работы. Мой исходный алгоритм перебирает все точки данных каждый раз, когда он вызывается, что крайне неэффективно (я также предостерегаю от этого). Поэтому в идеале вы хотите перебирать свои данные с помощью движущегося окна, загружая толькоlagэлементов в память, обновляя алгоритм, удаляя данные из памяти и переходя к следующему окну (+1 новое наблюдение). Это должен быть самый эффективный способ сделать это с точки зрения памяти. 04.10.2018