Kicking the tires on BigQuery – Google’s Serverless Enterprise Data Warehouse (Part 2)

March 1st, 2019 / 2 Comments » / by admin

Note: Part 1 can be found HERE.

TPC-DS Benchmark Continued…

In the previous post I outlined some basic principles behind Google’s BigQuery architecture as well as a programmatic way to interact with the TPC-DS benchmark dataset (file preparation, staging data on a Google Cloud Storage bucket, creating datasets and table schemas, data loading etc.). In this post I will go through a few analytical queries defined as part of the comprehensive, 99-query TPC-DS standard and their execution patterns (similar to the blog post I wrote on spinning up a small Vertica cluster HERE), and look at how BigQuery integrates with the likes of Tableau and Power BI, most likely the top two applications used nowadays for rapid data analysis.

Firstly, let’s see how BigQuery handles some of the TPC-DS queries running across the 100GB and 300GB datasets, comprised of 24 tables and over 2 billion rows collectively (300GB dataset). As with the previous blog post covering TPC-DS query execution on a Vertica platform HERE, the following image depicts row counts and data size distribution for this exercise.

For this exercise, I ran a small Python script executing the following queries from the standard TPC-DS benchmark suite: 5, 10, 15, 19, 24, 27, 33, 36, 42, 45, 48, 54, 58, 60, 65, 68, 73, 78, 83, 85, 88, 93, 96, 99. The script sequentially executes the TPC-DS SQL commands stored in an external file, reporting the execution time each time a query is run. All scripts used for this project can be found in my OneDrive folder HERE.

#!/usr/bin/python
import configparser
import sys
from os import path
from time import time
from google.cloud import bigquery

config = configparser.ConfigParser()
config.read("params.cfg")

bq_tpc_ds_sql_statements_file_path = path.normpath(
    config.get("Files_Path", "bq_tpc_ds_sql_statements_file_path"))
bq_service_account_key_path = path.normpath(
    config.get("Big_Query", "bq_client"))
bq_client = bigquery.Client.from_service_account_json(bq_service_account_key_path)


def get_sql(bq_tpc_ds_sql_statements_file_path):
    """
    Source individual queries from the tpc_ds_sql_queries.sql file.
    Each query is denoted by a marker line starting with four dash characters
    followed by the query number; the queries are stored in a dictionary
    keyed by that number (referenced in the main() function).
    """
    query_number = []
    query_sql = []

    # Collect the query numbers from the '----' marker lines
    with open(bq_tpc_ds_sql_statements_file_path, "r") as f:
        for line in f:
            if line.startswith("----"):
                line = line.replace("----", "")
                query_number.append(line.rstrip("\n"))

    # Split the file into per-query SQL blocks using the marker line positions
    temp_query_sql = []
    with open(bq_tpc_ds_sql_statements_file_path, "r") as f:
        for line in f:
            temp_query_sql.append(line)
        marker_positions = [i for i, s in enumerate(temp_query_sql) if "----" in s]
        marker_positions.append(len(temp_query_sql))
        for first, second in zip(marker_positions, marker_positions[1:]):
            query_sql.append("".join(temp_query_sql[first:second]))
    sql = dict(zip(query_number, query_sql))
    return sql


def run_sql(query_sql, query_number):
    """
    Execute a SQL query as per the SQL text stored in the tpc_ds_sql_queries.sql
    file, referenced by its number e.g. 10, 24 etc., and report the returned
    row count and query processing time.
    """
    query_start_time = time()
    job_config = bigquery.QueryJobConfig()
    job_config.use_query_cache = False
    query_job = bq_client.query(query_sql, job_config=job_config)
    rows = query_job.result()
    rows_count = sum(1 for row in rows)
    query_end_time = time()
    query_duration = query_end_time - query_start_time
    print("Query {q_number} executed in {time} seconds...{ct} rows returned.".format(
        q_number=query_number, time=format(query_duration, '.2f'), ct=rows_count))


def main(param, sql):
    """
    Depending on the argv value, i.e. a query number or the 'all' keyword,
    execute the corresponding SQL queries by calling the run_sql() function.
    """
    if param == "all":
        for key, val in sql.items():
            query_sql = val
            query_number = key.replace("Query", "")
            run_sql(query_sql, query_number)
    else:
        query_sql = sql.get("Query"+str(param))
        query_number = param
        run_sql(query_sql, query_number)


if __name__ == "__main__":
    if len(sys.argv[1:]) == 1:
        sql = get_sql(bq_tpc_ds_sql_statements_file_path)
        param = sys.argv[1]
        query_numbers = [q.replace("Query", "") for q in sql]
        query_numbers.append("all")
        if param not in query_numbers:
            raise ValueError("Incorrect argument given. Looking for <all> or <query number>. Specify <all> argument or choose from the following numbers:\n {q}".format(
                q=', '.join(query_numbers[:-1])))
        else:
            main(param, sql)
    else:
        raise ValueError(
            "Too many arguments given. Looking for 'all' or <query number>.")

The following charts depict query performance on the 100GB and 300GB datasets with the BigQuery cache engaged and disengaged. As BigQuery, in most cases, writes all query results (including both interactive and batch queries) to a cached results table retained for approximately 24 hours, it is possible for BigQuery to retrieve previously computed results from cache, thus speeding up the overall execution time.

While these results and test cases are probably not truly representative of ‘big data’ workloads, they provide a quick snapshot of how traditional analytical workloads would perform on the BigQuery platform. Scanning large tables with hundreds of millions of rows seemed to perform quite well (especially the cached runs), and performance differences between the 100GB and 300GB datasets were consistent with the increase in data volume. As BigQuery does not provide many ‘knobs and switches’ to tune queries based on their execution pattern (part of its no-ops appeal), there is very little room for improving performance beyond avoiding certain anti-patterns. The ‘execution details’ pane provides some diagnostic insight into the query plan and timing information; however, many optimizations happen automatically, which differs from other environments where tuning, provisioning, and monitoring may require dedicated, knowledgeable staff. I believe that for similar data volumes, much faster runtimes are possible on an OLAP database from a different vendor (for carefully tuned and optimised queries); however, this would imply a substantial investment in upfront infrastructure setup and specialised resources, which for a lot of companies will negate the added benefits and possibly impair their time-to-market competitive advantage.

Looking at some ad hoc query execution through Tableau, the first thing I needed to do was to create a custom SQL query to slice and dice the TPC-DS data, as Tableau ‘conveniently’ hides attributes with the NUMERIC data type (unsupported in Tableau at this time). Casting those to the FLOAT64 data type fixed the issue and I was able to proceed with building a sample dashboard using the following query.

SELECT CAST(store_sales.ss_sales_price AS FLOAT64) AS ss_sales_price,
       CAST(store_sales.ss_net_paid AS FLOAT64)    AS ss_net_paid,
       store_sales.ss_quantity,
       date_dim.d_fy_quarter_seq,
       date_dim.d_holiday,
       item.i_color,
       store.s_city,
       customer_address.ca_state
FROM tpc_ds_test_data.store_sales
         JOIN tpc_ds_test_data.customer_address ON
    store_sales.ss_addr_sk = customer_address.ca_address_sk
         JOIN tpc_ds_test_data.date_dim ON
    store_sales.ss_sold_date_sk = date_dim.d_date_sk
         JOIN tpc_ds_test_data.item ON
    store_sales.ss_item_sk = item.i_item_sk
         JOIN tpc_ds_test_data.store ON
    store_sales.ss_store_sk = store.s_store_sk

Running a few analytical queries across the 300GB TPC-DS dataset, with all measures derived from its largest fact table (store_sales), BigQuery performed quite well, with the majority of the queries taking less than 5 seconds to complete. As all queries executed multiple times are cached, I found BigQuery’s response even faster (sub-second level) when used with the same measures, dimensions and calculations repetitively. However, using this engine for ad-hoc exploratory analysis in a live/direct access mode can become very expensive, very quickly. The few queries run (73 to be exact) as part of this demo (see footage below) cost me over 30 Australian dollars on this 300GB dataset. I can only imagine how much it would cost to analyse a much bigger dataset in this manner, so unless analysis like this can be performed on an extracted dataset, I find it hard to recommend BigQuery as a rapid-fire, exploratory database engine for impromptu data analysis.

BigQuery Machine Learning (BQML)

As more and more data warehousing vendors jump on the bandwagon of bringing data science capability closer to the data, Google is also trying to cater for analysts who predominantly operate in the world of SQL by providing a simplified interface for machine learning model creation. This democratizes the use of ML by empowering data analysts, the primary data warehouse users, to build and run models using existing business intelligence tools and spreadsheets. It also increases the speed of model development and innovation by removing the need to export data from the data warehouse.

BigQuery ML is a series of SQL extensions that allow data scientists to build and deploy machine learning models that use data stored in the BigQuery platform, abstracting many of the painful and highly mathematical aspects of machine learning methods behind simple SQL statements. BigQuery ML is part of a much larger ecosystem of machine learning tools available on Google Cloud.

The current release (as of Dec. 2018) only supports two types of machine learning models.

  • Linear regression – these models can be used for predicting a numerical value representing the relationship between variables, e.g. between customer waiting (queue) time and customer satisfaction.
  • Binary logistic regression – these models can be used for predicting one of two classes (such as identifying whether an email is spam).

Developers can create a model by using the CREATE MODEL, CREATE OR REPLACE MODEL or CREATE MODEL IF NOT EXISTS syntax (presently, the size of the model cannot exceed 90 MB). A number of parameters can be passed (some optional), dictating model type, name of the label column, regularization technique etc.

Looking at a concrete example, I downloaded the venerable Titanic Kaggle dataset, widely used by many aspiring data scientists. It contains training data for 891 passengers, provides information on the fate of passengers on the Titanic summarized according to economic status (class), sex, age etc., and is the go-to dataset for predicting passenger survivability based on those features. Once the data is loaded into two separate tables – test_data and train_data – creating a simple model is as easy as creating a database view. We can perform binary classification and create a sample logistic regression model with the following syntax.

CREATE MODEL 
  `bq_ml.test_model`
OPTIONS 
  (model_type = 'logistic_reg',
  input_label_cols = ['Survived'],
  max_iterations=10) AS
SELECT 
  * 
FROM 
  bq_ml.train_data

Once the model has been created we can view its details and training options in BigQuery as per the image below.

Now that we have a classifier model created on our data, we can evaluate it using the ML.EVALUATE function.

SELECT 
  * 
FROM 
  ML.EVALUATE (MODEL `bq_ml.test_model`, 
    (
  SELECT 
    * 
  FROM 
    `bq_ml.train_data`), 
    STRUCT (0.55 AS threshold))

This produces a single row of evaluation metrics, e.g. accuracy, recall, precision etc., which allows us to determine model performance.

We can also use the ML.ROC_CURVE function to evaluate logistic regression models, although ML.ROC_CURVE is not supported for multiclass models. Also, notice the optional threshold parameter, which is a custom threshold for this logistic regression model to be used for evaluation. The default value is 0.5 and the supplied threshold value must be of type STRUCT. A zero value for precision or recall means that the selected threshold produced no true positive labels, while a NaN value for precision means that the selected threshold produced no positive labels at all, neither true positives nor false positives. If both table_name and query_statement are unspecified, a threshold cannot be used; a threshold also cannot be used with multiclass logistic regression models.
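For completeness, the snippet below is a minimal sketch of how ML.ROC_CURVE could be called against the same model and training data created earlier (bq_ml.test_model and bq_ml.train_data); the optional GENERATE_ARRAY argument supplying the list of thresholds to evaluate is my own illustrative choice.

SELECT 
  * 
FROM 
  ML.ROC_CURVE (MODEL `bq_ml.test_model`, 
    (
  SELECT 
    * 
  FROM 
    `bq_ml.train_data`), 
    GENERATE_ARRAY(0.1, 0.9, 0.1))

Each returned row corresponds to one of the supplied thresholds, which makes it easy to chart the trade-off between recall and the false positive rate.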

Finally, we can use our model to predict who survived the Titanic disaster by applying our trained model to the test data. The output of the ML.PREDICT function has as many rows as the input table, and it includes all columns from the input table and all output columns from the model. The output column names for the model are predicted_<label_column_name> and (for logistic regression models) predicted_<label_column_name>_probs, where <label_column_name> is the name of the input label column used during training. The following query generates the prediction – 1 for ‘survived’ and 0 for ‘did not survive’ – along with the probability and the names of the passengers on the ship.

SELECT 
  predicted_survived, 
  predicted_survived_probs, 
  name
FROM 
  ML.PREDICT (MODEL `bq_ml.test_model`, 
    (
  SELECT 
    * 
  FROM 
    `bq_ml.test_data`))

When the query is executed, we can clearly see the prediction for each individual passenger.

Conclusion

Why use the Google Cloud Platform? The initial investment required to import data into the cloud is offset by some of the advantages offered by BigQuery. For example, as a fully-managed service, BigQuery requires no capacity planning, provisioning, 24×7 monitoring or operations, nor does it require manual security patch updates. You simply upload datasets to Google Cloud Storage in your account, import them into BigQuery, and let Google’s experts manage the rest. This significantly reduces your total cost of ownership (TCO) for a data handling solution. Growing datasets have become a major burden for many IT departments using data warehouse and BI tools, forcing engineers to worry about many issues beyond data analysis and problem-solving. By using BigQuery, IT teams can get back to focusing on essential activities such as building queries to analyse business-critical customer and performance data. Also, BigQuery’s REST API enables you to easily build App Engine-based dashboards and mobile front-ends.

On the other hand, BigQuery is no panacea for all your BI issues. For starters, its ecosystem of tools and supporting platforms it can integrate with is inferior to that of other cloud data warehouse providers, e.g. Amazon’s Redshift. To execute the queries from a sample IDE (DataGrip), it took me a few hours to configure a 3rd-party ODBC driver (Google leaves you in the cold here) as BigQuery currently does not offer robust integration with any enterprise tool for data wrangling. While Google has continually tried to reduce the number of limitations and improve BigQuery’s features, the product has a number of teething problems which can be a deal-breaker for some, e.g. developers can’t rename, remove or change the type of a column (without re-writing the whole table), there is no option to create a table as SELECT, there are two different SQL dialects etc. These omissions are small and insignificant in the grand scheme of things; however, touting itself as an enterprise product with those simple limitations in place is still a bit of a stretch. Furthermore, BigQuery’s on-demand, usage-based pricing model can be difficult to work with from a procurement point of view (and therefore difficult to get approved by the executives). Because there is no notion of an instance type, you are charged for storage, streaming inserts and queries, which is unpredictable and may put some teams off (Google recently added a Cost Control feature to partially combat this issue). Finally, Google’s nebulous road-map for service improvements and feature implementation, coupled with its sketchy track record of sun-setting products that do not generate enough revenue (most big enterprises want stability and a long-term support guarantee), does not inspire confidence.

All in all, as is the case with the majority of big data vendors, any enterprise considering its adoption will have to weigh up all the pros and cons the service comes with and decide whether they can live with BigQuery’s shortcomings. No tool or service is perfect, but for those with limited dev-ops capability who want a true, serverless platform capable of processing huge volumes of data very fast, BigQuery presents an interesting alternative to the current incumbents.


Speeding up SQL Server Data Warehouse Architecture With Automation Procedures – 10 Problem-Solution Scenarios To Jump-Start Your Development

February 7th, 2019 / 11 Comments » / by admin

Note: all code used in this post can be viewed and downloaded from my publicly accessible OneDrive folder HERE.

Most skilled BI/DW professionals, when planning and scoping the delivery of a new data warehouse, will choose to work with some sort of framework to automate and simplify certain common tasks, e.g. data acquisition, index creation, schema rebuilds or synchronization etc. Over the years, I have designed and built a large number of data marts and enterprise data warehouses using both industry-standard as well as bespoke design patterns. It is fair to say that it often pays not to reinvent the wheel and to reuse (where possible) some of the staple elements and code bases applicable to those development activities. After all, time is money, and the quicker I can focus on delivering value through providing well-structured and rich data, the happier my clients will be. With that in mind, I thought it would be a good idea to share some of the code which handles many of the tedious but necessary tasks so that one can operationalize them with relative speed and start addressing a business- or problem-specific architecture. In my experience, even the most technology-savvy clients are not interested in the intricate details of how data is piped across the various domains and constructs. In the end, for most stakeholders, even the most elaborate and elegant solutions are just a part of a ‘shoe-box’ referred to as the data warehouse, and until data can be realized and utilized through a report or a piece of analytics, code is just a means to an end – the quicker we can build it and make it work, the better.

The following tidbits of SQL provide a small collection of ‘utilities’ one can implement with zero to little modification to automate activities such as index creation/dropping, foreign key creation/dropping, automated data acquisition from source databases, reconciling schema changes etc. All this code was developed on the SQL Server platform, as a large chunk of my engagements still heavily relies on virtual-environment, on-premise or hybrid Microsoft SQL Server deployments. Also, rather than simply delivering the source code or a GitHub link, I tried to provide a common scenario or problem statement, along with a short explanation of what the code does and how it solves the aforementioned issue. In this structure, each problem statement roughly follows the sequence of tasks a typical architect or developer may need to address in the process of building a data acquisition framework, i.e. check if source data/environment is accessible, build up metadata for source and target environments, manage schema changes based on metadata, acquire data etc. This is not an exhaustive list of all activities, and many of them may be redundant or not applicable to your specific scenario, but if you’re grappling with implementing a proof of concept or maybe a small-scale project and looking for a more defined, hands-on springboard to take your architecture from an idea to a set of tangible artifacts that jump-start the development process, this may be of great help.

Also, it is worth pointing out that these scripts work well with POCs, prototypes and small projects, where speed and agility are paramount. Most enterprise data warehouse developments are not like that and require a fair amount of upfront planning and preparation (even when done in an agile manner). If your project is complex in nature and, for example, requires large systems integration, complex ETL/ELT functionality, changing APIs, non-relational/non-file-based data stores or comes with bespoke requirements, you may want to look at either buying an off-the-shelf framework or customizing one which has already been developed. Building a new framework from scratch is probably a bad idea and, since most DW core concepts have remained unchanged for many years, chances are it’s better to ‘stand on the shoulders of giants’ rather than trying to reinvent the wheel.

Problem 1

Check if a Linked Server connection is active before the data acquisition job is initiated. If not, retry a predefined number of times, waiting a preset amount of time between attempts, and if the connection still does not resolve, notify the operator/administrator.

 

On a SQL Server platform, most database connections across a number of different vendors can be configured using the good, old-fashioned Linked Server functionality – in my experience it worked well on MSSQL, MySQL/MariaDB, PostgreSQL, Vertica, Oracle, Teradata and even SQLite. Before the data warehouse can be loaded with data from a source system (usually an externally hosted database), it’s worth checking that the connection we’ve created is active and resolving without issues. If not, we may want to wait for a few minutes (instead of failing the load completely) and, if the problem persists, notify the administrator to start the troubleshooting process. These issues are quite unlikely to appear and, in my personal experience, are mostly related to network failures rather than source system availability problems, but when they do surface, DW administrators are often left in the dark. The code for this small stored procedure can be downloaded from HERE.
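The downloadable procedure is more fleshed out, but the snippet below is a minimal sketch of the retry pattern it follows; the linked server name (Source_Server_Name), the Database Mail profile (DBA_Profile) and the recipient address are illustrative assumptions.

DECLARE @Retry_Count INT = 0,
        @Max_Retries INT = 3,
        @Is_Connected BIT = 0;

WHILE @Retry_Count < @Max_Retries AND @Is_Connected = 0
BEGIN
    BEGIN TRY
        -- sp_testlinkedserver raises an error if the linked server does not resolve
        EXEC master.dbo.sp_testlinkedserver N'Source_Server_Name';
        SET @Is_Connected = 1;
    END TRY
    BEGIN CATCH
        SET @Retry_Count += 1;
        -- Wait 5 minutes before the next attempt
        WAITFOR DELAY '00:05:00';
    END CATCH;
END;

IF @Is_Connected = 0
BEGIN
    -- Notify the administrator and abort the load
    EXEC msdb.dbo.sp_send_dbmail
        @profile_name = 'DBA_Profile',
        @recipients = 'dba@yourcompany.com',
        @subject = 'Linked server Source_Server_Name unreachable',
        @body = 'Data acquisition aborted after 3 failed connection attempts.';
    THROW 50001, 'Linked server connection could not be established.', 1;
END;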

Problem 2

Provide a selective list of objects, attributes and data, and store this as metadata to be referenced at a later stage, e.g. for schema rebuilds, data acquisition, index creation etc.

 

Oftentimes, whether it’s because of security reasons (data viewed as a liability) or simply as a storage-space-saving mechanism (largely uncommon these days), clients want to be able to selectively nominate a list of objects from a database, along with some of their attributes and data, to be acquired. They may not want all tables and table columns to be replicated in the staging area of the data warehouse. Furthermore, they may want to go down to the level of individual values within a given attribute. For example:

  • Out of 200 tables in the source system, they are only interested in 150
  • Since some of the columns are used for logging and metadata storage only, they want those excluded as they don’t provide any valuable information
  • Finally, some columns may contain a mixture of data with various levels of sensitivity, e.g. staff and clients’ e-mail addresses. As a result, they want staff e-mail address values retained, but client e-mail addresses masked or obfuscated for security and/or privacy reasons

There are a few different ways to address this issue, but the simplest solution would be to create three views containing some of the metadata mentioned above i.e. table names, column names etc. and load those into a small, three-table database which can be used as a reference point for individual objects, attributes and data exclusion/inclusion. Let’s see how that would work in practice.

After creating the target environment as well as a sample linked server to an Azure SQL DB to further demonstrate this in practice (again, all T-SQL downloadable from HERE), I typically create three views which hold source system metadata. The content of those is loaded into a small, three-table database which can be referenced in a number of ways. The reason I create intermediary views, as opposed to only storing this data in tables, is that it’s a lot easier to manage metadata in this format, e.g. version-control it, make changes and alterations etc. Once the views are created, we can use a simple stored procedure to (re)build the table schema, provide the initial data load and update the data based on business requirements and source system changes.

These three tables become very helpful in providing the reference data for any subsequent activities. For example, if we’d like to selectively specify object names for data acquisition tasks or run a post-acquisition data reconciliation job, it’s a lot easier to have this information available on the local instance. A simplified sketch of one such metadata view is shown below.
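The actual views are in the downloadable scripts; as a minimal, hypothetical sketch, a column-level metadata view over the linked server could look something like the following (Source_Server_Name, the dbo source schema and the Is_Acquired flag are illustrative assumptions).

CREATE VIEW dbo.vw_Source_Column_Metadata
AS
-- Pull column-level metadata from the remote source via the linked server
SELECT src.Table_Name,
       src.Column_Name,
       src.Data_Type,
       src.Is_Nullable,
       CAST(1 AS BIT) AS Is_Acquired   -- flip to 0 to exclude a column from acquisition
FROM OPENQUERY
     (Source_Server_Name, 'SELECT TABLE_NAME  AS Table_Name,
                                  COLUMN_NAME AS Column_Name,
                                  DATA_TYPE   AS Data_Type,
                                  IS_NULLABLE AS Is_Nullable
                           FROM INFORMATION_SCHEMA.COLUMNS
                           WHERE TABLE_SCHEMA = ''dbo''') AS src;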

Problem 3

Dynamically manage schema changes in target database.

Changes to the source schema are usually communicated downstream before going into production, so that data engineers can unit-test and validate them for impact on the data warehouse and ETL logic. However, in my experience, that’s often a pie-in-the-sky scenario, and for teams/divisions with a weak change-control culture, unannounced schema drift is a highly likely event. Cloud technologies have offered tools to easily reconcile both schema and data across various data sources, but in case I need to roll my own, I often rely on a script which can do it for me. The code (downloadable from HERE) stages source and target metadata and compares it on attributes such as data types, object and column names, numeric precision, numeric scale etc. If any changes are detected, it tries to replicate them on the target database, after which the comparison is run again to verify the fix.

I have used this code extensively for schema reconciliation between a number of different vendor databases, e.g. MSSQL, MariaDB, PostgreSQL, with only small modifications to account for idiosyncrasies in how data can be represented across those (SQL Server being the target/sink database). In an ideal world, schema changes should always be treated as a potential risk to downstream processing, with a proper chain of custody in place to account for potential issues. However, in many cases it’s either not possible or the approach taken is ‘too agile’ to ensure precautions are taken before changes are communicated and deployed, so if keeping schemas in sync is required, this small step (executed before data acquisition kick-off) can be of great benefit.
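At its core, the comparison boils down to something like the sketch below, assuming the source metadata has already been staged into a local table (here a hypothetical Meta.Source_Columns) and the target schema is dbo.

-- Columns present (or typed differently) in the source but missing or different in the target
SELECT Table_Name,
       Column_Name,
       Data_Type,
       Numeric_Precision,
       Numeric_Scale
FROM Meta.Source_Columns
EXCEPT
SELECT c.TABLE_NAME,
       c.COLUMN_NAME,
       c.DATA_TYPE,
       c.NUMERIC_PRECISION,
       c.NUMERIC_SCALE
FROM INFORMATION_SCHEMA.COLUMNS AS c
WHERE c.TABLE_SCHEMA = 'dbo';

Any rows returned by this EXCEPT query indicate attributes that need to be created or altered on the target before data acquisition starts.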

Problem 4

Acquire data from source system based on metadata input and changes.

 

Sometimes, the most difficult issue when dealing with new data mart development is source data acquisition. Once data has been imported into the target zone, it is usually quite easy to mold, transform and apply business rules to. However, due to factors such as source-to-target schema changes, history and CDC tracking, delta values reconciliation etc., acquiring source data can sometimes turn into a challenging problem to solve. Most organizations turn to purchasing a COTS (commercial, off-the-shelf) framework or developing one themselves if the business requirements are too bespoke to warrant going the commercial route. However, for quick projects with a small amount of data, or even as a test/POC solution, we can just as easily create a Linked Server connection to the source database (if available) and build a simple acquisition pipeline to speed up and automate the whole process.

This code, run on a schedule or in an on-demand fashion, relies on the control (meta)data created as part of the Problem 2 outline, acquiring source data in parallel by spooling up multiple SQL Server Agent jobs. It is comprised of two separate stored procedures: (1) a parent module responsible for the metadata management and spawning the SQL Server Agent jobs, and (2) a worker module which does the heavy lifting, i.e. inserts data into the target object based on the metadata information passed down from its parent.

The number of jobs can be determined by the count of vCPUs/CPU cores available on the host machine, and the workload can even be further partitioned to ‘break up’ individual objects using additional logic, e.g. very large tables can be broken up into smaller ‘chunks’ of data (using a numeric primary key field) and SELECTED/INSERTED in parallel. Below is sample logic to split a table’s data into 10 evenly-distributed blocks in T-SQL and Python.

--Create dynamic SQL
DECLARE @SQL NVARCHAR(MAX)
DECLARE @PK_Col_Name VARCHAR (100) = 'id'
DECLARE @Target_DB_Object_Name VARCHAR(1000) = 'Target_Object'
DECLARE @Proc_Exec_No INT = 10
DECLARE @Remote_Server_Name VARCHAR (100) = 'Source_Server_Name'

SET @SQL =				'DECLARE @R1 INT = (SELECT id FROM OPENQUERY ('+@Remote_Server_Name+', '	+CHAR(13)
SET @SQL = @SQL +		'''SELECT MIN('+@PK_Col_Name+') as id from '+@Target_DB_Object_Name+''')) '	+CHAR(13) 
SET @SQL = @SQL +		'DECLARE @R2 BIGINT = (SELECT id FROM OPENQUERY ('+@Remote_Server_Name+', '	+CHAR(13)
SET @SQL = @SQL +		'''SELECT (MAX('+@PK_Col_Name+')-MIN('+@PK_Col_Name+')+1)'					+CHAR(13)
SET @SQL = @SQL +		'/'+CAST(@Proc_Exec_No AS VARCHAR(10))+' as id FROM'						+CHAR(13)
SET @SQL = @SQL +		''+@Target_DB_Object_Name+'''))'											+CHAR(13)
SET @SQL = @SQL +		'DECLARE @R3 BIGINT = (SELECT id FROM OPENQUERY ('+@Remote_Server_Name+', '	+CHAR(13)
SET @SQL = @SQL +		'''SELECT MAX('+@PK_Col_Name+') as id from '+@Target_DB_Object_Name+''')) '	+CHAR(13) 
SET @SQL = @SQL +		'INSERT INTO #Ids_Range'													+CHAR(13)
SET @SQL = @SQL +		'(range_FROM, range_to)'													+CHAR(13)
SET @SQL = @SQL +		'SELECT @R1, @R1+@R2'														+CHAR(13)
SET @SQL = @SQL +		'DECLARE @z INT = 1'														+CHAR(13)
SET @SQL = @SQL +		'WHILE @z <= '+CAST(@Proc_Exec_No AS VARCHAR(10))+'-1'					   +CHAR(13)
SET @SQL = @SQL +		'BEGIN'																		+CHAR(13)
SET @SQL = @SQL +		'INSERT INTO #Ids_Range (range_FROM, range_TO) '							+CHAR(13)
SET @SQL = @SQL +		'SELECT LAG(range_TO,0) OVER (ORDER BY id DESC)+1, '						+CHAR(13)
SET @SQL = @SQL +		'CASE WHEN LAG(range_TO,0) OVER (ORDER BY id DESC)+@R2+1 >= @R3'			   +CHAR(13)
SET @SQL = @SQL +		'THEN @R3 ELSE LAG(range_TO,0) OVER (ORDER BY id DESC)+@R2+1 END'			+CHAR(13)
SET @SQL = @SQL +		'FROM tempdb..#Ids_Range WHERE @z = id'										+CHAR(13)
SET @SQL = @SQL +		'SET @z = @z+1'																+CHAR(13)
SET @SQL = @SQL +		'END'	
PRINT (@SQL)

--Take printed dynamic SQL and generate ranges (stored in a temporary table)
IF OBJECT_ID('tempdb..#Ids_Range') IS NOT NULL
BEGIN
    DROP TABLE #Ids_Range;
END;
CREATE TABLE #Ids_Range
(
    id SMALLINT IDENTITY(1, 1),
    range_FROM BIGINT,
    range_TO BIGINT
);

DECLARE @R1 INT =
        (
            SELECT id
            FROM OPENQUERY
                 (Source_Server_Name, 'SELECT MIN(id) as id from Target_Object')
        );
DECLARE @R2 BIGINT =
        (
            SELECT id
            FROM OPENQUERY
                 (Source_Server_Name, 'SELECT (MAX(id)-MIN(id)+1)
/10 as id FROM
Target_Object')
        );
DECLARE @R3 BIGINT =
        (
            SELECT id
            FROM OPENQUERY
                 (Source_Server_Name, 'SELECT MAX(id) as id from Target_Object')
        );
INSERT INTO #Ids_Range
(
    range_FROM,
    range_TO
)
SELECT @R1,
       @R1 + @R2;
DECLARE @z INT = 1;
WHILE @z <= 10 - 1
BEGIN
    INSERT INTO #Ids_Range
    (
        range_FROM,
        range_TO
    )
    SELECT LAG(range_TO, 0) OVER (ORDER BY id DESC) + 1,
           CASE
               WHEN LAG(range_TO, 0) OVER (ORDER BY id DESC) + @R2 + 1 >= @R3 THEN
                   @R3
               ELSE
                   LAG(range_TO, 0) OVER (ORDER BY id DESC) + @R2 + 1
           END
    FROM tempdb..#Ids_Range
    WHERE @z = id;
    SET @z = @z + 1;
END;

SELECT * FROM #Ids_Range
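
# Python equivalent of the T-SQL range-splitting logic above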
def split_into_ranges(start, end, parts):
    ranges = []
    x = round((end - start) / parts)
    for _ in range(parts):
        ranges.append([start, start + x])
        start = start + x + 1
        if end - start <= x:
            remainder = end - ranges[-1][-1]
            ranges.append([ranges[-1][-1] + 1, ranges[-1][-1] + remainder])
            break
    return ranges

Problem 5

Capture stored procedure/SSIS package execution errors and exceptions information for analysis.

 

SQL Server comes with a built-in mechanism to capture various levels of information and metadata on SSIS package execution. However, since my preference has mostly been to use SSIS as an orchestration engine, executing finely tuned and optimized SQL code rather than relying on ‘black box’ SSIS transformations, I needed to devise a way to capture and persist execution errors and related data in a central place which can be reported out of. Since I have already written about this at length in my other two blog posts, namely HERE and HERE, for brevity I won’t be repeating this information again. All the code (two stored procedures – one for creating and one for updating database objects) can be found HERE. The schema created by the script is as per below, and the way to reference and integrate other code against it can be found in my two previous posts: PART 1 and PART 2.

Once populated with target instance metadata, captured errors can be visualized and analyzed. The following image depicts a simple dashboard outlining various error metrics and their associated attributes.

Problem 6

Persist and historize data changes across all objects and attributes.

 

The old-school approach to ensuring that source data changes, e.g. deletions, updates etc., are captured and stored in the data warehouse relies heavily on the concept of Slowly Changing Dimensions (SCD). Depending on which SCD type is used (more on this topic can be read in the Wikipedia article HERE), data changes do not overwrite the initial value(s) but create a separate record or column to persist any modifications to the nominated set of attributes. This operation is typically reserved for dimensions, as facts are transactional in nature and thus not subject to changes. The problem with this approach is that data is required to be shaped and molded in a very specific way, sometimes losing some of its ‘raw’ value along the way in favor of adhering to a specific modelling methodology. Sometimes it would be great to re-delegate history persistence functionality to the ‘area’ responsible for staging source system data, across all objects and fields. I have already written a post on building a relational data lake (link HERE), so reiterating this content is out of scope, but it’s fair to say that with the ever-changing landscape of how data is captured and stored, allowing for maximum flexibility in how it can be modeled should be paramount.

There are many patterns which can facilitate implementing a relational data lake, but this simple design allows for a clever way of persisting all application data changes (one of the core requirements of any data warehouse, irrespective of the modelling approach taken) along with providing robust foundations for provisioning virtual data marts to satisfy reporting and analytical data needs. The virtual data marts can be easily implemented as views, with the required business rules embedded inside the SQL statements and, if required for performance reasons, materialized or physically persisted on disk or in memory.

This framework relies on creating a custom, performance-optimized data flow logic along with four databases which together handle transient application data (as acquired from the source system), staged data and, finally, data containing all transitional changes (inserts, deletes and updates) across all history. Again, repeating my previous post outlining all the details is out of scope here, so for a comprehensive overview of how this method works for history persistence and for performantly handling data changes across large systems, please refer to the previous post HERE.
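As an illustration only (the linked post describes the actual implementation), one common way to detect row-level changes when persisting history is to compare row hashes between the staged data and the latest historized version of each record; the sketch below assumes hypothetical stg.Customer and hist.Customer tables and a Row_Hash column.

-- Insert a new history version for rows that are brand new or whose content hash has changed
INSERT INTO hist.Customer (Customer_Id, First_Name, Last_Name, Email, Row_Hash, Valid_From)
SELECT s.Customer_Id,
       s.First_Name,
       s.Last_Name,
       s.Email,
       HASHBYTES('SHA2_256', CONCAT(s.First_Name, '|', s.Last_Name, '|', s.Email)),
       SYSUTCDATETIME()
FROM stg.Customer AS s
LEFT JOIN
(
    -- Latest historized version per business key
    SELECT Customer_Id,
           Row_Hash,
           ROW_NUMBER() OVER (PARTITION BY Customer_Id ORDER BY Valid_From DESC) AS rn
    FROM hist.Customer
) AS h
    ON h.Customer_Id = s.Customer_Id
   AND h.rn = 1
WHERE h.Customer_Id IS NULL                                                                            -- new record
   OR h.Row_Hash <> HASHBYTES('SHA2_256', CONCAT(s.First_Name, '|', s.Last_Name, '|', s.Email));       -- changed record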

Problem 7

Use metadata-driven approach for indexes creation and dropping.

 

If a large number of indexes needs to be created on nominated objects (or dropped prior to data load execution to speed up the process), it is nice to have a way to store them as metadata, which in turn can be referenced in an automated fashion. Again, this type of requirement can be addressed in a number of ways, but probably the simplest solution is to have a piece of code which can do it for us on demand, relying on metadata stored in dedicated objects (as per Problem 2), where a number of values can be passed as parameters to target a specific use case, e.g. drop/create indexes using the same codebase, do it across the whole schema or just individual objects etc.

The code (downloadable from HERE) can be used as part of the ETL logic to either DROP or CREATE indexes on one or more tables in the nominated database and schema. It does not cater for index rebuilds or reorganization (I may add this functionality in a later version), but it simplifies index management and can be used for automation.
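As a stripped-down illustration of the idea (the downloadable procedure is parameterized and far more defensive), index DDL can be generated straight from a control table and executed dynamically; the ctl.Index_Metadata table and its columns are hypothetical names used for the sketch only.

DECLARE @DDL NVARCHAR(MAX) = N'';

-- Build one CREATE INDEX statement per control-table entry
SELECT @DDL = @DDL
    + N'CREATE ' + CASE WHEN Is_Unique = 1 THEN N'UNIQUE ' ELSE N'' END
    + N'NONCLUSTERED INDEX ' + QUOTENAME(Index_Name)
    + N' ON ' + QUOTENAME(Schema_Name) + N'.' + QUOTENAME(Table_Name)
    + N' (' + Key_Columns + N');' + CHAR(13)
FROM ctl.Index_Metadata
WHERE Is_Enabled = 1;

PRINT @DDL;              -- review the generated statements before running them
EXEC sp_executesql @DDL;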

Problem 8

Use metadata-driven approach for foreign keys constraints creation and dropping.

 

As is the case with Problem 7, providing a metadata-driven approach to any repeatably-executed operation may be a good investment of the time taken to initially develop the code. This SQL can be taken and plugged into a new or existing ETL process if referential integrity across facts and dimensions is required. It relies on the ‘control’ database table entries, which makes it easy to reference without having to hard-code individual SQL statements, and can be used both for dropping and for creating foreign key constraints. The code is published HERE.
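In the same spirit as the index example above, a minimal, hypothetical sketch of generating foreign key DDL from a control table (here ctl.Foreign_Key_Metadata, an assumed name) might look as follows.

DECLARE @FK_DDL NVARCHAR(MAX) = N'';

-- Build one ALTER TABLE ... ADD CONSTRAINT statement per control-table entry
SELECT @FK_DDL = @FK_DDL
    + N'ALTER TABLE ' + QUOTENAME(Fact_Schema) + N'.' + QUOTENAME(Fact_Table)
    + N' ADD CONSTRAINT ' + QUOTENAME(Constraint_Name)
    + N' FOREIGN KEY (' + QUOTENAME(Fact_Column) + N')'
    + N' REFERENCES ' + QUOTENAME(Dim_Schema) + N'.' + QUOTENAME(Dim_Table)
    + N' (' + QUOTENAME(Dim_Column) + N');' + CHAR(13)
FROM ctl.Foreign_Key_Metadata
WHERE Is_Enabled = 1;

EXEC sp_executesql @FK_DDL;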

Problem 9

Check data acquisition status and validate record count.

 

Finally, if we need to ensure that no errors were raised during data acquisition (logged in the AdminDBA database as per the Problem 5 description) and check whether all records were copied successfully (this only relies on record counts and does not take changes/alterations into consideration, so use with caution), we can use the solution I outlined previously HERE together with a small piece of SQL for record count comparison. The SQL part is very straightforward (this implementation relies on an MSSQL-to-MSSQL comparison) and uses a static threshold of 0.001% permissible variance between source and target across all tables. This is so that, when sourcing data from a ‘live’ system where data is being subjected to standard CRUD operations, small differences, i.e. smaller than 0.001% of the overall record count, do not raise exceptions or errors. There’s also an option to get source record counts from two different places: the ‘Control’ database created earlier as per the Problem 2 outline, or the actual source system. The C# code, on the other hand, is an SSIS package implementation and relies on the output from both the msdb database and the error-logging database (AdminDBA) for the acquisition job execution status. The stored procedure can be downloaded from HERE whereas a full description of the SSIS code can be viewed HERE.
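The count-comparison logic is essentially a variance check along the lines of the sketch below; it assumes a hypothetical ctl.Table_Metadata table holding source row counts captured at acquisition time and compares them against the target staging tables.

DECLARE @Allowed_Variance DECIMAL(9, 6) = 0.001;   -- permissible variance in percent

SELECT m.Table_Name,
       m.Source_Row_Count,
       p.Target_Row_Count,
       CASE
           WHEN ABS(m.Source_Row_Count - p.Target_Row_Count) * 100.0
                / NULLIF(m.Source_Row_Count, 0) > @Allowed_Variance
           THEN 'FAILED'
           ELSE 'OK'
       END AS Reconciliation_Status
FROM ctl.Table_Metadata AS m
JOIN
(
    -- Approximate target row counts from partition statistics
    SELECT t.name AS Table_Name,
           SUM(ps.row_count) AS Target_Row_Count
    FROM sys.dm_db_partition_stats AS ps
    JOIN sys.tables AS t
        ON t.object_id = ps.object_id
    WHERE ps.index_id IN (0, 1)
    GROUP BY t.name
) AS p
    ON p.Table_Name = m.Table_Name;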

Problem 10

Notify administrator(s) on execution failure

 

As with a lot of solutions in this space, there is a nearly infinite number of ways one could create a process for notifications and alerts. For larger projects this is usually the ETL framework’s core functionality, and most well-respected and popular tools, cloud or on-premise, do it to some extent. For small projects, e.g. where a small data mart is populated using SSIS or SQL, I usually plug in a small stored procedure which wraps some extra functionality around SQL Server’s native msdb.dbo.sp_send_dbmail process. Provided that Database Mail has been enabled on the target instance (beyond the scope of this post), a small piece of code can send out an e-mail message containing key pieces of data on what exactly failed, along with some peripheral information helping administrator(s) identify and troubleshoot the problem. Most developers these days also like Slack or other IM application integration, but for a small project with an infrequent level of alerting, e-mail notification is still the best way to raise potential issues.
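The downloadable procedure builds a full HTML body from SSIS parameters, but at its simplest the wrapper boils down to something like this sketch; the procedure name, parameters and mail profile (DBA_Profile) are illustrative assumptions.

CREATE PROCEDURE dbo.usp_Notify_On_Failure
    @Recipients    VARCHAR(1000),
    @Package_Name  VARCHAR(256),
    @Object_Name   VARCHAR(256),
    @Error_Message NVARCHAR(MAX)
AS
BEGIN
    SET NOCOUNT ON;

    -- Assemble a simple HTML body with the key failure details
    DECLARE @Body NVARCHAR(MAX) =
        N'<p>ETL failure detected at ' + CONVERT(NVARCHAR(30), SYSDATETIME(), 120) + N'</p>'
      + N'<p>Package: ' + @Package_Name + N'</p>'
      + N'<p>Object: '  + @Object_Name  + N'</p>'
      + N'<p>Error: '   + @Error_Message + N'</p>';

    EXEC msdb.dbo.sp_send_dbmail
        @profile_name = 'DBA_Profile',          -- assumed Database Mail profile
        @recipients   = @Recipients,
        @subject      = 'ETL execution failure',
        @body         = @Body,
        @body_format  = 'HTML';
END;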

Below is a sample e-mail sent out by the ETL pipeline containing a small snippet of HTML code (click on the image to enlarge). It provides a simple template outlining all the necessary pieces of data required to identify the problem, e.g. affected object name, process name, executed package name, date and time of the event, as well as a link to a report which one can access to gain the ‘big picture’ view from the AdminDBA database, as per what’s outlined in Problem 5.

The stored procedure execution can be triggered by the output parameter of another task (simple IF…THEN logic) and relies on a number of parameters typically passed as values from an SSIS package. Its execution is also preceded by running a function responsible for acquiring the e-mail addresses of the individuals who need to be notified (these are stored in the AdminDBA database as per the Problem 5 description). In this way multiple administrators can be notified of an issue. All other parameters’ values are derived or come from the SSIS package itself. And again, all this code (including the aforementioned function) can be downloaded from my OneDrive folder HERE.

There you go – a simple list of ten potential problems and corresponding solutions one may face and need to resolve when provisioning a data mart or data warehouse on a SQL Server environment.
