SQL Server Hash-Partitioned Parallel Data Acquisition – How to Accelerate Your ‘E’ in ELT/ETL Using a Simple T-SQL Framework

June 1st, 2021 / 7 Comments » / by admin

Introduction

Note: All artifacts used in this demo can be downloaded from my shared OneDrive folder HERE.

Big part of Data Warehouse development has always been tied to structuring data acquisition pipelines before more rigid modelling takes place and data is wrangled (most likely) using the most the venerable methodologies today: Data Vault, Kimbal or Inmon. I have covered data acquisition topic in a number of previous posts (mainly HERE and HERE), but would like to further expand on this subject with a short post on how to enhance this process (using a simplified example) and enable hash-partitioning on individual tables. Having concurrent executions i.e. a number of loads running in parallel, one for every source table, is fairly straightforward to achieve using SQL Server platform and a little bit of T-SQL. There are certainly more elegant approaches which enable turning sequential loads into a parallel ones, but with a little bit of elbow grease one can script out a batch of good, old-fashioned SQL Server Agent jobs with ease to be spun up and run simultaneously (example HERE). However, in some cases, source tables can be quite large and having a single transaction responsible for the insertion of the whole table’s content into a target object can be quite time-consuming. The situation can be further exacerbated if the primary key on the source table is a composite one or not of a numeric type.

One of the clients I worked with a few years ago had this problem. They needed to reload the entire content of a few source tables on a nightly schedule and the framework they used was up to the task until data stored in those objects grew by a large margin. Those tables contained a lot of text data i.e. email, chat and telephone transcripts so many of the fields were quite wide. As the acquisition framework they used dynamically generated BIML scrips and SSIS packages, there was very little one could do to tune or modify the logic due to the proprietary nature of the framework’s design. Data volumes increased exponentially and over time packages responsible for objects with very wide attributes started timing out, requiring expensive, bespoke development. Another, much more recent case, involved my current employer’s choice of survey software. If you’ve ever used LimeSurvey, you will know that each survey’s data is stored in a single, very wide table (think hundreds of attributes), with a dedicated column assigned to each question. That’s not a big issue, especially that MySQL (database used in LimeSurvey) has a row limit of 65,535 bytes, however, this schema architecture enabled objects with very wide rows and shallow row count to be proliferated across the database. Extracting this data, particularly when lots of textual values are involved, can be quite slow when thousands of tables are involved.

To demonstrate how to mitigate this type of problem, I will create a simple scenario where a source system table acquisition job can be run across multiple, hash-partitioned ‘chunks’ concurrently. Based on a distinct primary (single column or composite) key, we will be able to distribute the table content across multiple, consistent and non-blocking streams of data, thus providing a significant acceleration of data movement and cutting the time dedicated to ‘E’ (out of ELT/ETL) by a large margin. I will also compare the time it takes to load this data using variable number of streams i.e. between 2 and 8, to demonstrate how this framework can be tuned to enable accelerated performance.

The framework’s simplified architecture view can be depicted as per the diagram below.

On the left we have our source system (augmented by the system metadata stored in a separate database) and a ‘parent’ process used for jobs coordination and dispatching. In the middle we have individual ‘workers’ operating on a hash-defined partition of source data. Finally, on the right we have our target system where it all gets assembled into a mirror copy of the source table. I will go over how all this ‘hangs together’ in more details in the section below.

Azure Source Database and Dummy Data Setup

First, let’s create all required resources on Azure. The following Python script is used to generate Azure resource group, Azure SQL server and an empty database. You can easily replace it with a different self-hosted SQL Server instance (it doesn’t need to live in Azure).

from azure.common.client_factory import get_client_from_cli_profile
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.sql import SqlManagementClient
from msrestazure.azure_exceptions import CloudError
from os import popen
import pyodbc

RESOURCE_GROUP = 'Test_Resource_Group'
LOCATION = 'australiasoutheast'
SQL_SERVER = 'sourceserver2020'
SQL_DB = 'sourcedb'
USERNAME = 'testusername'
PASSWORD = 'MyV3ry$trongPa$$word'
DRIVER = '{ODBC Driver 17 for SQL Server}'
external_IP = popen("curl -s ifconfig.me").readline()


def create_resource_group(resource_client, RESOURCE_GROUP, LOCATION):
    print("\nCreating Azure RG {rg_name}...".format(
        rg_name=RESOURCE_GROUP), end="", flush=True)
    try:
        resource_client.resource_groups.create_or_update(
            RESOURCE_GROUP, {'location': LOCATION})
    except CloudError as e:
        print(e)
    rg = [g.name for g in resource_client.resource_groups.list()]
    if RESOURCE_GROUP in rg:
        print('OK')


def create_sql_server(sql_client, RESOURCE_GROUP, SQL_SERVER, LOCATION, USERNAME, PASSWORD):
    print("Creating Azure SQL Server {ssvr_name}...".format(
        ssvr_name=SQL_SERVER), end="", flush=True)
    try:
        sql_server = sql_client.servers.begin_create_or_update(
            RESOURCE_GROUP,
            SQL_SERVER,
            {
                'location': LOCATION,
                'version': '12.0',
                'administrator_login': USERNAME,
                'administrator_login_password': PASSWORD
            }
        )
        sql_server.wait()
    except CloudError as e:
        print(e)
    ssvr = [i.name for i in sql_client.servers.list_by_resource_group(
        RESOURCE_GROUP)]
    if SQL_SERVER in ssvr:
        print('OK')


def create_sql_db(sql_client, RESOURCE_GROUP, SQL_SERVER, SQL_DB, LOCATION):
    print("Creating Azure SQL Database {db_name}...".format(
        db_name=SQL_DB), end="", flush=True)
    try:
        sql_db = sql_client.databases.begin_create_or_update(
            RESOURCE_GROUP,
            SQL_SERVER,
            SQL_DB,
            {
                'location': LOCATION,
                'collation': 'SQL_Latin1_General_CP1_CI_AS',
                'create_mode': 'default',
                'requested_service_objective_name': 'Basic'
            }
        )
        sql_db.wait()
    except CloudError as e:
        print(e)
    dbs = [i.name for i in sql_client.databases.list_by_server(
        RESOURCE_GROUP, SQL_SERVER)]
    if SQL_DB in dbs:
        print('OK')


def configure_firewall(sql_client, DRIVER, RESOURCE_GROUP, SQL_SERVER, SQL_DB, USERNAME, PASSWORD, external_IP):
    print("Configuring Azure SQL Server Firewall Settings...", end="", flush=True)
    try:
        sql_client.firewall_rules.create_or_update(
            RESOURCE_GROUP,
            SQL_SERVER,
            "firewall_rule_name_" + external_IP,
            {
                "startIpAddress": external_IP,
                "endIpAddress": external_IP
            }
        )
    except CloudError as e:
        print(e)
    SQL_SERVER = SQL_SERVER + '.database.windows.net'
    with pyodbc.connect('DRIVER='+DRIVER+';SERVER='+SQL_SERVER+';PORT=1433;DATABASE='+SQL_DB+';UID='+USERNAME+';PWD=' + PASSWORD) as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT @@version")
            row = cursor.fetchone()
    if row:
        print('OK')


def main():
    # create resource client
    resource_client = get_client_from_cli_profile(ResourceManagementClient)
    create_resource_group(resource_client, RESOURCE_GROUP, LOCATION)
    # create sql client
    sql_client = get_client_from_cli_profile(SqlManagementClient)
    # create sql server
    create_sql_server(sql_client, RESOURCE_GROUP, SQL_SERVER,
                      LOCATION, USERNAME, PASSWORD)
    # create sql db in the basic tier
    create_sql_db(sql_client, RESOURCE_GROUP, SQL_SERVER, SQL_DB, LOCATION)
    # configure firewall
    configure_firewall(sql_client, DRIVER, RESOURCE_GROUP,
                       SQL_SERVER, SQL_DB, USERNAME, PASSWORD, external_IP)


if __name__ == "__main__":
    main()

Providing all necessary Azure SDK for Python modules were installed and referenced correctly and we’ve signed in using Azure CLI, we should see the following status output at the end of script execution.

Next, let’s generate some mock data. This is required to simulate the scenario I was referring to before i.e. a table with a large number of columns and variable-length text data which we will try to acquire as efficiently and quickly as possible. The following script creates a small stored procedure used to generate dummy data, Dummy_Table object and finally, it assigns ID column as a primary key on the newly populated table.

from pathlib import PureWindowsPath
import pyodbc

SQL_SERVER = 'sourceserver2020.database.windows.net'
SQL_DB = 'sourcedb'
USERNAME = 'testusername'
PASSWORD = 'MyV3ry$trongPa$$word'
DRIVER = '{ODBC Driver 17 for SQL Server}'


sql_file = PureWindowsPath(
    '/Path/Azure_SQLDB_Deployment/SQL/create_wide_tbl.sql')

with open(sql_file, "r") as f:
    sqlFile = f.read()

sql = sqlFile.rstrip('\n')

try:
    with pyodbc.connect('DRIVER='+DRIVER+';SERVER='+SQL_SERVER+';PORT=1433;DATABASE='+SQL_DB+';UID='+USERNAME+';PWD=' + PASSWORD) as conn:
        with conn.cursor() as cursor:
            cursor.execute('DROP PROCEDURE IF EXISTS usp_generate_dummy_data')
            cursor.execute(sql)
            cursor.execute('EXEC dbo.usp_generate_dummy_data')
            cursor.execute('SELECT TOP (1) 1 FROM dbo.Dummy_Table')
            rows = cursor.fetchone()
            if rows:
                print('All Good!')
            else:
                raise ValueError(
                    'No data generated in the source table. Please troubleshoot!'
                )
except pyodbc.Error as ex:
    sqlstate = ex.args[1]
    print(sqlstate)

The stored procedure (code below) is configured to create 300 columns across 100000 rows of dense, text data. When finished (executed for approx. 20min with the default configuration of data volume and Azure resources specified in the script above), the very wide table schema as well as synthetically created data will look similar to the one in the image below (click on image to enlarge).

CREATE PROCEDURE usp_generate_dummy_data
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @wide INT = 100;
    DECLARE @deep INT = 100000;
    DECLARE @allchars VARCHAR(100) = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 ';
    DECLARE @target_table_name VARCHAR(56) = 'Dummy_Table';

    DROP TABLE IF EXISTS ##metadata;
    CREATE TABLE ##metadata
    (
        ID INT IDENTITY(1, 1),
        Column_Name VARCHAR(256),
        Data_Type VARCHAR(56),
        Column_Size VARCHAR(56)
    );


    DECLARE @count INT = 1;
    WHILE @count <= @wide
    BEGIN
        INSERT INTO ##metadata
        (
            Column_Name,
            Data_Type,
            Column_Size
        )
        SELECT 'Column_' + CAST(@count AS VARCHAR(56)),
               'varchar',
               CEILING(RAND() * 100);

        SET @count = @count + 1;
    END;

    DECLARE @SQL VARCHAR(MAX);
    SELECT @SQL
        = 'DROP TABLE IF EXISTS ' + @target_table_name + '; CREATE TABLE ' + @target_table_name
          + ' (Id INT IDENTITY(1,1),' + STRING_AGG(Column_Name + ' ' + Data_Type + '(' + Column_Size + ')', ',') + ')'
    FROM ##metadata;
    EXEC (@SQL);

    SET @count = 1;
    WHILE @count <= @deep
    BEGIN
        DECLARE @vals VARCHAR(MAX);
        SELECT @vals
            = STRING_AGG(
                            CAST(QUOTENAME(
                                              SUBSTRING(
                                                           RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35) + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5)
                                                           + RIGHT(LEFT(@allchars, ABS(BINARY_CHECKSUM(NEWID()) % 35)
                                                                                   + 5), 5),
                                                           1,
                                                           CAST(Column_Size AS INT)
                                                       ),
                                              ''''
                                          ) AS NVARCHAR(MAX)),
                            ','
                        )
        FROM ##metadata;
        SELECT @SQL
            = 'INSERT INTO ' + @target_table_name + '(' + STRING_AGG(Column_Name, ',') + ') SELECT ' + @vals + ''
        FROM ##metadata;
        EXEC (@SQL);
        SET @count = @count + 1;
    END;

    ALTER TABLE dbo.Dummy_Table
    ADD PRIMARY KEY (ID);

    DROP TABLE IF EXISTS ##metadata;
END;

Next, I will create a linked server connection from my on-premise SQL Server instance to the one I’ve just provisioned in Azure and recreate the source Dummy_Table schema in the target database.

USE [master]
GO
EXEC master.dbo.sp_addlinkedserver @server = N'AZURESOURCELINKEDSVR', @srvproduct=N'', @provider=N'SQLNCLI', @datasrc=N'sourceserver2020.database.windows.net', @catalog=N'sourcedb'
GO
EXEC master.dbo.sp_addlinkedsrvlogin @rmtsrvname=N'AZURESOURCELINKEDSVR',@useself=N'False',@locallogin=NULL,@rmtuser=N'testusername',@rmtpassword='MyV3ry$trongPa$$word'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'collation compatible', @optvalue=N'false'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'data access', @optvalue=N'true'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'dist', @optvalue=N'false'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'pub', @optvalue=N'false'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'rpc', @optvalue=N'true'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'rpc out', @optvalue=N'true'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'sub', @optvalue=N'false'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'connect timeout', @optvalue=N'0'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'collation name', @optvalue=NULL
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'lazy schema validation', @optvalue=N'false'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'query timeout', @optvalue=N'0'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'use remote collation', @optvalue=N'true'
GO
EXEC master.dbo.sp_serveroption @server=N'AZURESOURCELINKEDSVR', @optname=N'remote proc transaction promotion', @optvalue=N'false'
GO

Finally, in this example I will also be useing two small databases (EDW_Control and AdminDBA) which contain metadata information on the object(s) I will be processing and error logs for loads which fail to run correctly. The structure of EDW_Control database is pretty straightforward – it uses a collection of tables to track things like source and target schema names, record counts, table and indexes size etc. The important part is the fields called ‘Is_Big’ and ‘ETL_Batch_No’ which dictate whether the nominated object should be hash-partitioned and if so, into how many batches/streams. I wrote more about how these tables are structured in one of my previous blog post HERE. Details on how to build and deploy AdminDBA database can be found in my previous posts HERE and HERE. For reference, the EDW_Control database schema and a snapshot of metadata I will be processing looks as per below.

Hash-Partitioned Data Acquisition

Now that we’re all set up let’s explore the main code base behind the process responsible for all the heavy lifting. This acquisition method is based on two different stored procedures: one which handles metadata, processes coordination and jobs assignment and the second one which builds the actual SQL statements responsible for data insertion. You can download all the code from my OneDrive folder HERE so I will only go over some of the more interesting aspects of these stored procedures and finally provide a quick demo depicting runtime behavior and a short summary of its performance.

As mentioned, the magic sauce which allows for building reliable concurrent acquisition job is the hash-partitioning which converts single or composite primary key into a hash string, dynamically building self-contained stream of data. We can specify as many partitions as we want (no greater than the number of rows) which can be further scaled back based on the number of CPU cores available e.g. in the demo below I will be partitioning the source table into 8 partitions. We can also have the process determine the number of partitions (when run for multiple tables) based on the metadata collected e.g. row counts, data size etc. by assigning the value of 1 to ‘@Concurrent_Batch_No_Override’ parameter. The main functionality, however, is tied to the following bit of code which creates even ‘chunks’ of target data based on the sorted hash values.

SET @SQL = 'DECLARE @R1 INT = (SELECT  MIN(id) from #Hash_Temp)'										+CHAR(13);
SET @SQL = @SQL + 'DECLARE @R1_Hash VARCHAR(128) = (SELECT hash_key FROM #Hash_Temp WHERE id = @R1)'	+CHAR(13);
SET @SQL = @SQL + 'DECLARE @R2 BIGINT = (SELECT (MAX(id)-MIN(id)+1)'									+CHAR(13);
SET @SQL = @SQL + '/'+CAST(@etl_batch_no AS VARCHAR(10))+' as id FROM #Hash_Temp)'						+CHAR(13);
SET @SQL = @SQL + 'DECLARE @R2_Hash VARCHAR (128) = (SELECT hash_key FROM #Hash_Temp WHERE id = @R2)'	+CHAR(13);
SET @SQL = @SQL + 'DECLARE @R3 BIGINT = (SELECT MAX(id) from #Hash_Temp)'								+CHAR(13);
SET @SQL = @SQL + 'DECLARE @R3_Hash VARCHAR(128) = (SELECT hash_key FROM #Hash_Temp WHERE id = @R3)'	+CHAR(13);
SET @SQL = @SQL + 'INSERT INTO #Ids_Range'																+CHAR(13);
SET @SQL = @SQL + '(range_FROM, range_TO, hash_FROM, hash_TO) '											+CHAR(13);
SET @SQL = @SQL + 'SELECT @R1, @R1+@R2, @R1_Hash,(SELECT hash_key FROM #Hash_Temp WHERE id =@R1+@R2)'	+CHAR(13);
SET @SQL = @SQL + 'DECLARE @z INT = 1'																	+CHAR(13);
SET @SQL = @SQL + 'WHILE @z <= '+CAST(@etl_batch_no AS VARCHAR(10))+'-1'								   +CHAR(13);
SET @SQL = @SQL + 'BEGIN'																				+CHAR(13);
SET @SQL = @SQL + 'INSERT INTO #Ids_Range (Range_FROM, Range_TO, Hash_FROM, Hash_TO)'					+CHAR(13);
SET @SQL = @SQL + 'SELECT LAG(Range_TO,0) OVER (ORDER BY id DESC)+1, '									+CHAR(13);
SET @SQL = @SQL + 'CASE WHEN LAG(Range_TO,0) OVER (ORDER BY id DESC)+@R2+1 >= @R3'					   +CHAR(13);
SET @SQL = @SQL + 'THEN @R3 ELSE LAG(Range_TO,0) OVER (ORDER BY id DESC)+@R2+1 END,'					+CHAR(13);
SET @SQL = @SQL + '(SELECT hash_key FROM #Hash_Temp WHERE id =(SELECT LAG(Range_TO,0) '					+CHAR(13);
SET @SQL = @SQL + 'OVER (ORDER BY id DESC)+1 FROM #Ids_Range WHERE @z = id)),'							+CHAR(13);
SET @SQL = @SQL + '(SELECT Hash_key FROM #Hash_Temp WHERE id =(SELECT'									+CHAR(13);
SET @SQL = @SQL + 'CASE WHEN LAG(Range_TO,0) OVER (ORDER BY id DESC)+@R2+1 >= @R3'					   +CHAR(13);
SET @SQL = @SQL + 'THEN @R3 ELSE LAG(Range_TO,0) OVER (ORDER BY id DESC)+@R2+1 END'					    +CHAR(13);
SET @SQL = @SQL + 'FROM #Ids_Range WHERE @z = id))'														+CHAR(13);
SET @SQL = @SQL + 'FROM #Ids_Range WHERE @z = id'														+CHAR(13);
SET @SQL = @SQL + 'SET @z = @z+1'																		+CHAR(13);
SET @SQL = @SQL + 'END'																					+CHAR(13);
EXEC(@SQL)

This code creates uniform ‘slices’ of source table’s data using metadata information stored in Hash_Temp temp table. This metadata is created using HASHBYTES() function with SHA1 algorithm allowing for efficient values indexation and sorting, which in turn allows for partition ranges creation. For wide objects with small number of rows i.e. less than 1 million this should work relatively speedy, however, for bigger tables you may want to move some of this logic into a memory-optimized table. Given that my source data set is quite small (100K rows), I did not encounter any performance bottlenecks with TempDB-stored tables in this set up. Additionally, this stored procedure acts as a coordinator/dispatcher, ensuring that all conditions are validated, the number of concurrent streams does not exceed number of CPU cores, the runtime (checked every 5 seconds and capped at 200 lookups) does not go over this limit, in which case any outstanding jobs are terminated.

Finally, a self-contained SQL Server agent job is instantiated, calling the second stored procedure which is responsible for data insertion based on the partition number and hash values boundaries.

DECLARE @sql_job		NVARCHAR(MAX)	=		
'USE [targetdb]
EXEC	[dbo].['+@Worker_Proc_Name+']
@Source_Server_Name = '+@Source_Server_Name+',
@Source_Server_DB_Name = '''''+@Source_Server_DB_Name+''''',
@Source_Server_Schema_Name = '''''+@Source_Server_Schema_Name+''''',
@Source_Server_Object_Name = '''''+@table+''''',
@Target_Server_DB_Name = N'''''+@Target_Server_DB_Name+''''',
@Target_Server_Schema_Name = '''''+@Target_Server_Schema_Name+''''',
@Hash_SQL = N'''''+@hash_SQL+''''',
@Hash_FROM = N'''''+@hash_FROM+''''',
@Hash_TO = N'''''+@hash_TO+''''',
@Is_Big = N'''''+CAST(@is_big AS CHAR(1))+''''',
@Col_List_MSSQL = N'''''+@Col_List_MSSQL+''''',
@Target_Server_Object_Name = '''''+@table+''''',
@Exec_Instance_GUID	='''''+CAST(@Exec_Instance_GUID AS VARCHAR(128))+''''',
@Package_Name='''''+@Package_Name+''''' '
				

SET @SQL =			'IF EXISTS'
SET @SQL = @SQL +	'(SELECT TOP 1 1 FROM msdb..sysjobs_view job JOIN msdb.dbo.sysjobactivity activity '					+CHAR(13)
SET @SQL = @SQL +	'ON job.job_id = activity.job_id WHERE job.name = N'''+@job+''''										+CHAR(13)
SET @SQL = @SQL +	'AND job.date_created IS NOT NULL AND activity.stop_execution_date IS NULL)'							+CHAR(13)
SET @SQL = @SQL +	'BEGIN'																									+CHAR(13)
SET @SQL = @SQL +	'EXEC msdb..sp_stop_job @job_name=N'''+@job+''';'														+CHAR(13)																	
SET @SQL = @SQL +	'EXEC msdb..sp_delete_job @job_name=N'''+@job+''', @delete_unused_schedule=1'							+CHAR(13)									
SET @SQL = @SQL +	'END'																									+CHAR(13)
SET @SQL = @SQL +	'IF EXISTS'																								+CHAR(13)
SET @SQL = @SQL +	'(SELECT TOP 1 1 FROM msdb..sysjobs_view job JOIN msdb.dbo.sysjobactivity activity'						+CHAR(13)
SET @SQL = @SQL +	'ON job.job_id = activity.job_id WHERE job.name = N'''+@job+''''										+CHAR(13)
SET @SQL = @SQL +	'AND job.date_created IS NULL AND activity.stop_execution_date IS NOT NULL)'							+CHAR(13)
SET @SQL = @SQL +	'BEGIN'																									+CHAR(13)
SET @SQL = @SQL +	'EXEC msdb..sp_delete_job @job_name=N'''+@job+''', @delete_unused_schedule=1'							+CHAR(13)									
SET @SQL = @SQL +	'END'																									+CHAR(13)
SET @SQL = @SQL +	'IF EXISTS'																								+CHAR(13)
SET @SQL = @SQL +	'(SELECT TOP 1 1 FROM msdb..sysjobs_view job WHERE job.name = N'''+@job+''')'							+CHAR(13)
SET @SQL = @SQL +	'BEGIN'																									+CHAR(13)
SET @SQL = @SQL +	'EXEC msdb..sp_delete_job @job_name=N'''+@job+''', @delete_unused_schedule=1'							+CHAR(13)									
SET @SQL = @SQL +	'END'																									+CHAR(13)
SET @SQL = @SQL +	'EXEC msdb..sp_add_job '''+@job+''', @owner_login_name= '''+@job_owner+''';'							+CHAR(13)
SET @SQL = @SQL +	'EXEC msdb..sp_add_jobserver @job_name= '''+@job+''';'													+CHAR(13)			
SET @SQL = @SQL +	'EXEC msdb..sp_add_jobstep @job_name='''+@job+''', @step_name= ''Step1'', '								+CHAR(13)
SET @SQL = @SQL +	'@command = '''+@sql_job+''', @database_name = '''+@Target_Server_DB_Name+''', @on_success_action = 3;'	+CHAR(13)						
SET @SQL = @SQL +	'EXEC msdb..sp_add_jobstep @job_name = '''+@job+''', @step_name= ''Step2'','							+CHAR(13)
SET @SQL = @SQL +   '@command = '''+@delete_job_sql+''''																	+CHAR(13)
SET @SQL = @SQL +	'EXEC msdb..sp_start_job @job_name= '''+@job+''', @output_flag = 0'	

The short footage below demonstrates how our target Dummy_Table data is loaded using this architecture.

It is also worth highlighting that this code can run acquisitions for tables which are not hash-partitioned, in which case it will still create multiple parallel streams, providing a significant boost in comparison to sequential loads. As such we can mix and match objects which need to be broken into multiple streams i.e. large tables, as well as stand-alone loads which can move data across in a single transaction. Another interesting feature is the ability to ‘throttle’ the number of concurrent streams using the @Queue_Size parameter, which is set to the number of cores allocated to a SQL Server instance this process is executing on or a number we can specify. As such, even when we partition a large table into a number of streams which exceed that of the available cores, the queue size will always stay consistent and not outgrow the maximum number of concurrent streams allowed. This enables the process to ‘nibble’ at the queue (queue size check set to 1 second interval), maximizing resources utilization and ensuring that we have an optimal number of jobs running concurrently.

Sample Data Testing Results

Now that I’ve demonstrated how this technique can increase the performance of moving data, let’s look at some concrete numbers. I re-run the code for two small samples of data i.e. tables with 200 columns and 300 columns, across 100K rows of synthetic data and the results clearly demonstrated that even with the additional overhead of calculating hash values on primary keys, it is possible to achieve at least 4x performance boost with this architecture. Even on my old-ish E5-2670 v3 rig clocked at 2.3GHz with SAN storage attached we can observe a significant speed increase, with Azure SQL DB CPU not exceeding 15% utilization at any time. The rule of diminishing results will most likely kick in when a hash value for a very long composite primary key will need to be calculated. Likewise, I would expect this solution to regress in performance if long tables e.g. 10M+ rows are involved. To prevent this, it may be possible that with enough network throughput and a well optimized source table we can re-factor this code to transition the disk-based temporary hash table (as it’s implemented in this demo) to memory-optimized table, but I have not tried it myself.

My testing machine had only 4 cores at its disposal so I had limited resources available, however, with 32 core machines becoming the norm these days, with the right application I would expect those numbers to be significantly better. As a matter of fact, transitioning to this method yielded 8-10x performance increase for few tables sourced from Dynamics365 schema using more powerful hardware on my current project.

Naturally, the overall performance will depend heavily on the number of factors e.g. network and storage speed, CPU specs, schema structure, data types etc. but when possible (and applicable) one could improve data acquisition performance without resorting to bespoke frameworks, on a single machine and all using T-SQL/SQL Server Agent functionality as the underlining technology.

Tags: , , , , , , , ,

Kicking The Tires On Azure Functions – How To Build Simple Event-Driven Or Trigger-Based Data Pipelines Using Serverless Compute Workloads

May 11th, 2021 / 4 Comments » / by admin

Introduction

There are many ways to skin the cat when it comes to using public cloud services to solve business problems, so understanding pros and cons of each can often make a big difference. Container-based deployments have recently been all the rage for stateless applications but for small, cost-effective and short-lived workloads serverless (not a big fan of the buzzword BTW) may offer a better fit. For medium to large data volumes, I would refrain from using this paradigm due to architectural constraints e.g. limit on execution time. However, for anything lightweight e.g. data augmentation, file handling, real-time stream processing, API calls, essentially anything that is designed to be ‘chatty’ rather than ‘chunky’, functions may be a good alternative to running those processes in a dedicated environment. In this post I want to look at how Azure Functions can be used to augment relational database entries with Azure Cognitive Services and how flat files stored in Azure Blob Storage can be processed using trigger-based logic.

Both of those scenarios can be handled using many different compute styles but given the short-lived nature of these workloads, Azure Function may be the most cost-effective way of executing these pipelines. Additionally, these integrate well with other Azure services e.g. Data Factory, so for certain bespoke and targeted scenarios these may offer a good alternative to, for example, provisioning virtual machines or containers.

What are Azure Functions

Azure Functions is an event driven, compute-on-demand experience that extends the existing Azure application platform with capabilities to implement code triggered by events occurring in virtually any Azure or 3rd party service as well as on-premises systems. Azure Functions allows developers to take action by connecting to data sources or messaging solutions, thus making it easy to process and react to events. Additionally, as of March this year, Microsoft also announced Python support for Durable Functions – an extension to Azure Functions that lets developers orchestrate complex data processing and data science pipelines. Here is a quick, high-level video from Microsoft outlining the intended purpose and some of its functionality.

As Microsoft has a lot of good documentation outlining many possible scenarios where this type of compute paradigm can be useful, I won’t go into details on how to set up your environment, debug and monitor your Azure Functions; those topics are just a Google query away if you’d like to familiarize yourself with the overall development lifecycle. I will, however, go over a few details relating to some hurdles I encountered, as I found the overall development process straightforward, but not without a few little pitfalls.

Azure Functions Quick Development Guide

A function is the primary concept in Azure Functions. A function contains two important pieces – our code, which can be written in a variety of languages, and some config, the function.json file. For compiled languages, this config file is generated automatically from annotations in your code. For scripting languages, you must provide the config file yourself.

The function.json file defines the function’s trigger, bindings, and other configuration settings. Every function has one and only one trigger. The runtime uses this config file to determine the events to monitor and how to pass data into and return data from a function execution. The host.json file contains runtime-specific configurations and is in the root folder of the function app. A bin folder contains packages and other library files that the function app requires. This file does get published to Azure. The local.settings.json file is used to store app settings and connection strings when running locally. This file also doesn’t get published to Azure.

Azure Functions expects a function to be a stateless method in our Python script that processes input and produces output. By default, the runtime expects the method to be implemented as a global method called main() in the __init__.py file. Also, all functions in a function app must be authored in the same language.

A function app provides an execution context in Azure in which your functions run. As such, it is the unit of deployment and management for your functions. A function app is comprised of one or more individual functions that are managed, deployed, and scaled together. All of the functions in a function app share the same pricing plan, deployment method, and runtime version. Think of a function app as a way to organize and collectively manage your functions. The code for all the functions in a specific function app is located in a root project folder that contains a host configuration file and one or more sub-folders. Each sub-folder contains the code for a separate function. The recommended folder structure for a Python Functions project looks like the following example.

 <project_root>/
 | - .venv/
 | - .vscode/
 | - my_first_function/
 | | - __init__.py
 | | - function.json
 | | - example.py
 | - my_second_function/
 | | - __init__.py
 | | - function.json
 | - shared_code/
 | | - __init__.py
 | | - my_first_helper_function.py
 | | - my_second_helper_function.py
 | - tests/
 | | - test_my_second_function.py
 | - .funcignore
 | - host.json
 | - local.settings.json
 | - requirements.txt
 | - Dockerfile

Function apps can be authored and published using a variety of tools, including Visual Studio, Visual Studio Code, IntelliJ, Eclipse, and the Azure Functions Core Tools. All the solution artifacts are created automatically by the development environment (in my case VS Code) and even though Azure portal lets you update your code and your function.json file directly inline, best practice is to use a local development tool like VS Code.

Ok, let’s jump in into our first case scenario.

Augmenting Azure SQL DB data with Azure Cognitive Services Sentiment Analysis and Key Terms

Microsoft, just as any other major cloud vendor, provides as suite of text-mining AI services that uncover insights such as sentiment analysis, entities, relations and key phrases in unstructured and structured text. Working with a client in my past engagement, I used it to provide sentiment analysis and key terms extraction for Dynamics 365 emails. Both of those allowed the business to prioritise and respond to messages which had negative tone e.g. complains, dissatisfied clients, urgent issues that required immediate attention as well as looking at the most frequently occurring statements and phrases to discern most talked about issues and topics (these look really good when visualized as word clouds). In this scenario I will outline how with very little code (Python in this case) one can do the same and augment dummy feedback data with sentiment analysis and term extraction. Feedback data is stored in a SQL database and the function app runs on a 5 minute cadence (fully configurable). The architecture for this is pretty straightforward and as always, I will make all code required to replicate this solution in my OneDrive folder HERE.

To simplify things, I will skip the App Service Web app and assume that that we already have feedback data stored inside the nominated database object, ready for further processing. Also, the code to deploy Azure SQL DB along with its objects and sample feedback data is very similar to the one from my previous blog post HERE so you can source it from there (alternatively there is a separate copy in my aforementioned OneDrive folder HERE). Finally, creating the Cognitive Services resource is very easy to spin up in the portal so in the interest of keeping this post concise, I will skip the details. The only piece of information required from the service is the pair of keys for API endpoint access.

Now, assuming we have Azure SQL DB and Cognitive Services resource up-and-running, we should be able to investigate how Azure Function is built and start augmenting the feedback entries with text analytics. The following code (__init__.py file) takes 6 records from our Azure SQL database containing feedback information (see DDL and DML code HERE) and passes those through the Azure Cognitive Service API to assign the following values: Feedback_Sentiment, Positive_Sentiment_Score, Negative_Sentiment_Score, Neutral_Sentiment_Score. It also extracts key phrases and stores those in the Key_Phrases field in the same table.

import logging
import azure.functions as func
import pyodbc
from azure.ai.textanalytics import TextAnalyticsClient
from azure.core.credentials import AzureKeyCredential

RESOURCE_GROUP = 'YourResourceGroupName'
LOCATION = 'australiasoutheast'
SQL_SERVER = 'yourservername.database.windows.net'
SQL_DB = 'yourdatabasename'
USERNAME = 'yourusername'
PASSWORD = 'YourTopS3cretPa$$word'
DRIVER = '{ODBC Driver 17 for SQL Server}'

resource_key = 'YourResourceKey'
resource_endpoint = 'https://yoururl.cognitiveservices.azure.com/'


def authenticate_client():
    ta_credential = AzureKeyCredential(resource_key)
    text_analytics_client = TextAnalyticsClient(
        endpoint=resource_endpoint,
        credential=ta_credential,
        api_version="v3.1-preview.3")
    return text_analytics_client


def main(mytimer: func.TimerRequest) -> None:
    client = authenticate_client()
    logging.info(
        'Attempting {mssql} SQL Server connection...'.format(mssql=SQL_SERVER))
    try:
        cnxn = pyodbc.connect('DRIVER='+DRIVER+';SERVER='+SQL_SERVER +
                              ';PORT=1433;DATABASE='+SQL_DB+';UID='+USERNAME+';PWD=' + PASSWORD)
    except pyodbc.Error as e:
        sqlstate = e.args[1]
        logging.error(
            sqlstate)
    if cnxn:
        cursor = cnxn.cursor()
        # Get all Feedback field entries from the table skipping where Feedback_Sentiment values have already been assigned
        # Also filter any values which exceed maximum allowed length i.e. 5120 characters
        rows = cursor.execute(
            """SELECT a.ID,
                    a.Feedback
                    FROM
                    (
                        SELECT 
                            ID,
                            REPLACE(REPLACE(REPLACE(REPLACE(feedback, CHAR(13), ' '), CHAR(10), ' '), CHAR(160), ' '), CHAR(9), ' ') AS Feedback
                        FROM dbo.feedback
                        WHERE Feedback_Sentiment IS NULL
                    ) a
                    WHERE LEN(a.Feedback) <= 5120""")
        rows = cursor.fetchall()
        if rows:
            for row in rows:
                try:
                    # for each row returned get sentiment value and update Feedback table
                    response = client.analyze_sentiment(
                        documents=[row[1]])[0]
                    if not response.is_error:
                        cursor.execute("""UPDATE dbo.feedback
                                        SET Feedback_Sentiment = ?,
                                        Positive_Sentiment_Score = ?,
                                        Negative_Sentiment_Score = ?,
                                        Neutral_Sentiment_Score = ?
                                        WHERE ID = ?""", response.sentiment.title(), response.confidence_scores.positive, response.confidence_scores.negative, response.confidence_scores.neutral, row[0])
                        cursor.commit()
                except pyodbc.Error as e:
                    sqlstate = e.args[1]
                    logging.error(sqlstate)
        # Get all description field entries from the table skipping where Key_Phrases values have already been assigned
        # Also filter any values which exceed maximum allowed length i.e. 5120 characters
        else:
            logging.info(
                'No records with matching update criteria for Feedback_Sentiment fields found in the Feedback table.')
        rows = cursor.execute("""SELECT a.ID,
                        a.Feedback
                    FROM
                    (
                        SELECT ID,
                            REPLACE(REPLACE(REPLACE(REPLACE(Feedback, CHAR(13), ' '), CHAR(10), ' '), CHAR(160), ' '), CHAR(9), ' ') AS Feedback
                        FROM dbo.feedback
                        WHERE Key_Phrases IS NULL
                    ) a
                    WHERE LEN(a.Feedback) <= 5120""")
        rows = cursor.fetchall()
        if rows:
            for row in rows:
                try:
                    # for each row returned get key phrases value and update Feedback table
                    response = client.extract_key_phrases(
                        documents=[row[1]])[0]
                    if not response.is_error:
                        cursor.execute("""UPDATE dbo.Feedback
                                        SET Key_Phrases = ?
                                        WHERE ID = ?""", ', '.join(response.key_phrases), row[0])
                        cursor.commit()
                except pyodbc.Error as e:
                    sqlstate = e.args[1]
                    logging.error(sqlstate)
        else:
            logging.info(
                'No records with matching update criteria for Key_Phrases field found in the Feedback table')
        cursor.execute("""UPDATE dbo.feedback
                            SET Key_Phrases = CASE WHEN LEN(Feedback) > 5120 
                            THEN 'Content Deemed Too Long For Terms Extraction' ELSE 'Unknown' END
                            WHERE Key_Phrases IS NULL
                            """)
        cursor.commit()
        cursor.execute("""UPDATE dbo.Feedback
                            SET Feedback_Sentiment = CASE WHEN LEN(Feedback) > 5120 
                            THEN 'Content Deemed Too Long For Sentiment Extraction' ELSE 'Unknown' END
                            WHERE Feedback_Sentiment IS NULL
                            """)
        cursor.commit()

The function is executed in 5-minute intervals (using timer trigger). Schedule value is a six-field CRON expression – by providing */5 * * * * value, the function will run every 5 minutes from the first run. We can also notice that even though none of the entries used for sentiment analysis or term extraction were longer than 5120 characters, due to API limitation on the maximum length of string allowed, there is a SQL predicate used to only account for those entries which fall under this threshold.

Looking at the Azure portal in the dedicated resource created by the deployment process we can notice the first 3 function invocations failing. This is where logs become very helpful and with a quick glance at the execution details, we can see the issue was caused by the function app not being whitelisted for access to Azure SQL database, where the feedback data was stored. Once the function environment IP address was added to the firewall rule, it all run smoothly.

Finally, looking at the feedback data we can see that all required attributes were updated as per the above script (click on image to enlarge).

Updating Azure SQL Database based on Blob Storage File Input Trigger

Next up, we will be looking at another scenario where a number of JSON files have been generated using the sample Python code and the task is to extract specific data out of those entries and log it in the already per-provisioned database and table.

To make things a bit more interactive and closer to what the real-world scenario might look like, let’s assume that we need to source the RSS feeds data from Microsoft’s Tech Community blogs dealing with three topics: Azure DevOps, Azure Data Factory and Azure SQL. Each one of those feeds has been associated with a URL and, using the below script, I downloaded 5 entries for each channel, parsed the output and serialized data into 3 different JSON files (each corresponding to the dedicated RSS feed) and finally staged those in the Azure Blob storage.

import xml.etree.ElementTree as ET
import json
import requests
from datetime import datetime
from azure.storage.blob import BlobServiceClient, ContainerClient
from azure.common.client_factory import get_client_from_cli_profile

urls = {'Azure Data Factory': 'https://techcommunity.microsoft.com/plugins/custom/microsoft/o365/custom-blog-rss?tid=5653966150464677274&board=AzureDataFactoryBlog&label=&messages=&size=5',
        'Azure DevOps': 'https://techcommunity.microsoft.com/plugins/custom/microsoft/o365/custom-blog-rss?tid=5653966150464677274&board=AzureDevOps&label=&messages=&size=5',
        'Azure SQL': 'https://techcommunity.microsoft.com/plugins/custom/microsoft/o365/custom-blog-rss?tid=5653966150464677274&board=AzureSQLBlog&label=&messages=&size=5'
        }
az_storage_connection_string = 'YourAzureStorageConnectionString'
container_name = 'rssfiles'


def parse_rss_feed(content):
    articles = []
    root = ET.fromstring(content)
    articles_collection = root.findall("./channel/item")
    for article in articles_collection:
        article_dict = {}
        for elem in article.iter():
            article_dict[elem.tag] = elem.text.strip()
        articles.append(article_dict)
    return articles


def main(urls, az_storage_connection_string, container_name):
    for k, v in urls.items():
        articles = []
        response = requests.get(url=v)
        if response.ok:
            articles_parsed = parse_rss_feed(content=response.content)
            if articles_parsed:
                articles = articles + articles_parsed
        for element in articles:
            if 'item' in element:
                del element['item']
        filename = "Microsoft_RSS_Feed_{blog}_{ts}.json".format(blog=k.replace(' ', '_'),
                                                                ts=datetime.now().strftime("%Y%m%d_%I%M%S%p")
                                                                )

        container_client = ContainerClient.from_connection_string(
            conn_str=az_storage_connection_string, container_name=container_name)

        blob_service_client = BlobServiceClient(
            account_url=az_storage_connection_string)

        all_containers = blob_service_client.list_containers(
            name_starts_with='rss')

        container_client.upload_blob(
            name=filename,
            data=json.dumps(obj=articles, indent=4)
        )


if __name__ == "__main__":
    main(urls, az_storage_connection_string, container_name)

When executed, the following three files were created in the nominated destination.

Next, let’s assume we would like to extract File Name, Title, Link and Description attributes and insert those into a table as soon as the files have been created in the Azure Blob storage. This is where another type of Azure function, one that can react to changes in the blob storage, can be used. The Blob storage trigger starts a function when a new or updated blob is detected. Polling works as a hybrid between inspecting logs and running periodic container scans and blobs are scanned in groups of 10,000 at a time with a continuation token used between intervals. Once the new file has been discovered, the function is invoked, and the process simply reads the file content and inserts the selected data into the rss_feeds table as per the diagram below.

The following short Python snippet was used to construct the Blob Trigger function responsible for reading the RSS feed from one or many JSON files, extract relevant attributes and finally insert the required data into the table.

import logging
import json
import azure.functions as func
import pyodbc
from azure.storage.blob import BlobServiceClient, ContainerClient


SQL_SERVER = 'yourservername.database.windows.net'
SQL_DB = 'yourdatabasename'
USERNAME = 'yourusername'
PASSWORD = 'YourTopS3cretPa$$word'
DRIVER = '{ODBC Driver 17 for SQL Server}'
az_storage_connection_string = 'YourAzureStorageConnectionString'
container_name = 'rssfiles'


def main(myblob: func.InputStream):
    blob_service_client = BlobServiceClient.from_connection_string(
        az_storage_connection_string)
    container = ContainerClient.from_connection_string(
        conn_str=az_storage_connection_string, container_name=container_name)
    blob_list = container.list_blobs()

    if blob_list:
        for blob in blob_list:
            blob_client = blob_service_client.get_blob_client(
                container=container_name, blob=blob.name)
            streamdownloader = blob_client.download_blob()
            file_content = json.loads(streamdownloader.readall())
            if file_content:
                for col in file_content:
                    logging.info(
                        'Attempting {mssql} SQL Server connection...'.format(mssql=SQL_SERVER))
                    cnxn = pyodbc.connect('DRIVER='+DRIVER+';SERVER='+SQL_SERVER +
                                          ';PORT=1433;DATABASE='+SQL_DB+';UID='+USERNAME+';PWD=' + PASSWORD)
                    if cnxn:
                        cursor = cnxn.cursor()
                        cursor.execute("""INSERT INTO dbo.rss_feeds (RSS_Filename, RSS_Title, RSS_Link, RSS_Description)
                                    SELECT ?,?,?,?""", blob.name, col['title'], col['link'], col['description'])
                        cursor.commit()
                    else:
                        logging.warning(
                            'Could not establish connection to ''{mssql}'' server.'.format(mssql=SQL_SERVER))
            else:
                logging.warning(
                    'No data found in file ''{file}''.'.format(file=blob.name))

Finally, after function’s first invocation we can query the rss_feeds table to ensure the data has been extracted successfully (click on image to enlarge).

While I have not developed a dedicated logic to handle files which were already read from, one can easily archive those into a separate blob container or delete them altogether. Likewise, the first Python snippet of code responsible for JSON file creation can be easily tuned into a function itself thus fully automating the process lifecycle. As you can see, there are many different ways in which robust pipelines can be provisioned, even going as far as using orchestrator function (part of durable functions) to manage state, checkpoints and restarts.

Conclusion

So there you go, a quick overview of some of the capabilities that Azure Functions provide without having to explicitly provision or manage infrastructure.

Azure also offers other integration and automation services which define inputs, actions, conditions, and outputs e.g. Web Jobs, Logic Apps etc. and sometimes it difficult to discern which one of those options is the right one for the given scenario. There are many factors to consider e.g. from development point of view, Azure Functions offer more control and fewer limitations when faced with complex scenarios due to its code-first development model (Logic Apps offer designer-first approach). However, Logic Apps offer better out-of-the-box connectivity with extensive suite of options, ranging from Azure Service to popular SaaS solutions. As Azure Functions do not have connectors, they rely on triggers and input and output bindings for Storage, Event Hubs, Service Bus, and Cosmos DB services. From my perspective, Logic Apps are better suited when building integration solutions due to the very extensive list of connectors that should reduce the time-to-market, and when rich visual tools to build and manage are preferred. On the flip side, Azure Functions are a better fit if you require or prefer to have all the power and flexibility of a robust programming language, or need more portability, and the available bindings and logging capabilities are sufficient.

Azure Functions is designed to accelerate the process of application development and make it quick and simple. Through Azure Functions serverless architecture, it is possible to stop worrying about all the infrastructure considerations, and just focus on creating and managing proper code defining triggers and events. The highlight of Azure Functions is that you can write the code in the easy-to-use web interfaces and build and debug them locally on your machine of choice. Additionally, the pay-per-execution model is a good way to build scalable, enterprise-grade applications and pipelines without having to pay upfront and oftentimes through the nose. For anything that can be modeled as an event or trigger e.g. processing individual files, HTTP-triggers, handling API calls, managing queue messages etc. Azure Functions offer a cheap (micro-pay), no-ops (fully managed), scalable compute option.

Tags: , , , , , , ,