Kicking the Tires on Airflow, Apache’s workflow management platform – Architecture Overview, Installation and sample Azure Cloud Deployment Pipeline in Python (Part 1)

April 13th, 2020 / by admin

Note: Part 2 can be found HERE and all files used in this tutorial can be downloaded from HERE.

Ever since I can remember, most of the ETL or ELT jobs I was tasked with architecting and/or developing were based on batch processing. Nowadays, the trend is leaning heavily towards streaming or near-real-time processing where possible, but the complex nature of those systems and the relative immaturity of the ecosystems around them still mean that most businesses are reluctant to transition away from old-school batch processing. The last few years have seen more Lambda-style deployments (a hybrid of real-time and batch workloads), but most business requirements for data availability in SMB scenarios hardly go beyond a 'next day is fine' approach. This, however, does not mean that the platforms themselves have not evolved or that the tooling around how these pipelines are structured has not moved on with time. Most GUI-based applications, e.g. SSIS, Pentaho, Talend etc., are evolving and morphing into cloud-first solutions, and major vendors are launching new features to keep up with the insatiable need for speed set by the major cloud behemoths. Also, the design patterns and the ways in which ETL/ELT is architected are undergoing a major transformation, with code- and configuration-based deployment taking over from drag-and-drop and point-and-click applications. Graphical tools are arguably easier to debug, but with a plethora of new data sources to integrate with, tools like Amazon's Glue or Apache Airflow are slowly emerging as favourites with the new breed of data engineers and analytics-driven companies alike.

In spite of Airflow being only a few years old (the project was open-sourced in 2015), it entered the Apache Incubator a year later, eventually becoming a Top-Level Project, and now counts over 1K contributors, 15K stars on GitHub and close to 10K commits. Airflow has been used successfully in production by countless smaller startups and big corporations alike (Airbnb, Slack, Square, Robinhood), and its popularity even prompted Google to create a managed version called Cloud Composer, running on GCP. Airflow's versatility and flexibility also allow it to be used for workloads not associated with building ETL/ELT data pipelines, e.g. automating DevOps operations, machine learning pipelines, scheduler replacement etc.

Some of the main reasons why Airflow has been gaining momentum across data engineering communities and is slowly superseding tools such as Apache Oozie or Luigi include the following:

  • Airflow is dynamic – DAGs in Airflow are dynamic and flexible in nature. As Airflow pipelines are configured as code (Python), many knobs and switches are available to developers and nearly any aspect of pipeline development can be customised and tuned. DAGs also provide a nice abstraction over a series of operations
  • Airflow is extensible – Airflow provides a large number of operators and executors, allowing any tasks (not just ETL) to be scheduled and executed. In case a specific operator/executor is not available out of the box, Airflow's extensible architecture allows you to define your own with relative ease (see the custom operator sketch after this list)
  • Airflow is scalable – Airflow's modular architecture, built-in provisions for a choice of different message queueing systems and executors e.g. RabbitMQ, Celery, Dask, Mesos, multi-node cluster configuration and a Kubernetes-based task execution model allow tasks to be deployed and run across any number of machines/VMs/containers. While Airflow is not designed to perform any computation itself and tasks can be executed sequentially, it is built to spread workloads around and maximise the availability of computation resources
  • Airflow has a good monitoring and management interface – Airflow provides a monitoring and management interface, where it is possible to get a quick overview of the status of the different tasks, as well as trigger and clear tasks or DAG runs
  • Airflow is open source – because Airflow is not a commercial product and its community is very active, the tool receives a steady stream of updates and code contributions, further extending its functionality
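
To give a flavour of that extensibility, below is a minimal custom operator sketch, assuming Airflow 1.10-style imports; the operator name and its greeting logic are purely hypothetical.

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class HelloOperator(BaseOperator):
    """Hypothetical operator that logs and returns a greeting."""

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(HelloOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # execute() is what the worker actually runs; its return value
        # is pushed to XCom by default
        message = 'Hello, {n}!'.format(n=self.name)
        self.log.info(message)
        return message

Once placed on the Python path, it can be instantiated in a DAG file like any built-in operator, e.g. HelloOperator(task_id='hello', name='Airflow', dag=dag).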

Airflow Architecture

Apache Airflow is an open-source workflow management platform. It started at Airbnb in October 2014 as a solution to manage the company's increasingly complex workflows. Creating Airflow allowed Airbnb to programmatically author and schedule their workflows and monitor them via the built-in Airflow user interface. From the beginning, the project was made open source, becoming an Apache Incubator project in March 2016 and a Top-Level Apache Software Foundation project in January 2019.

Building on the popularity of Python as the de facto programming language for data, Airflow is written in Python and workflows are created via Python scripts. Airflow is designed under the principle of "configuration as code": the Airflow scheduler periodically checks whether the criteria and other conditions for running tasks have been met, according to the dependencies defined in directed acyclic graphs (DAGs). Once all criteria have been met, this fact is recorded in the database and Airflow workers pick up and run the jobs with their loads properly balanced. Depending on the choice of setup, this worker process can be a single process on a single machine, multiple processes on a single machine or multiple processes distributed across multiple machines. All job information is stored in the meta DB, which is updated in a timely manner. Users can monitor their jobs via the shiny Airflow web UI and/or the logs. A sample Airflow architecture with all its key components and services is shown in the image below.

In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. For example, a simple DAG could consist of three tasks: A, B, and C. It could say that A has to run successfully before B can run, but C can run anytime. It could say that task A times out after 5 minutes, and B can be restarted up to 5 times in case it fails. It might also say that the workflow will run every night at 10pm, but shouldn’t start until a certain date. In this way, a DAG describes how you want to carry out your workflow; but notice that we haven’t said anything about what we actually want to do! A, B, and C could be anything. Maybe A prepares data for B to analyze while C sends an email. Or perhaps A monitors your location so B can open your garage door while C turns on your house lights. The important thing is that the DAG isn’t concerned with what its constituent tasks do; its job is to make sure that whatever they do happens at the right time, or in the right order, or with the right handling of any unexpected issues. DAGs are defined in standard Python files that are placed in Airflow’s ‘dags’ folder. Airflow will execute the code in each file to dynamically build the DAG objects. You can have as many DAGs as you want, each describing an arbitrary number of tasks. In general, each one should correspond to a single logical workflow.
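
As a rough sketch of the A, B and C example above (the DAG id, dates and DummyOperator placeholders are mine, assuming an Airflow 1.10-style installation), such a workflow could be expressed as:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Runs every night at 10pm, but not before the given start date
dag = DAG(
    'abc_example',
    schedule_interval='0 22 * * *',
    start_date=datetime(2020, 4, 1),
    catchup=False)

# A times out after 5 minutes
a = DummyOperator(task_id='a', execution_timeout=timedelta(minutes=5), dag=dag)

# B can be retried up to 5 times in case it fails
b = DummyOperator(task_id='b', retries=5, dag=dag)

# C has no upstream dependencies, so it can run at any point within the DAG run
c = DummyOperator(task_id='c', dag=dag)

# A has to run successfully before B
a >> b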

Installing Airflow

There are a lot of really detailed and comprehensive posts on the internet describing the step-by-step process for Airflow installation across various platforms, so in this post I will refrain from looking into all the different ways Airflow can be deployed. For the purpose of this demo, I will spin up an Ubuntu micro VM on a Hetzner VPS, install the Anaconda Python distribution and a PostgreSQL database (used for Airflow metadata). The process is fairly straightforward, and all these tasks can be accomplished from your favourite terminal with just a few commands.

#create 'airflow' user
sudo adduser airflow
sudo usermod -aG sudo airflow
su - airflow

#download and install Anaconda and PostgreSQL
curl -O https://repo.anaconda.com/archive/Anaconda3-2019.10-Linux-x86_64.sh
bash Anaconda3-2019.10-Linux-x86_64.sh
conda install -c conda-forge azure-cli-core
sudo apt-get update --fix-missing
sudo apt-get install build-essential autoconf
sudo apt-get install postgresql postgresql-contrib
sudo service postgresql start

#set up database user, database and its permissions
sudo -u postgres psql
CREATE USER airflow PASSWORD 'YourTopSecretPassword';
CREATE DATABASE airflow;
GRANT ALL PRIVILEGES ON DATABASE airflow TO airflow;
\q

#change PostgreSQL config to accept Airflow connections
sudo nano /etc/postgresql/10/main/pg_hba.conf

#change 'IPv4 local connections' value to the following:
host all all 0.0.0.0/0 trust
sudo nano /etc/postgresql/10/main/postgresql.conf

#change listen_addresses = 'localhost' to the following value:
listen_addresses = '*'

#restart PostgreSQL service
sudo service postgresql restart

#install Airflow and its packages
export AIRFLOW_HOME=~/airflow
pip install "apache-airflow[postgres]"
airflow initdb

#update airflow config values
cd airflow
sudo nano airflow.cfg

#change default value for the executor (SequentialExecutor) to the following value:
executor = LocalExecutor

#change default value for sql_alchemy_conn to the following value:
sql_alchemy_conn = postgresql+psycopg2://airflow@localhost:5432/airflow

#ensure that the following connection pooling parameter is set to True:
sql_alchemy_pool_enabled = True

#change from default Sqlite database to PostgreSQL
airflow initdb

#start Airflow services
airflow webserver
airflow scheduler

Once completed and the web server is up and running, we should be able to access the Airflow admin panel at the following URL: your_server_ip_address:8080/admin. This process runs a simple Flask application which reads the state of all tasks from the metadata database and renders these states for the web UI. Also, given that the default Sqlite metadata database has been replaced with a more robust PostgreSQL instance, the following airflow schema was generated.

To demonstrate the internal workings of Airflow and some of its key architectural principles, it's probably best to create a simple, multi-task workflow. In this two-part post I will be creating two separate DAGs – one to provide a brief overview of the DAG file template in its simplest form and another which will include a more comprehensive workflow interacting with public cloud resources as well as local data.

Airflow Sample Workflow – Basic DAG

Let’s start with the most basic workflow and introduce the anatomy of the pipeline creation, DAG definition file and individual tasks. One thing to remember is that this Airflow Python script is just a configuration file specifying the DAG’s structure as code. The actual tasks defined here will run in a different context from the context of this script. Different tasks run on different workers at different points in time, which means that this script cannot be used to cross communicate between tasks (with the exception of Xcom – a more advanced Airflow concept). An Airflow pipeline is just a Python script that happens to define an Airflow DAG object. Let’s start by importing the libraries we will need.

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator

When creating a DAG and some tasks, we have the choice to explicitly pass a set of arguments to each task’s constructor (which would become redundant), or (better!) we can define a dictionary of default parameters that we can use when creating tasks.

from datetime import datetime, timedelta

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 30),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2020, 1, 30),
}

We’ll need a DAG object to nest our tasks into. Here we pass a string that defines the dag_id, which serves as a unique identifier for your DAG. We also pass the default argument dictionary that we just defined and define a schedule_interval of 1 day for the DAG.

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

Tasks are generated when instantiating operator objects. An object instantiated from an operator is called a task. The first argument task_id acts as a unique identifier for the task.

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

Notice how we pass a mix of operator specific arguments (bash_command) and an argument common to all operators (retries) inherited from BaseOperator to the operator’s constructor. This is simpler than passing every argument for every constructor call. Also, notice that in the second task we override the retries parameter with 3.

Airflow leverages the power of Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. Airflow also provides hooks for the pipeline author to define their own parameters, macros and templates. This tutorial barely scratches the surface of what you can do with templating in Airflow, but the goal of this section is to let you know this feature exists, get you familiar with double curly brackets, and point to the most common template variable: {{ ds }} (today’s “date stamp”).

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

Notice that the templated_command contains code logic in {% %} blocks, references parameters like {{ ds }}, calls a function as in {{ macros.ds_add(ds, 7) }}, and references a user-defined parameter in {{ params.my_param }}. The params hook in BaseOperator allows you to pass a dictionary of parameters and/or objects to your templates. Files can also be passed to the bash_command argument, like bash_command='templated_command.sh', where the file location is relative to the directory containing the pipeline file. This may be desirable for many reasons, like separating your script's logic and pipeline code, allowing for proper code highlighting in files composed in different languages, and general flexibility in structuring pipelines. It is also possible to define your template_searchpath as pointing to any folder locations in the DAG constructor call. Using that same DAG constructor call, it is possible to define user_defined_macros which allow you to specify your own variables. For example, passing dict(foo='bar') to this argument allows you to use {{ foo }} in your templates. Moreover, specifying user_defined_filters allows you to register your own filters. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to use {{ 'world' | hello }} in your templates.
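
Putting those DAG constructor arguments together, a hedged sketch might look like this (it reuses the imports and default_args defined earlier; the DAG id, search path and macro/filter names are made up for illustration):

dag = DAG(
    'templating_example',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    # extra folders searched for templated files referenced by tasks
    template_searchpath=['/home/airflow/sql'],
    # usable as {{ foo }} in templates
    user_defined_macros=dict(foo='bar'),
    # usable as {{ 'world' | hello }} in templates
    user_defined_filters=dict(hello=lambda name: 'Hello %s' % name))

t4 = BashOperator(
    task_id='custom_templating',
    bash_command="echo \"{{ foo }} says: {{ 'world' | hello }}\"",
    dag=dag)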

Now that we have tasks t1, t2 and t3 that do not depend on each other, we need to define a chain of dependencies between them. The following example shows a few different ways this can be achieved.

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

Note that when executing your script, Airflow will raise exceptions when it finds cycles in your DAG or when a dependency is referenced more than once. At this point our code should look something like this.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 30),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2020, 1, 30),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

Finally, when our DAG file is saved into the default folder location, the Airflow scheduler should pick it up and display its graph view (showing all its dependencies) as per the image below.

This concludes a brief Airflow architecture overview and a tutorial on building a sample pipeline. For information on more complex DAGs, e.g. involving dynamic provisioning of Azure cloud services (a Citus database cluster), please refer to the second part of this post HERE.


Data Analysis with Dask – A Python Scale-Out, Parallel Computation Framework For Big Data

July 14th, 2019 / by admin

Introduction

The jury is still out on whether Python has emerged as the clear favourite language for all things data, but from my personal experience I am witnessing more and more folks working in the field of data wrangling gravitating towards Python's rich libraries ecosystem, particularly the so-called Python Open Data Science Stack i.e. Pandas, NumPy, SciPy and Scikit-learn, and away from the industry stalwarts e.g. Fortran, Matlab and Octave. Alongside new tooling and framework development, computers have continued to become ever more powerful. This makes it easy to produce, collect, store, and process far more data than before, all at a price that continues to march downward. However, this deluge of data now has many organisations questioning the value of collecting and storing all of it.

Working with the Python Open Data Science Stack, data scientists often turn to tools like Pandas for data cleaning and exploratory data analysis, SciPy and NumPy to run statistical tests on the data, and Scikit-learn to build predictive models. This all works well for relatively small datasets that can comfortably fit into RAM. But, because of the shrinking expense of data collection and storage, data scientists are more frequently working on problems that involve analysing enormous datasets. These tools have upper limits to their feasibility when working with datasets beyond a certain size. Once that threshold is crossed, it is difficult to extract meaning out of data due to painfully long run times (even for the simplest of calculations), unstable code and unwieldy workflows. Large datasets are datasets that can neither fit in RAM nor in a single computer's persistent storage. These datasets are typically above 1 terabyte in size and, depending on the problem, can reach into petabytes and beyond. Pandas, NumPy and Scikit-learn are not suitable for datasets of this size, as they were not inherently built to operate on distributed datasets.

Enter Dask. Launched in late 2014 by Matthew Rocklin with the aim of bringing native scalability to the Python Open Data Science Stack and overcoming its single-machine restrictions, Dask has proven to be a great alternative to Big Data frameworks which sometimes require specialised expertise and maintenance/configuration overhead, e.g. Apache Spark. Dask consists of several different components and APIs, which can be categorised into three layers: task schedulers, low-level APIs, and high-level APIs.

 

What makes Dask so powerful is how these components and layers are built on top of one another. At the core are the task schedulers, which coordinate and monitor the execution of computations across CPU cores and machines. These computations are represented in code either as Dask Delayed objects or Dask Futures objects (the key difference being that the former are evaluated lazily, i.e. just-in-time when the values are needed, while the latter are evaluated eagerly, i.e. in real time regardless of whether the value is needed immediately or not). Dask's high-level APIs offer a layer of abstraction over Delayed and Futures objects. Operations on these high-level objects result in many parallel low-level operations managed by the task schedulers, which provides a seamless experience for the user.
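
A tiny sketch of the lazy evaluation idea using dask.delayed (the increment/add functions are trivial placeholders); nothing executes until compute() is called:

from dask import delayed


def increment(x):
    return x + 1


def add(x, y):
    return x + y


# Wrapping the calls in delayed() only builds a task graph - nothing runs yet
a = delayed(increment)(10)
b = delayed(increment)(20)
total = delayed(add)(a, b)

# The graph is executed (potentially in parallel) only at this point
print(total.compute())  # 32

# Futures, by contrast, start executing as soon as they are submitted,
# e.g. with the distributed scheduler:
#   from dask.distributed import Client
#   client = Client()
#   future = client.submit(increment, 10)
#   print(future.result())  # 11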

As a result, Dask can efficiently scale out data processing computation across multiple machines and hundreds of terabytes of data. Dask can also enable efficient parallel computations on single machines by leveraging their multi-core CPUs and streaming data efficiently from disk. It can run on a distributed cluster, but it doesn't have to. Dask allows you to swap out the cluster for single-machine schedulers, which are surprisingly lightweight, require no setup, and can run entirely within the same process as the user's session. To avoid excess memory use, Dask is good at finding ways to evaluate computations with a low memory footprint where possible, by pulling in chunks of data from disk, doing the necessary processing, and throwing away intermediate values as quickly as possible. This lets analysts perform computations on moderately large datasets (100GB+) even on relatively low-power laptops. This requires no configuration and no setup, meaning that adding Dask to a single-machine computation adds very little cognitive overhead.

Even though Dask's set of APIs can be utilised to tackle problems across a wide spectrum, e.g. multi-dimensional data analysis, scalable machine learning training and prediction on large models etc., one of its primary uses is to enable Pandas-like workflows i.e. enabling applications in time series, business intelligence and general data crunching on big data. Like Pandas, Dask also utilises the concept of a DataFrame, with most functionality overlapping that of Pandas. A Dask DataFrame is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index. These Pandas DataFrames may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. One Dask DataFrame operation triggers many operations on the constituent Pandas DataFrames.
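
A minimal sketch of that partitioning idea (the column names and values are arbitrary):

import pandas as pd
import dask.dataframe as dd

# An ordinary in-memory Pandas DataFrame...
pdf = pd.DataFrame({'store': [1, 2, 1, 3, 2, 1],
                    'sales': [10.0, 5.5, 7.25, 3.0, 8.0, 12.5]})

# ...split along the index into a Dask DataFrame of 3 smaller Pandas DataFrames
ddf = dd.from_pandas(pdf, npartitions=3)

# One Dask DataFrame operation fans out into many operations on the
# constituent Pandas DataFrames; nothing runs until .compute() is called
print(ddf.groupby('store')['sales'].sum().compute())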

Testing Methodology and Results

Let's look at whether we can benefit from using Dask and its parallel computation functionality to process data generated by the TPC-DS benchmark faster. There are a few other data sets and methodologies that can be used to gauge a data storage and processing system's performance, e.g. the TLC Trip Record Data released by the New York City Taxi and Limousine Commission; however, TPC-approved benchmarks have long been considered the most objective, vendor-agnostic way to perform data-specific hardware and software comparisons, capturing the complexity of modern business processing to a much greater extent than their predecessors.

Firstly, let's load the 10GB TPC-DS flat files into a Sqlite database using a Python script. The following code takes two arguments: the first specifies how the data should be loaded, i.e. using the Python pandas module or the csv module, and the second is a comma-separated list of file names to be loaded (alternatively, the 'all' value can be passed if all files are to be loaded). This script also builds the database schema, whose definition is stored in a separate SQL file, creates indexes on some of the tables to speed up subsequent data extraction and finally creates 10 views, each containing an increment of 1 million records for testing purposes. When all files were loaded sequentially, it took around 1 hour to process and generate the Sqlite database, which ends up being around 17GB in size.

#!/usr/bin/python
import sqlite3
import sys
import os
import csv
import time
import pandas as pd
import argparse

tpc_ds_files = '/Volumes/SSHD2/10GB'
tpc_ds_files_processed = '/Volumes/SSHD2/10GB/Processed'
dbsqlite_location = '/Volumes/SSHD2/10GB/DB'
dbsqlite_filename = 'testdb.db'
schema_sql_location = '/Volumes/SSHD2/10GB/DB'
schema_sql_filename = 'create_sqlite_schema.sql'
view_sql_location = '/Volumes/SSHD2/10GB/DB'

view_sql_filename = 'create_view.sql'
view_sql_name = 'vw_test_data'
encoding = 'iso-8859-1'
indexes = [['store_sales', 'ss_sold_date_sk'],
           ['store_sales', 'ss_addr_sk'],
           ['store_sales', 'ss_item_sk'],
           ['store_sales', 'ss_store_sk'],
           ['date_dim', 'd_day_name']]
record_counts = ['1000000',
                 '2000000',
                 '3000000',
                 '4000000',
                 '5000000',
                 '6000000',
                 '7000000',
                 '8000000',
                 '9000000',
                 '10000000']
methods = ['use_pandas', 'use_csv']


def get_sql(sql_statements_file_path):
    """
    Source operation types from the 'create_sqlite_schema' SQL file.
    Each operation is denoted by the use of four dash characters
    and a corresponding table DDL statement and store them in a dictionary
    (referenced in the main() function).
    """
    table_name = []
    query_sql = []

    with open(sql_statements_file_path, "r") as f:
        for i in f:
            if i.startswith("----"):
                i = i.replace("----", "")
                table_name.append(i.rstrip("\n"))

    temp_query_sql = []
    with open(sql_statements_file_path, "r") as f:
        for i in f:
            temp_query_sql.append(i)
        l = [i for i, s in enumerate(temp_query_sql) if "----" in s]
        l.append((len(temp_query_sql)))
        for first, second in zip(l, l[1:]):
            query_sql.append("".join(temp_query_sql[first:second]))
    sql = dict(zip(table_name, query_sql))
    return sql


def RemoveTrailingChar(row_limit, file, encoding, tpc_ds_files, tpc_ds_files_processed):
    """
    Remove trailing '|' characters from the files generated by the TPC-DS utility
    as they interfere with the Sqlite load function.
    """
    line_numer = row_limit
    lines = []
    with open(os.path.join(tpc_ds_files, file), 'r', encoding=encoding) as r,\
            open(os.path.join(tpc_ds_files_processed, file), 'w', encoding=encoding) as w:
        for line in r:
            line.encode(encoding).strip()
            if line.endswith('|\n'):
                lines.append(line[:-2]+"\n")
                if len(lines) == line_numer:
                    w.writelines(lines)
                    lines = []
        w.writelines(lines)


def SourceFilesRename(tpc_ds_files_processed):
    """"
    Rename files extensions from .dat to .csv
    """
    for file in os.listdir(tpc_ds_files_processed):
        if file.endswith(".dat"):
            os.rename(os.path.join(tpc_ds_files_processed, file),
                      os.path.join(tpc_ds_files_processed, file[:-4]+'.csv'))


def BuildSchema(conn, arg_method, arg_tables, dbsqlite_location, dbsqlite_filename, sql):
    """
    Build Sqlite database schema based on table names passed
    """
    if arg_tables[0] == 'all':
        sql = sql.values()
    else:
        sql = {x: sql[x] for x in sql if x in arg_tables}
        sql = sql.values()
    for s in sql:
        try:
            conn.executescript(s)
        except sqlite3.OperationalError as e:
            print(e)
            conn.rollback()
            sys.exit(1)
        else:
            conn.commit()
    conn.execute("PRAGMA SYNCHRONOUS = OFF")
    conn.execute("PRAGMA LOCKING_MODE = EXCLUSIVE")
    conn.execute("PRAGMA JOURNAL_MODE = OFF")  # WALL2


def CleanUpFiles(tpc_ds_files, tpc_ds_files_processed, arg_tables):
    """
    This is a wrapper function which calls two other function i.e.
    RemoveTrailingChar and SourceFilesRename. Removing trailing characters
    is executed in batches equal to the row count specified by the row_limit
    variable.
    """
    if arg_tables[0] == "all":
        files = [f for f in os.listdir(tpc_ds_files) if f.endswith(
            ".dat")]
    else:
        files = [f for f in os.listdir(tpc_ds_files) if f.endswith(
            ".dat") and f[:-4] in arg_tables]
    for file in files:
        row_limit = 10000
        print('Processing {file_name} file...'.format(
            file_name=file))
        RemoveTrailingChar(row_limit, file, encoding, tpc_ds_files,
                           tpc_ds_files_processed)
        SourceFilesRename(tpc_ds_files_processed)


def LoadFiles(conn, tpc_ds_files_processed, encoding, method, indexes, arg_tables):
    """
    Import csv flat files into Sqlite database and create indexes on all
    nominated tables. This function also contains logic for the method used
    to load flat files i.e. using either 'pandas' Python module or 'csv' Python
    module. As a result, one of those arguments needs to be provided when
    executing the script. The function also does some rudimentary checks
    on whether database row counts are equal to those of the files and
    whether nominated indexes have been created.
    """
    if arg_tables[0] == "all":
        files = [f for f in os.listdir(
            tpc_ds_files_processed) if f.endswith(".csv")]
    else:
        files = [f for f in os.listdir(
            tpc_ds_files_processed) if f.endswith(".csv") and f[:-4] in arg_tables]
    for file in files:
        table_name = file[:-4]
        print('Loading {file_name} file...'.format(
            file_name=file), end="", flush=True)
        c = conn.cursor()
        c.execute("""SELECT p.name as column_name
                            FROM sqlite_master AS m
                            JOIN pragma_table_info(m.name) AS p
                            WHERE m.name = '{tbl}'""".format(tbl=table_name))
        cols = c.fetchall()
        columns = [",".join(row) for row in cols]
        c.execute("""SELECT p.name
                    FROM sqlite_master AS m
                    JOIN pragma_table_info(m.name) AS p
                    WHERE m.name = '{tbl}'
	                AND pk != 0
                    ORDER BY p.pk ASC""".format(tbl=table_name))
        pks = c.fetchall()
        try:
            if method == 'use_pandas':
                chunk_size = 100000
                for df in pd.read_csv(os.path.join(tpc_ds_files_processed, file),
                                      sep='|', names=columns, encoding=encoding, chunksize=chunk_size, iterator=True):
                    if pks:
                        pk = [",".join(row) for row in pks]
                        df.set_index(pk)
                    df.to_sql(table_name, conn,
                              if_exists='append', index=False)
            if method == 'use_csv':
                with open(os.path.join(tpc_ds_files_processed, file), 'r', encoding=encoding) as f:
                    reader = csv.reader(f, delimiter='|')
                    sql = "INSERT INTO {tbl} ({cols}) VALUES({vals})"
                    sql = sql.format(tbl=table_name, cols=', '.join(
                        columns), vals=','.join('?' * len(columns)))
                    # c.execute('BEGIN TRANSACTION')
                    for data in reader:
                        c.execute(sql, data)
                    # c.execute('COMMIT TRANSACTION')
            file_row_counts = sum(1 for line in open(
                os.path.join(tpc_ds_files_processed, file), encoding=encoding, newline=''))
            db_row_counts = c.execute(
                """SELECT COUNT(1) FROM {}""".format(table_name)).fetchone()
        except Exception as e:
            print(e)
            sys.exit(1)
        try:
            columns_to_index = [v[1] for v in indexes if v[0] == table_name]
            if columns_to_index:
                l = len(columns_to_index)
                i = 0
                for column_name in columns_to_index:
                    index_name = "indx_{tbl}_{col}".format(
                        tbl=table_name, col=column_name)
                    c.execute("DROP INDEX IF EXISTS {indx};".format(
                        indx=index_name))
                    c.execute("CREATE INDEX {indx} ON {tbl}({col});".format(
                        tbl=table_name, indx=index_name, col=column_name))
                    index_created = c.execute("""SELECT 1 FROM sqlite_master
                                            WHERE type = 'index'
                                            AND tbl_name = '{tbl}'
                                            AND name = '{indx}' LIMIT 1""".format(tbl=table_name, indx=index_name)).fetchone()
                    if index_created:
                        i += 1
        except Exception as e:
            print(e)
            sys.exit(1)
        finally:
            if file_row_counts != db_row_counts[0]:
                raise Exception(
                    "Table {tbl} failed to load correctly as record counts do not match: flat file: {ff_ct} vs database: {db_ct}.\
                        Please troubleshoot!".format(tbl=table_name, ff_ct=file_row_counts, db_ct=db_row_counts[0]))
            if columns_to_index and l != i:
                raise Exception("Failed to create all nominated indexes on table '{tbl}'. Please troubleshoot!".format(
                    tbl=table_name))
            else:
                print("OK")
        c.close()


def CreateView(conn, sql_view_path, view_sql_name, *args):
    """
    Create multiple views used for data extract in Dask demo.
        Each view uses the same blueprint DDL stored in a sql file so to change
        the number of rows returned by the view, this function
        also changes the view schema by:
        (1) amending DROP statement with a new view name
        (2) amending CREATE VIEW statement by
                * adding 'record_count' suffix to the view name
                * limiting number of records output by adding LIMIT key word with
                  a 'record count' value as per the variable passed
    """
    for r in args:
        r = str(r)
        v_name = view_sql_name + '_' + r
        with open(sql_view_path, 'r') as f:
            view_sql_file = f.read()
        sql_commands = view_sql_file.split(';')
        for c in sql_commands[:-1]:
            if str(c).endswith(view_sql_name):
                c = (c + ';').replace(view_sql_name, v_name).strip()
            else:
                c = (c + ' LIMIT ' + r + ';').replace(view_sql_name, v_name).strip()
            try:
                conn.execute(c)
            except sqlite3.OperationalError as e:
                print(e)
                conn.rollback()
                sys.exit(1)
            else:
                conn.commit()
        c = conn.cursor()
        view_created = c.execute("""SELECT 1 FROM sqlite_master
                                WHERE type = 'view'
                                AND name = '{v}' LIMIT 1""".format(v=v_name)).fetchone()
        if view_created:
            print("View '{v}' created successfully.".format(v=v_name))
        else:
            raise Exception("Failed to create view '{v}'. Please troubleshoot!".format(
                v=v_name))


def main(view_sql_location, view_sql_filename):
    t = time.time()
    if os.path.isfile(os.path.join(dbsqlite_location, dbsqlite_filename)):
        print('Dropping existing {db} database...'.format(
            db=dbsqlite_filename))
        os.remove(os.path.join(dbsqlite_location, dbsqlite_filename))
    conn = sqlite3.connect(os.path.join(dbsqlite_location, dbsqlite_filename))
    if len(sys.argv[1:]) > 1:
        arg_method = sys.argv[1]
        arg_tables = [arg.replace(",", "") for arg in sys.argv[2:]]
        sql_schema = get_sql(os.path.join(
            schema_sql_location, schema_sql_filename))
        sql_view_path = os.path.join(view_sql_location, view_sql_filename)
        tables = [q for q in sql_schema]
        tables.append('all')
        if not arg_method or not any(e in arg_method for e in methods):
            raise ValueError('Incorrect load method argument provided. Choose from the following options: {m}'.format(
                m=', '.join(methods)))
        if not arg_tables or not any(e in arg_tables for e in tables):
            raise ValueError('Incorrect object name argument(s) provided. Choose from the following options: {t}'.format(
                t=', '.join(tables)))
        else:
            CleanUpFiles(tpc_ds_files, tpc_ds_files_processed, arg_tables)
            BuildSchema(conn, arg_method, arg_tables,
                        dbsqlite_location, dbsqlite_filename, sql_schema)
            LoadFiles(conn, tpc_ds_files_processed, encoding,
                      arg_method, indexes, arg_tables)
            CreateView(conn, sql_view_path, view_sql_name, *record_counts)
    else:
        raise ValueError(
            '''No/wrong arguments given. Please provide the following:
            (1) load method e.g. <use_pandas>
            (2) object name(s) e.g. <store_sales>
            Alternatively <all> argument will load all flat files''')
    print("Processed in {t} seconds.".format(
        t=format(time.time()-t, '.2f')))


if __name__ == '__main__':
    main(view_sql_location, view_sql_filename)

Now that we have the required data sets loaded and persisted in the database, let's look at how fast this data can be extracted into Pandas and Dask dataframe objects, as well as how fast certain trivial computation operations can be executed against them. Execution times are captured and visualised inside the notebook below using a simple matplotlib graph, but Dask also comes with a nifty web interface that delivers performance information over a standard web page in real time. This web interface is launched by default wherever the scheduler is launched, provided the scheduler machine has Bokeh installed, and includes data on system resource utilisation, task and worker progress, basic health checks etc. For example, the task stream plot below shows when tasks complete and on which workers, with worker cores recorded on the y-axis and time on the x-axis.
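
For reference, a minimal way to stand up the distributed scheduler locally and get at that dashboard (served on port 8787 by default when Bokeh is installed); the worker and thread counts below are arbitrary:

from dask.distributed import Client

# Starts an in-process scheduler plus local workers; the client's repr
# includes the scheduler address and the Bokeh dashboard location
client = Client(n_workers=4, threads_per_worker=2)
print(client)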

For a more detailed walkthrough of the Dask web interface and its features, Matthew Rocklin has a great video on YouTube – you can watch it HERE.

The Jupyter notebook below depicts sample Python code used to time the export of Sqlite data from the 10 views created by the previous script, visualised side by side for each view and library used (Pandas and Dask). It also compares execution times for some rudimentary operations, e.g. sum(), max(), groupby(), across three data sets i.e. 1 million, 5 million and 10 million records (again, read from Sqlite views). It is worth noting that Pandas, by design, is limited to single-CPU-core execution (for most operations using the standard CPython implementation, thus being restricted by the Global Interpreter Lock, GIL). Dask, on the other hand, was created from the ground up to take advantage of multiple cores. By default, Dask DataFrame uses the multi-threaded scheduler. This exposes some parallelism when Pandas or the underlying NumPy operations release the global interpreter lock. Generally, Pandas is more GIL-bound than NumPy, so multi-core speed-ups are not as pronounced for Dask DataFrame as they are for Dask Array. This is changing, and the Pandas development team is actively working on releasing the GIL. These tests were run on my mid-2012 Mac Pro with 128GB DDR3 memory and dual Intel Xeon X5690 (12 cores/24 threads) CPUs. The Dask version installed was v1.2.2 and the Python distribution version was v3.6.8.
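
A simplified sketch of the kind of timing comparison the notebook performs; the database path and view name follow the loader script above, while the index column is an assumption (any indexed, reasonably uniform numeric column would do):

import time

import pandas as pd
import dask.dataframe as dd
from sqlalchemy import create_engine

uri = 'sqlite:////Volumes/SSHD2/10GB/DB/testdb.db'   # path as per the loader script
view = 'vw_test_data_1000000'                        # one of the views built by CreateView()

# Pandas: a single-threaded read into one in-memory DataFrame
start = time.time()
pandas_df = pd.read_sql_query('SELECT * FROM {v}'.format(v=view), create_engine(uri))
print('Pandas read: {s:.2f}s, {r} rows'.format(s=time.time() - start, r=len(pandas_df)))

# Dask: the same view read as multiple partitions that can be processed in parallel
start = time.time()
dask_df = dd.read_sql_table(view, uri, index_col='ss_item_sk', npartitions=8)
rows = len(dask_df)   # triggers the actual (parallel) read
print('Dask read:   {s:.2f}s, {r} rows'.format(s=time.time() - start, r=rows))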

Looking at the graph outlining comparative results between Dask and Pandas, it's evident that data imports differ in performance and execution speed in favour of Dask. On average, Dask seems to be twice as fast as Pandas when reading data from disk, and this ratio is maintained across all data volumes in a fairly linear fashion.

Before I get to the computation performance overview, let's look at how Dask utilised the available resources to its advantage to parallelize a number of operations across all available cores. All of the large-scale Dask collections like Dask Array, Dask DataFrame, and Dask Bag, as well as the fine-grained APIs like delayed and futures, generate task graphs where each node in the graph is a normal Python function and edges between nodes are normal Python objects that are created by one task as outputs and used as inputs in another task. After Dask generates these task graphs, it needs to execute them on parallel hardware. This is the job of a task scheduler. Different task schedulers exist, and each will consume a task graph and compute the same result, but with different performance characteristics. The threaded scheduler executes computations with a local multiprocessing.pool.ThreadPool. It is lightweight, requires no setup and introduces very little task overhead (around 50us per task). Also, because everything occurs in the same process, it incurs no costs to transfer data between tasks. The threaded scheduler is the default choice for Dask Array, Dask DataFrame, and Dask Delayed. The image below captured CPU workload (using the htop utility) during the execution of the code below. You can clearly see how Dask, by default, tried to spread the workload across multiple cores.
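
The scheduler in use can be picked per computation or set globally; a small sketch (with a toy DataFrame) of how the same graph runs under different schedulers:

import dask
import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({'x': range(1000)}), npartitions=8)

# The threaded scheduler (a local thread pool) is the default for Dask DataFrame
threaded = ddf.x.sum().compute(scheduler='threads')

# The same task graph can be handed to the multiprocessing scheduler, which
# sidesteps the GIL at the cost of moving data between processes
multiproc = ddf.x.sum().compute(scheduler='processes')

# Or the choice can be made globally for a block of code
with dask.config.set(scheduler='threads'):
    again = ddf.x.sum().compute()

assert threaded == multiproc == again == sum(range(1000))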

Looking at computation speed across a number of different functions, e.g. mean(), max(), groupby(), the results are not as clear-cut as they were with the data import. Running these tests across three different data sizes i.e. 1M, 5M and 10M records, Dask outperformed Pandas in the 'grouping', 'lambda', 'mean' and 'sum' tests but, surprisingly, in respect to 'max' and 'min' the advantage went to Pandas. I can only attribute this to the overhead the Dask distributed scheduler creates when dispatching and collecting data across multiple threads/cores, and to Pandas' efficiency in how it stores and indexes data inside the dataframe. Also, these datasets are too small to truly do justice to how performant Dask can be in the right scenarios, with the right environment setup and a lot more data to crunch, so the old adage 'the right tool for the right job' is very fitting in this context.

To me it seems that Dask is more like a hammer, whereas Pandas is more akin to a scalpel: if your data is relatively small, Pandas is an excellent tool and the heavyweight 'divide and conquer' approach is not the methodology you want to (or need to) use. Likewise, if your hardware and datasets are large enough to warrant taking advantage of parallelism and concurrency, Dask delivers on all fronts with minimal setup and an API which is very similar to Pandas.

To sum up, the performance benefit (or drawback) of using a parallel dataframe like Dask dataframes over Pandas will differ based on the kinds of computations you do:

  • If you’re doing small computations then Pandas is always the right choice. The administrative costs of parallelizing will outweigh any benefit. You should not parallelize if your computations are taking less than, say, 100ms.
  • For simple operations like filtering, cleaning, and aggregating large data you should expect linear speedup by using a parallel dataframe. If you’re on a 20-core computer you might expect a 20x speedup. If you’re on a 1000-core cluster you might expect a 1000x speedup, assuming that you have a problem big enough to spread across 1000 cores. As you scale up, administrative overhead will increase, so you should expect the speedup to decrease a bit.
  • For complex operations like distributed joins it’s more complicated. You might get linear speedups like above, or you might even get slowdowns. Someone experienced in database-like computations and parallel computing can probably predict pretty well which computations will do well.
