TPC-DS big data benchmark overview – how to generate and load sample data

October 11th, 2017 / by admin

Ever wanted to benchmark your data warehouse and see how it stacks up against the competition? Ever wanted to have a point of reference which would give you an industry-applicable and measurable set of metrics you could use to objectively quantify your data warehouse or decision support system's performance? And most importantly, have you ever considered which platform and vendor will provide you with the most bang for your buck?

Well, arguably one of the best ways to find out is to simulate a suite of TPC-DS workloads (queries and data maintenance) against your target platform to obtain a representative evaluation of the System Under Test's (SUT) performance as a general-purpose decision support system. There are a few other data sets and methodologies that can be used to gauge a data storage and processing system's performance e.g. the TLC Trip Record Data released by the New York City Taxi & Limousine Commission has gained a lot of traction amongst big data specialists. However, TPC-approved benchmarks have long been considered the most objective, vendor-agnostic way to perform data-specific hardware and software comparisons, capturing the complexity of modern business processing to a much greater extent than their predecessors.

In this post I will explore how to generate test data and test queries using the dsdgen and dsqgen utilities on a Windows machine against the product supplier snowflake-type schema, as well as how to load the test data into the created database in order to run some or all of the 99 queries TPC-DS defines. I don't want to go into too much detail as TPC already provides a very comprehensive overview of the database and schema, along with a detailed description of constraints and assumptions. This post is more focused on how to generate the components used to evaluate the target platform, i.e. the TPC-DS data sets and queries, and how to load the data into the target database.

In this post I will be referring to TPC-DS version 2.5.0rc2, the most current version at the time of publication. The download link can be found on the consortium website. The key components used for a system's evaluation using the TPC-DS tools include the following:

  • A data generator called dsdgen used for data set creation
  • A query generator called dsqgen used for creating the benchmark query sets
  • Three SQL files called tpcds.sql, tpcds_source.sql and tpcds_ri.sql which create a sample implementation of the logical schema for the data warehouse

There are other components present in the toolkit, however, for the sake of brevity I will not be discussing data maintenance functionality, verification data, alternative query sets or verification queries in this post. Let's start with the dsdgen tool – an executable used to create the raw data sets in the form of flat files. The dsdgen solution supplied as part of the download needs to be compiled before use. For me it was as simple as firing up Microsoft Visual Studio, loading up the dbgen2 solution and building the final executable. Once compiled, the tool can be controlled from the command line by a combination of switches and environment variables – single flags preceded by a minus sign (or a forward slash on the Windows build), optionally followed by an argument. In its simplest form of execution, dsdgen can be called from PowerShell e.g. I used the following combination of flags to generate the required data (1GB size) in the C:\TPCDSdata\SourceFiles\ folder and to measure execution time.

Measure-Command {.\dsdgen /SCALE 1 /DIR C:\TPCDSdata\SourceFiles /FORCE /VERBOSE}

The captured output below shows that the execution time on my old Lenovo laptop (i7-4600U @ 2.1GHz, 12GB RAM, SSD HDD & Win10 Pro) was close to 10 minutes for 1GB of data generated sequentially.

The process can be sped up using the PARALLEL flag, which will engage multiple CPU cores, distributing the workload across multiple processes. The VERBOSE parameter is pretty much self-explanatory – progress messages are displayed as the data is generated. The FORCE flag overwrites previously created files. Finally, the SCALE parameter requires a little bit more explanation.
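
For larger scale factors it is worth scripting the parallel run rather than juggling multiple console windows. Below is a minimal sketch of how this could be orchestrated from Python – it assumes the PARALLEL/CHILD switch pair documented in the toolkit (each child process generates its own portion of the data set), so double-check the exact flag spelling against dsdgen's help output for your build.

import subprocess

streams = 4  # number of dsdgen child processes to spawn

# Each child generates its own slice of the data set; the /PARALLEL and /CHILD
# switches are assumptions based on the toolkit documentation - verify them
# against your build's help output before relying on this.
children = [
    subprocess.Popen(['dsdgen', '/SCALE', '1', '/DIR', r'C:\TPCDSdata\SourceFiles',
                      '/PARALLEL', str(streams), '/CHILD', str(child), '/FORCE'])
    for child in range(1, streams + 1)
]
for child in children:
    child.wait()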

The TPC-DS benchmark defines a set of discrete scaling points – scale factors – based on the approximate size of the raw data produced by dsdgen. The set of scale factors defined for TPC-DS is 1TB, 3TB, 10TB, 30TB and 100TB, where a terabyte (TB) is defined as 2 to the power of 40 bytes. The required row count for each permissible scale factor and each table in the test database is defined in the TPC-DS specification.

Once the data has been generated, we can create a set of scripts used to simulate reporting/analytical business scenarios using another tool in the TPC-DS arsenal – dsqgen. As with the previous executable, dsqgen runs off the command line. Passing the applicable query templates (in this case Microsoft SQL Server specific), it creates a single SQL file containing queries used across a multitude of scenarios e.g. reporting queries, ad hoc queries, interactive OLAP queries as well as data mining queries. I used the following command to generate the SQL scripts file.

dsqgen /input .\query_templates\templates.lst /directory .\query_templates /dialect sqlserver /scale 1
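
With the queries generated, a quick and dirty way to put them to work is to execute them one by one and time each run. The sketch below does this with pyodbc, assuming the default output file name (query_0.sql) and the '-- start query'/'-- end query' comment markers the toolkit places around each query – both are worth verifying against your generated file as they may differ between toolkit versions, and the connection details are placeholders.

import time
import pyodbc

# Placeholder connection details - substitute your own.
conn = pyodbc.connect('DRIVER={ODBC Driver 13 for SQL Server};SERVER=yourserver;'
                      'DATABASE=tpcds;UID=username;PWD=password')
cursor = conn.cursor()

# Carve the single generated file up into individual queries using the
# comment markers dsqgen emits (assumed format - check your output).
queries, current = [], []
with open('query_0.sql') as f:
    for line in f:
        if line.startswith('-- start query'):
            current = []
        elif line.startswith('-- end query'):
            queries.append(''.join(current))
        else:
            current.append(line)

for number, query in enumerate(queries, 1):
    started = time.time()
    cursor.execute(query)
    cursor.fetchall()
    print('Query {0} completed in {1:.2f} seconds'.format(number, time.time() - started))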

Before I get to the data loading script and schema overview, a quick mention of the final set of SQL scripts, the tpcds.sql file, which creates a sample implementation of the logical schema for the data warehouse. This file contains the necessary DDLs, so it should be run on the environment of your choice before the generated data is loaded into the data warehouse. Also, since the data files generated by the dsdgen utility come with a *.dat extension, it may be worthwhile to do some cleaning up before the target schema is populated. The following (somewhat butchered) Python script converts all *.dat files into *.csv files and breaks up some of the larger ones into a collection of smaller ones suffixed in an orderly fashion i.e. filename_1, filename_2, filename_3 etc., based on the values set for some of the variables declared at the start. This step is entirely optional, but I found that some system utilities used for data loading find it difficult to deal with large files e.g. those produced with a scale factor greater than 100 i.e. over 100GB of raw data. Splitting a large file into a collection of smaller ones may be an easy loading speed optimisation technique, especially if the process can be parallelised.

from os import listdir, rename, remove, path, walk
from shutil import copy2, move
from csv import reader, writer
import win32file as win
from tqdm import tqdm

srcFilesLocation = path.normpath('C:/TPCDSdata/SourceFiles/') 
srcCSVFilesLocation = path.normpath('C:/TPCDSdata/SourceCSVFiles/') 
tgtFilesLocation = path.normpath('C:/TPCDSdata/TargetFiles/')    
delimiter = '|'
keepSrcFiles = True
keepSrcCSVFiles = True
rowLimit = 2000000
outputNameTemplate = '_%s.csv'
keepHeaders = False
            
def getSize(start_path):
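    """Return the total size, in bytes, of all files under start_path."""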
    total_size = 0
    for dirpath, dirnames, filenames in walk(start_path):
        for f in filenames:
            fp = path.join(dirpath, f)
            total_size += path.getsize(fp)            
    return total_size 

def freeSpace(start_path):
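    """Return the free space, in bytes, on the drive hosting start_path."""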
    secsPerClus, bytesPerSec, nFreeClus, totClus = win.GetDiskFreeSpace(start_path)
    return secsPerClus * bytesPerSec * nFreeClus

def removeFiles(DirPath):
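    """Delete every file in DirPath."""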
    filelist = [ f for f in listdir(DirPath)]
    for f in filelist:
        remove(path.join(DirPath, f))

def SplitAndMoveLargeFiles(file, rowCount, srcCSVFilesLocation, outputNameTemplate, keepHeaders=False):
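    """Split a large delimited file into rowLimit-sized chunks written to tgtFilesLocation."""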
    filehandler = open(path.join(srcCSVFilesLocation, file), 'r')
    csv_reader = reader(filehandler, delimiter=delimiter)
    current_piece = 1
    current_out_path = path.join(tgtFilesLocation, path.splitext(file)[0]+outputNameTemplate  % current_piece)
    current_out_writer = writer(open(current_out_path, 'w', newline=''), delimiter=delimiter)
    current_limit = rowLimit
    if keepHeaders:
        headers = next(csv_reader)
        current_out_writer.writerow(headers)
    pbar=tqdm(total=rowCount)        
    for i, row in enumerate(csv_reader):      
        pbar.update()             
        if i + 1 > current_limit:
            current_piece += 1
            current_limit = rowLimit * current_piece
            current_out_path = path.join(tgtFilesLocation, path.splitext(file)[0]+outputNameTemplate  % current_piece)
            current_out_writer = writer(open(current_out_path, 'w', newline=''), delimiter=delimiter)
            if keepHeaders:
                current_out_writer.writerow(headers)          
        current_out_writer.writerow(row)
    pbar.close()

def SourceFilesRename(srcFilesLocation, srcCSVFilesLocation):
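    """Copy (or move, depending on keepSrcFiles) the generated .dat files into the CSV directory and rename them to .csv."""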
    srcDirSize = getSize(srcFilesLocation)
    diskSize = freeSpace(srcFilesLocation)
    removeFiles(srcCSVFilesLocation)
    for file in listdir(srcFilesLocation):
        if file.endswith(".dat"):
            if keepSrcFiles:
                if srcDirSize >= diskSize:
                    print ('''Not enough space on the nominated disk to duplicate the files (you nominated to keep source files as they were generated i.e. with the .dat extension). 
                    Current source files directory size is {} MB. At least {} MB required to continue. Bailing out...'''
                    .format(round(srcDirSize/1024/1024, 2), round(2*srcDirSize/1024/1024, 2)))
                    exit(1)
                else:                    
                    copy2(path.join(srcFilesLocation , file), srcCSVFilesLocation)
                    rename(path.join(srcCSVFilesLocation , file), path.join(srcCSVFilesLocation , file[:-4]+'.csv'))
            else:
                move(path.join(srcFilesLocation, file), srcCSVFilesLocation)
                rename(path.join(srcCSVFilesLocation, file), path.join(srcCSVFilesLocation, file[:-4]+'.csv'))

def ProcessLargeFiles(srcCSVFilesLocation,outputNameTemplate,rowLimit):
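    """Split any CSV exceeding rowLimit into chunks; move the rest straight to the target directory."""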
    fileRowCounts = []
    for file in listdir(srcCSVFilesLocation):
        if file.endswith(".csv"):
            fileRowCounts.append([file,sum(1 for line in open(path.join(srcCSVFilesLocation , file), newline=''))])               
    removeFiles(tgtFilesLocation)
    for file in fileRowCounts: 
        if file[1]>rowLimit:
            print("Processing File:", file[0],)
            print("Measured Row Count:", file[1])
            SplitAndMoveLargeFiles(file[0], file[1], srcCSVFilesLocation, outputNameTemplate, keepHeaders)
        else:
            #print(path.join(srcCSVFilesLocation,file[0]))
            move(path.join(srcCSVFilesLocation, file[0]), tgtFilesLocation)
    removeFiles(srcCSVFilesLocation)            

if __name__ == "__main__":
    SourceFilesRename(srcFilesLocation, srcCSVFilesLocation)
    ProcessLargeFiles(srcCSVFilesLocation, outputNameTemplate, rowLimit)

The following screenshot demonstrates the execution output when the file is run in PowerShell. Since there were only two tables with a record count greater than the threshold set in the rowLimit variable i.e. 2,000,000 rows, only those two tables get broken up into smaller chunks.
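
Assuming the script above has been saved as split_files.py (the name I will refer to it by later on), running it boils down to adjusting the variables declared at the top – file locations, delimiter, row limit etc. – and invoking it from the command line:

python split_files.py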

Once the necessary data files have been created and the data warehouse is prepped for loading, we can proceed to load the data. Depending on the volume of data, which in the case of TPC-DS directly correlates with the scale factor used when running the dsdgen utility, a number of different methods can be used to populate the schema. Looking at Microsoft SQL Server as a test platform, the most straightforward approach would be to use the BCP utility. The following Python script executes the bulk copy program (bcp) utility across multiple cores/CPUs to load the data created in the previous step (persisted in the C:\TPCDSdata\TargetFiles\ directory). The script utilises Python's multiprocessing library to make the most of the resources available.

from multiprocessing import Pool, cpu_count
from os import listdir,path, getpid,system
import argparse

parser = argparse.ArgumentParser(description='TPC-DS Data Loading Script by bicortex.com')
parser.add_argument('-S','--svrinstance', help='Server and Instance Name of the MSSQL installation', required=True)
parser.add_argument('-D','--db', help='Database Name', required=True)
parser.add_argument('-U','--username', help='User Name', required=True)
parser.add_argument('-P','--password', help='Authenticating Password', required=True)
parser.add_argument('-L','--filespath', help='UNC path where the files are located', required=True)
args = parser.parse_args()

if not args.svrinstance or not args.db or not args.username or not args.password or not args.filespath:
    parser.print_help()
    exit(1)

def loadFiles(tableName, fileName):    
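    """Build and shell out a bcp command to bulk load a single file into its target table."""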
    print("Loading from CPU: %s" % getpid())
    fullPath = path.join(args.filespath, fileName)
    bcp = 'bcp %s in %s -S %s -d %s -U %s -P %s -q -c -t "|" -r"|\\n"' % (tableName, fullPath, args.svrinstance, args.db, args.username, args.password)
    #bcp store_sales in c:\TPCDSdata\TargetFiles\store_sales_1.csv -S <servername>\<instancename> -d <dbname> -U <username> -P <password> -q -c -t  "|" -r "|\n"  
    print(bcp)
    system(bcp)
    print("Done loading data from CPU: %s" % getpid())

if __name__ == "__main__":    
    p = Pool(processes = 2*cpu_count())
    for file in listdir(args.filespath):
        if file.endswith(".csv"):
            tableName = ''.join([i for i in path.splitext(file)[0] if not i.isdigit()]).rstrip('_')
            p.apply_async(loadFiles, [tableName, file])
    p.close()
    p.join()
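
For completeness, this is how the loader can be invoked, assuming it has been saved as load_data.py (the file name itself is arbitrary – pick whatever suits) and substituting the placeholders with your environment's values:

python load_data.py -S <servername>\<instancename> -D <dbname> -U <username> -P <password> -L C:\TPCDSdata\TargetFiles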

The following screenshot (you can also see a sample data load execution video HERE) depicts the Python script loading 10GB of data, with some files generated by the dsdgen utility further split up into smaller chunks by the split_files.py script. The data was loaded into a Microsoft SQL Server 2016 Enterprise instance running on an Azure virtual machine with 20 virtual cores (Intel E5-2673 v3 @ 2.40GHz) and 140GB of memory. Apart from the fact that it's very satisfying to watch 20 cores being put to work simultaneously, spawning multiple BCP instances significantly reduces data load times, even when accounting for locking and contention.

I have not bothered to test the actual load times as SQL Server has a number of knobs and switches which can be used to adjust and optimise the system's performance. For example, in-memory optimised tables, introduced in SQL Server 2014, store their data in memory using multiple versions of each row's data. This technique is characterised as 'non-blocking multi-version optimistic concurrency control' and eliminates both locks and latches, so a separate post may be in order to do justice to this feature and conduct a full performance analysis in a fair and unbiased manner.
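
To give you a rough idea of what's involved, the sketch below creates a cut-down, in-memory version of one of the TPC-DS dimension tables via pyodbc. Treat it as a minimal, illustrative example only – the filegroup name, file path and trimmed-down table definition are my own placeholders (the authoritative schema ships with the toolkit's tpcds.sql file), and a production setup would warrant proper sizing and index tuning.

import pyodbc

# Placeholder connection details - substitute your own.
conn = pyodbc.connect('DRIVER={ODBC Driver 13 for SQL Server};SERVER=yourserver;'
                      'DATABASE=tpcds;UID=username;PWD=password', autocommit=True)
cursor = conn.cursor()

# A memory-optimised filegroup is a prerequisite; the name and file path are illustrative.
cursor.execute("ALTER DATABASE tpcds ADD FILEGROUP tpcds_imoltp CONTAINS MEMORY_OPTIMIZED_DATA")
cursor.execute("ALTER DATABASE tpcds ADD FILE (NAME = 'tpcds_imoltp', "
               "FILENAME = 'C:\\SQLData\\tpcds_imoltp') TO FILEGROUP tpcds_imoltp")

# Cut-down, hypothetical in-memory version of the TPC-DS warehouse dimension
# table - the authoritative definition lives in tpcds.sql.
cursor.execute("""
    CREATE TABLE dbo.warehouse_mem (
        w_warehouse_sk   INT NOT NULL PRIMARY KEY NONCLUSTERED,
        w_warehouse_id   CHAR(16) NOT NULL,
        w_warehouse_name VARCHAR(20)
    ) WITH (MEMORY_OPTIMIZED = ON, DURABILITY = SCHEMA_AND_DATA)""")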

So there you go – a fairly rudimentary rundown of some of the features of the TPC-DS benchmark. For a more detailed overview of TPC-DS please refer to the TPC website and their online resources HERE.


Using AWS Polly And IBM Watson Text-To-Speech And Tone Analyser Artificial Intelligence Services To Read and Analyse Clinical Chat Data (Part 2)

August 23rd, 2017 / by admin

Note: Part one of this series can be found HERE

In my last blog post I outlined the concept of creating a simple Python GUI application which utilises the Amazon Polly text-to-speech cloud API. The premise was quite simple – retrieve chat data stored in a SQL Server database and pass it to the Polly API to convert it into an audible stream using a choice of different male and female voices.

Whilst this functionality provided a good 'playground' to showcase one of the multitude of cloud-enabled machine learning applications, I felt that augmenting the text-to-voice feature with some visual clues representing the chat content would provide additional value. This is where I thought pairing text-to-speech with linguistic analysis could make this app even more useful and complete. As of today, all major cloud juggernauts offer a plethora of general-purpose ML services, but when it comes to linguistic analysis which goes beyond sentiment tagging, IBM has risen to become a major player in this arena. IBM Watson Tone Analyzer specifically targets understanding emotions and communication style, using linguistic analysis to detect emotional, social and language tones in written text. Tones detected within the 'General Purpose Endpoint' include joy, fear, sadness, anger, disgust, analytical, confident, tentative, openness, conscientiousness, extraversion, agreeableness, and emotional range. Typical use cases for this service include analysing emotions and tones in what people write online, like tweets or reviews, predicting whether they are happy, sad or confident, as well as monitoring customer service and support conversations, personalised marketing and of course chat bots. The following diagram shows the basic flow of calls to the service.

You authenticate to the Tone Analyzer API by providing the username and password from the service credentials of the service instance that you want to use. The API uses HTTP basic authentication. The request includes several parameters and their respective value options, and the simplest way to kick some tires (after completing the sign-up process) is to use the curl command-line tool and some sample text for analysis e.g.

curl -v -u "username":"password" -H "Content-Type: text/plain" -d "I feel very happy today!"
"https://gateway.watsonplatform.net/tone-analyzer/api/v3/tone?version=2016-05-19"
{
   "document_tone": {
      "tone_categories": [
         {
            "tones": [
               {
                  "score": 0.013453,
                  "tone_id": "anger",
                  "tone_name": "Anger"
               },
               {
                  "score": 0.017433,
                  "tone_id": "disgust",
                  "tone_name": "Disgust"
               },
               {
                  "score": 0.039234,
                  "tone_id": "fear",
                  "tone_name": "Fear"
               },
               {
                  "score": 0.857981,
                  "tone_id": "joy",
                  "tone_name": "Joy"
               },
               {
                  "score": 0.062022,
                  "tone_id": "sadness",
                  "tone_name": "Sadness"
               }
            ],
            "category_id": "emotion_tone",
            "category_name": "Emotion Tone"
         },
         {
            "tones": [
               {
                  "score": 0,
                  "tone_id": "analytical",
                  "tone_name": "Analytical"
               },
               {
                  "score": "0.849827",
                  "tone_id": "confident",
                  "tone_name": "Confident"
               },
               {
                  "score": 0,
                  "tone_id": "tentative",
                  "tone_name": "Tentative"
               }
            ],
            "category_id": "language_tone",
            "category_name": "Language Tone"
         },
         {
            "tones": [
               {
                  "score": 0.016275,
                  "tone_id": "openness_big5",
                  "tone_name": "Openness"
               },
               {
                  "score": 0.262399,
                  "tone_id": "conscientiousness_big5",
                  "tone_name": "Conscientiousness"
               },
               {
                  "score": 0.435574,
                  "tone_id": "extraversion_big5",
                  "tone_name": "Extraversion"
               },
               {
                  "score": 0.679046,
                  "tone_id": "agreeableness_big5",
                  "tone_name": "Agreeableness"
               },
               {
                  "score": 0.092516,
                  "tone_id": "emotional_range_big5",
                  "tone_name": "Emotional Range"
               }
            ],
            "category_id": "social_tone",
            "category_name": "Social Tone"
         }
      ]
   }
}

The service returns a JSON structure which can be further unpacked and analysed/visualised. Using the IBM SDK and a little bit of Python, we can create a short script that will pass the desired text to the Tone Analyzer API and return a matplotlib chart visualising each tone value within its respective category. Below is a simple visualisation of a paragraph containing text with a linguistically-negative sentiment, along with the Python code generating it.

import matplotlib.pyplot as plt
import numpy as np
import matplotlib as mpl
import watson_developer_cloud as wdc

tone_analyzer = wdc.ToneAnalyzerV3(
  version='2016-05-19',
  username='username',
  password='password',
  x_watson_learning_opt_out=True
)

message = 'Hi Team, I know the times are difficult! \
Our sales have been disappointing for the \
past three quarters for our data analytics \
product suite. We have a competitive data \
analytics product suite in the industry. \
But we need to do our job selling it!'

tone=tone_analyzer.tone(message, sentences=False, content_type='text/plain')

#assign each tone name and value to its respective category 
emotion_tone={}
language_tone={}
social_tone={}

for cat in tone['document_tone']['tone_categories']:
    print('Category:', cat['category_name'])
    if cat['category_name'] == 'Emotion Tone':
        for t in cat['tones']:
            print('-', t['tone_name'], t['score'])
            emotion_tone.update({t['tone_name']: t['score']})
    if cat['category_name'] == 'Social Tone':
        for t in cat['tones']:
            print('-', t['tone_name'], t['score'])
            social_tone.update({t['tone_name']: t['score']})
    if cat['category_name'] == 'Language Tone':
        for t in cat['tones']:
            print('-', t['tone_name'], t['score'])
            language_tone.update({t['tone_name']: t['score']})


#find largest value in all tones to adjust the x scale accordingly
max_tone_value = {**emotion_tone, **language_tone, **social_tone}
if max(max_tone_value.values()) > 0.9:
    max_tone_value = 1
else:
    max_tone_value = max(max_tone_value.values())+0.1


#plot all tones by category
fig = plt.figure(figsize=(7,7))
mpl.style.use('seaborn')
fig.suptitle('Tones by Intensity, scale range: 0(min) - 1(max)', fontsize=14, fontweight='bold')

x1=fig.add_subplot(311)
y_pos = np.arange(len(emotion_tone.keys()))
plt.barh(y_pos, emotion_tone.values(), align='center', alpha=0.6, color='limegreen')
plt.yticks(y_pos, emotion_tone.keys())
plt.title('Emotion Tone', fontsize=12)
x1.set_xlim([0, max_tone_value])

x2=fig.add_subplot(312)
y_pos = np.arange(len(social_tone.keys()))
plt.barh(y_pos, social_tone.values(), align='center', alpha=0.6,color='red')
plt.yticks(y_pos, social_tone.keys())
plt.title('Social Tone',fontsize=12)
x2.set_xlim([0, max_tone_value])

x3=fig.add_subplot(313)
y_pos = np.arange(len(language_tone.keys()))
plt.barh(y_pos, language_tone.values(), height = 0.4, align='center', alpha=0.6, color='deepskyblue')
plt.yticks(y_pos, language_tone.keys())
plt.title('Language Tone',fontsize=12)
x3.set_xlim([0, max_tone_value])

plt.tight_layout(pad=0.9, w_pad=0.5, h_pad=1.7)
fig.subplots_adjust(top=0.85, left=0.20)
plt.show()
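
As a side note, the watson_developer_cloud module imported above comes with IBM's Python SDK which, at the time of writing, was distributed on PyPI as the watson-developer-cloud package, so a simple pip install watson-developer-cloud should be all that's required to get the script running.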

And finally, the amended Python code for the complete application (including AWS Polly integration from Part 1) is as follows:

import sys
import time
import io
from contextlib import closing
import multiprocessing
import pygame
import numpy as np
import pyodbc
import boto3
import watson_developer_cloud as wdc
import tkinter as tk
from tkinter import scrolledtext, ttk, messagebox
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg


class ConnectionInfo:
    def __init__(self):
        self.use_win_auth = tk.IntVar()
        self.inst_srv = tk.StringVar()
        self.inst_db = tk.StringVar()
        self.inst_login = tk.StringVar()
        self.inst_passwd = tk.StringVar()
        self.session_id = tk.IntVar()
        self.use_aws_api = tk.IntVar(value=1)
        self.aws_access_key_id = tk.StringVar()
        self.aws_secret_access_key = tk.StringVar()
        self.use_ibm_api = tk.IntVar(value=1)
        self.ibm_username = tk.StringVar()
        self.ibm_passwd = tk.StringVar()
        self.clinician_voice = tk.StringVar()
        self.patient_voice = tk.StringVar()

        self.ibm_version = '2016-05-19'
        self.ibm_x_watson_learning_opt_out = True


class MsSqlDatabase:
    ODBC_DRIVER = '{ODBC Driver 13 for SQL Server}'

    def __init__(self, conn_info):
        self.conn_info = conn_info

    def connect(self):
        connection_string = ('DRIVER={driver};SERVER={server};DATABASE={db};'.format(
            driver=self.ODBC_DRIVER,
            server=self.conn_info.inst_srv.get(),
            db=self.conn_info.inst_db.get()))
        if self.conn_info.use_win_auth.get() == 1:
            connection_string = connection_string + 'Trusted_Connection=yes;'
        else:
            connection_string = connection_string + 'UID={uid};PWD={password};'.format(
                uid=self.conn_info.inst_login.get(),
                password=self.conn_info.inst_passwd.get())

        try:
            conn = pyodbc.connect(connection_string, timeout=1)
        except pyodbc.Error as err:
            conn = None
        return conn

    def get_session(self, conn):
        try:
            cursor = conn.cursor()
            cursor.execute(
                """SELECT UPPER(user_role), message_body FROM dbo.test_dialog t
                WHERE t.Session_ID = ? ORDER BY t.ID ASC""", self.conn_info.session_id.get())
            results = cursor.fetchall()
        except pyodbc.Error as err:
            results = None
        return results

    def get_user_id(self, conn):
        try:
            cursor = conn.cursor()
            cursor.execute(
                """SELECT DISTINCT user_id from dbo.test_dialog t
                WHERE t.session_id = ? AND user_role = 'client'""", self.conn_info.session_id.get())
            results = cursor.fetchall()
        except pyodbc.Error as err:
            results = None
        return results

    def get_messages(self, conn):
        try:
            cursor = conn.cursor()
            cursor.execute(
                """SELECT t.user_role, t.direction, LTRIM(RTRIM(f.RESULT)) AS message FROM dbo.test_dialog t
                CROSS APPLY dbo.tvf_getConversations (t.message_body, 50, '.') f WHERE t.session_id = ?
                ORDER BY t.id, f.id""", self.conn_info.session_id.get())
            results = cursor.fetchall()
        except pyodbc.Error as err:
            results = None
        return results

    def get_messages_for_tone_analyse(self, conn):
        try:
            cursor = conn.cursor()
            cursor.execute(
                """DECLARE @message VARCHAR(MAX) 
                SELECT @message = COALESCE(@message + ' ', '') + message_body 
                FROM dbo.test_dialog t WHERE t.Session_ID = ? AND user_role = 'client' ORDER BY t.ID ASC
                SELECT @message""", self.conn_info.session_id.get())
            results = cursor.fetchall()
            results = [row[0] for row in results]
            try:
                messages = ''.join(results)
            except TypeError:
                messages = None
        except pyodbc.Error as err:
            messages = None
        return messages


class AudioPlayer:
    def __init__(self, credentials, voices):
        self.credentials = credentials
        self.voices = voices

    def run(self, messages, voices, commands, status):
        status['code'] = 0
        status['message'] = 'OK'

        try:
            polly_service = boto3.client(
                'polly',
                aws_access_key_id = self.credentials['aws_access_key'],
                aws_secret_access_key = self.credentials['aws_secret_key'],
                region_name = 'eu-west-1')
        except:
            polly_service = None

        if not polly_service:
            status['code'] = 1
            status['message'] = 'Cannot connect to AWS Polly service. Please check your API credentials are valid.'
            return

        is_stopped = False
        is_paused = False
        pygame.mixer.init(channels=1, frequency=44100)
        for message in messages:
            print(message)

            try:
                polly_response = polly_service.synthesize_speech(
                    OutputFormat='ogg_vorbis',
                    Text=message[2],
                    TextType='text',
                    VoiceId=voices[message[0]])
            except:
                polly_response = None

            if not polly_response:
                status['code'] = 2
                status['message'] = 'Cannot connect to AWS Polly service. Please check your API credentials are valid.'
                break

            if "AudioStream" in polly_response:
                with closing(polly_response["AudioStream"]) as stream:
                    data = stream.read()
                    filelike = io.BytesIO(data)
                    sound = pygame.mixer.Sound(file=filelike)
                    sound.play()

                    while pygame.mixer.get_busy() or is_paused:
                        if not commands.empty():
                            command = commands.get()
                            if command == 'STOP':
                                sound.stop()
                                is_stopped = True
                                break
                            if command == 'PAUSE':
                                is_paused = not is_paused
                                if is_paused:
                                    sound.stop()
                                else:
                                    sound.play()
                        time.sleep(0.010)
            if is_stopped:
                break


class AppFrame(object):
    def __init__(self):
        self.root = tk.Tk()
        self.root.title('Polly Text-To-Speech GUI Prototype ver 1.1')
        self.root.resizable(width=False, height=False)

        self.conn_info = ConnectionInfo()

        self.menubar = self.create_menubar()
        self.connection_details_frame = ConnDetailsFrame(self.root, self)
        self.session_frame = SessionDetailsFrame(self.root, self)
        self.playback_frame = PlaybackDetailsFrame(self.root, self)
        self.graph_frame = WatsonGraphDetailsFrame(self.root, self)

    def create_menubar(self):
        menubar = tk.Menu(self.root)

        title_menu = tk.Menu(menubar, tearoff=0)
        title_menu.add_command(label='API details...', command=self.on_api_details_select)
        title_menu.add_command(label='About...', command=self.on_about_select)
        menubar.add_cascade(label='About', menu=title_menu)
        self.root.config(menu=menubar)

        return menubar

    def on_api_details_select(self):
        dialog = APIDetailsDialog(self.root)
        self.root.wait_window(dialog)

    def on_about_select(self):
        tk.messagebox.showinfo(title="About", message="Polly Text-To-Speech GUI Prototype ver 1.1")

    def run(self):
        self.root.mainloop()


class ConnDetailsFrame(ttk.LabelFrame):

    def __init__(self, root, parent):
        super(ConnDetailsFrame, self).__init__(root, text='1. Connection Details')
        super(ConnDetailsFrame, self).grid(
            row=0, column=0, columnspan=3, sticky='W',
            padx=5, pady=5, ipadx=5, ipady=5
        )

        self.root = root
        self.parent = parent
        self.conn_info = parent.conn_info

        self.create_notebook()

    def create_notebook(self):
        self.tab_control = ttk.Notebook(self)
        self.create_frames()
        self.create_labels()
        self.create_entry()
        self.create_checkbuttons()

    def create_frames(self):
        self.tab_db = ttk.Frame(self.tab_control)
        self.tab_api = ttk.Frame(self.tab_control)
        self.tab_control.add(self.tab_db, text="Database Connection Details ")
        self.tab_control.add(self.tab_api, text="APIs Connection Details ")
        self.tab_control.grid(row=0, column=0, sticky='E', padx=5, pady=5)

    def create_labels(self):
        ttk.Label(self.tab_db, text="Server/Instance Name:").grid(row=0, column=0, sticky='E', padx=5, pady=(15, 5))
        ttk.Label(self.tab_db, text="Database Name:").grid(row=1, column=0, sticky='E', padx=5, pady=5)
        ttk.Label(self.tab_db, text="User Name:").grid(column=0, row=3, sticky="E", padx=5, pady=5)
        ttk.Label(self.tab_db, text="Password:").grid(column=0, row=4, sticky="E", padx=5, pady=(5, 10))        

        ttk.Label(self.tab_api, text="AWS Access Key ID:").grid(column=0, row=1, sticky="E", padx=5, pady=(5, 5))
        ttk.Label(self.tab_api, text="AWS Secret Access Key:").grid(column=0, row=2, sticky="E", padx=5, pady=5)
        ttk.Label(self.tab_api, text="IBM Watson Username:").grid(column=0, row=4, sticky="E", padx=5, pady=(5, 5))
        ttk.Label(self.tab_api, text="IBM Watson Password:").grid(column=0, row=5, sticky="E", padx=5, pady=(5,15))
    
    def create_checkbuttons(self):
        check_use_win_auth = ttk.Checkbutton(self.tab_db, onvalue=1, offvalue=0,
                                             variable=self.conn_info.use_win_auth,
                                             text='Use Windows Authentication',
                                             command=self.on_use_win_auth_change)
        check_use_win_auth.grid(row=2, column=0, sticky='W', padx=15, pady=(15,5))  
        check_use_aws_api = ttk.Checkbutton(self.tab_api, onvalue=1, offvalue=0,
                                            variable=self.conn_info.use_aws_api, text='Use AWS Text-To-Speech API')
        check_use_aws_api.grid(row=0, column=0, sticky='W', padx=15, pady=(15,5)) 
        check_use_ibm_api = ttk.Checkbutton(self.tab_api, onvalue=1, offvalue=0,
                                            variable=self.conn_info.use_ibm_api,
                                            text='Use IBM Watson API',
                                            command=self.on_use_ibm_api_change)
        check_use_ibm_api.grid(row=3, column=0, sticky='W', padx=15, pady=(15,5))   

    def create_entry(self):
        entry_db_server_name = ttk.Entry(self.tab_db, width=60, textvariable=self.conn_info.inst_srv)
        entry_db_server_name.grid(row=0, column=1, sticky='W', padx=10, pady=(15, 5))
        entry_db_name = ttk.Entry(self.tab_db, width=60, textvariable=self.conn_info.inst_db)
        entry_db_name.grid(row=1, column=1, sticky='W', padx=10, pady=5)                                      
        self.entry_db_user_name = ttk.Entry(self.tab_db, width=60, textvariable=self.conn_info.inst_login)
        self.entry_db_user_name.grid(row=3, column=1, padx=10, pady=5)
        self.entry_db_password = ttk.Entry(self.tab_db, width=60, textvariable=self.conn_info.inst_passwd, show="*")
        self.entry_db_password.grid(row=4, column=1, padx=10, pady=(5, 10))

        entry_aws_access_key = ttk.Entry(self.tab_api, width=60,
                                         textvariable=self.conn_info.aws_access_key_id)
        entry_aws_access_key.grid(row=1, column=1, sticky='W', padx=10, pady=(5, 5))
        entry_aws_secret_key = ttk.Entry(self.tab_api, width=60,
                                         textvariable=self.conn_info.aws_secret_access_key)
        entry_aws_secret_key.grid(row=2, column=1, padx=5, pady=5)         
        self.entry_ibm_username = ttk.Entry(self.tab_api, width=60,
                                            textvariable=self.conn_info.ibm_username)
        self.entry_ibm_username.grid(row=4, column=1, padx=5, pady=5)
        self.entry_ibm_password = ttk.Entry(self.tab_api, width=60,
                                            textvariable=self.conn_info.ibm_passwd,show="*")
        self.entry_ibm_password.grid(row=5, column=1, padx=5, pady=(5,15))

    def on_use_win_auth_change(self):
        if (self.conn_info.use_win_auth.get() == 1):
            self.entry_db_user_name.configure(state='disabled')
            self.entry_db_password.configure(state='disabled')
        else:
            self.entry_db_user_name.configure(state='normal')
            self.entry_db_password.configure(state='normal')

    def on_use_ibm_api_change(self):
        if (self.conn_info.use_ibm_api.get() == 0):
            self.entry_ibm_username.configure(state='disabled')
            self.entry_ibm_password.configure(state='disabled')
        else:
            self.entry_ibm_username.configure(state='normal')
            self.entry_ibm_password.configure(state='normal')


class SessionDetailsFrame(ttk.LabelFrame):
    def __init__(self, root, parent):
        super(SessionDetailsFrame, self).__init__(root, text='2. Session Details')
        super(SessionDetailsFrame, self).grid(row=1, column=0, sticky='NW', padx=5, pady=5, ipadx=5, ipady=5, rowspan=2)

        self.parent = parent
        self.conn_info = parent.conn_info

        self.create_entries()
        self.create_buttons()
        self.create_scrolled_text()

    def create_entries(self):
        ttk.Entry(
            self, justify="center", width=18, font="Helvetica 18 bold",
            textvariable=self.conn_info.session_id).grid(row=1, column=2, padx=3, pady=5, sticky='W')

    def create_buttons(self):
        search_session_btn = ttk.Button(self, text="SEARCH SESSION ID", command=self.on_search_session_click)
        search_session_btn.grid(row=1, column=3, ipadx=8, ipady=6)

    def create_scrolled_text(self):
        self.dialog_st = scrolledtext.ScrolledText(self, width=45, height=13, wrap=tk.WORD)
        self.dialog_st.grid(column=2, row=2, padx=4, pady=4, columnspan=2, sticky='w')

        style = ttk.Style()
        style.configure("TButton", foreground="red")

    def on_search_session_click(self):
        db = MsSqlDatabase(self.conn_info)
        conn = db.connect()
        if conn:
            results = db.get_session(conn)
            if results:
                self.dialog_st.delete('1.0', tk.END)
                for role, message in results:
                    self.dialog_st.insert(tk.END, '{}:\n'.format(role), 'role')
                    self.dialog_st.insert(tk.END, '{}\n\n'.format(message), 'message')
                    self.dialog_st.tag_config('role', foreground='red', font="Courier 11 bold")
            else:
                tk.messagebox.showwarning(title="Warning", message="Nominated Session ID not found in the database!")
        else:
            tk.messagebox.showwarning(title="Warning", message="Cannot connect to database server!")


class PlaybackDetailsFrame(ttk.LabelFrame):
    def __init__(self, root, parent):
        super(PlaybackDetailsFrame, self).__init__(root, text='3. Playback Details')
        super(PlaybackDetailsFrame, self).grid(row=1, column=1, sticky='WN', padx=5, pady=5, ipadx=5, ipady=5)

        self.root = root
        self.parent = parent
        self.conn_info = parent.conn_info

        self.create_labels()
        self.create_combobox()
        self.create_buttons()

        root.protocol('WM_DELETE_WINDOW', self.on_closing)

        self.process_manager = multiprocessing.Manager()
        self.player_process = None
        self.player_commands = None
        self.player_status = None

    def create_labels(self):
        ttk.Label(self, text="Clinician Voice:").grid(row=0, column=0, sticky='W', padx=5, pady=5)
        ttk.Label(self, text="Patient Voice:").grid(row=0, column=1, sticky='W', padx=5, pady=5)

    def create_combobox(self):
        clinician = ttk.Combobox(self, width=11, textvariable=self.conn_info.clinician_voice)
        clinician.grid(row=1, column=0, padx=5, pady=5, sticky='NW')
        clinician['values'] = (
            'Russell',
            'Nicole',
            'Amy',
            'Brian',
            'Emma',
            'Raveena',
            'Ivy',
            'Joanna',
            'Joey',
            'Justin',
            'Kendra',
            'Kimberly',
            'Salli'
        )
        clinician.current(0)

        patient = ttk.Combobox(self, width=11, textvariable=self.conn_info.patient_voice)
        patient.grid(row=1, column=1, padx=(5, 0), pady=5, sticky='NW')
        patient['values'] = (
            'Nicole',
            'Russell',
            'Amy',
            'Brian',
            'Emma',
            'Raveena',
            'Ivy',
            'Joanna',
            'Joey',
            'Justin',
            'Kendra',
            'Kimberly',
            'Salli')
        patient.current(0)

    def create_buttons(self):
        play_session_btn = ttk.Button(self, text="PLAY", width=25, command=self.on_play_session_click)
        play_session_btn.grid(row=2, column=0, columnspan=2, padx=(10, 2), pady=(20, 5), sticky='WE')
        pause_session_btn = ttk.Button(self, text="PAUSE", width=25, command=self.on_pause_session_click)
        pause_session_btn.grid(row=3, column=0, columnspan=2, padx=(10, 2), pady=5, sticky='WE')
        stop_session_btn = ttk.Button(self, text="STOP", width=25, command=self.on_stop_session_click)
        stop_session_btn.grid(row=4, column=0, columnspan=2, padx=(10, 2), pady=(5, 5), sticky='WE')

    def on_play_session_click(self):
        if self.player_process:
            if self.player_process.is_alive():
                self.player_commands.put('STOP')

        db = MsSqlDatabase(self.conn_info)
        db_conn = db.connect()
        if db_conn:
            messages = db.get_messages(db_conn)
            if messages:
                is_credentials_valid = True
                if len(self.conn_info.aws_access_key_id.get()) == 0 or \
                    len(self.conn_info.aws_secret_access_key.get()) == 0:
                        is_credentials_valid = False

                if (is_credentials_valid):
                    credentials = {
                        'aws_access_key': self.conn_info.aws_access_key_id.get(),
                        'aws_secret_key': self.conn_info.aws_secret_access_key.get()
                    }
                    voices = {
                        'clinician': self.conn_info.clinician_voice.get(),
                        'client': self.conn_info.patient_voice.get()
                    }
                    player = AudioPlayer(credentials, voices)

                    self.player_commands = self.process_manager.Queue()
                    self.player_status = self.process_manager.dict()
                    self.player_process = multiprocessing.Process(
                        target=player.run,
                        args=(messages, voices, self.player_commands, self.player_status))
                    self.player_process.start()
                    self.root.after(500, lambda: self.check_player_status(self.player_process, self.player_status))
                else:
                    tk.messagebox.showwarning(title="Warning", message="AWS access or secret key is empty")
            else:
                tk.messagebox.showwarning(title="Warning", message="Nominated Session ID not found in the database!")
        else:
            tk.messagebox.showwarning(title="Warning", message="Cannot connect to database server")

    def on_pause_session_click(self):
        if self.player_commands:
            self.player_commands.put("PAUSE")

    def on_stop_session_click(self):
        if self.player_commands:
            self.player_commands.put("STOP")

    def on_closing(self):
        if self.player_process:
            if self.player_process.is_alive():
                self.player_commands.put('STOP')
            self.player_process.join()

        self.root.destroy()

    def check_player_status(self, player_process, player_status):
        if not player_process.is_alive():
            print('Player status: {}, {}'.format(player_status['code'], player_status['message']))
            if player_status['code'] != 0:
                tk.messagebox.showwarning(title="Warning", message=player_status['message'])
        else:
            self.root.after(500, lambda: self.check_player_status(player_process, player_status))


class WatsonGraphDetailsFrame(ttk.LabelFrame):
    def __init__(self, root, parent):
        super(WatsonGraphDetailsFrame, self).__init__(root, text='4. Analysis Graph Details')
        super(WatsonGraphDetailsFrame, self).grid(row=2, column=1, sticky='WE', padx=5, pady=5, ipadx=5, ipady=1)

        self.root = root
        self.parent = parent
        self.conn_info = parent.conn_info

        self.create_buttons()

    def create_buttons(self):
        self.tone_analysis_btn = ttk.Button(self, text='PERFORM TONE ANALYSIS', width=28,
                                                      command=self.tone_analysis_btn_click)
        self.tone_analysis_btn.grid(row=0, column=1, padx=(12, 2), pady=(11, 11), sticky='EW')

    def tone_analysis_btn_click(self):
        if len(self.conn_info.ibm_username.get()) == 0 or len(self.conn_info.ibm_passwd.get()) == 0 \
            or self.conn_info.use_ibm_api.get() == 0:
                tk.messagebox.showwarning(title='Warning',
                                          message='\'IBM Watson API\' username or password is empty or disabled')
                return

        db = MsSqlDatabase(self.conn_info)
        conn = db.connect()
        if not conn:
            tk.messagebox.showwarning(title='Warning', message='Cannot connect to database server!')
            return

        messages = db.get_messages_for_tone_analyse(conn)
        if not messages:
            tk.messagebox.showwarning(title='Warning', message='Nominated Session ID not found in the database!')
            return

        if len(messages.split()) < 3:
            tk.messagebox.showwarning(title='Warning', message='Too few words provided!')
            return

        if sys.getsizeof(messages) > 128000:
            tk.messagebox.showwarning(title='Warning', message='The message provided is too long for API string limit.')
            return

        db_user_id = db.get_user_id(conn)
        if not db_user_id:
            tk.messagebox.showwarning(title='Warning', message='Cannot get User ID for given Session ID')
            return
        client = { 'session_id': self.conn_info.session_id.get(),
                   'user_id': db_user_id[0][0] }

        tone_analyzer = wdc.ToneAnalyzerV3(
            version=self.conn_info.ibm_version,
            username=self.conn_info.ibm_username.get(),
            password=self.conn_info.ibm_passwd.get(),
            x_watson_learning_opt_out=self.conn_info.ibm_x_watson_learning_opt_out
        )

        try:
            tone = tone_analyzer.tone(messages, sentences=False, content_type='text/plain')
        except:
            tk.messagebox.showwarning(title='Warning', message='Cannot connect to IBM Watson service')
            return

        dialog = ToneAnalysisDialog(self, client, tone)
        self.wait_window(dialog)


class APIDetailsDialog(tk.Toplevel):
    def __init__(self, parent):
        super(APIDetailsDialog, self).__init__(parent)
        self.parent = parent

        self.title('API Details')
        self.resizable(width=False, height=False)

        frame = ttk.LabelFrame(self, text="Polly Text-To-Speech GUI Prototype API Details")
        ttk.Label(frame, text="Text to Speech API:").grid(row=0, column=0, sticky='W')
        ttk.Label(frame, text="AWS Polly").grid(row=1, column=0, sticky='W', pady=(0, 10))
        ttk.Label(frame, text="Tone Analyser API:").grid(row=2, column=0, sticky='W')
        ttk.Label(frame, text="IBM Watson").grid(row=3, column=0, sticky='W')
        frame.pack(side=tk.TOP, fill=tk.BOTH, padx=10, pady=10)

        close_btn = ttk.Button(self, text='Close', command=self.on_close_btn_click)
        close_btn.pack(padx=5, pady=5, side=tk.BOTTOM)

        self.update_idletasks()
        w = self.winfo_width()
        h = self.winfo_height()
        x = (self.winfo_screenwidth() - w) // 2
        y = (self.winfo_screenheight() - h) // 2
        self.geometry('{}x{}+{}+{}'.format(w, h, x, y))
        self.grab_set()

    def on_close_btn_click(self):
        self.destroy()


class ToneAnalysisDialog(tk.Toplevel):
    def __init__(self, parent, client, tone):
        super(ToneAnalysisDialog, self).__init__(parent)

        self.parent = parent
        self.client = client
        self.tone = tone

        self.title('Tone Analysis')

        plot_widget = self.create_tone_analyse_plot()
        plot_widget.pack(side=tk.TOP, fill=tk.BOTH, expand=1)

        close_btn = ttk.Button(self, text='Close', command=self.on_close_btn_click)
        close_btn.pack(padx=5, pady=5, side=tk.BOTTOM)
        self.grab_set()

    def create_tone_analyse_plot(self):
        emotion_tone = {}
        language_tone = {}
        social_tone = {}

        for cat in self.tone['document_tone']['tone_categories']:
            print('Category:', cat['category_name'])
            if cat['category_name'] == 'Emotion Tone':
                for tone in cat['tones']:
                    print('-', tone['tone_name'], tone['score'])
                    emotion_tone.update({tone['tone_name']: tone['score']})
            if cat['category_name'] == 'Social Tone':
                for tone in cat['tones']:
                    print('-', tone['tone_name'], tone['score'])
                    social_tone.update({tone['tone_name']: tone['score']})
            if cat['category_name'] == 'Language Tone':
                for tone in cat['tones']:
                    print('-', tone['tone_name'], tone['score'])
                    language_tone.update({tone['tone_name']: tone['score']})

        max_tone_values = list(emotion_tone.values()) + list(language_tone.values()) + list(social_tone.values())
        if max(max_tone_values) > 0.9:
            max_tone_value = 1
        else:
            max_tone_value = max(max_tone_values) + 0.1

        mpl.style.use('seaborn')

        fig = mpl.figure.Figure(figsize=(7, 7))
        canvas = FigureCanvasTkAgg(fig, master=self)

        fig.suptitle(
            'Tones Analysis of Patient ID \'{}\', Chat Data for Session ID \'{}\'\nScale range: 0 (min) -- 1 (max)'
                .format(self.client['user_id'], self.client['session_id']), fontsize=14, fontweight='bold')

        keys = sorted(emotion_tone.keys(), reverse=True)
        values = [emotion_tone[key] for key in keys]
        y_pos = np.arange(len(values))
        ax1 = fig.add_subplot(311)
        ax1.barh(y_pos, values, align='center', alpha=0.6, color='limegreen')
        ax1.set_yticks(y_pos)
        ax1.set_yticklabels(keys)
        ax1.set_title('Emotion Tone', fontsize=12)
        ax1.set_xlim([0, max_tone_value])

        keys = sorted(social_tone.keys(), reverse=True)
        values = [social_tone[key] for key in keys]
        y_pos = np.arange(len(values))
        ax2 = fig.add_subplot(312)
        ax2.barh(y_pos, values, align='center', alpha=0.6, color='red')
        ax2.set_yticks(y_pos)
        ax2.set_yticklabels(keys)
        ax2.set_title('Social Tone', fontsize=12)
        ax2.set_xlim([0, max_tone_value])

        keys = sorted(language_tone.keys(), reverse=True)
        values = [language_tone[key] for key in keys]
        y_pos = np.arange(len(values))
        ax3 = fig.add_subplot(313)
        ax3.barh(y_pos, values, height=0.4, align='center', alpha=0.6, color='deepskyblue')
        ax3.set_yticks(y_pos)
        ax3.set_yticklabels(keys)
        ax3.set_title('Language Tone', fontsize=12)
        ax3.set_xlim([0, max_tone_value])

        fig.tight_layout(pad=0.9, w_pad=0.5, h_pad=1.7)
        fig.subplots_adjust(top=0.85, left=0.20)

        canvas.show()
        widget = canvas.get_tk_widget()

        return widget

    def on_close_btn_click(self):
        self.destroy()

if __name__ == "__main__":
    app = AppFrame()
    app.run()

This concludes this two-part series on building a simple GUI app in Python and Tkinter using AWS and IBM machine learning cloud services. Now you can see that anyone with a little bit of elbow grease, minimal Python skills and a little bit of time to spare (no PhD required!) can take advantage of these machine learning services and create something interesting.
