Kicking the Tires on tSQLt – An Open-Source Unit Testing Framework for SQL Server Data Validation and Quality Assurance

December 1st, 2022 / No Comments » / by admin

Introduction

The importance of unit testing does not have to be conveyed to most database developers and data engineers – it’s common knowledge that testing should be an integral part of any project which relies on data quality and integrity. In reality, however, testing is often relegated to the pool of activities considered too time-consuming and not directly contributing to project success. Just like documentation, testing is often an afterthought, even though in practice it saves money and time and can be reliably integrated into DevOps pipelines (continuous testing) alongside continuous delivery and continuous integration.

Unit testing is a software testing method which aims to test a discrete piece of code. The word “unit” refers to the smallest piece of code that can be tested separately, for example, a function or a procedure that can be tested in isolation. In a database solution, the “unit” is typically a stored procedure, a trigger or a user-defined function. Each test must verify one condition at a time, and tests should reflect software requirements. Following this approach yields a series of discrete tests, one per unit, and because each test verifies a single condition it becomes possible to identify which conditions fail and which meet the requirements, confirming whether all the individual parts work.

tSQLt is a powerful, open-source framework for SQL Server unit testing and it has become a vital part of the modern database development approach. It’s free to use, integrates well into the Microsoft SQL Server landscape and does not require developers to learn other programming languages or platforms. Unit tests are automatically wrapped in a transaction – no data clean-up work is needed after the unit tests because every data manipulation is rolled back when the test finishes. Additionally, tSQLt allows using mocked (fake) objects (a mocked object simulates the real object’s behavior so the tested objects do not affect other dependencies) and can be integrated into SSDT projects or 3rd party software. There are other unit testing frameworks out there which offer more functionality and wider use case coverage, e.g. Great Expectations; however, these often rely on extensive configuration stored in JSON and YAML files, a Python interpreter with additional dependencies and libraries installed, and familiarity with Python programming concepts. This makes tSQLt a great, self-contained, SQL-based tool which can be easily integrated into most workflows with minimal fuss.
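
To give a flavour of what an individual tSQLt test looks like before we dive into the data warehouse scenario, here is a minimal sketch; the dbo.Orders table and dbo.usp_ValidateOrders procedure are made-up objects used purely for illustration:

-- Minimal, illustrative tSQLt test - dbo.Orders and dbo.usp_ValidateOrders are hypothetical objects
EXEC tSQLt.NewTestClass 'demo';
GO
CREATE OR ALTER PROCEDURE demo.[test that a zero-quantity order is rejected]
AS
BEGIN
    -- Swap the real table for an empty, constraint-free copy so the test runs in isolation
    EXEC tSQLt.FakeTable @TableName = 'dbo.Orders';
    INSERT INTO dbo.Orders (OrderID, Quantity) VALUES (1, 0);

    -- The procedure under test is expected to raise an error for the invalid row
    EXEC tSQLt.ExpectException;
    EXEC dbo.usp_ValidateOrders;
END;
GO
EXEC tSQLt.Run 'demo';

Because tSQLt wraps each test in a transaction, both the faked table and the inserted row disappear as soon as the test finishes.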

In this post I’ll outline how to approach unit testing of a Data Warehouse built on the Microsoft SQL Server platform and cover some of the strategies for data validation and QA.

Environment Prep and tSQLt framework installation

Setting up tSQLt is pretty straightforward but to demonstrate its functionality on a sample database, we will start with downloading the venerable WideWorldImporters (WWI) database backup file for Azure SQL DB and deploying it in Azure – this will be our source. Additionally, we will also download the WideWorldImporters SQL Server 2016 (OLTP) and data warehouse (OLAP) database backups and restore those in our target environment. The link to the aforementioned files on Microsoft’s GitHub repo is HERE. I won’t go into the process of restoring individual databases on both environments in this post but when finished, we should have the sample WWI database in both Azure and the local environment, mirroring our source and target, as well as the WWI DW database in the local environment only. The two environments can be interfaced using Linked Server functionality so that we can compare our source and target as part of our data acquisition unit testing.

We will install tSQLt in the WWI DW database for this demo. I’d personally prefer to have logical isolation between the suite of tests and the DW objects and place tSQLt in its own “workspace”, but in this case we’re following best practices and the tSQLt schema with all its associated objects will be co-located with the DW tables and stored procedures.

Installing tSQLt comprises only a few steps (roughly sketched in the snippet below the list):

  • Download the latest version of tSQLt framework and extract the zipped-up file content
  • Execute PrepareServer.sql file
  • Execute tSQLt.class.sql
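
For reference, the snippet below is a rough sketch of what those steps boil down to; the exact contents of PrepareServer.sql vary between tSQLt releases, so treat this as an approximation of what the downloaded scripts do rather than a substitute for them:

-- Step 2 (PrepareServer.sql) enables CLR at the instance level, which tSQLt depends on
-- (depending on the tSQLt release, it also takes care of the CLR security prerequisites)
EXEC sp_configure 'clr enabled', 1;
RECONFIGURE;

-- Step 3: run tSQLt.class.sql in the database under test (WideWorldImportersDW in this demo),
-- e.g. in SQLCMD mode; it creates the tSQLt schema, assembly and all framework objects
USE WideWorldImportersDW;
-- :r C:\tSQLt\tSQLt.class.sql

-- Sanity check - returns the installed framework version details
SELECT * FROM tSQLt.Info();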

Once we have the framework in place we can create a Linked Server connection to the Azure-hosted database. With this setup, we now have an environment which resembles some of the staple Microsoft SQL Server deployments – the WWI Azure SQL DB (OLTP) serving as our remote data source and the WWI and WWI DW databases (local) serving as our staging/landing and data warehouse databases.
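
A minimal sketch of what that Linked Server setup could look like is shown below – the provider, server address and credentials are placeholders for your own environment (and in practice the password should come from a secure store, not plain text):

-- Create a Linked Server called WWI pointing at the Azure SQL DB hosting WideWorldImporters
EXEC sp_addlinkedserver
    @server = N'WWI',
    @srvproduct = N'',
    @provider = N'MSOLEDBSQL',                      -- assumes the Microsoft OLE DB driver is installed
    @datasrc = N'yourserver.database.windows.net',  -- placeholder Azure SQL server name
    @catalog = N'WideWorldImporters';

-- Map local logins to an Azure SQL DB credential (placeholder user name and password)
EXEC sp_addlinkedsrvlogin
    @rmtsrvname = N'WWI',
    @useself = N'False',
    @rmtuser = N'your_sql_login',
    @rmtpassword = N'your_password';

-- Quick connectivity test
SELECT TOP (5) * FROM OPENQUERY([WWI], 'SELECT name FROM sys.tables');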

Testing Framework Architecture Components

The following diagram (click on image to enlarge) outlines the basic architecture components behind the collections of tests, also known in tSQLt as classes. It identifies four principal domains which provide a logical container for a suite of tests:

  • Test Landing zone objects’ existence
  • Test DW zone objects’ existence
  • Test that primary keys across the Landing and DW zones align and match – denoting that all data has been successfully acquired
  • Test business transformation rules

tSQLt comes with a plethora of other assertions and expectations-based features but in this instance, we will limit the scope to the above four applications. The four classes (implemented as schemas) we will generate will fulfill the following testing functions:

_wwi_local_objects – this test class generates assertion-based stored procedures for testing WWI OLTP database objects’ existence in the local environment. Each individual stored procedure is created by a “parent“ create_wwi_local_objects_tests stored procedure which loops over object metadata sourced from the Azure-provisioned WWI database and compares it to the local version. It uses the tSQLt.AssertObjectExists built-in assertion and throws an error (test failed output) if the expected local object name is not present in the target database. Typically, this information should come from a dedicated metadata database which stores more than just object names, but for simplicity, in this case it’s a simple SELECT over the Linked Server pointing to Azure SQL DB.

/*
1. Create _wwi_local_objects test class and associated stored procedures
*/
EXEC tSQLt.NewTestClass '_wwi_local_objects';
GO

CREATE OR ALTER PROCEDURE _wwi_local_objects.[create_wwi_local_objects_tests]
AS
BEGIN
    DECLARE @Local_Object_Name VARCHAR(256);
	DECLARE @Local_Schema_Name VARCHAR (56);
	DECLARE @Local_DB_Name VARCHAR (256)
    DECLARE @SQL NVARCHAR(MAX);
    DECLARE db_cursor CURSOR FOR
    SELECT *
    FROM OPENQUERY([WWI], 'SELECT	TRIM(LOWER(TABLE_NAME)), 
									TRIM(LOWER(TABLE_SCHEMA)), 
									TRIM(LOWER(TABLE_CATALOG))
							FROM WideWorldImporters.INFORMATION_SCHEMA.TABLES')
    OPEN db_cursor;
    FETCH NEXT FROM db_cursor
    INTO @Local_Object_Name, @Local_Schema_Name, @Local_DB_Name
    WHILE @@FETCH_STATUS = 0
    BEGIN
        SET @SQL = N'CREATE OR ALTER PROCEDURE _wwi_local_objects.[test that ' + @Local_Object_Name + ' object exists] AS ' +CHAR (13)
        SET @SQL = @SQL + N'BEGIN '																					+CHAR (13)
		SET @SQL = @SQL + 'EXEC tSQLt.AssertObjectExists '															+CHAR (13)
		SET @SQL = @SQL + '@objectName = '''+ @Local_DB_Name+'.'+ @Local_Schema_Name+'.'+ @Local_Object_Name+''' '	+CHAR (13)
		SET @SQL = @SQL + ',@message = ''Nominated object '''''+@Local_Object_Name+''''' cannot be found in '		+CHAR (13)
		SET @SQL = @SQL + ''''''+@Local_Schema_Name+''''' schema and '''''+@Local_DB_Name+''''' database.'' '		+CHAR (13)
		SET @SQL = @SQL + 'END';
		EXEC (@SQL)
        FETCH NEXT FROM db_cursor
        INTO @Local_Object_Name, @Local_Schema_Name, @Local_DB_Name;
    END;

    CLOSE db_cursor;
    DEALLOCATE db_cursor;
END;
GO

EXEC  _wwi_local_objects.[create_wwi_local_objects_tests]
GO
--EXEC tSQLt.Run '_wwi_local_objects'

_wwidw_objects – this test class generates assertion-based stored procedures for testing WWI DW objects’ existence. Each individual stored procedure is created by a “parent“ create_dw_objects_tests stored procedure which loops over WWI DW metadata output. It uses the tSQLt.AssertObjectExists built-in assertion and throws an error (test failed output) if the expected WWI DW object name is not present in the target database. Just as with the previous class, this information should be coming from a dedicated metadata catalog and not system views, but for this demo it’s the simplest possible configuration we’re after.

/*
2. Create _wwidw_objects test class and associated stored procedures
*/
EXEC tSQLt.NewTestClass '_wwidw_objects';
GO

CREATE OR ALTER PROCEDURE _wwidw_objects.[create_dw_objects_tests]
AS
BEGIN
    DECLARE @Local_Object_Name VARCHAR(256);
	DECLARE @Local_Schema_Name VARCHAR (56);
	DECLARE @Local_DB_Name VARCHAR (256)
    DECLARE @SQL NVARCHAR(MAX);
    DECLARE db_cursor CURSOR FOR
    SELECT TRIM(LOWER(TABLE_CATALOG)),
			TRIM(LOWER(TABLE_SCHEMA)),
			TRIM(LOWER(TABLE_NAME))
	FROM WideWorldImportersDW.INFORMATION_SCHEMA.TABLES
    OPEN db_cursor;
    FETCH NEXT FROM db_cursor
    INTO   @Local_DB_Name, @Local_Schema_Name,@Local_Object_Name
    WHILE @@FETCH_STATUS = 0
    BEGIN
        SET @SQL = N'CREATE OR ALTER PROCEDURE _wwidw_objects.[test that ' + @Local_Object_Name + ' object exists] AS ' +CHAR (13)
        SET @SQL = @SQL + N'BEGIN '																					+CHAR (13)
		SET @SQL = @SQL + 'EXEC tSQLt.AssertObjectExists '															+CHAR (13)
		SET @SQL = @SQL + '@objectName = '''+ @Local_DB_Name+'.'+ @Local_Schema_Name+'.'+ @Local_Object_Name+''' '	+CHAR (13)
		SET @SQL = @SQL + ',@message = ''Nominated object '''''+@Local_Object_Name+''''' cannot be found in '		+CHAR (13)
		SET @SQL = @SQL + ''''''+@Local_Schema_Name+''''' schema and '''''+@Local_DB_Name+''''' database.'' '		+CHAR (13)
		SET @SQL = @SQL + 'END';
		EXEC (@SQL)
        FETCH NEXT FROM db_cursor
        INTO @Local_DB_Name, @Local_Schema_Name,@Local_Object_Name
    END;

    CLOSE db_cursor;
    DEALLOCATE db_cursor;
END;
GO

EXEC  _wwidw_objects.[create_dw_objects_tests]
GO
--EXEC tSQLt.Run '_wwidw_objects'

_wwi_remote_to_wwi_local_pks – this test class generates assertion-based stored procedures comparing primary key values between the source (Azure-hosted) and local WWI databases. It uses metadata extracted from SQL Server system views to source the primary key(s) for each table and the tSQLt.AssertEqualsTable built-in assertion to compare their values for each nominated object.

/*
3. Create _wwi_remote_to_wwi_local_pks test class and associated stored procedures
*/
EXEC tSQLt.NewTestClass '_wwi_remote_to_wwi_local_pks';
GO

CREATE OR ALTER PROCEDURE _wwi_remote_to_wwi_local_pks.[create_wwi_remote_to_wwi_local_objects_pks_tests]
AS
BEGIN
	DECLARE @Error_Message NVARCHAR (MAX)

	DECLARE @SQL NVARCHAR(MAX)
	DECLARE @Source_Server_Name NVARCHAR (1024) = 'WWI'
	DECLARE @Source_DB_Name NVARCHAR (1024) = 'WideWorldImporters'
	DECLARE @Source_Schema_Name NVARCHAR (1024) = 'WideWorldImporters'
	DECLARE @Local_Schema_Name NVARCHAR (1024)=  'dbo'
	DECLARE @Local_DB_Name VARCHAR (1024) = 'WideWorldImporters'


	IF OBJECT_ID('tempdb..##exclude_tables') IS NOT NULL
	BEGIN
	    DROP TABLE ##exclude_tables;
	END;
	CREATE TABLE ##exclude_tables
	(
	    table_name VARCHAR(256)
	);
	INSERT INTO ##exclude_tables
	(
	    table_name
	)
	SELECT 'ColdRoomTemperatures'
	UNION ALL
	SELECT 'VehicleTemperatures';
	
	
	
	IF OBJECT_ID('tempdb..#temp_oltp_to_dw_metadata') IS NOT NULL
	BEGIN
	    DROP TABLE #temp_oltp_to_dw_metadata;
	END;
	CREATE TABLE #temp_oltp_to_dw_metadata
	(
	    [id] INT IDENTITY(1, 1),
	    [object_name] VARCHAR(256) NOT NULL,
	    [schema_name] VARCHAR(256) NOT NULL,
	    [db_name] VARCHAR(256) NOT NULL,
	    [primary_keys] VARCHAR(256) NOT NULL,
	    [origin] VARCHAR(56) NOT NULL
	);
	SET @SQL =			'WITH temp_data AS (SELECT * '
	SET @SQL = @SQL +	'FROM OPENQUERY(' + @Source_Server_Name + ',''SELECT '  												+CHAR(13)			
	SET @SQL = @SQL +	't.name as table_name, c.name AS column_name, ss.name as schema_name, '									+CHAR(13)
	SET @SQL = @SQL +	'tp.name AS data_type ,c.max_length AS character_maximum_length, '										+CHAR(13)						
	SET @SQL = @SQL +	'CASE WHEN indx.object_id IS NULL THEN 0 ELSE 1 END AS ''''is_primary_key''''	'						+CHAR(13)
	SET @SQL = @SQL +	'FROM sys.tables t'																						+CHAR(13)
	SET @SQL = @SQL +	'JOIN sys.columns c ON t.object_id = c.object_id'														+CHAR(13)
	SET @SQL = @SQL +	'JOIN sys.types tp ON c.user_type_id = tp.user_type_id'													+CHAR(13)
	SET @SQL = @SQL +	'JOIN sys.objects so ON so.object_id = t.object_id'														+CHAR(13)
	SET @SQL = @SQL +	'JOIN sys.schemas ss ON so.schema_id = ss.schema_id'													+CHAR(13)
	SET @SQL = @SQL +	'LEFT JOIN (SELECT	ic.object_id, ic.column_id '														+CHAR(13)
	SET @SQL = @SQL +	'FROM sys.indexes AS i '																				+CHAR(13)
	SET @SQL = @SQL +	'INNER JOIN sys.index_columns AS ic '																	+CHAR(13)
	SET @SQL = @SQL +	'ON i.OBJECT_ID = ic.OBJECT_ID AND i.index_id = ic.index_id '											+CHAR(13)
	SET @SQL = @SQL +	'WHERE   i.is_primary_key = 1) indx ON so.object_id = indx.object_id AND c.column_id = indx.column_id '	+CHAR(13)
	SET @SQL = @SQL +	'WHERE t.is_memory_optimized <> 1 AND tp.is_user_defined = 0 '											+CHAR(13)
	SET @SQL = @SQL +	'AND t.type = ''''u'''''' ) a) ' 																		+CHAR(13)
	SET @SQL = @SQL +   'INSERT INTO #temp_oltp_to_dw_metadata (db_name,schema_name,object_name,primary_keys,origin)'			+CHAR(13)
	SET @SQL = @SQL +	'SELECT '''+@Source_DB_Name+''' as remote_db_name, '													+CHAR(13)													
	SET @SQL = @SQL +   'schema_name AS remote_schema_name, table_name AS remote_table_name '									+CHAR(13)
	SET @SQL = @SQL +	', STRING_AGG(column_name, '','') WITHIN GROUP (ORDER BY column_name ASC) AS primary_keys '				+CHAR(13)
	SET @SQL = @SQL +	', ''Remote'' as Origin'																				+CHAR(13)
	SET @SQL = @SQL +	'FROM temp_data WHERE is_primary_key = 1 '																+CHAR(13)
	SET @SQL = @SQL +   'AND  table_name COLLATE DATABASE_DEFAULT NOT IN (SELECT table_name FROM ##exclude_tables) '			+CHAR(13)
	SET @SQL = @SQL +   'GROUP BY table_name, schema_name'																		+CHAR(13)

	EXEC(@SQL)

	SET @SQL = ''		
	SET @SQL = @SQL +	'WITH temp_data AS (SELECT * FROM (SELECT t.name as table_name, ss.name as schema_name, '				+CHAR(13)
	SET @SQL = @SQL +	'c.name AS column_name, tp.name AS data_type,'															+CHAR(13)
	SET @SQL = @SQL +	'c.max_length AS character_maximum_length, CASE WHEN indx.object_id IS NULL '							+CHAR(13)
	SET @SQL = @SQL +	'THEN 0 ELSE 1 END AS ''is_primary_key'''																+CHAR(13)
	SET @SQL = @SQL +	'FROM    '+@Local_DB_Name+'.sys.tables t'																+CHAR(13)
	SET @SQL = @SQL +	'JOIN '+@Local_DB_Name+'.sys.columns c ON t.object_id = c.object_id '									+CHAR(13)
	SET @SQL = @SQL +	'JOIN '+@Local_DB_Name+'.sys.types tp ON c.user_type_id = tp.user_type_id '								+CHAR(13)
	SET @SQL = @SQL +	'JOIN '+@Local_DB_Name+'.sys.objects so ON so.object_id = t.object_id '									+CHAR(13)
	SET @SQL = @SQL +	'JOIN '+@Local_DB_Name+'.sys.schemas ss ON so.schema_id = ss.schema_id '								+CHAR(13)
	SET @SQL = @SQL +	'LEFT JOIN (SELECT	ic.object_id, ic.column_id '														+CHAR(13)
	SET @SQL = @SQL +	'FROM '+@Local_DB_Name+'.sys.indexes AS i INNER JOIN '+@Local_DB_Name+'.sys.index_columns AS ic ON '	+CHAR(13)
	SET @SQL = @SQL +	'i.OBJECT_ID = ic.OBJECT_ID AND i.index_id = ic.index_id '												+CHAR(13)
	SET @SQL = @SQL +	'WHERE   i.is_primary_key = 1) indx ON so.object_id = indx.object_id AND c.column_id = indx.column_id '	+CHAR(13)
	SET @SQL = @SQL +	'WHERE t.type = ''u'' AND t.is_memory_optimized <> 1 AND tp.is_user_defined = 0)a)'						+CHAR(13)
	SET @SQL = @SQL +   'INSERT INTO #temp_oltp_to_dw_metadata (db_name,schema_name,object_name,primary_keys,origin)'			+CHAR(13)
	SET @SQL = @SQL +	'SELECT '''+@Local_DB_Name+''' as db_name, '															+CHAR(13)													
	SET @SQL = @SQL +   'schema_name AS schema_name, table_name AS object_name '												+CHAR(13)
	SET @SQL = @SQL +	', STRING_AGG(column_name, '','') WITHIN GROUP (ORDER BY column_name ASC) AS primary_keys, '			+CHAR(13)
	SET @SQL = @SQL +	'''Local'' as Origin'																					+CHAR(13)
	SET @SQL = @SQL +	'FROM temp_data '																						+CHAR(13)
	SET @SQL = @SQL +	'WHERE table_name COLLATE DATABASE_DEFAULT NOT IN (SELECT table_name FROM ##exclude_tables) '			+CHAR(13)
	SET @SQL = @SQL +   'AND is_primary_key = 1 GROUP BY table_name, schema_name'												+CHAR(13)			
	
	EXEC(@SQL)

	IF EXISTS
        (
            SELECT a.[object_name], a.primary_keys
            FROM #temp_oltp_to_dw_metadata a
            WHERE a.[Origin] = 'Remote'
			EXCEPT
			SELECT b.[object_name], b.primary_keys
            FROM #temp_oltp_to_dw_metadata b
            WHERE b.[Origin] = 'Local'
        )
		OR NOT EXISTS (SELECT TOP (1) 1 FROM #temp_oltp_to_dw_metadata) -- also fail if no metadata was captured at all
        BEGIN
            SET @Error_Message
                = 'Tables and their corresponding primary keys cannot be reconciled between "' + @Source_DB_Name
                  + '" database and "' + @Local_DB_Name
                  + '" database. Please troubleshoot!';
            RAISERROR(   @Error_Message, -- Message text.
                         16,             -- Severity.
                         1               -- State.
                     );
            RETURN;
        END;
	
	DECLARE @object_name VARCHAR(256);
	DECLARE @schema_name VARCHAR (56);
	DECLARE @db_name VARCHAR (256);
	DECLARE @primary_keys VARCHAR (1024);
	DECLARE @Object_Data_Percentage VARCHAR(3) = '100'

    DECLARE db_cursor CURSOR FOR 
    SELECT [db_name], [schema_name], [Object_Name], [Primary_Keys]
	FROM #temp_oltp_to_dw_metadata
	WHERE [Origin] = 'remote'

    OPEN db_cursor;
    FETCH NEXT FROM db_cursor
    INTO   @db_name, @schema_name,@object_name, @Primary_keys
    WHILE @@FETCH_STATUS = 0
    BEGIN
		SET @SQL = ''
        SET @SQL = @SQL + 'CREATE OR ALTER PROCEDURE _wwi_remote_to_wwi_local_pks.[test that primary keys for ' + @Object_Name + ' match across source and target] AS '	+CHAR (13)
        SET @SQL = @SQL + 'BEGIN '																															+CHAR (13)
		SET @SQL = @SQL + 'DROP TABLE IF EXISTS #remote_' + LOWER(@Object_Name) + ''																		+CHAR (13)
		SET @SQL = @SQL + 'SELECT '+@Primary_Keys+' INTO #remote_' + LOWER(@Object_Name) + ''																+CHAR (13)
		SET @SQL = @SQL + 'FROM OPENQUERY(' + @Source_Server_Name + ',''SELECT TOP '+@Object_Data_Percentage+' PERCENT '									+CHAR (13)
		SET @SQL = @SQL + ''+@Primary_Keys+' FROM '																											+CHAR (13)
		SET @SQL = @SQL + ''+@DB_Name+'.'+@Schema_Name+'.'+@Object_Name+''')'																				+CHAR (13)
		SET @SQL = @SQL + 'DROP TABLE IF EXISTS #local_' + @Object_Name + ''																				+CHAR (13)
		SET @SQL = @SQL + 'SELECT TOP '+@Object_Data_Percentage+' PERCENT '+@Primary_Keys+' INTO #local_'+@Object_Name+''									+CHAR (13)
		SET @SQL = @SQL + 'FROM '+@db_name+'.'+@schema_name+'.'+@object_name+''																				+CHAR (13)
		SET @SQL = @SQL + 'EXEC tSQLt.AssertEqualsTable @Expected = #remote_'+@object_name+', @Actual = #local_'+@object_name+''							+CHAR (13)
		SET @SQL = @SQL + ',@message = ''The primary keys comparison for ' + @Object_Name + ' table resulted in differences being detected.'''				+CHAR (13)					
		SET @SQL = @SQL + 'END'																																+CHAR (13)
		EXEC (@SQL)
        FETCH NEXT FROM db_cursor
        INTO @DB_Name, @Schema_Name,@Object_Name, @Primary_Keys
    END;

    CLOSE db_cursor;
    DEALLOCATE db_cursor;
	
END;
GO

EXEC  _wwi_remote_to_wwi_local_pks.[create_wwi_remote_to_wwi_local_objects_pks_tests]
GO
--EXEC tSQLt.Run '_wwi_remote_to_wwi_local_pks'

_wwi_to_wwidw_businessrules – this test class is used for custom business rules validation and comparison across the WWI and WWI DW databases. Each test case corresponds to a rule, which typically dictates that a source-to-target comparison is made between a value or a series of values and an expected result based on predefined business logic. In the below examples, we’re comparing sales/order quantities, tax and customer ids or city ids across the WideWorldImporters (OLTP) and WideWorldImportersDW (OLAP) databases to ensure that the transformed data (facts and dimensions) persisted in the DW database is in line with its raw counterpart in the transactional layer. Also, to simplify the SQL code, I only included the first 100 records across both data sets, as denoted by the SELECT TOP 100 statements. This is due to the additional logic required to reconcile source and target outputs for all the records – not really relevant for this demo but something that ought to be removed in a typical testing scenario where all-encompassing (not partial) testing is required.

/*
4. Create _wwi_to_wwidw_businessrules test class and associated stored procedures
*/
EXEC tSQLt.NewTestClass '_wwi_to_wwidw_businessrules';
GO

CREATE OR ALTER PROCEDURE _wwi_to_wwidw_businessrules.[test that sales quantity, tax and customer id values match across source and target]
AS
BEGIN
    DROP TABLE IF EXISTS #source;
    SELECT TOP 100
           SUM(il.Quantity) AS sales_quantity,
           SUM(il.ExtendedPrice - il.TaxAmount) AS total_excluding_tax,
           SUM(il.ExtendedPrice) AS total_including_tax,
           c.CustomerID AS customer_id
    INTO #source
    FROM [WideWorldImporters].Sales.Invoices AS i
        INNER JOIN [WideWorldImporters].Sales.InvoiceLines AS il
            ON i.InvoiceID = il.InvoiceID
        INNER JOIN [WideWorldImporters].Sales.Customers AS c
            ON i.CustomerID = c.CustomerID
    GROUP BY c.CustomerID
    ORDER BY c.CustomerID;


    DROP TABLE IF EXISTS #target;
    SELECT TOP 100
           SUM(fs.Quantity) AS sales_quantity,
           SUM(fs.[Total Excluding Tax]) AS total_excluding_tax,
           SUM(fs.[Total Including Tax]) AS total_including_tax,
           dc.[WWI Customer ID] AS customer_id
    INTO #target
    FROM WideWorldImportersDW.Fact.[Sale] fs
        JOIN WideWorldImportersDW.Dimension.Customer dc
            ON fs.[Customer Key] = dc.[Customer Key]
    WHERE dc.[WWI Customer ID] <> 0
    GROUP BY dc.[WWI Customer ID]
    ORDER BY dc.[WWI Customer ID];

    EXEC tSQLt.AssertEqualsTable @Expected = #source,
                                 @Actual = #target,
                                 @Message = 'Sales data reconciliation between WideWorldImporters and WideWorldImportersDW resulted in differences being detected.';
END;
GO

CREATE OR ALTER PROCEDURE _wwi_to_wwidw_businessrules.[test that order quantity, tax and city id/name values match across source and target]
AS
BEGIN
    DROP TABLE IF EXISTS #source;
    SELECT TOP 100
           SUM(ol.Quantity) AS quantity,
           SUM(ol.UnitPrice) AS total_unit_price,
           SUM(ROUND(ol.Quantity * ol.UnitPrice, 2)) AS total_excluding_tax,
           SUM(ROUND(ol.Quantity * ol.UnitPrice, 2) + ROUND(ol.Quantity * ol.UnitPrice * ol.TaxRate / 100.0, 2)) AS total_including_tax,
           c.DeliveryCityID AS wwi_city_id,
           ci.CityName AS city
    INTO #source
    FROM [WideWorldImporters].Sales.Orders AS o
        INNER JOIN [WideWorldImporters].Sales.OrderLines AS ol
            ON o.OrderID = ol.OrderID
        INNER JOIN [WideWorldImporters].Sales.Customers AS c
            ON c.CustomerID = o.CustomerID
        INNER JOIN [WideWorldImporters].[Application].[Cities] ci
            ON c.DeliveryCityID = ci.CityID
    GROUP BY c.DeliveryCityID,
             ci.CityName
    ORDER BY c.DeliveryCityID;


    DROP TABLE IF EXISTS #target;
    SELECT TOP 100
           SUM([Quantity]) AS quantity,
           SUM([Unit Price]) AS total_unit_price,
           SUM([Total Excluding Tax]) AS total_excluding_tax,
           SUM([Total Including Tax]) AS total_including_tax,
           dc.[WWI City ID] AS wwi_city_id,
           dc.City AS city
    INTO #target
    FROM WideWorldImportersDW.Fact.[Order] fo
        JOIN WideWorldImportersDW.Dimension.City dc
            ON fo.[City Key] = dc.[City Key]
    GROUP BY dc.[WWI City ID],
             dc.City
    ORDER BY dc.[WWI City ID];

    EXEC tSQLt.AssertEqualsTable @Expected = #source,
                                 @Actual = #target,
                                 @Message = 'Order data reconciliation between WideWorldImporters and WideWorldImportersDW resulted in differences being detected.';
END;
--EXEC tSQLt.Run '_wwi_to_wwidw_businessrules';

The _wwi_local_objects, _wwidw_objects and _wwi_remote_to_wwi_local_pks classes’ tests are auto-generated based on rules defined in their “parent” stored procedures. This means each test case is created following the same pattern and the output stored procedures are structurally the same – each class (schema) generates “children” stored procedures, each of those responsible for an isolated testing unit.
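
To illustrate, a single “child” test produced by the create_wwi_local_objects_tests procedure shown earlier would come out looking more or less like this (the Sales.Orders table is used here as an example of what the metadata cursor would emit):

CREATE OR ALTER PROCEDURE _wwi_local_objects.[test that orders object exists]
AS
BEGIN
    EXEC tSQLt.AssertObjectExists
         @objectName = 'wideworldimporters.sales.orders',
         @message = 'Nominated object ''orders'' cannot be found in ''sales'' schema and ''wideworldimporters'' database.';
END;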

The _wwi_to_wwidw_businessrules class of tests, on the other hand, requires SQL that follows the business rules applied to the transformation pipelines and cannot be auto-generated from metadata.

Now that we have all the pieces of the puzzle in place, let’s run all the tests across the four classes we generated and see if we have any issues with the underlying data. In order to run all tests in sequence we issue the EXEC tSQLt.RunAll command and after a short while, the following output should be generated in SSMS (click on image to enlarge).
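
For reference, the commands to run the whole suite, a single class or a single test are as follows:

-- Run every test in every registered test class
EXEC tSQLt.RunAll;

-- Run a single class, or a single test within a class, while troubleshooting
EXEC tSQLt.Run '_wwi_to_wwidw_businessrules';
EXEC tSQLt.Run '_wwi_to_wwidw_businessrules.[test that sales quantity, tax and customer id values match across source and target]';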

As we can see, all tests but one were successful. The one which failed is the result of an Azure SQL DB-specific table being present in the source WWI database; since this table only applies to Azure SQL Managed Instance and Azure SQL DB, next time we can add it to our exclusion rules. Another important note is that in the above demo examples, object names and associated schemas and databases were sourced directly from system tables. In a real-world scenario, all this information should be stored in a metadata catalog – the main source for reference and control data.

Reporting

The tSQLt framework captures test run history in the TestResult table; however, due to the ephemeral nature of this data, it is purged every time a test or suite of tests is run. Additionally, we may want to augment this data with additional attributes and turn it into a reporting data source. This way, long-term longitudinal analytics can draw on a series of test executions, enabling administrators to pinpoint areas of concern or recurring issues. For this reason, we will create an additional schema and table mimicking the TestResult output (for long-term data retention), add a few columns storing additional information we can use in our visualization tool and finally create a view which ties it all together.

/*
1. Create tSQLtRunHistory database schema and TestResults reporting table
*/
CREATE SCHEMA [tSQLtRunHistory];
GO

CREATE TABLE [tSQLtRunHistory].[TestResults]
(
    [Id] [BIGINT] IDENTITY(1, 1) NOT NULL,
    [Server] [NVARCHAR](MAX) NOT NULL,
    [LoginUserName] [VARCHAR](MAX) NOT NULL,
    [HostName] [VARCHAR](MAX) NOT NULL,
    [Database] [VARCHAR](MAX) NOT NULL,
    [Class] [NVARCHAR](MAX) NOT NULL,
    [TestCase] [NVARCHAR](MAX) NOT NULL,
    [Name] AS ((QUOTENAME([Class]) + '.') + QUOTENAME([TestCase])),
    [TranName] [NVARCHAR](MAX) NULL,
    [Result] [NVARCHAR](MAX) NULL,
    [Msg] [NVARCHAR](MAX) NULL,
    [TestStartTime] [DATETIME2](7) NOT NULL,
    [TestEndTime] [DATETIME2](7) NULL,
    CONSTRAINT [pk_tsqltrunhistory_testresults]
        PRIMARY KEY CLUSTERED ([Id] ASC)
        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,
              ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF
             ) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY];
GO

ALTER TABLE [tSQLtRunHistory].[TestResults]
ADD CONSTRAINT [df_testresults(TestStartTime)]
    DEFAULT (SYSDATETIME()) FOR [TestStartTime];
GO


/*
2. Create tSQLtRunHistory vw_TestResults reporting view
*/
CREATE VIEW [tSQLtRunHistory].[vw_TestResults]
AS
SELECT [Id],
       [Server] AS Server_Name,
       [LoginUserName] AS User_Name,
       [HostName] AS Host_Name,
       [Database],
       [Class] AS Test_Class,
       [TestCase] AS Test_Case_Name,
       [Name] AS Class_Test_Case_Name,
       [TranName] AS Transaction_Name,
       [Result] AS Test_Result,
       CASE
           WHEN [Msg] IS NULL
                OR [Msg] = '' THEN
               'Nominal Results Recorded.'
           ELSE
               [Msg]
       END AS Output_Message,
       CASE
           WHEN [Class] IN ( '_wwi_local_objects', '_wwidw_objects' )
                AND [TestCase] LIKE 'test that%' THEN
               LOWER(SUBSTRING(
                                  [TestCase],
                                  CHARINDEX('test that', [TestCase]) + LEN('test that'),
                                  CHARINDEX('object exists', [TestCase]) - LEN('object exists') + 2
                              )
                    )
           WHEN [Class] IN ( '_wwi_remote_to_wwi_local_pks' )
                AND [TestCase] LIKE 'test that%' THEN
               LOWER(SUBSTRING(
                                  [TestCase],
                                  CHARINDEX('test that primary keys for', [TestCase])
                                  + LEN('test that primary keys for'),
                                  CHARINDEX('match across source and target', [TestCase])
                                  - LEN('match across source and target') + 2
                              )
                    )
           ELSE
               'Unspecified object'
       END AS Tested_Object_Name,
       [TestStartTime] AS Test_Start_Date_Time,
       [TestEndTime] AS Test_End_Date_Time,
       CAST([TestEndTime] AS DATE) AS Test_End_Date,
       [Year_Month_Number] = CAST(DATENAME(YEAR, [TestEndTime]) + FORMAT([TestEndTime], 'MM') AS INT),
       [Month_Year_Name] = LEFT(DATENAME(MONTH, [TestEndTime]), 3) + ' - ' + DATENAME(YEAR, [TestEndTime])
FROM [tSQLtRunHistory].[TestResults];
GO


/*
3. Make appropriate changes to the tSQLt framework to account for the new reporting tables and how tests execution data is logged/stored
*/
ALTER PROCEDURE [tSQLt].[NullTestResultFormatter]
AS
BEGIN
    INSERT INTO [tSQLtRunHistory].[TestResults]
    (
        [Server],
        [LoginUserName],
        [HostName],
        [Database],
        [Class],
        [TestCase],
        [TranName],
        [Result],
        [Msg],
        [TestStartTime],
        [TestEndTime]
    )
    SELECT @@SERVERNAME,
           SUSER_NAME(),
           HOST_NAME(),
           DB_NAME(),
           [Class],
           [TestCase],
           [TranName],
           [Result],
           [Msg],
           [TestStartTime],
           [TestEndTime]
    FROM [tSQLt].[TestResult];
    RETURN 0;
END;
GO


CREATE OR ALTER PROCEDURE [tSQLt].[SaveTestResultFormatter]
AS
BEGIN
    INSERT INTO [tSQLtRunHistory].[TestResults]
    (
        [Server],
        [LoginUserName],
        [HostName],
        [Database],
        [Class],
        [TestCase],
        [TranName],
        [Result],
        [Msg],
        [TestStartTime],
        [TestEndTime]
    )
    SELECT @@SERVERNAME,
           SUSER_NAME(),
           HOST_NAME(),
           DB_NAME(),
           [Class],
           [TestCase],
           [TranName],
           [Result],
           [Msg],
           [TestStartTime],
           [TestEndTime]
    FROM [tSQLt].[TestResult];
END;
GO

DECLARE @RC INT;
DECLARE @Formatter NVARCHAR(4000);

SET @Formatter = N'tSQLt.SaveTestResultFormatter'; --'tSQLt.DefaultResultFormatter'

EXECUTE @RC = [tSQLt].[SetTestResultFormatter] @Formatter;
GO

Once we have enough data accrued, we can visualize it using any BI tool. The following is a sample Qlik Sense dashboard depicting individual test results as well as some aggregate-level metrics (click on image to enlarge).
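
The dashboard aggregates boil down to fairly simple queries over the vw_TestResults view created earlier – something along these lines (a sketch of the kind of query a BI tool would issue, not the actual Qlik Sense load script):

-- Pass/fail counts by test class and month, using the reporting view created earlier
SELECT Test_Class,
       Month_Year_Name,
       Test_Result,
       COUNT(*) AS Number_Of_Tests
FROM tSQLtRunHistory.vw_TestResults
GROUP BY Test_Class, Year_Month_Number, Month_Year_Name, Test_Result
ORDER BY Year_Month_Number, Test_Class, Test_Result;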

This post outlined a few simple use cases for the tSQLt unit testing framework. I only scratched the surface of what’s possible with its built-in assertions and expectations, as the tSQLt toolkit can also test triggers, stored procedure logic, constraints and more. While there are other, more feature-rich and comprehensive unit testing frameworks available, tSQLt integrates well into the Microsoft SQL Server landscape, can run as part of your automated builds and does not require developers to learn other programming languages, tools or platforms. It’s really easy to configure and one can become productive with it in a matter of minutes. Most developers do not like writing tests, but frameworks like tSQLt make the process of integrating unit tests into the database development lifecycle a breeze.


Data Build Tool (DBT) – The Emerging Standard For Building SQL-First Data Transformation Pipelines – Part 1

September 27th, 2022 / 4 Comments » / by admin

Note: Part 2 of this post can be found HERE.

Introduction

It’s never a dull moment working in IT, and although the Data Warehousing domain was not subjected to the hamster wheel of relentless innovation in the first few decades, when Oracle, IBM, Microsoft and SAP reigned supreme, with the advent of cloud computing it too had to adapt and change. For me, the most prolific changes included the separation of storage and compute, in-database machine learning, on-demand elasticity and server-less database models. The resulting upending of the status quo also had a large impact on the good, old-fashioned ETL (Extract Transform Load) paradigm, which started to shift to a new, more cloud-aligned architecture, and many businesses contemplating Data Warehouse modernization are jumping on the ELT bandwagon. This is also where a suite of new tools started to emerge, and one company with its flagship product started to make serious inroads in this market.

dbt (data build tool) emerged as a development framework that combines modular SQL with software engineering best practices to make data transformation reliable, fast, and fun. It makes data engineering activities accessible to people with data analyst skills, allowing them to transform the data in the warehouse using simple SELECT statements and effectively create the entire transformation process with code. You can write custom business logic using SQL, automate data quality testing, deploy the code, and deliver trusted data with comprehensive documentation side-by-side with the code. This is more important today than ever due to the shortage of data engineering professionals in the marketplace. Anyone who knows SQL can now build production-grade data pipelines, reducing the entry barriers that previously limited staffing capabilities for legacy technologies. In short, dbt turns your data analysts into engineers and allows them to own the entire analytics engineering workflow.

dbt has two core workflows: building data models and testing data models. It fits nicely into the modern data stack and is cloud agnostic – meaning it works within each of the major cloud ecosystems: Azure, GCP, and AWS. However, the biggest advantage of dbt is its new approach to building pipelines which traditionally have been quite clunky and inefficient. Some of the most prolific issues with the standard ETL workflow are:

  • The schema within data warehouses is often strongly defined and controlled. The emphasis in ETL was therefore on getting data into the warehouse in the correct “one true” format, putting the burden on the people loading the data and making the process of getting data into the warehouse slow and fragile.
  • This warehouse and the ETL processes would usually be managed by centralized data teams. These teams would be a fairly siloed bottleneck, always behind with the needs of the business for integrating and transforming the data.
  • The ETL stacks and scripts would often be fragile, error prone, and difficult and slow to change.
  • The tools providing ETL would often be GUI based and proprietary. Not only would they be expensive to license, they would also require specialist skills. This meant that neither the producers nor the consumers of the data would have access to the ETL scripts or the ability to make changes to them.
  • Bringing ETL into anything which defines a software development lifecycle was tricky. For instance, the ETL process was always identified as being difficult to source control, version and test. Implementing the concept of development, test and production environments with accurate data management was also way behind the state of the art in the software development world.

With dbt, many of the above shortcomings have been addressed, improving reliability, repeatability and collaboration by breaking down organizational silos, and reducing time to market.

Environment Prep and Sample Data

Before I jump into what makes dbt such a powerful framework, I’d like to set the stage and outline the following mocked-up example of building an end-to-end pipeline using modern data architecture tools, firstly to acquire and load Google Analytics data into an Azure environment and finally to transform it using the dbt framework. I believe that rather than installing dbt and running a few scripts to outline its core features, it’s better to showcase it on a tangible mini-project which accurately reflects some of the problems many businesses may be grappling with. For this purpose, I will be following the below script:

  • Stand up an Azure environment (using the Azure Python SDK), including an Azure SQL database, Azure Data Lake gen2 (ADLS) and an associated Resource Group. Technically this part can be done using any cloud provider or even an on-premises environment but since the modern data stack tends to rely on APIs and tools available from major public cloud vendors, this is in line with more contemporary information architecture and management practices
  • Build a simple pipeline to acquire Google Analytics data and stage it in ADLS storage as well as the Azure SQL database. This script can also be run as an Azure Function to create an automated, ETL-like process
  • Install dbt and the supporting SQL Server connector on an isolated local environment
  • Augment GA data with geocoding information to build a simple, one-table data mart using SQL and dbt Jinja templates
  • Test our data, create a snapshot using dbt functionality and generate sample project documentation

Firstly, let’s provision a sample Azure environment consisting of a dedicated resource group as well as Azure Data Lake Gen 2 and Azure SQL database.

from azure.identity import AzureCliCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.storage import StorageManagementClient
from azure.storage.filedatalake import DataLakeServiceClient
from azure.mgmt.sql import SqlManagementClient
from humanfriendly import format_timespan
from timeit import default_timer as timer
import time
import pyodbc
from os import popen


_RESOURCE_GROUP_NAME = 'gademoresourcegroup2022'
_RESOURCE_GROUP_LOCATION = 'australiaeast'
_STORAGE_ACCOUNT_NAME = 'gademostorageacct2022'
_STORAGE_CONTAINER_NAME = 'gademooutputfiles2022'
_SUBSCRIPTION_ID = 'your_subscription_id'
_DF_LINKED_SERVICE_NAME = 'lsoutputfiles'
_SQL_SERVER_NAME = 'gademosqlserver2022'
_SQL_DB_NAME = 'sourcedb'
_SQL_USERNAME = 'testusername'
_SQL_PASSWORD = 'MyV3ry$trongPa$$word'
_SQL_DRIVER = '{ODBC Driver 18 for SQL Server}'
external_IP = popen("curl -s ifconfig.me").readline()


# create resource group
def create_resource_group(resource_client, _RESOURCE_GROUP_NAME, _LOCATION):
    print("Creating Azure Resource Group {rg_name}...".format(
        rg_name=_RESOURCE_GROUP_NAME), end="", flush=True)
    try:
        resource_client.resource_groups.create_or_update(
            _RESOURCE_GROUP_NAME, {'location': _LOCATION})
    except Exception as e:
        print(e)
    rg = [g.name for g in resource_client.resource_groups.list()]
    if _RESOURCE_GROUP_NAME in rg:
        print('OK')


# create storage account in the nominated resource group
def create_storage_account(storage_client, _STORAGE_ACCOUNT_NAME, _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION):
    print("Creating Azure Storage Account {st_acct}...".format(
        st_acct=_STORAGE_ACCOUNT_NAME), end="", flush=True)
    try:
        availability_result = storage_client.storage_accounts.check_name_availability(
            {'name': _STORAGE_ACCOUNT_NAME})
        if not availability_result.name_available:
            print('storage name {st_acct} is already in use. Try another name.'.format(
                st_acct=_STORAGE_ACCOUNT_NAME))
            exit()
        poller = storage_client.storage_accounts.begin_create(_RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME,
                                                              {
                                                                  "location": _RESOURCE_GROUP_LOCATION,
                                                                  "kind": "StorageV2",
                                                                  "is_hns_enabled": "true",
                                                                  "sku": {"name": "Standard_LRS", "tier": "Standard"},
                                                                  "properties": {
                                                                      "minimumTlsVersion": "TLS1_2",
                                                                      "allowBlobPublicAccess": "true",
                                                                      "networkAcls": {
                                                                          "bypass": "AzureServices",
                                                                          "virtualNetworkRules": [],
                                                                          "ipRules": [],
                                                                          "defaultAction": "Allow"
                                                                      }
                                                                  }})
        account_result = poller.result()
        if account_result.name == _STORAGE_ACCOUNT_NAME:
            print('OK')
    except Exception as e:
        print(e)


# create storage container aka 'filesystem' in the nominated storage account
def create_adls_container(_STORAGE_ACCOUNT_NAME, _STORAGE_CONTAINER_NAME):
    print("Creating Azure Data Lake Storage Container {st_ct}...".format(
        st_ct=_STORAGE_CONTAINER_NAME), end="", flush=True)
    keys = storage_client.storage_accounts.list_keys(
        _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME)
    account_url = "https://{}.dfs.core.windows.net/".format(
        _STORAGE_ACCOUNT_NAME)
    datalake_service = DataLakeServiceClient(
        account_url=account_url, credential=keys.keys[0].value
    )
    try:
        datalake_service.create_file_system(
            file_system=_STORAGE_CONTAINER_NAME)
        file_systems = [i.name for i in datalake_service.list_file_systems()]
        if _STORAGE_CONTAINER_NAME in file_systems:
            print('OK')
    except Exception as e:
        print(e)


# create azure sql server in the nominated resource group
def create_sql_server(sql_client, _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME,
                      _RESOURCE_GROUP_LOCATION, _SQL_USERNAME, _SQL_PASSWORD):
    print("Creating Azure SQL Server {ssvr_name}...".format(
        ssvr_name=_SQL_SERVER_NAME), end="", flush=True)
    try:
        sql_server = sql_client.servers.begin_create_or_update(
            _RESOURCE_GROUP_NAME,
            _SQL_SERVER_NAME,
            {
                'location': _RESOURCE_GROUP_LOCATION,
                'version': '12.0',
                'administrator_login': _SQL_USERNAME,
                'administrator_login_password': _SQL_PASSWORD
            }
        )
        sql_server.wait()
    except Exception as e:
        print(e)
    ssvr = [i.name for i in sql_client.servers.list_by_resource_group(
        _RESOURCE_GROUP_NAME)]
    if _SQL_SERVER_NAME in ssvr:
        print('OK')


# create azure sql db in the nominated resource group
def create_sql_db(sql_client, _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME, _SQL_DB_NAME, _RESOURCE_GROUP_LOCATION):
    print("Creating Azure SQL Database {db_name}...".format(
        db_name=_SQL_DB_NAME), end="", flush=True)
    try:
        sql_db = sql_client.databases.begin_create_or_update(
            _RESOURCE_GROUP_NAME,
            _SQL_SERVER_NAME,
            _SQL_DB_NAME,
            {
                'location': _RESOURCE_GROUP_LOCATION,
                'collation': 'SQL_Latin1_General_CP1_CI_AS',
                'create_mode': 'default',
                'requested_service_objective_name': 'Basic'
            }
        )
        sql_db.wait()
    except Exception as e:
        print(e)
    dbs = [i.name for i in sql_client.databases.list_by_server(
        _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME)]
    if _SQL_DB_NAME in dbs:
        print('OK')


# configure azure sql server firewall to accept connections from the host ip address
def configure_firewall(sql_client, _SQL_DRIVER, _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME, _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD, external_IP):
    print("Configuring Azure SQL Server Firewall Settings...", end="", flush=True)
    try:
        sql_client.firewall_rules.create_or_update(
            _RESOURCE_GROUP_NAME,
            _SQL_SERVER_NAME,
            "firewall_rule_name_" + external_IP,
            {
                "startIpAddress": external_IP,
                "endIpAddress": external_IP
            }
        )
    except Exception as e:
        print(e)
    _AZURE_SQL_SERVER = _SQL_SERVER_NAME + '.database.windows.net'
    with pyodbc.connect('DRIVER='+_SQL_DRIVER+';SERVER='+_AZURE_SQL_SERVER+';PORT=1433;DATABASE='+_SQL_DB_NAME+';UID='+_SQL_USERNAME+';PWD='+_SQL_PASSWORD) as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT @@version")
            row = cursor.fetchone()
    if row:
        print('OK')



if __name__ == '__main__':
    print("\n")
    execution_start_time = timer()
    credentials = AzureCliCredential()
    storage_client = StorageManagementClient(credentials, _SUBSCRIPTION_ID)
    resource_client = ResourceManagementClient(credentials, _SUBSCRIPTION_ID)
    sql_client = SqlManagementClient(credentials, _SUBSCRIPTION_ID)
    resource_groups = [i.name for i in resource_client.resource_groups.list()]
    if _RESOURCE_GROUP_NAME in resource_groups:
        print("Deleting existing resource group{res_gr}...".format(
            res_gr=_RESOURCE_GROUP_NAME), end="", flush=True)
        delete_async_operation = resource_client.resource_groups.begin_delete(
            _RESOURCE_GROUP_NAME)
        delete_async_operation.wait()
        print('OK')

    create_resource_group(
        resource_client, _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION)
    create_storage_account(storage_client, _STORAGE_ACCOUNT_NAME,
                           _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION)
    create_adls_container(_STORAGE_ACCOUNT_NAME, _STORAGE_CONTAINER_NAME)
    create_sql_server(sql_client, _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME,
                      _RESOURCE_GROUP_LOCATION, _SQL_USERNAME, _SQL_PASSWORD)
    create_sql_db(sql_client, _RESOURCE_GROUP_NAME,
                  _SQL_SERVER_NAME, _SQL_DB_NAME, _RESOURCE_GROUP_LOCATION)
    configure_firewall(sql_client, _SQL_DRIVER, _RESOURCE_GROUP_NAME,
                       _SQL_SERVER_NAME, _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD, external_IP)
    execution_end_time = timer()
    elapsed_duration = execution_end_time - execution_start_time
    print('Elapsed resources(s) provisioning time was {time}.\n'.format(
        time=format_timespan(elapsed_duration)))

Running the above script produces the following output, provided we have the Azure subscription set up and configured in the local environment.

Now that we have all artefacts supporting Google Analytics data acquisition in place, let’s start by defining the GA attributes we’d like to source, and stage those in our data lake and SQL database. For this exercise I used data from my own website – the one you’re reading right now – restricted to the last 30 days and the following attributes: PagePath, PageTitle, Country, City, Medium, DeviceCategory, OperatingSystem, Browser and SessionDuration. I won’t go into how to set up a GA account in this post as there are countless other internet resources on this topic and most of this code is self-explanatory. The only thing that was unnecessarily frustrating and took me a while to figure out was creating a service account and providing it access to my GA view, as denoted by the _GA_Service_ACCT_KEY (JSON file) and _GA_VIEW_ID variables. Getting the account and its key generated was not a problem, but modifying the security details so that the service account could access the view was quite convoluted. The following script is responsible for GA data acquisition, tabulating and formatting the output into a Pandas data frame and inserting it into the provisioned Azure SQL DB (the table object is also created as part of this code if it does not already exist).

from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
from pathlib import PureWindowsPath
from azure.storage.filedatalake import DataLakeServiceClient
from azure.mgmt.storage import StorageManagementClient
from azure.identity import AzureCliCredential
import pandas as pd
import pyodbc
import time


_GA_SCOPES = ['https://www.googleapis.com/auth/analytics.readonly']
_GA_VIEW_ID = 'your_ga_view_id'
_GA_OUTPUT_FILE_NAME = 'GADataExtract-'+time.strftime("%Y%m%d-%H%M%S")+'.csv'
_GA_Service_ACCT_KEY = PureWindowsPath('C:/your_file_path/your_json_service_account_key_file.json')
_SQL_SERVER_NAME = 'gademosqlserver2022.database.windows.net'
_SQL_DB_NAME = 'sourcedb'
_SQL_USERNAME = 'testusername'
_SQL_PASSWORD = 'MyV3ry$trongPa$$word'
_SQL_DRIVER = '{ODBC Driver 18 for SQL Server}'
_RESOURCE_GROUP_NAME = 'gademoresourcegroup2022'
_STORAGE_CONTAINER_NAME = 'gademooutputfiles2022'
_STORAGE_ACCOUNT_NAME = 'gademostorageacct2022'
_STORAGE_CONTAINER_DIRECTORY_NAME = time.strftime("%Y%m%d")
_SUBSCRIPTION_ID = 'your_subscription_id'
_GA_OUTPUT_FILE_PATH = PureWindowsPath('C:/your_file_path/{file_name}'.format(
        file_name=_GA_OUTPUT_FILE_NAME))
_SCHEMAS = ['stg', 'mart']

# get Google Analytics service account credentials
def initialize_analyticsreporting():
    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        _GA_Service_ACCT_KEY, _GA_SCOPES)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics


def get_report(analytics):
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': _GA_VIEW_ID,
                    'dateRanges': [{'startDate': '30daysAgo', 'endDate': 'today'}],
                    'metrics': [{'expression': 'ga:sessions'}],
                    'dimensions': [{"name": "ga:pagePath"}, {"name": "ga:pageTitle"}, {"name": "ga:country"}, {"name": "ga:city"}, {"name": "ga:medium"}, {"name": "ga:deviceCategory"}, {"name": "ga:operatingSystem"}, {"name": "ga:browser"}],
                    'orderBys': [{"fieldName": "ga:sessions", "sortOrder": "DESCENDING"}],
                    'pageSize': 1000
                }]
        }
    ).execute()


def ga_response_dataframe(response):
    row_list = []
    for report in response.get('reports', []):
        column_header = report.get('columnHeader', {})
        dimension_headers = column_header.get('dimensions', [])
        metric_headers = column_header.get(
            'metricHeader', {}).get('metricHeaderEntries', [])
        for row in report.get('data', {}).get('rows', []):
            row_dict = {}
            dimensions = row.get('dimensions', [])
            date_range_values = row.get('metrics', [])

            for header, dimension in zip(dimension_headers, dimensions):
                row_dict[header] = dimension

            for i, values in enumerate(date_range_values):
                for metric, value in zip(metric_headers, values.get('values')):
                    if ',' in value or '.' in value:
                        row_dict[metric.get('name')] = float(value)
                    else:
                        row_dict[metric.get('name')] = int(value)

            row_list.append(row_dict)
    return pd.DataFrame(row_list)


# upload a file to data lake in Azure
def upload_file_to_lake(storage_client, ga_file_content, _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME, _STORAGE_CONTAINER_NAME, _STORAGE_CONTAINER_DIRECTORY_NAME, _GA_OUTPUT_FILE_PATH, _GA_OUTPUT_FILE_NAME):
    keys = storage_client.storage_accounts.list_keys(
        _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME)
    account_url = "https://{}.dfs.core.windows.net/".format(
        _STORAGE_ACCOUNT_NAME)
    service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
        "https", _STORAGE_ACCOUNT_NAME), credential=keys.keys[0].value)
    file_system_client = service_client.get_file_system_client(
        file_system=_STORAGE_CONTAINER_NAME)
    dir_client = file_system_client.get_directory_client(
        _STORAGE_CONTAINER_DIRECTORY_NAME)
    dir_client.create_directory()
    file_client = dir_client.create_file(_GA_OUTPUT_FILE_NAME)
    file_client.append_data(ga_file_content, 0, len(ga_file_content))
    file_client.flush_data(len(ga_file_content))


# create required database schemas
def create_stg_schema(_SQL_DRIVER, _SQL_SERVER_NAME, _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD, _SCHEMAS):
    with pyodbc.connect('DRIVER='+_SQL_DRIVER+';SERVER='+_SQL_SERVER_NAME+';PORT=1433;DATABASE='+_SQL_DB_NAME+';UID='+_SQL_USERNAME+';PWD=' + _SQL_PASSWORD) as conn:
        with conn.cursor() as cursor:
            for schema in _SCHEMAS:
                cursor.execute('''IF (NOT EXISTS (SELECT TOP 1 (1) FROM sys.schemas WHERE name = '{schema}')) 
                                BEGIN
                                    EXEC ('CREATE SCHEMA [{schema}] AUTHORIZATION [dbo]')
                                END'''.format(schema=schema))


# create required objects in the nominated database and populate with data
def insert_into_azuresql(ga_data, _SQL_DRIVER, _SQL_SERVER_NAME, _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD):
    with pyodbc.connect('DRIVER='+_SQL_DRIVER+';SERVER='+_SQL_SERVER_NAME+';PORT=1433;DATABASE='+_SQL_DB_NAME+';UID='+_SQL_USERNAME+';PWD=' + _SQL_PASSWORD) as conn:
        with conn.cursor() as cursor:
            if not cursor.tables(table='ga_data', tableType='TABLE').fetchone():
                cursor.execute('''CREATE TABLE dbo.ga_data (ID INT IDENTITY (1,1),
                                                            PagePath NVARCHAR(1024),
                                                            PageTitle NVARCHAR (2048),
                                                            Country NVARCHAR (256),
                                                            City NVARCHAR (256),
                                                            Medium NVARCHAR (256),
                                                            DeviceCategory NVARCHAR (512),
                                                            OperatingSystem VARCHAR (128),
                                                            Browser NVARCHAR (256),
                                                            SessionDuration INT)''')  
                cursor.commit()
            for index, row in ga_data.iterrows():
                cursor.execute('''INSERT INTO dbo.ga_data
                                    (PagePath,
                                    PageTitle,
                                    Country,
                                    City,
                                    Medium,
                                    DeviceCategory,
                                    OperatingSystem,
                                    Browser,
                                    SessionDuration)
                          values (?, ?, ?, ?, ?, ?, ?, ?, ?)''',
                               row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8])
                cursor.commit()
            cursor.execute('SELECT TOP (1) 1 FROM dbo.ga_data')
            rows = cursor.fetchone()
            if rows:
                print('All Good!')
            else:
                raise ValueError(
                    'No data generated in the source table. Please troubleshoot!'
                )


def main():
    credentials = AzureCliCredential()
    storage_client = StorageManagementClient(credentials, _SUBSCRIPTION_ID)
    analytics = initialize_analyticsreporting()
    response = get_report(analytics)
    df = ga_response_dataframe(response)
    df.columns = [x.replace(':', '_') for x in df.columns]
    ga_data = df.replace('(none)', '').replace(
        '(not set)', '')
    ga_data.to_csv(_GA_OUTPUT_FILE_NAME, index=False)
    with open(_GA_OUTPUT_FILE_PATH) as file:
        ga_file_content = file.read()
    upload_file_to_lake(storage_client, ga_file_content, _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME,
                        _STORAGE_CONTAINER_NAME, _STORAGE_CONTAINER_DIRECTORY_NAME, _GA_OUTPUT_FILE_PATH, _GA_OUTPUT_FILE_NAME)
    create_stg_schema(_SQL_DRIVER, _SQL_SERVER_NAME,
                      _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD, _SCHEMAS)
    insert_into_azuresql(ga_data, _SQL_DRIVER, _SQL_SERVER_NAME,
                         _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD)
    
if __name__ == '__main__':
    main()

When executed, our Azure SQL DB table is created (if it does not exist) and the GA data is inserted into the ADLS container and the aforementioned table (click on image to enlarge).
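
A quick sanity check on the Azure SQL DB side (run against the sourcedb database) can be as simple as the queries below, using the dbo.ga_data table created by the script above:

-- Confirm the GA extract landed in the staging table
SELECT TOP (10) PagePath, Country, City, Browser, SessionDuration
FROM dbo.ga_data
ORDER BY SessionDuration DESC;

SELECT COUNT(*) AS row_count FROM dbo.ga_data;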

Now that we have our GA data, let’s launch into how dbt can help us with shaping it by building our sample ELT pipeline.

DBT Installation

Installing dbt is a fairly straightforward affair. One can also go with a Docker container or WSL approach, but in this post I’ll outline the steps to perform a local installation in a Python virtual environment on a Windows system. Provided Python is already installed, let’s go ahead and set up our virtual environment – a self-contained Python installation.

python -m venv dbt_env
.\dbt_env\Scripts\activate.ps1

Once set up and activated, we can install dbt and the dbt-sqlserver adapter using pip, as dbt does not support SQL Server out-of-the-box. You can find the official GitHub repo for it with all the supporting documentation in the following LINK.

pip install dbt-core
pip install dbt-sqlserver

Next, we will initialize our demo dbt project using the dbt init command and provide a name for the project we’re creating (in this case it’s azure_sql_demo). Please also note that newer versions of dbt only allow lower-case letters and underscores when specifying the project name.

As we can see from the output, a sample profiles.yml file containing placeholder configuration was created. Depending on which option we selected, i.e. (1) for BigQuery, (2) for PostgreSQL etc., the default profiles.yml file contains only generic properties or gets created in an empty state. This will need to be amended to reflect our Azure SQL Server environment details, e.g. user name, password, driver etc. As the profiles.yml file contains database connections and credentials (sensitive information), it is generated in the ~/.dbt/ folder and not the project folder. The main configuration file, dbt_project.yml – which defines settings applying to the whole project – on the other hand, contains placeholders for the development and production environments. Let’s go ahead and populate the profiles.yml file with the required information, ensuring that the profile name from dbt_project.yml matches that from the profiles.yml file.

azure_sql_demo:
    target: dev
    outputs:
        dev:
            type: sqlserver
            driver: SQL Server
            server: demosqlserver2022.database.windows.net
            database: sourcedb
            port: 1433
            schema: stg
            user: testusername
            password: MyV3ry$trongPa$$word

Finally, we can check our target database connectivity to ensure all the parameters have been entered correctly by running the dbt debug command as per below. It’s a good sign if you see no error messages at this point and all the critical outputs (color-coded in green) tell us that all checks have passed.

In the next part of this post I will dive deeper into the functionality dbt provides out of the box and some of its features, for example, snapshots, tests, docs and more. You can view part 2 of this series HERE.
