Designing data acquisition framework in SQL Server and SSIS – how to source and integrate external data for a decision support system or data warehouse (Part 4)

May 25th, 2016 / No Comments » / by admin

Note: Part 1 to this series can be found HERE, Part 2 HERE, Part 3 HERE and all the code and additional files for this post can be downloaded from my OneDrive folder HERE.

Closing this series with the final post, this part focuses on the SSIS package structure and ties up all the code and architecture described in previous posts. The SQL Server Integration Services solution featured here is very minimalistic as all the heavy lifting is done by the code described mainly in parts two and three. The package is meant to mainly provide a structure or framework to execute individual components and control the workflow and, beyond this functionality, does not utilise any features which facilitate data acquisition e.g. SSIS lookups, merges, conditional splits, data conversion or aggregation etc. There is a little bit of C# code driving some transformations, but most of the components that constitute the package’s structure act as triggers to external code execution only.

The whole package (Control Flow as there is no Data Flow transformations) consists mainly of Execute SQL Task transformations, with a few Script Tasks here and there as per the image below.

Data_Acquisition_Framework_Part4_Complete_SSIS_Package

With the exception of two For Each Loop containers which, as the name suggests, loop through small and large tables, all components execute in a sequential order and once only. Most transformations are connected using Precedence Constraints based on two evaluation operators: Expression and Constraint, as per the image below.

Data_Acquisition_Framework_Part4_SSIS_Precedence_Constraint_View

This ensures that all the tasks executed not only are required to finish with the status of SUCCESS but also that the underlying code (a stored procedure in most cases) has completed without errors i.e. the returned variable’s @Is_All_OK value is one (1). If the value of @Is_All_OK variable is not one (alternative value is zero), the package workflow should steer into executing task(s) responsible for notifying administrators of the potential issue and terminating subsequent tasks execution.

Let’s go over the package skeleton and how some of the transformations are structured starting with the two top components i.e. ‘Set_All_OK and Sync_Exec_StartTime Variables Values’ and ‘Set DBMail_Receipients Variable Value’. The first transformation simply populates two variables which are used in subsequent package workflow – The @Sync_Exec_Start which is storing package start date and time and @Is_All_OK one which is preliminarily set to zero (0). The following C# snippet is used for this functionality.

#region Help:  Introduction to the script task
/* The Script Task allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services control flow. 
 * 
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script task. */
#endregion


#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
#endregion

namespace ST_44c20947eb6c4c5cb2f698bdd17b3534
{
    /// <summary>
    /// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
    /// or parent of this class.
    /// </summary>
    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {
        #region Help:  Using Integration Services variables and parameters in a script
        /* To use a variable in this script, first ensure that the variable has been added to 
         * either the list contained in the ReadOnlyVariables property or the list contained in 
         * the ReadWriteVariables property of this script task, according to whether or not your
         * code needs to write to the variable.  To add the variable, save this script, close this instance of
         * Visual Studio, and update the ReadOnlyVariables and 
         * ReadWriteVariables properties in the Script Transformation Editor window.
         * To use a parameter in this script, follow the same steps. Parameters are always read-only.
         * 
         * Example of reading from a variable:
         *  DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
         * 
         * Example of writing to a variable:
         *  Dts.Variables["User::myStringVariable"].Value = "new value";
         * 
         * Example of reading from a package parameter:
         *  int batchId = (int) Dts.Variables["$Package::batchId"].Value;
         *  
         * Example of reading from a project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].Value;
         * 
         * Example of reading from a sensitive project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
         * */

        #endregion

        #region Help:  Firing Integration Services events from a script
        /* This script task can fire events for logging purposes.
         * 
         * Example of firing an error event:
         *  Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
         * 
         * Example of firing an information event:
         *  Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
         * 
         * Example of firing a warning event:
         *  Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
         * */
        #endregion

        #region Help:  Using Integration Services connection managers in a script
        /* Some types of connection managers can be used in this script task.  See the topic 
         * "Working with Connection Managers Programatically" for details.
         * 
         * Example of using an ADO.Net connection manager:
         *  object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
         *  SqlConnection myADONETConnection = (SqlConnection)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
         *
         * Example of using a File connection manager
         *  object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
         *  string filePath = (string)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
         * */
        #endregion


        /// <summary>
        /// This method is called when this script task executes in the control flow.
        /// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
        /// To open Help, press F1.
        /// </summary>
        /// 
        public void Main()
        {
            // TODO: Add your code here
            DateTime saveNow = DateTime.Now;
            Dts.Variables["Is_All_OK"].Value = 0;
            Dts.Variables["Sync_Exec_StartTime"].Value = saveNow;
            Dts.TaskResult = (int)ScriptResults.Success;
        }

        #region ScriptResults declaration
        /// <summary>
        /// This enum provides a convenient shorthand within the scope of this class for setting the
        /// result of the script.
        /// 
        /// This code was generated automatically.
        /// </summary>
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion

    }
}

The ‘Set DBMail_Receipients Variable Value’ task uses a SQL function described in part 1 to fetch e-mail addresses stored in one of the control tables and map those to one of the package variables used for sending out notifications in the event of any errors (see image below).

Data_Acquisition_Framework_Part4_SSIS_Email_Notification_Function_ExecuteSQLTask

Continuing on, we have a bunch of Execute SQL Task transformations, all simply triggering stored procedures executions (code outlined in part 3) while passing a number of parameter values or reading values passed back into the package from the underlying code. Some of the SSIS variables are predefined with their respective values, whereas others are assigned to values as the package executes. Below is a sample image depicting parameter mapping for a task responsible for schema definition changes check.

Data_Acquisition_Framework_Part4_SSIS_Task_Parameter_Mapping

As we can see, a number of input parameters are mapped to user pre-defined variables. A full list of package-defined variables is as per image below (click on image to enlarge).

Data_Acquisition_Framework_Part4_SSIS_Package_Variables_View

Moving on, after all pre-acquisition tasks have been completed successfully, a Sequence Container encompassing actual data acquisition tasks is placed. This transformation is broken down into two groups – one for small tables (running dynamic merge statement) and one for larger ones (running parallel INSERTS). Each of those tasks runs in a For Each Loop container, iterating through object names assigned based on the query from the preceding tasks which populate ‘Object’ data type variables as per image below.

Data_Acquisition_Framework_Part4_SSIS_Get_Big_Tables_SQL_Task

Once schema and table names are retrieved, this data is passed into the looping transformation which executes data acquisition routines for each combination of schema and table name as per image below.

Data_Acquisition_Framework_Part4_SSIS_ForEachLoop_Editor

Both data acquisition routines, even though looping through each table in sequential order, execute in simultaneously until all objects marked for synchronisation have been accounted for. Since both data acquisition stored procedures are designed not to halt data copying process in case of any exception occurring, but rather log any issues and continue on, this sequence container also links up with a package size error triggering process which assigns predefined values to error-related variables using Script Task and send out an e-mail notification to nominated e-mail addresses. The code snippet below provides predefined set of values for further error notification routine execution.

#region Help:  Introduction to the script task
/* The Script Task allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services control flow. 
 * 
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script task. */
#endregion


#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
#endregion

namespace ST_7be0163238e6428e99c1178dd2a1c435
{
    /// <summary>
    /// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
    /// or parent of this class.
    /// </summary>
	[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
	public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
	{
        #region Help:  Using Integration Services variables and parameters in a script
        /* To use a variable in this script, first ensure that the variable has been added to 
         * either the list contained in the ReadOnlyVariables property or the list contained in 
         * the ReadWriteVariables property of this script task, according to whether or not your
         * code needs to write to the variable.  To add the variable, save this script, close this instance of
         * Visual Studio, and update the ReadOnlyVariables and 
         * ReadWriteVariables properties in the Script Transformation Editor window.
         * To use a parameter in this script, follow the same steps. Parameters are always read-only.
         * 
         * Example of reading from a variable:
         *  DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
         * 
         * Example of writing to a variable:
         *  Dts.Variables["User::myStringVariable"].Value = "new value";
         * 
         * Example of reading from a package parameter:
         *  int batchId = (int) Dts.Variables["$Package::batchId"].Value;
         *  
         * Example of reading from a project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].Value;
         * 
         * Example of reading from a sensitive project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
         * */

        #endregion

        #region Help:  Firing Integration Services events from a script
        /* This script task can fire events for logging purposes.
         * 
         * Example of firing an error event:
         *  Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
         * 
         * Example of firing an information event:
         *  Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
         * 
         * Example of firing a warning event:
         *  Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
         * */
        #endregion

        #region Help:  Using Integration Services connection managers in a script
        /* Some types of connection managers can be used in this script task.  See the topic 
         * "Working with Connection Managers Programatically" for details.
         * 
         * Example of using an ADO.Net connection manager:
         *  object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
         *  SqlConnection myADONETConnection = (SqlConnection)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
         *
         * Example of using a File connection manager
         *  object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
         *  string filePath = (string)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
         * */
        #endregion


		/// <summary>
        /// This method is called when this script task executes in the control flow.
        /// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
        /// To open Help, press F1.
        /// </summary>
		public void Main()
		{
			// TODO: Add your code here
            Dts.Variables["Error_Message"].Value = "Execution of the Mizzen data acquisition package failed on an unspecyfied step, most likely due to an SSIS transformation failure. Please troubleshoot.";
            Dts.Variables["Process_Name"].Value = "Non-specyfic SSIS Job Transformation Failure";
            Dts.Variables["Object_Name"].Value = "N/A";
            Dts.TaskResult = (int)ScriptResults.Success;
		}

        #region ScriptResults declaration
        /// <summary>
        /// This enum provides a convenient shorthand within the scope of this class for setting the
        /// result of the script.
        /// 
        /// This code was generated automatically.
        /// </summary>
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion

	}
}

Following on, the rest of the tasks are responsible for index recreation, statistics refresh and performing a number of rudimentary checks to ensure the data copied across is consistent with the source environment. Recreating indexes task is utilising the same stored procedure as the one responsible for dropping them, the only difference is the argument passed to the underlying code i.e. instead DROP we are using CREATE statement.

Data_Acquisition_Framework_Part4_SSIS_Index_SQL_Statement

Next up we have another Execute SQL Task for updating table statistics and completing acquisition process a simple Execute Script task to store core package execution tasks completion date and time. This Execute Script task carries almost identical code to the one used to log package execution start date and time, the only difference being the variable name used i.e. ‘Sync_Exec_EndTime’ rather then ‘Sync_Exec_StartTime’.

public void Main()
        {
            // TODO: Add your code here
            DateTime saveNow = DateTime.Now;
            Dts.Variables["Sync_Exec_EndTime"].Value = saveNow;
            Dts.TaskResult = (int)ScriptResults.Success;
        }

Final section of this workflow deals with record count comparison between source and target and querying AdminDBA database for any errors that may have been logged. Details of the actual code is explained in part 3 to this series so for the sake of succinctness I will refrain from repeating myself here, but hopefully by now you have a fairly good understanding of how the package works and that, in its core, it is just a shell to control individual steps execution workflow. The whole package, as previously mentioned, can be downloaded from my OneDrive folder HERE.

To conclude, this short series should provide anyone versed in Microsoft Business Intelligence tools with a basic framework used to source transactional data for staging purposes. As a blueprint, many steps outlined in previous parts can be skipped or extended depending on your business needs as this implementation was ‘ripped out’ one of my clients’ environments and may not be suitable in its entirety as a drop-in solution in other projects. However, unless streaming or near-real time staging data is a requirement, this sort of framework can become and good springboard for the majority of projects, allowing for further modifications and tweaking if required.

Tags: , , ,

Designing data acquisition framework in SQL Server and SSIS – how to source and integrate external data for a decision support system or data warehouse (Part 3)

May 25th, 2016 / No Comments » / by admin

Note: Part 1 to this series can be found HERE, Part 2 HERE, Part 4 HERE and all the code and additional files for this post can be downloaded from my OneDrive folder HERE.

Continuing on from Part 2 to this series, having set up all the routines dealing with pre-acquisition activities as well as large tables migration, this post focuses on smaller tables which do not require partitioning and parallel data load. I will also cover post-acquisition activities which run to tie up all the loose ends and ensure data consistency and readiness for business use. All in all, the following tasks will be covered in detail in this post.

Data_Acquisition_Framework_Part2_SSIS_Package_Bottom_Level_Overview

Small Tables Acquisition Tasks Overview and Code

Since some tables may contain only a few records, spinning up multiple SQL Server Agent jobs (see Part 2) creates unnecessary overhead. In cases where only few records need to be copied across, it is easier to execute a simple MERGE SQL statement which performs simultaneous UPDATE and INSERT statement based on referenced tables column names. To avoid individual column-to-column mapping for each table we can query database metadata and, providing a primary key is present on source and target tables, we can reference each column is an automated fashion.  The following stored procedure allows for dynamic MERGE SQL statement creation which elevates creating table-to-table row-level mapping. I wrote about this type of data replication in my previous post HERE, where similar stored procedure was used to copy data across from database X to database Y on the same SQL Server instance. In order to modify that setup and make it applicable to the current scenario i.e. remote host running MySQL database, a few changes were necessary. Most importantly, all of the queries related to the source database metadata or/and data had to be modified to include the OPENQUERY statement. Secondly, allowances needed to be made in order to enable MySQL and SQL Server data types and certain syntax conventions conformance e.g. certain MySQL reserved words need to be encapsulated in backtick quotes in the OPENQUERY statements in order to be validated and recognized by SQL Server. Likewise, certain SQL Server reserved words need to be used with square brackets delimiters. Finally, any MySQL metadata related queries had to conform to its internal database catalog architecture i.e. metadata database storing information about the MySQL server such as the name of a database or table, the data type of a column, or access privileges.

USE [StagingDB]
GO

 
CREATE PROCEDURE [dbo].[usp_runRemoteSvrDBSchemaSyncSmallTablesMaster]
    (
      @Remote_Server_Name						SYSNAME ,
	  @Remote_Server_DB_Name					VARCHAR				(128) ,
      @Remote_Server_DB_Schema_Name				VARCHAR				(128) ,
      @Target_DB_Name							VARCHAR				(128) ,
      @Target_DB_Schema_Name					VARCHAR				(128) ,
      @Target_DB_Object_Name					VARCHAR				(128) ,
      @Exec_Instance_GUID						UNIQUEIDENTIFIER ,
      @Package_Name								VARCHAR				(256)
    )
    WITH RECOMPILE
AS
    SET NOCOUNT ON 
	BEGIN


    DECLARE		@IsDebugMode					BIT 
	DECLARE 	@ExecSQL						NVARCHAR(MAX) 
	DECLARE 	@Err_Msg						NVARCHAR(1000) 
	DECLARE 	@Remote_DB_Object_Name			VARCHAR (128)		= @Target_DB_Object_Name
	DECLARE 	@Exec_Instance_GUID_As_Nvarchar NVARCHAR(56) = ( SELECT REPLACE(CAST (@Exec_Instance_GUID AS NVARCHAR(56)), '-', '')) 

    SET @IsDebugMode = 1
 		
/*====================================================================================
                                CREATE TEMP TABLES                          
======================================================================================*/

    IF OBJECT_ID('tempdb..#Src_Tgt_Tables') IS NOT NULL
        BEGIN
            DROP TABLE [#Src_Tgt_Tables]
        END 
    CREATE TABLE #Src_Tgt_Tables
        (
          [Data_Obj_Id] [INT] NOT NULL ,
          [Src_Tgt_Flag] [VARCHAR](1) NOT NULL ,
          [Object_Id] [INT] NOT NULL ,
          [Object_Name] [sysname] NOT NULL ,
          [Schema_Name] [sysname] NOT NULL ,
          [Schema_Object_Name] [VARCHAR](260) NOT NULL ,
          [Column_Id] [SMALLINT] NULL ,
          [Column_Name] [VARCHAR](200) NULL ,
          [IsIdentity] [TINYINT] NULL ,
          [IsComputed] [TINYINT] NULL ,
          [IsNullable] [TINYINT] NULL ,
          [Default] [VARCHAR](MAX) NULL ,
          [DataType] [VARCHAR](152) NULL ,
          [DataType_CastGroup] [VARCHAR](134) NOT NULL ,
          [Collation_Name] [sysname] NULL ,
          [RemoteObject_DataType] [VARCHAR](152) NULL
        )
		  

    IF OBJECT_ID('tempdb..#Tgt_NK_Cols') IS NOT NULL
        BEGIN
            DROP TABLE [#Tgt_NK_Cols]
        END  
    CREATE TABLE #Tgt_NK_Cols
        (
          [Data_Obj_Id] [INT] NOT NULL ,
          [Schema_Object_Name] [VARCHAR](260) NOT NULL ,
          [Where_Clause] [VARCHAR](MAX) NULL ,
          S_Schema_Object_Name [VARCHAR](260) NOT NULL
        )
 
/*======================================================================================
					PERFORM DATABASES, SCHEMAS AND OBJECT CHECKS                           
======================================================================================*/ 
    IF OBJECT_ID('tempdb..#Objects_List') IS NOT NULL
        BEGIN
            DROP TABLE [#Objects_List]
        END  
    CREATE TABLE #Objects_List
        (
          DatabaseName sysname ,
          SchemaName sysname ,
          ObjectName sysname ,
		  Is_Source_Target VARCHAR (56)
        )
 
    SET @ExecSQL = 'SELECT table_catalog, table_schema, table_name, ''Target'' as Is_Source_Target
					FROM INFORMATION_SCHEMA.tables 
					WHERE table_type = ''base table'' 
					and table_catalog = '''+@Target_DB_Name+'''
					and table_schema = '''+@Target_DB_Schema_Name+'''
					and table_name = '''+@Target_DB_Object_Name+''''	
	
	IF @IsDebugMode = 1
		BEGIN
			PRINT 'SQL statement for acquiring ''target'' table base data into #Objects_List temp table:'
			PRINT '------------------------------------------------------------------------------------'
			PRINT @ExecSQL +REPLICATE(CHAR(13),2) 
		END	

	INSERT  INTO #Objects_List (DatabaseName, SchemaName, ObjectName, Is_Source_Target)
	EXEC (@ExecSQL)
			
	IF @IsDebugMode = 1
		BEGIN
			SELECT * FROM #Objects_List WHERE Is_Source_Target = 'Target'
		END

	SET @ExecSQL =	
						'
						SELECT DatabaseName, SchemaName, ObjectName, ''Source'' as Is_Source_Target
						FROM OPENQUERY ('+@Remote_Server_Name+', ''select table_schema as DatabaseName, table_schema as SchemaName, table_name as ObjectName
						from information_schema.tables 
						WHERE table_type = ''''BASE TABLE'''' 
						and table_name = '''''+@Remote_DB_Object_Name+'''''
						and table_schema ='''''+@Remote_Server_DB_Name+''''''')'
							
	IF @IsDebugMode = 1
		BEGIN
			PRINT 'SQL statement for acquiring ''source'' table base data into #Objects_List temp table:'
			PRINT '------------------------------------------------------------------------------------'
			PRINT @ExecSQL +REPLICATE(CHAR(13),2) 			
		END

	INSERT  INTO #Objects_List (DatabaseName, SchemaName, ObjectName, Is_Source_Target)
	EXEC (@ExecSQL)	

	IF @IsDebugMode = 1
		BEGIN
			SELECT * FROM #Objects_List WHERE Is_Source_Target = 'Source'
		END


    
	IF @IsDebugMode = 1
		BEGIN
			SELECT	Source_Server_Name			= @Remote_Server_Name,
					Source_Server_DB_Name		= @Remote_Server_DB_Name,
					Source_Object_Name			= @Remote_DB_Object_Name,
					Target_DB_Name				= @Target_DB_Name,
					Target_DB_Schema_Name		= @Target_DB_Schema_Name,
					Target_DB_Object_Name		= @Target_DB_Object_Name
		END
    IF NOT EXISTS ( SELECT  TOP 1 1
                    FROM    #Objects_List a
                    WHERE   a.DatabaseName = @Remote_Server_DB_Name AND a.Is_Source_Target = 'Source' )
        BEGIN
            SET @Err_Msg = 'Source database cannot be found. You nominated "'
                + @Remote_Server_DB_Name + '". 
                Check that the database of that name exists on the instance'
            RAISERROR (
        @Err_Msg  -- Message text.
        ,16 -- Severity.
            ,1 -- State.
  )
            RETURN
        END   
 
    IF NOT EXISTS ( SELECT  1
                    FROM    #Objects_List a
                    WHERE   a.DatabaseName = @Target_DB_Name AND a.Is_Source_Target = 'Target')
        BEGIN
            SET @Err_Msg = 'Target database cannot be found. You nominated "'
                + @Target_DB_Name + '". 
                Check that the database of that name exists on the instance'
            RAISERROR (
        @Err_Msg  -- Message text.
        ,16 -- Severity.
            ,1 -- State.
  )
            RETURN
        END   
 
    IF NOT EXISTS ( SELECT TOP 1 1
                    FROM    #Objects_List a
                    WHERE   a.SchemaName = @Remote_Server_DB_Schema_Name AND  a.Is_Source_Target = 'Source' )
        BEGIN
            SET @Err_Msg = 'Source schema cannot be found. You nominated "'
                + @Remote_Server_DB_Schema_Name + '". 
                Check that the schema of that name exists on the database'
            RAISERROR (
        @Err_Msg  -- Message text.
        ,16 -- Severity.
            ,1 -- State.
  )
        END  
 
    IF NOT EXISTS ( SELECT  TOP 1 1
                    FROM    #Objects_List a
                    WHERE   a.SchemaName = @Target_DB_Schema_Name AND a.Is_Source_Target = 'Target' )
        BEGIN
            SET @Err_Msg = 'Target schema cannot be found. You nominated "'
                + @Target_DB_Schema_Name + '". 
                Check that the schema of that name exists on the database'
            RAISERROR (
        @Err_Msg  -- Message text.
        ,16 -- Severity.
            ,1 -- State.
  )
            RETURN
        END 
 
    IF NOT EXISTS ( SELECT TOP 1 1
                    FROM    #Objects_List a
                    WHERE   a.ObjectName = @Remote_DB_Object_Name AND a.Is_Source_Target = 'Source')
        BEGIN
            SET @Err_Msg = 'Source object cannot be found. You nominated "'
                + @Remote_DB_Object_Name + '". 
                Check that the object of that name exists on the database'
            RAISERROR (
        @Err_Msg  -- Message text.
        ,16 -- Severity.
            ,1 -- State.
  )
            RETURN
        END
 
    IF NOT EXISTS ( SELECT  1
                    FROM    #Objects_List a
                    WHERE   a.ObjectName = @Target_DB_Object_Name AND a.Is_Source_Target = 'Target')
        BEGIN
            SET @Err_Msg = 'Target object cannot be found. You nominated "'
                + @Target_DB_Object_Name + '". 
                Check that the object of that name exists on the database'
            RAISERROR (
        @Err_Msg  -- Message text.
        ,16 -- Severity.
            ,1 -- State.
  )
            RETURN
        END
 
 
/*======================================================================================
					EXTRACT SOURCE DATABASE DATA FOR THE GIVEN OBJECT                          
======================================================================================*/

SET @ExecSQL =
     'INSERT INTO
    #Src_Tgt_Tables
    SELECT
    [Data_Object_Id]        =   so.id,
    [Src_Tgt_Flag]          =   src_tgt.Src_Trg_Flag,
    [Object_Id]             =   so.id,
    [Object_Name]           =   so.name,
    [Schema_Name]           =   sh.name,
    [Schema_Object_Name]    =   ''[' + @Target_DB_Name + '].[''+sh.name+''].[''+so.name+'']'',
    [Column_Id]             =   sc.column_id,
    [Column_Name]           =   sc.name,
    [IsIdentity]            =   sc.is_identity,
    [IsComputed]            =   sc.is_computed,
    [IsNullable]            =   sc.is_nullable,
    [Default]               =   dc.definition,
    [DataType]              =   (
                                CASE
                                WHEN T.system_type_id IN (167, 175, 231, 239) AND SC.max_length &gt; 0 
                                THEN T.Name + ''('' + CAST(SC.max_length AS varchar(10)) + '')''
                                WHEN T.system_type_id IN (167, 175, 231, 239) AND SC.max_length = -1 THEN T.Name + ''(MAX)''
                                -- For Numeric and Decimal data types
                                WHEN T.system_type_id IN (106, 108) THEN T.Name + ''('' + CAST(SC.precision AS varchar(10)) + '', '' + 
                                CAST(SC.scale AS varchar(10)) + '')''
                                ELSE T.Name
                                END
                                ),
    [DataType_CastGroup]    =   (
                                CASE
                                WHEN T.system_type_id IN (167, 175, 231, 239) THEN ''String''
                                -- For Numeric and Decimal data types
                                WHEN T.system_type_id IN (106, 108) THEN ''Numeric''
                                ELSE ''Other''
                                END
                                ),
    [Collation_Name]        =   SC.collation_name,
	[RemoteObject_DataType]		=	mizz.data_type
    FROM
    ' + @Target_DB_Name + '.sys.sysobjects so (NOLOCK)
    INNER JOIN ' + @Target_DB_Name + '.sys.columns sc (NOLOCK) ON
    sc.object_id = so.id
    LEFT JOIN ' + @Target_DB_Name + '.sys.default_constraints dc (NOLOCK) ON
    dc.parent_object_id = so.id
    AND dc.parent_column_id = sc.column_id
    INNER JOIN ' + @Target_DB_Name + '.sys.types t (NOLOCK) ON
    t.user_type_id = sc.user_type_id
    INNER JOIN ' + @Target_DB_Name + '.sys.schemas sh (NOLOCK) ON
    sh.schema_id = so.uid
    INNER  JOIN (
                select  
                Data_Obj_Id = t.object_id, 
                t.name as phisical_name, 
                s.name as s_name  ,
                Src_trg_Flag = ''S'',
                Object_Type = ''U''
                FROM   ' + @Target_DB_Name + '.sys.tables t
                JOIN sys.schemas s ON t.schema_id = s.schema_id
                WHERE s.name = ''' + @Target_DB_Schema_Name + '''
                and t.name = ''' + @Target_DB_Object_Name + '''
                ) src_tgt ON
    src_tgt.phisical_name = so.name and
    src_tgt.s_name = sh.name
	JOIN 
	(SELECT table_name, column_name, data_type 
	FROM OPENQUERY(' + @Remote_Server_Name + ', ''select table_name, column_name, data_type 
	from information_schema.columns 
	where table_name = ''''' + @Remote_DB_Object_Name
        + ''''' AND table_schema = ''''' + @Remote_Server_DB_Schema_Name
        + ''''''')) mizz
	on mizz.table_name = so.name and mizz.column_name = sc.name
    WHERE
    so.xtype = src_tgt.Object_Type
	AND NOT EXISTS (SELECT 1
					FROM  ControlDB.dbo.Ctrl_RemoteSvrs_Tables2Process_ColumnExceptions o 
					WHERE 
					o.Remote_Field_Name = sc.name AND
					o.Remote_Table_Name = so.name AND 
					o.Remote_Schema_Name = sh.name AND
					o.Is_Active = 1)'

     
    IF @IsDebugMode = 1
		BEGIN
			PRINT 'SQL statement for acquiring ''source'' table objects metadata into #Src_Tgt_Tables temp table:'
			PRINT '----------------------------------------------------------------------------------------------'
			PRINT @ExecSQL +REPLICATE(CHAR(13),2) 			
		END
   
    EXEC sp_executesql @ExecSQL

	IF @IsDebugMode = 1
		BEGIN
			SELECT * FROM #Src_Tgt_Tables WHERE Src_Tgt_Flag = 'S'
		END



/*======================================================================================
					EXTRACT TARGET DATABASE DATA FOR THE GIVEN OBJECT                          
======================================================================================*/
   
    SET @ExecSQL = 
	'INSERT INTO
    #Src_Tgt_Tables
    SELECT
    [Data_Obj_Id]           =   src_tgt.Data_Obj_Id,
    [Src_Tgt_Flag]          =   src_tgt.Src_Trg_Flag,
    [Object_Id]             =   so.id,
    [Object_Name]           =   so.name,
    [Schema_Name]           =   sh.name,
    [Schema_Object_Name]    =   ''[' + @Target_DB_Name + '].[''+sh.name+''].[''+so.name+'']'',
    [Column_Id]             =   sc.column_id,
    [Column_Name]           =   sc.name,
    [IsIdentity]            =   sc.is_identity,
    [IsComputed]            =   sc.is_computed,
    [IsNullable]            =   sc.is_nullable,
    [Default]               =   dc.definition,
    [DataType]              =   (
                                CASE
                                WHEN T.system_type_id IN (167, 175, 231, 239) AND SC.max_length &gt; 0 
                                THEN T.Name + ''('' + CAST(SC.max_length AS varchar(10)) + '')''
                                WHEN T.system_type_id IN (167, 175, 231, 239) AND SC.max_length = -1 THEN T.Name + ''(MAX)''
                                -- For Numeric and Decimal data types
                                WHEN T.system_type_id IN (106, 108) THEN T.Name + ''('' + CAST(SC.precision AS varchar(10)) + '', '' 
                                + CAST(SC.scale AS varchar(10)) + '')''
                                ELSE T.Name
                                END
                                ),
    [DataType_CastGroup]    =   (
                                CASE
                                WHEN T.system_type_id IN (167, 175, 231, 239) THEN ''String''
                                -- For Numeric and Decimal data types
                                WHEN T.system_type_id IN (106, 108) THEN ''Numeric''
                                ELSE ''Other''
                                END
                                ),
    [Collation_Name]        =   SC.collation_name,
	[RemoteObject_DataType]		=	mizz.data_type
    FROM
    ' + @Target_DB_Name + '.sys.sysobjects so
    INNER JOIN ' + @Target_DB_Name + '.sys.columns sc ON
    sc.object_id = so.id
    LEFT JOIN ' + @Target_DB_Name + '.sys.default_constraints dc ON
    dc.parent_object_id = so.id
    AND dc.parent_column_id = sc.column_id
    INNER JOIN ' + @Target_DB_Name + '.sys.types t ON
    t.user_type_id = sc.user_type_id
    INNER JOIN ' + @Target_DB_Name + '.sys.schemas sh ON
    sh.schema_id = so.uid
    INNER JOIN (
                select  
                Data_Obj_Id = t.object_id, 
                t.name as phisical_name, 
                s.name as s_name,
                Src_trg_Flag = ''T'',
                Object_Type = ''U''
                FROM   ' + @Target_DB_Name + '.sys.tables t
                JOIN sys.schemas s ON t.schema_id = s.schema_id
                WHERE S.name = ''' + @Target_DB_Schema_Name + '''
                and t.name = ''' + @Target_DB_Object_Name + '''
                ) src_tgt ON
    src_tgt.phisical_name = so.name and
    src_tgt.s_name = sh.name
	JOIN 
	(SELECT table_name, column_name, data_type 
	FROM OPENQUERY(' + @Remote_Server_Name + ', ''select table_name, column_name, data_type 
	from information_schema.columns 
	where table_name = ''''' + @Target_DB_Object_Name
        + ''''' AND table_schema = ''''' + @Remote_Server_DB_Schema_Name
        + ''''''')) mizz
	on mizz.table_name = so.name and mizz.column_name = sc.name
    WHERE
    so.xtype = ''U'' -- Table
	AND NOT EXISTS (SELECT 1 
					FROM  HNO_Control.dbo.Ctrl_RemoteSvrs_Tables2Process_ColumnExceptions o 
					WHERE 
					o.Local_Field_Name = sc.name AND
					o.Local_Table_name = so.name AND 
					o.Local_Schema_Name = sh.name AND
					o.Is_Active = 1)
    ORDER BY
    so.name,
    sc.column_id'



	IF @IsDebugMode = 1
		BEGIN
			PRINT 'SQL statement for acquiring ''target'' table objects metadata into #Src_Tgt_Tables temp table:'
			PRINT '----------------------------------------------------------------------------------------------'
			PRINT @ExecSQL +REPLICATE(CHAR(13),2) 
		END

	EXEC sp_executesql @ExecSQL

	IF @IsDebugMode = 1
		BEGIN
			SELECT * FROM #Src_Tgt_Tables WHERE Src_Tgt_Flag = 'T'
		END
 
/*======================================================================================
        ENSURE THAT SOURCE AND TARGET DETAILS ARE PRESENT IN TEMP TABLE                         
======================================================================================*/
  
    IF ( SELECT COUNT(*)
         FROM   #Src_Tgt_Tables ST
         WHERE  ST.Src_Tgt_Flag = 'S'
       ) &lt; 1
        BEGIN
            SET @Err_Msg = 'No Source table details found. Configured Source Database is "'
                + @Remote_Server_DB_Name + '".'
            RAISERROR (
        @Err_Msg  -- Message text.
        ,16 -- Severity.
            ,1 -- State.
  )
            RETURN
        END
 
    IF ( SELECT COUNT(*)
         FROM   #Src_Tgt_Tables ST
         WHERE  ST.Src_Tgt_Flag = 'T'
       ) &lt; 1
        BEGIN
            SET @Err_Msg = 'No Target table details found. Configured Source Database is "'
                + @Target_DB_Name + '".'
            RAISERROR (
        @Err_Msg  -- Message text.
        ,16 -- Severity.
            ,1 -- State.
  )
            RETURN
        END  


/*======================================================================================
           UPDATE COLUMN NAMES TO QUALIFIED STRINGS FOR MSSQL RESERVED WORDS                          
======================================================================================*/


			UPDATE #Src_Tgt_Tables 
			SET Column_Name = LOWER(b.mssql_version)
			FROM #Src_Tgt_Tables a JOIN dbo.vw_MssqlReservedWords b 
			ON a.Column_Name = b.reserved_word
			WHERE Src_Tgt_Flag = 'T'

/*======================================================================================
                PREPARE 'WHERE' CLAUSE FOR THE MERGE STATEMENT                          
======================================================================================*/
 
    SET @ExecSQL = 'INSERT INTO
    #Tgt_NK_Cols
    SELECT
    TOP 1
    Data_Obj_Id             =   tgt.Data_Obj_Id,
    [Schema_Object_Name]    =   tgt.[Schema_Object_Name],
    [Where_Clause]          =   STUFF(REPLACE((SELECT
                                ''  AND'' + '' TGT.[''+ sc.name +''] =
                                SRC.[''+ REPLACE(REPLACE(sc.name, ''&lt;'', ''~''), ''&gt;'', ''!'') + '']'' + CHAR(10)
                                FROM
                                ' + @Target_DB_Name + '.sys.sysindexkeys sik
                                INNER JOIN ' + @Target_DB_Name
        + '.sys.syscolumns sc on
                                sc.id = sik.id
                                and sc.colid = sik.colid
                                WHERE
                                sik.id = si.object_id
                                AND sik.indid = si.index_id
                                ORDER BY
                                sik.keyno
                                FOR XML PATH('''')
                                ), ''&amp;#x0D;'', ''''), 1, 5, ''''),
    [S_Schema_Object_Name]  =   (
                                SELECT  Top 1
                                S.Schema_Object_Name
                                FROM
                                #Src_Tgt_Tables S
                                WHERE
                                S.Src_Tgt_Flag = ''S'')
    FROM
    ' + @Target_DB_Name + '.sys.indexes si
    INNER JOIN ' + @Target_DB_Name + '.sys.sysobjects so ON
    so.id = si.object_id
    INNER JOIN ' + @Target_DB_Name + '.sys.schemas sh ON
    sh.schema_id = so.uid
    INNER JOIN (
                SELECT
                [Data_Obj_Id],
                [Object_Id],
                [Object_Name],
                [Schema_Name],
                [Schema_Object_Name]
                FROM
                #Src_Tgt_Tables
                WHERE
                Src_Tgt_Flag = ''T''
                ) tgt ON
                tgt.[Object_Id] = so.id
    WHERE 
    si.is_unique = 1 /*Only Unique Index*/'
  
    IF @IsDebugMode = 1
		BEGIN
			PRINT 'SQL statement for ''where'' table objects metadata into #Src_Tgt_Tables temp table:'
			PRINT '----------------------------------------------------------------------------------------------'
			PRINT @ExecSQL +REPLICATE(CHAR(13),2) 
		END
  
    EXEC sp_executesql @ExecSQL
  
    IF @IsDebugMode = 1
        BEGIN 
			SELECT  [Table] = '#Tgt_NK_Cols' , * FROM    #Tgt_NK_Cols 
		END
  
 
/*======================================================================================
                ENSURE THAT UNIQUE KEY INDEX IS PRESENT                     
======================================================================================*/
 
    IF EXISTS ( SELECT  1
                FROM    #Tgt_NK_Cols NK
                WHERE   NK.Where_Clause IS NULL )
        BEGIN
            SET @Err_Msg = 'No Unique Key Index is found. 
                    Configured Source Database is "' + @Target_DB_Name + '".'
  
            RAISERROR (
        @Err_Msg  -- Message text.
        ,16 -- Severity.
            ,1 -- State.
  )
            RETURN
        END


/*======================================================================================
                            PREPARE MERGE STATEMENT                         
======================================================================================*/
 
    DECLARE @MergeSQL					NVARCHAR(MAX) ,
			@UpdateColSet				NVARCHAR(MAX) ,
			@TargetColSet				NVARCHAR(MAX) ,
			@SourceColSet				NVARCHAR(MAX) ,
			@ValueColSet				NVARCHAR(MAX) ,
			@SourceColSetReplaceRemote	NVARCHAR(MAX) 

		
 
    SELECT  @MergeSQL = '
						MERGE ' + NK.Schema_Object_Name + ' TGT
						USING (SELECT{SOURCE_COLUMN_SET}
						FROM {NK.S_Schema_Object_Name} SRC) SRC ON ' + NK.Where_Clause + '
						WHEN MATCHED THEN
						UPDATE SET{UPDATE_COLUMN_SET}
						WHEN NOT MATCHED THEN
						INSERT({TARGET_COLUMN_SET})
						VALUES ({VALUE_COLUMN_SET})
						WHEN NOT MATCHED BY SOURCE THEN DELETE
						OUTPUT $action INTO #SummaryOfChanges(Action_Name);'
						FROM    #Tgt_NK_Cols NK

	IF @IsDebugMode = 1
		BEGIN
			PRINT 'SQL statement for the initial ''MERGE'' statement:'
			PRINT '----------------------------------------------------------------------------------------------'
			PRINT @MergeSQL +REPLICATE(CHAR(13),2) 
		END

    SELECT  @TargetColSet = REPLACE(STUFF(( SELECT  ',' + CHAR(10)
                                                    + CAST(TC.Column_Name AS VARCHAR(100))
                                            FROM    #Src_Tgt_Tables TC ( NOLOCK )
                                                    LEFT JOIN ( SELECT
                                                              T.[Data_Obj_Id] ,
                                                              T.[Column_Name] ,
                                                              T.[IsNullable] ,
                                                              T.[Default] ,
                                                              T.[DataType] ,
                                                              T.[DataType_CastGroup]
                                                              FROM
                                                              #Src_Tgt_Tables T ( NOLOCK )
                                                              WHERE
                                                              T.Src_Tgt_Flag = 'S'
                                                              AND T.Data_Obj_Id = TS.S_Data_Obj_Id
                                                              ) SC ON SC.Column_Name = TC.Column_Name
                                            WHERE   TC.Src_Tgt_Flag = 'T'
                                                    AND TC.Data_Obj_Id = TS.T_Data_Obj_Id
											--AND TC.IsIdentity &lt;&gt; 1 -- Ignore identity
                                                    AND TC.IsComputed &lt;&gt; 1 -- and computed columns
                                            ORDER BY TC.Column_Id
                                          FOR
                                            XML PATH('')
                                          ), 1, 1, ''), '&amp;#x0D;', '') , 
            @UpdateColSet = REPLACE(STUFF(( SELECT  ',' + CHAR(10)
                                                    + CAST(TC.Column_Name AS VARCHAR(100))
                                                    + ' = SRC.'
                                                    + ISNULL(SC.Column_Name,
                                                             TC.Column_Name)
                                            FROM    #Src_Tgt_Tables TC ( NOLOCK )
                                                    LEFT JOIN ( SELECT
                                                              T.[Data_Obj_Id] ,
                                                              T.[Column_Name] ,
                                                              T.[IsNullable] ,
                                                              T.[Default] ,
                                                              T.[DataType] ,
                                                              T.[DataType_CastGroup]
                                                              FROM
                                                              #Src_Tgt_Tables T ( NOLOCK )
                                                              WHERE
                                                              T.Src_Tgt_Flag = 'S'
                                                              AND T.Data_Obj_Id = TS.S_Data_Obj_Id
                                                              ) SC ON SC.Column_Name = TC.Column_Name
                                            WHERE   TC.Src_Tgt_Flag = 'T'
                                                    AND TC.Data_Obj_Id = TS.T_Data_Obj_Id
                                                    AND TC.IsIdentity &lt;&gt; 1 -- Ignore identity
                                                    AND TC.IsComputed &lt;&gt; 1 -- and computed columns
                                            ORDER BY TC.Column_Id
                                          FOR
                                            XML PATH('')
                                          ), 1, 1, ''), '&amp;#x0D;', '') ,
            @ValueColSet = REPLACE(STUFF(( SELECT   ',' + CHAR(10)
                                                    + ISNULL('SRC.'
                                                             + CAST(SC.Column_Name AS VARCHAR(100)),
                                                             'SRC.'
                                                             + CAST(TC.Column_Name AS VARCHAR(100)))
                                           FROM     #Src_Tgt_Tables TC ( NOLOCK )
                                                    LEFT JOIN ( SELECT
                                                              T.[Data_Obj_Id] ,
                                                              T.[Column_Name] ,
                                                              T.[IsNullable] ,
                                                              T.[Default] ,
                                                              T.[DataType] ,
                                                              T.[DataType_CastGroup]
                                                              FROM
                                                              #Src_Tgt_Tables T ( NOLOCK )
                                                              WHERE
                                                              T.Src_Tgt_Flag = 'S'
                                                              AND T.Data_Obj_Id = TS.S_Data_Obj_Id
                                                              ) SC ON SC.Column_Name = TC.Column_Name
                                           WHERE    TC.Src_Tgt_Flag = 'T'
                                                    AND TC.Data_Obj_Id = TS.T_Data_Obj_Id
                                                    AND TC.IsIdentity &lt;&gt; 1 -- Ignore identity
                                                    AND TC.IsComputed &lt;&gt; 1 -- and computed columns
                                           ORDER BY TC.Column_Id
                                         FOR
                                           XML PATH('')
                                         ), 1, 1, ''), '&amp;#x0D;', '') ,
            @SourceColSet = REPLACE(STUFF(( SELECT  ',' + CHAR(10)
                                                    + ISNULL(CAST(SC.New_Column_Name AS VARCHAR(100)),
                                                             CAST(TC.Column_Name AS VARCHAR(100)))
                                            FROM    #Src_Tgt_Tables TC ( NOLOCK )
                                                    LEFT JOIN ( SELECT
                                                              T.[Data_Obj_Id] ,
                                                              T.[Column_Name] ,
                                                              [New_Column_Name] = --T.[Column_Name],
                                                              CASE
                                                              WHEN T.RemoteObject_DataType = 'enum'
                                                              THEN 'CAST('
                                                              + T.[Column_Name]
                                                              + ' as char) as '
                                                              + T.[Column_Name] 
                                                              WHEN T.RemoteObject_DataType = 'bytea'
                                                              THEN 'CAST('
                                                              + T.[Column_Name]
                                                              + ' as char(4000)) as '
                                                              + T.[Column_Name]
															  ELSE T.[Column_Name]
                                                              END ,
                                                              T.[IsNullable] ,
                                                              T.[Default] ,
                                                              T.[DataType] ,
                                                              T.[DataType_CastGroup] ,
                                                              T.[RemoteObject_DataType]
                                                              FROM
                                                              #Src_Tgt_Tables T ( NOLOCK )
                                                              WHERE
                                                              T.Src_Tgt_Flag = 'S'
                                                              AND T.Data_Obj_Id = TS.S_Data_Obj_Id
                                                              ) SC ON SC.Column_Name = TC.Column_Name
                                            WHERE   TC.Src_Tgt_Flag = 'T'
                                                    AND TC.Data_Obj_Id = TS.T_Data_Obj_Id
                                                    AND TC.IsIdentity &lt;&gt; 1 -- Ignore identity
                                                    AND TC.IsComputed &lt;&gt; 1 -- and computed columns
                                            ORDER BY TC.Column_Id
                                          FOR
                                            XML PATH('')
                                          ), 1, 1, ''), '&amp;#x0D;', '')
    FROM    ( SELECT    [T_Data_Obj_Id] = T.Data_Obj_Id ,
                        [T_Schema_Object_Name] = T.Schema_Object_Name ,
                        [S_Schema_Object_Name] = ( SELECT TOP 1
                                                            S.Schema_Object_Name
                                                   FROM     #Src_Tgt_Tables S
                                                   WHERE    S.Src_Tgt_Flag = 'S'
                                                 ) ,
                        [S_Data_Obj_Id] = ( SELECT TOP 1
                                                    S.Data_Obj_Id
                                            FROM    #Src_Tgt_Tables S
                                            WHERE   S.Src_Tgt_Flag = 'S'
                                          )
              FROM      #Src_Tgt_Tables T
              WHERE     T.Src_Tgt_Flag = 'T'
            ) TS		


	IF @IsDebugMode = 1
		BEGIN
			SELECT @TargetColSet	AS Target_Column_Set
			SELECT @UpdateColSet	AS Updat_Column_Set
			SELECT @ValueColSet		AS Value_Column_Set
			SELECT @SourceColSet	AS Source_Column_Set
		END
	


			IF OBJECT_ID('tempdb..#TempSourceFieldsPivot') IS NOT NULL
				DROP TABLE #TempSourceFieldsPivot;
			CREATE TABLE #TempSourceFieldsPivot
            (
              ColName			VARCHAR(1024) ,
              MySQLOutput		VARCHAR(255) ,
              MSSQLOutput		VARCHAR(255) 
            );
			DECLARE @X XML;
			SET @X = CAST('&lt;A&gt;' + REPLACE(REPLACE(REPLACE(@SourceColSet,char(13),''),char(10),''), ',', '&lt;/A&gt;&lt;A&gt;') + '&lt;/A&gt;' AS XML);
			INSERT  INTO #TempSourceFieldsPivot( ColName)
            SELECT  t.value('.', 'VARCHAR(max)')
            FROM    @x.nodes('/A') AS x ( t );


			IF @IsDebugMode = 1
				BEGIN
					SELECT  *
					FROM    #TempSourceFieldsPivot;
				END;

			UPDATE  #TempSourceFieldsPivot
			SET     ColName = REPLACE(REPLACE(ColName, ']', ''), '[', '');

			UPDATE  #TempSourceFieldsPivot
			SET     MySQLOutput			= b.mysql_version ,
					MSSQLOutput			= c.mssql_version 
			FROM    #TempSourceFieldsPivot a
					LEFT JOIN dbo.vw_MysqlReservedWords b		ON UPPER(LTRIM(RTRIM(a.ColName))) = UPPER(LTRIM(RTRIM(b.reserved_word)))
					LEFT JOIN dbo.vw_MssqlReservedWords c		ON UPPER(LTRIM(RTRIM(a.ColName))) = UPPER(LTRIM(RTRIM(c.reserved_word)));
					

			IF @IsDebugMode = 1
				BEGIN
					SELECT  *
					FROM    #TempSourceFieldsPivot;
				END;

			
						SELECT  @SourceColSetReplaceRemote = 'OPENQUERY(' + @Remote_Server_Name
								+ ', ''SELECT '
								+ ( SELECT DISTINCT
								STUFF(( SELECT  ',' + u.ColName
										FROM    (	SELECT    COALESCE(MySQLOutput, ColName) AS ColName
													FROM      #TempSourceFieldsPivot
												) u
										WHERE   u.ColName = ColName
									--order by u.ColName
									FOR
                                    XML PATH('')
                                  ), 1, 1, '') AS list
						FROM    (	SELECT    COALESCE(MySQLOutput, ColName) AS ColName
									FROM      #TempSourceFieldsPivot
								) a
						GROUP BY ColName
									) + ' FROM ' + @Remote_DB_Object_Name + ''')'
						

			IF @IsDebugMode = 1
				BEGIN
					SELECT  @SourceColSetReplaceRemote AS Replacement_Column_list
				END;


			IF OBJECT_ID('tempdb..#TempSourceFieldsPivot') IS NOT NULL
				DROP TABLE #TempSourceFieldsPivot;
		


    SELECT  @MergeSQL = REPLACE(@MergeSQL, '{UPDATE_COLUMN_SET}',@UpdateColSet)
								
    SELECT  @MergeSQL = REPLACE(@MergeSQL, '{TARGET_COLUMN_SET}',@TargetColSet)
								
    SELECT  @MergeSQL = REPLACE(@MergeSQL, '{SOURCE_COLUMN_SET}',@SourceColSet)
							
    SELECT  @MergeSQL = REPLACE(@MergeSQL, '{VALUE_COLUMN_SET}', @ValueColSet)

    SELECT  @MergeSQL = REPLACE(@MergeSQL, '{NK.S_Schema_Object_Name}', @SourceColSetReplaceRemote)
	
					
	IF @IsDebugMode = 1
		BEGIN
			PRINT 'SQL statement for the final ''MERGE'' statement:'
			PRINT '----------------------------------------------------------------------------------------------'
			PRINT @MergeSQL +REPLICATE(CHAR(13),2) 
		END								
								
/*======================================================================================
            EXECUTE MERGE STATEMENT AND CHECK FOR EXECUTION RESULTS                         
======================================================================================*/
 
    DECLARE 
		@UpdatedCount INT ,
        @InsertedCount INT ,
        @DeletedCount INT ,
        @StartTime DATETIME ,
        @EndTime DATETIME

    IF OBJECT_ID('tempdb..#SummaryOfChanges') IS NOT NULL
        BEGIN
            DROP TABLE [#SummaryOfChanges]
        END 
    CREATE TABLE #SummaryOfChanges
        (
          Action_Name VARCHAR(50)
        );
    SET @StartTime = GETDATE()

    IF @Err_Msg IS NULL
        BEGIN TRY
            BEGIN TRANSACTION         
            EXEC sp_executesql @MergeSQL

            SELECT  @UpdatedCount = SUM(CASE WHEN Action_Name = 'UPDATE'
                                             THEN 1
                                             ELSE 0
                                        END) ,
                    @InsertedCount = SUM(CASE WHEN Action_Name = 'INSERT'
                                              THEN 1
                                              ELSE 0
                                         END) ,
                    @DeletedCount = SUM(CASE WHEN Action_Name = 'DELETE'
                                             THEN 1
                                             ELSE 0
                                        END)
            FROM    #SummaryOfChanges
     
            IF @IsDebugMode = 1
                BEGIN
                    SELECT  @UpdatedCount AS Records_Updated
                    SELECT  @InsertedCount AS Records_Inserted
                    SELECT  @DeletedCount AS Records_Deleted
                END
            COMMIT TRANSACTION
        END TRY
        BEGIN CATCH
            ROLLBACK TRANSACTION;
            WITH    TempErr ( 
							[ErrorNumber], 
							[ErrorSeverity], 
							[ErrorState], 
							[ErrorLine], 
							[ErrorMessage], 
							[ErrorDateTime], 
							[LoginName], 
							[UserName], 
							[PackageName], 
							[ObjectID], 
							[ProcessID], 
							[ExecutionInstanceGUID], 
							[DBName] 
							)
                      AS ( SELECT   ERROR_NUMBER()			AS ErrorNumber		,
                                    ERROR_SEVERITY()		AS ErrorSeverity	,
                                    ERROR_STATE()			AS ErrorState		,
                                    ERROR_LINE()			AS ErrorLine		,
                                    ERROR_MESSAGE()			AS ErrorMessage		,
                                    SYSDATETIME()			AS ErrorDateTime	,
                                    SYSTEM_USER				AS LoginName		,
                                    USER_NAME()				AS UserName			,
                                    @Package_Name			AS PackageName		,
                                    OBJECT_ID('' + @Target_DB_Name + '.'
                                              + @Target_DB_Schema_Name + '.'
                                              + @Target_DB_Object_Name + '') AS ObjectID ,
                                    ( SELECT    a.objectid
                                      FROM      sys.dm_exec_requests r
                                                CROSS   APPLY sys.dm_exec_sql_text(r.sql_handle) a
                                      WHERE     session_id = @@spid
                                    ) AS ProcessID ,
                                    @Exec_Instance_GUID		AS ExecutionInstanceGUID ,
                                    DB_NAME()				AS DatabaseName
                         )
                INSERT  INTO AdminDBA.dbo.LogSSISErrors_Error
                        ( [ErrorNumber] ,
                          [ErrorSeverity] ,
                          [ErrorState] ,
                          [ErrorLine] ,
                          [ErrorMessage] ,
                          [ErrorDateTime] ,
                          [FKLoginID] ,
                          [FKUserID] ,
                          [FKPackageID] ,
                          [FKObjectID] ,
                          [FKProcessID] ,
                          [ExecutionInstanceGUID]
                        )
                        SELECT  ErrorNumber				= COALESCE(err.ErrorNumber, -1) ,
                                ErrorSeverity			= COALESCE(err.[ErrorSeverity], -1) ,
                                ErrorState				= COALESCE(err.[ErrorState], -1) ,
                                ErrorLine				= COALESCE(err.[ErrorLine], -1) ,
                                ErrorMessage			= COALESCE(err.[ErrorMessage], 'Unknown') ,
                                ErrorDateTime			= ErrorDateTime ,
                                FKLoginID				= src_login.ID ,
                                FKUserID				= src_user.ID ,
                                [FKPackageID]			= src_package.ID ,
                                [FKObjectID]			= src_object.ID ,
                                [FKProcessID]			= src_process.ID ,
                                [ExecutionInstanceGUID] = err.ExecutionInstanceGUID
                        FROM    TempErr err
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Login src_login ON err.LoginName = src_login.LoginName
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_User src_user ON err.UserName = src_user.UserName
                                                              AND src_user.FKDBID = 
																					(	SELECT ID FROM
																						AdminDBA.dbo.LogSSISErrors_DB db
																						WHERE
																						db.DBName = err.DBName
																					)
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Package src_package ON err.PackageName = 
																											( LEFT(src_package.PackageName, CHARINDEX('.', src_package.PackageName)- 1) )
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Object src_object ON err.ObjectID = src_object.ObjectID
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Process src_process ON err.ProcessID = src_process.ProcessID
                        WHERE   src_login.CurrentlyUsed = 1
                                AND src_user.CurrentlyUsed = 1
                                --AND src_package.CurrentlyUsed = 1
                                AND src_object.CurrentlyUsed = 1
                                AND src_process.CurrentlyUsed = 1   							                                                 
        END CATCH    
	END

Another important aspect to highlight is the provision of error logging architecture. The BEGIN CATCH…END CATCH statement enables a robust error handling capabilities in case unexpected event occurrence, logging error massages in AdminDBA database for future reporting and troubleshooting. In this way, even in spite of errors typically halting acquisition process completely, the package can continue its execution, recovering from error events gracefully and transparently. Since MERGE SQL statement can reconcile small to medium size tables, I have found it to be a good alternative to target tables truncation and insertion.

Post-acquisition Tasks Overview and Code

Continuing on, now that we have the nuts and bolts of our data acquisition process out of the way we can move into outlining post-acquisition tasks.  As with all the activities preceding source-to-target data coping, some clean-up and maintenance tasks need to be run to finalise acquisition and tie up all the loose ends e.g. indexes re-creation, statistics update, error log checking etc.

Firstly, previously dropped indexes can be re-created using the same stored procedure we used in part 2. The only difference is the variable @Create_Drop_Idxs value which can now be set to CREATE, rather than DROP. Next, given the potentially considerable data and values distribution change resulting from new data being added, we will update all the tables’ statistics using the following stored procedure.

USE StagingDB;
GO

CREATE PROCEDURE [dbo].[usp_runUpdateStagingDBStatistics]
    (
      @Target_DB_Name VARCHAR(128) ,
      @Target_DB_Schema_Name VARCHAR(128) ,
      @Is_All_OK INT OUTPUT ,
      @Error_Message VARCHAR(MAX) OUTPUT ,
      @Process_Name VARCHAR(250) OUTPUT 
    )
    WITH RECOMPILE
AS
    SET NOCOUNT ON;	
    BEGIN
        DECLARE @ID TINYINT;
        DECLARE @IsDebugMode BIT;
        DECLARE @StartDateTime DATETIME = SYSDATETIME();
        DECLARE @TableName VARCHAR(256);
        DECLARE @TableSchemaName VARCHAR(128);
        DECLARE @SQL VARCHAR(2056);

        SET @Process_Name = ( SELECT    OBJECT_NAME(objectid)
                              FROM      sys.dm_exec_requests r
                                        CROSS   APPLY sys.dm_exec_sql_text(r.sql_handle) a
                              WHERE     session_id = @@spid
                            );
        SET @IsDebugMode = 1;

        IF OBJECT_ID('tempdb..#stats_details') IS NOT NULL
            BEGIN
                DROP TABLE #stats_details;
            END;
        CREATE TABLE #stats_details
            (
              ID INT IDENTITY(1, 1)
                     NOT NULL ,
              TableName VARCHAR(256) NOT NULL ,
              SchemaName VARCHAR(128) NOT NULL ,
              IndexID INT NULL ,
              Statistic VARCHAR(256) NOT NULL ,
              ColumnsInStatistic VARCHAR(256) NOT NULL ,
              WasAutoCreated TINYINT NOT NULL ,
              WasUserCreated TINYINT NOT NULL ,
              IsFiltered TINYINT NULL ,
              FilterDefinition VARCHAR(256) NULL ,
              IsTemporary TINYINT NULL ,
              StatisticsUpdateDate DATETIME NULL
            );

        DECLARE db_statscursor CURSOR FORWARD_ONLY
        FOR
            SELECT  ROW_NUMBER() OVER ( ORDER BY t.TABLE_NAME ASC, t.TABLE_SCHEMA ASC ) AS ID ,
                    t.TABLE_NAME ,
                    t.TABLE_SCHEMA
            FROM    INFORMATION_SCHEMA.TABLES t
                    JOIN HNO_Control.dbo.Ctrl_RemoteSvrs_Tables2Process m ON t.TABLE_NAME = m.Local_Table_Name
                                                              AND t.TABLE_SCHEMA = m.Local_Schema_Name
                                                              --AND m.Remote_Server_Name = @Remote_Server_Name
                                                              AND m.Local_DB_Name = @Target_DB_Name
                                                              AND m.Local_Schema_Name = @Target_DB_Schema_Name
            WHERE   t.TABLE_TYPE = 'BASE TABLE'
                    AND m.Is_Active = 1;        

        OPEN db_statscursor;
        FETCH NEXT
		FROM db_statscursor INTO @ID, @TableName, @TableSchemaName;
        WHILE @@FETCH_STATUS = 0
            BEGIN
                SET @SQL = 'UPDATE STATISTICS ' + @TableSchemaName + '.'
                    + @TableName + ' WITH FULLSCAN';
				
                IF @IsDebugMode = 1
                    BEGIN
                        PRINT @SQL;
                    END;

                EXEC(@SQL);

                INSERT  INTO #stats_details
                        SELECT  [so].[name] AS [TableName] ,
                                @TableSchemaName ,
                                [si].[index_id] AS [Index_ID] ,
                                [ss].[name] AS [Statistic] ,
                                STUFF(( SELECT  ', ' + [c].[name]
                                        FROM    [sys].[stats_columns] [sc]
                                                JOIN [sys].[columns] [c] ON [c].[column_id] = [sc].[column_id]
                                                              AND [c].[object_id] = [sc].[object_id]
                                        WHERE   [sc].[object_id] = [ss].[object_id]
                                                AND [sc].[stats_id] = [ss].[stats_id]
                                        ORDER BY [sc].[stats_column_id]
                                      FOR
                                        XML PATH('')
                                      ), 1, 2, '') AS [ColumnsInStatistic] ,
                                [ss].[auto_created] AS [WasAutoCreated] ,
                                [ss].[user_created] AS [WasUserCreated] ,
                                [ss].[has_filter] AS [IsFiltered] ,
                                [ss].[filter_definition] AS [FilterDefinition] ,
                                [ss].[is_temporary] AS [IsTemporary] ,
                                STATS_DATE([so].[object_id], stats_id) AS [StatisticsUpdateDate]
                        FROM    [sys].[stats] [ss]
                                JOIN [sys].[objects] AS [so] ON [ss].[object_id] = [so].[object_id]
                                JOIN [sys].[schemas] AS [sch] ON [so].[schema_id] = [sch].[schema_id]
                                LEFT OUTER JOIN [sys].[indexes] AS [si] ON [so].[object_id] = [si].[object_id]
                                                              AND [ss].[name] = [si].[name]
                        WHERE   [so].[object_id] = OBJECT_ID(N''
                                                             + @TableSchemaName
                                                             + '.'
                                                             + @TableName + '')
                        ORDER BY [ss].[user_created] ,
                                [ss].[auto_created] ,
                                [ss].[has_filter];										

                FETCH NEXT    FROM db_statscursor INTO @ID, @TableName,
                    @TableSchemaName;
            END;
        CLOSE db_statscursor;
        DEALLOCATE db_statscursor;  
		
        IF @IsDebugMode = 1
            BEGIN
                SELECT  *
                FROM    #stats_details;
            END;  

        IF EXISTS ( SELECT  1
                    FROM    ( SELECT    TableName ,
                                        SchemaName ,
                                        StatisticsUpdateDate
                              FROM      #stats_details sd
                                        JOIN INFORMATION_SCHEMA.TABLES t ON sd.TableName = t.TABLE_NAME
                                                              AND t.TABLE_SCHEMA = sd.SchemaName
                              WHERE     StatisticsUpdateDate NOT BETWEEN @StartDateTime
                                                             AND
                                                              SYSDATETIME()
                            ) a )
            BEGIN 
                SET @Error_Message = 'Statistics on ''' + @Target_DB_Name
                    + ''' database for ''' + @Target_DB_Schema_Name
                    + ''' schema could not be updated due to an error. Please troubleshoot.'
                    + CHAR(10);										
                SET @Is_All_OK = 0;
            END;
        ELSE
            BEGIN
                SET @Is_All_OK = 1;
                SET @Error_Message = 'All Good!';
            END;

        IF OBJECT_ID('tempdb..#stats_details') IS NOT NULL
            BEGIN
                DROP TABLE #stats_details;
            END;
    END;
GO

Next step, executed as a script task, sets the value for the @Sync_Exec_EndTime variable. The @Sync_Exec_StartTime and @Sync_Exec_EndTime variables’ values allow for establishing a time boundary between when the package commenced and completed its execution. This is important as error log checking that initiates as part of this framework towards the end of the execution workflow needs to be performed in the time window registered by @Sync_Exec_StartTime and @Sync_Exec_EndTime variables. In this way, we can pinpoint specific start and end time and make the process search for error log entries only between those exact times. I will enclose the actual code in part 4 to this series when describing the overall SSIS package structure.

Moving on, we can perform some rudimentary validation steps e.g. record count between the source and target tables with the help of another stored procedure. As with most of the tasks in this framework, this part can be easily omitted but since a simple record count is the least we can do to confirm target-to-source data consistency, it is worthwhile to include this or similar check at the end of the process (providing the source database is not being written to during package execution). The following stored procedure compares source and target record counts for each table.

USE [StagingDB];
GO


CREATE PROCEDURE [dbo].[usp_checkRemoteSvrDBvsLocalDBRecCounts]
    (
      @Remote_Server_Name VARCHAR(256) ,
      @Remote_Server_DB_Name VARCHAR(128) ,
      @Remote_Server_DB_Schema_Name VARCHAR(128) ,
      @Target_DB_Name VARCHAR(128) ,
      @Is_All_OK INT OUTPUT ,
      @Process_Name VARCHAR(250) OUTPUT ,
      @Error_Message VARCHAR(MAX) OUTPUT
    )
AS
    SET NOCOUNT ON;
    BEGIN
        SET @Process_Name = ( SELECT    OBJECT_NAME(objectid)
                              FROM      sys.dm_exec_requests r
                                        CROSS   APPLY sys.dm_exec_sql_text(r.sql_handle) a
                              WHERE     session_id = @@spid
                            );     

        IF OBJECT_ID('tempdb..#TempTbl') IS NOT NULL
            BEGIN
                DROP TABLE #TempTbl;
            END; 
        CREATE TABLE #TempTbl
            (
              ID INT IDENTITY(1, 1) ,
              TableName VARCHAR(256) NOT NULL ,
              TableSchemaName VARCHAR(50) NOT NULL ,
              LocalOrRemote VARCHAR(32) NOT NULL ,
              RecordCount BIGINT NOT NULL
            );    
        DECLARE @ID TINYINT;
        DECLARE @Table_Name VARCHAR(256);
        DECLARE @Table_Schema_Name VARCHAR(128);
        DECLARE @SQL VARCHAR(2056);

        DECLARE db_idxcursor CURSOR FORWARD_ONLY
        FOR
            SELECT  ROW_NUMBER() OVER ( ORDER BY t.TABLE_NAME ASC, t.TABLE_SCHEMA ASC ) AS ID ,
                    t.TABLE_NAME ,
                    t.TABLE_SCHEMA
            FROM    INFORMATION_SCHEMA.TABLES t
                    JOIN HNO_Control.dbo.Ctrl_RemoteSvrs_Tables2Process m ON t.TABLE_NAME = m.Local_Table_Name
                                                              AND t.TABLE_SCHEMA = m.Local_Schema_Name
                                                              AND m.Remote_Server_Name = @Remote_Server_Name
                                                              AND t.TABLE_CATALOG = @Target_DB_Name
            WHERE   t.TABLE_TYPE = 'BASE TABLE'
                    AND m.Is_Active = 1;        

        OPEN db_idxcursor;
        FETCH NEXT
			 FROM db_idxcursor INTO @ID, @Table_Name, @Table_Schema_Name;
        WHILE @@FETCH_STATUS = 0
            BEGIN
                SET @SQL = 'INSERT INTO #TempTbl (TableName, TableSchemaName, LocalOrRemote, RecordCount)
							SELECT ''' + @Table_Name + ''','''
                    + @Table_Schema_Name
                    + ''' , ''Remote'', * FROM OPENQUERY('
                    + @Remote_Server_Name + ',''select count(1) as ct from '
                    + @Remote_Server_DB_Schema_Name + '.' + @Table_Name + ''')
							UNION ALL
							SELECT ''' + @Table_Name + ''', '''
                    + @Table_Schema_Name + ''',''Local'', COUNT(1) 
							FROM ' + @Target_DB_Name + '.'
                    + @Table_Schema_Name + '.' + @Table_Name + '';
                EXEC(@SQL);
                FETCH NEXT    FROM db_idxcursor INTO @ID, @Table_Name,
                    @Table_Schema_Name;
            END;
        CLOSE db_idxcursor;
        DEALLOCATE db_idxcursor;

        DECLARE @DiffSourceTarget TABLE
            (
              TableName VARCHAR(512) ,
              RecordCount INT
            );
        INSERT  INTO @DiffSourceTarget
                ( TableName ,
                  RecordCount
                )
                SELECT  TableName ,
                        RecordCount
                FROM    #TempTbl
                WHERE   LocalOrRemote = 'Local'
                EXCEPT
                SELECT  TableName ,
                        RecordCount
                FROM    #TempTbl
                WHERE   LocalOrRemote = 'Remote';



        DECLARE @tablesListSource VARCHAR(MAX) = ( SELECT   STUFF(( SELECT
                                                              ', ' + TableName
                                                              FROM
                                                              @DiffSourceTarget
                                                              FOR
                                                              XML
                                                              PATH('')
                                                              ), 1, 1, '')
                                                 );
        IF EXISTS ( SELECT  1
                    FROM    ( SELECT    TableName ,
                                        RecordCount
                              FROM      #TempTbl
                              WHERE     LocalOrRemote = 'Local'
                              EXCEPT
                              SELECT    TableName ,
                                        RecordCount
                              FROM      #TempTbl
                              WHERE     LocalOrRemote = 'Remote'
                            ) a )
            BEGIN 
                SET @Error_Message = 'Post-reconciliation record count between local and remote objects is different for the following tables:'
                    + CHAR(10);
                SET @Error_Message = @Error_Message + '' + @tablesListSource;										
                SET @Is_All_OK = 0;
            END;
        ELSE
            BEGIN
                SET @Is_All_OK = 1;
                SET @Error_Message = 'All Good!';
            END;

        IF OBJECT_ID('tempdb..#TempTbl') IS NOT NULL
            BEGIN
                DROP TABLE #TempTbl;
            END; 							
    END;
GO

Following on, we continue with more validations and check and interrogate the error logging database for any entries that may have occurred during acquisition. As you saw in part 2 of this series as well as in the small tables acquisition stored procedure above, the code is designed in a manner which does not stall or stop the process from continuing in the event of data coping failure. Rather, it logs any unexpected errors in the AdminDBA database and continues in a looping fashion until all tables are accounted for. This mechanism prevents the process from falling over in the event an exception was raised so even though the package may have reported a successful completion status, it is possible an errors was raised and needs to be addressed. The next piece of code queries the AdminDBA database for any entries that occurred between the package execution start time and end time to report any discrepancies.

USE [StagingDB]
GO


CREATE PROCEDURE [dbo].[usp_checkRemoteSvrDBvsLocalDBSyncErrors]
    (
      @Sync_Exec_StartTime			DATETIME ,
      @Sync_Exec_EndTime			DATETIME ,
      @Is_All_OK					INT							OUTPUT ,
	  @Error_Message				VARCHAR				(MAX)	OUTPUT,
	  @Process_Name					VARCHAR				(250)	OUTPUT 
    )
    WITH RECOMPILE
AS
    SET NOCOUNT ON	
    BEGIN
	SET @Process_Name = ( SELECT    OBJECT_NAME(objectid)
                              FROM      sys.dm_exec_requests r
                                        CROSS   APPLY sys.dm_exec_sql_text(r.sql_handle) a
                              WHERE     session_id = @@spid
                            )
        IF EXISTS ( SELECT TOP 1
                            1
                    FROM    [AdminDBA].[dbo].[LogSSISErrors_Error]
                    WHERE   ErrorDateTime BETWEEN @Sync_Exec_StartTime
                                          AND     @Sync_Exec_EndTime )
            BEGIN 
                SET @Is_All_OK = 0
				SET @Error_Message = 'Errors were raised during data acquisition process. '							+CHAR(10)
				SET @Error_Message = @Error_Message+ 'A detailed log has been saved in AdminDBA database. '			+CHAR(10) 
				SET @Error_Message = @Error_Message+ 'Click on the link above to access error instances report '	+CHAR(10)
				SET @Error_Message = @Error_Message+ 'or query the database directly to troubleshoot further.' 
            END
        ELSE
            BEGIN
                SET @Is_All_OK = 1
				SET @Error_Message = 'All Good!'
            END
    END
GO

Finally, nearly every transformation in the package links up with an email sending Execute SQL Task, notifying administrator(s) of any runtime issues that may arise. These tasks execute the ‘usp_sendBIGroupETLFailMessage‘ stored procedure outlined in part 1 of this series, which in turn send out an e-mail with an appropriate message content to nominated e-mail addresses. Whether the execution of this stored procedure is triggered or not entirely depends on the value of @Is_All_OK variable (included in most stored procedures outlined in this series as an OUTPUT) thus the precedence constraint path the package selects i.e. if the @Is_All_OK variable is 1, next transformation is triggered, if @Is_All_OK variable is 0, error-raising stored procedure is run. Also, since Visual Studio development environment does not allow a single e-mail sending task to be linked up with multiple transformations, unfortunately, each of them has to be duplicated, introducing unneeded redundancy and making the package layout messy and cluttered. Beyond this pain point, the rest is fairly straightforward with details of the actual SSIS package outlined in the next post to this series in Part 4.

Tags: , , ,