{"id":3572,"date":"2018-10-02T08:10:13","date_gmt":"2018-10-02T08:10:13","guid":{"rendered":"http:\/\/bicortex.com\/?p=3572"},"modified":"2019-12-20T11:15:00","modified_gmt":"2019-12-20T01:15:00","slug":"data-acquisition-framework-using-custom-python-wrapper-for-concurrent-bcp-utility-execution","status":"publish","type":"post","link":"http:\/\/bicortex.com\/bicortex\/data-acquisition-framework-using-custom-python-wrapper-for-concurrent-bcp-utility-execution\/","title":{"rendered":"Data Acquisition Framework Using Custom Python Wrapper For Concurrent BCP Utility Execution"},"content":{"rendered":"<p class=\"Standard\" style=\"text-align: justify;\">Although building a data acquisition framework for a data warehouse isn\u2019t nearly or as interesting as doing analytics or data mining on the already well-structured data, it\u2019s an essential part of any data warehousing project. I have already written extensively on how one could build a robust data acquisition framework in <a href=\"http:\/\/bicortex.com\/designing-data-acquisition-framework-in-sql-server-and-ssis-how-to-source-and-integrate-external-data-for-a-decision-support-system-or-data-warehouse-part-1\/\" target=\"_blank\" rel=\"noopener noreferrer\">THIS<\/a> series of blog posts, however, given the rapidly growing ecosystem of data sources and data stores, more and more projects tend to gravitate to the more application-agnostic storage formats e.g. text files or column-oriented files e.g. parquet. As such, direct data store connection may not be a possibility or the most optimal way of exporting data, making flat file export\/import option a good alternative.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Microsoft offers a few different tools to facilitate flat files imports and exports, the most commonly used being bcp, BULK INSERT and their own ETL tool, capable of connecting to and reading flat files \u2013 SSIS. 
In this demo, I will analyse the difference in performance for data export and import between using pure T-SQL (executed as a series of concurrently running SQL Agent jobs) and parallelised bcp operations (using a custom Python wrapper). The image below depicts the high-level architecture of the solution, with a short video clip demonstrating the actual code execution on a sample dataset at the end of this post. Let\u2019s compare the pair and see whether either of them is better at moving data across the wire.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2018\/10\/BCP_Data_Acquisitions_High_Level_Architecture_Diagram.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-3626\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2018\/10\/BCP_Data_Acquisitions_High_Level_Architecture_Diagram.png\" alt=\"\" width=\"580\" height=\"388\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_High_Level_Architecture_Diagram.png 798w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_High_Level_Architecture_Diagram-300x201.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_High_Level_Architecture_Diagram-768x514.png 768w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<h3 style=\"text-align: center;\">Database and Schema Setup<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">Most data warehouses come with some form of metadata layer, so for this exercise I have \u2018scraped\u2019 the source metadata, e.g. attributes such as object names, data types, row counts etc., into a local Sqlite database. I also copied over partial target database metadata (with a bit more structure imposed) from one of the systems I\u2019m currently working with. 
This resulted in the following schema of tables containing key information on source and target objects, their attributes, their state etc.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2018\/09\/BCP_Data_Acquisitions_MetadataDB_Schema_ERD.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-3584\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2018\/09\/BCP_Data_Acquisitions_MetadataDB_Schema_ERD.png\" alt=\"\" width=\"580\" height=\"427\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/09\/BCP_Data_Acquisitions_MetadataDB_Schema_ERD.png 756w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/09\/BCP_Data_Acquisitions_MetadataDB_Schema_ERD-300x221.png 300w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Target metadata is contained in three tables which allow for a three-layer, hierarchical approach to data acquisition:<\/p>\n<ul>\n<li class=\"Standard\" style=\"text-align: justify;\">Ctrl_DataAcquisition_ObjectLevelExceptions table stores details of database objects along with some other peripheral information. The grain of this table is set at the object (table) level, which allows for easy object tagging and acquisition inclusion\/exclusion.<\/li>\n<li class=\"Standard\" style=\"text-align: justify;\">Ctrl_DataAcquisition_AttributeLevelExceptions table stores attribute-level data. 
It references the \u2018objects\u2019 table and determines which fields of the referenced object are to be included\/excluded.<\/li>\n<li class=\"Standard\" style=\"text-align: justify;\">Ctrl_DataAcquisition_DataLevelExceptions table stores any statements which need to be executed in order to remove\/retain specific values.<\/li>\n<\/ul>\n<p class=\"Standard\" style=\"text-align: justify;\">As a result, thanks to this setup, we now have the option to dictate which objects are to be acquired, which fields from those objects are to be retained\/omitted and, finally, which values out of these attributes are to be persisted\/hashed\/obfuscated\/removed etc. Source-level metadata (in this scenario) was diluted to a single table containing some of the key attributes and data to fulfil the requirements for this post.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Creating the metadata repository, depending on the system\u2019s complexity and level of detail required, can be quite challenging in itself, so I will leave the particulars out of this post; however, it\u2019s important to note that this mini-framework would not be possible without storing and comparing both systems\u2019 metadata i.e. source database and target database.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">This post only scratches the surface of the actual metadata layer implementation details, but suffice it to say that any well-designed and architected framework should lean heavily on metadata. 
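To illustrate the mechanism (using a heavily simplified, hypothetical stand-in for the Ctrl_DataAcquisition_AttributeLevelExceptions table rather than the actual schema), the attribute-level exception layer boils down to filtering a source column list against active exception records:

```python
import sqlite3

# Hypothetical, simplified stand-in for the attribute-level exceptions table:
# any column with an active exception record is excluded from acquisition.
conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE attribute_exceptions
       (table_name TEXT, column_name TEXT, is_active INTEGER)"""
)
conn.executemany(
    "INSERT INTO attribute_exceptions VALUES (?, ?, ?)",
    [("Customers", "CreditCardNo", 1), ("Customers", "Notes", 1)],
)

# Columns reported by the source system's metadata
source_columns = ["CustomerID", "FirstName", "CreditCardNo", "Notes", "Email"]

# Keep only the columns without an active exception record
excluded = {
    row[0]
    for row in conn.execute(
        "SELECT column_name FROM attribute_exceptions "
        "WHERE table_name = ? AND is_active = 1",
        ("Customers",),
    )
}
acquired_columns = [c for c in source_columns if c not in excluded]
print(acquired_columns)  # ['CustomerID', 'FirstName', 'Email']
```

The data-level exceptions work the same way one level down, with stored statements applied to the values themselves.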
For this post, mainly to preserve brevity and focus on the problem analysed, I have obfuscated the details of target metadata creation and diluted source metadata \u2018scraping\u2019 to the bare minimum.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">The key information sourced, aside from which tables and columns to handle, is essential to providing a more flexible approach to data extraction for the following reasons:<\/p>\n<ul>\n<li class=\"Standard\" style=\"text-align: justify;\">Primary key flag \u2013 identifies whether a specific attribute is a primary key, which is used to split a large table into multiple \u2018chunks\u2019. As most primary keys are of data type INTEGER (or a derivative e.g. SMALLINT, TINYINT, BIGINT), splitting larger tables into smaller streams of data and files allows for better resource utilisation.<\/li>\n<li class=\"Standard\" style=\"text-align: justify;\">Column data type \u2013 useful in checking whether arithmetic can be performed on the given table\u2019s primary key i.e. 
if it\u2019s not an INTEGER (or similar) then subdividing it into smaller partitions will not be an option.<\/li>\n<li class=\"Standard\" style=\"text-align: justify;\">Row count \u2013 this data is required in order to partition larger tables into smaller shards based on the total record count.<\/li>\n<li class=\"Standard\" style=\"text-align: justify;\">Minimum record count \u2013 this value is essential for partitioning larger tables which do not have their seeding values starting from one.<\/li>\n<li class=\"Standard\" style=\"text-align: justify;\">Maximum record count \u2013 same as above, this piece of data is required to ascertain the ceiling value for further data \u2018chunking\u2019.<\/li>\n<\/ul>\n<p class=\"Standard\" style=\"text-align: justify;\">The following Python script was used to create and populate the MetadataDB.db database.<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\nimport configparser\r\nimport sqlite3\r\nimport os\r\nimport sys\r\nimport pyodbc\r\n\r\n\r\nconfig = configparser.ConfigParser()\r\nconfig.read(&quot;params.cfg&quot;)\r\n\r\ndbsqlite_location = config.get(&quot;Sqlite&quot;, &quot;dbsqlite_location&quot;)\r\ndbsqlite_fileName = config.get(&quot;Sqlite&quot;, &quot;dbsqlite_fileName&quot;)\r\ndbsqlite_sql = config.get(&quot;Sqlite&quot;, &quot;dbsqlite_sql&quot;)\r\ndbsqlite_import_tables = config.get(&quot;Tables_To_Import&quot;, &quot;local_mssql_meta_tables&quot;)\r\ndbsqlite_import_tables = dbsqlite_import_tables.split(&quot;,&quot;)\r\ndbsqlite_excluded_attribs = config.get(&quot;Tables_To_Import&quot;, &quot;local_mssql_meta_attributes_to_exclude&quot;)\r\ndbsqlite_excluded_attribs = dbsqlite_excluded_attribs.split(&quot;,&quot;)\r\ndbsqlite_import_tables_remote = config.get(&quot;Tables_To_Import&quot;, &quot;remote_meta_tables&quot;)\r\n\r\n\r\ndef get_sql(dbsqlite_location, dbsqlite_fileName, dbsqlite_sql):\r\n    &quot;&quot;&quot;\r\n    Source operation types from the SQL file 
- denoted by the use of four dash characters\r\n    and store them in a dictionary (referenced later)\r\n    &quot;&quot;&quot;\r\n    if os.path.isfile(dbsqlite_fileName):\r\n        os.remove(dbsqlite_fileName)\r\n    operations = &#x5B;]\r\n    commands = &#x5B;]\r\n\r\n    with open(dbsqlite_sql, &quot;r&quot;) as f:\r\n        for i in f:\r\n            if i.startswith(&quot;----&quot;):\r\n                i = i.replace(&quot;----&quot;, &quot;&quot;)\r\n                operations.append(i.rstrip(&quot;\\n&quot;))\r\n\r\n    tempCommands = &#x5B;]\r\n    with open(dbsqlite_sql, &quot;r&quot;) as f:\r\n        for i in f:\r\n            tempCommands.append(i)\r\n        l = &#x5B;i for i, s in enumerate(tempCommands) if &quot;----&quot; in s]\r\n        l.append(len(tempCommands))\r\n        for first, second in zip(l, l&#x5B;1:]):\r\n            commands.append(&quot;&quot;.join(tempCommands&#x5B;first:second]))\r\n    sql = dict(zip(operations, commands))\r\n    return sql\r\n\r\n\r\ndef build_db(dbsqlite_location, dbsqlite_fileName, sql):\r\n    &quot;&quot;&quot;\r\n    Build the database\r\n    &quot;&quot;&quot;\r\n    print(&quot;Building MetadataDB.db database schema... &quot;)\r\n    with sqlite3.connect(\r\n        os.path.join(dbsqlite_location, dbsqlite_fileName)\r\n    ) as sqlLiteConn:\r\n        sqlCommands = sql.get(&quot;SQL 1: Build_Meta_DB&quot;).split(&quot;;&quot;)\r\n        for c in sqlCommands:\r\n            try:\r\n                sqlLiteConn.execute(c)\r\n            except Exception as e:\r\n                print(e)\r\n                sqlLiteConn.rollback()\r\n                sys.exit(1)\r\n            else:\r\n                sqlLiteConn.commit()\r\n\r\n\r\ndef populate_db_from_mssql_meta(\r\n    dbsqlite_import_tables, dbsqlite_excluded_attribs, sql, **options\r\n):\r\n    &quot;&quot;&quot;\r\n    Source local instance MSSQL metadata \r\n    and populate Sqlite tables with the data. 
\r\n    &quot;&quot;&quot;\r\n    mssql_args = {\r\n        &quot;Driver&quot;: options.get(&quot;Driver&quot;, &quot;ODBC Driver 13 for SQL Server&quot;),\r\n        &quot;Server&quot;: options.get(&quot;Server&quot;, &quot;ServerName\\\\InstanceName&quot;),\r\n        &quot;Database&quot;: options.get(&quot;Database&quot;, &quot;Metadata_DB_Name&quot;),\r\n        &quot;Trusted_Connection&quot;: options.get(&quot;Trusted_Connection&quot;, &quot;yes&quot;),\r\n    }\r\n\r\n    cnxn = pyodbc.connect(**mssql_args)\r\n    if cnxn:\r\n        with sqlite3.connect(\r\n            os.path.join(dbsqlite_location, dbsqlite_fileName)\r\n        ) as sqlLiteConn:\r\n            mssql_cursor = cnxn.cursor()\r\n            for tbl in dbsqlite_import_tables:\r\n                try:\r\n                    tbl_cols = {}\r\n                    mssql_cursor.execute(\r\n                        &quot;&quot;&quot;SELECT c.ORDINAL_POSITION, c.COLUMN_NAME \r\n                           FROM Metadata_DB_Name.INFORMATION_SCHEMA.COLUMNS c \r\n                           WHERE c.TABLE_SCHEMA = 'dbo' \r\n                           AND c.TABLE_NAME = '{}' \r\n                           AND c.COLUMN_NAME NOT IN ({})\r\n                           ORDER BY 1 ASC;&quot;&quot;&quot;.format(\r\n                            tbl,\r\n                            &quot;, &quot;.join(\r\n                                map(lambda x: &quot;'&quot; + x + &quot;'&quot;,\r\n                                    dbsqlite_excluded_attribs)\r\n                            ),\r\n                        )\r\n                    )\r\n                    rows = mssql_cursor.fetchall()\r\n                    for row in rows:\r\n                        tbl_cols.update({row&#x5B;0]: row&#x5B;1]})\r\n                    sortd = &#x5B;tbl_cols&#x5B;key] for key in sorted(tbl_cols.keys())]\r\n                    cols = &quot;,&quot;.join(sortd)\r\n                    mssql_cursor.execute(&quot;SELECT {} FROM 
{}&quot;.format(cols, tbl))\r\n                    rows = mssql_cursor.fetchall()\r\n                    num_columns = max(len(t) for t in rows)\r\n                    mssql = &quot;INSERT INTO {} ({}) VALUES({})&quot;.format(\r\n                        tbl, cols, &quot;,&quot;.join(&quot;?&quot; * num_columns)\r\n                    )\r\n                    sqlLiteConn.executemany(mssql, rows)\r\n                except Exception as e:\r\n                    print(e)\r\n                    sys.exit(1)\r\n                else:\r\n                    sqlLiteConn.commit()\r\n\r\n\r\ndef populate_db_from_meta(dbsqlite_import_tables_remote, sql, **options):\r\n    &quot;&quot;&quot;\r\n    Source remote MSSQL server metadata \r\n    and populate nominated Sqlite table with the data. \r\n    &quot;&quot;&quot;\r\n    mssql_args = {\r\n        &quot;Driver&quot;: options.get(&quot;Driver&quot;, &quot;ODBC Driver 13 for SQL Server&quot;),\r\n        &quot;Server&quot;: options.get(&quot;Server&quot;, &quot;ServerName\\\\InstanceName&quot;),\r\n        &quot;Database&quot;: options.get(&quot;Database&quot;, &quot;Metadata_DB_Name&quot;),\r\n        &quot;Trusted_Connection&quot;: options.get(&quot;Trusted_Connection&quot;, &quot;yes&quot;),\r\n    }\r\n    with sqlite3.connect(\r\n        os.path.join(dbsqlite_location, dbsqlite_fileName)\r\n    ) as sqlLiteConn:\r\n\r\n        print(&quot;Fetching external SQL Server metadata... 
&quot;)\r\n        cnxn = pyodbc.connect(**mssql_args)\r\n        if cnxn:\r\n            mssql_cursor = cnxn.cursor()\r\n            sqlCommands = sql.get(&quot;SQL 2: Scrape_Source_DH2_Meta&quot;)\r\n            try:\r\n                mssql_cursor.execute(sqlCommands)\r\n                rows = mssql_cursor.fetchall()\r\n                num_columns = max(len(t) for t in rows)\r\n                mssql = &quot;INSERT INTO {} (linked_server_name, \\\r\n                                            database_name, \\\r\n                                            schema_name, \\\r\n                                            table_name, \\\r\n                                            column_name, \\\r\n                                            column_data_type, \\\r\n                                            column_id, \\\r\n                                            pk_flag, \\\r\n                                            rows_count, \\\r\n                                            min_pk_value, \\\r\n                                            max_pk_value) \\\r\n                            VALUES({})&quot;.format(\r\n                    dbsqlite_import_tables_remote, &quot;,&quot;.join(&quot;?&quot; * num_columns)\r\n                )\r\n                sqlLiteConn.executemany(mssql, rows)\r\n            except Exception as e:\r\n                print(e)\r\n                sys.exit(1)\r\n            else:\r\n                sqlLiteConn.commit()\r\n\r\n\r\ndef main():\r\n    sql = get_sql(dbsqlite_location, dbsqlite_fileName, dbsqlite_sql)\r\n    build_db(dbsqlite_location, dbsqlite_fileName, sql)\r\n    populate_db_from_mssql_meta(dbsqlite_import_tables, dbsqlite_excluded_attribs, sql)\r\n    populate_db_from_meta(dbsqlite_import_tables_remote, sql)\r\n\r\n\r\nif __name__ == '__main__':\r\n    main()\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">At runtime, this script also calls a SQL query which copies data from an already existing metadata database (on 
the target server) as well as queries the source database for all of its necessary metadata. The full SQL file as well as all other scripts used in this post can be downloaded from my OneDrive folder <a href=\"https:\/\/1drv.ms\/f\/s!AuEyKKgH71pxg9lUPA_FsvYZfZaeOA\" target=\"_blank\" rel=\"noopener noreferrer\">HERE<\/a>. When populated with the source and target data, a SQL query is issued as part of the subsequent script execution which joins source and target metadata and &#8216;flattens&#8217; the output, comma-delimiting attribute values\u00a0(most column names blurred-out) into an array used to specify individual column names as per the screenshot below (click on image to enlarge).<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\"><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2018\/10\/BCP_Data_Acquisitions_Query_Results.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-3601\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2018\/10\/BCP_Data_Acquisitions_Query_Results.png\" alt=\"\" width=\"580\" height=\"491\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_Query_Results.png 1519w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_Query_Results-300x254.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_Query_Results-768x650.png 768w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_Query_Results-1024x866.png 1024w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<h3 style=\"text-align: center;\">Python Wrapper For Concurrent BCP Utility Execution<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">Now that we have our source and target metadata defined, we can start testing transfer times. 
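Before looking at the wrapper itself, the table \u2018chunking\u2019 idea described earlier can be illustrated in isolation. The sketch below is a simplified, hypothetical variant of the range-splitting logic, not the exact function used further down in this post:

```python
def split_into_ranges(start, end, parts):
    """Split the inclusive primary-key space [start, end] into `parts`
    contiguous, non-overlapping ranges of near-equal size (a simplified
    take on the chunking idea; names and behaviour are illustrative)."""
    size, remainder = divmod(end - start + 1, parts)
    ranges = []
    lower = start
    for i in range(parts):
        # Spread any remainder across the first `remainder` chunks
        upper = lower + size - 1 + (1 if i < remainder else 0)
        ranges.append((lower, upper))
        lower = upper + 1
    return ranges

# A table with primary keys 1..1000 split into 4 bcp-sized chunks
print(split_into_ranges(1, 1000, 4))
# [(1, 250), (251, 500), (501, 750), (751, 1000)]
```

Each resulting (min, max) pair then becomes a `WHERE pk BETWEEN min AND max` predicate for an independent bcp export, which is what allows the chunks to be processed concurrently.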
In order to take advantage of all resources available, multiple bcp processes can be spawned using Python\u2019s multiprocessing library for both extracting data out of the source environment and importing it into the target database. The following slapdash Python script, consisting mainly of a large SQL statement which pulls source and target metadata together, can be used to invoke concurrent bcp sessions and further subdivide larger tables into smaller batches which are streamed independently of each other.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">The script, along with the database structure, can be further customised to include features such as allowing incremental, delta-only data transfer and error logging via either Python-specific functionality or bcp utility &#8216;-e err_file&#8217; switch usage with just a few lines of code.<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\"> \r\nfrom os import path, system\r\nfrom multiprocessing import Pool, cpu_count\r\nimport argparse\r\nimport time\r\nimport pyodbc\r\nimport sqlite3\r\nimport configparser\r\nimport build_db\r\n\r\nodbcDriver = &quot;{ODBC Driver 13 for SQL Server}&quot;\r\n\r\nconfig = configparser.ConfigParser()\r\nconfig.read(&quot;params.cfg&quot;)\r\n\r\ndbsqlite_location = config.get(&quot;Sqlite&quot;, &quot;dbsqlite_location&quot;)\r\ndbsqlite_fileName = config.get(&quot;Sqlite&quot;, &quot;dbsqlite_fileName&quot;)\r\ndbsqlite_sql = config.get(&quot;Sqlite&quot;, &quot;dbsqlite_sql&quot;)\r\nexport_path = config.get(&quot;Files_Path&quot;, &quot;export_path&quot;)\r\n\r\nparser = argparse.ArgumentParser(\r\n    description=&quot;SQL Server Data Loading\/Extraction BCP Wrapper by bicortex.com&quot;\r\n)\r\nparser.add_argument(\r\n    &quot;-SSVR&quot;, &quot;--source_server&quot;, help=&quot;Source Linked Server Name&quot;, required=True, 
type=str\r\n)\r\nparser.add_argument(\r\n    &quot;-SDB&quot;, &quot;--source_database&quot;, help=&quot;Source Database Name&quot;, required=True, type=str\r\n)\r\nparser.add_argument(\r\n    &quot;-SSCH&quot;, &quot;--source_schema&quot;, help=&quot;Source Schema Name&quot;, required=True, type=str\r\n)\r\nparser.add_argument(\r\n    &quot;-TSVR&quot;, &quot;--target_server&quot;, help=&quot;Target Server Name&quot;, required=True, type=str\r\n)\r\nparser.add_argument(\r\n    &quot;-TDB&quot;, &quot;--target_database&quot;, help=&quot;Target Database Name&quot;, required=True, type=str\r\n)\r\nparser.add_argument(\r\n    &quot;-TSCH&quot;, &quot;--target_schema&quot;, help=&quot;Target Schema Name&quot;, required=True, type=str\r\n)\r\nargs = parser.parse_args()\r\n\r\nif (\r\n    not args.source_server\r\n    or not args.source_database\r\n    or not args.source_schema\r\n    or not args.target_server\r\n    or not args.target_database\r\n    or not args.target_schema\r\n):\r\n    parser.print_help()\r\n    exit(1)\r\n\r\n\r\ndef get_db_data(linked_server_name, remote_db_name, remote_schema_name):\r\n    &quot;&quot;&quot;\r\n    Connect to Sqlite database and get all metadata required for\r\n    sourcing target data and subsequent files generation\r\n    &quot;&quot;&quot;\r\n    with sqlite3.connect(\r\n        path.join(dbsqlite_location, dbsqlite_fileName)\r\n    ) as sqlLiteConn:\r\n        sqliteCursor = sqlLiteConn.cursor()\r\n        sqliteCursor.execute(\r\n            &quot;&quot;&quot;\r\n        SELECT\r\n        Database_Name,\r\n        Schema_Name,\r\n        Table_Name,\r\n        GROUP_CONCAT(\r\n          Column_Name ) AS Column_Names,\r\n        Is_Big,\r\n        ETL_Batch_No,\r\n        Rows_Count,\r\n        Min_PK_Value,\r\n        Max_PK_Value,\r\n        GROUP_CONCAT(\r\n         CASE\r\n           WHEN PK_Column_Name != 'N\/A'\r\n                   THEN PK_Column_Name\r\n             END) AS PK_Column_Name\r\n        FROM (SELECT 
b.Remote_DB_Name AS Database_Name,\r\n             b.Remote_Schema_Name AS Schema_Name,\r\n             a.Table_Name,\r\n             a.Column_Name,\r\n             b.Is_Big,\r\n             b.ETL_Batch_No,\r\n             b.Rows_Count,\r\n             CASE\r\n               WHEN a.PK_Flag = 1\r\n                 THEN a.Column_Name\r\n               ELSE 'N\/A'\r\n             END AS PK_Column_Name,\r\n             a.Min_PK_Value,\r\n             a.Max_PK_Value\r\n                FROM Ctrl_DataAcquisition_SourceDB_MetaData a\r\n             JOIN Ctrl_DataAcquisition_ObjectLevelExceptions b\r\n             ON a.Table_Name = b.Remote_Object_Name\r\n             AND b.Remote_Schema_Name = '{schema}'\r\n             AND b.Remote_DB_Name = '{db}'\r\n             AND b.Remote_Server_Name = '{lsvr}'\r\n        WHERE b.Is_Active = 1\r\n        AND a.Column_Data_Type != 'timestamp'\r\n        AND a.Rows_Count &gt; 0\r\n        AND (Min_PK_Value != -1 OR Max_PK_Value != -1)\r\n        AND NOT EXISTS(\r\n                  SELECT 1\r\n                  FROM Ctrl_DataAcquisition_AttributeLevelExceptions o\r\n                  WHERE o.Is_Active = 1\r\n                    AND o.Local_Attribute_Name = a.Column_Name\r\n                    AND o.FK_ObjectID = b.ID\r\n          )\r\n        ORDER BY 1, 2, 3, a.Column_ID)\r\n        GROUP BY\r\n        Database_Name,\r\n        Schema_Name,\r\n        Table_Name,\r\n        Is_Big,\r\n        ETL_Batch_No,\r\n        Rows_Count,\r\n        Min_PK_Value\r\n        &quot;&quot;&quot;.format(\r\n                lsvr=linked_server_name, schema=remote_schema_name, db=remote_db_name\r\n            )\r\n        )\r\n        db_data = sqliteCursor.fetchall()\r\n        return db_data\r\n\r\n\r\ndef split_into_ranges(start, end, parts):\r\n    &quot;&quot;&quot;\r\n    Based on input parameters, split number of records into\r\n    consistent and record count-like shards of data and\r\n    return to the calling function\r\n    
&quot;&quot;&quot;\r\n    ranges = &#x5B;]\r\n    x = round((end - start) \/ parts)\r\n    for _ in range(parts):\r\n        ranges.append(&#x5B;start, start + x])\r\n        start = start + x + 1\r\n        if end - start &lt;= x:\r\n            remainder = end - ranges&#x5B;-1]&#x5B;-1]\r\n            ranges.append(&#x5B;ranges&#x5B;-1]&#x5B;-1] + 1, ranges&#x5B;-1]&#x5B;-1] + remainder])\r\n            break\r\n    return ranges\r\n\r\n\r\ndef truncate_target_table(schema, table, **options):\r\n    &quot;&quot;&quot; \r\n    Truncate target table before data is loaded \r\n    &quot;&quot;&quot;\r\n    mssql_args = {&quot;Driver&quot;: options.get(&quot;Driver&quot;, &quot;ODBC Driver 13 for SQL Server&quot;),\r\n                  &quot;Server&quot;: options.get(&quot;Server&quot;, &quot;ServerName\\\\InstanceName&quot;),\r\n                  &quot;Database&quot;: options.get(&quot;Database&quot;, &quot;Staging_DB_Name&quot;),\r\n                  &quot;Trusted_Connection&quot;: options.get(&quot;Trusted_Connection&quot;, &quot;yes&quot;), }\r\n    cnxn = pyodbc.connect(**mssql_args)\r\n    if cnxn:\r\n        try:\r\n            mssql_cursor = cnxn.cursor()\r\n            sql_truncate = &quot;TRUNCATE TABLE {db}.{schema}.{tbl};&quot;.format(\r\n                db=mssql_args.get(&quot;Database&quot;), schema=schema, tbl=table)\r\n            print(&quot;Truncating {tbl} table...&quot;.format(tbl=table))\r\n            mssql_cursor.execute(sql_truncate)\r\n            cnxn.commit()\r\n            sql_select = &quot;SELECT TOP 1 1 FROM {db}.{schema}.{tbl};&quot;.format(\r\n                db=mssql_args.get(&quot;Database&quot;), schema=schema, tbl=table)\r\n            mssql_cursor.execute(sql_select)\r\n            rows = mssql_cursor.fetchone()\r\n            if rows:\r\n                raise Exception(\r\n                    &quot;Issue truncating target table...Please troubleshoot!&quot;)\r\n        except Exception as e:\r\n            print(e)\r\n        
finally:\r\n            cnxn.close()\r\n\r\n\r\ndef export_import_data(target_server, target_database, target_schema, source_server, source_database, source_schema, table_name, columns, export_path, ):\r\n    &quot;&quot;&quot;\r\n    Export source data into a 'non-chunked' CSV file calling SQL Server\r\n    bcp utility and import staged CSV file into the target SQL Server instance.\r\n    This function also provides a verbose way to output runtime details of which\r\n    object is being exported\/imported and the time it took to process.\r\n    &quot;&quot;&quot;\r\n    full_export_path = path.join(export_path, table_name + &quot;.csv&quot;)\r\n    start = time.time()\r\n    print(&quot;Exporting '{tblname}' table into {file} file...&quot;.format(\r\n        tblname=table_name, file=table_name + &quot;.csv&quot;))\r\n    bcp_export = 'bcp &quot;SELECT * FROM OPENQUERY ({ssvr}, \\'SELECT {cols} FROM {sdb}.{schema}.{tbl}\\')&quot; queryout {path} -T -S {tsvr} -q -c -t &quot;|&quot; -r &quot;\\\\n&quot; 1&gt;NUL'.format(\r\n        ssvr=source_server,\r\n        sdb=source_database,\r\n        schema=source_schema,\r\n        cols=columns,\r\n        tbl=table_name,\r\n        path=full_export_path,\r\n        tsvr=target_server,\r\n    )\r\n\r\n    system(bcp_export)\r\n\r\n    end = time.time()\r\n    elapsed = round(end - start)\r\n    if path.isfile(full_export_path):\r\n        print(&quot;File '{file}' exported in {time} seconds&quot;.format(\r\n            file=table_name + &quot;.csv&quot;, time=elapsed\r\n        ))\r\n\r\n    start, end, elapsed = None, None, None\r\n\r\n    start = time.time()\r\n    print(\r\n        &quot;Importing '{file}' file into {tblname} table...&quot;.format(\r\n            tblname=table_name, file=table_name + &quot;.csv&quot;\r\n        )\r\n    )\r\n\r\n    bcp_import = 'bcp {schema}.{tbl} in {path} -S {tsvr} -d {tdb} -h &quot;TABLOCK&quot; -T -q -c -t &quot;|&quot; -r &quot;\\\\n&quot; 1&gt;NUL'.format(\r\n        
schema=target_schema,\r\n        tbl=table_name,\r\n        path=full_export_path,\r\n        tsvr=target_server,\r\n        tdb=target_database,\r\n    )\r\n    system(bcp_import)\r\n\r\n    end = time.time()\r\n    elapsed = round(end - start)\r\n    print(&quot;File '{file}' imported in {time} seconds&quot;.format(\r\n        file=table_name + &quot;.csv&quot;, time=elapsed\r\n    ))\r\n\r\n\r\ndef export_import_chunked_data(\r\n    target_server,\r\n    target_database,\r\n    target_schema,\r\n    source_server,\r\n    source_database,\r\n    source_schema,\r\n    table_name,\r\n    columns,\r\n    export_path,\r\n    vals,\r\n    idx,\r\n    pk_column_name,\r\n):\r\n    &quot;&quot;&quot;\r\n    Export source data into a 'chunked' CSV file calling SQL Server bcp utility \r\n    and import staged CSV file into the target SQL Server instance.\r\n    This function also provides a verbose way to output runtime details\r\n    of which object is being exported\/imported and the time it took to process.\r\n    &quot;&quot;&quot;\r\n    full_export_path = path.join(export_path, table_name + str(idx) + &quot;.csv&quot;)\r\n    start = time.time()\r\n    print(\r\n        &quot;Exporting '{tblname}' table into {file} file ({pk}s between {minv} and {maxv})...&quot;.format(\r\n            tblname=table_name,\r\n            file=table_name + str(idx) + &quot;.csv&quot;,\r\n            pk=pk_column_name,\r\n            minv=str(int(vals&#x5B;0])),\r\n            maxv=str(int(vals&#x5B;1])),\r\n        )\r\n    )\r\n\r\n    bcp_export = 'bcp &quot;SELECT * FROM OPENQUERY ({ssvr}, \\'SELECT {cols} FROM {sdb}.{schema}.{tbl} WHERE {pk} BETWEEN {minv} AND {maxv}\\')&quot; queryout {path} -T -S {tsvr} -q -c -t &quot;|&quot; -r &quot;\\\\n&quot; 1&gt;NUL'.format(\r\n        ssvr=source_server,\r\n        sdb=source_database,\r\n        schema=source_schema,\r\n        cols=columns,\r\n        tbl=table_name,\r\n        path=full_export_path,\r\n        tsvr=target_server,\r\n     
   pk=pk_column_name,\r\n        minv=str(int(vals&#x5B;0])),\r\n        maxv=str(int(vals&#x5B;1])),\r\n    )\r\n\r\n    system(bcp_export)\r\n\r\n    end = time.time()\r\n    elapsed = round(end - start)\r\n    if path.isfile(full_export_path):\r\n        print(&quot;File '{file}' exported in {time} seconds&quot;.format(\r\n            file=table_name + str(idx) + &quot;.csv&quot;, time=elapsed\r\n        ))\r\n    start, end, elapsed = None, None, None\r\n\r\n    start = time.time()\r\n    print(\r\n        &quot;Importing '{file}' file into {tblname} table ({pk}s between {minv} and {maxv})...&quot;.format(\r\n            tblname=table_name,\r\n            file=table_name + str(idx) + &quot;.csv&quot;,\r\n            pk=pk_column_name,\r\n            minv=str(int(vals&#x5B;0])),\r\n            maxv=str(int(vals&#x5B;1])),\r\n        )\r\n    )\r\n\r\n    bcp_import = 'bcp {schema}.{tbl} in {path} -S {tsvr} -d {tdb} -T -q -c -t &quot;|&quot; -r &quot;\\\\n&quot; 1&gt;NUL'.format(\r\n        schema=target_schema,\r\n        tbl=table_name,\r\n        path=full_export_path,\r\n        tsvr=target_server,\r\n        tdb=target_database,\r\n    )\r\n\r\n    system(bcp_import)\r\n\r\n    end = time.time()\r\n    elapsed = round(end - start)\r\n    print(&quot;File '{file}' imported in {time} seconds&quot;.format(\r\n        file=table_name + str(idx) + &quot;.csv&quot;, time=elapsed\r\n    ))\r\n\r\n\r\ndef main():\r\n    &quot;&quot;&quot;\r\n    Call export\/import 'chunked' and 'non-chunked' data functions and\r\n    process each database object as per the metadata information \r\n    in a concurrent fashion \r\n    &quot;&quot;&quot;\r\n    build_db.main()\r\n    db_data = get_db_data(args.source_server,\r\n                          args.source_database, args.source_schema)\r\n    if db_data:\r\n        if path.exists(export_path):\r\n            try:\r\n                p = Pool(processes=cpu_count())\r\n                for row in db_data:\r\n                    
table_name = row&#x5B;2]\r\n                    columns = row&#x5B;3]\r\n                    is_big = int(row&#x5B;4])\r\n                    etl_batch_no = int(row&#x5B;5])\r\n                    min_pk_value = int(row&#x5B;7])\r\n                    max_pk_value = int(row&#x5B;8])\r\n                    pk_column_name = row&#x5B;9]\r\n\r\n                    truncate_target_table(\r\n                        schema=args.target_schema, table=table_name)\r\n\r\n                    if is_big == 1:\r\n                        ranges = split_into_ranges(\r\n                            min_pk_value, max_pk_value, etl_batch_no\r\n                        )\r\n                        for idx, vals in enumerate(ranges):\r\n                            p.apply_async(\r\n                                export_import_chunked_data,\r\n                                &#x5B;\r\n                                    args.target_server,\r\n                                    args.target_database,\r\n                                    args.target_schema,\r\n                                    args.source_server,\r\n                                    args.source_database,\r\n                                    args.source_schema,\r\n                                    table_name,\r\n                                    columns,\r\n                                    export_path,\r\n                                    vals,\r\n                                    idx,\r\n                                    pk_column_name,\r\n                                ],\r\n                            )\r\n                    else:\r\n                        p.apply_async(\r\n                            export_import_data,\r\n                            &#x5B;\r\n                                args.target_server,\r\n                                args.target_database,\r\n                                args.target_schema,\r\n                                args.source_server,\r\n                    
            args.source_database,\r\n                                args.source_schema,\r\n                                table_name,\r\n                                columns,\r\n                                export_path,\r\n                            ],\r\n                        )\r\n                p.close()\r\n                p.join()\r\n            except Exception as e:\r\n                print(e)\r\n        else:\r\n            raise Exception(\r\n                &quot;Specified folder does not exist. Please troubleshoot!&quot;)\r\n    else:\r\n        raise Exception(\r\n            &quot;No data retrieved from the database...Please troubleshoot!&quot;)\r\n\r\n\r\nif __name__ == &quot;__main__&quot;:\r\n    main()\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Once kicked off, the multiprocessing Python module spins up multiple instances of the bcp utility. The footage below shows these running as individual processes (limited to four, matching the number of cores allocated to the test virtual machine) on a data transfer restricted to four test objects: tables named &#8216;client&#8217;, &#8216;clientaddress&#8217;, &#8216;news&#8217; and &#8216;report&#8217;. Naturally, the more cores allocated, the better the performance should be.<\/p>\n<p><iframe loading=\"lazy\" src=\"https:\/\/www.youtube.com\/embed\/ZbISJnBqjJc\" width=\"580\" height=\"325\" frameborder=\"0\" allowfullscreen=\"allowfullscreen\"><\/iframe><\/p>\n<h3 style=\"text-align: center;\">Testing Results<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">Running the script with a mixture of different tables (60 in total), across a 15Mbps link, on a 4-core machine with 64GB DDR4 RAM allocated to the instance (target environment) yielded pretty much the same transfer speeds as a direct RDBMS-to-RDBMS connection. 
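<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">One caveat worth flagging before looking at the numbers: because main() fires its workers through apply_async without ever collecting the returned AsyncResult objects, any exception raised inside a worker process disappears silently. Passing an error_callback, as in the small standalone sketch below (my own illustration, not part of the framework code), is an easy way to surface such failures.<\/p>\n<pre>from multiprocessing import Pool\r\n\r\n\r\ndef work(n):\r\n    # simulate one bcp chunk; chunk 2 fails\r\n    if n == 2:\r\n        raise ValueError('chunk %s failed' % n)\r\n    return n\r\n\r\n\r\nif __name__ == '__main__':\r\n    p = Pool(processes=2)\r\n    for n in range(4):\r\n        # error_callback surfaces worker exceptions that\r\n        # apply_async otherwise swallows silently\r\n        p.apply_async(work, (n,), error_callback=print)\r\n    p.close()\r\n    p.join()\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">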
The volume of data (table size &#8211; index size), equating to around 7GB in SQL Server (4GB as extracted CSV files), yielded a disappointing 45-minute transfer time. The key issue with my setup was the mediocre network speed &#8211; moving 4GB through a 15Mbps pipe takes roughly 35 minutes of pure wire time &#8211; with the SQL Server engine spending around 75 percent of the total (33 minutes) waiting for data to arrive. Upgrading the network link allowed for much faster processing, making this method of data acquisition and staging a compelling contingency or fail-over option for sourcing application data into the landing area of a data warehouse environment. The images below show how the upgraded network speed (most of the time the new connection oscillated between 140Mbps and 160Mbps due to VPN\/firewall restrictions) removed the network bottleneck and maximized hardware resource utilization across this virtual environment (vCPUs no longer idling, waiting for network packets).<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2018\/10\/BCP_Data_Acquisitions_Network_Speed.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-3746\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2018\/10\/BCP_Data_Acquisitions_Network_Speed.png\" alt=\"\" width=\"580\" height=\"352\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_Network_Speed.png 796w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_Network_Speed-300x182.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_Network_Speed-768x466.png 768w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2018\/10\/BCP_Data_Acquisitions_CPU_Utilization.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-3748\" 
src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2018\/10\/BCP_Data_Acquisitions_CPU_Utilization.png\" alt=\"\" width=\"580\" height=\"348\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_CPU_Utilization.png 793w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_CPU_Utilization-300x180.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2018\/10\/BCP_Data_Acquisitions_CPU_Utilization-768x461.png 768w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Re-running the same acquisition across the upgraded link reduced the processing time to 5 minutes.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">So there you go, a simple and straightforward micro-framework for concurrent bcp utility data extraction, which can be used as a contingency method for data acquisition or for testing purposes.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Although building a data acquisition framework for a data warehouse isn\u2019t nearly as interesting as doing analytics or data mining on the already well-structured data, it\u2019s an essential part of any data warehousing project. 
I have already written extensively on how one could build a robust data acquisition framework in THIS series of blog [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[56,5,46],"tags":[58,24,41,49,19],"class_list":["post-3572","post","type-post","status-publish","format-standard","hentry","category-programming","category-sql","category-sql-server","tag-data-modelling","tag-programming","tag-python","tag-sql","tag-sql-server"],"aioseo_notices":[],"jetpack_featured_media_url":"","_links":{"self":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/3572","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/comments?post=3572"}],"version-history":[{"count":51,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/3572\/revisions"}],"predecessor-version":[{"id":3981,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/3572\/revisions\/3981"}],"wp:attachment":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/media?parent=3572"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/categories?post=3572"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/tags?post=3572"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}