In [1]:
# Libraries under comparison (dask, pandas) plus timing and system helpers.
# NOTE(review): `gc` and `np` are not referenced by the code visible in this
# notebook export apart from a commented-out gc.collect() -- confirm before removing.
import dask.dataframe as dd
import pandas as pd
import gc
import numpy as np
from timeit import default_timer as timer
from multiprocessing import cpu_count
# Database URI handed to both dd.read_sql_table and pd.read_sql_table below.
# NOTE(review): this is an absolute, machine-specific path; the notebook will
# only run where this volume is mounted.
conn_string = 'sqlite:////Volumes/SSHD2/10GB/DB/testdb.db'
import warnings
# Suppress every warning category for the rest of the session.
warnings.simplefilter('ignore')
# CPU count reported by multiprocessing; used as the dask partition count.
n_cores = cpu_count()


#dask tests
def mean_df_dask():
    """Return the column means of the rows of the module-level dask frame
    ``ddf`` whose ``sales_price`` is greater than 20, materialised with
    ``.compute()``."""
    price_mask = ddf['sales_price'] > 20
    selected = ddf.loc[price_mask]
    lazy_mean = selected.mean()
    return lazy_mean.compute()
def max_df_dask():
    """Return the column maxima of the rows of the module-level dask frame
    ``ddf`` whose ``sales_price`` is greater than 20, materialised with
    ``.compute()``."""
    price_mask = ddf['sales_price'] > 20
    selected = ddf.loc[price_mask]
    lazy_max = selected.max()
    return lazy_max.compute()
def sum_df_dask():
    """Return the column sums of the rows of the module-level dask frame
    ``ddf`` whose ``sales_price`` is less than 10, materialised with
    ``.compute()``."""
    price_mask = ddf['sales_price'] < 10
    selected = ddf.loc[price_mask]
    lazy_sum = selected.sum()
    return lazy_sum.compute()
def min_df_dask():
    """Return the column minima of the rows of the module-level dask frame
    ``ddf`` whose ``sales_price`` is greater than 20, materialised with
    ``.compute()``."""
    price_mask = ddf['sales_price'] > 20
    selected = ddf.loc[price_mask]
    lazy_min = selected.min()
    return lazy_min.compute()
def group_df_nlargest_dask():
    """Group the module-level dask frame ``ddf`` by ``item_category`` and
    ``item_color``, sum ``sales_price`` per group, take the mean of those
    sums, and return the (up to) 100 largest entries of that result,
    materialised with ``.compute()``."""
    by_category_color = ddf.groupby(['item_category', 'item_color'])
    price_totals = by_category_color.agg({'sales_price': 'sum'})
    mean_of_totals = price_totals.mean()
    return mean_of_totals.nlargest(100).compute()
def lambda_df_dask():
    """Add a ``net_profit`` column to the module-level dask frame ``ddf``
    (quantity * sales_price - quantity * wholesale_cost - coupon_amt) and
    return its sum per ``sold_day_name``, materialised with ``.compute()``."""

    def _net_profit(_frame):
        # The frame that ``assign`` passes in is deliberately unused: the
        # columns are read from the module-level ``ddf``.
        revenue = ddf['quantity'] * ddf['sales_price']
        cost = ddf['quantity'] * ddf['wholesale_cost']
        return (revenue - cost - ddf['coupon_amt'])

    with_profit = ddf.assign(net_profit=_net_profit)
    profit_by_day = with_profit.groupby(['sold_day_name'])['net_profit']
    return profit_by_day.agg('sum').compute()


#pandas tests
def mean_df_pandas():
    """Return the column means of the rows of the module-level pandas frame
    ``pdf`` whose ``sales_price`` is greater than 20.

    Only numeric columns are averaged. ``numeric_only=True`` is passed
    explicitly because pandas >= 2.0 raises ``TypeError`` when ``mean()``
    meets a string column, whereas older releases silently dropped such
    columns; the explicit flag gives the same result on both.

    Returns:
        pandas.Series indexed by numeric column name.
    """
    price_mask = pdf['sales_price'] > 20
    return pdf.loc[price_mask].mean(numeric_only=True)
def max_df_pandas():
    """Return the column maxima of the rows of the module-level pandas frame
    ``pdf`` whose ``sales_price`` is greater than 20."""
    price_mask = pdf['sales_price'] > 20
    selected = pdf.loc[price_mask]
    return selected.max()
def sum_df_pandas():
    """Return the column sums of the rows of the module-level pandas frame
    ``pdf`` whose ``sales_price`` is less than 10."""
    price_mask = pdf['sales_price'] < 10
    selected = pdf.loc[price_mask]
    return selected.sum()
def min_df_pandas():
    """Return the column minima of the rows of the module-level pandas frame
    ``pdf`` whose ``sales_price`` is greater than 20."""
    price_mask = pdf['sales_price'] > 20
    selected = pdf.loc[price_mask]
    return selected.min()
def group_df_nlargest_pandas():
    """Group the module-level pandas frame ``pdf`` by ``item_category`` and
    ``item_color``, sum ``sales_price`` per group, take the mean of those
    sums, and return the (up to) 100 largest entries of that result."""
    by_category_color = pdf.groupby(['item_category', 'item_color'])
    price_totals = by_category_color.agg({'sales_price': 'sum'})
    mean_of_totals = price_totals.mean()
    return mean_of_totals.nlargest(100)
def lambda_df_pandas():
    """Add a ``net_profit`` column to the module-level pandas frame ``pdf``
    and return its sum per ``sold_day_name``.

    ``net_profit`` is ``quantity * sales_price - quantity * wholesale_cost
    - coupon_amt``, the same formula as ``lambda_df_dask``. The previous
    version had a misplaced parenthesis that evaluated
    ``revenue - (cost - coupon_amt)``, which *added* the coupon amount, so
    the two libraries were timed on different computations and produced
    different results.

    Returns:
        pandas.Series of summed net profit indexed by ``sold_day_name``.
    """
    def _net_profit(frame):
        # Use the frame that ``assign`` passes in rather than reaching back
        # to the global, so the expression is tied to the frame being extended.
        revenue = frame['quantity'] * frame['sales_price']
        cost = frame['quantity'] * frame['wholesale_cost']
        return revenue - cost - frame['coupon_amt']

    result = pdf.assign(net_profit=_net_profit)
    return result.groupby(['sold_day_name'])['net_profit'].agg('sum')

# Every benchmark callable, listed as (dask, pandas) pairs per operation.
# f() picks the ones to run by the suffix after the last underscore.
tests = [
    mean_df_dask, mean_df_pandas,
    max_df_dask, max_df_pandas,
    sum_df_dask, sum_df_pandas,
    min_df_dask, min_df_pandas,
    group_df_nlargest_dask, group_df_nlargest_pandas,
    lambda_df_dask, lambda_df_pandas,
]

# Result log: f() appends one [function name, library, seconds, label] row per timed call.
times = list()

# Libraries to benchmark, in run order.
methods = ['dask', 'pandas']

# [row count, short label] pairs; the count is used to build the view name
# and the label ends up in the result rows.
n_records = [
    [row_count, label]
    for row_count, label in (
        (1000000, '1M'),
        (5000000, '5M'),
        (10000000, '10M'),
    )
]

def f(vname, record, method, tests):
    """Time every callable in ``tests`` that belongs to ``method``.

    A callable belongs to a library when the part of its ``__name__`` after
    the last underscore equals ``method``. Each matching callable is run
    once; its wall-clock duration, rounded to 0.1 s, is appended to the
    module-level ``times`` list as
    ``[function name, library, seconds, vname[0:3] + record]``.

    Args:
        vname: name of the view that was loaded; only its first three
            characters are used, as a prefix for the label.
        record: short size label appended to that prefix.
        method: library name to select callables by.
        tests: iterable of zero-argument callables.
    """
    for test_fn in tests:
        fname = test_fn.__name__
        library = fname.rsplit('_', 1)[1]
        if library != method:
            continue
        started = timer()
        test_fn()
        elapsed = round(timer() - started, 1)
        times.append([fname, library, elapsed, vname[0:3] + record])


# Run the suite once per library and, within each library, once per data
# size. The loaded frame is bound to the module-level name (`ddf` or `pdf`)
# that the benchmark functions read.
for method in methods:
    if method not in ('dask', 'pandas'):
        continue
    for record in n_records:
        vname = 'vw_test_data_{}'.format(record[0])
        if method == 'dask':
            ddf = dd.read_sql_table(vname,
                                    conn_string,
                                    index_col='id',
                                    npartitions=n_cores)
        else:
            pdf = pd.read_sql_table(vname, conn_string, index_col='id')
        f(vname, record[1], method, tests)
In [3]:
import matplotlib.pyplot as plt

plt.style.use('ggplot')
plt.rcParams['figure.figsize'] = [12, 4]

# One row per timed call, as recorded in `times` by f().
df = pd.DataFrame.from_records(times, columns=['operation',
                                               'library',
                                               'elapsed_time',
                                               'view_name'])
# Drop the library suffix so the dask and pandas variants of an operation
# share one label and can be pivoted side by side. The patterns are plain
# text, so regex matching is switched off explicitly.
df['operation'] = (df['operation']
                   .str.replace('_pandas', '', regex=False)
                   .str.replace('_dask', '', regex=False))


def _pivot_for_view(view_name):
    """Return an operation x library table of elapsed times for one view label."""
    rows = df.loc[df['view_name'] == view_name].drop(columns=['view_name'])
    return rows.pivot(index='operation',
                      columns='library',
                      values='elapsed_time')


def _plot_timings(pivot, size_label):
    """Draw a bar chart with an attached value table for one data size and
    return the matplotlib Axes."""
    ax = pivot.plot.bar(fontsize=12,
                        table=True,
                        title='Execution Time Comparison for {v} records'.format(v=size_label))
    ax.set_ylabel('Execution Time (sec.)')
    # The attached table already labels each operation, so hide the x axis.
    ax.get_xaxis().set_visible(False)
    table = ax.tables[0]
    # Call the setter: the earlier `table.set_fontsize = 12` replaced the
    # method with an int and never changed the font size.
    table.set_fontsize(12)
    table.scale(1, 2)
    return ax


df_1m_pivot = _pivot_for_view('vw_1M')
df_5m_pivot = _pivot_for_view('vw_5M')
df_10m_pivot = _pivot_for_view('vw_10M')

x1 = _plot_timings(df_1m_pivot, '1 million')
x2 = _plot_timings(df_5m_pivot, '5 million')
x3 = _plot_timings(df_10m_pivot, '10 million')