Data Build Tool (DBT) – The Emerging Standard For Building SQL-First Data Transformation Pipelines – Part 1

September 27th, 2022 / 4 Comments » / by admin

Note: Part 2 of this post can be found HERE.

Introduction

It’s never a dull moment when working in IT. Although the Data Warehousing domain was not subjected to the hamster wheel of relentless innovation in the first few decades, when Oracle, IBM, Microsoft and SAP reigned supreme, with the advent of cloud computing it too had to adapt and change. For me, the most significant changes included the separation of storage and compute, in-database machine learning, on-demand elasticity and serverless database models. The resulting upending of the status quo also had a large impact on the good, old-fashioned ETL (Extract, Transform, Load) paradigm, which started to shift towards a new, more cloud-aligned architecture, and many businesses contemplating Data Warehouse modernization are jumping on the ELT bandwagon. This is also where a suite of new tools started to emerge, and one company with its flagship product started to make serious inroads into this market.

dbt (data build tool) emerged as a development framework that combines modular SQL with software engineering best practices to make data transformation reliable, fast, and fun. It makes data engineering activities accessible to people with data analyst skills, allowing them to transform the data in the warehouse using simple SELECT statements and effectively build the entire transformation process with code. You can write custom business logic using SQL, automate data quality testing, deploy the code, and deliver trusted data with comprehensive documentation that lives side by side with the code. This is more important today than ever due to the shortage of data engineering professionals in the marketplace. Anyone who knows SQL can now build production-grade data pipelines, reducing the entry barriers that previously limited staffing for legacy technologies. In short, dbt turns your data analysts into engineers and allows them to own the entire analytics engineering workflow.

dbt has two core workflows: building data models and testing data models. It fits nicely into the modern data stack and is cloud agnostic – meaning it works within each of the major cloud ecosystems: Azure, GCP, and AWS. However, the biggest advantage of dbt is its new approach to building pipelines, which traditionally have been quite clunky and inefficient. Some of the most common issues with the standard ETL workflow are:

  • The schema within data warehouses is often strongly defined and controlled. The emphasis in ETL was therefore on getting data into the warehouse in the correct “one true” format, putting the burden on the people loading the data and making the process of getting data into the warehouse slow and fragile.
  • This warehouse and the ETL processes would usually be managed by centralized data teams. These teams would be a fairly siloed bottleneck, always behind with the needs of the business for integrating and transforming the data.
  • The ETL stacks and scripts would often be fragile, error prone, and difficult and slow to change.
  • The tools providing ETL would often be GUI based and proprietary. Not only would they be expensive to license, they would also require specialist skills. This meant that neither the producers nor the consumers of the data would have access to the ETL scripts or the ability to make changes to them.
  • Bringing ETL into anything which defines a software development lifecycle was tricky. For instance, the ETL process was always identified as being difficult to source control, version and test. Implementing the concept of development, test and production environments with accurate data management was also way behind the state of the art in the software development world.

With dbt, many of the above shortcomings have been addressed, improving reliability, repeatability and collaboration by breaking down organizational silos, and reducing time to market.

Environment Prep and Sample Data

Before I jump into what makes dbt such a powerful framework, I’d like to set the stage and outline the following mocked-up example of building an end-to-end pipeline using modern data architecture tools: first acquiring and loading Google Analytics data into an Azure environment, and finally transforming it using the dbt framework. I believe that rather than installing dbt and running a few scripts to outline its core features, it’s better to showcase it on a tangible mini-project which accurately reflects some of the problems many businesses may be grappling with. For this purpose, I will be following the plan below:

  • Stand up an Azure environment (using the Azure Python SDK), including an Azure SQL database, Azure Data Lake Gen2 (ADLS) and an associated Resource Group. Technically this part can be done using any cloud provider or even an on-premises environment, but since the modern data stack tends to rely on APIs and tools available from major public cloud vendors, this is in line with more contemporary information architecture and management practices
  • Build a simple pipeline to acquire Google Analytics data and stage it in ADLS storage as well as the Azure SQL database. This script can also be run as an Azure Function to create an automated, ETL-like process
  • Install dbt and the supporting SQL Server connector on an isolated local environment
  • Augment GA data with geocoding information to build a simple, one-table data mart using SQL and dbt Jinja templates
  • Test our data, create a snapshot using dbt functionality and generate sample project documentation

Firstly, let’s provision a sample Azure environment consisting of a dedicated resource group as well as Azure Data Lake Gen 2 and Azure SQL database.

from azure.identity import AzureCliCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.storage import StorageManagementClient
from azure.storage.filedatalake import DataLakeServiceClient
from azure.mgmt.sql import SqlManagementClient
from humanfriendly import format_timespan
from timeit import default_timer as timer
import time
import pyodbc
from os import popen


_RESOURCE_GROUP_NAME = 'gademoresourcegroup2022'
_RESOURCE_GROUP_LOCATION = 'australiaeast'
_STORAGE_ACCOUNT_NAME = 'gademostorageacct2022'
_STORAGE_CONTAINER_NAME = 'gademooutputfiles2022'
_SUBSCRIPTION_ID = 'your_subscription_id'
_DF_LINKED_SERVICE_NAME = 'lsoutputfiles'
_SQL_SERVER_NAME = 'gademosqlserver2022'
_SQL_DB_NAME = 'sourcedb'
_SQL_USERNAME = 'testusername'
_SQL_PASSWORD = 'MyV3ry$trongPa$$word'
_SQL_DRIVER = '{ODBC Driver 18 for SQL Server}'
external_IP = popen("curl -s ifconfig.me").readline()


# create resource group
def create_resource_group(resource_client, _RESOURCE_GROUP_NAME, _LOCATION):
    print("Creating Azure Resource Group {rg_name}...".format(
        rg_name=_RESOURCE_GROUP_NAME), end="", flush=True)
    try:
        resource_client.resource_groups.create_or_update(
            _RESOURCE_GROUP_NAME, {'location': _LOCATION})
    except Exception as e:
        print(e)
    rg = [g.name for g in resource_client.resource_groups.list()]
    if _RESOURCE_GROUP_NAME in rg:
        print('OK')


# create storage account in the nominated resource group
def create_storage_account(storage_client, _STORAGE_ACCOUNT_NAME, _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION):
    print("Creating Azure Storage Account {st_acct}...".format(
        st_acct=_STORAGE_ACCOUNT_NAME), end="", flush=True)
    try:
        availability_result = storage_client.storage_accounts.check_name_availability(
            {'name': _STORAGE_ACCOUNT_NAME})
        if not availability_result.name_available:
            print('storage name {st_acct} is already in use. Try another name.'.format(
                st_acct=_STORAGE_ACCOUNT_NAME))
            exit()
        poller = storage_client.storage_accounts.begin_create(_RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME,
                                                              {
                                                                  "location": _RESOURCE_GROUP_LOCATION,
                                                                  "kind": "StorageV2",
                                                                  "is_hns_enabled": "true",
                                                                  "sku": {"name": "Standard_LRS", "tier": "Standard"},
                                                                  "properties": {
                                                                      "minimumTlsVersion": "TLS1_2",
                                                                      "allowBlobPublicAccess": "true",
                                                                      "networkAcls": {
                                                                          "bypass": "AzureServices",
                                                                          "virtualNetworkRules": [],
                                                                          "ipRules": [],
                                                                          "defaultAction": "Allow"
                                                                      }
                                                                  }})
        account_result = poller.result()
        if account_result.name == _STORAGE_ACCOUNT_NAME:
            print('OK')
    except Exception as e:
        print(e)


# create storage container aka 'filesystem' in the nominated storage account
def create_adls_container(_STORAGE_ACCOUNT_NAME, _STORAGE_CONTAINER_NAME):
    print("Creating Azure Data Lake Storage Container {st_ct}...".format(
        st_ct=_STORAGE_CONTAINER_NAME), end="", flush=True)
    keys = storage_client.storage_accounts.list_keys(
        _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME)
    account_url = "https://{}.dfs.core.windows.net/".format(
        _STORAGE_ACCOUNT_NAME)
    datalake_service = DataLakeServiceClient(
        account_url=account_url, credential=keys.keys[0].value
    )
    try:
        datalake_service.create_file_system(
            file_system=_STORAGE_CONTAINER_NAME)
        file_systems = [i.name for i in datalake_service.list_file_systems()]
        if _STORAGE_CONTAINER_NAME in file_systems:
            print('OK')
    except Exception as e:
        print(e)


# create azure sql server in the nominated resource group
def create_sql_server(sql_client, _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME,
                      _RESOURCE_GROUP_LOCATION, _SQL_USERNAME, _SQL_PASSWORD):
    print("Creating Azure SQL Server {ssvr_name}...".format(
        ssvr_name=_SQL_SERVER_NAME), end="", flush=True)
    try:
        sql_server = sql_client.servers.begin_create_or_update(
            _RESOURCE_GROUP_NAME,
            _SQL_SERVER_NAME,
            {
                'location': _RESOURCE_GROUP_LOCATION,
                'version': '12.0',
                'administrator_login': _SQL_USERNAME,
                'administrator_login_password': _SQL_PASSWORD
            }
        )
        sql_server.wait()
    except Exception as e:
        print(e)
    ssvr = [i.name for i in sql_client.servers.list_by_resource_group(
        _RESOURCE_GROUP_NAME)]
    if _SQL_SERVER_NAME in ssvr:
        print('OK')


# create azure sql db in the nominated resource group
def create_sql_db(sql_client, _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME, _SQL_DB_NAME, _RESOURCE_GROUP_LOCATION):
    print("Creating Azure SQL Database {db_name}...".format(
        db_name=_SQL_DB_NAME), end="", flush=True)
    try:
        sql_db = sql_client.databases.begin_create_or_update(
            _RESOURCE_GROUP_NAME,
            _SQL_SERVER_NAME,
            _SQL_DB_NAME,
            {
                'location': _RESOURCE_GROUP_LOCATION,
                'collation': 'SQL_Latin1_General_CP1_CI_AS',
                'create_mode': 'default',
                'requested_service_objective_name': 'Basic'
            }
        )
        sql_db.wait()
    except Exception as e:
        print(e)
    dbs = [i.name for i in sql_client.databases.list_by_server(
        _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME)]
    if _SQL_DB_NAME in dbs:
        print('OK')


# configure azure sql server firewall to accept connections from the host ip address
def configure_firewall(sql_client, _SQL_DRIVER, _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME, _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD, external_IP):
    print("Configuring Azure SQL Server Firewall Settings...", end="", flush=True)
    try:
        sql_client.firewall_rules.create_or_update(
            _RESOURCE_GROUP_NAME,
            _SQL_SERVER_NAME,
            "firewall_rule_name_" + external_IP,
            {
                "startIpAddress": external_IP,
                "endIpAddress": external_IP
            }
        )
    except Exception as e:
        print(e)
    _AZURE_SQL_SERVER = _SQL_SERVER_NAME + '.database.windows.net'
    with pyodbc.connect('DRIVER='+_SQL_DRIVER+';SERVER='+_AZURE_SQL_SERVER+';PORT=1433;DATABASE='+_SQL_DB_NAME+';UID='+_SQL_USERNAME+';PWD='+_SQL_PASSWORD) as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT @@version")
            row = cursor.fetchone()
    if row:
        print('OK')



if __name__ == '__main__':
    print("\n")
    execution_start_time = timer()
    credentials = AzureCliCredential()
    storage_client = StorageManagementClient(credentials, _SUBSCRIPTION_ID)
    resource_client = ResourceManagementClient(credentials, _SUBSCRIPTION_ID)
    sql_client = SqlManagementClient(credentials, _SUBSCRIPTION_ID)
    resource_groups = [i.name for i in resource_client.resource_groups.list()]
    if _RESOURCE_GROUP_NAME in resource_groups:
        print("Deleting existing resource group{res_gr}...".format(
            res_gr=_RESOURCE_GROUP_NAME), end="", flush=True)
        delete_async_operation = resource_client.resource_groups.begin_delete(
            _RESOURCE_GROUP_NAME)
        delete_async_operation.wait()
        print('OK')

    create_resource_group(
        resource_client, _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION)
    create_storage_account(storage_client, _STORAGE_ACCOUNT_NAME,
                           _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION)
    create_adls_container(_STORAGE_ACCOUNT_NAME, _STORAGE_CONTAINER_NAME)
    create_sql_server(sql_client, _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME,
                      _RESOURCE_GROUP_LOCATION, _SQL_USERNAME, _SQL_PASSWORD)
    create_sql_db(sql_client, _RESOURCE_GROUP_NAME,
                  _SQL_SERVER_NAME, _SQL_DB_NAME, _RESOURCE_GROUP_LOCATION)
    configure_firewall(sql_client, _SQL_DRIVER, _RESOURCE_GROUP_NAME,
                       _SQL_SERVER_NAME, _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD, external_IP)
    execution_end_time = timer()
    elapsed_duration = execution_end_time - execution_start_time
    print('Elapsed resources(s) provisioning time was {time}.\n'.format(
        time=format_timespan(elapsed_duration)))

Running the above script provisions the nominated resources in sequence and prints a short confirmation for each step, provided we have an Azure subscription set up and the Azure CLI configured on the local environment.
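As a quick, optional sanity check before kicking the script off, you can confirm that the Azure CLI credential the code relies on is able to issue a management-plane token – a minimal sketch, assuming az login has already been run:

from azure.identity import AzureCliCredential

# Optional sanity check: confirm the Azure CLI credential used by the
# provisioning script can obtain a management-plane access token.
credential = AzureCliCredential()
token = credential.get_token("https://management.azure.com/.default")
print("Azure CLI credential OK - token acquired (expires at epoch {exp})".format(
    exp=token.expires_on))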

Now that we have all the artefacts supporting Google Analytics data acquisition in place, let’s start by defining the GA attributes we’d like to source and stage them in our data lake and SQL database. For this exercise I used data from my own website – the one you’re reading right now – restricted to the last 30 days and the following attributes: PagePath, PageTitle, Country, City, Medium, DeviceCategory, OperatingSystem, Browser and SessionDuration. I won’t go into how to set up a GA account in this post as there are countless other internet resources on the topic and most of this code is self-explanatory. The only thing that was unnecessarily frustrating and took me a while to figure out was creating a service account and granting it access to my GA view, as denoted by the _GA_Service_ACCT_KEY (JSON file) and _GA_VIEW_ID variables. Getting the account and its key generated was not a problem, but modifying the security details so that the service account could access the view was quite convoluted. The following script is responsible for acquiring the GA data, tabulating and formatting it into a Pandas data frame, staging it as a CSV file in ADLS and inserting it into the provisioned Azure SQL DB (the target table is also created as part of this code if it does not already exist).

from googleapiclient.discovery import build
from oauth2client.service_account import ServiceAccountCredentials
from pathlib import PureWindowsPath
from azure.storage.filedatalake import DataLakeServiceClient
from azure.mgmt.storage import StorageManagementClient
from azure.identity import AzureCliCredential
import pandas as pd
import pyodbc
import time


_GA_SCOPES = ['https://www.googleapis.com/auth/analytics.readonly']
_GA_VIEW_ID = 'your_ga_view_id'
_GA_OUTPUT_FILE_NAME = 'GADataExtract-'+time.strftime("%Y%m%d-%H%M%S")+'.csv'
_GA_Service_ACCT_KEY = PureWindowsPath('C:/your_file_path/your_json_service_account_key_file.json')
_SQL_SERVER_NAME = 'gademosqlserver2022.database.windows.net'
_SQL_DB_NAME = 'sourcedb'
_SQL_USERNAME = 'testusername'
_SQL_PASSWORD = 'MyV3ry$trongPa$$word'
_SQL_DRIVER = '{ODBC Driver 18 for SQL Server}'
_RESOURCE_GROUP_NAME = 'gademoresourcegroup2022'
_STORAGE_CONTAINER_NAME = 'gademooutputfiles2022'
_STORAGE_ACCOUNT_NAME = 'gademostorageacct2022'
_STORAGE_CONTAINER_DIRECTORY_NAME = time.strftime("%Y%m%d")
_SUBSCRIPTION_ID = 'your_subscription_id'
_GA_OUTPUT_FILE_PATH = PureWindowsPath('C:/your_file_path/{file_name}'.format(
        file_name=_GA_OUTPUT_FILE_NAME))
_SCHEMAS = ['stg', 'mart']

# get Google Analytics service account credentials
def initialize_analyticsreporting():
    credentials = ServiceAccountCredentials.from_json_keyfile_name(
        _GA_Service_ACCT_KEY, _GA_SCOPES)
    analytics = build('analyticsreporting', 'v4', credentials=credentials)
    return analytics


def get_report(analytics):
    return analytics.reports().batchGet(
        body={
            'reportRequests': [
                {
                    'viewId': _GA_VIEW_ID,
                    'dateRanges': [{'startDate': '30daysAgo', 'endDate': 'today'}],
                    'metrics': [{'expression': 'ga:sessions'}],
                    'dimensions': [{"name": "ga:pagePath"}, {"name": "ga:pageTitle"}, {"name": "ga:country"}, {"name": "ga:city"}, {"name": "ga:medium"}, {"name": "ga:deviceCategory"}, {"name": "ga:operatingSystem"}, {"name": "ga:browser"}],
                    'orderBys': [{"fieldName": "ga:sessions", "sortOrder": "DESCENDING"}],
                    'pageSize': 1000
                }]
        }
    ).execute()


def ga_response_dataframe(response):
    row_list = []
    for report in response.get('reports', []):
        column_header = report.get('columnHeader', {})
        dimension_headers = column_header.get('dimensions', [])
        metric_headers = column_header.get(
            'metricHeader', {}).get('metricHeaderEntries', [])
        for row in report.get('data', {}).get('rows', []):
            row_dict = {}
            dimensions = row.get('dimensions', [])
            date_range_values = row.get('metrics', [])

            for header, dimension in zip(dimension_headers, dimensions):
                row_dict[header] = dimension

            for i, values in enumerate(date_range_values):
                for metric, value in zip(metric_headers, values.get('values')):
                    if ',' in value or '.' in value:
                        row_dict[metric.get('name')] = float(value)
                    else:
                        row_dict[metric.get('name')] = int(value)

            row_list.append(row_dict)
    return pd.DataFrame(row_list)


# upload a file to data lake in Azure
def upload_file_to_lake(storage_client, ga_file_content, _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME, _STORAGE_CONTAINER_NAME, _STORAGE_CONTAINER_DIRECTORY_NAME, _GA_OUTPUT_FILE_PATH, _GA_OUTPUT_FILE_NAME):
    keys = storage_client.storage_accounts.list_keys(
        _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME)
    account_url = "https://{}.dfs.core.windows.net/".format(
        _STORAGE_ACCOUNT_NAME)
    service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
        "https", _STORAGE_ACCOUNT_NAME), credential=keys.keys[0].value)
    file_system_client = service_client.get_file_system_client(
        file_system=_STORAGE_CONTAINER_NAME)
    dir_client = file_system_client.get_directory_client(
        _STORAGE_CONTAINER_DIRECTORY_NAME)
    dir_client.create_directory()
    file_client = dir_client.create_file(_GA_OUTPUT_FILE_NAME)
    file_client.append_data(ga_file_content, 0, len(ga_file_content))
    file_client.flush_data(len(ga_file_content))


# create required database schemas
def create_stg_schema(_SQL_DRIVER, _SQL_SERVER_NAME, _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD, _SCHEMAS):
    with pyodbc.connect('DRIVER='+_SQL_DRIVER+';SERVER='+_SQL_SERVER_NAME+';PORT=1433;DATABASE='+_SQL_DB_NAME+';UID='+_SQL_USERNAME+';PWD=' + _SQL_PASSWORD) as conn:
        with conn.cursor() as cursor:
            for schema in _SCHEMAS:
                cursor.execute('''IF (NOT EXISTS (SELECT TOP 1 (1) FROM sys.schemas WHERE name = '{schema}')) 
                                BEGIN
                                    EXEC ('CREATE SCHEMA [{schema}] AUTHORIZATION [dbo]')
                                END'''.format(schema=schema))


# create required objects in the nominated database and populate with data
def insert_into_azuresql(ga_data, _SQL_DRIVER, _SQL_SERVER_NAME, _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD):
    with pyodbc.connect('DRIVER='+_SQL_DRIVER+';SERVER='+_SQL_SERVER_NAME+';PORT=1433;DATABASE='+_SQL_DB_NAME+';UID='+_SQL_USERNAME+';PWD=' + _SQL_PASSWORD) as conn:
        with conn.cursor() as cursor:
            if not cursor.tables(table='ga_data', tableType='TABLE').fetchone():
                cursor.execute('''CREATE TABLE dbo.ga_data (ID INT IDENTITY (1,1),
                                                            PagePath NVARCHAR(1024),
                                                            PageTitle NVARCHAR (2048),
                                                            Country NVARCHAR (256),
                                                            City NVARCHAR (256),
                                                            Medium NVARCHAR (256),
                                                            DeviceCategory NVARCHAR (512),
                                                            OperatingSystem VARCHAR (128),
                                                            Browser NVARCHAR (256),
                                                            SessionDuration INT)''')  
                cursor.commit()
            for index, row in ga_data.iterrows():
                cursor.execute('''INSERT INTO dbo.ga_data
                                    (PagePath,
                                    PageTitle,
                                    Country,
                                    City,
                                    Medium,
                                    DeviceCategory,
                                    OperatingSystem,
                                    Browser,
                                    SessionDuration)
                          values (?, ?, ?, ?, ?, ?, ?, ?, ?)''',
                               row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8])
                cursor.commit()
            cursor.execute('SELECT TOP (1) 1 FROM dbo.ga_data')
            rows = cursor.fetchone()
            if rows:
                print('All Good!')
            else:
                raise ValueError(
                    'No data generated in the source table. Please troubleshoot!'
                )


def main():
    credentials = AzureCliCredential()
    storage_client = StorageManagementClient(credentials, _SUBSCRIPTION_ID)
    analytics = initialize_analyticsreporting()
    response = get_report(analytics)
    df = ga_response_dataframe(response)
    df.columns = [x.replace(':', '_') for x in df.columns]
    ga_data = df.replace('(none)', '').replace(
        '(not set)', '')
    ga_data.to_csv(_GA_OUTPUT_FILE_PATH, index=False)
    with open(_GA_OUTPUT_FILE_PATH) as file:
        ga_file_content = file.read()
    upload_file_to_lake(storage_client, ga_file_content, _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME,
                        _STORAGE_CONTAINER_NAME, _STORAGE_CONTAINER_DIRECTORY_NAME, _GA_OUTPUT_FILE_PATH, _GA_OUTPUT_FILE_NAME)
    create_stg_schema(_SQL_DRIVER, _SQL_SERVER_NAME,
                      _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD, _SCHEMAS)
    insert_into_azuresql(ga_data, _SQL_DRIVER, _SQL_SERVER_NAME,
                         _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD)
    
if __name__ == '__main__':
    main()

When executed, our Azure SQL DB table is created (if it does not exist) and the GA data is inserted into the ADLS container and the aforementioned table.
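To verify the load from code rather than the portal, a short, optional check – reusing the connection constants (_SQL_*) defined in the acquisition script above – could look like this:

import pyodbc

# Optional verification: count the rows landed in dbo.ga_data, reusing the
# connection details declared in the acquisition script above.
with pyodbc.connect('DRIVER='+_SQL_DRIVER+';SERVER='+_SQL_SERVER_NAME+';PORT=1433;DATABASE='+_SQL_DB_NAME+';UID='+_SQL_USERNAME+';PWD='+_SQL_PASSWORD) as conn:
    with conn.cursor() as cursor:
        cursor.execute('SELECT COUNT(1) FROM dbo.ga_data')
        print('Rows loaded into dbo.ga_data: {cnt}'.format(cnt=cursor.fetchone()[0]))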

Now that we have our GA data, let’s look at how dbt can help us shape it by building our sample ELT pipeline.

DBT Installation

Installing dbt is a fairly straightforward affair. One can also go with a Docker container or the WSL approach, but in this post I’ll outline the steps to perform a local installation in a Python virtual environment on a Windows system. Provided Python is already installed, let’s go ahead and set up our virtual environment – a self-contained Python installation.

python -m venv dbt_env
.\dbt_env\Scripts\activate.ps1

Once set up and activated, we can install dbt using pip, along with the dbt-sqlserver adapter, as dbt does not support SQL Server out of the box. You can find its official GitHub repo with all the supporting documentation in the following LINK.

pip install dbt-core
pip install dbt-sqlserver

Next, we will initialize our demo dbt project using the dbt init command and provide a name for the project we’re creating (in this case azure_sql_demo). Please also note that newer versions of dbt only allow lower-case letters and underscores when specifying the project name.
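The command itself is a one-liner, run from the folder where you want the project scaffolding created:

dbt init azure_sql_demo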

As part of the initialization, a sample profiles.yml file containing placeholder configuration is created. Depending on which adapter we selected, i.e. (1) for BigQuery, (2) for PostgreSQL etc., the default profiles.yml file either contains only generic properties or gets created in an empty state. It will need to be amended to reflect our Azure SQL Server environment details e.g. user name, password, driver etc. As profiles.yml contains database connections and credentials (sensitive information), it is generated in the ~/.dbt/ folder and not the project folder. On the other hand, the main configuration file defining settings which apply to the whole project, called dbt_project.yml, contains placeholders for the development and production environments. Let’s go ahead and populate the profiles.yml file with the required information, ensuring that the profile name in dbt_project.yml matches the one in profiles.yml.

azure_sql_demo:
    target: dev
    outputs:
        dev:
            type: sqlserver
            driver: ODBC Driver 18 for SQL Server
            server: gademosqlserver2022.database.windows.net
            database: sourcedb
            port: 1433
            schema: stg
            user: testusername
            password: MyV3ry$trongPa$$word

Finally, we can check connectivity to our target database and ensure all the parameters have been entered correctly by running the dbt debug command as per below. It’s a good sign if you see no error messages at this point and all the critical outputs (color-coded in green) report that the checks have passed.
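From inside the newly created project folder the check is a single command:

dbt debug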

In the next part of this post I will dive deeper into the functionality dbt provides out of the box and some of its features, for example snapshots, tests, docs and more. You can view part 2 of this series HERE.


Tackling Data-Intensive Workloads with Azure Batch – Elastic, On-Demand and Cloud-Scale Compute For The Masses

December 22nd, 2021 / 3 Comments » / by admin

Introduction

As a data domain architect (amongst many other hats I wear these days), I sometimes find it difficult to recommend a public cloud service which can fulfill all my clients’ needs without any compromises and drawbacks. Bespoke development is always an option but with the speed at which this industry is moving, chances are that someone has already encountered and solved the problem using off-the-shelf technology. As cloud vendors learn to adapt and expand on their offering, the lines between different methodologies and tools to accomplish the same result have become more blurred and it’s not always easy to discern which is the cheapest, most performant, quickest to implement etc. Take compute as an example – gone are the days where buying dedicated hardware was the only option as virtualization, containerization and serverless have become the de facto standards for many workloads. Stateless architecture is the new norm and there is no going back unless you have deep pockets and require a deep level of isolation.

Using Microsoft Azure one can run both large-scale, batch-style, parallelized high-performance computing (HPC) workloads and, in the case of lightweight, ‘chatty’ rather than ‘chunky’ workloads, opt for more scalable, elastic and ops-free services e.g. Azure Logic Apps, Azure Functions etc. The choices are plentiful and chances are Azure has all your compute needs covered. In one of my previous posts HERE I already outlined how to implement Azure Functions, so in this post I will turn my attention to the more ‘heavy-duty’ service offering – Azure Batch.

To adequately define how Azure Batch differs from other compute services offered by Microsoft on their public cloud, we will build a small POC used to generate TPC-DS benchmark data. For a more detailed primer on how to generate TPC-DS data please visit my previous post HERE; to keep things concise I will only skim over what the TPC-DS benchmark is. The TPC-DS utility is mainly used for mock data (flat file) generation, which in turn is used for big data benchmarking. As creating those files is a time-consuming and compute-intensive process, particularly when specifying large scale factors, we will use Azure Batch to expedite it. To achieve this, using the Azure Python SDK and the Azure Batch service, we will create all the Azure infrastructure scaffolding, clone the TPC-DS repository containing the dsdgen utility used to create the aforementioned flat files, run the data generation process using a simple bash command and finally upload the created files into our blob storage account.

The following is a high-level Azure Batch service overview from Microsoft, outlining its intended purpose and some of its functionality.

Azure Batch Service Quick Primer

Azure Batch is used to run large-scale parallel and high-performance computing (HPC) batch jobs efficiently in Azure. Azure Batch creates and manages a pool of compute nodes (virtual machines), installs the applications you want to run, and schedules jobs to run on the nodes. There’s no cluster or job scheduler software to install, manage, or scale. Instead, you use Batch APIs and tools, command-line scripts, or the Azure portal to configure, manage, and monitor your jobs.

Developers can use Batch as a platform service to build SaaS applications or client apps where large-scale execution is required. Those applications can involve VFX and 3D image rendering, image analysis and processing, genetic sequence analysis or even data ingestion, processing, and ETL operations.

Batch works well with intrinsically parallel (also known as ‘embarrassingly parallel’) workloads. These workloads have applications which can run independently, with each instance completing part of the work. When the applications are executing, they might access some common data, but they do not communicate with other instances of the application. Intrinsically parallel workloads can therefore run at a large scale, determined by the amount of compute resources available to run applications simultaneously. Some examples of intrinsically parallel workloads you can bring to Batch include financial risk modelling, software test execution or media transcoding. You can also use Batch to execute tightly coupled workloads, where the applications you run need to communicate with each other, rather than running independently. Tightly coupled applications normally use the Message Passing Interface (MPI) API; you can run your tightly coupled workloads with Batch using Microsoft MPI or Intel MPI. For workloads such as fluid dynamics or multi-node AI training, application performance can be further improved with specialized HPC and GPU-optimized VM sizes.

Batch Application Example – Generating TPC-DS Benchmark Data

In the following example I will outline how one can use the power of the Batch service and its associated compute capacity to generate TPC-DS benchmark data. You can find a lot of literature and blog posts describing this process in detail on the internet (including my blog post HERE), so I will skip over the nitty-gritty. The only thing worth mentioning is that the utility which creates the flat files later loaded into the TPC-DS database schema is both compute- and time-intensive, and therefore lends itself to being executed on high-performing compute instances. This is particularly applicable when the scaling factor indicating the volume of data generated is set to a high number, e.g. tens of terabytes. Using the Python SDK for Azure we will provision a pool of Batch compute nodes (virtual machines), create a job that runs tasks to generate the output files in the pool using shell commands and the dsdgen utility, and finally upload the newly created files to the nominated Azure storage account using between one and four compute nodes.

The following image depicts a high-level architecture behind this process.

With that in mind, let’s start by creating the underlying infrastructure on Azure. The following Python script is responsible for creating Azure Resource Group, Storage Account and Storage Container. The Storage Container will be used in the subsequent script to house flat files generated by the dsdgen utility (part of TPC-DS benchmark suite of tools) as blobs.

from azure.identity import AzureCliCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.storage import StorageManagementClient
from azure.storage.blob import BlobServiceClient
from msrestazure.azure_exceptions import CloudError


_RESOURCE_GROUP_NAME = 'batchtestresourcegroup'
_RESOURCE_GROUP_LOCATION = 'australiasoutheast'
_STORAGE_ACCOUNT_NAME = 'batchdemo2021'
_STORAGE_CONTAINER_NAME = 'outputfiles'
_SUBSCRIPTION_ID = 'YourAzureSubscriptionID'


# Create resource group
def create_resource_group(resource_client, _RESOURCE_GROUP_NAME, _LOCATION):
    print("\nCreating Azure Resource Group {rg_name}...".format(
        rg_name=_RESOURCE_GROUP_NAME), end="", flush=True)
    try:
        resource_client.resource_groups.create_or_update(
            _RESOURCE_GROUP_NAME, {'location': _LOCATION})
    except CloudError as e:
        print(e)
    rg = [g.name for g in resource_client.resource_groups.list()]
    if _RESOURCE_GROUP_NAME in rg:
        print('OK')


# Create storage account in the nominated resource group
def create_storage_account(storage_client, _STORAGE_ACCOUNT_NAME, _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION):
    print("Creating Azure Storage Account {st_acct}...".format(
        st_acct=_STORAGE_ACCOUNT_NAME), end="", flush=True)
    try:
        availability_result = storage_client.storage_accounts.check_name_availability(
            {'name': _STORAGE_ACCOUNT_NAME})
        if not availability_result.name_available:
            print('storage name {st_acct} is already in use. Try another name.'.format(
                st_acct=_STORAGE_ACCOUNT_NAME))
            exit()
        poller = storage_client.storage_accounts.begin_create(_RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME,
                                                              {
                                                                  "location": _RESOURCE_GROUP_LOCATION,
                                                                  "kind": "StorageV2",
                                                                  "sku": {"name": "Standard_LRS"}
                                                              })
        account_result = poller.result()
        if account_result.name == _STORAGE_ACCOUNT_NAME:
            print('OK')
    except CloudError as e:
        print(e)


# Create storage container in the nominated resource group
def create_blob_storage(storage_client, _STORAGE_CONTAINER_NAME, _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME):
    print("Creating Azure Storage Container {st_blob}...".format(
        st_blob=_STORAGE_CONTAINER_NAME), end="", flush=True)
    storage_client.blob_containers.create(
        _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME, _STORAGE_CONTAINER_NAME, {})
    keys = storage_client.storage_accounts.list_keys(
        _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME)
    conn_string = f"DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName={_STORAGE_ACCOUNT_NAME};AccountKey={keys.keys[0].value}"
    blob_service = BlobServiceClient.from_connection_string(
        conn_str=conn_string)
    containers = [i.name for i in blob_service.list_containers()]
    if _STORAGE_CONTAINER_NAME in containers:
        print('OK\n')


if __name__ == '__main__':
    credential = AzureCliCredential()
    storage_client = StorageManagementClient(credential, _SUBSCRIPTION_ID)
    resource_client = ResourceManagementClient(credential, _SUBSCRIPTION_ID)

    create_resource_group(
        resource_client, _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION)
    create_storage_account(storage_client, _STORAGE_ACCOUNT_NAME,
                           _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION)
    create_blob_storage(storage_client, _STORAGE_CONTAINER_NAME,
                        _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME)

Once the supporting infrastructure is in place we can go ahead and create an Azure Batch account. The easiest way to do it is by logging into the Azure Portal and selecting Batch Service from the ‘Create a Resource’ blade.

When completed, the three values we will need are the Storage Account Name, the Storage Account Key and finally the Storage Container Name. These values will need to be specified in the below script before executing it.
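If you’d rather not copy the Storage Account Key out of the portal, it can also be pulled programmatically – a small sketch reusing the management client and constants from the provisioning script above:

from azure.identity import AzureCliCredential
from azure.mgmt.storage import StorageManagementClient

# Optional: retrieve the storage account key programmatically instead of
# copying it from the Azure Portal. Reuses _SUBSCRIPTION_ID, _RESOURCE_GROUP_NAME
# and _STORAGE_ACCOUNT_NAME from the provisioning script above.
credential = AzureCliCredential()
storage_client = StorageManagementClient(credential, _SUBSCRIPTION_ID)
keys = storage_client.storage_accounts.list_keys(
    _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME)
print('Storage account key: {key}'.format(key=keys.keys[0].value))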

Next, let’s look at the final snippet of code used to stand up the Azure Batch pool, create the job and its associated tasks, and finally do all the heavy lifting regarding TPC-DS data generation and transfer into the nominated storage container. The below script, implemented mainly as a collection of functions for readability, is responsible for the following:

  • Create a pool resource running a Linux (Ubuntu) server with a single node. A pool is the collection of nodes that applications run on. A node is an Azure virtual machine (VM) or cloud service VM that is dedicated to processing a portion of the application’s workload. The size of a node determines the number of CPU cores, memory capacity, and local file system size that is allocated to the node. On node creation, a pre-deployment shell script runs to install the required packages, download the azcopy utility (used for file transfer), clone the GitHub repo containing the TPC-DS tools and compile it in the destination directory.
  • Create a Batch job. A job is a collection of tasks. It manages how computation is performed by its tasks on the compute nodes in a pool.
  • Create a series of tasks (defined by the number of flat files/tables specified in the ‘export_files’ variable). A task is a unit of computation that is associated with a job. It runs on a node. Tasks are assigned to a node for execution, or are queued until a node becomes free. Put simply, a task runs one or more programs or scripts on a compute node to perform the work you need done. In this demo each task is responsible for executing the dsdgen utility for the specified table/file and copying the output into the nominated storage account.
  • Monitor the progress of the instantiated job.
  • Delete the pool and the job on tasks completion. Deleting job also deletes individual tasks assigned to it.

To highlight some of the more important sections, the task_commands list inside the create_pool function and the command string assembled in the add_tasks function are where the shell commands are defined. The first installs the required build packages, downloads and extracts the azcopy utility (used for file transfer), clones the TPC-DS repo and compiles its tools. There is also an interesting use of the $AZ_BATCH_NODE_SHARED_DIR runtime variable, which denotes the full path of the shared directory on the node; all tasks that execute on the node have read/write access to this directory. Moving on to the second section, the shell command executes the dsdgen utility with a scaling factor of 1 (1 GB), outputting flat files into the $AZ_BATCH_TASK_WORKING_DIR location. This variable denotes the full path of the task working directory on the node. This section also triggers the azcopy utility used for transferring the newly created flat files into the Azure blob storage container.

from datetime import datetime, timedelta
from timeit import default_timer as timer
from humanfriendly import format_timespan
import os
import itertools
import sys
import time

from azure.storage.blob import BlobServiceClient, generate_container_sas, AccountSasPermissions
import azure.batch._batch_service_client as batch
import azure.batch.batch_auth as batch_auth
import azure.batch.models as batchmodels


_BATCH_ACCOUNT_NAME = 'batchdemo'
_BATCH_ACCOUNT_KEY = 'YourBatchAccountKey'
_BATCH_ACCOUNT_URL = 'https://batchdemo.australiasoutheast.batch.azure.com'

_STORAGE_ACCOUNT_NAME = 'batchdemo2021'
_STORAGE_ACCOUNT_KEY = 'YourStorageAccountKey'
_STORAGE_CONTAINER_NAME = 'outputfiles'


_POOL_ID = 'PythonTutorialPool'
_POOL_NODE_COUNT = 1
_POOL_VM_SIZE = 'Standard_A1_v2'  # Standard_A2_v2, Standard_A4_v2, Standard_A8_v2
_NODE_OS_PUBLISHER = 'Canonical'
_NODE_OS_OFFER = 'UbuntuServer'
_NODE_OS_SKU = '18.04-LTS'
_JOB_ID = 'AzureBatchPythonDemoJob'


export_files = ['call_center.dat',
                'catalog_page.dat',
                'catalog_sales.dat',
                'customer.dat',
                'customer_address.dat',
                'customer_demographics.dat',
                'income_band.dat',
                'inventory.dat',
                'item.dat',
                'promotion.dat',
                'reason.dat',
                'ship_mode.dat',
                'store.dat',
                'store_sales.dat',
                'time_dim.dat',
                'warehouse.dat',
                'web_page.dat',
                'web_sales.dat',
                'web_site.dat']

# Prompt the user for yes/no input, displaying the specified question text
def query_yes_no(question, default="yes"):
    valid = {'y': 'yes', 'n': 'no'}
    if default is None:
        prompt = ' [y/n] '
    elif default == 'yes':
        prompt = ' [Y/n] '
    elif default == 'no':
        prompt = ' [y/N] '
    else:
        raise ValueError("Invalid default answer: '{}'".format(default))

    while 1:
        choice = input(question + prompt).lower()
        if default and not choice:
            return default
        try:
            return valid[choice[0]]
        except (KeyError, IndexError):
            print("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")


# Wrap cmd/bash command in a shell
def wrap_commands_in_shell(ostype, commands):
    if ostype.lower() == 'linux':
        return '/bin/bash -c \'set -e; set -o pipefail; {}; wait\''.format(
            ';'.join(commands))
    elif ostype.lower() == 'windows':
        return 'cmd.exe /c "{}"'.format('&'.join(commands))
    else:
        raise ValueError('unknown ostype: {}'.format(ostype))


# Print the contents of the specified Batch exception
def print_batch_exception(batch_exception):
    print('-------------------------------------------')
    print('Exception encountered:')
    if batch_exception.error and \
            batch_exception.error.message and \
            batch_exception.error.message.value:
        print(batch_exception.error.message.value)
        if batch_exception.error.values:
            print()
            for mesg in batch_exception.error.values:
                print('{}:\t{}'.format(mesg.key, mesg.value))
    print('-------------------------------------------')


# Create a pool of compute nodes with the specified OS settings
def create_pool(batch_client, _POOL_ID, _POOL_NODE_COUNT, _POOL_VM_SIZE):
    print('\nCreating pool {poolid} (allocated VM size is: {vm})...'.format(
        poolid=_POOL_ID, vm=_POOL_VM_SIZE))
    task_commands = [' sudo apt-get update',
                     ' sudo apt-get install -y gcc make flex bison byacc git',
                     ' cd $AZ_BATCH_NODE_SHARED_DIR',
                     ' wget -O azcopy_v10.tar.gz https://aka.ms/downloadazcopy-v10-linux',
                     ' tar -xf azcopy_v10.tar.gz --strip-components=1',
                     ' git clone https://github.com/gregrahn/tpcds-kit.git',
                     ' cd tpcds-kit/tools',
                     ' make OS=LINUX']
    user = batchmodels.AutoUserSpecification(
        scope=batchmodels.AutoUserScope.pool,
        elevation_level=batchmodels.ElevationLevel.admin)
    new_pool = batch.models.PoolAddParameter(
        id=_POOL_ID,
        virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
            image_reference=batchmodels.ImageReference(
                publisher=_NODE_OS_PUBLISHER,
                offer=_NODE_OS_OFFER,
                sku=_NODE_OS_SKU,
                version="latest"
            ),
            node_agent_sku_id="batch.node.ubuntu 18.04"),
        vm_size=_POOL_VM_SIZE,
        target_dedicated_nodes=_POOL_NODE_COUNT,
        start_task=batch.models.StartTask(
            command_line=wrap_commands_in_shell('linux',
                                                task_commands),
            user_identity=batchmodels.UserIdentity(auto_user=user),
            wait_for_success=True)
    )
    batch_client.pool.add(new_pool)


# Create a job with the specified ID, associated with the specified pool.
def create_job(batch_client, _JOB_ID, _POOL_ID):

    print('Creating job {}...'.format(_JOB_ID))

    job = batch.models.JobAddParameter(
        id=_JOB_ID,
        pool_info=batch.models.PoolInformation(pool_id=_POOL_ID))
    batch_client.job.add(job)


# Add a task for each input file in the collection to the specified job
def add_tasks(batch_client, output_container_sas_token, export_files, _JOB_ID, _STORAGE_CONTAINER_NAME, _STORAGE_ACCOUNT_NAME):
    print('Adding {} tasks to job {}...'.format(len(export_files), _JOB_ID))
    tasks = list()
    output_files_path = '$AZ_BATCH_TASK_WORKING_DIR/TPCDS-DATA'
    tpcds_utilility_path = '$AZ_BATCH_NODE_SHARED_DIR/tpcds-kit/tools'
    azcopy_utility_path = '$AZ_BATCH_NODE_SHARED_DIR'
    for idx, file_name in enumerate(export_files):
        command = "/bin/bash -c \"mkdir {fpath} ".format(
            fpath=output_files_path)
        command += " && cd {upath} ".format(upath=tpcds_utilility_path)
        command += " && ./dsdgen -SCALE 1 -TABLE {tname} -DIR {fpath}".format(
            fpath=output_files_path, tname=os.path.splitext(file_name)[0])
        command += " && cd {az} ".format(az=azcopy_utility_path)
        command += " && sudo ./azcopy copy \"{opath}/TPCDS-DATA/{fname}\"".format(
            opath=output_files_path, fname=file_name)
        command += " \"https://{sacct}.blob.core.windows.net/{scontainer}/?{sastoken}\" \"".format(
            sacct=_STORAGE_ACCOUNT_NAME, scontainer=_STORAGE_CONTAINER_NAME, sastoken=output_container_sas_token)
        tasks.append(batch.models.TaskAddParameter(
            id='Task{}'.format(idx),
            command_line=command
        )
        )
    batch_client.task.add_collection(_JOB_ID, tasks)


# Return when all tasks in the specified job reach the Completed state.
def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
    timeout_expiration = datetime.now() + timeout
    spinner = itertools.cycle(['-', '/', '|', '\\'])
    print("Monitoring all tasks for 'Completed' state, timeout in {}..."
          .format(timeout), end='')
    while datetime.now() < timeout_expiration:
        sys.stdout.write(next(spinner))
        sys.stdout.flush()
        tasks = batch_service_client.task.list(job_id)

        incomplete_tasks = [task for task in tasks if
                            task.state != batchmodels.TaskState.completed]
        if not incomplete_tasks:
            print()
            return True
        else:
            sys.stdout.write('\b')
            time.sleep(1)
    raise RuntimeError("ERROR: Tasks did not reach 'Completed' state within "
                       "timeout period of " + str(timeout))


if __name__ == '__main__':
    batch_execution_start_time = timer()
    # Create a Batch service client with the associated credentials
    credentials = batch_auth.SharedKeyCredentials(_BATCH_ACCOUNT_NAME,
                                                  _BATCH_ACCOUNT_KEY)
    batch_client = batch.BatchServiceClient(
        credentials,
        batch_url=_BATCH_ACCOUNT_URL)

    # Create SAS (Shared Access Signature) token
    output_container_sas_token = generate_container_sas(
        account_name=_STORAGE_ACCOUNT_NAME,
        container_name=_STORAGE_CONTAINER_NAME,
        account_key=_STORAGE_ACCOUNT_KEY,
        permission=AccountSasPermissions(write=True),
        expiry=datetime.utcnow() + timedelta(hours=1))
    try:
        # Create the pool that will contain the compute nodes that will execute the tasks.
        create_pool(batch_client, _POOL_ID, _POOL_NODE_COUNT, _POOL_VM_SIZE)

        # Create the job that will run the tasks.
        create_job(batch_client, _JOB_ID, _POOL_ID)

        # Add the tasks to the job.
        add_tasks(batch_client, output_container_sas_token,
                  export_files, _JOB_ID, _STORAGE_CONTAINER_NAME, _STORAGE_ACCOUNT_NAME)

        # Pause execution until tasks reach Completed state.
        wait_for_tasks_to_complete(batch_client,
                                   _JOB_ID,
                                   timedelta(minutes=20))
    except batchmodels.BatchErrorException as err:
        print_batch_exception(err)
        raise
    batch_execution_end_time = timer()
    elapsed_duration = batch_execution_end_time - batch_execution_start_time
    print('Elapsed batch processing time was {time}.'.format(
        time=format_timespan(elapsed_duration)))

    # Clean up Batch resources (if the user so chooses).
    if query_yes_no('Delete job?') == 'yes':
        batch_client.job.delete(_JOB_ID)
    if query_yes_no('Delete pool?') == 'yes':
        batch_client.pool.delete(_POOL_ID)

When executed, we can view the progress status, results and additional high-level information for individual tasks in the Azure Portal.
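The same high-level task information can also be pulled from code rather than the portal – a minimal sketch using the batch_client and _JOB_ID defined in the script above:

# Optional progress check: print each task's id, state and exit code (where
# available), reusing the batch_client instantiated in the script above.
for task in batch_client.task.list(_JOB_ID):
    exit_code = task.execution_info.exit_code if task.execution_info else None
    print('{tid}: state={state}, exit_code={code}'.format(
        tid=task.id, state=task.state, code=exit_code))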

I also ran the script across multiple compute instance sizes to see what difference the selected VM type makes to processing time. The following table outlines the total processing time (including pool node creation) across 4 different node sizes and 3 image types.

Looking at those times, it’s evident that the higher the number of nodes and the larger the VM size, the quicker the application executed. In my case the improvement wasn’t linear and only small gains were achieved by increasing the compute capacity, most likely due to the significant amount of time dedicated exclusively to VM provisioning. As such, I would imagine that for workloads spanning tens of minutes or even hours, the balance would even out proportionally to the VM size and node count.

Conclusion

The example I outlined is only a small nod to the breadth of capabilities and potential use-cases Azure Batch can provide – I haven’t even touched on features such as auto-scaling or using low-priority/spot VMs. The number of applications where on-demand and scalable compute power is required is limitless, and almost any domain or project can benefit from it. Additionally, one can extend these capabilities and link them up with other Azure services to build more bespoke solutions e.g. in this context, using Azure Data Factory the TPC-DS process could be executed multiple times, each run generating data for a different scaling factor and loading it into a database schema for further testing and evaluation. Similarly, the internet is full of interesting and creative posts from other engineers using Batch to help them solve intricate problems which otherwise would not see the light of day (mainly due to the large upfront investment in hardware). It’s amazing how many of today’s cloud services excel at abstracting away complex and nuanced paradigms behind simple implementations, saving time and money. Azure Batch is one of them.
