TPC-DS big data benchmark overview – how to generate and load sample data

January 11th, 2018 / 6 Comments » / by admin

Ever wanted to benchmark your data warehouse and see how it stacks up against the competition? Ever wanted a point of reference that would give you an industry-applicable, measurable set of metrics for objectively quantifying the performance of your data warehouse or decision support system? And most importantly, have you ever considered which platform and vendor will give you the most bang for your buck?

Well, arguably one of the best ways to find out is to simulate a suite of TPC-DS workloads (queries and data maintenance) against your target platform to obtain a representative evaluation of the System Under Test’s (SUT) performance as a general-purpose decision support system. There are a few other data sets and methodologies that can be used to gauge the performance of data storage and processing systems, e.g. the TLC Trip Record Data released by the New York City Taxi & Limousine Commission has gained a lot of traction amongst big data specialists. However, TPC-approved benchmarks have long been considered the most objective, vendor-agnostic way to perform data-specific hardware and software comparisons, capturing the complexity of modern business processing to a much greater extent than their predecessors.

In this post I will explore how to generate test data and test queries using the dsdgen and dsqgen utilities on a Windows machine against the product supplier snowflake-type schema, as well as how to load the test data into the created database in order to run some or all of the 99 queries TPC-DS defines. I don’t want to go into too much detail as TPC already provides a very comprehensive overview of the database and schema, along with a detailed description of constraints and assumptions. This post is more focused on how to generate the components required to evaluate the target platform, i.e. the TPC-DS data sets and queries, and on how to load the data.

In this post I will be referring to TPC-DS version 2.5.0rc2, the most current version at the time of publication. The download link can be found on the consortium website. The key components used for system evaluation with the TPC-DS tools include the following:

  • Data generator called dsdgen used for data sets creation
  • Query generator called dsqgen used for creating benchmark query sets
  • Three SQL files called tpcds.sql, tpcds_source.sql and tpcds_ri.sql which create a sample implementation of the logical schema for the data warehouse

There are other components present in the toolkit, however, for the sake of brevity I will not be discussing data maintenance functionality, verification data, alternative query sets or verification queries in this post. Let’s start with the dsdgen tool – an executable used to create raw data sets in the form of files. The dsdgen solution supplied as part of the download needs to be compiled before use. For me it was as simple as firing up Microsoft Visual Studio, loading up the dbgen2 solution and building the final executable. Once compiled, the tool can be controlled from the command line by a combination of switches and environment variables. Switches are single flags preceded by a minus sign (a forward slash in the Windows command used below), optionally followed by an argument. In its simplest form of execution, dsdgen can be called from Powershell e.g. I used the following combination of flags to generate the required data (1GB size) in the C:\TPCDSdata\SourceFiles\ folder and to measure execution time.

Measure-Command {.\dsdgen /SCALE 1 /DIR C:\TPCDSdata\SourceFiles /FORCE /VERBOSE}

The captured output below shows that the execution time on my old Lenovo laptop (i7-4600U @ 2.1GHz, 12GB RAM, SSD & Win10 Pro) was close to 10 minutes for 1GB of data generated sequentially.

The process can be sped up using the PARALLEL flag, which engages multiple CPU cores by distributing the workload across multiple processes. The VERBOSE parameter is pretty much self-explanatory – progress messages are displayed as the data is generated. The FORCE flag overwrites previously created files. Finally, the SCALE parameter requires a little bit more explanation.
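As a rough sketch of how parallel generation might be driven from Python, the snippet below builds one dsdgen command per child process. It assumes that dsdgen pairs the PARALLEL switch (total number of chunks) with a CHILD switch (which chunk this process generates); the helper names, the executable path and the stream count of 4 are made up for illustration, so check the switches against the toolkit's own help output before relying on them.

```python
import subprocess

def build_dsdgen_commands(exe, scale, out_dir, streams):
    """Return one dsdgen argument list per parallel child stream."""
    commands = []
    for child in range(1, streams + 1):
        commands.append([
            exe,
            "/SCALE", str(scale),
            "/DIR", out_dir,
            "/PARALLEL", str(streams),  # assumed: total number of chunks
            "/CHILD", str(child),       # assumed: chunk built by this process
            "/FORCE",
        ])
    return commands

def run_parallel(commands):
    """Start every child at once, then wait for all of them to finish."""
    procs = [subprocess.Popen(cmd) for cmd in commands]
    return [proc.wait() for proc in procs]

if __name__ == "__main__":
    cmds = build_dsdgen_commands(r".\dsdgen.exe", 1, r"C:\TPCDSdata\SourceFiles", 4)
    for cmd in cmds:
        print(" ".join(cmd))
    # Uncomment to actually launch the generator processes:
    # print(run_parallel(cmds))
```

Building the argument lists separately from launching them makes it easy to inspect the commands first, which is handy when experimenting with the number of streams.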

The TPC-DS benchmark defines a set of discrete scaling points – scale factors – based on the approximate size of the raw data produced by dsdgen. The set of scaling factors defined for the TPC-DS is 1TB, 3TB, 10TB, 30TB and 100TB where a terabyte (TB) is defined as 2 to the power of 40 bytes. The required row count for each permissible scale factor and each table in the test database is as follows.

Once the data has been generated, we can create a set of scripts used to simulate reporting/analytical business scenarios using another tool in the TPC-DS arsenal – dsqgen. As with the previous executable, dsqgen runs off the command line. Given the applicable query templates (in this case Microsoft SQL Server specific), it creates a single SQL file containing queries used across a multitude of scenarios e.g. reporting queries, ad hoc queries, interactive OLAP queries as well as data mining queries. I used the following command to generate the SQL scripts file.

dsqgen /input .\query_templates\templates.lst /directory .\query_templates /dialect sqlserver /scale 1

Before I get to the data loading script and schema overview, a quick mention of the final set of SQL scripts, starting with the tpcds.sql file, which creates a sample implementation of the logical schema for the data warehouse. This file contains the necessary DDL statements, so it should be executed on the environment of your choice before the generated data is loaded into the data warehouse. Also, since the files generated by the dsdgen utility come with a *.dat extension, it may be worthwhile to do some cleaning up before the target schema is populated. The following butchered Python script converts all *.dat files into *.csv files and breaks up some of the larger ones into a collection of smaller ones suffixed in an orderly fashion i.e. filename_1, filename_2, filename_3 etc., based on the values set for some of the variables declared at the start. This step is entirely optional but I found that some system utilities used for data loading find it difficult to deal with large files e.g. those produced with a scaling factor greater than 100GB. Splitting a large file into a collection of smaller ones may be an easy loading speed optimisation technique, especially if the process can be parallelized.

from os import listdir, rename, remove, path, walk
from shutil import copy2, move
from csv import reader, writer
import win32file as win  # pywin32, used only to query free disk space
from tqdm import tqdm    # progress bar shown while large files are split

srcFilesLocation = path.normpath('C:/TPCDSdata/SourceFiles/') 
srcCSVFilesLocation = path.normpath('C:/TPCDSdata/SourceCSVFiles/') 
tgtFilesLocation = path.normpath('C:/TPCDSdata/TargetFiles/')    
delimiter = '|'
keepSrcFiles = True
keepSrcCSVFiles = True
rowLimit = 2000000
outputNameTemplate = '_%s.csv'
keepHeaders = False
            
def getSize(start_path):
    total_size = 0
    for dirpath, dirnames, filenames in walk(start_path):
        for f in filenames:
            fp = path.join(dirpath, f)
            total_size += path.getsize(fp)            
    return total_size 

def freeSpace(start_path):
    secsPerClus, bytesPerSec, nFreeClus, totClus = win.GetDiskFreeSpace(start_path)
    return secsPerClus * bytesPerSec * nFreeClus

def removeFiles(DirPath):
    filelist = [ f for f in listdir(DirPath)]
    for f in filelist:
        remove(path.join(DirPath, f))

def SplitAndMoveLargeFiles(file, rowCount, srcCSVFilesLocation, outputNameTemplate, keepHeaders=False):
    # Split one large CSV into numbered pieces of at most rowLimit rows each.
    # rowLimit, delimiter and tgtFilesLocation are the module-level settings declared above.
    baseName = path.splitext(file)[0]
    with open(path.join(srcCSVFilesLocation, file), 'r', newline='') as filehandler:
        csv_reader = reader(filehandler, delimiter=delimiter)
        current_piece = 1
        current_out_path = path.join(tgtFilesLocation, baseName + outputNameTemplate % current_piece)
        current_out_file = open(current_out_path, 'w', newline='')
        current_out_writer = writer(current_out_file, delimiter=delimiter)
        current_limit = rowLimit
        if keepHeaders:
            headers = next(csv_reader)
            current_out_writer.writerow(headers)
        pbar = tqdm(total=rowCount)
        for i, row in enumerate(csv_reader):
            pbar.update()
            if i + 1 > current_limit:
                # The current piece is full: close it and start the next one.
                current_out_file.close()
                current_piece += 1
                current_limit = rowLimit * current_piece
                current_out_path = path.join(tgtFilesLocation, baseName + outputNameTemplate % current_piece)
                current_out_file = open(current_out_path, 'w', newline='')
                current_out_writer = writer(current_out_file, delimiter=delimiter)
                if keepHeaders:
                    current_out_writer.writerow(headers)
            current_out_writer.writerow(row)
        # Close the last piece so its buffered rows are flushed to disk.
        current_out_file.close()
        pbar.close()

def SourceFilesRename(srcFilesLocation, srcCSVFilesLocation):
    srcDirSize = getSize(srcFilesLocation)
    diskSize = freeSpace(srcFilesLocation)
    removeFiles(srcCSVFilesLocation)
    for file in listdir(srcFilesLocation):
        if file.endswith(".dat"):
            if keepSrcFiles:
                if srcDirSize >= diskSize:
                    print ('''Not enough space on the nominated disk to duplicate the files (you nominated to keep source files as they were generated i.e. with the .dat extension). 
                    Current source files directory size is {} MB. At least {} MB required to continue. Bailing out...'''
                    .format(round(srcDirSize/1024/1024,2), round(2*srcDirSize/1024/1024,2)))
                    exit(1)
                else:                    
                    copy2(path.join(srcFilesLocation , file), srcCSVFilesLocation)
                    rename(path.join(srcCSVFilesLocation , file), path.join(srcCSVFilesLocation , file[:-4]+'.csv'))
            else:
                # normpath strips the trailing separator, so build the path with path.join
                move(path.join(srcFilesLocation, file), srcCSVFilesLocation)
                rename(path.join(srcCSVFilesLocation , file), path.join(srcCSVFilesLocation , file[:-4]+'.csv'))

def ProcessLargeFiles(srcCSVFilesLocation,outputNameTemplate,rowLimit):
    fileRowCounts = []
    for file in listdir(srcCSVFilesLocation):
        if file.endswith(".csv"):
            fileRowCounts.append([file,sum(1 for line in open(path.join(srcCSVFilesLocation , file), newline=''))])               
    removeFiles(tgtFilesLocation)
    for file in fileRowCounts: 
        if file[1]>rowLimit:
            print("Processing File:", file[0],)
            print("Measured Row Count:", file[1])
            SplitAndMoveLargeFiles(file[0], file[1], srcCSVFilesLocation, outputNameTemplate)
        else:
            #print(path.join(srcCSVFilesLocation,file[0]))
            move(path.join(srcCSVFilesLocation, file[0]), tgtFilesLocation)
    removeFiles(srcCSVFilesLocation)            

if __name__ == "__main__":
    SourceFilesRename(srcFilesLocation, srcCSVFilesLocation)
    ProcessLargeFiles(srcCSVFilesLocation, outputNameTemplate, rowLimit)

The following screenshot demonstrates execution output when the file is run in Powershell. Since there were only two tables with the record count greater than the threshold set in the rowLimit variable i.e. 2000000 rows, only those two tables get broken up into smaller chunks.

Once the necessary data files have been created and the data warehouse is prepped for loading we can proceed to load the data. Depending on the volume of data, which in the case of TPC-DS directly correlates with the scaling factor used when running the dsdgen utility, a number of different methods can be used to populate the schema. Looking at Microsoft SQL Server as a test platform, the most straightforward approach would be to use the BCP utility. The following Python script executes the bulk copy program utility across many cores/CPUs to load the data created in the previous step (persisted in the C:\TPCDSdata\TargetFiles\ directory). The script utilises Python’s multiprocessing library to make the most out of the resources available.

from multiprocessing import Pool, cpu_count
from os import listdir,path, getpid,system
import argparse

odbcDriver = '{ODBC Driver 13 for SQL Server}'

parser = argparse.ArgumentParser(description='TPC-DS Data Loading Script by bicortex.com')
parser.add_argument('-S','--svrinstance', help='Server and Instance Name of the MSSQL installation', required=True)
parser.add_argument('-D','--db', help='Database Name', required=True)
parser.add_argument('-U','--username', help='User Name', required=True)
parser.add_argument('-P','--password', help='Authenticating Password', required=True)
parser.add_argument('-L','--filespath', help='UNC path where the files are located', required=True)
args = parser.parse_args()

if not args.svrinstance or not args.db or not args.username or not args.password or not args.filespath:
    parser.print_help()
    exit(1)

def loadFiles(tableName, fileName):    
    print("Loading from CPU: %s" % getpid())
    fullPath = path.join(args.filespath, fileName)
    bcp = 'bcp %s in %s -S %s -d %s -U %s -P %s -q -c -t "|" -r"|\\n"' % (tableName, fullPath, args.svrinstance, args.db, args.username, args.password)
    #bcp store_sales in c:\TPCDSdata\TargetFiles\store_sales_1.csv -S <servername>\<instancename> -d <dbname> -U <username> -P <password> -q -c -t  "|" -r "|\n"  
    print(bcp)
    system(bcp)
    print("Done loading data from CPU: %s" % getpid())

if __name__ == "__main__":    
    p = Pool(processes = 2*cpu_count())
    for file in listdir(args.filespath):
        if file.endswith(".csv"):
            tableName = ''.join([i for i in path.splitext(file)[0] if not i.isdigit()]).rstrip('_')
            p.apply_async(loadFiles, [tableName, file])
    p.close()
    p.join()

The following screenshot (you can also see a sample data load execution video HERE) depicts the Python script loading 10GB of data, with some files generated by the dsdgen utility further split up into smaller chunks by the split_files.py script. The data was loaded into a Microsoft SQL Server 2016 Enterprise instance running on an Azure virtual machine with 20 virtual cores (Intel E5-2673 v3 @ 2.40GHz) and 140GB of memory. Apart from the fact that it’s very satisfying to watch 20 cores being put to work simultaneously, spawning multiple BCP instances significantly reduces data load times, even when accounting for locking and contention.

I have not bothered to test the actual load times as SQL Server has a number of knobs and switches which can be used to adjust and optimise the system’s performance. For example, in-memory optimised tables, introduced in SQL Server 2014, store their data in memory using multiple versions of each row’s data. This technique is characterised as ‘non-blocking multi-version optimistic concurrency control’ and eliminates both locks and latches. A separate post may be in order to do justice to this feature and conduct a full performance analysis in a fair and unbiased manner.

So there you go – a fairly rudimentary rundown of some of the features of the TPC-DS benchmark. For a more detailed overview of TPC-DS please refer to the TPC website and their online resources HERE.


Vertica MPP Database Overview and TPC-DS Benchmark Performance Analysis (Part 1)

January 5th, 2018 / No Comments » / by admin

Note: Post 2 can be found HERE, Post 3 HERE and Post 4 HERE

Introduction

I have previously covered Massively Parallel Processing (MPP) databases and their preliminary benchmark numbers e.g. AWS Redshift HERE and Azure SQL DW HERE. Both of those cloud juggernauts are touting their platforms as the logical next step in big data computation and storage evolution, leaving old-guard, legacy vendors such as Teradata and Netezza with a hard choice: shift into a cloud or cloud-first operating model or be remembered in the database hall of fame of now defunct and obsolete technologies.

With this hard ultimatum, many vendors are desperately trying to pivot or at least re-align their offering with ever more demanding business expectations and provide a point of differentiation in this competitive market. For example, one of the most prominent MPP vendors, Pivotal, decided to open source its Greenplum platform under the Apache License 2.0. Vertica, another large MPP market player, although still very much a proprietary platform, allows freedom of environment (no cloud lock-in) with an option of running on commodity hardware (just like Greenplum) as well as comprehensive in-database Machine Learning capabilities. Vertica also offers free usage with up to three nodes and 1TB of data, and a very sophisticated database engine with plenty of bells, knobs and whistles for tuning and control. Judging by the experience others have had with this technology, Vertica’s database engine seems to easily match, if not surpass, that of many other providers in the MPP market in terms of the number of features and the breadth of flexibility it provides. It is said to scale well in terms of the number of concurrent users and offers a competitive TCO and cost per query (factoring in license cost, storage and compute resources on a VCP e.g. AWS or Azure etc.). As someone on Quora eloquently put it when contrasting it with its biggest competitor, AWS Redshift, ‘Vertica is a magical sword. Redshift is a giant club. It doesn’t take a lot of skill to use that giant club and it can do massive crunching. But I’d rather be the wizard-ninja’. Vertica’s pedigree only seems to be amplified by the fact that it was founded by Michael Stonebraker, perhaps the most prominent database pioneer and the recipient of the Turing Award, aka the ‘Nobel prize in computing’, who was also behind other database software e.g. Ingres, VoltDB or the ubiquitous PostgreSQL.

Vertica RDBMS is still very much a niche player in terms of market share, however, given that the likes of Facebook, Etsy, Guess or Uber have put their trust in Vertica’s ability to deliver on enterprise analytics, it certainly deserves more attention. With that in mind I would like to outline some of Vertica’s core architectural components and conduct a small performance comparison on the dataset used for TPC-DS benchmarking across one and three nodes. This post will introduce Vertica’s key architectural and design components, with Post 2 focusing on Vertica’s installation and configuration process on a number of commodity machines. Finally, in Post 3 and Post 4 I will run some TPC-DS benchmark queries to test its performance across the maximum number of nodes allowed for the Community Edition. If you’re only interested in the benchmark outcomes you can jump into Post 3 right away.

Vertica Key Architecture Components Primer

Vertica’s appeal, outside features such as ACID transactions, ANSI SQL compliance, high availability etc., is mainly driven by its database engine being optimised for executing complex analytical queries fast. Analytic workloads are characterised by large data sets and small transaction volumes (10s-100s per second), but a high number of rows per operation. This is very different to typical transactional workloads associated with the majority of legacy RDBMS applications, which are characterised by a large number of transactions per second, where each transaction involves a handful of tuples. Vertica’s performance rests on a number of technologies and design goals, the most important being columnar storage, compression and MPP scale-out architecture.

Touching on some of those key points above, the following provides a quick overview of these technologies which form an integral part of its architecture.

 

Columnar Data Storage

MPP databases differ from traditional transactional, row-oriented databases e.g. PostgreSQL or MySQL, where data is stored and manipulated in rows. Row-based operations work really well in an application context, where the database load consists of relatively large numbers of CRUD operations (create, read, update, delete). In analytical contexts, where the workload consists of a relatively small number of queries over a small number of columns but large numbers of records, the row-based approach is not ideal. Columnar databases have been developed to work around the limitations of row-based databases for analytical purposes. They store data compressed and per column, much like an index in a row-based database, reducing disk I/O as only the columns required to answer the query are read.

The following example depicts how a simple query can take advantage of columnar storage. In a typical, row-based RDBMS all rows in all tables included in the query would need to be read in order to retrieve its results, regardless of how wide the rows are or how many columns are required to satisfy the results output. In contrast, Vertica utilises a concept of projections – a column-based structure. Given that we only reference three columns – symbol, date and volume – only these columns need to be read from disk.

Given that the majority of legacy RDBMS systems were built for data collection, not data retrieval, columnar data storage speeds up analytical query execution by avoiding scans of data not referenced in the query itself.

Vertica also sorts data prior to storage. Queries make use of the sortedness of the data by skipping rows which would be filtered out by predicates (in a manner similar to clustered B-Tree indexes). Sorting can also be used to optimise join and aggregation algorithms.
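To make the two ideas above concrete, here is a toy Python sketch of column-wise storage combined with sorted data. It is only an illustration of the principle, not Vertica's storage format: the table contents are invented and the function name is hypothetical.

```python
from bisect import bisect_left, bisect_right

# Invented sample rows: (symbol, date, price, volume), sorted by symbol and date.
rows = [
    ("AAA", "2018-01-02", 10.0, 100),
    ("AAA", "2018-01-03", 10.5, 150),
    ("BBB", "2018-01-02", 20.0, 200),
    ("BBB", "2018-01-03", 19.5, 250),
    ("CCC", "2018-01-02", 30.0, 300),
]

# Column-oriented layout: one list per column instead of one tuple per row.
columns = {
    "symbol": [r[0] for r in rows],
    "date": [r[1] for r in rows],
    "price": [r[2] for r in rows],
    "volume": [r[3] for r in rows],
}

def total_volume(columns, symbol):
    """Sum volume for one symbol, touching only the symbol and volume columns."""
    symbols = columns["symbol"]
    # The symbol column is sorted, so binary search finds the matching run
    # without scanning the rows the predicate would filter out.
    lo = bisect_left(symbols, symbol)
    hi = bisect_right(symbols, symbol)
    return sum(columns["volume"][lo:hi])

print(total_volume(columns, "BBB"))
```

The price and date lists are never touched by the query, which mirrors the disk I/O saving described above, and the binary search stands in for skipping rows based on sort order.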

 

Data Encoding And Compression

Vertica uses sophisticated encoding and compression technologies to optimise query performance and reduce storage footprint. Encoding converts data into a standard format to decrease disk I/O during query execution and reduce storage requirements. Vertica uses a number of different encoding strategies, depending on column data type, table cardinality, and sort order. Different columns in the projection may have different encodings and the same column may have a different encoding in each projection in which it appears. Vertica employs the following encoding types:

  • AUTO – this encoding is ideal for sorted, many-valued columns such as primary keys. It is also suitable for general purpose applications for which no other encoding or compression scheme is applicable. Therefore, it serves as the default if no encoding/compression is specified.
  • BLOCK_DICT – for each block of storage, Vertica compiles distinct column values into a dictionary and then stores the dictionary and a list of indexes to represent the data block. BLOCK_DICT is ideal for few-valued, unsorted columns where saving space is more important than encoding speed. Certain kinds of data, such as stock prices, are typically few-valued within a localized area after the data is sorted, such as by stock symbol and timestamp, and are good candidates for BLOCK_DICT. By contrast, long CHAR/VARCHAR columns are not good candidates for BLOCK_DICT encoding. BLOCK_DICT encoding requires significantly higher CPU usage than default encoding schemes. The maximum data expansion is eight percent (8%).
  • BLOCK_DICT_COMP – this encoding type is similar to BLOCK_DICT except dictionary indexes are entropy coded. This encoding type requires significantly more CPU time to encode and decode and has a poorer worst-case performance. However, if the distribution of values is extremely skewed, using BLOCK_DICT_COMP encoding can lead to space savings.
  • BZIP_COMP – BZIP_COMP encoding uses the bzip2 compression algorithm on the block contents. This algorithm results in higher compression than the automatic LZO and gzip encoding; however, it requires more CPU time to compress. This algorithm is best used on large string columns such as VARCHAR, VARBINARY, CHAR, and BINARY. Choose this encoding type when you are willing to trade slower load speeds for higher data compression.
  • COMMONDELTA_COMP – This compression scheme builds a dictionary of all deltas in the block and then stores indexes into the delta dictionary using entropy coding. This scheme is ideal for sorted FLOAT and INTEGER-based (DATE/TIME/TIMESTAMP/INTERVAL) data columns with predictable sequences and only occasional sequence breaks, such as timestamps recorded at periodic intervals or primary keys. For example, the following sequence compresses well: 300, 600, 900, 1200, 1500, 600, 1200, 1800, 2400. The following sequence does not compress well: 1, 3, 6, 10, 15, 21, 28, 36, 45, 55. If delta distribution is excellent, columns can be stored in less than one bit per row. However, this scheme is very CPU intensive. If you use this scheme on data with arbitrary deltas, it can cause significant data expansion.
  • DELTARANGE_COMP – This compression scheme is primarily used for floating-point data; it stores each value as a delta from the previous one. This scheme is ideal for many-valued FLOAT columns that are sorted or confined to a range. Do not use this scheme for unsorted columns that contain NULL values, as the storage cost for representing a NULL value is high. This scheme has a high cost for both compression and decompression. To determine if DELTARANGE_COMP is suitable for a particular set of data, compare it to other schemes. Be sure to use the same sort order as the projection, and select sample data that will be stored consecutively in the database.
  • DELTAVAL – For INTEGER and DATE/TIME/TIMESTAMP/INTERVAL columns, data is recorded as a difference from the smallest value in the data block. This encoding has no effect on other data types.
  • GCDDELTA – For INTEGER and DATE/TIME/TIMESTAMP/INTERVAL columns, and NUMERIC columns with 18 or fewer digits, data is recorded as the difference from the smallest value in the data block divided by the greatest common divisor (GCD) of all entries in the block. This encoding has no effect on other data types. GCDDELTA is best used for many-valued, unsorted, integer columns or integer-based columns, when the values are a multiple of a common factor. For example, timestamps are stored internally in microseconds, so data that is only precise to the millisecond are all multiples of 1000. The CPU requirements for decoding GCDDELTA encoding are minimal, and the data never expands, but GCDDELTA may take more encoding time than DELTAVAL.
  • GZIP_COMP – This encoding type uses the gzip compression algorithm. This algorithm results in better compression than the automatic LZO compression, but lower compression than BZIP_COMP. It requires more CPU time to compress than LZO but less CPU time than BZIP_COMP. This algorithm is best used on large string columns such as VARCHAR, VARBINARY, CHAR, and BINARY. Use this encoding when you want a better compression than LZO, but at less CPU time than bzip2.
  • RLE – RLE (run length encoding) replaces sequences (runs) of identical values with a single pair that contains the value and number of occurrences. Therefore, it is best used for low cardinality columns that are present in the ORDER BY clause of a projection. The Vertica execution engine processes RLE encoding run-by-run and the Vertica optimizer gives it preference. Use it only when run length is large, such as when low-cardinality columns are sorted. The storage for RLE and AUTO encoding of CHAR/VARCHAR and BINARY/VARBINARY is always the same.
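Three of the encodings listed above (RLE, DELTAVAL and GCDDELTA) are simple enough to sketch in a few lines of Python. The functions below are toy illustrations of the ideas as described in the list, not Vertica's on-disk format; the function names are made up, and the GCDDELTA sketch takes the divisor from the offsets rather than from the raw entries, which is a simplifying assumption.

```python
from functools import reduce
from itertools import groupby
from math import gcd

def rle_encode(values):
    """Run-length encoding: collapse each run into a (value, count) pair."""
    return [(value, len(list(run))) for value, run in groupby(values)]

def deltaval_encode(block):
    """DELTAVAL idea: keep the block minimum plus each value's offset from it."""
    base = min(block)
    return base, [v - base for v in block]

def gcddelta_encode(block):
    """GCDDELTA idea: offsets from the minimum, divided by their common divisor."""
    base, offsets = deltaval_encode(block)
    divisor = reduce(gcd, offsets, 0) or 1  # fall back to 1 when all offsets are 0
    return base, divisor, [o // divisor for o in offsets]

def gcddelta_decode(base, divisor, encoded):
    """Reverse gcddelta_encode to recover the original block."""
    return [base + e * divisor for e in encoded]

print(rle_encode(["A", "A", "A", "B", "B", "C"]))
print(deltaval_encode([1003, 1000, 1007]))
print(gcddelta_encode([5000, 2000, 9000]))
```

In the last call the stored numbers shrink from four digits to one, which is the effect described above for timestamps that are all multiples of a common factor.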

Compression, on the other hand, transforms data into a compact format. Vertica uses several different compression methods and automatically chooses the best one for the data being compressed. Using compression, Vertica stores more data, and uses less hardware than other databases.

In one experiment, a text file containing a million random integers between 1 and 10 million and a size of 7.5 MB was compressed using gzip (sorted and unsorted data) and the size compared to its original version as well as Vertica. The unsorted gzip version averaged the compression ratio of 2.1, sorted version was around 3.3, whereas Vertica managed a respectable 12.5.
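The gzip half of that experiment is easy to approximate with the standard library. The sketch below is a scaled-down assumption of how such a test could be set up (100,000 integers instead of a million, one number per line, a fixed seed); it only shows that sorting helps gzip and makes no claim about reproducing the ratios quoted above or Vertica's result.

```python
import gzip
import random

def gzip_ratio(numbers):
    """Compression ratio of the numbers written one per line as text."""
    raw = "\n".join(map(str, numbers)).encode("ascii")
    return len(raw) / len(gzip.compress(raw))

random.seed(42)  # fixed seed so repeated runs use the same data
values = [random.randint(1, 10_000_000) for _ in range(100_000)]

unsorted_ratio = gzip_ratio(values)
sorted_ratio = gzip_ratio(sorted(values))
print(f"unsorted: {unsorted_ratio:.2f}, sorted: {sorted_ratio:.2f}")
```

Sorted neighbours share their leading digits, which gives a general-purpose compressor more repetition to exploit; a column store that also knows the data type can go further, as the encodings listed earlier suggest.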

In another example, Vertica has a customer that collects metrics from some meters. There are 4 columns in the schema:

  • Metric: There are a few hundred metrics collected.
  • Meter: There are a couple of thousand meters.
  • Collection Time Stamp: Each meter spits out metrics every 5 minutes, 10 minutes, hour, etc., depending on the metric.
  • Metric Value: A 64-bit floating point value.

A baseline file of 200 million comma separated values (CSV) of the meter/metric/time/value rows takes 6200 MB, for 32 bytes per row. Compressing with gzip reduces this to 1050 MB. By sorting the data on metric, meter, and collection time, Vertica not only optimises common query predicates (which specify the metric or a time range), but exposes great compression opportunities for each column. The total size for all the columns in Vertica is 418MB (slightly over 2 bytes per row), broken down as follows:

  • Metric: There aren’t many. With RLE, it is as if there are only a few hundred rows. Vertica compressed this column to 5 KB.
  • Meter: There are quite a few, and there is one record for each meter for each metric. With RLE, Vertica brings this down to a mere 35 MB.
  • Collection Time Stamp: The regular collection intervals present a great compression opportunity. Vertica compressed this column to 20 MB.
  • Metric Value: Some metrics have trends (like lots of 0 values when nothing happens). Others change gradually with time. Some are much more random, and less compressible. However, Vertica compressed the data to only 363MB.

 

MPP Scale-Out And Distributed Queries

Vertica is not just an analytical database; it is a distributed, ‘shared-nothing’ analytical database capable of running on clusters of inexpensive, off-the-shelf servers, Amazon and Azure Cloud servers, and Hadoop. Its performance can not only be tuned with features like resource pools and projections, but it can be scaled simply by adding new servers to the cluster.

Data within a table may be spread across a Vertica cluster either by replicating the data across all nodes or by ‘segmenting’ the data by attribute values using a consistent hashing scheme. This allows many classes of joins to be performed without moving the data across the network. Vertica considers CPU, network and storage access costs when optimising query plans, and parallelizes computation based on SQL’s JOIN keys, GROUP BY keys, and PARTITION BY keys.
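The reason segmentation lets joins run without network traffic can be shown with a small Python sketch. It assumes simple hash-modulo placement rather than a true consistent-hashing ring, and the tables, node count and function names are invented for illustration; it is not how Vertica implements segmentation internally.

```python
import hashlib

def node_for(key, node_count):
    """Map a segmentation key to a node with a stable hash."""
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % node_count

def segment(rows, key_index, node_count):
    """Distribute rows across nodes by hashing one column."""
    nodes = [[] for _ in range(node_count)]
    for row in rows:
        nodes[node_for(row[key_index], node_count)].append(row)
    return nodes

# Invented tables: customers(id, name) and sales(sale_id, customer_id, amount).
customers = [(1, "Ann"), (2, "Bob"), (3, "Cy")]
sales = [(101, 1, 9.5), (102, 3, 4.0), (103, 1, 2.5)]

# Both tables are segmented on the join key, so matching rows share a node.
customer_nodes = segment(customers, 0, 3)
sales_nodes = segment(sales, 1, 3)

def local_join(node_id):
    """Join sales to customers using only the rows stored on one node."""
    names = dict(customer_nodes[node_id])
    return [(sale_id, names[cust_id], amount)
            for sale_id, cust_id, amount in sales_nodes[node_id]]

joined = sorted(row for n in range(3) for row in local_join(n))
print(joined)
```

Because every sale hashes to the same node as its customer, each node can complete its share of the join on its own and the partial results only need to be concatenated.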

Clustering speeds up performance by parallelizing querying and loading across the nodes in the cluster for higher throughput.

Clustering also allows the database to maintain RAID-like high availability in case one or more nodes are down and no longer part of the quorum. This provides a robust mechanism to ensure little to no downtime as multiple copies of same data are stored on different nodes.

The traditional method to ensure that a database system can recover from a crash is to use logging and (in the case of a distributed database) a protocol called two-phase commit. The main idea is to write a log record to a sequential log for each update operation before the operation is actually applied to the tables on disk. These log records are a redundant copy of the data in the database, and when a crash occurs, they can be replayed to ensure that transactions are atomic: either all of the updates of a transaction appear to have occurred, or none of them do. The two-phase commit protocol is then used to ensure that all of the nodes in a distributed database agree that a transaction has successfully committed; it requires several additional log records to be written. Log-based recovery is widely used in other commercial systems, as it provides strong recoverability guarantees at the expense of significant performance and disk space overhead.

Vertica has a unique approach to distributed recoverability that avoids these costs. The basic idea is to exploit the distributed nature of a Vertica database. The Vertica DB Designer ensures that every column in every table in the database is stored on at least k+1 machines in the Vertica cluster. Such a database is called k-safe, because if k machines crash or otherwise fail, a complete copy of the database is still available. As long as k or fewer machines fail simultaneously, a crashed machine can recover its state by copying, from other machines in the system, data about transactions that committed or aborted while it was down. This approach does not require logging because the nodes replicating the data ensure that a recovering machine always has another (correct) copy of the data to compare against, replacing the role of a log in a traditional database. As long as k-safety holds, there is always at least one machine that knows the correct outcome (commit or abort) of every transaction. In the unlikely event that the system loses k-safety, Vertica brings the database back to a consistent point in time across all nodes.

K-safety also means that Vertica is highly available: it can tolerate the simultaneous crash of any k machines in a grid without interrupting query processing. The value of k can be configured to provide the desired trade-off between hardware costs and availability guarantees.
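
To make the k-safety idea more concrete, here is a minimal Python sketch. It is a toy model and not how Vertica actually places data: the ring-style placement, the node names and the helper functions (`place_segments`, `is_available`, `tolerates`) are all assumptions made for illustration. It simply stores each data segment on k+1 nodes and checks whether a full copy of the data survives a given number of simultaneous node failures.

```python
from itertools import combinations


def place_segments(nodes, k):
    """Toy placement: segment i is stored on nodes i, i+1, ..., i+k (wrapping around),
    so every segment has exactly k+1 copies. This is an illustrative assumption."""
    n = len(nodes)
    if k + 1 > n:
        raise ValueError("k-safety needs at least k+1 nodes")
    return {
        seg: {nodes[(seg + offset) % n] for offset in range(k + 1)}
        for seg in range(n)
    }


def is_available(placement, failed):
    """The data set is complete if every segment still has at least one live copy."""
    failed = set(failed)
    return all(copies - failed for copies in placement.values())


def tolerates(placement, nodes, failures):
    """True if the data survives every possible combination of `failures` dead nodes."""
    return all(
        is_available(placement, failed)
        for failed in combinations(nodes, failures)
    )


nodes = ["node1", "node2", "node3", "node4"]  # hypothetical node names
placement = place_segments(nodes, k=1)

print(tolerates(placement, nodes, 1))  # any single node can fail
print(tolerates(placement, nodes, 2))  # some pairs of failures lose a segment
```

With k=1 every segment lives on two nodes, so any single failure leaves a complete copy, while losing two adjacent nodes in this toy layout takes out both copies of one segment. Raising k buys tolerance for more simultaneous failures at the cost of more storage, which is the hardware-versus-availability trade-off described above.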

It is instructive to contrast Vertica’s high-availability scheme with traditional database systems, where high availability is achieved through the use of active standbys – essentially completely unused hardware that holds an exact copy of the database and is ready to take over in the event of a primary database failure. Unlike Vertica’s k-safe design, in which the redundant copies can be stored in different sort orders and therefore also help answer queries, active standbys simply add to the cost of the database system without improving performance. Because Vertica is k-safe, it supports hot-swapping of nodes. A node can be removed, and the database will continue to process queries (at a lower rate). Conversely, a node can be added, and the database will automatically allocate a collection of objects to that node so that it can begin processing queries, increasing database performance automatically.

 

Projections

Projections store data in a format that optimises query execution. Similar to materialised views, they store result sets on disk rather than computing them each time they are used in a query. Vertica automatically refreshes these result sets with updated or new data.

A Vertica table typically has multiple projections, each defined to contain different content. Content for the projections of the given table can differ in scope and how it is organised. These differences can generally be divided into the following projection types:

  • Superprojections – A superprojection contains all the columns of the table. For each table in the database, Vertica requires a minimum of one superprojection. Under certain conditions, Vertica automatically creates a table’s superprojection immediately on table creation. Vertica also creates a superprojection when you first load data into that table, if none exists already.
  • Query-Specific Projections – A query-specific projection contains only the subset of table columns needed to process a given query. Query-specific projections significantly improve the performance of those queries for which they are optimised.
  • Aggregate Projections – Queries which include expressions or aggregate functions such as SUM and COUNT can perform more efficiently when they use projections that already contain the aggregated data. This is especially true for queries on large volumes of data.
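
The difference between a superprojection and a query-specific projection can be pictured with a small Python sketch. This is a deliberately simplified stand-in and not Vertica’s optimiser, which is cost-based and considers far more than column counts; the projection names, column names and the `pick_projection` helper are hypothetical. The sketch picks the narrowest projection that still contains every column a query needs and falls back to the superprojection otherwise.

```python
def pick_projection(projections, query_columns):
    """Return the name of the narrowest projection covering all query columns.

    `projections` maps a projection name to the list of columns it stores.
    Choosing by column count alone is an illustrative simplification.
    """
    needed = set(query_columns)
    candidates = [
        (name, columns)
        for name, columns in projections.items()
        if needed <= set(columns)
    ]
    if not candidates:
        raise LookupError("no projection contains all requested columns")
    # Fewer stored columns means less data to read from disk for this query.
    name, _ = min(candidates, key=lambda item: len(item[1]))
    return name


# Hypothetical projections defined on a hypothetical sales table.
projections = {
    "sales_super": ["sale_id", "store_id", "product_id", "sale_date", "amount"],
    "sales_by_store": ["store_id", "amount"],
}

print(pick_projection(projections, ["store_id", "amount"]))    # query-specific projection
print(pick_projection(projections, ["product_id", "amount"]))  # only the superprojection covers it
```

Because the superprojection contains every column, it can always answer the query, which is why at least one is required per table; the narrower projection only wins for the queries it was designed for.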

Projections provide the following benefits:

  • Compress and encode data to reduce storage space. Additionally, Vertica operates on the encoded data representation whenever possible to avoid the cost of decoding. This combination of compression and encoding optimises disk space while maximising query performance.
  • Facilitate distribution across the database cluster. Depending on their size, projections can be segmented or replicated across cluster nodes. For instance, projections for large tables can be segmented and distributed across all nodes. Unsegmented projections for small tables can be replicated across all nodes.
  • Remain transparent to end users. The Vertica query optimiser automatically picks the best projections to execute a given query.
  • Provide high-availability and recovery. Vertica duplicates table columns on at least K+1 nodes in the cluster. If one machine fails in a K-Safe environment, the database continues to operate using replicated data on the remaining nodes. When the node resumes normal operation, it automatically queries other nodes to recover data and lost objects.
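
The distinction between segmented and replicated projections can be sketched in a few lines of Python. This is an assumption-laden illustration rather than Vertica’s implementation: the CRC32 hash, the row layout, the node names and the `segment` and `replicate` helpers are all made up for the example. A large table is spread across nodes by hashing a key column, while a small table is copied in full to every node.

```python
import zlib


def segment(rows, key, nodes):
    """Spread rows across nodes by hashing the value of `key` (toy hash segmentation)."""
    placement = {node: [] for node in nodes}
    for row in rows:
        # crc32 is used only because it is stable between runs; it is an assumption here.
        bucket = zlib.crc32(str(row[key]).encode("utf-8")) % len(nodes)
        placement[nodes[bucket]].append(row)
    return placement


def replicate(rows, nodes):
    """Store a full copy of a small table on every node."""
    return {node: list(rows) for node in nodes}


nodes = ["node1", "node2", "node3"]  # hypothetical three-node cluster

# Hypothetical large fact table and small dimension table.
sales = [{"sale_id": i, "store_id": i % 5, "amount": 10 * i} for i in range(1000)]
stores = [{"store_id": i, "name": f"store-{i}"} for i in range(5)]

segmented = segment(sales, "sale_id", nodes)
replicated = replicate(stores, nodes)

for node in nodes:
    print(node, len(segmented[node]), "sales rows,", len(replicated[node]), "store rows")
```

Each node then holds only a slice of the large table but a complete copy of the small one, so a join between the two can be evaluated locally on every node without moving the small table across the network.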

These architectural differences – column storage, compression, MPP scale-out architecture and the ability to distribute a query – are what fundamentally enable analytic applications based on Vertica to scale seamlessly and offer many more users access to much more data.

In Part 2 of this series I will dive into installing Vertica on a cluster of inexpensive, desktop-class machines (three Lenovo Tiny units running Ubuntu Server) and go over some of the administrative tools used to manage it, e.g. Management Console and metadata queries.
