{"id":2891,"date":"2016-05-25T06:33:45","date_gmt":"2016-05-25T06:33:45","guid":{"rendered":"http:\/\/bicortex.com\/?p=2891"},"modified":"2016-05-26T08:24:35","modified_gmt":"2016-05-26T08:24:35","slug":"designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-3","status":"publish","type":"post","link":"http:\/\/bicortex.com\/bicortex\/designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-3\/","title":{"rendered":"Designing data acquisition framework in SQL Server and SSIS \u2013 how to source and integrate external data for a decision support system or data warehouse (Part 3)"},"content":{"rendered":"<p style=\"text-align: justify;\">Note: Part 1 to this series can be found <a href=\"http:\/\/bicortex.com\/designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-1\/\" target=\"_blank\">HERE<\/a><strong>, <\/strong>Part 2 <a href=\"http:\/\/bicortex.com\/designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-2\/\" target=\"_blank\">HERE<\/a><strong>, <\/strong>Part 4 <a href=\"http:\/\/bicortex.com\/designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-4\/\" target=\"_blank\">HERE<\/a> and all the code and additional files for this post can be downloaded from my OneDrive folder <a href=\"https:\/\/onedrive.live.com\/redir?resid=715AEF07A82832E1!60083&amp;authkey=!AFYnKHXMHLSfr5Q&amp;ithint=folder%2c\" target=\"_blank\">HERE<\/a>.<\/p>\n<p style=\"text-align: justify;\">Continuing on from Part 2 to this series, having set up all the routines dealing with pre-acquisition 
activities as well as large tables migration, this post focuses on smaller tables which do not require partitioning and parallel data load. I will also cover post-acquisition activities which run to tie up all the loose ends and ensure data consistency and readiness for business use. All in all, the following tasks will be covered in detail in this post.<\/p>\n<p style=\"text-align: justify;\"><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2016\/05\/Data_Acquisition_Framework_Part2_SSIS_Package_Bottom_Level_Overview.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-2948\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2016\/05\/Data_Acquisition_Framework_Part2_SSIS_Package_Bottom_Level_Overview.png\" alt=\"Data_Acquisition_Framework_Part2_SSIS_Package_Bottom_Level_Overview\" width=\"580\" height=\"665\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2016\/05\/Data_Acquisition_Framework_Part2_SSIS_Package_Bottom_Level_Overview.png 808w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2016\/05\/Data_Acquisition_Framework_Part2_SSIS_Package_Bottom_Level_Overview-261x300.png 261w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2016\/05\/Data_Acquisition_Framework_Part2_SSIS_Package_Bottom_Level_Overview-768x881.png 768w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<h3 style=\"text-align: center;\">Small\u00a0Tables Acquisition Tasks Overview and Code<\/h3>\n<p style=\"text-align: justify;\">Since some tables may contain only a few records, spinning up multiple SQL Server Agent jobs (see <a href=\"http:\/\/bicortex.com\/designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-2\/\" target=\"_blank\">Part 2<\/a>) creates unnecessary overhead. 
When only a few records need to be copied across, it is easier to execute a single MERGE SQL statement which performs the UPDATE and INSERT operations in one pass, based on the column names of the referenced tables. To avoid individual column-to-column mapping for each table we can query the database metadata and, provided a primary key is present on both the source and target tables, reference each column in an automated fashion.\u00a0 The following stored procedure builds the MERGE SQL statement dynamically, which alleviates the need to create table-to-table row-level mappings by hand. I wrote\u00a0about this type of data replication in my previous post <a href=\"http:\/\/bicortex.com\/how-to-synchronise-data-across-two-sql-server-databases-part-1-sql-code-and-application-for-individual-objects-processing\/\" target=\"_blank\">HERE<\/a>, where a similar stored procedure was used to copy data across from database X to database Y on the same SQL Server instance. To make that setup applicable to the current scenario, i.e. a remote host running a MySQL database, a few changes were necessary. Most importantly, all of the queries against the source database metadata and\/or data had to be modified to include the OPENQUERY statement.\u00a0Secondly,\u00a0allowances needed to be made to reconcile MySQL and SQL Server data types and certain syntax conventions, e.g. certain MySQL reserved words need to be enclosed in backticks inside the OPENQUERY statements so that the pass-through query is accepted when SQL Server sends it to the remote server. Likewise, certain SQL Server reserved words need to be delimited with square brackets. 
Finally, any MySQL metadata related queries had to conform to its internal database catalog architecture i.e.\u00a0metadata database storing\u00a0information about the MySQL server such as the name of a database or table, the data type of a column, or access privileges.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nUSE &#x5B;StagingDB]\r\nGO\r\n\r\n \r\nCREATE PROCEDURE &#x5B;dbo].&#x5B;usp_runRemoteSvrDBSchemaSyncSmallTablesMaster]\r\n    (\r\n      @Remote_Server_Name\t\t\t\t\t\tSYSNAME ,\r\n\t  @Remote_Server_DB_Name\t\t\t\t\tVARCHAR\t\t\t\t(128) ,\r\n      @Remote_Server_DB_Schema_Name\t\t\t\tVARCHAR\t\t\t\t(128) ,\r\n      @Target_DB_Name\t\t\t\t\t\t\tVARCHAR\t\t\t\t(128) ,\r\n      @Target_DB_Schema_Name\t\t\t\t\tVARCHAR\t\t\t\t(128) ,\r\n      @Target_DB_Object_Name\t\t\t\t\tVARCHAR\t\t\t\t(128) ,\r\n      @Exec_Instance_GUID\t\t\t\t\t\tUNIQUEIDENTIFIER ,\r\n      @Package_Name\t\t\t\t\t\t\t\tVARCHAR\t\t\t\t(256)\r\n    )\r\n    WITH RECOMPILE\r\nAS\r\n    SET NOCOUNT ON \r\n\tBEGIN\r\n\r\n\r\n    DECLARE\t\t@IsDebugMode\t\t\t\t\tBIT \r\n\tDECLARE \t@ExecSQL\t\t\t\t\t\tNVARCHAR(MAX) \r\n\tDECLARE \t@Err_Msg\t\t\t\t\t\tNVARCHAR(1000) \r\n\tDECLARE \t@Remote_DB_Object_Name\t\t\tVARCHAR (128)\t\t= @Target_DB_Object_Name\r\n\tDECLARE \t@Exec_Instance_GUID_As_Nvarchar NVARCHAR(56) = ( SELECT REPLACE(CAST (@Exec_Instance_GUID AS NVARCHAR(56)), '-', '')) \r\n\r\n    SET @IsDebugMode = 1\r\n \t\t\r\n\/*====================================================================================\r\n                                CREATE TEMP TABLES                          \r\n======================================================================================*\/\r\n\r\n    IF OBJECT_ID('tempdb..#Src_Tgt_Tables') IS NOT NULL\r\n        BEGIN\r\n            DROP TABLE &#x5B;#Src_Tgt_Tables]\r\n        END \r\n    CREATE TABLE #Src_Tgt_Tables\r\n        (\r\n          &#x5B;Data_Obj_Id] &#x5B;INT] NOT NULL ,\r\n          &#x5B;Src_Tgt_Flag] 
&#x5B;VARCHAR](1) NOT NULL ,\r\n          &#x5B;Object_Id] &#x5B;INT] NOT NULL ,\r\n          &#x5B;Object_Name] &#x5B;sysname] NOT NULL ,\r\n          &#x5B;Schema_Name] &#x5B;sysname] NOT NULL ,\r\n          &#x5B;Schema_Object_Name] &#x5B;VARCHAR](260) NOT NULL ,\r\n          &#x5B;Column_Id] &#x5B;SMALLINT] NULL ,\r\n          &#x5B;Column_Name] &#x5B;VARCHAR](200) NULL ,\r\n          &#x5B;IsIdentity] &#x5B;TINYINT] NULL ,\r\n          &#x5B;IsComputed] &#x5B;TINYINT] NULL ,\r\n          &#x5B;IsNullable] &#x5B;TINYINT] NULL ,\r\n          &#x5B;Default] &#x5B;VARCHAR](MAX) NULL ,\r\n          &#x5B;DataType] &#x5B;VARCHAR](152) NULL ,\r\n          &#x5B;DataType_CastGroup] &#x5B;VARCHAR](134) NOT NULL ,\r\n          &#x5B;Collation_Name] &#x5B;sysname] NULL ,\r\n          &#x5B;RemoteObject_DataType] &#x5B;VARCHAR](152) NULL\r\n        )\r\n\t\t  \r\n\r\n    IF OBJECT_ID('tempdb..#Tgt_NK_Cols') IS NOT NULL\r\n        BEGIN\r\n            DROP TABLE &#x5B;#Tgt_NK_Cols]\r\n        END  \r\n    CREATE TABLE #Tgt_NK_Cols\r\n        (\r\n          &#x5B;Data_Obj_Id] &#x5B;INT] NOT NULL ,\r\n          &#x5B;Schema_Object_Name] &#x5B;VARCHAR](260) NOT NULL ,\r\n          &#x5B;Where_Clause] &#x5B;VARCHAR](MAX) NULL ,\r\n          S_Schema_Object_Name &#x5B;VARCHAR](260) NOT NULL\r\n        )\r\n \r\n\/*======================================================================================\r\n\t\t\t\t\tPERFORM DATABASES, SCHEMAS AND OBJECT CHECKS                           \r\n======================================================================================*\/ \r\n    IF OBJECT_ID('tempdb..#Objects_List') IS NOT NULL\r\n        BEGIN\r\n            DROP TABLE &#x5B;#Objects_List]\r\n        END  \r\n    CREATE TABLE #Objects_List\r\n        (\r\n          DatabaseName sysname ,\r\n          SchemaName sysname ,\r\n          ObjectName sysname ,\r\n\t\t  Is_Source_Target VARCHAR (56)\r\n        )\r\n \r\n    SET @ExecSQL = 'SELECT table_catalog, table_schema, 
table_name, ''Target'' as Is_Source_Target\r\n\t\t\t\t\tFROM INFORMATION_SCHEMA.tables \r\n\t\t\t\t\tWHERE table_type = ''base table'' \r\n\t\t\t\t\tand table_catalog = '''+@Target_DB_Name+'''\r\n\t\t\t\t\tand table_schema = '''+@Target_DB_Schema_Name+'''\r\n\t\t\t\t\tand table_name = '''+@Target_DB_Object_Name+''''\t\r\n\t\r\n\tIF @IsDebugMode = 1\r\n\t\tBEGIN\r\n\t\t\tPRINT 'SQL statement for acquiring ''target'' table base data into #Objects_List temp table:'\r\n\t\t\tPRINT '------------------------------------------------------------------------------------'\r\n\t\t\tPRINT @ExecSQL +REPLICATE(CHAR(13),2) \r\n\t\tEND\t\r\n\r\n\tINSERT  INTO #Objects_List (DatabaseName, SchemaName, ObjectName, Is_Source_Target)\r\n\tEXEC (@ExecSQL)\r\n\t\t\t\r\n\tIF @IsDebugMode = 1\r\n\t\tBEGIN\r\n\t\t\tSELECT * FROM #Objects_List WHERE Is_Source_Target = 'Target'\r\n\t\tEND\r\n\r\n\tSET @ExecSQL =\t\r\n\t\t\t\t\t\t'\r\n\t\t\t\t\t\tSELECT DatabaseName, SchemaName, ObjectName, ''Source'' as Is_Source_Target\r\n\t\t\t\t\t\tFROM OPENQUERY ('+@Remote_Server_Name+', ''select table_schema as DatabaseName, table_schema as SchemaName, table_name as ObjectName\r\n\t\t\t\t\t\tfrom information_schema.tables \r\n\t\t\t\t\t\tWHERE table_type = ''''BASE TABLE'''' \r\n\t\t\t\t\t\tand table_name = '''''+@Remote_DB_Object_Name+'''''\r\n\t\t\t\t\t\tand table_schema ='''''+@Remote_Server_DB_Name+''''''')'\r\n\t\t\t\t\t\t\t\r\n\tIF @IsDebugMode = 1\r\n\t\tBEGIN\r\n\t\t\tPRINT 'SQL statement for acquiring ''source'' table base data into #Objects_List temp table:'\r\n\t\t\tPRINT '------------------------------------------------------------------------------------'\r\n\t\t\tPRINT @ExecSQL +REPLICATE(CHAR(13),2) \t\t\t\r\n\t\tEND\r\n\r\n\tINSERT  INTO #Objects_List (DatabaseName, SchemaName, ObjectName, Is_Source_Target)\r\n\tEXEC (@ExecSQL)\t\r\n\r\n\tIF @IsDebugMode = 1\r\n\t\tBEGIN\r\n\t\t\tSELECT * FROM #Objects_List WHERE Is_Source_Target = 'Source'\r\n\t\tEND\r\n\r\n\r\n    \r\n\tIF @IsDebugMode 
= 1\r\n\t\tBEGIN\r\n\t\t\tSELECT\tSource_Server_Name\t\t\t= @Remote_Server_Name,\r\n\t\t\t\t\tSource_Server_DB_Name\t\t= @Remote_Server_DB_Name,\r\n\t\t\t\t\tSource_Object_Name\t\t\t= @Remote_DB_Object_Name,\r\n\t\t\t\t\tTarget_DB_Name\t\t\t\t= @Target_DB_Name,\r\n\t\t\t\t\tTarget_DB_Schema_Name\t\t= @Target_DB_Schema_Name,\r\n\t\t\t\t\tTarget_DB_Object_Name\t\t= @Target_DB_Object_Name\r\n\t\tEND\r\n    IF NOT EXISTS ( SELECT  TOP 1 1\r\n                    FROM    #Objects_List a\r\n                    WHERE   a.DatabaseName = @Remote_Server_DB_Name AND a.Is_Source_Target = 'Source' )\r\n        BEGIN\r\n            SET @Err_Msg = 'Source database cannot be found. You nominated &quot;'\r\n                + @Remote_Server_DB_Name + '&quot;. \r\n                Check that the database of that name exists on the instance'\r\n            RAISERROR (\r\n        @Err_Msg  -- Message text.\r\n        ,16 -- Severity.\r\n            ,1 -- State.\r\n  )\r\n            RETURN\r\n        END   \r\n \r\n    IF NOT EXISTS ( SELECT  1\r\n                    FROM    #Objects_List a\r\n                    WHERE   a.DatabaseName = @Target_DB_Name AND a.Is_Source_Target = 'Target')\r\n        BEGIN\r\n            SET @Err_Msg = 'Target database cannot be found. You nominated &quot;'\r\n                + @Target_DB_Name + '&quot;. \r\n                Check that the database of that name exists on the instance'\r\n            RAISERROR (\r\n        @Err_Msg  -- Message text.\r\n        ,16 -- Severity.\r\n            ,1 -- State.\r\n  )\r\n            RETURN\r\n        END   \r\n \r\n    IF NOT EXISTS ( SELECT TOP 1 1\r\n                    FROM    #Objects_List a\r\n                    WHERE   a.SchemaName = @Remote_Server_DB_Schema_Name AND  a.Is_Source_Target = 'Source' )\r\n        BEGIN\r\n            SET @Err_Msg = 'Source schema cannot be found. You nominated &quot;'\r\n                + @Remote_Server_DB_Schema_Name + '&quot;. 
\r\n                Check that the schema of that name exists on the database'\r\n            RAISERROR (\r\n        @Err_Msg  -- Message text.\r\n        ,16 -- Severity.\r\n            ,1 -- State.\r\n  )\r\n            RETURN\r\n        END  \r\n \r\n    IF NOT EXISTS ( SELECT  TOP 1 1\r\n                    FROM    #Objects_List a\r\n                    WHERE   a.SchemaName = @Target_DB_Schema_Name AND a.Is_Source_Target = 'Target' )\r\n        BEGIN\r\n            SET @Err_Msg = 'Target schema cannot be found. You nominated &quot;'\r\n                + @Target_DB_Schema_Name + '&quot;. \r\n                Check that the schema of that name exists on the database'\r\n            RAISERROR (\r\n        @Err_Msg  -- Message text.\r\n        ,16 -- Severity.\r\n            ,1 -- State.\r\n  )\r\n            RETURN\r\n        END \r\n \r\n    IF NOT EXISTS ( SELECT TOP 1 1\r\n                    FROM    #Objects_List a\r\n                    WHERE   a.ObjectName = @Remote_DB_Object_Name AND a.Is_Source_Target = 'Source')\r\n        BEGIN\r\n            SET @Err_Msg = 'Source object cannot be found. You nominated &quot;'\r\n                + @Remote_DB_Object_Name + '&quot;. \r\n                Check that the object of that name exists on the database'\r\n            RAISERROR (\r\n        @Err_Msg  -- Message text.\r\n        ,16 -- Severity.\r\n            ,1 -- State.\r\n  )\r\n            RETURN\r\n        END\r\n \r\n    IF NOT EXISTS ( SELECT  1\r\n                    FROM    #Objects_List a\r\n                    WHERE   a.ObjectName = @Target_DB_Object_Name AND a.Is_Source_Target = 'Target')\r\n        BEGIN\r\n            SET @Err_Msg = 'Target object cannot be found. You nominated &quot;'\r\n                + @Target_DB_Object_Name + '&quot;. 
\r\n                Check that the object of that name exists on the database'\r\n            RAISERROR (\r\n        @Err_Msg  -- Message text.\r\n        ,16 -- Severity.\r\n            ,1 -- State.\r\n  )\r\n            RETURN\r\n        END\r\n \r\n \r\n\/*======================================================================================\r\n\t\t\t\t\tEXTRACT SOURCE DATABASE DATA FOR THE GIVEN OBJECT                          \r\n======================================================================================*\/\r\n\r\nSET @ExecSQL =\r\n     'INSERT INTO\r\n    #Src_Tgt_Tables\r\n    SELECT\r\n    &#x5B;Data_Object_Id]        =   so.id,\r\n    &#x5B;Src_Tgt_Flag]          =   src_tgt.Src_Trg_Flag,\r\n    &#x5B;Object_Id]             =   so.id,\r\n    &#x5B;Object_Name]           =   so.name,\r\n    &#x5B;Schema_Name]           =   sh.name,\r\n    &#x5B;Schema_Object_Name]    =   ''&#x5B;' + @Target_DB_Name + '].&#x5B;''+sh.name+''].&#x5B;''+so.name+'']'',\r\n    &#x5B;Column_Id]             =   sc.column_id,\r\n    &#x5B;Column_Name]           =   sc.name,\r\n    &#x5B;IsIdentity]            =   sc.is_identity,\r\n    &#x5B;IsComputed]            =   sc.is_computed,\r\n    &#x5B;IsNullable]            =   sc.is_nullable,\r\n    &#x5B;Default]               =   dc.definition,\r\n    &#x5B;DataType]              =   (\r\n                                CASE\r\n                                WHEN T.system_type_id IN (167, 175, 231, 239) AND SC.max_length &amp;gt; 0 \r\n                                THEN T.Name + ''('' + CAST(SC.max_length AS varchar(10)) + '')''\r\n                                WHEN T.system_type_id IN (167, 175, 231, 239) AND SC.max_length = -1 THEN T.Name + ''(MAX)''\r\n                                -- For Numeric and Decimal data types\r\n                                WHEN T.system_type_id IN (106, 108) THEN T.Name + ''('' + CAST(SC.precision AS varchar(10)) + '', '' + \r\n                                CAST(SC.scale AS 
varchar(10)) + '')''\r\n                                ELSE T.Name\r\n                                END\r\n                                ),\r\n    &#x5B;DataType_CastGroup]    =   (\r\n                                CASE\r\n                                WHEN T.system_type_id IN (167, 175, 231, 239) THEN ''String''\r\n                                -- For Numeric and Decimal data types\r\n                                WHEN T.system_type_id IN (106, 108) THEN ''Numeric''\r\n                                ELSE ''Other''\r\n                                END\r\n                                ),\r\n    &#x5B;Collation_Name]        =   SC.collation_name,\r\n\t&#x5B;RemoteObject_DataType]\t\t=\tmizz.data_type\r\n    FROM\r\n    ' + @Target_DB_Name + '.sys.sysobjects so (NOLOCK)\r\n    INNER JOIN ' + @Target_DB_Name + '.sys.columns sc (NOLOCK) ON\r\n    sc.object_id = so.id\r\n    LEFT JOIN ' + @Target_DB_Name + '.sys.default_constraints dc (NOLOCK) ON\r\n    dc.parent_object_id = so.id\r\n    AND dc.parent_column_id = sc.column_id\r\n    INNER JOIN ' + @Target_DB_Name + '.sys.types t (NOLOCK) ON\r\n    t.user_type_id = sc.user_type_id\r\n    INNER JOIN ' + @Target_DB_Name + '.sys.schemas sh (NOLOCK) ON\r\n    sh.schema_id = so.uid\r\n    INNER  JOIN (\r\n                select  \r\n                Data_Obj_Id = t.object_id, \r\n                t.name as phisical_name, \r\n                s.name as s_name  ,\r\n                Src_trg_Flag = ''S'',\r\n                Object_Type = ''U''\r\n                FROM   ' + @Target_DB_Name + '.sys.tables t\r\n                JOIN sys.schemas s ON t.schema_id = s.schema_id\r\n                WHERE s.name = ''' + @Target_DB_Schema_Name + '''\r\n                and t.name = ''' + @Target_DB_Object_Name + '''\r\n                ) src_tgt ON\r\n    src_tgt.phisical_name = so.name and\r\n    src_tgt.s_name = sh.name\r\n\tJOIN \r\n\t(SELECT table_name, column_name, data_type \r\n\tFROM OPENQUERY(' + @Remote_Server_Name + ', 
''select table_name, column_name, data_type \r\n\tfrom information_schema.columns \r\n\twhere table_name = ''''' + @Remote_DB_Object_Name\r\n        + ''''' AND table_schema = ''''' + @Remote_Server_DB_Schema_Name\r\n        + ''''''')) mizz\r\n\ton mizz.table_name = so.name and mizz.column_name = sc.name\r\n    WHERE\r\n    so.xtype = src_tgt.Object_Type\r\n\tAND NOT EXISTS (SELECT 1\r\n\t\t\t\t\tFROM  ControlDB.dbo.Ctrl_RemoteSvrs_Tables2Process_ColumnExceptions o \r\n\t\t\t\t\tWHERE \r\n\t\t\t\t\to.Remote_Field_Name = sc.name AND\r\n\t\t\t\t\to.Remote_Table_Name = so.name AND \r\n\t\t\t\t\to.Remote_Schema_Name = sh.name AND\r\n\t\t\t\t\to.Is_Active = 1)'\r\n\r\n     \r\n    IF @IsDebugMode = 1\r\n\t\tBEGIN\r\n\t\t\tPRINT 'SQL statement for acquiring ''source'' table objects metadata into #Src_Tgt_Tables temp table:'\r\n\t\t\tPRINT '----------------------------------------------------------------------------------------------'\r\n\t\t\tPRINT @ExecSQL +REPLICATE(CHAR(13),2) \t\t\t\r\n\t\tEND\r\n   \r\n    EXEC sp_executesql @ExecSQL\r\n\r\n\tIF @IsDebugMode = 1\r\n\t\tBEGIN\r\n\t\t\tSELECT * FROM #Src_Tgt_Tables WHERE Src_Tgt_Flag = 'S'\r\n\t\tEND\r\n\r\n\r\n\r\n\/*======================================================================================\r\n\t\t\t\t\tEXTRACT TARGET DATABASE DATA FOR THE GIVEN OBJECT                          \r\n======================================================================================*\/\r\n   \r\n    SET @ExecSQL = \r\n\t'INSERT INTO\r\n    #Src_Tgt_Tables\r\n    SELECT\r\n    &#x5B;Data_Obj_Id]           =   src_tgt.Data_Obj_Id,\r\n    &#x5B;Src_Tgt_Flag]          =   src_tgt.Src_Trg_Flag,\r\n    &#x5B;Object_Id]             =   so.id,\r\n    &#x5B;Object_Name]           =   so.name,\r\n    &#x5B;Schema_Name]           =   sh.name,\r\n    &#x5B;Schema_Object_Name]    =   ''&#x5B;' + @Target_DB_Name + '].&#x5B;''+sh.name+''].&#x5B;''+so.name+'']'',\r\n    &#x5B;Column_Id]             =   sc.column_id,\r\n    
&#x5B;Column_Name]           =   sc.name,\r\n    &#x5B;IsIdentity]            =   sc.is_identity,\r\n    &#x5B;IsComputed]            =   sc.is_computed,\r\n    &#x5B;IsNullable]            =   sc.is_nullable,\r\n    &#x5B;Default]               =   dc.definition,\r\n    &#x5B;DataType]              =   (\r\n                                CASE\r\n                                WHEN T.system_type_id IN (167, 175, 231, 239) AND SC.max_length &amp;gt; 0 \r\n                                THEN T.Name + ''('' + CAST(SC.max_length AS varchar(10)) + '')''\r\n                                WHEN T.system_type_id IN (167, 175, 231, 239) AND SC.max_length = -1 THEN T.Name + ''(MAX)''\r\n                                -- For Numeric and Decimal data types\r\n                                WHEN T.system_type_id IN (106, 108) THEN T.Name + ''('' + CAST(SC.precision AS varchar(10)) + '', '' \r\n                                + CAST(SC.scale AS varchar(10)) + '')''\r\n                                ELSE T.Name\r\n                                END\r\n                                ),\r\n    &#x5B;DataType_CastGroup]    =   (\r\n                                CASE\r\n                                WHEN T.system_type_id IN (167, 175, 231, 239) THEN ''String''\r\n                                -- For Numeric and Decimal data types\r\n                                WHEN T.system_type_id IN (106, 108) THEN ''Numeric''\r\n                                ELSE ''Other''\r\n                                END\r\n                                ),\r\n    &#x5B;Collation_Name]        =   SC.collation_name,\r\n\t&#x5B;RemoteObject_DataType]\t\t=\tmizz.data_type\r\n    FROM\r\n    ' + @Target_DB_Name + '.sys.sysobjects so\r\n    INNER JOIN ' + @Target_DB_Name + '.sys.columns sc ON\r\n    sc.object_id = so.id\r\n    LEFT JOIN ' + @Target_DB_Name + '.sys.default_constraints dc ON\r\n    dc.parent_object_id = so.id\r\n    AND dc.parent_column_id = sc.column_id\r\n    INNER JOIN ' + 
@Target_DB_Name + '.sys.types t ON\r\n    t.user_type_id = sc.user_type_id\r\n    INNER JOIN ' + @Target_DB_Name + '.sys.schemas sh ON\r\n    sh.schema_id = so.uid\r\n    INNER JOIN (\r\n                select  \r\n                Data_Obj_Id = t.object_id, \r\n                t.name as phisical_name, \r\n                s.name as s_name,\r\n                Src_trg_Flag = ''T'',\r\n                Object_Type = ''U''\r\n                FROM   ' + @Target_DB_Name + '.sys.tables t\r\n                JOIN sys.schemas s ON t.schema_id = s.schema_id\r\n                WHERE S.name = ''' + @Target_DB_Schema_Name + '''\r\n                and t.name = ''' + @Target_DB_Object_Name + '''\r\n                ) src_tgt ON\r\n    src_tgt.phisical_name = so.name and\r\n    src_tgt.s_name = sh.name\r\n\tJOIN \r\n\t(SELECT table_name, column_name, data_type \r\n\tFROM OPENQUERY(' + @Remote_Server_Name + ', ''select table_name, column_name, data_type \r\n\tfrom information_schema.columns \r\n\twhere table_name = ''''' + @Target_DB_Object_Name\r\n        + ''''' AND table_schema = ''''' + @Remote_Server_DB_Schema_Name\r\n        + ''''''')) mizz\r\n\ton mizz.table_name = so.name and mizz.column_name = sc.name\r\n    WHERE\r\n    so.xtype = ''U'' -- Table\r\n\tAND NOT EXISTS (SELECT 1 \r\n\t\t\t\t\tFROM  ControlDB.dbo.Ctrl_RemoteSvrs_Tables2Process_ColumnExceptions o \r\n\t\t\t\t\tWHERE \r\n\t\t\t\t\to.Local_Field_Name = sc.name AND\r\n\t\t\t\t\to.Local_Table_name = so.name AND \r\n\t\t\t\t\to.Local_Schema_Name = sh.name AND\r\n\t\t\t\t\to.Is_Active = 1)\r\n    ORDER BY\r\n    so.name,\r\n    sc.column_id'\r\n\r\n\r\n\r\n\tIF @IsDebugMode = 1\r\n\t\tBEGIN\r\n\t\t\tPRINT 'SQL statement for acquiring ''target'' table objects metadata into #Src_Tgt_Tables temp table:'\r\n\t\t\tPRINT '----------------------------------------------------------------------------------------------'\r\n\t\t\tPRINT @ExecSQL +REPLICATE(CHAR(13),2) \r\n\t\tEND\r\n\r\n\tEXEC sp_executesql @ExecSQL\r\n\r\n\tIF 
@IsDebugMode = 1\r\n\t\tBEGIN\r\n\t\t\tSELECT * FROM #Src_Tgt_Tables WHERE Src_Tgt_Flag = 'T'\r\n\t\tEND\r\n \r\n\/*======================================================================================\r\n        ENSURE THAT SOURCE AND TARGET DETAILS ARE PRESENT IN TEMP TABLE                         \r\n======================================================================================*\/\r\n  \r\n    IF ( SELECT COUNT(*)\r\n         FROM   #Src_Tgt_Tables ST\r\n         WHERE  ST.Src_Tgt_Flag = 'S'\r\n       ) &amp;lt; 1\r\n        BEGIN\r\n            SET @Err_Msg = 'No Source table details found. Configured Source Database is &quot;'\r\n                + @Remote_Server_DB_Name + '&quot;.'\r\n            RAISERROR (\r\n        @Err_Msg  -- Message text.\r\n        ,16 -- Severity.\r\n            ,1 -- State.\r\n  )\r\n            RETURN\r\n        END\r\n \r\n    IF ( SELECT COUNT(*)\r\n         FROM   #Src_Tgt_Tables ST\r\n         WHERE  ST.Src_Tgt_Flag = 'T'\r\n       ) &amp;lt; 1\r\n        BEGIN\r\n            SET @Err_Msg = 'No Target table details found. 
Configured Target Database is &quot;'\r\n                + @Target_DB_Name + '&quot;.'\r\n            RAISERROR (\r\n        @Err_Msg  -- Message text.\r\n        ,16 -- Severity.\r\n            ,1 -- State.\r\n  )\r\n            RETURN\r\n        END  \r\n\r\n\r\n\/*======================================================================================\r\n           UPDATE COLUMN NAMES TO QUALIFIED STRINGS FOR MSSQL RESERVED WORDS                          \r\n======================================================================================*\/\r\n\r\n\r\n\t\t\tUPDATE #Src_Tgt_Tables \r\n\t\t\tSET Column_Name = LOWER(b.mssql_version)\r\n\t\t\tFROM #Src_Tgt_Tables a JOIN dbo.vw_MssqlReservedWords b \r\n\t\t\tON a.Column_Name = b.reserved_word\r\n\t\t\tWHERE Src_Tgt_Flag = 'T'\r\n\r\n\/*======================================================================================\r\n                PREPARE 'WHERE' CLAUSE FOR THE MERGE STATEMENT                          \r\n======================================================================================*\/\r\n \r\n    SET @ExecSQL = 'INSERT INTO\r\n    #Tgt_NK_Cols\r\n    SELECT\r\n    TOP 1\r\n    Data_Obj_Id             =   tgt.Data_Obj_Id,\r\n    &#x5B;Schema_Object_Name]    =   tgt.&#x5B;Schema_Object_Name],\r\n    &#x5B;Where_Clause]          =   STUFF(REPLACE((SELECT\r\n                                ''  AND'' + '' TGT.&#x5B;''+ sc.name +''] =\r\n                                SRC.&#x5B;''+ REPLACE(REPLACE(sc.name, ''&amp;lt;'', ''~''), ''&amp;gt;'', ''!'') + '']'' + CHAR(10)\r\n                                FROM\r\n                                ' + @Target_DB_Name + '.sys.sysindexkeys sik\r\n                                INNER JOIN ' + @Target_DB_Name\r\n        + '.sys.syscolumns sc on\r\n                                sc.id = sik.id\r\n                                and sc.colid = sik.colid\r\n                                WHERE\r\n                                sik.id = si.object_id\r\n         
                       AND sik.indid = si.index_id\r\n                                ORDER BY\r\n                                sik.keyno\r\n                                FOR XML PATH('''')\r\n                                ), ''&amp;amp;#x0D;'', ''''), 1, 5, ''''),\r\n    &#x5B;S_Schema_Object_Name]  =   (\r\n                                SELECT  Top 1\r\n                                S.Schema_Object_Name\r\n                                FROM\r\n                                #Src_Tgt_Tables S\r\n                                WHERE\r\n                                S.Src_Tgt_Flag = ''S'')\r\n    FROM\r\n    ' + @Target_DB_Name + '.sys.indexes si\r\n    INNER JOIN ' + @Target_DB_Name + '.sys.sysobjects so ON\r\n    so.id = si.object_id\r\n    INNER JOIN ' + @Target_DB_Name + '.sys.schemas sh ON\r\n    sh.schema_id = so.uid\r\n    INNER JOIN (\r\n                SELECT\r\n                &#x5B;Data_Obj_Id],\r\n                &#x5B;Object_Id],\r\n                &#x5B;Object_Name],\r\n                &#x5B;Schema_Name],\r\n                &#x5B;Schema_Object_Name]\r\n                FROM\r\n                #Src_Tgt_Tables\r\n                WHERE\r\n                Src_Tgt_Flag = ''T''\r\n                ) tgt ON\r\n                tgt.&#x5B;Object_Id] = so.id\r\n    WHERE \r\n    si.is_unique = 1 \/*Only Unique Index*\/'\r\n  \r\n    IF @IsDebugMode = 1\r\n\t\tBEGIN\r\n\t\t\tPRINT 'SQL statement for ''where'' table objects metadata into #Src_Tgt_Tables temp table:'\r\n\t\t\tPRINT '----------------------------------------------------------------------------------------------'\r\n\t\t\tPRINT @ExecSQL +REPLICATE(CHAR(13),2) \r\n\t\tEND\r\n  \r\n    EXEC sp_executesql @ExecSQL\r\n  \r\n    IF @IsDebugMode = 1\r\n        BEGIN \r\n\t\t\tSELECT  &#x5B;Table] = '#Tgt_NK_Cols' , * FROM    #Tgt_NK_Cols \r\n\t\tEND\r\n  \r\n \r\n\/*======================================================================================\r\n                ENSURE THAT UNIQUE KEY 
INDEX IS PRESENT                     \r\n======================================================================================*\/\r\n \r\n    IF EXISTS ( SELECT  1\r\n                FROM    #Tgt_NK_Cols NK\r\n                WHERE   NK.Where_Clause IS NULL )\r\n        BEGIN\r\n            SET @Err_Msg = 'No Unique Key Index is found. \r\n                    Configured Target Database is &quot;' + @Target_DB_Name + '&quot;.'\r\n  \r\n            RAISERROR (\r\n        @Err_Msg  -- Message text.\r\n        ,16 -- Severity.\r\n            ,1 -- State.\r\n  )\r\n            RETURN\r\n        END\r\n\r\n\r\n\/*======================================================================================\r\n                            PREPARE MERGE STATEMENT                         \r\n======================================================================================*\/\r\n \r\n    DECLARE @MergeSQL\t\t\t\t\tNVARCHAR(MAX) ,\r\n\t\t\t@UpdateColSet\t\t\t\tNVARCHAR(MAX) ,\r\n\t\t\t@TargetColSet\t\t\t\tNVARCHAR(MAX) ,\r\n\t\t\t@SourceColSet\t\t\t\tNVARCHAR(MAX) ,\r\n\t\t\t@ValueColSet\t\t\t\tNVARCHAR(MAX) ,\r\n\t\t\t@SourceColSetReplaceRemote\tNVARCHAR(MAX) \r\n\r\n\t\t\r\n \r\n    SELECT  @MergeSQL = '\r\n\t\t\t\t\t\tMERGE ' + NK.Schema_Object_Name + ' TGT\r\n\t\t\t\t\t\tUSING (SELECT{SOURCE_COLUMN_SET}\r\n\t\t\t\t\t\tFROM {NK.S_Schema_Object_Name} SRC) SRC ON ' + NK.Where_Clause + '\r\n\t\t\t\t\t\tWHEN MATCHED THEN\r\n\t\t\t\t\t\tUPDATE SET{UPDATE_COLUMN_SET}\r\n\t\t\t\t\t\tWHEN NOT MATCHED THEN\r\n\t\t\t\t\t\tINSERT({TARGET_COLUMN_SET})\r\n\t\t\t\t\t\tVALUES ({VALUE_COLUMN_SET})\r\n\t\t\t\t\t\tWHEN NOT MATCHED BY SOURCE THEN DELETE\r\n\t\t\t\t\t\tOUTPUT $action INTO #SummaryOfChanges(Action_Name);'\r\n\t\t\t\t\t\tFROM    #Tgt_NK_Cols NK\r\n\r\n\tIF @IsDebugMode = 1\r\n\t\tBEGIN\r\n\t\t\tPRINT 'SQL statement for the initial ''MERGE'' statement:'\r\n\t\t\tPRINT '----------------------------------------------------------------------------------------------'\r\n\t\t\tPRINT 
@MergeSQL +REPLICATE(CHAR(13),2) \r\n\t\tEND\r\n\r\n    SELECT  @TargetColSet = REPLACE(STUFF(( SELECT  ',' + CHAR(10)\r\n                                                    + CAST(TC.Column_Name AS VARCHAR(100))\r\n                                            FROM    #Src_Tgt_Tables TC ( NOLOCK )\r\n                                                    LEFT JOIN ( SELECT\r\n                                                              T.&#x5B;Data_Obj_Id] ,\r\n                                                              T.&#x5B;Column_Name] ,\r\n                                                              T.&#x5B;IsNullable] ,\r\n                                                              T.&#x5B;Default] ,\r\n                                                              T.&#x5B;DataType] ,\r\n                                                              T.&#x5B;DataType_CastGroup]\r\n                                                              FROM\r\n                                                              #Src_Tgt_Tables T ( NOLOCK )\r\n                                                              WHERE\r\n                                                              T.Src_Tgt_Flag = 'S'\r\n                                                              AND T.Data_Obj_Id = TS.S_Data_Obj_Id\r\n                                                              ) SC ON SC.Column_Name = TC.Column_Name\r\n                                            WHERE   TC.Src_Tgt_Flag = 'T'\r\n                                                    AND TC.Data_Obj_Id = TS.T_Data_Obj_Id\r\n\t\t\t\t\t\t\t\t\t\t\t--AND TC.IsIdentity &amp;lt;&amp;gt; 1 -- Ignore identity\r\n                                                    AND TC.IsComputed &amp;lt;&amp;gt; 1 -- and computed columns\r\n                                            ORDER BY TC.Column_Id\r\n                                          FOR\r\n                                            XML PATH('')\r\n                         
                 ), 1, 1, ''), '&amp;amp;#x0D;', '') , \r\n            @UpdateColSet = REPLACE(STUFF(( SELECT  ',' + CHAR(10)\r\n                                                    + CAST(TC.Column_Name AS VARCHAR(100))\r\n                                                    + ' = SRC.'\r\n                                                    + ISNULL(SC.Column_Name,\r\n                                                             TC.Column_Name)\r\n                                            FROM    #Src_Tgt_Tables TC ( NOLOCK )\r\n                                                    LEFT JOIN ( SELECT\r\n                                                              T.&#x5B;Data_Obj_Id] ,\r\n                                                              T.&#x5B;Column_Name] ,\r\n                                                              T.&#x5B;IsNullable] ,\r\n                                                              T.&#x5B;Default] ,\r\n                                                              T.&#x5B;DataType] ,\r\n                                                              T.&#x5B;DataType_CastGroup]\r\n                                                              FROM\r\n                                                              #Src_Tgt_Tables T ( NOLOCK )\r\n                                                              WHERE\r\n                                                              T.Src_Tgt_Flag = 'S'\r\n                                                              AND T.Data_Obj_Id = TS.S_Data_Obj_Id\r\n                                                              ) SC ON SC.Column_Name = TC.Column_Name\r\n                                            WHERE   TC.Src_Tgt_Flag = 'T'\r\n                                                    AND TC.Data_Obj_Id = TS.T_Data_Obj_Id\r\n                                                    AND TC.IsIdentity &amp;lt;&amp;gt; 1 -- Ignore identity\r\n                                                    AND 
TC.IsComputed &amp;lt;&amp;gt; 1 -- and computed columns\r\n                                            ORDER BY TC.Column_Id\r\n                                          FOR\r\n                                            XML PATH('')\r\n                                          ), 1, 1, ''), '&amp;amp;#x0D;', '') ,\r\n            @ValueColSet = REPLACE(STUFF(( SELECT   ',' + CHAR(10)\r\n                                                    + ISNULL('SRC.'\r\n                                                             + CAST(SC.Column_Name AS VARCHAR(100)),\r\n                                                             'SRC.'\r\n                                                             + CAST(TC.Column_Name AS VARCHAR(100)))\r\n                                           FROM     #Src_Tgt_Tables TC ( NOLOCK )\r\n                                                    LEFT JOIN ( SELECT\r\n                                                              T.&#x5B;Data_Obj_Id] ,\r\n                                                              T.&#x5B;Column_Name] ,\r\n                                                              T.&#x5B;IsNullable] ,\r\n                                                              T.&#x5B;Default] ,\r\n                                                              T.&#x5B;DataType] ,\r\n                                                              T.&#x5B;DataType_CastGroup]\r\n                                                              FROM\r\n                                                              #Src_Tgt_Tables T ( NOLOCK )\r\n                                                              WHERE\r\n                                                              T.Src_Tgt_Flag = 'S'\r\n                                                              AND T.Data_Obj_Id = TS.S_Data_Obj_Id\r\n                                                              ) SC ON SC.Column_Name = TC.Column_Name\r\n                                           
WHERE    TC.Src_Tgt_Flag = 'T'\r\n                                                    AND TC.Data_Obj_Id = TS.T_Data_Obj_Id\r\n                                                    AND TC.IsIdentity &amp;lt;&amp;gt; 1 -- Ignore identity\r\n                                                    AND TC.IsComputed &amp;lt;&amp;gt; 1 -- and computed columns\r\n                                           ORDER BY TC.Column_Id\r\n                                         FOR\r\n                                           XML PATH('')\r\n                                         ), 1, 1, ''), '&amp;amp;#x0D;', '') ,\r\n            @SourceColSet = REPLACE(STUFF(( SELECT  ',' + CHAR(10)\r\n                                                    + ISNULL(CAST(SC.New_Column_Name AS VARCHAR(100)),\r\n                                                             CAST(TC.Column_Name AS VARCHAR(100)))\r\n                                            FROM    #Src_Tgt_Tables TC ( NOLOCK )\r\n                                                    LEFT JOIN ( SELECT\r\n                                                              T.&#x5B;Data_Obj_Id] ,\r\n                                                              T.&#x5B;Column_Name] ,\r\n                                                              &#x5B;New_Column_Name] = --T.&#x5B;Column_Name],\r\n                                                              CASE\r\n                                                              WHEN T.RemoteObject_DataType = 'enum'\r\n                                                              THEN 'CAST('\r\n                                                              + T.&#x5B;Column_Name]\r\n                                                              + ' as char) as '\r\n                                                              + T.&#x5B;Column_Name] \r\n                                                              WHEN T.RemoteObject_DataType = 'bytea'\r\n                                               
               THEN 'CAST('\r\n                                                              + T.&#x5B;Column_Name]\r\n                                                              + ' as char(4000)) as '\r\n                                                              + T.&#x5B;Column_Name]\r\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t  ELSE T.&#x5B;Column_Name]\r\n                                                              END ,\r\n                                                              T.&#x5B;IsNullable] ,\r\n                                                              T.&#x5B;Default] ,\r\n                                                              T.&#x5B;DataType] ,\r\n                                                              T.&#x5B;DataType_CastGroup] ,\r\n                                                              T.&#x5B;RemoteObject_DataType]\r\n                                                              FROM\r\n                                                              #Src_Tgt_Tables T ( NOLOCK )\r\n                                                              WHERE\r\n                                                              T.Src_Tgt_Flag = 'S'\r\n                                                              AND T.Data_Obj_Id = TS.S_Data_Obj_Id\r\n                                                              ) SC ON SC.Column_Name = TC.Column_Name\r\n                                            WHERE   TC.Src_Tgt_Flag = 'T'\r\n                                                    AND TC.Data_Obj_Id = TS.T_Data_Obj_Id\r\n                                                    AND TC.IsIdentity &amp;lt;&amp;gt; 1 -- Ignore identity\r\n                                                    AND TC.IsComputed &amp;lt;&amp;gt; 1 -- and computed columns\r\n                                            ORDER BY TC.Column_Id\r\n                                          FOR\r\n                                            XML PATH('')\r\n                      
                    ), 1, 1, ''), '&amp;amp;#x0D;', '')\r\n    FROM    ( SELECT    &#x5B;T_Data_Obj_Id] = T.Data_Obj_Id ,\r\n                        &#x5B;T_Schema_Object_Name] = T.Schema_Object_Name ,\r\n                        &#x5B;S_Schema_Object_Name] = ( SELECT TOP 1\r\n                                                            S.Schema_Object_Name\r\n                                                   FROM     #Src_Tgt_Tables S\r\n                                                   WHERE    S.Src_Tgt_Flag = 'S'\r\n                                                 ) ,\r\n                        &#x5B;S_Data_Obj_Id] = ( SELECT TOP 1\r\n                                                    S.Data_Obj_Id\r\n                                            FROM    #Src_Tgt_Tables S\r\n                                            WHERE   S.Src_Tgt_Flag = 'S'\r\n                                          )\r\n              FROM      #Src_Tgt_Tables T\r\n              WHERE     T.Src_Tgt_Flag = 'T'\r\n            ) TS\t\t\r\n\r\n\r\n\tIF @IsDebugMode = 1\r\n\t\tBEGIN\r\n\t\t\tSELECT @TargetColSet\tAS Target_Column_Set\r\n\t\t\tSELECT @UpdateColSet\tAS Update_Column_Set\r\n\t\t\tSELECT @ValueColSet\t\tAS Value_Column_Set\r\n\t\t\tSELECT @SourceColSet\tAS Source_Column_Set\r\n\t\tEND\r\n\t\r\n\r\n\r\n\t\t\tIF OBJECT_ID('tempdb..#TempSourceFieldsPivot') IS NOT NULL\r\n\t\t\t\tDROP TABLE #TempSourceFieldsPivot;\r\n\t\t\tCREATE TABLE #TempSourceFieldsPivot\r\n            (\r\n              ColName\t\t\tVARCHAR(1024) ,\r\n              MySQLOutput\t\tVARCHAR(255) ,\r\n              MSSQLOutput\t\tVARCHAR(255) \r\n            );\r\n\t\t\tDECLARE @X XML;\r\n\t\t\tSET @X = CAST('&amp;lt;A&amp;gt;' + REPLACE(REPLACE(REPLACE(@SourceColSet,char(13),''),char(10),''), ',', '&amp;lt;\/A&amp;gt;&amp;lt;A&amp;gt;') + '&amp;lt;\/A&amp;gt;' AS XML);\r\n\t\t\tINSERT  INTO #TempSourceFieldsPivot( ColName)\r\n            SELECT  t.value('.', 'VARCHAR(max)')\r\n            FROM    @x.nodes('\/A') AS 
x ( t );\r\n\r\n\r\n\t\t\tIF @IsDebugMode = 1\r\n\t\t\t\tBEGIN\r\n\t\t\t\t\tSELECT  *\r\n\t\t\t\t\tFROM    #TempSourceFieldsPivot;\r\n\t\t\t\tEND;\r\n\r\n\t\t\tUPDATE  #TempSourceFieldsPivot\r\n\t\t\tSET     ColName = REPLACE(REPLACE(ColName, ']', ''), '&#x5B;', '');\r\n\r\n\t\t\tUPDATE  #TempSourceFieldsPivot\r\n\t\t\tSET     MySQLOutput\t\t\t= b.mysql_version ,\r\n\t\t\t\t\tMSSQLOutput\t\t\t= c.mssql_version \r\n\t\t\tFROM    #TempSourceFieldsPivot a\r\n\t\t\t\t\tLEFT JOIN dbo.vw_MysqlReservedWords b\t\tON UPPER(LTRIM(RTRIM(a.ColName))) = UPPER(LTRIM(RTRIM(b.reserved_word)))\r\n\t\t\t\t\tLEFT JOIN dbo.vw_MssqlReservedWords c\t\tON UPPER(LTRIM(RTRIM(a.ColName))) = UPPER(LTRIM(RTRIM(c.reserved_word)));\r\n\t\t\t\t\t\r\n\r\n\t\t\tIF @IsDebugMode = 1\r\n\t\t\t\tBEGIN\r\n\t\t\t\t\tSELECT  *\r\n\t\t\t\t\tFROM    #TempSourceFieldsPivot;\r\n\t\t\t\tEND;\r\n\r\n\t\t\t\r\n\t\t\t\t\t\tSELECT  @SourceColSetReplaceRemote = 'OPENQUERY(' + @Remote_Server_Name\r\n\t\t\t\t\t\t\t\t+ ', ''SELECT '\r\n\t\t\t\t\t\t\t\t+ ( SELECT DISTINCT\r\n\t\t\t\t\t\t\t\tSTUFF(( SELECT  ',' + u.ColName\r\n\t\t\t\t\t\t\t\t\t\tFROM    (\tSELECT    COALESCE(MySQLOutput, ColName) AS ColName\r\n\t\t\t\t\t\t\t\t\t\t\t\t\tFROM      #TempSourceFieldsPivot\r\n\t\t\t\t\t\t\t\t\t\t\t\t) u\r\n\t\t\t\t\t\t\t\t\t\tWHERE   u.ColName = ColName\r\n\t\t\t\t\t\t\t\t\t--order by u.ColName\r\n\t\t\t\t\t\t\t\t\tFOR\r\n                                    XML PATH('')\r\n                                  ), 1, 1, '') AS list\r\n\t\t\t\t\t\tFROM    (\tSELECT    COALESCE(MySQLOutput, ColName) AS ColName\r\n\t\t\t\t\t\t\t\t\tFROM      #TempSourceFieldsPivot\r\n\t\t\t\t\t\t\t\t) a\r\n\t\t\t\t\t\tGROUP BY ColName\r\n\t\t\t\t\t\t\t\t\t) + ' FROM ' + @Remote_DB_Object_Name + ''')'\r\n\t\t\t\t\t\t\r\n\r\n\t\t\tIF @IsDebugMode = 1\r\n\t\t\t\tBEGIN\r\n\t\t\t\t\tSELECT  @SourceColSetReplaceRemote AS Replacement_Column_list\r\n\t\t\t\tEND;\r\n\r\n\r\n\t\t\tIF OBJECT_ID('tempdb..#TempSourceFieldsPivot') IS NOT NULL\r\n\t\t\t\tDROP 
TABLE #TempSourceFieldsPivot;\r\n\t\t\r\n\r\n\r\n    SELECT  @MergeSQL = REPLACE(@MergeSQL, '{UPDATE_COLUMN_SET}',@UpdateColSet)\r\n\t\t\t\t\t\t\t\t\r\n    SELECT  @MergeSQL = REPLACE(@MergeSQL, '{TARGET_COLUMN_SET}',@TargetColSet)\r\n\t\t\t\t\t\t\t\t\r\n    SELECT  @MergeSQL = REPLACE(@MergeSQL, '{SOURCE_COLUMN_SET}',@SourceColSet)\r\n\t\t\t\t\t\t\t\r\n    SELECT  @MergeSQL = REPLACE(@MergeSQL, '{VALUE_COLUMN_SET}', @ValueColSet)\r\n\r\n    SELECT  @MergeSQL = REPLACE(@MergeSQL, '{NK.S_Schema_Object_Name}', @SourceColSetReplaceRemote)\r\n\t\r\n\t\t\t\t\t\r\n\tIF @IsDebugMode = 1\r\n\t\tBEGIN\r\n\t\t\tPRINT 'SQL statement for the final ''MERGE'' statement:'\r\n\t\t\tPRINT '----------------------------------------------------------------------------------------------'\r\n\t\t\tPRINT @MergeSQL +REPLICATE(CHAR(13),2) \r\n\t\tEND\t\t\t\t\t\t\t\t\r\n\t\t\t\t\t\t\t\t\r\n\/*======================================================================================\r\n            EXECUTE MERGE STATEMENT AND CHECK FOR EXECUTION RESULTS                         \r\n======================================================================================*\/\r\n \r\n    DECLARE \r\n\t\t@UpdatedCount INT ,\r\n        @InsertedCount INT ,\r\n        @DeletedCount INT ,\r\n        @StartTime DATETIME ,\r\n        @EndTime DATETIME\r\n\r\n    IF OBJECT_ID('tempdb..#SummaryOfChanges') IS NOT NULL\r\n        BEGIN\r\n            DROP TABLE &#x5B;#SummaryOfChanges]\r\n        END \r\n    CREATE TABLE #SummaryOfChanges\r\n        (\r\n          Action_Name VARCHAR(50)\r\n        );\r\n    SET @StartTime = GETDATE()\r\n\r\n    IF @Err_Msg IS NULL\r\n        BEGIN TRY\r\n            BEGIN TRANSACTION         \r\n            EXEC sp_executesql @MergeSQL\r\n\r\n            SELECT  @UpdatedCount = SUM(CASE WHEN Action_Name = 'UPDATE'\r\n                                             THEN 1\r\n                                             ELSE 0\r\n                                        END) ,\r\n     
               @InsertedCount = SUM(CASE WHEN Action_Name = 'INSERT'\r\n                                              THEN 1\r\n                                              ELSE 0\r\n                                         END) ,\r\n                    @DeletedCount = SUM(CASE WHEN Action_Name = 'DELETE'\r\n                                             THEN 1\r\n                                             ELSE 0\r\n                                        END)\r\n            FROM    #SummaryOfChanges\r\n     \r\n            IF @IsDebugMode = 1\r\n                BEGIN\r\n                    SELECT  @UpdatedCount AS Records_Updated\r\n                    SELECT  @InsertedCount AS Records_Inserted\r\n                    SELECT  @DeletedCount AS Records_Deleted\r\n                END\r\n            COMMIT TRANSACTION\r\n        END TRY\r\n        BEGIN CATCH\r\n            ROLLBACK TRANSACTION;\r\n            WITH    TempErr ( \r\n\t\t\t\t\t\t\t&#x5B;ErrorNumber], \r\n\t\t\t\t\t\t\t&#x5B;ErrorSeverity], \r\n\t\t\t\t\t\t\t&#x5B;ErrorState], \r\n\t\t\t\t\t\t\t&#x5B;ErrorLine], \r\n\t\t\t\t\t\t\t&#x5B;ErrorMessage], \r\n\t\t\t\t\t\t\t&#x5B;ErrorDateTime], \r\n\t\t\t\t\t\t\t&#x5B;LoginName], \r\n\t\t\t\t\t\t\t&#x5B;UserName], \r\n\t\t\t\t\t\t\t&#x5B;PackageName], \r\n\t\t\t\t\t\t\t&#x5B;ObjectID], \r\n\t\t\t\t\t\t\t&#x5B;ProcessID], \r\n\t\t\t\t\t\t\t&#x5B;ExecutionInstanceGUID], \r\n\t\t\t\t\t\t\t&#x5B;DBName] \r\n\t\t\t\t\t\t\t)\r\n                      AS ( SELECT   ERROR_NUMBER()\t\t\tAS ErrorNumber\t\t,\r\n                                    ERROR_SEVERITY()\t\tAS ErrorSeverity\t,\r\n                                    ERROR_STATE()\t\t\tAS ErrorState\t\t,\r\n                                    ERROR_LINE()\t\t\tAS ErrorLine\t\t,\r\n                                    ERROR_MESSAGE()\t\t\tAS ErrorMessage\t\t,\r\n                                    SYSDATETIME()\t\t\tAS ErrorDateTime\t,\r\n                                    SYSTEM_USER\t\t\t\tAS LoginName\t\t,\r\n   
                                 USER_NAME()\t\t\t\tAS UserName\t\t\t,\r\n                                    @Package_Name\t\t\tAS PackageName\t\t,\r\n                                    OBJECT_ID('' + @Target_DB_Name + '.'\r\n                                              + @Target_DB_Schema_Name + '.'\r\n                                              + @Target_DB_Object_Name + '') AS ObjectID ,\r\n                                    ( SELECT    a.objectid\r\n                                      FROM      sys.dm_exec_requests r\r\n                                                CROSS   APPLY sys.dm_exec_sql_text(r.sql_handle) a\r\n                                      WHERE     session_id = @@spid\r\n                                    ) AS ProcessID ,\r\n                                    @Exec_Instance_GUID\t\tAS ExecutionInstanceGUID ,\r\n                                    DB_NAME()\t\t\t\tAS DatabaseName\r\n                         )\r\n                INSERT  INTO AdminDBA.dbo.LogSSISErrors_Error\r\n                        ( &#x5B;ErrorNumber] ,\r\n                          &#x5B;ErrorSeverity] ,\r\n                          &#x5B;ErrorState] ,\r\n                          &#x5B;ErrorLine] ,\r\n                          &#x5B;ErrorMessage] ,\r\n                          &#x5B;ErrorDateTime] ,\r\n                          &#x5B;FKLoginID] ,\r\n                          &#x5B;FKUserID] ,\r\n                          &#x5B;FKPackageID] ,\r\n                          &#x5B;FKObjectID] ,\r\n                          &#x5B;FKProcessID] ,\r\n                          &#x5B;ExecutionInstanceGUID]\r\n                        )\r\n                        SELECT  ErrorNumber\t\t\t\t= COALESCE(err.ErrorNumber, -1) ,\r\n                                ErrorSeverity\t\t\t= COALESCE(err.&#x5B;ErrorSeverity], -1) ,\r\n                                ErrorState\t\t\t\t= COALESCE(err.&#x5B;ErrorState], -1) ,\r\n                                ErrorLine\t\t\t\t= 
COALESCE(err.&#x5B;ErrorLine], -1) ,\r\n                                ErrorMessage\t\t\t= COALESCE(err.&#x5B;ErrorMessage], 'Unknown') ,\r\n                                ErrorDateTime\t\t\t= ErrorDateTime ,\r\n                                FKLoginID\t\t\t\t= src_login.ID ,\r\n                                FKUserID\t\t\t\t= src_user.ID ,\r\n                                &#x5B;FKPackageID]\t\t\t= src_package.ID ,\r\n                                &#x5B;FKObjectID]\t\t\t= src_object.ID ,\r\n                                &#x5B;FKProcessID]\t\t\t= src_process.ID ,\r\n                                &#x5B;ExecutionInstanceGUID] = err.ExecutionInstanceGUID\r\n                        FROM    TempErr err\r\n                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Login src_login ON err.LoginName = src_login.LoginName\r\n                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_User src_user ON err.UserName = src_user.UserName\r\n                                                              AND src_user.FKDBID = \r\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t(\tSELECT ID FROM\r\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tAdminDBA.dbo.LogSSISErrors_DB db\r\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tWHERE\r\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tdb.DBName = err.DBName\r\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t)\r\n                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Package src_package ON err.PackageName = \r\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t( LEFT(src_package.PackageName, CHARINDEX('.', src_package.PackageName)- 1) )\r\n                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Object src_object ON err.ObjectID = src_object.ObjectID\r\n                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Process src_process ON err.ProcessID = src_process.ProcessID\r\n                        WHERE   src_login.CurrentlyUsed = 1\r\n                                AND 
src_user.CurrentlyUsed = 1\r\n                                --AND src_package.CurrentlyUsed = 1\r\n                                AND src_object.CurrentlyUsed = 1\r\n                                AND src_process.CurrentlyUsed = 1\r\n        END CATCH    \r\n\tEND\r\n\r\n<\/pre>\n<p style=\"text-align: justify;\">Another important aspect to highlight is the error logging architecture. The TRY\u2026CATCH construct provides robust error handling for unexpected events: if the MERGE statement fails, the transaction is rolled back and the error details are logged in the AdminDBA database for future reporting and troubleshooting. In this way, instead of an error halting the acquisition process completely, the package can continue its execution, recovering from error events gracefully and transparently. Since a single MERGE statement can reconcile small to medium-sized tables, I have found it to be a good alternative to truncating and re-populating the target tables.<\/p>\n<h3 style=\"text-align: center;\">Post-acquisition Tasks Overview and Code<\/h3>\n<p style=\"text-align: justify;\">Continuing on, now that we have the nuts and bolts of our data acquisition process out of the way, we can move on to outlining post-acquisition tasks. As with the activities preceding source-to-target data copying, some clean-up and maintenance tasks need to be run to finalise acquisition and tie up all the loose ends, e.g. index re-creation, statistics update, error log checking etc.<\/p>\n<p style=\"text-align: justify;\">Firstly, previously dropped indexes can be re-created using the same stored procedure we used in <a href=\"http:\/\/bicortex.com\/designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-2\/\" target=\"_blank\">part 2<\/a>. 
The only difference is the value of the @Create_Drop_Idxs variable, which is now set to CREATE rather than DROP. Next, because newly loaded data can significantly change the distribution of values in the tables, we update statistics on all the acquired tables using the following stored procedure.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nUSE StagingDB;\r\nGO\r\n\r\nCREATE PROCEDURE &#x5B;dbo].&#x5B;usp_runUpdateStagingDBStatistics]\r\n    (\r\n      @Target_DB_Name VARCHAR(128) ,\r\n      @Target_DB_Schema_Name VARCHAR(128) ,\r\n      @Is_All_OK INT OUTPUT ,\r\n      @Error_Message VARCHAR(MAX) OUTPUT ,\r\n      @Process_Name VARCHAR(250) OUTPUT \r\n    )\r\n    WITH RECOMPILE\r\nAS\r\n    SET NOCOUNT ON;\t\r\n    BEGIN\r\n        DECLARE @ID INT;\r\n        DECLARE @IsDebugMode BIT;\r\n        DECLARE @StartDateTime DATETIME = SYSDATETIME();\r\n        DECLARE @TableName VARCHAR(256);\r\n        DECLARE @TableSchemaName VARCHAR(128);\r\n        DECLARE @SQL VARCHAR(2056);\r\n\r\n        SET @Process_Name = ( SELECT    OBJECT_NAME(objectid)\r\n                              FROM      sys.dm_exec_requests r\r\n                                        CROSS   APPLY sys.dm_exec_sql_text(r.sql_handle) a\r\n                              WHERE     session_id = @@spid\r\n                            );\r\n        SET @IsDebugMode = 1;\r\n\r\n        IF OBJECT_ID('tempdb..#stats_details') IS NOT NULL\r\n            BEGIN\r\n                DROP TABLE #stats_details;\r\n            END;\r\n        CREATE TABLE #stats_details\r\n            (\r\n              ID INT IDENTITY(1, 1)\r\n                     NOT NULL ,\r\n              TableName VARCHAR(256) NOT NULL ,\r\n              SchemaName VARCHAR(128) NOT NULL ,\r\n              IndexID INT NULL ,\r\n              Statistic VARCHAR(256) NOT NULL ,\r\n              ColumnsInStatistic VARCHAR(256) NOT NULL ,\r\n              WasAutoCreated TINYINT NOT NULL ,\r\n              
WasUserCreated TINYINT NOT NULL ,\r\n              IsFiltered TINYINT NULL ,\r\n              FilterDefinition VARCHAR(256) NULL ,\r\n              IsTemporary TINYINT NULL ,\r\n              StatisticsUpdateDate DATETIME NULL\r\n            );\r\n\r\n        DECLARE db_statscursor CURSOR FORWARD_ONLY\r\n        FOR\r\n            SELECT  ROW_NUMBER() OVER ( ORDER BY t.TABLE_NAME ASC, t.TABLE_SCHEMA ASC ) AS ID ,\r\n                    t.TABLE_NAME ,\r\n                    t.TABLE_SCHEMA\r\n            FROM    INFORMATION_SCHEMA.TABLES t\r\n                    JOIN HNO_Control.dbo.Ctrl_RemoteSvrs_Tables2Process m ON t.TABLE_NAME = m.Local_Table_Name\r\n                                                              AND t.TABLE_SCHEMA = m.Local_Schema_Name\r\n                                                              --AND m.Remote_Server_Name = @Remote_Server_Name\r\n                                                              AND m.Local_DB_Name = @Target_DB_Name\r\n                                                              AND m.Local_Schema_Name = @Target_DB_Schema_Name\r\n            WHERE   t.TABLE_TYPE = 'BASE TABLE'\r\n                    AND m.Is_Active = 1;        \r\n\r\n        OPEN db_statscursor;\r\n        FETCH NEXT\r\n\t\tFROM db_statscursor INTO @ID, @TableName, @TableSchemaName;\r\n        WHILE @@FETCH_STATUS = 0\r\n            BEGIN\r\n                SET @SQL = 'UPDATE STATISTICS ' + @TableSchemaName + '.'\r\n                    + @TableName + ' WITH FULLSCAN';\r\n\t\t\t\t\r\n                IF @IsDebugMode = 1\r\n                    BEGIN\r\n                        PRINT @SQL;\r\n                    END;\r\n\r\n                EXEC(@SQL);\r\n\r\n                INSERT  INTO #stats_details\r\n                        SELECT  &#x5B;so].&#x5B;name] AS &#x5B;TableName] ,\r\n                                @TableSchemaName ,\r\n                                &#x5B;si].&#x5B;index_id] AS &#x5B;Index_ID] ,\r\n                                
&#x5B;ss].&#x5B;name] AS &#x5B;Statistic] ,\r\n                                STUFF(( SELECT  ', ' + &#x5B;c].&#x5B;name]\r\n                                        FROM    &#x5B;sys].&#x5B;stats_columns] &#x5B;sc]\r\n                                                JOIN &#x5B;sys].&#x5B;columns] &#x5B;c] ON &#x5B;c].&#x5B;column_id] = &#x5B;sc].&#x5B;column_id]\r\n                                                              AND &#x5B;c].&#x5B;object_id] = &#x5B;sc].&#x5B;object_id]\r\n                                        WHERE   &#x5B;sc].&#x5B;object_id] = &#x5B;ss].&#x5B;object_id]\r\n                                                AND &#x5B;sc].&#x5B;stats_id] = &#x5B;ss].&#x5B;stats_id]\r\n                                        ORDER BY &#x5B;sc].&#x5B;stats_column_id]\r\n                                      FOR\r\n                                        XML PATH('')\r\n                                      ), 1, 2, '') AS &#x5B;ColumnsInStatistic] ,\r\n                                &#x5B;ss].&#x5B;auto_created] AS &#x5B;WasAutoCreated] ,\r\n                                &#x5B;ss].&#x5B;user_created] AS &#x5B;WasUserCreated] ,\r\n                                &#x5B;ss].&#x5B;has_filter] AS &#x5B;IsFiltered] ,\r\n                                &#x5B;ss].&#x5B;filter_definition] AS &#x5B;FilterDefinition] ,\r\n                                &#x5B;ss].&#x5B;is_temporary] AS &#x5B;IsTemporary] ,\r\n                                STATS_DATE(&#x5B;so].&#x5B;object_id], stats_id) AS &#x5B;StatisticsUpdateDate]\r\n                        FROM    &#x5B;sys].&#x5B;stats] &#x5B;ss]\r\n                                JOIN &#x5B;sys].&#x5B;objects] AS &#x5B;so] ON &#x5B;ss].&#x5B;object_id] = &#x5B;so].&#x5B;object_id]\r\n                                JOIN &#x5B;sys].&#x5B;schemas] AS &#x5B;sch] ON &#x5B;so].&#x5B;schema_id] = &#x5B;sch].&#x5B;schema_id]\r\n                                LEFT OUTER JOIN &#x5B;sys].&#x5B;indexes] AS &#x5B;si] ON 
&#x5B;so].&#x5B;object_id] = &#x5B;si].&#x5B;object_id]\r\n                                                              AND &#x5B;ss].&#x5B;name] = &#x5B;si].&#x5B;name]\r\n                        WHERE   &#x5B;so].&#x5B;object_id] = OBJECT_ID(N''\r\n                                                             + @TableSchemaName\r\n                                                             + '.'\r\n                                                             + @TableName + '')\r\n                        ORDER BY &#x5B;ss].&#x5B;user_created] ,\r\n                                &#x5B;ss].&#x5B;auto_created] ,\r\n                                &#x5B;ss].&#x5B;has_filter];\t\t\t\t\t\t\t\t\t\t\r\n\r\n                FETCH NEXT    FROM db_statscursor INTO @ID, @TableName,\r\n                    @TableSchemaName;\r\n            END;\r\n        CLOSE db_statscursor;\r\n        DEALLOCATE db_statscursor;  \r\n\t\t\r\n        IF @IsDebugMode = 1\r\n            BEGIN\r\n                SELECT  *\r\n                FROM    #stats_details;\r\n            END;  \r\n\r\n        IF EXISTS ( SELECT  1\r\n                    FROM    ( SELECT    TableName ,\r\n                                        SchemaName ,\r\n                                        StatisticsUpdateDate\r\n                              FROM      #stats_details sd\r\n                                        JOIN INFORMATION_SCHEMA.TABLES t ON sd.TableName = t.TABLE_NAME\r\n                                                              AND t.TABLE_SCHEMA = sd.SchemaName\r\n                              WHERE     StatisticsUpdateDate NOT BETWEEN @StartDateTime\r\n                                                             AND\r\n                                                              SYSDATETIME()\r\n                            ) a )\r\n            BEGIN \r\n                SET @Error_Message = 'Statistics on ''' + @Target_DB_Name\r\n                    + ''' database for ''' + @Target_DB_Schema_Name\r\n 
                    + ''' schema could not be updated due to an error. Please troubleshoot.'\r\n                    + CHAR(10);\r\n                SET @Is_All_OK = 0;\r\n            END;\r\n        ELSE\r\n            BEGIN\r\n                SET @Is_All_OK = 1;\r\n                SET @Error_Message = 'All Good!';\r\n            END;\r\n\r\n        IF OBJECT_ID('tempdb..#stats_details') IS NOT NULL\r\n            BEGIN\r\n                DROP TABLE #stats_details;\r\n            END;\r\n    END;\r\nGO\r\n<\/pre>\n<p style=\"text-align: justify;\">The next step, executed as a script task, sets the value of the @Sync_Exec_EndTime variable. Together, the @Sync_Exec_StartTime and @Sync_Exec_EndTime variables mark the time window between the package commencing and completing its execution. This is important because the error log check, which runs towards the end of the execution workflow, needs to be confined to that window. In this way, the process searches for error log entries only between those exact start and end times. I will enclose the actual code in <a href=\"http:\/\/bicortex.com\/designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-4\/\" target=\"_blank\">part 4<\/a> of this series when describing the overall SSIS package structure.<\/p>\n<p style=\"text-align: justify;\">Moving on, we can perform some rudimentary validation steps, e.g. comparing record counts between the source and target tables, with the help of another stored procedure. 
As with most of the tasks in this framework, this part can easily be omitted, but since a simple record count is the least we can do to confirm target-to-source data consistency, it is worthwhile to include this or a similar check at the end of the process (provided the source database is not being written to during package execution). The following stored procedure compares source and target record counts for each table.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nUSE &#x5B;StagingDB];\r\nGO\r\n\r\n\r\nCREATE PROCEDURE &#x5B;dbo].&#x5B;usp_checkRemoteSvrDBvsLocalDBRecCounts]\r\n    (\r\n      @Remote_Server_Name VARCHAR(256) ,\r\n      @Remote_Server_DB_Name VARCHAR(128) ,\r\n      @Remote_Server_DB_Schema_Name VARCHAR(128) ,\r\n      @Target_DB_Name VARCHAR(128) ,\r\n      @Is_All_OK INT OUTPUT ,\r\n      @Process_Name VARCHAR(250) OUTPUT ,\r\n      @Error_Message VARCHAR(MAX) OUTPUT\r\n    )\r\nAS\r\n    SET NOCOUNT ON;\r\n    BEGIN\r\n        SET @Process_Name = ( SELECT    OBJECT_NAME(objectid)\r\n                              FROM      sys.dm_exec_requests r\r\n                                        CROSS   APPLY sys.dm_exec_sql_text(r.sql_handle) a\r\n                              WHERE     session_id = @@spid\r\n                            );     \r\n\r\n        IF OBJECT_ID('tempdb..#TempTbl') IS NOT NULL\r\n            BEGIN\r\n                DROP TABLE #TempTbl;\r\n            END; \r\n        CREATE TABLE #TempTbl\r\n            (\r\n              ID INT IDENTITY(1, 1) ,\r\n              TableName VARCHAR(256) NOT NULL ,\r\n              TableSchemaName VARCHAR(50) NOT NULL ,\r\n              LocalOrRemote VARCHAR(32) NOT NULL ,\r\n              RecordCount BIGINT NOT NULL\r\n            );    \r\n        -- INT rather than TINYINT so that more than 255 tables can be processed\r\n        DECLARE @ID INT;\r\n        DECLARE @Table_Name VARCHAR(256);\r\n        DECLARE @Table_Schema_Name VARCHAR(128);\r\n        DECLARE @SQL VARCHAR(2056);\r\n\r\n        DECLARE db_idxcursor CURSOR FORWARD_ONLY\r\n        
FOR\r\n            SELECT  ROW_NUMBER() OVER ( ORDER BY t.TABLE_NAME ASC, t.TABLE_SCHEMA ASC ) AS ID ,\r\n                    t.TABLE_NAME ,\r\n                    t.TABLE_SCHEMA\r\n            FROM    INFORMATION_SCHEMA.TABLES t\r\n                    JOIN HNO_Control.dbo.Ctrl_RemoteSvrs_Tables2Process m ON t.TABLE_NAME = m.Local_Table_Name\r\n                                                              AND t.TABLE_SCHEMA = m.Local_Schema_Name\r\n                                                              AND m.Remote_Server_Name = @Remote_Server_Name\r\n                                                              AND t.TABLE_CATALOG = @Target_DB_Name\r\n            WHERE   t.TABLE_TYPE = 'BASE TABLE'\r\n                    AND m.Is_Active = 1;        \r\n\r\n        OPEN db_idxcursor;\r\n        FETCH NEXT\r\n\t\t\t FROM db_idxcursor INTO @ID, @Table_Name, @Table_Schema_Name;\r\n        WHILE @@FETCH_STATUS = 0\r\n            BEGIN\r\n                SET @SQL = 'INSERT INTO #TempTbl (TableName, TableSchemaName, LocalOrRemote, RecordCount)\r\n\t\t\t\t\t\t\tSELECT ''' + @Table_Name + ''','''\r\n                    + @Table_Schema_Name\r\n                    + ''' , ''Remote'', * FROM OPENQUERY('\r\n                    + @Remote_Server_Name + ',''select count(1) as ct from '\r\n                    + @Remote_Server_DB_Schema_Name + '.' + @Table_Name + ''')\r\n\t\t\t\t\t\t\tUNION ALL\r\n\t\t\t\t\t\t\tSELECT ''' + @Table_Name + ''', '''\r\n                    + @Table_Schema_Name + ''',''Local'', COUNT(1) \r\n\t\t\t\t\t\t\tFROM ' + @Target_DB_Name + '.'\r\n                    + @Table_Schema_Name + '.' 
+ @Table_Name + '';\r\n                EXEC(@SQL);\r\n                FETCH NEXT    FROM db_idxcursor INTO @ID, @Table_Name,\r\n                    @Table_Schema_Name;\r\n            END;\r\n        CLOSE db_idxcursor;\r\n        DEALLOCATE db_idxcursor;\r\n\r\n        -- RecordCount is BIGINT to match the #TempTbl column it is populated from\r\n        DECLARE @DiffSourceTarget TABLE\r\n            (\r\n              TableName VARCHAR(512) ,\r\n              RecordCount BIGINT\r\n            );\r\n        INSERT  INTO @DiffSourceTarget\r\n                ( TableName ,\r\n                  RecordCount\r\n                )\r\n                SELECT  TableName ,\r\n                        RecordCount\r\n                FROM    #TempTbl\r\n                WHERE   LocalOrRemote = 'Local'\r\n                EXCEPT\r\n                SELECT  TableName ,\r\n                        RecordCount\r\n                FROM    #TempTbl\r\n                WHERE   LocalOrRemote = 'Remote';\r\n\r\n\r\n\r\n        -- STUFF removes the leading ', ' (two characters) from the concatenated list\r\n        DECLARE @tablesListSource VARCHAR(MAX) = ( SELECT   STUFF(( SELECT\r\n                                                              ', ' + TableName\r\n                                                              FROM\r\n                                                              @DiffSourceTarget\r\n                                                              FOR\r\n                                                              XML\r\n                                                              PATH('')\r\n                                                              ), 1, 2, '')\r\n                                                 );\r\n        IF EXISTS ( SELECT  1\r\n                    FROM    ( SELECT    TableName ,\r\n                                        RecordCount\r\n                              FROM      #TempTbl\r\n                              WHERE     LocalOrRemote = 'Local'\r\n                              EXCEPT\r\n                              SELECT    TableName ,\r\n                                        RecordCount\r\n                
              FROM      #TempTbl\r\n                              WHERE     LocalOrRemote = 'Remote'\r\n                            ) a )\r\n            BEGIN \r\n                SET @Error_Message = 'Post-reconciliation record count between local and remote objects is different for the following tables:'\r\n                    + CHAR(10);\r\n                SET @Error_Message = @Error_Message + '' + @tablesListSource;\t\t\t\t\t\t\t\t\t\t\r\n                SET @Is_All_OK = 0;\r\n            END;\r\n        ELSE\r\n            BEGIN\r\n                SET @Is_All_OK = 1;\r\n                SET @Error_Message = 'All Good!';\r\n            END;\r\n\r\n        IF OBJECT_ID('tempdb..#TempTbl') IS NOT NULL\r\n            BEGIN\r\n                DROP TABLE #TempTbl;\r\n            END; \t\t\t\t\t\t\t\r\n    END;\r\nGO\r\n<\/pre>\n<p style=\"text-align: justify;\">Following on, we continue with more validation and interrogate the error logging database for any entries that may have been recorded during acquisition. As you saw in <a href=\"http:\/\/bicortex.com\/designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-2\/\" target=\"_blank\">part 2<\/a> of this series as well as in the small tables acquisition stored procedure above, the code is designed in a manner which does not stall or stop the process from continuing in the event of a data copying failure. Rather, it logs any unexpected errors in the AdminDBA database and continues in a looping fashion until all tables are accounted for. This mechanism prevents the process from falling over when an exception is raised, so even though the package may have reported a successful completion status, it is possible that an error was raised and needs to be addressed. 
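<\/p>\n<p style=\"text-align: justify;\">The log-and-continue behaviour described above can be sketched in isolation as follows. This is only a minimal illustration under stated assumptions, not the framework code itself: the #WorkItems and #ErrorLog temporary tables and their columns are hypothetical stand-ins for the control table and the AdminDBA error log, and the RAISERROR call merely simulates a copy failure for one table.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\n-- Hypothetical stand-ins for the control table and the AdminDBA error log\r\nCREATE TABLE #WorkItems\r\n    (\r\n      ID INT IDENTITY(1, 1) ,\r\n      TableName VARCHAR(256) NOT NULL\r\n    );\r\nCREATE TABLE #ErrorLog\r\n    (\r\n      TableName VARCHAR(256) NOT NULL ,\r\n      ErrorMessage NVARCHAR(4000) NULL ,\r\n      ErrorDateTime DATETIME NOT NULL\r\n    );\r\nINSERT  INTO #WorkItems ( TableName )\r\nVALUES  ( 'TableA' ), ( 'TableB' ), ( 'TableC' );\r\n\r\nDECLARE @ID INT = 1;\r\nDECLARE @MaxID INT = ( SELECT MAX(ID) FROM #WorkItems );\r\nDECLARE @TableName VARCHAR(256);\r\n\r\nWHILE @ID &lt;= @MaxID\r\n    BEGIN\r\n        SELECT  @TableName = TableName\r\n        FROM    #WorkItems\r\n        WHERE   ID = @ID;\r\n\r\n        BEGIN TRY\r\n            -- Simulate a copy failure for one table only\r\n            IF @TableName = 'TableB'\r\n                RAISERROR('Simulated copy failure.', 16, 1);\r\n            PRINT 'Copied ' + @TableName;\r\n        END TRY\r\n        BEGIN CATCH\r\n            -- Log the error and carry on with the next table\r\n            INSERT  INTO #ErrorLog ( TableName, ErrorMessage, ErrorDateTime )\r\n            VALUES  ( @TableName, ERROR_MESSAGE(), GETDATE() );\r\n        END CATCH;\r\n\r\n        SET @ID = @ID + 1;\r\n    END;\r\n\r\n-- Only the simulated TableB failure is logged; TableA and TableC are still processed\r\nSELECT  *\r\nFROM    #ErrorLog;\r\n\r\nDROP TABLE #WorkItems;\r\nDROP TABLE #ErrorLog;\r\n<\/pre>\n<p style=\"text-align: justify;\">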
The next piece of code queries the AdminDBA database for any entries that occurred between the package execution start time and end time to report any discrepancies.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nUSE &#x5B;StagingDB]\r\nGO\r\n\r\n\r\nCREATE PROCEDURE &#x5B;dbo].&#x5B;usp_checkRemoteSvrDBvsLocalDBSyncErrors]\r\n    (\r\n      @Sync_Exec_StartTime\t\t\tDATETIME ,\r\n      @Sync_Exec_EndTime\t\t\tDATETIME ,\r\n      @Is_All_OK\t\t\t\t\tINT\t\t\t\t\t\t\tOUTPUT ,\r\n\t  @Error_Message\t\t\t\tVARCHAR\t\t\t\t(MAX)\tOUTPUT,\r\n\t  @Process_Name\t\t\t\t\tVARCHAR\t\t\t\t(250)\tOUTPUT \r\n    )\r\n    WITH RECOMPILE\r\nAS\r\n    SET NOCOUNT ON\t\r\n    BEGIN\r\n\tSET @Process_Name = ( SELECT    OBJECT_NAME(objectid)\r\n                              FROM      sys.dm_exec_requests r\r\n                                        CROSS   APPLY sys.dm_exec_sql_text(r.sql_handle) a\r\n                              WHERE     session_id = @@spid\r\n                            )\r\n        IF EXISTS ( SELECT TOP 1\r\n                            1\r\n                    FROM    &#x5B;AdminDBA].&#x5B;dbo].&#x5B;LogSSISErrors_Error]\r\n                    WHERE   ErrorDateTime BETWEEN @Sync_Exec_StartTime\r\n                                          AND     @Sync_Exec_EndTime )\r\n            BEGIN \r\n                SET @Is_All_OK = 0\r\n\t\t\t\tSET @Error_Message = 'Errors were raised during data acquisition process. '\t\t\t\t\t\t\t+CHAR(10)\r\n\t\t\t\tSET @Error_Message = @Error_Message+ 'A detailed log has been saved in AdminDBA database. '\t\t\t+CHAR(10) \r\n\t\t\t\tSET @Error_Message = @Error_Message+ 'Click on the link above to access error instances report '\t+CHAR(10)\r\n\t\t\t\tSET @Error_Message = @Error_Message+ 'or query the database directly to troubleshoot further.' 
\r\n            END\r\n        ELSE\r\n            BEGIN\r\n                SET @Is_All_OK = 1\r\n\t\t\t\tSET @Error_Message = 'All Good!'\r\n            END\r\n    END\r\nGO\r\n<\/pre>\n<p style=\"text-align: justify;\">Finally, nearly every transformation in the package links up with an e-mail-sending Execute SQL Task, notifying administrator(s) of any runtime issues that may arise. These tasks execute the \u2018usp_sendBIGroupETLFailMessage\u2019 stored procedure outlined in <a href=\"http:\/\/bicortex.com\/designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-1\/\" target=\"_blank\">part 1<\/a> of this series, which in turn sends out an e-mail with appropriate message content to the nominated e-mail addresses. Whether this stored procedure is executed depends entirely on the value of the @Is_All_OK variable (included as an OUTPUT parameter in most stored procedures outlined in this series), which determines the precedence constraint path the package follows: if @Is_All_OK is 1, the next transformation is triggered; if it is 0, the error-raising stored procedure is run. Also, since the Visual Studio development environment does not allow a single e-mail-sending task to be linked up with multiple transformations, each of them unfortunately has to be duplicated, introducing unneeded redundancy and making the package layout messy and cluttered. 
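<\/p>\n<p style=\"text-align: justify;\">As a rough illustration of the @Is_All_OK branching, the same decision can be reproduced in plain T-SQL outside of SSIS. The sketch below assumes that the usp_checkRemoteSvrDBvsLocalDBSyncErrors procedure from this post already exists in StagingDB; the one-hour window is an arbitrary example value, and the PRINT statements merely stand in for the e-mail notification and the next task, since the parameters of usp_sendBIGroupETLFailMessage are not repeated here.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nUSE &#x5B;StagingDB];\r\nGO\r\n\r\n-- Arbitrary example window: the last hour (an assumption for illustration only)\r\nDECLARE @Sync_Exec_EndTime DATETIME = GETDATE();\r\nDECLARE @Sync_Exec_StartTime DATETIME = DATEADD(HOUR, -1, @Sync_Exec_EndTime);\r\nDECLARE @Is_All_OK INT;\r\nDECLARE @Error_Message VARCHAR(MAX);\r\nDECLARE @Process_Name VARCHAR(250);\r\n\r\nEXEC dbo.usp_checkRemoteSvrDBvsLocalDBSyncErrors\r\n    @Sync_Exec_StartTime = @Sync_Exec_StartTime,\r\n    @Sync_Exec_EndTime = @Sync_Exec_EndTime,\r\n    @Is_All_OK = @Is_All_OK OUTPUT,\r\n    @Error_Message = @Error_Message OUTPUT,\r\n    @Process_Name = @Process_Name OUTPUT;\r\n\r\n-- Mirrors the two precedence constraint paths: 0 = failure path, 1 = success path\r\nIF @Is_All_OK = 0\r\n    PRINT 'Failure path (' + ISNULL(@Process_Name, 'unknown process') + '): ' + @Error_Message;\r\nELSE\r\n    PRINT 'Success path: continue with the next task.';\r\n<\/pre>\n<p style=\"text-align: justify;\">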
Beyond this pain point, the rest is fairly straightforward with details of the actual SSIS package outlined in the next post to this series in <a href=\"http:\/\/bicortex.com\/designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-4\/\" target=\"_blank\">Part 4<\/a>.<\/p>\n<p style=\"text-align: justify;\">\n","protected":false},"excerpt":{"rendered":"<p>Note: Part 1 to this series can be found HERE, Part 2 HERE, Part 4 HERE and all the code and additional files for this post can be downloaded from my OneDrive folder HERE. Continuing on from Part 2 to this series, having set up all the routines dealing with pre-acquisition activities as well as [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[38,5,46,50],"tags":[10,49,19,13],"class_list":["post-2891","post","type-post","status-publish","format-standard","hentry","category-data-modelling","category-sql","category-sql-server","category-ssis","tag-data","tag-sql","tag-sql-server","tag-ssis"],"aioseo_notices":[],"jetpack_featured_media_url":"","_links":{"self":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/2891","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/comments?post=2891"}],"version-history":[{"count":12,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/2891\/revisions"}],"predecessor-version":[{"id":2978,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/2891\/revisions\/2978"}],"wp:att
achment":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/media?parent=2891"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/categories?post=2891"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/tags?post=2891"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}