Vertica MPP Database Overview and TPC-DS Benchmark Performance Analysis (Part 1)

January 5th, 2018 / by admin

Note: Post 2 can be found HERE, Post 3 HERE and Post 4 HERE

Introduction

I have previously covered Massively Parallel Processing (MPP) databases and their preliminary benchmark numbers, e.g. AWS Redshift HERE and Azure SQL DW HERE. Both of those cloud juggernauts are touting their platforms as the logical next step in big data computation and storage evolution, leaving old-guard, legacy vendors such as Teradata and Netezza with a hard choice: shift into a cloud or cloud-first operating model or be remembered in the database hall of fame of now defunct and obsolete technologies.

With this hard ultimatum, many vendors are desperately trying to pivot or at least re-align their offerings with ever more demanding business expectations and provide a point of differentiation in this competitive market. For example, one of the most prominent MPP vendors, Pivotal, decided to open source its Greenplum platform under the Apache 2.0 license. Vertica, another large MPP market player, although still very much a proprietary platform, allows freedom of environment (no cloud lock-in) with an option of running on commodity hardware (just like Greenplum), as well as comprehensive in-database Machine Learning capabilities. Vertica also offers free usage with up to three nodes and 1TB of data, and a very sophisticated database engine with plenty of bells, knobs and whistles for tuning and control. From some digging around into what experience others have had with this technology, Vertica’s database engine seems to easily match, if not surpass, that of many other providers in the MPP market in terms of the number of features and the breadth of flexibility it provides. It is said to scale well in terms of the number of concurrent users and offers a competitive TCO and cost per query (factoring in license cost, storage and compute resources on a cloud provider, e.g. AWS or Azure).

As someone on Quora eloquently put it when contrasting it with its biggest competitor, AWS Redshift, ‘Vertica is a magical sword. Redshift is a giant club. It doesn’t take a lot of skill to use that giant club and it can do massive crunching. But I’d rather be the wizard-ninja’. Vertica’s pedigree only seems to be amplified by the fact it was founded by Michael Stonebraker, perhaps the most prominent database pioneer and the recipient of the Turing Award, aka the ‘Nobel Prize in computing’, who was also behind other database systems, e.g. Ingres, VoltDB and the ubiquitous PostgreSQL.

Vertica RDBMS is still very much a niche player in terms of market share; however, given that the likes of Facebook, Etsy, Guess and Uber have entrusted their data to Vertica’s ability to deliver on enterprise analytics, it certainly deserves more attention. With that in mind, I would like to outline some of Vertica’s core architectural components and conduct a small performance comparison on the dataset used for TPC-DS benchmarking across one and three nodes. This post introduces Vertica’s key architectural and design components, with Post 2 focusing on Vertica’s installation and configuration process on a number of commodity machines. Finally, in Post 3 and Post 4 I will run some TPC-DS benchmark queries to test its performance across the maximum number of nodes allowed for the Community Edition. If you’re only interested in the benchmark outcomes you can jump to Post 3 right away.

Vertica Key Architecture Components Primer

Vertica’s appeal, beyond features such as ACID transactions, ANSI SQL compliance and high availability, is mainly driven by a database engine optimised for executing complex analytical queries fast. Analytic workloads are characterised by large data sets and small transaction volumes (tens to hundreds per second), but a high number of rows per operation. This is very different to the typical transactional workloads associated with the majority of legacy RDBMS applications, which are characterised by a large number of transactions per second, where each transaction involves a handful of tuples. This performance is primarily based on a number of technologies and design goals, the most important being columnar storage, compression and an MPP scale-out architecture.

The following sections provide a quick overview of these technologies, which form an integral part of Vertica’s architecture.


Columnar Data Storage

MPP databases differ from traditional transactional, row-oriented databases, e.g. PostgreSQL or MySQL, where data is stored and manipulated in rows. Row-based operations work really well in an application context, where the database load consists of relatively large numbers of CRUD operations (create, read, update, delete). In analytical contexts, where the workload consists of a relatively small number of queries over a small number of columns but large numbers of records, the row-based approach is not ideal. Columnar databases have been developed to work around the limitations of row-based databases for analytical purposes. They store data compressed and per column, much like an index in a row-based database, reducing disk I/O as only the columns required to answer the query are read.

The following example depicts how a simple query can take advantage of columnar storage. In a typical, row-based RDBMS, all rows in all tables included in the query would need to be read in order to retrieve the results, regardless of how wide the rows are or how many columns are required to satisfy the output. In contrast, Vertica utilises the concept of projections – a column-based structure. Given that we only reference three columns – symbol, date and volume – only these columns need to be read from disk, as sketched below.
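The query below is a minimal sketch of this idea; it assumes a hypothetical trades table that is much wider than the three columns referenced, and only those three columns would be read from disk by a columnar engine.

-- Hypothetical "trades" table with many more columns than the three used here.
-- A columnar engine reads only the symbol, trade_date and volume columns.
SELECT symbol,
       trade_date,
       SUM(volume) AS total_volume
FROM   trades
GROUP  BY symbol, trade_date;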

Given that the majority of legacy RDBMS systems were built for data collection, not data retrieval, columnar data storage speeds up analytical queries by avoiding scans of data not referenced in the query itself.

Vertica also sorts data prior to storage. Queries make use of the sortedness of the data by skipping rows which would be filtered out by predicates (in a manner similar to clustered B-Tree indexes). Sorting can also be used to optimise join and aggregation algorithms.
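As a hedged illustration (the table and column names are made up), a range predicate on a projection’s leading sort column lets Vertica skip whole blocks of rows:

-- If the projection storing these columns is sorted by trade_date, blocks whose
-- date range falls entirely outside the predicate can be skipped altogether.
SELECT symbol,
       SUM(volume) AS total_volume
FROM   trades
WHERE  trade_date BETWEEN '2017-01-01' AND '2017-03-31'
GROUP  BY symbol;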


Data Encoding And Compression

Vertica uses sophisticated encoding and compression technologies to optimise query performance and reduce storage footprint. Encoding converts data into a standard format to decrease disk I/O during query execution and reduce storage requirements. Vertica uses a number of different encoding strategies, depending on column data type, table cardinality and sort order. Different columns in a projection may have different encodings, and the same column may have a different encoding in each projection in which it appears. Vertica employs the following encoding types (a short example of specifying them follows the list):

  • AUTO – this encoding is ideal for sorted, many-valued columns such as primary keys. It is also suitable for general purpose applications for which no other encoding or compression scheme is applicable. Therefore, it serves as the default if no encoding/compression is specified.
  • BLOCK_DICT – for each block of storage, Vertica compiles distinct column values into a dictionary and then stores the dictionary and a list of indexes to represent the data block. BLOCK_DICT is ideal for few-valued, unsorted columns where saving space is more important than encoding speed. Certain kinds of data, such as stock prices, are typically few-valued within a localized area once the data is sorted, for example by stock symbol and timestamp, and are good candidates for BLOCK_DICT. By contrast, long CHAR/VARCHAR columns are not good candidates for BLOCK_DICT encoding. BLOCK_DICT encoding requires significantly higher CPU usage than default encoding schemes. The maximum data expansion is eight percent (8%).
  • BLOCKDICT_COMP – this encoding type is similar to BLOCK_DICT except that dictionary indexes are entropy coded. It requires significantly more CPU time to encode and decode and has poorer worst-case performance. However, if the distribution of values is extremely skewed, using BLOCKDICT_COMP encoding can lead to space savings.
  • BZIP_COMP – BZIP_COMP encoding uses the bzip2 compression algorithm on the block contents. This algorithm results in higher compression than the automatic LZO and gzip encoding; however, it requires more CPU time to compress. This algorithm is best used on large string columns such as VARCHAR, VARBINARY, CHAR, and BINARY. Choose this encoding type when you are willing to trade slower load speeds for higher data compression.
  • COMMONDELTA_COMP – this compression scheme builds a dictionary of all deltas in the block and then stores indexes into the delta dictionary using entropy coding. The scheme is ideal for sorted FLOAT and INTEGER-based (DATE/TIME/TIMESTAMP/INTERVAL) columns with predictable sequences and only occasional sequence breaks, such as timestamps recorded at periodic intervals or primary keys. For example, the following sequence compresses well: 300, 600, 900, 1200, 1500, 600, 1200, 1800, 2400. The following sequence does not compress well: 1, 3, 6, 10, 15, 21, 28, 36, 45, 55. If the delta distribution is excellent, columns can be stored in less than one bit per row. However, this scheme is very CPU intensive. If you use it on data with arbitrary deltas, it can cause significant data expansion.
  • DELTARANGE_COMP – this compression scheme is primarily used for floating-point data; it stores each value as a delta from the previous one. The scheme is ideal for many-valued FLOAT columns that are sorted or confined to a range. Do not use it for unsorted columns that contain NULL values, as the storage cost for representing a NULL value is high. This scheme has a high cost for both compression and decompression. To determine whether DELTARANGE_COMP is suitable for a particular set of data, compare it to other schemes, making sure to use the same sort order as the projection and to select sample data that will be stored consecutively in the database.
  • DELTAVAL – For INTEGER and DATE/TIME/TIMESTAMP/INTERVAL columns, data is recorded as a difference from the smallest value in the data block. This encoding has no effect on other data types.
  • GCDDELTA – for INTEGER and DATE/TIME/TIMESTAMP/INTERVAL columns, and NUMERIC columns with 18 or fewer digits, data is recorded as the difference from the smallest value in the data block divided by the greatest common divisor (GCD) of all entries in the block. This encoding has no effect on other data types. GCDDELTA is best used for many-valued, unsorted integer or integer-based columns when the values are multiples of a common factor. For example, timestamps are stored internally in microseconds, so data that is precise only to the millisecond is always a multiple of 1,000. The CPU requirements for decoding GCDDELTA are minimal, and the data never expands, but GCDDELTA may take more encoding time than DELTAVAL.
  • GZIP_COMP – this encoding type uses the gzip compression algorithm. It results in better compression than the automatic LZO compression, but lower compression than BZIP_COMP. It requires more CPU time to compress than LZO but less than BZIP_COMP. The algorithm is best used on large string columns such as VARCHAR, VARBINARY, CHAR, and BINARY. Use this encoding when you want better compression than LZO at a lower CPU cost than bzip2.
  • RLE – RLE (run length encoding) replaces sequences (runs) of identical values with a single pair that contains the value and the number of occurrences. It is therefore best used for low-cardinality columns that are present in the ORDER BY clause of a projection. The Vertica execution engine processes RLE encoding run-by-run and the Vertica optimizer gives it preference. Use it only when run length is large, such as when low-cardinality columns are sorted. The storage for RLE and AUTO encoding of CHAR/VARCHAR and BINARY/VARBINARY is always the same.
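The following is a minimal sketch of how encodings are declared per column in a projection definition; the table and column names are hypothetical, and the encoding choices simply echo the guidance above.

-- Hypothetical trades table: per-column encoding hints in a projection.
-- Columns without an ENCODING clause fall back to AUTO.
CREATE PROJECTION trades_encoded
(
    symbol      ENCODING RLE,               -- low cardinality, leading sort column
    trade_date  ENCODING COMMONDELTA_COMP,  -- regular, predictable intervals
    price       ENCODING DELTARANGE_COMP,   -- many-valued floats in a narrow range
    volume      ENCODING AUTO
)
AS SELECT symbol, trade_date, price, volume
   FROM trades
   ORDER BY symbol, trade_date;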

Compression, on the other hand, transforms data into a compact format. Vertica uses several different compression methods and automatically chooses the best one for the data being compressed. Thanks to compression, Vertica stores more data and uses less hardware than other databases.

In one experiment, a 7.5 MB text file containing a million random integers between 1 and 10 million was compressed with gzip (both unsorted and sorted) and the resulting sizes compared with the original as well as with Vertica’s storage of the same data. The unsorted gzip version averaged a compression ratio of 2.1, the sorted version around 3.3, whereas Vertica managed a respectable 12.5.

In another example, Vertica has a customer that collects metrics from meters. There are four columns in the schema:

  • Metric: there are a few hundred metrics collected.
  • Meter: there are a couple of thousand meters.
  • Collection Time Stamp: each meter spits out metrics every 5 minutes, 10 minutes, hour, etc., depending on the metric.
  • Metric Value: a 64-bit floating point value.

A baseline file of 200 million comma separated values (CSV) of the meter/metric/time/value rows takes 6200 MB, or roughly 32 bytes per row. Compressing with gzip reduces this to 1050 MB. By sorting the data on metric, meter, and collection time, Vertica not only optimises common query predicates (which specify the metric or a time range), but exposes great compression opportunities for each column. The total size for all the columns in Vertica is 418 MB, or slightly over 2 bytes per row:

  • Metric: there aren’t many; with RLE, it is as if there are only a few hundred rows. Vertica compressed this column to 5 KB.
  • Meter: there are quite a few, and there is one record for each meter for each metric. With RLE, Vertica brings this down to a mere 35 MB.
  • Collection Time Stamp: the regular collection intervals present a great compression opportunity. Vertica compressed this column to 20 MB.
  • Metric Value: some metrics have trends (like lots of 0 values when nothing happens), others change gradually with time, and some are much more random and less compressible. Even so, Vertica compressed the data to only 363 MB.


MPP Scale-Out And Distributed Queries

Vertica is not just an analytical database; it is a distributed, ‘shared-nothing’ analytical database capable of running on clusters of inexpensive, off-the-shelf servers, Amazon and Azure Cloud servers, and Hadoop. Its performance can not only be tuned with features like resource pools and projections, but it can be scaled simply by adding new servers to the cluster.

Data within a table may be spread across a Vertica cluster either by replicating the data across all nodes or by ‘segmenting’ it by attribute values using a consistent hashing scheme. This allows many classes of joins to be performed without moving the data across the network. Vertica considers CPU, network and storage access costs when optimising query plans, and parallelizes computation based on SQL JOIN keys, GROUP BY keys and PARTITION BY keys.
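A minimal sketch of the two approaches, assuming hypothetical fact and dimension tables: the large table is segmented by a hash of its join key across all nodes, while the small dimension is replicated (unsegmented) on every node so joins need no network shuffle.

-- Large fact table: rows spread across the cluster by hashing the join key.
CREATE PROJECTION sales_fact_seg
AS SELECT customer_id, product_id, sale_date, amount
   FROM sales_fact
   ORDER BY customer_id
   SEGMENTED BY HASH(customer_id) ALL NODES KSAFE 1;

-- Small dimension table: a full copy kept on every node.
CREATE PROJECTION dim_product_rep
AS SELECT product_id, product_name
   FROM dim_product
   ORDER BY product_id
   UNSEGMENTED ALL NODES;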

Clustering speeds up performance by parallelizing querying and loading across the nodes in the cluster for higher throughput.

Clustering also allows the database to maintain RAID-like high availability when one or more nodes go down and are no longer part of the quorum. This provides a robust mechanism to ensure little to no downtime, as multiple copies of the same data are stored on different nodes.

The traditional method to ensure that a database system can recover from a crash is to use logging and, in the case of a distributed database, a protocol called two-phase commit. The main idea is to write a record to a sequential log for each update operation before the operation is actually applied to the tables on disk. These log records are a redundant copy of the data in the database, and when a crash occurs, they can be replayed to ensure that transactions are atomic – that is, all of the updates of a transaction appear to have occurred, or none of them do. The two-phase commit protocol is then used to ensure that all of the nodes in a distributed database agree that a transaction has successfully committed; it requires several additional log records to be written. Log-based recovery is widely used in other commercial systems, as it provides strong recoverability guarantees at the expense of significant performance and disk space overhead.

Vertica has a unique approach to distributed recoverability that avoids these costs. The basic idea is to exploit the distributed nature of a Vertica database. The Vertica DB Designer ensures that every column in every table in the database is stored on at least k+1 machines in the Vertica cluster. Such a database is called k-safe, because if k machines crash or otherwise fail, a complete copy of the database is still available. As long as k or fewer machines fail simultaneously, a crashed machine can recover its state by copying data about transactions that committed or aborted while it was down from other machines in the system. This approach does not require logging, because the nodes replicating the data ensure that a recovering machine always has another (correct) copy of the data to compare against, replacing the role of a log in a traditional database. As long as k-safety holds, there is always one machine that knows the correct outcome (commit or abort) of every transaction. In the unlikely event that the system loses k-safety, Vertica brings the database back to a consistent point in time across all nodes.

K-safety also means that Vertica is highly available: it can tolerate the simultaneous crash of up to any k machines in a grid without interrupting query processing. The value of k can be configured to provide the desired trade-off between hardware costs and availability guarantees.
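As a hedged illustration, the snippet below uses two standard Vertica calls to declare and inspect K-safety; the column names of the SYSTEM table are assumed and worth checking against the documentation for your version.

-- Declare that the physical design should tolerate the loss of one node.
SELECT MARK_DESIGN_KSAFE(1);

-- Inspect the cluster's designed vs. current fault tolerance.
SELECT designed_fault_tolerance,
       current_fault_tolerance,
       node_count,
       node_down_count
FROM   system;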

It is instructive to contrast Vertica’s high-availability schemes with traditional database systems where high availability is achieved through the use of active standbys – essentially completely unused hardware that has an exact copy of the database and is ready to take over in the event of a primary database failure. Unlike Vertica’s k-safe design employing different sort orders, active standbys simply add to the cost of the database system without improving performance. Because Vertica is k-safe, it supports hot-swapping of nodes. A node can be removed, and the database will continue to process queries (at a lower rate). Conversely, a node can be added, and the database will automatically allocate a collection of objects to that node so that it can begin processing queries, increasing database performance automatically.


Projections

Projections store data in a format that optimises query execution. Similar to materialised views, they store result sets on disk rather than compute them each time they are used in a query. Vertica automatically refreshes these result sets with updated or new data.

A Vertica table typically has multiple projections, each defined to contain different content. Content for the projections of a given table can differ in scope and in how it is organised. These differences can generally be divided into the following projection types:

  • Superprojections – a superprojection contains all the columns of the table. For each table in the database, Vertica requires a minimum of one superprojection. Under certain conditions, Vertica automatically creates a table’s superprojection immediately upon table creation. Vertica also creates a superprojection when you first load data into the table, if none exists already.
  • Query-Specific Projections – A query-specific projection contains only the subset of table columns to process a given query. Query-specific projections significantly improve the performance of those queries for which they are optimised.
  • Aggregate Projections – queries which include expressions or aggregate functions such as SUM and COUNT can perform more efficiently when they use projections that already contain the aggregated data. This is especially true for queries on large volumes of data (see the sketch following this list).
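A minimal sketch of an aggregate (live aggregate) projection against a hypothetical sales table: the pre-computed totals per state let matching GROUP BY queries read the aggregate directly instead of re-scanning the detail rows.

-- Hypothetical sales table: Vertica maintains the pre-aggregated totals
-- as new rows are loaded into the anchor table.
CREATE PROJECTION sales_by_state_agg
AS SELECT state,
          SUM(sales_amount) AS total_sales,
          COUNT(*)          AS order_count
   FROM sales
   GROUP BY state;

SELECT START_REFRESH();   -- populate the projection from data already in the table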

Projections provide the following benefits:

  • Compress and encode data to reduce storage space. Additionally, Vertica operates on the encoded data representation whenever possible to avoid the cost of decoding. This combination of compression and encoding optimises disk space while maximising query performance.
  • Facilitate distribution across the database cluster. Depending on their size, projections can be segmented or replicated across cluster nodes. For instance, projections for large tables can be segmented and distributed across all nodes. Unsegmented projections for small tables can be replicated across all nodes.
  • Transparent to end-users. The Vertica query optimiser automatically picks the best projections to execute a given query (the EXPLAIN sketch after this list shows one way to verify that choice).
  • Provide high-availability and recovery. Vertica duplicates table columns on at least K+1 nodes in the cluster. If one machine fails in a K-Safe environment, the database continues to operate using replicated data on the remaining nodes. When the node resumes normal operation, it automatically queries other nodes to recover data and lost objects.
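As a hedged example of that transparency, running EXPLAIN on a query prints the optimiser’s plan, which names the projection(s) chosen; the sales table here is the hypothetical one from the aggregate projection sketch above.

-- The generated plan lists which projection(s) the optimiser selected,
-- e.g. the aggregate projection defined earlier if the query matches it.
EXPLAIN
SELECT state,
       SUM(sales_amount) AS total_sales
FROM   sales
GROUP  BY state;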

These architectural differences – columnar storage, compression, an MPP scale-out architecture and the ability to distribute a query – are what fundamentally enable analytic applications based on Vertica to scale seamlessly and offer many more users access to much more data.

In Part 2 of this series I will dive into installing Vertica on a cluster of inexpensive, desktop-class machines i.e. three Lenovo Tiny units running Ubuntu Server and go over some of the administrative tools e.g. Management Console, metadata queries etc. used to manage it.


Automating Tableau Workbook Exports Using Python and tabcmd Command Tool

October 27th, 2017 / by admin

Tableau comes with a plethora of functionality out of the box but sometimes the following will inevitably be asked/proposed to meet business reporting requirements:

  • End users will ask for features which do not exist (at least until version X is released or comes out of preview)
  • In order to meet those bespoke demands, some level of custom development, i.e. ‘hacking the status quo’, will be required

In the case of automated Tableau report generation, where pagination and more comprehensive controls are required, e.g. producing the same report as a series of PDF documents based on a report filter value, the software falls short of providing this level of control. However, given that I am not aware of any other application which provides such a fine level of functionality (it’s always a fine balance between feature selection and usability), and given Tableau’s ability to let users dive ‘under the hood’ to bridge such gaps, it’s fairly easy for technically savvy analysts to accommodate those demands with a little bit of reverse-engineering.

Tableau Server allows end users to query its internal workgroup database sitting atop PostgreSQL. I have previously written about PostgreSQL to Microsoft SQL Server schema and data synchronisation HERE, so I will not go into details of how Tableau stores its metadata, but it’s fair to say that anyone who has ever worked with a relational data store should have no trouble cobbling together a simple query to acquire the details on workbooks, views, jobs etc. that the Tableau database stores and manages.
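For instance, a hedged sketch of such a query is shown below; it assumes the workbooks and views tables of the workgroup database with a workbook_id foreign key, and exact table and column names may differ between Tableau Server versions.

-- Run against Tableau's internal "workgroup" PostgreSQL database (e.g. with the
-- readonly user). Table and column names assumed; verify for your version.
SELECT w.name AS workbook_name,
       v.name AS view_name,
       v.sheettype,
       v.state
FROM   workbooks w
JOIN   views v ON v.workbook_id = w.id
WHERE  v.state = 'active'
ORDER  BY w.name, v.name;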

Tableau Server also allows users to administer some tasks through its built-in tabcmd command-line utility, e.g. creating and deleting users, exporting views or workbooks, publishing data sources etc. It’s a handy tool which can simplify administrative efforts, and in this specific example we will be using it to create report exports in an automated fashion.

In order to demo this solution on typical business-like data, let’s create a sample data set and a very rudimentary Tableau report. The following SQL builds a single-table database running on a local Microsoft SQL Server instance and populates it with dummy sales data.


/*==============================================================================
STEP 1
Create SampleDB databases on the local instance
==============================================================================*/
USE [master];
GO
IF EXISTS (SELECT name FROM sys.databases WHERE name = N'SampleDB')
BEGIN
    -- Close connections to the SampleDB database
    ALTER DATABASE SampleDB SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
    DROP DATABASE SampleDB;
END;
GO
-- Create SampleDB database and log files
CREATE DATABASE SampleDB
ON PRIMARY
       (
           NAME = N'SampleDB',
           FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQL2016DEV\MSSQL\DATA\SampleDB.mdf',		   
           SIZE = 10MB,
           MAXSIZE = 1GB,
           FILEGROWTH = 10MB
       )
LOG ON
    (
        NAME = N'SampleDB_log',
        FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQL2016DEV\MSSQL\DATA\SampleDB_log.LDF',
        SIZE = 1MB,
        MAXSIZE = 1GB,
        FILEGROWTH = 10MB
    );
GO
--Assign database ownership to login SA
EXEC SampleDB.dbo.sp_changedbowner @loginame = N'SA', @map = false;
GO
--Change the recovery model to BULK_LOGGED
ALTER DATABASE SampleDB SET RECOVERY BULK_LOGGED;
GO

/*======================================================================================
STEP 2
Create sample_data table on SampleDB database and insert 10000 dummy records
imitating mocked-up sales transactions across random dates, postcodes and SKU numbers 
======================================================================================*/
USE SampleDB;
GO
SET NOCOUNT ON;
IF OBJECT_ID('sample_data') IS NOT NULL
BEGIN
    DROP TABLE sample_data;
END;
-- Create target object 
CREATE TABLE sample_data
(
    id INT IDENTITY(1, 1) NOT NULL,
    product_sku VARCHAR(512) NOT NULL,
    quantity INT NOT NULL,
    sales_amount MONEY NOT NULL,
    postcode VARCHAR(4) NOT NULL,
    state VARCHAR(128) NOT NULL,
    order_date DATETIME NOT NULL
);
-- Populate target object with dummy data
DECLARE @postcodestate TABLE
(
    id INT IDENTITY(1, 1),
    postcode VARCHAR(4),
    state VARCHAR(128)
);
INSERT INTO @postcodestate
(
    postcode,
    state
)
SELECT '2580', 'NSW' UNION ALL SELECT '2618', 'NSW' UNION ALL SELECT '2618', 'NSW' UNION ALL SELECT '2581', 'NSW' UNION ALL
SELECT '2582', 'NSW' UNION ALL SELECT '2580', 'NSW' UNION ALL SELECT '2550', 'NSW' UNION ALL SELECT '2550', 'NSW' UNION ALL
SELECT '2450', 'NSW' UNION ALL SELECT '3350', 'VIC' UNION ALL SELECT '3350', 'VIC' UNION ALL SELECT '3212', 'VIC' UNION ALL
SELECT '3215', 'VIC' UNION ALL SELECT '3880', 'VIC' UNION ALL SELECT '3759', 'VIC' UNION ALL SELECT '3037', 'VIC' UNION ALL
SELECT '3631', 'VIC' UNION ALL SELECT '4006', 'QLD' UNION ALL SELECT '4069', 'QLD' UNION ALL SELECT '4171', 'QLD' UNION ALL
SELECT '4852', 'QLD' UNION ALL SELECT '4852', 'QLD' UNION ALL SELECT '4352', 'QLD' UNION ALL SELECT '4701', 'QLD' UNION ALL
SELECT '4218', 'QLD' UNION ALL SELECT '5095', 'SA'	UNION ALL SELECT '5097', 'SA'  UNION ALL SELECT '5573', 'SA'  UNION ALL
SELECT '5700', 'SA'  UNION ALL SELECT '5214', 'SA'	UNION ALL SELECT '6209', 'WA'  UNION ALL SELECT '6054', 'WA'  UNION ALL
SELECT '6068', 'WA'  UNION ALL SELECT '6430', 'WA'  UNION ALL SELECT '6770', 'WA'  UNION ALL SELECT '7054', 'TAS' UNION ALL
SELECT '7253', 'TAS' UNION ALL SELECT '7140', 'TAS' UNION ALL SELECT '7179', 'TAS' UNION ALL SELECT '7109', 'TAS' UNION ALL
SELECT '0870', 'NT'  UNION ALL SELECT '0872', 'NT'  UNION ALL SELECT '0852', 'NT'  UNION ALL SELECT '2617', 'ACT' UNION ALL
SELECT '2617', 'ACT' UNION ALL SELECT '2913', 'ACT' UNION ALL SELECT '2905', 'ACT' UNION ALL SELECT '2903', 'ACT';
DECLARE @allchars VARCHAR(100) = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
DECLARE @max_row_number INT;
SET @max_row_number = 10000;
DECLARE @count INT = 1;
WHILE @count <= @max_row_number
BEGIN
    INSERT INTO SampleDB.dbo.sample_data
    (
        product_sku,
        quantity,
        sales_amount,
        postcode,
        state,
        order_date
    )
    SELECT RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35) + 1), 1)
           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35) + 1), 1)
           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35) + 1), 1)
           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35) + 1), 1)
           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35) + 1), 1),
           CAST(RAND() * 5 + 3 AS INT),
           CAST(RAND() * 50 AS INT) / 2.5,
           (
               SELECT TOP 1 postcode FROM @postcodestate ORDER BY NEWID()
           ),
           (
               SELECT TOP 1 state FROM @postcodestate ORDER BY NEWID()
           ),
           GETDATE() - (365 * 3 * RAND() - 365);
    SET @count = @count + 1;
END;

This table will become the basis for the following Tableau dashboard depicting random sales data aggregated by state, postcode, date and product SKU. The sample report looks as per the image below and is located on a Tableau Server instance which we will use to generate the extracts. You will also notice that the dashboard contains a filter called ‘state’ (as highlighted) with a number of values assigned to it. This filter, along with the filter values representing individual states, will become the basis for producing separate workbook extracts for each of the filter values, e.g. NT.pdf, VIC.pdf, NSW.pdf etc.

Next, let’s create a very simple database which will hold the metadata used for subsequent report generation. SQLite is more than adequate for such low volumes of data. The database schema is an amalgamation of Tableau’s own workbook and view tables and other objects defined to store the tabcmd options, attributes and filters we’d like to use in this example. The schema is generated and populated by the following Python script. The actual SQL used to define the DDL and DML statements can be found in my OneDrive folder HERE and is required as part of the overall solution.

import configparser
import sqlite3
import os
import sys
import argparse
import psycopg2

config = configparser.ConfigParser()
config.read('params.cfg')

#pg tables selection
pg_tblsNames = ['workbooks', 'views']

#sqlite args
dbsqlite_location = config.get('Sqlite',os.path.normpath('dbsqlite_location'))
dbsqlite_fileName = config.get('Sqlite','dbsqlite_fileName')
dbsqlite_sql = config.get('Sqlite','dbsqlite_sql')

parser = argparse.ArgumentParser(description='Tableau data extraction solution by bicortex.com')

#tableau server args
parser.add_argument('-n','--hostname', help='Tableau postgresql server name', required=True)
parser.add_argument('-d','--dbname', help='Tableau postgresql database name', required=True)
parser.add_argument('-u','--username', help='Tableau postgresql user name', required=True)
parser.add_argument('-p','--passwd', help='Tableau postgresql password', required=True)
args = parser.parse_args()

if not args.hostname or not args.dbname or not args.username or not args.passwd:
    parser.print_help()

def run_DB_built(dbsqlite_sql, dbsqlite_location, dbsqlite_fileName, dbname, username, hostname, passwd, *args):
    with sqlite3.connect(os.path.join(dbsqlite_location, dbsqlite_fileName)) as sqlLiteConn:
        sqlLiteConn.text_factory = lambda x: x.decode('utf-8', 'ignore')  # decode bytes returned by SQLite, ignoring invalid characters

        operations=[]
        commands=[]
        sql={}

        #get all SQL operation types as defined by the '----SQL' prefix in tableau_export.sql file
        print('Reading tableau_export.sql file...')
        with open(dbsqlite_sql, 'r') as f:        
            for i in f:
                if i.startswith('----'):
                    i=i.replace('----','')
                    operations.append(i.rstrip('\n'))
        f.close()                

        #get all SQL DML & DDL statements from tableau_export.sql file
        tempCommands=[]
        f = open(dbsqlite_sql, 'r').readlines()
        for i in f:
            tempCommands.append(i)
        l = [i for i, s in enumerate(tempCommands) if '----' in s]
        l.append((len(tempCommands)))    
        for first, second in zip(l, l[1:]):
            commands.append(''.join(tempCommands[first:second]))       
        sql=dict(zip(operations, commands))

        #run database CREATE SQL
        print('Building TableauEX.db database schema... ')
        sqlCommands = sql.get('SQL 1: Create DB').split(';')
        for c in sqlCommands:
            try:
                sqlLiteConn.execute(c)
            except sqlite3.OperationalError as e:
                print (e)
                sqlLiteConn.rollback()
                sys.exit(1)
            else:
                sqlLiteConn.commit()

        #acquire PostgreSQL workgroup database data and populate SQLite database schema with that data  
        print('Acquiring Tableau PostgreSQL data and populating SQLite database for the following tables: {0}...'.format(', '.join(map(str, pg_tblsNames)))) 
        pgConn = "dbname={0} user={1} host={2} password={3} port=8060".format(dbname, username, hostname, passwd)       
        pgConn = psycopg2.connect(pgConn)         
        pgCursor = pgConn.cursor()
        for tbl in pg_tblsNames:
            try:
                tbl_cols={}
                pgCursor.execute("""SELECT ordinal_position, column_name 
                                FROM information_schema.columns 
                                WHERE table_name = '{}' 
                                AND table_schema = 'public'
                                AND column_name != 'index'
                                ORDER BY 1 ASC""".format(tbl)) 
                rows = pgCursor.fetchall()
                for row in rows:                
                    tbl_cols.update({row[0]:row[1]})
                sortd = [tbl_cols[key] for key in sorted(tbl_cols.keys())]
                cols = ",".join(sortd)   
                pgCursor.execute("SELECT {} FROM {}".format(cols,tbl)) 
                rows = pgCursor.fetchall()
                num_columns = max(len(t) for t in rows)  # widest row determines the number of INSERT placeholders
                pgsql="INSERT INTO {} ({}) VALUES({})".format(tbl[:-1],cols,",".join('?' * num_columns))
                sqlLiteConn.executemany(pgsql,rows)
            
            except psycopg2.Error as e:
                print(e)
                sys.exit(1)  
            else:               
                sqlLiteConn.commit()

        #update SQLite bridging tables based on DML statements under 'SQL 2: Update DB' operation type header                  
        print('Updating SQLite database bridging tables...')
        sqlCommands = sql.get('SQL 2: Update DB').split(';')
        for c in sqlCommands:
            try:
                sqlLiteConn.execute(c)
            except sqlite3.OperationalError as e:
                print (e)
                sqlLiteConn.rollback()
                sys.exit(1)
            else:
                sqlLiteConn.commit()
    
    sqlLiteConn.close()
    pgConn.close()    

if __name__ == "__main__":  
    run_DB_built(dbsqlite_sql, dbsqlite_location, dbsqlite_fileName, args.dbname, args.username, args.hostname, args.passwd, pg_tblsNames) 

I have run this code against Tableau version 10.5, so there is a slight chance that some SQL statements will require adjustments depending on changes to Tableau’s PostgreSQL database in subsequent releases. The script uses the aforementioned SQL file as well as a configuration file where some of the required parameters are stored, as per the image below. Please also note that this SQL file contains data for the filter table comprised of Australian states (corresponding to the Tableau workbook filter and its values as indicated above), so your actual data may contain other filter names and values or may not use them at all. Also, the Tableau workgroup database schema does not store filter names and/or values, so in order to have them referenced in the script they need to be added manually.

Once executed, the following schema should be built and populated.

Now that we have laid the foundations for this solution, with the SQLite database created and populated and a sample Tableau dashboard deployed on Tableau Server pointing to our dummy data in SQL Server, we can run the report generation script. For this example, let’s assume that we need to generate a separate PDF copy of the dashboard for each of the states defined in the filter table. The query looks as follows.

SELECT
  w.name  AS workbook_name,
  v.name  AS view_name,
  f.name  AS filter_name,
  f.value AS filter_value
FROM workbook w
  JOIN view v ON w.id = v.fk_workbook_id
  JOIN view_filter_bridge vfb ON v.id = vfb.fk_view_id
  JOIN filter f ON vfb.fk_filter_id = f.id
WHERE w.name = 'Sample_Tableau_Report'
      AND v.sheettype = 'dashboard'
      AND v.state = 'active';

Feeding the same query into a Python script, which then breaks the returned attributes and their values down into tabcmd command arguments in a loop, produces a series of statements which in turn produce a PDF version of the nominated dashboard for each filter value. Below is the Python script which saves a copy of the Sales_Overview_Dashboard report in PDF format for each state to the specified location.

import configparser
import sqlite3
import os
import sys
import argparse
import subprocess
import pandas as pd

config = configparser.ConfigParser()
config.read('params.cfg')

#sqlite & tabcmd args
dbsqlite_location = config.get('Sqlite',os.path.normpath('dbsqlite_location'))
dbsqlite_fileName = config.get('Sqlite','dbsqlite_fileName')
tabcmd_location = config.get('Tableau_CMD','tableau_cmd_tool_path')
tabcmd_url = config.get('Tableau_CMD','tableau_svr_url')
pdfs_location = config.get('PDFs', 'save_path')

parser = argparse.ArgumentParser(description='Tableau report(s) generation script by bicortex.com')

#tableau server args
parser.add_argument('-u', '--username', help='Tableau server user name', required=True)
parser.add_argument('-p', '--passwd', help='Tableau server password', required=True)
parser.add_argument('-o', '--option', help='Other options and arguments provided by tabcmd', required=False)
args = parser.parse_args()

if not args.username or not args.passwd:
    parser.print_help()

#tableau login function
def tab_login(tabcmd_location, tabcmd_url, username, passwd): 
    try:    
        p=subprocess.run('{0} login -s {1} -u {2} -p {3} --no-certcheck'\
        .format(os.path.join(os.path.normpath(tabcmd_location),'tabcmd'),\
        tabcmd_url, args.username, args.passwd ),shell=True)         
        r=p.returncode
        return r
    except subprocess.SubprocessError as e:
            print(e)
            sys.exit(1)

#tableau logout function
def tab_logout(tabcmd_location):
    try:    
        p=subprocess.run('{0} logout'.format(os.path.join(os.path.normpath(tabcmd_location),'tabcmd')),shell=True)         
    except subprocess.SubprocessError as e:
            print(e)
            sys.exit(1)

#tabcmd report export function
def run_extracts(pdfs_location, tabcmd_location, username=args.username, passwd=args.passwd, option=args.option):
    standard_export_options = '--pdf --pagelayout landscape --no-certcheck --timeout 500'
    login_ok = tab_login(tabcmd_location, tabcmd_url, username, passwd)  
    if login_ok==0:
        with sqlite3.connect(os.path.join(dbsqlite_location, dbsqlite_fileName)) as sqlLiteConn:
            sqliteCursor = sqlLiteConn.cursor()
            sqliteCursor.execute( """
                                    SELECT
                                    w.name                     AS workbook_name,
                                    v.name                     AS view_name,
                                    f.name                     AS filter_name,
                                    f.value                    AS filter_value
                                    FROM workbook w
                                    JOIN view v ON w.id = v.fk_workbook_id
                                    JOIN view_filter_bridge vfb ON v.id = vfb.fk_view_id
                                    JOIN filter f ON vfb.fk_filter_id = f.id                            
                                    WHERE w.name = 'Sample_Tableau_Report'
                                    AND v.sheettype = 'dashboard'
                                    AND v.state = 'active';
                                """)
            result_set = sqliteCursor.fetchall()
            if result_set:                
                df = pd.DataFrame(result_set)
                col_name_list = [tuple[0] for tuple in sqliteCursor.description]
                df.columns = col_name_list
                print('\nThe following attributes and values were returned from the SQL query:')                
                print(df)
                for row in result_set:
                    workbook_name           = row[0]
                    view_name               = row[1]
                    filter_name             = row[2]
                    filter_value            = row[3]

                    if filter_name:
                        # URL-encode spaces in the filter name/value before building the export URL
                        if ' ' in filter_name:
                            filter_name = filter_name.replace(' ', '%20')
                        if ' ' in filter_value:
                            filter_value = filter_value.replace(' ', '%20')
                        # use the caller-supplied tabcmd options if provided, otherwise the defaults
                        option_value = option if option else standard_export_options
                        command = '{0} export "{1}?{2}={3}" -f "{4}{5}.pdf" {6} '\
                        .format(os.path.join(os.path.normpath(tabcmd_location),'tabcmd'),\
                        workbook_name + '/' + view_name, filter_name, filter_value, pdfs_location, filter_value, option_value)
                        try:
                            p=subprocess.run(command, shell=True)
                        except subprocess.SubprocessError as e:
                            print(e)
                            sys.exit(1)
                    else:  
                        # no filter defined for this view - export the whole dashboard once
                        option_value = option if option else standard_export_options
                        command = '{0} export "{1}" -f "{2}{3}.pdf" {4} '\
                        .format(os.path.join(os.path.normpath(tabcmd_location),'tabcmd'),\
                        workbook_name + '/' + view_name, pdfs_location, view_name, option_value)
                        try:
                            p=subprocess.run(command, shell=True)
                        except subprocess.SubprocessError as e:
                            print(e)
                            sys.exit(1)
        tab_logout(tabcmd_location)

if __name__ == "__main__":  
    if ' ' in tabcmd_location:
        tabcmd_location = '"'+ os.path.normpath(tabcmd_location) + '"' 
    else:
        tabcmd_location = os.path.normpath(tabcmd_location)    
    run_extracts(pdfs_location, tabcmd_location, args.username, args.passwd)         

This script assumes that the tabcmd utility is installed on the local machine if not run from the Tableau Server environment (which has it installed by default), and produces the following output when run from the command prompt.

The following is the folder view of where the individual reports have been saved, as well as a partial view of the South Australia report (SA.pdf), depicting SA data only with the remaining states filtered out.

Similar extracts can be run for all states, i.e. with no filter assigned; the only difference is the SQL statement we pass to the Python script. For example, to export the Sales Overview Dashboard across all states as a single PDF, we could embed the following SQL into the script.

SELECT
  w.name AS workbook_name,
  v.name AS view_name,
  NULL   AS filter_name,
  NULL   AS filter_value
FROM workbook w
  JOIN view v ON w.id = v.fk_workbook_id
WHERE w.name = 'Sample_Tableau_Report'
      AND v.sheettype = 'dashboard'
      AND v.state = 'active';

This technique can be used across a variety of different combinations and permutations of workbooks, sheets, filters, options etc. and is just a small example of how one can put together a simple solution for automated workbook extracts. For the full scope of tabcmd commands and associated functionality please refer to the Tableau online documentation.
