Kicking the tires on BigQuery – Google’s Serverless Enterprise Data Warehouse (Part 1)

September 21st, 2018 / by admin

Note: Part 2 can be found HERE.

Introduction

As traditional, on-premise data warehouse solutions become increasingly hard to scale and manage, a new breed of vendors and products is emerging, one which can accommodate exponentially growing data volumes with little up-front cost and very little operational overhead. More and more, cash-strapped IT departments and startups pushed by their VC overlords to deliver immediate ROI cannot afford to build up their analytical capability along the old-fashioned, waterfall trajectory. Instead, the famous Facebook mantra of “move fast and break things” is gradually becoming the standard to live up to, with cloud-first, pay-for-what-you-use, ops-free services making inroads into, or outright displacing, ‘old guard’ technologies. The data warehousing domain avoided being dragged into this relentless push for optimisation and efficiency for a very long time, with many venerable vendors (IBM, Oracle, Teradata) resting on their laurels, sometimes for decades. With the advent of cloud computing, however, new juggernauts have emerged, and Google’s BigQuery is slowly becoming synonymous with a petabyte-scale, ops-free, ultra-scalable and extremely fast data warehouse service. And while BigQuery has not (yet) become the go-to platform for large-volume data storage and processing, organisations such as The New York Times and Spotify have adopted it, bucking the industry trend of selecting existing cloud data warehouse incumbents e.g. AWS Redshift or Microsoft SQL DW.

BigQuery traces its roots back to 2010, when Google released beta access to a new SQL processing system based on its distributed query engine technology, called Dremel, which was described in an influential paper published the same year. BigQuery and Dremel share the same underlying architecture, and by incorporating Dremel’s columnar storage and tree architecture, BigQuery manages to offer unprecedented performance. BigQuery is, however, much more than Dremel, which merely serves as its execution engine. The BigQuery service also leverages Google’s other core technologies: Borg – a large-scale cluster management system, Colossus – Google’s latest-generation distributed file system, Capacitor – a columnar data storage format, and Jupiter – Google’s networking infrastructure. As illustrated below, a BigQuery client interacts with the Dremel engine via a client interface. Borg allocates the compute capacity for the Dremel jobs, which read data from Colossus over the Jupiter network, perform the required SQL operations and return results to the client.

Having previously worked with a number of other vendors which have occupied this realm for a very long time, I found it refreshing to see how different BigQuery is and what separates it from the rest of the competition. Before I get into general observations and conclusions, let’s look at how one can easily process, load and query large volumes of data (TPC-DS data in this example) using BigQuery and a few Python scripts for automation.

TPC-DS Schema Creation and Data Loading

BigQuery offers a number of ways to interact with it and take advantage of its functionality. The most basic is the GCP Web UI, but for repetitive tasks users will find themselves mainly relying on the Google Cloud SDK or the Google BigQuery API. For the purpose of this example, I generated two TPC-DS datasets – 100GB and 300GB – and staged them in a folder on the local machine. I also created a TPC-DS schema JSON file which is used by the script to generate all dataset objects. All these resources, as well as the scripts used in this post, can be found in my OneDrive folder HERE.

The following config file parameters and Python code are used mainly to clean up the flat files (TPC-DS terminates each line with a trailing “|” character which needs to be removed), break them up into smaller chunks and upload them to the Google Cloud Storage service (the script assumes the gs://tpc_ds_data_100GB and gs://tpc_ds_data_300GB buckets have already been created). The schema structure used for this project is a direct translation of the PostgreSQL-specific TPC-DS schema used in one of my earlier projects, with data types and their NULL-ability converted and standardised to conform to BigQuery conventions.
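
For reference, here is a minimal sketch of what the params.cfg file could look like. The section and key names mirror those read by the scripts in this post; the file paths, service account key location and bucket value below are placeholders to be substituted with your own:

[Files_Path]
bq_tpc_ds_schema_as_json = /data/tpc_ds/tpc_ds_schema.json
bq_local_source_files_path = /data/tpc_ds/source
bq_local_target_files_path = /data/tpc_ds/target

[Big_Query]
; path to the GCP service account key JSON used to authenticate the BigQuery and Storage clients
bq_client = /data/tpc_ds/service_account_key.json
bq_bucket_path = gs://tpc_ds_data_100GB
bq_ref_dataset = tpc_ds_test_data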

import configparser
from tqdm import tqdm
from os import path, rename, listdir, remove, walk
from shutil import copy2, move
from google.cloud import storage
from csv import reader, writer


config = configparser.ConfigParser()
config.read("params.cfg")

bq_tpc_ds_schema_as_json = config.get(
    "Files_Path", path.normpath("bq_tpc_ds_schema_as_json"))
bq_local_source_files_path = config.get(
    "Files_Path", path.normpath("bq_local_source_files_path"))
bq_local_target_files_path = config.get(
    "Files_Path", path.normpath("bq_local_target_files_path"))
storage_client = config.get("Big_Query", path.normpath("bq_client"))
bq_bucket_path = config.get("Big_Query", "bq_bucket_path")
bq_storage_client = storage.Client.from_service_account_json(storage_client)


keep_src_files = True
keep_headers = False
keep_src_csv_files = True
row_limit = 1000000
output_name_template = '_%s.csv'
delimiter = '|'


def RemoveFiles(DirPath):
    filelist = [f for f in listdir(DirPath)]
    for f in filelist:
        remove(path.join(DirPath, f))


def SourceFilesRename(bq_local_source_files_path):
    for file in listdir(bq_local_source_files_path):
        if file.endswith(".dat"):
            rename(path.join(bq_local_source_files_path, file),
                   path.join(bq_local_source_files_path, file[:-4]+'.csv'))


def SplitLargeFiles(file, row_count, bq_local_target_files_path, output_name_template, keep_headers=False):
    """
    Split a flat file into smaller chunks whenever its row count exceeds the
    'row_limit' value. This produces roughly file_row_count/row_limit output files,
    which can speed up the upload if parallelised.
    """
    file_handler = open(path.join(bq_local_target_files_path, file), 'r')
    csv_reader = reader(file_handler, delimiter=delimiter)
    current_piece = 1
    current_out_path = path.join(bq_local_target_files_path, path.splitext(file)[
                                 0]+output_name_template % current_piece)
    current_out_writer = writer(
        open(current_out_path, 'w', newline=''), delimiter=delimiter)
    current_limit = row_limit
    if keep_headers:
        headers = next(csv_reader)
        current_out_writer.writerow(headers)
    pbar = tqdm(total=row_count)
    for i, row in enumerate(csv_reader):
        pbar.update()
        if i + 1 > current_limit:
            current_piece += 1
            current_limit = row_limit * current_piece
            current_out_path = path.join(bq_local_target_files_path, path.splitext(file)[
                                         0]+output_name_template % current_piece)
            current_out_writer = writer(
                open(current_out_path, 'w', newline=''), delimiter=delimiter)
            if keep_headers:
                current_out_writer.writerow(headers)
        current_out_writer.writerow(row)
    pbar.close()
    print("\n")


def ProcessLargeFiles(bq_local_source_files_path, bq_local_target_files_path, output_name_template, row_limit):
    """
    Remove the trailing '|' characters from the files generated by the TPC-DS utility
    as they interfere with the BQ load function. Since BQ does not allow an end-of-line
    delimiter to be specified for a flat file, leaving them in breaks the data load
    functionality. This function also calls 'SplitLargeFiles', splitting files with a
    row count > 'row_limit' into smaller chunks/files.
    """
    RemoveFiles(bq_local_target_files_path)
    fileRowCounts = []
    for file in listdir(bq_local_source_files_path):
        if file.endswith(".csv"):
            fileRowCounts.append([file, sum(1 for line in open(
                path.join(bq_local_source_files_path, file), newline=''))])
    for file in fileRowCounts:
        if file[1] > row_limit:
            print(
                "Removing trailing characters in {f} file...".format(f=file[0]))
            RemoveTrailingChar(row_limit, path.join(
                bq_local_source_files_path, file[0]), path.join(bq_local_target_files_path, file[0]))
            print("\nSplitting file:", file[0],)
            print("Measured row count:", file[1])
            print("Progress...")
            SplitLargeFiles(
                file[0], file[1], bq_local_target_files_path, output_name_template)
            remove(path.join(bq_local_target_files_path, file[0]))
        else:
            print(
                "Removing trailing characters in {f} file...".format(f=file[0]))
            RemoveTrailingChar(row_limit, path.join(
                bq_local_source_files_path, file[0]), path.join(bq_local_target_files_path, file[0]))
    RemoveFiles(bq_local_source_files_path)


def RemoveTrailingChar(row_limit, bq_local_source_files_path, bq_local_target_files_path):
    """
    Remove the trailing '|' character from each line's ending, flushing
    the output in 'row_limit'-sized batches.
    """
    batch_size = row_limit
    lines = []
    with open(bq_local_source_files_path, "r") as r, open(bq_local_target_files_path, "w") as w:
        for line in r:
            if line.endswith('|\n'):
                lines.append(line[:-2] + "\n")
            elif line.endswith('|'):
                # the last line of a file may not carry a newline character
                lines.append(line[:-1])
            else:
                lines.append(line)
            if len(lines) == batch_size:
                w.writelines(lines)
                lines = []
        w.writelines(lines)


def GcsUploadBlob(bq_storage_client, bq_local_target_files_path, bq_bucket_path):
    """
    Upload each file into the nominated GCS bucket if it does not already exist there.
    """
    bucket_name = (bq_bucket_path.split("//"))[1].split("/")[0]
    CHUNK_SIZE = 10485760
    bucket = bq_storage_client.get_bucket(bucket_name)
    print("\nCommencing files upload...")
    for file in listdir(bq_local_target_files_path):
        try:
            blob = bucket.blob(file, chunk_size=CHUNK_SIZE)
            file_exist = storage.Blob(bucket=bucket, name=file).exists(
                bq_storage_client)
            if not file_exist:
                print("Uploading file {fname} into {bname} GCP bucket...".format(
                    fname=file, bname=bucket_name))
                blob.upload_from_filename(
                    path.join(bq_local_target_files_path, file))
            else:
                print("Nominated file {fname} already exists in {bname} bucket. Moving on...".format(
                    fname=file, bname=bucket_name))
        except Exception as e:
            print(e)


if __name__ == "__main__":
    SourceFilesRename(bq_local_source_files_path)
    ProcessLargeFiles(bq_local_source_files_path,
                      bq_local_target_files_path, output_name_template, row_limit)
    GcsUploadBlob(bq_storage_client,
                  bq_local_target_files_path, bq_bucket_path)

Once all the TPC-DS data has been moved into the GCS bucket, we can proceed with creating the BigQuery dataset (synonymous with a schema name in a typical Microsoft SQL Server or PostgreSQL hierarchy) and its corresponding tables. A dataset is associated with a project, which forms the overarching container, somewhat equivalent to a database if you come from a traditional RDBMS background. It is also worth pointing out that dataset names must be unique per project, all tables referenced in a query must be stored in datasets in the same location and, finally, the geographic location can be set at creation time only. The following is an excerpt from the JSON file used to create the dataset schema, which can be downloaded from my OneDrive folder HERE.

{
    "table_schema": [
        {
            "table_name": "dbgen_version",
            "fields": [
                {
                    "name": "dv_version",
                    "type": "STRING",
                    "mode": "NULLABLE"
                },
                {
                    "name": "dv_create_date",
                    "type": "DATE",
                    "mode": "NULLABLE"
                },
                {
                    "name": "dv_create_time",
                    "type": "TIME",
                    "mode": "NULLABLE"
                },
                {
                    "name": "dv_cmdline_args",
                    "type": "STRING",
                    "mode": "NULLABLE"
                }
            ]
        }
    ]
}

Now that the source data is staged in Google Cloud Storage, we can create the dataset and its tables and populate them with the flat-file data moved across by the previous code snippet. The following Python script creates the tpc_ds_test_data dataset, creates all the tables based on the TPC-DS schema stored in the JSON file and finally populates them with the text files' data.

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.cloud import storage
from os import path
import configparser
import json
import re


config = configparser.ConfigParser()
config.read("params.cfg")

bq_tpc_ds_schema_as_json = config.get(
    "Files_Path", path.normpath("bq_tpc_ds_schema_as_json"))
bq_client = config.get("Big_Query", path.normpath("bq_client"))
bq_client = bigquery.Client.from_service_account_json(bq_client)
storage_client = config.get("Big_Query", path.normpath("bq_client"))
bq_bucket_path = config.get("Big_Query", "bq_bucket_path")
bq_storage_client = storage.Client.from_service_account_json(storage_client)
bq_ref_dataset = config.get("Big_Query", "bq_ref_dataset")


def dataset_bq_exists(bq_client, bq_ref_dataset):
    try:
        bq_client.get_dataset(bq_ref_dataset)
        return True
    except NotFound as e:
        return False


def bq_create_dataset(bq_client, bq_ref_dataset):
    """
    Create the BigQuery dataset in the Australian region
    (australia-southeast1) if it does not already exist
    """
    if not dataset_bq_exists(bq_client, bq_ref_dataset):
        print("Creating {d} dataset...".format(d=bq_ref_dataset))
        dataset = bq_client.dataset(bq_ref_dataset)
        dataset = bigquery.Dataset(dataset)
        dataset.location = 'australia-southeast1'
        dataset = bq_client.create_dataset(dataset)
    else:
        print("Nominated dataset already exists. Moving on...")


def table_bq_exists(bq_client, ref_table):
    try:
        bq_client.get_table(ref_table)
        return True
    except NotFound as e:
        return False


def bq_drop_create_table(bq_client, bq_ref_dataset, bq_tpc_ds_schema_as_json):
    """
    Drop (if exists) and create schema tables on the nominated BigQuery dataset.
    Schema details are stored inside 'tpc_ds_schema.json' file.
    """
    dataset = bq_client.dataset(bq_ref_dataset)
    with open(bq_tpc_ds_schema_as_json) as schema_file:
        data = json.load(schema_file)
        for t in data['table_schema']:
            table_name = t['table_name']
            ref_table = dataset.table(t['table_name'])
            table = bigquery.Table(ref_table)
            if table_bq_exists(bq_client, ref_table):
                print("Table '{tname}' already exists in the '{dname}' dataset. Dropping table '{tname}'...".format(
                    tname=table_name, dname=bq_ref_dataset))
                bq_client.delete_table(table)
            for f in t['fields']:
                table.schema += (
                    bigquery.SchemaField(f['name'], f['type'], mode=f['mode']),)
            print("Creating table {tname} in the '{dname}' dataset...".format(
                tname=table_name, dname=bq_ref_dataset))
            table = bq_client.create_table(table)


def bq_load_data_from_file(bq_client, bq_ref_dataset,  bq_bucket_path, bq_storage_client):
    """
    Load data stored on the nominated GCP bucket into dataset tables
    """
    bucket_name = (bq_bucket_path.split("//"))[1].split("/")[0]
    bucket = bq_storage_client.get_bucket(bucket_name)
    blobs = bucket.list_blobs()
    dataset = bq_client.dataset(bq_ref_dataset)
    for blob in blobs:
        file_name = blob.name
        rem_char = re.findall(r"_\d+", file_name)
        if len(rem_char) == 0:
            ref_table = dataset.table(file_name[:file_name.index(".")])
            table_name = file_name[:file_name.index(".")]
        else:
            rem_char = str("".join(rem_char))
            ref_table = dataset.table(file_name[:file_name.index(rem_char)])
            table_name = file_name[:file_name.index(rem_char)]
        print("Loading file {f} into {t} table...".format(
            f=file_name, t=table_name))
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = "CSV"
        job_config.skip_leading_rows = 0
        job_config.field_delimiter = "|"
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        job = bq_client.load_table_from_uri(
            path.join(bq_bucket_path, file_name), ref_table, job_config=job_config)
        result = job.result()
        print(result.state)


if __name__ == "__main__":
    bq_create_dataset(bq_client, bq_ref_dataset)
    bq_drop_create_table(bq_client, bq_ref_dataset, bq_tpc_ds_schema_as_json)
    bq_load_data_from_file(bq_client, bq_ref_dataset,
                           bq_bucket_path, bq_storage_client)

BigQuery offers some special tables whose contents represent metadata, such as the list of tables and views in a dataset. To confirm all objects have been created in the correct dataset, we can interrogate the __TABLES__ or __TABLES_SUMMARY__ meta-tables as per the query below.
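
Below is a minimal sketch of such a metadata query, issued here through the same Python client API used in the load script (the tpc_ds_test_data dataset name matches the one created earlier, the column names come from the documented __TABLES__ schema, and the service account key path is a placeholder):

from google.cloud import bigquery

bq_client = bigquery.Client.from_service_account_json("service_account_key.json")

# List every table in the dataset along with its row count and storage footprint
query = """
    SELECT table_id, row_count, size_bytes
    FROM `tpc_ds_test_data.__TABLES__`
    ORDER BY row_count DESC
"""
for row in bq_client.query(query).result():
    print("{t}: {r} rows, {s} bytes".format(
        t=row.table_id, r=row.row_count, s=row.size_bytes))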

In the next post I will go over TPC-DS query execution performance on BigQuery, BigQuery ML – Google’s take on in-database machine learning – as well as some basic interactive dashboard building in Tableau.


Vertica MPP Database Overview and TPC-DS Benchmark Performance Analysis (Part 4)

June 11th, 2018 / by admin

Testing Continued…

In the previous post I started looking at some of the TPC-DS queries’ performance across single-node and multi-node environments. This post continues the performance analysis across the remaining set of queries and looks at high-level interactions between Vertica and Tableau/PowerBI. First though, let’s look at the remaining ten queries and how their execution times fared across three different data and cluster sizes.

--Query 44
SELECT asceding.rnk,
               i1.i_product_name best_performing,
               i2.i_product_name worst_performing
FROM  (SELECT *
       FROM   (SELECT item_sk,
                      Rank()
                        OVER (
                          ORDER BY rank_col ASC) rnk
               FROM   (SELECT ss_item_sk         item_sk,
                              Avg(ss_net_profit) rank_col
                       FROM   tpc_ds.store_sales ss1
                       WHERE  ss_store_sk = 4
                       GROUP  BY ss_item_sk
                       HAVING Avg(ss_net_profit) > 0.9 *
                              (SELECT Avg(ss_net_profit)
                                      rank_col
                               FROM   tpc_ds.store_sales
                               WHERE  ss_store_sk = 4
                                      AND ss_cdemo_sk IS
                                          NULL
                               GROUP  BY ss_store_sk))V1)
              V11
       WHERE  rnk < 11) asceding,
      (SELECT *
       FROM   (SELECT item_sk,
                      Rank()
                        OVER (
                          ORDER BY rank_col DESC) rnk
               FROM   (SELECT ss_item_sk         item_sk,
                              Avg(ss_net_profit) rank_col
                       FROM   tpc_ds.store_sales ss1
                       WHERE  ss_store_sk = 4
                       GROUP  BY ss_item_sk
                       HAVING Avg(ss_net_profit) > 0.9 *
                              (SELECT Avg(ss_net_profit)
                                      rank_col
                               FROM   tpc_ds.store_sales
                               WHERE  ss_store_sk = 4
                                      AND ss_cdemo_sk IS
                                          NULL
                               GROUP  BY ss_store_sk))V2)
              V21
       WHERE  rnk < 11) descending,
      tpc_ds.item i1,
      tpc_ds.item i2
WHERE  asceding.rnk = descending.rnk
       AND i1.i_item_sk = asceding.item_sk
       AND i2.i_item_sk = descending.item_sk
ORDER  BY asceding.rnk
LIMIT 100;

--Query 46
SELECT c_last_name,
               c_first_name,
               ca_city,
               bought_city,
               ss_ticket_number,
               amt,
               profit
FROM   (SELECT ss_ticket_number,
               ss_customer_sk,
               ca_city            bought_city,
               Sum(ss_coupon_amt) amt,
               Sum(ss_net_profit) profit
        FROM   tpc_ds.store_sales,
               tpc_ds.date_dim,
               tpc_ds.store,
               tpc_ds.household_demographics,
               tpc_ds.customer_address
        WHERE  store_sales.ss_sold_date_sk = date_dim.d_date_sk
               AND store_sales.ss_store_sk = store.s_store_sk
               AND store_sales.ss_hdemo_sk = household_demographics.hd_demo_sk
               AND store_sales.ss_addr_sk = customer_address.ca_address_sk
               AND ( household_demographics.hd_dep_count = 6
                      OR household_demographics.hd_vehicle_count = 0 )
               AND date_dim.d_dow IN ( 6, 0 )
               AND date_dim.d_year IN ( 2000, 2000 + 1, 2000 + 2 )
               AND store.s_city IN ( 'Midway', 'Fairview', 'Fairview',
                                     'Fairview',
                                     'Fairview' )
        GROUP  BY ss_ticket_number,
                  ss_customer_sk,
                  ss_addr_sk,
                  ca_city) dn,
       tpc_ds.customer,
       tpc_ds.customer_address current_addr
WHERE  ss_customer_sk = c_customer_sk
       AND customer.c_current_addr_sk = current_addr.ca_address_sk
       AND current_addr.ca_city <> bought_city
ORDER  BY c_last_name,
          c_first_name,
          ca_city,
          bought_city,
          ss_ticket_number
LIMIT 100;

--Query 47
WITH v1 
     AS (SELECT i_category, 
                i_brand, 
                s_store_name, 
                s_company_name, 
                d_year, 
                d_moy, 
                Sum(ss_sales_price)         sum_sales, 
                Avg(Sum(ss_sales_price)) 
                  OVER ( 
                    partition BY i_category, i_brand, s_store_name, 
                  s_company_name, 
                  d_year) 
                                            avg_monthly_sales, 
                Rank() 
                  OVER ( 
                    partition BY i_category, i_brand, s_store_name, 
                  s_company_name 
                    ORDER BY d_year, d_moy) rn 
         FROM   tpc_ds.item,
                tpc_ds.store_sales,
                tpc_ds.date_dim,
                tpc_ds.store
         WHERE  ss_item_sk = i_item_sk 
                AND ss_sold_date_sk = d_date_sk 
                AND ss_store_sk = s_store_sk 
                AND ( d_year = 1999 
                       OR ( d_year = 1999 - 1 
                            AND d_moy = 12 ) 
                       OR ( d_year = 1999 + 1 
                            AND d_moy = 1 ) ) 
         GROUP  BY i_category, 
                   i_brand, 
                   s_store_name, 
                   s_company_name, 
                   d_year, 
                   d_moy), 
     v2 
     AS (SELECT v1.i_category, 
                v1.d_year, 
                v1.d_moy, 
                v1.avg_monthly_sales, 
                v1.sum_sales, 
                v1_lag.sum_sales  psum, 
                v1_lead.sum_sales nsum 
         FROM   v1, 
                v1 v1_lag, 
                v1 v1_lead 
         WHERE  v1.i_category = v1_lag.i_category 
                AND v1.i_category = v1_lead.i_category 
                AND v1.i_brand = v1_lag.i_brand 
                AND v1.i_brand = v1_lead.i_brand 
                AND v1.s_store_name = v1_lag.s_store_name 
                AND v1.s_store_name = v1_lead.s_store_name 
                AND v1.s_company_name = v1_lag.s_company_name 
                AND v1.s_company_name = v1_lead.s_company_name 
                AND v1.rn = v1_lag.rn + 1 
                AND v1.rn = v1_lead.rn - 1) 
SELECT * 
FROM   v2 
WHERE  d_year = 1999 
       AND avg_monthly_sales > 0 
       AND CASE 
             WHEN avg_monthly_sales > 0 THEN Abs(sum_sales - avg_monthly_sales) 
                                             / 
                                             avg_monthly_sales 
             ELSE NULL 
           END > 0.1 
ORDER  BY sum_sales - avg_monthly_sales, 
          3
LIMIT 100; 

--Query 54
WITH my_customers
     AS (SELECT DISTINCT c_customer_sk,
                         c_current_addr_sk
         FROM   (SELECT cs_sold_date_sk     sold_date_sk,
                        cs_bill_customer_sk customer_sk,
                        cs_item_sk          item_sk
                 FROM   tpc_ds.catalog_sales
                 UNION ALL
                 SELECT ws_sold_date_sk     sold_date_sk,
                        ws_bill_customer_sk customer_sk,
                        ws_item_sk          item_sk
                 FROM   tpc_ds.web_sales) cs_or_ws_sales,
                tpc_ds.item,
                tpc_ds.date_dim,
                tpc_ds.customer
         WHERE  sold_date_sk = d_date_sk
                AND item_sk = i_item_sk
                AND i_category = 'Sports'
                AND i_class = 'fitness'
                AND c_customer_sk = cs_or_ws_sales.customer_sk
                AND d_moy = 5
                AND d_year = 2000),
     my_revenue
     AS (SELECT c_customer_sk,
                Sum(ss_ext_sales_price) AS revenue
         FROM   my_customers,
                tpc_ds.store_sales,
                tpc_ds.customer_address,
                tpc_ds.store,
                tpc_ds.date_dim
         WHERE  c_current_addr_sk = ca_address_sk
                AND ca_county = s_county
                AND ca_state = s_state
                AND ss_sold_date_sk = d_date_sk
                AND c_customer_sk = ss_customer_sk
                AND d_month_seq BETWEEN (SELECT DISTINCT d_month_seq + 1
                                         FROM   tpc_ds.date_dim
                                         WHERE  d_year = 2000
                                                AND d_moy = 5) AND
                                        (SELECT DISTINCT
                                        d_month_seq + 3
                                         FROM   tpc_ds.date_dim
                                         WHERE  d_year = 2000
                                                AND d_moy = 5)
         GROUP  BY c_customer_sk),
     segments
     AS (SELECT Cast(( revenue / 50 ) AS INT) AS segment
         FROM   my_revenue)
SELECT segment,
               Count(*)     AS num_customers,
               segment * 50 AS segment_base
FROM   segments
GROUP  BY segment
ORDER  BY segment,
          num_customers
LIMIT 100;

--Query 57
WITH v1
     AS (SELECT i_category,
                i_brand,
                cc_name,
                d_year,
                d_moy,
                Sum(cs_sales_price)                                    sum_sales
                ,
                Avg(Sum(cs_sales_price))
                  OVER (
                    partition BY i_category, i_brand, cc_name, d_year)
                avg_monthly_sales
                   ,
                Rank()
                  OVER (
                    partition BY i_category, i_brand, cc_name
                    ORDER BY d_year, d_moy)                            rn
         FROM   tpc_ds.item,
                tpc_ds.catalog_sales,
                tpc_ds.date_dim,
                tpc_ds.call_center
         WHERE  cs_item_sk = i_item_sk
                AND cs_sold_date_sk = d_date_sk
                AND cc_call_center_sk = cs_call_center_sk
                AND ( d_year = 2000
                       OR ( d_year = 2000 - 1
                            AND d_moy = 12 )
                       OR ( d_year = 2000 + 1
                            AND d_moy = 1 ) )
         GROUP  BY i_category,
                   i_brand,
                   cc_name,
                   d_year,
                   d_moy),
     v2
     AS (SELECT v1.i_brand,
                v1.d_year,
                v1.avg_monthly_sales,
                v1.sum_sales,
                v1_lag.sum_sales  psum,
                v1_lead.sum_sales nsum
         FROM   v1,
                v1 v1_lag,
                v1 v1_lead
         WHERE  v1.i_category = v1_lag.i_category
                AND v1.i_category = v1_lead.i_category
                AND v1.i_brand = v1_lag.i_brand
                AND v1.i_brand = v1_lead.i_brand
                AND v1. cc_name = v1_lag. cc_name
                AND v1. cc_name = v1_lead. cc_name
                AND v1.rn = v1_lag.rn + 1
                AND v1.rn = v1_lead.rn - 1)
SELECT *
FROM   v2
WHERE  d_year = 2000
       AND avg_monthly_sales > 0
       AND CASE
             WHEN avg_monthly_sales > 0 THEN Abs(sum_sales - avg_monthly_sales)
                                             /
                                             avg_monthly_sales
             ELSE NULL
           END > 0.1
ORDER  BY sum_sales - avg_monthly_sales,
          3
LIMIT 100; 

--Query 64
WITH cs_ui 
     AS (SELECT cs_item_sk, 
                Sum(cs_ext_list_price) AS sale, 
                Sum(cr_refunded_cash + cr_reversed_charge 
                    + cr_store_credit) AS refund 
         FROM   tpc_ds.catalog_sales,
                tpc_ds.catalog_returns
         WHERE  cs_item_sk = cr_item_sk 
                AND cs_order_number = cr_order_number 
         GROUP  BY cs_item_sk 
         HAVING Sum(cs_ext_list_price) > 2 * Sum( 
                cr_refunded_cash + cr_reversed_charge 
                + cr_store_credit)), 
     cross_sales 
     AS (SELECT i_product_name         product_name, 
                i_item_sk              item_sk, 
                s_store_name           store_name, 
                s_zip                  store_zip, 
                ad1.ca_street_number   b_street_number, 
                ad1.ca_street_name     b_streen_name, 
                ad1.ca_city            b_city, 
                ad1.ca_zip             b_zip, 
                ad2.ca_street_number   c_street_number, 
                ad2.ca_street_name     c_street_name, 
                ad2.ca_city            c_city, 
                ad2.ca_zip             c_zip, 
                d1.d_year              AS syear, 
                d2.d_year              AS fsyear, 
                d3.d_year              s2year, 
                Count(*)               cnt, 
                Sum(ss_wholesale_cost) s1, 
                Sum(ss_list_price)     s2, 
                Sum(ss_coupon_amt)     s3 
         FROM   tpc_ds.store_sales,
                tpc_ds.store_returns,
                cs_ui, 
                tpc_ds.date_dim d1,
                tpc_ds.date_dim d2,
                tpc_ds.date_dim d3,
                tpc_ds.store,
                tpc_ds.customer,
                tpc_ds.customer_demographics cd1,
                tpc_ds.customer_demographics cd2,
                tpc_ds.promotion,
                tpc_ds.household_demographics hd1,
                tpc_ds.household_demographics hd2,
                tpc_ds.customer_address ad1,
                tpc_ds.customer_address ad2,
                tpc_ds.income_band ib1,
                tpc_ds.income_band ib2,
                tpc_ds.item
         WHERE  ss_store_sk = s_store_sk 
                AND ss_sold_date_sk = d1.d_date_sk 
                AND ss_customer_sk = c_customer_sk 
                AND ss_cdemo_sk = cd1.cd_demo_sk 
                AND ss_hdemo_sk = hd1.hd_demo_sk 
                AND ss_addr_sk = ad1.ca_address_sk 
                AND ss_item_sk = i_item_sk 
                AND ss_item_sk = sr_item_sk 
                AND ss_ticket_number = sr_ticket_number 
                AND ss_item_sk = cs_ui.cs_item_sk 
                AND c_current_cdemo_sk = cd2.cd_demo_sk 
                AND c_current_hdemo_sk = hd2.hd_demo_sk 
                AND c_current_addr_sk = ad2.ca_address_sk 
                AND c_first_sales_date_sk = d2.d_date_sk 
                AND c_first_shipto_date_sk = d3.d_date_sk 
                AND ss_promo_sk = p_promo_sk 
                AND hd1.hd_income_band_sk = ib1.ib_income_band_sk 
                AND hd2.hd_income_band_sk = ib2.ib_income_band_sk 
                AND cd1.cd_marital_status <> cd2.cd_marital_status 
                AND i_color IN ( 'cyan', 'peach', 'blush', 'frosted', 
                                 'powder', 'orange' ) 
                AND i_current_price BETWEEN 58 AND 58 + 10 
                AND i_current_price BETWEEN 58 + 1 AND 58 + 15 
         GROUP  BY i_product_name, 
                   i_item_sk, 
                   s_store_name, 
                   s_zip, 
                   ad1.ca_street_number, 
                   ad1.ca_street_name, 
                   ad1.ca_city, 
                   ad1.ca_zip, 
                   ad2.ca_street_number, 
                   ad2.ca_street_name, 
                   ad2.ca_city, 
                   ad2.ca_zip, 
                   d1.d_year, 
                   d2.d_year, 
                   d3.d_year) 
SELECT cs1.product_name, 
       cs1.store_name, 
       cs1.store_zip, 
       cs1.b_street_number, 
       cs1.b_streen_name, 
       cs1.b_city, 
       cs1.b_zip, 
       cs1.c_street_number, 
       cs1.c_street_name, 
       cs1.c_city, 
       cs1.c_zip, 
       cs1.syear, 
       cs1.cnt, 
       cs1.s1, 
       cs1.s2, 
       cs1.s3, 
       cs2.s1, 
       cs2.s2, 
       cs2.s3, 
       cs2.syear, 
       cs2.cnt 
FROM   cross_sales cs1, 
       cross_sales cs2 
WHERE  cs1.item_sk = cs2.item_sk 
       AND cs1.syear = 2001 
       AND cs2.syear = 2001 + 1 
       AND cs2.cnt <= cs1.cnt 
       AND cs1.store_name = cs2.store_name 
       AND cs1.store_zip = cs2.store_zip 
ORDER  BY cs1.product_name, 
          cs1.store_name, 
          cs2.cnt; 	

--Query 74
WITH year_total 
     AS (SELECT c_customer_id    customer_id, 
                c_first_name     customer_first_name, 
                c_last_name      customer_last_name, 
                d_year           AS year1, 
                Sum(ss_net_paid) year_total, 
                's'              sale_type 
         FROM   tpc_ds.customer,
                tpc_ds.store_sales,
                tpc_ds.date_dim
         WHERE  c_customer_sk = ss_customer_sk 
                AND ss_sold_date_sk = d_date_sk 
                AND d_year IN ( 1999, 1999 + 1 ) 
         GROUP  BY c_customer_id, 
                   c_first_name, 
                   c_last_name, 
                   d_year 
         UNION ALL 
         SELECT c_customer_id    customer_id, 
                c_first_name     customer_first_name, 
                c_last_name      customer_last_name, 
                d_year           AS year1, 
                Sum(ws_net_paid) year_total, 
                'w'              sale_type 
         FROM   tpc_ds.customer,
                tpc_ds.web_sales,
                tpc_ds.date_dim
         WHERE  c_customer_sk = ws_bill_customer_sk 
                AND ws_sold_date_sk = d_date_sk 
                AND d_year IN ( 1999, 1999 + 1 ) 
         GROUP  BY c_customer_id, 
                   c_first_name, 
                   c_last_name, 
                   d_year) 
SELECT t_s_secyear.customer_id, 
               t_s_secyear.customer_first_name, 
               t_s_secyear.customer_last_name 
FROM   year_total t_s_firstyear, 
       year_total t_s_secyear, 
       year_total t_w_firstyear, 
       year_total t_w_secyear 
WHERE  t_s_secyear.customer_id = t_s_firstyear.customer_id 
       AND t_s_firstyear.customer_id = t_w_secyear.customer_id 
       AND t_s_firstyear.customer_id = t_w_firstyear.customer_id 
       AND t_s_firstyear.sale_type = 's' 
       AND t_w_firstyear.sale_type = 'w' 
       AND t_s_secyear.sale_type = 's' 
       AND t_w_secyear.sale_type = 'w' 
       AND t_s_firstyear.year1 = 1999 
       AND t_s_secyear.year1 = 1999 + 1 
       AND t_w_firstyear.year1 = 1999 
       AND t_w_secyear.year1 = 1999 + 1 
       AND t_s_firstyear.year_total > 0 
       AND t_w_firstyear.year_total > 0 
       AND CASE 
             WHEN t_w_firstyear.year_total > 0 THEN t_w_secyear.year_total / 
                                                    t_w_firstyear.year_total 
             ELSE NULL 
           END > CASE 
                   WHEN t_s_firstyear.year_total > 0 THEN 
                   t_s_secyear.year_total / 
                   t_s_firstyear.year_total 
                   ELSE NULL 
                 END 
ORDER  BY 1, 
          2, 
          3
LIMIT 100; 		

--Query 75
WITH all_sales
     AS (SELECT d_year,
                i_brand_id,
                i_class_id,
                i_category_id,
                i_manufact_id,
                Sum(sales_cnt) AS sales_cnt,
                Sum(sales_amt) AS sales_amt
         FROM   (SELECT d_year,
                        i_brand_id,
                        i_class_id,
                        i_category_id,
                        i_manufact_id,
                        cs_quantity - COALESCE(cr_return_quantity, 0)        AS
                        sales_cnt,
                        cs_ext_sales_price - COALESCE(cr_return_amount, 0.0) AS
                        sales_amt
                 FROM   tpc_ds.catalog_sales
                        JOIN tpc_ds.item
                          ON i_item_sk = cs_item_sk
                        JOIN tpc_ds.date_dim
                          ON d_date_sk = cs_sold_date_sk
                        LEFT JOIN tpc_ds.catalog_returns
                               ON ( cs_order_number = cr_order_number
                                    AND cs_item_sk = cr_item_sk )
                 WHERE  i_category = 'Men'
                 UNION
                 SELECT d_year,
                        i_brand_id,
                        i_class_id,
                        i_category_id,
                        i_manufact_id,
                        ss_quantity - COALESCE(sr_return_quantity, 0)     AS
                        sales_cnt,
                        ss_ext_sales_price - COALESCE(sr_return_amt, 0.0) AS
                        sales_amt
                 FROM   tpc_ds.store_sales
                        JOIN tpc_ds.item
                          ON i_item_sk = ss_item_sk
                        JOIN tpc_ds.date_dim
                          ON d_date_sk = ss_sold_date_sk
                        LEFT JOIN tpc_ds.store_returns
                               ON ( ss_ticket_number = sr_ticket_number
                                    AND ss_item_sk = sr_item_sk )
                 WHERE  i_category = 'Men'
                 UNION
                 SELECT d_year,
                        i_brand_id,
                        i_class_id,
                        i_category_id,
                        i_manufact_id,
                        ws_quantity - COALESCE(wr_return_quantity, 0)     AS
                        sales_cnt,
                        ws_ext_sales_price - COALESCE(wr_return_amt, 0.0) AS
                        sales_amt
                 FROM   tpc_ds.web_sales
                        JOIN tpc_ds.item
                          ON i_item_sk = ws_item_sk
                        JOIN tpc_ds.date_dim
                          ON d_date_sk = ws_sold_date_sk
                        LEFT JOIN tpc_ds.web_returns
                               ON ( ws_order_number = wr_order_number
                                    AND ws_item_sk = wr_item_sk )
                 WHERE  i_category = 'Men') sales_detail
         GROUP  BY d_year,
                   i_brand_id,
                   i_class_id,
                   i_category_id,
                   i_manufact_id)
SELECT prev_yr.d_year                        AS prev_year,
               curr_yr.d_year                        AS year1,
               curr_yr.i_brand_id,
               curr_yr.i_class_id,
               curr_yr.i_category_id,
               curr_yr.i_manufact_id,
               prev_yr.sales_cnt                     AS prev_yr_cnt,
               curr_yr.sales_cnt                     AS curr_yr_cnt,
               curr_yr.sales_cnt - prev_yr.sales_cnt AS sales_cnt_diff,
               curr_yr.sales_amt - prev_yr.sales_amt AS sales_amt_diff
FROM   all_sales curr_yr,
       all_sales prev_yr
WHERE  curr_yr.i_brand_id = prev_yr.i_brand_id
       AND curr_yr.i_class_id = prev_yr.i_class_id
       AND curr_yr.i_category_id = prev_yr.i_category_id
       AND curr_yr.i_manufact_id = prev_yr.i_manufact_id
       AND curr_yr.d_year = 2002
       AND prev_yr.d_year = 2002 - 1
       AND Cast(curr_yr.sales_cnt AS DECIMAL(17, 2)) / Cast(prev_yr.sales_cnt AS
                                                                DECIMAL(17, 2))
           < 0.9
ORDER  BY sales_cnt_diff
LIMIT 100; 

--Query 97
WITH ssci
     AS (SELECT ss_customer_sk customer_sk,
                ss_item_sk     item_sk
         FROM   tpc_ds.store_sales,
                tpc_ds.date_dim
         WHERE  ss_sold_date_sk = d_date_sk
                AND d_month_seq BETWEEN 1196 AND 1196 + 11
         GROUP  BY ss_customer_sk,
                   ss_item_sk),
     csci
     AS (SELECT cs_bill_customer_sk customer_sk,
                cs_item_sk          item_sk
         FROM   tpc_ds.catalog_sales,
                tpc_ds.date_dim
         WHERE  cs_sold_date_sk = d_date_sk
                AND d_month_seq BETWEEN 1196 AND 1196 + 11
         GROUP  BY cs_bill_customer_sk,
                   cs_item_sk)
SELECT Sum(CASE
                     WHEN ssci.customer_sk IS NOT NULL
                          AND csci.customer_sk IS NULL THEN 1
                     ELSE 0
                   END) store_only,
               Sum(CASE
                     WHEN ssci.customer_sk IS NULL
                          AND csci.customer_sk IS NOT NULL THEN 1
                     ELSE 0
                   END) catalog_only,
               Sum(CASE
                     WHEN ssci.customer_sk IS NOT NULL
                          AND csci.customer_sk IS NOT NULL THEN 1
                     ELSE 0
                   END) store_and_catalog
FROM   ssci
       FULL OUTER JOIN csci
                    ON ( ssci.customer_sk = csci.customer_sk
                         AND ssci.item_sk = csci.item_sk )
LIMIT 100;

--Query 99
SELECT Substr(w_warehouse_name, 1, 20), 
               sm_type, 
               cc_name, 
               Sum(CASE 
                     WHEN ( cs_ship_date_sk - cs_sold_date_sk <= 30 ) THEN 1 
                     ELSE 0 
                   END) AS '30 days',
               Sum(CASE 
                     WHEN ( cs_ship_date_sk - cs_sold_date_sk > 30 ) 
                          AND ( cs_ship_date_sk - cs_sold_date_sk <= 60 ) THEN 1 
                     ELSE 0 
                   END) AS '31-60 days',
               Sum(CASE 
                     WHEN ( cs_ship_date_sk - cs_sold_date_sk > 60 ) 
                          AND ( cs_ship_date_sk - cs_sold_date_sk <= 90 ) THEN 1 
                     ELSE 0 
                   END) AS '61-90 days',
               Sum(CASE 
                     WHEN ( cs_ship_date_sk - cs_sold_date_sk > 90 ) 
                          AND ( cs_ship_date_sk - cs_sold_date_sk <= 120 ) THEN 
                     1 
                     ELSE 0 
                   END) AS '91-120 days',
               Sum(CASE 
                     WHEN ( cs_ship_date_sk - cs_sold_date_sk > 120 ) THEN 1 
                     ELSE 0 
                   END) AS '>120 days'
FROM   tpc_ds.catalog_sales,
       tpc_ds.warehouse,
       tpc_ds.ship_mode,
       tpc_ds.call_center,
       tpc_ds.date_dim
WHERE  d_month_seq BETWEEN 1200 AND 1200 + 11 
       AND cs_ship_date_sk = d_date_sk 
       AND cs_warehouse_sk = w_warehouse_sk 
       AND cs_ship_mode_sk = sm_ship_mode_sk 
       AND cs_call_center_sk = cc_call_center_sk 
GROUP  BY Substr(w_warehouse_name, 1, 20), 
          sm_type, 
          cc_name 
ORDER  BY Substr(w_warehouse_name, 1, 20), 
          sm_type, 
          cc_name
LIMIT 100; 

I didn’t set out on this experiment to compare Vertica’s performance with other vendors in the MPP category, so I cannot comment on how it would fare against other RDBMS systems. It would have been interesting to see how, for example, Pivotal’s Greenplum would match up when tested on identical hardware and data volumes, but this 4-part series was primarily focused on establishing the relationship between two variables: the number of nodes and the data size. From that perspective, Vertica performs very well ‘out-of-the-box’; without any tuning or tweaking it managed not only to execute the queries relatively fast (given the mediocre hardware specs) but also to take full advantage of the extra computing resources, distributing the load across the available nodes and cutting query execution times.

Looking at the results across the multitude of different configurations, the first thing that jumps out is how linear the differences in performance are across the various data volumes and node counts. On average, query execution time doubles with each increase in scaling factor, while execution speed drops by roughly a factor of two for every node removed from the cluster. There are some slight variations, but on the whole Vertica’s performance is consistent and predictable, with all twenty queries exhibiting a similar pattern when dealing with more data and/or more computing power thrown at them (additional nodes).

Looking at the side-by-side comparison of the execution results across single-node and multi-node configurations, the differences across scaling factors of 100, 200 and 300 are very consistent (click on image to expand).

Converting these differences (single- vs multi-node deployment) into percentages yielded an average increase of 243% for the scaling factor of 100, 252% for the scaling factor of 200 and 217% for the scaling factor of 300, as per the chart below (click on image to expand).

Finally, let’s look at how Vertica (a three-node cluster with 100 scaling factor TPC-DS data) performed against some ad hoc queries generated in PowerBI and Tableau. Below are two short video clips depicting a rudimentary analysis of store sales data (this fact table contains close to 300 million records) against the date, item, store and customer_address dimensions. It’s the type of rapid-fire, exploratory analysis one would conduct on retail data to answer some immediate questions. I also set up Tableau and PowerBI side by side with a process viewer/system monitor (htop), with four panes displaying core performance metrics e.g. CPU load, swap, memory status etc. across the three nodes and the machine hosting the visualisation application (bottom view), so that I could observe (and record) the systems’ behaviour when interactively aggregating data. This made it easy to see the load distribution and how each host reacted (performance-wise) to the queries issued during this analysis. With respect to PowerBI, you can see that the three-year-old work laptop I recorded this on was not up to the task, due to the large number of CPU cycles consumed by the screen-recording software alone. Tableau, on the other hand, ran on my dual-CPU Mac Pro, so the whole process was stutter-free; given access to better hardware, PowerBI may also perform better.

I used Tableau Desktop version 2018.1 with native support for Vertica. As opposed to PowerBI, Tableau had no issues reading data from the ‘customer’ dimension, but in order to make the two recordings as comparable as possible I abandoned it and used ‘customer_address’ instead. In terms of raw performance, this mini-cluster actually turned out to be borderline usable and in most cases I didn’t need to wait more than 10 seconds for an aggregation or filter to complete. Each node’s CPU spiked close to 100 percent during query execution but, given the volume of data and the mediocre hardware it was crunched on, I would say that Vertica performed quite well.

PowerBI version 2.59 (used in the footage below) included Vertica support in Beta only, so chances are the final release will be better supported and optimised. PowerBI wasn’t too cooperative when selecting data out of the ‘customer’ dimension (see footage below), but everything else worked as expected e.g. geospatial analysis, various graphs etc. Performance-wise, I still think Tableau was more workable but, given that Vertica support was still in preview and the pedestrian-quality laptop I was testing it on, chances are that PowerBI would be a viable choice for this kind of exploratory analysis, especially when factoring in the price – free!

Conclusion

There isn’t much I can add to what I have already stated based on my observations and performance results. Vertica’s easy set-up and hassle-free, almost plug-and-play configuration made it very easy to work with. While I haven’t covered most of the built-in ‘knobs and switches’ used to fine-tune its functionality, the installation process was simple and more intuitive than some of the other commercial RDBMS products out there, even the ones which come with pre-built binaries and executable packages. I only wish Vertica came with support for the latest Ubuntu Server LTS release; at the time of writing this post, Ubuntu Bionic Beaver was the latest LTS release, while Vertica 9.1’s Ubuntu support only extended to version 14, dating back to July 2016 and supported only until April 2019.

Performance-wise, Vertica exceeded my expectations. Running on hardware which I had no real use for and originally intended to sell on eBay for a few hundred dollars (if lucky), it managed to crunch through large volumes of data (by relative standards) in very respectable time. Some of the more processing-intensive queries executed in seconds and scaled well (linearly) across additional nodes and increased data volumes. It would be a treat to compare it against the likes of Greenplum or CitusDB and explore some of its other features e.g. machine learning or Hadoop integration (an idea for a future blog post), as SQL query execution speed on structured data isn’t its only forte.
