Tackling Data-Intensive Workloads with Azure Batch – Elastic, On-Demand and Cloud-Scale Compute For The Masses

December 22nd, 2021 / 3 Comments » / by admin

Introduction

As a data domain architect (amongst the many other hats I wear these days), I sometimes find it difficult to recommend a public cloud service which can fulfill all my clients’ needs without compromises and drawbacks. Bespoke development is always an option but, with the speed at which this industry is moving, chances are that someone has already encountered and solved the problem using off-the-shelf technology. As cloud vendors learn to adapt and expand their offerings, the lines between different methodologies and tools that accomplish the same result have become more blurred, and it’s not always easy to discern which is the cheapest, most performant, quickest to implement etc. Take compute as an example – gone are the days when buying dedicated hardware was the only option, as virtualization, containerization and serverless have become the de facto standards for many workloads. Stateless architecture is the new norm and there is no going back unless you have deep pockets and require a high level of isolation.

Using Microsoft Azure one can run large-scale, batch-style, parallelized high-performance computing (HPC) workloads or, in the case of lightweight, ‘chatty’ rather than ‘chunky’ workloads, opt for more scalable, elastic and ops-free services e.g. Azure Logic Apps, Azure Functions etc. The choices are plentiful and chances are Azure has all your compute needs covered. In one of my previous posts HERE I already outlined how to implement Azure Functions, so in this post I will turn my attention to a more ‘heavy-duty’ service offering – Azure Batch.

To adequately define how Azure Batch differs from other compute services offered by Microsoft on their public cloud, we will build a small POC used to generate TPC-DS benchmark data. For a more detailed primer on how to generate TPC-DS data please visit my previous post HERE; to keep things concise I will only skim over what the TPC-DS benchmark is. The TPC-DS toolkit is mainly used to generate mock data (flat files) which in turn is used for big data benchmarking. As creating those files is a time-consuming and compute-intensive process, particularly when specifying large scale factors, we will use Azure Batch to expedite it. Using the Azure Python SDK and the Azure Batch service, we will create all the Azure infrastructure scaffolding, clone the TPC-DS repository containing the dsdgen utility used to create the aforementioned flat files, run the data generation process using a simple bash command and finally upload the files created into our blob storage account.

The following is a high-level Azure Batch service overview from Microsoft, outlining its intended purpose and some of its functionality.

Azure Batch Service Quick Primer

Azure Batch is used to run large-scale parallel and high-performance computing (HPC) batch jobs efficiently in Azure. Azure Batch creates and manages a pool of compute nodes (virtual machines), installs the applications you want to run, and schedules jobs to run on the nodes. There’s no cluster or job scheduler software to install, manage, or scale. Instead, you use Batch APIs and tools, command-line scripts, or the Azure portal to configure, manage, and monitor your jobs.

Developers can use Batch as a platform service to build SaaS applications or client apps where large-scale execution is required. Those applications can involve VFX and 3D image rendering, image analysis and processing, genetic sequence analysis or even data ingestion, processing, and ETL operations.

Batch works well with intrinsically parallel (also known as ‘embarrassingly parallel’) workloads. These workloads have applications which can run independently, with each instance completing part of the work. When the applications are executing, they might access some common data, but they do not communicate with other instances of the application. Intrinsically parallel workloads can therefore run at a large scale, determined by the amount of compute resources available to run applications simultaneously. Some examples of intrinsically parallel workloads you can bring to Batch include financial risk modelling, software test execution or media transcoding. You can also use Batch to execute tightly coupled workloads, where the applications you run need to communicate with each other, rather than running independently. Tightly coupled applications normally use the Message Passing Interface (MPI) API. You can run your tightly coupled workloads with Batch using Microsoft MPI or Intel MPI, and improve application performance with specialized HPC and GPU-optimized VM sizes for workloads such as fluid dynamics or multi-node AI training.

Batch Application Example – Generating TPC-DS Benchmark Data

In the following example I will outline how one can use the power of the Batch service and its associated compute capacity to generate TPC-DS benchmark data. You can find a lot of literature and blog posts describing this process in detail on the internet (including my blog post HERE) so I will skip over the nitty-gritty. The only thing worth mentioning is that the utility which creates the flat files later loaded into the TPC-DS database schema is both compute- and time-intensive and therefore lends itself to being executed on high-performance compute instances. This is particularly applicable when the scaling factor dictating the volume of data generated is set to a high number e.g. tens of terabytes. Using the Python SDK for Azure we will provision a pool of Batch compute nodes (virtual machines), create a job that runs tasks to generate output files in the pool using shell commands and the dsdgen utility, and finally upload the newly created files to the nominated Azure storage account using between one and four compute nodes.

The following image depicts a high-level architecture behind this process.

With that in mind, let’s start by creating the underlying infrastructure on Azure. The following Python script is responsible for creating Azure Resource Group, Storage Account and Storage Container. The Storage Container will be used in the subsequent script to house flat files generated by the dsdgen utility (part of TPC-DS benchmark suite of tools) as blobs.

from azure.identity import AzureCliCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.storage import StorageManagementClient
from azure.storage.blob import BlobServiceClient
from msrestazure.azure_exceptions import CloudError


_RESOURCE_GROUP_NAME = 'batchtestresourcegroup'
_RESOURCE_GROUP_LOCATION = 'australiasoutheast'
_STORAGE_ACCOUNT_NAME = 'batchdemo2021'
_STORAGE_CONTAINER_NAME = 'outputfiles'
_SUBSCRIPTION_ID = 'YourAzureSubscriptionID'


# Create resource group
def create_resource_group(resource_client, _RESOURCE_GROUP_NAME, _LOCATION):
    print("\nCreating Azure Resource Group {rg_name}...".format(
        rg_name=_RESOURCE_GROUP_NAME), end="", flush=True)
    try:
        resource_client.resource_groups.create_or_update(
            _RESOURCE_GROUP_NAME, {'location': _LOCATION})
    except CloudError as e:
        print(e)
    rg = [g.name for g in resource_client.resource_groups.list()]
    if _RESOURCE_GROUP_NAME in rg:
        print('OK')


# Create storage account in the nominated resource group
def create_storage_account(storage_client, _STORAGE_ACCOUNT_NAME, _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION):
    print("Creating Azure Storage Account {st_acct}...".format(
        st_acct=_STORAGE_ACCOUNT_NAME), end="", flush=True)
    try:
        availability_result = storage_client.storage_accounts.check_name_availability(
            {'name': _STORAGE_ACCOUNT_NAME})
        if not availability_result.name_available:
            print('storage name {st_acct} is already in use. Try another name.'.format(
                st_acct=_STORAGE_ACCOUNT_NAME))
            exit()
        poller = storage_client.storage_accounts.begin_create(_RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME,
                                                              {
                                                                  "location": _RESOURCE_GROUP_LOCATION,
                                                                  "kind": "StorageV2",
                                                                  "sku": {"name": "Standard_LRS"}
                                                              })
        account_result = poller.result()
        if account_result.name == _STORAGE_ACCOUNT_NAME:
            print('OK')
    except CloudError as e:
        print(e)


# Create storage container in the nominated resource group
def create_blob_storage(storage_client, _STORAGE_CONTAINER_NAME, _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME):
    print("Creating Azure Storage Container {st_blob}...".format(
        st_blob=_STORAGE_CONTAINER_NAME), end="", flush=True)
    storage_client.blob_containers.create(
        _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME, _STORAGE_CONTAINER_NAME, {})
    keys = storage_client.storage_accounts.list_keys(
        _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME)
    conn_string = f"DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName={_STORAGE_ACCOUNT_NAME};AccountKey={keys.keys[0].value}"
    blob_service = BlobServiceClient.from_connection_string(
        conn_str=conn_string)
    containers = [i.name for i in blob_service.list_containers()]
    if _STORAGE_CONTAINER_NAME in containers:
        print('OK\n')


if __name__ == '__main__':
    credential = AzureCliCredential()
    storage_client = StorageManagementClient(credential, _SUBSCRIPTION_ID)
    resource_client = ResourceManagementClient(credential, _SUBSCRIPTION_ID)

    create_resource_group(
        resource_client, _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION)
    create_storage_account(storage_client, _STORAGE_ACCOUNT_NAME,
                           _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION)
    create_blob_storage(storage_client, _STORAGE_CONTAINER_NAME,
                        _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME)

Once the supporting infrastructure is in place we can go ahead and create an Azure Batch account. The easiest way to do it is by logging into the Azure Portal and selecting Batch Service from the ‘Create a Resource’ blade, as per the image below.
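
Alternatively, if you prefer to keep everything scripted, the Batch account can be provisioned from code just like the rest of the scaffolding. The snippet below is a minimal sketch only – it assumes the azure-mgmt-batch package is installed and that your SDK version exposes the track-2 begin_create/get_keys operations (older releases use slightly different method names), and it reuses the naming from the earlier infrastructure script.

from azure.identity import AzureCliCredential
from azure.mgmt.batch import BatchManagementClient

_SUBSCRIPTION_ID = 'YourAzureSubscriptionID'
_RESOURCE_GROUP_NAME = 'batchtestresourcegroup'
_RESOURCE_GROUP_LOCATION = 'australiasoutheast'
_BATCH_ACCOUNT_NAME = 'batchdemo'

credential = AzureCliCredential()
batch_mgmt_client = BatchManagementClient(credential, _SUBSCRIPTION_ID)

# Provision the Batch account and block until the long-running operation completes
poller = batch_mgmt_client.batch_account.begin_create(
    _RESOURCE_GROUP_NAME,
    _BATCH_ACCOUNT_NAME,
    {'location': _RESOURCE_GROUP_LOCATION})
account = poller.result()

# The account URL and shared key are needed by the Batch script further down
keys = batch_mgmt_client.batch_account.get_keys(
    _RESOURCE_GROUP_NAME, _BATCH_ACCOUNT_NAME)
print('Batch account URL :', 'https://{}'.format(account.account_endpoint))
print('Batch account key :', keys.primary)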

When completed, the three storage values we will need are the Storage Account Name, the Storage Account Key and finally the Storage Container Name. These values will need to be specified in the below script before executing.
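
As an optional shortcut, the Storage Account Key can also be retrieved programmatically with the same StorageManagementClient used in the infrastructure script above (the list_keys call is already part of the create_blob_storage function), for example:

from azure.identity import AzureCliCredential
from azure.mgmt.storage import StorageManagementClient

_SUBSCRIPTION_ID = 'YourAzureSubscriptionID'
_RESOURCE_GROUP_NAME = 'batchtestresourcegroup'
_STORAGE_ACCOUNT_NAME = 'batchdemo2021'
_STORAGE_CONTAINER_NAME = 'outputfiles'

storage_client = StorageManagementClient(AzureCliCredential(), _SUBSCRIPTION_ID)

# list_keys returns both access keys; the first one is sufficient for the Batch script below
keys = storage_client.storage_accounts.list_keys(
    _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME)
print('Storage account name   :', _STORAGE_ACCOUNT_NAME)
print('Storage account key    :', keys.keys[0].value)
print('Storage container name :', _STORAGE_CONTAINER_NAME)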

Next, let’s look at the final snippet of code used to stand up the Azure Batch pool, create the Job and associated Tasks and finally do all the heavy lifting of TPC-DS data generation and transfer into the nominated storage container. The below script, implemented mainly as a collection of functions for readability, is responsible for the following:

  • Create a pool resource running a Linux (Ubuntu) server with a single node. A pool is the collection of nodes that applications run on. A node is an Azure virtual machine (VM) or cloud service VM that is dedicated to processing a portion of the application’s workload. The size of a node determines the number of CPU cores, memory capacity and local file system size allocated to it. On node creation, a pre-deployment shell script is run to install the required packages, download the azcopy utility (used for file transfer), clone the GitHub repo containing the TPC-DS tools and compile it in the destination directory.
  • Create a Batch job. A job is a collection of tasks. It manages how computation is performed by its tasks on the compute nodes in a pool.
  • Create a series of tasks (defined by the number of flat files/tables specified in the ‘export_files’ variable). A task is a unit of computation that is associated with a job. It runs on a node. Tasks are assigned to a node for execution, or are queued until a node becomes free. Put simply, a task runs one or more programs or scripts on a compute node to perform the work you need done. In this demo each task is responsible for executing the dsdgen utility for the specified table/file and copying the output into the nominated storage account.
  • Monitor the progress of the instantiated job.
  • Delete the pool and the job on task completion. Deleting the job also deletes the individual tasks assigned to it.

To highlight some of the more important sections, the shell commands are defined in two places: the task_commands list in the create_pool function and the command string assembled in the add_tasks function. The first installs the libraries required by the TPC-DS repo, downloads the azcopy utility and compiles the toolkit. There is also an interesting use of the $AZ_BATCH_NODE_SHARED_DIR runtime variable, which denotes the full path of the shared directory on the node; all tasks that execute on the node have read/write access to this directory. Moving on to the second section, the shell command executes the dsdgen utility with a scaling factor of 1 (1 GB), outputting flat files into the $AZ_BATCH_TASK_WORKING_DIR location. This variable denotes the full path of the task working directory on the node. This section also triggers the azcopy utility used for transferring the newly created flat files into the Azure blob storage container.

from datetime import datetime, timedelta
from timeit import default_timer as timer
from humanfriendly import format_timespan
import os
import itertools
import sys
import time

from azure.storage.blob import BlobServiceClient, generate_container_sas, AccountSasPermissions
import azure.batch._batch_service_client as batch
import azure.batch.batch_auth as batch_auth
import azure.batch.models as batchmodels


_BATCH_ACCOUNT_NAME = 'batchdemo'
_BATCH_ACCOUNT_KEY = 'YourBatchAccountKey'
_BATCH_ACCOUNT_URL = 'https://batchdemo.australiasoutheast.batch.azure.com'

_STORAGE_ACCOUNT_NAME = 'batchdemo2021'
_STORAGE_ACCOUNT_KEY = 'YourStorageAccountKey'
_STORAGE_CONTAINER_NAME = 'outputfiles'


_POOL_ID = 'PythonTutorialPool'
_POOL_NODE_COUNT = 1
_POOL_VM_SIZE = 'Standard_A1_v2'  # Standard_A2_v2, Standard_A4_v2, Standard_A8_v2
_NODE_OS_PUBLISHER = 'Canonical'
_NODE_OS_OFFER = 'UbuntuServer'
_NODE_OS_SKU = '18.04-LTS'
_JOB_ID = 'AzureBatchPythonDemoJob'


export_files = ['call_center.dat',
                'catalog_page.dat',
                'catalog_sales.dat',
                'customer.dat',
                'customer_address.dat',
                'customer_demographics.dat',
                'income_band.dat',
                'inventory.dat',
                'item.dat',
                'promotion.dat',
                'reason.dat',
                'ship_mode.dat',
                'store.dat',
                'store_sales.dat',
                'time_dim.dat',
                'warehouse.dat',
                'web_page.dat',
                'web_sales.dat',
                'web_site.dat']

# Prompt the user for yes/no input, displaying the specified question text
def query_yes_no(question, default="yes"):
    valid = {'y': 'yes', 'n': 'no'}
    if default is None:
        prompt = ' [y/n] '
    elif default == 'yes':
        prompt = ' [Y/n] '
    elif default == 'no':
        prompt = ' [y/N] '
    else:
        raise ValueError("Invalid default answer: '{}'".format(default))

    while 1:
        choice = input(question + prompt).lower()
        if default and not choice:
            return default
        try:
            return valid[choice[0]]
        except (KeyError, IndexError):
            print("Please respond with 'yes' or 'no' (or 'y' or 'n').\n")


# Wrap cmd/bash command in a shell
def wrap_commands_in_shell(ostype, commands):
    if ostype.lower() == 'linux':
        return '/bin/bash -c \'set -e; set -o pipefail; {}; wait\''.format(
            ';'.join(commands))
    elif ostype.lower() == 'windows':
        return 'cmd.exe /c "{}"'.format('&'.join(commands))
    else:
        raise ValueError('unknown ostype: {}'.format(ostype))


# Print the contents of the specified Batch exception
def print_batch_exception(batch_exception):
    print('-------------------------------------------')
    print('Exception encountered:')
    if batch_exception.error and \
            batch_exception.error.message and \
            batch_exception.error.message.value:
        print(batch_exception.error.message.value)
        if batch_exception.error.values:
            print()
            for mesg in batch_exception.error.values:
                print('{}:\t{}'.format(mesg.key, mesg.value))
    print('-------------------------------------------')


# Create a pool of compute nodes with the specified OS settings
def create_pool(batch_client, _POOL_ID, _POOL_NODE_COUNT, _POOL_VM_SIZE):
    print('\nCreating pool {poolid} (allocated VM size is: {vm})...'.format(
        poolid=_POOL_ID, vm=_POOL_VM_SIZE))
    task_commands = [' sudo apt-get update',
                     ' sudo apt-get install -y gcc make flex bison byacc git',
                     ' cd $AZ_BATCH_NODE_SHARED_DIR',
                     ' wget -O azcopy_v10.tar.gz https://aka.ms/downloadazcopy-v10-linux',
                     ' tar -xf azcopy_v10.tar.gz --strip-components=1',
                     ' git clone https://github.com/gregrahn/tpcds-kit.git',
                     ' cd tpcds-kit/tools',
                     ' make OS=LINUX']
    user = batchmodels.AutoUserSpecification(
        scope=batchmodels.AutoUserScope.pool,
        elevation_level=batchmodels.ElevationLevel.admin)
    new_pool = batch.models.PoolAddParameter(
        id=_POOL_ID,
        virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
            image_reference=batchmodels.ImageReference(
                publisher=_NODE_OS_PUBLISHER,
                offer=_NODE_OS_OFFER,
                sku=_NODE_OS_SKU,
                version="latest"
            ),
            node_agent_sku_id="batch.node.ubuntu 18.04"),
        vm_size=_POOL_VM_SIZE,
        target_dedicated_nodes=_POOL_NODE_COUNT,
        start_task=batch.models.StartTask(
            command_line=wrap_commands_in_shell('linux',
                                                task_commands),
            user_identity=batchmodels.UserIdentity(auto_user=user),
            wait_for_success=True)
    )
    batch_client.pool.add(new_pool)


# Create a job with the specified ID, associated with the specified pool.
def create_job(batch_client, _JOB_ID, _POOL_ID):

    print('Creating job {}...'.format(_JOB_ID))

    job = batch.models.JobAddParameter(
        id=_JOB_ID,
        pool_info=batch.models.PoolInformation(pool_id=_POOL_ID))
    batch_client.job.add(job)


# Add a task for each input file in the collection to the specified job
def add_tasks(batch_client, output_container_sas_token, export_files, _JOB_ID, _STORAGE_CONTAINER_NAME, _STORAGE_ACCOUNT_NAME):
    print('Adding {} tasks to job {}...'.format(len(export_files), _JOB_ID))
    tasks = list()
    output_files_path = '$AZ_BATCH_TASK_WORKING_DIR/TPCDS-DATA'
    tpcds_utilility_path = '$AZ_BATCH_NODE_SHARED_DIR/tpcds-kit/tools'
    azcopy_utility_path = '$AZ_BATCH_NODE_SHARED_DIR'
    for idx, file_name in enumerate(export_files):
        command = "/bin/bash -c \"mkdir {fpath} ".format(
            fpath=output_files_path)
        command += " && cd {upath} ".format(upath=tpcds_utilility_path)
        command += " && ./dsdgen -SCALE 1 -TABLE {tname} -DIR {fpath}".format(
            fpath=output_files_path, tname=os.path.splitext(file_name)[0])
        command += " && cd {az} ".format(az=azcopy_utility_path)
        command += " && sudo ./azcopy copy \"{opath}/{fname}\"".format(
            opath=output_files_path, fname=file_name)
        command += " \"https://{sacct}.blob.core.windows.net/{scontainer}/?{sastoken}\" \"".format(
            sacct=_STORAGE_ACCOUNT_NAME, scontainer=_STORAGE_CONTAINER_NAME, sastoken=output_container_sas_token)
        tasks.append(batch.models.TaskAddParameter(
            id='Task{}'.format(idx),
            command_line=command
        )
        )
    batch_client.task.add_collection(_JOB_ID, tasks)


# Return when all tasks in the specified job reach the Completed state.
def wait_for_tasks_to_complete(batch_service_client, job_id, timeout):
    timeout_expiration = datetime.now() + timeout
    spinner = itertools.cycle(['-', '/', '|', '\\'])
    print("Monitoring all tasks for 'Completed' state, timeout in {}..."
          .format(timeout), end='')
    while datetime.now() < timeout_expiration:
        sys.stdout.write(next(spinner))
        sys.stdout.flush()
        tasks = batch_service_client.task.list(job_id)

        incomplete_tasks = [task for task in tasks if
                            task.state != batchmodels.TaskState.completed]
        if not incomplete_tasks:
            print()
            return True
        else:
            sys.stdout.write('\b')
            time.sleep(1)
    raise RuntimeError("ERROR: Tasks did not reach 'Completed' state within "
                       "timeout period of " + str(timeout))


if __name__ == '__main__':
    batch_execution_start_time = timer()
    # Create a Batch service client with associated credentails
    credentials = batch_auth.SharedKeyCredentials(_BATCH_ACCOUNT_NAME,
                                                  _BATCH_ACCOUNT_KEY)
    batch_client = batch.BatchServiceClient(
        credentials,
        batch_url=_BATCH_ACCOUNT_URL)

    # Create SAS (Shared Access Signature) token
    output_container_sas_token = generate_container_sas(
        account_name=_STORAGE_ACCOUNT_NAME,
        container_name=_STORAGE_CONTAINER_NAME,
        account_key=_STORAGE_ACCOUNT_KEY,
        permission=AccountSasPermissions(write=True),
        expiry=datetime.utcnow() + timedelta(hours=1))
    try:
        # Create the pool that will contain the compute nodes that will execute the tasks.
        create_pool(batch_client, _POOL_ID, _POOL_NODE_COUNT, _POOL_VM_SIZE)

        # Create the job that will run the tasks.
        create_job(batch_client, _JOB_ID, _POOL_ID)

        # Add the tasks to the job.
        add_tasks(batch_client, output_container_sas_token,
                  export_files, _JOB_ID, _STORAGE_CONTAINER_NAME, _STORAGE_ACCOUNT_NAME)

        # Pause execution until tasks reach Completed state.
        wait_for_tasks_to_complete(batch_client,
                                   _JOB_ID,
                                   timedelta(minutes=20))
    except batchmodels.BatchErrorException as err:
        print_batch_exception(err)
        raise
    batch_execution_end_time = timer()
    elapsed_duration = batch_execution_end_time - batch_execution_start_time
    print('Elapsed batch processing time was {time}.'.format(
        time=format_timespan(elapsed_duration)))

    # Clean up Batch resources (if the user so chooses).
    if query_yes_no('Delete job?') == 'yes':
        batch_client.job.delete(_JOB_ID)
    if query_yes_no('Delete pool?') == 'yes':
        batch_client.pool.delete(_POOL_ID)

When executed, we can view the progress status, results and additional high-level information for individual tasks in the Azure Portal.
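
The same details can also be pulled with the Batch SDK rather than the portal – for example, a short snippet like the one below (a rough sketch reusing the batch_client and _JOB_ID variables from the main script) prints each task’s state and exit code:

# Assumes batch_client and _JOB_ID from the main script are still in scope
for task in batch_client.task.list(_JOB_ID):
    exit_code = (task.execution_info.exit_code
                 if task.execution_info is not None else None)
    print('{tid}: state={state}, exit code={code}'.format(
        tid=task.id, state=task.state, code=exit_code))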

I also ran the script across multiple compute instance sizes to see what difference the selected VM type makes to processing time. The following table outlines the total processing time (including pool node creation) across 4 different node sizes and 3 image types.

Looking at those times, it’s evident that the higher the node count and the larger the VM size, the quicker the application executed. In my case the improvement wasn’t linear and only small gains were achieved by increasing the compute capacity, most likely due to the significant amount of time dedicated exclusively to VM provisioning. As such, I would imagine that for workloads spanning tens of minutes or even hours, the balance would even out proportionally to the VM size and node count.

Conclusion

The example I outlined is only a small nod to the breadth of capabilities and potential use cases Azure Batch can provide – I haven’t even touched on features such as auto-scaling or using low-priority/spot VMs. The number of applications where on-demand and scalable compute power is required is limitless and almost any domain or project can benefit from it. Additionally, one can extend these capabilities and link them up with other Azure services to build more bespoke solutions e.g. in this context, using Azure Data Factory the TPC-DS process could be executed multiple times, each run generating data for a different scaling factor and loading it into a database schema for further testing and evaluation. Similarly, the internet is full of interesting and creative posts from other engineers using Batch to help them solve intricate problems which otherwise would not see the light of day (mainly due to the large upfront investment in hardware). It’s amazing how many of today’s cloud services excel at abstracting away complex and nuanced paradigms behind simple implementations, saving time and money. Azure Batch is one of them.
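
As a footnote to the auto-scaling and low-priority VM features mentioned above, both can be switched on directly in the pool definition. The fragment below is an illustrative sketch only (the formula and node caps are my own assumptions, and it reuses the constants and batch_client from the main script) showing a PoolAddParameter variant that replaces the fixed target_dedicated_nodes value with an autoscale formula targeting low-priority nodes:

from datetime import timedelta
import azure.batch.models as batchmodels

# Illustrative only - size the pool to the pending task count, capped at 4 low-priority nodes
autoscale_formula = '''
maxNumberofVMs = 4;
pendingTaskSamplePercent = $PendingTasks.GetSamplePercent(180 * TimeInterval_Second);
pendingTaskSamples = pendingTaskSamplePercent < 70 ? 1 : avg($PendingTasks.GetSample(180 * TimeInterval_Second));
$TargetLowPriorityNodes = min(maxNumberofVMs, pendingTaskSamples);
$TargetDedicatedNodes = 0;
'''

autoscale_pool = batchmodels.PoolAddParameter(
    id='PythonTutorialAutoscalePool',
    vm_size=_POOL_VM_SIZE,
    virtual_machine_configuration=batchmodels.VirtualMachineConfiguration(
        image_reference=batchmodels.ImageReference(
            publisher=_NODE_OS_PUBLISHER,
            offer=_NODE_OS_OFFER,
            sku=_NODE_OS_SKU,
            version='latest'),
        node_agent_sku_id='batch.node.ubuntu 18.04'),
    enable_auto_scale=True,
    auto_scale_formula=autoscale_formula,
    auto_scale_evaluation_interval=timedelta(minutes=5))
# The start_task from create_pool above would still be required for the TPC-DS workload
batch_client.pool.add(autoscale_pool)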


SQL Server Hash-Partitioned Parallel Data Acquisition – How to Accelerate Your ‘E’ in ELT/ETL Using a Simple T-SQL Framework

June 1st, 2021 / 7 Comments » / by admin

Introduction

Note: All artifacts used in this demo can be downloaded from my shared OneDrive folder HERE.

A big part of Data Warehouse development has always been tied to structuring data acquisition pipelines before more rigid modelling takes place and data is wrangled using (most likely) one of the most venerable methodologies in use today: Data Vault, Kimball or Inmon. I have covered the data acquisition topic in a number of previous posts (mainly HERE and HERE), but would like to further expand on this subject with a short post on how to enhance this process (using a simplified example) and enable hash-partitioning on individual tables. Having concurrent executions i.e. a number of loads running in parallel, one for every source table, is fairly straightforward to achieve on the SQL Server platform with a little bit of T-SQL. There are certainly more elegant approaches to turning sequential loads into parallel ones, but with a little bit of elbow grease one can script out a batch of good, old-fashioned SQL Server Agent jobs to be spun up and run simultaneously (example HERE). However, in some cases source tables can be quite large, and having a single transaction responsible for inserting the whole table’s content into a target object can be quite time-consuming. The situation can be further exacerbated if the primary key on the source table is a composite one or not of a numeric type.

One of the clients I worked with a few years ago had this problem. They needed to reload the entire content of a few source tables on a nightly schedule, and the framework they used was up to the task until the data stored in those objects grew by a large margin. Those tables contained a lot of text data i.e. email, chat and telephone transcripts, so many of the fields were quite wide. As their acquisition framework dynamically generated BIML scripts and SSIS packages, there was very little one could do to tune or modify the logic due to the proprietary nature of the framework’s design. Data volumes increased exponentially and over time the packages responsible for objects with very wide attributes started timing out, requiring expensive, bespoke development. Another, much more recent case involved my current employer’s choice of survey software. If you’ve ever used LimeSurvey, you will know that each survey’s data is stored in a single, very wide table (think hundreds of attributes), with a dedicated column assigned to each question. That’s not a big issue in itself, especially as MySQL (the database used by LimeSurvey) has a row limit of 65,535 bytes; however, this schema architecture allowed objects with very wide rows and shallow row counts to proliferate across the database. Extracting this data, particularly when lots of textual values are involved, can be quite slow when thousands of tables are involved.

To demonstrate how to mitigate this type of problem, I will create a simple scenario where a source system table acquisition job can be run across multiple hash-partitioned ‘chunks’ concurrently. Based on a distinct primary key (single-column or composite), we will be able to distribute the table content across multiple consistent, non-blocking streams of data, thus significantly accelerating data movement and cutting the time dedicated to the ‘E’ (out of ELT/ETL) by a large margin. I will also compare the time it takes to load this data using a variable number of streams i.e. between 2 and 8, to demonstrate how this framework can be tuned for better performance.

The framework’s simplified architecture view can be depicted as per the diagram below.

On the left we have our source system (augmented by the system metadata stored in a separate database) and a ‘parent’ process used for job coordination and dispatching. In the middle we have individual ‘workers’ operating on a hash-defined partition of source data. Finally, on the right we have our target system where it all gets assembled into a mirror copy of the source table. I will go over how all this ‘hangs together’ in more detail in the section below.

Azure Source Database and Dummy Data Setup

First, let’s create all required resources on Azure. The following Python script is used to generate Azure resource group, Azure SQL server and an empty database. You can easily replace it with a different self-hosted SQL Server instance (it doesn’t need to live in Azure).

from azure.common.client_factory import get_client_from_cli_profile
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.sql import SqlManagementClient
from msrestazure.azure_exceptions import CloudError
from os import popen
import pyodbc

RESOURCE_GROUP = 'Test_Resource_Group'
LOCATION = 'australiasoutheast'
SQL_SERVER = 'sourceserver2020'
SQL_DB = 'sourcedb'
USERNAME = 'testusername'
PASSWORD = 'MyV3ry$trongPa$$word'
DRIVER = '{ODBC Driver 17 for SQL Server}'
external_IP = popen("curl -s ifconfig.me").readline()


def create_resource_group(resource_client, RESOURCE_GROUP, LOCATION):
    print("\nCreating Azure RG {rg_name}...".format(
        rg_name=RESOURCE_GROUP), end="", flush=True)
    try:
        resource_client.resource_groups.create_or_update(
            RESOURCE_GROUP, {'location': LOCATION})
    except CloudError as e:
        print(e)
    rg = [g.name for g in resource_client.resource_groups.list()]
    if RESOURCE_GROUP in rg:
        print('OK')


def create_sql_server(sql_client, RESOURCE_GROUP, SQL_SERVER, LOCATION, USERNAME, PASSWORD):
    print("Creating Azure SQL Server {ssvr_name}...".format(
        ssvr_name=SQL_SERVER), end="", flush=True)
    try:
        sql_server = sql_client.servers.begin_create_or_update(
            RESOURCE_GROUP,
            SQL_SERVER,
            {
                'location': LOCATION,
                'version': '12.0',
                'administrator_login': USERNAME,
                'administrator_login_password': PASSWORD
            }
        )
        sql_server.wait()
    except CloudError as e:
        print(e)
    ssvr = [i.name for i in sql_client.servers.list_by_resource_group(
        RESOURCE_GROUP)]
    if SQL_SERVER in ssvr:
        print('OK')


def create_sql_db(sql_client, RESOURCE_GROUP, SQL_SERVER, SQL_DB, LOCATION):
    print("Creating Azure SQL Database {db_name}...".format(
        db_name=SQL_DB), end="", flush=True)
    try:
        sql_db = sql_client.databases.begin_create_or_update(
            RESOURCE_GROUP,
            SQL_SERVER,
            SQL_DB,
            {
                'location': LOCATION,
                'collation': 'SQL_Latin1_General_CP1_CI_AS',
                'create_mode': 'default',
                'requested_service_objective_name': 'Basic'
            }
        )
        sql_db.wait()
    except CloudError as e:
        print(e)
    dbs = [i.name for i in sql_client.databases.list_by_server(
        RESOURCE_GROUP, SQL_SERVER)]
    if SQL_DB in dbs:
        print('OK')


def configure_firewall(sql_client, DRIVER, RESOURCE_GROUP, SQL_SERVER, SQL_DB, USERNAME, PASSWORD, external_IP):
    print("Configuring Azure SQL Server Firewall Settings...", end="", flush=True)
    try:
        sql_client.firewall_rules.create_or_update(
            RESOURCE_GROUP,
            SQL_SERVER,
            "firewall_rule_name_" + external_IP,
            {
                "startIpAddress": external_IP,
                "endIpAddress": external_IP
            }
        )
    except CloudError as e:
        print(e)
    SQL_SERVER = SQL_SERVER + '.database.windows.net'
    with pyodbc.connect('DRIVER='+DRIVER+';SERVER='+SQL_SERVER+';PORT=1433;DATABASE='+SQL_DB+';UID='+USERNAME+';PWD=' + PASSWORD) as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT @@version")
            row = cursor.fetchone()
    if row:
        print('OK')


def main():
    # create resource client
    resource_client = get_client_from_cli_profile(ResourceManagementClient)
    create_resource_group(resource_client, RESOURCE_GROUP, LOCATION)
    # create sql client
    sql_client = get_client_from_cli_profile(SqlManagementClient)
    # create sql server
    create_sql_server(sql_client, RESOURCE_GROUP, SQL_SERVER,
                      LOCATION, USERNAME, PASSWORD)
    # create sql db in the basic tier
    create_sql_db(sql_client, RESOURCE_GROUP, SQL_SERVER, SQL_DB, LOCATION)
    # configure firewall
    configure_firewall(sql_client, DRIVER, RESOURCE_GROUP,
                       SQL_SERVER, SQL_DB, USERNAME, PASSWORD, external_IP)


if __name__ == "__main__":
    main()

Provided all the necessary Azure SDK for Python modules are installed and referenced correctly and we’ve signed in using the Azure CLI, we should see the following status output at the end of the script execution.

Next, let’s generate some mock data. This is required to simulate the scenario I was referring to before i.e. a table with a large number of columns and variable-length text data which we will try to acquire as efficiently and quickly as possible. The following script creates a small stored procedure used to generate the dummy data, the Dummy_Table object and finally assigns the ID column as the primary key on the newly populated table.

from pathlib import PureWindowsPath
import pyodbc

SQL_SERVER = 'sourceserver2020.database.windows.net'
SQL_DB = 'sourcedb'
USERNAME = 'testusername'
PASSWORD = 'MyV3ry$trongPa$$word'
DRIVER = '{ODBC Driver 17 for SQL Server}'


sql_file = PureWindowsPath(
    '/Path/Azure_SQLDB_Deployment/SQL/create_wide_tbl.sql')

with open(sql_file, "r") as f:
    sqlFile = f.read()

sql = sqlFile.rstrip('\n')

try:
    with pyodbc.connect('DRIVER='+DRIVER+';SERVER='+SQL_SERVER+';PORT=1433;DATABASE='+SQL_DB+';UID='+USERNAME+';PWD=' + PASSWORD) as conn:
        with conn.cursor() as cursor:
            cursor.execute('DROP PROCEDURE IF EXISTS usp_generate_dummy_data')
            cursor.execute(sql)
            cursor.execute('EXEC dbo.usp_generate_dummy_data')
            cursor.execute('SELECT TOP (1) 1 FROM dbo.Dummy_Table')
            rows = cursor.fetchone()
            if rows:
                print('All Good!')
            else:
                raise ValueError(
                    'No data generated in the source table. Please troubleshoot!'
                )
except pyodbc.Error as ex:
    sqlstate = ex.args[1]
    print(sqlstate)

The stored procedure (code below) is configured via the @wide and @deep variables to control the number of columns and rows generated (100,000 rows of dense text data in this demo, with column counts of 200 and 300 used for the tests later in this post). When finished (it executed for approx. 20 min with the default configuration of data volume and Azure resources specified in the script above), the very wide table schema as well as the synthetically created data will look similar to the image below.

CREATE PROCEDURE usp_generate_dummy_data
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @wide INT = 100;
    DECLARE @deep INT = 100000;
    DECLARE @allchars VARCHAR(100) = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 ';
    DECLARE @target_table_name VARCHAR(56) = 'Dummy_Table';

    DROP TABLE IF EXISTS ##metadata;
    CREATE TABLE ##metadata
    (
        ID INT IDENTITY(1, 1),
        Column_Name VARCHAR(256),
        Data_Type VARCHAR(56),
        Column_Size VARCHAR(56)
    );


    DECLARE @count INT = 1;
    WHILE @count <= @wide
    BEGIN
        INSERT INTO ##metadata
        (
            Column_Name,
            Data_Type,
            Column_Size
        )
        SELECT 'Column_' + CAST(@count AS VARCHAR(56)),
               'varchar',
               CEILING(RAND() * 100);

        SET @count = @count + 1;
    END;

    DECLARE @SQL VARCHAR(MAX);
    SELECT @SQL
        = 'DROP TABLE IF EXISTS ' + @target_table_name + '; CREATE TABLE ' + @target_table_name
          + ' (Id INT IDENTITY(1,1),' + STRING_AGG(Column_Name + ' ' + Data_Type + '(' + Column_Size + ')', ',') + ')'
    FROM ##metadata;
    EXEC (@SQL);

    SET @count = 1;
    WHILE @count <= @deep
    BEGIN
        DECLARE @vals VARCHAR(MAX);
        SELECT @vals
            = STRING_AGG(
                            CAST(QUOTENAME(
                                              SUBSTRING(
                                                           RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35) + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5),
                                                           1,
                                                           CAST(Column_Size AS INT)
                                                       ),
                                              ''''
                                          ) AS NVARCHAR(MAX)),
                            ','
                        )
        FROM ##metadata;
        SELECT @SQL
            = 'INSERT INTO ' + @target_table_name + '(' + STRING_AGG(Column_Name, ',') + ') SELECT ' + @vals + ''
        FROM ##metadata;
        EXEC (@SQL);
        SET @count = @count + 1;
    END;

    ALTER TABLE dbo.Dummy_Table
    ADD PRIMARY KEY (ID);

    DROP TABLE IF EXISTS ##metadata;
END;

Next, I will create a linked server connection from my on-premises SQL Server instance to the one I’ve just provisioned in Azure and recreate the source Dummy_Table schema in the target database.

USE [master]
GO
EXEC master.dbo.sp_addlinkedserver @server = N'AZURESOURCELINKEDSVR', @srvproduct=N'', @provider=N'SQLNCLI', @datasrc=N'sourceserver2020.database.windows.net', @catalog=N'sourcedb'
GO
EXEC master.dbo.sp_addlinkedsrvlogin @rmtsrvname=N'AZURESOURCELINKEDSVR',@useself=N'False',@locallogin=NULL,@rmtuser=N'testusername',@rmtpassword='MyV3ry$trongPa$$word'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'collation compatible', @optvalue=N'false'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'data access', @optvalue=N'true'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'dist', @optvalue=N'false'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'pub', @optvalue=N'false'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'rpc', @optvalue=N'true'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'rpc out', @optvalue=N'true'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'sub', @optvalue=N'false'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'connect timeout', @optvalue=N'0'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'collation name', @optvalue=NULL
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'lazy schema validation', @optvalue=N'false'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'query timeout', @optvalue=N'0'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'use remote collation', @optvalue=N'true'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'remote proc transaction promotion', @optvalue=N'false'
GO
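
Before moving on, it’s worth running a quick sanity check that the linked server resolves correctly – for example, with a pyodbc query from the target instance using four-part naming. The connection string below is an assumption (a local instance on ‘localhost’ hosting the targetdb database with integrated security); adjust it to match your own environment.

import pyodbc

# Connect to the local (target) SQL Server instance that hosts the linked server definition
LOCAL_CONN_STR = (
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=localhost;DATABASE=targetdb;Trusted_Connection=yes;'
)

with pyodbc.connect(LOCAL_CONN_STR) as conn:
    with conn.cursor() as cursor:
        # Four-part name: linked server -> database -> schema -> table
        cursor.execute(
            'SELECT COUNT(*) FROM AZURESOURCELINKEDSVR.sourcedb.dbo.Dummy_Table')
        row_count = cursor.fetchone()[0]
        print('Rows visible through the linked server:', row_count)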

Finally, in this example I will also be using two small databases (EDW_Control and AdminDBA) which contain metadata information on the object(s) I will be processing and error logs for loads which fail to run correctly. The structure of the EDW_Control database is pretty straightforward – it uses a collection of tables to track things like source and target schema names, record counts, table and index sizes etc. The important parts are the fields called ‘Is_Big’ and ‘ETL_Batch_No’, which dictate whether the nominated object should be hash-partitioned and, if so, into how many batches/streams. I wrote more about how these tables are structured in one of my previous blog posts HERE. Details on how to build and deploy the AdminDBA database can be found in my previous posts HERE and HERE. For reference, the EDW_Control database schema and a snapshot of the metadata I will be processing look as per below.

Hash-Partitioned Data Acquisition

Now that we’re all set up, let’s explore the main code base behind the process responsible for all the heavy lifting. This acquisition method is based on two different stored procedures: one which handles metadata, process coordination and job assignment, and a second one which builds the actual SQL statements responsible for data insertion. You can download all the code from my OneDrive folder HERE so I will only go over some of the more interesting aspects of these stored procedures and finally provide a quick demo depicting the runtime behavior and a short summary of its performance.

As mentioned, the magic sauce which allows for building a reliable concurrent acquisition job is the hash-partitioning, which converts a single-column or composite primary key into a hash string, dynamically building self-contained streams of data. We can specify as many partitions as we want (no greater than the number of rows), which can be further scaled back based on the number of CPU cores available e.g. in the demo below I will be partitioning the source table into 8 partitions. We can also have the process determine the number of partitions (when run for multiple tables) based on the metadata collected e.g. row counts, data size etc. by assigning the value of 1 to the ‘@Concurrent_Batch_No_Override’ parameter. The main functionality, however, is tied to the following bit of code, which creates even ‘chunks’ of target data based on the sorted hash values.

SET @SQL = 'DECLARE @R1 INT = (SELECT  MIN(id) from #Hash_Temp)'										+CHAR(13);
SET @SQL = @SQL + 'DECLARE @R1_Hash VARCHAR(128) = (SELECT hash_key FROM #Hash_Temp WHERE id = @R1)'	+CHAR(13);
SET @SQL = @SQL + 'DECLARE @R2 BIGINT = (SELECT (MAX(id)-MIN(id)+1)'									+CHAR(13);
SET @SQL = @SQL + '/'+CAST(@etl_batch_no AS VARCHAR(10))+' as id FROM #Hash_Temp)'						+CHAR(13);
SET @SQL = @SQL + 'DECLARE @R2_Hash VARCHAR (128) = (SELECT hash_key FROM #Hash_Temp WHERE id = @R2)'	+CHAR(13);
SET @SQL = @SQL + 'DECLARE @R3 BIGINT = (SELECT MAX(id) from #Hash_Temp)'								+CHAR(13);
SET @SQL = @SQL + 'DECLARE @R3_Hash VARCHAR(128) = (SELECT hash_key FROM #Hash_Temp WHERE id = @R3)'	+CHAR(13);
SET @SQL = @SQL + 'INSERT INTO #Ids_Range'																+CHAR(13);
SET @SQL = @SQL + '(range_FROM, range_TO, hash_FROM, hash_TO) '											+CHAR(13);
SET @SQL = @SQL + 'SELECT @R1, @R1+@R2, @R1_Hash,(SELECT hash_key FROM #Hash_Temp WHERE id =@R1+@R2)'	+CHAR(13);
SET @SQL = @SQL + 'DECLARE @z INT = 1'																	+CHAR(13);
SET @SQL = @SQL + 'WHILE @z <= '+CAST(@etl_batch_no AS VARCHAR(10))+'-1'								   +CHAR(13);
SET @SQL = @SQL + 'BEGIN'																				+CHAR(13);
SET @SQL = @SQL + 'INSERT INTO #Ids_Range (Range_FROM, Range_TO, Hash_FROM, Hash_TO)'					+CHAR(13);
SET @SQL = @SQL + 'SELECT LAG(Range_TO,0) OVER (ORDER BY id DESC)+1, '									+CHAR(13);
SET @SQL = @SQL + 'CASE WHEN LAG(Range_TO,0) OVER (ORDER BY id DESC)+@R2+1 >= @R3'					   +CHAR(13);
SET @SQL = @SQL + 'THEN @R3 ELSE LAG(Range_TO,0) OVER (ORDER BY id DESC)+@R2+1 END,'					+CHAR(13);
SET @SQL = @SQL + '(SELECT hash_key FROM #Hash_Temp WHERE id =(SELECT LAG(Range_TO,0) '					+CHAR(13);
SET @SQL = @SQL + 'OVER (ORDER BY id DESC)+1 FROM #Ids_Range WHERE @z = id)),'							+CHAR(13);
SET @SQL = @SQL + '(SELECT Hash_key FROM #Hash_Temp WHERE id =(SELECT'									+CHAR(13);
SET @SQL = @SQL + 'CASE WHEN LAG(Range_TO,0) OVER (ORDER BY id DESC)+@R2+1 >= @R3'					   +CHAR(13);
SET @SQL = @SQL + 'THEN @R3 ELSE LAG(Range_TO,0) OVER (ORDER BY id DESC)+@R2+1 END'					    +CHAR(13);
SET @SQL = @SQL + 'FROM #Ids_Range WHERE @z = id))'														+CHAR(13);
SET @SQL = @SQL + 'FROM #Ids_Range WHERE @z = id'														+CHAR(13);
SET @SQL = @SQL + 'SET @z = @z+1'																		+CHAR(13);
SET @SQL = @SQL + 'END'																					+CHAR(13);
EXEC(@SQL)

This code creates uniform ‘slices’ of the source table’s data using metadata stored in the #Hash_Temp temp table. This metadata is created using the HASHBYTES() function with the SHA1 algorithm, allowing the values to be indexed and sorted efficiently, which in turn allows the partition ranges to be created. For wide objects with a small number of rows i.e. less than 1 million, this should work reasonably quickly; however, for bigger tables you may want to move some of this logic into a memory-optimized table. Given that my source data set is quite small (100K rows), I did not encounter any performance bottlenecks with TempDB-stored tables in this setup. Additionally, this stored procedure acts as a coordinator/dispatcher, ensuring that all conditions are validated and the number of concurrent streams does not exceed the number of CPU cores; the runtime is checked every 5 seconds (capped at 200 checks) and, should it go over the allowed limit, any outstanding jobs are terminated.
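
To make the slicing logic easier to picture outside of T-SQL, the short Python sketch below mimics the same idea conceptually: hash every (possibly composite) primary key value, sort the hashes and cut the sorted list into evenly sized ranges whose boundary hash values can then be handed to each worker. It is purely illustrative – the actual framework performs all of this inside the stored procedure shown above.

import hashlib

def hash_ranges(primary_keys, partitions):
    # Hash each key the same way HASHBYTES('SHA1', ...) would, then sort the digests
    hashes = sorted(
        hashlib.sha1('|'.join(str(part) for part in key).encode()).hexdigest()
        for key in primary_keys)
    chunk = -(-len(hashes) // partitions)  # ceiling division = rows per stream
    # Each tuple holds the (hash_FROM, hash_TO) boundary for one acquisition stream
    return [(hashes[i], hashes[min(i + chunk, len(hashes)) - 1])
            for i in range(0, len(hashes), chunk)]

# Example: 100,000 single-column keys split into 8 non-overlapping streams
boundaries = hash_ranges([(i,) for i in range(1, 100001)], partitions=8)
for stream_no, (hash_from, hash_to) in enumerate(boundaries, start=1):
    print('Stream {}: {}... -> {}...'.format(stream_no, hash_from[:12], hash_to[:12]))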

Finally, a self-contained SQL Server Agent job is instantiated, calling the second stored procedure which is responsible for data insertion based on the partition number and hash value boundaries.

DECLARE @sql_job		NVARCHAR(MAX)	=		
'USE [targetdb]
EXEC	[dbo].['+@Worker_Proc_Name+']
@Source_Server_Name = '+@Source_Server_Name+',
@Source_Server_DB_Name = '''''+@Source_Server_DB_Name+''''',
@Source_Server_Schema_Name = '''''+@Source_Server_Schema_Name+''''',
@Source_Server_Object_Name = '''''+@table+''''',
@Target_Server_DB_Name = N'''''+@Target_Server_DB_Name+''''',
@Target_Server_Schema_Name = '''''+@Target_Server_Schema_Name+''''',
@Hash_SQL = N'''''+@hash_SQL+''''',
@Hash_FROM = N'''''+@hash_FROM+''''',
@Hash_TO = N'''''+@hash_TO+''''',
@Is_Big = N'''''+CAST(@is_big AS CHAR(1))+''''',
@Col_List_MSSQL = N'''''+@Col_List_MSSQL+''''',
@Target_Server_Object_Name = '''''+@table+''''',
@Exec_Instance_GUID	='''''+CAST(@Exec_Instance_GUID AS VARCHAR(128))+''''',
@Package_Name='''''+@Package_Name+''''' '
				

SET @SQL =			'IF EXISTS'
SET @SQL = @SQL +	'(SELECT TOP 1 1 FROM msdb..sysjobs_view job JOIN msdb.dbo.sysjobactivity activity '					+CHAR(13)
SET @SQL = @SQL +	'ON job.job_id = activity.job_id WHERE job.name = N'''+@job+''''										+CHAR(13)
SET @SQL = @SQL +	'AND job.date_created IS NOT NULL AND activity.stop_execution_date IS NULL)'							+CHAR(13)
SET @SQL = @SQL +	'BEGIN'																									+CHAR(13)
SET @SQL = @SQL +	'EXEC msdb..sp_stop_job @job_name=N'''+@job+''';'														+CHAR(13)																	
SET @SQL = @SQL +	'EXEC msdb..sp_delete_job @job_name=N'''+@job+''', @delete_unused_schedule=1'							+CHAR(13)									
SET @SQL = @SQL +	'END'																									+CHAR(13)
SET @SQL = @SQL +	'IF EXISTS'																								+CHAR(13)
SET @SQL = @SQL +	'(SELECT TOP 1 1 FROM msdb..sysjobs_view job JOIN msdb.dbo.sysjobactivity activity'						+CHAR(13)
SET @SQL = @SQL +	'ON job.job_id = activity.job_id WHERE job.name = N'''+@job+''''										+CHAR(13)
SET @SQL = @SQL +	'AND job.date_created IS NULL AND activity.stop_execution_date IS NOT NULL)'							+CHAR(13)
SET @SQL = @SQL +	'BEGIN'																									+CHAR(13)
SET @SQL = @SQL +	'EXEC msdb..sp_delete_job @job_name=N'''+@job+''', @delete_unused_schedule=1'							+CHAR(13)									
SET @SQL = @SQL +	'END'																									+CHAR(13)
SET @SQL = @SQL +	'IF EXISTS'																								+CHAR(13)
SET @SQL = @SQL +	'(SELECT TOP 1 1 FROM msdb..sysjobs_view job WHERE job.name = N'''+@job+''')'							+CHAR(13)
SET @SQL = @SQL +	'BEGIN'																									+CHAR(13)
SET @SQL = @SQL +	'EXEC msdb..sp_delete_job @job_name=N'''+@job+''', @delete_unused_schedule=1'							+CHAR(13)									
SET @SQL = @SQL +	'END'																									+CHAR(13)
SET @SQL = @SQL +	'EXEC msdb..sp_add_job '''+@job+''', @owner_login_name= '''+@job_owner+''';'							+CHAR(13)
SET @SQL = @SQL +	'EXEC msdb..sp_add_jobserver @job_name= '''+@job+''';'													+CHAR(13)			
SET @SQL = @SQL +	'EXEC msdb..sp_add_jobstep @job_name='''+@job+''', @step_name= ''Step1'', '								+CHAR(13)
SET @SQL = @SQL +	'@command = '''+@sql_job+''', @database_name = '''+@Target_Server_DB_Name+''', @on_success_action = 3;'	+CHAR(13)						
SET @SQL = @SQL +	'EXEC msdb..sp_add_jobstep @job_name = '''+@job+''', @step_name= ''Step2'','							+CHAR(13)
SET @SQL = @SQL +   '@command = '''+@delete_job_sql+''''																	+CHAR(13)
SET @SQL = @SQL +	'EXEC msdb..sp_start_job @job_name= '''+@job+''', @output_flag = 0'	

The short footage below demonstrates how our target Dummy_Table data is loaded using this architecture.

It is also worth highlighting that this code can run acquisitions for tables which are not hash-partitioned, in which case it will still create multiple parallel streams, providing a significant boost in comparison to sequential loads. As such, we can mix and match objects which need to be broken into multiple streams i.e. large tables, as well as stand-alone loads which can move data across in a single transaction. Another interesting feature is the ability to ‘throttle’ the number of concurrent streams using the @Queue_Size parameter, which defaults to the number of cores allocated to the SQL Server instance this process is executing on, or a number we can specify. As such, even when we partition a large table into a number of streams exceeding that of the available cores, the queue size will always stay consistent and not outgrow the maximum number of concurrent streams allowed. This enables the process to ‘nibble’ at the queue (the queue size is checked at a 1-second interval), maximizing resource utilization and ensuring that we have an optimal number of jobs running concurrently.
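
For readers more familiar with Python than SQL Server Agent, the throttling behaviour described above is conceptually similar to running the partitioned streams through a worker pool capped at the core count – a loose analogy is sketched below (this is not part of the framework, just an illustration of the queueing idea).

import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def acquire_partition(stream_no):
    # Placeholder for a single hash-partitioned acquisition stream
    time.sleep(1)
    return stream_no

partitions = 8                    # streams the source table was hashed into
queue_size = os.cpu_count() or 4  # cap concurrency at the available cores

# Only 'queue_size' workers run at any one time; the remaining partitions wait in the
# queue and are picked off as workers free up - the 'nibbling' behaviour described above
with ThreadPoolExecutor(max_workers=queue_size) as executor:
    futures = [executor.submit(acquire_partition, s) for s in range(1, partitions + 1)]
    for future in as_completed(futures):
        print('Stream {} finished'.format(future.result()))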

Sample Data Testing Results

Now that I’ve demonstrated how this technique can increase the performance of moving data, let’s look at some concrete numbers. I re-ran the code for two small samples of data i.e. tables with 200 and 300 columns across 100K rows of synthetic data, and the results clearly demonstrated that, even with the additional overhead of calculating hash values on primary keys, it is possible to achieve at least a 4x performance boost with this architecture. Even on my old-ish E5-2670 v3 rig clocked at 2.3GHz with SAN storage attached we can observe a significant speed increase, with the Azure SQL DB CPU not exceeding 15% utilization at any time. The rule of diminishing returns will most likely kick in when a hash value for a very long composite primary key needs to be calculated. Likewise, I would expect this solution to regress in performance if long tables e.g. 10M+ rows are involved. To prevent this, it may be possible that with enough network throughput and a well-optimized source table this code could be re-factored to transition the disk-based temporary hash table (as implemented in this demo) to a memory-optimized table, but I have not tried it myself.

My testing machine had only 4 cores at its disposal so I had limited resources available; however, with 32-core machines becoming the norm these days, with the right application I would expect those numbers to be significantly better. As a matter of fact, transitioning to this method yielded an 8-10x performance increase for a few tables sourced from a Dynamics 365 schema using more powerful hardware on my current project.

Naturally, the overall performance will depend heavily on a number of factors e.g. network and storage speed, CPU specs, schema structure, data types etc., but where possible (and applicable) one could improve data acquisition performance without resorting to bespoke frameworks, on a single machine and all using T-SQL/SQL Server Agent functionality as the underlying technology.
