{"id":4937,"date":"2024-07-22T15:32:14","date_gmt":"2024-07-22T05:32:14","guid":{"rendered":"http:\/\/bicortex.com\/?p=4937"},"modified":"2024-08-01T17:11:48","modified_gmt":"2024-08-01T07:11:48","slug":"using-polybase-and-in-database-python-runtime-for-building-sql-server-to-azure-data-lake-data-extraction-pipelines","status":"publish","type":"post","link":"http:\/\/bicortex.com\/bicortex\/using-polybase-and-in-database-python-runtime-for-building-sql-server-to-azure-data-lake-data-extraction-pipelines\/","title":{"rendered":"Using Polybase and In-Database Python Runtime for Building SQL Server to Azure Data Lake Data Extraction Pipelines"},"content":{"rendered":"<h3 style=\"text-align: center;\">Introduction<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">One of the projects I assisted with recently dictated that Parquet files staged in Azure Data Lake were to be consumed using a traditional ELT\\ETL architecture i.e. using Databricks, Data Factory or a similar tool and loaded into SQL Server tables. However, given the heighten data sensitivity and security requirements, using additional vendors or tools for bringing these pipelines on-line would mean obtaining <a href=\"https:\/\/www.cyber.gov.au\/irap\" target=\"_blank\" rel=\"noopener\">IRAP<\/a> (Information Security Registered Assessors Program) assessment first, which in turn would result in protracted development timelines, thus higher cost. 
The solution turned out to be quite simple \u2013 use out-of-the-box SQL Server functionality and try to query\/extract this data with the help of Polybase.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\"><a href=\"https:\/\/learn.microsoft.com\/en-us\/sql\/relational-databases\/polybase\/polybase-guide?view=sql-server-ver16\" target=\"_blank\" rel=\"noopener\">Polybase<\/a>, mainly used for data virtualization and federation, enables your SQL Server instance to query data with T-SQL directly from SQL Server, Oracle, Teradata, MongoDB, Cosmos DB and other database engines without separately installing client connection software. While Polybase version 1 only supported Hadoop using Java, version 2 included a set of ODBC drivers and was released with MSSQL 2019. Version 3 (applicable to SQL Server 2022) is a modernized take on Polybase and includes REST (Representational State Transfer) as the interface for intra-software communication. REST APIs are service endpoints that support sets of HTTP operations (methods), which provide create, retrieve, update or delete access to a service&#8217;s resources. The setup is quite simple and, with SQL Server 2022, Polybase now also supports CSV, Parquet, and Delta files stored on Azure Storage Account v2, Azure Data Lake Storage Gen2, or any S3-compliant object storage. 
This meant that querying ADLS parquet files was just a few T-SQL commands away and the project could get underway without the need for yet another tool and a bespoke set of integrations.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">This got me thinking&#8230; I recently published a <a href=\"http:\/\/bicortex.com\/sql-server-to-snowflake-solution-architecture-and-implementation-how-to-extract-ingest-and-model-wide-world-importers-database-using-snowflake-platform\/\" target=\"_blank\" rel=\"noopener\">blog post<\/a> on how SQL Server data can be moved into Snowflake; for that architecture I used a Python wrapper around the bcp utility for data extraction and an SSIS package for orchestration and data upload. The solution worked very well, but this new-found interest in Polybase led me down the path of using it not only for data virtualization but also as a pseudo-ELT tool for pushing data into ADLS for upstream consumption. This capability, coupled with a sprinkling of Python code for managing object storage and post-export data validation, allowed for a more streamlined approach to moving data out of SQL Server. It also allowed for a seamless blending of T-SQL and Python code in a single code base, with the resulting pipeline handling container storage management, data extraction and data validation from a single stored procedure.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Let\u2019s look at how Polybase and in-database SQL Server Python integration can be used to build a simple framework for managing the &#8220;E&#8221; in ELT.<\/p>\n<h3 style=\"text-align: center;\">Architecture Approach<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">This solution architecture takes advantage of both Polybase v3 and the in-database custom Python runtime, blending them into a mini-framework which can be used to automate data extraction into a series of flat files e.g. 
CSV, text or Parquet, to allow other applications to further query or integrate with this data. Apache Parquet is a common file format used in many data integration and processing activities, and this pattern allows for native Parquet file extraction using out-of-the-box SQL Server functionality, with no additional libraries or plug-ins required. Outside of unsupported data type limitations (please see the T-SQL code and the exclusions defined in the script below), it marries multiple programmatic paradigms together, resulting in Python and SQL engines running side-by-side and providing robust integration with a range of database vendors and cloud storage providers (including those compatible with S3 APIs).<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">As many times before, I will use the Wide World Importers OLAP database as my source data. A copy of WWI can be downloaded for free from <a href=\"https:\/\/github.com\/Microsoft\/sql-server-samples\/releases\/tag\/wide-world-importers-v1.0\" target=\"_blank\" rel=\"noopener\">HERE<\/a>.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_REST_and_Python_Runtime_Architecture.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-4946\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_REST_and_Python_Runtime_Architecture.png\" alt=\"\" width=\"580\" height=\"381\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_REST_and_Python_Runtime_Architecture.png 1067w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_REST_and_Python_Runtime_Architecture-300x197.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_REST_and_Python_Runtime_Architecture-1024x672.png 1024w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_REST_and_Python_Runtime_Architecture-768x504.png 768w\" sizes=\"auto, 
(max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Polybase requires minimal effort to install and configure as it&#8217;s already native SQL Server functionality. After installing Polybase Query Service, the remaining configuration activities can be done in SSMS. First, let\u2019s ensure we have an Azure Storage Account created (detailed instructions are in <a href=\"https:\/\/learn.microsoft.com\/en-us\/azure\/storage\/common\/storage-account-create?tabs=azure-portal\" target=\"_blank\" rel=\"noopener\">THIS<\/a> link) and enable Polybase and allow export functionality on the target instance.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nEXEC sp_configure @configname = 'polybase enabled', @configvalue = 1;\r\nRECONFIGURE;\r\nGO\r\nEXEC sp_configure 'allow polybase export', 1;\r\nRECONFIGURE;\r\nGO\r\n \r\nSELECT SERVERPROPERTY ('IsPolyBaseInstalled') AS IsPolyBaseInstalled;\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Next, we need to create an encryption key, a database scoped credential, an external data source and an external file format.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nUSE WideWorldImporters\r\nGO\r\n-- create encryption key (drop the existing key first, then create a new one)\r\nIF EXISTS\r\n(\r\n    SELECT *\r\n    FROM sys.symmetric_keys\r\n    WHERE &#x5B;name] = '##MS_DatabaseMasterKey##'\r\n)\r\nBEGIN\r\n    DROP MASTER KEY;\r\nEND;\r\nCREATE MASTER KEY ENCRYPTION BY PASSWORD = 'Your_Complex_Pa$$word';\r\n  \r\n-- create database scoped credential\r\nUSE WideWorldImporters\r\nGO\r\nIF EXISTS\r\n(\r\n    SELECT *\r\n    FROM sys.database_scoped_credentials\r\n    WHERE name = 'azblobstore'\r\n)\r\nBEGIN\r\n    DROP DATABASE SCOPED CREDENTIAL azblobstore;\r\nEND\r\n \r\nUSE WideWorldImporters\r\nGO\r\nCREATE DATABASE SCOPED CREDENTIAL azblobstore\r\nWITH IDENTITY = 'SHARED ACCESS SIGNATURE',\r\n     SECRET = 'Your_SAS_Key';\r\nGO\r\n \r\n-- create external data source pointing to the storage account 
location in Azure\r\nIF EXISTS (SELECT * FROM sys.external_data_sources WHERE name ='azblob')\r\nBEGIN\r\n    DROP EXTERNAL DATA SOURCE azblob;\r\nEND\r\nCREATE EXTERNAL DATA SOURCE azblob\r\nWITH (\r\nLOCATION = 'abs:\/\/demostorageaccount.blob.core.windows.net\/testcontainer\/',\r\nCREDENTIAL = azblobstore);\r\n \r\n-- create external file format for Parquet file type\r\nUSE WideWorldImporters\r\nGO\r\nIF EXISTS (SELECT * FROM sys.external_file_formats WHERE name = 'ParquetFileFormat')\r\nBEGIN\r\nDROP EXTERNAL FILE FORMAT ParquetFileFormat;\r\nEND\r\nCREATE EXTERNAL FILE FORMAT ParquetFileFormat WITH(FORMAT_TYPE  = PARQUET);\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Finally, we can test our configuration and, provided all the parameters have been configured correctly, we should be able to use CETAS (CREATE EXTERNAL TABLE AS SELECT) functionality to copy a table\u2019s data into Parquet files in Azure Blob Storage.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nUSE WideWorldImporters\r\nGO\r\nIF OBJECT_ID('customertransactions', 'U') IS NOT NULL\r\nBEGIN\r\n    DROP EXTERNAL TABLE customertransactions\r\nEND\r\nGO\r\nCREATE EXTERNAL TABLE customertransactions\r\nWITH(\r\nLOCATION  = 'customertransactions\/',\r\nDATA_SOURCE = azblob,\r\nFILE_FORMAT = ParquetFileFormat)\r\nAS SELECT * FROM &#x5B;Sales].&#x5B;CustomerTransactions];\r\nGO\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">When it comes to Python installation, Microsoft provides a detailed overview of all steps required to install Machine Learning Services (Python and R) on Windows in the following <a href=\"https:\/\/learn.microsoft.com\/en-us\/sql\/machine-learning\/install\/sql-machine-learning-services-windows-install-sql-2022?view=sql-server-ver16\" target=\"_blank\" rel=\"noopener\">LINK<\/a>; however, this documentation does not include a download link for Python v3.10 (the required Python interpreter version for SQL Server 2022). 
Beginning with SQL Server 2022 (16.x), runtimes for R, Python, and Java are no longer shipped or installed with SQL Server setup, so anyone wishing to run in-database Python will need to download and install it manually. To download Python 3.10.0, go to the <a href=\"https:\/\/www.python.org\/downloads\/release\/python-3100\/\" target=\"_blank\" rel=\"noopener\">following location<\/a> and download the required binaries. Afterwards, follow the installation process and steps as outlined by the Microsoft documentation. Once concluded, we can run a short script to confirm that running external scripts is enabled (if it is not, run sp_configure 'external scripts enabled', 1 followed by RECONFIGURE), that the Python-SQL Server integration is working, and that the returned value is as expected.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nEXEC sp_execute_external_script @language = N'Python',\r\n@script = N'OutputDataSet = InputDataSet;',\r\n@input_data_1 = N'SELECT 1 AS PythonValue'\r\nWITH RESULT SETS ((PValue int NOT NULL));\r\nGO\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">In order to make the Parquet file extraction and ingestion repeatable (beyond the 1st run), we need to ensure the created files can be deleted first. This is because defining file names is not an option, nor can existing files be overwritten (only the file location can be specified); as such, any subsequent run will result in a clash unless the files are purged first. Interacting with Azure Blob Storage and files can be done through a range of different technologies, but in order to &#8220;consolidate&#8221; the approach and ensure that we have one code base which can act on many different requirements, all within the confines of SQL Server and with no external dependencies, it&#8217;s best to do it via Python (which will run as part of the same stored procedure code) and pip-install the required libraries first. In SQL Server 2022, the recommended Python interpreter location is the C:\\Program Files\\Python310 directory. 
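The integration contract used throughout this post is worth spelling out: sp_execute_external_script materialises the @input_data_1 query result as a pandas DataFrame named InputDataSet inside the Python session, and whatever DataFrame is assigned to OutputDataSet is streamed back to SQL Server as the result set. The sketch below simulates that round trip in plain Python, outside SQL Server; the helper name run_external_script is my own invention for illustration, not part of any Microsoft API.

```python
import pandas as pd

def run_external_script(script_body: str, input_df: pd.DataFrame) -> pd.DataFrame:
    """Loose, local simulation of sp_execute_external_script's Python contract:
    expose the query result as InputDataSet, run the script body, and return
    whatever the script assigned to OutputDataSet."""
    scope = {"InputDataSet": input_df, "pd": pd}
    exec(script_body, scope)  # inside SQL Server this runs in the Launchpad-hosted runtime
    return scope["OutputDataSet"]

# Equivalent of: @input_data_1 = N'SELECT 1 AS PythonValue'
input_df = pd.DataFrame({"PythonValue": [1]})

# Equivalent of: @script = N'OutputDataSet = InputDataSet;'
result = run_external_script("OutputDataSet = InputDataSet", input_df)
print(result.iloc[0, 0])  # 1, matching the PValue the T-SQL smoke test expects
```

The WITH RESULT SETS clause in the T-SQL version then maps the returned frame's single column onto the declared PValue column.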
To install additional libraries, we need to go down a level and access pip in the C:\\Program Files\\Python310\\Scripts directory as admin. I will also install additional libraries to conduct post-export data validation as per below. Note that the scripts below use the legacy BlockBlobService client, which ships with version 2.x of the azure-storage-blob package (the current v12 SDK replaced it with BlobServiceClient), so the package version is pinned accordingly.<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\npip install azure-storage-blob==2.1.0\r\npip install pyarrow\r\npip install pandas\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Now that we have our storage account created and Polybase and Python runtime configured, all we need is our account key and account name in order to execute the following script directly from the SQL Server instance.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nDECLARE @az_account_name VARCHAR(512) = 'demostorageaccount';\r\nDECLARE @az_account_key VARCHAR(1024) = 'Your_Storage_Account_Key';\r\n \r\nEXECUTE sp_execute_external_script @language = N'Python',\r\n                                   @script = N'\r\nimport azure.storage.blob as b\r\n \r\naccount_name = account_name\r\naccount_key = account_key\r\n \r\ndef delete_blobs(container):\r\n    try:\r\n        blobs = block_blob_service.list_blobs(container)\r\n        for blob in blobs:\r\n            if (blob.name.endswith(''.parquet'') or blob.name.endswith(''_'')):\r\n                block_blob_service.delete_blob(container, blob.name, snapshot=None)\r\n    except Exception as e:\r\n        print(e)\r\n \r\ndef delete_directories(container):\r\n    try:\r\n        blobs = block_blob_service.list_blobs(container, delimiter=''\/'')\r\n        for blob in blobs:\r\n            if blob.name.endswith(''\/''):\r\n                delete_sub_blobs(container, blob.name)\r\n                blobs_in_directory = list(block_blob_service.list_blobs(container, prefix=blob.name))\r\n                if not blobs_in_directory:\r\n                    block_blob_service.delete_blob(container, blob.name&#x5B;:-1], snapshot=None)       \r\n    except Exception as e:\r\n    
    print(e)\r\n \r\ndef delete_sub_blobs(container, prefix):\r\n    try:\r\n        blobs = block_blob_service.list_blobs(container, prefix=prefix)\r\n        for blob in blobs:\r\n            block_blob_service.delete_blob(container, blob.name, snapshot=None)\r\n    except Exception as e:\r\n        print(e)\r\n \r\nblock_blob_service = b.BlockBlobService(\r\n    account_name=account_name, account_key=account_key\r\n)\r\ncontainers = block_blob_service.list_containers()\r\nfor c in containers:\r\n        delete_blobs(c.name)\r\n        delete_directories(c.name)',\r\n@input_data_1 = N'   ;',\r\n@params = N' @account_name nvarchar (100), @account_key nvarchar (MAX)',\r\n@account_name = @az_account_name,\r\n@account_key = @az_account_key;\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Using the sp_execute_external_script system stored procedure with the @language parameter set to &#8216;Python&#8217;, we can execute Python scripts and run workflows that previously required additional services or tooling outside of in-database execution. The stored procedure also takes arguments &#8211; in this case account_name and account_key &#8211; to authenticate to the Azure Storage Account before additional logic is executed. The script simply deletes blobs ending in the .parquet extension (along with their related directories), making space for new files &#8211; this will be our first task in a series of activities building up to a larger workflow as per below.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Next, we will loop over tables in a nominated schema and upload WWI data into Azure Blob Storage as a series of Parquet files. Polybase does not play well with certain data types, so columns of the &#8216;geography&#8217;, &#8216;geometry&#8217;, &#8216;hierarchyid&#8217;, &#8216;image&#8217;, &#8216;text&#8217;, &#8216;nText&#8217; and &#8216;xml&#8217; types will be excluded from this process (see script below). 
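Before moving on, note that the cleanup script's deletion rules boil down to a simple name-based predicate, which can be isolated from the Azure SDK and tested on its own. A minimal sketch of that selection logic (the blob names are hypothetical examples, not real storage contents):

```python
def blobs_to_delete(blob_names):
    """Apply the same selection rule as the cleanup script above: flag Parquet
    data files (*.parquet), PolyBase placeholder blobs (trailing '_') and
    directory markers (trailing '/') for deletion; leave everything else alone."""
    return [
        name for name in blob_names
        if name.endswith(".parquet") or name.endswith("_") or name.endswith("/")
    ]

# Hypothetical container listing after a CETAS export
listing = [
    "customertransactions/",                   # directory marker
    "customertransactions/abc123_",            # PolyBase placeholder blob
    "customertransactions/part-0001.parquet",  # exported data file
    "readme.txt",                              # unrelated blob, kept
]
print(blobs_to_delete(listing))  # everything except readme.txt
```

Keeping the predicate separate from the SDK calls makes it easy to extend (e.g. adding more extensions) without touching the Azure plumbing.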
I will use table_name as a folder name in Azure blob storage so that every database object has a dedicated directory for its files. Note that the External Data Source and File Format need to be defined in advance (as per Polybase config script above).<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nSET NOCOUNT ON;\r\nDECLARE @az_account_name VARCHAR(128) = 'demostorageaccount';\r\nDECLARE @az_account_key VARCHAR(1024) = 'Your_Storage_Account_Key';\r\nDECLARE @external_data_source VARCHAR(128) = 'azblob';\r\nDECLARE @external_file_format VARCHAR(128) = 'ParquetFileFormat';\r\nDECLARE @local_database_name VARCHAR(128) = 'WideWorldImporters';\r\nDECLARE @local_schema_name VARCHAR(128) = 'sales';\r\n \r\nDECLARE @Error_Message NVARCHAR(MAX);\r\nDECLARE @Is_Debug_Mode BIT = 1;\r\n \r\n-- Run validation steps\r\nIF @Is_Debug_Mode = 1\r\nBEGIN\r\n    RAISERROR('Running validation steps...', 10, 1) WITH NOWAIT;\r\nEND;\r\nDECLARE @Is_PolyBase_Installed SQL_VARIANT =\r\n        (\r\n            SELECT SERVERPROPERTY('IsPolyBaseInstalled') AS IsPolyBaseInstalled\r\n        );\r\nIF @Is_PolyBase_Installed &lt;&gt; 1\r\nBEGIN\r\n    SET @Error_Message = N'PolyBase is not installed on ' + @@SERVERNAME + N' SQL Server instance. Bailing out!';\r\n    RAISERROR(@Error_Message, 16, 1);\r\n    RETURN;\r\nEND;\r\n \r\nIF NOT EXISTS\r\n(\r\n    SELECT *\r\n    FROM sys.external_data_sources\r\n    WHERE name = @external_data_source\r\n)\r\nBEGIN\r\n    SET @Error_Message\r\n        = N'' + @external_data_source + N' external data source has not been registered on ' + @@SERVERNAME\r\n          + N' SQL Server instance. 
Bailing out!';\r\n    RAISERROR(@Error_Message, 16, 1);\r\n    RETURN;\r\nEND;\r\n \r\n \r\nIF NOT EXISTS\r\n(\r\n    SELECT *\r\n    FROM sys.external_file_formats\r\n    WHERE name = @external_file_format\r\n)\r\nBEGIN\r\n    SET @Error_Message\r\n        = N'' + @external_file_format + N' file format has not been registered on ' + @@SERVERNAME\r\n          + N' SQL Server instance. Bailing out!';\r\n    RAISERROR(@Error_Message, 16, 1);\r\n    RETURN;\r\nEND;\r\n \r\n \r\nDROP TABLE IF EXISTS ##db_objects_metadata;\r\nCREATE TABLE ##db_objects_metadata\r\n(\r\n    Id INT IDENTITY(1, 1) NOT NULL,\r\n    Local_Column_Name VARCHAR(256) NOT NULL,\r\n    Local_Column_Data_Type VARCHAR(128) NOT NULL,\r\n    Local_Object_Name VARCHAR(512) NOT NULL,\r\n    Local_Schema_Name VARCHAR(128) NOT NULL,\r\n    Local_DB_Name VARCHAR(256) NOT NULL\r\n);\r\n \r\nDROP TABLE IF EXISTS ##db_objects_record_counts;\r\nCREATE TABLE ##db_objects_record_counts -- this table will be used for record count comparison in subsequent script\r\n(\r\n    Id INT IDENTITY(1, 1) NOT NULL,\r\n    Local_Object_Name VARCHAR(512) NOT NULL,\r\n    Record_Count BIGINT NULL\r\n);\r\n \r\n \r\nDECLARE @SQL NVARCHAR(MAX);\r\nSET @SQL\r\n    = N'INSERT INTO ##db_objects_metadata\r\n    (Local_Column_Name, Local_Column_Data_Type, Local_Object_Name,\r\n    Local_Schema_Name,\r\n    Local_DB_Name)\r\n    SELECT column_name, data_type, table_name, table_schema, table_catalog\r\n    FROM ' + @local_database_name + N'.INFORMATION_SCHEMA.COLUMNS\r\n    WHERE table_schema = ''' + @local_schema_name\r\n      + N'''\r\n    GROUP BY\r\n    column_name, data_type,\r\n    table_name,\r\n    table_schema,\r\n    table_catalog';\r\nEXEC (@SQL);\r\n \r\nIF @Is_Debug_Mode = 1\r\nBEGIN\r\n    RAISERROR('Uploading database tables content as parquet files into Azure...', 10, 1) WITH NOWAIT;\r\nEND;\r\nDECLARE @Table_Name NVARCHAR(512);\r\nDECLARE @Schema_Name NVARCHAR(512);\r\nDECLARE @Col_Names NVARCHAR(512);\r\n \r\nIF 
CURSOR_STATUS('global', 'cur_db_objects') &gt;= 1\r\nBEGIN\r\n    DEALLOCATE cur_db_objects;\r\nEND;\r\n \r\nDECLARE cur_db_objects CURSOR FORWARD_ONLY FOR\r\nSELECT Local_Object_Name,\r\n       Local_Schema_Name,\r\n       STRING_AGG(Local_Column_Name, ',') AS col_names\r\nFROM ##db_objects_metadata\r\nWHERE Local_Column_Data_Type NOT IN ( 'geography', 'geometry', 'hierarchyid', 'image', 'text', 'nText', 'xml' ) -- exclude data types not compatible with PolyBase external tables\r\nGROUP BY Local_Object_Name,\r\n         Local_Schema_Name;\r\n \r\nOPEN cur_db_objects;\r\nFETCH NEXT FROM cur_db_objects\r\nINTO @Table_Name,\r\n     @Schema_Name,\r\n     @Col_Names;\r\n \r\nWHILE @@FETCH_STATUS = 0\r\nBEGIN\r\n    SET @SQL = N'IF OBJECT_ID(''' + @Table_Name + N''', ''U'') IS NOT NULL ';\r\n    SET @SQL = @SQL + N'BEGIN DROP EXTERNAL TABLE ' + @Table_Name + N' END; ';\r\n    SET @SQL = @SQL + N'CREATE EXTERNAL TABLE ' + @Table_Name + N' ';\r\n    SET @SQL = @SQL + N'WITH(';\r\n    SET @SQL = @SQL + N'LOCATION  = ''' + CONCAT(@Table_Name, '\/') + N''',';\r\n    SET @SQL = @SQL + N'DATA_SOURCE = ' + @external_data_source + N', ';\r\n    SET @SQL = @SQL + N'FILE_FORMAT = ' + @external_file_format + N') ';\r\n    SET @SQL = @SQL + N'AS SELECT ' + @Col_Names + N' ';\r\n    SET @SQL = @SQL + N'FROM &#x5B;' + @Schema_Name + N'].&#x5B;' + @Table_Name + N'];';\r\n    IF @Is_Debug_Mode = 1\r\n    BEGIN\r\n        SET @Error_Message = N'  --&gt; Processing ' + @Table_Name + N' table...';\r\n        RAISERROR(@Error_Message, 10, 1) WITH NOWAIT;\r\n    END;\r\n    EXEC (@SQL);\r\n \r\n    SET @SQL = N'INSERT INTO ##db_objects_record_counts (Local_Object_Name, Record_Count) ';\r\n    SET @SQL\r\n        = @SQL + N'SELECT ''' + @Table_Name + N''', (SELECT COUNT(1) FROM ' + @Schema_Name + N'.' 
+ @Table_Name + N') ';\r\n    EXEC (@SQL);\r\n \r\n    FETCH NEXT FROM cur_db_objects\r\n    INTO @Table_Name,\r\n         @Schema_Name,\r\n         @Col_Names;\r\nEND;\r\nCLOSE cur_db_objects;\r\nDEALLOCATE cur_db_objects;\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">This should create all the required External Tables on the SQL Server instance as well as parquet files in the nominated Azure Storage location (click on image to enlarge).<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_MSSQL_to_Azure_Blob_Parquet_Files.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-4948\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_MSSQL_to_Azure_Blob_Parquet_Files.png\" alt=\"\" width=\"580\" height=\"312\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_MSSQL_to_Azure_Blob_Parquet_Files.png 1684w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_MSSQL_to_Azure_Blob_Parquet_Files-300x161.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_MSSQL_to_Azure_Blob_Parquet_Files-1024x550.png 1024w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_MSSQL_to_Azure_Blob_Parquet_Files-768x413.png 768w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_MSSQL_to_Azure_Blob_Parquet_Files-1536x825.png 1536w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Running a CETAS export for a single table, we can also see the execution plan with the PUT operator, highlighting the RESTful data egress capability.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_CETAS_TSQL.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-4949\" 
src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_CETAS_TSQL.png\" alt=\"\" width=\"580\" height=\"295\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_CETAS_TSQL.png 839w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_CETAS_TSQL-300x153.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_CETAS_TSQL-768x391.png 768w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Finally, we can run a data validation test to ensure the record counts across Azure and MSSQL are a match. My initial approach was to take advantage of DuckDB native parquet integration and simply pip-install DuckDB and do a row count for each parquet file. However, as well as this worked as an isolated script, it did not play well with MSSQL Python integration due to SQL Server constraints around accessing and creating (temporary) file system data. 
As DuckDB requires a transient storage workspace to serialize data, and access to it is restricted, the implementation worked well in a terminal but would not execute as part of a SQL workload.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_DuckDB_Import_Failure_Message.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-4951\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_DuckDB_Import_Failure_Message.png\" alt=\"\" width=\"580\" height=\"174\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_DuckDB_Import_Failure_Message.png 1247w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_DuckDB_Import_Failure_Message-300x90.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_DuckDB_Import_Failure_Message-1024x307.png 1024w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_DuckDB_Import_Failure_Message-768x230.png 768w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Instead, using pandas and pyarrow seemed like a good workaround, and the following script can be used to match row counts across Azure parquet files (with row counts read via pyarrow) and database tables.<\/p>\n<pre class=\"brush: sql; title: ; notranslate\" title=\"\">\r\nDECLARE @az_account_name VARCHAR(512) = 'demostorageaccount';\r\nDECLARE @az_account_key VARCHAR(1024) = 'Your_Storage_Account_Key';\r\n\r\nEXECUTE sp_execute_external_script @language = N'Python',\r\n                                   @script = N'\r\nimport azure.storage.blob as b\r\nimport pyarrow.parquet as pq\r\nimport pandas as pd\r\nimport io\r\n \r\naccount_name = account_name\r\naccount_key = account_key\r\n \r\naz_record_counts = {}\r\n \r\ndef compare_record_counts(df1, df2):\r\n    merged_df = pd.merge(df1, df2, 
on=''Object_Name'', suffixes=(''_df1'', ''_df2''), how=''outer'', indicator=True)\r\n    differences = merged_df&#x5B;merged_df&#x5B;''Record_Count_df1''] != merged_df&#x5B;''Record_Count_df2'']]\r\n    if differences.empty:\r\n        print(&quot;Success! All record counts match.&quot;)\r\n    else:\r\n        print(&quot;Record count mismatches found. Please troubleshoot:&quot;)\r\n        print(differences&#x5B;&#x5B;''Object_Name'', ''Record_Count_df1'', ''Record_Count_df2'']])\r\n    return differences\r\n         \r\ndef collect_az_files_record_counts(container):\r\n    try:\r\n        blobs = block_blob_service.list_blobs(container)\r\n        for blob in blobs:\r\n            if blob.name.endswith(&quot;.parquet&quot;):\r\n                blob_data = block_blob_service.get_blob_to_bytes(container, blob.name)\r\n                data = blob_data.content\r\n                 \r\n                with io.BytesIO(data) as file:\r\n                    parquet_file = pq.ParquetFile(file)\r\n                    row_counts = parquet_file.metadata.num_rows\r\n                    az_record_counts.update({blob.name: row_counts})\r\n    except Exception as e:\r\n        print(e)\r\n \r\nblock_blob_service = b.BlockBlobService(account_name=account_name, account_key=account_key)\r\ncontainers = block_blob_service.list_containers()\r\n \r\nfor container in containers:\r\n    collect_az_files_record_counts(container.name)\r\n \r\ndirectory_sums = {}\r\n \r\nfor filepath, record_count in az_record_counts.items():\r\n    directory = filepath.split(&quot;\/&quot;)&#x5B;0]\r\n    if directory in directory_sums:\r\n        directory_sums&#x5B;directory] += record_count\r\n    else:\r\n        directory_sums&#x5B;directory] = record_count\r\n \r\ntarget_df = pd.DataFrame(\r\n    &#x5B;(directory, int(record_count)) for directory, record_count in directory_sums.items()],\r\n    columns=&#x5B;&quot;Object_Name&quot;, &quot;Record_Count&quot;])\r\nsource_df = 
source_record_counts\r\ndf = compare_record_counts(source_df, target_df);\r\nOutputDataSet = df&#x5B;&#x5B;''Object_Name'', ''Record_Count_df1'', ''Record_Count_df2'']];',\r\n@input_data_1 = N'SELECT Local_Object_Name as Object_Name, CAST(Record_Count AS INT) AS Record_Count FROM ##db_objects_record_counts WHERE Record_Count &lt;&gt; 0',\r\n@input_data_1_name = N'source_record_counts',\r\n@params = N' @account_name nvarchar (100), @account_key nvarchar (MAX)',\r\n@account_name = @az_account_name,\r\n@account_key = @az_account_key\r\nWITH RESULT SETS\r\n(\r\n    (\r\n        &#x5B;object_name] VARCHAR(256) NOT NULL,\r\n        Source_Record_Count INT,\r\n        Target_Record_Count INT\r\n    )\r\n);\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">When completed, the following output should be displayed on the screen.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_Record_Count_Check_AllOK.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-4953\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_Record_Count_Check_AllOK.png\" alt=\"\" width=\"580\" height=\"156\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_Record_Count_Check_AllOK.png 580w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_Record_Count_Check_AllOK-300x81.png 300w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">On the other hand, if record count does not match e.g. 
we deleted one record in the source table, the process should correctly recognize data discrepancies and alert the end user\/administrator, as per below:<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_Record_Count_Mismatch_Error_Message.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-4954\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_Record_Count_Mismatch_Error_Message.png\" alt=\"\" width=\"580\" height=\"178\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_Record_Count_Mismatch_Error_Message.png 580w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_Record_Count_Mismatch_Error_Message-300x92.png 300w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Putting all these pieces together into a single, all-encompassing stored procedure allows us to manage Azure storage, perform data extraction and execute data quality checks from a single code base. A copy of the stored procedure can be found in my OneDrive folder <a href=\"https:\/\/1drv.ms\/f\/s!AuEyKKgH71pxibpp5BGlFIsBgjmqIg?e=nXnsmm\" target=\"_blank\" rel=\"noopener\">HERE<\/a>.<\/p>\n<h3 style=\"text-align: center;\">Anecdotal Performance Test<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">Looking at Polybase performance for one specific database object, I multiplied the Orders table record count (originally 73,595) by factors of 10, 50, 100, 150 and 200 into a new table called OrderExpanded and ran the same process to ascertain the following:<\/p>\n<ul>\n<li style=\"text-align: justify;\">Data compression ratio for both in-database data volume\/size and flat-file (Parquet format) columnar storage in Azure Blob. 
For in-database data size, two scenarios were explored: no compression and columnstore compression applied to the Orders object.<\/li>\n<li style=\"text-align: justify;\">CPU and elapsed processing times.<\/li>\n<\/ul>\n<p class=\"Standard\" style=\"text-align: justify;\">These tests were run on a small Virtual Machine with 8 cores (Intel Core i5-10500T CPU, running at 2.30GHz) allocated to the SQL Server 2022 Developer Edition instance, 32GB of 2667MHz RAM and a single Samsung SSD QVO 1TB volume on a 100\/20 Mbps WAN network. The following performance characteristics were observed when running Polybase ETL workloads against columnstore-compressed and uncompressed data at different data sizes.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_Data_Extract_Performance_Comparison.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-4959\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_Data_Extract_Performance_Comparison.png\" alt=\"\" width=\"672\" height=\"668\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_Data_Extract_Performance_Comparison.png 672w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_Data_Extract_Performance_Comparison-300x298.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_Data_Extract_Performance_Comparison-150x150.png 150w\" sizes=\"auto, (max-width: 672px) 100vw, 672px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\"><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_Data_Compression_Gains_Comparison.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-4958\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2024\/07\/Polybase_Data_Compression_Gains_Comparison.png\" alt=\"\" width=\"671\" height=\"667\" 
srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_Data_Compression_Gains_Comparison.png 671w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_Data_Compression_Gains_Comparison-300x298.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2024\/07\/Polybase_Data_Compression_Gains_Comparison-150x150.png 150w\" sizes=\"auto, (max-width: 671px) 100vw, 671px\" \/><\/a><br \/>\nData egress performance scaled linearly with data volume across the ranges tested. There was a noticeable improvement for tables with a columnstore index applied to them, not only in terms of data size on disk, but also upload speeds, despite the fact that columnstore compression typically results in higher CPU utilization during the read phase. Another interesting (but expected) finding is that the compression gains from storing data in Parquet file format (in Azure Blob Storage) as well as in a columnstore-compressed table (in-database) were significant, sometimes resulting in a more than 7x reduction in data volume\/size. The compression rates achieved, and the resulting data egress performance improvements, significantly outweigh the decompression compute overhead (higher CPU utilization). As with any approach, these results need to be assessed on their own merits and in the context of a particular architecture and use case, as they will not apply uniformly across all workloads. However, provided CPU resources have not been identified as a potential choke-point, applying columnstore compression to database objects offers a good tradeoff between higher resource utilization and performance gains.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Another benefit of using Polybase for data extraction is that it natively supports Parquet and Delta table formats. 
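<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">As a side note, the record count comparison executed earlier through <code>sp_execute_external_script<\/code> boils down to a simple pandas outer join. The snippet below is only a minimal sketch &#8211; the actual <code>compare_record_counts<\/code> function in the stored procedure may be implemented differently, and the <code>Object_Name<\/code> and <code>Record_Count<\/code> column names simply mirror the input query shown above:<\/p>

```python
import pandas as pd

def compare_record_counts(source_df: pd.DataFrame, target_df: pd.DataFrame) -> pd.DataFrame:
    """Outer-join per-object record counts from source and target data frames.

    Both inputs are expected to carry Object_Name and Record_Count columns;
    the suffixes mirror the Record_Count_df1 / Record_Count_df2 names
    surfaced in the stored procedure's output.
    """
    merged = source_df.merge(
        target_df, on="Object_Name", how="outer", suffixes=("_df1", "_df2")
    )
    # An object missing on either side is treated as having zero records
    merged = merged.fillna(0)
    merged["Counts_Match"] = merged["Record_Count_df1"] == merged["Record_Count_df2"]
    return merged
```

<p class=\"Standard\" style=\"text-align: justify;\">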
There&#8217;s no need for third-party libraries to serialize SQL Server data into a columnar storage format with metadata appended to it &#8211; Polybase can do it out-of-the-box.<\/p>\n<h3 style=\"text-align: center;\">Conclusion<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">Until now, I never had a chance or a good reason to play around with Polybase, and I&#8217;ve always assumed that rolling out a dedicated tool for batch data extraction workloads was a better option. Likewise, I never thought that, beyond Machine Learning applications, in-database Python code running side-by-side with T-SQL was something I&#8217;d use outside of a Jupyter notebook, POC-like scenario. However, I was pleasantly surprised at how easy it was to blend these two technologies together. Even though improvements could be made to both (e.g. it would be nice to see Polybase integrate natively with other popular database engines, or Python and T-SQL coalesce into other areas of data management), it was straightforward to stand up a simple solution like this. With other RDBMS vendors, e.g. Snowflake, betting on Python becoming a first-class citizen on their platforms, and the lines between multi-cloud, single-cloud and hybrid blurring even more, Microsoft should double down on these paradigms and keep innovating (Fabric does not work for everyone and everything!). I guess we will just have to cross our fingers and toes and wait for the 2025 version to arrive!<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Introduction One of the projects I assisted with recently dictated that Parquet files staged in Azure Data Lake were to be consumed using a traditional ELT\\ETL architecture i.e. using Databricks, Data Factory or a similar tool and loaded into SQL Server tables. 
However, given the heightened data sensitivity and security requirements, using additional vendors or [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[61,56,5],"tags":[65,62,95,41,19,94],"class_list":["post-4937","post","type-post","status-publish","format-standard","hentry","category-cloud-computing","category-programming","category-sql","tag-azure","tag-cloud-computing","tag-polybase","tag-python","tag-sql-server","tag-t-sql"],"aioseo_notices":[],"jetpack_featured_media_url":"","_links":{"self":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/4937","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/comments?post=4937"}],"version-history":[{"count":36,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/4937\/revisions"}],"predecessor-version":[{"id":4983,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/4937\/revisions\/4983"}],"wp:attachment":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/media?parent=4937"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/categories?post=4937"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/tags?post=4937"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}