Мне нужно начать работать с некоторыми наборами данных, превышающими размер памяти, а это означает, что мне нужно быстро и в спешке ознакомиться с Dask. Пока это не плохо, но я только что столкнулся с проблемой, которую, я думаю, я в значительной степени решил, но это не красиво, и я хотел посмотреть, есть ли лучший способ решить эту проблему.
Проблема: у меня есть данные временного ряда, хранящиеся в DataFrame. К каждому столбцу (вектору) должна быть применена функция. Функция возвращает 3 дополнительных вектора, которые я хотел бы добавить к исходному кадру данных.
Код: Первая часть приведенного ниже кода — это мое решение в обычных Pandas. Вторая половина — это то, что я сделал, чтобы заставить его работать в Dask.
import numpy as np
import pandas as pd
import os
import dask
import datetime
from dask import delayed
from dask import visualize
import pandas as pd
import dask.dataframe as dd
#### Helper functions
def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence):
'''
iterative smoothed z-score algorithm
Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
'''
import numpy as np
labels = np.zeros(len(x))
filtered_y = np.array(x)
avg_filter = np.zeros(len(x))
std_filter = np.zeros(len(x))
var_filter = np.zeros(len(x))
avg_filter[lag - 1] = np.mean(x[0:lag])
std_filter[lag - 1] = np.std(x[0:lag])
var_filter[lag - 1] = np.var(x[0:lag])
for i in range(lag, len(x)):
if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
if x[i] > avg_filter[i - 1]:
labels[i] = 1
else:
labels[i] = -1
filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
else:
labels[i] = 0
filtered_y[i] = x[i]
# update avg, var, std
avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
std_filter[i] = np.sqrt(var_filter[i])
return [labels, avg_filter, std_filter]
def make_example_data():
# Make example data
y = np.array(
[1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9,
1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3,
2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1])
# simulate data stored in individual files
df = pd.DataFrame(
{
"Time": np.arange(len(y)),
"y1": y,
"y2": y * 2,
"y3": y ** 2,
"yn": y ** (y)
}
)
bigdf = pd.DataFrame()
for i in range(10):
_df = df
# create my partitioning column
_df["session"] = "S0" + str(i)
bigdf = pd.concat([bigdf, _df], axis=0)
# return a normal dataframe that looks similar to a dask dataframe
return bigdf
# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0
############# Normal Pandas Solution ########################
bigdf = make_example_data()
results_df = pd.DataFrame()
columns = list(bigdf.columns)
columns.remove("Time")
columns.remove("session")
for col in columns:
res1 = bigdf.groupby("session")[col].apply(peak_detection_smoothed_zscore_v2, lag, threshold, influence)
res1 = pd.concat([pd.DataFrame(a).T for a in res1])
res1.columns = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
results_df = pd.concat([results_df, res1], axis=1)
pd_results = pd.concat([bigdf, results_df], axis=1)
############### Dask Solution ############################
bigdf = make_example_data()
ddf = dd.from_pandas(bigdf, npartitions=10)
columns = list(ddf.columns)
# remove columns that don't have the function applied to them
columns.remove("Time")
columns.remove("session")
# get all the different sessions
sessions = ddf.groupby("session").count().compute().index.tolist()
# column names that get returned by my function
returns = ["_Signal", "_meanFilter", "_stdFilter"]
# list to hold example series for meta data
rcols = []
for col in columns:
for r in returns:
s = pd.Series([])
s.name = col + r
rcols.append(s)
results = pd.DataFrame(rcols).T
results = dd.from_pandas(results, npartitions=len(sessions))
for session in sessions:
sess_df = ddf[ddf["session"] == session].compute()
# making a dask df to store the results in
sess_results = dd.from_pandas(sess_df, npartitions=1)
for col in columns:
# returns a list of 3 lists
res = peak_detection_smoothed_zscore_v2(sess_df[col], lag, threshold, influence)
# turn 3 lists into a dataframe of 3 columns
res = pd.concat([pd.DataFrame(a).T for a in res]).T
_cols = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
res.columns = _cols
# do this iteratively cause I can't figure out how to do it in a single line
for cc in _cols:
sess_results[cc] = res[cc]
# NOTE: If memory is a problem could probably throw this to disk here
# append session results to main results
results = results.append(sess_results)
dd_results = results.compute()
print("Are my Dask results the same as my Pandas results?", dd_results.shape == pd_results.shape)
Вопросы:
Я ищу лучшее возможное решение. Как видите, код Dask намного длиннее и сложнее. Есть ли способ сделать его менее грязным? Может быть, покончить с forloops?
Еще одна проблема, которую я предвижу, заключается в том, что если у меня есть раздел Dask, который достаточно мал, чтобы поместиться в памяти. Что произойдет, если я создам еще 3 вектора одинаковой длины? Моя система умирает?
Если на самом деле нет способа очистить вещи. По крайней мере, делаю ли я что-то максимально эффективно?
Спасибо
lag
элементы для правильной работы. Мой исходный алгоритм перебирает все точки данных каждый раз, когда он вызывается, что крайне неэффективно (я также предостерегаю от этого). Поэтому в идеале вы хотите перебирать свои данные с помощью движущегося окна, загружая толькоlag
элементов в память, обновляя алгоритм, удаляя данные из памяти и переходя к следующему окну (+1 новое наблюдение). Это должен быть самый эффективный способ сделать это с точки зрения памяти. 04.10.2018