In [1]:
# Start a local Dask distributed cluster with default settings
# (worker/thread counts sized automatically from this machine's CPUs).
from dask.distributed import Client
client = Client()
# Bare `client` as the last expression renders the cluster summary
# (workers, cores, memory) as the cell's rich HTML output.
client
Out[1]:

Client

Cluster

  • Workers: 6
  • Cores: 24
  • Memory: 137.44 GB
In [2]:
import sqlite3
import gc
import dask.dataframe as dd
import pandas as pd
from timeit import default_timer as timer
from multiprocessing import cpu_count

n_cores = cpu_count()
# NOTE(review): absolute local path — consider making this configurable.
conn_string = 'sqlite:////Volumes/SSHD2/10GB/DB/testdb.db'
# [row count, human-readable label] pairs: 1M .. 10M records.
n_records = [[i * 1000000, '%dM' % i] for i in range(1, 11)]
methods = ['pandas', 'dask']
times = []  # accumulates [label, elapsed seconds, library] rows for plotting

for n_rows, label in n_records:
    for method in methods:
        view = 'vw_test_data' + '_' + str(n_rows)
        start = timer()
        if method == 'dask':
            # BUG FIX: dd.read_sql_table is lazy — it only builds a task
            # graph. Without compute() the timer measured graph construction
            # (milliseconds), not the actual data load, so the comparison
            # against pandas was meaningless. compute() forces the read.
            df = dd.read_sql_table(view, conn_string,
                                   index_col='id',
                                   npartitions=n_cores).compute()
        elif method == 'pandas':
            df = pd.read_sql_table(view, conn_string, index_col='id')
        end = timer()
        del df  # release the frame before the next (larger) load
        gc.collect()
        # view[:3] == 'vw_', so labels come out as e.g. 'vw_1M'
        times.append([view[:3] + label, round(end - start, 1), method])
In [3]:
import matplotlib.pyplot as plt
%matplotlib inline
plt.style.use('ggplot')
plt.rcParams['figure.figsize'] = [12,4]
df = pd.DataFrame.from_records(times, columns = ['view_name', 
                                                 'elapsed_time', 
                                                 'library']).\
pivot(index = 'view_name',\
      columns = 'library',\
      values = 'elapsed_time')
df_sorted = df.sort_values('dask')
ax = df_sorted.plot.bar(fontsize = 12,\
                 table=True,\
                 title = 'Data Import Time Comparison')
ax.set_ylabel('Execution Time (sec.)')
x_axis = ax.axes.get_xaxis().set_visible(False)
table = ax.tables[0]
table.set_fontsize = 12
table.scale(1,2)