Enhancing SQL Server Integration Services (SSIS) Functionality Through Python Scripting

August 8th, 2015 / by admin

Ever since Python started to gain traction in the developer community as one of the most versatile and easy-to-learn programming languages, Microsoft has made great strides to make it a first-class citizen in its ecosystem. Python Tools for Visual Studio, tight integration of Python with the .NET framework in the IronPython project, the availability of the Python SDK on Windows Azure and, most recently, Python support in the Azure Machine Learning service are just a few examples of how Microsoft takes Python’s popularity more seriously these days. However, when it comes to tools such as Integration Services, C# and VB.NET are the only two options provided out of the box to extend built-in SSIS functionality. Whilst both languages are powerful enough to give BI developers the tools needed to supplement default SSIS transformations, the entry barrier is often too high and reserved for those commanding a good understanding of .NET libraries and C# or VB.NET syntax. Besides, both languages are often perceived as too verbose for simple scripts dealing with rudimentary tasks such as reading and writing text files, working with Excel files, sending e-mails etc. That’s where high-level languages shine and that’s where Python often comes in handy in my experience.

The SSIS Execute Process Task allows a package to run Win32 executables or batch files. In this way, provided we have Python installed and added to the path, we can run Python scripts either by (1) directly invoking the Python interpreter and passing the Python script name as an argument or (2) wrapping the script in a .bat file. Alternatively, (3) we can ‘freeze’ Python code as an executable, an option I also explore later in this post. Firstly, let’s explore options 1 and 2 and create a sample Python file in the C:\ directory along with a batch file to wrap the script in. The Python script will be called PyScript1.py and the batch file BatPyScript1.bat.

#Python code saved as PyScript1.py
print ('This is how PRINT statement works in Python')
input('Press ENTER to continue...')

REM Batch file code to execute the above Python code
REM saved as BatPyScript1.bat directly under the C:\ drive
python c:\PyScript1.py

The above code is very rudimentary and does nothing besides printing a dummy string in a console window, but it is a good example of how Python scripts can be invoked from the Execute Process Task. The video clip below outlines how the PyScript1.py script can be triggered using a SQL Server Integration Services task. Also, notice that the FailTaskIfReturnCodeIsNotSuccessValue property needs to be set to False if we execute the script from a batch file.
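
Since the Execute Process Task judges success or failure by the process return code, it can help to have a script report its outcome explicitly. The following is a minimal sketch rather than part of the original package: the run_job function and its placeholder work are hypothetical, and it assumes the task’s SuccessValue property is left at its default of 0.

```python
import sys


def run_job():
    """Hypothetical unit of work; replace with the real script logic."""
    print('This is how PRINT statement works in Python')


def main(job=run_job):
    """Run the job and return 0 on success or 1 on any unhandled error."""
    try:
        job()
    except Exception as exc:
        # Write the reason to stderr so the calling process can capture it
        print('Script failed: %s' % exc, file=sys.stderr)
        return 1
    return 0

# At the bottom of the real script the return value becomes the exit code:
# sys.exit(main())
```

With a script shaped like this, a non-zero exit code is what the Execute Process Task sees when something goes wrong, so the package can react to it through a precedence constraint.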

Python being a general-purpose, high-level programming language can support or perform a variety of tasks to enhance ETL development (not only SSIS). Perceived by many as easy to learn, super productive and with a generous Standard Library (batteries included), Python can provide a great deal of functionality with hardly any code. Let’s explore a few examples of how Python can be used to scrape some data off the website, save it in a CSV file, transfer it into the SQL Server database and finally generate and email some basic reports based on this data.

Let’s assume that we would like to scrape stock price data off the following website.

Python_Scripting_ETL_Extension_Stock_Website_View

There are several libraries which can be used for web scraping but the one we’re using here is Beautiful Soup, a popular Python package for parsing HTML and XML documents (including those with malformed markup, i.e. non-closed tags; the package is named after ‘tag soup’). The columns we are interested in are the company code and the share price only. As part of this exercise we will also add an ‘extract date and time’ column to be able to pinpoint when the data was scraped. The data will be saved in the C:\Imports folder as a CSV file with the extract date and time as part of the file name.

from urllib.request import urlopen
import csv
import datetime
import time
import os
from bs4 import BeautifulSoup
savePathImports = 'C:\\Imports'
if not os.path.exists(savePathImports):
    os.makedirs(savePathImports)
urlToOpen = 'http://www.marketindex.com.au/asx100'
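#name the parser explicitly so the result does not depend on which parsers are installed
soup = BeautifulSoup(urlopen(urlToOpen).read(), 'html.parser')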
filename = time.strftime("extract_%d%b%Y_%H%M%S.csv")
csvFile = open(savePathImports + '\\' + filename,'wt',newline='')
writer = csv.writer(csvFile)
writer.writerow(["Company Code", "Share Price", "Extract DateTime"])
try:
    for row in soup("table", {"id": "asx_sp_table" })[0].tbody('tr'):
        tds = row('td')
        csvRow = []
        csvRow.append(tds[1].string)
        csvRow.append(tds[3].string.replace("$", ""))
        csvRow.append(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        writer.writerow(csvRow)
finally:
    csvFile.close()
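
The share price cell is cleaned above with a single replace call, which would leave any thousands separators or stray whitespace in place. A minimal sketch of a stricter clean-up step is shown below; the clean_price helper is hypothetical and is not part of the original script.

```python
from decimal import Decimal, InvalidOperation


def clean_price(text):
    """Convert scraped price text such as '$1,234.50' to a Decimal.

    Returns None when the cell is empty or not numeric, so the caller
    can decide whether to skip the row.
    """
    if text is None:
        return None
    stripped = text.strip().replace('$', '').replace(',', '')
    try:
        return Decimal(stripped)
    except InvalidOperation:
        return None
```

In the loop above, tds[3].string is None whenever a cell contains nested tags, so a helper like this also avoids an AttributeError on such rows.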

Further on, now that we have the data scraped, let’s assume that rather than storing it in a CSV file, we would like to insert it into a SQL Server database. There are many good libraries that enable Python to talk to a SQL Server instance; in this example I used PyPyODBC purely because of its good Python 3.4 support. The snippet below creates the ‘stock_prices’ table in the default ‘dbo’ schema, inserts the data from the CSV file created by the code above and appends ‘insert_datetime’ data to the table.

import pypyodbc
import datetime
import glob
import csv
import os

#define SQL Server database connection details,
#establish the connection and drop/create 'stock_prices' table
conn = pypyodbc.connect('Driver={SQL Server};'
                        'Server=ServerName\\InstanceName;'
                        'Database=DatabaseName;'
                        'uid=Login;pwd=Password')
cur = conn.cursor()
cur.execute('''IF OBJECT_ID(N'dbo.stock_prices', N'U') IS NOT NULL
	        DROP TABLE dbo.stock_prices;
            CREATE TABLE stock_prices
            (id int IDENTITY (1,1),
            company_code CHAR (3) NOT NULL,
            share_price DECIMAL(6,2) NOT NULL,
            extract_datetime DATETIME2(0) NOT NULL,
            insert_datetime DATETIME2(0));
            ''')
cur.commit()

#import data from CSV file into the SQL Server table skipping
#first/header row and append current 'time stamp' column
filesLocation = 'C:\\Imports'
import_data = []
current_timestamp = str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
os.chdir(filesLocation)
for csvFile in glob.glob("*.csv"):
    with open(csvFile, 'r', newline='') as f:
        reader = csv.reader(f)
        next(reader, None)
        #extend rather than reassign so that rows from every file are kept
        import_data.extend(tuple(line + [current_timestamp]) for line in reader)

cur.executemany('''INSERT INTO dbo.stock_prices
                (company_code,share_price,extract_datetime,insert_datetime)
                VALUES (?,?,?,?)''',import_data)
cur.commit()

#close database connection
conn.close()
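
The load pattern above (skip the header, append a load timestamp to every row, then insert in one batch) can be tried without a SQL Server instance. The sketch below swaps in Python’s built-in sqlite3 module and an in-memory CSV purely for illustration; the load_rows function and the sample rows are hypothetical, and the table layout is a simplified assumption rather than the schema used above.

```python
import csv
import datetime
import io
import sqlite3


def load_rows(conn, csv_text):
    """Insert CSV rows (minus the header) with an added load timestamp."""
    loaded_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    reader = csv.reader(io.StringIO(csv_text))
    next(reader, None)  # skip the header row
    rows = [tuple(line + [loaded_at]) for line in reader if line]
    conn.executemany(
        'INSERT INTO stock_prices '
        '(company_code, share_price, extract_datetime, insert_datetime) '
        'VALUES (?, ?, ?, ?)', rows)
    conn.commit()
    return len(rows)


# In-memory database standing in for the SQL Server table (assumption)
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE stock_prices ('
             'company_code TEXT NOT NULL, share_price REAL NOT NULL, '
             'extract_datetime TEXT NOT NULL, insert_datetime TEXT)')

# Made-up sample data in the same layout as the scraped CSV file
sample = ('Company Code,Share Price,Extract DateTime\n'
          'AAA,10.50,2015-08-08 09:00:00\n'
          'BBB,99.10,2015-08-08 09:00:00\n')
inserted = load_rows(conn, sample)
```

Keeping the row preparation in one function makes it easy to check the header skipping and timestamp logic before pointing the same parameterised INSERT at the real database.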

There is no point in storing the data in a database (or flat files) if you can’t do anything with it. The next snippet of code extracts the stock prices data from the database and creates a very simple Excel report using openpyxl library. First let’s create a sample empty Excel spreadsheet file in the Report subdirectory of Imports folder. Next, let’s assume that the report requires us to provide the data for the top 15 most expensive stocks on the market for the data we have scraped. On top of that we would like to have it graphed in the same workbook and e-mailed to a recipient. While SQL Server built-in features can partially cater for these requirements e.g. SQL Server database mail can handle e-mail distribution functionality, creating reports with built-in graphs requires a programmatic approach and that is where Python’s simplicity and flexibility shine.

The code below reads the previously scraped stock price data from the SQL Server instance, writes it into the spreadsheet file, creates a simple bar graph from the first 2 columns (stock price and company code) and finally e-mails it to a recipient using the smtplib library. A lot of functionality for a very small amount of code!

import openpyxl
import pypyodbc
import time
import os
import smtplib
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.utils import formatdate

#define SQL Server database connection details and report file path
savePathReport = 'C:\\Imports\\Report'
xlsxReportName = 'samplereport.xlsx'
conn = pypyodbc.connect('Driver={SQL Server};'
                        'Server=ServerName\\InstanceName;'
                        'Database=DatabaseName;'
                        'uid=Login;pwd=Password')
cur = conn.cursor()
os.chdir(savePathReport)

#open Excel spreadsheet, replace the active sheet with one named 'samplereport_DDMonYYYY'
wb = openpyxl.load_workbook(xlsxReportName)
sheet = wb.get_active_sheet()
wb.remove_sheet(sheet)
wb.create_sheet(title=time.strftime("samplereport_%d%b%Y"))
sheet = wb.get_active_sheet()

#get the data (share price, company code and date) for the top 15 most expensive
#shares out of the SQL Server table and insert it into the report/spreadsheet
#ROW_NUMBER() is used instead of RANK() so tied prices do not share a spreadsheet row
cur.execute("""SELECT TOP 15 share_price,
               company_code, extract_datetime,
               ROW_NUMBER() OVER (ORDER BY share_price DESC) +1 as row_num
               FROM [AdminDBA].[dbo].[stock_prices]
               ORDER BY share_price DESC""")
dbTblHeader = [item[0] for item in cur.description]
del dbTblHeader[-1]
sheet.append(dbTblHeader)
for row in cur.fetchall():
    sheet['A' + str(row[3])] = float(row[0])
    sheet['B' + str(row[3])] = row[1]
    sheet['C' + str(row[3])] = row[2]

#create a simple bar graph from the report data on the same workbook
values = openpyxl.charts.Reference(sheet, (2, 1), (16, 1))
labels = openpyxl.charts.Reference(sheet, (2, 2), (16, 2))
seriesObj = openpyxl.charts.Series(values, labels = labels, title='Stock Prices as at '+time.strftime("%d-%b-%Y"))
chartObj = openpyxl.charts.BarChart()
chartObj.append(seriesObj)
chartObj.drawing.top = 5
chartObj.drawing.left = 300
chartObj.drawing.width = 500
chartObj.drawing.height = 315
sheet.add_chart(chartObj)
wb.save(xlsxReportName)
cur.close()
conn.close()

#send e-mail to designated e-mail address
#with Excel spreadsheet attachment
attachmentPath = savePathReport+'\\'+xlsxReportName
sender = 'senders_email_address@domain_name.com'
recipient = 'recipient_email_address@domain_name.com'
sendersEmailLogin = 'Password'
HOST = "smtp.gmail.com" #other smtp providers include outlook/hotmail @ smtp-mail.outlook.com or Yahoo Mail @ smtp.mail.yahoo.com
msg = MIMEMultipart()
msg["From"] = sender
msg["To"] = recipient
msg["Subject"] = "Share Prices report for " + time.strftime("%d-%b-%Y")
msg['Date'] = formatdate(localtime=True)
part = MIMEBase('application', "octet-stream")
part.set_payload( open(attachmentPath,"rb").read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', 'attachment; filename="%s"' % os.path.basename(attachmentPath))
msg.attach(part)
server = smtplib.SMTP(HOST, 587)
server.ehlo()
server.starttls()
server.login(sender, sendersEmailLogin)
server.sendmail(sender, recipient, msg.as_string())
server.close()
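
Building the message can be separated from sending it, which makes the attachment logic easy to check without an SMTP server. The sketch below is an alternative to the MIMEBase approach above, using the standard library’s email.message.EmailMessage class (Python 3.6 or later); the build_report_message function, the addresses and the attachment bytes are all placeholders.

```python
from email.message import EmailMessage
from email.utils import formatdate


def build_report_message(sender, recipient, subject, filename, payload):
    """Return an e-mail message carrying `payload` bytes as an attachment."""
    msg = EmailMessage()
    msg['From'] = sender
    msg['To'] = recipient
    msg['Subject'] = subject
    msg['Date'] = formatdate(localtime=True)
    msg.set_content('Please find the report attached.')
    # add_attachment base64-encodes binary payloads automatically
    msg.add_attachment(payload,
                       maintype='application',
                       subtype='octet-stream',
                       filename=filename)
    return msg


# Placeholder values for illustration only
message = build_report_message(
    'sender@example.com', 'recipient@example.com',
    'Share Prices report', 'samplereport.xlsx', b'placeholder bytes')
attachment = next(message.iter_attachments())
```

The resulting object can be handed to smtplib’s SMTP.send_message method in place of the sendmail call.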

The final output in C:\Imports\Report folder should be an Excel spreadsheet containing the data and a simple graph as per the image below.

Python_Scripting_ETL_Extension_Excel_Report_View

While Python can easily be installed on any system (some even come with Python pre-installed), under some circumstances it may not be desirable or permitted to install third-party software in certain environments. In these cases Python scripts can be packaged into standalone executable files using a few different utilities e.g. py2exe or cx_Freeze.

Let’s combine the above code into one executable using py2exe. py2exe is a Python extension which converts Python scripts (.py) into Microsoft Windows executables (.exe). These executables can run on a system without Python installed and the process is very straightforward. Depending on which version of Python you run, you will need to download the applicable version of py2exe. As my installation is version 3.4 I downloaded it from HERE. All we need to do now is create a ‘setup’ Python script, save it in the same folder as the Python file that needs to be converted and invoke py2exe from the command line. This will create a sub-folder called ‘dist’ where the executable will be created. The following code is the simplest version of the setup.py file.

from distutils.core import setup
import py2exe
setup(console=["filename.py"])

Once saved in a file we can reference it in the terminal with the following command.

python setup.py py2exe

Below is the output generated by this process (as my default Python installation is a distribution from Continuum Analytics called Anaconda, you can see a lot of DLLs and Python modules being referenced as part of the compilation).

Python_Scripting_ETL_Extension_Py2Exe_Terminal_View

Once the executable file is generated we can easily reference it as part of the ETL process, calling the file in the Execute Process Task or even through xp_cmdshell from a SQL statement.

SQL Server Integration Services lets developers complete a wide range of tasks and extend its capabilities further through .NET languages. However, Python’s flexibility, coupled with its ease of development, not only provides a gentler entry point into programming but also allows complex tasks to be executed with minimal development effort.


SSIS Packages Execution Workflow Control – How To Run a SSIS Package Based on Another Package Execution Status

July 20th, 2015 / by admin

SQL Server Integration Services architecture offers an easy way to impose workflow control based on the execution status of individual tasks through precedence constraints. If, for example, a task fails during package execution and the subsequent tasks do not need to be executed as a result, a precedence constraint allows the package workflow to be redirected to an error-handling routine without failing the package as a whole. Precedence constraints also provide a rudimentary expression evaluation engine which further extends their functionality through the ability to utilise system or user-created variables in conjunction with various functions.

This functionality works well in the context of a single, self-contained package but what if we would like to execute a package or a package’s task based on the execution status of another, different package? One scenario where I found this solution useful was a data warehouse reload, where based on the data acquisition package execution status i.e. success or failure, the subsequent data warehouse reload package had ‘the green light’ (or otherwise) to proceed with the data warehouse refresh. This functionality can be achieved by maintaining a log table with the package execution results for each run, but since SQL Server already maintains SQL Server Agent metadata, it’s probably easier to source this information straight out of the msdb system database.

Let’s look at an example. First, let’s create a sample SQL Server database with one dummy table in it and a package called ‘SampleTask1’ which we also deploy as a SQL Server Agent job. The package itself runs some simple SQL code, so its functionality is limited to either failing or succeeding based on whether the embedded SELECT statement divides by 0 or 1, as per the image below.

USE [master];
GO
IF EXISTS ( SELECT  name
            FROM    sys.databases
            WHERE   name = N'SampleDB' )
    BEGIN
-- Close connections to the SampleDB database
        ALTER DATABASE [SampleDB] SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
        DROP DATABASE [SampleDB];
    END;
GO
CREATE DATABASE [SampleDB] ON PRIMARY
( NAME = N'SampleDB'
, FILENAME = N'C:\DB_Files\SampleDB.mdf'
, SIZE = 10MB
, MAXSIZE = 1GB
, FILEGROWTH = 10MB ) LOG ON
( NAME = N'SampleDB_log'
, FILENAME = N'C:\DB_Files\SampleDB_log.LDF'
, SIZE = 1MB
, MAXSIZE = 1GB
, FILEGROWTH = 10MB);
GO
--Assign database ownership to login SA
EXEC SampleDB.dbo.sp_changedbowner @loginame = N'SA', @map = false;
GO
--Change the recovery model to BULK_LOGGED
ALTER DATABASE [SampleDB] SET RECOVERY BULK_LOGGED;
GO
USE SampleDB;
GO
--Create a sample table inside the SampleDB database
CREATE TABLE [dbo].[dummy_table]
    (
      [dummy_record] [DECIMAL](18, 4) NULL
    )
ON  [PRIMARY];
GO

SSIS_Exec_Precedence_SampleTask1SQLStatement

The important aspect is that when the package is executed via a SQL Server Agent job, some of its execution metadata, e.g. last execution status, can be read from the msdb database by querying its system tables, e.g. msdb.dbo.sysjobs and msdb.dbo.sysjobservers. The values from those tables can be easily translated into variables and used to control package execution workflow using an SSIS Script Task. Further, let’s assume that the next package we would like to run (let’s call it ‘SampleTask2’) depends directly on the successful execution of the ‘SampleTask1’ package. To examine this we will create a sample Script Task and a few variables which will be written to and read from during this task’s execution:

  • first_package_execution_status – execution status of the first package (represented as Int32 data type). This variable will guide the execution flow of the subsequent Script Tasks responsible for displaying the message box popup
  • first_package_name – name of the first package
  • output_message – message displayed as a result of first package execution status i.e. ‘SampleTask1 Failed!’ or ‘SampleTask1 Succeeded!’
  • server_name – name of the SQL Server instance on which the first package is executed

SSIS_Exec_Precedence_SampleTask2Variables

Depending on whether ‘SampleTask1’ (executed as a SQL Server Agent job) failed or succeeded, ‘SampleTask2’ package control flow logic should adjust accordingly. The core logic governing the control flow is created by a simple snippet of C#.

#region Help:  Introduction to the script task
/* The Script Task allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services control flow.
 *
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script task. */
#endregion

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.Data.SqlClient;
using System.Data.OleDb;
using System.Data.Common;
using Wrap = Microsoft.SqlServer.Dts.Runtime.ManagedWrapper;
#endregion

namespace ST_44c20947eb6c4c5cb2f698bdd17b3534
{
    /// <summary>
    /// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
    /// or parent of this class.
    /// </summary>
    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {
        #region Help:  Using Integration Services variables and parameters in a script
        /* To use a variable in this script, first ensure that the variable has been added to
         * either the list contained in the ReadOnlyVariables property or the list contained in
         * the ReadWriteVariables property of this script task, according to whether or not your
         * code needs to write to the variable.  To add the variable, save this script, close this instance of
         * Visual Studio, and update the ReadOnlyVariables and
         * ReadWriteVariables properties in the Script Transformation Editor window.
         * To use a parameter in this script, follow the same steps. Parameters are always read-only.
         *
         * Example of reading from a variable:
         *  DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
         *
         * Example of writing to a variable:
         *  Dts.Variables["User::myStringVariable"].Value = "new value";
         *
         * Example of reading from a package parameter:
         *  int batchId = (int) Dts.Variables["$Package::batchId"].Value;
         *
         * Example of reading from a project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].Value;
         *
         * Example of reading from a sensitive project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
         * */

        #endregion

        #region Help:  Firing Integration Services events from a script
        /* This script task can fire events for logging purposes.
         *
         * Example of firing an error event:
         *  Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
         *
         * Example of firing an information event:
         *  Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
         *
         * Example of firing a warning event:
         *  Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
         * */
        #endregion

        #region Help:  Using Integration Services connection managers in a script
        /* Some types of connection managers can be used in this script task.  See the topic
         * "Working with Connection Managers Programatically" for details.
         *
         * Example of using an ADO.Net connection manager:
         *  object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
         *  SqlConnection myADONETConnection = (SqlConnection)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
         *
         * Example of using a File connection manager
         *  object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
         *  string filePath = (string)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
         * */
        #endregion

        /// <summary>
        /// This method is called when this script task executes in the control flow.
        /// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
        /// To open Help, press F1.
        /// </summary>
        public void Main()
        {

            bool fireAgain = true;
            //Dts.Events.FireInformation(0, "Test1", "Val1", String.Empty, 0, ref fireAgain);

            try
            {
                var conn1 = new OleDbConnection();
                string sqlServerInstance = Dts.Variables["server_name"].Value.ToString();
                conn1.ConnectionString =
                              "Driver=SQLOLEDB;" +
                              "Data Source=" + sqlServerInstance + ";" +
                              "Provider=SQLOLEDB;" +
                              "Initial Catalog=msdb;" +
                              "Integrated Security=SSPI;";
                conn1.Open();
                OleDbDataReader myDataReader1 = null;
                OleDbCommand LRO_Conn = new OleDbCommand(@"SELECT CAST(sjs.last_run_outcome as INT) as LRO
                                                                FROM msdb.dbo.sysjobs sj
                                                                LEFT JOIN msdb.dbo.sysjobservers  sjs ON sj.job_id = sjs.job_id
                                                                WHERE sj.name = ?
                                                                AND sj.enabled = 1", conn1);

                LRO_Conn.Parameters.Add("@name", OleDbType.VarChar, 150).Value = Dts.Variables["first_package_name"].Value;

                myDataReader1 = LRO_Conn.ExecuteReader();
                while (myDataReader1.Read())
                {
                    Dts.Variables["first_package_execution_status"].Value = myDataReader1["LRO"];
                }
                myDataReader1.Close();
                conn1.Close();
                // last_run_outcome: 0 = failed, 1 = succeeded, 3 = cancelled; only 1 counts as success
                bool b1 = Convert.ToInt32(Dts.Variables["first_package_execution_status"].Value) == 1;
                if (b1 != true)
                {
                    Dts.Variables["output_message"].Value = "SampleTask1 Failed!";
                    Dts.Variables["first_package_execution_status"].Value = 0;
                }
                else
                {
                    Dts.Variables["output_message"].Value = "SampleTask1 Succeeded!";
                    Dts.Variables["first_package_execution_status"].Value = 1;
                }
            }
            catch (Exception e)
            {
                Dts.Events.FireInformation(0, "Exception occurred: ", e.ToString(), String.Empty, 0, ref fireAgain);
            }
        }
        #region ScriptResults declaration
        /// <summary>
        /// This enum provides a convenient shorthand within the scope of this class for setting the
        /// result of the script.
        ///
        /// This code was generated automatically.
        /// </summary>
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion
    }
}

The most important part is the SQL statement which queries the msdb database for ‘SampleTask1’ package execution metadata and populates the variables used for further execution with the respective values. Below is a screenshot of how the variables defined earlier are assigned their respective values based on their function during the first Script Task execution.

SSIS_Exec_Precedence_SampleTask2ModifiedWriteVariablesMapping
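
The decision the Script Task makes can be summarised independently of SSIS. The sketch below assumes the documented meaning of last_run_outcome in msdb.dbo.sysjobservers (0 = failed, 1 = succeeded, 3 = cancelled) and treats anything other than 1, including a missing row, as a failure; the function name describe_outcome is hypothetical.

```python
def describe_outcome(package_name, last_run_outcome):
    """Map a SQL Server Agent last_run_outcome value to (status, message).

    status is 1 only when the job succeeded; any other value, or None
    when no enabled job was found, is reported as a failure (0).
    """
    succeeded = last_run_outcome == 1
    status = 1 if succeeded else 0
    verdict = 'Succeeded' if succeeded else 'Failed'
    return status, '%s %s!' % (package_name, verdict)
```

Treating only the value 1 as success matters because a cancelled run is reported as 3, which a plain truthiness check would count as a success.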

The subsequent Script Task component simply displays a message box with the ‘output_message’ variable value to visually confirm whether the preceding package was executed successfully or not. Below is sample footage depicting the final execution output of this solution (in this case first handling the SampleTask1 failure notification, followed by the successful execution pop-up alert). You can also download all the solution files for this exercise from my OneDrive folder HERE.

In my professional practice I have always liked to combine this logic with the error logging solution which I have written about extensively in my previous blog posts HERE and HERE. In this way I can create a robust architecture which not only queries the msdb database for metadata but also checks if there were any errors encountered during package execution. In that way, even if the package executed successfully I can pick up on potential issues which wouldn’t necessarily cause the execution failure but may be critical enough to decide on subsequent packages execution.

To demonstrate this let’s use our sample database created previously and create a stored procedure which would include the error-invoking SQL code (dividing by zero). This stored procedure will be run by a modified version of ‘SampleTask1’ package in order to map and pass a couple of system parameters which would constitute the error logging metadata i.e. ExecutionInstanceGUID and PackageName as per the image below.

USE SampleDB;
GO
CREATE PROCEDURE usp_divide_by_zero
    (
      @Exec_Instance_GUID UNIQUEIDENTIFIER ,
      @Package_Name VARCHAR(256)
    )
AS
    BEGIN
        DECLARE @Target_DB_Name VARCHAR(128) = 'SampleDB';
        DECLARE @Target_DB_Schema_Name VARCHAR(56) = 'dbo';
        DECLARE @Target_DB_Object_Name VARCHAR(128) = 'dummy_table';

        BEGIN TRY
            BEGIN TRANSACTION;
            INSERT  INTO SampleDB.dbo.dummy_table
                    ( dummy_record )
                    SELECT  1 / 0;
            COMMIT TRANSACTION;
        END TRY

        BEGIN CATCH
            ROLLBACK TRANSACTION;
            WITH    TempErr (
							[ErrorNumber],
							[ErrorSeverity],
							[ErrorState],
							[ErrorLine],
							[ErrorMessage],
							[ErrorDateTime],
							[LoginName],
							[UserName],
							[PackageName],
							[ObjectID],
							[ProcessID],
							[ExecutionInstanceGUID],
							[DBName] )
                      AS ( SELECT   ERROR_NUMBER() AS ErrorNumber ,
                                    ERROR_SEVERITY() AS ErrorSeverity ,
                                    ERROR_STATE() AS ErrorState ,
                                    ERROR_LINE() AS ErrorLine ,
                                    ERROR_MESSAGE() AS ErrorMessage ,
                                    SYSDATETIME() AS ErrorDateTime ,
                                    SYSTEM_USER AS LoginName ,
                                    USER_NAME() AS UserName ,
                                    @Package_Name ,
                                    OBJECT_ID('' + @Target_DB_Name + '.'
                                              + @Target_DB_Schema_Name + '.'
                                              + @Target_DB_Object_Name + '') AS ObjectID ,
                                    ( SELECT    a.objectid
                                      FROM      sys.dm_exec_requests r
                                                CROSS   APPLY sys.dm_exec_sql_text(r.sql_handle) a
                                      WHERE     session_id = @@spid
                                    ) AS ProcessID ,
                                    @Exec_Instance_GUID AS ExecutionInstanceGUID ,
                                    DB_NAME() AS DatabaseName
                         )
                INSERT  INTO AdminDBA.dbo.LogSSISErrors_Error
                        ( [ErrorNumber] ,
                          [ErrorSeverity] ,
                          [ErrorState] ,
                          [ErrorLine] ,
                          [ErrorMessage] ,
                          [ErrorDateTime] ,
                          [FKLoginID] ,
                          [FKUserID] ,
                          [FKPackageID] ,
                          [FKObjectID] ,
                          [FKProcessID] ,
                          [ExecutionInstanceGUID]
                        )
                        SELECT  ErrorNumber = COALESCE(err.ErrorNumber, -1) ,
                                ErrorSeverity = COALESCE(err.[ErrorSeverity],
                                                         -1) ,
                                ErrorState = COALESCE(err.[ErrorState], -1) ,
                                ErrorLine = COALESCE(err.[ErrorLine], -1) ,
                                ErrorMessage = COALESCE(err.[ErrorMessage],
                                                        'Unknown') ,
                                ErrorDateTime = ErrorDateTime ,
                                FKLoginID = src_login.ID ,
                                FKUserID = src_user.ID ,
                                [FKPackageID] = src_package.ID ,
                                [FKObjectID] = src_object.ID ,
                                [FKProcessID] = src_process.ID ,
                                [ExecutionInstanceGUID] = err.ExecutionInstanceGUID
                        FROM    TempErr err
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Login src_login ON err.LoginName = src_login.LoginName
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_User src_user ON err.UserName = src_user.UserName
                                                              AND src_user.FKDBID = ( SELECT
                                                              ID
                                                              FROM
                                                              AdminDBA.dbo.LogSSISErrors_DB db
                                                              WHERE
                                                              db.DBName = err.DBName
                                                              )
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Package src_package ON err.PackageName = ( LEFT(src_package.PackageName,
                                                              CHARINDEX('.',
                                                              src_package.PackageName)
                                                              - 1) )
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Object src_object ON err.ObjectID = src_object.ObjectID
                                LEFT JOIN AdminDBA.dbo.LogSSISErrors_Process src_process ON err.ProcessID = src_process.ProcessID
                        WHERE   src_login.CurrentlyUsed = 1
                                AND src_user.CurrentlyUsed = 1
                                --AND src_package.CurrentlyUsed = 1
                                AND src_object.CurrentlyUsed = 1
                                AND src_process.CurrentlyUsed = 1;
        END CATCH;
    END;

SSIS_Exec_Precedence_SampleTask1ModifiedParametersMapping

Provided we have the AdminDBA database created (see my previous posts HERE and HERE), the TRY…CATCH error handling code in the stored procedure should now capture the ‘divide by zero’ error and log it into the AdminDBA database, even though the package reports successful execution (see image below).

SSIS_Exec_Precedence_AdminDBAErrorLogView
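
To illustrate why the package still reports success, here is a minimal Python sketch of the same pattern: the error is caught, written to a log table and not re-raised, so the caller never sees a failure. It uses an in-memory SQLite table rather than AdminDBA, and the names `run_step_with_logging` and `error_log` are hypothetical, not part of the original solution.

```python
import sqlite3
from datetime import datetime


def run_step_with_logging(conn, package_name, numerator, denominator):
    """Run a step; on failure, log the error and return normally (no re-raise)."""
    try:
        return numerator / denominator
    except ZeroDivisionError as exc:
        # Equivalent in spirit to the CATCH block: record the error details.
        conn.execute(
            "INSERT INTO error_log (package_name, error_message, error_datetime) "
            "VALUES (?, ?, ?)",
            (package_name, str(exc), datetime.now().isoformat()),
        )
        conn.commit()
        return None  # the caller sees a normal return, i.e. a 'successful' run


# Hypothetical stand-in for the AdminDBA error table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE error_log (package_name TEXT, error_message TEXT, error_datetime TEXT)"
)

result = run_step_with_logging(conn, "SampleTask1", 1, 0)
logged = conn.execute("SELECT package_name, error_message FROM error_log").fetchall()
print(result, logged)
```

The step completes without raising, yet the log table holds one row for the package. That logged row is exactly the information the execution status alone does not carry.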

Armed with this knowledge, we can now determine the further course of action for subsequent packages using the following code (it handles both msdb metadata and detailed error logging metadata).

#region Help:  Introduction to the script task
/* The Script Task allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services control flow.
 *
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script task. */
#endregion

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.Data.SqlClient;
using System.Data.OleDb;
using System.Data.Common;
using Wrap = Microsoft.SqlServer.Dts.Runtime.ManagedWrapper;
#endregion

namespace ST_44c20947eb6c4c5cb2f698bdd17b3534
{
    /// <summary>
    /// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
    /// or parent of this class.
    /// </summary>

    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {
        #region Help:  Using Integration Services variables and parameters in a script
        /* To use a variable in this script, first ensure that the variable has been added to
         * either the list contained in the ReadOnlyVariables property or the list contained in
         * the ReadWriteVariables property of this script task, according to whether or not your
         * code needs to write to the variable.  To add the variable, save this script, close this instance of
         * Visual Studio, and update the ReadOnlyVariables and
         * ReadWriteVariables properties in the Script Transformation Editor window.
         * To use a parameter in this script, follow the same steps. Parameters are always read-only.
         *
         * Example of reading from a variable:
         *  DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
         *
         * Example of writing to a variable:
         *  Dts.Variables["User::myStringVariable"].Value = "new value";
         *
         * Example of reading from a package parameter:
         *  int batchId = (int) Dts.Variables["$Package::batchId"].Value;
         *
         * Example of reading from a project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].Value;
         *
         * Example of reading from a sensitive project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
         * */

        #endregion

        #region Help:  Firing Integration Services events from a script
        /* This script task can fire events for logging purposes.
         *
         * Example of firing an error event:
         *  Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
         *
         * Example of firing an information event:
         *  Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
         *
         * Example of firing a warning event:
         *  Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
         * */
        #endregion

        #region Help:  Using Integration Services connection managers in a script
        /* Some types of connection managers can be used in this script task.  See the topic
         * "Working with Connection Managers Programatically" for details.
         *
         * Example of using an ADO.Net connection manager:
         *  object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
         *  SqlConnection myADONETConnection = (SqlConnection)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
         *
         * Example of using a File connection manager
         *  object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
         *  string filePath = (string)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
         * */
        #endregion

        /// <summary>
        /// This method is called when this script task executes in the control flow.
        /// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
        /// To open Help, press F1.
        /// </summary>
        public void Main()
        {

            bool fireAgain = true;
            try
            {
                var conn1 = new OleDbConnection();
                string sqlServerInstance = Dts.Variables["server_name"].Value.ToString();
                conn1.ConnectionString =
                              "Driver=SQLOLEDB;" +
                              "Data Source=" + sqlServerInstance + ";" +
                              "Provider=SQLOLEDB;" +
                              "Initial Catalog=msdb;" +
                              "Integrated Security=SSPI;";
                conn1.Open();
                OleDbDataReader myDataReader1 = null;
                OleDbCommand LRO_Conn = new OleDbCommand(@"SELECT CAST(sjs.last_run_outcome as INT) as LRO
                                                                FROM msdb.dbo.sysjobs sj
                                                                LEFT JOIN msdb.dbo.sysjobservers  sjs ON sj.job_id = sjs.job_id
                                                                WHERE sj.name = ?
                                                                AND sj.enabled = 1", conn1);

                LRO_Conn.Parameters.Add("@name", OleDbType.VarChar, 150).Value = Dts.Variables["first_package_name"].Value;

                myDataReader1 = LRO_Conn.ExecuteReader();
                while (myDataReader1.Read())
                {
                    Dts.Variables["first_package_execution_status"].Value = myDataReader1["LRO"];
                }
                myDataReader1.Close();
                conn1.Close();
                Dts.Events.FireInformation(0, "first_package_execution_status variable value", Dts.Variables["first_package_execution_status"].Value.ToString(), String.Empty, 0, ref fireAgain);

                var conn2 = new OleDbConnection();
                string sqlServerInstance2 = Dts.Variables["server_name"].Value.ToString();
                conn2.ConnectionString =
                              "Driver=SQLOLEDB;" +
                              "Data Source=" + sqlServerInstance2 + ";" +
                              "Provider=SQLOLEDB;" +
                              "Initial Catalog=AdminDBA;" +
                              "Integrated Security=SSPI;";
                conn2.Open();

                OleDbDataReader myDataReader2 = null;
                // No TOP 1 in the derived table: without an ORDER BY it would keep one arbitrary
                // error row, so errors logged by this package today could be missed.
                OleDbCommand LRE_Conn = new OleDbCommand(@"SELECT CASE WHEN EXISTS(
                                                           SELECT * FROM
                                                                (SELECT e.ErrorMessage, e.ErrorDateTime,
                                                                LEFT(p.packagename+'.', CHARINDEX('.',p.packagename+'.')-1) AS packagename
                                                                FROM [AdminDBA].[dbo].[LogSSISErrors_Error] e
                                                                JOIN [AdminDBA].[dbo].[LogSSISErrors_Package] p
                                                                ON e.fkpackageid = p.id) a
                                                                WHERE a.packagename = ? AND a.ErrorDateTime BETWEEN
                                                                DATEADD(DAY, DATEDIFF(DAY, '19000101', GETDATE()), '19000101')
                                                                AND SYSDATETIME())
                                                        THEN 0 ELSE 1 END AS LRE", conn2);

                LRE_Conn.Parameters.Add("@packagename", OleDbType.VarChar, 150).Value = Dts.Variables["first_package_name"].Value;

                myDataReader2 = LRE_Conn.ExecuteReader();
                while (myDataReader2.Read())
                {
                    Dts.Variables["first_package_error_report"].Value = myDataReader2["LRE"];
                }
                myDataReader2.Close();
                conn2.Close();
                Dts.Events.FireInformation(0, "first_package_error_report variable value", Dts.Variables["first_package_error_report"].Value.ToString(), String.Empty, 0, ref fireAgain);

                bool b1 = Convert.ToBoolean(Dts.Variables["first_package_execution_status"].Value);
                bool b2 = Convert.ToBoolean(Dts.Variables["first_package_error_report"].Value);
                // Report failure if either check failed; report success only when both passed.
                if (!b1 || !b2)
                {
                    Dts.Variables["output_message"].Value = "SampleTask1 Failed!";
                    if (!b1)
                    {
                        Dts.Variables["first_package_execution_status"].Value = 0;
                    }
                    if (!b2)
                    {
                        Dts.Variables["first_package_error_report"].Value = 0;
                    }
                }
                else
                {
                    Dts.Variables["output_message"].Value = "SampleTask1 Succeeded!";
                    Dts.Variables["first_package_execution_status"].Value = 1;
                    Dts.Variables["first_package_error_report"].Value = 1;
                }
            }
            catch (Exception e)
            {
                Dts.Events.FireInformation(0, "Exception occurred: ", e.ToString(), String.Empty, 0, ref fireAgain);
            }
        }
        #region ScriptResults declaration
        /// <summary>
        /// This enum provides a convenient shorthand within the scope of this class for setting the
        /// result of the script.
        ///
        /// This code was generated automatically.
        /// </summary>

        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion
    }
}

This code not only queries the msdb database to determine the last execution status of the ‘SampleTask1’ package but also checks the AdminDBA database for any runtime errors the package has logged since midnight of the current day. In this way, even if the package executed successfully, any errors captured and logged at runtime can drive changes to the subsequent execution workflow, and that decision can be made on the fly.
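
The time window in the second query is worth spelling out: the DATEADD/DATEDIFF expression resolves to midnight at the start of the current day, so the check covers errors logged between that midnight and now. Below is a minimal Python sketch of that window logic; the function names and the sample timestamps are hypothetical and serve illustration only.

```python
from datetime import datetime, time


def error_window_start(now):
    """Midnight at the start of the day that `now` falls on."""
    return datetime.combine(now.date(), time.min)


def first_package_error_report(error_times, now):
    """Return 0 if any logged error falls between midnight and `now`, else 1."""
    start = error_window_start(now)
    return 0 if any(start <= t <= now for t in error_times) else 1


# Hypothetical timestamps.
now = datetime(2015, 8, 8, 14, 30)
yesterday_evening = datetime(2015, 8, 7, 22, 0)  # under 24 hours ago, but before midnight
this_morning = datetime(2015, 8, 8, 9, 15)

print(first_package_error_report([yesterday_evening], now))  # 1
print(first_package_error_report([this_morning], now))       # 0
```

An error logged late the previous evening is therefore ignored, which matters for packages scheduled to run shortly after midnight.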

Based on that functionality, the subsequent package design needs an additional variable, hence the inclusion of the ‘first_package_error_report’ variable in the variables collection, the first script task and the precedence constraints, as per the image below.

SSIS_Exec_Precedence_SampleTask2ModifiedVariables

To confirm this functionality, we can add an ‘OnPostExecute Event’ breakpoint and observe the variable values change during execution. A quicker way is to add a FireInformation call for each variable passed to the precedence constraints, so that their values are displayed at runtime in the output pane, using a small snippet of C# as per below.

Dts.Events.FireInformation(0, "first_package_execution_status variable value", Dts.Variables["first_package_execution_status"].Value.ToString(), String.Empty, 0, ref fireAgain);

SSIS_Exec_Precedence_SampleTask2ModifiedRunTimeVarsValueOutput

Once both queries in the Script Task have executed and the variables holding the package execution metadata and error-check results are populated with their respective values, we can control further package execution workflow much more precisely. In the above example, a ‘Task Failed’ message is raised if either of those variables is assigned the value of 0.
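
The decision the precedence constraints make from the two flags can be sketched as a small truth table. This Python sketch assumes the convention used above (1 means the check passed, 0 means it failed); the function name `next_step` is hypothetical.

```python
def next_step(execution_status, error_report):
    """Mimic the precedence constraints: both flags must be 1 to continue."""
    if execution_status == 1 and error_report == 1:
        return "SampleTask1 Succeeded!"
    return "SampleTask1 Failed!"


# Walk through all four combinations of the two flags.
for status, report in [(1, 1), (1, 0), (0, 1), (0, 0)]:
    print(status, report, next_step(status, report))
```

Only the first combination reports success; a clean msdb status with logged errors, or the reverse, is treated as a failure.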

All packages and their solution files as well as SQL scripts used in this post can be downloaded from my OneDrive account under the following LINK if you wish to replicate this functionality in your own practice.
