InvalidIndexError dask dataframe

I am learning dask, and every now and then I run into this error:

InvalidIndexError: Reindexing only valid with uniquely valued Index objects

I have a preprocessed dask DataFrame. Most operations on it work fine, but some simple ones raise this error.

I tried resetting the index, but it did not seem to help.

Question: is there a high-level explanation of what can go wrong here? Where can I read about it? Why does this error typically occur? Googling has not helped.

For example, here is one strange case.

I check the DataFrame's statistics after each operation, to catch anything suspicious:

df = load_data()
df.shape[0].compute(), df.npartitions
#ok
df = prepare_target(df)
df.shape[0].compute(), df.npartitions
#ok
x_train, x_test, y_train, y_test = dask_tts(df.drop('target', axis=1), df['target'], random_state=1)
#ok
x_train['target'] = y_train
x_test['target'] = y_test
#ok
x_train.shape[0].compute(), x_train.npartitions
x_test.shape[0].compute(), x_test.npartitions
#ok
x_train.index.nunique().compute()
x_test.index.nunique().compute()
#ok
train, smooth_dict = smoothed_likelyhood(x_train) # returns df and dict
train.shape[0].compute()
#ok
test, _ = smoothed_likelyhood(x_test)
test.shape[0].compute()
#ok
train.index.nunique().compute()
#ok
test.index.nunique().compute() # after this line - error
# InvalidIndexError: Reindexing only valid with uniquely valued Index objects

It is worth pointing out that only test raises the error.

Here I tried to reproduce it, but it works as expected:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split
def smoothed_likelyhood(df, alpha=1): # works with dask df
    global_mean = df['target'].mean()
    smooth_dict = {}
    final_df = df.copy()
    for c in [c for c in df.columns if c != 'target']:
        n_rows = df[c].value_counts()
        all_means = df.groupby(by=c)['target'].mean()
        # smoothed mean: (n * category_mean + alpha * global_mean) / (n + alpha)
        temp_result = (all_means * n_rows + global_mean * alpha) / (n_rows + alpha)
        final_df[c] = df[c].map(temp_result)  # NB: maps with the lazy dask Series
        smooth_dict[c] = temp_result.compute().to_dict()
    return final_df, smooth_dict
# TOY EXAMPLE
test = pd.DataFrame({'a':['mos', 'mos', 'london', 'dubai', 'ny', 'mos', 'london', 'dubai', 'shanghai', 'dubai', 'mos', 'london', 'dubai', 'dubai'],
                     'b':['big', 'big', 'big', 'med', 'med', 'med', 'small', 'small', 'small', 'small', 'big', 'med', 'small', 'med'],
                     'target':[1,0,0,1,0,1,1,0,1,1,1,0,0,0]}) 
df = dd.from_pandas(test, npartitions=2)
# -----------------------------------------------
print(f'npartitions: {df.npartitions}')
x_train, x_test, y_train, y_test = train_test_split(df.drop('target', axis=1), df['target'], test_size=0.3, shuffle=True)
x_train['target'] = y_train
x_test['target'] = y_test
print(x_train.shape[0].compute(), x_train.index.nunique().compute())
print(x_test.shape[0].compute(), x_test.index.nunique().compute())
train, smooth_dict = smoothed_likelyhood(x_train)
test, _ = smoothed_likelyhood(x_test)
print(train.shape[0].compute(), train.index.nunique().compute())
print(test.shape[0].compute(), test.index.nunique().compute())
# train.compute()
print(train['target'].mean().compute())
print(test['target'].mean().compute())

That part works fine, but when I try it on the real data:

%%time
df = load_data(stage='prep_2', frac=config.FRAC, verbose=False) # loading many parquet files
df = prepare_target(df) # some small changes to df
# -----------------------------------------------
print(f'npartitions: {df.npartitions}')
x_train, x_test, y_train, y_test = train_test_split(df.drop('target', axis=1), df['target'], random_state=1)
x_train['target'] = y_train
x_test['target'] = y_test
print(x_train.shape[0].compute(), x_train.index.nunique().compute())
print(x_test.shape[0].compute(), x_test.index.nunique().compute())
train, smooth_dict = smoothed_likelyhood(x_train)
test, _ = smoothed_likelyhood(x_test)
print(x_train.shape[0].compute(), x_train.index.nunique().compute())
print(x_test.shape[0].compute(), x_test.index.nunique().compute())

this is the printed output:

npartitions: 10
1476758 164300
164018 106750
1476758 164300
164018 106750

Note that in both frames the number of unique index values (164300 and 106750) is far below the row count, i.e. the index is not unique. Either of the following then raises the index error above:

train['target'].mean().compute()
test['target'].mean().compute()

If you have any ideas about where I could dig further, please share; I just don't know where to start.

Thanks.

The problem comes from the map step inside smoothed_likelyhood: the columns are mapped with temp_result, a dask Series, and that is what produces the invalid index error. With pandas, mapping values through a Series is routine; with dask, however, temp_result cannot be turned into a dict until it has been computed. I had found that a plain df[c].map(series) seemed to work "magically" with dask, but it is exactly what introduced the invalid-index problem.

The fix is to map with the precomputed dict smooth_dict instead of with the Series temp_result. Computing early like this is generally discouraged in dask, but I need the result anyway, and as far as I can tell it is only computed once per column.
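
To see what the error itself means, here is a minimal pandas-only sketch (the values are illustrative): Series.map with a Series argument looks values up in the mapper's index, and pandas raises exactly this InvalidIndexError whenever that index contains duplicate labels. A dict cannot have duplicate keys, which is why mapping with the precomputed smooth_dict sidesteps the problem:

import pandas as pd

s = pd.Series(['mos', 'london', 'mos'])

# mapper Series whose index has a duplicate label ('mos')
mapper = pd.Series([0.7, 0.3, 0.5], index=['mos', 'london', 'mos'])

try:
    s.map(mapper)
except pd.errors.InvalidIndexError as e:
    print(e)  # Reindexing only valid with uniquely valued Index objects

# a plain dict has unique keys by construction, so this always works
print(s.map({'mos': 0.7, 'london': 0.3}).tolist())  # [0.7, 0.3, 0.7]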

Here is a working version of the function:

def smoothed_likelyhood(df, alpha=1):
    """
    Description:
        encode each categorical column by the smoothed mean of positive events per category
    Args:
        df: [df] - df to encode
        alpha: [int/float] - regularization param. We can find it with CV
    Returns:
        encoded df, dict used to encode new data at prediction time
    """
    global_mean = df['target'].mean()
    smooth_dict = {}
    for c in [c for c in df.columns if c != 'target']:
        n_rows = df[c].value_counts()
        all_means = df.groupby(by=c)['target'].mean()
        # smoothed mean: (n * category_mean + alpha * global_mean) / (n + alpha)
        temp_result = (all_means * n_rows + global_mean * alpha) / (n_rows + alpha)
        # materialize the mapping as a plain dict, then map with the dict
        smooth_dict[c] = temp_result.compute().to_dict()
        df[c] = df[c].map(smooth_dict[c])
    return df, smooth_dict
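
As a quick sanity check, the dict-based version runs end to end on a small frame (a sketch with made-up toy data, reusing the function above):

import pandas as pd
import dask.dataframe as dd

# toy data, illustrative only
toy = dd.from_pandas(
    pd.DataFrame({'a': ['mos', 'mos', 'london', 'dubai'],
                  'target': [1, 0, 0, 1]}),
    npartitions=2)

encoded, smooth_dict = smoothed_likelyhood(toy)
print(encoded['target'].mean().compute())  # computes without InvalidIndexError
print(smooth_dict['a'])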

However, this function takes 25 minutes on an 800k x 90 DataFrame, and that is only 5% of the full data. If anyone can suggest how to speed it up in dask, I would appreciate it.
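
One generic dask pattern that might help (a sketch, not benchmarked on this data): each temp_result.compute() inside the loop is a separate pass over the underlying parquet files, so with ~90 columns the data is effectively scanned ~90 times. Building all the lazy per-column results first and then materializing them with a single dask.compute call lets dask share that work; smoothed_likelyhood_batched below is a hypothetical variant along those lines:

import dask

def smoothed_likelyhood_batched(df, alpha=1):
    # same encoding as above, but a single dask.compute for all columns
    global_mean = df['target'].mean()
    lazy = {}
    for c in [c for c in df.columns if c != 'target']:
        n_rows = df[c].value_counts()
        all_means = df.groupby(by=c)['target'].mean()
        lazy[c] = (all_means * n_rows + global_mean * alpha) / (n_rows + alpha)
    # one graph execution instead of one per column
    (materialized,) = dask.compute(lazy)
    smooth_dict = {c: s.to_dict() for c, s in materialized.items()}
    for c, mapping in smooth_dict.items():
        df[c] = df[c].map(mapping)
    return df, smooth_dict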
