{"id":4491,"date":"2022-09-27T20:00:45","date_gmt":"2022-09-27T10:00:45","guid":{"rendered":"http:\/\/bicortex.com\/?p=4491"},"modified":"2022-09-30T09:54:35","modified_gmt":"2022-09-29T23:54:35","slug":"data-build-tool-dbt-the-emerging-standard-for-building-sql-first-data-transformation-pipelines-part-1","status":"publish","type":"post","link":"http:\/\/bicortex.com\/bicortex\/data-build-tool-dbt-the-emerging-standard-for-building-sql-first-data-transformation-pipelines-part-1\/","title":{"rendered":"Data Build Tool (DBT) &#8211; The Emerging Standard For Building SQL-First Data Transformation Pipelines &#8211; Part 1"},"content":{"rendered":"<p class=\"Standard\" style=\"text-align: justify;\">Note: Part 2 of this post can be found <a href=\"http:\/\/bicortex.com\/data-build-tool-dbt-the-emerging-standard-for-building-sql-first-data-transformation-pipelines-part-2\/\" target=\"_blank\" rel=\"noopener\">HERE<\/a>.<\/p>\n<h3 style=\"text-align: center;\">Introduction<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">It\u2019s never a dull moment working in IT, and although the Data Warehousing domain was spared the hamster wheel of relentless innovation in its first few decades, when Oracle, IBM, Microsoft and SAP reigned supreme, with the advent of cloud computing it too had to adapt and change. For me, the most significant changes included the separation of storage and compute, in-database machine learning, on-demand elasticity and server-less database models. The resulting upending of the status quo also had a large impact on the good, old-fashioned ETL (Extract Transform Load) paradigm, which started to shift to a new, more cloud-aligned architecture, and many businesses contemplating Data Warehouse modernization are jumping on the ELT bandwagon. 
This is also where a suite of new tools started to emerge, and one company with its flagship product began to make serious inroads in this market.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">dbt (data build tool) emerged as a development framework that combines modular SQL with software engineering best practices to make data transformation reliable, fast, and fun. It makes data engineering activities accessible to people with data analyst skills, allowing them to transform the data in the warehouse using simple SELECT statements and effectively express the entire transformation process as code. You can write custom business logic using SQL, automate data quality testing, deploy the code, and deliver trusted data with comprehensive documentation side-by-side with the code. This is more important today than ever due to the shortage of data engineering professionals in the marketplace. Anyone who knows SQL can now build production-grade data pipelines, lowering the entry barriers that previously limited staffing options for legacy technologies. 
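To make this concrete, a dbt model is nothing more than a SELECT statement saved as a .sql file; dbt wraps it in the DDL needed to materialise it as a view or table. As a minimal, hypothetical sketch (the model and column names below are illustrative only, not part of this project):

```sql
-- models/ga_sessions_by_country.sql (hypothetical dbt model)
-- dbt resolves ref() to the fully-qualified name of the upstream
-- model and wraps this SELECT in CREATE VIEW/TABLE DDL on `dbt run`
select
    country,
    sum(sessionduration) as total_session_duration
from {{ ref('stg_ga_data') }}
group by country
```

Running `dbt run` against such a model would compile the Jinja, resolve the dependency on the upstream staging model and materialise the result in the target schema.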
In short, dbt turns your data analysts into engineers and allows them to own the entire analytics engineering workflow.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2021\/11\/DBT_High_Level_Workflow.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-4492\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2021\/11\/DBT_High_Level_Workflow.png\" alt=\"\" width=\"580\" height=\"380\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2021\/11\/DBT_High_Level_Workflow.png 1118w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2021\/11\/DBT_High_Level_Workflow-300x196.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2021\/11\/DBT_High_Level_Workflow-1024x670.png 1024w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2021\/11\/DBT_High_Level_Workflow-768x503.png 768w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">dbt has two core workflows: building data models and testing data models. It fits nicely into the modern data stack and is cloud agnostic &#8211; meaning it works within each of the major cloud ecosystems: Azure, GCP, and AWS. However, the biggest advantage of dbt is its new approach to building pipelines, which traditionally have been quite clunky and inefficient. Some of the most prevalent issues with the standard ETL workflow are:<\/p>\n<ul>\n<li style=\"text-align: justify;\">The schema within data warehouses is often strongly defined and controlled. The emphasis in ETL was therefore on getting data into the warehouse in the correct \u201cone true\u201d format, putting the burden on the people loading the data and making the process of getting data into the warehouse slow and fragile.<\/li>\n<li style=\"text-align: justify;\">This warehouse and the ETL processes would usually be managed by centralized data teams. 
These teams would be a fairly siloed bottleneck, always behind the needs of the business for integrating and transforming the data.<\/li>\n<li style=\"text-align: justify;\">The ETL stacks and scripts would often be fragile, error-prone, and difficult and slow to change.<\/li>\n<li style=\"text-align: justify;\">The tools providing ETL would often be GUI-based and proprietary. Not only would they be expensive to license, they would also require specialist skills. This meant that neither the producers nor the consumers of the data would have access to the ETL scripts or the ability to make changes to them.<\/li>\n<li style=\"text-align: justify;\">Bringing ETL into anything resembling a software development lifecycle was tricky. For instance, the ETL process was always identified as being difficult to source control, version and test. Implementing the concept of development, test and production environments with proper data management was also way behind the state of the art in the software development world.<\/li>\n<\/ul>\n<p class=\"Standard\" style=\"text-align: justify;\">With dbt, many of the above shortcomings have been addressed, improving reliability, repeatability and collaboration by breaking down organizational silos, and reducing time to market.<\/p>\n<h3 style=\"text-align: center;\">Environment Prep and Sample Data<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">Before I jump into what makes dbt such a powerful framework, I\u2019d like to set the stage with the following mocked-up example of building an end-to-end pipeline using modern data architecture tools to first acquire and load Google Analytics data into an Azure environment and then transform it using the dbt framework. I believe that rather than installing dbt and running a few scripts to outline its core features, it\u2019s better to showcase it on a tangible mini-project which accurately reflects some of the problems many businesses may be grappling with. 
For this purpose, I will be following the below script:<\/p>\n<ul>\n<li style=\"text-align: justify;\">Stand up Azure environment (using Azure Python SDK), including Azure SQL database, Azure Data Lake gen2 (ADLS) and associated Resource Group. Technically this part can be done using any cloud provider or even on-premises environment but since modern data stack tends to rely on APIs and tools available from major public cloud vendors, this is in line with more contemporary information architecture and management practices<\/li>\n<li style=\"text-align: justify;\">Build a simple pipeline to acquire Google Analytics data and stage it in ADLS storage as well as Azure SQL database. This script can also be run as Azure Function to create automated, ETL-like process<\/li>\n<li style=\"text-align: justify;\">Install dbt and the supporting SQL Server connector on an isolated local environment<\/li>\n<li style=\"text-align: justify;\">Augment GA data with geocoding information to build a simple, one-table data mart using SQL and dbt Jinja templates<\/li>\n<li style=\"text-align: justify;\">Test our data, create a snapshot using dbt functionality and generate sample project documentation<\/li>\n<\/ul>\n<p class=\"Standard\" style=\"text-align: justify;\">Firstly, let\u2019s provision a sample Azure environment consisting of a dedicated resource group as well as Azure Data Lake Gen 2 and Azure SQL database.<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\nfrom azure.identity import AzureCliCredential\r\nfrom azure.mgmt.resource import ResourceManagementClient\r\nfrom azure.mgmt.storage import StorageManagementClient\r\nfrom azure.storage.filedatalake import DataLakeServiceClient\r\nfrom azure.mgmt.sql import SqlManagementClient\r\nfrom humanfriendly import format_timespan\r\nfrom timeit import default_timer as timer\r\nimport time\r\nimport pyodbc\r\nfrom os import popen\r\n\r\n\r\n_RESOURCE_GROUP_NAME = 
'gademoresourcegroup2022'\r\n_RESOURCE_GROUP_LOCATION = 'australiaeast'\r\n_STORAGE_ACCOUNT_NAME = 'gademostorageacct2022'\r\n_STORAGE_CONTAINER_NAME = 'gademooutputfiles2022'\r\n_SUBSCRIPTION_ID = 'your_subscription_id'\r\n_DF_LINKED_SERVICE_NAME = 'lsoutputfiles'\r\n_SQL_SERVER_NAME = 'gademosqlserver2022'\r\n_SQL_DB_NAME = 'sourcedb'\r\n_SQL_USERNAME = 'testusername'\r\n_SQL_PASSWORD = 'MyV3ry$trongPa$$word'\r\n_SQL_DRIVER = '{ODBC Driver 18 for SQL Server}'\r\nexternal_IP = popen(&quot;curl -s ifconfig.me&quot;).readline()\r\n\r\n\r\n# create resource group\r\ndef create_resource_group(resource_client, _RESOURCE_GROUP_NAME, _LOCATION):\r\n    print(&quot;Creating Azure Resource Group {rg_name}...&quot;.format(\r\n        rg_name=_RESOURCE_GROUP_NAME), end=&quot;&quot;, flush=True)\r\n    try:\r\n        resource_client.resource_groups.create_or_update(\r\n            _RESOURCE_GROUP_NAME, {'location': _LOCATION})\r\n    except Exception as e:\r\n        print(e)\r\n    rg = &#x5B;g.name for g in resource_client.resource_groups.list()]\r\n    if _RESOURCE_GROUP_NAME in rg:\r\n        print('OK')\r\n\r\n\r\n# create storage account in the nominated resource group\r\ndef create_storage_account(storage_client, _STORAGE_ACCOUNT_NAME, _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION):\r\n    print(&quot;Creating Azure Storage Account {st_acct}...&quot;.format(\r\n        st_acct=_STORAGE_ACCOUNT_NAME), end=&quot;&quot;, flush=True)\r\n    try:\r\n        availability_result = storage_client.storage_accounts.check_name_availability(\r\n            {'name': _STORAGE_ACCOUNT_NAME})\r\n        if not availability_result.name_available:\r\n            print('storage name {st_acct} is already in use. 
Try another name.'.format(\r\n                st_acct=_STORAGE_ACCOUNT_NAME))\r\n            exit()\r\n        poller = storage_client.storage_accounts.begin_create(_RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME,\r\n                                                              {\r\n                                                                  &quot;location&quot;: _RESOURCE_GROUP_LOCATION,\r\n                                                                  &quot;kind&quot;: &quot;StorageV2&quot;,\r\n                                                                  &quot;is_hns_enabled&quot;: &quot;true&quot;,\r\n                                                                  &quot;sku&quot;: {&quot;name&quot;: &quot;Standard_LRS&quot;, &quot;tier&quot;: &quot;Standard&quot;},\r\n                                                                  &quot;properties&quot;: {\r\n                                                                      &quot;minimumTlsVersion&quot;: &quot;TLS1_2&quot;,\r\n                                                                      &quot;allowBlobPublicAccess&quot;: &quot;true&quot;,\r\n                                                                      &quot;networkAcls&quot;: {\r\n                                                                          &quot;bypass&quot;: &quot;AzureServices&quot;,\r\n                                                                          &quot;virtualNetworkRules&quot;: &#x5B;],\r\n                                                                          &quot;ipRules&quot;: &#x5B;],\r\n                                                                          &quot;defaultAction&quot;: &quot;Allow&quot;\r\n                                                                      }\r\n                                                                  }})\r\n        account_result = poller.result()\r\n        if account_result.name == _STORAGE_ACCOUNT_NAME:\r\n            print('OK')\r\n    
except Exception as e:\r\n        print(e)\r\n\r\n\r\n# create storage container aka 'filesystem' in the nominated storage account\r\ndef create_adls_container(_STORAGE_ACCOUNT_NAME, _STORAGE_CONTAINER_NAME):\r\n    print(&quot;Creating Azure Data Lake Storage Container {st_ct}...&quot;.format(\r\n        st_ct=_STORAGE_CONTAINER_NAME), end=&quot;&quot;, flush=True)\r\n    keys = storage_client.storage_accounts.list_keys(\r\n        _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME)\r\n    account_url = &quot;https:\/\/{}.dfs.core.windows.net\/&quot;.format(\r\n        _STORAGE_ACCOUNT_NAME)\r\n    datalake_service = DataLakeServiceClient(\r\n        account_url=account_url, credential=keys.keys&#x5B;0].value\r\n    )\r\n    try:\r\n        datalake_service.create_file_system(\r\n            file_system=_STORAGE_CONTAINER_NAME)\r\n        file_systems = &#x5B;i.name for i in datalake_service.list_file_systems()]\r\n        if _STORAGE_CONTAINER_NAME in file_systems:\r\n            print('OK')\r\n    except Exception as e:\r\n        print(e)\r\n\r\n\r\n# create azure sql server in the nominated resource group\r\ndef create_sql_server(sql_client, _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME,\r\n                      _RESOURCE_GROUP_LOCATION, _SQL_USERNAME, _SQL_PASSWORD):\r\n    print(&quot;Creating Azure SQL Server {ssvr_name}...&quot;.format(\r\n        ssvr_name=_SQL_SERVER_NAME), end=&quot;&quot;, flush=True)\r\n    try:\r\n        sql_server = sql_client.servers.begin_create_or_update(\r\n            _RESOURCE_GROUP_NAME,\r\n            _SQL_SERVER_NAME,\r\n            {\r\n                'location': _RESOURCE_GROUP_LOCATION,\r\n                'version': '12.0',\r\n                'administrator_login': _SQL_USERNAME,\r\n                'administrator_login_password': _SQL_PASSWORD\r\n            }\r\n        )\r\n        sql_server.wait()\r\n    except Exception as e:\r\n        print(e)\r\n    ssvr = &#x5B;i.name for i in sql_client.servers.list_by_resource_group(\r\n  
      _RESOURCE_GROUP_NAME)]\r\n    if _SQL_SERVER_NAME in ssvr:\r\n        print('OK')\r\n\r\n\r\n# create azure sql db in the nominated resource group\r\ndef create_sql_db(sql_client, _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME, _SQL_DB_NAME, _RESOURCE_GROUP_LOCATION):\r\n    print(&quot;Creating Azure SQL Database {db_name}...&quot;.format(\r\n        db_name=_SQL_DB_NAME), end=&quot;&quot;, flush=True)\r\n    try:\r\n        sql_db = sql_client.databases.begin_create_or_update(\r\n            _RESOURCE_GROUP_NAME,\r\n            _SQL_SERVER_NAME,\r\n            _SQL_DB_NAME,\r\n            {\r\n                'location': _RESOURCE_GROUP_LOCATION,\r\n                'collation': 'SQL_Latin1_General_CP1_CI_AS',\r\n                'create_mode': 'default',\r\n                'requested_service_objective_name': 'Basic'\r\n            }\r\n        )\r\n        sql_db.wait()\r\n    except Exception as e:\r\n        print(e)\r\n    dbs = &#x5B;i.name for i in sql_client.databases.list_by_server(\r\n        _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME)]\r\n    if _SQL_DB_NAME in dbs:\r\n        print('OK')\r\n\r\n\r\n# configure azure sql server firewall to accept connections from the host ip address\r\ndef configure_firewall(sql_client, _SQL_DRIVER, _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME, _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD, external_IP):\r\n    print(&quot;Configuring Azure SQL Server Firewall Settings...&quot;, end=&quot;&quot;, flush=True)\r\n    try:\r\n        sql_client.firewall_rules.create_or_update(\r\n            _RESOURCE_GROUP_NAME,\r\n            _SQL_SERVER_NAME,\r\n            &quot;firewall_rule_name_&quot; + external_IP,\r\n            {\r\n                &quot;startIpAddress&quot;: external_IP,\r\n                &quot;endIpAddress&quot;: external_IP\r\n            }\r\n        )\r\n    except Exception as e:\r\n        print(e)\r\n    _AZURE_SQL_SERVER = _SQL_SERVER_NAME + '.database.windows.net'\r\n    with 
pyodbc.connect('DRIVER='+_SQL_DRIVER+';SERVER='+_AZURE_SQL_SERVER+';PORT=1433;DATABASE='+_SQL_DB_NAME+';UID='+_SQL_USERNAME+';PWD='+_SQL_PASSWORD) as conn:\r\n        with conn.cursor() as cursor:\r\n            cursor.execute(&quot;SELECT @@version&quot;)\r\n            row = cursor.fetchone()\r\n    if row:\r\n        print('OK')\r\n\r\n\r\n\r\nif __name__ == '__main__':\r\n    print(&quot;\\n&quot;)\r\n    execution_start_time = timer()\r\n    credentials = AzureCliCredential()\r\n    storage_client = StorageManagementClient(credentials, _SUBSCRIPTION_ID)\r\n    resource_client = ResourceManagementClient(credentials, _SUBSCRIPTION_ID)\r\n    sql_client = SqlManagementClient(credentials, _SUBSCRIPTION_ID)\r\n    resource_groups = &#x5B;i.name for i in resource_client.resource_groups.list()]\r\n    if _RESOURCE_GROUP_NAME in resource_groups:\r\n        print(&quot;Deleting existing resource group{res_gr}...&quot;.format(\r\n            res_gr=_RESOURCE_GROUP_NAME), end=&quot;&quot;, flush=True)\r\n        delete_async_operation = resource_client.resource_groups.begin_delete(\r\n            _RESOURCE_GROUP_NAME)\r\n        delete_async_operation.wait()\r\n        print('OK')\r\n\r\n    create_resource_group(\r\n        resource_client, _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION)\r\n    create_storage_account(storage_client, _STORAGE_ACCOUNT_NAME,\r\n                           _RESOURCE_GROUP_NAME, _RESOURCE_GROUP_LOCATION)\r\n    create_adls_container(_STORAGE_ACCOUNT_NAME, _STORAGE_CONTAINER_NAME)\r\n    create_sql_server(sql_client, _RESOURCE_GROUP_NAME, _SQL_SERVER_NAME,\r\n                      _RESOURCE_GROUP_LOCATION, _SQL_USERNAME, _SQL_PASSWORD)\r\n    create_sql_db(sql_client, _RESOURCE_GROUP_NAME,\r\n                  _SQL_SERVER_NAME, _SQL_DB_NAME, _RESOURCE_GROUP_LOCATION)\r\n    configure_firewall(sql_client, _SQL_DRIVER, _RESOURCE_GROUP_NAME,\r\n                       _SQL_SERVER_NAME, _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD, external_IP)\r\n   
 execution_end_time = timer()\r\n    elapsed_duration = execution_end_time - execution_start_time\r\n    print('Elapsed resource(s) provisioning time was {time}.\\n'.format(\r\n        time=format_timespan(elapsed_duration)))\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Running the above script produces the following output, provided we have the Azure subscription set up and configured on the local environment.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2021\/11\/DBT_Env_Prep_Python_Output.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-4497\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2021\/11\/DBT_Env_Prep_Python_Output.png\" alt=\"\" width=\"580\" height=\"159\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2021\/11\/DBT_Env_Prep_Python_Output.png 580w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2021\/11\/DBT_Env_Prep_Python_Output-300x82.png 300w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Now that we have all artefacts supporting Google Analytics data acquisition in place, let\u2019s start by defining the GA attributes we\u2019d like to source and stage in our data lake and SQL database. For this exercise I used data from my own website \u2013 the one you\u2019re reading right now \u2013 restricted to the last 30 days and the following attributes: PagePath, PageTitle, Country, City, Medium, DeviceCategory, OperatingSystem, Browser and SessionDuration. I won\u2019t go into how to set up a GA account in this post as there are countless other internet resources on this topic and most of this code is self-explanatory. 
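Before the full script, it helps to see the nested shape of a Reporting API v4 response that the acquisition code has to flatten. The sketch below uses a mocked response dict; the structure mirrors the API, but the values (and the simplified helper) are my own illustration, not the script that follows:

```python
# Minimal sketch of flattening a (mocked) GA Reporting API v4 response.
# The nested dict structure mirrors the real API; the values are made up.
def flatten_report(response):
    rows = []
    for report in response.get('reports', []):
        header = report.get('columnHeader', {})
        dims = header.get('dimensions', [])
        metrics = [m['name'] for m in
                   header.get('metricHeader', {}).get('metricHeaderEntries', [])]
        for row in report.get('data', {}).get('rows', []):
            # pair dimension headers with this row's dimension values
            record = dict(zip(dims, row.get('dimensions', [])))
            # each date range contributes one 'values' list of metric strings
            for values in row.get('metrics', []):
                record.update(zip(metrics, (int(v) for v in values.get('values', []))))
            rows.append(record)
    return rows

mock_response = {'reports': [{
    'columnHeader': {'dimensions': ['ga:country', 'ga:city'],
                     'metricHeader': {'metricHeaderEntries': [{'name': 'ga:sessions'}]}},
    'data': {'rows': [{'dimensions': ['Australia', 'Sydney'],
                       'metrics': [{'values': ['42']}]}]}}]}

print(flatten_report(mock_response))
# → [{'ga:country': 'Australia', 'ga:city': 'Sydney', 'ga:sessions': 42}]
```

The real script below does the same walk but also distinguishes float from int metrics and loads the rows into a Pandas data frame.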
The only thing that was unnecessarily frustrating and took me a while to figure out was creating a service account and providing it access to my GA view, as denoted by _GA_Service_ACCT_KEY (JSON file) and _GA_VIEW_ID variables. Getting the account and its key generated was not a problem but modifying security details so that the service account could access the view was quite convoluted. The following script is responsible for GA data acquisition, tabulating and formatting it into a Pandas data frame and inserting it into the provisioned Azure SQL DB (table object is also created\/truncated as part of this code).<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\nfrom googleapiclient.discovery import build\r\nfrom oauth2client.service_account import ServiceAccountCredentials\r\nfrom pathlib import PureWindowsPath\r\nfrom azure.storage.filedatalake import DataLakeServiceClient\r\nfrom azure.mgmt.storage import StorageManagementClient\r\nfrom azure.identity import AzureCliCredential\r\nimport pandas as pd\r\nimport pyodbc\r\nimport time\r\n\r\n\r\n_GA_SCOPES = &#x5B;'https:\/\/www.googleapis.com\/auth\/analytics.readonly']\r\n_GA_VIEW_ID = 'your_ga_view_id'\r\n_GA_OUTPUT_FILE_NAME = 'GADataExtract-'+time.strftime(&quot;%Y%m%d-%H%M%S&quot;)+'.csv'\r\n_GA_Service_ACCT_KEY = PureWindowsPath('C:\/your_file_path\/your_json_service_account_key_file.json')\r\n_SQL_SERVER_NAME = 'gademosqlserver2022.database.windows.net'\r\n_SQL_DB_NAME = 'sourcedb'\r\n_SQL_USERNAME = 'testusername'\r\n_SQL_PASSWORD = 'MyV3ry$trongPa$$word'\r\n_SQL_DRIVER = '{ODBC Driver 18 for SQL Server}'\r\n_RESOURCE_GROUP_NAME = 'gademoresourcegroup2022'\r\n_STORAGE_CONTAINER_NAME = 'gademooutputfiles2022'\r\n_STORAGE_ACCOUNT_NAME = 'gademostorageacct2022'\r\n_STORAGE_CONTAINER_DIRECTORY_NAME = time.strftime(&quot;%Y%m%d&quot;)\r\n_SUBSCRIPTION_ID = 'your_subscription_id'\r\n_GA_OUTPUT_FILE_PATH = PureWindowsPath('C:\/your_file_path\/{file_name}'.format(\r\n        
file_name=_GA_OUTPUT_FILE_NAME))\r\n_SCHEMAS = &#x5B;'stg', 'mart']\r\n\r\n# get Google Analytics service account credentials\r\ndef initialize_analyticsreporting():\r\n    credentials = ServiceAccountCredentials.from_json_keyfile_name(\r\n        _GA_Service_ACCT_KEY, _GA_SCOPES)\r\n    analytics = build('analyticsreporting', 'v4', credentials=credentials)\r\n    return analytics\r\n\r\n\r\ndef get_report(analytics):\r\n    return analytics.reports().batchGet(\r\n        body={\r\n            'reportRequests': &#x5B;\r\n                {\r\n                    'viewId': _GA_VIEW_ID,\r\n                    'dateRanges': &#x5B;{'startDate': '30daysAgo', 'endDate': 'today'}],\r\n                    'metrics': &#x5B;{'expression': 'ga:sessions'}],\r\n                    'dimensions': &#x5B;{&quot;name&quot;: &quot;ga:pagePath&quot;}, {&quot;name&quot;: &quot;ga:pageTitle&quot;}, {&quot;name&quot;: &quot;ga:country&quot;}, {&quot;name&quot;: &quot;ga:city&quot;}, {&quot;name&quot;: &quot;ga:medium&quot;}, {&quot;name&quot;: &quot;ga:deviceCategory&quot;}, {&quot;name&quot;: &quot;ga:operatingSystem&quot;}, {&quot;name&quot;: &quot;ga:browser&quot;}],\r\n                    'orderBys': &#x5B;{&quot;fieldName&quot;: &quot;ga:sessions&quot;, &quot;sortOrder&quot;: &quot;DESCENDING&quot;}],\r\n                    'pageSize': 1000\r\n                }]\r\n        }\r\n    ).execute()\r\n\r\n\r\ndef ga_response_dataframe(response):\r\n    row_list = &#x5B;]\r\n    for report in response.get('reports', &#x5B;]):\r\n        column_header = report.get('columnHeader', {})\r\n        dimension_headers = column_header.get('dimensions', &#x5B;])\r\n        metric_headers = column_header.get(\r\n            'metricHeader', {}).get('metricHeaderEntries', &#x5B;])\r\n        for row in report.get('data', {}).get('rows', &#x5B;]):\r\n            row_dict = {}\r\n            dimensions = row.get('dimensions', &#x5B;])\r\n            date_range_values = row.get('metrics', 
&#x5B;])\r\n\r\n            for header, dimension in zip(dimension_headers, dimensions):\r\n                row_dict&#x5B;header] = dimension\r\n\r\n            for i, values in enumerate(date_range_values):\r\n                for metric, value in zip(metric_headers, values.get('values')):\r\n                    if ',' in value or '.' in value:\r\n                        row_dict&#x5B;metric.get('name')] = float(value)\r\n                    else:\r\n                        row_dict&#x5B;metric.get('name')] = int(value)\r\n\r\n            row_list.append(row_dict)\r\n    return pd.DataFrame(row_list)\r\n\r\n\r\n# upload a file to data lake in Azure\r\ndef upload_file_to_lake(storage_client, ga_file_content, _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME, _STORAGE_CONTAINER_NAME, _STORAGE_CONTAINER_DIRECTORY_NAME, _GA_OUTPUT_FILE_PATH, _GA_OUTPUT_FILE_NAME):\r\n    keys = storage_client.storage_accounts.list_keys(\r\n        _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME)\r\n    account_url = &quot;https:\/\/{}.dfs.core.windows.net\/&quot;.format(\r\n        _STORAGE_ACCOUNT_NAME)\r\n    service_client = DataLakeServiceClient(account_url=&quot;{}:\/\/{}.dfs.core.windows.net&quot;.format(\r\n        &quot;https&quot;, _STORAGE_ACCOUNT_NAME), credential=keys.keys&#x5B;0].value)\r\n    file_system_client = service_client.get_file_system_client(\r\n        file_system=_STORAGE_CONTAINER_NAME)\r\n    dir_client = file_system_client.get_directory_client(\r\n        _STORAGE_CONTAINER_DIRECTORY_NAME)\r\n    dir_client.create_directory()\r\n    file_client = dir_client.create_file(_GA_OUTPUT_FILE_NAME)\r\n    file_client.append_data(ga_file_content, 0, len(ga_file_content))\r\n    file_client.flush_data(len(ga_file_content))\r\n\r\n\r\n# create required database schemas\r\ndef create_stg_schema(_SQL_DRIVER, _SQL_SERVER_NAME, _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD, _SCHEMAS):\r\n    with 
pyodbc.connect('DRIVER='+_SQL_DRIVER+';SERVER='+_SQL_SERVER_NAME+';PORT=1433;DATABASE='+_SQL_DB_NAME+';UID='+_SQL_USERNAME+';PWD=' + _SQL_PASSWORD) as conn:\r\n        with conn.cursor() as cursor:\r\n            for schema in _SCHEMAS:\r\n                cursor.execute('''IF (NOT EXISTS (SELECT TOP 1 (1) FROM sys.schemas WHERE name = '{schema}')) \r\n                                BEGIN\r\n                                    EXEC ('CREATE SCHEMA &#x5B;{schema}] AUTHORIZATION &#x5B;dbo]')\r\n                                END'''.format(schema=schema))\r\n\r\n\r\n# create required objects in the nomainated database and populate with data\r\ndef insert_into_azuresql(ga_data, _SQL_DRIVER, _SQL_SERVER_NAME, _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD):\r\n    with pyodbc.connect('DRIVER='+_SQL_DRIVER+';SERVER='+_SQL_SERVER_NAME+';PORT=1433;DATABASE='+_SQL_DB_NAME+';UID='+_SQL_USERNAME+';PWD=' + _SQL_PASSWORD) as conn:\r\n        with conn.cursor() as cursor:\r\n            if not cursor.tables(table='ga_data', tableType='TABLE').fetchone():\r\n                cursor.execute('''CREATE TABLE dbo.ga_data (ID INT IDENTITY (1,1),\r\n                                                            PagePath NVARCHAR(1024),\r\n                                                            PageTitle NVARCHAR (2048),\r\n                                                            Country NVARCHAR (256),\r\n                                                            City NVARCHAR (256),\r\n                                                            Medium NVARCHAR (256),\r\n                                                            DeviceCategory NVARCHAR (512),\r\n                                                            OperatingSystem VARCHAR (128),\r\n                                                            Browser NVARCHAR (256),\r\n                                                            SessionDuration INT)''')  \r\n                cursor.commit()\r\n            for 
index, row in ga_data.iterrows():\r\n                cursor.execute('''INSERT INTO dbo.ga_data\r\n                                    (PagePath,\r\n                                    PageTitle,\r\n                                    Country,\r\n                                    City,\r\n                                    Medium,\r\n                                    DeviceCategory,\r\n                                    OperatingSystem,\r\n                                    Browser,\r\n                                    SessionDuration)\r\n                          values (?, ?, ?, ?, ?, ?, ?, ?, ?)''',\r\n                               row&#x5B;0], row&#x5B;1], row&#x5B;2], row&#x5B;3], row&#x5B;4], row&#x5B;5], row&#x5B;6], row&#x5B;7], row&#x5B;8])\r\n                cursor.commit()\r\n            cursor.execute('SELECT TOP (1) 1 FROM dbo.ga_data')\r\n            rows = cursor.fetchone()\r\n            if rows:\r\n                print('All Good!')\r\n            else:\r\n                raise ValueError(\r\n                    'No data generated in the source table. 
Please troubleshoot!'\r\n                )\r\n\r\n\r\ndef main():\r\n    credentials = AzureCliCredential()\r\n    storage_client = StorageManagementClient(credentials, _SUBSCRIPTION_ID)\r\n    analytics = initialize_analyticsreporting()\r\n    response = get_report(analytics)\r\n    df = ga_response_dataframe(response)\r\n    df.columns = &#x5B;x.replace(':', '_') for x in df.columns]\r\n    ga_data = df.replace('(none)', '').replace(\r\n        '(not set)', '')\r\n    ga_data.to_csv(_GA_OUTPUT_FILE_NAME, index=False)\r\n    with open(_GA_OUTPUT_FILE_PATH) as file:\r\n        ga_file_content = file.read()\r\n    upload_file_to_lake(storage_client, ga_file_content, _RESOURCE_GROUP_NAME, _STORAGE_ACCOUNT_NAME,\r\n                        _STORAGE_CONTAINER_NAME, _STORAGE_CONTAINER_DIRECTORY_NAME, _GA_OUTPUT_FILE_PATH, _GA_OUTPUT_FILE_NAME)\r\n    create_stg_schema(_SQL_DRIVER, _SQL_SERVER_NAME,\r\n                      _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD, _SCHEMAS)\r\n    insert_into_azuresql(ga_data, _SQL_DRIVER, _SQL_SERVER_NAME,\r\n                         _SQL_DB_NAME, _SQL_USERNAME, _SQL_PASSWORD)\r\n    \r\nif __name__ == '__main__':\r\n    main()\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">When executed, our Azure SQL DB table is created (if it does not exist) and GA data inserted into the ADLS container and the aforementioned table (click on image to enlarge).<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2022\/06\/DBT_Azure_SQL__Lake_Google_Analytics_Data.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-4532\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2022\/06\/DBT_Azure_SQL__Lake_Google_Analytics_Data.png\" alt=\"\" width=\"580\" height=\"463\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2022\/06\/DBT_Azure_SQL__Lake_Google_Analytics_Data.png 1335w, 
http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2022\/06\/DBT_Azure_SQL__Lake_Google_Analytics_Data-300x240.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2022\/06\/DBT_Azure_SQL__Lake_Google_Analytics_Data-1024x818.png 1024w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2022\/06\/DBT_Azure_SQL__Lake_Google_Analytics_Data-768x613.png 768w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Now that we have our GA data, let\u2019s look at how dbt can help us shape it by building our sample ELT pipeline.<\/p>\n<h3 style=\"text-align: center;\">DBT Installation<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">Installing dbt is a fairly straightforward affair. One can also go with a Docker container or WSL approach but in this post I\u2019ll outline the steps to perform a local installation in a Python virtual environment on a Windows system. Providing Python is already installed, let\u2019s go ahead and set up our virtual environment &#8211; a self-contained Python installation.<\/p>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\npython -m venv dbt_env\r\n.\\dbt_env\\Scripts\\activate.ps1\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Once set up and activated, we can install dbt using pip together with the dbt-sqlserver adapter, as dbt does not support SQL Server out-of-the-box. 
You can find the official GitHub repo for it, with all the supporting documentation, at the following <a href=\"https:\/\/github.com\/dbt-msft\/dbt-sqlserver\" target=\"_blank\" rel=\"noopener\">LINK<\/a>.<\/p>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\npip install dbt-core\r\npip install dbt-sqlserver\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Next, we will initialize our demo dbt project using the dbt init command and provide a name for the project we&#8217;re creating (in this case it&#8217;s azure_sql_demo). Please also note that newer versions of dbt only allow lowercase letters and underscores when specifying the project name.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2022\/03\/DBT_Debug_PowerShell_Output.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-4519\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2022\/03\/DBT_Debug_PowerShell_Output.png\" alt=\"\" width=\"580\" height=\"412\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2022\/03\/DBT_Debug_PowerShell_Output.png 580w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2022\/03\/DBT_Debug_PowerShell_Output-300x213.png 300w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">As we can see from the output, a sample profiles.yml file containing placeholder configuration was created. Depending on which option we selected, e.g. (1) for BigQuery, (2) for PostgreSQL etc., the default profiles.yml file contains only generic properties or gets created in an empty state. This will need to be amended to reflect our Azure SQL Server environment details, e.g. user name, password, driver etc. As the profiles.yml file contains database connection details and credentials (sensitive information), it is generated in the ~\/.dbt\/ folder and not in the project folder. 
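When scripting environment setup, it can be handy to resolve that location programmatically. A minimal Python sketch (standard library only; note this is just the default location, as dbt also honours the DBT_PROFILES_DIR environment variable):

```python
from pathlib import Path

# Default location dbt uses for its connection profiles: ~/.dbt/profiles.yml.
# This is only the default; DBT_PROFILES_DIR can point dbt elsewhere.
def default_profiles_path() -> Path:
    return Path.home() / '.dbt' / 'profiles.yml'

print(default_profiles_path())
```

Using pathlib keeps the sketch portable between the Windows setup shown here and Linux or macOS hosts.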
The main configuration file, dbt_project.yml, which defines settings that apply to the whole project, contains placeholders for the development and production environments. Let\u2019s go ahead and populate the profiles.yml file with the required information, ensuring that the profile name in dbt_project.yml matches the one in the profiles.yml file.<\/p>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\nazure_sql_demo:\r\n    target: dev\r\n    outputs:\r\n        dev:\r\n            type: sqlserver\r\n            driver: SQL Server\r\n            server: demosqlserver2022.database.windows.net\r\n            database: sourcedb\r\n            port: 1433\r\n            schema: stg\r\n            user: testusername\r\n            password: MyV3ry$trongPa$$word\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Finally, we can check our target database connectivity to ensure all the parameters have been entered correctly by running the dbt debug command as per below. 
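As an aside, a mismatched profile name between dbt_project.yml and profiles.yml is a common cause of dbt debug failures, and can be caught with a quick scripted check. The following is only an illustrative, standard-library sketch that uses naive line matching rather than a proper YAML parser (the helper names and sample strings are hypothetical):

```python
import re

# Naively extract the value of the top-level 'profile:' key from dbt_project.yml content.
def profile_name(dbt_project_yml: str) -> str:
    m = re.search(r'^profile:\s*(\w+)', dbt_project_yml, re.MULTILINE)
    if not m:
        raise ValueError('no profile: key found in dbt_project.yml')
    return m.group(1)

# Check that profiles.yml content defines a matching top-level key.
def has_profile(profiles_yml: str, name: str) -> bool:
    return re.search(r'^' + re.escape(name) + r':\s*$', profiles_yml, re.MULTILINE) is not None

project = 'name: azure_sql_demo\nprofile: azure_sql_demo\n'
profiles = 'azure_sql_demo:\n    target: dev\n'
assert has_profile(profiles, profile_name(project))
```

In a real project a YAML parser would be more robust, but even this rough check surfaces the mismatch before dbt debug does.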
If you see no error messages at this point and all the critical outputs (color-coded in green) indicate that all checks have passed, the connection is configured correctly.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2022\/03\/DBT_Init_PowerShell_Output.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-4520\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2022\/03\/DBT_Init_PowerShell_Output.png\" alt=\"\" width=\"580\" height=\"420\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2022\/03\/DBT_Init_PowerShell_Output.png 580w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2022\/03\/DBT_Init_PowerShell_Output-300x217.png 300w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">In the next part of this post I will dive deeper into the functionality dbt provides out of the box and some of its features, for example snapshots, tests, docs and more. You can view part 2 of this series <a href=\"http:\/\/bicortex.com\/data-build-tool-dbt-the-emerging-standard-for-building-sql-first-data-transformation-pipelines-part-2\/\" target=\"_blank\" rel=\"noopener\">HERE<\/a>.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Note: Part 2 of this post can be found HERE. 
Introduction It\u2019s never a dull moment when working in IT and although Data Warehousing domain was not subjected to the hamster wheel of relentless innovation in the first few decades when Oracle, IBM, Microsoft and SAP reigned supreme, with the advent of cloud computing, it [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[61,38,5,1],"tags":[65,62,79,87,41,49,19],"class_list":["post-4491","post","type-post","status-publish","format-standard","hentry","category-cloud-computing","category-data-modelling","category-sql","category-uncategorized","tag-azure","tag-cloud-computing","tag-data-warehouse","tag-dbt","tag-python","tag-sql","tag-sql-server"],"aioseo_notices":[],"jetpack_featured_media_url":"","_links":{"self":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/4491","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/comments?post=4491"}],"version-history":[{"count":38,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/4491\/revisions"}],"predecessor-version":[{"id":4630,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/4491\/revisions\/4630"}],"wp:attachment":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/media?parent=4491"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/categories?post=4491"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/tags?post=4491"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{re
l}","templated":true}]}}