{"id":3755,"date":"2019-02-06T05:57:54","date_gmt":"2019-02-06T05:57:54","guid":{"rendered":"http:\/\/bicortex.com\/?p=3755"},"modified":"2020-05-03T21:46:56","modified_gmt":"2020-05-03T11:46:56","slug":"designing-a-historised-relational-data-lake-how-to-speed-up-data-warehouse-development-and-not-stress-about-upfront-data-modelling","status":"publish","type":"post","link":"https:\/\/bicortex.com\/bicortex\/designing-a-historised-relational-data-lake-how-to-speed-up-data-warehouse-development-and-not-stress-about-upfront-data-modelling\/","title":{"rendered":"Designing a historised, relational &#8216;data lake&#8217; \u2013 how to speed up data warehouse development and not stress about upfront data modelling"},"content":{"rendered":"<h3 style=\"text-align: center;\">Problem Statement<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">Data warehouse modelling techniques have largely remained unchanged for decades, and most of the common methodologies and patterns which were prevalent when I was starting my career in Business Intelligence are still applicable to most projects. In the early days, Inmon created the accepted definition of what a data warehouse is &#8211; a subject-oriented, non-volatile, integrated, time-variant collection of data in support of management&#8217;s decisions. Inmon&#8217;s approach to enterprise data warehousing assumes that tables are grouped together by subject areas that reflect general data categories (e.g. data on customers, products, finance, etc.). The normalized structure divides data into entities, which creates several tables in a relational database. When applied in large enterprises, the result is dozens of tables that are linked together by a web of joins, with departmental data marts created for ease of use. His approach was often criticized for being top-heavy, complex to execute and generally requiring a lot more planning and up-front work. 
Ralph Kimball, another prolific figure and the author of the Kimball approach to data warehouse architecture, advocated a simplified methodology in which only denormalised data marts are created to satisfy business requirements, with the benefit of being easier to understand, set up and develop. However, just as the Inmon methodology was often criticized, Kimball\u2019s approach was not without its faults, e.g. data redundancy, and maintaining the integrity of facts and dimensions can be a challenge. Over the last decade, a third approach to data warehouse modelling started to make inroads. Data Vault, a hybrid paradigm that combines the best of 3rd Normal Form (3NF) and dimensional modelling, was conceived to address the agility, flexibility and scalability problems found in the other mainstream data modelling approaches. It simplifies the data ingestion process, removes the cleansing requirement of a Star Schema, puts the focus on the real problem instead of programming around it and, finally, allows new data sources to be integrated without disruption to the existing schema. However, as with the other two modelling approaches, Data Vault has its fair share of disadvantages and is not a panacea for all business data needs, e.g. an increased number of joins, not being intended for ad-hoc end user access (including BI tools and OLAP), a higher than average number of database and ETL objects etc.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">The main issue with data being modelled in a very specific way is that organisations have to undergo a very rigorous and comprehensive scoping exercise to ensure that the disadvantages of the chosen modelling methodology will not prohibit them from extracting intelligence out of the data they collect, and will allow for the most flexible, scalable and long-term support for the analytical platform they are provisioning. 
Data warehouse projects, particularly enterprise-wide ones, are an expensive and risky endeavour, often consuming large amounts of resources and taking years to complete. Even the most data-literate, mature and information management-engaged organizations embarking on the data warehouse build journey often fall prey to constraining themselves to structure their critical data assets in a very prescriptive and rigid way. As a result, once their enterprise data has been structured and formatted according to a specific methodology, it becomes very hard to go back to the data\u2019s \u2018raw\u2019 state. This often means a compromised and biased view of the business functions which does not fully reflect the answers the business is trying to find. This myopic portrayal of the data, in the best-case scenario, leads to the creation of other disparate and siloed data sources (often by shadow BI\/IT or untrusting end-users), whereas in extreme cases, the answers provided are unreliable and untrustworthy.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">As such, over the last few years I have observed more and more businesses taking a completely different approach, abandoning or at least minimising data warehouse-specific modelling techniques in favour of maintaining a historised data lake and creating a number of virtual data marts on top of it to satisfy even the most bespoke business requirements. With on-demand cloud computing becoming more prevalent and very powerful hardware able to crunch even copious amounts of raw data in seconds, the need for a meticulously designed star or snowflake schema is slowly becoming secondary to how quickly organisations can get access to their data. With in-memory caching, solid state storage and distributed computing, for the first time ever we are witnessing old application design patterns and data modelling techniques becoming the bottleneck for the speed at which data can be processed. 
Also, with the ever-growing number of data sources to integrate with, and data volumes, velocity and variety on the increase, data warehouse modelling paradigms are becoming more of a hindrance, constraining organisations to follow a standard or convention designed decades ago. As a result, what we\u2019re witnessing is the following two scenarios for corporate data management playing out with more frequency:<\/p>\n<ul>\n<li style=\"text-align: justify;\">Enterprises looking to provide federation capabilities to support a Logical Data Warehouse architecture across multiple platforms without having to move data and deal with complexities such as different technologies, formats, schemas, security protocols, locations etc.<\/li>\n<li style=\"text-align: justify;\">Enterprises looking to consolidate all their data sources in a single place with the speed and agility required to move at a quicker pace, without the constraint of modelling data in a prescriptive, narrow and methodical way, so as not to throw away the richness of the narrative the raw data may provide long-term<\/li>\n<\/ul>\n<h3 style=\"text-align: center;\">Implementation<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">There are many patterns which can facilitate implementing the data lake, but in this post we will look at creating four databases which together will handle transient application data (as acquired from the source system), staged data and, finally, data containing all transitional changes (inserts, deletes and updates) across all history. This design allows for a clever way of persisting all application data changes (one of the core requirements of any data warehouse, irrespective of the modelling approach taken) along with providing robust foundations for provisioning virtual data marts to satisfy reporting and analytical data needs. 
The virtual data marts can be easily implemented as views, with the required business rules embedded inside the SQL statements and, if required for performance reasons, materialised or physically persisted on disk or in memory.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">The following diagram depicts the high-level architecture used in this scenario and the logical placement of the EDW_Landing database used for the initial data load, the EDW_Staging database used for storing all changes, the EDW_History database storing the current view of the data and, finally, the EDW_History_Archive database storing all history.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2019\/04\/DW__DataLake_Architecture_Diagram.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-3771\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2019\/04\/DW__DataLake_Architecture_Diagram.png\" alt=\"\" width=\"580\" height=\"617\" srcset=\"https:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2019\/04\/DW__DataLake_Architecture_Diagram.png 647w, https:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2019\/04\/DW__DataLake_Architecture_Diagram-282x300.png 282w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">To provide a tangible case scenario, let\u2019s create a sample environment with the aforementioned databases using the Microsoft SQL Server platform. 
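<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">To give a flavour of the \u2018virtual data mart\u2019 layer mentioned above, the sketch below shows what such a view could look like over the historised data. It is a hypothetical example only \u2013 the vw_CustomerSales view name and the spend-banding business rule are illustrative assumptions and are not created by the build script in this post.<\/p>\n<pre class=\"brush: sql; collapse: true; light: false; title: (+) Expand To View Sample Virtual Data Mart SQL; toolbar: true; notranslate\" title=\"(+) Expand To View Sample Virtual Data Mart SQL\">\r\n--Hypothetical virtual data mart: the business rule (a simple spend banding)\r\n--lives inside the view definition. If performance demands it, the view can\r\n--later be materialised or persisted instead.\r\nUSE EDW_History;\r\nGO\r\nCREATE VIEW testapp.vw_CustomerSales\r\nAS\r\nSELECT customer_id,\r\n       COUNT(*) AS transaction_count,\r\n       SUM(sale_price) AS total_sale_price,\r\n       CASE\r\n           WHEN SUM(sale_price) &gt;= 100 THEN 'High'\r\n           ELSE 'Standard'\r\n       END AS spend_band\r\nFROM testapp.Table1\r\nWHERE etl_cdc_operation &lt;&gt; 'Delete'\r\nGROUP BY customer_id;\r\nGO\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">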
The following script can be used to create the required data stores along with all the objects and dummy data.<\/p>\n<pre class=\"brush: sql; collapse: true; light: false; title: (+) Expand To View Environment Built SQL; toolbar: true; notranslate\" title=\"(+) Expand To View Environment Built SQL\">\r\n\/*========================================================================================================\r\nSTEP 1\r\nCreate EDW_Landing, EDW_Staging, EDW_History, EDW_History_Archive databases on the local instance\r\n========================================================================================================*\/\r\nSET NOCOUNT ON;\r\nGO\r\nUSE master;\r\nGO\r\nIF EXISTS (SELECT name FROM sys.databases WHERE name = N'EDW_Landing')\r\nBEGIN\r\n    -- Close connections to the EDW_Landing database\r\n    ALTER DATABASE EDW_Landing SET SINGLE_USER WITH ROLLBACK IMMEDIATE;\r\n    DROP DATABASE EDW_Landing;\r\nEND;\r\nGO\r\n-- Create EDW_Landing database and log files\r\nCREATE DATABASE EDW_Landing\r\nON PRIMARY\r\n       (\r\n           NAME = N'EDW_Landing',\r\n           FILENAME = N'D:\\Program Files\\Microsoft SQL Server\\MSSQL13.HNODWDEV\\MSSQL\\DATA\\EDW_Landing.mdf',\r\n           SIZE = 10MB,\r\n           MAXSIZE = 1GB,\r\n           FILEGROWTH = 10MB\r\n       )\r\nLOG ON\r\n    (\r\n        NAME = N'EDW_Landing_log',\r\n        FILENAME = N'D:\\Program Files\\Microsoft SQL Server\\MSSQL13.HNODWDEV\\MSSQL\\DATA\\EDW_Landing_log.ldf',\r\n        SIZE = 1MB,\r\n        MAXSIZE = 1GB,\r\n        FILEGROWTH = 10MB\r\n    );\r\nGO\r\n--Assign database ownership to login SA\r\nEXEC EDW_Landing.dbo.sp_changedbowner @loginame = N'SA', @map = false;\r\nGO\r\n--Change the recovery model to SIMPLE\r\nALTER DATABASE EDW_Landing SET RECOVERY SIMPLE;\r\nGO\r\n\r\n\r\nIF EXISTS (SELECT name FROM sys.databases WHERE name = N'EDW_Staging')\r\nBEGIN\r\n    -- Close connections to the EDW_Staging database\r\n    ALTER DATABASE EDW_Staging SET SINGLE_USER WITH ROLLBACK IMMEDIATE;\r\n    DROP DATABASE EDW_Staging;\r\nEND;\r\nGO\r\n-- Create EDW_Staging database and log files\r\nCREATE DATABASE EDW_Staging\r\nON PRIMARY\r\n       (\r\n           NAME = N'EDW_Staging',\r\n           FILENAME = N'D:\\Program Files\\Microsoft SQL Server\\MSSQL13.HNODWDEV\\MSSQL\\DATA\\EDW_Staging.mdf',\r\n           SIZE = 10MB,\r\n           MAXSIZE = 1GB,\r\n           FILEGROWTH = 10MB\r\n       )\r\nLOG ON\r\n    (\r\n        NAME = N'EDW_Staging_log',\r\n        FILENAME = N'D:\\Program Files\\Microsoft SQL Server\\MSSQL13.HNODWDEV\\MSSQL\\DATA\\EDW_Staging_log.ldf',\r\n        SIZE = 1MB,\r\n        MAXSIZE = 1GB,\r\n        FILEGROWTH = 10MB\r\n    );\r\nGO\r\n--Assign database ownership to login SA\r\nEXEC EDW_Staging.dbo.sp_changedbowner @loginame = N'SA', @map = false;\r\nGO\r\n--Change the recovery model to SIMPLE\r\nALTER DATABASE EDW_Staging SET RECOVERY SIMPLE;\r\nGO\r\n\r\n\r\nIF EXISTS (SELECT name FROM sys.databases WHERE name = N'EDW_History')\r\nBEGIN\r\n    -- Close connections to the EDW_History database\r\n    ALTER DATABASE EDW_History SET SINGLE_USER WITH ROLLBACK IMMEDIATE;\r\n    DROP DATABASE EDW_History;\r\nEND;\r\nGO\r\n-- Create EDW_History database and log files\r\nCREATE DATABASE EDW_History\r\nON PRIMARY\r\n       (\r\n           NAME = N'EDW_History',\r\n           FILENAME = N'D:\\Program Files\\Microsoft SQL Server\\MSSQL13.HNODWDEV\\MSSQL\\DATA\\EDW_History.mdf',\r\n           SIZE = 10MB,\r\n           MAXSIZE = 1GB,\r\n           FILEGROWTH = 10MB\r\n       )\r\nLOG ON\r\n    (\r\n        NAME = N'EDW_History_log',\r\n        FILENAME = N'D:\\Program Files\\Microsoft SQL Server\\MSSQL13.HNODWDEV\\MSSQL\\DATA\\EDW_History_log.ldf',\r\n        SIZE = 1MB,\r\n        MAXSIZE = 1GB,\r\n        FILEGROWTH = 10MB\r\n    );\r\nGO\r\n--Assign database ownership to login SA\r\nEXEC EDW_History.dbo.sp_changedbowner @loginame = N'SA', @map = false;\r\nGO\r\n--Change the recovery model to SIMPLE\r\nALTER DATABASE EDW_History SET RECOVERY SIMPLE;\r\nGO\r\n\r\n\r\nIF EXISTS\r\n(\r\n    SELECT name\r\n    FROM sys.databases\r\n    WHERE name = N'EDW_History_Archive'\r\n)\r\nBEGIN\r\n    -- Close connections to the EDW_History_Archive database\r\n    ALTER DATABASE EDW_History_Archive\r\n    SET SINGLE_USER\r\n    WITH ROLLBACK IMMEDIATE;\r\n    DROP DATABASE EDW_History_Archive;\r\nEND;\r\nGO\r\n-- Create EDW_History_Archive database and log files\r\nCREATE DATABASE EDW_History_Archive\r\nON PRIMARY\r\n       (\r\n           NAME = N'EDW_History_Archive',\r\n           FILENAME = N'D:\\Program Files\\Microsoft SQL Server\\MSSQL13.HNODWDEV\\MSSQL\\DATA\\EDW_History_Archive.mdf',\r\n           SIZE = 10MB,\r\n           MAXSIZE = 1GB,\r\n           FILEGROWTH = 10MB\r\n       )\r\nLOG ON\r\n    (\r\n        NAME = N'EDW_History_Archive_log',\r\n        FILENAME = N'D:\\Program Files\\Microsoft SQL Server\\MSSQL13.HNODWDEV\\MSSQL\\DATA\\EDW_History_Archive_log.ldf',\r\n        SIZE = 1MB,\r\n        MAXSIZE = 1GB,\r\n        FILEGROWTH = 10MB\r\n    );\r\nGO\r\n--Assign database ownership to login SA\r\nEXEC EDW_History_Archive.dbo.sp_changedbowner @loginame = N'SA',\r\n                                              @map = false;\r\nGO\r\n--Change the recovery model to SIMPLE\r\nALTER DATABASE EDW_History_Archive SET RECOVERY SIMPLE;\r\nGO\r\n\r\n\r\n\/*========================================================================================================\r\nSTEP 2\r\nCreate 'testapp' schema on all 4 databases created in Step 1\r\n========================================================================================================*\/\r\nIF OBJECT_ID('tempdb..##dbs') IS NOT NULL\r\nBEGIN\r\n    DROP TABLE ##dbs;\r\nEND;\r\nCREATE TABLE ##dbs\r\n(\r\n    db VARCHAR(128),\r\n    dbid SMALLINT\r\n);\r\nINSERT INTO ##dbs\r\n(\r\n    db,\r\n    dbid\r\n)\r\nSELECT name,\r\n       database_id\r\nFROM sys.databases;\r\n\r\nEXEC dbo.sp_MSforeachdb @command1 = 'IF 
DB_ID(''?'') in (SELECT dbid from ##dbs where db in(''EDW_Landing'', ''EDW_Staging'',''EDW_History'', ''EDW_History_Archive'')) BEGIN USE ? EXEC(''CREATE SCHEMA testapp'') END';\r\n\r\n\r\n\r\n\/*========================================================================================================\r\nSTEP 3\r\nCreate three tables - Table1, Table2 &amp; Table3 across all 4 databases on the testapp schema.\r\nNote: Depending on which database each table is created, its schema may vary slightly. \r\n========================================================================================================*\/\r\nUSE EDW_Landing;\r\nGO\r\nCREATE TABLE testapp.Table1\r\n(\r\n    id INT NOT NULL,\r\n    transaction_key UNIQUEIDENTIFIER NOT NULL,\r\n    sale_price DECIMAL(10, 2) NOT NULL,\r\n    customer_id INT NOT NULL,\r\n    sale_datetime DATETIME NOT NULL,\r\n    etl_batch_datetime DATETIME2 NOT NULL,\r\n    etl_event_datetime DATETIME2 NOT NULL,\r\n    etl_cdc_operation VARCHAR(56) NOT NULL,\r\n    etl_row_id INT IDENTITY(1, 1) NOT NULL,\r\n    CONSTRAINT pk_testapp_table1_id\r\n        PRIMARY KEY CLUSTERED (id ASC)\r\n        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,\r\n              ALLOW_PAGE_LOCKS = ON\r\n             ) ON &#x5B;PRIMARY]\r\n) ON &#x5B;PRIMARY];\r\n\r\n\r\nUSE EDW_Landing;\r\nGO\r\nCREATE TABLE testapp.Table2\r\n(\r\n    id INT NOT NULL,\r\n    transaction_key UNIQUEIDENTIFIER NOT NULL,\r\n    sale_price DECIMAL(10, 2) NOT NULL,\r\n    customer_id INT NOT NULL,\r\n    sale_datetime DATETIME NOT NULL,\r\n    etl_Batch_datetime DATETIME2 NOT NULL,\r\n    etl_Event_datetime DATETIME2 NOT NULL,\r\n    etl_cdc_operation VARCHAR(56) NOT NULL,\r\n    etl_row_id INT IDENTITY(1, 1) NOT NULL,\r\n    CONSTRAINT pk_testapp_table2_id\r\n        PRIMARY KEY CLUSTERED (id ASC)\r\n        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,\r\n              
ALLOW_PAGE_LOCKS = ON\r\n             ) ON &#x5B;PRIMARY]\r\n) ON &#x5B;PRIMARY];\r\n\r\n\r\nUSE EDW_Landing;\r\nGO\r\nCREATE TABLE testapp.Table3\r\n(\r\n    id INT NOT NULL,\r\n    transaction_key UNIQUEIDENTIFIER NOT NULL,\r\n    sale_price DECIMAL(10, 2) NOT NULL,\r\n    customer_id INT NOT NULL,\r\n    sale_datetime DATETIME NOT NULL,\r\n    etl_batch_datetime DATETIME2 NOT NULL,\r\n    etl_event_dateTime DATETIME2 NOT NULL,\r\n    etl_cdc_operation VARCHAR(56) NOT NULL,\r\n    etl_row_id INT IDENTITY(1, 1) NOT NULL,\r\n    CONSTRAINT pk_testapp_table3_id\r\n        PRIMARY KEY CLUSTERED (id ASC)\r\n        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,\r\n              ALLOW_PAGE_LOCKS = ON\r\n             ) ON &#x5B;PRIMARY]\r\n) ON &#x5B;PRIMARY];\r\n\r\n\r\nUSE EDW_Staging;\r\nGO\r\nCREATE TABLE testapp.Table1\r\n(\r\n    id INT NULL,\r\n    transaction_key UNIQUEIDENTIFIER NULL,\r\n    sale_price DECIMAL(10, 2) NULL,\r\n    customer_id INT NULL,\r\n    sale_datetime DATETIME NULL,\r\n    etl_batch_datetime DATETIME2 NOT NULL,\r\n    etl_event_datetime DATETIME2 NOT NULL,\r\n    etl_cdc_operation VARCHAR(56) NOT NULL,\r\n    etl_row_id INT IDENTITY(1, 1) NOT NULL,\r\n    etl_hash_full_record VARCHAR(512)\r\n) ON &#x5B;PRIMARY];\r\n\r\n\r\nUSE EDW_Staging;\r\nGO\r\nCREATE TABLE testapp.Table2\r\n(\r\n    id INT NULL,\r\n    transaction_key UNIQUEIDENTIFIER NULL,\r\n    sale_price DECIMAL(10, 2) NULL,\r\n    customer_id INT NULL,\r\n    sale_datetime DATETIME NULL,\r\n    etl_batch_datetime DATETIME2 NOT NULL,\r\n    etl_event_datetime DATETIME2 NOT NULL,\r\n    etl_cdc_operation VARCHAR(56) NOT NULL,\r\n    etl_row_id INT IDENTITY(1, 1) NOT NULL,\r\n    etl_hash_full_record VARCHAR(512)\r\n) ON &#x5B;PRIMARY];\r\n\r\n\r\nUSE EDW_Staging;\r\nGO\r\nCREATE TABLE testapp.Table3\r\n(\r\n    id INT NULL,\r\n    transaction_key UNIQUEIDENTIFIER NULL,\r\n    sale_price DECIMAL(10, 2) NULL,\r\n    customer_id INT 
NULL,\r\n    sale_datetime DATETIME NULL,\r\n    etl_batch_datetime DATETIME2 NOT NULL,\r\n    etl_event_datetime DATETIME2 NOT NULL,\r\n    etl_cdc_operation VARCHAR(56) NOT NULL,\r\n    etl_row_id INT IDENTITY(1, 1) NOT NULL,\r\n    etl_hash_full_record VARCHAR(512)\r\n) ON &#x5B;PRIMARY];\r\n\r\n\r\nUSE EDW_History;\r\nGO\r\nCREATE TABLE testapp.Table1\r\n(\r\n    hist_testapp_table1_sk CHAR(40) NOT NULL,\r\n    id INT NULL,\r\n    transaction_key UNIQUEIDENTIFIER NULL,\r\n    sale_price DECIMAL(10, 2) NULL,\r\n    customer_id INT NULL,\r\n    sale_datetime DATETIME NULL,\r\n    etl_batch_datetime DATETIME2 NOT NULL,\r\n    etl_event_datetime DATETIME2 NOT NULL,\r\n    etl_cdc_operation VARCHAR(56) NOT NULL,\r\n    etl_row_id INT NOT NULL,\r\n    etl_hash_full_record VARCHAR(512) NOT NULL,\r\n    CONSTRAINT pk_hist_testapp_table1\r\n        PRIMARY KEY CLUSTERED (\r\n                                  hist_testapp_table1_sk DESC,\r\n                                  etl_event_datetime ASC,\r\n                                  etl_row_id ASC\r\n                              )\r\n        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,\r\n              ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 90\r\n             ) ON &#x5B;PRIMARY]\r\n) ON &#x5B;PRIMARY];\r\nGO\r\n\r\n\r\nUSE EDW_History;\r\nGO\r\nCREATE TABLE testapp.Table2\r\n(\r\n    hist_testapp_table2_sk CHAR(40) NOT NULL,\r\n    id INT NULL,\r\n    transaction_key UNIQUEIDENTIFIER NULL,\r\n    sale_price DECIMAL(10, 2) NULL,\r\n    customer_id INT NULL,\r\n    sale_datetime DATETIME NULL,\r\n    etl_batch_datetime DATETIME2 NOT NULL,\r\n    etl_event_datetime DATETIME2 NOT NULL,\r\n    etl_cdc_operation VARCHAR(56) NOT NULL,\r\n    etl_row_id INT NOT NULL,\r\n    etl_hash_full_record VARCHAR(512) NOT NULL,\r\n    CONSTRAINT pk_hist_testapp_table2\r\n        PRIMARY KEY CLUSTERED (\r\n                                  hist_testapp_table2_sk DESC,\r\n                    
              etl_event_datetime ASC,\r\n                                  etl_row_id ASC\r\n                              )\r\n        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,\r\n              ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 90\r\n             ) ON &#x5B;PRIMARY]\r\n) ON &#x5B;PRIMARY];\r\nGO\r\n\r\n\r\nUSE EDW_History;\r\nGO\r\nCREATE TABLE testapp.Table3\r\n(\r\n    hist_testapp_table3_sk CHAR(40) NOT NULL,\r\n    id INT NULL,\r\n    transaction_key UNIQUEIDENTIFIER NULL,\r\n    sale_price DECIMAL(10, 2) NULL,\r\n    customer_id INT NULL,\r\n    sale_datetime DATETIME NULL,\r\n    etl_batch_datetime DATETIME2 NOT NULL,\r\n    etl_event_datetime DATETIME2 NOT NULL,\r\n    etl_cdc_operation VARCHAR(56) NOT NULL,\r\n    etl_row_id INT NOT NULL,\r\n    etl_hash_full_record VARCHAR(512) NOT NULL,\r\n    CONSTRAINT pk_hist_testapp_table3\r\n        PRIMARY KEY CLUSTERED (\r\n                                  hist_testapp_table3_sk DESC,\r\n                                  etl_event_datetime ASC,\r\n                                  etl_row_id ASC\r\n                              )\r\n        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,\r\n              ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 90\r\n             ) ON &#x5B;PRIMARY]\r\n) ON &#x5B;PRIMARY];\r\nGO\r\n\r\n\r\nUSE EDW_History_Archive;\r\nGO\r\nCREATE TABLE testapp.Table1\r\n(\r\n    hist_testapp_table1_sk CHAR(40) NOT NULL,\r\n    id INT NULL,\r\n    transaction_key UNIQUEIDENTIFIER NULL,\r\n    sale_price DECIMAL(10, 2) NULL,\r\n    customer_id INT NULL,\r\n    sale_datetime DATETIME NULL,\r\n    etl_batch_datetime DATETIME2 NOT NULL,\r\n    etl_event_datetime DATETIME2 NOT NULL,\r\n    etl_cdc_operation VARCHAR(56) NOT NULL,\r\n    etl_row_id INT NOT NULL,\r\n    etl_hash_full_record VARCHAR(512) NOT NULL,\r\n    CONSTRAINT pk_hist_testapp_table1\r\n        PRIMARY KEY CLUSTERED (\r\n                
                  hist_testapp_table1_sk DESC,\r\n                                  etl_event_datetime ASC,\r\n                                  etl_row_id ASC\r\n                              )\r\n        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,\r\n              ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 90\r\n             ) ON &#x5B;PRIMARY]\r\n) ON &#x5B;PRIMARY];\r\nGO\r\n\r\n\r\nUSE EDW_History_Archive;\r\nGO\r\nCREATE TABLE testapp.Table2\r\n(\r\n    hist_testapp_table2_sk CHAR(40) NOT NULL,\r\n    id INT NULL,\r\n    transaction_key UNIQUEIDENTIFIER NULL,\r\n    sale_price DECIMAL(10, 2) NULL,\r\n    customer_id INT NULL,\r\n    sale_datetime DATETIME NULL,\r\n    etl_batch_datetime DATETIME2 NOT NULL,\r\n    etl_event_datetime DATETIME2 NOT NULL,\r\n    etl_cdc_operation VARCHAR(56) NOT NULL,\r\n    etl_row_id INT NOT NULL,\r\n    etl_hash_full_record VARCHAR(512) NOT NULL,\r\n    CONSTRAINT pk_hist_testapp_table2\r\n        PRIMARY KEY CLUSTERED (\r\n                                  hist_testapp_table2_sk DESC,\r\n                                  etl_event_datetime ASC,\r\n                                  etl_row_id ASC\r\n                              )\r\n        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,\r\n              ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 90\r\n             ) ON &#x5B;PRIMARY]\r\n) ON &#x5B;PRIMARY];\r\nGO\r\n\r\n\r\nUSE EDW_History_Archive;\r\nGO\r\nCREATE TABLE testapp.Table3\r\n(\r\n    hist_testapp_table3_sk CHAR(40) NOT NULL,\r\n    id INT NULL,\r\n    transaction_key UNIQUEIDENTIFIER NULL,\r\n    sale_price DECIMAL(10, 2) NULL,\r\n    customer_id INT NULL,\r\n    sale_datetime DATETIME NULL,\r\n    etl_batch_datetime DATETIME2 NOT NULL,\r\n    etl_event_datetime DATETIME2 NOT NULL,\r\n    etl_cdc_operation VARCHAR(56) NOT NULL,\r\n    etl_row_id INT NOT NULL,\r\n    etl_hash_full_record VARCHAR(512) NOT NULL,\r\n    
CONSTRAINT pk_hist_testapp_table3\r\n        PRIMARY KEY CLUSTERED (\r\n                                  hist_testapp_table3_sk DESC,\r\n                                  etl_event_datetime ASC,\r\n                                  etl_row_id ASC\r\n                              )\r\n        WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,\r\n              ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 90\r\n             ) ON &#x5B;PRIMARY]\r\n) ON &#x5B;PRIMARY];\r\nGO\r\n\r\n\r\n\/*========================================================================================================\r\nSTEP 4\r\nSeed EDW_Landing database with dummy data \r\n========================================================================================================*\/\r\nUSE EDW_Landing;\r\nGO\r\nDECLARE @dt DATETIME = SYSDATETIME();\r\nDECLARE @ct INT;\r\nSET @ct = 0;\r\nWHILE @ct &lt; 1000\r\nBEGIN\r\n    INSERT INTO testapp.Table1\r\n    (\r\n        id,\r\n        transaction_key,\r\n        sale_price,\r\n        customer_id,\r\n        sale_datetime,\r\n        etl_batch_datetime,\r\n        etl_event_datetime,\r\n        etl_cdc_operation\r\n    )\r\n    SELECT @ct + 1,\r\n           CONVERT(VARCHAR(255), NEWID()),\r\n           ROUND(RAND(CHECKSUM(NEWID())) * (100), 2),\r\n           CAST(RAND() * 1000000 AS INT),\r\n           DATEADD(DAY, (ABS(CHECKSUM(NEWID())) % 3650) * -1, GETDATE()),\r\n           @dt,\r\n           SYSDATETIME(),\r\n           'Insert';\r\n\r\n    INSERT INTO testapp.Table2\r\n    (\r\n        id,\r\n        transaction_key,\r\n        sale_price,\r\n        customer_id,\r\n        sale_datetime,\r\n        etl_batch_datetime,\r\n        etl_event_datetime,\r\n        etl_cdc_operation\r\n    )\r\n    SELECT @ct + 1,\r\n           CONVERT(VARCHAR(255), NEWID()),\r\n           ROUND(RAND(CHECKSUM(NEWID())) * (100), 2),\r\n           CAST(RAND() * 1000000 AS INT),\r\n           DATEADD(DAY, (ABS(CHECKSUM(NEWID())) % 
3650) * -1, GETDATE()),\r\n           @dt,\r\n           SYSDATETIME(),\r\n           'Insert';\r\n\r\n    INSERT INTO testapp.Table3\r\n    (\r\n        id,\r\n        transaction_key,\r\n        sale_price,\r\n        customer_id,\r\n        sale_datetime,\r\n        etl_batch_datetime,\r\n        etl_event_datetime,\r\n        etl_cdc_operation\r\n    )\r\n    SELECT @ct + 1,\r\n           CONVERT(VARCHAR(255), NEWID()),\r\n           ROUND(RAND(CHECKSUM(NEWID())) * (100), 2),\r\n           CAST(RAND() * 1000000 AS INT),\r\n           DATEADD(DAY, (ABS(CHECKSUM(NEWID())) % 3650) * -1, GETDATE()),\r\n           @dt,\r\n           SYSDATETIME(),\r\n           'Insert';\r\n    SET @ct = @ct + 1;\r\nEND;\r\n\r\n\r\nSELECT t.TABLE_CATALOG,\r\n       t.TABLE_SCHEMA,\r\n       t.TABLE_NAME,\r\n       c.COLUMN_NAME,\r\n       c.IS_NULLABLE,\r\n       c.DATA_TYPE,\r\n       c.ORDINAL_POSITION,\r\n       c.CHARACTER_MAXIMUM_LENGTH\r\nFROM EDW_Landing.INFORMATION_SCHEMA.TABLES t\r\n    JOIN EDW_Landing.INFORMATION_SCHEMA.COLUMNS c\r\n        ON t.TABLE_SCHEMA = c.TABLE_SCHEMA\r\n           AND t.TABLE_CATALOG = c.TABLE_CATALOG\r\n           AND t.TABLE_NAME = c.TABLE_NAME\r\nWHERE c.TABLE_NAME = 'Table1';\r\n\r\nSELECT TOP 5\r\n       *\r\nFROM EDW_Landing.testapp.Table1;\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Once executed (allowing for any environment adjustments on your side), all four databases should be present on the instance, with three tables containing dummy data created in each of them. 
I will not go over the source to target data acquisition part as I have already covered a few different methods and frameworks which may be used for this purpose, mainly <a href=\"http:\/\/bicortex.com\/designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-1\/\" target=\"_blank\" rel=\"noopener noreferrer\">HERE<\/a> and <a href=\"http:\/\/bicortex.com\/data-acquisition-framework-using-custom-python-wrapper-for-concurrent-bcp-utility-execution\/\" target=\"_blank\" rel=\"noopener noreferrer\">HERE<\/a>. Therefore, assuming our data has already been copied across in its raw form into EDW_Landing database, let\u2019s focus on how to we can utilise this framework to reconcile row-level changes across all tables using this methodology.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">This architecture can be used across many different scenarios but it is most applicable in cases where the source table does not have a reliable method to determine which records have undergone changes since the previous execution of this process, therefore making it the Data Warehouse responsibility to derive the delta. By design, this architecture accomplishes this by comparing the current source data set with the previous snapshot as maintained in the Data Lake by executing full outer join between the source data set and the equivalent copy of the data in the Data Lake (EDW_History database). Any deleted records i.e. keys which exists in EDW_History tables but do not exists in the source will be merged into the EDW_Staging dataset with the CDC operation field value set to \u2018D\u2019 (delete). 
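<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Stripped of the table-specific columns, the comparison boils down to the following sketch. Note this is an illustration only \u2013 the src and tgt aliases are hypothetical stand-ins for the landing data and the latest history snapshot, and changed rows surface as new \u2018Insert\u2019 versions rather than explicit updates.<\/p>\n<pre class=\"brush: sql; collapse: true; light: false; title: (+) Expand To View Full Outer Join Delta Sketch SQL; toolbar: true; notranslate\" title=\"(+) Expand To View Full Outer Join Delta Sketch SQL\">\r\n--Illustrative sketch only: derive the delta by full outer joining the current\r\n--source snapshot (src) to the latest history snapshot (tgt) on the business key,\r\n--then keep new, deleted and changed rows while discarding unchanged ones.\r\nSELECT COALESCE(src.id, tgt.id) AS id,\r\n       CASE\r\n           WHEN src.id IS NOT NULL THEN 'Insert'  --new, or a changed version of the row\r\n           WHEN tgt.id IS NOT NULL THEN 'Delete'  --key no longer present in the source\r\n       END AS etl_cdc_operation\r\nFROM src\r\n    FULL OUTER JOIN tgt\r\n        ON src.id = tgt.id\r\nWHERE tgt.id IS NULL\r\n      OR src.id IS NULL\r\n      OR src.etl_hash_full_record &lt;&gt; tgt.etl_hash_full_record;\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">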
The following diagram depicts the pattern used to derive all changes across the source and target data sets based on hashed key comparison (click on the image to enlarge).<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2019\/05\/DW__DataLake_STG_DB_Insert_Logic.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-3773\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2019\/05\/DW__DataLake_STG_DB_Insert_Logic.png\" alt=\"\" width=\"580\" height=\"402\" srcset=\"https:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2019\/05\/DW__DataLake_STG_DB_Insert_Logic.png 882w, https:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2019\/05\/DW__DataLake_STG_DB_Insert_Logic-300x208.png 300w, https:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2019\/05\/DW__DataLake_STG_DB_Insert_Logic-768x532.png 768w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">What is important to notice is the way the record-level comparison is accomplished. On larger datasets, e.g. over 100 million rows, comparing each value in each column across all records would cause significant performance issues. To speed up value matching across the source and target data sets, a hash key is computed by concatenating all values in the tuple and hashing them into a single block of bytes using the HASHBYTES() SQL Server function. Comparing hashed values in a single column (especially when indexed) makes for an easy and flexible way of comparing large datasets quickly. 
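<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Taken in isolation, the hashing step looks like this \u2013 a trimmed-down variant of the expression used in the implementation code, with only three columns included for brevity. Each value is converted to a string, NULL-guarded with a sentinel and pipe-delimited before hashing, so that different tuples cannot concatenate to the same input string.<\/p>\n<pre class=\"brush: sql; collapse: true; light: false; title: (+) Expand To View Row Hashing Sketch SQL; toolbar: true; notranslate\" title=\"(+) Expand To View Row Hashing Sketch SQL\">\r\n--Reduce each record to a single SHA1 hash string for cheap comparison.\r\n--CONVERT(..., 2) renders the varbinary hash as a 40-character hex string.\r\nSELECT CONVERT(\r\n           VARCHAR(40),\r\n           HASHBYTES(\r\n                        'SHA1',\r\n                        UPPER(ISNULL(RTRIM(CONVERT(VARCHAR(200), transaction_key)), 'NA') + '|'\r\n                              + ISNULL(RTRIM(CONVERT(VARCHAR(20), sale_price)), 'NA') + '|'\r\n                              + ISNULL(RTRIM(CONVERT(VARCHAR(20), customer_id)), 'NA')\r\n                             )\r\n                    ),\r\n           2\r\n       ) AS etl_hash_full_record\r\nFROM EDW_Landing.testapp.Table1;\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">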
The following code depicts the actual implementation of this architecture, comparing EDW_Landing data with the EDW_History data and staging changed data in EDW_Staging database based on the environment and data created by the previous script.<\/p>\n<pre class=\"brush: sql; collapse: true; light: false; title: (+) Expand To View EDW_Landing-to-EDW_Staging SQL; toolbar: true; notranslate\" title=\"(+) Expand To View EDW_Landing-to-EDW_Staging SQL\">\r\n;WITH lnd\r\nAS (SELECT id,\r\n           transaction_key,\r\n           sale_price,\r\n           customer_id,\r\n           sale_datetime,\r\n           etl_batch_datetime,\r\n           CAST(ROW_NUMBER() OVER (PARTITION BY id ORDER BY etl_event_datetime, etl_row_id) AS INT) AS source_row_number,\r\n           CONVERT(\r\n                      VARCHAR(40),\r\n                      HASHBYTES(\r\n                                   'SHA1',\r\n                                   UPPER(ISNULL(RTRIM(CONVERT(VARCHAR(200), transaction_key)), 'NA') + '|'\r\n                                         + ISNULL(RTRIM(CONVERT(VARCHAR(20), sale_price)), 'NA') + '|'\r\n                                         + ISNULL(RTRIM(CONVERT(NVARCHAR(100), customer_id)), 'NA') + '|'\r\n                                         + ISNULL(RTRIM(CONVERT(VARCHAR(20), id)), 'NA')\r\n                                         + ISNULL(RTRIM(CONVERT(VARCHAR(20), etl_cdc_operation)), 'NA') + '|' + '|'\r\n                                        )\r\n                               ),\r\n                      2\r\n                  ) AS etl_hash_full_record\r\n    FROM EDW_Landing.testapp.Table1),\r\n      hist\r\nAS (SELECT hist.id,\r\n           hist.transaction_key,\r\n           hist.sale_price,\r\n           hist.customer_id,\r\n           hist.sale_datetime,\r\n           hist.etl_hash_full_record,\r\n           CONVERT(\r\n                      VARCHAR(40),\r\n                      HASHBYTES(\r\n                                   'SHA1',\r\n            
                       UPPER(ISNULL(RTRIM(CONVERT(VARCHAR(200), hist.transaction_key)), 'NA') + '|'\r\n                                         + ISNULL(RTRIM(CONVERT(VARCHAR(20), hist.sale_price)), 'NA') + '|'\r\n                                         + ISNULL(RTRIM(CONVERT(NVARCHAR(100), hist.customer_id)), 'NA')\r\n                                         + ISNULL(RTRIM(CONVERT(VARCHAR(20), hist.id)), 'NA')\r\n                                         + ISNULL(RTRIM(CONVERT(VARCHAR(20), 'Delete')), 'NA') + '|' + '|'\r\n                                        )\r\n                               ),\r\n                      2\r\n                  ) AS etl_hash_full_record_delete\r\n    FROM EDW_History.testapp.Table1 hist\r\n        JOIN\r\n        (\r\n            SELECT id,\r\n                   MAX(etl_event_datetime) AS etl_event_datetime\r\n            FROM EDW_History.testapp.Table1\r\n            GROUP BY id\r\n        ) sub\r\n            ON hist.id = sub.id\r\n               AND hist.etl_event_datetime = sub.etl_event_datetime)\r\nINSERT INTO EDW_Staging.testapp.Table1\r\n(\r\n    id,\r\n    transaction_key,\r\n    sale_price,\r\n    customer_id,\r\n    sale_datetime,\r\n    etl_batch_datetime,\r\n    etl_event_datetime,\r\n    etl_cdc_operation,\r\n    etl_hash_full_record\r\n)\r\nSELECT COALESCE(lnd.id, hist.id),\r\n       COALESCE(lnd.transaction_key, hist.transaction_key),\r\n       COALESCE(lnd.sale_price, hist.sale_price),\r\n       COALESCE(lnd.customer_id, hist.customer_id),\r\n       COALESCE(lnd.sale_datetime, hist.sale_datetime),\r\n       COALESCE(lnd.etl_batch_datetime, GETDATE()),\r\n       GETDATE(),\r\n       CASE\r\n           WHEN lnd.id IS NOT NULL THEN\r\n               'Insert'\r\n           WHEN hist.id IS NOT NULL THEN\r\n               'Delete'\r\n           ELSE\r\n               NULL\r\n       END,\r\n       COALESCE(lnd.etl_hash_full_record, hist.etl_hash_full_record_delete)\r\nFROM lnd\r\n    FULL OUTER JOIN hist\r\n        ON 
lnd.id = hist.id\r\nWHERE hist.id IS NULL\r\n      OR lnd.id IS NULL\r\n      OR hist.etl_hash_full_record &lt;&gt; lnd.etl_hash_full_record;\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Once newly arrived or changed data has been staged in the EDW_Staging database, we can move it to the EDW_History database. As EDW_Staging is truncated every time a load is executed, EDW_History is responsible for storing the most current version of the data, while EDW_History_Archive persists all previous versions of each record. As with the pattern used to load EDW_Staging, history is derived by comparing hash keys for changed records, and hash keys plus primary keys for new data. The following diagram depicts the change detection pattern used in populating the EDW_History database.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2019\/05\/DW__DataLake_HIST_DB_Insert_Logic.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-3774\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2019\/05\/DW__DataLake_HIST_DB_Insert_Logic.png\" alt=\"\" width=\"580\" height=\"379\" srcset=\"https:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2019\/05\/DW__DataLake_HIST_DB_Insert_Logic.png 755w, https:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2019\/05\/DW__DataLake_HIST_DB_Insert_Logic-300x196.png 300w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">The following script implements historical data loading into the EDW_History database, using SQL against the dummy data created previously.<\/p>\n<pre class=\"brush: sql; collapse: true; light: false; title: (+) Expand To View EDW_Staging-to-EDW_History SQL; toolbar: true; notranslate\" title=\"(+) Expand To View EDW_Staging-to-EDW_History SQL\">\r\n;WITH 
stg\r\nAS (SELECT CONVERT(VARCHAR(40), HASHBYTES('SHA1', ISNULL(RTRIM(CONVERT(VARCHAR(MAX), id)), 'NA')), 2) AS stg_testapp_table1_sk,\r\n           transaction_key,\r\n           sale_price,\r\n           customer_id,\r\n           sale_datetime,\r\n           etl_batch_datetime,\r\n           etl_hash_full_record,\r\n           etl_event_datetime,\r\n           etl_cdc_operation,\r\n           id,\r\n           CAST(ROW_NUMBER() OVER (PARTITION BY id ORDER BY etl_event_datetime, etl_row_id) AS INT) AS rn\r\n    FROM EDW_Staging.testapp.Table1),\r\n      hist\r\nAS (SELECT hist.hist_testapp_table1_sk,\r\n           hist.etl_event_datetime,\r\n           hist.etl_hash_full_record,\r\n           hist.etl_row_id\r\n    FROM EDW_History.testapp.Table1 hist\r\n        INNER JOIN EDW_Staging.testapp.Table1 stg\r\n            ON hist.id = stg.id)\r\nINSERT INTO EDW_History.testapp.Table1\r\n(\r\n    hist_testapp_table1_sk,\r\n    id,\r\n    transaction_key,\r\n    sale_price,\r\n    customer_id,\r\n    sale_datetime,\r\n    etl_batch_datetime,\r\n    etl_event_datetime,\r\n    etl_cdc_operation,\r\n    etl_hash_full_record,\r\n    etl_row_id\r\n)\r\nSELECT stg.stg_testapp_table1_sk,\r\n       stg.id,\r\n       stg.transaction_key,\r\n       stg.sale_price,\r\n       stg.customer_id,\r\n       stg.sale_datetime,\r\n       stg.etl_batch_datetime,\r\n       stg.etl_event_datetime,\r\n       stg.etl_cdc_operation,\r\n       stg.etl_hash_full_record,\r\n       stg.rn\r\nFROM stg\r\n    LEFT JOIN hist\r\n        ON stg.stg_testapp_table1_sk = hist.hist_testapp_table1_sk\r\n           AND stg.etl_event_datetime = hist.etl_event_datetime\r\nWHERE hist.hist_testapp_table1_sk IS NULL\r\n      OR hist.etl_hash_full_record IS NULL\r\nUNION ALL\r\nSELECT stg.stg_testapp_table1_sk,\r\n       stg.id,\r\n       stg.transaction_key,\r\n       stg.sale_price,\r\n       stg.customer_id,\r\n       stg.sale_datetime,\r\n       stg.etl_batch_datetime,\r\n       stg.etl_event_datetime,\r\n     
  stg.etl_cdc_operation,\r\n       stg.etl_hash_full_record,\r\n       stg.rn\r\nFROM stg\r\n    JOIN hist\r\n        ON stg.stg_testapp_table1_sk = hist.hist_testapp_table1_sk\r\n           AND stg.etl_event_datetime = hist.etl_event_datetime\r\nWHERE stg.etl_hash_full_record &lt;&gt; hist.etl_hash_full_record;\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Finally, the last piece of the puzzle is to archive all historised data into its own dedicated database \u2013 EDW_History_Archive \u2013 which stores all previous versions of each record. Using a dedicated database for this ensures that the current version of the data in EDW_History stays lean and easy to query for the most up-to-date information, without having to filter data on dates or flags indicating record currency. EDW_History_Archive is designed to house all previous record versions as changes are introduced.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">To achieve this, we will use a two-stage process: first insert data into the EDW_History_Archive database, then delete any non-current records from the EDW_History database. 
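To make these two stages concrete, here is a minimal, illustrative Python sketch of the same logic; the record layout and the function name are hypothetical stand-ins for the real Table1 columns, with sk representing the surrogate key:<\/p>

```python
# Illustrative sketch only (hypothetical row layout): each history row is a dict
# with "sk" (surrogate key), "id" (business key), and "etl_event_datetime" plus
# "etl_row_id" identifying the version of the record.
def archive_non_current(history, archive, staged_ids):
    """Stage 1: copy every non-current version of a staged record into the
    archive; stage 2: delete those same versions from history."""
    # Find the latest (etl_event_datetime, etl_row_id) per surrogate key,
    # considering only records touched by the current staging load
    latest = {}
    for row in history:
        if row["id"] not in staged_ids:
            continue
        marker = (row["etl_event_datetime"], row["etl_row_id"])
        if row["sk"] not in latest or marker > latest[row["sk"]]:
            latest[row["sk"]] = marker
    # Stage 1: INSERT all non-current versions into the archive
    non_current = [r for r in history
                   if r["sk"] in latest
                   and (r["etl_event_datetime"], r["etl_row_id"]) != latest[r["sk"]]]
    archive.extend(non_current)
    # Stage 2: DELETE those versions from history, leaving only the current row
    history[:] = [r for r in history if r not in non_current]
```

<p class=\"Standard\" style=\"text-align: justify;\">This sketch only conveys the intent; the set-based T-SQL below is what actually runs against the databases. 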
The following diagram and SQL code depict the logic used for this process.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2019\/05\/DW__DataLake_HIST_ARCH_DB_Insert_Logic.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-3775\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2019\/05\/DW__DataLake_HIST_ARCH_DB_Insert_Logic.png\" alt=\"\" width=\"580\" height=\"413\" srcset=\"https:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2019\/05\/DW__DataLake_HIST_ARCH_DB_Insert_Logic.png 581w, https:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2019\/05\/DW__DataLake_HIST_ARCH_DB_Insert_Logic-300x214.png 300w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<pre class=\"brush: sql; collapse: true; light: false; title: (+) Expand To View EDW_History-to-EDW_History_Archive SQL; toolbar: true; notranslate\" title=\"(+) Expand To View EDW_History-to-EDW_History_Archive SQL\"> \r\nINSERT INTO EDW_History_Archive.testapp.Table1\r\n(\r\n    hist_testapp_table1_sk,\r\n    id,\r\n    transaction_key,\r\n    sale_price,\r\n    customer_id,\r\n    sale_datetime,\r\n    etl_batch_datetime,\r\n    etl_event_datetime,\r\n    etl_cdc_operation,\r\n    etl_hash_full_record,\r\n    etl_row_id\r\n)\r\nSELECT hist.hist_testapp_table1_sk,\r\n       hist.id,\r\n       hist.transaction_key,\r\n       hist.sale_price,\r\n       hist.customer_id,\r\n       hist.sale_datetime,\r\n       hist.etl_batch_datetime,\r\n       hist.etl_event_datetime,\r\n       hist.etl_cdc_operation,\r\n       hist.etl_hash_full_record,\r\n       hist.etl_row_id\r\nFROM EDW_History.testapp.Table1 hist\r\n    INNER JOIN\r\n    (\r\n        SELECT B.hist_testapp_table1_sk,\r\n               B.etl_event_datetime AS max_etl_event_datetime,\r\n               B.etl_row_id AS max_etl_row_id,\r\n               ROW_NUMBER() OVER (PARTITION BY B.hist_testapp_table1_sk\r\n                                  ORDER BY 
B.etl_event_datetime DESC,\r\n                                           B.etl_row_id DESC\r\n                                 ) AS row_no\r\n        FROM EDW_History.testapp.Table1 B\r\n            INNER JOIN EDW_Staging.testapp.Table1 STG\r\n                ON B.id = STG.id\r\n                   AND B.etl_event_datetime = STG.etl_event_datetime\r\n                   AND B.etl_row_id = STG.etl_row_id\r\n    ) z\r\n        ON hist.hist_testapp_table1_sk = z.hist_testapp_table1_sk\r\n           AND CONCAT(hist.etl_event_datetime, hist.etl_row_id) &lt;&gt; CONCAT(z.max_etl_event_datetime, z.max_etl_row_id)\r\n           AND z.row_no = 1;\r\n\r\nDELETE hist\r\nFROM EDW_History.testapp.Table1 hist\r\n    INNER JOIN \r\n    (\r\n        SELECT B.hist_testapp_table1_sk,\r\n               B.etl_event_datetime AS max_etl_event_datetime,\r\n               B.etl_row_id AS max_etl_row_id,\r\n               ROW_NUMBER() OVER (PARTITION BY B.hist_testapp_table1_sk\r\n                                  ORDER BY B.etl_event_datetime DESC,\r\n                                           B.etl_row_id DESC\r\n                                 ) AS row_no\r\n        FROM EDW_History.testapp.Table1 B\r\n            INNER JOIN EDW_Staging.testapp.Table1 stg\r\n                ON B.id = stg.id\r\n                   AND B.etl_event_datetime = stg.etl_event_datetime\r\n                   AND B.etl_row_id = stg.etl_row_id\r\n    ) z\r\n        ON hist.hist_testapp_table1_sk = z.hist_testapp_table1_sk\r\n           AND CONCAT(hist.etl_event_datetime, hist.etl_row_id) &lt;&gt; CONCAT(z.max_etl_event_datetime, z.max_etl_row_id)\r\n           AND z.row_no = 1;\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">To merge historical data (EDW_History_Archive) with the most current record (EDW_History), we can UNION the same objects based on a combination of keys and ids, e.g. 
if we need to expose the full transactional history of a record which has undergone changes (updates, deletes), we can combine its surrogate key with the business key (for the Table1 dummy data these are hist_testapp_table1_sk and id) and optionally sort by etl_batch_datetime to view its lifecycle across history.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Now that we have a conceptual understanding of how this paradigm can be applied to &#8216;data lake&#8217; creation, it is worth highlighting that all the scripts depicting the above architecture can be fully managed via a metadata-driven design. Rather than hard-coding all object and attribute names, we can build those queries on the fly from SQL Server metadata stored in its system catalog views and tables. For any data warehouse design robust enough to respond to schema and data changes with little developer intervention, logic that follows a designated pattern should be derived from information stored in the data store itself. With that approach, we can create a simple looping mechanism to run the metadata-based queries for each stage of the process and for each object being processed. Below is sample code which builds the query used to populate the EDW_Staging database (as per the SQL above) on the fly. 
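Before looking at the T-SQL itself, the idea can be conveyed with a short, illustrative Python sketch; the metadata layout and the function names here are hypothetical, loosely mirroring the ##t_metadata structure used further down:<\/p>

```python
# Illustrative sketch only: assemble a landing-extract query, including the
# SHA-1 full-record hash expression, from a list of column-metadata rows.
# The metadata keys and function names are hypothetical.
def build_hash_expression(columns):
    # SHA-1 over the pipe-delimited, NULL-protected concatenation of all columns
    parts = " + '|' + ".join(
        f"ISNULL(RTRIM(CONVERT(VARCHAR(MAX), [{c}])), 'NA')" for c in columns)
    return f"CONVERT(VARCHAR(40), HASHBYTES('SHA1', UPPER({parts})), 2)"

def build_landing_query(metadata, db="EDW_Landing"):
    # Keep only this database's columns, in ordinal order
    rows = sorted((m for m in metadata if m["db_name"] == db),
                  key=lambda m: m["ordinal_position"])
    cols = [m["column_name"] for m in rows]
    schema, table = rows[0]["schema_name"], rows[0]["table_name"]
    select_list = ", ".join(f"[{c}]" for c in cols)
    return (f"SELECT {select_list}, {build_hash_expression(cols)} "
            f"AS etl_hash_full_record FROM {db}.{schema}.{table}")
```

<p class=\"Standard\" style=\"text-align: justify;\">The sample below does the equivalent entirely in T-SQL, populating a ##t_metadata temp table from the system catalog views via sp_MSforeachdb and then stitching the query strings together. 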
We can then take this code and execute it inside a stored procedure or an ETL job without the need to hard-code anything besides the names of the databases used (not bound to value changes).<\/p>\n<pre class=\"brush: sql; collapse: true; light: false; title: (+) Expand To View Metadata-Driven EDW_Landing-to-EDW_Staging SQL; toolbar: true; notranslate\" title=\"(+) Expand To View Metadata-Driven EDW_Landing-to-EDW_Staging SQL\">\r\nIF OBJECT_ID('tempdb..##t_metadata') IS NOT NULL\r\nBEGIN\r\n    DROP TABLE ##t_metadata;\r\nEND;\r\nCREATE TABLE tempdb..##t_metadata\r\n(\r\n    db_name VARCHAR(256),\r\n    table_name VARCHAR(256),\r\n    column_name VARCHAR(256),\r\n    schema_name VARCHAR(128),\r\n    ordinal_position INT,\r\n    is_nullable BIT,\r\n    data_type VARCHAR(256),\r\n    character_maximum_length BIGINT,\r\n    numeric_scale SMALLINT,\r\n    numeric_precision SMALLINT,\r\n    is_primary_key BIT,\r\n    cast_data_type VARCHAR(256)\r\n);\r\n\r\nDECLARE @command VARCHAR(MAX);\r\nSELECT @command\r\n    = 'IF DB_ID(''?'') IN\t\r\n\t\t\t\t\t\t\t(\r\n\t\t\t\t\t\t\tSELECT dbid \r\n\t\t\t\t\t\t\tFROM ##dbs \r\n\t\t\t\t\t\t\tWHERE db IN (\t''EDW_Landing'', \r\n\t\t\t\t\t\t\t\t\t\t\t''EDW_Staging'',\r\n\t\t\t\t\t\t\t\t\t\t\t''EDW_History'', \r\n\t\t\t\t\t\t\t\t\t\t\t''EDW_History_Archive'')\r\n\t\t\t\t\t\t\t) \r\n\t\tBEGIN \r\n\t\tUSE ? 
\r\n\t\tEXEC(\r\n\t\t\t''INSERT INTO ##t_metadata\r\n\t\t\t(\r\n\t\t\tdb_name,\r\n\t\t\ttable_name,\r\n\t\t\tcolumn_name,\r\n\t\t\tschema_name,\r\n\t\t\tordinal_position,\r\n\t\t\tis_nullable,\r\n\t\t\tdata_type,\r\n\t\t\tcharacter_maximum_length,\r\n\t\t\tnumeric_scale,\r\n\t\t\tnumeric_precision,\r\n\t\t\tis_primary_key,\r\n\t\t\tcast_data_type\r\n\t\t\t)\r\n\t\t\tSELECT  \r\n\t\t\tDB_NAME() AS db_name,\r\n\t\t\tt.name AS table_name ,\r\n\t\t\tc.name AS column_name ,\r\n\t\t\tss.name as schema_name,\r\n\t\t\tc.column_id AS ordinal_position ,\r\n\t\t\tc.is_nullable ,\t\t\t\t\t\t\t\t\t\t\r\n\t\t\ttp.name AS data_type ,\r\n\t\t\tc.max_length AS character_maximum_length ,\r\n\t\t\tc.scale AS numeric_scale ,\r\n\t\t\tc.precision AS numeric_precision ,\r\n\t\t\tISNULL(idx.pk_flag,0) as ''''is_primary_key'''',\r\n\t\t\tNULL AS cast_data_type\r\n\t\t\tFROM    sys.tables t\r\n\t\t\tJOIN sys.columns c ON t.object_id = c.object_id\r\n\t\t\tJOIN sys.types tp ON c.user_type_id = tp.user_type_id\r\n\t\t\tJOIN sys.objects so ON so.object_id = t.object_id\r\n\t\t\tJOIN sys.schemas ss ON so.schema_id = ss.schema_id\r\n\t\t\tLEFT JOIN\t\t\r\n\t\t\t\t(SELECT i.name as index_name, \r\n\t\t\t\t\t\ti.is_primary_key as pk_flag, \r\n\t\t\t\t\t\tOBJECT_NAME(ic.OBJECT_ID) AS table_name, \r\n\t\t\t\t\t\ts.name AS schema_name,\r\n\t\t\t\t\t\tCOL_NAME(ic.OBJECT_ID,ic.column_id) AS column_name \r\n\t\t\t\tFROM sys.indexes AS i \r\n\t\t\t\tJOIN sys.index_columns AS ic \r\n\t\t\t\t\t\tON  i.OBJECT_ID = ic.OBJECT_ID \r\n\t\t\t\t\t\tAND i.index_id = ic.index_id\r\n\t\t\t\tJOIN sys.objects o \r\n\t\t\t\t\t\tON o.object_id = i.object_id\r\n\t\t\t\tJOIN sys.schemas s \r\n\t\t\t\t\t\tON o.schema_id = s.schema_id\r\n\t\t\t\tWHERE   i.is_primary_key = 1) idx \r\n\t\t\t\tON idx.table_name = t.name \r\n\t\t\t\tAND idx.column_name = c.name \r\n\t\t\t\tAND idx.schema_name = ss.name'') \r\n\t\tEND';\r\nEXEC sp_MSforeachdb @command;\r\n\r\nUPDATE ##t_metadata\r\nSET cast_data_type = CASE\r\n                  
       WHEN data_type IN ( 'char', 'varchar' ) THEN\r\n                             CASE\r\n                                 WHEN character_maximum_length &lt; 0 THEN\r\n                                     'VARCHAR (MAX)'\r\n                                 WHEN character_maximum_length &lt;= 100 THEN\r\n                                     'VARCHAR (100)'\r\n                                 WHEN character_maximum_length &gt; 100\r\n                                      AND character_maximum_length &lt;= 1000 THEN\r\n                                     'VARCHAR (1000)'\r\n                                 WHEN character_maximum_length &gt; 1000\r\n                                      AND character_maximum_length &lt;= 4000 THEN\r\n                                     'VARCHAR (4000)'\r\n                                 WHEN character_maximum_length &gt; 4000\r\n                                      AND character_maximum_length &lt;= 8000 THEN\r\n                                     'VARCHAR (8000)'\r\n                                 ELSE\r\n                                     'VARCHAR (' + CAST(character_maximum_length AS VARCHAR(100)) + ')'\r\n                             END\r\n                         WHEN data_type IN ( 'nchar', 'nvarchar' ) THEN\r\n                             CASE\r\n                                 WHEN character_maximum_length &lt; 0 THEN\r\n                                     'NVARCHAR (MAX)'\r\n                                 WHEN character_maximum_length &lt;= 100 THEN\r\n                                     'NVARCHAR (100)'\r\n                                 WHEN character_maximum_length &gt; 100\r\n                                      AND character_maximum_length &lt;= 1000 THEN\r\n                                     'NVARCHAR (1000)'\r\n                                 WHEN character_maximum_length &gt; 1000\r\n                                      AND character_maximum_length &lt;= 4000 THEN\r\n                               
      'NVARCHAR (4000)'\r\n                                 WHEN character_maximum_length &gt; 4000\r\n                                      AND character_maximum_length &lt;= 8000 THEN\r\n                                     'NVARCHAR (8000)'\r\n                                 ELSE\r\n                                     'NVARCHAR (' + CAST(character_maximum_length AS VARCHAR(100)) + ')'\r\n                             END\r\n                         WHEN data_type IN ( 'smalldatetime', 'datetime', 'datetime2', 'Timestamp' ) THEN\r\n                             'VARCHAR(4000)'\r\n                         WHEN data_type IN ( 'int', 'smallint', 'mediumint', 'tinyint', 'bit' ) THEN\r\n                             'VARCHAR(4000)'\r\n                         WHEN data_type IN ( 'bigint' ) THEN\r\n                             'VARCHAR(4000)'\r\n                         WHEN data_type IN ( 'decimal', 'number', 'float', 'real', 'numeric' ) THEN\r\n                             'VARCHAR(4000)'\r\n                         WHEN data_type IN ( 'uniqueidentifier' ) THEN\r\n                             'VARCHAR(4000)'\r\n                         WHEN data_type IN ( 'tinyblob' ) THEN\r\n                             'VARCHAR(4000)'\r\n                         WHEN data_type IN ( 'date' ) THEN\r\n                             'VARCHAR(4000)'\r\n                         WHEN data_type IN ( 'datetimeoffset' ) THEN\r\n                             'VARCHAR(4000)'\r\n                         WHEN data_type IN ( 'timestamp' ) THEN\r\n                             'VARCHAR(4000)'\r\n                         WHEN data_type IN ( 'long', 'long raw' ) THEN\r\n                             'VARCHAR (MAX)'\r\n                         WHEN data_type IN ( 'time' ) THEN\r\n                             'VARCHAR(4000)'\r\n                         ELSE\r\n                             ''\r\n                     END;\r\n\r\nIF OBJECT_ID('tempdb..##t_sql') IS NOT NULL\r\nBEGIN\r\n    DROP TABLE 
##t_sql;\r\nEND;\r\nSELECT DISTINCT\r\n       t.table_name,\r\n       'WITH lnd AS ('\r\n       + REVERSE(STUFF(\r\n                          REVERSE('SELECT ' + STUFF(\r\n                                              (\r\n                                                  SELECT ', ' + '&#x5B;' + C.column_name + ']'\r\n                                                  FROM ##t_metadata AS C\r\n                                                  WHERE C.schema_name = t.schema_name\r\n                                                        AND C.table_name = t.table_name\r\n                                                        AND C.db_name = 'EDW_Landing'\r\n                                                  ORDER BY C.ordinal_position\r\n                                                  FOR XML PATH('')\r\n                                              ),\r\n                                              1,\r\n                                              2,\r\n                                              ''\r\n                                                   ) + ', CAST(ROW_NUMBER() OVER (PARTITION BY ' + a.column_name\r\n                                  + ' ORDER BY \r\n\t\t\t\t\t\t\t\t\t\tetl_event_datetime, etl_row_id) AS INT) AS source_row_number, \r\n\t\t\t\t\t\t\t\t\t\tCONVERT(VARCHAR(40), HASHBYTES(''SHA1'', UPPER('\r\n                                  + STUFF(\r\n                                    (\r\n                                        SELECT 'ISNULL(RTRIM(LTRIM(CONVERT(' + cast_data_type + ', ' + '&#x5B;' + column_name\r\n                                               + ']))), ''NA'') + ''|'' + '\r\n                                        FROM ##t_metadata z\r\n                                        WHERE z.table_name = t.table_name\r\n                                              AND z.db_name = 'EDW_Landing'\r\n                                        ORDER BY z.ordinal_position\r\n                                        FOR XML PATH('')\r\n     
                               ),\r\n                                    1,\r\n                                    0,\r\n                                    ''\r\n                                         )\r\n                                 ),\r\n                          1,\r\n                          8,\r\n                          ''\r\n                      )\r\n                ) + '+''|''+''|'')),2) AS etl_hash_full_record FROM EDW_Landing.' + t.schema_name + '.' + t.table_name\r\n       + '),' AS lnd_query,\r\n       'hist AS ('\r\n       + REVERSE(STUFF(\r\n                          REVERSE('SELECT ' + STUFF(\r\n                                              (\r\n                                                  SELECT ', ' + 'hist.&#x5B;' + C.column_name + ']'\r\n                                                  FROM ##t_metadata AS C\r\n                                                  WHERE C.schema_name = t.schema_name\r\n                                                        AND C.table_name = t.table_name\r\n                                                        AND C.db_name = 'EDW_History'\r\n                                                  ORDER BY C.ordinal_position\r\n                                                  FOR XML PATH('')\r\n                                              ),\r\n                                              1,\r\n                                              2,\r\n                                              ''\r\n                                                   ) + ', CONVERT(VARCHAR(40),HASHBYTES(''SHA1'',UPPER('\r\n                                  + STUFF(\r\n                                    (\r\n                                        SELECT 'ISNULL(RTRIM(LTRIM(CONVERT(' + cast_data_type + ', ' + 'hist.&#x5B;'\r\n                                               + column_name + ']))), ''NA'') + ''|'' + '\r\n                                        FROM ##t_metadata z\r\n                                        
WHERE z.table_name = t.table_name\r\n                                              AND z.db_name = 'EDW_History'\r\n                                        ORDER BY z.ordinal_position\r\n                                        FOR XML PATH('')\r\n                                    ),\r\n                                    1,\r\n                                    0,\r\n                                    ''\r\n                                         )\r\n                                 ),\r\n                          1,\r\n                          8,\r\n                          ''\r\n                      )\r\n                ) + '+''|''+''|'')),2) AS etl_hash_full_record_delete \r\n\t\t\t\tFROM EDW_History.' + t.schema_name + '.' + t.table_name\r\n       + ' hist JOIN\r\n        (\r\n            SELECT id,\r\n                   MAX(etl_event_datetime) AS etl_event_datetime\r\n            FROM EDW_History.' + t.schema_name + '.' + a.table_name\r\n       + '\r\n            GROUP BY id\r\n        ) sub\r\n            ON hist.' + a.column_name + ' = sub.' + a.column_name\r\n       + '\r\n               AND hist.etl_event_datetime = sub.etl_event_datetime)' AS hist_query,\r\n       'INSERT INTO EDW_Staging.' + t.schema_name + '.' 
+ t.table_name + ' ('\r\n       + STUFF(\r\n         (\r\n             SELECT ', ' + '&#x5B;' + C.column_name + ']'\r\n             FROM ##t_metadata AS C\r\n             WHERE C.schema_name = t.schema_name\r\n                   AND C.table_name = t.table_name\r\n                   AND C.column_name &lt;&gt; 'etl_row_id'\r\n                   AND C.db_name = 'EDW_Staging'\r\n             ORDER BY C.ordinal_position\r\n             FOR XML PATH('')\r\n         ),\r\n         1,\r\n         2,\r\n         ''\r\n              ) + ')' + ' SELECT '\r\n       + STUFF(\r\n         (\r\n             SELECT CASE\r\n                        WHEN C.column_name = 'etl_event_datetime' THEN\r\n                            ', GETDATE()'\r\n                        WHEN C.column_name = 'etl_batch_datetime' THEN\r\n                            ', COALESCE(l.' + C.column_name + ', GETDATE())'\r\n                        WHEN C.column_name = 'etl_cdc_operation' THEN\r\n                            ', CASE WHEN l.id IS NOT NULL THEN ''INSERT'' \r\n\t\t\t\t\t\t\t\t\tWHEN h.id IS NOT NULL THEN ''DELETE'' \r\n\t\t\t\t\t\t\t\t\tELSE NULL \r\n\t\t\t\t\t\t\t\tEND'\r\n                        ELSE\r\n                            ', ' + 'COALESCE(l.&#x5B;' + C.column_name + '], h.&#x5B;' + C.column_name + '])'\r\n                    END\r\n             FROM ##t_metadata AS C\r\n             WHERE C.schema_name = t.schema_name\r\n                   AND C.column_name &lt;&gt; 'etl_row_id'\r\n                   AND C.table_name = t.table_name\r\n                   AND C.db_name = 'EDW_Staging'\r\n             ORDER BY C.ordinal_position\r\n             FOR XML PATH('')\r\n         ),\r\n         1,\r\n         2,\r\n         ''\r\n              ) + +' FROM lnd l \r\n\t\t\t\tFULL OUTER JOIN hist h ON l.' + a.column_name + ' = h.' + a.column_name + ' \r\n\t\t\t\tWHERE h.' + a.column_name + ' IS NULL \r\n\t\t\t\tOR l.' 
+ a.column_name + ' IS NULL  \r\n\t\t\t\tOR h.etl_hash_full_record &lt;&gt; l.etl_hash_full_record;' AS insert_query\r\nINTO ##t_sql\r\nFROM ##t_metadata t\r\n    CROSS APPLY\r\n(\r\n    SELECT DISTINCT\r\n           m.table_name,\r\n           m.column_name\r\n    FROM ##t_metadata m\r\n    WHERE m.table_name = t.table_name\r\n          AND m.column_name = t.column_name\r\n          AND m.is_primary_key = 1\r\n          AND m.db_name = 'EDW_Landing'\r\n) a;\r\n\r\n\r\nSELECT CONCAT(lnd_query, hist_query, insert_query) AS full_sql\r\nFROM ##t_sql\r\nORDER BY table_name ASC;\r\n<\/pre>\n<h3 style=\"text-align: center;\">Conclusion<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">So, there you go. A simple and efficient framework for designing and implementing a historised relational &#8216;data lake&#8217; which can then be built upon to include virtual data marts with business-specific rules and definitions as well as a presentation layer for reporting needs.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">In my personal experience as an information architect, the most mundane, exhausting and repetitive activities I have seen developers struggle with on any BI\/analytics project were building data acquisition pipelines and managing data&#8217;s history. Likewise, for most businesses trying to uplift their analytics capability through data-related projects, those two tasks provide very little tangible value. 
By providing access to integrated and historised data which any business can use to answer pressing questions and improve their competitive advantage quickly and accurately, the need for meticulous, up-front domain-specific modelling is diminished, and users can start turning data into information with greater speed and efficiency.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">It is also worth pointing out that I am not against data modelling in general, as all the previously mentioned techniques and paradigms have greatly contributed to reducing risk, increasing performance and defining business processes through a set of concepts and rules tightly coupled to the models which reflect them. However, when organisations struggle with time-to-market and scope creep owing to data complexity and a lack of structural independence and agility, it is worth exploring alternative approaches which may deliver 20% of the functionality yet provide 80% of the value.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Problem Statement Data warehouse modelling techniques have largely been left unchanged for decades and most of common methodologies and patterns which were prevalent when I was starting my career in Business Intelligence are still applicable to most projects. 
In the early days, Inmon created the accepted definition of what a data warehouse is &#8211; a [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[38,5,46],"tags":[58,79,49,19],"class_list":["post-3755","post","type-post","status-publish","format-standard","hentry","category-data-modelling","category-sql","category-sql-server","tag-data-modelling","tag-data-warehouse","tag-sql","tag-sql-server"],"aioseo_notices":[],"jetpack_featured_media_url":"","_links":{"self":[{"href":"https:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/3755","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/comments?post=3755"}],"version-history":[{"count":29,"href":"https:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/3755\/revisions"}],"predecessor-version":[{"id":3793,"href":"https:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/3755\/revisions\/3793"}],"wp:attachment":[{"href":"https:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/media?parent=3755"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/categories?post=3755"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/tags?post=3755"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}