Using AWS Polly And IBM Watson Text-To-Speech And Tone Analyser Artificial Intelligence Services To Read and Analyse Clinical Chat Data (Part 1)

August 23rd, 2017 / No Comments » / by admin

Note: Part two to this series can be found HERE

Ever since Amazon unveiled Polly text-to-speech cloud service I have wanted to create something that would allow this service to be utilised in a real-world scenario. Then it dawned on me that since my current employer (healthcare sector) is operating a service where people can connect with clinical staff and seek help for mental health issue through a chat messaging application (among other means), I can convert this text data into audible format and make it ‘playable’.

Polly is a very interesting example of the advances made in Artificial Intelligence and its democratisation in recent years. Amazon defines Polly as ‘AI service that uses advanced deep learning technologies to synthesise speech that sounds like a human voice. Amazon Polly includes dozens of lifelike voices across a variety of languages, so you can select the ideal voice and build speech-enabled applications that work in many different countries. Amazon Polly provides an API that enables you to quickly integrate speech synthesis into your application. You simply send the text you want converted into speech to the Amazon Polly API, and Amazon Polly immediately returns the audio stream to your application so your application can begin streaming it directly or store it in a standard audio file format, such as MP3’.

Below if a short sample footage demonstrating the functionality of the GUI and AWS Polly API.

Since the API is quite simple and the pricing very easy on the wallet, I got stuck into building a little POC in Python. The solution described here is comprised of Microsoft SQL Server database containing one table with a sample data of a typical chat session between a clinician and a patient (dummy data) and a simple function which splits out chat text data into multiple records when the maximum threshold is reached. Polly imposes a limit on the size of the input text which can be up to 1500 billed characters (3000 total characters). Since some entries exceed this limit, this function allows you to break up strings longer then 1500 characters into multiple chunks/records based on full stop character occurrence (I figured it was the most natural way of doing it).

To explore Polly’s voices and the characteristic assigned to them I created two separate drop-down controls (one for patient and one for clinician). Amazon boasts that Polly currently supports a total of 47 male and female voices spread across 24 languages, with additional languages and voices on the roadmap. This app includes the following voices as part of this demo: Russell and Nicole (en-AU), Amy, Brian and Emma (en-GB), Raveena (en-IN), Ivy, Joanna, Joey, Justin, Kendra, Kimberly and Salli (en-US). In this first iteration I have also built an additional button into its main GUI labelled ‘Perform Tone Analysis’ which, for now, has no functionality but if you’re keen to follow along with this tutorial, in part two I will be demonstrating how to use IBM Watson machine learning API for linguistic tone analysis of the chat data. The following is a sample Python code (Python 3) generating basic GUI. I have also included the SQL code which produces dummy data and the function responsible for breaking long strings into smaller ‘chunks’.

import time
import io
import multiprocessing
import pyodbc
import boto3
import pygame
from contextlib import closing
import tkinter as tk
from tkinter import scrolledtext, ttk, messagebox


class ConnectionInfo:

    def __init__(self):
        self.inst_srv = tk.StringVar()
        self.inst_db = tk.StringVar()
        self.inst_login = tk.StringVar()
        self.inst_passwd = tk.StringVar()
        self.aws_access_key_id = tk.StringVar()
        self.aws_secret_access_key = tk.StringVar()
        self.ibm_username = tk.StringVar()
        self.ibm_passwd = tk.StringVar()
        self.use_aws_api = tk.IntVar()
        self.use_ibm_api = tk.IntVar()
        self.use_win_auth = tk.IntVar()
        self.session_id = tk.IntVar()
        self.clinician_voice = tk.StringVar()
        self.patient_voice = tk.StringVar()


class MsSqlDatabase:

    ODBC_DRIVER = '{ODBC Driver 13 for SQL Server}'

    def __init__(self, conn_info):
        self.conn_info = conn_info

    def connect(self):
        connection_string = ('DRIVER={driver};SERVER={server};DATABASE={db};'.format(
            driver=self.ODBC_DRIVER,
            server=self.conn_info.inst_srv.get(),
            db=self.conn_info.inst_db.get()))
        if self.conn_info.use_win_auth.get() == 1:
            connection_string = connection_string + 'Trusted_Connection=yes;'
        else:
            connection_string = connection_string + 'UID={uid};PWD={password};'.format(
                uid=self.conn_info.inst_login.get(),
                password=self.conn_info.inst_passwd.get())

        try:
            conn = pyodbc.connect(connection_string, timeout=1)
        except pyodbc.Error as err:
            conn = None
        return conn

    def get_session(self, conn):
        try:
            cursor = conn.cursor()
            cursor.execute(
                """SELECT UPPER(user_role), message_body FROM dbo.test_dialog t
                WHERE t.Session_ID = ? ORDER BY t.ID ASC""", self.conn_info.session_id.get())
            results = cursor.fetchall()
        except pyodbc.Error as err:
            results = None
        return results

    def get_messages(self, conn):
        try:
            cursor = conn.cursor()
            cursor.execute(
                """SELECT t.user_role, t.direction, LTRIM(RTRIM(f.RESULT)) AS message FROM dbo.test_dialog t
                CROSS APPLY dbo.tvf_getConversations (t.message_body, 50, '.') f WHERE t.session_id = ?
                ORDER BY t.id, f.id""", self.conn_info.session_id.get())
            results = cursor.fetchall()
        except pyodbc.Error as err:
            results = None
        return results


class AudioPlayer:

    def __init__(self, credentials, voices):
        self.credentials = credentials
        self.voices = voices

    def run(self, messages, voices, commands, status):
        status['code'] = 0
        status['message'] = 'OK'

        # connect to AWS
        try:
            polly_service = boto3.client(
                'polly',
                aws_access_key_id = self.credentials['aws_access_key'],
                aws_secret_access_key = self.credentials['aws_secret_key'],
                region_name = 'eu-west-1')
        except:
            polly_service = None

        if not polly_service:
            status['code'] = 1
            status['message'] = 'Cannot connect to polly service'
            return

        is_stopped = False
        is_paused = False
        pygame.mixer.init(channels=1, frequency=44100)
        for message in messages:
            print(message)

            try:
                polly_response = polly_service.synthesize_speech(
                    OutputFormat='ogg_vorbis',
                    Text=message[2],
                    TextType='text',
                    VoiceId=voices[message[0]])
            except:
                polly_response = None

            if not polly_response:
                status['code'] = 2
                status['message'] = 'Cannot get response from polly'
                break

            if "AudioStream" in polly_response:
                with closing(polly_response["AudioStream"]) as stream:
                    data = stream.read()
                    filelike = io.BytesIO(data)
                    sound = pygame.mixer.Sound(file=filelike)
                    sound.play()

                    while pygame.mixer.get_busy() or is_paused:
                        if not commands.empty():
                            command = commands.get()
                            if command == 'STOP':
                                sound.stop()
                                is_stopped = True
                                break
                            if command == 'PAUSE':
                                is_paused = not is_paused
                                if is_paused:
                                    sound.stop()
                                else:
                                    sound.play()
                        time.sleep(0.010)
            if is_stopped:
                break

class AppFrame(object):

    def __init__(self):
        self.root = tk.Tk()
        self.root.title('Polly Text-To-Speech GUI Prototype ver 1.0')
        self.root.resizable(width=False, height=False)

        self.conn_info = ConnectionInfo()

        self.connection_details_frame = ConnDetailsFrame(self.root, self)
        self.session_frame = SessionDetailsFrame(self.root, self)
        self.playback_frame = PlaybackDetailsFrame(self.root, self)

    def run(self):
        self.root.mainloop()


class ConnDetailsFrame(ttk.LabelFrame):

    def __init__(self, root, parent):
        super(ConnDetailsFrame, self).__init__(root, text='1. Connection Details')
        super(ConnDetailsFrame, self).grid(
            row=0, column=0, columnspan=3, sticky='W',
            padx=5, pady=5, ipadx=5, ipady=5
        )

        self.root = root
        self.parent = parent
        self.conn_info = parent.conn_info

        self.create_notebook()

    def create_notebook(self):
        self.tab_control = ttk.Notebook(self)
        self.create_frames()
        self.create_labels()
        self.create_entry()
        self.create_checkbuttons()

    def create_frames(self):
        self.tab_db = ttk.Frame(self.tab_control)
        self.tab_api = ttk.Frame(self.tab_control)
        self.tab_control.add(self.tab_db, text="Database Connection Details ")
        self.tab_control.add(self.tab_api, text="APIs Connection Details ")
        self.tab_control.grid(row=0, column=0, sticky='E', padx=5, pady=5)

    def create_labels(self):
        ttk.Label(self.tab_db, text="Server/Instance Name:").grid(row=0, column=0, sticky='E', padx=5, pady=(15, 5))
        ttk.Label(self.tab_db, text="Database Name:").grid(row=1, column=0, sticky='E', padx=5, pady=5)
        ttk.Label(self.tab_db, text="User Name:").grid(column=0, row=3, sticky="E", padx=5, pady=5)
        ttk.Label(self.tab_db, text="Password:").grid(column=0, row=4, sticky="E", padx=5, pady=(5, 10))

        ttk.Label(self.tab_api, text="AWS Access Key ID:").grid(column=0, row=1, sticky="E", padx=5, pady=(5, 5))
        ttk.Label(self.tab_api, text="AWS Secret Access Key:").grid(column=0, row=2, sticky="E", padx=5, pady=5)
        ttk.Label(self.tab_api, text="IBM Watson Username:").grid(column=0, row=4, sticky="E", padx=5, pady=(5, 5))
        ttk.Label(self.tab_api, text="IBM Watson Password:").grid(column=0, row=5, sticky="E", padx=5, pady=(5,15))
    
    def create_checkbuttons(self):
        check_use_win_auth = ttk.Checkbutton(self.tab_db, onvalue=1, offvalue=0,
                                             variable=self.conn_info.use_win_auth,
                                             text='Use Windows Authentication',
                                             command=self.on_use_win_auth_change)
        check_use_win_auth.grid(row=2, column=0, sticky='W', padx=15, pady=(15,5))  
        check_use_aws_api = ttk.Checkbutton(self.tab_api, onvalue=1, offvalue=0,
                                            variable=self.conn_info.use_aws_api, text='Use AWS Text-To-Speech API')
        check_use_aws_api.grid(row=0, column=0, sticky='W', padx=15, pady=(15,5)) 
        check_use_ibm_api = ttk.Checkbutton(self.tab_api, onvalue=1, offvalue=0,
                                            variable=self.conn_info.use_ibm_api, text='Use IBM Watson API')
        check_use_ibm_api.grid(row=3, column=0, sticky='W', padx=15, pady=(15,5))   

    def create_entry(self):
        entry_db_server_name = ttk.Entry(self.tab_db, width=60, textvariable=self.conn_info.inst_srv)
        entry_db_server_name.grid(row=0, column=1, sticky='W', padx=10, pady=(15, 5))
        entry_db_name = ttk.Entry(self.tab_db, width=60, textvariable=self.conn_info.inst_db)
        entry_db_name.grid(row=1, column=1, sticky='W', padx=10, pady=5)                                      
        self.entry_db_user_name = ttk.Entry(self.tab_db, width=60, textvariable=self.conn_info.inst_login)
        self.entry_db_user_name.grid(row=3, column=1, padx=10, pady=5)
        self.entry_db_password = ttk.Entry(self.tab_db, width=60, textvariable=self.conn_info.inst_passwd, show="*")
        self.entry_db_password.grid(row=4, column=1, padx=10, pady=(5, 10))

        entry_aws_access_key = ttk.Entry(self.tab_api, width=60,
                                         textvariable=self.conn_info.aws_access_key_id)
        entry_aws_access_key.grid(row=1, column=1, sticky='W', padx=10, pady=(5, 5))
        entry_aws_secret_key = ttk.Entry(self.tab_api, width=60,
                                         textvariable=self.conn_info.aws_secret_access_key)
        entry_aws_secret_key.grid(row=2, column=1, padx=5, pady=5)         
        entry_ibm_username = ttk.Entry(self.tab_api, width=60, state='disabled',
                                       textvariable=self.conn_info.ibm_username)
        entry_ibm_username.grid(row=4, column=1, padx=5, pady=5)     
        entry_ibm_password = ttk.Entry(self.tab_api, width=60, state='disabled',
                                       textvariable=self.conn_info.ibm_passwd,show="*")
        entry_ibm_password.grid(row=5, column=1, padx=5, pady=(5,15))

    def on_use_win_auth_change(self):
        if (self.conn_info.use_win_auth.get() == 1):
            self.entry_db_user_name.configure(state='disabled')
            self.entry_db_password.configure(state='disabled')
        else:
            self.entry_db_user_name.configure(state='normal')
            self.entry_db_password.configure(state='normal')


class SessionDetailsFrame(ttk.LabelFrame):

    def __init__(self, root, parent):
        super(SessionDetailsFrame, self).__init__(root, text='2. Session Details')
        super(SessionDetailsFrame, self).grid(row=1, column=0, sticky='NW', padx=5, pady=5, ipadx=5, ipady=5)

        self.parent = parent
        self.conn_info = parent.conn_info

        self.create_entries()
        self.create_buttons()
        self.create_scrolled_text()

    def create_entries(self):
        ttk.Entry(
            self, justify="center", width=18, font="Helvetica 18 bold",
            textvariable=self.conn_info.session_id).grid(row=1, column=2, padx=3, pady=5, sticky='W')

    def create_buttons(self):
        search_session_btn = ttk.Button(self, text="SEARCH SESSION ID", command=self.on_search_session_click)
        search_session_btn.grid(row=1, column=3, ipadx=8, ipady=6)

    def create_scrolled_text(self):
        self.dialog_st = scrolledtext.ScrolledText(self, width=45, height=8, wrap=tk.WORD)
        self.dialog_st.grid(column=2, row=2, padx=4, pady=4, columnspan=2, sticky='w')

        style = ttk.Style()
        style.configure("TButton", foreground="red")

    def on_search_session_click(self):
        db = MsSqlDatabase(self.conn_info)
        conn = db.connect()
        if conn:
            results = db.get_session(conn)
            if results:
                self.dialog_st.delete('1.0', tk.END)
                for role, message in results:
                    self.dialog_st.insert(tk.END, '{}:\n'.format(role), 'role')
                    self.dialog_st.insert(tk.END, '{}\n\n'.format(message), 'message')
                    self.dialog_st.tag_config('role', foreground='red', font="Courier 11 bold")
            else:
                tk.messagebox.showwarning(title="Warning", message="Nominated Session ID not found in the database!")
        else:
            tk.messagebox.showwarning(title="Warning", message="Cannot connect to database server")


class PlaybackDetailsFrame(ttk.LabelFrame):

    def __init__(self, root, parent):
        super(PlaybackDetailsFrame, self).__init__(root, text='3. Playback Details')
        super(PlaybackDetailsFrame, self).grid(row=1, column=1, sticky='WN', padx=5, pady=5, ipadx=5, ipady=5)

        self.root = root
        self.parent = parent
        self.conn_info = parent.conn_info

        self.create_labels()
        self.create_combobox()
        self.create_buttons()

        root.protocol('WM_DELETE_WINDOW', self.on_closing)

        self.process_manager = multiprocessing.Manager()
        self.player_process = None
        self.player_commands = None
        self.player_status = None


    def create_labels(self):
        l1 = ttk.Label(self, text="Clinician Voice:").grid(row=0, column=0, sticky='W', padx=5, pady=5)
        l2 = ttk.Label(self, text="Patient Voice:").grid(row=0, column=1, sticky='W', padx=5, pady=5)
        var1 = tk.StringVar(self.root)
        var2 = tk.StringVar(self.root)

    def create_combobox(self):
        clinician = ttk.Combobox(self, width=11, textvariable=self.conn_info.clinician_voice)
        clinician.grid(row=1, column=0, padx=5, pady=5, sticky='NW')
        clinician['values'] = (
            'Russell',
            'Nicole',
            'Amy',
            'Brian',
            'Emma',
            'Raveena',
            'Ivy',
            'Joanna',
            'Joey',
            'Justin',
            'Kendra',
            'Kimberly',
            'Salli'
        )
        clinician.current(0)

        patient = ttk.Combobox(self, width=11, textvariable=self.conn_info.patient_voice)
        patient.grid(row=1, column=1, padx=(5, 0), pady=5, sticky='NW')
        patient['values'] = (
            'Nicole',
            'Russell',
            'Amy',
            'Brian',
            'Emma',
            'Raveena',
            'Ivy',
            'Joanna',
            'Joey',
            'Justin',
            'Kendra',
            'Kimberly',
            'Salli')
        patient.current(0)

    def create_buttons(self):
        play_session_btn = ttk.Button(self, text="PLAY", width=25, command=self.on_play_session_click)
        play_session_btn.grid(row=2, column=0, columnspan=2, padx=(10, 2), pady=(25, 5), sticky='WE')

        pause_session_btn = ttk.Button(self, text="PAUSE", width=25, command=self.on_pause_session_click)
        pause_session_btn.grid(row=3, column=0, columnspan=2, padx=(10, 2), pady=5, sticky='WE')

        stop_session_btn = ttk.Button(self, text="STOP", width=25, command=self.on_stop_session_click)
        stop_session_btn.grid(row=4, column=0, columnspan=2, padx=(10, 2), pady=5, sticky='WE')

    def on_play_session_click(self):
        if self.player_process:
            if self.player_process.is_alive():
                self.player_commands.put('STOP')

        db = MsSqlDatabase(self.conn_info)
        db_conn = db.connect()
        if db_conn:
            messages = db.get_messages(db_conn)
            if messages:
                credentials = {
                    'aws_access_key': self.conn_info.aws_access_key_id.get(),
                    'aws_secret_key': self.conn_info.aws_secret_access_key.get()
                }
                voices = {
                    'clinician': self.conn_info.clinician_voice.get(),
                    'client': self.conn_info.patient_voice.get()
                }
                player = AudioPlayer(credentials, voices)

                self.player_commands = self.process_manager.Queue()
                self.player_status = self.process_manager.dict()
                self.player_process = multiprocessing.Process(
                    target=player.run,
                    args=(messages, voices, self.player_commands, self.player_status))
                self.player_process.start()
                self.root.after(500, lambda: self.check_player_status(self.player_process, self.player_status))
            else:
                tk.messagebox.showwarning(title="Warning", message="Nominated Session ID not found in the database!")
        else:
            tk.messagebox.showwarning(title="Warning", message="Cannot connect to database server")

    def on_pause_session_click(self):
        if self.player_commands:
            self.player_commands.put("PAUSE")

    def on_stop_session_click(self):
        if self.player_commands:
            self.player_commands.put("STOP")

    def on_closing(self):
        if self.player_process:
            if self.player_process.is_alive():
                self.player_commands.put('STOP')
            self.player_process.join()

        self.root.destroy()

    def check_player_status(self, player_process, player_status):
        if not player_process.is_alive():
            print('Player status: {}, {}'.format(player_status['code'], player_status['message']))
            if player_status['code'] != 0:
                tk.messagebox.showwarning(title="Warning", message=player_status['message'])
        else:
            self.root.after(500, lambda: self.check_player_status(player_process, player_status))

if __name__ == "__main__":
    app = AppFrame()
    app.run()
/*==============================================================================
STEP 1
Create SampleDB databases on the local instance
==============================================================================*/
USE [master];
GO
IF EXISTS (SELECT name FROM sys.databases WHERE name = N'SampleDB')
BEGIN
    -- Close connections to the StagingDB database
    ALTER DATABASE SampleDB SET SINGLE_USER WITH ROLLBACK IMMEDIATE;
    DROP DATABASE SampleDB;
END;
GO
-- Create SampleDB database and log files
CREATE DATABASE SampleDB
ON PRIMARY
       (
           NAME = N'SampleDB',
           FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQL2016DEV\MSSQL\DATA\SampleDB.mdf',
           SIZE = 10MB,
           MAXSIZE = 1GB,
           FILEGROWTH = 10MB
       )
LOG ON
    (
        NAME = N'SampleDB_log',
        FILENAME = N'C:\Program Files\Microsoft SQL Server\MSSQL13.MSSQL2016DEV\MSSQL\DATA\SampleDB_log.LDF',
        SIZE = 1MB,
        MAXSIZE = 1GB,
        FILEGROWTH = 10MB
    );
GO
--Assign database ownership to login SA
EXEC SampleDB.dbo.sp_changedbowner @loginame = N'SA', @map = false;
GO
--Change the recovery model to BULK_LOGGED
ALTER DATABASE SampleDB SET RECOVERY BULK_LOGGED;
GO

/*==============================================================================
STEP 2
Create test_dialog table on SampleDB database and insert few dummy records
imitating clinitian and patient conversation in a standard chat session
==============================================================================*/
USE SampleDB;
GO
IF OBJECT_ID('test_dialog') IS NOT NULL
BEGIN
    DROP TABLE test_dialog;
END;

CREATE TABLE test_dialog
(
    id INT IDENTITY(1, 1) NOT NULL,
    session_id INT NOT NULL,
    user_id INT NOT NULL,
    user_role VARCHAR(10) NOT NULL,
    message_body NVARCHAR(MAX) NOT NULL,
    date_created DATETIME NOT NULL,
    direction VARCHAR(3) NOT NULL
);

INSERT INTO test_dialog
(
    session_id,
    user_id,
    user_role,
    message_body,
    date_created,
    direction
)
SELECT 1,
       44411,
       'client',
       'Hi there. I want to talk to some one. Are you guys available for a quick chat?',
       SYSDATETIME(),
       'In'
UNION ALL
SELECT 1,
       32,
       'clinician',
       'Hi. My name is Nicole. How can I help you today?',
       DATEADD(SECOND, 9, SYSDATETIME()),
       'Out'
UNION ALL
SELECT 1,
       44411,
       'client',
       'I''m sad and depressed and need help with how I''m feeling at the moment. I was bullied at school today and feel like I don''t want to go back there again. I''m not coping well when my peers are making fun of me because I''m slightly overweight.',
       DATEADD(SECOND, 21, SYSDATETIME()),
       'In'
UNION ALL
SELECT 1,
       32,
       'clinician',
       'It sounds like you are going through some tough times at the moment. It''s OK, I''m here to help you. Can you tell me more on what''s been happening at school? Do you have any friends who you can talk to and who make you feel good about yourself when you''re feeling down?',
       DATEADD(SECOND, 51, SYSDATETIME()),
       'Out'
UNION ALL
SELECT 1,
       44411,
       'client',
       'Well, I do have a couple but I don''t want to sound like it is getting to me and I''m not coping very well. Also, my parents are quite supportive but I feel like they don''t understand what it''s like to be peer-pressured.',
       DATEADD(SECOND, 128, SYSDATETIME()),
       'In'
UNION ALL
SELECT 2,
       5989,
       'client',
       'Hello, I was given the name of this service from a friend of mine who used it before and said that you guys can help me with what''s happening in my life at the moment. Can I talk to you?',
       DATEADD(MINUTE, 2, SYSDATETIME()),
       'In'
UNION ALL
SELECT 2,
       19,
       'clinician',
       'Hi there. My name is Salli and I''m here for you. Anything in particular that bothers you or makes you feel the way you do today?',
       DATEADD(SECOND, 19, DATEADD(MINUTE, 2, SYSDATETIME())),
       'Out'
UNION ALL
SELECT 2,
       5989,
       'client',
       'OK, today I found out that my best friend is having some family issues and I thought that maybe I can refer her to you or if she''s too apprehensive maybe I can get someone to give me some strategies on how to help her deal with the problems she''s having right now.',
       DATEADD(SECOND, 68, DATEADD(MINUTE, 2, SYSDATETIME())),
       'In'
UNION ALL
SELECT 2,
       5989,
       'client',
       'I spoke to her last Friday and she said that because her step father is abusive towards her mom and her, she started experimenting with drugs and alcohol to kind of numb the pain and help her feel more in control.',
       DATEADD(SECOND, 125, DATEADD(MINUTE, 2, SYSDATETIME())),
       'In'
UNION ALL
SELECT 2,
       5989,
       'client',
       'I told her it''s not how she should be solving the problem and that she can do more harm but I think that things have gone too far for her to be able to turn it around without some sort of external help.',
       DATEADD(SECOND, 156, DATEADD(MINUTE, 2, SYSDATETIME())),
       'In'
UNION ALL
SELECT 3,
       3450,
       'client',
       'Hello, I spoke to a clinician named Amy last week and I just wanted to let her know that thanks to her help I feel a lot better now and finally have a positive outlook on life.',
       DATEADD(MINUTE, 3, SYSDATETIME()),
       'In'
UNION ALL
SELECT 3,
       3450,
       'client',
       'Is there anyway I can leave a great feedback for her or let her know how important those chats and her support were in my road to recovery and making me feel so much better? She''s been so helpfull and I want her to know that.',
       DATEADD(SECOND, 29, DATEADD(MINUTE, 3, SYSDATETIME())),
       'In'
UNION ALL
SELECT 3,
       34,
       'clinician',
       'Hi there and thank you for the kind feedback. I''m sure Zoe would be thrilled to know that she''s helped you. I''ll see if she''s busy with another person. Alternatively, you can leave the feedback comment on our website.',
       DATEADD(SECOND, 89, DATEADD(MINUTE, 3, SYSDATETIME())),
       'Out';

/*==============================================================================
STEP 3
Create tvf_getConversations function to split out individual records into 
smaller 'chunks' based on a predefined character limit (AWS Polly service sets 
it to 1500) and a specyfic character occurance e.g. full stop.
==============================================================================*/
IF EXISTS
(
    SELECT *
    FROM sysobjects
    WHERE id = OBJECT_ID(N'tvf_getConversations')
          AND xtype IN ( N'FN', N'IF', N'TF' )
)
    DROP FUNCTION tvf_getConversations;
GO


CREATE FUNCTION tvf_getConversations
(
    @text VARCHAR(MAX),
    @maxlen INT,
    @stopchar CHAR(1)
)
RETURNS @Conversations TABLE
(
    Id INT IDENTITY(1, 1) NOT NULL,
    Result VARCHAR(MAX) NOT NULL
)
AS
BEGIN
    DECLARE @str VARCHAR(MAX);
    DECLARE @ind INT;
    IF (@text IS NOT NULL)
    BEGIN
        SET @ind = CASE
                       WHEN LEN(@text) <= @maxlen THEN LEN(@text) ELSE CASE WHEN CHARINDEX(@stopchar, REVERSE(SUBSTRING(@text, 1, @maxlen))) = 0 THEN CHARINDEX(@stopchar, @text) ELSE 1 + @maxlen - CHARINDEX(@stopchar, REVERSE(SUBSTRING(@text, 1, @maxlen))) END END; WHILE @ind > 0
        BEGIN

            SET @str
                = CASE
                      WHEN LEN(@text) <= @maxlen THEN
                          @text
                      ELSE
                          CASE
                              WHEN CHARINDEX(@stopchar, REVERSE(SUBSTRING(@text, 1, @maxlen))) = 0 THEN
                                  SUBSTRING(@text, 1, @ind)
                              ELSE
                                  SUBSTRING(
                                               @text,
                                               1,
                                               1 + @maxlen
                                               - CHARINDEX(@stopchar, REVERSE(SUBSTRING(@text, 1, @maxlen)))
                                           )
                          END
                  END;
            SET @text = SUBSTRING(@text, @ind + 1, LEN(@text));

            INSERT INTO @Conversations
            (
                Result
            )
            SELECT LTRIM(RTRIM((@str)));
            SET @ind = CASE
                           WHEN LEN(@text) <= @maxlen THEN
                               LEN(@text)
                           ELSE
                               CASE
                                   WHEN CHARINDEX(@stopchar, REVERSE(SUBSTRING(@text, 1, @maxlen))) = 0 THEN
                                       LEN(@text)
                                   ELSE
                                       1 + @maxlen - CHARINDEX(@stopchar, REVERSE(SUBSTRING(@text, 1, @maxlen)))
                               END
                       END;
        END;
        SET @str = @text;
    END;
    RETURN;
END;
GO

If you’re keen to explore the full functionality of this app and see how we can tap into other cloud providers’ machine learning APIs, please look at part two of this series where I will be amending this code with Watson tonal analysis of the patients’ data.

Tags: , , , , , , , ,

Designing data acquisition framework in SQL Server and SSIS – how to source and integrate external data for a decision support system or data warehouse (Part 4)

May 25th, 2016 / No Comments » / by admin

Note: Part 1 to this series can be found HERE, Part 2 HERE, Part 3 HERE and all the code and additional files for this post can be downloaded from my OneDrive folder HERE.

Closing this series with the final post, this part focuses on the SSIS package structure and ties up all the code and architecture described in previous posts. The SQL Server Integration Services solution featured here is very minimalistic as all the heavy lifting is done by the code described mainly in parts two and three. The package is meant to mainly provide a structure or framework to execute individual components and control the workflow and, beyond this functionality, does not utilise any features which facilitate data acquisition e.g. SSIS lookups, merges, conditional splits, data conversion or aggregation etc. There is a little bit of C# code driving some transformations, but most of the components that constitute the package’s structure act as triggers to external code execution only.

The whole package (Control Flow as there is no Data Flow transformations) consists mainly of Execute SQL Task transformations, with a few Script Tasks here and there as per the image below.

Data_Acquisition_Framework_Part4_Complete_SSIS_Package

With the exception of two For Each Loop containers which, as the name suggests, loop through small and large tables, all components execute in a sequential order and once only. Most transformations are connected using Precedence Constraints based on two evaluation operators: Expression and Constraint, as per the image below.

Data_Acquisition_Framework_Part4_SSIS_Precedence_Constraint_View

This ensures that all the tasks executed not only are required to finish with the status of SUCCESS but also that the underlying code (a stored procedure in most cases) has completed without errors i.e. the returned variable’s @Is_All_OK value is one (1). If the value of @Is_All_OK variable is not one (alternative value is zero), the package workflow should steer into executing task(s) responsible for notifying administrators of the potential issue and terminating subsequent tasks execution.

Let’s go over the package skeleton and how some of the transformations are structured starting with the two top components i.e. ‘Set_All_OK and Sync_Exec_StartTime Variables Values’ and ‘Set DBMail_Receipients Variable Value’. The first transformation simply populates two variables which are used in subsequent package workflow – The @Sync_Exec_Start which is storing package start date and time and @Is_All_OK one which is preliminarily set to zero (0). The following C# snippet is used for this functionality.

#region Help:  Introduction to the script task
/* The Script Task allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services control flow. 
 * 
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script task. */
#endregion


#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
#endregion

namespace ST_44c20947eb6c4c5cb2f698bdd17b3534
{
    /// <summary>
    /// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
    /// or parent of this class.
    /// </summary>
    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {
        #region Help:  Using Integration Services variables and parameters in a script
        /* To use a variable in this script, first ensure that the variable has been added to 
         * either the list contained in the ReadOnlyVariables property or the list contained in 
         * the ReadWriteVariables property of this script task, according to whether or not your
         * code needs to write to the variable.  To add the variable, save this script, close this instance of
         * Visual Studio, and update the ReadOnlyVariables and 
         * ReadWriteVariables properties in the Script Transformation Editor window.
         * To use a parameter in this script, follow the same steps. Parameters are always read-only.
         * 
         * Example of reading from a variable:
         *  DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
         * 
         * Example of writing to a variable:
         *  Dts.Variables["User::myStringVariable"].Value = "new value";
         * 
         * Example of reading from a package parameter:
         *  int batchId = (int) Dts.Variables["$Package::batchId"].Value;
         *  
         * Example of reading from a project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].Value;
         * 
         * Example of reading from a sensitive project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
         * */

        #endregion

        #region Help:  Firing Integration Services events from a script
        /* This script task can fire events for logging purposes.
         * 
         * Example of firing an error event:
         *  Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
         * 
         * Example of firing an information event:
         *  Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
         * 
         * Example of firing a warning event:
         *  Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
         * */
        #endregion

        #region Help:  Using Integration Services connection managers in a script
        /* Some types of connection managers can be used in this script task.  See the topic 
         * "Working with Connection Managers Programatically" for details.
         * 
         * Example of using an ADO.Net connection manager:
         *  object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
         *  SqlConnection myADONETConnection = (SqlConnection)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
         *
         * Example of using a File connection manager
         *  object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
         *  string filePath = (string)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
         * */
        #endregion


        /// <summary>
        /// This method is called when this script task executes in the control flow.
        /// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
        /// To open Help, press F1.
        /// </summary>
        /// 
        public void Main()
        {
            // TODO: Add your code here
            DateTime saveNow = DateTime.Now;
            Dts.Variables["Is_All_OK"].Value = 0;
            Dts.Variables["Sync_Exec_StartTime"].Value = saveNow;
            Dts.TaskResult = (int)ScriptResults.Success;
        }

        #region ScriptResults declaration
        /// <summary>
        /// This enum provides a convenient shorthand within the scope of this class for setting the
        /// result of the script.
        /// 
        /// This code was generated automatically.
        /// </summary>
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion

    }
}

The ‘Set DBMail_Receipients Variable Value’ task uses a SQL function described in part 1 to fetch e-mail addresses stored in one of the control tables and map those to one of the package variables used for sending out notifications in the event of any errors (see image below).

Data_Acquisition_Framework_Part4_SSIS_Email_Notification_Function_ExecuteSQLTask

Continuing on, we have a bunch of Execute SQL Task transformations, all simply triggering stored procedures executions (code outlined in part 3) while passing a number of parameter values or reading values passed back into the package from the underlying code. Some of the SSIS variables are predefined with their respective values, whereas others are assigned to values as the package executes. Below is a sample image depicting parameter mapping for a task responsible for schema definition changes check.

Data_Acquisition_Framework_Part4_SSIS_Task_Parameter_Mapping

As we can see, a number of input parameters are mapped to user pre-defined variables. A full list of package-defined variables is as per image below (click on image to enlarge).

Data_Acquisition_Framework_Part4_SSIS_Package_Variables_View

Moving on, after all pre-acquisition tasks have been completed successfully, a Sequence Container encompassing actual data acquisition tasks is placed. This transformation is broken down into two groups – one for small tables (running dynamic merge statement) and one for larger ones (running parallel INSERTS). Each of those tasks runs in a For Each Loop container, iterating through object names assigned based on the query from the preceding tasks which populate ‘Object’ data type variables as per image below.

Data_Acquisition_Framework_Part4_SSIS_Get_Big_Tables_SQL_Task

Once schema and table names are retrieved, this data is passed into the looping transformation which executes data acquisition routines for each combination of schema and table name as per image below.

Data_Acquisition_Framework_Part4_SSIS_ForEachLoop_Editor

Both data acquisition routines, even though looping through each table in sequential order, execute in simultaneously until all objects marked for synchronisation have been accounted for. Since both data acquisition stored procedures are designed not to halt data copying process in case of any exception occurring, but rather log any issues and continue on, this sequence container also links up with a package size error triggering process which assigns predefined values to error-related variables using Script Task and send out an e-mail notification to nominated e-mail addresses. The code snippet below provides predefined set of values for further error notification routine execution.

#region Help:  Introduction to the script task
/* The Script Task allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services control flow. 
 * 
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script task. */
#endregion


#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
#endregion

namespace ST_7be0163238e6428e99c1178dd2a1c435
{
    /// <summary>
    /// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
    /// or parent of this class.
    /// </summary>
	[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
	public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
	{
        #region Help:  Using Integration Services variables and parameters in a script
        /* To use a variable in this script, first ensure that the variable has been added to 
         * either the list contained in the ReadOnlyVariables property or the list contained in 
         * the ReadWriteVariables property of this script task, according to whether or not your
         * code needs to write to the variable.  To add the variable, save this script, close this instance of
         * Visual Studio, and update the ReadOnlyVariables and 
         * ReadWriteVariables properties in the Script Transformation Editor window.
         * To use a parameter in this script, follow the same steps. Parameters are always read-only.
         * 
         * Example of reading from a variable:
         *  DateTime startTime = (DateTime) Dts.Variables["System::StartTime"].Value;
         * 
         * Example of writing to a variable:
         *  Dts.Variables["User::myStringVariable"].Value = "new value";
         * 
         * Example of reading from a package parameter:
         *  int batchId = (int) Dts.Variables["$Package::batchId"].Value;
         *  
         * Example of reading from a project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].Value;
         * 
         * Example of reading from a sensitive project parameter:
         *  int batchId = (int) Dts.Variables["$Project::batchId"].GetSensitiveValue();
         * */

        #endregion

        #region Help:  Firing Integration Services events from a script
        /* This script task can fire events for logging purposes.
         * 
         * Example of firing an error event:
         *  Dts.Events.FireError(18, "Process Values", "Bad value", "", 0);
         * 
         * Example of firing an information event:
         *  Dts.Events.FireInformation(3, "Process Values", "Processing has started", "", 0, ref fireAgain)
         * 
         * Example of firing a warning event:
         *  Dts.Events.FireWarning(14, "Process Values", "No values received for input", "", 0);
         * */
        #endregion

        #region Help:  Using Integration Services connection managers in a script
        /* Some types of connection managers can be used in this script task.  See the topic 
         * "Working with Connection Managers Programatically" for details.
         * 
         * Example of using an ADO.Net connection manager:
         *  object rawConnection = Dts.Connections["Sales DB"].AcquireConnection(Dts.Transaction);
         *  SqlConnection myADONETConnection = (SqlConnection)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Sales DB"].ReleaseConnection(rawConnection);
         *
         * Example of using a File connection manager
         *  object rawConnection = Dts.Connections["Prices.zip"].AcquireConnection(Dts.Transaction);
         *  string filePath = (string)rawConnection;
         *  //Use the connection in some code here, then release the connection
         *  Dts.Connections["Prices.zip"].ReleaseConnection(rawConnection);
         * */
        #endregion


		/// <summary>
        /// This method is called when this script task executes in the control flow.
        /// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
        /// To open Help, press F1.
        /// </summary>
		public void Main()
		{
			// TODO: Add your code here
            Dts.Variables["Error_Message"].Value = "Execution of the Mizzen data acquisition package failed on an unspecyfied step, most likely due to an SSIS transformation failure. Please troubleshoot.";
            Dts.Variables["Process_Name"].Value = "Non-specyfic SSIS Job Transformation Failure";
            Dts.Variables["Object_Name"].Value = "N/A";
            Dts.TaskResult = (int)ScriptResults.Success;
		}

        #region ScriptResults declaration
        /// <summary>
        /// This enum provides a convenient shorthand within the scope of this class for setting the
        /// result of the script.
        /// 
        /// This code was generated automatically.
        /// </summary>
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion

	}
}

Following on, the rest of the tasks are responsible for index recreation, statistics refresh and performing a number of rudimentary checks to ensure the data copied across is consistent with the source environment. Recreating indexes task is utilising the same stored procedure as the one responsible for dropping them, the only difference is the argument passed to the underlying code i.e. instead DROP we are using CREATE statement.

Data_Acquisition_Framework_Part4_SSIS_Index_SQL_Statement

Next up we have another Execute SQL Task for updating table statistics and completing acquisition process a simple Execute Script task to store core package execution tasks completion date and time. This Execute Script task carries almost identical code to the one used to log package execution start date and time, the only difference being the variable name used i.e. ‘Sync_Exec_EndTime’ rather then ‘Sync_Exec_StartTime’.

public void Main()
        {
            // TODO: Add your code here
            DateTime saveNow = DateTime.Now;
            Dts.Variables["Sync_Exec_EndTime"].Value = saveNow;
            Dts.TaskResult = (int)ScriptResults.Success;
        }

Final section of this workflow deals with record count comparison between source and target and querying AdminDBA database for any errors that may have been logged. Details of the actual code is explained in part 3 to this series so for the sake of succinctness I will refrain from repeating myself here, but hopefully by now you have a fairly good understanding of how the package works and that, in its core, it is just a shell to control individual steps execution workflow. The whole package, as previously mentioned, can be downloaded from my OneDrive folder HERE.

To conclude, this short series should provide anyone versed in Microsoft Business Intelligence tools with a basic framework used to source transactional data for staging purposes. As a blueprint, many steps outlined in previous parts can be skipped or extended depending on your business needs as this implementation was ‘ripped out’ one of my clients’ environments and may not be suitable in its entirety as a drop-in solution in other projects. However, unless streaming or near-real time staging data is a requirement, this sort of framework can become and good springboard for the majority of projects, allowing for further modifications and tweaking if required.

Tags: , , ,