Building rapid data import interfaces with PySimpleGUI and Streamlit

December 5th, 2022 / 3 Comments » / by admin

Note: All code from this post can be downloaded from my OneDrive folder HERE.

Introduction

The proliferation of low-code and no-code solutions for building simple apps has gained a solid foothold in the industry, and a lot can be achieved with minimal effort and supporting code. As we enter an era of AI-enabled development, we’re seeing significant productivity gains from large language models e.g. ChatGPT, which can take requirements as input and generate comprehensive code in a matter of seconds. This creates an interesting conundrum – are developers automating themselves out of their jobs, or are they making their work more enjoyable by ‘outsourcing’ the most mundane parts of it so they can focus on what’s truly important – providing business value? That’s a discussion for a separate post, but there’s no denying that the days of manually crafting application ‘scaffolding’ and reinventing the wheel are coming to an end, as more of us lean towards using mobile, desktop or Web frameworks, ORMs, predefined libraries and packages or other adaptive software development approaches to expedite output delivery.

For those who need to provide a visual interface for either data entry or data output, there is a plethora of choices out there. Building a simple data entry form or a bare-bones site with a few widgets to allow end-users to interact with the content typically involves a combination of multiple technologies and languages, but it’s surprising how much functionality can be achieved using some of the frameworks available in Python. In this post I’d like to explore how anyone can build a simple flat file data import interface for SQL Server. The need for this type of solution came out of multiple requests from non-technical clients needing to interface data sources familiar to them, e.g. CSV or Excel files, with their internal database(s) and, with a few button clicks, upload/insert the required dataset to augment or change upstream reporting or analytics. It’s nothing cutting-edge, and most visualisation tools provide the means to do low-level data prep through a pseudo-ETL process e.g. Power BI Data Flows or Tableau Prep Builder, but they still require technical expertise and knowledge of the underlying schemas and structures. Sometimes all that’s required is a simple interface with a few widgets – that’s where frameworks like PySimpleGUI and Streamlit shine.

For this exercise, let’s look at a very simple interface for validating and loading flat file data into a database table. The objective is to provide end-users with an app which allows them to:

  • Authenticate to a Microsoft SQL Server instance running on their network using either Windows Authentication or SQL Server Authentication
  • Validate their supplied credentials
  • Search for the required file on their file system
  • Select a table this data needs to be loaded into
  • Run a series of validation steps and if no issues are detected, load the CSV file into the nominated table

Let’s dive in and see how quick and productive one can be using these two frameworks.

PySimpleGUI

Launched in 2018, PySimpleGUI is a Python library that wraps tkinter, Qt (PySide2), wxPython and Remi (for browser support), allowing very fast and simple-to-learn GUI programming. Your code is not required to have an object-oriented architecture – one of the major obstacles when writing GUI applications – which makes the package usable by a larger audience. PySimpleGUI code is simpler and shorter than writing directly against the underlying framework because PySimpleGUI implements much of the “boilerplate code” for you. Additionally, interfaces are simplified to require as little code as possible (from one half to one tenth) to get the desired result.
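To get a feel for the programming model, the following minimal sketch (not part of the downloadable solution – the widget names are placeholders of my own) shows the whole pattern: a layout defined as a list of lists, a window and an event loop.

import PySimpleGUI as sg

# a layout is a list of lists - each inner list is one row of widgets
layout = [[sg.Text('Enter your name:'), sg.Input(key='-NAME-')],
          [sg.Button('Greet'), sg.Button('Exit')]]

window = sg.Window('Minimal Example', layout)
while True:
    event, values = window.read()  # blocks until a button press or window close
    if event in (sg.WIN_CLOSED, 'Exit'):
        break
    if event == 'Greet':
        sg.popup('Hello, {}!'.format(values['-NAME-']))
window.close()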

This app’s interface can be designed in many different ways but for this exercise, I have kept the functionality and output to a minimum and when executing the app.py file we get the following output.

PySimpleGUI allows for creating bespoke windows and layouts using lists – in this example I’ve defined three to account for the separation between database login-specific elements, file search-specific elements and text output elements. These three are then combined using the ‘layout’ list which also ‘draws’ a frame around each of them to make these three areas visually distinct. Next, we have a function responsible for all the heavy lifting i.e. data validation and load, and finally the main event loop binding it all together. I’m not going to go over the PySimpleGUI API as it’s one of those projects which has very comprehensive supporting documentation. It also features over 300 Demo Programs which provide many design patterns for you to learn how to use PySimpleGUI and how to integrate it with other packages, and a cookbook featuring recipes covering many different scenarios. It’s by far one of the best documented open-source projects I have come across. To pip-install it into your default Python interpreter or a virtual environment simply run one of the following lines:

pip install pysimplegui
or
pip3 install pysimplegui

The following short snippet of Python is responsible for most of this little app’s functionality, proving that a minimal amount of code is required to build a complete interface which fulfills all the requirements specified.

import PySimpleGUI as sg
import os
import csv
from pathlib import Path
import helpers as helpers


_WORKING_DIR = os.getcwd()
_SQL_TABLES = ['Test_Table1', 'Test_Table2', 'Test_Table3']
_LOAD_STORED_PROC = 'usp_load_ext_table'
_TARGET_TABLE_SCHEMA_NAME = 'dbo'

db_layout = [[sg.Text('Provide SQL Server instance details and credentials to authenticate.')],
             [sg.Text('Host Name ', size=(15, 1)), sg.Input(
                 key='-HOSTNAME-', default_text='192.168.153.128')],
             [sg.Text('Database Name ', size=(15, 1)),
              sg.Input(key='-DATABASE-', enable_events=True, default_text='TestDB')], [sg.Checkbox('Use Windows Authentication', enable_events=True, default=False, key='-USEWINAUTH-')],
             [sg.Text('User Name ', size=(15, 1), key='-USERNAMELABEL-'), sg.Input(
                 key='-USERNAME-', enable_events=True, default_text='test_login')],
             [sg.Text('Password ', size=(15, 1), key='-PASSWORDLABEL-'), sg.Input(
                 key='-PASSWORD-',  password_char='*',
                 enable_events=True, default_text='test_password')],
             [sg.Button('Validate Database Connection', key='-VALIDATE-'), sg.Text(
                 '--> This operation may take up to 1 minute', visible=True, key='_text_visible_')]
             ]

file_search_layout = [[sg.Text('Provide CSV file for upload and database table to load into.')],
                      [sg.InputText(key='-FILEPATH-', size=(55, 1)),
                       sg.FileBrowse(initial_folder=_WORKING_DIR, file_types=[("CSV Files", "*.csv")])],
                      [sg.Text('Select database table name to load text data.')],
                      [sg.Combo(_SQL_TABLES, size=(37), key='-TABLECHOICE-', readonly=True),
                      sg.Checkbox('Truncate before insert?', default=True, key='-TRUNCATESOURCE-')]]


stdout_layout = [[sg.Multiline(size=(62, 10), key='-OUTPUT-')], [
    sg.Button('Validate and Submit', key='-SUBMIT-'),
    sg.Button('Clear Output', key='-CLEAR-')]]


layout = [[sg.Frame('1. Database Connection Details', db_layout,
                    title_color='blue', font='Any 12', pad=(15, 20))],
          [sg.Frame('2. File Upload Details', file_search_layout,
                    title_color='blue', font='Any 12', pad=(15, 20))],
          [sg.Frame('3. File Upload Output', stdout_layout,
                    title_color='blue', font='Any 12', pad=(15, 20))]]


def check_file_and_load(db_conn, full_path, table_dropdown_value, truncate_source_table_flag, _LOAD_STORED_PROC, _TARGET_TABLE_SCHEMA_NAME):
    file_name = Path(full_path).name
    file_path = Path(full_path).parent

    # check csv file is comma delimited
    window['-OUTPUT-'].print('Validating selected file is comma-delimited...', end='')
    with open(full_path, 'r', newline='') as f:
        # sniff a sample of the file to detect the delimiter in use
        dialect = csv.Sniffer().sniff(f.read(1024))
    if dialect.delimiter != ',':
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return
    else:
        window['-OUTPUT-'].print('OK!')

    # check csv file is not empty
    window['-OUTPUT-'].print('Validating selected file is not empty...', end='')
    with open(full_path, 'r', newline='') as f:
        reader = csv.reader(f, dialect)
        ncols = len(next(reader))
        f.seek(0)
        nrow = len(list(reader))
    if nrow < 2:
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return
    else:
        window['-OUTPUT-'].print('OK!')

    # check for database connection values correctness
    window['-OUTPUT-'].print(
        'Validating database connection details are correct...', end='')
    conn = db_conn(HOSTNAME=values['-HOSTNAME-'], DATABASE=values['-DATABASE-'],
                   USERNAME=values['-USERNAME-'],   PASSWORD=values['-PASSWORD-'], USEWINAUTH=values['-USEWINAUTH-'])
    if conn:
        window['-OUTPUT-'].print('OK!')
    else:
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return

    # check required schema exists on the target server
    window['-OUTPUT-'].print('Validating target schema existence...', end='')
    with conn.cursor() as cursor:
        sql = """SELECT TOP (1) 1 FROM {target_db}.sys.schemas 
                        WHERE name = '{target_schema}'""".format(target_db=values['-DATABASE-'], target_schema = _TARGET_TABLE_SCHEMA_NAME)
        cursor.execute(sql)
        rows = cursor.fetchone()
    if rows:
        window['-OUTPUT-'].print('OK!')
    else:
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return

    # check selected table exists on the target server
    window['-OUTPUT-'].print('Validating target table existence...', end='')
    with conn.cursor() as cursor:
        sql = """SELECT TOP (1) 1 FROM {target_db}.information_schema.tables 
                        WHERE table_name = '{target_table}'""".format(target_db=values['-DATABASE-'], target_table=table_dropdown_value)
        cursor.execute(sql)
        rows = cursor.fetchone()
    if rows:
        window['-OUTPUT-'].print('OK!')
    else:
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return

    # check if insert stored proc exists
    window['-OUTPUT-'].print("Validating loading stored procedure exists...", end='')
    with conn.cursor() as cursor:
        sql = """SELECT TOP (1) 1 FROM {target_db}.sys.objects WHERE type = 'P' AND OBJECT_ID = OBJECT_ID('dbo.usp_load_ext_table')""".format(
            target_db=values['-DATABASE-'])
        cursor.execute(sql)
        sp = cursor.fetchone()
    if sp:
        window['-OUTPUT-'].print('OK!')
    else:
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return

    # check if number of columns matches
    window['-OUTPUT-'].print('Validating columns number match...', end='')
    with conn.cursor() as cursor:
        sql = """SELECT COUNT(1) FROM {target_db}.information_schema.columns 
                        WHERE table_name = '{target_table}'""".format(target_db=values['-DATABASE-'], target_table=table_dropdown_value)
        cursor.execute(sql)
        dbcols = cursor.fetchone()[0]
    if dbcols == ncols:
        window['-OUTPUT-'].print('OK!')
    else:
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return

    window['-OUTPUT-'].print('Attempting to load {csv_file} file into {target_table} table...'.format(
        csv_file=file_name.lower(), target_table=table_dropdown_value.lower()), end='')
    with conn.cursor() as cursor:
        sql = '''\
                DECLARE @col_names VARCHAR (MAX); 
                EXEC TestDB.dbo.{sp_name} 
                @temp_target_table_name=?,
                @target_table_name=?,
                @target_table_schema_name=?, 
                @file_path=?, 
                @truncate_target_table=?,
                @col_names = @col_names OUTPUT; 
                SELECT @col_names AS col_names
                '''.format(sp_name=_LOAD_STORED_PROC)
        params = ('##'+table_dropdown_value, table_dropdown_value,
                  _TARGET_TABLE_SCHEMA_NAME, full_path, truncate_source_table_flag)         
        cursor.execute(sql, params)
        col_names = cursor.fetchval()

        if truncate_source_table_flag == 1:
            test_1_sql = 'SELECT TOP (1) 1 FROM (SELECT {cols} FROM {temp_target_table_name} EXCEPT SELECT {cols} FROM {target_table_schema_name}.{target_table_name})a'.format(
                temp_target_table_name='##'+table_dropdown_value, target_table_schema_name=_TARGET_TABLE_SCHEMA_NAME, target_table_name=table_dropdown_value, cols=col_names)
            cursor.execute(test_1_sql)
            test_1_rows = cursor.fetchone()
            test_2_sql = 'SELECT TOP (1) status FROM {temp_target_table_name}'.format(
                temp_target_table_name='##'+table_dropdown_value)
            cursor.execute(test_2_sql)
            test_2_rows = cursor.fetchone()[0]
            if test_1_rows or test_2_rows != 'SUCCESS':
                window['-OUTPUT-'].print('Failed!', text_color='red')
            else:
                window['-OUTPUT-'].print('OK!')
                return
        elif truncate_source_table_flag == 0:
            test_1_sql = 'SELECT COUNT(1) as ct FROM (SELECT {cols} FROM {temp_target_table_name} INTERSECT SELECT {cols} FROM {target_table_schema_name}.{target_table_name})a'.format(
                temp_target_table_name='##'+table_dropdown_value, target_table_schema_name=_TARGET_TABLE_SCHEMA_NAME, target_table_name=table_dropdown_value, cols=col_names)
            cursor.execute(test_1_sql)
            test_1_rows = cursor.fetchone()[0]
            test_2_sql = 'SELECT TOP (1) status FROM {temp_target_table_name}'.format(
                temp_target_table_name='##'+table_dropdown_value)
            cursor.execute(test_2_sql)
            test_2_rows = cursor.fetchone()[0]
            if nrow-1 != test_1_rows or test_2_rows != 'SUCCESS':
                window['-OUTPUT-'].print('Failed!', text_color='red')
            else:
                window['-OUTPUT-'].print('OK!')
                return
        

window = sg.Window('File Upload Utility version 1.01', layout)
while True:
    event, values = window.read()
    if event == sg.WIN_CLOSED or event == 'Exit':
        break
    elif '-USEWINAUTH-' in event:
        win_auth_setting = values['-USEWINAUTH-']
        if win_auth_setting == True:
            window['-USERNAMELABEL-'].Update(visible=False)
            window['-PASSWORDLABEL-'].Update(visible=False)
            window['-USERNAME-'].Update(visible=False)
            window['-PASSWORD-'].Update(visible=False)
        elif win_auth_setting == False:
            window['-USERNAMELABEL-'].Update(visible=True)
            window['-PASSWORDLABEL-'].Update(visible=True)
            window['-USERNAME-'].Update(visible=True)
            window['-PASSWORD-'].Update(visible=True)

    elif '-VALIDATE-' in event:
        validation = helpers.validate_db_input_values(values)
        if validation:
            error_message = helpers.generate_error_message(validation)
            sg.popup(error_message, keep_on_top=True)
        else:
            conn = helpers.db_conn(HOSTNAME=values['-HOSTNAME-'], DATABASE=values['-DATABASE-'],
                                USERNAME=values['-USERNAME-'],   PASSWORD=values['-PASSWORD-'], USEWINAUTH=values['-USEWINAUTH-'])
            if conn:
                sg.popup('Supplied credentials are valid!',
                        keep_on_top=True, title='')
            else:
                sg.popup('Supplied credentials are invalid or there is a network connection issue.',
                        keep_on_top=True, title='', button_color='red')
    elif '-SUBMIT-' in event:
        validation = helpers.validate_file_input_values(values)
        if validation:
            error_message = helpers.generate_error_message(validation)
            sg.popup(error_message, keep_on_top=True)
        else:
            truncate_source_table_flag = bool(values['-TRUNCATESOURCE-'])
            table_dropdown_value = values['-TABLECHOICE-']
            window['-OUTPUT-'].update('')
            full_path = str(Path(values['-FILEPATH-']))
            file_name = Path(full_path).name
            file_path = Path(full_path).parent
            db_conn = helpers.db_conn
            check_file_and_load(db_conn,
                                full_path, table_dropdown_value, truncate_source_table_flag, _LOAD_STORED_PROC, _TARGET_TABLE_SCHEMA_NAME)
    elif '-CLEAR-' in event:
        window['-OUTPUT-'].update('')
window.close()

The main app.py file also imports from the helpers module which contains functions used for input validation, database connection and error message generation. You can find all the code used for this solution in my OneDrive folder HERE.
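For reference, the db_conn helper could be implemented along the lines of the sketch below – this is an assumption on my part (the actual implementation ships with the downloadable code), built on the pyodbc package:

import pyodbc


def db_conn(HOSTNAME, DATABASE, USERNAME=None, PASSWORD=None, USEWINAUTH=False):
    # return a live pyodbc connection, or None if the attempt fails
    if USEWINAUTH:
        conn_str = ('DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + HOSTNAME +
                    ';DATABASE=' + DATABASE + ';Trusted_Connection=yes;')
    else:
        conn_str = ('DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + HOSTNAME +
                    ';DATABASE=' + DATABASE + ';UID=' + USERNAME + ';PWD=' + PASSWORD)
    try:
        return pyodbc.connect(conn_str, autocommit=True, timeout=5)
    except pyodbc.Error:
        return None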

Finally, tying it all together is a bit of T-SQL wrapped in a stored procedure. This code takes five parameters as input values and outputs one value back to the Python code – the names of our target database table’s columns (used in the Python validation code). Its primary function is to:

  • Create a global temporary table mirroring the schema of the target table
  • Insert our flat file’s data into it using the BULK INSERT operation
  • Optionally truncate the target object and insert the required data from the temp table into the destination table
  • Alter the temp table to record the operation status i.e. Failure or Success (used in the Python validation code)

The reason a global temporary table is created and loaded into first is that we want to ensure the CSV file’s data and schema conform to the target table’s structure before we make any changes to it. Having this step in place allows for an extra level of validation – if the temporary table insert fails, the target table insertion code is not triggered at all. Likewise, if a temporary table with a schema identical to that of the destination table is created and populated successfully, we can be confident that the target table’s data can be purged (as denoted by one of the parameter values) and loaded into without any issues. The full T-SQL code is as follows:

USE [TestDB];
GO
SET ANSI_NULLS ON;
GO
SET QUOTED_IDENTIFIER ON;
GO
CREATE PROCEDURE [dbo].[usp_load_ext_table]
(
    @temp_target_table_name VARCHAR(256),
    @target_table_name VARCHAR(256),
    @target_table_schema_name VARCHAR(256),
    @file_path VARCHAR(1024),
    @truncate_target_table BIT,
    @col_names VARCHAR(MAX) OUTPUT
)
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @cols VARCHAR(MAX);
    DECLARE @sql VARCHAR(MAX);
    DECLARE @error_message VARCHAR(MAX);

    SELECT @sql
        = 'DROP TABLE IF EXISTS ' + @temp_target_table_name + '; CREATE TABLE ' + @temp_target_table_name + ' ('
          + o.list + ')',
           @cols = j.list
    FROM sys.tables t
        CROSS APPLY
    (
        SELECT STUFF(
               (
                   SELECT ',' + QUOTENAME(c.COLUMN_NAME) + ' ' + c.DATA_TYPE
                          + CASE c.DATA_TYPE
                                WHEN 'sql_variant' THEN
                                    ''
                                WHEN 'text' THEN
                                    ''
                                WHEN 'ntext' THEN
                                    ''
                                WHEN 'xml' THEN
                                    ''
                                WHEN 'decimal' THEN
                                    '(' + CAST(c.NUMERIC_PRECISION AS VARCHAR) + ', '
                                    + CAST(c.NUMERIC_SCALE AS VARCHAR) + ')'
                                ELSE
                                    COALESCE(   '(' + CASE
                                                          WHEN c.CHARACTER_MAXIMUM_LENGTH = -1 THEN
                                                              'MAX'
                                                          ELSE
                                                              CAST(c.CHARACTER_MAXIMUM_LENGTH AS VARCHAR)
                                                      END + ')',
                                                ''
                                            )
                            END
                   FROM INFORMATION_SCHEMA.COLUMNS c
                       JOIN sysobjects o
                           ON c.TABLE_NAME = o.name
                   WHERE c.TABLE_NAME = @target_table_name
                         AND TABLE_NAME = t.name
                   ORDER BY ORDINAL_POSITION
                   FOR XML PATH('')
               ),
               1,
               1,
               ''
                    )
    ) o(list)
        CROSS APPLY
    (
        SELECT STUFF(
               (
                   SELECT ',' + QUOTENAME(c.COLUMN_NAME)
                   FROM INFORMATION_SCHEMA.COLUMNS c
                       JOIN sysobjects o
                           ON c.TABLE_NAME = o.name
                   WHERE c.TABLE_NAME = @target_table_name
                   ORDER BY ORDINAL_POSITION
                   FOR XML PATH('')
               ),
               1,
               1,
               ''
                    )
    ) j(list)
    WHERE t.name = @target_table_name;

    EXEC (@sql);
    SET @col_names = @cols;

    IF NOT EXISTS
    (
        SELECT *
        FROM tempdb.INFORMATION_SCHEMA.TABLES
        WHERE TABLE_NAME = @temp_target_table_name
    )
    BEGIN
        SET @error_message
            = 'Global temporary placeholder table has not been successfully created in the tempdb database. Please troubleshoot.';
        RAISERROR(   @error_message, -- Message text.
                     16,             -- Severity.
                     1               -- State.
                 );
        RETURN;
    END;

    SET @sql
        = '
			BULK INSERT ' + @temp_target_table_name + '
			FROM ' + QUOTENAME(@file_path, '''')
          + '
			WITH
			(
				FIRSTROW = 2,
				FIELDTERMINATOR = '','',
				ROWTERMINATOR = ''\n'',
				MAXERRORS=0,
				TABLOCK
			)';
    EXEC (@sql);

    SET @sql = CASE
                   WHEN @truncate_target_table = 1 THEN
                       'TRUNCATE TABLE ' + @target_table_schema_name + '.' + @target_table_name + '; '
                   ELSE
                       ''
               END + 'INSERT INTO ' + @target_table_schema_name + '.' + @target_table_name + ' (' + @cols + ') ';
    SET @sql = @sql + 'SELECT ' + @cols + ' FROM ' + @temp_target_table_name + '';
    BEGIN TRANSACTION;
    BEGIN TRY
        EXEC (@sql);
        SET @sql
            = 'ALTER TABLE ' + @temp_target_table_name + ' ADD status VARCHAR(56) DEFAULT ''FAILURE'', ' + CHAR(13);
        SET @sql = @sql + 'status_message varchar(256) NULL; ' + CHAR(13);
        EXEC (@sql);
        SET @sql = 'UPDATE ' + @temp_target_table_name + ' SET status = ''SUCCESS'', ' + CHAR(13);
        SET @sql = @sql + 'status_message = ''Operation executed successfully!''' + CHAR(13);
        EXEC (@sql);
    END TRY
    BEGIN CATCH
        SET @error_message = ERROR_MESSAGE();
        -- roll back first so the status columns added below survive the rollback
        IF @@TRANCOUNT > 0
            ROLLBACK TRANSACTION;
        -- use the same status/status_message column names the Python validation code reads
        SET @sql
            = 'ALTER TABLE ' + @temp_target_table_name + ' ADD status VARCHAR(56) DEFAULT ''FAILURE'',' + CHAR(13);
        SET @sql = @sql + 'status_message varchar(256) NULL; ' + CHAR(13);
        EXEC (@sql);
        SET @sql = 'UPDATE ' + @temp_target_table_name + ' SET status = ''FAILURE'', ' + CHAR(13);
        SET @sql = @sql + 'status_message = ''' + @error_message + '''';
        EXEC (@sql);
    END CATCH;

    IF @@TRANCOUNT > 0
        COMMIT TRANSACTION;
END;
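Once deployed, the procedure can be smoke-tested directly from SQL Server Management Studio before wiring it into the app – the file path below is just a placeholder:

DECLARE @cols VARCHAR(MAX);
EXEC [dbo].[usp_load_ext_table]
    @temp_target_table_name = '##Test_Table1',
    @target_table_name = 'Test_Table1',
    @target_table_schema_name = 'dbo',
    @file_path = 'C:\Temp\test_data.csv', -- placeholder path on the SQL Server host
    @truncate_target_table = 1,
    @col_names = @cols OUTPUT;
SELECT @cols AS col_names;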

One drawback of this code is that it assumes the nominated flat file is located on the same server as our SQL Server instance – the above SQL code uses the T-SQL BULK INSERT operation, which expects access to the underlying file system and the file itself. This limitation can be alleviated by reading the file’s content into a dataframe and populating the target table using, for example, Python’s Pandas library instead (see the sketch below). However, in this instance, I expected end users to have access to the system hosting the MSSQL instance, so all the ‘heavy lifting’ i.e. data insertion is done in pure T-SQL.
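As a rough illustration of that client-side alternative, the load could be handled with pandas and SQLAlchemy – a sketch only, with the connection details and the load_csv_clientside function name being my own inventions:

import pandas as pd
from sqlalchemy import create_engine, text


def load_csv_clientside(full_path, table_name, hostname, database,
                        username, password, truncate=False):
    # read the CSV locally and push it to SQL Server without relying on BULK INSERT
    engine = create_engine(
        'mssql+pyodbc://{u}:{p}@{h}/{d}?driver=ODBC+Driver+17+for+SQL+Server'.format(
            u=username, p=password, h=hostname, d=database))
    df = pd.read_csv(full_path)
    with engine.begin() as conn:  # single transaction, committed on success
        if truncate:
            conn.execute(text('TRUNCATE TABLE dbo.{t}'.format(t=table_name)))
        df.to_sql(table_name, conn, schema='dbo', if_exists='append', index=False)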

Streamlit

As desktop GUI apps only work in certain scenarios, leveraging the previously created code (both T-SQL and Python) we can build a Web front end instead. Streamlit, an open-source Python app framework, allows anyone to transform scripts into shareable web apps, all without prior knowledge of CSS, HTML or JavaScript. It’s a purely Python-based framework (though some customizations can be achieved with a sprinkling of HTML), claimed to be used by over 80% of Fortune 50 companies and, with the backing of Snowflake (which recently acquired it for $800 million), is set to democratize access to data. Streamlit boasts a large gallery of apps, most of them with source code, so it’s relatively easy to explore its capabilities and poke around ‘under the hood’. It also offers Community Cloud where one can deploy, manage, and share apps with the world, directly from Streamlit – all for free. To pip-install it into your default Python interpreter or a virtual environment simply run the following:

pip install streamlit

To spin up a local web server and run the app in your default web browser you can run the following from the command line:

streamlit run your_script.py [-- script args]
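A script as small as the one below is already a fully functional web app, which gives a sense of how little ceremony is involved – a standalone sketch, unrelated to the upload utility:

import streamlit as st

st.header('Minimal Example')
name = st.text_input('Enter your name')  # renders a text box and returns its value
if st.button('Greet'):  # the whole script re-runs top-to-bottom on every interaction
    st.success('Hello, {}!'.format(name))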

For this Streamlit app, I tried to keep the interface layout largely unchanged from our previous PySimpleGUI app, even though I was tempted to add a bit of eye-candy to the page – Streamlit has pretty good integrations with some of the major plotting and visualization Python packages, so adding a widget or two would be very easy and would make the whole app a lot more appealing. The draft version of the app is as per the image below.

Most of the code changes relate to how the Streamlit API works and the components required to replicate the GUI app functionality. This means that any Python code handling the logic was mostly left unchanged – a testament to how easy both Streamlit and PySimpleGUI are to get up and running with. The below snippet of Python is used to build this tiny Streamlit app.

import streamlit as st
import tempfile
import csv
import os
from pathlib import Path
import helpers as helpers

_SQL_TABLES = ['Test_Table1', 'Test_Table2', 'Test_Table3']
_LOAD_STORED_PROC = 'usp_load_ext_table'
_TARGET_TABLE_SCHEMA_NAME = 'dbo'

st.header('Database Connection Details')
col1, col2 = st.columns([1, 1])
host_name = col1.text_input('Host Name', value='192.168.153.128')
db_name = col1.text_input('Database Name', value='TestDB')
win_auth = col1.checkbox('Use Windows Authentication', value=False)
validate = col1.button('Validate Database Connection')


user_name = col2.text_input('User Name', value='test_login')
password = col2.text_input('Password', type='password', value='test_password')

if validate:
    validation = helpers.validate_db_input_values(
        host_name, db_name, user_name, password)
    if validation:
        error_message = helpers.generate_error_message(validation)
        st.error(error_message)
    else:
        conn = helpers.db_conn(HOSTNAME=host_name, DATABASE=db_name,
                               USERNAME=user_name,   PASSWORD=password, USEWINAUTH=win_auth)
        if conn:
            st.success('Supplied credentials are valid.')
        else:
            st.error(
                'Supplied credentials are invalid or there is a network connection issue!')


st.header('File Upload Details')
uploaded_file = st.file_uploader(
    label='Provide CSV file for upload', type=['.csv'])
table_dropdown_value = st.selectbox(
    'Select database table name to load text data', options=_SQL_TABLES)
truncate_source_table = st.checkbox('Truncate before insert?', value=True)

st.header('File Upload Output')
logtxtbox = st.empty()
logtxt = ''
logtxtbox.text_area("Log: ", logtxt, height=200, key='fdgfgjjj')
upload_status = st.button('Validate and Submit')


def check_file_and_load(db_conn, full_path, table_dropdown_value, truncate_source_table_flag, _LOAD_STORED_PROC, _TARGET_TABLE_SCHEMA_NAME):
    logtxt = 'Validating selected file is comma-delimited...'
    logtxtbox.text_area("Log: ", logtxt, height=200)

    # check csv file is comma delimited
    with open(full_path, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=",")
        dialect = csv.Sniffer().sniff(f.read(1024))
    if dialect.delimiter != ',':
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return
    else:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)

    # check csv file is not empty
    logtxt = ''.join((logtxt, 'Validating selected file is not empty...'))
    with open(full_path, 'r', newline='') as f:
        reader = csv.reader(f, dialect)
        ncols = len(next(reader))
        f.seek(0)
        nrow = len(list(reader))
    if nrow < 2:
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return
    else:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)

    # check for database connection values correctness
    logtxt = ''.join(
        (logtxt, 'Validating database connection details are correct...'))
    conn = db_conn(host_name, db_name, user_name, password, win_auth)
    if conn:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
    else:
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return

    # check required schema exists on the target server
    logtxt = ''.join((logtxt, 'Validating target schema existence...'))
    with conn.cursor() as cursor:
        sql = """SELECT TOP (1) 1 FROM {target_db}.sys.schemas 
                        WHERE name = '{target_schema}'""".format(target_db=db_name, target_schema=_TARGET_TABLE_SCHEMA_NAME)
        cursor.execute(sql)
        rows = cursor.fetchone()
    if rows:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
    else:
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return

    # check selected table exists on the target server
    logtxt = ''.join((logtxt, 'Validating target table existence...'))
    with conn.cursor() as cursor:
        sql = """SELECT TOP (1) 1 FROM {target_db}.information_schema.tables 
                        WHERE table_name = '{target_table}'""".format(target_db=db_name, target_table=table_dropdown_value)
        cursor.execute(sql)
        rows = cursor.fetchone()
    if rows:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
    else:
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return

    # check if insert stored proc exists
    logtxt = ''.join((logtxt, 'Validating loading stored procedure exists...'))
    with conn.cursor() as cursor:
        sql = """SELECT TOP (1) 1 FROM {target_db}.sys.objects WHERE type = 'P' AND OBJECT_ID = OBJECT_ID('dbo.usp_load_ext_table')""".format(
            target_db=db_name)
        cursor.execute(sql)
        sp = cursor.fetchone()
    if sp:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
    else:
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return

    # check if number of columns matches
    logtxt = ''.join((logtxt, 'Validating columns number match...'))
    with conn.cursor() as cursor:
        sql = """SELECT COUNT(1) FROM {target_db}.information_schema.columns 
                        WHERE table_name = '{target_table}'""".format(target_db=db_name, target_table=table_dropdown_value)
        cursor.execute(sql)
        dbcols = cursor.fetchone()[0]
    if dbcols == ncols:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
    else:
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return
    logtxt = ''.join((logtxt, 'Attempting to load...'))
    with conn.cursor() as cursor:
        sql = '''\
                DECLARE @col_names VARCHAR (MAX); 
                EXEC TestDB.dbo.{sp_name} 
                @temp_target_table_name=?,
                @target_table_name=?,
                @target_table_schema_name=?, 
                @file_path=?, 
                @truncate_target_table=?,
                @col_names = @col_names OUTPUT; 
                SELECT @col_names AS col_names
                '''.format(sp_name=_LOAD_STORED_PROC)
        params = ('##'+table_dropdown_value, table_dropdown_value,
                  _TARGET_TABLE_SCHEMA_NAME, full_path, truncate_source_table_flag)
        cursor.execute(sql, params)
        col_names = cursor.fetchval()

        if truncate_source_table_flag == 1:
            test_1_sql = 'SELECT TOP (1) 1 FROM (SELECT {cols} FROM {temp_target_table_name} EXCEPT SELECT {cols} FROM {target_table_schema_name}.{target_table_name})a'.format(
                temp_target_table_name='##'+table_dropdown_value, target_table_schema_name=_TARGET_TABLE_SCHEMA_NAME, target_table_name=table_dropdown_value, cols=col_names)
            cursor.execute(test_1_sql)
            test_1_rows = cursor.fetchone()
            test_2_sql = 'SELECT TOP (1) status FROM {temp_target_table_name}'.format(
                temp_target_table_name='##'+table_dropdown_value)
            cursor.execute(test_2_sql)
            test_2_rows = cursor.fetchone()[0]
            if test_1_rows or test_2_rows != 'SUCCESS':
                st.error('File failed to load successfully, please troubleshoot!')
                return
            else:
                st.success('File loaded successfully!')
        elif truncate_source_table_flag == 0:
            test_1_sql = 'SELECT COUNT(1) as ct FROM (SELECT {cols} FROM {temp_target_table_name} INTERSECT SELECT {cols} FROM {target_table_schema_name}.{target_table_name})a'.format(
                temp_target_table_name='##'+table_dropdown_value, target_table_schema_name=_TARGET_TABLE_SCHEMA_NAME, target_table_name=table_dropdown_value, cols=col_names)
            cursor.execute(test_1_sql)
            test_1_rows = cursor.fetchone()[0]
            test_2_sql = 'SELECT TOP (1) status FROM {temp_target_table_name}'.format(
                temp_target_table_name='##'+table_dropdown_value)
            cursor.execute(test_2_sql)
            test_2_rows = cursor.fetchone()[0]
            if nrow-1 != test_1_rows or test_2_rows != 'SUCCESS':
                st.error('File failed to load successfully, please troubleshoot!')
                return
            else:
                st.success('File loaded successfully!')


if upload_status:
    validation = helpers.validate_db_input_values(
        host_name, db_name, user_name, password)
    if validation:
        error_message = helpers.generate_error_message(validation)
        st.error(error_message)
    elif uploaded_file is not None:
        with tempfile.NamedTemporaryFile(delete=False, dir='.', suffix='.csv') as f:
            f.write(uploaded_file.getbuffer())
            fp = Path(f.name)
            full_path = str(Path(f.name))
            file_name = Path(full_path).name
            file_path = Path(full_path).parent
        db_conn = helpers.db_conn
        check_file_and_load(db_conn, full_path, table_dropdown_value,
                            truncate_source_table, _LOAD_STORED_PROC, _TARGET_TABLE_SCHEMA_NAME)
    else:
        st.error('Please select at least one flat file for upload!')

Conclusion

Python has brought a large number of people into the programming community. The number of programs and the range of areas it touches is mind-boggling. But more often than not, these technologies are out of reach of all but a handful of people. Most Python programs are “command line” based. This isn’t a problem for programmer-types, as we’re all used to interacting with computers through a text interface. While programmers don’t have a problem with command-line interfaces, most “normal people” do. This creates a digital divide, a “GUI Gap”. Adding a GUI or a Web front end to a program opens that program up to a wider audience, as it instantly becomes more approachable. Visual interfaces can also make interacting with some programs easier, even for those who are comfortable with a command-line interface.

Both PySimpleGUI and Streamlit provide a low barrier to entry for anyone, even Python novices, and democratise Web and GUI development. For simple projects, these frameworks allow citizen developers to rapidly create value and solve business problems without needing to understand complex technologies and tools. And whilst more critical LOB (Line of Business) apps will demand the use of more advanced technologies in the foreseeable future, for small projects requiring simple interfaces and business logic, PySimpleGUI and Streamlit provide a ton of immediate value. Alternatively, one can go even further down the simplification route and try something like Gooey which conveniently converts (almost) any Python 3 console program into a GUI application with a single line of code!
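For completeness, that single line is the @Gooey decorator applied to a standard argparse-based entry point – a minimal sketch with a made-up argument:

from argparse import ArgumentParser
from gooey import Gooey


@Gooey  # this one decorator turns the console program into a GUI application
def main():
    parser = ArgumentParser(description='A console program, now with a GUI')
    parser.add_argument('input_file', help='File to process')
    args = parser.parse_args()
    print('Processing {}...'.format(args.input_file))


if __name__ == '__main__':
    main()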


Kicking the Tires on tSQLt – An Open-Source Unit Testing Framework for SQL Server Data Validation and Quality Assurance

December 1st, 2022 / No Comments » / by admin

Introduction

The importance of unit testing does not have to be conveyed to most database developers and data engineers – it’s common knowledge that testing should be an integral part of any project which relies on data quality and integrity. In reality, however, testing is often relegated to the pool of activities considered too time-consuming and not directly contributing to a project’s success. Just like documentation, testing is often an afterthought, even though in practice it saves money and time and can be reliably integrated into DevOps pipelines (continuous testing) alongside continuous delivery and continuous integration.

Unit testing is a software testing method which aims to test a discrete piece of code. The word “unit” refers to the smallest piece of code that can be tested separately, for example, a function or a procedure that can be tested in isolation. In a database solution, the “unit” is typically a stored procedure, a trigger or a user-defined function. Each test must verify one condition at a time, and tests should reflect software requirements. Done this way, we get a series of discrete tests for each unit; verifying one condition at a time makes it possible to identify which conditions have not been verified and which ones meet the requirements, confirming whether all the individual parts work.

tSQLt is a powerful, open-source framework for SQL Server unit testing and it has become a vital part of the modern database development approach. It’s free to use, integrates well into the Microsoft SQL Server landscape and does not require developers to learn other programming languages or platforms. Unit tests are automatically wrapped in transactions – we don’t need any data clean-up work after the unit tests because every data manipulation process is rolled back. Additionally, tSQLt allows using mocked (fake) objects (a mocked object simulates the real object’s behavior so the tested objects do not affect other dependencies) and can be integrated into SSDT projects or 3rd party software. There are other unit testing frameworks out there which offer more functionality and wider use case coverage e.g. Great Expectations, however, these often rely on extensive configuration stored in JSON and YAML files, a Python interpreter with additional dependencies and libraries installed, and familiarity with Python programming concepts. This makes tSQLt a great, self-contained, SQL-based tool which can be easily integrated into most workflows with minimal fuss.
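To give a flavour of what a single test looks like before we dive into the full solution, the sketch below fakes a table, seeds it with data and asserts on the expected outcome – the table and column names are illustrative only:

EXEC tSQLt.NewTestClass '_demo';
GO
CREATE OR ALTER PROCEDURE _demo.[test that orders table contains no negative quantities]
AS
BEGIN
    -- replace the real table with an empty, constraint-free copy for the duration of the test
    EXEC tSQLt.FakeTable @TableName = 'dbo.Orders';
    INSERT INTO dbo.Orders (OrderID, Quantity) VALUES (1, 5);

    -- assert on the expected state; all changes are rolled back when the test finishes
    DECLARE @actual INT = (SELECT COUNT(*) FROM dbo.Orders WHERE Quantity < 0);
    EXEC tSQLt.AssertEquals @Expected = 0, @Actual = @actual;
END;
GO
EXEC tSQLt.Run '_demo';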

In this post I’ll outline how to approach unit testing of a Data Warehouse built on Microsoft SQL Server platform and outline some of the strategies for data validation and QA.

Environment Prep and tSQLt Framework Installation

Setting up tSQLt is pretty straightforward, but to demonstrate its functionality on a sample database, we will start by downloading the venerable WideWorldImporters (WWI) database backup file for Azure SQL DB and deploying it in Azure – this will be our source. Additionally, we will also download the WideWorldImporters SQL Server 2016 (OLTP) and data warehouse (OLAP) database backups and restore those in our target environment. The link to the aforementioned files on Microsoft’s GitHub repo is HERE. I won’t go into the process of restoring individual databases in this post but, when finished, we should have the sample WWI database in both environments – Azure and local, mirroring our source and target – as well as the WWI DW database in the local environment only. The two can be interfaced using the Linked Server functionality so that we can compare our source and target as part of our data acquisition unit testing.

We will install tSQLt in the WWI DW database for this demo. I’d personally prefer to have logical isolation between the suite of tests and the DW objects and place tSQLt in its own “workspace”, but in this case we’re following best practices and the tSQLt schema with all its associated objects will be co-located with the DW tables and stored procedures.

Installing tSQLt comprises only a few steps:

  • Download the latest version of tSQLt framework and extract the zipped-up file content
  • Execute PrepareServer.sql file
  • Execute tSQLt.class.sql

Once we have the framework in place, we can create a Linked Server connection to the Azure-hosted database, as shown in the script below. With this setup, we now have an environment which resembles some of the staple Microsoft SQL Server deployments – the WWI Azure SQL DB (OLTP) serving as our remote data source and the WWI and WWI DW databases (local) serving as our staging/landing and data warehouse databases.
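The Linked Server itself can be scripted rather than clicked through in SSMS – something along the lines of the snippet below, with the Azure endpoint and credentials as placeholders:

EXEC sp_addlinkedserver
    @server = 'WWI', -- the name referenced by OPENQUERY in the test classes below
    @srvproduct = '',
    @provider = 'MSOLEDBSQL',
    @datasrc = 'yourserver.database.windows.net', -- placeholder Azure SQL endpoint
    @catalog = 'WideWorldImporters';
EXEC sp_addlinkedsrvlogin
    @rmtsrvname = 'WWI',
    @useself = 'FALSE',
    @rmtuser = 'your_login', -- placeholder credentials
    @rmtpassword = 'your_password';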

Testing Framework Architecture Components

The following diagram outlines the basic architecture components behind the collections of tests, in tSQLt also known as classes. It identifies four principal domains which provide a logical container for a suite of tests:

  • Test Landing zone objects’ existence
  • Test DW zone objects’ existence
  • Test primary keys across the Landing and DW zones align and match – denotes all data has been successfully acquired
  • Test business transformation rules

tSQLt comes with a plethora of other assertion- and expectation-based features but in this instance, we will limit the scope to the above four applications. The four classes (implemented as schemas) we will generate fulfill the following testing functions:

_wwi_local_objects – this test class generates assertion-based stored procedures for testing WWI OLTP database objects’ existence in the local environment. Each individual stored procedure is created by a “parent“ create_wwi_local_objects_tests stored procedure which loops over the objects in the Azure-provisioned WWI database to source the external metadata and compare it to the local version. It uses the tSQLt.AssertObjectExists built-in assertion and throws an error (test failed output) if the expected local object name is not present in the target database. Typically, this information should come from a dedicated metadata database which stores more than just object names, but for simplicity, in this case it’s a simple SELECT over the Linked Server pointing to Azure SQL DB.

/*
1. Create _wwi_local_objects test class and associated stored procedures
*/
EXEC tSQLt.NewTestClass '_wwi_local_objects';
GO

CREATE OR ALTER PROCEDURE _wwi_local_objects.[create_wwi_local_objects_tests]
AS
BEGIN
    DECLARE @Local_Object_Name VARCHAR(256);
	DECLARE @Local_Schema_Name VARCHAR (56);
	DECLARE @Local_DB_Name VARCHAR (256)
    DECLARE @SQL NVARCHAR(MAX);
    DECLARE db_cursor CURSOR FOR
    SELECT *
    FROM OPENQUERY([WWI], 'SELECT	TRIM(LOWER(TABLE_NAME)), 
									TRIM(LOWER(TABLE_SCHEMA)), 
									TRIM(LOWER(TABLE_CATALOG))
							FROM WideWorldImporters.INFORMATION_SCHEMA.TABLES')
    OPEN db_cursor;
    FETCH NEXT FROM db_cursor
    INTO @Local_Object_Name, @Local_Schema_Name, @Local_DB_Name
    WHILE @@FETCH_STATUS = 0
    BEGIN
        SET @SQL = N'CREATE OR ALTER PROCEDURE _wwi_local_objects.[test that ' + @Local_Object_Name + ' object exists] AS ' +CHAR (13)
        SET @SQL = @SQL + N'BEGIN '																					+CHAR (13)
		SET @SQL = @SQL + 'EXEC tSQLt.AssertObjectExists '															+CHAR (13)
		SET @SQL = @SQL + '@objectName = '''+ @Local_DB_Name+'.'+ @Local_Schema_Name+'.'+ @Local_Object_Name+''' '	+CHAR (13)
		SET @SQL = @SQL + ',@message = ''Nominated object '''''+@Local_Object_Name+''''' cannot be found in '		+CHAR (13)
		SET @SQL = @SQL + ''''''+@Local_Schema_Name+''''' schema and '''''+@Local_DB_Name+''''' database.'' '		+CHAR (13)
		SET @SQL = @SQL + 'END';
		EXEC (@SQL)
        FETCH NEXT FROM db_cursor
        INTO @Local_Object_Name, @Local_Schema_Name, @Local_DB_Name;
    END;

    CLOSE db_cursor;
    DEALLOCATE db_cursor;
END;
GO

EXEC  _wwi_local_objects.[create_wwi_local_objects_tests]
GO
--EXEC tSQLt.Run '_wwi_local_objects'

_wwidw_objects – this test class generates assertion-based stored procedures for testing WWI DW objects’ existence. Each individual stored procedure is created by a “parent“ create_dw_objects_tests stored procedure which loops over the WWI DW metadata output. It uses the tSQLt.AssertObjectExists built-in assertion and throws an error (test failed output) if the expected WWI DW object name is not present in the target database. Just as with the previous class, this information should come from a dedicated metadata catalog rather than system views, but for this demo it’s the simplest possible configuration we’re after.

/*
2. Create _wwidw_objects test class and associated stored procedures
*/
EXEC tSQLt.NewTestClass '_wwidw_objects';
GO

CREATE OR ALTER PROCEDURE _wwidw_objects.[create_dw_objects_tests]
AS
BEGIN
    DECLARE @Local_Object_Name VARCHAR(256);
	DECLARE @Local_Schema_Name VARCHAR (56);
	DECLARE @Local_DB_Name VARCHAR (256)
    DECLARE @SQL NVARCHAR(MAX);
    DECLARE db_cursor CURSOR FOR
    SELECT TRIM(LOWER(TABLE_CATALOG)),
			TRIM(LOWER(TABLE_SCHEMA)),
			TRIM(LOWER(TABLE_NAME))
	FROM WideWorldImportersDW.INFORMATION_SCHEMA.TABLES
    OPEN db_cursor;
    FETCH NEXT FROM db_cursor
    INTO   @Local_DB_Name, @Local_Schema_Name,@Local_Object_Name
    WHILE @@FETCH_STATUS = 0
    BEGIN
        SET @SQL = N'CREATE OR ALTER PROCEDURE _wwidw_objects.[test that ' + @Local_Object_Name + ' object exists] AS ' +CHAR (13)
        SET @SQL = @SQL + N'BEGIN '																					+CHAR (13)
		SET @SQL = @SQL + 'EXEC tSQLt.AssertObjectExists '															+CHAR (13)
		SET @SQL = @SQL + '@objectName = '''+ @Local_DB_Name+'.'+ @Local_Schema_Name+'.'+ @Local_Object_Name+''' '	+CHAR (13)
		SET @SQL = @SQL + ',@message = ''Nominated object '''''+@Local_Object_Name+''''' cannot be found in '		+CHAR (13)
		SET @SQL = @SQL + ''''''+@Local_Schema_Name+''''' schema and '''''+@Local_DB_Name+''''' database.'' '		+CHAR (13)
		SET @SQL = @SQL + 'END';
		EXEC (@SQL)
        FETCH NEXT FROM db_cursor
        INTO @Local_DB_Name, @Local_Schema_Name,@Local_Object_Name
    END;

    CLOSE db_cursor;
    DEALLOCATE db_cursor;
END;
GO

EXEC  _wwidw_objects.[create_dw_objects_tests]
GO
--EXEC tSQLt.Run '_wwidw_objects'

_wwi_remote_to_wwi_local_pks – this test class generates assertion-based stored procedures comparing primary key values between the source (Azure-hosted) and local WWI databases. It uses metadata extracted from SQL Server system views to source the primary key(s) for each table and the tSQLt.AssertEqualsTable built-in assertion to compare their values for each nominated object.

/*
3. Create _wwi_remote_to_wwi_local_pks test class and associated stored procedures
*/
EXEC tSQLt.NewTestClass '_wwi_remote_to_wwi_local_pks';
GO

CREATE OR ALTER PROCEDURE _wwi_remote_to_wwi_local_pks.[create_wwi_remote_to_wwi_local_objects_pks_tests]
AS
BEGIN
	DECLARE @Error_Message NVARCHAR (MAX)

	DECLARE @SQL NVARCHAR(MAX)
	DECLARE @Source_Server_Name NVARCHAR (1024) = 'WWI'
	DECLARE @Source_DB_Name NVARCHAR (1024) = 'WideWorldImporters'
	DECLARE @Source_Schema_Name NVARCHAR (1024) = 'WideWorldImporters'
	DECLARE @Local_Schema_Name NVARCHAR (1024)=  'dbo'
	DECLARE @Local_DB_Name VARCHAR (1024) = 'WideWorldImporters'


	IF OBJECT_ID('tempdb..##exclude_tables') IS NOT NULL
	BEGIN
	    DROP TABLE ##exclude_tables;
	END;
	CREATE TABLE ##exclude_tables
	(
	    table_name VARCHAR(256)
	);
	INSERT INTO ##exclude_tables
	(
	    table_name
	)
	SELECT 'ColdRoomTemperatures'
	UNION ALL
	SELECT 'VehicleTemperatures';
	
	
	
	IF OBJECT_ID('tempdb..#temp_oltp_to_dw_metadata') IS NOT NULL
	BEGIN
	    DROP TABLE #temp_oltp_to_dw_metadata;
	END;
	CREATE TABLE #temp_oltp_to_dw_metadata
	(
	    [id] INT IDENTITY(1, 1),
	    [object_name] VARCHAR(256) NOT NULL,
	    [schema_name] VARCHAR(256) NOT NULL,
	    [db_name] VARCHAR(256) NOT NULL,
	    [primary_keys] VARCHAR(256) NOT NULL,
	    [origin] VARCHAR(56) NOT NULL
	);
	SET @SQL =			'WITH temp_data AS (SELECT * '
	SET @SQL = @SQL +	'FROM OPENQUERY(' + @Source_Server_Name + ',''SELECT '  												+CHAR(13)			
	SET @SQL = @SQL +	't.name as table_name, c.name AS column_name, ss.name as schema_name, '									+CHAR(13)
	SET @SQL = @SQL +	'tp.name AS data_type ,c.max_length AS character_maximum_length, '										+CHAR(13)						
	SET @SQL = @SQL +	'CASE WHEN indx.object_id IS NULL THEN 0 ELSE 1 END AS ''''is_primary_key''''	'						+CHAR(13)
	SET @SQL = @SQL +	'FROM sys.tables t'																						+CHAR(13)
	SET @SQL = @SQL +	'JOIN sys.columns c ON t.object_id = c.object_id'														+CHAR(13)
	SET @SQL = @SQL +	'JOIN sys.types tp ON c.user_type_id = tp.user_type_id'													+CHAR(13)
	SET @SQL = @SQL +	'JOIN sys.objects so ON so.object_id = t.object_id'														+CHAR(13)
	SET @SQL = @SQL +	'JOIN sys.schemas ss ON so.schema_id = ss.schema_id'													+CHAR(13)
	SET @SQL = @SQL +	'LEFT JOIN (SELECT	ic.object_id, ic.column_id '														+CHAR(13)
	SET @SQL = @SQL +	'FROM sys.indexes AS i '																				+CHAR(13)
	SET @SQL = @SQL +	'INNER JOIN sys.index_columns AS ic '																	+CHAR(13)
	SET @SQL = @SQL +	'ON i.OBJECT_ID = ic.OBJECT_ID AND i.index_id = ic.index_id '											+CHAR(13)
	SET @SQL = @SQL +	'WHERE   i.is_primary_key = 1) indx ON so.object_id = indx.object_id AND c.column_id = indx.column_id '	+CHAR(13)
	SET @SQL = @SQL +	'WHERE t.is_memory_optimized <> 1 AND tp.is_user_defined = 0 '											+CHAR(13)
	SET @SQL = @SQL +	'AND t.type = ''''u'''''' ) a) ' 																		+CHAR(13)
	SET @SQL = @SQL +   'INSERT INTO #temp_oltp_to_dw_metadata (db_name,schema_name,object_name,primary_keys,origin)'			+CHAR(13)
	SET @SQL = @SQL +	'SELECT '''+@Source_DB_Name+''' as remote_db_name, '													+CHAR(13)													
	SET @SQL = @SQL +   'schema_name AS remote_schema_name, table_name AS remote_table_name '									+CHAR(13)
	SET @SQL = @SQL +	', STRING_AGG(column_name, '','') WITHIN GROUP (ORDER BY column_name ASC) AS primary_keys '				+CHAR(13)
	SET @SQL = @SQL +	', ''Remote'' as Origin'																				+CHAR(13)
	SET @SQL = @SQL +	'FROM temp_data WHERE is_primary_key = 1 '																+CHAR(13)
	SET @SQL = @SQL +   'AND  table_name COLLATE DATABASE_DEFAULT NOT IN (SELECT table_name FROM ##exclude_tables) '			+CHAR(13)
	SET @SQL = @SQL +   'GROUP BY table_name, schema_name'																		+CHAR(13)

	EXEC(@SQL)

	SET @SQL = ''		
	SET @SQL = @SQL +	'WITH temp_data AS (SELECT * FROM (SELECT t.name as table_name, ss.name as schema_name, '				+CHAR(13)
	SET @SQL = @SQL +	'c.name AS column_name, tp.name AS data_type,'															+CHAR(13)
	SET @SQL = @SQL +	'c.max_length AS character_maximum_length, CASE WHEN indx.object_id IS NULL '							+CHAR(13)
	SET @SQL = @SQL +	'THEN 0 ELSE 1 END AS ''is_primary_key'''																+CHAR(13)
	SET @SQL = @SQL +	'FROM    '+@Local_DB_Name+'.sys.tables t'																+CHAR(13)
	SET @SQL = @SQL +	'JOIN '+@Local_DB_Name+'.sys.columns c ON t.object_id = c.object_id '									+CHAR(13)
	SET @SQL = @SQL +	'JOIN '+@Local_DB_Name+'.sys.types tp ON c.user_type_id = tp.user_type_id '								+CHAR(13)
	SET @SQL = @SQL +	'JOIN '+@Local_DB_Name+'.sys.objects so ON so.object_id = t.object_id '									+CHAR(13)
	SET @SQL = @SQL +	'JOIN '+@Local_DB_Name+'.sys.schemas ss ON so.schema_id = ss.schema_id '								+CHAR(13)
	SET @SQL = @SQL +	'LEFT JOIN (SELECT	ic.object_id, ic.column_id '														+CHAR(13)
	SET @SQL = @SQL +	'FROM '+@Local_DB_Name+'.sys.indexes AS i INNER JOIN '+@Local_DB_Name+'.sys.index_columns AS ic ON '	+CHAR(13)
	SET @SQL = @SQL +	'i.OBJECT_ID = ic.OBJECT_ID AND i.index_id = ic.index_id '												+CHAR(13)
	SET @SQL = @SQL +	'WHERE   i.is_primary_key = 1) indx ON so.object_id = indx.object_id AND c.column_id = indx.column_id '	+CHAR(13)
	SET @SQL = @SQL +	'WHERE t.type = ''u'' AND t.is_memory_optimized <> 1 AND tp.is_user_defined = 0)a)'						+CHAR(13)
	SET @SQL = @SQL +   'INSERT INTO #temp_oltp_to_dw_metadata (db_name,schema_name,object_name,primary_keys,origin)'			+CHAR(13)
	SET @SQL = @SQL +	'SELECT '''+@Local_DB_Name+''' as db_name, '															+CHAR(13)													
	SET @SQL = @SQL +   'schema_name AS schema_name, table_name AS object_name '												+CHAR(13)
	SET @SQL = @SQL +	', STRING_AGG(column_name, '','') WITHIN GROUP (ORDER BY column_name ASC) AS primary_keys, '			+CHAR(13)
	SET @SQL = @SQL +	'''Local'' as Origin'																					+CHAR(13)
	SET @SQL = @SQL +	'FROM temp_data '																						+CHAR(13)
	SET @SQL = @SQL +	'WHERE table_name COLLATE DATABASE_DEFAULT NOT IN (SELECT table_name FROM ##exclude_tables) '			+CHAR(13)
	SET @SQL = @SQL +   'AND is_primary_key = 1 GROUP BY table_name, schema_name'												+CHAR(13)			
	
	EXEC(@SQL)

	IF EXISTS
        (
            SELECT a.[object_name], a.primary_keys
            FROM #temp_oltp_to_dw_metadata a
            WHERE a.[Origin] = 'Remote'
			EXCEPT
			SELECT b.[object_name], b.primary_keys
            FROM #temp_oltp_to_dw_metadata b
            WHERE b.[Origin] = 'Local'
        )
		OR NOT EXISTS (SELECT TOP (1) 1 FROM #temp_oltp_to_dw_metadata)
        BEGIN
            SET @Error_Message
                = 'Tables and their corresponding primary keys cannot be reconciled between the remote "' + @Source_DB_Name
                  + '" database and the local "' + @Local_DB_Name
                  + '" database. Please troubleshoot!';
            RAISERROR(   @Error_Message, -- Message text.
                         16,             -- Severity.
                         1               -- State.
                     );
            RETURN;
        END;
	
	DECLARE @object_name VARCHAR(256);
	DECLARE @schema_name VARCHAR (56);
	DECLARE @db_name VARCHAR (256);
	DECLARE @primary_keys VARCHAR (1024);
	DECLARE @Object_Data_Percentage VARCHAR(3) = '100'

    DECLARE db_cursor CURSOR FOR 
    SELECT [db_name], [schema_name], [Object_Name], [Primary_Keys]
	FROM #temp_oltp_to_dw_metadata
	WHERE [Origin] = 'remote'

    OPEN db_cursor;
    FETCH NEXT FROM db_cursor
    INTO   @db_name, @schema_name,@object_name, @Primary_keys
    WHILE @@FETCH_STATUS = 0
    BEGIN
		SET @SQL = ''
        SET @SQL = @SQL + 'CREATE OR ALTER PROCEDURE _wwi_remote_to_wwi_local_pks.[test that primary keys for ' + @Object_Name + ' match across source and target] AS '	+CHAR (13)
        SET @SQL = @SQL + 'BEGIN '																															+CHAR (13)
		SET @SQL = @SQL + 'DROP TABLE IF EXISTS #remote_' + LOWER(@Object_Name) + ''																		+CHAR (13)
		SET @SQL = @SQL + 'SELECT '+@Primary_Keys+' INTO #remote_' + LOWER(@Object_Name) + ''																+CHAR (13)
		SET @SQL = @SQL + 'FROM OPENQUERY(' + @Source_Server_Name + ',''SELECT TOP '+@Object_Data_Percentage+' PERCENT '									+CHAR (13)
		SET @SQL = @SQL + ''+@Primary_Keys+' FROM '																											+CHAR (13)
		SET @SQL = @SQL + ''+@DB_Name+'.'+@Schema_Name+'.'+@Object_Name+''')'																				+CHAR (13)
		SET @SQL = @SQL + 'DROP TABLE IF EXISTS #local_' + LOWER(@Object_Name) + ''																			+CHAR (13)
		SET @SQL = @SQL + 'SELECT TOP '+@Object_Data_Percentage+' PERCENT '+@Primary_Keys+' INTO #local_'+LOWER(@Object_Name)+''							+CHAR (13)
		SET @SQL = @SQL + 'FROM '+@db_name+'.'+@schema_name+'.'+@object_name+''																				+CHAR (13)
		SET @SQL = @SQL + 'EXEC tSQLt.AssertEqualsTable @Expected = #remote_'+LOWER(@object_name)+', @Actual = #local_'+LOWER(@object_name)+''				+CHAR (13)
		SET @SQL = @SQL + ',@message = ''The primary keys comparison for ' + @Object_Name + ' table resulted in differences being detected.'''				+CHAR (13)					
		SET @SQL = @SQL + 'END'																																+CHAR (13)
		EXEC (@SQL)
        FETCH NEXT FROM db_cursor
        INTO @DB_Name, @Schema_Name,@Object_Name, @Primary_Keys
    END;

    CLOSE db_cursor;
    DEALLOCATE db_cursor;
	
END;
GO

EXEC  _wwi_remote_to_wwi_local_pks.[create_wwi_remote_to_wwi_local_objects_pks_tests]
GO
--EXEC tSQLt.Run '_wwi_remote_to_wwi_local_pks'

_wwi_to_wwidw_businessrules – this test class is used for custom business rules validation and comparison across the WWI and WWI DW databases. Each test case corresponds to a rule which dictates that a source-to-target comparison is made between a value or a series of values and an expected result based on predefined business logic. In the examples below, we’re comparing sales/order quantities, tax and customer ids or city ids across the WideWorldImporters (OLTP) and WideWorldImportersDW (OLAP) databases to ensure that the transformed data (facts and dimensions) persisted in the DW database is in line with its raw counterpart in the transactional layer. Also, to simplify the SQL code, I only included the first 100 records across both data sets, as denoted by the SELECT TOP 100 statements. This is because reconciling source and target outputs for all the records would require additional logic – not really relevant for this demo, but the TOP 100 restriction is something that ought to be removed in a typical testing scenario where all-encompassing (not partial) testing is required.

/*
4. Create _wwi_to_wwidw_businessrules test class and associated stored procedures
*/
EXEC tSQLt.NewTestClass '_wwi_to_wwidw_businessrules';
GO

CREATE OR ALTER PROCEDURE _wwi_to_wwidw_businessrules.[test that sales quantity, tax and customer id values match across source and target]
AS
BEGIN
    DROP TABLE IF EXISTS #source;
    SELECT TOP 100
           SUM(il.Quantity) AS sales_quantity,
           SUM(il.ExtendedPrice - il.TaxAmount) AS total_excluding_tax,
           SUM(il.ExtendedPrice) AS total_including_tax,
           c.CustomerID AS customer_id
    INTO #source
    FROM [WideWorldImporters].Sales.Invoices AS i
        INNER JOIN [WideWorldImporters].Sales.InvoiceLines AS il
            ON i.InvoiceID = il.InvoiceID
        INNER JOIN [WideWorldImporters].Sales.Customers AS c
            ON i.CustomerID = c.CustomerID
    GROUP BY c.CustomerID
    ORDER BY c.CustomerID;


    DROP TABLE IF EXISTS #target;
    SELECT TOP 100
           SUM(fs.Quantity) AS sales_quantity,
           SUM(fs.[Total Excluding Tax]) AS total_excluding_tax,
           SUM(fs.[Total Including Tax]) AS total_including_tax,
           dc.[WWI Customer ID] AS customer_id
    INTO #target
    FROM WideWorldImportersDW.Fact.[Sale] fs
        JOIN WideWorldImportersDW.Dimension.Customer dc
            ON fs.[Customer Key] = dc.[Customer Key]
    WHERE dc.[WWI Customer ID] <> 0
    GROUP BY dc.[WWI Customer ID]
    ORDER BY dc.[WWI Customer ID];

    EXEC tSQLt.AssertEqualsTable @Expected = #source,
                                 @Actual = #target,
                                 @Message = 'Sales data reconciliation between WideWorldImporters and WideWorldImportersDW resulted in differences being detected.';
END;
GO

CREATE OR ALTER PROCEDURE _wwi_to_wwidw_businessrules.[test that order quantity, tax and city id/name values match across source and target]
AS
BEGIN
    DROP TABLE IF EXISTS #source;
    SELECT TOP 100
           SUM(ol.Quantity) AS quantity,
           SUM(ol.UnitPrice) AS total_unit_price,
           SUM(ROUND(ol.Quantity * ol.UnitPrice, 2)) AS total_excluding_tax,
           SUM(ROUND(ol.Quantity * ol.UnitPrice, 2) + ROUND(ol.Quantity * ol.UnitPrice * ol.TaxRate / 100.0, 2)) AS total_including_tax,
           c.DeliveryCityID AS wwi_city_id,
           ci.CityName AS city
    INTO #source
    FROM [WideWorldImporters].Sales.Orders AS o
        INNER JOIN [WideWorldImporters].Sales.OrderLines AS ol
            ON o.OrderID = ol.OrderID
        INNER JOIN [WideWorldImporters].Sales.Customers AS c
            ON c.CustomerID = o.CustomerID
        INNER JOIN [WideWorldImporters].[Application].[Cities] ci
            ON c.DeliveryCityID = ci.CityID
    GROUP BY c.DeliveryCityID,
             ci.CityName
    ORDER BY c.DeliveryCityID;


    DROP TABLE IF EXISTS #target;
    SELECT TOP 100
           SUM([Quantity]) AS quantity,
           SUM([Unit Price]) AS total_unit_price,
           SUM([Total Excluding Tax]) AS total_excluding_tax,
           SUM([Total Including Tax]) AS total_including_tax,
           dc.[WWI City ID] AS wwi_city_id,
           dc.City AS city
    INTO #target
    FROM WideWorldImportersDW.Fact.[Order] fo
        JOIN WideWorldImportersDW.Dimension.City dc
            ON fo.[City Key] = dc.[City Key]
    GROUP BY dc.[WWI City ID],
             dc.City
    ORDER BY dc.[WWI City ID];

    EXEC tSQLt.AssertEqualsTable @Expected = #source,
                                 @Actual = #target,
                                 @Message = 'Order data reconciliation between WideWorldImporters and WideWorldImportersDW resulted in differences being detected.';
END;
--EXEC tSQLt.Run '_wwi_to_wwidw_businessrules';

_wwi_local_objects, _wwidw_objects and _wwi_remote_to_wwi_local_pks classes’ tests are auto-generated based on rules defined in a “parent” stored procedure. This means each test case is created following the same pattern and the output stored procedures are structurally the same. For example, the _wwi_remote_to_wwi_local_pks class (schema) generates “children” stored procedures, each of them responsible for an isolated testing unit, as the rendered sketch below illustrates.
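To make the pattern concrete, this is roughly what one generated “child” procedure looks like once the dynamic SQL shown earlier has been rendered and executed – sketched here for the WideWorldImporters Sales.Orders table (primary key OrderID), with [REMOTESRV] standing in as a placeholder for the actual linked server name:

-- A sketch of one rendered "child" test; [REMOTESRV] is a placeholder linked server name
CREATE OR ALTER PROCEDURE _wwi_remote_to_wwi_local_pks.[test that primary keys for Orders match across source and target]
AS
BEGIN
    -- Pull the primary key values from the remote (linked server) copy of the table
    DROP TABLE IF EXISTS #remote_orders;
    SELECT OrderID
    INTO #remote_orders
    FROM OPENQUERY([REMOTESRV], 'SELECT TOP 100 PERCENT OrderID FROM WideWorldImporters.Sales.Orders');

    -- Pull the same key values from the local copy
    DROP TABLE IF EXISTS #local_orders;
    SELECT TOP 100 PERCENT OrderID
    INTO #local_orders
    FROM WideWorldImporters.Sales.Orders;

    -- Fail the test if the two key sets differ in any way
    EXEC tSQLt.AssertEqualsTable @Expected = #remote_orders,
                                 @Actual = #local_orders,
                                 @Message = 'The primary keys comparison for Orders table resulted in differences being detected.';
END;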

_wwi_to_wwidw_businessrules class of tests, on the other hand, requires SQL to follow business rules applied to the transformation pipelines and cannot be auto generated from metadata.

Now that we have all the pieces of the puzzle in place, let’s run all the tests across the four classes we generated and see if we have any issues with the underlying data. To run all tests in sequence we issue the EXEC tSQLt.RunAll command (shown below next to the equivalent call for a single class) and after a short while, the following output should be generated in SSMS.
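Both invocations are one-liners – tSQLt exposes Run for a single class (or an individual test) and RunAll for the entire suite:

-- Run a single test class (or pass 'class.[test name]' for one specific test)
EXEC tSQLt.Run '_wwi_to_wwidw_businessrules';

-- Run every test across all registered classes
EXEC tSQLt.RunAll;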

As we can see, all but one test was successful. The one which failed is the result of an Azure SQL DB-specific table being present in the source WWI database; since this table only applies to Azure SQL Managed Instance and Azure SQL Database, next time we can add it to our exclusion rules (see the sketch below). Another important point is that in the above demo examples, object names and associated schemas and databases were sourced directly from system tables. In a real-world scenario, all this information should be stored in a metadata catalog – the main source for reference and control data.
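As a sketch, assuming the ##exclude_tables global temporary table referenced by the metadata queries above contains just a table_name column, excluding an object is a one-row insert (the table name below is a hypothetical placeholder):

-- Hypothetical example: register an Azure-only table so the metadata-driven
-- test generation skips it on the next run (placeholder table name)
INSERT INTO ##exclude_tables (table_name)
VALUES ('SomeAzureOnlyTable');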

Reporting

The tSQLt framework captures test run history in its TestResult table; however, due to the ephemeral nature of this data, it is purged every time a test or suite of tests is run. Additionally, we may want to augment this data with extra attributes and turn it into a reporting data source. This way, long-term longitudinal analytics can draw from a series of test executions, enabling administrators to pinpoint areas of concern or where recurring issues have been occurring. For this reason, we will create an additional schema and a table mimicking the TestResult output (for long-term data retention), add a few columns storing additional information we can use in our visualization tool and finally create a view which ties it all together.

/*
1. Create tSQLtRunHistory database schema and TestResults reporting table
*/
CREATE SCHEMA [tSQLtRunHistory];
GO

CREATE TABLE [tSQLtRunHistory].[TestResults]
(
    [Id] [BIGINT] IDENTITY(1, 1) NOT NULL,
    [Server] [NVARCHAR](MAX) NOT NULL,
    [LoginUserName] [VARCHAR](MAX) NOT NULL,
    [HostName] [VARCHAR](MAX) NOT NULL,
    [Database] [VARCHAR](MAX) NOT NULL,
    [Class] [NVARCHAR](MAX) NOT NULL,
    [TestCase] [NVARCHAR](MAX) NOT NULL,
    [Name] AS ((QUOTENAME([Class]) + '.') + QUOTENAME([TestCase])),
    [TranName] [NVARCHAR](MAX) NULL,
    [Result] [NVARCHAR](MAX) NULL,
    [Msg] [NVARCHAR](MAX) NULL,
    [TestStartTime] [DATETIME2](7) NOT NULL,
    [TestEndTime] [DATETIME2](7) NULL,
    CONSTRAINT [pk_tsqltrunhistory_testresults]
        PRIMARY KEY CLUSTERED ([Id] ASC)
        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,
              ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF
             ) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY];
GO

ALTER TABLE [tSQLtRunHistory].[TestResults]
ADD CONSTRAINT [df_testresults(TestStartTime)]
    DEFAULT (SYSDATETIME()) FOR [TestStartTime];
GO


/*
2. Create tSQLtRunHistory vw_TestResults reporting view
*/
CREATE VIEW [tSQLtRunHistory].[vw_TestResults]
AS
SELECT [Id],
       [Server] AS Server_Name,
       [LoginUserName] AS User_Name,
       [HostName] AS Host_Name,
       [Database],
       [Class] AS Test_Class,
       [TestCase] AS Test_Case_Name,
       [Name] AS Class_Test_Case_Name,
       [TranName] AS Transaction_Name,
       [Result] AS Test_Result,
       CASE
           WHEN [Msg] IS NULL
                OR [Msg] = '' THEN
               'Nominal Results Recorded.'
           ELSE
               [Msg]
       END AS Output_Message,
       CASE
           WHEN [Class] IN ( '_wwi_local_objects', '_wwidw_objects' )
                AND [TestCase] LIKE 'test that%' THEN
               LOWER(SUBSTRING(
                                  [TestCase],
                                  CHARINDEX('test that', [TestCase]) + LEN('test that'),
                                  CHARINDEX('object exists', [TestCase]) - LEN('object exists') + 2
                              )
                    )
           WHEN [Class] IN ( '_wwi_remote_to_wwi_local_pks' )
                AND [TestCase] LIKE 'test that%' THEN
               LOWER(SUBSTRING(
                                  [TestCase],
                                  CHARINDEX('test that primary keys for', [TestCase])
                                  + LEN('test that primary keys for'),
                                  CHARINDEX('match across source and target', [TestCase])
                                  - LEN('match across source and target') + 2
                              )
                    )
           ELSE
               'Unspecified object'
       END AS Tested_Object_Name,
       [TestStartTime] AS Test_Start_Date_Time,
       [TestEndTime] AS Test_End_Date_Time,
       CAST([TestEndTime] AS DATE) AS Test_End_Date,
       [Year_Month_Number] = CAST(DATENAME(YEAR, [TestEndTime]) + FORMAT([TestEndTime], 'MM') AS INT),
       [Month_Year_Name] = LEFT(DATENAME(MONTH, [TestEndTime]), 3) + ' - ' + DATENAME(YEAR, [TestEndTime])
FROM [tSQLtRunHistory].[TestResults];
GO


/*
3. Make appropriate changes to the tSQLt framework to account for the new reporting tables and how tests execution data is logged/stored
*/
ALTER PROCEDURE [tSQLt].[NullTestResultFormatter]
AS
BEGIN
    INSERT INTO [tSQLtRunHistory].[TestResults]
    (
        [Server],
        [LoginUserName],
        [HostName],
        [Database],
        [Class],
        [TestCase],
        [TranName],
        [Result],
        [Msg],
        [TestStartTime],
        [TestEndTime]
    )
    SELECT @@SERVERNAME,
           SUSER_NAME(),
           HOST_NAME(),
           DB_NAME(),
           [Class],
           [TestCase],
           [TranName],
           [Result],
           [Msg],
           [TestStartTime],
           [TestEndTime]
    FROM [tSQLt].[TestResult];
    RETURN 0;
END;
GO


CREATE OR ALTER PROCEDURE [tSQLt].[SaveTestResultFormatter]
AS
BEGIN
    INSERT INTO [tSQLtRunHistory].[TestResults]
    (
        [Server],
        [LoginUserName],
        [HostName],
        [Database],
        [Class],
        [TestCase],
        [TranName],
        [Result],
        [Msg],
        [TestStartTime],
        [TestEndTime]
    )
    SELECT @@SERVERNAME,
           SUSER_NAME(),
           HOST_NAME(),
           DB_NAME(),
           [Class],
           [TestCase],
           [TranName],
           [Result],
           [Msg],
           [TestStartTime],
           [TestEndTime]
    FROM [tSQLt].[TestResult];
END;
GO

DECLARE @RC INT;
DECLARE @Formatter NVARCHAR(4000);

SET @Formatter = N'tSQLt.SaveTestResultFormatter'; --'tSQLt.DefaultResultFormatter'

EXECUTE @RC = [tSQLt].[SetTestResultFormatter] @Formatter;
GO
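With the formatter registered, every subsequent test run appends its results to tSQLtRunHistory.TestResults. As a quick sketch, a monthly pass rate per test class can be pulled straight from the view created earlier (tSQLt records 'Success', 'Failure' or 'Error' in the Result column):

-- Monthly pass rate per test class, sourced from the reporting view
SELECT Test_Class,
       Month_Year_Name,
       COUNT(*) AS tests_executed,
       SUM(CASE WHEN Test_Result = 'Success' THEN 1 ELSE 0 END) AS tests_passed,
       CAST(100.0 * SUM(CASE WHEN Test_Result = 'Success' THEN 1 ELSE 0 END) / COUNT(*) AS DECIMAL(5, 2)) AS pass_rate_pct
FROM tSQLtRunHistory.vw_TestResults
GROUP BY Test_Class,
         Month_Year_Name,
         Year_Month_Number
ORDER BY Year_Month_Number,
         Test_Class;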

Once we have enough data accrued, we can visualize it using any BI tool. The following is a sample Qlik Sense dashboard depicting individual test results as well as some aggregate-level metrics.

This post outlined a few simple use cases for the tSQLt framework’s unit testing applications. I only scratched the surface of what’s possible with its built-in assertions and expectations, as the tSQLt toolkit can also test triggers, stored procedure logic, constraints and more. While there are other, more feature-rich and comprehensive unit testing frameworks available, tSQLt integrates well into the Microsoft SQL Server landscape, can run as part of your automated builds and does not require developers to learn other programming languages, tools or platforms. It’s really easy to configure and one can become productive with it in a matter of minutes. Most developers do not like writing tests, but frameworks like tSQLt make the process of integrating unit tests into the database development lifecycle a breeze.
