import dask.dataframe as dd
import pandas as pd
import gc
import numpy as np
from timeit import default_timer as timer
from multiprocessing import cpu_count
# SQLAlchemy URL of the SQLite database holding the vw_test_data_<rows> views.
conn_string = 'sqlite:////Volumes/SSHD2/10GB/DB/testdb.db'

import warnings

# Suppress every warning so library deprecation chatter does not clutter the run.
warnings.simplefilter(action='ignore')

# Used as the dask partition count when reading each view.
n_cores = cpu_count()
#dask tests
def mean_df_dask(df=None):
    """Column means over rows with sales_price > 20, computed with dask.

    Args:
        df: dask DataFrame to use; defaults to the module-level ``ddf`` that
            the driver loop binds before timing.

    Returns:
        The computed (pandas) result of ``DataFrame.mean()``.
    """
    if df is None:
        df = ddf
    # NOTE(review): the pandas twin passes numeric_only=True because the frame
    # has text columns. Recent dask releases may need the same keyword, but
    # older ones do not accept it. Confirm against the installed dask version.
    return df.loc[df['sales_price'] > 20].mean().compute()
def max_df_dask(df=None):
    """Column maxima over rows with sales_price > 20, computed with dask.

    Args:
        df: dask DataFrame to use; defaults to the module-level ``ddf``.

    Returns:
        The computed (pandas) result of ``DataFrame.max()``.
    """
    if df is None:
        df = ddf
    return df.loc[df['sales_price'] > 20].max().compute()
def sum_df_dask(df=None):
    """Column sums over rows with sales_price < 10, computed with dask.

    Args:
        df: dask DataFrame to use; defaults to the module-level ``ddf``.

    Returns:
        The computed (pandas) result of ``DataFrame.sum()``.
    """
    if df is None:
        df = ddf
    return df.loc[df['sales_price'] < 10].sum().compute()
def min_df_dask(df=None):
    """Column minima over rows with sales_price > 20, computed with dask.

    Args:
        df: dask DataFrame to use; defaults to the module-level ``ddf``.

    Returns:
        The computed (pandas) result of ``DataFrame.min()``.
    """
    if df is None:
        df = ddf
    return df.loc[df['sales_price'] > 20].min().compute()
def group_df_nlargest_dask(df=None):
    """Sum sales_price per (item_category, item_color), average, take nlargest(100).

    Args:
        df: dask DataFrame to use; defaults to the module-level ``ddf``.

    Returns:
        The computed (pandas) Series produced by ``nlargest(100)``.
    """
    if df is None:
        df = ddf
    totals = df.groupby(['item_category', 'item_color']).agg({'sales_price': 'sum'})
    # NOTE(review): .mean() collapses the per-group totals into one value per
    # column. nlargest(100) therefore ranks a single number. If the intent was
    # "the 100 largest groups", apply nlargest to totals['sales_price']
    # instead; confirm (and keep the pandas twin in sync).
    return totals.mean().nlargest(100).compute()
def lambda_df_dask(df=None):
    """Total net profit per sold_day_name, computed with dask.

    net_profit = quantity * sales_price - quantity * wholesale_cost - coupon_amt

    Args:
        df: dask DataFrame to use; defaults to the module-level ``ddf``.

    Returns:
        The computed (pandas) Series of summed net_profit indexed by sold_day_name.
    """
    if df is None:
        df = ddf
    # Use the frame passed to the callable rather than a closed-over global,
    # so the expression always refers to the frame being assigned to.
    result = df.assign(
        net_profit=lambda x: (x['quantity'] * x['sales_price']
                              - x['quantity'] * x['wholesale_cost']
                              - x['coupon_amt'])
    )
    return result.groupby(['sold_day_name'])['net_profit'].agg('sum').compute()
#pandas tests
def mean_df_pandas(df=None):
    """Means of the numeric columns over rows with sales_price > 20 (pandas).

    Args:
        df: pandas DataFrame to use; defaults to the module-level ``pdf`` that
            the driver loop binds before timing.

    Returns:
        Series of per-column means; non-numeric columns are excluded.
    """
    if df is None:
        df = pdf
    # numeric_only=True: pandas >= 2.0 raises TypeError on text columns
    # (e.g. item_color) instead of silently dropping them as 1.x did.
    return df.loc[df['sales_price'] > 20].mean(numeric_only=True)
def max_df_pandas(df=None):
    """Column maxima over rows with sales_price > 20 (pandas).

    Args:
        df: pandas DataFrame to use; defaults to the module-level ``pdf``.

    Returns:
        Series of per-column maxima.
    """
    if df is None:
        df = pdf
    return df.loc[df['sales_price'] > 20].max()
def sum_df_pandas(df=None):
    """Column sums over rows with sales_price < 10 (pandas).

    Args:
        df: pandas DataFrame to use; defaults to the module-level ``pdf``.

    Returns:
        Series of per-column sums.
    """
    if df is None:
        df = pdf
    return df.loc[df['sales_price'] < 10].sum()
def min_df_pandas(df=None):
    """Column minima over rows with sales_price > 20 (pandas).

    Args:
        df: pandas DataFrame to use; defaults to the module-level ``pdf``.

    Returns:
        Series of per-column minima.
    """
    if df is None:
        df = pdf
    return df.loc[df['sales_price'] > 20].min()
def group_df_nlargest_pandas(df=None):
    """Sum sales_price per (item_category, item_color), average, take nlargest(100).

    Args:
        df: pandas DataFrame to use; defaults to the module-level ``pdf``.

    Returns:
        Series produced by ``nlargest(100)``.
    """
    if df is None:
        df = pdf
    totals = df.groupby(['item_category', 'item_color']).agg({'sales_price': 'sum'})
    # NOTE(review): .mean() collapses the per-group totals into one value per
    # column. nlargest(100) therefore ranks a single number. If the intent was
    # "the 100 largest groups", apply nlargest to totals['sales_price']
    # instead; confirm (and keep the dask twin in sync).
    return totals.mean().nlargest(100)
def lambda_df_pandas(df=None):
    """Total net profit per sold_day_name (pandas).

    net_profit = quantity * sales_price - quantity * wholesale_cost - coupon_amt

    Args:
        df: pandas DataFrame to use; defaults to the module-level ``pdf``.

    Returns:
        Series of summed net_profit indexed by sold_day_name.
    """
    if df is None:
        df = pdf
    # Each term is subtracted separately. The previous grouping,
    # q*sp - (q*wc - coupon), added coupon_amt and disagreed with the dask twin.
    result = df.assign(
        net_profit=lambda x: (x['quantity'] * x['sales_price']
                              - x['quantity'] * x['wholesale_cost']
                              - x['coupon_amt'])
    )
    return result.groupby(['sold_day_name'])['net_profit'].agg('sum')
# Benchmark callables; each dask operation is listed next to its pandas twin.
# The suffix after the last '_' names the library and is matched by f().
tests = [
    mean_df_dask, mean_df_pandas,
    max_df_dask, max_df_pandas,
    sum_df_dask, sum_df_pandas,
    min_df_dask, min_df_pandas,
    group_df_nlargest_dask, group_df_nlargest_pandas,
    lambda_df_dask, lambda_df_pandas,
]

# Rows of [function name, library, elapsed seconds, view label], appended by f().
times = []

# Libraries to benchmark, in run order.
methods = ['dask', 'pandas']

# [row count, short label] for each vw_test_data_<rows> view.
n_records = [
    [1000000, '1M'],
    [5000000, '5M'],
    [10000000, '10M'],
]
def f(vname, record, method, tests):
    """Time each callable in *tests* whose name ends with ``_<method>``.

    For every match, one row is appended to the module-level ``times`` list:
    ``[function name, library suffix, elapsed seconds rounded to 0.1,
    vname[0:3] + record]``.
    """
    for bench in tests:
        bench_name = bench.__name__
        suffix = bench_name.rsplit('_', 1)[1]
        if suffix != method:
            continue
        started = timer()
        bench()
        finished = timer()
        elapsed = round(finished - started, 1)
        times.append([bench_name, suffix, elapsed, vname[0:3] + record])
# Benchmark driver: for each library and each view size, load the view and time
# that library's operations via f(). The loaded frame is bound to the
# module-level name (ddf / pdf) that the benchmark functions read by default.
for method in methods:
    for n_rows, label in n_records:
        vname = 'vw_test_data' + '_' + str(n_rows)
        if method == 'dask':
            # NOTE(review): ddf is lazy, so each dask benchmark's .compute()
            # presumably re-reads the view from SQLite. The pandas benchmarks
            # work on an already-loaded frame. Consider ddf = ddf.persist() for
            # a like-for-like comparison.
            ddf = dd.read_sql_table(vname, conn_string, index_col='id', npartitions=n_cores)
        elif method == 'pandas':
            # Release the previous (up to 10M-row) frame before loading the next
            # one, so two full copies are never resident at the same time.
            pdf = None
            pdf = pd.read_sql_table(vname, conn_string, index_col='id')
        else:
            continue
        f(vname, label, method, tests)
import matplotlib.pyplot as plt

# Shared look for every chart below: ggplot theme and 12x4-inch figures.
plt.style.use('ggplot')
plt.rcParams.update({'figure.figsize': [12, 4]})
# Tabulate the timings: one row per (operation, library, view).
df = pd.DataFrame.from_records(
    times,
    columns=['operation', 'library', 'elapsed_time', 'view_name'],
)
# Strip the library suffix so a dask row and its pandas twin share one operation key.
df['operation'] = df['operation'].str.replace('_pandas', '').str.replace('_dask', '')


def _pivot_view(view_name):
    """Return an operation-by-library table of elapsed times for one view label."""
    subset = df[df['view_name'] == view_name].drop(columns=['view_name'])
    return subset.pivot(index='operation', columns='library', values='elapsed_time')


df_1m_pivot = _pivot_view('vw_1M')
df_5m_pivot = _pivot_view('vw_5M')
df_10m_pivot = _pivot_view('vw_10M')
def _plot_comparison(pivot, label):
    """Draw a bar chart with an attached data table for one record-count view.

    Args:
        pivot: operation-by-library DataFrame of elapsed seconds.
        label: human-readable record count used in the chart title.

    Returns:
        The matplotlib Axes holding the chart.
    """
    ax = pivot.plot.bar(
        fontsize=12,
        table=True,
        title='Execution Time Comparison for {v} records'.format(v=label),
    )
    ax.set_ylabel('Execution Time (sec.)')
    # Hide the x-axis; the data table drawn under the bars carries the labels.
    ax.get_xaxis().set_visible(False)
    data_table = ax.tables[0]
    # set_fontsize must be called, not assigned. Auto-sizing must also be
    # disabled, or matplotlib shrinks the text again at draw time.
    data_table.auto_set_font_size(False)
    data_table.set_fontsize(12)
    data_table.scale(1, 2)
    return ax


x1 = _plot_comparison(df_1m_pivot, '1 million')
x2 = _plot_comparison(df_5m_pivot, '5 million')
x3 = _plot_comparison(df_10m_pivot, '10 million')