Designing a data acquisition framework in SQL Server and SSIS – how to source and integrate external data for a decision support system or data warehouse (Part 1)

May 20th, 2016 / by admin

Note: Part 2 to this series can be found HERE, Part 3 HERE, Part 4 HERE and all the code and additional files for this post can be downloaded from my OneDrive folder HERE

Introduction

There is a wealth of literature and Internet resources on the subject of data warehouse and decision support system architecture, covering considerations such as the Kimball vs Inmon approach, data storage and management vendor options, RDBMS vs NoSQL arguments etc. However, in a typical small-to-medium enterprise environment, the first step towards creating a data warehouse is designing a data acquisition job to move the source data into a staging area. Provided the most prevalent approach to data warehouse design is employed, i.e. no near-real-time or streaming architecture is required, a staging server/database storing a copy of the transactional system data for further processing and transformations is created and populated in the first instance.

Most of the time, sourcing transactional data and placing its copy in the staging database simply involves a full or delta copy of the operational system(s) data into the staging database. There is typically no schema denormalisation involved at this stage, but data cleansing routines can be employed to make the data cleaner and conformant to business definitions, e.g. missing value substitution, data type conversion, de-duplication etc. Sometimes a certain degree of ‘pruning’ may be employed to separate redundant and information-poor data from data which can be turned into insight and thus competitive advantage. Also, since the advent of cloud providers/services, with their huge on-demand and cost-competitive processing and storage capabilities, an ELT (extract, load and transform) rather than ETL (extract, transform and load) approach may be more applicable for some scenarios. These may include dealing with large volumes of data, e.g. generated by a variety of dispersed systems such as IoT devices, or operating on a database engine designed for fast, high-concurrency data processing, e.g. a massively parallel processing (MPP) engine. Therefore, depending on how much you would like to massage the data before it finds its way into the landing/staging area, data acquisition can be as simple as a like-for-like, source-to-target copy or as complex as an intricate collection of transformations, mostly to deal with data quality and data deluge issues.

To demonstrate a sample workflow for a data acquisition job of moderate complexity and data volume, I will describe a sample SQL Server Integration Services (SSIS) package which anyone sufficiently versed in T-SQL and SSIS can replicate and modify according to project and business needs. This package has been ‘taken out’ of one of my previous clients’ environments and can serve as a template for sourcing transactional data into a staging database for further processing and massaging. To make this example more akin to a typical business scenario and more flexible for future reuse, I have deliberately assumed the following:

  • The source data resides outside the local network on a database from a vendor other than Microsoft, i.e. MySQL, therefore data incompatibilities, e.g. data types, numeric precisions, character maximum lengths etc., are likely to occur and need to be rectified automatically in the process of acquisition. For the sake of completeness, I will also include an altered version of the code for SQL Server-to-SQL Server data acquisition
  • The source database schema is under constant development, so the target database, where the acquired data is stored, needs to be adjusted automatically. Alterations such as schema changes to existing tables, e.g. column names, data types, numeric precision and scale etc., need to be reconciled without developers’ intervention as part of the pre-acquisition tasks
  • In case any connectivity issues occur, the job will wait for a predefined period of time in a loop executed a predefined number of times before reporting a failed connectivity status (a minimal sketch of this retry pattern follows this list)
  • Any errors raised need to be logged and stored for reference, but halting the entire process should not be the default behaviour in case of a single table failure. When an exception is raised, the process should not stop but rather gracefully log the error details and continue synchronising the remaining objects
  • In case of any issues encountered, we need the administrator(s) to be notified
  • Any indexes will be dealt with as needed i.e. dropped/recreated, reorganised etc. Statistics will also be refreshed at the end of the process
  • Some source data (small tables) can be merged, whereas other tables (larger ones) require a truncate-and-reload approach i.e. the full table being copied across (no row-by-row comparison)
  • We should be able to ‘turn on’ and ‘turn off’ which tables and which columns from each table will be brought across e.g. some may contain irrelevant or sensitive data which is not required to be copied across
  • At job completion we will run some rudimentary checks to compare source to target data, e.g. a record count for each table, and check for any errors logged
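
To illustrate the connectivity retry requirement mentioned above, below is a minimal sketch of the pattern, assuming the linked server is called RemoteMySQLDB (the name used throughout this series) and using arbitrary retry/delay values. This is not the actual procedure developed in Part 2 (usp_checkRemoteSvrConnectionStatus), just a simplified illustration of the logic.

/*==============================================================================
Minimal, assumed sketch of the connectivity retry pattern. The linked server
name, retry count and delay are placeholders - adjust to your environment.
==============================================================================*/
DECLARE @RetryCount INT = 0 ,
        @MaxRetries INT = 5 ,
        @IsConnected BIT = 0;

WHILE @IsConnected = 0 AND @RetryCount < @MaxRetries
    BEGIN
        BEGIN TRY
            -- Throws an error if the remote/linked server cannot be reached
            EXEC sp_testlinkedserver N'RemoteMySQLDB';
            SET @IsConnected = 1;
        END TRY
        BEGIN CATCH
            SET @RetryCount += 1;
            -- Wait one minute before the next attempt (adjust as required)
            WAITFOR DELAY '00:01:00';
        END CATCH;
    END;

IF @IsConnected = 0
    RAISERROR('Remote server could not be reached after %d attempts.', 16, 1, @MaxRetries);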

Conceptually, the acquisition process and its core components can be depicted as per the image below.

Data_Acquisition_Framework_Part1_HighLevel_Architecture_Diagram

 

At a lower level, this framework blueprint will become much more involved as there is quite a bit of code to account for each step’s functionality; however, at a higher level, all the tasks involved can be roughly divided into three categories.

  • Pre-acquisition tasks – activities which facilitate subsequent data copying e.g. source server availability checks, schema modification checks, pre-load index management etc.
  • Acquisition tasks – tasks which are directly responsible for source-to-target data copying
  • Post-acquisition tasks – activities which ensure post-load validation e.g. statistics refresh, index re-creation/rebuild/re-organisation, error log checks etc. (a simplified example of such statements follows this list)
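
The pre- and post-acquisition housekeeping referred to above largely boils down to standard index and statistics statements. Below is a simplified, assumed example only; in the actual framework these operations are driven by index metadata stored in the control database (described later in this post) and are developed in Parts 2 and 3, and the staging table referenced here is created later in the series.

USE [StagingDB];
GO
-- Pre-load: drop a nonclustered staging index before a bulk reload (if it exists)
IF EXISTS ( SELECT  1
            FROM    sys.indexes
            WHERE   name = 'nonclustered_idx_dbo_federal_states_name'
                    AND object_id = OBJECT_ID('dbo.federal_states') )
    DROP INDEX nonclustered_idx_dbo_federal_states_name ON dbo.federal_states;
GO
-- Post-load: recreate the index on the reloaded data
CREATE NONCLUSTERED INDEX nonclustered_idx_dbo_federal_states_name
    ON dbo.federal_states ([name]);
GO
-- Post-load: refresh statistics across the staging database
EXEC sp_updatestats;
GO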

Transactional system data sourcing and staging can be as straightforward as simply selecting source data and inserting it into a pre-created table; however, it is always prudent to assume that, for example, changes to the source or target data/schema/environment will not always be communicated, or that the source system will not always be available for querying, and to take measures to prevent the process from falling over. From my experience, developers are not always diligent about relaying database change information upstream, and on many occasions I have witnessed even larger modifications being dismissed as having no impact on the decision support systems, sometimes resulting in the business being deprived of data for days or longer. To prevent situations where data cannot be sourced reliably, it is always better to assume the worst and hope for the best, so in the spirit of following best practice standards I will break this post into four parts, each dealing with its respective phase of the development process i.e.

  • Building the supporting scaffolding i.e. creating support databases and database objects, setting up a linked server connection to the source data etc. – this post
  • Pre-acquisition activities e.g. source server availability checks, schema modification checks etc. as well as large tables acquisition code development and overview – Part 2
  • Post-acquisition activities e.g. statistics refresh, error log checks etc. as well as small tables acquisition code development and overview – Part 3
  • SSIS package structure and final conclusion – Part 4

As previously mentioned, the acquisition package (template) this blog series describes logically comprises three sections: pre-acquisition activities, data acquisition tasks and post-acquisition activities. At a high level, the package control flow may look as per the image below.

Data_Acquisition_Framework_Part1_PseudoPackage_Workflow_View

Please note that this template, along with its individual tasks, is only a guide; if any of the steps are not applicable, or additional ones need to be added to conform to technical requirements, it should be fairly straightforward to alter it with little effort. This post deals with the first step in this process, i.e. creating all the auxiliary structures to support the further code and package development outlined in Part 2, Part 3 and Part 4.

Environment and Supporting Objects Setup

Let’s begin by setting the stage for the rest of this series and create all the necessary scaffolding i.e. the staging database, control database and AdminDBA database (more on that later), a linked server to the source database etc.

Firstly, let’s create two databases – ControlDB and StagingDB – and the associated objects/data. The StagingDB database will simply act as a local copy of the source data. The control database, on the other hand, will hold tables controlling data acquisition object metadata e.g. table and field exceptions in case we want to exclude certain columns from the process, index names in case we want to drop and rebuild them, information on whether the source table is large or small (this dependency will trigger a different acquisition process), notification recipients’ e-mail addresses etc. One can omit creating the control database and circumvent dealing with this metadata by hard-coding it into the stored procedures directly; however, in my experience, it is a worthwhile feature to have, as changes/additions can be applied to a single repository transparently and effortlessly e.g. excluding one or more attributes from a source table is a simple INSERT (into a control table) statement. I will demonstrate this functionality in more detail in Parts 2 and 3.

As part of this task we will also create all the database objects and populate them with test data. You will notice that the code below creates four tables (in the ControlDB database) and two views (in the StagingDB database). Each object’s purpose is described below:

  • Ctrl_RemoteSvrs_Tables2Process – metadata table holding object names and their corresponding environment variables e.g. schema names (both remote and local servers), database names (both remote and local servers), whether the table is active, whether the data volume/record count is large or not etc. This table’s content dictates which acquisition process should be used for data copying i.e. a dynamic MERGE SQL statement (see Part 3 for details) or parallelised INSERTs (see Part 2 for details), as well as providing some basic metadata information
  • Ctrl_RemoteSvrs_Tables2Process_ColumnExceptions – metadata table containing object attributes which are not to be acquired from the source database/server for data redundancy or security reasons. This table may be referenced if any particular object on the source server contains columns which can be excluded, saving space and reducing security concerns
  • Ctrl_INDXandPKs2Process – control table containing indexes metadata which stores information on the indexes types, objects they’re built on, columns they’re encompassing etc.
  • Ctrl_ErrorMsg_Notification_List – control table containing an email address distribution list for error message notifications and associated metadata. This table is referenced to build a list of addresses which should be notified in case an unexpected event occurs
  • vw_MySQLReservedWords – a view containing a list of MySQL reserved words to allow for MySQL syntax compliance by means of substituting certain key words with a delimited version e.g. replacing words such as AS, CHAR or COLUMN with `AS`, `CHAR` and `COLUMN` equivalents (delimited by backticks)
  • vw_MSSQLReservedWords – a view containing a list of SQL Server reserved words. Same purpose as the one above but targeting SQL Server version
/*==============================================================================
STEP 1
Create Staging and Control databases on the local instance
==============================================================================*/
USE [master];
GO
IF EXISTS ( SELECT  name
            FROM    sys.databases
            WHERE   name = N'StagingDB' )
    BEGIN
-- Close connections to the StagingDB database
        ALTER DATABASE StagingDB SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
        DROP DATABASE StagingDB;
    END;
GO
CREATE DATABASE StagingDB ON PRIMARY
( NAME = N'StagingDB'
, FILENAME = N'C:\DBFiles\StagingDB.mdf'
, SIZE = 10MB
, MAXSIZE = 1GB
, FILEGROWTH = 10MB ) LOG ON
( NAME = N'StagingDB_log'
, FILENAME = N'C:\DBFiles\StagingDB_log.LDF'
, SIZE = 1MB
, MAXSIZE = 1GB
, FILEGROWTH = 10MB);
GO
--Assign database ownership to login SA
EXEC StagingDB.dbo.sp_changedbowner @loginame = N'SA', @map = false;
GO
--Change the recovery model to BULK_LOGGED
ALTER DATABASE StagingDB SET RECOVERY BULK_LOGGED;
GO

IF EXISTS ( SELECT  name
            FROM    sys.databases
            WHERE   name = N'ControlDB' )
    BEGIN
-- Close connections to the ControlDB database
        ALTER DATABASE ControlDB SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
        DROP DATABASE ControlDB;
    END;
GO
CREATE DATABASE ControlDB ON PRIMARY
( NAME = N'ControlDB'
, FILENAME = N'C:\DBFiles\ControlDB.mdf'
, SIZE = 10MB
, MAXSIZE = 1GB
, FILEGROWTH = 10MB ) LOG ON
( NAME = N'ControlDB_log'
, FILENAME = N'C:\DBFiles\ControlDB_log.LDF'
, SIZE = 1MB
, MAXSIZE = 1GB
, FILEGROWTH = 10MB);
GO
--Assign database ownership to login SA
EXEC ControlDB.dbo.sp_changedbowner @loginame = N'SA', @map = false;
GO
--Change the recovery model to BULK_LOGGED
ALTER DATABASE ControlDB SET RECOVERY BULK_LOGGED;
GO

/*==============================================================================
STEP 2
Create ControlDB database objects
==============================================================================*/
USE [ControlDB];
GO

-- Create 'Ctrl_RemoteSvrs_Tables2Process' table
CREATE TABLE [dbo].[Ctrl_RemoteSvrs_Tables2Process]
    (
      [ID] [SMALLINT] IDENTITY(1, 1)
                      NOT NULL ,
      [Application_Name] [VARCHAR](255) NOT NULL ,
      [Local_Table_Name] [VARCHAR](255) NOT NULL ,
      [Local_Schema_Name] [VARCHAR](55) NOT NULL ,
      [Local_DB_Name] [VARCHAR](255) NOT NULL ,
      [Remote_Table_Name] [VARCHAR](255) NOT NULL ,
      [Remote_Schema_Name] [VARCHAR](55) NOT NULL ,
      [Remote_DB_Name] [VARCHAR](255) NOT NULL ,
      [Remote_Server_Name] [VARCHAR](255) NULL ,
      [Is_Active] [BIT] NOT NULL ,
      [Is_Big_Table] [BIT] NOT NULL ,
      CONSTRAINT [pk_dbo_ctrl_remotesvrs_tables2process_id] PRIMARY KEY CLUSTERED
        ( [ID] ASC )
        WITH ( PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
               IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,
               ALLOW_PAGE_LOCKS = ON ) ON [PRIMARY]
    )
ON  [PRIMARY];
GO

--Create 'Ctrl_RemoteSvrs_Tables2Process_ColumnExceptions' table
CREATE TABLE [dbo].[Ctrl_RemoteSvrs_Tables2Process_ColumnExceptions]
    (
      [ID] [SMALLINT] IDENTITY(1, 1)
                      NOT NULL ,
      [FK_ObjectID] [SMALLINT] NOT NULL ,
      [Application_Name] [VARCHAR](255) NOT NULL ,
      [Local_Field_Name] [VARCHAR](255) NOT NULL ,
      [Local_Table_Name] [VARCHAR](255) NOT NULL ,
      [Local_Schema_Name] [VARCHAR](55) NOT NULL ,
      [Local_DB_Name] [VARCHAR](255) NOT NULL ,
      [Remote_Field_Name] [VARCHAR](255) NOT NULL ,
      [Remote_Table_Name] [VARCHAR](255) NOT NULL ,
      [Remote_Schema_Name] [VARCHAR](55) NOT NULL ,
      [Remote_DB_Name] [VARCHAR](255) NOT NULL ,
      [Remote_Server_Name] [VARCHAR](255) NOT NULL ,
      [Exception_Type] [VARCHAR](55) NOT NULL ,
      [Is_Active] [BIT] NOT NULL ,
      CONSTRAINT [pk_dbo_ctrl_remotesvrs_tables2process_columnexceptions_id] PRIMARY KEY CLUSTERED
        ( [ID] ASC )
        WITH ( PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
               IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,
               ALLOW_PAGE_LOCKS = ON ) ON [PRIMARY]
    )
ON  [PRIMARY];

GO


-- Create foreign key constraint between 
-- 'Ctrl_RemoteSvrs_Tables2Process_ColumnExceptions' and 'Ctrl_RemoteSvrs_Tables2Process' tables
ALTER TABLE [dbo].[Ctrl_RemoteSvrs_Tables2Process_ColumnExceptions]  
WITH CHECK ADD  CONSTRAINT [fk_dbo_ctrl_remotesvrs_tables2process_id] FOREIGN KEY([FK_ObjectID])
REFERENCES [dbo].[Ctrl_RemoteSvrs_Tables2Process] ([ID]);
GO

ALTER TABLE [dbo].[Ctrl_RemoteSvrs_Tables2Process_ColumnExceptions] 
CHECK CONSTRAINT [fk_dbo_ctrl_remotesvrs_tables2process_id];
GO


-- Create 'Ctrl_INDXandPKs2Process' table
CREATE TABLE [dbo].[Ctrl_INDXandPKs2Process]
    (
      [ID] [SMALLINT] IDENTITY(1, 1)
                      NOT NULL ,
      [Program_Name] [VARCHAR](128) NOT NULL ,
      [Database_Name] [VARCHAR](128) NOT NULL ,
      [Schema_Name] [VARCHAR](25) NOT NULL ,
      [Table_Name] [VARCHAR](256) NOT NULL ,
      [Index_or_PKName] [VARCHAR](512) NOT NULL ,
      [Index_Type] [VARCHAR](128) NOT NULL ,
      [Is_Unique] [VARCHAR](56) NULL ,
      [Is_PK] [VARCHAR](56) NULL ,
      [PK_ColNames] [VARCHAR](1024) NULL ,
      [Indx_ColNames] [VARCHAR](1024) NULL ,
	  [Indx_Options] VARCHAR (MAX) NULL,
      CONSTRAINT [pk_id_ctrl_indxandpks2process_id] PRIMARY KEY CLUSTERED
        ( [ID] ASC )
        WITH ( PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
               IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,
               ALLOW_PAGE_LOCKS = ON ) ON [PRIMARY]
    )
ON  [PRIMARY];
GO


-- Create 'Ctrl_ErrorMsg_Notification_List' table
CREATE TABLE [dbo].[Ctrl_ErrorMsg_Notification_List]
    (
      [ID] [INT] IDENTITY(1, 1)
                 NOT NULL ,
      [ServerName] [VARCHAR](128) NULL ,
      [InstanceName] [VARCHAR](128) NULL ,
      [TaskName] [VARCHAR](256) NULL ,
      [EmailAddress] [VARCHAR](256) NULL ,
      [IsActive] [BIT] NULL ,
      CONSTRAINT [pk_dbo_ctrl_errorMsg_notification_list_id] PRIMARY KEY CLUSTERED
        ( [ID] ASC )
        WITH ( PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
               IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON,
               ALLOW_PAGE_LOCKS = ON ) ON [PRIMARY]
    )
ON  [PRIMARY];
GO


-- Insert sample data into control objects created 
INSERT  INTO [dbo].[Ctrl_RemoteSvrs_Tables2Process]
        ( Application_Name ,
          Local_Table_Name ,
          Local_Schema_Name ,
          Local_DB_Name ,
          Remote_Table_Name ,
          Remote_Schema_Name ,
          Remote_DB_Name ,
          Remote_Server_Name ,
          Is_Active ,
          Is_Big_Table
        )
        SELECT  'AppName' ,
                'answers' ,
                'dbo' ,
                'StagingDB' ,
                'answers' ,
                'Remote_SchemaName' ,
                'Remote_DBName' ,
                'RemoteMySQLDB' ,
                1 ,
                1
        UNION ALL
        SELECT  'AppName' ,
                'federal_states' ,
                'dbo' ,
                'StagingDB' ,
                'federal_states' ,
                'Remote_SchemaName' ,
                'Remote_DBName' ,
                'RemoteMySQLDB' ,
                1 ,
                0;

INSERT  INTO dbo.Ctrl_RemoteSvrs_Tables2Process_ColumnExceptions
        ( FK_ObjectID ,
          Application_Name ,
          Local_Field_Name ,
          Local_Table_Name ,
          Local_Schema_Name ,
          Local_DB_Name ,
          Remote_Field_Name ,
          Remote_Table_Name ,
          Remote_Schema_Name ,
          Remote_DB_Name ,
          Remote_Server_Name ,
          Exception_Type ,
          Is_Active
        )
        SELECT  1 ,
                'AppName' ,
                'other_value' ,
                'answers' ,
                'dbo' ,
                'StagingDB' ,
                'other_value' ,
                'answers' ,
                'Remote_Schema_Name' ,
                'Remote_DB_Name' ,
                'RemoteMySQLDB' ,
                'security' ,
                1;

INSERT  INTO dbo.Ctrl_INDXandPKs2Process
        ( Program_Name ,
          Database_Name ,
          Schema_Name ,
          Table_Name ,
          Index_or_PKName ,
          Index_Type ,
          Is_Unique ,
          Is_PK ,
          PK_ColNames ,
          Indx_ColNames ,
          Indx_Options
        )
        SELECT  'AppName' ,
                'StagingDB' ,
                'dbo' ,
                'answers' ,
                'cstore_nonclustered_idx_dbo_answers_multiplecols' ,
                'CLUSTERED COLUMNSTORE' ,
                '' ,
                '' ,
                '' ,
                'id,oos_id,question_id,question_set_id,answer_provided_by_user_id,answer_option_id,timestamp,oos_questionset_id,owner_user_id' ,
                'WITH ( DATA_COMPRESSION = COLUMNSTORE_ARCHIVE )'
        UNION ALL
        SELECT  'AppName' ,
                'StagingDB' ,
                'dbo' ,
                'federal_states' ,
                'nonclustered_idx_dbo_federal_states_name' ,
                'NONCLUSTERED' ,
                '' ,
                '' ,
                '' ,
                'name' ,
                ''
        UNION ALL
        SELECT  'AppName' ,
                'StagingDB' ,
                'dbo' ,
                'answers' ,
                'pk_dbo_answers_id' ,
                'CLUSTERED' ,
                'UNIQUE' ,
                'PRIMARY KEY' ,
                'id' ,
                '' ,
                ''
        UNION ALL
        SELECT  'AppName' ,
                'StagingDB' ,
                'dbo' ,
                'federal_states' ,
                'pk_dbo_federal_states_id' ,
                'CLUSTERED' ,
                'UNIQUE' ,
                'PRIMARY KEY' ,
                'id' ,
                '' ,
                '';

INSERT  INTO dbo.Ctrl_ErrorMsg_Notification_List
        ( [ServerName] ,
          [InstanceName] ,
          [TaskName] ,
          [EmailAddress] ,
          [IsActive]
        )
        SELECT  'BICortexTestServer' ,
                'TestSQLServer' ,
                'Data_Acquisition_Job' ,
                'myname@emailaddress.com' ,
                1;

/*==============================================================================
STEP 3
Create StagingDB database objects
==============================================================================*/
USE [StagingDB]
GO

CREATE VIEW [dbo].[vw_MysqlReservedWords] AS
SELECT 'ACCESSIBLE'			AS reserved_word,	'`ACCESSIBLE`' AS mysql_version				UNION ALL				
SELECT 'ADD',									'`ADD`'										UNION ALL			
SELECT 'ALL',									'`ALL`'										UNION ALL			
SELECT 'ALTER',									'`ALTER`'									UNION ALL			
SELECT 'ANALYZE',								'`ANALYZE`'									UNION ALL				
SELECT 'AND',									'`AND`'										UNION ALL			
SELECT 'AS',									'`AS`'										UNION ALL 		
SELECT 'ASC',									'`ASC`'										UNION ALL			
SELECT 'ASENSITIVE',							'`ASENSITIVE`'								UNION ALL				
SELECT 'BEFORE',								'`BEFORE`'									UNION ALL			
SELECT 'BETWEEN',								'`BETWEEN`'									UNION ALL				
SELECT 'BIGINT',								'`BIGINT`'									UNION ALL			
SELECT 'BINARY',								'`BINARY`'									UNION ALL			
SELECT 'BLOB',									'`BLOB`'									UNION ALL			
SELECT 'BOTH',									'`BOTH`'									UNION ALL			
SELECT 'BY',									'`BY`'										UNION ALL		
SELECT 'CALL',									'`CALL`'									UNION ALL			
SELECT 'CASCADE',								'`CASCADE`'									UNION ALL				
SELECT 'CASE',									'`CASE`'									UNION ALL			
SELECT 'CHANGE',								'`CHANGE`'									UNION ALL			
SELECT 'CHAR',									'`CHAR`'									UNION ALL			
SELECT 'CHARACTER',								'`CHARACTER`'								UNION ALL				
SELECT 'CHECK',									'`CHECK`'									UNION ALL			
SELECT 'COLLATE',								'`COLLATE`'									UNION ALL				
SELECT 'COLUMN',								'`COLUMN`'									UNION ALL			
SELECT 'CONDITION',								'`CONDITION`'								UNION ALL				
SELECT 'CONSTRAINT',							'`CONSTRAINT`'								UNION ALL				
SELECT 'CONTINUE',								'`CONTINUE`'								UNION ALL				
SELECT 'CONVERT',								'`CONVERT`'									UNION ALL				
SELECT 'CREATE',								'`CREATE`'									UNION ALL			
SELECT 'CROSS',									'`CROSS`'									UNION ALL			
SELECT 'CURRENT_DATE',							'`CURRENT_DATE`'							UNION ALL					
SELECT 'CURRENT_TIME',							'`CURRENT_TIME`'							UNION ALL					
SELECT 'CURRENT_TIMESTAMP',						'`CURRENT_TIMESTAMP`'						UNION ALL						
SELECT 'CURRENT_USER',							'`CURRENT_USER`'							UNION ALL					
SELECT 'CURSOR',								'`CURSOR`'									UNION ALL			
SELECT 'DATABASE',								'`DATABASE`'								UNION ALL				
SELECT 'DATABASES',								'`DATABASES`'								UNION ALL				
SELECT 'DAY',									'`DAY`'										UNION ALL			
SELECT 'HOUR',									'`HOUR`'									UNION ALL			
SELECT 'DAY_MICROSECOND',						'`DAY_MICROSECOND`'							UNION ALL						
SELECT 'DAY_MINUTE',							'`DAY_MINUTE`'								UNION ALL				
SELECT 'DAY_SECOND',							'`DAY_SECOND`'								UNION ALL				
SELECT 'DEC',									'`DEC`'										UNION ALL			
SELECT 'DECIMAL',								'`DECIMAL`'									UNION ALL				
SELECT 'DECLARE',								'`DECLARE`'									UNION ALL				
SELECT 'DEFAULT',								'`DEFAULT`'									UNION ALL				
SELECT 'DELAYED',								'`DELAYED`'									UNION ALL				
SELECT 'DELETE',								'`DELETE`'									UNION ALL			
SELECT 'DESC',									'`DESC`'									UNION ALL			
SELECT 'DESCRIBE',								'`DESCRIBE`'								UNION ALL				
SELECT 'DETERMINISTIC',							'`DETERMINISTIC`'							UNION ALL					
SELECT 'DISTINCT',								'`DISTINCT`'								UNION ALL				
SELECT 'DISTINCTROW',							'`DISTINCTROW`'								UNION ALL					
SELECT 'DIV',									'`DIV`'										UNION ALL			
SELECT 'DOUBLE',								'`DOUBLE`'									UNION ALL			
SELECT 'DROP',									'`DROP`'									UNION ALL			
SELECT 'DUAL',									'`DUAL`'									UNION ALL			
SELECT 'EACH',									'`EACH`'									UNION ALL			
SELECT 'ELSE',									'`ELSE`'									UNION ALL			
SELECT 'ELSEIF',								'`ELSEIF`'									UNION ALL			
SELECT 'ENCLOSED',								'`ENCLOSED`'								UNION ALL				
SELECT 'ESCAPED',								'`ESCAPED`'									UNION ALL				
SELECT 'EXISTS',								'`EXISTS`'									UNION ALL			
SELECT 'EXIT',									'`EXIT`'									UNION ALL			
SELECT 'EXPLAIN',								'`EXPLAIN`'									UNION ALL				
SELECT 'FALSE',									'`FALSE`'									UNION ALL			
SELECT 'FETCH',									'`FETCH`'									UNION ALL			
SELECT 'FLOAT',									'`FLOAT`'									UNION ALL			
SELECT 'FLOAT4',								'`FLOAT4`'									UNION ALL			
SELECT 'FLOAT8',								'`FLOAT8`'									UNION ALL			
SELECT 'FOR',									'`FOR`'										UNION ALL			
SELECT 'FORCE',									'`FORCE`'									UNION ALL			
SELECT 'FOREIGN',								'`FOREIGN`'									UNION ALL				
SELECT 'FROM',									'`FROM`'									UNION ALL			
SELECT 'FULLTEXT',								'`FULLTEXT`'								UNION ALL				
SELECT 'GRANT',									'`GRANT`'									UNION ALL			
SELECT 'GROUP',									'`GROUP`'									UNION ALL			
SELECT 'HAVING',								'`HAVING`'									UNION ALL			
SELECT 'HIGH_PRIORITY',							'`HIGH_PRIORITY`'							UNION ALL					
SELECT 'HOUR_MICROSECOND',						'`HOUR_MICROSECOND`'						UNION ALL						
SELECT 'HOUR_MINUTE',							'`HOUR_MINUTE`'								UNION ALL					
SELECT 'HOUR_SECOND',							'`HOUR_SECOND`'								UNION ALL					
SELECT 'IF',									'`IF`'										UNION ALL		
SELECT 'IGNORE',								'`IGNORE`'									UNION ALL			
SELECT 'IN',									'`IN`'										UNION ALL		
SELECT 'INDEX',									'`INDEX`'									UNION ALL			
SELECT 'INFILE',								'`INFILE`'									UNION ALL			
SELECT 'INNER',									'`INNER`'									UNION ALL			
SELECT 'INOUT',									'`INOUT`'									UNION ALL			
SELECT 'INSENSITIVE',							'`INSENSITIVE`'								UNION ALL					
SELECT 'INSERT',								'`INSERT`'									UNION ALL			
SELECT 'INT',									'`INT`'										UNION ALL			
SELECT 'INT1',									'`INT1`'									UNION ALL			
SELECT 'INT2',									'`INT2`'									UNION ALL			
SELECT 'INT3',									'`INT3`'									UNION ALL			
SELECT 'INT4',									'`INT4`'									UNION ALL			
SELECT 'INT8',									'`INT8`'									UNION ALL			
SELECT 'INTEGER',								'`INTEGER`'									UNION ALL				
SELECT 'INTERVAL',								'`INTERVAL`'								UNION ALL				
SELECT 'INTO',									'`INTO`'									UNION ALL			
SELECT 'IS',									'`IS`'										UNION ALL		
SELECT 'ITERATE',								'`ITERATE`'									UNION ALL				
SELECT 'JOIN',									'`JOIN`'									UNION ALL			
SELECT 'KEY',									'`KEY`'										UNION ALL			
SELECT 'KEYS',									'`KEYS`'									UNION ALL			
SELECT 'KILL',									'`KILL`'									UNION ALL			
SELECT 'LEADING',								'`LEADING`'									UNION ALL				
SELECT 'LEAVE',									'`LEAVE`'									UNION ALL			
SELECT 'LEFT',									'`LEFT`'									UNION ALL			
SELECT 'LIKE',									'`LIKE`'									UNION ALL			
SELECT 'LIMIT',									'`LIMIT`'									UNION ALL			
SELECT 'LINEAR',								'`LINEAR`'									UNION ALL			
SELECT 'LINES',									'`LINES`'									UNION ALL			
SELECT 'LOAD',									'`LOAD`'									UNION ALL			
SELECT 'LOCALTIME',								'`LOCALTIME`'								UNION ALL				
SELECT 'LOCALTIMESTAMP',						'`LOCALTIMESTAMP`'							UNION ALL					
SELECT 'LOCK',									'`LOCK`'									UNION ALL			
SELECT 'LONG',									'`LONG`'									UNION ALL			
SELECT 'LONGBLOB',								'`LONGBLOB`'								UNION ALL				
SELECT 'LONGTEXT',								'`LONGTEXT`'								UNION ALL				
SELECT 'LOOP',									'`LOOP`'									UNION ALL			
SELECT 'LOW_PRIORITY',							'`LOW_PRIORITY`'							UNION ALL					
SELECT 'MASTER_SSL_VERIFY_SERVER_CERT',			'`MASTER_SSL_VERIFY_SERVER_CERT`'			UNION ALL							
SELECT 'MATCH',									'`MATCH`'									UNION ALL			
SELECT 'MAXVALUE',								'`MAXVALUE`'								UNION ALL				
SELECT 'MEDIUMBLOB',							'`MEDIUMBLOB`'								UNION ALL				
SELECT 'MEDIUMINT',								'`MEDIUMINT`'								UNION ALL				
SELECT 'MEDIUMTEXT',							'`MEDIUMTEXT`'								UNION ALL				
SELECT 'MIDDLEINT',								'`MIDDLEINT`'								UNION ALL				
SELECT 'MINUTE_MICROSECOND',					'`MINUTE_MICROSECOND`'						UNION ALL						
SELECT 'MINUTE_SECOND',							'`MINUTE_SECOND`'							UNION ALL					
SELECT 'MOD',									'`MOD`'										UNION ALL			
SELECT 'MODIFIES',								'`MODIFIES`'								UNION ALL				
SELECT 'NATURAL',								'`NATURAL`'									UNION ALL				
SELECT 'NOT',									'`NOT`'										UNION ALL			
SELECT 'NO_WRITE_TO_BINLOG',					'`NO_WRITE_TO_BINLOG`'						UNION ALL						
SELECT 'NULL',									'`NULL`'									UNION ALL			
SELECT 'NUMERIC',								'`NUMERIC`'									UNION ALL				
SELECT 'ON',									'`ON`'										UNION ALL		
SELECT 'OPTIMIZE',								'`OPTIMIZE`'								UNION ALL				
SELECT 'OPTION',								'`OPTION`'									UNION ALL			
SELECT 'OPTIONALLY',							'`OPTIONALLY`'								UNION ALL				
SELECT 'OR',									'`OR`'										UNION ALL		
SELECT 'ORDER',									'`ORDER`'									UNION ALL			
SELECT 'OUT',									'`OUT`'										UNION ALL			
SELECT 'OUTER',									'`OUTER`'									UNION ALL			
SELECT 'OUTFILE',								'`OUTFILE`'									UNION ALL				
SELECT 'PRECISION',								'`PRECISION`'								UNION ALL				
SELECT 'PRIMARY',								'`PRIMARY`'									UNION ALL				
SELECT 'PROCEDURE',								'`PROCEDURE`'								UNION ALL				
SELECT 'PURGE',									'`PURGE`'									UNION ALL			
SELECT 'RANGE',									'`RANGE`'									UNION ALL			
SELECT 'READ',									'`READ`'									UNION ALL			
SELECT 'READS',									'`READS`'									UNION ALL			
SELECT 'READ_WRITE',							'`READ_WRITE`'								UNION ALL				
SELECT 'REAL',									'`REAL`'									UNION ALL			
SELECT 'REFERENCES',							'`REFERENCES`'								UNION ALL				
SELECT 'REGEXP',								'`REGEXP`'									UNION ALL			
SELECT 'RELEASE',								'`RELEASE`'									UNION ALL				
SELECT 'RENAME',								'`RENAME`'									UNION ALL			
SELECT 'REPEAT',								'`REPEAT`'									UNION ALL			
SELECT 'REPLACE',								'`REPLACE`'									UNION ALL				
SELECT 'REQUIRE',								'`REQUIRE`'									UNION ALL				
SELECT 'RESIGNAL',								'`RESIGNAL`'								UNION ALL				
SELECT 'RESTRICT',								'`RESTRICT`'								UNION ALL				
SELECT 'RETURN',								'`RETURN`'									UNION ALL			
SELECT 'REVOKE',								'`REVOKE`'									UNION ALL			
SELECT 'RIGHT',									'`RIGHT`'									UNION ALL			
SELECT 'RLIKE',									'`RLIKE`'									UNION ALL			
SELECT 'SCHEMA',								'`SCHEMA`'									UNION ALL			
SELECT 'SCHEMAS',								'`SCHEMAS`'									UNION ALL				
SELECT 'SECOND_MICROSECOND',					'`SECOND_MICROSECOND`'						UNION ALL						
SELECT 'SELECT',								'`SELECT`'									UNION ALL			
SELECT 'SENSITIVE',								'`SENSITIVE`'								UNION ALL				
SELECT 'SEPARATOR',								'`SEPARATOR`'								UNION ALL				
SELECT 'SET',									'`SET`'										UNION ALL			
SELECT 'SHOW',									'`SHOW`'									UNION ALL			
SELECT 'SIGNAL',								'`SIGNAL`'									UNION ALL			
SELECT 'SMALLINT',								'`SMALLINT`'								UNION ALL				
SELECT 'SPATIAL',								'`SPATIAL`'									UNION ALL				
SELECT 'SPECIFIC',								'`SPECIFIC`'								UNION ALL				
SELECT 'SQL',									'`SQL`'										UNION ALL			
SELECT 'SQLEXCEPTION',							'`SQLEXCEPTION`'							UNION ALL					
SELECT 'SQLSTATE',								'`SQLSTATE`'								UNION ALL				
SELECT 'SQLWARNING',							'`SQLWARNING`'								UNION ALL				
SELECT 'SQL_BIG_RESULT',						'`SQL_BIG_RESULT`'							UNION ALL					
SELECT 'SQL_CALC_FOUND_ROWS',					'`SQL_CALC_FOUND_ROWS`'						UNION ALL							
SELECT 'SQL_SMALL_RESULT',						'`SQL_SMALL_RESULT`'						UNION ALL						
SELECT 'SSL',									'`SSL`'										UNION ALL			
SELECT 'STARTING',								'`STARTING`'								UNION ALL				
SELECT 'STRAIGHT_JOIN',							'`STRAIGHT_JOIN`'							UNION ALL					
SELECT 'TABLE',									'`TABLE`'									UNION ALL			
SELECT 'TERMINATED',							'`TERMINATED`'								UNION ALL				
SELECT 'THEN',									'`THEN`'									UNION ALL			
SELECT 'TINYBLOB',								'`TINYBLOB`'								UNION ALL				
SELECT 'TINYINT',								'`TINYINT`'									UNION ALL				
SELECT 'TINYTEXT',								'`TINYTEXT`'								UNION ALL				
SELECT 'TO',									'`TO`'										UNION ALL		
SELECT 'TRAILING',								'`TRAILING`'								UNION ALL				
SELECT 'TRIGGER',								'`TRIGGER`'									UNION ALL				
SELECT 'TRUE',									'`TRUE`'									UNION ALL			
SELECT 'UNDO',									'`UNDO`'									UNION ALL			
SELECT 'UNION',									'`UNION`'									UNION ALL			
SELECT 'UNIQUE',								'`UNIQUE`'									UNION ALL			
SELECT 'UNLOCK',								'`UNLOCK`'									UNION ALL			
SELECT 'UNSIGNED',								'`UNSIGNED`'								UNION ALL				
SELECT 'UPDATE',								'`UPDATE`'									UNION ALL			
SELECT 'USAGE',									'`USAGE`'									UNION ALL			
SELECT 'USE',									'`USE`'										UNION ALL			
SELECT 'USING',									'`USING`'									UNION ALL			
SELECT 'UTC_DATE',								'`UTC_DATE`'								UNION ALL				
SELECT 'UTC_TIME',								'`UTC_TIME`'								UNION ALL				
SELECT 'UTC_TIMESTAMP',							'`UTC_TIMESTAMP`'							UNION ALL					
SELECT 'VALUES',								'`VALUES`'									UNION ALL			
SELECT 'VARBINARY',								'`VARBINARY`'								UNION ALL				
SELECT 'VARCHAR',								'`VARCHAR`'									UNION ALL				
SELECT 'VARCHARACTER',							'`VARCHARACTER`'							UNION ALL					
SELECT 'VARYING',								'`VARYING`'									UNION ALL				
SELECT 'WHEN',									'`WHEN`'									UNION ALL			
SELECT 'WHERE',									'`WHERE`'									UNION ALL			
SELECT 'WHILE',									'`WHILE`'									UNION ALL			
SELECT 'WITH',									'`WITH`'									UNION ALL			
SELECT 'WRITE',									'`WRITE`'									UNION ALL			
SELECT 'XOR',									'`XOR`'										UNION ALL			
SELECT 'YEAR_MONTH',							'`YEAR_MONTH`'								UNION ALL				
SELECT 'ZEROFILL',								'`ZEROFILL`'								
GO



CREATE VIEW [dbo].[vw_MssqlReservedWords] AS
SELECT 	'ADD' AS reserved_word,					'[ADD]' AS mssql_version			UNION ALL
SELECT 	'EXTERNAL',								'[EXTERNAL]'						UNION ALL
SELECT 	'PROCEDURE',							'[PROCEDURE]'						UNION ALL
SELECT 	'ALL',									'[ALL]'								UNION ALL
SELECT 	'FETCH',								'[FETCH]'							UNION ALL
SELECT 	'PUBLIC',								'[PUBLIC]'							UNION ALL
SELECT 	'ALTER',								'[ALTER]'							UNION ALL
SELECT 	'FILE',									'[FILE]'							UNION ALL
SELECT 	'RAISERROR',							'[RAISERROR]'						UNION ALL
SELECT 	'AND',									'[AND]'								UNION ALL
SELECT 	'FILLFACTOR',							'[FILLFACTOR]'						UNION ALL
SELECT 	'READ',									'[READ]'							UNION ALL
SELECT 	'ANY',									'[ANY]'								UNION ALL
SELECT 	'FOR',									'[FOR]'								UNION ALL
SELECT 	'READTEXT',								'[READTEXT]'						UNION ALL
SELECT 	'AS',									'[AS]'								UNION ALL
SELECT 	'FOREIGN',								'[FOREIGN]'							UNION ALL
SELECT 	'RECONFIGURE',							'[RECONFIGURE]'						UNION ALL
SELECT 	'ASC',									'[ASC]'								UNION ALL
SELECT 	'FREETEXT',								'[FREETEXT]'						UNION ALL
SELECT 	'REFERENCES',							'[REFERENCES]'						UNION ALL
SELECT 	'AUTHORIZATION',						'[AUTHORIZATION]'					UNION ALL
SELECT 	'FREETEXTTABLE',						'[FREETEXTTABLE]'					UNION ALL
SELECT 	'REPLICATION',							'[REPLICATION]'						UNION ALL
SELECT 	'BACKUP',								'[BACKUP]'							UNION ALL
SELECT 	'FROM',									'[FROM]'							UNION ALL
SELECT 	'RESTORE',								'[RESTORE]'							UNION ALL
SELECT 	'BEGIN',								'[BEGIN]'							UNION ALL
SELECT 	'FULL',									'[FULL]'							UNION ALL
SELECT 	'RESTRICT',								'[RESTRICT]'						UNION ALL
SELECT 	'BETWEEN',								'[BETWEEN]'							UNION ALL
SELECT 	'FUNCTION',								'[FUNCTION]'						UNION ALL
SELECT 	'RETURN',								'[RETURN]'							UNION ALL
SELECT 	'BREAK',								'[BREAK]'							UNION ALL
SELECT 	'GOTO',									'[GOTO]'							UNION ALL
SELECT 	'REVERT',								'[REVERT]'							UNION ALL
SELECT 	'BROWSE',								'[BROWSE]'							UNION ALL
SELECT 	'GRANT',								'[GRANT]'							UNION ALL
SELECT 	'REVOKE',								'[REVOKE]'							UNION ALL
SELECT 	'BULK',									'[BULK]'							UNION ALL
SELECT 	'GROUP',								'[GROUP]'							UNION ALL
SELECT 	'RIGHT',								'[RIGHT]'							UNION ALL
SELECT 	'BY',									'[BY]'								UNION ALL
SELECT 	'HAVING',								'[HAVING]'							UNION ALL
SELECT 	'ROLLBACK',								'[ROLLBACK]'						UNION ALL
SELECT 	'CASCADE',								'[CASCADE]'							UNION ALL
SELECT 	'HOLDLOCK',								'[HOLDLOCK]'						UNION ALL
SELECT 	'ROWCOUNT',								'[ROWCOUNT]'						UNION ALL
SELECT 	'CASE',									'[CASE]'							UNION ALL
SELECT 	'IDENTITY',								'[IDENTITY]'						UNION ALL
SELECT 	'ROWGUIDCOL',							'[ROWGUIDCOL]'						UNION ALL
SELECT 	'CHECK',								'[CHECK]'							UNION ALL
SELECT 	'IDENTITY_INSERT',						'[IDENTITY_INSERT]'					UNION ALL
SELECT 	'RULE',									'[RULE]'							UNION ALL
SELECT 	'CHECKPOINT',							'[CHECKPOINT]'						UNION ALL
SELECT 	'IDENTITYCOL',							'[IDENTITYCOL]'						UNION ALL
SELECT 	'SAVE',									'[SAVE]'							UNION ALL
SELECT 	'CLOSE',								'[CLOSE]'							UNION ALL
SELECT 	'IF',									'[IF]'								UNION ALL
SELECT 	'SCHEMA',								'[SCHEMA]'							UNION ALL
SELECT 	'CLUSTERED',							'[CLUSTERED]'						UNION ALL
SELECT 	'IN',									'[IN]'								UNION ALL
SELECT 	'SECURITYAUDIT',						'[SECURITYAUDIT]'					UNION ALL
SELECT 	'COALESCE',								'[COALESCE]'						UNION ALL
SELECT 	'INDEX',								'[INDEX]'							UNION ALL
SELECT 	'SELECT',								'[SELECT]'							UNION ALL
SELECT 	'COLLATE',								'[COLLATE]'							UNION ALL
SELECT 	'INNER',								'[INNER]'							UNION ALL
SELECT 	'SEMANTICKEYPHRASETABLE',				'[SEMANTICKEYPHRASETABLE]'			UNION ALL
SELECT 	'COLUMN',								'[COLUMN]'							UNION ALL
SELECT 	'INSERT',								'[INSERT]'							UNION ALL
SELECT 	'SEMANTICSIMILARITYDETAILSTABLE',		'[SEMANTICSIMILARITYDETAILSTABLE]'	UNION ALL
SELECT 	'COMMIT',								'[COMMIT]'							UNION ALL
SELECT 	'INTERSECT',							'[INTERSECT]'						UNION ALL
SELECT 	'SEMANTICSIMILARITYTABLE',				'[SEMANTICSIMILARITYTABLE]'			UNION ALL
SELECT 	'COMPUTE',								'[COMPUTE]'							UNION ALL
SELECT 	'INTO',									'[INTO]'							UNION ALL
SELECT 	'SESSION_USER',							'[SESSION_USER]'					UNION ALL
SELECT 	'CONSTRAINT',							'[CONSTRAINT]'						UNION ALL
SELECT 	'IS',									'[IS]'								UNION ALL
SELECT 	'SET',									'[SET]'								UNION ALL
SELECT 	'CONTAINS',								'[CONTAINS]'						UNION ALL
SELECT 	'JOIN',									'[JOIN]'							UNION ALL
SELECT 	'SETUSER',								'[SETUSER]'							UNION ALL
SELECT 	'CONTAINSTABLE',						'[CONTAINSTABLE]'					UNION ALL
SELECT 	'KEY',									'[KEY]'								UNION ALL
SELECT 	'SHUTDOWN',								'[SHUTDOWN]'						UNION ALL
SELECT 	'CONTINUE',								'[CONTINUE]'						UNION ALL
SELECT 	'KILL',									'[KILL]'							UNION ALL
SELECT 	'SOME',									'[SOME]'							UNION ALL
SELECT 	'CONVERT',								'[CONVERT]'							UNION ALL
SELECT 	'LEFT',									'[LEFT]'							UNION ALL
SELECT 	'STATISTICS',							'[STATISTICS]'						UNION ALL
SELECT 	'CREATE',								'[CREATE]'							UNION ALL
SELECT 	'LIKE',									'[LIKE]'							UNION ALL
SELECT 	'SYSTEM_USER',							'[SYSTEM_USER]'						UNION ALL
SELECT 	'CROSS',								'[CROSS]'							UNION ALL
SELECT 	'LINENO',								'[LINENO]'							UNION ALL
SELECT 	'TABLE',								'[TABLE]'							UNION ALL
SELECT 	'CURRENT',								'[CURRENT]'							UNION ALL
SELECT 	'LOAD',									'[LOAD]'							UNION ALL
SELECT 	'TABLESAMPLE',							'[TABLESAMPLE]'						UNION ALL
SELECT 	'CURRENT_DATE',							'[CURRENT_DATE]'					UNION ALL
SELECT 	'MERGE',								'[MERGE]'							UNION ALL
SELECT 	'TEXTSIZE',								'[TEXTSIZE]'						UNION ALL
SELECT 	'CURRENT_TIME',							'[CURRENT_TIME]'					UNION ALL
SELECT 	'NATIONAL',								'[NATIONAL]'						UNION ALL
SELECT 	'THEN',									'[THEN]'							UNION ALL
SELECT 	'CURRENT_TIMESTAMP',					'[CURRENT_TIMESTAMP]'				UNION ALL
SELECT 	'NOCHECK',								'[NOCHECK]'							UNION ALL
SELECT 	'TO',									'[TO]'								UNION ALL
SELECT 	'CURRENT_USER',							'[CURRENT_USER]'					UNION ALL
SELECT 	'NONCLUSTERED',							'[NONCLUSTERED]'					UNION ALL
SELECT 	'TOP',									'[TOP]'								UNION ALL
SELECT 	'CURSOR',								'[CURSOR]'							UNION ALL
SELECT 	'NOT',									'[NOT]'								UNION ALL
SELECT 	'TRAN',									'[TRAN]'							UNION ALL
SELECT 	'DATABASE',								'[DATABASE]'						UNION ALL
SELECT 	'NULL',									'[NULL]'							UNION ALL
SELECT 	'TRANSACTION',							'[TRANSACTION]'						UNION ALL
SELECT 	'DBCC',									'[DBCC]'							UNION ALL
SELECT 	'NULLIF',								'[NULLIF]'							UNION ALL
SELECT 	'TRIGGER',								'[TRIGGER]'							UNION ALL
SELECT 	'DEALLOCATE',							'[DEALLOCATE]'						UNION ALL
SELECT 	'OF',									'[OF]'								UNION ALL
SELECT 	'TRUNCATE',								'[TRUNCATE]'						UNION ALL
SELECT 	'DECLARE',								'[DECLARE]'							UNION ALL
SELECT 	'OFF',									'[OFF]'								UNION ALL
SELECT 	'TRY_CONVERT',							'[TRY_CONVERT]'						UNION ALL
SELECT 	'DEFAULT',								'[DEFAULT]'							UNION ALL
SELECT 	'OFFSETS',								'[OFFSETS]'							UNION ALL
SELECT 	'TSEQUAL',								'[TSEQUAL]'							UNION ALL
SELECT 	'DELETE',								'[DELETE]'							UNION ALL
SELECT 	'ON',									'[ON]'								UNION ALL
SELECT 	'UNION',								'[UNION]'							UNION ALL
SELECT 	'DENY',									'[DENY]'							UNION ALL
SELECT 	'OPEN',									'[OPEN]'							UNION ALL
SELECT 	'UNIQUE',								'[UNIQUE]'							UNION ALL
SELECT 	'DESC',									'[DESC]'							UNION ALL
SELECT 	'OPENDATASOURCE',						'[OPENDATASOURCE]'					UNION ALL
SELECT 	'UNPIVOT',								'[UNPIVOT]'							UNION ALL
SELECT 	'DISK',									'[DISK]'							UNION ALL
SELECT 	'OPENQUERY',							'[OPENQUERY]'						UNION ALL
SELECT 	'UPDATE',								'[UPDATE]'							UNION ALL
SELECT 	'DISTINCT',								'[DISTINCT]'						UNION ALL
SELECT 	'OPENROWSET',							'[OPENROWSET]'						UNION ALL
SELECT 	'UPDATETEXT',							'[UPDATETEXT]'						UNION ALL
SELECT 	'DISTRIBUTED',							'[DISTRIBUTED]'						UNION ALL
SELECT 	'OPENXML',								'[OPENXML]'							UNION ALL
SELECT 	'USE',									'[USE]'								UNION ALL
SELECT 	'DOUBLE',								'[DOUBLE]'							UNION ALL
SELECT 	'OPTION',								'[OPTION]'							UNION ALL
SELECT 	'USER',									'[USER]'							UNION ALL
SELECT 	'DROP',									'[DROP]'							UNION ALL
SELECT 	'OR',									'[OR]'								UNION ALL
SELECT 	'VALUES',								'[VALUES]'							UNION ALL
SELECT 	'DUMP',									'[DUMP]'							UNION ALL
SELECT 	'ORDER',								'[ORDER]'							UNION ALL
SELECT 	'VARYING',								'[VARYING]'							UNION ALL
SELECT 	'ELSE',									'[ELSE]'							UNION ALL
SELECT 	'OUTER',								'[OUTER]'							UNION ALL
SELECT 	'VIEW',									'[VIEW]'							UNION ALL
SELECT 	'END',									'[END]'								UNION ALL
SELECT 	'OVER',									'[OVER]'							UNION ALL
SELECT 	'WAITFOR',								'[WAITFOR]'							UNION ALL
SELECT 	'ERRLVL',								'[ERRLVL]'							UNION ALL
SELECT 	'PERCENT',								'[PERCENT]'							UNION ALL
SELECT 	'WHEN',									'[WHEN]'							UNION ALL
SELECT 	'ESCAPE',								'[ESCAPE]'							UNION ALL
SELECT 	'PIVOT',								'[PIVOT]'							UNION ALL
SELECT 	'WHERE',								'[WHERE]'							UNION ALL
SELECT 	'EXCEPT',								'[EXCEPT]'							UNION ALL
SELECT 	'PLAN',									'[PLAN]'							UNION ALL
SELECT 	'WHILE',								'[WHILE]'							UNION ALL
SELECT 	'EXEC',									'[EXEC]'							UNION ALL
SELECT 	'PRECISION',							'[PRECISION]'						UNION ALL
SELECT 	'WITH',									'[WITH]'							UNION ALL
SELECT 	'EXECUTE',								'[EXECUTE]'							UNION ALL
SELECT 	'PRIMARY',								'[PRIMARY]'							UNION ALL
SELECT 	'WITHIN GROUP',							'[WITHIN GROUP]'					UNION ALL
SELECT 	'EXISTS',								'[EXISTS]'							UNION ALL
SELECT 	'PRINT',								'[PRINT]'							UNION ALL
SELECT 	'WRITETEXT',							'[WRITETEXT]'						UNION ALL
SELECT 	'EXIT',									'[EXIT]'							UNION ALL
SELECT 	'PROC',									'[PROC]'							UNION ALL
SELECT 	'USER_ID',								'[USER_ID]'							UNION ALL
SELECT 	'SEQUENCE',								'[SEQUENCE]'
GO
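
As a quick illustration of how these reserved-word views might be put to work, the sketch below builds a backtick-delimited MySQL column list for the sample answers table. This is assumed, simplified usage only – not the exact code developed in Part 2 – and it presumes the staging copy of the answers table already exists locally.

USE [StagingDB];
GO
/*==============================================================================
Assumed usage sketch: wrap any reserved column names in backticks (using
vw_MysqlReservedWords) when composing a SELECT list pushed to the MySQL source
==============================================================================*/
DECLARE @ColumnList NVARCHAR(MAX);

SELECT  @ColumnList = STUFF(
        ( SELECT    ',' + COALESCE(rw.mysql_version, c.COLUMN_NAME)
          FROM      INFORMATION_SCHEMA.COLUMNS c
                    LEFT JOIN dbo.vw_MysqlReservedWords rw
                        ON UPPER(c.COLUMN_NAME) = rw.reserved_word
          WHERE     c.TABLE_SCHEMA = 'dbo'
                    AND c.TABLE_NAME = 'answers'
          ORDER BY  c.ORDINAL_POSITION
        FOR XML PATH('')
        ), 1, 1, '');

SELECT  @ColumnList AS MySQLSafeColumnList;
GO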

These tables/views, as mentioned before, will be referenced in subsequent posts and code as they provide the process with the relevant metadata controlling tables, table attributes, indexes, error notification alerts etc., and, should any change be required, provide a central point of reference for implementation. Also, all entries made into the four tables above correspond to my development environment, so if replicating this functionality is your goal I suggest adjusting the data entered/used in this post to match your environment.
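
To show how this central repository might be consulted at runtime, the hedged queries below pick the active tables routed to the large-table acquisition path and list the column exceptions for one of the sample tables. The actual, fully-fledged lookups are embedded in the Part 2 and Part 3 code; this is only a simplified illustration based on the sample data inserted above.

USE [ControlDB];
GO
-- Active tables routed to the 'large table' (truncate-and-reload) acquisition path
SELECT  Local_DB_Name ,
        Local_Schema_Name ,
        Local_Table_Name ,
        Remote_Server_Name ,
        Remote_DB_Name ,
        Remote_Table_Name
FROM    dbo.Ctrl_RemoteSvrs_Tables2Process
WHERE   Is_Active = 1
        AND Is_Big_Table = 1;

-- Columns explicitly excluded from acquisition for the sample 'answers' table
SELECT  ce.Remote_Field_Name ,
        ce.Exception_Type
FROM    dbo.Ctrl_RemoteSvrs_Tables2Process_ColumnExceptions ce
        INNER JOIN dbo.Ctrl_RemoteSvrs_Tables2Process t ON ce.FK_ObjectID = t.ID
WHERE   t.Local_Table_Name = 'answers'
        AND ce.Is_Active = 1;
GO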

As part of this preliminary set up we will also create the AdminDBA database (named this way, instead of ErrorsDB, only because it would be too much hassle to change the already well documented code in one of my previous posts). This database will be used to log any execution errors, which can determine further package workflow e.g. whether a subsequent task should or shouldn’t execute. A stored procedure responsible for sending out error notifications will also be located here, as will a function concatenating the e-mail addresses used by the package.

I have written extensively on how error capture and logging works in this process in my two previous blog posts (HERE and HERE), so I won’t be repeating myself in this post; for full details on the schema and the actual code used to create this database please view my previous blog posts HERE and HERE.

Once all the databases and their objects have been created successfully, the stored procedure below, which sends out notifications on any errors that occurred during package runtime (the highlighted line entry needs to be modified with a valid reporting platform URL pointing to the AdminDBA database log report), as well as a scalar function, which converts tabular e-mail address entries into a delimited list, can be created. These two objects will later be incorporated into the SSIS package to manage error notification distribution via e-mail.

/*====================================================================================
STEP 1
Create 'error distribution' stored procedure to manage error notifications based
on executing stored procedure name and reporting platform in use (see the highlighted
line). When implementing, please replace 'https://YourReportingPlatform' with
a valid URL pointing to the reporting platform e.g. SSRS, Tableau etc. where 
a detailed report based on the AdminDBA database logs can be accessed from. 
====================================================================================*/
USE [AdminDBA]
GO

SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO

CREATE PROCEDURE [dbo].[usp_sendBIGroupETLFailMessage]
    (
      @Execution_Instance_GUID UNIQUEIDENTIFIER ,
      @Package_Start_DateTime DATETIME ,
      @Error_Message NVARCHAR(MAX) ,
      @DBMail_Profile_Name VARCHAR(100) ,
      @DBMail_Recipients VARCHAR(1024) ,
      @DBMail_Msg_Body_Format VARCHAR(20) ,
      @DBMail_Msg_Subject NVARCHAR(255) ,
      @DBMail_Msg_Importance VARCHAR(6) ,
      @Package_Name NVARCHAR(255) ,
      @Process_Name NVARCHAR(255) ,
      @Object_Name NVARCHAR(255)
    )
AS
    BEGIN
        IF OBJECT_ID('tempdb..#Temp') IS NOT NULL
            BEGIN
                DROP TABLE #Temp
            END	
		
        SELECT  
		COALESCE(@Package_Name,'Unknown')											AS PackageName,
		COALESCE(CAST(DB_NAME() AS VARCHAR (128)) , 'Unknown')						AS DatabaseName,
		COALESCE(CAST(@Execution_Instance_GUID AS VARCHAR (60)) , 'Unknown')		AS ExecutionInstanceGUID,
		COALESCE(CONVERT(VARCHAR (50),@Package_Start_DateTime, 120) , 'Unknown')	AS PackageStartDateTime,
		COALESCE(CONVERT(VARCHAR (50),SYSDATETIME(), 120), 'Unknown')				AS EventDateTime,
		COALESCE(@Object_Name,'Unknown')											AS ObjectName,
		COALESCE(@Process_Name, 'Unknown')											AS ErrorProcedure,
		COALESCE(@Error_Message , 'Unknown')										AS ErrorMessage
		INTO #Temp

		UPDATE #Temp
		SET ObjectName = 'Unknown'
		WHERE ObjectName = ''
 
        IF OBJECT_ID('tempdb..#Msg') IS NOT NULL
            BEGIN
                DROP TABLE [#Msg]
            END

        CREATE TABLE #Msg
            (
              [ID] [INT] IDENTITY(1, 1) NOT NULL ,
              [ProcessName] [VARCHAR](255) NULL ,
              [MsgText] VARCHAR(1024) NULL
            );      

        INSERT  INTO #Msg
                ( [ProcessName] ,
                  [MsgText] 	                  
                )
                SELECT  'usp_updateLogSSISErrorsDBObjects' AS ProcessName ,
                        ''+@@SERVERNAME+''+' instance metadata update process for package ' + ''+@Package_Name+''+' has encountered an error during processing' AS MsgText
                UNION ALL
				SELECT	'usp_checkRemoteSvrMySQLTablesSchemaChanges',
						'Table schema definition reconciliation failed between '+''+@@SERVERNAME+''+'and the remote server for package ' + ''+@Package_Name+''+''
				UNION ALL
                SELECT  'usp_checkRemoteSvrConnectionStatus' ,
                        'Connection from ' +''+@@SERVERNAME+''+ ' to a remote/linked server cannot be established at this time for package ' + ''+@Package_Name+''+''
                UNION ALL
                SELECT  'usp_checkRemoteSvrDBvsLocalDBRecCounts' ,
                        'Preliminary record count between a remote/linked server and staging database on ' +''+@@SERVERNAME+''+' server is different'
				UNION ALL
				SELECT	'usp_runCreateDropStagingIDXs',
						'Creating/dropping staging environment indexes procedure for package ' + ''+@Package_Name+''+' raised errors during execution on ' +''+@@SERVERNAME+''+ ' server'
				UNION ALL
                SELECT  'Non-specyfic SSIS Job Transformation Failure' ,
                        'SSIS package ' + ''+@Package_Name+''+' failed during execution on ' +''+@@SERVERNAME+''+ ' server'								
				UNION ALL
				SELECT 'usp_checkRemoteSvrDBvsLocalDBSyncErrors',
						'SSIS package ' + ''+@Package_Name+''+' finished executing; however, some errors were raised at runtime on ' +''+@@SERVERNAME+''+ ' server'
				UNION ALL
				SELECT 'usp_runUpdateStagingDBStatistics',
						'Statistics update step in ' + ''+@Package_Name+''+' package failed during execution on ' +''+@@SERVERNAME+''+ ' server'


        DECLARE @Heading NVARCHAR(1024) = ( SELECT  MsgText
                                            FROM    #Msg
                                            WHERE   ProcessName = @Process_Name
                                          )															
        DECLARE @tableHTML NVARCHAR(MAX)  
        SET @tableHTML = 			
			'<H3><span style="color: #ff0000;">' 
			+ '' + @Heading + '' 
			+ '&nbsp;<img src="http://tinymce.cachefly.net/4.1/plugins/emoticons/img/smiley-frown.gif" alt="frown" /></H3>' 
            + N'<p><span style="color: #333333;">Click on the <a class="btn" href="https://YourReportingPlatform">LINK</a> to view more detailed execution error logs or refer to the table below for info on the recent event(s).</p>'
            + N'<table border="1">' 
			+ N'<tr><th>Package Name </th>' 
			+ N'<th>Database Name</th>'
            + N'<th>Execution Instance GUID</th>' 
			+ N'<th>Package Start DateTime</th>'
            + N'<th>Event DateTime</th>' 
			+ N'<th>Affected Object Name</th>'
            + N'<th>Error Procedure/Process Name</th>'
            + N'<th>Error Message</th></tr><font size="2">'
            + CAST(( SELECT td = PackageName ,
                            '' ,
                            td = DatabaseName ,
                            '' ,
                            td = ExecutionInstanceGUID ,
                            '' ,
                            td = PackageStartDateTime ,
                            '' ,
                            td = EventDateTime ,
                            '' ,
                            td = ObjectName ,
                            '' ,
                            td = ErrorProcedure ,
                            '' ,
                            td = ErrorMessage ,
                            ''
                     FROM   #Temp
                   FOR
                     XML PATH('tr') ,
                         TYPE
                   ) AS NVARCHAR(MAX)) + N'</font></table>';		
				
        EXEC msdb.dbo.sp_send_dbmail 
			@profile_name	=	@DBMail_Profile_Name,
            @recipients		=	@DBMail_Recipients,
            @body_format	=	@DBMail_Msg_Body_Format,
            @subject		=	@DBMail_Msg_Subject, 
			@body			=	@tableHTML,
            @importance		=	@DBMail_Msg_Importance
		
		IF OBJECT_ID('tempdb..#Temp') IS NOT NULL
            BEGIN
                DROP TABLE #Temp
            END	
		IF OBJECT_ID('tempdb..#Msg') IS NOT NULL
            BEGIN
                DROP TABLE [#Msg]
            END
    END
GO


/*====================================================================================
STEP 2
Create a row merging function to concatenate multiple e-mail addresses into a single 
line for error notifications e-mail distribution.
====================================================================================*/
CREATE FUNCTION [dbo].[udf_getErrorEmailDistributionArray]
    (
      @servername VARCHAR(128) ,
      @taskname VARCHAR(128)
    )
RETURNS VARCHAR(1024)
AS
    BEGIN
        DECLARE @string VARCHAR(1024);
        SELECT  @string = ( SELECT  STUFF(( SELECT  ';' + [EmailAddress]
                                            FROM    [ControlDB].[dbo].[Ctrl_ErrorMsg_Notification_List]
                                            WHERE   IsActive = 1
                                                    AND ServerName + '\'
                                                    + InstanceName = @servername
                                                    AND TaskName = @taskname
                                          FOR
                                            XML PATH('')
                                          ), 1, 1, '') AS emailaddresses
                          );
        RETURN @string;
    END;
GO
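
Before wiring these two objects into the SSIS package, they can be smoke-tested directly from Management Studio. The example below is hedged: it relies on the sample notification row inserted into ControlDB earlier, the Database Mail profile name is a placeholder, and Database Mail must be configured on the instance for sp_send_dbmail to succeed.

USE [AdminDBA];
GO
-- Build the recipients list from the sample ControlDB entry and fire a test notification
DECLARE @Recipients VARCHAR(1024) =
        dbo.udf_getErrorEmailDistributionArray('BICortexTestServer\TestSQLServer', 'Data_Acquisition_Job');
DECLARE @ExecGUID UNIQUEIDENTIFIER = NEWID();

EXEC dbo.usp_sendBIGroupETLFailMessage
    @Execution_Instance_GUID = @ExecGUID ,
    @Package_Start_DateTime  = '2016-05-20 06:00:00' ,
    @Error_Message           = N'Test error message' ,
    @DBMail_Profile_Name     = 'YourDBMailProfile' ,      -- placeholder Database Mail profile name
    @DBMail_Recipients       = @Recipients ,
    @DBMail_Msg_Body_Format  = 'HTML' ,
    @DBMail_Msg_Subject      = N'Data acquisition package failure' ,
    @DBMail_Msg_Importance   = 'High' ,
    @Package_Name            = N'Data_Acquisition_Job' ,
    @Process_Name            = N'usp_checkRemoteSvrConnectionStatus' ,
    @Object_Name             = N'answers';
GO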

A sample e-mail notification (provided Database Mail is enabled on the SQL Server instance used) may look as per the image below. Notice the embedded hyperlink pointing to a more detailed report which can be retrieved to analyse the error log entries in the AdminDBA database.

Data_Acquisition_Framework_Part1_Sample_Error_Email_Notification

You will notice that upon running the above scripts, as well as creating the AdminDBA database with all its related tables and stored procedures, the following objects will be available in the object explorer.

Data_Acquisition_Framework_Part1_DBsSchema_View

One final thing in this preliminary phase is to create a linked server between the source and target databases. Since our source data resides on a remote MySQL instance, the simplest way to connect to it is through a linked server connection. In this example I have downloaded the MySQL ODBC driver (published by Oracle) for the Windows environment and, after installation, configured it with the source database credentials.

Data_Acquisition_Framework_Part1_Linked_MySQL_Server_Conn_Details

Once the connection configuration has been completed and we can connect to the remote host, it is just a matter of creating a linked server connection from SQL Server and validating the setup by querying the remote data.
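
For reference, a hedged T-SQL equivalent of that setup is shown below. The DSN name and the remote credentials are placeholders and need to be replaced with whatever was configured in the ODBC Data Source Administrator; the linked server name matches the one used in the ControlDB sample data, and the validation query assumes the sample answers table exists on the MySQL source.

USE [master];
GO
-- Register the MySQL source as a linked server through the ODBC provider (MSDASQL)
EXEC sp_addlinkedserver
    @server     = N'RemoteMySQLDB' ,       -- name referenced in the ControlDB sample data
    @srvproduct = N'MySQL' ,
    @provider   = N'MSDASQL' ,
    @datasrc    = N'MySQL_SourceDSN';      -- placeholder ODBC system DSN name

EXEC sp_addlinkedsrvlogin
    @rmtsrvname  = N'RemoteMySQLDB' ,
    @useself     = N'False' ,
    @locallogin  = NULL ,
    @rmtuser     = N'mysql_user' ,         -- placeholder remote credentials
    @rmtpassword = N'mysql_password';
GO
-- Validate the setup by querying remote data through the linked server
SELECT  *
FROM    OPENQUERY(RemoteMySQLDB, 'SELECT * FROM answers LIMIT 10');
GO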

Data_Acquisition_Framework_Part1_Linked_Server_Conn_Status
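
For reference, the same linked server can also be scripted rather than configured through the SSMS GUI. The snippet below is only a sketch: it assumes a system ODBC DSN named 'MySQL_Source' created with the driver above, and the linked server name, remote credentials and table name are placeholders.

-- Sketch only: DSN name, linked server name, credentials and table name are placeholders
EXEC master.dbo.sp_addlinkedserver
    @server = N'MYSQL_SRC',
    @srvproduct = N'MySQL',
    @provider = N'MSDASQL',        -- Microsoft OLE DB provider for ODBC data sources
    @datasrc = N'MySQL_Source';    -- system ODBC DSN configured earlier

EXEC master.dbo.sp_addlinkedsrvlogin
    @rmtsrvname = N'MYSQL_SRC',
    @useself = N'False',
    @locallogin = NULL,
    @rmtuser = N'remote_user',
    @rmtpassword = N'remote_password';

-- Validate the setup by querying remote data through the linked server
SELECT TOP (10) *
FROM OPENQUERY(MYSQL_SRC, 'SELECT * FROM your_source_table');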

In Part 2 of this series I will dive into the nuts and bolts of how data from large tables can be migrated across, as well as some pre-acquisition activities e.g. checking the remote server connection, dropping existing indexes etc.


Using Python and Tableau to Scrape and Visualize Twitter User Data

March 8th, 2016 / No Comments » / by admin

Note: all the code and additional files for this post can be downloaded from my OneDrive folder HERE.

Introduction

What can I say…I like Twitter and I use it often to pick up interesting links on data-related topics and to find out what the people I follow are sharing. I have covered Twitter-related topics (sentiment analysis, Twitter timeline harvesting etc.) a number of times in the past, mainly HERE, HERE and HERE, but since Twitter data is so rich and easy to work with (the API is pretty straightforward, especially coupled with Python wrappers such as Twython) and I have never gone down the path of visualising my findings, I thought it would be a good idea to slap together a simple dashboard depicting some of the scraped data. To make it more fun and to give the dashboard a more polished look, I also analysed the sentiment of each tweet and geocoded my followers, resulting in the Tableau dashboard shown below (click on the image to expand).

Twitter_Tableau_Dashboard_Final

In order to access Twitter data through a Python script, we first need to register a new Twitter app and download the necessary Python libraries (assuming you already have Python installed). Registering the app can be done HERE, where, upon filling out some details, you should be issued with an API Key (Consumer Key) and API Secret (Consumer Secret). For this demo I will be authenticating into Twitter using OAuth2 i.e. application authentication. With application-only authentication you don't have the context of an authenticated user, which means that any request to API endpoints requiring user context, such as posting tweets, will not work. However, the endpoints that remain available can have a higher rate limit, allowing for more frequent querying and data acquisition (you can find more details on how app authentication works HERE). The API keys are stored in a configuration file, in this case called ‘params.cfg’, which sits in the same directory as our SQLite database and Python script file. When it comes to the Python libraries required, I have chosen the following for this demo, some of which go beyond the ‘batteries included’ Python mantra, so a quick ‘pip install…’ (see the one-liner after the list) should do the job:

  • Twython – Python wrapper for Twitter API
  • Configparser – Python configuration file (storing API Secret and API Key values) parser
  • Sqlite3 – relational database for data storage
  • Requests – HTTP library (here used for sentiment querying)
  • NLTK – natural language/text processing library
  • tqdm – terminal progress meter
  • Geopy – Python library for popular geocoding services
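
On Python 3 the sqlite3 and configparser modules ship with the standard library, so a single command along these lines should cover the remaining third-party packages; the script also relies on a few NLTK data sets (stopwords, WordNet and the Punkt tokenizer) which can be fetched with nltk.download().

pip install twython requests nltk tqdm geopy
python -c "import nltk; nltk.download('stopwords'); nltk.download('wordnet'); nltk.download('punkt')"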

Python Code

Let’s go through the code (full script as well as all other files can be downloaded from my OneDrive folder HERE) and break it down into logical sections describing different functions. For this demo I wanted my script to do the following:

  • Create the SQLite database (if it does not already exist) with all the underlying schema for storing the user timeline, followers’ geo-located details and the most commonly used words
  • Acquire the selected user’s timeline tweets (the maximum number allowed by Twitter at this time is 3,200) and insert them into the database table
  • Assign a sentiment value i.e. positive, negative or neutral by querying THIS API (the maximum number of API calls allowed is 45,000/month) and update the table storing user timeline tweets with the returned values
  • Using NLTK, ‘clean up’ the tweets e.g. removing stop words, lemmatizing words etc., strip unwanted characters and hyperlinks and finally insert the most commonly occurring words into a table
  • Acquire the user’s followers’ data and, where possible, geocode their location using the Google geocoding API

To do all this, let’s first create our configuration file called ‘params.cfg’ which will store our API keys. Given that most commercially available APIs for sentiment analysis or geocoding are not free, or at the very least require you to sign up to obtain an API key, this would also be a good place to store those keys. For this demo we will be using free services, so the only values saved in the config file are the consumer key and consumer secret, as shown in the sketch and screenshot below.
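
For reference, a minimal ‘params.cfg’ matching the section and key names the script reads (the values below are dummies) could look like this:

[TwitterAuth]
AppKey = xxxxxxxxxxxxxxxxxxxxxxxxx
AppSecret = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx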

Twitter_Tableau_Dashboard_Dummy_API_Keys_Example

Now we can start building up our script, importing the necessary Python modules, declaring variables and their values and referencing our configuration file data as per the snippet below (the ScreenName variable needs to be populated with the Twitter handle of the account whose timeline we’re trying to scrape).

#import relevant libraries and set up variables' values
import twython
import configparser
import sqlite3
import os
import platform
import re
import requests
import sys
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
import time
from collections import Counter
from tqdm import tqdm
from geopy.geocoders import GoogleV3
from geopy.exc import GeocoderQuotaExceeded

MaxTweetsAPI = 3200
StatusesCount = 200
WordCloudTweetNo = 40
IncludeRts = 1
ExcludeReplies = 0
SkipStatus = 1
ScreenName = 'user twitter handle' #e.g. @bicortex
SentimentURL = 'http://text-processing.com/api/sentiment/'
config = configparser.ConfigParser()
config.read('params.cfg')
AppKey = config.get('TwitterAuth','AppKey')
AppSecret = config.get('TwitterAuth','AppSecret')

Next, let’s set up our SQLite database and its schema. Depending on whether I’m at work or using my home computer I tend to move between Windows and Linux, so you may want to adjust the first IF/ELSE statement to reflect your environment’s path and operating system (the placeholder directory paths).

#Set up database objects' schema
if platform.system() == 'Windows':
    db_location = os.path.normpath('Your_Windows_Directory_Path/twitterDB.db') #e.g. 'C:/Twitter_Scraping_Project/twitterDB.db'
else:
    db_location = r'/Your_Linux_Directory_Path/twitterDB.db' #e.g. '/home/username/Twitter_Scraping_Project/twitterDB.db'

objectsCreate = {'UserTimeline':
                 'CREATE TABLE IF NOT EXISTS UserTimeline ('
                 'user_id int, '
                 'user_name text, '
                 'screen_name  text, '
                 'user_description text, '
                 'user_location text, '
                 'user_url text, '
                 'user_created_datetime text,'
                 'user_language text ,'
                 'user_timezone text, '
                 'user_utc_offset real,'
                 'user_friends_count real,'
                 'user_followers_count real,'
                 'user_statuses_count real,'
                 'tweet_id int,'
                 'tweet_id_str text,'
                 'tweet_text text,'
                 'tweet_created_timestamp text,'
                 'tweet_probability_sentiment_positive real,'
                 'tweet_probability_sentiment_neutral real,'
                 'tweet_probability_sentiment_negative real,'
                 'tweet_sentiment text, '
                 'PRIMARY KEY(tweet_id, user_id))',

                 'FollowersGeoData':
                 'CREATE TABLE IF NOT EXISTS FollowersGeoData ('
                 'follower_id int,'
                 'follower_name text,'
                 'follower_location text,'
                 'location_latitude real,'
                 'location_longitude real,'
                 'PRIMARY KEY (follower_id))',

                 'WordsCount':
                 'CREATE TABLE IF NOT EXISTS WordsCount ('
                 'word text,'
                 'frequency int)'}

#create database file and schema using the scripts above
db_is_new = not os.path.exists(db_location)
with sqlite3.connect(db_location) as conn:
    if db_is_new:
        print("Creating database schema on " + db_location + " database...\n")
        for t in objectsCreate.items():
            try:
                conn.executescript(t[1])
            except sqlite3.OperationalError as e:
                print (e)
                conn.rollback()
                sys.exit(1)
            else:
                conn.commit()
    else:
        print('Database already exists, bailing out...')

UserTimelineIDs = []
cur = 'SELECT DISTINCT tweet_ID from UserTimeline'
data = conn.execute(cur).fetchall()
for u in data:
    UserTimelineIDs.extend(u)

UserFollowerIDs = []
cur = 'SELECT DISTINCT follower_id from FollowersGeoData'
data = conn.execute(cur).fetchall()
for f in data:
    UserFollowerIDs.extend(f)

Next, we will define a simple function to check the Twitter API rate limit. Twitter limits the number of calls we can make to a specific API endpoint, e.g. at the time of writing this post the user timeline endpoint has a limit of 300 calls in a 15-minute window when used with application authentication and 180 calls when used with user authentication. To allow our script to continue without Twython complaining about exceeding the allowed threshold, we want to make sure that once we reach this rate limit the script pauses execution for a predefined amount of time. Twitter’s ‘application/rate_limit_status’ API endpoint provides us with the rate limits defined for each endpoint, the remaining number of calls available as well as the expiration time in epoch time. Using those three values we can let the script sleep until the rate limit window has been reset and then resume scraping without completely stopping the process. Interestingly enough, the ‘application/rate_limit_status’ API endpoint is itself rate-limited to 180 calls per 15-minute window, so you will notice the ‘appstatus’ variable also keeping this threshold in check.

#check Twitter API calls limit and pause execution to reset the limit if required
def checkRateLimit(limittypecheck):
    appstatus = {'remaining':1}
    while True:
        if appstatus['remaining'] > 0:
            twitter = twython.Twython(AppKey, AppSecret, oauth_version=2)
            ACCESS_TOKEN = twitter.obtain_access_token()
            twitter = twython.Twython(AppKey, access_token=ACCESS_TOKEN)
            status = twitter.get_application_rate_limit_status(resources = ['statuses', 'application', 'followers'])
            appstatus = status['resources']['application']['/application/rate_limit_status']
            if limittypecheck=='usertimeline':
                usertimelinestatus = status['resources']['statuses']['/statuses/user_timeline']
                if usertimelinestatus['remaining'] == 0:
                    wait = max(usertimelinestatus['reset'] - time.time(), 0) + 1  # adding 1 second pad
                    time.sleep(wait)
                else:
                    return
            if limittypecheck=='followers':
                userfollowersstatus = status['resources']['followers']['/followers/list']
                if userfollowersstatus['remaining'] == 0:
                    wait = max(userfollowersstatus['reset'] - time.time(), 0) + 1  # adding 1 second pad
                    time.sleep(wait)
                else:
                    return
        else :
            wait = max(appstatus['reset'] - time.time(), 0) + 1
            time.sleep(wait)

Now that we have the above function in place we can start scraping Twitter feeds for a particular user. The function below queries the ‘get_user_timeline’ endpoint for a range of timeline attributes, the most important being ‘text’ – the actual tweet string. As Twitter returns a single page of data with the number of tweets defined by the ‘StatusesCount’ variable, and we require either the total count of tweets or 3,200 of them if the user whose timeline we’re querying has posted more than that cap, we need to implement a paging mechanism. By using an API parameter called ‘max_id’, which returns results with an ID less than (that is, older than) or equal to the specified ID, we can create a cursor to iterate through multiple pages, eventually reaching either the total tweet count or the 3,200-tweet cap (whichever is lower). Finally, we insert all tweets and their corresponding attributes into the database.

#grab user timeline twitter feed for the profile selected and store them in a table
def getUserTimelineFeeds(StatusesCount, MaxTweetsAPI, ScreenName, IncludeRts, ExcludeReplies, AppKey, AppSecret):
    #Pass Twitter API and database credentials/config parameters
    twitter = twython.Twython(AppKey, AppSecret, oauth_version=2)
    try:
        ACCESS_TOKEN = twitter.obtain_access_token()
    except twython.TwythonAuthError as e:
        print (e)
        sys.exit(1)
    else:
        try:
            twitter = twython.Twython(AppKey, access_token=ACCESS_TOKEN)
            print('Acquiring tweeter feed for user "{0}"...'.format(ScreenName))
            params = {'count': StatusesCount, 'screen_name': ScreenName, 'include_rts': IncludeRts,'exclude_replies': ExcludeReplies}
            AllTweets = []
            checkRateLimit(limittypecheck='usertimeline')
            NewTweets = twitter.get_user_timeline(**params)
            if NewTweets is None:
                print('No user timeline tweets found for "{0}" account, exiting now...'.format(ScreenName))
                sys.exit()
            else:
                ProfileTotalTweets = [tweet['user']['statuses_count'] for tweet in NewTweets][0]
                if ProfileTotalTweets > MaxTweetsAPI:
                    TweetsToProcess = MaxTweetsAPI
                else:
                    TweetsToProcess = ProfileTotalTweets
                oldest = NewTweets[0]['id']
                progressbar = tqdm(total=TweetsToProcess, leave=1)
                while len(NewTweets) > 0:
                    checkRateLimit(limittypecheck='usertimeline')
                    NewTweets = twitter.get_user_timeline(**params, max_id=oldest)
                    AllTweets.extend(NewTweets)
                    oldest = AllTweets[-1]['id'] - 1
                    if len(NewTweets)!=0:
                        progressbar.update(len(NewTweets))
                progressbar.close()

                AllTweets = [tweet for tweet in AllTweets if tweet['id'] not in UserTimelineIDs]
                for tweet in AllTweets:
                    conn.execute("INSERT OR IGNORE INTO UserTimeline "
                     "(user_id,"
                     "user_name,"
                     "screen_name,"
                     "user_description,"
                     "user_location,"
                     "user_url,"
                     "user_created_datetime,"
                     "user_language,"
                     "user_timezone,"
                     "user_utc_offset,"
                     "user_friends_count,"
                     "user_followers_count,"
                     "user_statuses_count,"
                     "tweet_id,"
                     "tweet_id_str,"
                     "tweet_text,"
                     "tweet_created_timestamp) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",(
                    tweet['user']['id'],
                    tweet['user']['name'],
                    tweet['user']['screen_name'],
                    tweet['user']['description'],
                    tweet['user']['location'],
                    tweet['user']['url'],
                    time.strftime('%Y-%m-%d %H:%M:%S', time.strptime(tweet['user']['created_at'],'%a %b %d %H:%M:%S +0000 %Y')),
                    tweet['user']['lang'],
                    tweet['user']['time_zone'],
                    tweet['user']['utc_offset'],
                    tweet['user']['friends_count'],
                    tweet['user']['followers_count'],
                    tweet['user']['statuses_count'],
                    tweet['id'],
                    tweet['id_str'],
                    str(tweet['text'].replace("\n","")),
                    time.strftime('%Y-%m-%d %H:%M:%S', time.strptime(tweet['created_at'],'%a %b %d %H:%M:%S +0000 %Y'))))
                conn.commit()
        except Exception as e:
            print(e)
            sys.exit(1)

Great! Now that we have the user timeline table populated we can proceed with assigning a sentiment to each individual record. I used a free service which allows for 45,000 API calls a month. Since there are no batch POST requests available and every single record has to be queried individually, we also exclude the records which have already been assigned a sentiment. The function below also takes care of removing certain characters and hyperlinks, as these can interfere with the classification process.

#assign sentiment to individual tweets
def getSentiment(SentimentURL):
        cur = ''' SELECT tweet_id, tweet_text, count(*) as NoSentimentRecCount
                  FROM UserTimeline
                  WHERE tweet_sentiment IS NULL
                  GROUP BY tweet_id, tweet_text '''
        data = conn.execute(cur).fetchone()
        if data is None:
            print('Sentiment already assigned to relevant records or table is empty, bailing out...')
        else:
            print('Assigning sentiment to selected tweets...')
            data = conn.execute(cur)
            payload = {'text':'tweet'}
            for t in tqdm(data.fetchall(),leave=1):
                id = t[0]
                payload['text'] = t[1]
                #concatenate if tweet is on multiple lines
                payload['text'] = str(payload['text'].replace("\n", ""))
                #remove http:// URL shortening links (flags keyword used so re.IGNORECASE is not mistaken for the count argument)
                payload['text'] = re.sub(r'http://[\w.]+/+[\w.]+', "", payload['text'], flags=re.IGNORECASE)
                #remove https:// URL shortening links
                payload['text'] = re.sub(r'https://[\w.]+/+[\w.]+', "", payload['text'], flags=re.IGNORECASE)
                #remove certain characters (hyphen escaped so it is treated literally inside the character class)
                payload['text'] = re.sub(r'[@#\[\]\'"$.;{}~`<>:%&^*()\-?_!,+=]', "", payload['text'])
                #print(payload['text'])
                try:
                    post=requests.post(SentimentURL, data=payload)
                    response = post.json()
                    conn.execute("UPDATE UserTimeline "
                                    "SET tweet_probability_sentiment_positive = ?, "
                                    "tweet_probability_sentiment_neutral = ?, "
                                    "tweet_probability_sentiment_negative = ?, "
                                    "tweet_sentiment = ? WHERE tweet_id = ?",
                    (response['probability']['pos'],
                    response['probability']['neutral'],
                    response['probability']['neg'],
                                    response['label'], id))
                    conn.commit()
                except Exception as e:
                    print (e)
                    sys.exit(1)

Next, we can look at the word count function. This code takes advantage of the NLTK library to tokenize the tweets, remove certain stop words, lemmatize the words, strip unwanted characters and finally work out the counts for the most frequently used words. The output gets inserted into a separate table in the same database, simply as word and frequency attributes for the top occurring words.

#get most commonly occurring (40) words in all stored tweets
def getWordCounts(WordCloudTweetNo):
    print('Fetching the most commonly used {0} words in the "{1}" feed...'.format(WordCloudTweetNo, ScreenName))
    cur = "DELETE FROM WordsCount;"
    conn.execute(cur)
    conn.commit()
    cur = 'SELECT tweet_text FROM UserTimeline'
    data = conn.execute(cur)
    StopList = stopwords.words('english')
    Lem = WordNetLemmatizer()
    AllWords = ''
    for w in tqdm(data.fetchall(),leave=1):
            try:
                #remove certain characters and strings
                CleanWordList = re.sub(r'http://[\w.]+/+[\w.]+', "", w[0], flags=re.IGNORECASE)
                CleanWordList = re.sub(r'https://[\w.]+/+[\w.]+', "", CleanWordList, flags=re.IGNORECASE)
                CleanWordList = re.sub(r'[@#\[\]\'"$.;{}~`<>:%&^*()\-?_!,+=]', "", CleanWordList)
                #tokenize and convert to lower case
                CleanWordList = [words.lower() for words in word_tokenize(CleanWordList) if words not in StopList]
                #lemmatize words
                CleanWordList = [Lem.lemmatize(word) for word in CleanWordList]
                #join words
                CleanWordList =' '.join(CleanWordList)
                AllWords += CleanWordList
            except Exception as e:
                print (e)
                sys.exit(e)
    if AllWords is not None:
        words = [word for word in AllWords.split()]
        c = Counter(words)
        for word, count in c.most_common(WordCloudTweetNo):
            conn.execute("INSERT INTO WordsCount (word, frequency) VALUES (?,?)", (word, count))
            conn.commit()

Next, we will source the data on the user’s followers, again using a cursor to iterate through each page (excluding followers who did not list their location), insert it into the database and finally geocode their location using the Google geocoding API, which will attempt to assign latitude and longitude values where possible and update the database table accordingly. A word of caution though: since the Google geocoding API is limited to 2,500 free calls a day, if you have more geo-located followers than that you may either want to pay to have this limit raised or geocode the data in daily batches.

#geocode followers where geolocation data stored as part of followers' profiles
def GetFollowersGeoData(StatusesCount, ScreenName, SkipStatus, AppKey, AppSecret):
    print('Acquiring followers for Twitter handle "{0}"...'.format(ScreenName))
    twitter = twython.Twython(AppKey, AppSecret, oauth_version=2)
    ACCESS_TOKEN = twitter.obtain_access_token()
    twitter = twython.Twython(AppKey, access_token=ACCESS_TOKEN)
    params = {'count': StatusesCount, 'screen_name': ScreenName, 'skip_status':1}
    checkRateLimit(limittypecheck='usertimeline')
    TotalFollowersCount = twitter.get_user_timeline(**params)
    TotalFollowersCount = [tweet['user']['followers_count'] for tweet in TotalFollowersCount][0]

    progressbar=tqdm(total=TotalFollowersCount, leave=1)
    Cursor = -1
    while Cursor != 0:
        checkRateLimit(limittypecheck='followers')
        NewGeoEnabledUsers = twitter.get_followers_list(**params, cursor=Cursor)
        Cursor = NewGeoEnabledUsers['next_cursor']
        progressbar.update(len(NewGeoEnabledUsers['users']))
        NewGeoEnabledUsers = [[user['id'], user['screen_name'], user['location']] for user in NewGeoEnabledUsers['users'] if user['location'] != '']
        #AllGeoEnabledUsers.extend(NewGeoEnabledUsers)
        for user in NewGeoEnabledUsers:
            if user[0] not in UserFollowerIDs:
                conn.execute("INSERT OR IGNORE INTO FollowersGeoData ("
                             "follower_id,"
                             "follower_name,"
                             "follower_location)"
                             "VALUES (?,?,?)",
                            (user[0], user[1], user[2]))
                conn.commit()
    progressbar.close()
    print('')
    print('Geo-coding followers location where location variable provided in the user profile...')
    geo = GoogleV3(timeout=5)
    cur = 'SELECT follower_id, follower_location FROM FollowersGeoData WHERE location_latitude IS NULL OR location_longitude IS NULL'
    data = conn.execute(cur)
    for location in tqdm(data.fetchall(), leave=1):
        try:
            try:
                followerid = location[0]
                #print(location[1])
                geoparams = geo.geocode(location[1])
            except GeocoderQuotaExceeded as e:
                print(e)
                return
                #print(geoparams)
            else:
                if geoparams is None:
                    pass
                else:
                    latitude = geoparams.latitude
                    longitude = geoparams.longitude
                    conn.execute("UPDATE FollowersGeoData "
                                "SET location_latitude = ?,"
                                "location_longitude = ?"
                                "WHERE follower_id = ?",
                                (latitude,longitude,followerid))
                    conn.commit()
        except Exception as e:
            print("Error: geocode failed on input %s with message %s"%(location[2], e))
            continue

Finally, running the ‘main’ function executes the above in sequential order, displaying a progress bar for all API-related calls, as per below.

#run all functions
def main():
    getUserTimelineFeeds(StatusesCount, MaxTweetsAPI, ScreenName, IncludeRts, ExcludeReplies, AppKey, AppSecret)
    getSentiment(SentimentURL)
    getWordCounts(WordCloudTweetNo)
    GetFollowersGeoData(StatusesCount, ScreenName, SkipStatus, AppKey, AppSecret)
    conn.close()

if __name__ == "__main__":
    main()

Twitter_Tableau_Dashboard_Python_Code_Execution_Terminal_Progress

Visualizing Data in Tableau

Now that we have our three database tables populated, let’s create a simple dashboard in Tableau. Since we implemented our data storage layer in SQLite, we first need to download an SQLite ODBC driver to enable Tableau to talk to it. Once installed (I downloaded mine from HERE), you should be able to select it from the options available under ‘Other Databases (ODBC)’ data sources.

Twitter_Tableau_Dashboard_SQLite_Data_Source_Selection

Once the connection has been established we can finally commence building out our dashboard, but before we begin, a quick word of caution: if you want to turn this demo into a more robust solution, the SQLite back-end, even though good enough for this proof of concept, may not play well with Tableau. I came across a few issues with data type support and, since SQLite has not been designed with a client/server architecture in mind, chances are that Tableau has better out-of-the-box compatibility when coupled with RDBMSs it natively supports e.g. MySQL, PostgreSQL, MSSQL etc. Below is a notification pop-up warning of the lack of support for some features when using the SQLite ODBC connection.

Twitter_Tableau_Dashboard_ODBC_SQLite_Support

Continuing on, in Tableau I have created four different sheets, two of them pointing to the same data source (as they rely on the same UserTimeline table) and the remaining two pointing to the two other tables i.e. WordsCount and FollowersGeoData. The SQL queries used are mostly just straightforward SELECT statements against their respective tables, with the one driving the followers/tweets/friends counts structured to account for changes captured over the last seven days, as per the code below.

SELECT
friends_count as counts,
((friends_count - last_weeks_friends_count)/last_weeks_friends_count)  as percentage_change, 'Friends' as category
FROM
(SELECT 
MAX(user_friends_count) as friends_count,
MAX(user_followers_count) as followers_count,
MAX(user_statuses_count) as statuses_count,
(SELECT user_friends_count FROM UserTimeLine
		WHERE tweet_created_timestamp < date('now','-7 days')
		ORDER BY ROWID ASC LIMIT 1
) as last_weeks_friends_count,
(SELECT user_followers_count FROM UserTimeLine
		WHERE tweet_created_timestamp < date('now','-7 days')
		ORDER BY ROWID ASC LIMIT 1
) as last_weeks_followers_count,
(SELECT user_statuses_count FROM UserTimeLine
		WHERE tweet_created_timestamp < date('now','-7 days')
		ORDER BY ROWID ASC LIMIT 1
) as last_weeks_statuses_count
FROM UserTimeLine) 
UNION ALL
SELECT
followers_count as counts,
((followers_count - last_weeks_followers_count)/last_weeks_followers_count) as percentage_change, 'Followers' as category
FROM
(SELECT 
MAX(user_friends_count) as friends_count,
MAX(user_followers_count) as followers_count,
MAX(user_statuses_count) as statuses_count,
(SELECT user_friends_count FROM UserTimeLine
		WHERE tweet_created_timestamp < date('now','-7 days')
		ORDER BY ROWID ASC LIMIT 1
) as last_weeks_friends_count,
(SELECT user_followers_count FROM UserTimeLine
		WHERE tweet_created_timestamp < date('now','-7 days')
		ORDER BY ROWID ASC LIMIT 1
) as last_weeks_followers_count,
(SELECT user_statuses_count FROM UserTimeLine
		WHERE tweet_created_timestamp < date('now','-7 days')
		ORDER BY ROWID ASC LIMIT 1
) as last_weeks_statuses_count
FROM UserTimeLine) UNION ALL
SELECT
statuses_count as counts,
((statuses_count - last_weeks_statuses_count)/last_weeks_statuses_count)  as percentage_change, 'Tweets' as category
FROM
(SELECT 
MAX(user_friends_count) as friends_count,
MAX(user_followers_count) as followers_count,
MAX(user_statuses_count) as statuses_count,
(SELECT user_friends_count FROM UserTimeLine
		WHERE tweet_created_timestamp < date('now','-7 days')
		ORDER BY ROWID ASC LIMIT 1
) as last_weeks_friends_count,
(SELECT user_followers_count FROM UserTimeLine
		WHERE tweet_created_timestamp < date('now','-7 days')
		ORDER BY ROWID ASC LIMIT 1
) as last_weeks_followers_count,
(SELECT user_statuses_count FROM UserTimeLine
		WHERE tweet_created_timestamp < date('now','-7 days')
		ORDER BY ROWID ASC LIMIT 1
) as last_weeks_statuses_count
FROM UserTimeLine)

All that is left to do is to position each sheet’s visualisation on the dashboard and voilà…the complete dashboard looks like this (click on the image to expand).

Twitter_Tableau_Dashboard_Final_TableauDesktop

Again, all the code, database and Tableau solution files can be downloaded from my OneDrive folder HERE. Since I have removed the database connection details from all data sources, if you’re thinking of replicating this solution on your own machine, all you need to do is point each data source to the SQLite database location, as per the video below.
