Tableau Server Workgroup PostgreSQL Database Schema And Data Synchronization

December 7th, 2015 / 3 Comments » / by admin

The last client who engaged me to architect and develop a small data warehouse had also made a large investment in Tableau as their default reporting platform. The warehouse data, small by today’s standards, was to be loaded into Tableau Server as a scheduled overnight extract, with users granted access to reports and data on Tableau Server rather than querying the star-schema relational tables (implemented on the SQL Server database engine) directly. As data availability, and therefore meeting BI SLAs, was paramount to the project’s success, a robust notification system was put in place to log any data warehouse issues arising from activities on the database server. However, even though every precaution was taken to ensure that failed data warehouse processes were logged and the corresponding issues mitigated, Tableau extract failures, along with other Tableau Server activities, went largely unaccounted for due to a lack of data. How could one address this issue?

Tableau provides access to its internal PostgreSQL metadata database in just a few simple steps. As it turns out, during installation Tableau Server creates an almost empty ‘workgroup’ repository with over 100 tables, 900+ columns (about 100 of them used as keys), 300+ joins and 16+ views which can be accessed and queried. Tableau Server works as a collection of processes, programs and applications (data engine, data server, VizQL Server, application server and so on). Each of these processes generates logs recording user activities, data connections, queries and extracts, errors, views and interactions, etc., which are parsed regularly and stored in the PostgreSQL-based Tableau Server administrative database. The PostgreSQL server hosting the workgroup database usually runs on the same Windows server as the main Tableau Server or, if Tableau Server runs as a multi-node cluster, on a Windows server running a worker Tableau Server, and it listens on the non-standard TCP/IP port 8060. Access to Tableau’s PostgreSQL database is provided through a few different accounts, each with its own set of privileges. In November 2014 (release 8.2.5) Tableau Software introduced a new default user named ‘readonly’ with read access to all tables and views of the workgroup repository, which is what I’m going to use to get ‘under the hood’. The other user commonly used for Tableau Server metadata exploration, aptly named ‘tableau’, can also be used for activity analysis but has access to fewer database objects.

The easiest way to enable access to Tableau’s ‘workgroup’ database with the ‘readonly’ account is to open an administrator command prompt on the Tableau Server machine, navigate to the Tableau Server bin directory and issue the tabadmin dbpass --username readonly command, specifying your chosen password. After a server restart the change takes effect and you should see output similar to the following.

[Image: Tableau_Workgroup_DB_Sync_Enable_DB_Access]

PostgreSQL can now be queried using a client tool of your choice, e.g. pgAdmin. This is what the ‘public’ schema with all its tables looks like when imported into Navicat Data Modeler (click on image to enlarge).

[Image: Tableau_Workgroup_DB_Sync_DB_ERD_v9point1]
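With access enabled, extract refresh outcomes, the very data that was missing for my client, can be pulled straight from the repository. A sketch of such a query is shown below; it assumes the Tableau 9.x schema, where background job history lives in the ‘_background_tasks’ view, so names may differ between Tableau Server versions.

```sql
-- Recent extract refresh jobs and their outcomes (Tableau 9.x schema;
-- object and column names may vary between Tableau Server versions)
SELECT title,
       job_name,
       finish_code,      -- 0 = success, 1 = failure
       started_at,
       completed_at
FROM   _background_tasks
WHERE  job_name IN ('Refresh Extracts', 'Increment Extracts')
ORDER  BY completed_at DESC
LIMIT  20;
```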

In order to connect to it from SQL Server we can simply download the PostgreSQL ODBC driver (psqlODBC) and configure it with the credentials of the ‘readonly’ user.

[Image: Tableau_Workgroup_DB_Sync_ODBC_Setup]

All that is left to do is create a linked server connection to the PostgreSQL database directly from SQL Server Management Studio, exposing a collection of objects (tables and views) on the public schema.

[Image: Tableau_Workgroup_DB_Sync_MSSQL_LinkedSvr_Schema_View]
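The same linked server can also be scripted in T-SQL rather than clicked through in Management Studio. A minimal sketch, assuming a system DSN named ‘TableauPostgreSQL’ was created for the ODBC driver (the DSN name is an assumption, use whatever you configured):

```sql
-- Create a linked server over the PostgreSQL ODBC system DSN
-- ('TableauPostgreSQL' is an assumed DSN name)
EXEC master.dbo.sp_addlinkedserver
     @server     = N'TABPOSTGRESQLPROD',
     @srvproduct = N'PostgreSQL',
     @provider   = N'MSDASQL',
     @datasrc    = N'TableauPostgreSQL';

-- Map all local logins to the read-only repository account
EXEC master.dbo.sp_addlinkedsrvlogin
     @rmtsrvname  = N'TABPOSTGRESQLPROD',
     @useself     = N'False',
     @locallogin  = NULL,
     @rmtuser     = N'readonly',
     @rmtpassword = N'<password set with tabadmin dbpass>';
```

The linked server name ‘TABPOSTGRESQLPROD’ matches the default used by the stored procedure later in this post.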

Now we should be able to OPENQUERY Tableau metadata objects with ease, but if we would like to go further and regularly copy Tableau’s data across to SQL Server (in my client’s case that was precisely the requirement, in order not to interfere with the production database), the following code provides this functionality.
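Before building the synchronisation, a quick sanity check through the linked server confirms the repository is reachable. A sketch, assuming the linked server name used throughout this post:

```sql
-- List the tables and views exposed on the repository's 'public' schema
SELECT *
FROM   OPENQUERY(TABPOSTGRESQLPROD,
       'SELECT table_name, table_type
        FROM   information_schema.tables
        WHERE  table_catalog = ''workgroup''
        AND    table_schema  = ''public''
        ORDER  BY table_name');
```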

First, let’s create a sample database called ‘TableauDBCopy’ and a ‘tab’ schema on the target SQL Server instance. The SQL snippet below also creates and populates a small table called ‘tabSchemaObjectsExclude’ on the ‘dbo’ schema which stores the names of tables we don’t want to import. The reason for this exclusion is that these tables do not have primary keys, so it is impossible to compare the two schemas using the code below, which relies on a primary key being defined on every table object.

--CREATE 'TableauDBCopy' DATABASE AND 'tab' SCHEMA ON THE TARGET INSTANCE/SERVER
USE [master];
GO
IF EXISTS ( SELECT  name
            FROM    sys.databases
            WHERE   name = N'TableauDBCopy' )
    BEGIN
        ALTER DATABASE TableauDBCopy SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
        DROP DATABASE TableauDBCopy;
    END;
GO
CREATE DATABASE TableauDBCopy ON 
( NAME = 'TableauDBCopy_dat',
   FILENAME = 'D:\SQLData\MSSQL12.ServerName\MSSQL\DATA\TableauDBCopy.mdf',
   SIZE = 500MB,
   MAXSIZE = 2000MB,
   FILEGROWTH = 100 ) LOG ON
( NAME = 'TableauDBCopy_log',
   FILENAME = 'D:\SQLData\MSSQL12.ServerName\MSSQL\DATA\TableauDBCopy.ldf',
   SIZE = 100MB,
   MAXSIZE = 1000MB,
   FILEGROWTH = 50MB );
GO
EXEC TableauDBCopy.dbo.sp_changedbowner @loginame = N'SA', @map = false;
GO
ALTER DATABASE TableauDBCopy SET RECOVERY SIMPLE;
GO
USE TableauDBCopy;
GO
CREATE SCHEMA tab AUTHORIZATION dbo;
GO

--CREATE EXCEPTION 'tabSchemaObjectsExclude' TABLE AND POPULATE IT WITH EXCEPTION DATA
CREATE TABLE dbo.tabSchemaObjectsExclude
(ObjectName VARCHAR (256))
GO
INSERT INTO dbo.tabSchemaObjectsExclude( ObjectName )
SELECT 'dataengine_configurations'						UNION ALL
SELECT 'exportable_repository_id_columns'				UNION ALL
SELECT 'exportable_tables_column_transformations'		UNION ALL
SELECT 'monitoring_dataengine'							UNION ALL
SELECT 'monitoring_postgresql'							UNION ALL
SELECT 'permission_reasons'								UNION ALL
SELECT 'schema_migrations'								UNION ALL
SELECT 'users_view'

Next, let’s look at a simple stored procedure which compares the source schema (the Tableau Server ‘public’ schema on the PostgreSQL database) with the target schema (our newly created SQL Server database with the ‘tab’ schema). This code interrogates both databases for their ‘compatibility’ and table metadata structure, e.g. data types, character lengths, NULL-ability, precision, scale etc., and if the target object(s) are found to be missing or out of sync with the source version, it generates DROP TABLE and CREATE TABLE DDL statements on the fly and applies the changes directly in the target environment.
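For reference, the procedure defined below would be invoked along these lines (a sketch; the server, database and schema names are the ones used in this post and should be adjusted to your environment):

```sql
DECLARE @Is_All_OK      INT,
        @Process_Name   VARCHAR(250),
        @Error_Message  VARCHAR(MAX);

EXEC dbo.usp_checkRemoteTableauServerTablesSchemaChanges
     @Remote_Server_Name           = 'TABPOSTGRESQLPROD',
     @Remote_Server_DB_Name        = 'workgroup',
     @Remote_Server_DB_Schema_Name = 'public',
     @Target_DB_Name               = 'TableauDBCopy',
     @Target_DB_Schema_Name        = 'tab',
     @Is_All_OK                    = @Is_All_OK OUTPUT,
     @Process_Name                 = @Process_Name OUTPUT,
     @Error_Message                = @Error_Message OUTPUT;

SELECT @Is_All_OK AS Is_All_OK, @Error_Message AS Error_Message;
```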

USE [TableauDBCopy]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE PROCEDURE [dbo].[usp_checkRemoteTableauServerTablesSchemaChanges]
    (
	  @Remote_Server_Name				VARCHAR			(256),
	  @Remote_Server_DB_Name			VARCHAR			(128),
	  @Remote_Server_DB_Schema_Name		VARCHAR			(128),
	  @Target_DB_Name					VARCHAR			(128),
	  @Target_DB_Schema_Name			VARCHAR			(128),
      @Is_All_OK						INT							OUTPUT ,
      @Process_Name						VARCHAR			(250)		OUTPUT ,
      @Error_Message					VARCHAR			(MAX)		OUTPUT
    )
    WITH RECOMPILE
AS
    SET NOCOUNT ON
    BEGIN
        DECLARE @Is_ReCheck BIT = 0
		DECLARE @SQL NVARCHAR (MAX)
        DECLARE @Is_Debug_Mode BIT = 1
		DECLARE @Remote_Server_Tableau	VARCHAR(55)		= 'TABPOSTGRESQLPROD'
        SET @Process_Name = ( SELECT    OBJECT_NAME(objectid)
                              FROM      sys.dm_exec_requests r
                                        CROSS   APPLY sys.dm_exec_sql_text(r.sql_handle) a
                              WHERE     session_id = @@spid
                            )
		
		IF OBJECT_ID('tempdb..#t_seqfloats') IS NOT NULL
            BEGIN
                DROP TABLE #t_seqfloats			
            END

		;WITH    Nbrs_3 ( n )
          AS ( SELECT   1
               UNION
               SELECT   0
             ),
        Nbrs_2 ( n )
          AS ( SELECT   1
               FROM     Nbrs_3 n1
                        CROSS JOIN Nbrs_3 n2
             ),
        Nbrs_1 ( n )
          AS ( SELECT   1
               FROM     Nbrs_2 n1
                        CROSS JOIN Nbrs_2 n2
             ),
        Nbrs_0 ( n )
          AS ( SELECT   1
               FROM     Nbrs_1 n1
                        CROSS JOIN Nbrs_1 n2
             ),
        Nbrs ( n )
          AS ( SELECT   1
               FROM     Nbrs_0 n1
                        CROSS JOIN Nbrs_0 n2
             )

			 SELECT    'float' + CAST(n AS VARCHAR(2)) seq_floats 
			 INTO #t_seqfloats
                              FROM      ( SELECT    ROW_NUMBER() OVER ( ORDER BY n )
                                          FROM      Nbrs
                                        ) D ( n )
                              WHERE     n <= 53
			UNION ALL 
			SELECT 'float'

        Check_RemoteSvr_Schema:
        IF OBJECT_ID('tempdb..#t_allTblMetadata') IS NOT NULL
            BEGIN
                DROP TABLE [#t_allTblMetadata]			
            END  
		CREATE TABLE tempdb..[#t_allTblMetadata]
		(
		table_name VARCHAR(256),
		column_name VARCHAR(256),
		ordinal_position INT ,
		is_nullable BIT ,
		data_type VARCHAR (256) ,
		character_maximum_length BIGINT,
		numeric_scale SMALLINT,
		numeric_precision SMALLINT,
		is_primary_key BIT,
		local_schema_name VARCHAR(55),
		remote_schema_name VARCHAR(55),
		local_or_remote VARCHAR(25) 
		)
	  
        SET @SQL = '
			INSERT INTO #t_allTblMetadata
				(
					[table_name]
					,[column_name]
					,[ordinal_position]
					,[is_nullable]
					,[data_type]
					,[character_maximum_length]
					,[numeric_scale]
					,[numeric_precision]
					,[is_primary_key]
					,[local_schema_name]
					,[remote_schema_name]
					,[local_or_remote]
				)
			SELECT 
			LTRIM(RTRIM(a.table_name))						AS table_name,
			LTRIM(RTRIM(a.column_name))						AS column_name, 
			LTRIM(RTRIM(a.ordinal_position))				AS ordinal_position,
			CASE WHEN a.is_nullable = ''YES''
			THEN 1 ELSE 0 END 								AS is_nullable,
			LTRIM(RTRIM(a.udt_name))						AS data_type,
			--a.data_type AS data_type,
			LTRIM(RTRIM(a.character_maximum_length))		AS character_maximum_length,
			LTRIM(RTRIM(a.numeric_scale))					AS numeric_scale,
			LTRIM(RTRIM(a.numeric_precision))				AS numeric_precision,
			--b.primary_key_definition,
			CASE WHEN b.PK_column_name IS NULL 
			THEN 0 ELSE 1 END								AS is_primary_key,
			''tab''											AS local_schema_name,
			LTRIM(RTRIM(a.table_schema))					AS remote_schema_name,
			''remote''										AS local_or_remote
			FROM OPENQUERY(' +@remote_server_name+ ', 
			''select  
			c.table_name, 
			c.column_name, 
			c.ordinal_position, 
			c.is_nullable, 
			c.data_type, 
			c.udt_name ,
			c.character_maximum_length, 
			c.numeric_scale, 
			c.numeric_precision, 
			c.table_schema
			from information_schema.columns c 	
			where c.table_catalog = ''''workgroup'''' and c.table_schema = ''''public'''''') a
			LEFT JOIN 
			OPENQUERY(' +@remote_server_name+ ', 
			''select 
			cl.relname as table_name, 
			co.conname as constraint_name, 
			co.contype conatraint_type,
			pg_get_constraintdef(co.oid) AS primary_key_definition ,
			ns.nspname as schema_name,
			pa.attname as PK_column_name
			from pg_class cl join pg_constraint co on cl.oid = co.conrelid 
			join pg_namespace ns on cl.relnamespace = ns.oid
			join pg_attribute pa on pa.attrelid = cl.oid and pa.attnum = co.conkey[1] 
			where co.contype = ''''p''''
			and cl.relkind=''''r''''
			and ns.nspname = ''''public'''''') b 
			ON a.table_name = b.table_name AND a.table_schema = b.[schema_name] AND a.column_name = b.PK_column_name			
			WHERE SUBSTRING(a.table_name, 1, 1) <> ''_'' AND SUBSTRING(a.table_name, 1, 7) <> ''orphans''
			AND NOT EXISTS (SELECT objectname FROM TableauDBCopy.dbo.tabSchemaObjectsExclude o WHERE o.objectname = a.table_name)			
			ORDER BY a.table_name, a.ordinal_position'
		

		IF @Is_Debug_Mode = 1
		BEGIN
			PRINT CHAR(13) + 'SQL statement for acquiring ''source'' tables metadata into #t_allTblMetadata temp table:'
			PRINT '-----------------------------------------------------------------------------------------------'
			PRINT @SQL +REPLICATE(CHAR(13),2) 
		END

		EXEC(@SQL)

		IF @Is_Debug_Mode = 1 
			BEGIN
				SELECT  '#t_allTblMetadata table content for remote objects metadata:' AS 'HINT'
                SELECT  *
                FROM    #t_allTblMetadata WHERE local_or_remote = 'Remote'
				ORDER BY table_name, ordinal_position
			END
		
        IF @Is_ReCheck = 1
            BEGIN
                GOTO Check_Local_Schema
            END

        Check_Local_Schema:  


		SET @SQL = 
		'INSERT INTO #t_allTblMetadata
				(
					[table_name]
					,[column_name]
					,[ordinal_position]
					,[is_nullable]
					,[data_type]
					,[character_maximum_length]
					,[numeric_scale]
					,[numeric_precision]
					,[is_primary_key]
					,[local_schema_name]
					,[remote_schema_name]
					,[local_or_remote]
				)		
        SELECT  
				t.name AS table_name ,
				c.name AS column_name ,
				c.column_id AS ordinal_position ,
				c.is_nullable ,										
				tp.name AS data_type ,
				c.max_length AS character_maximum_length ,
				c.scale AS numeric_scale ,
				c.precision AS numeric_precision ,
				ISNULL(idx.pk_flag,0) as ''is_primary_key'' ,
				ss.name ,
				''public'' ,
				''local'' AS local_or_remote
		FROM    sys.tables t
		JOIN sys.columns c ON t.object_id = c.object_id
		JOIN sys.types tp ON c.user_type_id = tp.user_type_id
		JOIN sys.objects so ON so.object_id = t.object_id
		JOIN sys.schemas ss ON so.schema_id = ss.schema_id
		LEFT JOIN		(select i.name as index_name, i.is_primary_key as pk_flag, OBJECT_NAME(ic.OBJECT_ID) AS table_name,
		COL_NAME(ic.OBJECT_ID,ic.column_id) AS column_name FROM sys.indexes AS i INNER JOIN 
		sys.index_columns AS ic ON  i.OBJECT_ID = ic.OBJECT_ID
		AND i.index_id = ic.index_id
		WHERE   i.is_primary_key = 1) idx on idx.table_name = t.name and idx.column_name = c.name
		JOIN INFORMATION_SCHEMA.TABLES tt on tt.table_schema = ss.name and tt.table_name = t.name
        WHERE   t.type = ''u'' 	
		AND tt.TABLE_CATALOG = 	'''+@Target_DB_Name+'''		 
		AND ss.name = '''+@Target_DB_Schema_Name+''''

		IF @Is_Debug_Mode = 1
		BEGIN
			PRINT 'SQL statement for acquiring ''target'' tables metadata into #t_allTblMetadata temp table:'
			PRINT '-----------------------------------------------------------------------------------------------'
			PRINT @SQL +REPLICATE(CHAR(13),2) 
		END

		EXEC(@SQL)

		IF @Is_Debug_Mode = 1 
			BEGIN
				SELECT  '#t_allTblMetadata table content for local objects metadata:' AS 'HINT'
                SELECT  *
                FROM    #t_allTblMetadata WHERE local_or_remote = 'local'
				ORDER BY table_name, ordinal_position
			END


        IF OBJECT_ID('tempdb..#t_sql') IS NOT NULL
            BEGIN
                DROP TABLE [#t_sql]
            END

        SELECT  DISTINCT
                t1.table_name AS Table_Name ,
                t1.local_schema_name AS Local_Schema_Name ,
                'create table [' + t1.local_schema_name + '].['
                + LOWER(t1.table_name) + '] (' + STUFF(o.list, LEN(o.list), 1, '')
                + ')' + CASE WHEN t2.is_primary_key = 0 THEN ''
                             ELSE '; ALTER TABLE   [' + t1.local_schema_name
                                  + '].[' + t1.table_name + '] '
                                  + ' ADD CONSTRAINT pk_'
                                  + LOWER(t1.local_schema_name) + '_'
                                  + LOWER(t2.table_name) + '_' +  
								  LOWER(REPLACE(t2.pk_column_names,',','_'))
                                  +' PRIMARY KEY CLUSTERED ' + '('
                                  + LOWER(t2.pk_column_names) + ')'
                        END AS Create_Table_Schema_Definition_SQL ,
                'if object_id (''[' + t1.local_schema_name + '].['
                + t1.table_name + ']' + ''', ''U'') IS NOT NULL drop table ['
                + t1.local_schema_name + '].[' + t1.table_name + ']' AS Drop_Table_SQL
        INTO    #t_sql
        FROM    #t_allTblMetadata t1
                CROSS APPLY ( SELECT    '[' + column_name + '] '
                                        +	CASE	WHEN data_type IN ( 'bigint',
																		'int8',
																		'bigserial')
													THEN				'bigint'
													WHEN data_type IN (	'integer',
																		'serial4', 
																		'serial',
																		'int4',
																		'int',
																		'oid')
													THEN				'int'
													WHEN data_type IN ( 'smallint', 
																		'serial2', 
																		'smallserial',
																		'int2')
													THEN				'smallint'
													WHEN data_type IN ( 'uuid')
													THEN				'uniqueidentifier'	
													WHEN data_type IN (	'bool', 
																		'boolean' )
													THEN				'bit'
													WHEN data_type IN (	'timestamp', 
																		'timestamptz')
													THEN				'datetime'
													WHEN data_type IN ( 'bytea',
																		'json',
																		'text',
																		'varchar')
													THEN				'nvarchar'
													WHEN data_type IN ( SELECT * FROM #t_seqfloats )																	
													THEN				'float'
													ELSE data_type 
													END									
                                        +	CASE	WHEN data_type	IN ('int2',
																		'int4',
																		'int8',
																		'oid',																		
																		'timestamp',																	
																		'uuid',
																		'bool')
													THEN ''
													WHEN 
													data_type IN ('text', 'json', 'bytea') OR (data_type = 'varchar' and character_maximum_length IS NULL) OR character_maximum_length > 8000
													THEN '(max)'			 							  
													WHEN data_type = 'decimal'
													THEN '('
                                                    + CAST(numeric_precision AS VARCHAR)
                                                    + ', '
                                                    + CAST(numeric_scale AS VARCHAR)
                                                    + ')'
													WHEN data_type in (SELECT * FROM #t_seqfloats)
													THEN '(53)'
													ELSE COALESCE('(' + CAST(character_maximum_length AS VARCHAR) + ')', '')
                                          END + ' '
                                        +( CASE WHEN is_nullable = 0
                                                THEN 'NOT '
                                                ELSE ''
                                           END ) + 'NULL' + ','
                              FROM      #t_allTblMetadata
                              WHERE     table_name = t1.table_name AND local_or_remote = 'Remote'
                              ORDER BY  ordinal_position
                            FOR
                              XML PATH('')
                            ) o ( list )
                JOIN ( SELECT   table_name ,
                                is_primary_key ,
                                pk_column_names ,
                                column_name = REVERSE(RIGHT(REVERSE(pk_column_names),
                                                            LEN(pk_column_names)
                                                            - CHARINDEX(',',
                                                              REVERSE(pk_column_names))))
                       FROM     ( SELECT    table_name ,
                                            is_primary_key ,
                                            pk_column_names = STUFF(( SELECT
                                                              ','
                                                              +CAST(column_name AS VARCHAR(500))
                                                              FROM
                                                              #t_allTblMetadata z2
                                                              WHERE
                                                              z1.table_name = z2.table_name
                                                              AND z2.is_primary_key = 1
															  AND z2.local_or_remote = 'Remote'
															  ORDER BY z2.column_name ASC
                                                              FOR
                                                              XML
                                                              PATH('')
                                                              ), 1, 1, '')
                                  FROM      #t_allTblMetadata z1
                                  WHERE     z1.is_primary_key = 1
											AND z1.local_or_remote = 'Remote'
                                  GROUP BY  z1.table_name ,
                                            z1.is_primary_key
                                ) a
                     ) t2 ON t1.table_name = t2.table_name
        WHERE   t1.local_schema_name <> 'unknown' and t1.local_or_remote = 'Remote'

        IF @Is_Debug_Mode = 1
            BEGIN
                SELECT  '#t_sql table content:' AS 'HINT'
               SELECT  *
                FROM    #t_sql
				ORDER BY Table_Name 
            END

        IF @Is_ReCheck = 1
            BEGIN
                GOTO Do_Table_Diff
            END



        Do_Table_Diff:


        IF OBJECT_ID('tempdb..#t_diff') IS NOT NULL
            BEGIN
                DROP TABLE [#t_diff]
            END

			  
        ;WITH    Temp_CTE ( table_name, column_name, is_nullable, data_type, local_schema_name, is_primary_key, character_maximum_length, numeric_scale, numeric_precision )
                  AS (			  				  				 				 
                       SELECT	table_name					= m.table_name ,
								column_name					= m.column_name ,
                                is_nullable					= m.is_nullable ,
                                data_type					= CASE	WHEN m.data_type IN (	'bigint',
																							'int8',
																							'bigserial')
																	THEN					'bigint'
																	WHEN m.data_type IN (	'integer',
																							'serial4', 
																							'serial',
																							'int4',
																							'int',
																							'oid')
																	THEN					'int'
																	WHEN m.data_type IN (	'smallint', 
																							'serial2', 
																							'smallserial',
																							'int2')
																	THEN					'smallint'
																	WHEN m.data_type IN (	'uuid')
																	THEN					'uniqueidentifier'	
																	WHEN m.data_type IN (	'bool', 
																							'boolean' )
																	THEN					'bit'
																	WHEN m.data_type IN (	'timestamp', 
																							'timestamptz')
																	THEN					'datetime'
																	WHEN m.data_type IN (	'bytea',
																							'json',
																							'text',
																							'varchar')
																	THEN					'nvarchar'
																	WHEN m.data_type IN ( SELECT * FROM #t_seqfloats )																	
																	THEN					'float'
																	ELSE m.data_type 
															END,
                                local_schema_name			= m.local_schema_name ,
                                is_primary_key				= m.is_primary_key ,
                                character_maximum_length	= COALESCE(CASE WHEN 
																		m.data_type IN ('text', 'json', 'bytea') OR (m.data_type = 'varchar' and m.character_maximum_length IS NULL) OR m.character_maximum_length > 8000
																			THEN 'max' 
																			ELSE CAST(m.character_maximum_length AS VARCHAR) END,
																			constants.character_maximum_length ,
																			CAST(l.character_maximum_length AS VARCHAR)),
                                numeric_scale				= COALESCE(	constants.numeric_scale, 
																		CAST(m.numeric_scale AS VARCHAR),
																		CAST(l.numeric_scale AS VARCHAR)),
								numeric_precision			= COALESCE(	constants.numeric_precision, 
																		CAST(m.numeric_precision AS VARCHAR), 
																		CAST(l.numeric_precision AS VARCHAR))
                       FROM     #t_allTblMetadata m
                                LEFT JOIN ( SELECT  'char' AS data_type ,
                                                    NULL AS character_maximum_length ,
                                                    0 AS numeric_scale ,
                                                    0 AS numeric_precision
                                            UNION ALL
                                            SELECT  'varchar' ,
                                                    NULL ,
                                                    '0' ,
                                                    '0'
											UNION ALL
                                            SELECT  'time' ,
                                                    '5' ,
                                                    '7' ,
                                                    '16'
                                            UNION ALL
                                            SELECT  'date' ,
                                                    '3' ,
                                                    '0' ,
                                                    '10'
                                            UNION ALL
                                            SELECT  'datetime' ,
                                                    '8' ,
                                                    '3' ,
                                                    '23'
                                            UNION ALL
                                            SELECT  'datetime2' ,
                                                    '8' ,
                                                    '7' ,
                                                    '27'
                                            UNION ALL
                                            SELECT  'smalldatetime' ,
                                                    '4' ,
                                                    '0' ,
                                                    '16'
                                            UNION ALL
                                            SELECT  'bit' ,
                                                    '1' ,
                                                    '0' ,
                                                    '1'
                                            UNION ALL
                                            SELECT  'float' ,
                                                    '8' ,
                                                    '0' ,
                                                    '53'
                                            UNION ALL
                                            SELECT  'money' ,
                                                    '8' ,
                                                    '4' ,
                                                    '19'
                                            UNION ALL
                                            SELECT  'smallmoney' ,
                                                    '4' ,
                                                    '4' ,
                                                    '10'
                                            UNION ALL
                                            SELECT  'uniqueidentifier' ,
                                                    '16' ,
                                                    '0' ,
                                                    '0'
                                            UNION ALL
                                            SELECT  'xml' ,
                                                    'max' ,
                                                    '0' ,
                                                    '0'
                                            UNION ALL
                                            SELECT  'numeric' ,
                                                    '9' ,
                                                    '0' ,
                                                    '18'
                                            UNION ALL
                                            SELECT  'real' ,
                                                    '4' ,
                                                    '0' ,
                                                    '24'
                                            UNION ALL
                                            SELECT  'tinyint' ,
                                                    '1' ,
                                                    '0' ,
                                                    '3'
                                            UNION ALL
                                            SELECT  'smallint' ,
                                                    '2' ,
                                                    '0' ,
                                                    '5'
                                            UNION ALL
                                            SELECT  'int' ,
                                                    '4' ,
                                                    '0' ,
                                                    '10'
                                            UNION ALL
                                            SELECT  'bigint' ,
                                                    '8' ,
                                                    '0' ,
                                                    '19'
                                          ) constants ON	(CASE	WHEN m.data_type IN (		'bigint',
																								'int8',
																								'bigserial')
																	THEN						'bigint'
																	WHEN m.data_type IN (		'integer',
																								'serial4', 
																								'serial',
																								'int4',
																								'int',
																								'oid')
																	THEN						'int'
																	WHEN m.data_type IN (		'smallint', 
																								'serial2', 
																								'smallserial',
																								'int2')
																	THEN						'smallint'
																	WHEN m.data_type IN (		'uuid')
																	THEN						'uniqueidentifier'	
																	WHEN m.data_type IN (		'bool', 
																								'boolean' )
																	THEN						'bit'
																	WHEN m.data_type IN (		'timestamp', 
																								'timestamptz')
																	THEN						'datetime'
																	WHEN m.data_type IN (		'bytea',
																								'json',
																								'text',
																								'varchar')
																	THEN						'nvarchar'
																	WHEN m.data_type IN ( SELECT * FROM #t_seqfloats )																	
																	THEN						'float'
																	ELSE m.data_type 
															END	) = constants.data_type
                                LEFT JOIN #t_allTblMetadata l ON l.column_name = m.column_name
                                                        AND l.table_name = m.table_name
                                                        AND l.data_type = ( CASE	WHEN m.data_type IN (	'bigint',
																											'int8',
																											'bigserial')
																					THEN					'bigint'
																					WHEN m.data_type IN (	'integer',
																											'serial4', 
																											'serial',
																											'int4',
																											'int',
																											'oid')
																					THEN					'int'
																					WHEN m.data_type IN (	'smallint', 
																											'serial2', 
																											'smallserial',
																											'int2')
																					THEN					'smallint'
																					WHEN m.data_type IN (	'uuid')
																					THEN					'uniqueidentifier'	
																					WHEN m.data_type IN (	'bool', 
																											'boolean' )
																					THEN					'bit'
																					WHEN m.data_type IN (	'timestamp', 
																											'timestamptz')
																					THEN					'datetime'
																					WHEN m.data_type IN (	'bytea',
																											'json',
																											'text',
																											'varchar')
																					THEN					'nvarchar'
																					WHEN m.data_type IN ( SELECT * FROM #t_seqfloats )																	
																					THEN					'float'
																					ELSE m.data_type 
																			END ) AND l.local_or_remote = 'Local'
			WHERE m.local_or_remote = 'Remote' 		
			EXCEPT
			SELECT				table_name ,
                                column_name ,
                                is_nullable ,
                                data_type , 
                                local_schema_name ,
                                is_primary_key ,
                                CASE	WHEN character_maximum_length > 8000 OR character_maximum_length = -1
								THEN 'max'
								WHEN data_type IN ('nvarchar', 'nchar') THEN CAST(character_maximum_length/2 AS VARCHAR)
								ELSE CAST(character_maximum_length AS VARCHAR) END AS character_maximum_length,
                                numeric_scale ,
                                numeric_precision
			FROM     #t_allTblMetadata 
			WHERE local_or_remote  ='Local' 
                     )
            SELECT DISTINCT
                    table_name ,
                    local_schema_name
            INTO    #t_diff
            FROM    Temp_CTE

        IF @Is_Debug_Mode = 1
            BEGIN
                SELECT  '#t_diff table content:' AS 'HINT'
                SELECT  *
                FROM    #t_diff
            END

        IF @Is_ReCheck = 1
            GOTO Results

        Run_SQL:
        IF NOT EXISTS ( SELECT DISTINCT
                                Table_Name ,
                                Local_Schema_Name
                        FROM    #t_sql a
                        WHERE   EXISTS ( SELECT table_name
                                         FROM   #t_diff i
                                         WHERE  a.Table_Name = i.table_name ) )
            BEGIN
                GOTO Schema_Diff_ReCheck
            END
        ELSE
            BEGIN
                DECLARE @schema_name VARCHAR(50)
                DECLARE @table_name VARCHAR(256)
                DECLARE @sql_select_dropcreate NVARCHAR(MAX)			

                DECLARE db_cursor CURSOR FORWARD_ONLY
                FOR
                    SELECT DISTINCT
                            Table_Name ,
                            Local_Schema_Name
                    FROM    #t_sql a
                    WHERE   EXISTS ( SELECT table_name
                                     FROM   #t_diff i
                                     WHERE  a.Table_Name = i.table_name )
                OPEN db_cursor
                FETCH NEXT
			FROM db_cursor INTO @table_name, @schema_name
                WHILE @@FETCH_STATUS = 0
                    BEGIN
                        BEGIN TRY
                            BEGIN TRANSACTION
                            SET @sql_select_dropcreate = ( SELECT
                                                              Drop_Table_SQL
                                                           FROM
                                                              #t_sql 
                                                           WHERE
                                                              Table_Name = @table_name
                                                         ) + '; ' +CHAR(13)
                                + ( SELECT  Create_Table_Schema_Definition_SQL 
                                    FROM    #t_sql
                                    WHERE   Table_Name = @table_name
                                  ) + REPLICATE(CHAR(13),2)    
								  
							IF @Is_Debug_Mode = 1
								BEGIN
									PRINT 'SQL statement for dropping/recreating ''source'' table(s):'
									PRINT '-----------------------------------------------------------------------------------------------'
									PRINT @sql_select_dropcreate									
								END

                            EXEC sp_sqlexec @sql_select_dropcreate
                            --SET @Is_All_OK = 1
                            SET @Error_Message = 'All Good!'
                            COMMIT TRANSACTION
                        END TRY
                        BEGIN CATCH		
                            IF @@TRANCOUNT > 0
                                ROLLBACK TRANSACTION;
                            SET @Is_All_OK = 0
                            SET @Error_Message = 'This operation has been unexpectedly terminated due to error: '''
                                + ERROR_MESSAGE() + ''' at line '
                                + CAST(ERROR_LINE() AS VARCHAR);								
                        END CATCH
                        FETCH NEXT FROM db_cursor INTO @table_name,@schema_name
                    END
                CLOSE db_cursor
                DEALLOCATE db_cursor
                SET @Is_ReCheck = 1
            END


        Schema_Diff_ReCheck:
        IF @Is_ReCheck = 1
            BEGIN
                GOTO Check_RemoteSvr_Schema
            END

        Results:
        IF EXISTS ( SELECT TOP 1
                            *
                    FROM    #t_diff )
            BEGIN 
                SET @Is_All_OK = 0
                SET @Error_Message = 'Table schema reconciliation between '
                    + '' + @@SERVERNAME + ''
                    + ' and remote database on '''+@remote_server_name+'''' + CHAR(10)
                SET @Error_Message = @Error_Message + 'failed. Please troubleshoot.'
            END
        ELSE
            BEGIN
                SET @Is_All_OK = 1
                SET @Error_Message = 'All Good!'
            END


		IF OBJECT_ID('tempdb..#t_seqfloats') IS NOT NULL
            BEGIN
                DROP TABLE #t_seqfloats		
            END
        IF OBJECT_ID('tempdb..#t_allTblMetadata') IS NOT NULL
            BEGIN
                DROP TABLE [#t_allTblMetadata]
            END 
        IF OBJECT_ID('tempdb..#t_sql') IS NOT NULL
            BEGIN
                DROP TABLE [#t_sql]
            END
        IF OBJECT_ID('tempdb..#t_diff') IS NOT NULL
            BEGIN
                DROP TABLE [#t_diff]
            END	
    END

Finally, we are ready to load the Tableau PostgreSQL data into the ‘TableauDBCopy’ database tables on the ‘tab’ schema. For that we could use SQL Server Integration Services but, since the ‘workgroup’ database is quite small and most tables have a primary key defined on them, we can load the data in sequential order i.e. table by table, using a modified version of my database replication stored procedure which I described in one of my previous blog posts HERE. The stored procedure works in a similar fashion to the one described previously, but allowances had to be made to reconcile PostgreSQL and SQL Server data types and certain naming conventions e.g. some PostgreSQL reserved words need to be encapsulated in double quotes in the OPENQUERY statements in order to be validated and recognized by SQL Server. Likewise, certain SQL Server reserved words need to be delimited with square brackets. To reference those exceptions, I have created two views (downloadable from HERE) which are used in the merging stored procedure to provide greater cross-database compatibility. The stored procedure works on a table-to-table basis, but it would not be very hard to make it loop through a collection of objects e.g. using a cursor or a ‘Foreach Loop’ SSIS container to automate a comprehensive data load. If, on the other hand, a much faster, asynchronous load is required, you can always check out one of my previous blog posts on parallel SQL statement execution using SQL Server Agent jobs HERE. All the code for the data synchronization across Tableau’s PostgreSQL database and the SQL Server instance, as well as the other T-SQL snippets presented in this post, can be downloaded from my OneDrive folder HERE.
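The reserved-word quoting convention itself is simple enough to sketch outside of T-SQL. The Python snippet below mirrors the rule the two views encode; note that the reserved-word sets here are tiny illustrative samples, not the full lists held in those views:

```python
#A minimal sketch of the cross-database quoting convention described above.
#The reserved-word sets are illustrative samples only - the real lists live
#in the two helper views referenced in the post.
PG_RESERVED = {'user', 'order', 'group', 'default'}      #sample PostgreSQL reserved words
MSSQL_RESERVED = {'key', 'index', 'rule', 'view'}        #sample SQL Server reserved words

def quote_for_postgres(identifier):
    #PostgreSQL reserved words must be double-quoted inside OPENQUERY pass-through SQL
    if identifier.lower() in PG_RESERVED:
        return '"' + identifier + '"'
    return identifier

def quote_for_mssql(identifier):
    #SQL Server reserved words are delimited with square brackets
    if identifier.lower() in MSSQL_RESERVED:
        return '[' + identifier + ']'
    return identifier

print(quote_for_postgres('user'))    # "user"
print(quote_for_mssql('key'))        # [key]
print(quote_for_mssql('price'))      # price
```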

Below is a short video depicting how this solution works, using both the schema synchronisation and data synchronisation stored procedures.

The ‘workgroup’ database data dictionary, with descriptions of all its PostgreSQL objects, can be found HERE.


Enhancing SQL Server Integration Services (SSIS) Functionality Through Python Scripting

August 8th, 2015 / 5 Comments » / by admin

Ever since Python started to gain traction in the developer community as one of the most versatile and easy-to-learn programming languages, Microsoft has made great strides to make it a first-class citizen in its ecosystem. Python Tools for Visual Studio, tight Python integration with the .NET framework in the IronPython project, the availability of the Python SDK on Windows Azure, and even the recent addition of Python to the Azure Machine Learning service are just a few examples of how seriously Microsoft takes Python’s popularity these days. However, when it comes to tools such as Integration Services, C# and VB.NET are the only two options provided out of the box to extend built-in SSIS functionality. Whilst both languages are powerful enough to give BI developers the tools needed to supplement default SSIS transformations, the entry barrier is often too high and reserved for those commanding a good understanding of the .NET libraries and C# or VB.NET syntax. Also, both languages are often perceived as too verbose for simple scripts dealing with rudimentary tasks such as reading and writing text files, working with Excel files, sending e-mails etc. That’s where high-level languages shine and that’s where, in my experience, Python often comes in handy.

The SSIS Execute Process Task allows the package to run Win32 executables or batch files. In this way, provided we have Python installed and added to the PATH, we can run Python scripts either by (1) directly invoking the Python interpreter and passing the Python script name as an argument or (2) wrapping the script in a .bat file. Alternatively, (3) we can ‘freeze’ the Python code as an executable, an option I have also explored in this post. Firstly, let’s explore options 1 and 2 and create a sample Python file in the C:\ directory along with a batch file to wrap the script in. The Python script will be called PyScript1.py whereas the batch file will be BatPyScript1.bat.

#Python code saved as PyScript1.py under the C:\ drive
print('This is how the print statement works in Python')
input('Press ENTER to continue...')

REM Batch file code to execute the above Python script,
REM saved as BatPyScript1.bat under the C:\ drive
python c:\PyScript1.py

The above code is very rudimentary and does not provide any functionality besides printing out a dummy string in a console window, but it is a good example of how Python scripts can be invoked from the Execute Process Task. The video clip below outlines how the PyScript1.py script can be triggered using an SQL Server Integration Services Execute Process Task. Also, notice that the FailTaskIfReturnCodeIsNotSuccessValue property needs to be set to False if we execute the script from a batch file.
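It is also worth noting that when the interpreter is invoked directly (option 1), the Execute Process Task decides success by comparing the process exit code against its SuccessValue property (0 by default). Below is a hypothetical PyScript2.py sketch of that pattern:

```python
#Hypothetical PyScript2.py - signalling success or failure back to the SSIS
#Execute Process Task through the process exit code (compared by the task
#against its SuccessValue property, which defaults to 0)
import sys

def main():
    try:
        #...the real ETL work (file moves, data pulls etc.) would go here...
        return 0    #zero exit code - the SSIS task reports success
    except Exception as e:
        print('Script failed: {0}'.format(e), file=sys.stderr)
        return 1    #non-zero exit code - the SSIS task fails

exit_code = main()
print('Exit code handed back to SSIS: {0}'.format(exit_code))
#the real script would finish with: sys.exit(exit_code)
```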

Python being a general-purpose, high-level programming language can support or perform a variety of tasks to enhance ETL development (not only SSIS). Perceived by many as easy to learn, super productive and with a generous Standard Library (batteries included), Python can provide a great deal of functionality with hardly any code. Let’s explore a few examples of how Python can be used to scrape some data off a website, save it in a CSV file, transfer it into the SQL Server database and finally generate and email some basic reports based on this data.

Let’s assume that we would like to scrape stock price data off the following website (click on the image to expand).

Python_Scripting_ETL_Extension_Stock_Website_View

There are several libraries which can be used for web scraping, but by far the most popular, and the one we’re using here, is Beautiful Soup, a Python package for parsing HTML and XML documents (including those with malformed markup, i.e. non-closed tags, so named after ‘tag soup’). The columns we are interested in are the company code and the share price only. As part of this exercise we will also add an ‘extract date and time’ column to be able to pinpoint when the data was scraped. The data will be saved in the C:\Imports folder as a CSV file with the extract date and time as part of the file name.

from urllib.request import urlopen
import csv
import datetime
import time
import os
from bs4 import BeautifulSoup
savePathImports = 'C:\\Imports'
if not os.path.exists(savePathImports):
    os.makedirs(savePathImports)
urlToOpen = 'http://www.marketindex.com.au/asx100'
soup = BeautifulSoup(urlopen(urlToOpen).read(), 'html.parser')
filename = time.strftime("extract_%d%b%Y_%H%M%S.csv")
csvFile = open(savePathImports + '\\' + filename,'wt',newline='')
writer = csv.writer(csvFile)
writer.writerow(["Company Code", "Share Price", "Extract DateTime"])
try:
    for row in soup("table", {"id": "asx_sp_table" })[0].tbody('tr'):
        tds = row('td')
        csvRow = []
        csvRow.append(tds[1].string)
        csvRow.append(tds[3].string.replace("$", ""))
        csvRow.append(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        writer.writerow(csvRow)
finally:
    csvFile.close()
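Because live markup can change without warning, the row-extraction logic is worth exercising against a small, self-contained sample first. The snippet below runs the same tds[1]/tds[3] extraction as the script above against a hypothetical miniature of the table’s markup:

```python
#Self-contained check of the row-extraction logic against a hypothetical
#miniature of the page's table markup (the real table has more columns)
from bs4 import BeautifulSoup

sample_html = '''
<table id="asx_sp_table"><tbody>
<tr><td>1</td><td>CBA</td><td>Commonwealth Bank</td><td>$80.50</td></tr>
<tr><td>2</td><td>BHP</td><td>BHP Billiton</td><td>$17.25</td></tr>
</tbody></table>
'''
soup = BeautifulSoup(sample_html, 'html.parser')
rows = []
for row in soup('table', {'id': 'asx_sp_table'})[0].tbody('tr'):
    tds = row('td')
    #same column positions as the import script: code in tds[1], price in tds[3]
    rows.append((str(tds[1].string), str(tds[3].string.replace('$', ''))))
print(rows)    # [('CBA', '80.50'), ('BHP', '17.25')]
```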

Further on, now that we have the data scraped, let’s assume that rather than storing it in a CSV file, we would like to insert it into a SQL Server database. There are many good libraries that can provide a Python to SQL Server interface; in this example I used PyPyODBC, purely because of its good Python 3.4 support. The below snippet creates the ‘stock_prices’ table on the default ‘dbo’ schema, inserts the data from the CSV file created by the code above and appends an ‘insert_datetime’ column to the table.

import pypyodbc
import datetime
import glob
import csv
import os

#define SQL Server database connection details,
#establish the connection and drop/create 'stock_prices' table
conn = pypyodbc.connect('Driver={SQL Server};'
                        'Server=ServerName\InstanceName;'
                        'Database=DatabaseName;'
                        'uid=Login;pwd=Password')
cur = conn.cursor()
cur.execute('''IF OBJECT_ID(N'dbo.stock_prices', N'U') IS NOT NULL
	        DROP TABLE dbo.stock_prices;
            CREATE TABLE stock_prices
            (id int IDENTITY (1,1),
            company_code CHAR (3) NOT NULL,
            share_price DECIMAL(6,2) NOT NULL,
            extract_datetime DATETIME NOT NULL,
            insert_datetime DATETIME);
            ''')
cur.commit()

#import data from CSV file into the SQL Server table skipping
#first/header row and append current 'time stamp' column
filesLocation = 'C:\\Imports'
import_data = []
current_timestamp = str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
os.chdir(filesLocation)
for csvFile in glob.glob("*.csv"):
    with open(csvFile, 'r') as f:
        reader = csv.reader(f)
        next(reader, None)  #skip the header row
        import_data.extend(tuple(line + [current_timestamp]) for line in reader)

cur.executemany('''INSERT INTO dbo.stock_prices
                (company_code,share_price,extract_datetime,insert_datetime)
                VALUES (?,?,?,?)''',import_data)
cur.commit()

#close database connection
conn.close()
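The header-skip and timestamp-append step can be verified in isolation with nothing but the standard library, using an in-memory file object to stand in for a scraped CSV:

```python
#Stdlib-only check of the CSV import step: skip the header row, then append
#a load timestamp to every record before the executemany() insert
import csv
import io
import datetime

sample_csv = ('Company Code,Share Price,Extract DateTime\n'
              'CBA,80.50,2015-12-07 09:00:00\n'
              'BHP,17.25,2015-12-07 09:00:00\n')
current_timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

reader = csv.reader(io.StringIO(sample_csv))
next(reader, None)    #skip the header row
import_data = [tuple(line + [current_timestamp]) for line in reader]

print(len(import_data))       # 2
print(import_data[0][:3])     # ('CBA', '80.50', '2015-12-07 09:00:00')
```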

There is no point in storing the data in a database (or flat files) if you can’t do anything with it. The next snippet of code extracts the stock prices data from the database and creates a very simple Excel report using openpyxl library. First let’s create a sample empty Excel spreadsheet file in the Report subdirectory of Imports folder. Next, let’s assume that the report requires us to provide the data for the top 15 most expensive stocks on the market for the data we have scraped. On top of that we would like to have it graphed in the same workbook and e-mailed to a recipient. While SQL Server built-in features can partially cater for these requirements e.g. SQL Server database mail can handle e-mail distribution functionality, creating reports with built-in graphs requires a programmatic approach and that is where Python’s simplicity and flexibility shine.

The below code imports the data from SQL Server instance which has previously been scraped from the website featuring stock prices, imports it into the spreadsheet file, creates a simple bar graph from the first 2 columns (stock price and company code) and finally e-mails it to a recipient using smtplib library. A lot of functionality for a very small amount of code!

import openpyxl
import pypyodbc
import time
import os
import smtplib
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.utils import formatdate

#define SQL Server database connection details and report file path
savePathReport = 'C:\\Imports\\Report'
xlsxReportName = 'samplereport.xlsx'
conn = pypyodbc.connect('Driver={SQL Server};'
                        'Server=ServerName\InstanceName;'
                        'Database=DatabaseName;'
                        'uid=Login;pwd=Password')
cur = conn.cursor()
os.chdir(savePathReport)

#open Excel spreadsheet, rename active workbook to 'samplereport_DDMonthYY'
wb = openpyxl.load_workbook(xlsxReportName)
sheet = wb.get_active_sheet()
wb.remove_sheet(sheet)
wb.create_sheet(title=time.strftime("samplereport_%d%b%Y"))
sheet = wb.get_active_sheet()

#get the data (share price, company code and date for the top 15 most expensive
# shares out of SQL Server table and insert it into the report/spreadsheet
cur.execute("""SELECT TOP 15 share_price,
               company_code, extract_datetime,
               ROW_NUMBER() OVER (ORDER BY share_price DESC) + 1 AS row_number
               FROM dbo.stock_prices
               ORDER BY share_price DESC""")
dbTblHeader = [item[0] for item in cur.description]
del dbTblHeader[-1]
sheet.append(dbTblHeader)
for row in cur.fetchall():
    sheet['A' + str(row[3])] = float(row[0])
    sheet['B' + str(row[3])] = row[1]
    sheet['C' + str(row[3])] = row[2]

#create a simple bar graph from the report data on the same workbook
values = openpyxl.charts.Reference(sheet, (2, 1), (16, 1))
labels = openpyxl.charts.Reference(sheet, (2, 2), (16, 2))
seriesObj = openpyxl.charts.Series(values, labels = labels, title='Stock Prices as at '+time.strftime("%d-%b-%Y"))
chartObj = openpyxl.charts.BarChart()
chartObj.append(seriesObj)
chartObj.drawing.top = 5
chartObj.drawing.left = 300
chartObj.drawing.width = 500
chartObj.drawing.height = 315
sheet.add_chart(chartObj)
wb.save(xlsxReportName)
cur.close()
conn.close()

#send e-mail to designated e-mail address
#with Excel spreadsheet attachment
attachmentPath = savePathReport+'\\'+xlsxReportName
sender = 'senders_email_address@domain_name.com'
recipient = 'recipient_email_address@domain_name.com'
sendersEmailLogin = 'Password' #the sending account's password, used to authenticate with the SMTP server
HOST = "smtp.gmail.com" #other smtp providers include outlook/hotmail @ smtp-mail.outlook.com or Yahoo Mail @ smtp.mail.yahoo.com
msg = MIMEMultipart()
msg["From"] = sender
msg["To"] = recipient
msg["Subject"] = "Share Prices report for " + time.strftime("%d-%b-%Y")
msg['Date'] = formatdate(localtime=True)
part = MIMEBase('application', "octet-stream")
with open(attachmentPath, "rb") as f:
    part.set_payload(f.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', 'attachment; filename="%s"' % os.path.basename(attachmentPath))
msg.attach(part)
server = smtplib.SMTP(HOST, 587)
server.ehlo()
server.starttls()
server.login(sender, sendersEmailLogin)
server.sendmail(sender, recipient, msg.as_string())
server.close()

The final output in C:\Imports\Report folder should be an Excel spreadsheet containing the data and a simple graph as per the image below.

Python_Scripting_ETL_Extension_Excel_Report_View

While Python can easily be installed on any system (some even come with Python pre-installed), under some circumstances it may not be desirable or permitted to install third-party software on certain environments. In these cases Python files can be compiled into executable files (for DOS, OpenVMS, Microsoft Windows, Symbian or OS/2) using a few different utilities e.g. py2exe or cx_Freeze.

Let’s combine the above code into one executable using py2exe. py2exe is a Python extension which converts Python scripts (.py) into Microsoft Windows executables (.exe). These executables can run on a system without Python installed, and the process is very straightforward. Depending on which version of Python you run, you will need to download the applicable version of py2exe. As my installation is version 3.4, I downloaded it from HERE. All we need to do now is create a ‘setup’ Python script, save it in the same folder as the Python file that needs to be converted, and invoke py2exe from the command line. This will create a sub-folder called ‘dist’ where the executable will be placed. The following code is the simplest version of the setup.py file.

from distutils.core import setup
import py2exe
setup(console=["filename.py"])

Once saved to a file, we can run it from the terminal with the following command.

python setup.py py2exe

Below is the output generated by this process (as my default Python installation is a distribution from Continuum Analytics called Anaconda, you can see a lot of DLLs and Python modules being referenced as part of the compilation).

Python_Scripting_ETL_Extension_Py2Exe_Terminal_View

Once the executable file is generated, we can easily reference it as part of the ETL process, calling the file in the Execute Process Task transformation or even through xp_cmdshell from an SQL statement.

SQL Server Integration Services functionality allows developers to complete a vast array of tasks and further extend its capabilities through the .NET languages; however, Python’s sublime flexibility, coupled with its ease of development, not only provides a somewhat gentler entry point into the programming realm but also allows for complex task execution with minimal development input.
