Tableau Server Workgroup PostgreSQL Database Schema And Data Synchronization
The last client who engaged me to architect and develop a small data warehouse also made a large investment in Tableau as their default reporting platform. The warehouse data, small by today’s standards, was to be uploaded into Tableau Server as a scheduled overnight extract, with users granted access to reports and data on Tableau Server rather than querying the star schema relational tables (implemented on the SQL Server database engine) directly. As data availability, and therefore meeting BI SLAs, was paramount to the project’s success, a robust notification system was put in place to log any data warehouse issues arising from activities on the database server. However, even though every precaution was taken to ensure that failed data warehouse processes were logged and the corresponding issues mitigated, Tableau extract failures and other Tableau Server activities remained largely unaccounted for because no data about them was being captured. How could one address this issue?
Tableau provides access to its internal PostgreSQL metadata database with just a few simple steps. As it turns out, during installation Tableau Server creates an almost empty ‘workgroup’ repository with 100+ tables, 900+ columns (about 100 of them used as keys), 300+ joins and 16+ views, all of which can be accessed and queried. Tableau Server works as a collection of processes, processors, programs and applications such as the data engine, data server, VizQL Server, application server etc. Each of those processes generates logs with data about user activities, data connections, queries and extractions, errors, views and interactions, etc., which are parsed regularly and stored in the PostgreSQL-based Tableau Server administrative database. The PostgreSQL server hosting the ‘workgroup’ database usually runs on the same Windows server as the main Tableau Server or, if Tableau Server runs as a multi-node cluster with worker servers, on another Windows server running a worker Tableau Server. In either case it listens on the non-standard TCP/IP port 8060.
Access to Tableau’s PostgreSQL database is provided through a few different accounts, each with its own set of privileges. In November 2014 (release 8.2.5) Tableau Software introduced a new default user named ‘readonly’ with read access to all tables and views of the ‘workgroup’ repository, which is what I’m going to use to get ‘under the hood’. Another user commonly used for Tableau Server metadata exploration, aptly named ‘tableau’, can also be used for Tableau Server activity analysis but has access to fewer database objects.
The easiest way to connect to Tableau’s ‘workgroup’ database using the ‘readonly’ account is to open an administrator command prompt on your Tableau Server, navigate to the Tableau Server bin directory and issue the tabadmin dbpass command for that user with your chosen password (tabadmin dbpass --username readonly followed by the password). After a server restart the changes should take effect and you should be able to see the following output.
PostgreSQL can now be queried using a client tool of your choice, e.g. pgAdmin. This is what the ‘public’ schema with all its tables looks like when imported into Navicat Data Modeler.
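As a quick sanity check once connected, a query along the following lines can confirm what the ‘readonly’ account actually sees. This is a minimal sketch that assumes you are connected to the ‘workgroup’ database from pgAdmin or a similar client; it uses only the standard information_schema views, and the counts it returns will differ between Tableau Server versions.

```sql
-- Run against the 'workgroup' database as the 'readonly' user.
-- Counts the tables, views and columns exposed on the 'public' schema.
SELECT 'tables' AS object_type, COUNT(*) AS object_count
FROM information_schema.tables
WHERE table_schema = 'public'
  AND table_type = 'BASE TABLE'
UNION ALL
SELECT 'views', COUNT(*)
FROM information_schema.tables
WHERE table_schema = 'public'
  AND table_type = 'VIEW'
UNION ALL
SELECT 'columns', COUNT(*)
FROM information_schema.columns
WHERE table_schema = 'public';
```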
In order to connect to it from SQL Server we can simply download the PostgreSQL ODBC driver and configure it with the credentials of the ‘readonly’ user.
All that is left to do is to create a linked server connection to the PostgreSQL database directly from SQL Server Management Studio, exposing the collection of objects (tables and views) on the ‘public’ schema.
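The linked server can also be scripted rather than created through the Management Studio dialogs. The sketch below assumes a system DSN called ‘TableauWorkgroup’ (a hypothetical name) has already been configured with the PostgreSQL ODBC driver, and it reuses ‘TABPOSTGRESQLPROD’ as the linked server name because that is the name the stored procedure later in this post refers to. The password is a placeholder.

```sql
-- Assumption: 'TableauWorkgroup' is a hypothetical system DSN that points at
-- the 'workgroup' database on port 8060 through the PostgreSQL ODBC driver.
EXEC master.dbo.sp_addlinkedserver
    @server     = N'TABPOSTGRESQLPROD',
    @srvproduct = N'PostgreSQL',
    @provider   = N'MSDASQL',              -- OLE DB provider for ODBC data sources
    @datasrc    = N'TableauWorkgroup';

-- Map all local logins to the Tableau 'readonly' account.
EXEC master.dbo.sp_addlinkedsrvlogin
    @rmtsrvname  = N'TABPOSTGRESQLPROD',
    @useself     = N'False',
    @locallogin  = NULL,
    @rmtuser     = N'readonly',
    @rmtpassword = N'<password set with tabadmin dbpass>';
```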
Now we should be able to OPENQUERY Tableau metadata objects with ease. If we would like to go further and regularly copy Tableau’s data across to SQL Server (for my client that was precisely the requirement, so as not to interfere with the production database), the following code should provide this functionality.
Firstly, let’s create a sample database called ‘TableauDBCopy’ and a ‘tab’ schema on the SQL Server target instance. The SQL snippet below also creates and populates a small table called ‘tabSchemaObjectsExclude’ on the ‘dbo’ schema, which stores the names of tables we don’t want to import. These tables are excluded because they have no primary key, which makes it impossible to compare the two schemas using the code below, as it relies on a primary key being defined on every table.
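To tie this back to the original problem of unaccounted extract failures, a pass-through query of the following shape could be used. It is a sketch only: it assumes the linked server is named ‘TABPOSTGRESQLPROD’, that the ‘background_jobs’ table and the columns listed exist in your version of the ‘workgroup’ schema, and that a finish_code of 0 denotes success. Please verify all three against the data dictionary for your release.

```sql
-- Assumptions: linked server name, table/column names and the meaning of
-- finish_code (0 = success) should be checked against your Tableau version.
SELECT  j.id ,
        j.job_name ,
        j.title ,
        j.started_at ,
        j.completed_at ,
        j.finish_code
FROM    OPENQUERY(TABPOSTGRESQLPROD,
                  'select id, job_name, title, started_at, completed_at, finish_code
                   from public.background_jobs
                   where job_name in (''Refresh Extracts'', ''Increment Extracts'')') AS j
WHERE   j.finish_code <> 0          -- keep only jobs that did not finish cleanly
ORDER BY j.completed_at DESC;
```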
```sql
-- Create the 'TableauDBCopy' database and the 'tab' schema on the target instance/server
USE [master];
GO
IF EXISTS ( SELECT name FROM sys.databases WHERE name = N'TableauDBCopy' )
    BEGIN
        -- Disconnect any open sessions before dropping the existing copy
        ALTER DATABASE TableauDBCopy SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
        DROP DATABASE TableauDBCopy;
    END;
GO
CREATE DATABASE TableauDBCopy
ON  ( NAME = 'TableauDBCopy_dat',
      FILENAME = 'D:\SQLData\MSSQL12.ServerName\MSSQL\DATA\TableauDBCopy.mdf',
      SIZE = 500MB,
      MAXSIZE = 2000MB,
      FILEGROWTH = 100MB )
LOG ON
    ( NAME = 'TableauDBCopy_log',
      FILENAME = 'D:\SQLData\MSSQL12.ServerName\MSSQL\DATA\TableauDBCopy.ldf',
      SIZE = 100MB,
      MAXSIZE = 1000MB,
      FILEGROWTH = 50MB );
GO
EXEC TableauDBCopy.dbo.sp_changedbowner @loginame = N'SA', @map = false;
GO
ALTER DATABASE TableauDBCopy SET RECOVERY SIMPLE;
GO
USE TableauDBCopy;
GO
CREATE SCHEMA tab AUTHORIZATION dbo;
GO

-- Create the 'tabSchemaObjectsExclude' exception table and populate it
-- with the names of source tables that have no primary key
CREATE TABLE dbo.tabSchemaObjectsExclude ( ObjectName VARCHAR(256) );
GO
INSERT INTO dbo.tabSchemaObjectsExclude ( ObjectName )
SELECT 'dataengine_configurations' UNION ALL
SELECT 'exportable_repository_id_columns' UNION ALL
SELECT 'exportable_tables_column_transformations' UNION ALL
SELECT 'monitoring_dataengine' UNION ALL
SELECT 'monitoring_postgresql' UNION ALL
SELECT 'permission_reasons' UNION ALL
SELECT 'schema_migrations' UNION ALL
SELECT 'users_view';
```
Next, let’s look at a simple stored procedure which compares the source schema (the Tableau Server ‘public’ schema on the PostgreSQL database) with the target schema (our newly created SQL Server database with the ‘tab’ schema). This code interrogates both databases for their ‘compatibility’ and table metadata structure, e.g. data types, character lengths, nullability, precision, scale etc. If the target object(s) are found to be missing or out of sync with the source version, it generates DROP and CREATE TABLE DDL statements on the fly and applies the changes directly in the target environment.
```sql
USE [TableauDBCopy]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

CREATE PROCEDURE [dbo].[usp_checkRemoteTableauServerTablesSchemaChanges]
    (
      @Remote_Server_Name VARCHAR(256) ,
      @Remote_Server_DB_Name VARCHAR(128) ,
      @Remote_Server_DB_Schema_Name VARCHAR(128) ,
      @Target_DB_Name VARCHAR(128) ,
      @Target_DB_Schema_Name VARCHAR(128) ,
      @Is_All_OK INT OUTPUT ,
      @Process_Name VARCHAR(250) OUTPUT ,
      @Error_Message VARCHAR(MAX) OUTPUT
    )
    WITH RECOMPILE
AS
    SET NOCOUNT ON
    BEGIN
        DECLARE @Is_ReCheck BIT = 0
        DECLARE @SQL NVARCHAR(MAX)
        DECLARE @Is_Debug_Mode BIT = 1
        DECLARE @Remote_Server_Tableau VARCHAR(55) = 'TABPOSTGRESQLPROD'

        -- Capture the name of the currently executing procedure
        SET @Process_Name = ( SELECT OBJECT_NAME(objectid)
                              FROM   sys.dm_exec_requests r
                                     CROSS APPLY sys.dm_exec_sql_text(r.sql_handle) a
                              WHERE  session_id = @@spid )

        -- Build a lookup of PostgreSQL float type names: float1..float53 plus 'float'
        IF OBJECT_ID('tempdb..#t_seqfloats') IS NOT NULL
            BEGIN
                DROP TABLE #t_seqfloats
            END;
        WITH    Nbrs_3 ( n ) AS ( SELECT 1 UNION SELECT 0 ),
                Nbrs_2 ( n ) AS ( SELECT 1 FROM Nbrs_3 n1 CROSS JOIN Nbrs_3 n2 ),
                Nbrs_1 ( n ) AS ( SELECT 1 FROM Nbrs_2 n1 CROSS JOIN Nbrs_2 n2 ),
                Nbrs_0 ( n ) AS ( SELECT 1 FROM Nbrs_1 n1 CROSS JOIN Nbrs_1 n2 ),
                Nbrs ( n ) AS ( SELECT 1 FROM Nbrs_0 n1 CROSS JOIN Nbrs_0 n2 )
            SELECT  'float' + CAST(n AS VARCHAR(2)) seq_floats
            INTO    #t_seqfloats
            FROM    ( SELECT ROW_NUMBER() OVER ( ORDER BY n ) FROM Nbrs ) D ( n )
            WHERE   n <= 53
            UNION ALL
            SELECT  'float'

        Check_RemoteSvr_Schema:
        IF OBJECT_ID('tempdb..#t_allTblMetadata') IS NOT NULL
            BEGIN
                DROP TABLE [#t_allTblMetadata]
            END

        CREATE TABLE [#t_allTblMetadata]
            (
              table_name VARCHAR(256) ,
              column_name VARCHAR(256) ,
              ordinal_position INT ,
              is_nullable BIT ,
              data_type VARCHAR(256) ,
              character_maximum_length BIGINT ,
              numeric_scale SMALLINT ,
              numeric_precision SMALLINT ,
              is_primary_key BIT ,
              local_schema_name VARCHAR(55) ,
              remote_schema_name VARCHAR(55) ,
              local_or_remote VARCHAR(25)
            )

        -- Source (PostgreSQL) metadata: columns from information_schema joined
        -- to primary key columns from the pg_* catalog tables
        SET @SQL = '
            INSERT INTO #t_allTblMetadata
            (
                [table_name], [column_name], [ordinal_position], [is_nullable],
                [data_type], [character_maximum_length], [numeric_scale], [numeric_precision],
                [is_primary_key], [local_schema_name], [remote_schema_name], [local_or_remote]
            )
            SELECT
                LTRIM(RTRIM(a.table_name))                  AS table_name,
                LTRIM(RTRIM(a.column_name))                 AS column_name,
                LTRIM(RTRIM(a.ordinal_position))            AS ordinal_position,
                CASE WHEN a.is_nullable = ''YES'' THEN 1 ELSE 0 END AS is_nullable,
                LTRIM(RTRIM(a.udt_name))                    AS data_type,
                LTRIM(RTRIM(a.character_maximum_length))    AS character_maximum_length,
                LTRIM(RTRIM(a.numeric_scale))               AS numeric_scale,
                LTRIM(RTRIM(a.numeric_precision))           AS numeric_precision,
                CASE WHEN b.PK_column_name IS NULL THEN 0 ELSE 1 END AS is_primary_key,
                ''tab''                                     AS local_schema_name,
                LTRIM(RTRIM(a.table_schema))                AS remote_schema_name,
                ''remote''                                  AS local_or_remote
            FROM OPENQUERY(' + @Remote_Server_Name + ',
                ''select c.table_name, c.column_name, c.ordinal_position, c.is_nullable,
                         c.data_type, c.udt_name, c.character_maximum_length,
                         c.numeric_scale, c.numeric_precision, c.table_schema
                  from information_schema.columns c
                  where c.table_catalog = ''''workgroup'''' and c.table_schema = ''''public'''''') a
            LEFT JOIN OPENQUERY(' + @Remote_Server_Name + ',
                ''select cl.relname as table_name, co.conname as constraint_name,
                         co.contype as constraint_type,
                         pg_get_constraintdef(co.oid) as primary_key_definition,
                         ns.nspname as schema_name, pa.attname as PK_column_name
                  from pg_class cl
                  join pg_constraint co on cl.oid = co.conrelid
                  join pg_namespace ns on cl.relnamespace = ns.oid
                  join pg_attribute pa on pa.attrelid = cl.oid and pa.attnum = co.conkey[1]
                  where co.contype = ''''p'''' and cl.relkind = ''''r'''' and ns.nspname = ''''public'''''') b
                ON  a.table_name = b.table_name
                AND a.table_schema = b.[schema_name]
                AND a.column_name = b.PK_column_name
            WHERE SUBSTRING(a.table_name, 1, 1) <> ''_''
              AND SUBSTRING(a.table_name, 1, 7) <> ''orphans''
              AND NOT EXISTS ( SELECT objectname
                               FROM TableauDBCopy.dbo.tabSchemaObjectsExclude o
                               WHERE o.objectname = a.table_name )
            ORDER BY a.table_name, a.ordinal_position'

        IF @Is_Debug_Mode = 1
            BEGIN
                PRINT CHAR(13) + 'SQL statement for acquiring ''source'' tables metadata into #t_allTblMetadata temp table:'
                PRINT '-----------------------------------------------------------------------------------------------'
                PRINT @SQL + REPLICATE(CHAR(13), 2)
            END

        EXEC (@SQL)

        IF @Is_Debug_Mode = 1
            BEGIN
                SELECT '#t_allTblMetadata table content for remote objects metadata:' AS 'HINT'
                SELECT * FROM #t_allTblMetadata WHERE local_or_remote = 'Remote' ORDER BY table_name, ordinal_position
            END

        IF @Is_ReCheck = 1
            BEGIN
                GOTO Check_Local_Schema
            END

        Check_Local_Schema:
        -- Target (SQL Server) metadata from the system catalog views
        SET @SQL = '
            INSERT INTO #t_allTblMetadata
            (
                [table_name], [column_name], [ordinal_position], [is_nullable],
                [data_type], [character_maximum_length], [numeric_scale], [numeric_precision],
                [is_primary_key], [local_schema_name], [remote_schema_name], [local_or_remote]
            )
            SELECT
                t.name          AS table_name,
                c.name          AS column_name,
                c.column_id     AS ordinal_position,
                c.is_nullable,
                tp.name         AS data_type,
                c.max_length    AS character_maximum_length,
                c.scale         AS numeric_scale,
                c.precision     AS numeric_precision,
                ISNULL(idx.pk_flag, 0) AS ''is_primary_key'',
                ss.name,
                ''public'',
                ''local''       AS local_or_remote
            FROM sys.tables t
            JOIN sys.columns c  ON t.object_id = c.object_id
            JOIN sys.types tp   ON c.user_type_id = tp.user_type_id
            JOIN sys.objects so ON so.object_id = t.object_id
            JOIN sys.schemas ss ON so.schema_id = ss.schema_id
            LEFT JOIN ( SELECT i.name AS index_name,
                               i.is_primary_key AS pk_flag,
                               OBJECT_NAME(ic.OBJECT_ID) AS table_name,
                               COL_NAME(ic.OBJECT_ID, ic.column_id) AS column_name
                        FROM sys.indexes AS i
                        INNER JOIN sys.index_columns AS ic
                            ON i.OBJECT_ID = ic.OBJECT_ID AND i.index_id = ic.index_id
                        WHERE i.is_primary_key = 1 ) idx
                ON idx.table_name = t.name AND idx.column_name = c.name
            JOIN INFORMATION_SCHEMA.TABLES tt
                ON tt.table_schema = ss.name AND tt.table_name = t.name
            WHERE t.type = ''u''
              AND tt.TABLE_CATALOG = ''' + @Target_DB_Name + '''
              AND ss.name = ''' + @Target_DB_Schema_Name + ''''

        IF @Is_Debug_Mode = 1
            BEGIN
                PRINT 'SQL statement for acquiring ''target'' tables metadata into #t_allTblMetadata temp table:'
                PRINT '-----------------------------------------------------------------------------------------------'
                PRINT @SQL + REPLICATE(CHAR(13), 2)
            END

        EXEC (@SQL)

        IF @Is_Debug_Mode = 1
            BEGIN
                SELECT '#t_allTblMetadata table content for local objects metadata:' AS 'HINT'
                SELECT * FROM #t_allTblMetadata WHERE local_or_remote = 'local' ORDER BY table_name, ordinal_position
            END

        -- Generate DROP and CREATE TABLE statements for every source table
        IF OBJECT_ID('tempdb..#t_sql') IS NOT NULL
            BEGIN
                DROP TABLE [#t_sql]
            END

        SELECT DISTINCT
                t1.table_name AS Table_Name ,
                t1.local_schema_name AS Local_Schema_Name ,
                'create table [' + t1.local_schema_name + '].[' + LOWER(t1.table_name) + '] ('
                + STUFF(o.list, LEN(o.list), 1, '') + ')'
                + CASE WHEN t2.is_primary_key = 0 THEN ''
                       ELSE '; ALTER TABLE [' + t1.local_schema_name + '].[' + t1.table_name + '] '
                            + ' ADD CONSTRAINT pk_' + LOWER(t1.local_schema_name) + '_'
                            + LOWER(t2.table_name) + '_'
                            + LOWER(REPLACE(t2.pk_column_names, ',', '_'))
                            + ' PRIMARY KEY CLUSTERED ' + '(' + LOWER(t2.pk_column_names) + ')'
                  END AS Create_Table_Schema_Definition_SQL ,
                'if object_id (''[' + t1.local_schema_name + '].[' + t1.table_name + ']'
                + ''', ''U'') IS NOT NULL drop table [' + t1.local_schema_name + '].['
                + t1.table_name + ']' AS Drop_Table_SQL
        INTO    #t_sql
        FROM    #t_allTblMetadata t1
                CROSS APPLY ( SELECT '[' + column_name + '] '
                                     -- Map PostgreSQL type names to SQL Server equivalents
                                     + CASE WHEN data_type IN ( 'bigint', 'int8', 'bigserial' ) THEN 'bigint'
                                            WHEN data_type IN ( 'integer', 'serial4', 'serial', 'int4', 'int', 'oid' ) THEN 'int'
                                            WHEN data_type IN ( 'smallint', 'serial2', 'smallserial', 'int2' ) THEN 'smallint'
                                            WHEN data_type IN ( 'uuid' ) THEN 'uniqueidentifier'
                                            WHEN data_type IN ( 'bool', 'boolean' ) THEN 'bit'
                                            WHEN data_type IN ( 'timestamp', 'timestamptz' ) THEN 'datetime'
                                            WHEN data_type IN ( 'bytea', 'json', 'text', 'varchar' ) THEN 'nvarchar'
                                            WHEN data_type IN ( SELECT * FROM #t_seqfloats ) THEN 'float'
                                            ELSE data_type
                                       END
                                     -- Append length, precision and scale where applicable
                                     + CASE WHEN data_type IN ( 'int2', 'int4', 'int8', 'oid', 'timestamp', 'uuid', 'bool' ) THEN ''
                                            WHEN data_type IN ( 'text', 'json', 'bytea' )
                                                 OR ( data_type = 'varchar' AND character_maximum_length IS NULL )
                                                 OR character_maximum_length > 8000 THEN '(max)'
                                            WHEN data_type = 'decimal'
                                            THEN '(' + CAST(numeric_precision AS VARCHAR) + ', ' + CAST(numeric_scale AS VARCHAR) + ')'
                                            WHEN data_type IN ( SELECT * FROM #t_seqfloats ) THEN '(53)'
                                            ELSE COALESCE('(' + CAST(character_maximum_length AS VARCHAR) + ')', '')
                                       END
                                     + ' ' + ( CASE WHEN is_nullable = 0 THEN 'NOT ' ELSE '' END ) + 'NULL' + ','
                              FROM   #t_allTblMetadata
                              WHERE  table_name = t1.table_name
                                     AND local_or_remote = 'Remote'
                              ORDER BY ordinal_position
                              FOR XML PATH('')
                            ) o ( list )
                JOIN ( SELECT table_name ,
                              is_primary_key ,
                              pk_column_names ,
                              column_name = REVERSE(RIGHT(REVERSE(pk_column_names),
                                                          LEN(pk_column_names) - CHARINDEX(',', REVERSE(pk_column_names))))
                       FROM   ( SELECT table_name ,
                                       is_primary_key ,
                                       pk_column_names = STUFF(( SELECT ',' + CAST(column_name AS VARCHAR(500))
                                                                 FROM   #t_allTblMetadata z2
                                                                 WHERE  z1.table_name = z2.table_name
                                                                        AND z2.is_primary_key = 1
                                                                        AND z2.local_or_remote = 'Remote'
                                                                 ORDER BY z2.column_name ASC
                                                                 FOR XML PATH('') ), 1, 1, '')
                                FROM   #t_allTblMetadata z1
                                WHERE  z1.is_primary_key = 1
                                       AND z1.local_or_remote = 'Remote'
                                GROUP BY z1.table_name , z1.is_primary_key
                              ) a
                     ) t2 ON t1.table_name = t2.table_name
        WHERE   t1.local_schema_name <> 'unknown'
                AND t1.local_or_remote = 'Remote'

        IF @Is_Debug_Mode = 1
            BEGIN
                SELECT '#t_sql table content:' AS 'HINT'
                SELECT * FROM #t_sql ORDER BY Table_Name
            END

        IF @Is_ReCheck = 1
            BEGIN
                GOTO Do_Table_Diff
            END

        Do_Table_Diff:
        -- Source metadata (translated to SQL Server types) EXCEPT target metadata:
        -- any row left over marks a table that is missing or out of sync
        IF OBJECT_ID('tempdb..#t_diff') IS NOT NULL
            BEGIN
                DROP TABLE [#t_diff]
            END;
        WITH    Temp_CTE ( table_name, column_name, is_nullable, data_type, local_schema_name,
                           is_primary_key, character_maximum_length, numeric_scale, numeric_precision )
                  AS ( SELECT   table_name = m.table_name ,
                                column_name = m.column_name ,
                                is_nullable = m.is_nullable ,
                                data_type = CASE WHEN m.data_type IN ( 'bigint', 'int8', 'bigserial' ) THEN 'bigint'
                                                 WHEN m.data_type IN ( 'integer', 'serial4', 'serial', 'int4', 'int', 'oid' ) THEN 'int'
                                                 WHEN m.data_type IN ( 'smallint', 'serial2', 'smallserial', 'int2' ) THEN 'smallint'
                                                 WHEN m.data_type IN ( 'uuid' ) THEN 'uniqueidentifier'
                                                 WHEN m.data_type IN ( 'bool', 'boolean' ) THEN 'bit'
                                                 WHEN m.data_type IN ( 'timestamp', 'timestamptz' ) THEN 'datetime'
                                                 WHEN m.data_type IN ( 'bytea', 'json', 'text', 'varchar' ) THEN 'nvarchar'
                                                 WHEN m.data_type IN ( SELECT * FROM #t_seqfloats ) THEN 'float'
                                                 ELSE m.data_type
                                            END ,
                                local_schema_name = m.local_schema_name ,
                                is_primary_key = m.is_primary_key ,
                                character_maximum_length = COALESCE(CASE WHEN m.data_type IN ( 'text', 'json', 'bytea' )
                                                                              OR ( m.data_type = 'varchar' AND m.character_maximum_length IS NULL )
                                                                              OR m.character_maximum_length > 8000 THEN 'max'
                                                                         ELSE CAST(m.character_maximum_length AS VARCHAR)
                                                                    END,
                                                                    constants.character_maximum_length,
                                                                    CAST(l.character_maximum_length AS VARCHAR)) ,
                                numeric_scale = COALESCE(constants.numeric_scale,
                                                         CAST(m.numeric_scale AS VARCHAR),
                                                         CAST(l.numeric_scale AS VARCHAR)) ,
                                numeric_precision = COALESCE(constants.numeric_precision,
                                                             CAST(m.numeric_precision AS VARCHAR),
                                                             CAST(l.numeric_precision AS VARCHAR))
                       FROM     #t_allTblMetadata m
                                -- Fixed length/scale/precision values SQL Server reports per data type
                                LEFT JOIN ( SELECT 'char' AS data_type , NULL AS character_maximum_length , 0 AS numeric_scale , 0 AS numeric_precision
                                            UNION ALL SELECT 'varchar' , NULL , '0' , '0'
                                            UNION ALL SELECT 'time' , '5' , '7' , '16'
                                            UNION ALL SELECT 'date' , '3' , '0' , '10'
                                            UNION ALL SELECT 'datetime' , '8' , '3' , '23'
                                            UNION ALL SELECT 'datetime2' , '8' , '7' , '27'
                                            UNION ALL SELECT 'smalldatetime' , '4' , '0' , '16'
                                            UNION ALL SELECT 'bit' , '1' , '0' , '1'
                                            UNION ALL SELECT 'float' , '8' , '0' , '53'
                                            UNION ALL SELECT 'money' , '8' , '4' , '19'
                                            UNION ALL SELECT 'smallmoney' , '4' , '4' , '10'
                                            UNION ALL SELECT 'uniqueidentifier' , '16' , '0' , '0'
                                            UNION ALL SELECT 'xml' , 'max' , '0' , '0'
                                            UNION ALL SELECT 'numeric' , '9' , '0' , '18'
                                            UNION ALL SELECT 'real' , '4' , '0' , '24'
                                            UNION ALL SELECT 'tinyint' , '1' , '0' , '3'
                                            UNION ALL SELECT 'smallint' , '2' , '0' , '5'
                                            UNION ALL SELECT 'int' , '4' , '0' , '10'
                                            UNION ALL SELECT 'bigint' , '8' , '0' , '19'
                                          ) constants
                                    ON ( CASE WHEN m.data_type IN ( 'bigint', 'int8', 'bigserial' ) THEN 'bigint'
                                              WHEN m.data_type IN ( 'integer', 'serial4', 'serial', 'int4', 'int', 'oid' ) THEN 'int'
                                              WHEN m.data_type IN ( 'smallint', 'serial2', 'smallserial', 'int2' ) THEN 'smallint'
                                              WHEN m.data_type IN ( 'uuid' ) THEN 'uniqueidentifier'
                                              WHEN m.data_type IN ( 'bool', 'boolean' ) THEN 'bit'
                                              WHEN m.data_type IN ( 'timestamp', 'timestamptz' ) THEN 'datetime'
                                              WHEN m.data_type IN ( 'bytea', 'json', 'text', 'varchar' ) THEN 'nvarchar'
                                              WHEN m.data_type IN ( SELECT * FROM #t_seqfloats ) THEN 'float'
                                              ELSE m.data_type
                                         END ) = constants.data_type
                                LEFT JOIN #t_allTblMetadata l
                                    ON  l.column_name = m.column_name
                                    AND l.table_name = m.table_name
                                    AND l.data_type = ( CASE WHEN m.data_type IN ( 'bigint', 'int8', 'bigserial' ) THEN 'bigint'
                                                             WHEN m.data_type IN ( 'integer', 'serial4', 'serial', 'int4', 'int', 'oid' ) THEN 'int'
                                                             WHEN m.data_type IN ( 'smallint', 'serial2', 'smallserial', 'int2' ) THEN 'smallint'
                                                             WHEN m.data_type IN ( 'uuid' ) THEN 'uniqueidentifier'
                                                             WHEN m.data_type IN ( 'bool', 'boolean' ) THEN 'bit'
                                                             WHEN m.data_type IN ( 'timestamp', 'timestamptz' ) THEN 'datetime'
                                                             WHEN m.data_type IN ( 'bytea', 'json', 'text', 'varchar' ) THEN 'nvarchar'
                                                             WHEN m.data_type IN ( SELECT * FROM #t_seqfloats ) THEN 'float'
                                                             ELSE m.data_type
                                                        END )
                                    AND l.local_or_remote = 'Local'
                       WHERE    m.local_or_remote = 'Remote'
                       EXCEPT
                       SELECT   table_name ,
                                column_name ,
                                is_nullable ,
                                data_type ,
                                local_schema_name ,
                                is_primary_key ,
                                CASE WHEN character_maximum_length > 8000 OR character_maximum_length = -1 THEN 'max'
                                     WHEN data_type IN ( 'nvarchar', 'nchar' ) THEN CAST(character_maximum_length / 2 AS VARCHAR)
                                     ELSE CAST(character_maximum_length AS VARCHAR)
                                END AS character_maximum_length ,
                                numeric_scale ,
                                numeric_precision
                       FROM     #t_allTblMetadata
                       WHERE    local_or_remote = 'Local'
                     )
            SELECT DISTINCT
                    table_name ,
                    local_schema_name
            INTO    #t_diff
            FROM    Temp_CTE

        IF @Is_Debug_Mode = 1
            BEGIN
                SELECT '#t_diff table content:' AS 'HINT'
                SELECT * FROM #t_diff
            END

        IF @Is_ReCheck = 1
            GOTO Results

        Run_SQL:
        IF NOT EXISTS ( SELECT DISTINCT Table_Name , Local_Schema_Name
                        FROM   #t_sql a
                        WHERE  EXISTS ( SELECT table_name FROM #t_diff i WHERE a.Table_Name = i.table_name ) )
            BEGIN
                GOTO Schema_Diff_ReCheck
            END
        ELSE
            BEGIN
                DECLARE @schema_name VARCHAR(50)
                DECLARE @table_name VARCHAR(256)
                DECLARE @sql_select_dropcreate NVARCHAR(MAX)

                -- Drop and recreate every target table flagged by the diff
                DECLARE db_cursor CURSOR FORWARD_ONLY
                FOR
                    SELECT DISTINCT Table_Name , Local_Schema_Name
                    FROM   #t_sql a
                    WHERE  EXISTS ( SELECT table_name FROM #t_diff i WHERE a.Table_Name = i.table_name )
                OPEN db_cursor
                FETCH NEXT FROM db_cursor INTO @table_name, @schema_name
                WHILE @@FETCH_STATUS = 0
                    BEGIN
                        BEGIN TRY
                            BEGIN TRANSACTION
                            SET @sql_select_dropcreate = ( SELECT Drop_Table_SQL FROM #t_sql WHERE Table_Name = @table_name )
                                + '; ' + CHAR(13)
                                + ( SELECT Create_Table_Schema_Definition_SQL FROM #t_sql WHERE Table_Name = @table_name )
                                + REPLICATE(CHAR(13), 2)

                            IF @Is_Debug_Mode = 1
                                BEGIN
                                    PRINT 'SQL statement for dropping/recreating ''source'' table(s):'
                                    PRINT '-----------------------------------------------------------------------------------------------'
                                    PRINT @sql_select_dropcreate
                                END

                            -- sp_executesql used in place of the undocumented sp_sqlexec
                            EXEC sp_executesql @sql_select_dropcreate
                            --SET @Is_All_OK = 1
                            SET @Error_Message = 'All Good!'
                            COMMIT TRANSACTION
                        END TRY
                        BEGIN CATCH
                            IF @@TRANCOUNT > 0
                                ROLLBACK TRANSACTION;
                            SET @Is_All_OK = 0
                            SET @Error_Message = 'This operation has been unexpectedly terminated due to error: '''
                                + ERROR_MESSAGE() + ''' at line ' + CAST(ERROR_LINE() AS VARCHAR);
                        END CATCH
                        FETCH NEXT FROM db_cursor INTO @table_name, @schema_name
                    END
                CLOSE db_cursor
                DEALLOCATE db_cursor
                SET @Is_ReCheck = 1
            END

        Schema_Diff_ReCheck:
        -- After applying changes, re-read both schemas and diff them once more
        IF @Is_ReCheck = 1
            BEGIN
                GOTO Check_RemoteSvr_Schema
            END

        Results:
        IF EXISTS ( SELECT TOP 1 * FROM #t_diff )
            BEGIN
                SET @Is_All_OK = 0
                SET @Error_Message = 'Table schema reconciliation between ' + @@SERVERNAME
                    + ' and remote database on ''' + @Remote_Server_Name + '''' + CHAR(10)
                SET @Error_Message = @Error_Message + 'failed. Please troubleshoot.'
            END
        ELSE
            BEGIN
                SET @Is_All_OK = 1
                SET @Error_Message = 'All Good!'
            END

        -- Clean up temp tables
        IF OBJECT_ID('tempdb..#t_seqfloats') IS NOT NULL
            BEGIN
                DROP TABLE #t_seqfloats
            END
        IF OBJECT_ID('tempdb..#t_allTblMetadata') IS NOT NULL
            BEGIN
                DROP TABLE [#t_allTblMetadata]
            END
        IF OBJECT_ID('tempdb..#t_sql') IS NOT NULL
            BEGIN
                DROP TABLE [#t_sql]
            END
        IF OBJECT_ID('tempdb..#t_diff') IS NOT NULL
            BEGIN
                DROP TABLE [#t_diff]
            END
    END
```
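A call to the schema comparison procedure above might look like the following. The parameter values are assumptions taken from the rest of this post (linked server ‘TABPOSTGRESQLPROD’, source database ‘workgroup’ with the ‘public’ schema, target database ‘TableauDBCopy’ with the ‘tab’ schema); adjust them to your environment.

```sql
-- Variables to receive the procedure's OUTPUT parameters
DECLARE @Is_All_OK INT ,
        @Process_Name VARCHAR(250) ,
        @Error_Message VARCHAR(MAX);

EXEC TableauDBCopy.dbo.usp_checkRemoteTableauServerTablesSchemaChanges
    @Remote_Server_Name = 'TABPOSTGRESQLPROD',
    @Remote_Server_DB_Name = 'workgroup',
    @Remote_Server_DB_Schema_Name = 'public',
    @Target_DB_Name = 'TableauDBCopy',
    @Target_DB_Schema_Name = 'tab',
    @Is_All_OK = @Is_All_OK OUTPUT,
    @Process_Name = @Process_Name OUTPUT,
    @Error_Message = @Error_Message OUTPUT;

-- 1 in Is_All_OK means source and target schemas reconcile
SELECT  @Is_All_OK AS Is_All_OK ,
        @Process_Name AS Process_Name ,
        @Error_Message AS Error_Message;
```

Because debug mode is switched on inside the procedure, the call also prints the generated statements and returns the intermediate temp table contents.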
Finally, we are ready to load the Tableau PostgreSQL data into the ‘TableauDBCopy’ database tables on the ‘tab’ schema. For that we could use SQL Server Integration Services, but since the ‘workgroup’ database is quite small and most tables have a primary key defined on them, we can load the data sequentially, i.e. table by table, using a modified version of my database replication stored procedure which I described in one of my previous blog posts HERE. The stored procedure works in a similar fashion to the one described previously, but allowances had to be made to reconcile PostgreSQL and SQL Server data types and conventions, e.g. certain PostgreSQL reserved words need to be enclosed in double quotes in the OPENQUERY statements in order to be validated and recognized by SQL Server. Likewise, certain SQL Server reserved words need to be delimited with square brackets. To reference those exceptions, I have created two views (downloadable from HERE) which are used in the merging stored procedure to provide greater cross-database compatibility. The stored procedure works on a table-to-table basis but it would not be very hard to make it loop through a collection of objects, e.g. using a cursor or an SSIS ‘Foreach Loop’ container, to automate a comprehensive data load. If, on the other hand, a much faster, asynchronous load is required, you can always check out one of my previous blog posts on parallel SQL statement execution using SQL Server Agent jobs HERE. All the code for the data synchronization between Tableau’s PostgreSQL database and the SQL Server instance, as well as the other T-SQL snippets presented in this post, can be downloaded from my OneDrive folder HERE.
Below is a short video depicting how this solution works using both the schema synchronisation and the data synchronisation stored procedures.
The ‘workgroup’ database data dictionary, with descriptions of all PostgreSQL objects, can be found HERE.
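To illustrate the looping idea, here is a minimal cursor-based sketch. It is deliberately NOT the merge-based procedure referenced above: it swaps in a plain truncate-and-reload per table, which is simpler but reloads everything on each run. It assumes the linked server name ‘TABPOSTGRESQLPROD’, assumes that the ‘tab’ tables were created by the schema procedure so that column order matches the source, and ignores any data type conversions that may need explicit handling. By default it only prints the generated statements.

```sql
USE TableauDBCopy;
GO
DECLARE @Linked_Server SYSNAME = N'TABPOSTGRESQLPROD';  -- assumed linked server name
DECLARE @Execute BIT = 0;                               -- 0 = print only, 1 = run the statements
DECLARE @Table_Name SYSNAME;
DECLARE @SQL NVARCHAR(MAX);

-- Every table on the 'tab' schema is treated as a copy of a 'public' source table
DECLARE table_cursor CURSOR LOCAL FAST_FORWARD FOR
    SELECT  t.name
    FROM    sys.tables t
            JOIN sys.schemas s ON s.schema_id = t.schema_id
    WHERE   s.name = N'tab'
    ORDER BY t.name;

OPEN table_cursor;
FETCH NEXT FROM table_cursor INTO @Table_Name;
WHILE @@FETCH_STATUS = 0
    BEGIN
        -- The remote table name is wrapped in double quotes so that PostgreSQL
        -- reserved words are accepted inside the pass-through query
        SET @SQL = N'TRUNCATE TABLE [tab].' + QUOTENAME(@Table_Name) + N'; '
                 + N'INSERT INTO [tab].' + QUOTENAME(@Table_Name)
                 + N' SELECT * FROM OPENQUERY(' + QUOTENAME(@Linked_Server)
                 + N', ''select * from public."' + @Table_Name + N'"'');';

        IF @Execute = 1
            EXEC sys.sp_executesql @SQL;
        ELSE
            PRINT @SQL;

        FETCH NEXT FROM table_cursor INTO @Table_Name;
    END
CLOSE table_cursor;
DEALLOCATE table_cursor;
```

Printing first makes it easy to inspect the statements for a handful of tables before switching @Execute to 1.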
This entry was posted on Monday, December 7th, 2015 at 9:58 am and is filed under Data Modelling, SQL, SQL Server, Tableau.
Benoit March 21st, 2018 at 1:24 pm
Hi Martin,
Is the schema you posted for Tableau 10.5?
Thanks
Ben