Data Build Tool (DBT) – The Emerging Standard For Building SQL-First Data Transformation Pipelines – Part 2

January 20th, 2023

In Part 1 of this series, I went over a high-level introduction to dbt, stood up a small development SQL Server database in Azure, acquired a sample of Google Analytics data to populate a few tables used in this post and finally installed and configured dbt on a local environment. In this post I’d like to dive deeper into dbt functionality and outline some of its key features that make it an attractive proposition for the data transformation and loading part of the ETL/ELT process.

DBT Models

In the dbt framework, a model is simply a SELECT SQL statement. When executing the dbt run command, dbt will build this model in our data warehouse by wrapping it in a CREATE VIEW AS or CREATE TABLE AS statement. By default, dbt is configured to persist those SELECT SQL statements as views; however, this behaviour can be modified to take advantage of other materialization options, e.g. table, ephemeral or incremental. Each type of materialization has its advantages and disadvantages and should be evaluated based on a specific use case and scenario. The following image depicts the core pros and cons associated with each approach.
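To give a concrete example of a non-default option, an incremental materialization only processes new or changed rows on each run rather than rebuilding the whole object. A minimal sketch, assuming a hypothetical UpdatedAt watermark column on the source table, could look like this:

{{ config(materialized='incremental', unique_key='ID') }}

SELECT * FROM ga_data

{% if is_incremental() %}
-- on incremental runs, only pick up rows newer than what already exists in the target table
WHERE UpdatedAt > (SELECT MAX(UpdatedAt) FROM {{ this }})
{% endif %}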

Before we dive headfirst into creating dbt models, let’s first explore some high-level guiding principles around structuring our project files. The team at dbt recommends organizing your models into at least two different folders – staging and marts. In a simple project, these may be the only models we build; more complex projects may have a number of intermediate models that provide a better logical separation, as well as accessories to these models.

  • Staging models – the atomic unit of data modeling. Each model bears a one-to-one relationship with the source data table it represents. It has the same granularity, but the columns have been renamed, recast, or standardized into a consistent format
  • Marts models – models that represent business processes and entities, abstracted from the data sources that they are based on. Where the work of staging models is limited to cleaning and preparing, fact tables are the product of substantive data transformation: choosing (and reducing) dimensions, date-spinning, executing business logic, and making informed decisions based on business requirements

Sometimes, mainly due to the level of data complexity or additional security requirements, further logical separation is recommended. In this case, a ‘Sources’ model layer is introduced before data is loaded into the Staging layer. Sources store schemas and tables in a source-conformed structure (i.e. tables and columns in a structure based on what an API returns), loaded by a third party tool.
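By way of illustration, a minimal source definition for this project could be declared in a .yml file similar to the sketch below (treat the names as indicative only); downstream models would then reference it via {{ source('google_analytics', 'ga_data') }} rather than a hard-coded table name.

version: 2

sources:
  - name: google_analytics
    database: sourcedb
    schema: dbo
    tables:
      - name: ga_data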

Because we often work with multiple data sources, in our Staging and Marts directories we create one folder per source – in our case, since we’re only working with a single source, we will simply call these google_analytics. Conforming to the dbt minimum standards for project organization and layout, i.e. Staging and Marts layers, let’s create the required folders so that the overall structure looks like the one on the left.

At this point we should have everything in place to build our first model based on the table we created in the Azure SQL Server DB in the previous post. Creating simple models in dbt is a straightforward affair and in this case it’s just a SELECT SQL statement. To begin, in our staging\google_analytics folder we create a SQL file, name it after the source table we will be staging and save it with the following two-line statement.

{{ config(materialized='table') }}
SELECT * FROM ga_data

The top line simply tells dbt to materialize the output as a physical table (the default is a view) and, in doing that, select everything from our previously created dbo.ga_data table into a new stg.ga_data table. dbt uses the Jinja templating language, making a dbt project an ideal programming environment for SQL. With Jinja, we can do transformations which are not typically possible in SQL, for example, using environment variables or macros to abstract snippets of SQL, which is analogous to functions in most programming languages. Whenever you see a {{ … }}, you’re already using Jinja.

To execute this model, we simply issue the ‘dbt run’ command (here with the optional ‘--select staging’ parameter, limiting the run to models under the staging path) and the output should tell us that we successfully created a staging version of our ga_data table.

Obviously, in order to build more complete analytics, we need to combine data from across multiple tables and data sources, so let’s create another table called ‘ga_geo_ref_data’ containing latitude, longitude and display name values using the Geopy Python library. Geopy makes it easy to locate the coordinates of addresses, cities, countries, and landmarks across the globe using third-party geocoders. This will provide us with additional reference data which we will blend with the core ‘ga_data’ table/model to create a single, overarching dataset containing both Google Analytics data and the reference geo data used to enrich it.

import pyodbc
import pandas as pd
from geopy.geocoders import Nominatim

_SQL_SERVER_NAME = 'gademosqlserver2022.database.windows.net'
_SQL_DB = 'sourcedb'
_SQL_USERNAME = 'testusername'
_SQL_PASSWORD = 'MyV3ry$trongPa$$word'
_SQL_DRIVER = '{ODBC Driver 18 for SQL Server}'

geolocator = Nominatim(user_agent='testapp')


def enrich_with_geocoding_vals(row, val):
    loc = geolocator.geocode(row, exactly_one=True, timeout=10)
    if val == 'lat':
        if loc is None:
            return -1
        else:
            return loc.raw['lat']
    if val == 'lon':
        if loc is None:
            return -1
        else:
            return loc.raw['lon']
    if val == 'name':
        if loc is None:
            return 'Unknown'
        else:
            return loc.raw['display_name']
    else:
        pass


try:
    with pyodbc.connect('DRIVER='+_SQL_DRIVER+';SERVER='+_SQL_SERVER_NAME+';PORT=1433;DATABASE='+_SQL_DB+';UID='+_SQL_USERNAME+';PWD=' + _SQL_PASSWORD) as conn:
        with conn.cursor() as cursor:
            if not cursor.tables(table='ga_geo_ref_data', tableType='TABLE').fetchone():
                cursor.execute('''CREATE TABLE dbo.ga_geo_ref_data (ID INT IDENTITY (1,1),
                                                            Country NVARCHAR (256),
                                                            City NVARCHAR (256),
                                                            Latitude DECIMAL(12,8),
                                                            Longitude DECIMAL(12,8),
                                                            Display_Name NVARCHAR (1024))''')
            cursor.execute('TRUNCATE TABLE dbo.ga_geo_ref_data;')
            query = "SELECT country, city FROM dbo.ga_data WHERE city <> '' AND country <> '' GROUP BY country, city;"
            df = pd.read_sql(query, conn)
            df['latitude'] = df['city'].apply(
                enrich_with_geocoding_vals, val='lat')
            df['longitude'] = df['city'].apply(
                enrich_with_geocoding_vals, val='lon')
            df['display_name'] = df['city'].apply(
                enrich_with_geocoding_vals, val='name')
            for index, row in df.iterrows():
                cursor.execute('''INSERT INTO dbo.ga_geo_ref_data
                                    (Country,
                                    City,
                                    Latitude,
                                    Longitude,
                                    Display_Name)
                          values (?, ?, ?, ?, ?)''',
                               row[0], row[1], row[2], row[3], row[4])

            cursor.execute('SELECT TOP (1) 1 FROM dbo.ga_geo_ref_data')
            rows = cursor.fetchone()
            if rows:
                print('All Good!')
            else:
                raise ValueError(
                    'No data generated in the source table. Please troubleshoot!'
                )
except pyodbc.Error as ex:
    sqlstate = ex.args[1]
    print(sqlstate)

We will also materialize this table using the same technique we tested before and now we should be in a position to create our first data mart object, combining ga_geo_ref_data and ga_data into a single table.

This involves creating another SQL file, this time in our marts\google_analytics folder, and using the following query to blend these two data sets together.

{{ config(materialized='table') }}

SELECT ga.*, ref_ga.Latitude, ref_ga.Longitude, ref_ga.Display_Name 
FROM {{ ref('ga_data') }} ga
LEFT JOIN {{ ref('ga_geo_ref_data') }} ref_ga
ON ga.country = ref_ga.country 
AND ga.city = ref_ga.city

As with one of the previous queries, we’re using the familiar configuration syntax in the first line, but there is also an additional reference configuration applied which uses the most important function in dbt – the ref() function. For building more complex models, the ref() function is very handy as it allows us to refer to other models. Under the hood, ref() is actually doing two important things. First, it is interpolating the schema into our model file to allow us to change our deployment schema via configuration. Second, it is using these references between models to automatically build the dependency graph. This enables dbt to deploy models in the correct order when using the ‘dbt run’ command.
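To make this a little more tangible, once the project is compiled (the output lands in the target folder), each ref() call is swapped for the fully qualified relation it points to. With the custom schema handling described next in place, the compiled join would resolve to something along these lines (exact quoting and database prefix depend on the adapter):

SELECT ga.*, ref_ga.Latitude, ref_ga.Longitude, ref_ga.Display_Name
FROM [sourcedb].[stg].[ga_data] ga
LEFT JOIN [sourcedb].[stg].[ga_geo_ref_data] ref_ga
ON ga.country = ref_ga.country
AND ga.city = ref_ga.city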

If we were to run this model as is, dbt would concatenate our default schema name (as expressed in the profiles.yml file) with the schema we would like to output it into. It’s a default behavior which we need to override using a macro. Therefore, to change the way dbt generates a schema name, we should add a macro named generate_schema_name to the project, where we can then define our own logic. We will place the following bit of code in the macros folder in our solution and define our custom schema name in the dbt_project.yml file as per below

{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}

name: 'azure_sql_demo'
version: '1.0.0'
config-version: 2

# This setting configures which 'profile' dbt uses for this project.
profile: 'azure_sql_demo'

# These configurations specify where dbt should look for different types of files.
# The 'model-paths' config, for example, states that models in this project can be
# found in the 'models/' directory. You probably won't need to change these!
model-paths: ['models']
analysis-paths: ['analyses']
test-paths: ['tests']
seed-paths: ['seeds']
macro-paths: ['macros']
snapshot-paths: ['snapshots']

target-path: 'target'  # directory which will store compiled SQL files
clean-targets:         # directories to be removed by `dbt clean`
  - 'target'
  - 'dbt_packages'

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
models:
  azure_sql_demo:
    staging:
      +materialized: view
      +schema: stg
    marts:
      +materialized: view
      +schema: mart

With everything in place, we can now save our SQL code into a file called ga_geo.sql and execute dbt to materialize it in the mart schema using the ‘dbt run --select marts’ command. On model build completion, our first mart table should be persisted in the database as per the image below (click to enlarge).

Another great feature of dbt is the ability to create snapshots, which is synonymous with the concept of Slowly Changing Dimensions (SCD) in data warehousing. Snapshots implement Type-2 SCD logic, identifying how a row in a table changes over time. If we were to issue an ALTER statement to our ‘ga_data’ table located in the staging schema and add one extra column denoting when the row was created or updated, we could track its history using a typical SCD-2 pattern i.e. expiring and watermarking rows which were changed or added using a combination of GUIDs and date attributes.

For this demo let’s execute the following SQL statement to alter our ga_data object, adding a new field called UpdatedAt with a default value of the current system timestamp.

ALTER TABLE stg.ga_data
ADD UpdatedAt DATETIME DEFAULT SYSDATETIME()

Once we have our changes in place we can add the following SQL to our solution under the snapshots node and call it ga_data_snapshot.sql.

{% snapshot ga_data_snapshot %}
    {{
        config(
          target_schema = 'staging',
          unique_key = 'ID',
          strategy = 'check',
          check_cols = 'all'
        )
    }}
    SELECT * FROM ga_data
{% endsnapshot %}

Next, running the ‘dbt snapshot’ command creates a new table in the staging schema, with a few additional attributes added to allow for SCD Type-2 tracking (click on image to enlarge).

Snapshots are a powerful feature in dbt that facilitate keeping track of our mutable data through time and generally, they’re as simple as creating any other type of model. This allows for very simple implementation of the SCD Type-2 pattern, with no complex MERGE or UPDATE and INSERT (upsert) SQL statements required.
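For reference, dbt adds its own metadata columns to the snapshot table (dbt_scd_id, dbt_updated_at, dbt_valid_from and dbt_valid_to), so retrieving only the current version of each row is a simple filter, for example:

SELECT *
FROM staging.ga_data_snapshot
WHERE dbt_valid_to IS NULL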

DBT Tests

Testing data solutions has been notoriously difficult and data validation and QA have always been an afterthought. In the best-case scenario, third-party applications had to be used to guarantee data conformance and minimum standards. In the worst case, tests were not developed at all and the job of validating the final output fell on analysts or report developers, eyeballing dashboards before pushing them into production. In extreme cases, I have even seen customers relegated to the role of unsuspecting testers, raising support tickets due to dashboards coming up empty.

In dbt, tests are assertions we make about the models and other resources in our dbt project. When we run dbt test, dbt will tell us if each test in our project passes or fails. There are two ways of defining tests in dbt:

  • A singular test is testing in its simplest form: if we can write a SQL query that returns failing rows, we can save that query in a .sql file within our tests directory. It’s now a test, and it will be executed by the dbt test command (a small example is sketched after this list)
  • A generic test is a parametrized query that accepts arguments. The test query is defined in a special test block (similar to a macro). Once defined, we can reference the generic test by name throughout our .yml files — defining it on models, columns, sources, snapshots, and seeds. dbt ships with four generic tests built in: unique, not_null, accepted_values and relationships
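As a sketch of the singular flavour mentioned above, the file below (a hypothetical tests/assert_no_placeholder_coordinates.sql) would fail whenever any of the -1 fallback values produced by the geocoding script sneak into the reference table:

-- returns rows (i.e. fails the test) if any latitude/longitude was defaulted to -1
SELECT *
FROM {{ ref('ga_geo_ref_data') }}
WHERE Latitude = -1 OR Longitude = -1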

In our scenario, we will use generic tests to ensure that:

  • The ID field on our ga_geo_ref_data model is unique and does not contain any NULL values
  • The DeviceCategory attribute on ga_data only contains a list of accepted values

Test definitions are stored in our staging directory, in a yml file called ‘schema.yml’, and once we issue the dbt test command, the following output is generated, denoting that all tests passed successfully.
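A minimal schema.yml implementing those two assertions could look something like the sketch below (the accepted values listed are illustrative):

version: 2

models:
  - name: ga_geo_ref_data
    columns:
      - name: ID
        tests:
          - unique
          - not_null
  - name: ga_data
    columns:
      - name: DeviceCategory
        tests:
          - accepted_values:
              values: ['desktop', 'mobile', 'tablet']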

DBT Docs

One of the great features of dbt is the fact that we can easily generate a fully self-documenting solution, together with a lineage graph that helps us easily navigate through the nodes and understand the hierarchy. dbt provides a way to generate documentation for our dbt project and render it as a website. The documentation includes the following:

  • Information about our project: including model code, a DAG of our project, any tests we’ve added to a column, and more
  • Information about our data warehouse: including column data types, and table sizes. This information is generated by running queries against the information schema

Running the ‘dbt docs generate’ command instructs dbt to compile all relevant information about our project and warehouse into manifest.json and catalog.json files. Next, executing ‘dbt docs serve’ starts a local web server (http://127.0.0.1:8080) and allows dbt to use these JSON files to generate a local website. We can see a representation of the project structure, a markdown description for a model, and a list of all of the columns (with documentation) in the model. Additionally, we can click the green button in the bottom-right corner of the webpage to expand a ‘mini-map’ of our DAG with relevant lineage information (click on image to expand).

Conclusion

I barely scratched the surface of what dbt can do to establish a full-fledged framework for data transformations using SQL and it looks like the company is not resting on its laurels, adding more features and partnering with other vendors. From my limited time with dbt, the key benefits that allow it to stand out in the sea of other tools include:

  • Quickly and easily provide clean, transformed data ready for analysis: dbt enables data analysts to custom-write transformations through SQL SELECT statements. There is no need to write boilerplate code. This makes data transformation accessible for analysts that don’t have extensive experience in other programming languages, as the initial learning curve is quite low
  • Apply software engineering practices—such as modular code, version control, testing, and continuous integration/continuous deployment (CI/CD)—to analytics code. Continuous integration means less time testing and quicker time to development, especially with dbt Cloud. You don’t need to push an entire repository when there are necessary changes to deploy, but rather just the components that change. You can test all the changes that have been made before deploying your code into production. dbt Cloud also has integration with GitHub for automation of your continuous integration pipelines, so you won’t need to manage your own orchestration, which simplifies the process
  • Build reusable and modular code using Jinja. dbt allows you to establish macros and integrate other functions outside of SQL’s capabilities for advanced use cases. Macros in Jinja are pieces of code that can be used multiple times. Instead of starting at the raw data with every analysis, analysts instead build up reusable data models that can be referenced in subsequent work
  • Maintain data documentation and definitions within dbt as they build and develop lineage graphs: Data documentation is accessible, easily updated, and allows you to deliver trusted data across the organization. dbt automatically generates documentation around descriptions, models dependencies, model SQL, sources, and tests. dbt creates lineage graphs of the data pipeline, providing transparency and visibility into what the data is describing, how it was produced, as well as how it maps to business logic
  • Perform simplified data refreshes within dbt Cloud: There is no need to host an orchestration tool when using dbt Cloud. It includes a feature that provides full autonomy with scheduling production refreshes at whatever cadence the business wants

Obviously, dbt is not a perfect solution and some of its current shortcomings include:

  • It covers only the T of ETL, so you will need separate tools to perform Extraction and Load
  • It’s SQL based; it might offer less readability compared with tools that have an interactive UI
  • Sometimes circumstances necessitate rewriting of macros used at the backend. Overriding this standard behavior of dbt requires knowledge and expertise in handling source code
  • Integrations with many (less popular) database engines are missing or handled by community-supported adapters e.g. Microsoft SQL Server, SQLite, MySQL

All in all, I really enjoyed the multitude of features dbt carries in its arsenal and understand why it’s become the favorite horse in the race to dominate the hearts and minds of analytics and data engineers. It’s a breath of fresh air, as its main focus is SQL – a novel approach in a landscape dominated by Python and Scala – it runs in the cloud and on-premises and has good external vendor support. Additionally, it has some bells and whistles which typically involve integrating with other 3rd party tooling e.g. tests and docs. Finally, at its core, it’s an open-source product and as such, anyone can take it for a spin and start building extensible, modular and reusable ‘plumbing’ for their next project.


Building rapid data import interfaces with PySimpleGUI and Streamlit

December 5th, 2022

Note: All code from this post can be downloaded from my OneDrive folder HERE.

Introduction

The proliferation of low-code or no-code solutions for building simple apps has taken a solid foothold in the industry and a lot can be achieved with minimal effort and supporting code. As we enter an era of AI-enabled development, we’re seeing a lot of productivity gains from large language models, e.g. ChatGPT, which can take requirements as input and generate comprehensive code in a matter of seconds. This creates an interesting conundrum – are developers automating themselves out of their jobs or are they making their work more enjoyable by ‘outsourcing’ the most mundane parts of their job to focus on what’s truly important – providing business value? It’s a discussion for a separate post, but there’s no denying that the days of manually crafting application ‘scaffolding’ and reinventing the wheel are coming to an end as more of us lean towards using mobile, desktop or Web frameworks, ORMs, predefined libraries and packages or other adaptive software development approaches to expedite output delivery.

For those who need to provide a visual interface for either data entry or data output, there is a plethora of choices out there. Building a simple data entry form or a bare-bones site with a few widgets to allow end-users to interact with the content typically involves a combination of multiple technologies and languages, but it’s surprising how much functionality can be achieved using some of the frameworks available in Python. In this post I’d like to explore how anyone can build a simple flat file data import interface for SQL Server. The need for this type of solution came out of multiple requests from non-technical clients needing to interface data sources familiar to them, e.g. CSV or Excel files, with their internal database(s) and, with a few button clicks, upload/insert the required dataset to augment or change upstream reporting or analytics. It’s nothing cutting-edge and most visualisation tools provide the means to do low-level data prep through a pseudo ETL-like process, e.g. Power BI Data Flows or Tableau Prep Builder, but they still require technical expertise and knowledge of the underlying schemas and structures. Sometimes, all that’s required is a simple interface with a few widgets – that’s where frameworks like PySimpleGUI and Streamlit shine.

For this exercise, let’s look at a very simple interface for validating and loading flat file data into a database table. The objective is to provide end-users with an app which allows them to:

  • Authenticate to Microsoft SQL Server instance running on their network using either Windows Authentication or SQL Server authentication
  • Validate their supplied credentials
  • Search for the required file on their file system
  • Select a table this data needs to be loaded into
  • Run a series of validation steps and if no issues are detected, load the CSV file into the nominated table

Let’s dive in and see how quick and productive one can be using these two frameworks.

PySimpleGUI

Launched in 2018, PySimpleGUI is a Python library that wraps tkinter, Qt (PySide2), wxPython and Remi (for browser support), allowing very fast and simple-to-learn GUI programming. Your code is not required to have an object-oriented architecture – one of the major obstacles when writing GUI applications – which makes the package usable by a larger audience. PySimpleGUI code is simpler and shorter than writing directly against the underlying framework because PySimpleGUI implements much of the “boilerplate code” for you. Additionally, interfaces are simplified to require as little code as possible (half to 1/10th) to get the desired result.

This app interface can be designed in many different ways but for this exercise, I have kept the functionality and output to a minimum and when executing the app.py file we get the following output.

PySimpleGUI allows for creating bespoke windows and layouts using lists – in this example I’ve defined three to account for the separation between database login-specific elements, file search-specific elements and text output elements. These three are then combined using the ‘layout’ list which also ‘draws’ a frame around each of them to make these three areas visually distinct. Next, we have a function responsible for all the heavy lifting i.e. data validation and load and finally the main method binding it all together. I’m not going to go over the PySimpleGUI API as it’s one of those projects which has very comprehensive support documentation. It also features over 300 Demo Programs which provide many design patterns for you to learn how to use PySimpleGUI and how to integrate PySimpleGUI with other packages, as well as a cookbook featuring recipes covering many different scenarios. It’s by far one of the best documented open-source projects I have come across. To pip-install it into your default Python interpreter or a virtual environment simply run one of the following lines:

pip install pysimplegui
or
pip3 install pysimplegui
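To give a flavour of how compact the API is, a complete, throwaway two-widget window (not part of the app below) boils down to just a few lines:

import PySimpleGUI as sg

# a text prompt and a single button, laid out as a list of rows
layout = [[sg.Text('PySimpleGUI says hello!')], [sg.Button('OK')]]
window = sg.Window('Hello', layout)
event, values = window.read()  # blocks until the button is clicked or the window is closed
window.close()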

The following short snippet of Python is responsible for most of this little app’s functionality, proving that a minimal amount of code is required to build a complete interface which fulfills all the requirements specified.

import PySimpleGUI as sg
import os
import csv
from pathlib import Path
import helpers as helpers


_WORKING_DIR = os.getcwd()
_SQL_TABLES = ['Test_Table1', 'Test_Table2', 'Test_Table3']
_LOAD_STORED_PROC = 'usp_load_ext_table'
_TARGET_TABLE_SCHEMA_NAME = 'dbo'

db_layout = [[sg.Text('Provide SQL Server instance details and credentials to authenticate.')],
             [sg.Text('Host Name ', size=(15, 1)), sg.Input(
                 key='-HOSTNAME-', default_text='192.168.153.128')],
             [sg.Text('Database Name ', size=(15, 1)),
              sg.Input(key='-DATABASE-', enable_events=True, default_text='TestDB')], [sg.Checkbox('Use Windows Authentication', enable_events=True, default=False, key='-USEWINAUTH-')],
             [sg.Text('User Name ', size=(15, 1), key='-USERNAMELABEL-'), sg.Input(
                 key='-USERNAME-', enable_events=True, default_text='test_login')],
             [sg.Text('Password ', size=(15, 1), key='-PASSWORDLABEL-'), sg.Input(
                 key='-PASSWORD-',  password_char='*',
                 enable_events=True, default_text='test_password')],
             [sg.Button('Validate Database Connection', key='-VALIDATE-'), sg.Text(
                 '--> This operation may take up to 1 minute', visible=True, key='_text_visible_')]
             ]

file_search_layout = [[sg.Text('Provide CSV file for upload and database table to load into.')],
                      [sg.InputText(key='-FILEPATH-', size=(55, 1)),
                       sg.FileBrowse(initial_folder=_WORKING_DIR, file_types=[("CSV Files", "*.csv")])],
                      [sg.Text('Select database table name to load text data.')],
                      [sg.Combo(_SQL_TABLES, size=(37), key='-TABLECHOICE-', readonly=True),
                      sg.Checkbox('Truncate before insert?', default=True, key='-TRUCATESOURCE-')]]


stdout_layout = [[sg.Multiline(size=(62, 10), key='-OUTPUT-')], [
    sg.Button('Validate and Submit', key='-SUBMIT-'),
    sg.Button('Clear Output', key='-CLEAR-')]]


layout = [[sg.Frame('1. Database Connection Details', db_layout,
                    title_color='blue', font='Any 12', pad=(15, 20))],
          [sg.Frame('2. File Upload Details', file_search_layout,
                    title_color='blue', font='Any 12', pad=(15, 20))],
          [sg.Frame('3. File Upload Output', stdout_layout,
                    title_color='blue', font='Any 12', pad=(15, 20))]]


def check_file_and_load(db_conn, full_path, table_dropdown_value, truncate_source_table_flag, _LOAD_STORED_PROC, _TARGET_TABLE_SCHEMA_NAME):
    file_name = Path(full_path).name
    file_path = Path(full_path).parent

    # check csv file is comma delimited
    window['-OUTPUT-'].print('Validating selected file is comma-delimited...', end='')
    with open(full_path, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=",")
        dialect = csv.Sniffer().sniff(f.read(1024))
    if dialect.delimiter != ',':
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return
    else:
        window['-OUTPUT-'].print('OK!')

    # check csv file is not empty
    window['-OUTPUT-'].print('Validating selected file is not empty...', end='')
    with open(full_path, 'r', newline='') as f:
        reader = csv.reader(f, dialect)
        ncols = len(next(reader))
        f.seek(0)
        nrow = len(list(reader))
    if nrow < 2:
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return
    else:
        window['-OUTPUT-'].print('OK!')

    # check for database connection values correctness
    window['-OUTPUT-'].print(
        'Validating database connection details are correct...', end='')
    conn = db_conn(HOSTNAME=values['-HOSTNAME-'], DATABASE=values['-DATABASE-'],
                   USERNAME=values['-USERNAME-'],   PASSWORD=values['-PASSWORD-'], USEWINAUTH=values['-USEWINAUTH-'])
    if conn:
        window['-OUTPUT-'].print('OK!')
    else:
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return

    # check required schema exists on the target server
    window['-OUTPUT-'].print('Validating target schema existence...', end='')
    with conn.cursor() as cursor:
        sql = """SELECT TOP (1) 1 FROM {target_db}.sys.schemas 
                        WHERE name = '{target_schema}'""".format(target_db=values['-DATABASE-'], target_schema = _TARGET_TABLE_SCHEMA_NAME)
        cursor.execute(sql)
        rows = cursor.fetchone()
    if rows:
        window['-OUTPUT-'].print('OK!')
    else:
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return

    # check selected table exists on the target server
    window['-OUTPUT-'].print('Validating target table existence...', end='')
    with conn.cursor() as cursor:
        sql = """SELECT TOP (1) 1 FROM {target_db}.information_schema.tables 
                        WHERE table_name = '{target_table}'""".format(target_db=values['-DATABASE-'], target_table=table_dropdown_value)
        cursor.execute(sql)
        rows = cursor.fetchone()
    if rows:
        window['-OUTPUT-'].print('OK!')
    else:
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return

    # check if insert stored proc exists
    window['-OUTPUT-'].print("Validating loading stored procedure exists...", end='')
    with conn.cursor() as cursor:
        sql = """SELECT TOP (1) 1 FROM {target_db}.sys.objects WHERE type = 'P' AND OBJECT_ID = OBJECT_ID('dbo.usp_load_ext_table')""".format(
            target_db=values['-DATABASE-'])
        cursor.execute(sql)
        sp = cursor.fetchone()
    if sp:
        window['-OUTPUT-'].print('OK!')
    else:
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return

    # check if number of columns matches
    window['-OUTPUT-'].print('Validating columns number match...', end='')
    with conn.cursor() as cursor:
        sql = """SELECT COUNT(1) FROM {target_db}.information_schema.columns 
                        WHERE table_name = '{target_table}'""".format(target_db=values['-DATABASE-'], target_table=table_dropdown_value)
        cursor.execute(sql)
        dbcols = cursor.fetchone()[0]
    if dbcols == ncols:
        window['-OUTPUT-'].print('OK!')
    else:
        window['-OUTPUT-'].print('Failed!', text_color='red')
        return

    window['-OUTPUT-'].print('Attempting to load {csv_file} file into {target_table} table...'.format(
        csv_file=file_name.lower(), target_table=table_dropdown_value.lower()), end='')
    with conn.cursor() as cursor:
        sql = '''\
                DECLARE @col_names VARCHAR (MAX); 
                EXEC TestDB.dbo.{sp_name} 
                @temp_target_table_name=?,
                @target_table_name=?,
                @target_table_schema_name=?, 
                @file_path=?, 
                @truncate_target_table=?,
                @col_names = @col_names OUTPUT; 
                SELECT @col_names AS col_names
                '''.format(sp_name=_LOAD_STORED_PROC)
        params = ('##'+table_dropdown_value, table_dropdown_value,
                  _TARGET_TABLE_SCHEMA_NAME, full_path, truncate_source_table_flag)         
        cursor.execute(sql, params)
        col_names = cursor.fetchval()

        if truncate_source_table_flag == 1:
            test_1_sql = 'SELECT TOP (1) 1 FROM (SELECT {cols} FROM {temp_target_table_name} EXCEPT SELECT {cols} FROM {target_table_schema_name}.{target_table_name})a'.format(
                temp_target_table_name='##'+table_dropdown_value, target_table_schema_name=_TARGET_TABLE_SCHEMA_NAME, target_table_name=table_dropdown_value, cols=col_names)
            cursor.execute(test_1_sql)
            test_1_rows = cursor.fetchone()
            test_2_sql = 'SELECT TOP (1) status FROM {temp_target_table_name}'.format(
                temp_target_table_name='##'+table_dropdown_value)
            cursor.execute(test_2_sql)
            test_2_rows = cursor.fetchone()[0]
            if test_1_rows and test_2_rows != 'SUCCESS':
                window['-OUTPUT-'].print('Failed!', text_color='red')
            else:
                window['-OUTPUT-'].print('OK!')
                return
        elif truncate_source_table_flag == 0:
            test_1_sql = 'SELECT COUNT(1) as ct FROM (SELECT {cols} FROM {temp_target_table_name} INTERSECT SELECT {cols} FROM {target_table_schema_name}.{target_table_name})a'.format(
                temp_target_table_name='##'+table_dropdown_value, target_table_schema_name=_TARGET_TABLE_SCHEMA_NAME, target_table_name=table_dropdown_value, cols=col_names)
            cursor.execute(test_1_sql)
            test_1_rows = cursor.fetchone()[0]
            test_2_sql = 'SELECT TOP (1) status FROM {temp_target_table_name}'.format(
                temp_target_table_name='##'+table_dropdown_value)
            cursor.execute(test_2_sql)
            test_2_rows = cursor.fetchone()[0]
            if nrow-1 != test_1_rows or test_2_rows != 'SUCCESS':
                window['-OUTPUT-'].print('Failed!', text_color='red')
            else:
                window['-OUTPUT-'].print('OK!')
                return
        

window = sg.Window('File Upload Utility version 1.01', layout)
while True:
    event, values = window.read()
    if event == sg.WIN_CLOSED or event == 'Exit':
        break
    elif '-USEWINAUTH-' in event:
        win_auth_setting = values['-USEWINAUTH-']
        if win_auth_setting == True:
            window['-USERNAMELABEL-'].Update(visible=False)
            window['-PASSWORDLABEL-'].Update(visible=False)
            window['-USERNAME-'].Update(visible=False)
            window['-PASSWORD-'].Update(visible=False)
        elif win_auth_setting == False:
            window['-USERNAMELABEL-'].Update(visible=True)
            window['-PASSWORDLABEL-'].Update(visible=True)
            window['-USERNAME-'].Update(visible=True)
            window['-PASSWORD-'].Update(visible=True)

    elif '-VALIDATE-' in event:
        validation = helpers.validate_db_input_values(values)
        if validation:
            error_msg = ('\nInvalid: ' + value for value in validation)
            error_message = helpers.generate_error_message(validation)
            sg.popup(error_message, keep_on_top=True)
        else:
            HOSTNAME = values['-HOSTNAME-'],
            DATABASE = values['-DATABASE-'],
            USERNAME = values['-USERNAME-'],
            PASSWORD = values['-PASSWORD-'],
            USEWINAUTH = values['-USEWINAUTH-']
            conn = helpers.db_conn(HOSTNAME=values['-HOSTNAME-'], DATABASE=values['-DATABASE-'],
                                USERNAME=values['-USERNAME-'],   PASSWORD=values['-PASSWORD-'], USEWINAUTH=values['-USEWINAUTH-'])
            if conn:
                sg.popup('Supplied credentials are valid!',
                        keep_on_top=True, title='')
            else:
                sg.popup('Supplied credentials are invalid or there is a network connection issue.',
                        keep_on_top=True, title='', button_color='red')
    elif '-SUBMIT-' in event:
        validation = helpers.validate_file_input_values(values)
        if validation:
            error_msg = ('\nInvalid: ' + value for value in validation)
            error_message = helpers.generate_error_message(validation)
            sg.popup(error_message, keep_on_top=True)
        else:
            truncate_source_table_flag = bool(values['-TRUCATESOURCE-'])
            table_dropdown_value = values['-TABLECHOICE-']
            window['-OUTPUT-'].update('')
            full_path = str(Path(values['-FILEPATH-']))
            file_name = Path(full_path).name
            file_path = Path(full_path).parent
            db_conn = helpers.db_conn
            check_file_and_load(db_conn,
                                full_path, table_dropdown_value, truncate_source_table_flag, _LOAD_STORED_PROC, _TARGET_TABLE_SCHEMA_NAME)
    elif '-CLEAR-' in event:
        window['-OUTPUT-'].update('')
window.close()

The main app.py file also imports from the helpers module which contains functions used for input validation, database connection and error message generation. You can find all the code used for this solution in my OneDrive folder HERE.

Finally, tying it all together is a bit of T-SQL wrapped in a stored procedure. This code takes five parameters as input values and outputs one value back to the Python code, containing the names of the columns of our target database table (used in the Python validation code). Its primary function is to:

  • Create a global temporary table mirrored on the schema of the target table
  • Using BULK INSERT operation insert our flat file’s data into it
  • Optionally truncate target object and insert the required data from the temp table into the destination table
  • Alter the temp table with the operation status i.e. Failure or Success (used in the Python validation code)

The reason why a global temporary table is created and loaded into first is that we want to ensure that the CSV file’s data and schema conform to the target table structure before we make any changes to it. Having this step in place allows for an extra level of validation – if the temporary table insert fails, the target table insertion code is not triggered at all. Likewise, if a temporary table with a schema identical to that of the destination table is created and populated successfully, we can be confident that the target table’s data can be purged (as denoted by one of the parameter values) and loaded into without any issues. The full T-SQL code is as follows:

USE [TestDB];
GO
SET ANSI_NULLS ON;
GO
SET QUOTED_IDENTIFIER ON;
GO
CREATE PROCEDURE [dbo].[usp_load_ext_table]
(
    @temp_target_table_name VARCHAR(256),
    @target_table_name VARCHAR(256),
    @target_table_schema_name VARCHAR(256),
    @file_path VARCHAR(1024),
    @truncate_target_table BIT,
    @col_names VARCHAR(MAX) OUTPUT
)
AS
BEGIN
    SET NOCOUNT ON;
    DECLARE @cols VARCHAR(MAX);
    DECLARE @sql VARCHAR(MAX);
    DECLARE @error_message VARCHAR(MAX);

    SELECT @sql
        = 'DROP TABLE IF EXISTS ' + @temp_target_table_name + '; CREATE TABLE ' + @temp_target_table_name + ' ('
          + o.list + ')',
           @cols = j.list
    FROM sys.tables t
        CROSS APPLY
    (
        SELECT STUFF(
               (
                   SELECT ',' + QUOTENAME(c.COLUMN_NAME) + ' ' + c.DATA_TYPE
                          + CASE c.DATA_TYPE
                                WHEN 'sql_variant' THEN
                                    ''
                                WHEN 'text' THEN
                                    ''
                                WHEN 'ntext' THEN
                                    ''
                                WHEN 'xml' THEN
                                    ''
                                WHEN 'decimal' THEN
                                    '(' + CAST(c.NUMERIC_PRECISION AS VARCHAR) + ', '
                                    + CAST(c.NUMERIC_SCALE AS VARCHAR) + ')'
                                ELSE
                                    COALESCE(   '(' + CASE
                                                          WHEN c.CHARACTER_MAXIMUM_LENGTH = -1 THEN
                                                              'MAX'
                                                          ELSE
                                                              CAST(c.CHARACTER_MAXIMUM_LENGTH AS VARCHAR)
                                                      END + ')',
                                                ''
                                            )
                            END
                   FROM INFORMATION_SCHEMA.COLUMNS c
                       JOIN sysobjects o
                           ON c.TABLE_NAME = o.name
                   WHERE c.TABLE_NAME = @target_table_name
                         AND TABLE_NAME = t.name
                   ORDER BY ORDINAL_POSITION
                   FOR XML PATH('')
               ),
               1,
               1,
               ''
                    )
    ) o(list)
        CROSS APPLY
    (
        SELECT STUFF(
               (
                   SELECT ',' + QUOTENAME(c.COLUMN_NAME)
                   FROM INFORMATION_SCHEMA.COLUMNS c
                       JOIN sysobjects o
                           ON c.TABLE_NAME = o.name
                   WHERE c.TABLE_NAME = @target_table_name
                   ORDER BY ORDINAL_POSITION
                   FOR XML PATH('')
               ),
               1,
               1,
               ''
                    )
    ) j(list)
    WHERE t.name = @target_table_name;

    EXEC (@sql);
    SET @col_names = @cols;

    IF NOT EXISTS
    (
        SELECT *
        FROM tempdb.INFORMATION_SCHEMA.TABLES
        WHERE TABLE_NAME = @temp_target_table_name
    )
    BEGIN
        SET @error_message
            = 'Global temporary placeholder table has not been successfully created in the tempdb database. Please troubleshoot.';
        RAISERROR(   @error_message, -- Message text.
                     16,             -- Severity.
                     1               -- State.
                 );
        RETURN;
    END;

    SET @sql
        = '
			BULK INSERT ' + @temp_target_table_name + '
			FROM ' + QUOTENAME(@file_path, '''')
          + '
			WITH
			(
				FIRSTROW = 2,
				FIELDTERMINATOR = '','',
				ROWTERMINATOR = ''\n'',
				MAXERRORS=0,
				TABLOCK
			)';
    EXEC (@sql);

    SET @sql = CASE
                   WHEN @truncate_target_table = 1 THEN
                       'TRUNCATE TABLE ' + @target_table_schema_name + '.' + @target_table_name + '; '
                   ELSE
                       ''
               END + 'INSERT INTO ' + @target_table_schema_name + '.' + @target_table_name + ' (' + @cols + ') ';
    SET @sql = @sql + 'SELECT ' + @cols + ' FROM ' + @temp_target_table_name + '';
    BEGIN TRANSACTION;
    BEGIN TRY
        EXEC (@sql);
        SET @sql
            = 'ALTER TABLE ' + @temp_target_table_name + ' ADD status VARCHAR(56) DEFAULT ''FAILURE'', ' + CHAR(13);
        SET @sql = @sql + 'status_message varchar(256) NULL; ' + CHAR(13);
        EXEC (@sql);
        SET @sql = 'UPDATE ' + @temp_target_table_name + ' SET status = ''SUCCESS'', ' + CHAR(13);
        SET @sql = @sql + 'status_message = ''Operation executed successfully!''' + CHAR(13);
        EXEC (@sql);
    END TRY
    BEGIN CATCH
        SET @error_message = ERROR_MESSAGE();
        SET @sql
            = 'ALTER TABLE ' + @temp_target_table_name + ' ADD outcome varchar (56) DEFAULT ''FAILURE'',' + CHAR(13);
        SET @sql = @sql + 'outcome_message varchar(256) NULL; ' + CHAR(13);
        EXEC (@sql);
        SET @sql = 'UPDATE ' + @temp_target_table_name + ' SET outcome_message = ''' + @error_message + '''';
        EXEC (@sql);
        IF @@TRANCOUNT > 0
            ROLLBACK TRANSACTION;
    END CATCH;

    IF @@TRANCOUNT > 0
        COMMIT TRANSACTION;
END;

One drawback of this code is that it assumes the nominated flat file is located on the same server as our SQL Server instance – the above SQL code uses the T-SQL BULK INSERT operation, which expects access to the underlying file system and the file itself. This limitation can be alleviated by reading the file’s content into a dataframe and populating the target table using, for example, Python’s Pandas library instead. However, in this instance, I expected end users to have access to the system hosting the MSSQL instance, so all the ‘heavy lifting’ i.e. data insertion is done using pure T-SQL.
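As a rough sketch of that alternative, assuming a SQLAlchemy connection string pointing at the same TestDB instance (the credentials and file name below are placeholders), the Pandas route could look like this:

import pandas as pd
from sqlalchemy import create_engine

# hypothetical connection details - adjust the driver, host and credentials to your environment
engine = create_engine(
    'mssql+pyodbc://test_login:test_password@192.168.153.128/TestDB'
    '?driver=ODBC+Driver+18+for+SQL+Server'
)

# read the client-side CSV into memory and append its rows to the target table,
# without needing file system access on the database server
df = pd.read_csv('Test_Table1.csv')
df.to_sql('Test_Table1', engine, schema='dbo', if_exists='append', index=False)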

Streamlit

As desktop GUI apps only work in certain scenarios, leveraging the previously created code (both T-SQL and Python) we can build a Web front end instead. Streamlit, an open-source Python app framework, allows anyone to transform scripts into sharable web apps, all without prior knowledge of CSS, HTML or JavaScript. It’s a purely Python-based framework (though some customizations can be achieved with a sprinkling of HTML), claimed to be used by over 80% of Fortune 50 companies and, with the backing of Snowflake (which recently acquired it for $800 million), it is set to democratize access to data. Streamlit boasts a large gallery of apps, most of them with source code, so it’s relatively easy to explore its capabilities and poke around ‘under the hood’. It also offers Community Cloud where one can deploy, manage, and share apps with the world, directly from Streamlit — all for free. To pip-install it into your default Python interpreter or a virtual environment simply run the following:

pip install streamlit

To spin up a local web server and run the app in your default web browser you can run the following from the command line:

streamlit run your_script.py [-- script args]
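For a sense of scale, a complete (if trivial) Streamlit app can be as short as the sketch below, saved as, say, hello.py:

import streamlit as st

# every widget call renders straight onto the page - no callbacks or layout boilerplate required
st.title('Hello Streamlit')
name = st.text_input('Your name', value='world')
if st.button('Greet'):
    st.success(f'Hello, {name}!')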

For this Streamlit app, I tried to keep the interface layout largely unchanged from our previous PySimpleGUI app even though I was tempted to add a bit of eye-candy to the page – Streamlit has pretty good integration with some of the major plotting and visualization Python packages so adding a widget or two would be very easy and make the whole app a lot more appealing. The draft version of the app is as per the image below.

Most of the code changes relate to how Streamlit API works, and the components required to replicate the GUI app functionality. This means that any Python code handling the logic was mostly left unchanged – a testament to how easy both Streamlit and PySimpleGUI are to get up and running with. The below snippet of Python is used to build this tiny Streamlit app.

import streamlit as st
import tempfile
import csv
import os
from pathlib import Path
import helpers as helpers

_SQL_TABLES = ['Test_Table1', 'Test_Table2', 'Test_Table3']
_LOAD_STORED_PROC = 'usp_load_ext_table'
_TARGET_TABLE_SCHEMA_NAME = 'dbo'

st.header('Database Connection Details')
col1, col2 = st.columns([1, 1])
host_name = col1.text_input('Host Name', value='192.168.153.128')
db_name = col1.text_input('Database Name', value='TestDB')
win_auth = col1.checkbox('Use Windows Authentication', value=False)
validate = col1.button('Validate Database Connection')


user_name = col2.text_input('User Name', value='test_login')
password = col2.text_input('Password', type='password', value='test_password')

if validate:
    validation = helpers.validate_db_input_values(
        host_name, db_name, user_name, password)
    if validation:
        error_message = helpers.generate_error_message(validation)
        st.error(error_message)
    else:
        conn = helpers.db_conn(HOSTNAME=host_name, DATABASE=db_name,
                               USERNAME=user_name,   PASSWORD=password, USEWINAUTH=win_auth)
        if conn:
            st.success('Supplied credentials are valid.')
        else:
            st.error(
                'Supplied credentials are invalid or there is a network connection issue!')


st.header('File Upload Details')
uploaded_file = st.file_uploader(
    label='Provide CSV file for upload', type=['.csv'])
table_dropdown_value = st.selectbox(
    'Select database table name to load text data', options=_SQL_TABLES)
truncate_source_table = st.checkbox('Truncate source table?', value=True)

st.header('File Upload Output')
logtxtbox = st.empty()
logtxt = ''
logtxtbox.text_area("Log: ", logtxt, height=200, key='fdgfgjjj')
upload_status = st.button('Validate and Submit')


def check_file_and_load(db_conn, full_path, table_dropdown_value, truncate_source_table_flag, _LOAD_STORED_PROC, _TARGET_TABLE_SCHEMA_NAME):
    logtxt = 'Validating selected file is comma-delimited...'
    logtxtbox.text_area("Log: ", logtxt, height=200)

    # check csv file is comma delimited
    with open(full_path, 'r', newline='') as f:
        reader = csv.reader(f, delimiter=",")
        dialect = csv.Sniffer().sniff(f.read(1024))
    if dialect.delimiter != ',':
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return
    else:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)

    # check csv file is not empty
    logtxt = ''.join((logtxt, 'Validating selected file is not empty...'))
    with open(full_path, 'r', newline='') as f:
        reader = csv.reader(f, dialect)
        ncols = len(next(reader))
        f.seek(0)
        nrow = len(list(reader))
    if nrow < 2:
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return
    else:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)

    # check for database connection values correctness
    logtxt = ''.join(
        (logtxt, 'Validating database connection details are correct...'))
    conn = db_conn(host_name, db_name, user_name, password, win_auth)
    if conn:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
    else:
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return

    # check required schema exists on the target server
    logtxt = ''.join((logtxt, 'Validating target schema existence...'))
    with conn.cursor() as cursor:
        sql = """SELECT TOP (1) 1 FROM {target_db}.sys.schemas 
                        WHERE name = '{target_schema}'""".format(target_db=db_name, target_schema=_TARGET_TABLE_SCHEMA_NAME)
        cursor.execute(sql)
        rows = cursor.fetchone()
    if rows:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
    else:
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return

    # check selected table exists on the target server
    logtxt = ''.join((logtxt, 'Validating target table existence...'))
    with conn.cursor() as cursor:
        sql = """SELECT TOP (1) 1 FROM {target_db}.information_schema.tables 
                        WHERE table_name = '{target_table}'""".format(target_db=db_name, target_table=table_dropdown_value)
        cursor.execute(sql)
        rows = cursor.fetchone()
    if rows:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
    else:
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return

    # check if insert stored proc exists
    logtxt = ''.join((logtxt, 'Validating loading stored procedure exists...'))
    with conn.cursor() as cursor:
        sql = """SELECT TOP (1) 1 FROM {target_db}.sys.objects WHERE type = 'P' AND OBJECT_ID = OBJECT_ID('dbo.usp_load_ext_table')""".format(
            target_db=db_name)
        cursor.execute(sql)
        sp = cursor.fetchone()
    if sp:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
    else:
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return

    # check if number of columns matches
    logtxt = ''.join((logtxt, 'Validating columns number match...'))
    with conn.cursor() as cursor:
        sql = """SELECT COUNT(1) FROM {target_db}.information_schema.columns 
                        WHERE table_name = '{target_table}'""".format(target_db=db_name, target_table=table_dropdown_value)
        cursor.execute(sql)
        dbcols = cursor.fetchone()[0]
    if dbcols == ncols:
        logtxt = ''.join((logtxt, 'OK!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
    else:
        logtxt = ''.join((logtxt, 'Failed!\n'))
        logtxtbox.text_area('Log: ', logtxt, height=200)
        return
    logtxt = ''.join((logtxt, 'Attempting to load...'))
    with conn.cursor() as cursor:
        sql = '''\
                DECLARE @col_names VARCHAR (MAX); 
                EXEC TestDB.dbo.{sp_name} 
                @temp_target_table_name=?,
                @target_table_name=?,
                @target_table_schema_name=?, 
                @file_path=?, 
                @truncate_target_table=?,
                @col_names = @col_names OUTPUT; 
                SELECT @col_names AS col_names
                '''.format(sp_name=_LOAD_STORED_PROC)
        params = ('##'+table_dropdown_value, table_dropdown_value,
                  _TARGET_TABLE_SCHEMA_NAME, full_path, truncate_source_table_flag)
        cursor.execute(sql, params)
        col_names = cursor.fetchval()

        if truncate_source_table_flag == 1:
            test_1_sql = 'SELECT TOP (1) 1 FROM (SELECT {cols} FROM {temp_target_table_name} EXCEPT SELECT {cols} FROM {target_table_schema_name}.{target_table_name})a'.format(
                temp_target_table_name='##'+table_dropdown_value, target_table_schema_name=_TARGET_TABLE_SCHEMA_NAME, target_table_name=table_dropdown_value, cols=col_names)
            cursor.execute(test_1_sql)
            test_1_rows = cursor.fetchone()
            test_2_sql = 'SELECT TOP (1) status FROM {temp_target_table_name}'.format(
                temp_target_table_name='##'+table_dropdown_value)
            cursor.execute(test_2_sql)
            test_2_rows = cursor.fetchone()[0]
            if test_1_rows and test_2_rows != 'SUCCESS':
                st.error('File failed to load successfully, please troubleshoot!')
                return
            else:
                st.success('File loaded successfully!')
        elif truncate_source_table_flag == 0:
            test_1_sql = 'SELECT COUNT(1) as ct FROM (SELECT {cols} FROM {temp_target_table_name} INTERSECT SELECT {cols} FROM {target_table_schema_name}.{target_table_name})a'.format(
                temp_target_table_name='##'+table_dropdown_value, target_table_schema_name=_TARGET_TABLE_SCHEMA_NAME, target_table_name=table_dropdown_value, cols=col_names)
            cursor.execute(test_1_sql)
            test_1_rows = cursor.fetchone()[0]
            test_2_sql = 'SELECT TOP (1) status FROM {temp_target_table_name}'.format(
                temp_target_table_name='##'+table_dropdown_value)
            cursor.execute(test_2_sql)
            test_2_rows = cursor.fetchone()[0]
            if nrow-1 != test_1_rows or test_2_rows != 'SUCCESS':
                st.error('File failed to load successfully, please troubleshoot!')
                return
            else:
                st.success('File loaded successfully!')


if upload_status:
    validation = helpers.validate_db_input_values(
        host_name, db_name, user_name, password)
    if validation:
        error_message = helpers.generate_error_message(validation)
        st.error(error_message)
    if uploaded_file is not None:
        with tempfile.NamedTemporaryFile(delete=False, dir='.', suffix='.csv') as f:
            f.write(uploaded_file.getbuffer())
            fp = Path(f.name)
            full_path = str(Path(f.name))
            file_name = Path(full_path).name
            file_path = Path(full_path).parent
        db_conn = helpers.db_conn
        check_file_and_load(db_conn, full_path, table_dropdown_value,
                            truncate_source_table, _LOAD_STORED_PROC, _TARGET_TABLE_SCHEMA_NAME)
    else:
        st.error('Please select at least one flat file for upload!')

Conclusion

Python has brought a large number of people into the programming community. The number of programs and the range of areas it touches is mindboggling. But more often than not, these technologies are out of reach of all but a handful of people. Most Python programs are “command line” based. This isn’t a problem for programmer-types as we’re all used to interacting with computers through a text interface. While programmers don’t have a problem with command-line interfaces, most “normal people” do. This creates a digital divide, a “GUI Gap”. Adding a GUI or a Web front end to a program opens that program up to a wider audience as it instantly becomes more approachable. Visual interfaces can also make interacting with some programs easier, even for those that are comfortable with a command-line interface.

Both PySimpleGUI and Streamlit provide a low barrier to entry for anyone, even Python novices, and democratise Web and GUI development. For simple projects, these frameworks allow citizen developers to rapidly create value and solve business problems without needing to understand complex technologies and tools. And whilst more critical LOB (Line of Business) apps will demand the use of more advanced technologies in the foreseeable future, for small projects requiring simple interfaces and business logic, PySimpleGUI and Streamlit provide a ton of immediate value. Alternatively, one can go even further down the simplification route and try something like Gooey which conveniently converts (almost) any Python 3 console program into a GUI application with a single line of code!
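As a taste of that approach (a minimal sketch, with Gooey pip-installed and the argument names purely illustrative), decorating an argparse-style entry point is essentially all it takes:

from gooey import Gooey, GooeyParser


@Gooey(program_name='File Upload Utility')  # the single line that turns the CLI into a GUI
def main():
    parser = GooeyParser(description='Load a CSV file into a database table')
    parser.add_argument('csv_file', widget='FileChooser', help='CSV file to upload')
    parser.add_argument('--table', choices=['Test_Table1', 'Test_Table2', 'Test_Table3'])
    args = parser.parse_args()
    print(f'Would load {args.csv_file} into {args.table}')


if __name__ == '__main__':
    main()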
