SSIS Packages Execution Workflow Control – How to Run an SSIS Package Based on Another Package's Execution Status

July 20th, 2015 / by admin

SQL Server Integration Services offers an easy way to impose workflow control based on an individual transformation's execution status through precedence constraints. If, for example, a transformation fails during package execution and the subsequent tasks do not need to run as a result, a precedence constraint allows the package workflow to be redirected to an error-handling routine without failing the package as a whole. Precedence constraints also provide a rudimentary expression evaluation engine which further extends their functionality through the ability to use system or user-created variables in conjunction with various functions.

This functionality works well in the context of a single, self-contained package, but what if we would like to execute a package, or one of its tasks, based on the execution status of a different package? One scenario where I found this solution useful was a data warehouse reload: based on the execution status of the data acquisition package, i.e. success or failure, the subsequent data warehouse reload package either had 'the green light' to proceed with the refresh or not. This can be achieved by maintaining a log table with package execution results for each run but, since SQL Server is more than capable of looking after SQL Server Agent metadata itself, it is probably easier to source this information straight out of the msdb system database.

Let's look at an example. First, let's create a sample SQL Server database with one dummy table in it and a package called 'SampleTask1', which we also deploy as a SQL Server Agent job. The package itself runs some simple SQL code, so its functionality is limited to either failing or succeeding depending on whether the embedded SELECT statement divides by zero or by one, as per the image below.

USE [master];
GO
IF EXISTS ( SELECT  name
            FROM    sys.databases
            WHERE   name = N'SampleDB' )
    BEGIN
-- Close connections to the SampleDB database
        ALTER DATABASE [SampleDB] SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
        DROP DATABASE [SampleDB];
    END;
GO
CREATE DATABASE [SampleDB] ON PRIMARY
( NAME = N'SampleDB'
, FILENAME = N'C:\DB_Files\SampleDB.mdf'
, SIZE = 10MB
, MAXSIZE = 1GB
, FILEGROWTH = 10MB ) LOG ON
( NAME = N'SampleDB_log'
, FILENAME = N'C:\DB_Files\SampleDB_log.LDF'
, SIZE = 1MB
, MAXSIZE = 1GB
, FILEGROWTH = 10MB);
GO
--Assign database ownership to login SA
EXEC SampleDB.dbo.sp_changedbowner @loginame = N'SA', @map = false;
GO
--Change the recovery model to BULK_LOGGED
ALTER DATABASE [SampleDB] SET RECOVERY BULK_LOGGED;
GO
USE SampleDB;
GO
--Create a sample table inside the SampleDB database
CREATE TABLE [dbo].[dummy_table]
    (
      [dummy_record] [DECIMAL](18, 4) NULL
    )
ON  [PRIMARY];
GO

SSIS_Exec_Precedence_SampleTask1SQLStatement
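The embedded statement can be as simple as the following sketch – toggling the divisor between 1 and 0 switches the package between success and failure:

```sql
-- Succeeds when dividing by one...
SELECT 1 / 1 AS dummy_record;
-- ...and raises error 8134 (divide by zero) when the divisor is changed to 0:
-- SELECT 1 / 0 AS dummy_record;
```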

The important aspect is that when the package is executed via a SQL Server Agent job, some of its execution metadata, e.g. the last execution status, can be read from the msdb database by querying its system tables, e.g. msdb.dbo.sysjobs and msdb.dbo.sysjobservers. The values from those tables can easily be translated into variables and used to control package execution workflow from an SSIS Script Task. Further, let's assume that the next package we would like to run (let's call it 'SampleTask2') depends directly on the successful execution of the 'SampleTask1' package. To examine this we will create a sample Script Task and define a few variables which will be read from and written to during its execution:

  • first_package_execution_status – execution status of the first package (represented as an Int32 data type). This variable guides the execution flow of the subsequent Script Tasks responsible for displaying the message box popup
  • first_package_name – name of the first package
  • output_message – message displayed as a result of first package execution status i.e. ‘SampleTask1 Failed!’ or ‘SampleTask1 Succeeded!’
  • server_name – name of the SQL Server instance on which the first package is executed

SSIS_Exec_Precedence_SampleTask2Variables
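Before wiring this into the Script Task, the job's last run outcome can be previewed directly in SSMS with a query along these lines (a sketch; last_run_outcome in msdb.dbo.sysjobservers is 0 for failure, 1 for success and 5 for unknown):

```sql
SELECT  sj.name ,
        sjs.last_run_outcome ,
        sjs.last_run_date ,
        sjs.last_run_time
FROM    msdb.dbo.sysjobs sj
        LEFT JOIN msdb.dbo.sysjobservers sjs ON sj.job_id = sjs.job_id
WHERE   sj.name = N'SampleTask1'
        AND sj.enabled = 1;
```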

Depending on whether 'SampleTask1' (executed as a SQL Server Agent job) failed or succeeded, the 'SampleTask2' package's control flow logic should adjust accordingly. The core logic governing the control flow is a simple snippet of C#.

#region Help:  Introduction to the script task
/* The Script Task allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services control flow.
 *
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script task. */
#endregion

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.Data.SqlClient;
using System.Data.OleDb;
using System.Data.Common;
using Wrap = Microsoft.SqlServer.Dts.Runtime.ManagedWrapper;
#endregion

namespace ST_44c20947eb6c4c5cb2f698bdd17b3534
{
    /// <summary>
    /// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
    /// or parent of this class.
    /// </summary>

    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {
        #region Help:  Using Integration Services variables and parameters in a script
        /* To use a variable in this script, first ensure that the variable has been added to
         * either the list contained in the ReadOnlyVariables property or the list contained in
         * the ReadWriteVariables property of this script task, according to whether or not your
         * code needs to write to the variable.  To add the variable, save this script, close this instance of
         * Visual Studio, and update the ReadOnlyVariables and
         * ReadWriteVariables properties in the Script Transformation Editor window.
         * To use a parameter in this script, follow the same steps. Parameters are always read-only.
         *
         * Example of reading from a variable:
         *  DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
         *
         * Example of writing to a variable:
         *  Dts.Variables["User::myStringVariable"].Value = "new value";
         *
         * Example of reading from a package parameter:
         *  int batchId = (int) Dts.Variables["$Package::batchId"].Value;
         *
         * Example of reading from a project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].Value;
         *
         * Example of reading from a sensitive project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
         * */

        #endregion

        #region Help:  Firing Integration Services events from a script
        /* This script task can fire events for logging purposes.
         *
         * Example of firing an error event:
         *  Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
         *
         * Example of firing an information event:
         *  Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
         *
         * Example of firing a warning event:
         *  Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
         * */
        #endregion

        #region Help:  Using Integration Services connection managers in a script
        /* Some types of connection managers can be used in this script task.  See the topic
         * "Working with Connection Managers Programmatically" for details.
         *
         * Example of using an ADO.Net connection manager:
         *  object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
         *  SqlConnection myADONETConnection = (SqlConnection)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
         *
         * Example of using a File connection manager
         *  object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
         *  string filePath = (string)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
         * */
        #endregion

        /// <summary>
        /// This method is called when this script task executes in the control flow.
        /// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
        /// To open Help, press F1.
        /// </summary>
        public void Main()
        {

            bool fireAgain = true;
            //Dts.Events.FireInformation(0, "Test1", "Val1", String.Empty, 0, ref fireAgain);

            try
            {
                var conn1 = new OleDbConnection();
                string sqlServerInstance = Dts.Variables["server_name"].Value.ToString();
                conn1.ConnectionString =
                              "Provider=SQLOLEDB;" +
                              "Data Source=" + sqlServerInstance + ";" +
                              "Initial Catalog=msdb;" +
                              "Integrated Security=SSPI;";
                conn1.Open();
                OleDbDataReader myDataReader1 = null;
                OleDbCommand LRO_Conn = new OleDbCommand(@"SELECT CAST(sjs.last_run_outcome as INT) as LRO
                                                                FROM msdb.dbo.sysjobs sj
                                                                LEFT JOIN msdb.dbo.sysjobservers  sjs ON sj.job_id = sjs.job_id
                                                                WHERE sj.name = ?
                                                                AND sj.enabled = 1", conn1);

                LRO_Conn.Parameters.Add("@name", OleDbType.VarChar, 150).Value = Dts.Variables["first_package_name"].Value;

                myDataReader1 = LRO_Conn.ExecuteReader();
                while (myDataReader1.Read())
                {
                    Dts.Variables["first_package_execution_status"].Value = myDataReader1["LRO"];
                }
                myDataReader1.Close();
                conn1.Close();
                bool b1 = Convert.ToBoolean(Dts.Variables["first_package_execution_status"].Value);
                if (b1 != true)
                {
                    Dts.Variables["output_message"].Value = "SampleTask1 Failed!";
                    Dts.Variables["first_package_execution_status"].Value = 0;
                }
                else
                {
                    Dts.Variables["output_message"].Value = "SampleTask1 Succeeded!";
                    Dts.Variables["first_package_execution_status"].Value = 1;
                }
            }
            catch (Exception e)
            {
                Dts.Events.FireError(0, "Script Task", "Exception occurred: " + e.ToString(), String.Empty, 0);
                Dts.TaskResult = (int)ScriptResults.Failure;
                return;
            }
            Dts.TaskResult = (int)ScriptResults.Success;
        }
        #region ScriptResults declaration
        /// <summary>
        /// This enum provides a convenient shorthand within the scope of this class for setting the
        /// result of the script.
        ///
        /// This code was generated automatically.
        /// </summary>

        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion
    }
}

The most important part is the SQL statement which queries the msdb database for the 'SampleTask1' package execution metadata and populates the variables used to control further execution. Below is a screenshot of how the variables defined earlier are assigned their respective values during the first Script Task execution.

SSIS_Exec_Precedence_SampleTask2ModifiedWriteVariablesMapping

The subsequent Script Task components simply display a message box with the 'output_message' variable value to visually confirm whether the preceding package executed successfully. Below is sample footage depicting this solution's final execution output (in this case first handling the SampleTask1 failure notification, followed by the successful execution pop-up alert). You can also download all the solution files for this exercise from my OneDrive folder HERE.

In my professional practice I have always liked to combine this logic with the error logging solution which I have written about extensively in my previous blog posts HERE and HERE. In this way I can create a robust architecture which not only queries the msdb database for metadata but also checks whether any errors were encountered during package execution. That way, even if the package executed successfully, I can pick up on potential issues which would not necessarily cause an execution failure but may be critical enough to influence whether subsequent packages should run.

To demonstrate this, let's use the sample database created previously and create a stored procedure containing the error-invoking SQL code (dividing by zero). This stored procedure will be run by a modified version of the 'SampleTask1' package in order to map and pass a couple of system variables which constitute the error logging metadata, i.e. ExecutionInstanceGUID and PackageName, as per the image below.

USE SampleDB;
GO
CREATE PROCEDURE usp_divide_by_zero
    (
      @Exec_Instance_GUID UNIQUEIDENTIFIER ,
      @Package_Name VARCHAR(256)
    )
AS
    BEGIN
        DECLARE @Target_DB_Name VARCHAR(128) = 'SampleDB';
        DECLARE @Target_DB_Schema_Name VARCHAR(56) = 'dbo';
        DECLARE @Target_DB_Object_Name VARCHAR(128) = 'dummy_table';

        BEGIN TRY
            BEGIN TRANSACTION;
            INSERT  INTO SampleDB.dbo.dummy_table
                    ( dummy_record )
                    SELECT  1 / 0;
            COMMIT TRANSACTION;
        END TRY

        BEGIN CATCH
            ROLLBACK TRANSACTION;
            WITH    TempErr (
							[ErrorNumber],
							[ErrorSeverity],
							[ErrorState],
							[ErrorLine],
							[ErrorMessage],
							[ErrorDateTime],
							[LoginName],
							[UserName],
							[PackageName],
							[ObjectID],
							[ProcessID],
							[ExecutionInstanceGUID],
							[DBName] )
                      AS ( SELECT   ERROR_NUMBER() AS ErrorNumber ,
                                    ERROR_SEVERITY() AS ErrorSeverity ,
                                    ERROR_STATE() AS ErrorState ,
                                    ERROR_LINE() AS ErrorLine ,
                                    ERROR_MESSAGE() AS ErrorMessage ,
                                    SYSDATETIME() AS ErrorDateTime ,
                                    SYSTEM_USER AS LoginName ,
                                    USER_NAME() AS UserName ,
                                    @Package_Name ,
                                    OBJECT_ID('' + @Target_DB_Name + '.'
                                              + @Target_DB_Schema_Name + '.'
                                              + @Target_DB_Object_Name + '') AS ObjectID ,
                                    ( SELECT    a.objectid
                                      FROM      sys.dm_exec_requests r
                                                CROSS   APPLY sys.dm_exec_sql_text(r.sql_handle) a
                                      WHERE     session_id = @@spid
                                    ) AS ProcessID ,
                                    @Exec_Instance_GUID AS ExecutionInstanceGUID ,
                                    DB_NAME() AS DatabaseName
                         )
                INSERT  INTO AdminDBA.dbo.LogSSISErrors_Error
                        ( [ErrorNumber] ,
                          [ErrorSeverity] ,
                          [ErrorState] ,
                          [ErrorLine] ,
                          [ErrorMessage] ,
                          [ErrorDateTime] ,
                          [FKLoginID] ,
                          [FKUserID] ,
                          [FKPackageID] ,
                          [FKObjectID] ,
                          [FKProcessID] ,
                          [ExecutionInstanceGUID]
                        )
                        SELECT  ErrorNumber = COALESCE(err.ErrorNumber, -1) ,
                                ErrorSeverity = COALESCE(err.[ErrorSeverity],
                                                         -1) ,
                                ErrorState = COALESCE(err.[ErrorState], -1) ,
                                ErrorLine = COALESCE(err.[ErrorLine], -1) ,
                                ErrorMessage = COALESCE(err.[ErrorMessage],
                                                        'Unknown') ,
                                ErrorDateTime = ErrorDateTime ,
                                FKLoginID = src_login.ID ,
                                FKUserID = src_user.ID ,
                                [FKPackageID] = src_package.ID ,
                                [FKObjectID] = src_object.ID ,
                                [FKProcessID] = src_process.ID ,
                                [ExecutionInstanceGUID] = err.ExecutionInstanceGUID
                        FROM    TempErr err
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Login src_login ON err.LoginName = src_login.LoginName
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_User src_user ON err.UserName = src_user.UserName
                                                              AND src_user.FKDBID = ( SELECT
                                                              ID
                                                              FROM
                                                              AdminDBA.dbo.LogSSISErrors_DB db
                                                              WHERE
                                                              db.DBName = err.DBName
                                                              )
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Package src_package ON err.PackageName = ( LEFT(src_package.PackageName,
                                                              CHARINDEX('.',
                                                              src_package.PackageName)
                                                              - 1) )
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Object src_object ON err.ObjectID = src_object.ObjectID
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Process src_process ON err.ProcessID = src_process.ProcessID
                        WHERE   src_login.CurrentlyUsed = 1
                                AND src_user.CurrentlyUsed = 1
                                --AND src_package.CurrentlyUsed = 1
                                AND src_object.CurrentlyUsed = 1
                                AND src_process.CurrentlyUsed = 1;
        END CATCH;
    END;

SSIS_Exec_Precedence_SampleTask1ModifiedParametersMapping

Provided we have the AdminDBA database created (see my previous posts HERE and HERE), the BEGIN TRY … BEGIN CATCH error handling code in the stored procedure should now capture the 'divide by zero' error and log it into the AdminDBA database, in spite of the package reporting successful execution (see image below).

SSIS_Exec_Precedence_AdminDBAErrorLogView

Armed with this knowledge we can now determine the further course of action for subsequent packages using the following code (which handles both the msdb metadata and the detailed error logging metadata).

#region Help:  Introduction to the script task
/* The Script Task allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services control flow.
 *
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script task. */
#endregion

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.Data.SqlClient;
using System.Data.OleDb;
using System.Data.Common;
using Wrap = Microsoft.SqlServer.Dts.Runtime.ManagedWrapper;
#endregion

namespace ST_44c20947eb6c4c5cb2f698bdd17b3534
{
    /// <summary>
    /// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
    /// or parent of this class.
    /// </summary>

    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {
        #region Help:  Using Integration Services variables and parameters in a script
        /* To use a variable in this script, first ensure that the variable has been added to
         * either the list contained in the ReadOnlyVariables property or the list contained in
         * the ReadWriteVariables property of this script task, according to whether or not your
         * code needs to write to the variable.  To add the variable, save this script, close this instance of
         * Visual Studio, and update the ReadOnlyVariables and
         * ReadWriteVariables properties in the Script Transformation Editor window.
         * To use a parameter in this script, follow the same steps. Parameters are always read-only.
         *
         * Example of reading from a variable:
         *  DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
         *
         * Example of writing to a variable:
         *  Dts.Variables["User::myStringVariable"].Value = "new value";
         *
         * Example of reading from a package parameter:
         *  int batchId = (int) Dts.Variables["$Package::batchId"].Value;
         *
         * Example of reading from a project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].Value;
         *
         * Example of reading from a sensitive project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
         * */

        #endregion

        #region Help:  Firing Integration Services events from a script
        /* This script task can fire events for logging purposes.
         *
         * Example of firing an error event:
         *  Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
         *
         * Example of firing an information event:
         *  Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
         *
         * Example of firing a warning event:
         *  Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
         * */
        #endregion

        #region Help:  Using Integration Services connection managers in a script
        /* Some types of connection managers can be used in this script task.  See the topic
         * "Working with Connection Managers Programmatically" for details.
         *
         * Example of using an ADO.Net connection manager:
         *  object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
         *  SqlConnection myADONETConnection = (SqlConnection)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
         *
         * Example of using a File connection manager
         *  object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
         *  string filePath = (string)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
         * */
        #endregion

        /// <summary>
        /// This method is called when this script task executes in the control flow.
        /// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
        /// To open Help, press F1.
        /// </summary>
        public void Main()
        {

            bool fireAgain = true;
            try
            {
                var conn1 = new OleDbConnection();
                string sqlServerInstance = Dts.Variables["server_name"].Value.ToString();
                conn1.ConnectionString =
                              "Provider=SQLOLEDB;" +
                              "Data Source=" + sqlServerInstance + ";" +
                              "Initial Catalog=msdb;" +
                              "Integrated Security=SSPI;";
                conn1.Open();
                OleDbDataReader myDataReader1 = null;
                OleDbCommand LRO_Conn = new OleDbCommand(@"SELECT CAST(sjs.last_run_outcome as INT) as LRO
                                                                FROM msdb.dbo.sysjobs sj
                                                                LEFT JOIN msdb.dbo.sysjobservers  sjs ON sj.job_id = sjs.job_id
                                                                WHERE sj.name = ?
                                                                AND sj.enabled = 1", conn1);

                LRO_Conn.Parameters.Add("@name", OleDbType.VarChar, 150).Value = Dts.Variables["first_package_name"].Value;

                myDataReader1 = LRO_Conn.ExecuteReader();
                while (myDataReader1.Read())
                {
                    Dts.Variables["first_package_execution_status"].Value = myDataReader1["LRO"];
                }
                myDataReader1.Close();
                conn1.Close();
                Dts.Events.FireInformation(0, "first_package_execution_status variable value", Dts.Variables["first_package_execution_status"].Value.ToString(), String.Empty, 0, ref fireAgain);

                var conn2 = new OleDbConnection();
                string sqlServerInstance2 = Dts.Variables["server_name"].Value.ToString();
                conn2.ConnectionString =
                              "Provider=SQLOLEDB;" +
                              "Data Source=" + sqlServerInstance2 + ";" +
                              "Initial Catalog=AdminDBA;" +
                              "Integrated Security=SSPI;";
                conn2.Open();

                OleDbDataReader myDataReader2 = null;
                OleDbCommand LRE_Conn = new OleDbCommand(@"SELECT	CASE WHEN EXISTS(
							                               SELECT * FROM
							                                        (SELECT TOP 1 e.ErrorMessage, e.ErrorDateTime,
							                                        LEFT(p.packagename+'.', CHARINDEX('.',p.packagename+'.')-1) AS packagename
							                                        FROM [AdminDBA].[dbo].[LogSSISErrors_Error] e
							                                        JOIN [AdminDBA].[dbo].[LogSSISErrors_Package] p
							                                        ON e.fkpackageid = p.id) a
							                                        WHERE a.packagename = ? AND a.ErrorDateTime BETWEEN
							                                        DATEADD(DAY, DATEDIFF(DAY, '19000101', GETDATE()), '19000101')
							                                        AND SYSDATETIME())
			                                            THEN 0 ELSE 1 END AS LRE", conn2);

                LRE_Conn.Parameters.Add("@packagename", OleDbType.VarChar, 150).Value = Dts.Variables["first_package_name"].Value;

                myDataReader2 = LRE_Conn.ExecuteReader();
                while (myDataReader2.Read())
                {
                    Dts.Variables["first_package_error_report"].Value = myDataReader2["LRE"];
                }
                myDataReader2.Close();
                conn2.Close();
                Dts.Events.FireInformation(0, "first_package_error_report variable value", Dts.Variables["first_package_error_report"].Value.ToString(), String.Empty, 0, ref fireAgain);

                bool b1 = Convert.ToBoolean(Dts.Variables["first_package_execution_status"].Value);
                bool b2 = Convert.ToBoolean(Dts.Variables["first_package_error_report"].Value);
                //Fail if either the job reported failure or errors were logged;
                //the original disjoint if/else allowed a logged failure to be
                //overwritten by the error-report branch
                if (b1 != true || b2 != true)
                {
                    Dts.Variables["output_message"].Value = "SampleTask1 Failed!";
                    Dts.Variables["first_package_execution_status"].Value = b1 ? 1 : 0;
                    Dts.Variables["first_package_error_report"].Value = b2 ? 1 : 0;
                }
                else
                {
                    Dts.Variables["output_message"].Value = "SampleTask1 Succeeded!";
                    Dts.Variables["first_package_execution_status"].Value = 1;
                    Dts.Variables["first_package_error_report"].Value = 1;
                }
            }
            catch (Exception e)
            {
                Dts.Events.FireError(0, "Script Task", "Exception occurred: " + e.ToString(), String.Empty, 0);
                Dts.TaskResult = (int)ScriptResults.Failure;
                return;
            }
            Dts.TaskResult = (int)ScriptResults.Success;
        }
        #region ScriptResults declaration
        /// <summary>
        /// This enum provides a convenient shorthand within the scope of this class for setting the
        /// result of the script.
        ///
        /// This code was generated automatically.
        /// </summary>

        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion
    }
}

This code not only queries the msdb database to determine the last execution status of the 'SampleTask1' package but also checks the AdminDBA database for any runtime error logs the package generated since the start of the current day. In this way, even if the package executed successfully, any errors captured and logged during runtime can form the basis for further execution workflow changes, and this determination can be evaluated and processed on the fly.

Based on that functionality, the subsequent package design needs an additional variable, hence the inclusion of 'first_package_error_report' in the variables collection, the first Script Task and the precedence constraints, as per the image below.

SSIS_Exec_Precedence_SampleTask2ModifiedVariables

To confirm this functionality we could add an 'OnPostExecute' event breakpoint and observe the variable values change during execution, but a quicker way is to add a FireInformation call for each variable passed to the precedence constraints, so that their values are displayed at runtime in the output pane, using a small snippet of C# as per below.

Dts.Events.FireInformation(0, "first_package_execution_status variable value", Dts.Variables["first_package_execution_status"].Value.ToString(), String.Empty, 0, ref fireAgain);

SSIS_Exec_Precedence_SampleTask2ModifiedRunTimeVarsValueOutput

Once both queries in the Script Task have executed and the variables responsible for the package execution metadata and error checking are populated with their respective values, we can dictate the further package execution workflow with much greater control, i.e. in the above example, raise a 'Task Failed' message if either of those variables is assigned the value of 0.
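The precedence constraints themselves can then use 'Expression' (or 'Expression and Constraint') as the evaluation operation, with an expression along these lines (a sketch; variable names as defined earlier):

```
@[User::first_package_execution_status] == 1 && @[User::first_package_error_report] == 1
```

The failure path simply negates the comparison, so exactly one branch fires for any combination of the two values.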

All packages and their solution files as well as SQL scripts used in this post can be downloaded from my OneDrive account under the following LINK if you wish to replicate this functionality in your own practice.


Asynchronous SQL Execution via SQL Server Agent Jobs

May 27th, 2015 / No Comments » / by admin

Recently, one of my clients tasked me with creating a data acquisition routine for data in an externally hosted MySQL database, part of a larger ETL solution merging dispersed data sources into a staging area for further data warehouse processing. The database was very small i.e. just over a few gigabytes, distributed across around 35 tables, with the largest object holding over 15 million records. A Microsoft SQL Server 2014 instance served as the local data storage engine and all the code was required to be in Microsoft's dialect of SQL i.e. Transact-SQL, with data flow and execution governed by Integration Services (SSIS) packages. The interface for querying the MySQL data was a linked server set up on the SQL Server instance using Oracle's ODBC driver, over a 100Mbps internet link. The requirements seemed quite easy to accommodate i.e. create a SSIS package to manage the data acquisition pipelines, either through a collection of SSIS-specific transformations or T-SQL code. A few preliminary tests also showed that the connection speed should be robust enough to copy or even merge the data across source/target without much hassle.

Once development commenced, I quickly learned that Integration Services may not have been the best tool for the job (more due to the ODBC driver's stability than SSIS functionality). Regardless of how much reconfiguration and tweaking went into optimising the SSIS package to move data across the linked server connection, it could not cope with a few of the large tables, continually failing on execution with cryptic messages and confusing error logs. Likewise, running pure SQL MERGE statements often resulted in connection time-outs or buffer overflows, indicating that the ODBC driver was too flaky and unstable to handle long-running queries. The only approach that did not throw the connection out of balance was to abandon the more convenient 'upserts' in favour of truncations and inserts, broken down into multiple statements to dissect the data flow across the network. The less data that went through the network as a single transaction, the more reliable the transfer was, with one caveat: the largest MySQL table (15 million records) took nearly an hour to read from and insert into the local SQL Server instance. Regardless of how many batches the data acquisition job comprised, performance oscillated around the 55-minute mark, an unacceptable result by both my and the client's standards, even if it solved the reliability issues. Since data acquisition failures became more frequent as source table record counts grew (when executed as a single transaction), and breaking the load into multiple batches provided the needed stability, albeit at the expense of prolonged execution, the potential silver bullet was to find a half-point between the two approaches. That compromise turned out to be batch processing (stability) combined with asynchronous, concurrent execution (speed).

Microsoft SQL Server does not offer built-in asynchronous SQL processing: all statements within a query execute in a sequential, non-parallel fashion. Even when looping through a process using a WHILE statement or a cursor, each SQL statement is invoked synchronously. There are different approaches that can help achieve concurrent SQL execution, but they require either programming knowledge or a fair amount of inelegant hacking to circumvent this limitation. One simple solution that worked really well for my project was to create a number of SQL Server Agent jobs (using T-SQL only) that would be initialised in parallel to dissect the query, thus 'multi-threading' the whole process and shaving a considerable amount of time off the final execution duration.
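The core of that technique is just a handful of msdb system stored procedures; a minimal, one-off job can be sketched as follows (the job name and the placeholder command are illustrative only):

```sql
-- Sketch: create and immediately start a throwaway SQL Server Agent job.
-- sp_start_job returns as soon as the job is queued, so the command below
-- runs asynchronously with respect to the calling session.
DECLARE @job_name SYSNAME = N'Temp_AsyncDemoJob';

EXEC msdb.dbo.sp_add_job @job_name = @job_name;
EXEC msdb.dbo.sp_add_jobserver @job_name = @job_name;   -- target the local server
EXEC msdb.dbo.sp_add_jobstep @job_name = @job_name,
     @step_name = N'Step1',
     @command = N'WAITFOR DELAY ''00:00:10'';',         -- placeholder workload
     @database_name = N'master';
EXEC msdb.dbo.sp_start_job @job_name = @job_name;       -- fire and forget
```

Creating several such jobs in a loop, each with a different slice of the workload in its step command, is what gives the concurrent execution described below.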

Let’s look at an example of two tables placed in two different databases simulating this scenario: the first containing ten million records of mock data and the second containing no data, which will become the target of the parallel INSERT operation. The following code creates the two sample databases with their corresponding objects and populates the source table with dummy data.

--CREATE 'Source' AND 'Target' DATABASES
USE master;
GO
IF DB_ID('SourceDB') IS NOT NULL
    BEGIN
        ALTER DATABASE SourceDB SET SINGLE_USER
        WITH ROLLBACK IMMEDIATE;
        DROP DATABASE SourceDB;
    END;
GO
CREATE DATABASE SourceDB;
GO
ALTER DATABASE SourceDB SET RECOVERY SIMPLE;
GO
ALTER DATABASE SourceDB
MODIFY FILE
    (NAME = SourceDB,
    SIZE = 10240MB);
GO

USE master;
GO
IF DB_ID('TargetDB') IS NOT NULL
    BEGIN
        ALTER DATABASE TargetDB SET SINGLE_USER
        WITH ROLLBACK IMMEDIATE;
        DROP DATABASE TargetDB;
    END;
GO
CREATE DATABASE TargetDB;
GO
ALTER DATABASE TargetDB SET RECOVERY SIMPLE;
GO
ALTER DATABASE TargetDB
MODIFY FILE
    (NAME = TargetDB,
    SIZE = 10240MB);
GO

--CREATE 'Source' AND 'Target' TABLES
USE SourceDB;
CREATE TABLE dbo.SourceTable
    (
      ID INT IDENTITY(1, 1) NOT NULL ,
      Value1 INT NOT NULL ,
      Value2 INT NOT NULL ,
      Value3 DECIMAL(10, 2) NOT NULL ,
      Value4 DATETIME NOT NULL ,
      Value5 DATETIME NOT NULL ,
	  Value6 NVARCHAR (512) NOT NULL,
	  Value7 NVARCHAR (512) NOT NULL,
	  Value8 UNIQUEIDENTIFIER,
      CONSTRAINT PK_ID PRIMARY KEY CLUSTERED ( ID ASC )
    );

USE TargetDB;
CREATE TABLE dbo.TargetTable
    (
      ID INT IDENTITY(1, 1) NOT NULL ,
      Value1 INT NOT NULL ,
      Value2 INT NOT NULL ,
      Value3 DECIMAL(10, 2) NOT NULL ,
      Value4 DATETIME NOT NULL ,
      Value5 DATETIME NOT NULL ,
	  Value6 NVARCHAR (512) NOT NULL,
	  Value7 NVARCHAR (512) NOT NULL,
	  Value8 UNIQUEIDENTIFIER,
      CONSTRAINT PK_ID PRIMARY KEY CLUSTERED ( ID ASC )
    );

--POPULATE 'SourceTable' WITH TEST DATA
USE SourceDB;
SET NOCOUNT ON;
DECLARE @count INT = 0;
DECLARE @records_to_insert INT = 10000000;
WHILE @count < @records_to_insert
    BEGIN
        INSERT  INTO dbo.SourceTable
                ( Value1 ,
                  Value2 ,
                  Value3 ,
                  Value4 ,
                  Value5 ,
				  Value6 ,
				  Value7 ,
				  Value8
                )
                SELECT  10 * RAND() ,
                        20 * RAND() ,
                        10000 * RAND() / 100 ,
                        DATEADD(ss, @count, SYSDATETIME()) ,
                        CURRENT_TIMESTAMP ,
						REPLICATE(CAST(NEWID() AS NVARCHAR(MAX)),10) ,
						REPLICATE(CAST(RAND() AS NVARCHAR(MAX)),10) ,
						NEWID();
        SET @count = @count + 1;
    END;

Next, let’s test sequential INSERT performance by inserting all records from the SourceTable table in the SourceDB database into the TargetTable table in the TargetDB database. For this purpose I simply run an INSERT statement, copying all the data across with no alterations or changes in the target object, generating the following execution plan.

SET STATISTICS TIME ON;
GO
INSERT  INTO TargetDB.dbo.TargetTable
        ( Value1 ,
          Value2 ,
          Value3 ,
          Value4 ,
          Value5 ,
		  Value6 ,
		  Value7 ,
		  Value8
        )
        SELECT  Value1 ,
                Value2 ,
                Value3 ,
                Value4 ,
                Value5 ,
				Value6 ,
				Value7 ,
				Value8
        FROM    SourceDB.dbo.SourceTable;
GO
SET STATISTICS TIME OFF;
GO

Async_SQL_Exec_INSERT_Statement

Given that SQL Server 2014 offers some enhancements to SELECT…INTO execution, it is worthwhile trying it out as an alternative. If there is a provision to create the target table from scratch and the developer has the liberty to execute SELECT…INTO in place of an INSERT statement, Microsoft claims this improvement allows the statement to run in parallel mode (as confirmed by the execution plan below), speeding up execution considerably.

IF OBJECT_ID('TargetDB.dbo.TargetTable', 'U') IS NOT NULL
  BEGIN
	DROP TABLE TargetDB.dbo.TargetTable
  END

SET STATISTICS TIME ON;
GO
SELECT			Value1 ,
                Value2 ,
                Value3 ,
                Value4 ,
                Value5 ,
				Value6 ,
				Value7 ,
				Value8
		INTO	TargetDB.dbo.TargetTable
        FROM    SourceDB.dbo.SourceTable;
GO
SET STATISTICS TIME OFF;
GO

Async_SQL_Exec_SELECT_INTO_Statement

Finally, let’s look at a solution that enforces parallel query execution, in this case also for the INSERT statement, by ‘spinning up’ multiple SQL Server Agent jobs. The following code creates a stored procedure which ‘dissects’ all source table records into multiple batches of comparable record counts (the exact number controlled by the parameter value passed) and generates SQL Server Agent jobs executing in parallel.

USE [SourceDB]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE PROCEDURE [dbo].[usp_asyncTableInsert]
(@Source_DB_Name			VARCHAR				(128) ,
@Source_DB_Schema_Name		VARCHAR				(128) ,
@Source_DB_Object_Name		VARCHAR				(256) ,
@Target_DB_Name				VARCHAR				(128) ,
@Target_DB_Schema_Name		VARCHAR				(128) ,
@Target_DB_Object_Name		VARCHAR				(256) ,
@SQL_Exec_No				VARCHAR				(10))
AS
BEGIN
	SET NOCOUNT ON;

	/*
	Declare additional variables:
	(1) @SQL				- stores dynamic SQL string
	(2) @Error_Message		- stores error message string
	(3) @Is_Debug			- stores a binary flag for allowing/disallowing displaying messages during procedure execution
	*/
    DECLARE @SQL				NVARCHAR(MAX)
	DECLARE @Error_Message		VARCHAR (4000)
	DECLARE @Is_Debug			INT = 1			

	/*
	Create temporary table to hold the ranges of values for dissecting
	the 'source' table into semi-equal chunks of data for further processing
	*/
    IF OBJECT_ID('tempdb..#Ids_Range') IS NOT NULL
		BEGIN
			DROP TABLE #Ids_Range;
        END;
    CREATE TABLE #Ids_Range
    (
    id SMALLINT IDENTITY(1, 1) ,
    range_FROM BIGINT ,
    range_TO BIGINT
    );				

	/*
	Populate the #Ids_Range temporary table with value ranges i.e. pairs of values
	which will be used to build the WHERE clause of the INSERT statement,
	breaking down all the 'source' records into smaller chunks
	*/
	SET @SQL =			'DECLARE @R1 INT = (SELECT MIN(ID) AS ID FROM '									+CHAR(13)
	SET @SQL = @SQL +	''+@Source_DB_Name+'.'+@Source_DB_Schema_Name+'.'+@Source_DB_Object_Name+')'	+CHAR(13)
	SET @SQL = @SQL +	'DECLARE @R2 BIGINT = (SELECT (MAX(ID)-MIN(ID)+1)/'+@SQL_Exec_No+' AS ID FROM'	+CHAR(13)
	SET @SQL = @SQL +	''+@Source_DB_Name+'.'+@Source_DB_Schema_Name+'.'+@Source_DB_Object_Name+')'	+CHAR(13)
	SET @SQL = @SQL +	'DECLARE @R3 BIGINT = (SELECT MAX(ID) AS ID FROM '								+CHAR(13)
	SET @SQL = @SQL +	''+@Source_DB_Name+'.'+@Source_DB_Schema_Name+'.'+@Source_DB_Object_Name+')'	+CHAR(13)
	SET @SQL = @SQL +	'DECLARE @t BIGINT = @r2+@r2+2 '												+CHAR(13)
	SET @SQL = @SQL +	'INSERT INTO #Ids_Range '														+CHAR(13)
	SET @SQL = @SQL +	'(range_FROM, range_to) '														+CHAR(13)
	SET @SQL = @SQL +	'SELECT @R1, @R2 '																+CHAR(13)
	SET @SQL = @SQL +	'UNION ALL '																	+CHAR(13)
	SET @SQL = @SQL +	'SELECT @R2+1, @R2+@R2+1 '														+CHAR(13)
	SET @SQL = @SQL +	'WHILE @t<=@r3 '																+CHAR(13)
	SET @SQL = @SQL +	'BEGIN '																		+CHAR(13)
	SET @SQL = @SQL +	'INSERT INTO #Ids_Range '														+CHAR(13)
	SET @SQL = @SQL +	'(range_FROM, range_to) '														+CHAR(13)
	SET @SQL = @SQL +	'SELECT @t, CASE WHEN (@t+@r2)>=@r3 THEN @r3 ELSE @t+@r2 END '					+CHAR(13)
	SET @SQL = @SQL +	'SET @t = @t+@r2+1 '															+CHAR(13)
	SET @SQL = @SQL +	'END'																			+CHAR(13)

	EXEC(@SQL)

	/*
	Truncate Target table if any data exists
	*/
	SET @SQL =			'IF EXISTS (SELECT TOP 1 1 FROM '														+CHAR(13)
	SET @SQL = @SQL +	''+@Target_DB_Name+'.'+@Target_DB_Schema_Name+'.'+@Target_DB_Object_Name+''				+CHAR(13)
	SET @SQL = @SQL +	'WHERE ID IS NOT NULL) BEGIN '															+CHAR(13)
	SET @SQL = @SQL +	'TRUNCATE TABLE '																		+CHAR(13)
	SET @SQL = @SQL +	''+@Target_DB_Name+'.'+@Target_DB_Schema_Name+'.'+@Target_DB_Object_Name+' END'			+CHAR(13)

	EXEC(@SQL)

	/*
	Create temporary #Temp_Tbl_AgentJob_Stats table to store SQL Server Agent job execution metadata
	*/
	IF OBJECT_ID('tempdb..#Temp_Tbl_AgentJob_Stats') IS NOT NULL
    BEGIN
		DROP TABLE #Temp_Tbl_AgentJob_Stats;
    END;
    CREATE TABLE #Temp_Tbl_AgentJob_Stats
    (
    ID SMALLINT IDENTITY(1, 1) ,
    Job_Name VARCHAR(256) ,
    Job_Exec_Start_Date DATETIME
    );		

	/*
	Create a cursor and a range of variables to define the SQL execution string
	used to create and manage SQL Server Agent jobs (exact count defined by the
	@SQL_Exec_No parameter). This SQL splits the INSERT statement into multiple,
	concurrently-executing batches which run in asynchronous mode. This part of the
	code also manages stopping and deleting already created jobs in case a re-run
	is required, as well as deleting SQL Agent jobs created as part of this process
	on successful completion
	*/
	IF CURSOR_STATUS('global', 'sp_cursor') >= -1
		BEGIN
			DEALLOCATE sp_cursor
		END
		DECLARE @z INT
		DECLARE @err INT
		DECLARE sp_cursor CURSOR
		FOR
		SELECT id FROM #ids_range
		SELECT  @err = @@error
        IF @err <> 0
			BEGIN
				DEALLOCATE sp_cursor
				RETURN @err
            END
		OPEN sp_cursor
		FETCH NEXT
		FROM sp_cursor INTO @z
		WHILE @@FETCH_STATUS = 0
			BEGIN
				DECLARE
				@range_from		VARCHAR(10)		= (SELECT CAST(range_FROM AS VARCHAR(10)) FROM #ids_range where id = @z),
				@range_to		VARCHAR(10)		= (SELECT CAST(range_TO AS VARCHAR(10)) FROM #ids_range where id = @z),
				@job_name		VARCHAR (256)	= 'Temp_'+UPPER(LEFT(@Target_DB_Object_Name,1))+LOWER(SUBSTRING(@Target_DB_Object_Name,2,LEN(@Target_DB_Object_Name)))+'_TableSync_'+'AsyncJob'+'_'+CAST(@z AS VARCHAR (20)),
				@job_owner		VARCHAR (256)	= 'sa'
				DECLARE
				@sql_job_delete VARCHAR (400)	= 'EXEC msdb..sp_delete_job @job_name='''''+@job_name+''''''
				DECLARE
				@sql_job		NVARCHAR(MAX)	=
				'INSERT  INTO TargetDB.dbo.TargetTable
				(Value1, Value2, Value3, Value4, Value5, Value6, Value7,Value8)
				SELECT Value1, Value2, Value3, Value4, Value5, Value6, Value7, Value8
				FROM SourceDB.dbo.SourceTable
				WHERE id >= '+cast(@range_FROM as varchar (20))+' AND ID <= '+cast(@range_to as varchar(20))+''

				SET @SQL =			'IF EXISTS'
				SET @SQL = @SQL +	'(SELECT TOP 1 1 FROM msdb..sysjobs_view job JOIN msdb.dbo.sysjobactivity activity'					+CHAR(13)
				SET @SQL = @SQL +	'ON job.job_id = activity.job_id WHERE job.name = N'''+@job_name+''''								+CHAR(13)
				SET @SQL = @SQL +	'AND activity.start_execution_date IS NOT NULL AND activity.stop_execution_date IS NULL)'			+CHAR(13)
				SET @SQL = @SQL +	'BEGIN'																								+CHAR(13)
				SET @SQL = @SQL +	'EXEC msdb..sp_stop_job @job_name=N'''+@job_name+''';'												+CHAR(13)
				SET @SQL = @SQL +	'EXEC msdb..sp_delete_job @job_name=N'''+@job_name+''', @delete_unused_schedule=1'					+CHAR(13)
				SET @SQL = @SQL +	'END'																								+CHAR(13)
				SET @SQL = @SQL +	'IF EXISTS'																							+CHAR(13)
				SET @SQL = @SQL +	'(SELECT TOP 1 1 FROM msdb..sysjobs_view job JOIN msdb.dbo.sysjobactivity activity'					+CHAR(13)
				SET @SQL = @SQL +	'ON job.job_id = activity.job_id WHERE job.name = N'''+@job_name+''''								+CHAR(13)
				SET @SQL = @SQL +	'AND activity.start_execution_date IS NULL AND activity.stop_execution_date IS NOT NULL)'			+CHAR(13)
				SET @SQL = @SQL +	'BEGIN'																								+CHAR(13)
				SET @SQL = @SQL +	'EXEC msdb..sp_delete_job @job_name=N'''+@job_name+''', @delete_unused_schedule=1'					+CHAR(13)
				SET @SQL = @SQL +	'END'																								+CHAR(13)
				SET @SQL = @SQL +	'EXEC msdb..sp_add_job '''+@job_name+''', @owner_login_name= '''+@job_owner+''';'					+CHAR(13)
				SET @SQL = @SQL +	'EXEC msdb..sp_add_jobserver @job_name= '''+@job_name+''';'											+CHAR(13)
				SET @SQL = @SQL +	'EXEC msdb..sp_add_jobstep @job_name='''+@job_name+''', @step_name= ''Step1'', '					+CHAR(13)
				SET @SQL = @SQL +	'@command = '''+@sql_job+''', @database_name = '''+@Target_DB_Name+''', @on_success_action = 3;'	+CHAR(13)
				SET @SQL = @SQL +	'EXEC msdb..sp_add_jobstep @job_name = '''+@job_name+''', @step_name= ''Step2'','					+CHAR(13)
				SET @SQL = @SQL +   '@command = '''+@sql_job_delete+''''																+CHAR(13)
				SET @SQL = @SQL +	'EXEC msdb..sp_start_job @job_name= '''+@job_name+''''												+CHAR(13)
																																		+REPLICATE(CHAR(13),4)
				EXEC (@SQL)

				/*
				Wait for the job to register in the SQL Server metadata sys views 
				*/
				WAITFOR DELAY '00:00:01'					

				INSERT INTO #Temp_Tbl_AgentJob_Stats
				(Job_Name, Job_Exec_Start_Date)
				SELECT job.Name, activity.start_execution_date
				FROM msdb.dbo.sysjobs_view job
				INNER JOIN msdb.dbo.sysjobactivity activity
				ON job.job_id = activity.job_id
				WHERE job.name = @job_name    

                FETCH NEXT
				FROM sp_cursor INTO @z
			END
			CLOSE sp_cursor
			DEALLOCATE sp_cursor
END													

Running the stored procedure (with the prior target table truncation) will generate as many INSERT statements as specified by the @SQL_Exec_No parameter. When executed, we can observe multiple instances of the SQL statement inserting records falling within given ID ranges, shown here as output from Adam Machanic’s sp_whoisactive stored procedure as well as the SQL Server Agent jobs view. Given that the @SQL_Exec_No parameter value was 5, five instances of this statement were running in parallel, as per the image below.
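For reference, an invocation against the sample objects above could look like this; the job-completion polling loop at the end is my own addition rather than part of the stored procedure, and relies on the 'Temp_…_TableSync_AsyncJob_…' naming pattern the procedure assigns to its jobs:

```sql
-- Kick off five concurrent INSERT batches
EXEC SourceDB.dbo.usp_asyncTableInsert
     @Source_DB_Name = 'SourceDB', @Source_DB_Schema_Name = 'dbo', @Source_DB_Object_Name = 'SourceTable',
     @Target_DB_Name = 'TargetDB', @Target_DB_Schema_Name = 'dbo', @Target_DB_Object_Name = 'TargetTable',
     @SQL_Exec_No = '5';

-- Optionally block the calling session until all the temporary jobs have finished
WHILE EXISTS (SELECT 1
              FROM  msdb.dbo.sysjobactivity a
                    INNER JOIN msdb.dbo.sysjobs j ON j.job_id = a.job_id
              WHERE j.name LIKE 'Temp[_]%[_]TableSync[_]AsyncJob[_]%'
                    AND a.start_execution_date IS NOT NULL
                    AND a.stop_execution_date IS NULL)
    WAITFOR DELAY '00:00:05';
```

Since sp_start_job is fire-and-forget, a caller that needs the data loaded before proceeding (e.g. a downstream SSIS task) has to wait on job activity like this or on the #Temp_Tbl_AgentJob_Stats metadata the procedure collects.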

Async_SQL_Exec_StoredProc_Agent_Parallel_View

Looking at the execution times (run on my trusty Lenovo x240 laptop) and the differences between the individual approaches, it is evident that the SQL Server 2014 enhancements to SELECT…INTO parallelisation are profound; however, with the optimizer taking advantage of multiple cores I also noticed a large spike in CPU utilisation i.e. around 20% for INSERT versus 90% for SELECT…INTO. If, on the other hand, SELECT…INTO is not an option and a substantial amount of data is to be copied as part of a long-running transaction, asynchronous execution via SQL Server Agent jobs is a good solution.

Async_SQL_Exec_Speed_Comparison

I have only scratched the surface of how SQL Server Agent jobs can facilitate workload distribution across the server to speed up data processing and take advantage of SQL Server’s available resource pool. There are other ways to achieve the same result but, given the simplicity and the ease of development and provisioning, this technique can provide a quick and robust workaround for parallel SQL statement execution with SQL Server.
