Data Acquisition Framework Using Custom Python Wrapper For Concurrent BCP Utility Execution

October 2nd, 2018 / by admin

Although building a data acquisition framework for a data warehouse isn't nearly as interesting as doing analytics or data mining on already well-structured data, it's an essential part of any data warehousing project. I have already written extensively on how one could build a robust data acquisition framework in this series of blog posts. However, given the rapidly growing ecosystem of data sources and data stores, more and more projects tend to gravitate towards application-agnostic storage formats, e.g. text files or column-oriented files such as Parquet. As such, a direct connection to the data store may not be possible, or may not be the optimal way of exporting data, making the flat file export/import option a good alternative.

Microsoft offers a few different tools to facilitate flat file imports and exports, the most commonly used being bcp, BULK INSERT and its own ETL tool capable of connecting to and reading flat files, SSIS. In this demo, I will analyse the difference in data export and import performance between pure T-SQL (executed as a series of concurrently running SQL Agent jobs) and parallelised bcp operations (driven by a custom Python wrapper). The image below depicts the high-level architecture of the solution, and a short video clip demonstrating the actual code execution on a sample dataset is included at the end of this post. Let's compare the pair and see if either of them is better at moving data across the wire.

Database and Schema Setup

Most data warehouses come with some form of metadata layer, so for this exercise I have 'scraped' the source metadata, e.g. attributes such as object names, data types and row counts, into a local SQLite database. I also copied over partial target database metadata (with a bit more structure imposed) from one of the systems I'm currently working with. This resulted in the following schema of tables containing key information on source and target objects, their attributes, their state etc.

Target metadata is confined to three tables, which allow for a three-layer, hierarchical approach to data acquisition:

  • Ctrl_DataAcquisition_ObjectLevelExceptions table stores details of database objects along with some other peripheral information. The grain of this table is set to the object (table) level, which allows for easy object tagging and inclusion/exclusion of objects from the acquisition.
  • Ctrl_DataAcquisition_AttributeLevelExceptions table stores attribute-level data. It references the 'objects' table and determines which fields of the referenced object are to be included/excluded.
  • Ctrl_DataAcquisition_DataLevelExceptions table stores any statements which need to be executed in order to remove/retain specific values.

Thanks to this setup, we now have the option to dictate which objects are to be acquired, which fields from those objects are to be retained/omitted and, finally, which values of these attributes are to be persisted/hashed/obfuscated/removed etc. Source-level metadata (in this scenario) was reduced to a single table containing some of the key attributes and data needed to fulfil the requirements of this post.

Creating the metadata repository, depending on the system's complexity and the level of detail required, can be quite challenging in itself, so I will leave the particulars out of this post. However, it's important to note that this mini-framework would not be possible without storing and comparing both systems' metadata, i.e. the source database and the target database.

This post only scratches the surface of the actual metadata layer implementation, but suffice to say that any well-designed and well-architected framework should lean heavily on metadata. For this post, mainly to preserve brevity and focus on the problem analysed, I have omitted the details of target metadata creation and reduced source metadata 'scraping' to the bare minimum.

The key information sourced, aside from which tables and columns to handle, is essential to providing a more flexible approach to data extraction for the following reasons:

  • Primary key flag – identifies whether a specific attribute is a primary key, so that a large table can be split into multiple 'chunks'. As most primary keys are of data type INTEGER (or a derivative, e.g. SMALLINT, TINYINT, BIGINT), splitting larger tables into smaller streams of data and files allows for better resource utilisation.
  • Column data type – used to check whether arithmetic can be performed on the given table's primary key, i.e. if it's not an INTEGER (or similar) then subdividing the table into smaller partitions will not be an option.
  • Rows count – this data is required in order to partition larger tables into smaller shards based on the total record count.
  • Minimum primary key value – this value is essential for partitioning larger tables whose seed values do not start from one.
  • Maximum primary key value – as above, this piece of data is required to ascertain the ceiling value for further data 'chunking'.
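
To make the role of these attributes more concrete, below is a minimal Python sketch of how a row count and the minimum/maximum primary key values could be turned into contiguous key ranges. It is not the author's split_into_ranges() function used later; the plan_chunks name, the rows_per_chunk threshold and the INTEGER_TYPES set are hypothetical, and it assumes keys are reasonably dense (sparse or gappy keys will produce unevenly sized chunks).

```python
from typing import List, Optional, Tuple

# Hypothetical: SQL Server integer types on which key arithmetic is safe
INTEGER_TYPES = {"tinyint", "smallint", "int", "bigint"}


def plan_chunks(
    pk_data_type: str,
    rows_count: int,
    min_pk: int,
    max_pk: int,
    rows_per_chunk: int,
) -> Optional[List[Tuple[int, int]]]:
    """Return inclusive (low, high) primary key ranges, or None when the
    key is not an integer type and the table cannot be split arithmetically."""
    if pk_data_type.lower() not in INTEGER_TYPES:
        return None
    # number of chunks is driven by the total record count (ceiling division)
    chunks = max(1, -(-rows_count // rows_per_chunk))
    # width of each key range, assuming keys are spread evenly between min and max
    span = max_pk - min_pk + 1
    size = -(-span // chunks)
    ranges = []
    low = min_pk
    while low <= max_pk:
        high = min(low + size - 1, max_pk)  # never go past the observed maximum
        ranges.append((low, high))
        low = high + 1
    return ranges
```

For example, plan_chunks("int", 1000, 1, 1000, 250) yields four ranges of 250 keys each, while a non-integer key returns None, signalling that the table should be exported in one piece.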

The following Python script was used to create and populate the MetadataDB.db database.

import configparser
import sqlite3
import os
import sys
import pyodbc


config = configparser.ConfigParser()
config.read("params.cfg")

dbsqlite_location = os.path.normpath(config.get("Sqlite", "dbsqlite_location"))
dbsqlite_fileName = config.get("Sqlite", "dbsqlite_fileName")
dbsqlite_sql = config.get("Sqlite", "dbsqlite_sql")
dbsqlite_import_tables = config.get("Tables_To_Import", "local_mssql_meta_tables")
dbsqlite_import_tables = dbsqlite_import_tables.split(",")
dbsqlite_excluded_attribs = config.get("Tables_To_Import", "local_mssql_meta_attributes_to_exclude")
dbsqlite_excluded_attribs = dbsqlite_excluded_attribs.split(",")
dbsqlite_import_tables_remote = config.get("Tables_To_Import", "remote_meta_tables")


def get_sql(dbsqlite_location, dbsqlite_fileName, dbsqlite_sql):
    """
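    Source operation types from the SQL file - denoted by the use of four dash characters
    and store them in a dictionary (referenced later)
    """
    # remove any previously built database file so it is recreated from scratch
    db_file = os.path.join(dbsqlite_location, dbsqlite_fileName)
    if os.path.isfile(db_file):
        os.remove(db_file)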
    operations = []
    commands = []

    with open(dbsqlite_sql, "r") as f:
        for i in f:
            if i.startswith("----"):
                i = i.replace("----", "")
                operations.append(i.rstrip("\n"))

    tempCommands = []
    with open(dbsqlite_sql, "r") as f:
        for i in f:
            tempCommands.append(i)
        l = [i for i, s in enumerate(tempCommands) if "----" in s]
        l.append((len(tempCommands)))
        for first, second in zip(l, l[1:]):
            commands.append("".join(tempCommands[first:second]))
    sql = dict(zip(operations, commands))
    return sql


def build_db(dbsqlite_location, dbsqlite_fileName, sql):
    """
    Build the database
    """
    print("Building {} database schema... ".format(dbsqlite_fileName))
    with sqlite3.connect(
        os.path.join(dbsqlite_location, dbsqlite_fileName)
    ) as sqlLiteConn:
        sqlCommands = sql.get("SQL 1: Build_Meta_DB").split(";")
        for c in sqlCommands:
            try:
                sqlLiteConn.execute(c)
            except Exception as e:
                print(e)
                sqlLiteConn.rollback()
                sys.exit(1)
            else:
                sqlLiteConn.commit()


def populate_db_from_mssql_meta(
    dbsqlite_import_tables, dbsqlite_excluded_attribs, sql, **options
):
    """
    Source local instance MSSQL metadata 
    and populate Sqlite tables with the data. 
    """
    mssql_args = {
        "Driver": options.get("Driver", "ODBC Driver 13 for SQL Server"),
        "Server": options.get("Server", "ServerName\\InstanceName"),
        "Database": options.get("Database", "Metadata_DB_Name"),
        "Trusted_Connection": options.get("Trusted_Connection", "yes"),
    }

    cnxn = pyodbc.connect(**mssql_args)
    if cnxn:
        with sqlite3.connect(
            os.path.join(dbsqlite_location, dbsqlite_fileName)
        ) as sqlLiteConn:
            mssql_cursor = cnxn.cursor()
            for tbl in dbsqlite_import_tables:
                try:
                    tbl_cols = {}
                    mssql_cursor.execute(
                        """SELECT c.ORDINAL_POSITION, c.COLUMN_NAME 
                           FROM Metadata_DB_Name.INFORMATION_SCHEMA.COLUMNS c 
                           WHERE c.TABLE_SCHEMA = 'dbo' 
                           AND c.TABLE_NAME = '{}' 
                           AND c.COLUMN_NAME NOT IN ({})
                           ORDER BY 1 ASC;""".format(
                            tbl,
                            ", ".join(
                                map(lambda x: "'" + x + "'",
                                    dbsqlite_excluded_attribs)
                            ),
                        )
                    )
                    rows = mssql_cursor.fetchall()
                    for row in rows:
                        tbl_cols.update({row[0]: row[1]})
                    sortd = [tbl_cols[key] for key in sorted(tbl_cols.keys())]
                    cols = ",".join(sortd)
                    mssql_cursor.execute("SELECT {} FROM {}".format(cols, tbl))
                    rows = mssql_cursor.fetchall()
                    if not rows:
                        continue  # nothing to copy for this table
                    num_columns = len(rows[0])
                    mssql = "INSERT INTO {} ({}) VALUES({})".format(
                        tbl, cols, ",".join("?" * num_columns)
                    )
                    # pyodbc Row objects are converted to plain tuples for sqlite3
                    sqlLiteConn.executemany(mssql, [tuple(r) for r in rows])
                except Exception as e:
                    print(e)
                    sys.exit(1)
                else:
                    sqlLiteConn.commit()


def populate_db_from_meta(dbsqlite_import_tables_remote, sql, **options):
    """
    Source remote MSSQL server metadata 
    and populate nominated Sqlite table with the data. 
    """
    mssql_args = {
        "Driver": options.get("Driver", "ODBC Driver 13 for SQL Server"),
        "Server": options.get("Server", "ServerName\\InstanceName"),
        "Database": options.get("Database", "Metadata_DB_Name"),
        "Trusted_Connection": options.get("Trusted_Connection", "yes"),
    }
    with sqlite3.connect(
        os.path.join(dbsqlite_location, dbsqlite_fileName)
    ) as sqlLiteConn:

        print("Fetching external SQL Server metadata... ")
        cnxn = pyodbc.connect(**mssql_args)
        if cnxn:
            mssql_cursor = cnxn.cursor()
            sqlCommands = sql.get("SQL 2: Scrape_Source_DH2_Meta")
            try:
                mssql_cursor.execute(sqlCommands)
                rows = mssql_cursor.fetchall()
                num_columns = len(rows[0])
                mssql = "INSERT INTO {} (linked_server_name, \
                                            database_name, \
                                            schema_name, \
                                            table_name, \
                                            column_name, \
                                            column_data_type, \
                                            column_id, \
                                            pk_flag, \
                                            rows_count, \
                                            min_pk_value, \
                                            max_pk_value) \
                            VALUES({})".format(
                    dbsqlite_import_tables_remote, ",".join("?" * num_columns)
                )
                sqlLiteConn.executemany(mssql, [tuple(r) for r in rows])
            except Exception as e:
                print(e)
                sys.exit(1)
            else:
                sqlLiteConn.commit()


def main():
    sql = get_sql(dbsqlite_location, dbsqlite_fileName, dbsqlite_sql)
    build_db(dbsqlite_location, dbsqlite_fileName, sql)
    populate_db_from_mssql_meta(dbsqlite_import_tables, dbsqlite_excluded_attribs, sql)
    populate_db_from_meta(dbsqlite_import_tables_remote, sql)


if __name__ == '__main__':
    main()

At runtime, this script also runs a SQL query which copies data from an already existing metadata database (on the target server), as well as querying the source database for all of its necessary metadata. The full SQL file, as well as all other scripts used in this post, can be downloaded from my OneDrive folder here. Once the tables are populated with the source and target data, a SQL query is issued as part of the subsequent script execution which joins the source and target metadata and 'flattens' the output, comma-delimiting attribute values (most column names blurred out) into an array used to specify individual column names, as per the screenshot below.
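
The 'flattening' step relies on SQLite's GROUP_CONCAT() aggregate, which collapses one row per column into a single comma-delimited string per table. A self-contained sketch follows; the meta table and its three columns are a simplified, hypothetical stand-in for the metadata tables described above. Note that SQLite does not formally guarantee the concatenation order, so ordering the inner subquery by column_id (as the wrapper's query also does) is a common, but not guaranteed, way of preserving column order.

```python
import sqlite3


def flatten_columns(rows):
    """Collapse (table_name, column_name, column_id) rows into
    {table_name: 'col1,col2,...'} using GROUP_CONCAT."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE meta (table_name TEXT, column_name TEXT, column_id INTEGER)"
        )
        conn.executemany("INSERT INTO meta VALUES (?, ?, ?)", rows)
        cur = conn.execute(
            """
            SELECT table_name, GROUP_CONCAT(column_name) AS column_names
            FROM (SELECT * FROM meta ORDER BY table_name, column_id)
            GROUP BY table_name
            ORDER BY table_name
            """
        )
        return dict(cur.fetchall())
    finally:
        conn.close()


print(flatten_columns([
    ("client", "last_name", 3),
    ("client", "client_id", 1),
    ("client", "first_name", 2),
    ("news", "news_id", 1),
]))
```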

Python Wrapper For Concurrent BCP Utility Execution

Now that we have our source and target metadata defined, we can start testing transfer times. In order to take advantage of all available resources, multiple bcp processes can be spawned using Python's multiprocessing library, both for extracting data out of the source environment and for importing it into the target database. The following slapdash Python script, consisting mainly of a large SQL statement which pulls source and target metadata together, can be used to invoke concurrent bcp sessions and further subdivide larger tables into smaller batches which are streamed independently of each other.

The script, along with the database structure, can be further customised to include features such as incremental, delta-only data transfer and error logging, via either Python-specific functionality or the bcp utility's '-e err_file' switch, with just a few lines of code.

 
from os import path, system
from multiprocessing import Pool, cpu_count
import argparse
import time
import pyodbc
import sqlite3
import configparser
import build_db

odbcDriver = "{ODBC Driver 13 for SQL Server}"

config = configparser.ConfigParser()
config.read("params.cfg")

dbsqlite_location = path.normpath(config.get("Sqlite", "dbsqlite_location"))
dbsqlite_fileName = config.get("Sqlite", "dbsqlite_fileName")
dbsqlite_sql = config.get("Sqlite", "dbsqlite_sql")
export_path = path.normpath(config.get("Files_Path", "export_path"))

parser = argparse.ArgumentParser(
    description="SQL Server Data Loading/Extraction BCP Wrapper by bicortex.com"
)
parser.add_argument(
    "-SSVR", "--source_server", help="Source Linked Server Name", required=True, type=str
)
parser.add_argument(
    "-SDB", "--source_database", help="Source Database Name", required=True, type=str
)
parser.add_argument(
    "-SSCH", "--source_schema", help="Source Schema Name", required=True, type=str
)
parser.add_argument(
    "-TSVR", "--target_server", help="Target Server Name", required=True, type=str
)
parser.add_argument(
    "-TDB", "--target_database", help="Target Database Name", required=True, type=str
)
parser.add_argument(
    "-TSCH", "--target_schema", help="Target Schema Name", required=True, type=str
)
args = parser.parse_args()

if (
    not args.source_server
    or not args.source_database
    or not args.source_schema
    or not args.target_server
    or not args.target_database
    or not args.target_schema
):
    parser.print_help()
    exit(1)


def get_db_data(linked_server_name, remote_db_name, remote_schema_name):
    """
    Connect to Sqlite database and get all metadata required for
    sourcing target data and subsequent files generation
    """
    with sqlite3.connect(
        path.join(dbsqlite_location, dbsqlite_fileName)
    ) as sqlLiteConn:
        sqliteCursor = sqlLiteConn.cursor()
        sqliteCursor.execute(
            """
        SELECT
        Database_Name,
        Schema_Name,
        Table_Name,
        GROUP_CONCAT(
          Column_Name ) AS Column_Names,
        Is_Big,
        ETL_Batch_No,
        Rows_Count,
        Min_PK_Value,
        Max_PK_Value,
        GROUP_CONCAT(
         CASE
           WHEN PK_Column_Name != 'N/A'
                   THEN PK_Column_Name
             END) AS PK_Column_Name
        FROM (SELECT b.Remote_DB_Name AS Database_Name,
             b.Remote_Schema_Name AS Schema_Name,
             a.Table_Name,
             a.Column_Name,
             b.Is_Big,
             b.ETL_Batch_No,
             b.Rows_Count,
             CASE
               WHEN a.PK_Flag = 1
                 THEN a.Column_Name
               ELSE 'N/A'
             END AS PK_Column_Name,
             a.Min_PK_Value,
             a.Max_PK_Value
                FROM Ctrl_DataAcquisition_SourceDB_MetaData a
             JOIN Ctrl_DataAcquisition_ObjectLevelExceptions b
             ON a.Table_Name = b.Remote_Object_Name
             AND b.Remote_Schema_Name = '{schema}'
             AND b.Remote_DB_Name = '{db}'
             AND b.Remote_Server_Name = '{lsvr}'
        WHERE b.Is_Active = 1
        AND a.Column_Data_Type != 'timestamp'
        AND a.Rows_Count > 0
        AND (Min_PK_Value != -1 OR Max_PK_Value != -1)
        AND NOT EXISTS(
                  SELECT 1
                  FROM Ctrl_DataAcquisition_AttributeLevelExceptions o
                  WHERE o.Is_Active = 1
                    AND o.Local_Attribute_Name = a.Column_Name
                    AND o.FK_ObjectID = b.ID
          )
        ORDER BY 1, 2, 3, a.Column_ID)
        GROUP BY
        Database_Name,
        Schema_Name,
        Table_Name,
        Is_Big,
        ETL_Batch_No,
        Rows_Count,
        Min_PK_Value
        """.format(
                lsvr=linked_server_name, schema=remote_schema_name, db=remote_db_name
            )
        )
        db_data = sqliteCursor.fetchall()
        return db_data


def split_into_ranges(start, end, parts):
    """
    Based on input parameters, split number of records into
    consistent and record count-like shards of data and
    return to the calling function
    """
    ranges = []
    x = round((end - start) / parts)
    for _ in range(parts):
        ranges.append([start, start + x])
        start = start + x + 1
        if end - start <= x:
            remainder = end - ranges[-1][-1]
            # only add a final shard if some key values are still uncovered
            if remainder > 0:
                ranges.append([ranges[-1][-1] + 1, ranges[-1][-1] + remainder])
            break
    return ranges


def truncate_target_table(schema, table, **options):
    """ 
    Truncate target table before data is loaded 
    """
    mssql_args = {"Driver": options.get("Driver", "ODBC Driver 13 for SQL Server"),
                  "Server": options.get("Server", "ServerName\\InstanceName"),
                  "Database": options.get("Database", "Staging_DB_Name"),
                  "Trusted_Connection": options.get("Trusted_Connection", "yes"), }
    cnxn = pyodbc.connect(**mssql_args)
    if cnxn:
        try:
            mssql_cursor = cnxn.cursor()
            sql_truncate = "TRUNCATE TABLE {db}.{schema}.{tbl};".format(
                db=mssql_args.get("Database"), schema=schema, tbl=table)
            print("Truncating {tbl} table...".format(tbl=table))
            mssql_cursor.execute(sql_truncate)
            cnxn.commit()
            sql_select = "SELECT TOP 1 1 FROM {db}.{schema}.{tbl};".format(
                db=mssql_args.get("Database"), schema=schema, tbl=table)
            mssql_cursor.execute(sql_select)
            rows = mssql_cursor.fetchone()
            if rows:
                raise Exception(
                    "Issue truncating target table...Please troubleshoot!")
        except Exception as e:
            print(e)
        finally:
            cnxn.close()


def export_import_data(target_server, target_database, target_schema, source_server, source_database, source_schema, table_name, columns, export_path, ):
    """
    Export source data into a 'non-chunked' CSV file calling SQL Server
    bcp utility and import staged CSV file into the target SQL Server instance.
    This function also provides a verbose way to output runtime details of which
    object is being exported/imported and the time it took to process.
    """
    full_export_path = path.join(export_path, table_name + ".csv")
    start = time.time()
    print("Exporting '{tblname}' table into {file} file...".format(
        tblname=table_name, file=table_name + ".csv"))
    bcp_export = 'bcp "SELECT * FROM OPENQUERY ({ssvr}, \'SELECT {cols} FROM {sdb}.{schema}.{tbl}\')" queryout {path} -T -S {tsvr} -q -c -t "|" -r "\\n" 1>NUL'.format(
        ssvr=source_server,
        sdb=source_database,
        schema=source_schema,
        cols=columns,
        tbl=table_name,
        path=full_export_path,
        tsvr=target_server,
    )

    system(bcp_export)

    end = time.time()
    elapsed = round(end - start)
    if path.isfile(full_export_path):
        print("File '{file}' exported in {time} seconds".format(
            file=table_name + ".csv", time=elapsed
        ))

    start, end, elapsed = None, None, None

    start = time.time()
    print(
        "Importing '{file}' file into {tblname} table...".format(
            tblname=table_name, file=table_name + ".csv"
        )
    )

    bcp_import = 'bcp {schema}.{tbl} in {path} -S {tsvr} -d {tdb} -h "TABLOCK" -T -q -c -t "|" -r "\\n" 1>NUL'.format(
        schema=target_schema,
        tbl=table_name,
        path=full_export_path,
        tsvr=target_server,
        tdb=target_database,
    )
    system(bcp_import)

    end = time.time()
    elapsed = round(end - start)
    print("File '{file}' imported in {time} seconds".format(
        file=table_name + ".csv", time=elapsed
    ))


def export_import_chunked_data(
    target_server,
    target_database,
    target_schema,
    source_server,
    source_database,
    source_schema,
    table_name,
    columns,
    export_path,
    vals,
    idx,
    pk_column_name,
):
    """
    Export source data into a 'chunked' CSV file calling SQL Server bcp utility 
    and import staged CSV file into the target SQL Server instance.
    This function also provides a verbose way to output runtime details
    of which object is being exported/imported and the time it took to process.
    """
    full_export_path = path.join(export_path, table_name + str(idx) + ".csv")
    start = time.time()
    print(
        "Exporting '{tblname}' table into {file} file ({pk}s between {minv} and {maxv})...".format(
            tblname=table_name,
            file=table_name + str(idx) + ".csv",
            pk=pk_column_name,
            minv=str(int(vals[0])),
            maxv=str(int(vals[1])),
        )
    )

    bcp_export = 'bcp "SELECT * FROM OPENQUERY ({ssvr}, \'SELECT {cols} FROM {sdb}.{schema}.{tbl} WHERE {pk} BETWEEN {minv} AND {maxv}\')" queryout {path} -T -S {tsvr} -q -c -t "|" -r "\\n" 1>NUL'.format(
        ssvr=source_server,
        sdb=source_database,
        schema=source_schema,
        cols=columns,
        tbl=table_name,
        path=full_export_path,
        tsvr=target_server,
        pk=pk_column_name,
        minv=str(int(vals[0])),
        maxv=str(int(vals[1])),
    )

    system(bcp_export)

    end = time.time()
    elapsed = round(end - start)
    if path.isfile(full_export_path):
        print("File '{file}' exported in {time} seconds".format(
            file=table_name + str(idx) + ".csv", time=elapsed
        ))
    start, end, elapsed = None, None, None

    start = time.time()
    print(
        "Importing '{file}' file into {tblname} table ({pk}s between {minv} and {maxv})...".format(
            tblname=table_name,
            file=table_name + str(idx) + ".csv",
            pk=pk_column_name,
            minv=str(int(vals[0])),
            maxv=str(int(vals[1])),
        )
    )

    bcp_import = 'bcp {schema}.{tbl} in {path} -S {tsvr} -d {tdb} -T -q -c -t "|" -r "\\n" 1>NUL'.format(
        schema=target_schema,
        tbl=table_name,
        path=full_export_path,
        tsvr=target_server,
        tdb=target_database,
    )

    system(bcp_import)

    end = time.time()
    elapsed = round(end - start)
    print("File '{file}' imported in {time} seconds".format(
        file=table_name + str(idx) + ".csv", time=elapsed
    ))


def main():
    """
    Call export/import 'chunked' and 'non-chunked' data functions and
    process each database object as per the metadata information 
    in a concurrent fashion 
    """
    build_db.main()
    db_data = get_db_data(args.source_server,
                          args.source_database, args.source_schema)
    if db_data:
        if path.exists(export_path):
            try:
                p = Pool(processes=cpu_count())
                for row in db_data:
                    table_name = row[2]
                    columns = row[3]
                    is_big = int(row[4])
                    etl_batch_no = int(row[5])
                    min_pk_value = int(row[7])
                    max_pk_value = int(row[8])
                    pk_column_name = row[9]

                    truncate_target_table(
                        schema=args.target_schema, table=table_name)

                    if is_big == 1:
                        ranges = split_into_ranges(
                            min_pk_value, max_pk_value, etl_batch_no
                        )
                        for idx, vals in enumerate(ranges):
                            p.apply_async(
                                export_import_chunked_data,
                                [
                                    args.target_server,
                                    args.target_database,
                                    args.target_schema,
                                    args.source_server,
                                    args.source_database,
                                    args.source_schema,
                                    table_name,
                                    columns,
                                    export_path,
                                    vals,
                                    idx,
                                    pk_column_name,
                                ],
                            )
                    else:
                        p.apply_async(
                            export_import_data,
                            [
                                args.target_server,
                                args.target_database,
                                args.target_schema,
                                args.source_server,
                                args.source_database,
                                args.source_schema,
                                table_name,
                                columns,
                                export_path,
                            ],
                        )
                p.close()
                p.join()
            except Exception as e:
                print(e)
        else:
            raise Exception(
                "Specified folder does not exist. Please troubleshoot!")
    else:
        raise Exception(
            "No data retrieved from the database...Please troubleshoot!")


if __name__ == "__main__":
    main()
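
As mentioned before the script, error logging can be added through bcp's -e err_file switch. The sketch below is one hypothetical way to do it: instead of the os.system() strings used above, it builds the bcp import command as an argument list (reusing the -S, -d, -T, -q, -c, -t and -r options from the wrapper's chunked import) and runs it with subprocess.run(), so the exit code and console output can be inspected. The function names and the error file path are illustrative assumptions.

```python
import subprocess
from typing import List


def build_bcp_import_args(
    schema: str, table: str, data_file: str, server: str, database: str, err_file: str
) -> List[str]:
    """Assemble bcp 'in' arguments as a list, so no shell quoting is needed."""
    return [
        "bcp", f"{schema}.{table}", "in", data_file,
        "-S", server,
        "-d", database,
        "-T",            # trusted (Windows) authentication
        "-q",            # SET QUOTED_IDENTIFIERS ON
        "-c",            # character data type
        "-t", "|",       # field terminator used by the wrapper
        "-r", "\\n",     # row terminator; bcp itself interprets the \n escape
        "-e", err_file,  # rows that cannot be loaded are written to this file
    ]


def run_bcp(args: List[str]) -> int:
    """Run bcp, print its captured output on failure and return the exit code."""
    completed = subprocess.run(args, capture_output=True, text=True)
    if completed.returncode != 0:
        print(completed.stdout, completed.stderr)
    return completed.returncode
```

Because the output is captured, the 1>NUL redirection used in the wrapper is no longer needed, and a non-zero exit code can be turned into an exception or a log entry.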

Once kicked off, the multiprocessing Python module spins up multiple instances of the bcp utility. The footage below demonstrates those running as individual processes (limited to 4, based on the number of cores allocated to the test virtual machine) on a data transfer limited to 4 test objects: tables named 'client', 'clientaddress', 'news' and 'report'. Naturally, the more cores allocated, the better the performance should be, provided the network is not the bottleneck.
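
One caveat with the Pool.apply_async() calls in main() is that an exception raised inside a worker is stored in the returned AsyncResult and only re-raised when .get() is called on it (or passed to an error_callback, if one is supplied); since the script discards those results, a failed chunk can go unnoticed. Because each worker spends most of its time waiting on an external bcp process, a thread pool is arguably sufficient. The sketch below swaps in concurrent.futures.ThreadPoolExecutor, a different technique from the author's multiprocessing approach, and collects failures per task; the run_all helper and its (label, callable, args) task format are hypothetical.

```python
import os
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_all(tasks, max_workers=None):
    """
    tasks: list of (label, callable, args) tuples.
    Returns (results, errors): two dicts keyed by label, so failed
    tasks are reported instead of being silently discarded.
    """
    max_workers = max_workers or os.cpu_count() or 1
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fn, *args): label for label, fn, args in tasks}
        for future in as_completed(futures):
            label = futures[future]
            try:
                results[label] = future.result()  # re-raises the worker's exception
            except Exception as exc:
                errors[label] = exc
    return results, errors
```

Note that os.system() does not raise when bcp fails, so the task callables would also need to raise on a non-zero bcp exit code for failures to show up in errors.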

Testing Results

Running the script with a mixture of different tables (60 in total) across a 15Mbps link, on a 4-core machine with 64GB of DDR4 RAM allocated to the instance (target environment), yielded transfer speeds roughly the same as using a direct RDBMS-to-RDBMS connection. The volume of data (table size minus index size), amounting to around 7GB in SQL Server (4GB as extracted CSV files), took a disappointing 45 minutes to transfer. The key issue with my setup specifically was the mediocre network speed, with the SQL Server engine spending around 75 percent of this time (33 minutes) waiting for data to arrive. Upgrading the network link allowed for much faster processing, making this method of data acquisition and staging a very compelling contingency or fail-over option for sourcing application data into the landing area of a data warehouse environment. The images below show how upgrading the network speed (most of the time the new connection speed oscillated between 140Mbps and 160Mbps due to VPN/firewall restrictions) removed the network bottleneck and maximised hardware resource utilisation across this virtual environment (vCPUs no longer idling while waiting for network packets).

When re-running the same acquisition across the upgraded link, the processing time was reduced to 5 minutes.
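
A back-of-the-envelope calculation supports the network-bound diagnosis. The sketch below assumes the volume on the wire is roughly the 4GB of extracted CSV data (decimal gigabytes) and ignores protocol overhead, compression and latency, so it gives an idealised lower bound rather than a prediction.

```python
def estimate_transfer_minutes(size_gb: float, link_mbps: float) -> float:
    """Idealised transfer time: decimal GB, no protocol overhead or compression."""
    megabits = size_gb * 1000 * 8  # GB -> MB -> megabits
    return megabits / link_mbps / 60


for mbps in (15, 140, 160):
    print(f"{mbps:>4} Mbps: {estimate_transfer_minutes(4, mbps):5.1f} min")
```

Under these assumptions, 4GB takes about 35.6 minutes at 15Mbps, close to the 33 minutes SQL Server spent waiting for data, and roughly 3.3 to 3.8 minutes at 140 to 160Mbps, which fits within the observed 5-minute run.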

So there you go, a simple and straightforward micro-framework for concurrent bcp utility data extraction which can be used as a contingency method for data acquisitions or for testing purposes.


Kicking the tires on BigQuery – Google’s Serverless Enterprise Data Warehouse (Part 1)

September 21st, 2018 / by admin

Note: Part 2 can be found here.

Introduction

As traditional, on-premise data warehouse solutions become increasingly harder to scale and manage, a new breed of vendors and products is starting to emerge, one which can easily accommodate exponentially growing data volumes with little up-front cost and very little operational overhead. We increasingly see cash-strapped IT departments, or startups pushed by their VC overlords to provide immediate ROI, unable to take the time to build up their analytical capability along the old-fashioned, waterfall trajectory. Instead, the famous Facebook mantra of "move fast and break things" is gradually becoming a standard to live up to, with cloud-first, pay-for-what-you-use, ops-free services making inroads into, or displacing, 'old guard' technologies. The data warehousing domain managed to avoid being dragged into the relentless push for optimisation and efficiency for a very long time, with many venerable vendors (IBM, Oracle, Teradata) resting on their laurels, sometimes for decades. However, with the advent of cloud computing, new juggernauts emerged, and Google's BigQuery is slowly becoming synonymous with a petabyte-scale, ops-free, ultra-scalable and extremely fast data warehouse service. And while BigQuery has not (yet) become the go-to platform for storing and processing large volumes of data, big organisations such as The New York Times and Spotify decided to adopt it, bucking the industry trend of selecting existing cloud data warehouse incumbents, e.g. AWS Redshift or Microsoft SQL DW.

BigQuery traces its roots back to 2010, when Google released beta access to a new SQL processing system based on its distributed query engine technology called Dremel, which was described in an influential paper released the same year. BigQuery and Dremel share the same underlying architecture, and by incorporating Dremel's columnar storage and tree architecture, BigQuery manages to offer unprecedented performance. But BigQuery is much more than Dremel, which serves as its execution engine. In fact, the BigQuery service leverages other Google technologies such as Borg (large-scale cluster management system), Colossus (Google's latest-generation distributed file system), Capacitor (columnar data storage format) and Jupiter (Google's networking infrastructure). As illustrated below, a BigQuery client interacts with the Dremel engine via a client interface. Borg allocates the compute capacity for the Dremel jobs, which read data from Google's Colossus file system over the Jupiter network, perform various SQL operations and return results to the client.

Having previously worked with a number of other vendors which have successfully occupied this realm for a very long time, it was refreshing to see how different BigQuery is and what separates it from the rest of the competition. But before I get into the general observations and the actual conclusion, let’s look at how one can easily process, load and query large volumes of data (TPC-DS data in this example) using BigQuery and a few Python scripts for automation.

TPC-DS Schema Creation and Data Loading

BigQuery offers a number of ways to interact with it and take advantage of its functionality. The most basic one is the GCP Web UI, but for repetitive tasks users will find themselves mainly using the Google Cloud SDK or the Google BigQuery API. For the purpose of this example, I have generated two TPC-DS datasets, 100GB and 300GB, and staged them in a local folder. I have also created a TPC-DS schema JSON file which the script uses to generate all dataset objects. All these resources, as well as the scripts used in this post, can be found in my OneDrive folder HERE.
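Both scripts in this post read their file locations and GCP settings from a params.cfg file via configparser. The file itself is not reproduced here, so the following is a hypothetical layout reconstructed only from the section and key names the scripts pass to config.get() (Files_Path and Big_Query); every value, and the load_params helper, is a placeholder assumption for illustration.

```python
import configparser

# Hypothetical params.cfg contents: section/key names match what the scripts
# look up with config.get(); every value below is a placeholder assumption.
SAMPLE_PARAMS_CFG = """
[Files_Path]
bq_tpc_ds_schema_as_json = ./tpc_ds_schema.json
bq_local_source_files_path = ./tpc_ds_source
bq_local_target_files_path = ./tpc_ds_target

[Big_Query]
bq_client = ./service_account.json
bq_bucket_path = gs://example_bucket/
bq_ref_dataset = tpc_ds_test_data
"""

# Keys the two scripts expect to find, grouped by section.
REQUIRED_KEYS = {
    "Files_Path": ["bq_tpc_ds_schema_as_json",
                   "bq_local_source_files_path",
                   "bq_local_target_files_path"],
    "Big_Query": ["bq_client", "bq_bucket_path", "bq_ref_dataset"],
}


def load_params(cfg_text):
    """Parse config text and fail early if any key the scripts need is missing."""
    config = configparser.ConfigParser()
    config.read_string(cfg_text)
    missing = [f"{section}.{key}"
               for section, keys in REQUIRED_KEYS.items()
               for key in keys
               if not config.has_option(section, key)]
    if missing:
        raise KeyError("params.cfg is missing: " + ", ".join(missing))
    return config


if __name__ == "__main__":
    params = load_params(SAMPLE_PARAMS_CFG)
    print(params.get("Big_Query", "bq_ref_dataset"))  # tpc_ds_test_data
```

Validating the keys up front means a typo in params.cfg surfaces as one clear error instead of a NoOptionError halfway through a long upload.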

The following Python code, which reads its settings from a params.cfg config file, is mainly used to clean up the flat files (TPC-DS terminates every line with a trailing “|” delimiter which needs to be removed), break them up into smaller chunks and upload them to Google Cloud Storage (it assumes that the gs://tpc_ds_data_100GB and gs://tpc_ds_data_300GB buckets have already been created). The schema structure used for this project is a direct translation of the PostgreSQL-specific TPC-DS schema used in one of my earlier projects, with data types and their NULL-ability converted and standardised to conform to BigQuery standards.
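To make the two clean-up steps concrete, here is a minimal in-memory sketch of what happens to each file: the trailing “|” is stripped from every line, and the result is cut into chunks of at most row_limit rows, which yields ceil(row_count / row_limit) chunk files. The function names and the tiny row_limit are illustrative only; the actual script streams from disk rather than holding whole files in memory.

```python
import math


def strip_trailing_delimiter(line, delimiter="|"):
    """Remove one trailing delimiter from a TPC-DS line, preserving any newline."""
    has_newline = line.endswith("\n")
    body = line[:-1] if has_newline else line
    if body.endswith(delimiter):
        body = body[:-1]
    return body + ("\n" if has_newline else "")


def split_into_chunks(lines, row_limit):
    """Group lines into consecutive chunks of at most row_limit lines each."""
    return [lines[i:i + row_limit] for i in range(0, len(lines), row_limit)]


if __name__ == "__main__":
    # Toy rows in TPC-DS style; the last line has no trailing newline.
    raw = ["1|AAAA|2450815|\n", "2|BBBB|2450816|\n", "3|CCCC|2450817|\n",
           "4|DDDD|2450818|\n", "5|EEEE|2450819|"]
    cleaned = [strip_trailing_delimiter(line) for line in raw]
    chunks = split_into_chunks(cleaned, row_limit=2)
    # 5 rows with row_limit=2 -> ceil(5 / 2) = 3 chunk files (_1, _2, _3)
    assert len(chunks) == math.ceil(len(raw) / 2) == 3
    for n, chunk in enumerate(chunks, start=1):
        print("chunk _%s.csv:" % n, [c.rstrip("\n") for c in chunk])
```

As an aside, BigQuery’s CSV loader also exposes an ignore_unknown_values option (LoadJobConfig.ignore_unknown_values) under which extra trailing columns are ignored rather than rejected, which may make the stripping step unnecessary; that alternative is not what the script below relies on.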

import configparser
from tqdm import tqdm
from os import path, rename, listdir, remove, walk
from shutil import copy2, move
from google.cloud import storage
from csv import reader, writer


config = configparser.ConfigParser()
config.read("params.cfg")

# normalise the configured paths (not the option names) for the current OS
bq_tpc_ds_schema_as_json = path.normpath(config.get(
    "Files_Path", "bq_tpc_ds_schema_as_json"))
bq_local_source_files_path = path.normpath(config.get(
    "Files_Path", "bq_local_source_files_path"))
bq_local_target_files_path = path.normpath(config.get(
    "Files_Path", "bq_local_target_files_path"))
# path to the service account JSON key file
service_account_json = path.normpath(config.get("Big_Query", "bq_client"))
bq_bucket_path = config.get("Big_Query", "bq_bucket_path")
bq_storage_client = storage.Client.from_service_account_json(service_account_json)


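keep_src_files = True             # keep the original source files after processing
keep_headers = False              # TPC-DS flat files have no header row
keep_src_csv_files = True
row_limit = 1000000               # max rows per output chunk file
output_name_template = '_%s.csv'  # chunk suffix, e.g. store_sales_1.csv
delimiter = '|'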


def RemoveFiles(DirPath):
    filelist = [f for f in listdir(DirPath)]
    for f in filelist:
        remove(path.join(DirPath, f))


def SourceFilesRename(bq_local_source_files_path):
    for file in listdir(bq_local_source_files_path):
        if file.endswith(".dat"):
            rename(path.join(bq_local_source_files_path, file),
                   path.join(bq_local_source_files_path, file[:-4]+'.csv'))


def SplitLargeFiles(file, row_count, bq_local_target_files_path, output_name_template, keep_headers=False):
    """
    Split flat files into smaller chunks of at most 'row_limit' rows each. This creates
    ceil(file_row_count/row_limit) files, which can speed up the upload if it is parallelised.
    """
    def open_piece(piece):
        # e.g. store_sales.csv -> store_sales_1.csv, store_sales_2.csv, ...
        out_path = path.join(bq_local_target_files_path,
                             path.splitext(file)[0] + output_name_template % piece)
        out_handle = open(out_path, 'w', newline='')
        return out_handle, writer(out_handle, delimiter=delimiter)

    with open(path.join(bq_local_target_files_path, file), 'r', newline='') as file_handler:
        csv_reader = reader(file_handler, delimiter=delimiter)
        current_piece = 1
        out_handle, current_out_writer = open_piece(current_piece)
        current_limit = row_limit
        if keep_headers:
            headers = next(csv_reader)
            current_out_writer.writerow(headers)
        pbar = tqdm(total=row_count)
        for i, row in enumerate(csv_reader):
            pbar.update()
            if i + 1 > current_limit:
                # current chunk is full: close it and start the next one
                out_handle.close()
                current_piece += 1
                current_limit = row_limit * current_piece
                out_handle, current_out_writer = open_piece(current_piece)
                if keep_headers:
                    current_out_writer.writerow(headers)
            current_out_writer.writerow(row)
        out_handle.close()
        pbar.close()
    print("\n")


def ProcessLargeFiles(bq_local_source_files_path, bq_local_target_files_path, output_name_template, row_limit):
    """
    Remove trailing '|' characters from the files generated by the TPC-DS utility,
    as they interfere with the BQ load function: BigQuery reads the trailing delimiter
    as an extra, empty column which does not exist in the table schema. This function
    also calls the 'SplitLargeFiles' function, which splits files with a row count
    > 'row_limit' into smaller chunks/files.
    """
    RemoveFiles(bq_local_target_files_path)
    fileRowCounts = []
    for file in listdir(bq_local_source_files_path):
        if file.endswith(".csv"):
            with open(path.join(bq_local_source_files_path, file), newline='') as f:
                fileRowCounts.append([file, sum(1 for line in f)])
    for file in fileRowCounts:
        print(
            "Removing trailing characters in {f} file...".format(f=file[0]))
        RemoveTrailingChar(row_limit, path.join(
            bq_local_source_files_path, file[0]), path.join(bq_local_target_files_path, file[0]))
        if file[1] > row_limit:
            print("\nSplitting file:", file[0])
            print("Measured row count:", file[1])
            print("Progress...")
            SplitLargeFiles(
                file[0], file[1], bq_local_target_files_path, output_name_template)
            # the unsplit, cleaned copy is no longer needed once its chunks exist
            remove(path.join(bq_local_target_files_path, file[0]))
    # honour the 'keep_src_files' flag instead of always deleting the source files
    if not keep_src_files:
        RemoveFiles(bq_local_source_files_path)


def RemoveTrailingChar(row_limit, src_file, tgt_file):
    """
    Remove the trailing '|' character from each line ending, writing the cleaned
    lines out in batches of 'row_limit' lines to limit memory usage.
    """
    lines = []
    with open(src_file, "r") as r, open(tgt_file, "w") as w:
        for line in r:
            if line.endswith('|\n'):
                line = line[:-2] + "\n"
            elif line.endswith('|'):
                # last line of the file without a trailing newline
                line = line[:-1]
            # keep every line; previously lines not ending in '|\n' were dropped
            lines.append(line)
            if len(lines) == row_limit:
                w.writelines(lines)
                lines = []
        w.writelines(lines)


def GcsUploadBlob(bq_storage_client, bq_local_target_files_path, bq_bucket_path):
    """
    Upload files into the nominated GCP bucket, skipping any file that already exists there.
    """
    # 'gs://bucket_name/...' -> 'bucket_name'
    bucket_name = (bq_bucket_path.split("//"))[1].split("/")[0]
    CHUNK_SIZE = 10485760  # 10 MB resumable-upload chunk (must be a multiple of 256 KB)
    bucket = bq_storage_client.get_bucket(bucket_name)
    print("\nCommencing files upload...")
    for file in listdir(bq_local_target_files_path):
        try:
            blob = bucket.blob(file, chunk_size=CHUNK_SIZE)
            file_exist = blob.exists()
            if not file_exist:
                print("Uploading file {fname} into {bname} GCP bucket...".format(
                    fname=file, bname=bucket_name))
                blob.upload_from_filename(
                    path.join(bq_local_target_files_path, file))
            else:
                print("Nominated file {fname} already exists in {bname} bucket. Moving on...".format(
                    fname=file, bname=bucket_name))
        except Exception as e:
            print(e)


if __name__ == "__main__":
    SourceFilesRename(bq_local_source_files_path)
    ProcessLargeFiles(bq_local_source_files_path,
                      bq_local_target_files_path, output_name_template, row_limit)
    GcsUploadBlob(bq_storage_client,
                  bq_local_target_files_path, bq_bucket_path)
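The SplitLargeFiles docstring notes that chunking mainly pays off when uploads run in parallel, whereas GcsUploadBlob uploads files one at a time. Below is a minimal sketch of a parallel variant, assuming a thread pool is acceptable (uploads are network-bound, so threads are a reasonable fit) and that the per-file upload is passed in as a callable. parallel_upload and its upload_fn interface are hypothetical, and a fake uploader is used so the sketch runs without GCP credentials.

```python
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from os import listdir, path


def parallel_upload(local_dir, upload_fn, max_workers=8):
    """
    Upload every file in local_dir concurrently.

    upload_fn(file_name, local_path) is a caller-supplied callable (hypothetical
    interface) performing a single upload. Returns (succeeded, failed), where
    failed maps file names to the exception their upload raised.
    """
    files = sorted(listdir(local_dir))
    succeeded, failed = [], {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(upload_fn, f, path.join(local_dir, f)): f
                   for f in files}
        for future in as_completed(futures):
            name = futures[future]
            try:
                future.result()
                succeeded.append(name)
            except Exception as exc:  # keep going, report failures at the end
                failed[name] = exc
    return sorted(succeeded), failed


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for n in range(1, 4):
            with open(path.join(tmp, "store_sales_%d.csv" % n), "w") as fh:
                fh.write("1|2|3\n")

        def fake_upload(name, local_path):
            # stand-in for a real call such as blob.upload_from_filename(local_path)
            pass

        ok, bad = parallel_upload(tmp, fake_upload, max_workers=3)
        print(ok, bad)
```

With the real client, upload_fn could be something like `lambda name, p: bucket.blob(name).upload_from_filename(p)`; whether to share one storage.Client across worker threads is worth checking against the library’s own guidance. Newer releases of google-cloud-storage also ship a transfer_manager module aimed at concurrent uploads, which may be preferable where available.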

Once all the TPC-DS data has been moved across into the GCS bucket, we can proceed with creating the BigQuery dataset (synonymous with a schema in a typical Microsoft SQL Server or PostgreSQL RDBMS hierarchy) and its corresponding tables. A dataset is associated with a project, which forms the overarching container, somewhat equivalent to a database if you come from a traditional RDBMS background. It is also worth pointing out that dataset names must be unique per project, all tables referenced in a query must be stored in datasets in the same location and, finally, a dataset’s geographic location can only be set at creation time. The following is an excerpt from the JSON file used to create the dataset schema, which can be downloaded from my OneDrive folder HERE.

{
    "table_schema": [
        {
            "table_name": "dbgen_version",
            "fields": [
                {
                    "name": "dv_version",
                    "type": "STRING",
                    "mode": "NULLABLE"
                },
                {
                    "name": "dv_create_date",
                    "type": "DATE",
                    "mode": "NULLABLE"
                },
                {
                    "name": "dv_create_time",
                    "type": "TIME",
                    "mode": "NULLABLE"
                },
                {
                    "name": "dv_cmdline_args",
                    "type": "STRING",
                    "mode": "NULLABLE"
                }
            ]
        }
    ]
}
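The same JSON file can also be rendered as BigQuery standard SQL DDL, which is handy for eyeballing the schema or running it by hand in the Web UI. Below is a minimal sketch, assuming the table_schema / fields / name / type / mode layout shown in the excerpt above. REQUIRED fields become NOT NULL, REPEATED fields become ARRAY<...>, and the LEGACY_TO_STANDARD mapping of legacy type names is an assumption about which names the full schema file might use; json_schema_to_ddl is a hypothetical helper.

```python
import json

# legacy schema type names -> standard SQL names accepted in DDL
LEGACY_TO_STANDARD = {"INTEGER": "INT64", "FLOAT": "FLOAT64", "BOOLEAN": "BOOL"}


def json_schema_to_ddl(schema_json, dataset):
    """Render one CREATE OR REPLACE TABLE statement per table in the JSON schema."""
    statements = []
    for table in json.loads(schema_json)["table_schema"]:
        columns = []
        for field in table["fields"]:
            col_type = field["type"].upper()
            col_type = LEGACY_TO_STANDARD.get(col_type, col_type)
            mode = field.get("mode", "NULLABLE").upper()
            if mode == "REPEATED":
                col_type = "ARRAY<%s>" % col_type
            col = "  {name} {type}".format(name=field["name"], type=col_type)
            if mode == "REQUIRED":
                col += " NOT NULL"
            columns.append(col)
        statements.append("CREATE OR REPLACE TABLE {d}.{t} (\n{c}\n);".format(
            d=dataset, t=table["table_name"], c=",\n".join(columns)))
    return statements


if __name__ == "__main__":
    excerpt = """{"table_schema": [{"table_name": "dbgen_version", "fields": [
        {"name": "dv_version", "type": "STRING", "mode": "NULLABLE"},
        {"name": "dv_create_date", "type": "DATE", "mode": "NULLABLE"},
        {"name": "dv_create_time", "type": "TIME", "mode": "NULLABLE"},
        {"name": "dv_cmdline_args", "type": "STRING", "mode": "NULLABLE"}]}]}"""
    for stmt in json_schema_to_ddl(excerpt, "tpc_ds_test_data"):
        print(stmt)
```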

With the schema definition in place, we can create the dataset and the required tables, and populate them with the flat file data moved into Google Cloud Storage by the previous script. The following Python script creates the tpc_ds_test_data dataset (its name is read from the bq_ref_dataset config parameter), creates all tables based on the TPC-DS schema stored in the JSON file and finally populates them with the text files’ data.

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.cloud import storage
from os import path
import configparser
import json
import re


config = configparser.ConfigParser()
config.read("params.cfg")

bq_tpc_ds_schema_as_json = path.normpath(config.get(
    "Files_Path", "bq_tpc_ds_schema_as_json"))
# both clients authenticate with the same service account JSON key file
service_account_json = path.normpath(config.get("Big_Query", "bq_client"))
bq_client = bigquery.Client.from_service_account_json(service_account_json)
bq_storage_client = storage.Client.from_service_account_json(service_account_json)
bq_bucket_path = config.get("Big_Query", "bq_bucket_path")
bq_ref_dataset = config.get("Big_Query", "bq_ref_dataset")


def dataset_bq_exists(bq_client, bq_ref_dataset):
    try:
        bq_client.get_dataset(bq_ref_dataset)
        return True
    except NotFound as e:
        return False


def bq_create_dataset(bq_client, bq_ref_dataset):
    """
    Create the BigQuery dataset in the Australian (australia-southeast1)
    location if it does not already exist
    """
    if not dataset_bq_exists(bq_client, bq_ref_dataset):
        print("Creating {d} dataset...".format(d=bq_ref_dataset))
        dataset_ref = bigquery.DatasetReference(bq_client.project, bq_ref_dataset)
        dataset = bigquery.Dataset(dataset_ref)
        # location can only be set at creation time
        dataset.location = 'australia-southeast1'
        dataset = bq_client.create_dataset(dataset)
    else:
        print("Nominated dataset already exists. Moving on...")


def table_bq_exists(bq_client, ref_table):
    try:
        bq_client.get_table(ref_table)
        return True
    except NotFound as e:
        return False


def bq_drop_create_table(bq_client, bq_ref_dataset, bq_tpc_ds_schema_as_json):
    """
    Drop (if exists) and create schema tables on the nominated BigQuery dataset.
    Schema details are stored inside 'tpc_ds_schema.json' file.
    """
    dataset = bigquery.DatasetReference(bq_client.project, bq_ref_dataset)
    with open(bq_tpc_ds_schema_as_json) as schema_file:
        data = json.load(schema_file)
        for t in data['table_schema']:
            table_name = t['table_name']
            ref_table = dataset.table(t['table_name'])
            table = bigquery.Table(ref_table)
            if table_bq_exists(bq_client, ref_table):
                print("Table '{tname}' already exists in the '{dname}' dataset. Dropping table '{tname}'...".format(
                    tname=table_name, dname=bq_ref_dataset))
                bq_client.delete_table(table)
            for f in t['fields']:
                table.schema += (
                    bigquery.SchemaField(f['name'], f['type'], mode=f['mode']),)
            print("Creating table {tname} in the '{dname}' dataset...".format(
                tname=table_name, dname=bq_ref_dataset))
            table = bq_client.create_table(table)


def bq_load_data_from_file(bq_client, bq_ref_dataset, bq_bucket_path, bq_storage_client):
    """
    Load data stored on the nominated GCP bucket into dataset tables
    """
    bucket_name = (bq_bucket_path.split("//"))[1].split("/")[0]
    bucket = bq_storage_client.get_bucket(bucket_name)
    blobs = bucket.list_blobs()
    dataset = bigquery.DatasetReference(bq_client.project, bq_ref_dataset)
    for blob in blobs:
        file_name = blob.name
        # strip the extension and any '_<n>' chunk suffix added by the split step,
        # e.g. 'store_sales_3.csv' -> 'store_sales', 'call_center.csv' -> 'call_center'
        table_name = re.sub(r"_\d+$", "", path.splitext(file_name)[0])
        ref_table = dataset.table(table_name)
        print("Loading file {f} into {t} table...".format(
            f=file_name, t=table_name))
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.CSV
        job_config.skip_leading_rows = 0
        job_config.field_delimiter = "|"
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
        # GCS URIs always use '/', so don't build them with os.path.join
        # (which would insert '\' on Windows)
        source_uri = "gs://{b}/{f}".format(b=bucket_name, f=file_name)
        job = bq_client.load_table_from_uri(
            source_uri, ref_table, job_config=job_config)
        result = job.result()  # wait for the load job to finish
        print(result.state)


if __name__ == "__main__":
    bq_create_dataset(bq_client, bq_ref_dataset)
    bq_drop_create_table(bq_client, bq_ref_dataset, bq_tpc_ds_schema_as_json)
    bq_load_data_from_file(bq_client, bq_ref_dataset,
                           bq_bucket_path, bq_storage_client)
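The loop above issues one load job per chunk file. Since load_table_from_uri also accepts a list of source URIs, a variant could group the chunks per table and submit a single job per table. Below is a minimal sketch of the grouping step, assuming the '<table>_<n>.csv' naming produced by the split step; group_uris_by_table is a hypothetical helper. It builds explicit URI lists rather than wildcards because, in TPC-DS, a wildcard such as gs://bucket/customer_*.csv would also match the customer_address and customer_demographics files.

```python
import re
from collections import defaultdict
from posixpath import splitext

CHUNK_SUFFIX = re.compile(r"_\d+$")  # '_<n>' suffix added by the split step


def group_uris_by_table(bucket_name, blob_names):
    """Map each target table to the list of gs:// URIs that should be loaded into it."""
    groups = defaultdict(list)
    for name in blob_names:
        stem, ext = splitext(name)
        if ext.lower() != ".csv":
            continue  # ignore anything that isn't a data file
        table = CHUNK_SUFFIX.sub("", stem)
        groups[table].append("gs://{b}/{n}".format(b=bucket_name, n=name))
    return {table: sorted(uris) for table, uris in groups.items()}


if __name__ == "__main__":
    names = ["customer_1.csv", "customer_2.csv", "customer_address.csv",
             "dbgen_version.csv", "store_sales_1.csv", "store_sales_2.csv"]
    for table, uris in group_uris_by_table("example_bucket", names).items():
        print(table, uris)
```

Each URI list could then be passed as the first argument to bq_client.load_table_from_uri(uris, dataset.table(table), job_config=job_config), so that every table is populated by one load job instead of one job per chunk.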

BigQuery offers some special tables whose contents represent metadata, such as the list of tables and views in a dataset. To confirm all objects have been created in the correct dataset, we can interrogate the __TABLES__ or __TABLES_SUMMARY__ meta-tables as per the query below.
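A minimal sketch of such a check: it builds a query against the dataset’s __TABLES__ meta-table (which exposes, among other columns, table_id, row_count and size_bytes) and compares the loaded row counts with counts measured locally before upload, e.g. the per-file counts ProcessLargeFiles computes. The helper names, project and dataset names and the toy counts are placeholders; executing the query requires credentials (bq_client.query(sql).result()), so the comparison is demonstrated on plain tuples.

```python
def tables_metadata_sql(project, dataset):
    """Build a query listing every table in the dataset with its row count and size."""
    return (
        "SELECT table_id, row_count, size_bytes\n"
        "FROM `{p}.{d}.__TABLES__`\n"
        "ORDER BY table_id"
    ).format(p=project, d=dataset)


def row_count_mismatches(bq_rows, expected_counts):
    """
    Compare (table_id, row_count, ...) rows returned by the query above with
    locally measured row counts; returns {table: (expected, loaded)} for mismatches.
    """
    loaded = {row[0]: row[1] for row in bq_rows}
    return {t: (n, loaded.get(t)) for t, n in expected_counts.items()
            if loaded.get(t) != n}


if __name__ == "__main__":
    sql = tables_metadata_sql("my-gcp-project", "tpc_ds_test_data")
    print(sql)
    # with a real client: rows = bq_client.query(sql).result()
    rows = [("table_a", 100, 2048), ("table_b", 90, 1024)]  # toy values
    print(row_count_mismatches(rows, {"table_a": 100, "table_b": 100}))
```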

In the next post I will go over the execution performance of some of the TPC-DS queries on BigQuery, BigQuery ML (Google’s take on in-database machine learning), as well as some basic interactive dashboard building in Tableau.
