Kicking the Tires on Azure SQL Database External REST Endpoints – Sample Integration Solution Architecture

June 8th, 2023 / by admin

Introduction

SQL Server in-database REST API integration has always been a roll-your-own, bubble-gum-and-duct-tape affair – possible, but never easy. Some may argue this was for all the right reasons: imposing a strict distinction between the database and application layers created a well-defined separation of concerns and left the former to do one thing and one thing only – data storage and management. However, as vendor competition increased, more innovative features were added to and around these platforms to expand their capabilities and accessibility – new formats e.g. Apache Arrow, new machine learning features e.g. vector support, or even new ways of merging application and data storage paradigms e.g. WebAssembly-compiled (in-browser) RDBMSs. As such, the word database, though synonymous with its primary function of data storage and management, has taken on a new meaning and, with that, a set of new capabilities, as partly discussed in the post.

Azure SQL Database external REST endpoint integration came out of Public Preview not long ago and represents an improved way to natively (within the Azure ecosystem) query REST API endpoints with little fuss. External REST Endpoint Invocation makes it possible for developers to call REST/GraphQL endpoints in other Azure services from right within an Azure SQL Database. With a quick call to the sp_invoke_external_rest_endpoint system stored procedure, you can have data processed via an Azure Function, update a Power BI dashboard, or even talk to Cognitive Services or OpenAI.
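To give a quick taste of the shape of the API, a minimal, hypothetical call might look like the snippet below (the azurewebsites.net URL is a placeholder for an Azure Function of your own – only endpoints in Azure's allowed-services list can be invoked):

-- Minimal sketch: invoke a REST endpoint and inspect the raw response envelope
DECLARE @response NVARCHAR(MAX);
DECLARE @ret INT;

EXEC @ret = sp_invoke_external_rest_endpoint
    @url = N'https://example.azurewebsites.net/api/ping',   -- placeholder URL
    @method = 'GET',
    @response = @response OUTPUT;

-- a return code of 0 means the call completed with a 2xx status
SELECT @ret AS return_code,
       JSON_VALUE(@response, '$.response.status.http.code') AS http_status;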

For a full list of supported services, you can peruse the Microsoft documentation, but in order to explore a real-world application of this functionality, let’s build a simple solution and see how easy or difficult it is to put it to work.

Example Architecture and Use Case Scenario

Let’s assume that we have telephone conversation data arriving in Azure Blob Storage as JSON files. Next, we’d like to persist it in our SQL database in near real time and enrich it with sentiment analysis data using Azure Cognitive Services. Additionally, if the sentiment is negative, perhaps indicating a customer complaint or dissatisfaction, we would like an email sent to a member of the customer service team to triage and follow up on.

The following diagram (click on the image to enlarge) represents a proposed solution architecture for this requirement, with emphasis on activities 3, 6 and 9, as these correspond to using the SQL Server sp_invoke_external_rest_endpoint system stored procedure to communicate with external services. The idea here is that the SQL Server engine can act as the connective tissue for most of the integration work, allowing simple workflows to be built and executed directly from the underlying database. And, as you will see, most of this functionality can be achieved using vanilla T-SQL with a combination of stored procedures and triggers – something which was very difficult to pull off before this feature was made available.

Also, please note that I do not advocate building high-volume, high-velocity, real-time pipelines using database triggers and SQL Server system stored procedures. Microsoft clearly outlines the throttling limits imposed on the number of concurrent connections to external endpoints, as well as limitations on the supported HTTP request and response media types, payload size, URL length, header size etc., so it’s clearly not a panacea for all your integration needs. However, for sporadic and limited use cases – think in-database Zapier – this can significantly cut development time and allow DBAs and database devs to reach into other corners of the Azure ecosystem with little fuss.

Solution Implementation

To start with, we need an Azure Storage Account with an input container to store our incoming JSON files. Once we have one created, we can develop a small Azure Function which fires when a blob is persisted in the target location and calls the Azure SQL Database stored procedure responsible for data acquisition. The following is a small Python script calling our first stored procedure – usp_load_from_azure_blob – every time a new blob is created. For simplicity’s sake, the code does not do any file schema validation or pre-processing and its sole role is to execute the SQL Server stored procedure.

import logging
import pyodbc
import azure.functions as func
from os import path

# For demo only - any cryptographic keys should be stored in Secrets Store e.g. Azure Key Vault!
_SQL_SERVER = 'Your_Azure_Server_Name'
_SQL_DB = 'Your_DB_Name'
_USERNAME = 'Your_DB_User_Name'
_PASSWORD = 'Your_DB_User_Name_Password'
_DRIVER = '{ODBC Driver 18 for SQL Server}'
_TARGET_TABLE_NAME = 'customer_interactions'
_TARGET_SCHEMA_NAME ='dbo'
_TARGET_STORED_PROC_NAME = 'usp_load_from_azure_blob'


def main(inputblob: func.InputStream):

    logging.info('Python blob trigger function processed blob {blob_name}'.format(blob_name=inputblob.name))
    cnxn = None  # make sure the variable exists even if the connection attempt below fails
    try:
        cnxn = pyodbc.connect('DRIVER='+_DRIVER+';SERVER='+_SQL_SERVER +
                              ';PORT=1433;DATABASE='+_SQL_DB+';UID='+_USERNAME+';PWD='+_PASSWORD)
        if cnxn:
            logging.info('Connection to {mssql} SQL Server succeeded!'.format(mssql=_SQL_SERVER))
    except pyodbc.Error as e:
        sqlstate = e.args[1]
        logging.error(
            sqlstate)
    if cnxn:
        logging.info('Executing {stored_proc} stored procedure...'.format(stored_proc=_TARGET_STORED_PROC_NAME))
        cursor = cnxn.cursor()
        sql = '''\
                DECLARE @Return_Code INT;
                EXEC @Return_Code = {stored_proc} ?,?,?;
                SELECT @Return_Code AS rc;'''.format(stored_proc = _TARGET_STORED_PROC_NAME)
        values = (path.basename(inputblob.name), _TARGET_SCHEMA_NAME, _TARGET_TABLE_NAME)
        cursor.execute(sql, values)
        rc = cursor.fetchval()
        if rc == 0:
            logging.info('Stored procedure {stored_proc} executed successfully!'.format(stored_proc=_TARGET_STORED_PROC_NAME))
        cursor.commit()  

Now that we have our function, let’s create a small JSON file called ‘customer12345.json’ (I used ChatGPT for this), the target table and the stored procedure used in our Python script. Also, given that some REST endpoints require authentication in order to be properly invoked, we will need to create Database Scoped Credentials (DSC) to securely store authentication data (like a Bearer token, for example) to call a protected endpoint. The following code creates the scoped credential ‘azblobstore’ with a SAS access token, a table called customer_interactions where the unparsed JSON data will be stored, and the main stored procedure used for data acquisition. Notice that the procedure also references a table-valued function called tvf_compare_json_docs, which allows JSON payload comparison in the odd case the same file (with the same file name) is submitted more than once and we’d like to update the original version and populate the update_datetime field in the target table (the code behind this TVF and the JSON file can be found in my OneDrive folder HERE).

-- create encryption key
IF NOT EXISTS
(
    SELECT *
    FROM sys.symmetric_keys
    WHERE [name] = '##MS_DatabaseMasterKey##'
)
BEGIN
    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '$trong_Pa$$word';
END;

-- create credential name
IF EXISTS
(
    SELECT TOP (1)
           1
    FROM sys.database_scoped_credentials
    WHERE name = 'azblobstore'
)
BEGIN
    DROP DATABASE SCOPED CREDENTIAL azblobstore;
END;

-- create database scoped credential
CREATE DATABASE SCOPED CREDENTIAL [azblobstore]
WITH IDENTITY = 'SHARED ACCESS SIGNATURE',
     SECRET = 'Your_Azure_Blob_Storage_SAS_Secret_Value';
GO

-- create target table
DROP TABLE IF EXISTS [dbo].[customer_interactions]
CREATE TABLE [dbo].[customer_interactions](
	[file_id] [UNIQUEIDENTIFIER] NOT NULL,
	[file_name] [NVARCHAR](1024) NULL,
	[payload] [NVARCHAR](MAX) NULL,
	[sentiment] [VARCHAR](20) NULL,
	[insert_datetime] [DATETIME2](7) NULL,
	[update_datetime] [DATETIME2](7) NULL,
 CONSTRAINT [file_name] PRIMARY KEY CLUSTERED 
(
	[file_id] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO
ALTER TABLE [dbo].[customer_interactions] ADD  CONSTRAINT [df_file_id]  DEFAULT (NEWSEQUENTIALID()) FOR [file_id]
GO
ALTER TABLE [dbo].[customer_interactions] ADD  DEFAULT (NULL) FOR [update_datetime]
GO

-- create usp_load_from_azure_blob stored procedure
CREATE OR ALTER PROCEDURE [dbo].[usp_load_from_azure_blob]
(
    @file_name VARCHAR(1024),
    @schema_name sysname,
    @table_name sysname,
    @table_spec sysname = NULL
)
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @has_identity_column INT;
    DECLARE @new_json NVARCHAR(MAX);
    DECLARE @old_json NVARCHAR(MAX);
	DECLARE @new_json_file_name NVARCHAR(1024);
    DECLARE @old_json_file_name NVARCHAR(1024);
    DECLARE @error_message VARCHAR(MAX);
    DECLARE @url NVARCHAR(MAX) = CONCAT('https://your_storage_account_name.blob.core.windows.net/input-json/', @file_name);
    DECLARE @response NVARCHAR(MAX);
	DECLARE @time_zone VARCHAR (128)

    IF @table_name IS NULL
        SELECT @table_name = PARSENAME(@table_spec, 1);
    IF @schema_name IS NULL
        SELECT @schema_name = PARSENAME(@table_spec, 2);
    IF @table_name IS NULL
       OR @schema_name IS NULL
    BEGIN
        SET @error_message = 'Target DB, schema or table name was not provided. Bailing out!';
        RAISERROR(   @error_message, 
                     16,             
                     1          
                 );
        RETURN;
    END;

	IF NOT EXISTS
	(
	    SELECT current_utc_offset
	    FROM sys.time_zone_info
	    WHERE name = 'AUS Eastern Standard Time'
	)
	BEGIN
	    SET @time_zone = 'UTC';
	END
	ELSE
	BEGIN
	    SET @time_zone = 'AUS Eastern Standard Time';
	END;

    EXEC sp_invoke_external_rest_endpoint @url = @url,
                                          @method = 'GET',
                                          @headers = '{"Accept":"application/json"}',
                                          @credential = azblobstore,
                                          @response = @response OUTPUT;

    IF TRIM(JSON_VALUE(@response, '$.response.status.http.code')) <> '200'
       AND TRIM(JSON_VALUE(@response, '$.response.status.http.description')) <> 'OK'
    BEGIN
        SET @error_message = 'Rest call response was unsuccessful. Bailing out!';
        RAISERROR(   @error_message, 
                     16,           
                     1              
                 );
        RETURN;
    END;

	SET @new_json =
	(
	    SELECT JSON_QUERY(@response, '$.result')
	);
	SET @old_json =
	(
	    SELECT payload FROM dbo.customer_interactions WHERE file_name = @file_name
	);
	SET @new_json_file_name = @file_name;
	SET @old_json_file_name =
	(
	    SELECT file_name FROM dbo.customer_interactions WHERE file_name = @file_name
	);


    IF (ISJSON(@new_json) < 1)
    BEGIN
        SET @error_message
            = 'Provided source JSON payload is not properly formatted or the file does not exist. Bailing out!';
        RAISERROR(   @error_message, 
                     16,             
                     1               
                 );
        RETURN;
    END;

    DROP TABLE IF EXISTS #returntable;
    SELECT *
    INTO #returntable
    FROM dbo.tvf_compare_json_docs(@new_json, @old_json);

    DECLARE @select_sql NVARCHAR(200) =
            (
                SELECT 'SELECT * FROM ' + QUOTENAME(@schema_name) + '.' + QUOTENAME(@table_name)
            );

    SELECT @has_identity_column = MAX(CONVERT(INT, is_identity_column))
    FROM sys.dm_exec_describe_first_result_set(@select_sql, NULL, 1) AS f;

	DECLARE @delete_cmd VARCHAR(MAX)
	    = 'DELETE FROM ' + QUOTENAME(@schema_name) + '.' + QUOTENAME(@table_name) + ' WHERE file_name = ''' + @file_name
	      + ''';';
	DECLARE @update_cmd VARCHAR(MAX)
	    = 'UPDATE ' + QUOTENAME(@schema_name) + '.' + QUOTENAME(@table_name) + ' SET payload  = ''' + @new_json
	      + ''', sentiment = NULL, update_datetime = SYSDATETIME() AT TIME ZONE ''UTC'' AT TIME ZONE '''+@time_zone+''' WHERE file_name = ''' + @file_name + ''';';
	DECLARE @insert_cmd VARCHAR(MAX)
	    = 'INSERT INTO ' + QUOTENAME(@schema_name) + '.' + QUOTENAME(@table_name) + ' (file_name, payload, insert_datetime) 
			SELECT ''' + @file_name + ''', ''' + @new_json + ''',  SYSDATETIME() AT TIME ZONE ''UTC'' AT TIME ZONE '''+@time_zone+''';';


    DECLARE @command NVARCHAR(MAX)
        =
            (
                SELECT CASE
                           WHEN @old_json IS NOT NULL AND @old_json_file_name IS NOT NULL AND @old_json_file_name = @new_json_file_name
                                AND EXISTS
                                    (
                                        SELECT TOP (1) 1 FROM #returntable WHERE SideIndicator = '<>'
                                    ) THEN
                               @update_cmd
							WHEN @old_json IS NOT NULL AND @old_json_file_name IS NOT NULL AND @old_json_file_name = @new_json_file_name
                                AND NOT EXISTS
                                    (
                                        SELECT TOP (1) 1 FROM #returntable WHERE SideIndicator = '<>'
                                    ) THEN ''
                           ELSE 
                               CASE
                                   WHEN @old_json IS NOT NULL AND @old_json_file_name IS NOT NULL AND @old_json_file_name = @new_json_file_name THEN
                                       @delete_cmd
                                   ELSE
                                       ''
                               END
                               + CASE
                                     WHEN @has_identity_column > 0 THEN
                                         ' SET IDENTITY_INSERT ' + QUOTENAME(@schema_name) + '.'
                                         + QUOTENAME(@table_name) + ' ON; '
                                     ELSE
                                         ''
                                 END + @insert_cmd
                               + CASE
                                     WHEN @has_identity_column > 0 THEN
                                         ' SET IDENTITY_INSERT ' + QUOTENAME(@schema_name) + '.'
                                         + QUOTENAME(@table_name) + ' OFF; '
                                     ELSE
                                         ''
                                 END
                       END
            );
    EXEC (@command);
END;
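
As an aside, the procedure relies on dbo.tvf_compare_json_docs for change detection. Its actual code lives in the OneDrive folder linked above, but as a rough, hypothetical sketch, a shallow (top-level only) comparison could look like this:

-- Hypothetical sketch only - the real tvf_compare_json_docs is in the linked OneDrive folder.
-- Shreds both documents one level deep and flags top-level keys whose values differ.
CREATE OR ALTER FUNCTION dbo.tvf_compare_json_docs
(
    @new_json NVARCHAR(MAX),
    @old_json NVARCHAR(MAX)
)
RETURNS TABLE
AS
RETURN
(
    SELECT COALESCE(n.[key], o.[key]) AS [key],
           n.[value] AS new_value,
           o.[value] AS old_value,
           CASE
               WHEN n.[value] = o.[value] THEN '=='
               ELSE '<>'   -- differing, added or removed keys
           END AS SideIndicator
    FROM OPENJSON(COALESCE(@new_json, N'{}')) AS n
        FULL OUTER JOIN OPENJSON(COALESCE(@old_json, N'{}')) AS o
            ON n.[key] = o.[key]
);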

The main part of the acquisition procedure is the call to the sp_invoke_external_rest_endpoint system stored procedure. We’re using the GET HTTP method (the value must be one of GET, POST, PUT, PATCH, DELETE, HEAD), passing the previously created Database Scoped Credential in the @credential parameter and using the concatenated blob URL and file name as the @url parameter. All going well, execution will return 0 if the HTTPS call was made and the HTTP status code received was 2xx (Success), and the returned JSON in the @response parameter can be further parsed (if required) using SQL Server’s JSON-specific syntax.
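
To sanity-check the plumbing without going through the Azure Function, the procedure can also be executed manually from any query window (assuming the sample file has already been uploaded to the input container):

-- Manual smoke test of the acquisition procedure
DECLARE @rc INT;
EXEC @rc = dbo.usp_load_from_azure_blob @file_name = 'customer12345.json',
                                        @schema_name = 'dbo',
                                        @table_name = 'customer_interactions';
SELECT @rc AS return_code;

-- Confirm the payload landed (sentiment stays NULL until the trigger-driven enrichment, built next, runs)
SELECT file_name, sentiment, insert_datetime, update_datetime
FROM dbo.customer_interactions
WHERE file_name = 'customer12345.json';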

We now have our JSON file content in the target table but, as per the original requirement, we also need to ascertain the sentiment of the client’s conversation, which can help us gauge how our customer cohort is tracking with respect to service satisfaction. Again, previously, that would have been a laborious, if not challenging, task for someone who doesn’t have a lot of experience with application integration and the Azure ecosystem of services. However, now it’s just a matter of provisioning an Azure Cognitive Services account (something that can easily be done from the Azure portal) and creating a database trigger which executes the Cognitive Services API call using the same system stored procedure we used before.

Let’s go ahead and save our Azure Cognitive Services authentication key as a DSC, and wrap the sp_invoke_external_rest_endpoint call in a separate stored procedure which also parses the JSON payload to extract the sentiment value. We will also create a database trigger to automate procedure execution and invoke it every time a record is inserted or updated.

-- create database scoped credential
IF EXISTS
(
    SELECT TOP (1)
           1
    FROM sys.database_scoped_credentials
    WHERE name = 'Your_Cognitive_Services_Endpoint_URL'
)
BEGIN
    DROP DATABASE SCOPED CREDENTIAL [Your_Cognitive_Services_Endpoint_URL];
END;

CREATE DATABASE SCOPED CREDENTIAL [Your_Cognitive_Services_Endpoint_URL]
WITH IDENTITY = 'HTTPEndpointHeaders',
     SECRET = '{"Ocp-Apim-Subscription-Key":"Your_Key_Value"}';
GO

-- create usp_run_sentiment_analysis stored procedure
CREATE OR ALTER PROCEDURE [dbo].[usp_run_sentiment_analysis]
(@file_id UNIQUEIDENTIFIER)
AS
BEGIN
    DECLARE @error_message VARCHAR(MAX);
    DECLARE @url NVARCHAR(2000) = N'https://Your_Cognitive_Services_Endpoint_URL/text/analytics/v3.0/sentiment';
    DECLARE @response NVARCHAR(MAX);
    DECLARE @json NVARCHAR(MAX) =
            (
                SELECT payload FROM [dbo].[customer_interactions] WHERE file_id = @file_id
            );

    DECLARE @customer_text NVARCHAR(MAX) =
            (
                SELECT STRING_AGG(message, ' ') AS customer_text
                FROM
                    OPENJSON(@json, '$.conversation')
                    WITH
                    (
                        speaker NVARCHAR(100),
                        message NVARCHAR(MAX) '$.message'
                    )
                WHERE speaker = 'Customer'
            );

    DECLARE @payload NVARCHAR(MAX)
        = N'{"documents": [{"id": "1", "language": "en", "text": "' + @customer_text + N'"}]}';
    DECLARE @headers NVARCHAR(102) = N'{"Content-Type": "application/json"}';


    EXEC sp_invoke_external_rest_endpoint @url = @url,
                                          @method = 'POST',
                                          @headers = @headers,
                                          @payload = @payload,
                                          @credential = [Your_Cognitive_Services_Endpoint_URL],
                                          @response = @response OUTPUT;

    IF TRIM(JSON_VALUE(@response, '$.response.status.http.code')) <> '200'
    BEGIN
        SET @error_message = 'Rest call response was unsuccessful. Bailing out!';
        RAISERROR(   @error_message, 
                     16,             
                     1          
                 );
        RETURN;
    END;
    ELSE
    BEGIN
        UPDATE [dbo].[customer_interactions]
        SET sentiment =
            (
                SELECT TOP (1) JSON_VALUE(@response, '$.result.documents[0].sentiment')
            )
        WHERE file_id = @file_id;
    END;
END;
GO

-- create trigger_sentiment_analysis database trigger
CREATE OR ALTER TRIGGER [dbo].[trigger_sentiment_analysis]
ON [dbo].[customer_interactions]
AFTER INSERT, UPDATE
AS
BEGIN
    SET NOCOUNT ON;
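    -- NOTE: assumes single-row inserts/updates (one file per blob event);
    -- a multi-row statement would only process one of the affected rows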
    DECLARE @file_id VARCHAR(128);
    SELECT @file_id = inserted.file_id
    FROM inserted;
    EXEC usp_run_sentiment_analysis @file_id = @file_id;
END;
GO

ALTER TABLE [dbo].[customer_interactions] ENABLE TRIGGER [trigger_sentiment_analysis];
GO

The three-stage logic in the above stored procedure dictates that we extract the customer’s text from our JSON entry, omitting the other speaker’s side of the dialogue, call the sentiment analysis API with this data to determine the sentiment value and, finally, persist it in the target table against the file_id in question. All that’s left to do is to create another database trigger which activates only if the sentiment value is negative and, you guessed it, calls a stored procedure responsible for running an Azure Logic App.

This is our third Azure services integration using the REST endpoint functionality in Azure SQL DB and it just goes to show how versatile this feature is and how it opens up a world of possibilities, all within the confines of the database and with little to no development required outside of T-SQL.

For this part, let’s create a small Logic App which runs a ‘Send an email (V2)’ task when an HTTP request is received, the final stored procedure calling this workflow, and a database trigger to automate the execution process. Also, to make it more interesting, we’ll pass the customer’s feedback text and the date/time the file was created to our email content, so that whoever receives this correspondence does not have to wonder what text triggered the workflow.

Our Logic App and the final piece of SQL code will look like this:

-- create usp_send_email_on_negative_sentiment stored procedure
CREATE OR ALTER PROCEDURE [dbo].[usp_send_email_on_negative_sentiment]
(
    @insert_date DATETIME2,
    @customer_feedback NVARCHAR(MAX)
)
AS
BEGIN
    DECLARE @url NVARCHAR(MAX)
        = N'Your_Logic_App_URL';
    DECLARE @response NVARCHAR(MAX);
    DECLARE @payload NVARCHAR(MAX) = N'{
        "feedback":  "' + @customer_feedback + N'",
        "date": "' + CONVERT(VARCHAR, @insert_date, 0) + N'"
		}';

    DECLARE @headers NVARCHAR(102) = N'{"Content-Type": "application/json"}';

    EXEC sp_invoke_external_rest_endpoint @url = @url,
                                          @method = 'POST',
                                          @headers = @headers,
                                          @payload = @payload,
                                          @response = @response OUTPUT;
END;
GO

-- create trigger_send_email_on_negative_sentiment database trigger
CREATE OR ALTER TRIGGER [dbo].[trigger_send_email_on_negative_sentiment]
ON [dbo].[customer_interactions]
AFTER UPDATE
AS
BEGIN
    SET NOCOUNT ON;
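    -- NOTE: as with the previous trigger, this assumes a single-row UPDATE per statement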
    DECLARE @JSON NVARCHAR(MAX);
    SELECT @JSON = inserted.payload
    FROM Inserted;
    DECLARE @customer_feedback NVARCHAR(MAX);
    SET @customer_feedback =
    (
        SELECT STRING_AGG(message, ' ') AS customer_text
        FROM
            OPENJSON(@JSON, '$.conversation')
            WITH
            (
                speaker NVARCHAR(100),
                message NVARCHAR(MAX) '$.message'
            )
        WHERE speaker = 'Customer'
    );
    DECLARE @insert_date DATETIME2;
    SELECT @insert_date = inserted.insert_datetime
    FROM inserted;
    DECLARE @sentiment VARCHAR(20);
    SELECT @sentiment = inserted.sentiment
    FROM inserted;
    IF @sentiment = 'negative'
    BEGIN
        EXEC usp_send_email_on_negative_sentiment @insert_date = @insert_date,
                                                  @customer_feedback = @customer_feedback;
    END;
END;
GO

ALTER TABLE [dbo].[customer_interactions] ENABLE TRIGGER [trigger_send_email_on_negative_sentiment];
GO

We can run this workflow end-to-end by uploading our sample JSON conversation file into the Azure storage container and, provided we have the Logic App and Azure Function running (either in Azure or locally with Azure Functions Core Tools), the whole process should only take a few seconds to complete (you can confirm it by looking at the time stamps) and result in an email notification being received – see the screenshots below (click to enlarge).

Conclusion

Using Azure SQL DB REST endpoint integration, a large number of Azure services can be interfaced with Azure SQL DB, further expanding and extending the platform’s capability. These workflows allow the SQL database to act as the connective tissue for data interoperability across API-enabled interfaces. In addition to workflow activation, e.g. triggering Logic Apps or Azure Functions as demonstrated above, additional use cases can include further integration with event-based architectures e.g. Azure Event Hubs, creating data streams for fraud detection via Stream Analytics, website updates by broadcasting SignalR messages, or cache invalidation using Azure Functions. As long as you don’t think of the feature as a MuleSoft or Boomi replacement and understand the limitations of this approach, querying REST endpoints from Azure SQL Database opens up a lot of possibilities.


Data Build Tool (DBT) – The Emerging Standard For Building SQL-First Data Transformation Pipelines – Part 2

January 20th, 2023 / by admin

In Part 1 of this series, I went over a high-level introduction to dbt, stood up a small development SQL Server database in Azure, acquired a sample of Google Analytics data to populate a few tables used in this post and, finally, installed and configured dbt on a local environment. In this post I’d like to dive deeper into dbt functionality and outline some of the key features that make it an attractive proposition for the data transformation and loading part of the ETL/ELT process.

DBT Models

In the dbt framework, a model is simply a SELECT SQL statement. When executing the dbt run command, dbt will build this model in our data warehouse by wrapping it in a CREATE VIEW AS or CREATE TABLE AS statement. By default, dbt is configured to persist those SELECT statements as views; however, this behaviour can be modified to take advantage of other materialization options e.g. table, ephemeral or incremental. Each type of materialization has its advantages and disadvantages and should be evaluated based on a specific use case and scenario. The following image depicts the core pros and cons associated with each approach.

Before we dive headfirst into creating dbt models, first, let’s explore some high-level guiding principles around structuring our project files. The team at dbt recommends organizing your models into at least two different folders – staging and marts. In a simple project, these may be the only models we build; more complex projects may have a number of intermediate models that provide a better logical separation as well as accessories to these models.

  • Staging models – the atomic unit of data modeling. Each model bears a one-to-one relationship with the source data table it represents. It has the same granularity, but the columns have been renamed, recast, or standardized into a consistent format
  • Marts models – models that represent business processes and entities, abstracted from the data sources that they are based on. Where the work of staging models is limited to cleaning and preparing, fact tables are the product of substantive data transformation: choosing (and reducing) dimensions, date-spinning, executing business logic, and making informed decisions based on business requirements

Sometimes, mainly due to the level of data complexity or additional security requirements, further logical separation is recommended. In this case, a ‘Sources’ model layer is introduced before data is loaded into the Staging layer. Sources store schemas and tables in a source-conformed structure (i.e. tables and columns in a structure based on what an API returns), loaded by a third-party tool.

Because we often work with multiple data sources, in our Staging and Marts directories we create one folder per source – in our case, since we’re only working with a single source, we will simply call these google_analytics. Conforming to the dbt minimum standards for project organization and layout, i.e. Staging and Marts layers, let’s create the required folders so that the overall structure looks like the one on the left.
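For anyone following along without the screenshot, the resulting models directory looks roughly like this (file names match the models built later in this post):

models/
├── staging/
│   └── google_analytics/
│       ├── ga_data.sql
│       └── ga_geo_ref_data.sql
└── marts/
    └── google_analytics/
        └── ga_geo.sql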

At this point we should have everything in place to build our first model, based on the table we created in the Azure SQL Server DB in the previous post. Creating simple models in dbt is a straightforward affair and in this case it’s just a SELECT SQL statement. To begin, in our staging\google_analytics folder we create a SQL file, name it after the source table we will be staging and save it with the following two-line statement.

{{ config(materialized='table') }}
SELECT * FROM ga_data

The top line simply tells dbt to materialize the output as a physical table (the default is a view) and, in doing that, select everything from our previously created dbo.ga_data table into a new stg.ga_data table. dbt uses the Jinja templating language, making a dbt project an ideal programming environment for SQL. With Jinja, we can do transformations which are not typically possible in SQL, for example, using environment variables or macros to abstract snippets of SQL, which is analogous to functions in most programming languages. Whenever you see a {{ … }}, you’re already using Jinja.
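As a small, hypothetical aside (not needed for this demo), Jinja is also what powers the incremental materialization mentioned earlier: the is_incremental() test and the {{ this }} relation let a model process only rows added since its last run, assuming the source table carries a suitable watermark column such as the UpdatedAt field we add later in this post:

{{ config(materialized='incremental') }}

SELECT *
FROM ga_data
{% if is_incremental() %}
  -- on incremental runs, only pick up rows newer than what is already in the target table
  WHERE UpdatedAt > (SELECT MAX(UpdatedAt) FROM {{ this }})
{% endif %}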

To execute our staging model, we simply issue the ‘dbt run’ command (here with the optional parameter ‘--select staging’, limiting the run to the models we want to build) and the output should tell us that we successfully created a staging version of our ga_data table.

Obviously, in order to build more complete analytics, we need to combine data from across multiple tables and data sources, so let’s create another table called ‘ga_geo_ref_data’ containing latitude, longitude and display name values using the Geopy Python library. Geopy makes it easy to locate the coordinates of addresses, cities, countries and landmarks across the globe using third-party geocoders. This will provide us with additional reference data which we will blend with the core ‘ga_data’ table/model to create a single, overarching dataset containing both the Google Analytics data and the reference geo data used to enrich it.

import pyodbc
import pandas as pd
from geopy.geocoders import Nominatim

_SQL_SERVER_NAME = 'gademosqlserver2022.database.windows.net'
_SQL_DB = 'sourcedb'
_SQL_USERNAME = 'testusername'
_SQL_PASSWORD = 'MyV3ry$trongPa$$word'
_SQL_DRIVER = '{ODBC Driver 18 for SQL Server}'

geolocator = Nominatim(user_agent='testapp')


def enrich_with_geocoding_vals(row, val):
    loc = geolocator.geocode(row, exactly_one=True, timeout=10)
    if val == 'lat':
        if loc is None:
            return -1
        else:
            return loc.raw['lat']
    if val == 'lon':
        if loc is None:
            return -1
        else:
            return loc.raw['lon']
    if val == 'name':
        if loc is None:
            return 'Unknown'
        else:
            return loc.raw['display_name']
    else:
        pass


try:
    with pyodbc.connect('DRIVER='+_SQL_DRIVER+';SERVER='+_SQL_SERVER_NAME+';PORT=1433;DATABASE='+_SQL_DB+';UID='+_SQL_USERNAME+';PWD=' + _SQL_PASSWORD) as conn:
        with conn.cursor() as cursor:
            if not cursor.tables(table='ga_geo_ref_data', tableType='TABLE').fetchone():
                cursor.execute('''CREATE TABLE dbo.ga_geo_ref_data (ID INT IDENTITY (1,1),
                                                            Country NVARCHAR (256),
                                                            City NVARCHAR (256),
                                                            Latitude DECIMAL(12,8),
                                                            Longitude DECIMAL(12,8),
                                                            Display_Name NVARCHAR (1024))''')
            cursor.execute('TRUNCATE TABLE dbo.ga_geo_ref_data;')
            query = "SELECT country, city FROM dbo.ga_data WHERE city <> '' AND country <> '' GROUP BY country, city;"
            df = pd.read_sql(query, conn)
            df['latitude'] = df['city'].apply(
                enrich_with_geocoding_vals, val='lat')
            df['longitude'] = df['city'].apply(
                enrich_with_geocoding_vals, val='lon')
            df['display_name'] = df['city'].apply(
                enrich_with_geocoding_vals, val='name')
            for index, row in df.iterrows():
                cursor.execute('''INSERT INTO dbo.ga_geo_ref_data
                                    (Country,
                                    City,
                                    Latitude,
                                    Longitude,
                                    Display_Name)
                          values (?, ?, ?, ?, ?)''',
                               row[0], row[1], row[2], row[3], row[4])

            cursor.execute('SELECT TOP (1) 1 FROM dbo.ga_geo_ref_data')
            rows = cursor.fetchone()
            if rows:
                print('All Good!')
            else:
                raise ValueError(
                    'No data generated in the source table. Please troubleshoot!'
                )
except pyodbc.Error as ex:
    sqlstate = ex.args[1]
    print(sqlstate)

We will also materialize this table using the same technique we tested before, and now we should be in a position to create our first data mart object, combining ga_geo_ref_data and ga_data into a single table.
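The staging model for the new reference table is analogous to the first one – assuming we keep the naming convention, another two-line file (ga_geo_ref_data.sql) in staging\google_analytics:

{{ config(materialized='table') }}
SELECT * FROM ga_geo_ref_data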

Creating the mart object involves another SQL file, this time in our marts\google_analytics folder, with the following query blending the two data sets together.

{{ config(materialized='table') }}

SELECT ga.*, ref_ga.Latitude, ref_ga.Longitude, ref_ga.Display_Name 
FROM {{ ref('ga_data') }} ga
LEFT JOIN {{ ref('ga_geo_ref_data') }} ref_ga
ON ga.country = ref_ga.country 
AND ga.city = ref_ga.city

As with one of the previous queries, we’re using the familiar configuration syntax in the first line, but there is also an additional reference configuration applied which uses the most important function in dbt – the ref() function. For building more complex models, the ref() function is very handy as it allows us to refer to other models. Under the hood, ref() is actually doing two important things. First, it is interpolating the schema into our model file to allow us to change our deployment schema via configuration. Second, it is using these references between models to automatically build the dependency graph. This enables dbt to deploy models in the correct order when using the ‘dbt run’ command.

If we were to run this model as is, dbt would concatenate our default schema name (as expressed in the profiles.yml file) with the schema we would like to output it into. It’s a default behaviour which we need to override using a macro. Therefore, to change the way dbt generates a schema name, we should add a macro named generate_schema_name to the project, where we can then define our own logic. We will place the following bit of code in the macros folder in our solution and define our custom schema names in the dbt_project.yml file as per below.

{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}

# dbt_project.yml
name: 'azure_sql_demo'
version: '1.0.0'
config-version: 2

# This setting configures which 'profile' dbt uses for this project.
profile: 'azure_sql_demo'

# These configurations specify where dbt should look for different types of files.
# The 'model-paths' config, for example, states that models in this project can be
# found in the 'models/' directory. You probably won't need to change these!
model-paths: ['models']
analysis-paths: ['analyses']
test-paths: ['tests']
seed-paths: ['seeds']
macro-paths: ['macros']
snapshot-paths: ['snapshots']

target-path: 'target'  # directory which will store compiled SQL files
clean-targets:         # directories to be removed by `dbt clean`
  - 'target'
  - 'dbt_packages'

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
models:
  azure_sql_demo:
    staging:
      +materialized: view
      +schema: stg
    marts:
      +materialized: view
      +schema: mart

With everything in place, we can now save our SQL code into a file called ga_geo.sql and execute dbt to materialize it in the mart schema using the ‘dbt run --select marts’ command. On model build completion, our first mart table should be persisted in the database as per the image below (click to enlarge).

Another great feature of dbt is the ability to create snapshots, which is synonymous with the concept of Slowly Changing Dimensions (SCD) in data warehousing. Snapshots implement Type-2 SCD logic, identifying how a row in a table changes over time. If we were to issue an ALTER statement on our ‘ga_data’ table located in the staging schema and add one extra column denoting when the row was created or updated, we could track its history using a typical SCD-2 pattern, i.e. expiring and watermarking rows which were changed or added using a combination of GUIDs and date attributes.

For this demo let’s execute the following SQL statement to alter our ga_data object, adding a new field called UpdatedAt with a default value of current system timestamp.

ALTER TABLE stg.ga_data
ADD UpdatedAt DATETIME DEFAULT SYSDATETIME()

Once we have our changes in place we can add the following SQL to our solution under the snapshots node and call it ga_data_snapshot.sql.

{% snapshot ga_data_snapshot %}
    {{
        config(
          target_schema = 'staging',
          unique_key = 'ID',
          strategy = 'check',
          check_cols = 'all'
        )
    }}
    SELECT * FROM ga_data
{% endsnapshot %}

Next, after running the ‘dbt snapshot’ command, a new table is created in the staging schema with a few additional attributes added to allow for SCD Type-2 tracking (click on image to enlarge).
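Those additional attributes are dbt’s standard snapshot columns – dbt_scd_id, dbt_updated_at, dbt_valid_from and dbt_valid_to – so retrieving the current version of every row is just a matter of filtering for records that have not been end-dated, along these lines:

-- Current (active) version of each row in the snapshot table
SELECT *
FROM staging.ga_data_snapshot
WHERE dbt_valid_to IS NULL;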

Snapshots are a powerful feature in dbt that facilitates keeping track of our mutable data through time and, generally, they’re as simple to create as any other type of model. This allows for a very simple implementation of the SCD Type-2 pattern, with no complex MERGE or UPDATE and INSERT (upsert) SQL statements required.

DBT Tests

Testing data solutions has been notoriously difficult, and data validation and QA have always been an afterthought. Best case scenario, third-party applications had to be used to guarantee data conformance and minimal standards. Worst case, tests were not developed at all and the job of validating the final output fell on analysts or report developers, eyeballing dashboards before pushing them into production. In extreme cases, I have even seen customers relegated to the role of unsuspecting testers, raising support tickets due to dashboards coming up empty.

In dbt, tests are assertions we make about the models and other resources in our dbt project. When we run dbt test, dbt will tell us if each test in our project passes or fails. There are two ways of defining tests in dbt:

  • A singular test is testing in its simplest form: if we can write a SQL query that returns failing rows, we can save that query in a .sql file within our test directory. It’s now a test, and it will be executed by the dbt test command (see the sketch after this list)
  • A generic test is a parametrized query that accepts arguments. The test query is defined in a special test block (similar to a macro). Once defined, we can reference the generic test by name throughout our .yml files – defining it on models, columns, sources, snapshots and seeds. dbt ships with four generic tests built in: unique, not_null, accepted_values and relationships
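
To make the first flavour concrete, here is a small, hypothetical singular test for our project – a query that returns rows we consider failures (in this case, reference rows where geocoding fell back to the -1 sentinel used by the Python script above):

-- tests/assert_ga_geo_ref_data_was_geocoded.sql
-- Any rows returned cause the test to fail
SELECT *
FROM {{ ref('ga_geo_ref_data') }}
WHERE Latitude = -1
   OR Longitude = -1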

In our scenario, we will use generic tests to ensure that:

  • The id field on our ga_geo_ref_data is unique and does not contain any NULL values
  • The DeviceCategory attribute should only contain a list of accepted values

Test definitions are stored in our staging directory, in a YAML file called ‘schema.yml’. Once we issue the dbt test command, the output (shown in the screenshot below) confirms that all tests passed successfully.
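For reference, the two generic tests described above boil down to a schema.yml roughly like this (the accepted values listed for DeviceCategory are an assumption – adjust them to whatever the source data actually contains):

version: 2

models:
  - name: ga_geo_ref_data
    columns:
      - name: ID
        tests:
          - unique
          - not_null
  - name: ga_data
    columns:
      - name: DeviceCategory
        tests:
          - accepted_values:
              values: ['desktop', 'mobile', 'tablet']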

DBT Docs

One of the great features of dbt is the fact that we can easily generate a fully self-documenting solution, together with a lineage graph that helps us easily navigate through the nodes and understand the hierarchy. dbt provides a way to generate documentation for our dbt project and render it as a website. The documentation includes the following:

  • Information about our project: including model code, a DAG of our project, any tests we’ve added to a column, and more
  • Information about our data warehouse: including column data types, and table sizes. This information is generated by running queries against the information schema

Running the ‘dbt docs generate’ command instructs dbt to compile all relevant information about our project and warehouse into the manifest.json and catalog.json files. Next, executing ‘dbt docs serve’ starts a local web server (http://127.0.0.1:8080) and allows dbt to use these JSON files to generate a local website. We can see a representation of the project structure, a markdown description for a model, and a list of all of the columns (with documentation) in the model. Additionally, we can click the green button in the bottom-right corner of the webpage to expand a ‘mini-map’ of our DAG with relevant lineage information (click on image to expand).

Conclusion

I have barely scratched the surface of what dbt can do to establish a full-fledged framework for data transformations using SQL, and it looks like the company is not resting on its laurels, adding more features and partnering with other vendors. From my limited time with dbt, the key benefits that allow it to stand out in the sea of other tools include:

  • Quickly and easily provide clean, transformed data ready for analysis: dbt enables data analysts to custom-write transformations through SQL SELECT statements. There is no need to write boilerplate code. This makes data transformation accessible for analysts that don’t have extensive experience in other programming languages, as the initial learning curve is quite low
  • Apply software engineering practices—such as modular code, version control, testing, and continuous integration/continuous deployment (CI/CD)—to analytics code. Continuous integration means less time testing and quicker time to development, especially with dbt Cloud. You don’t need to push an entire repository when there are necessary changes to deploy, but rather just the components that change. You can test all the changes that have been made before deploying your code into production. dbt Cloud also has integration with GitHub for automation of your continuous integration pipelines, so you won’t need to manage your own orchestration, which simplifies the process
  • Build reusable and modular code using Jinja. dbt allows you to establish macros and integrate other functions outside of SQL’s capabilities for advanced use cases. Macros in Jinja are pieces of code that can be used multiple times. Instead of starting at the raw data with every analysis, analysts instead build up reusable data models that can be referenced in subsequent work
  • Maintain data documentation and definitions within dbt as they build and develop lineage graphs: Data documentation is accessible, easily updated, and allows you to deliver trusted data across the organization. dbt automatically generates documentation around descriptions, models dependencies, model SQL, sources, and tests. dbt creates lineage graphs of the data pipeline, providing transparency and visibility into what the data is describing, how it was produced, as well as how it maps to business logic
  • Perform simplified data refreshes within dbt Cloud: There is no need to host an orchestration tool when using dbt Cloud. It includes a feature that provides full autonomy with scheduling production refreshes at whatever cadence the business wants

Obviously, dbt is not a perfect solution and some of its current shortcomings include:

  • It covers only the T of ETL, so you will need separate tools to perform Extraction and Load
  • It’s SQL based; it might offer less readability compared with tools that have an interactive UI
  • Sometimes circumstances necessitate rewriting of macros used at the backend. Overriding this standard behavior of dbt requires knowledge and expertise in handling source code
  • Integrations with many (less popular) database engines are missing or handled by community-supported adapters e.g. Microsoft SQL Server, SQLite, MySQL

All in all, I really enjoyed the multitude of features dbt carries in its arsenal and understand why it’s become the favorite horse in the race to dominate the minds and hearts of analytics and data engineers. It’s a breath of fresh air, as its main focus is SQL – a novel approach in a landscape dominated by Python and Scala – it runs in the cloud and on-premises, and it has good external vendor support. Additionally, it has some bells and whistles which would typically involve integrating with other third-party tooling, e.g. tests and docs. Finally, at its core, it’s an open-source product and, as such, anyone can take it for a spin and start building extensible, modular and reusable ‘plumbing’ for their next project.
