{"id":4027,"date":"2020-05-24T07:00:00","date_gmt":"2020-05-23T21:00:00","guid":{"rendered":"http:\/\/bicortex.com\/?p=4027"},"modified":"2020-08-20T08:17:20","modified_gmt":"2020-08-19T22:17:20","slug":"kicking-the-tires-on-airflow-apaches-workflow-management-platform-architecture-overview-installation-and-sample-azure-cloud-deployment-pipeline-in-python-part-2","status":"publish","type":"post","link":"http:\/\/bicortex.com\/bicortex\/kicking-the-tires-on-airflow-apaches-workflow-management-platform-architecture-overview-installation-and-sample-azure-cloud-deployment-pipeline-in-python-part-2\/","title":{"rendered":"Kicking the Tires on Airflow, Apache\u2019s workflow management platform \u2013 Architecture Overview, Installation and sample Azure Cloud Deployment Pipeline in Python (Part 2)"},"content":{"rendered":"<h3 style=\"text-align: center;\">Airflow Sample Workflow Continued&#8230;<\/h3>\n<p>Note: Part 1 can be found <a href=\"http:\/\/bicortex.com\/kicking-the-tires-on-airflow-apaches-workflow-management-platform-architecture-overview-installation-and-sample-azure-cloud-deployment-pipeline-in-python-part-1\" target=\"_blank\" rel=\"noopener noreferrer\">HERE<\/a> and all files used in this tutorial can be downloaded from <a href=\"https:\/\/1drv.ms\/u\/s!AuEyKKgH71pxhNY3EHHrOvLrj4YSQQ?e=HZ6UZn\" target=\"_blank\" rel=\"noopener noreferrer\">HERE<\/a>.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">In the first part to this short series on Apache Airflow I outlined it its core architecture, installation process and created a sample example of a very rudimentary DAG (Directed Acyclic Graph). In this post I&#8217;d like to dive a bit deeper and look at a more advanced workflow which combines a few different operations and tasks. 
Let\u2019s assume that we\u2019re tasked with creating a DAG which needs to accomplish the following:<\/p>\n<ul>\n<li style=\"text-align: justify;\">Programmatically create a Citus database instance in the Azure public cloud with specific configuration parameters e.g. geographic location, storage size, number of nodes assigned, firewall rules etc. These parameters are stored across two JSON template files, known as Azure Resource Manager templates<\/li>\n<li style=\"text-align: justify;\">After creating the Azure Citus database, the workflow should create a database schema using DDL SQL stored in one of the configuration files<\/li>\n<li style=\"text-align: justify;\">Next, load the text file data into the newly created database schema<\/li>\n<li style=\"text-align: justify;\">Run four sample TPC-DS SQL queries in parallel (queries 5, 13, 47 and 57) and store the results in files on the local file system<\/li>\n<li style=\"text-align: justify;\">Shut the cluster down and remove any resources created as part of this process<\/li>\n<li style=\"text-align: justify;\">Send out an e-mail notifying operator(s) of the workflow conclusion<\/li>\n<\/ul>\n<p class=\"Standard\" style=\"text-align: justify;\">A similar type of workload is often used to create an idempotent pipeline which stitches together a number of tasks, including cloud infrastructure provisioning, data analytics, ETL\/ELT activities etc. to design an end-to-end data management solution. 
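Before looking at the Airflow implementation, the task dependencies implied by the steps above can be sketched as a plain-Python adjacency map and ordered with a topological sort (the task names here are illustrative, not the actual DAG task ids used later):

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task names mirroring the bullet points above; the four
# TPC-DS queries all depend on the loaded data, so they can run in parallel.
deps = {
    "create_db_schema": {"provision_citus_cluster"},
    "load_data": {"create_db_schema"},
    "run_query_5": {"load_data"},
    "run_query_13": {"load_data"},
    "run_query_47": {"load_data"},
    "run_query_57": {"load_data"},
    "destroy_citus_cluster": {
        "run_query_5", "run_query_13", "run_query_47", "run_query_57"
    },
    "send_email": {"destroy_citus_cluster"},
}

# One valid sequential execution order respecting the dependencies
order = list(TopologicalSorter(deps).static_order())
print(order)
```

This is exactly the shape of graph Airflow materialises for us once the tasks are wired up with its dependency operators.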
The required DAG, with all its dependencies and activities, should look like the image below.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2020\/04\/Airflow_Azure_Cloud_Job_DAG_Graph_View.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-4048\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2020\/04\/Airflow_Azure_Cloud_Job_DAG_Graph_View.png\" alt=\"\" width=\"580\" height=\"359\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Airflow_Azure_Cloud_Job_DAG_Graph_View.png 580w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Airflow_Azure_Cloud_Job_DAG_Graph_View-300x186.png 300w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<h3 style=\"text-align: center;\">Sample Airflow Azure Cloud Pipeline Architecture<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">In order to provision the Azure resource groups and resources, I have created two JSON template files, also known as Azure Resource Manager (ARM) templates, which contain the required resources and their properties. ARM templates allow for declarative creation and deployment of any Azure infrastructure components e.g. virtual machines, hosted databases, storage accounts etc. along with their properties and names. As the libraries used in the Python version of the Azure SDK directly mirror and consume each Azure service&#8217;s REST endpoints, it\u2019s quite easy to script out a repeatable deployment process using ARM templates, as described in more detail further along in this post. 
As part of the solution configuration, I have also created two SQL files which are used to create a suite of tables on the Citus cluster and run the queries, outputting the result sets into a local directory.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\"><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2020\/04\/Airflow_Solution_Files_Directory_View.png\"><img loading=\"lazy\" decoding=\"async\" class=\"size-full wp-image-4049 alignleft\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2020\/04\/Airflow_Solution_Files_Directory_View.png\" alt=\"\" width=\"262\" height=\"734\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Airflow_Solution_Files_Directory_View.png 262w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Airflow_Solution_Files_Directory_View-107x300.png 107w\" sizes=\"auto, (max-width: 262px) 100vw, 262px\" \/><\/a>The aforementioned files used to populate the Citus cluster schema are part of the TPC-DS benchmark data (scaling factor set to 1GB) and consist of a number of flat files in CSV format. As we&#8217;re not going to recreate the whole TPC-DS benchmark suite of tests (this post is not intended to evaluate Citus database performance), to simplify the data loading process I will only use a few of them (just over 800MB in size), which should provide us with enough data to run a few queries once the tables have been populated. To simplify this demonstration, all Python solution files as well as ARM JSON templates, SQL files and flat file data were moved into the dags directory, i.e. the {AIRFLOW_HOME}\/dags folder. In a typical scenario, data would not co-exist with configuration and solution files, but for this exercise I have opted to structure the project as per the directory breakdown on the left. 
Also, in a production environment, particularly one with many DAG and task files, you would want to ensure that a consistent organizational structure is imposed across the DAG folder and its sub-directories. As Airflow is such a versatile tool, depending on a specific use case, you may want to separate individual projects and their related hooks, operators, scripts and other files into their own respective folders etc.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">With the Airflow server up and running (installation and configuration instructions in the previous post <a href=\"http:\/\/bicortex.com\/kicking-the-tires-on-airflow-apaches-workflow-management-platform-architecture-overview-installation-and-sample-azure-cloud-deployment-pipeline-in-python-part-1\" target=\"_blank\" rel=\"noopener noreferrer\">HERE<\/a>) and the files required to load into our Citus database schema staged in the solution directory, we can start unpacking individual code samples used in the final DAG. As mentioned at the start, a Citus database cluster, just like any other Azure resource, can be spun up declaratively using ARM template(s). You can peruse the content of the templates used in this demo as well as other artifacts in my publicly accessible OneDrive folder <a href=\"https:\/\/1drv.ms\/u\/s!AuEyKKgH71pxhNY3EHHrOvLrj4YSQQ?e=HZ6UZn\" target=\"_blank\" rel=\"noopener noreferrer\">HERE<\/a>.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">A quick word about Citus (now owned by Microsoft). Citus is an open source extension to PostgreSQL which distributes data and queries across multiple nodes. Because Citus is an extension to PostgreSQL (not a fork), it gives developers and enterprises a scale-out database while keeping the power and familiarity of a relational database. Citus horizontally scales PostgreSQL across multiple machines using sharding and replication. 
Its query engine parallelizes incoming SQL queries across these servers to enable super-fast responses on even very large datasets. I&#8217;m not very familiar with the service but there are many posts on the Internet describing some interesting projects Citus was used in, beating other competing technologies (TiDB, Apache Pinot, Apache Kylin, Apache Druid), partly due to its great SQL support and a large community of PostgreSQL users who are already familiar with the RDBMS it is based on e.g. <a href=\"https:\/\/techcommunity.microsoft.com\/t5\/azure-database-for-postgresql\/architecting-petabyte-scale-analytics-by-scaling-out-postgres-on\/ba-p\/969685\" target=\"_blank\" rel=\"noopener noreferrer\">HERE<\/a>.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2020\/04\/Citus_Cluster_Simplified_Architecture.jpg\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-4111\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2020\/04\/Citus_Cluster_Simplified_Architecture.jpg\" alt=\"\" width=\"580\" height=\"326\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Citus_Cluster_Simplified_Architecture.jpg 638w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Citus_Cluster_Simplified_Architecture-300x169.jpg 300w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Now, let&#8217;s go through the individual scripts and unpack the whole solution piece by piece. 
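To make the sharding idea concrete, here is a minimal sketch of how some of the TPC-DS tables created later in this post could be registered as distributed tables. The distribution columns below are illustrative assumptions, not tuned recommendations; on a live cluster each generated statement would be executed via psycopg2 against the coordinator node, which calls Citus&#8217;s create_distributed_table function:

```python
# Hypothetical table-to-distribution-column mapping (assumed, not prescriptive)
tables_to_distribute = {
    "tpc_ds.store_sales": "ss_item_sk",
    "tpc_ds.catalog_sales": "cs_item_sk",
    "tpc_ds.web_sales": "ws_item_sk",
}

# Build the SQL that would shard each table across the worker nodes
statements = [
    "SELECT create_distributed_table('{t}', '{c}');".format(t=t, c=c)
    for t, c in tables_to_distribute.items()
]

for s in statements:
    print(s)
```

Running each statement on the coordinator tells Citus to hash-partition the table on the chosen column and spread the shards across the workers, which is what enables the parallel query execution described above.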
The following Python script is using ARM templates to provision Azure resource group and a small Citus database cluster (single coordinator node and two worker nodes).<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\nimport adal\r\nfrom timeit import default_timer as timer\r\nfrom sys import platform\r\nfrom msrestazure.azure_active_directory import AdalAuthentication\r\nfrom msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD\r\nfrom azure.mgmt.resource.resources.models import (\r\n    DeploymentMode,\r\n    DeploymentProperties,\r\n    Deployment,\r\n)\r\nfrom azure.mgmt.resource import ResourceManagementClient\r\nfrom pathlib import Path, PurePosixPath\r\nimport json\r\nimport os\r\n\r\narm_template_location = PurePosixPath(\r\n    &quot;\/home\/airflow\/airflow\/dags\/azure_citus_db_dag_deps\/azure_arm_templates\/pg_template.json&quot;\r\n)\r\narm_template_params_location = PurePosixPath(\r\n    &quot;\/home\/airflow\/airflow\/dags\/azure_citus_db_dag_deps\/azure_arm_templates\/pg_template_params.json&quot;\r\n)\r\n\r\nresource_group = &quot;citus_svr_resource_group&quot;\r\nexternal_IP = os.popen(&quot;curl -s ifconfig.me&quot;).readline()\r\ndeployment_name = &quot;azure_citus_deployment&quot;\r\n\r\nexternal_ip_params = {\r\n    &quot;name&quot;: &quot;ClientIPAddress&quot;,\r\n    &quot;startIPAddress&quot;: external_IP,\r\n    &quot;endIPAddress&quot;: external_IP,\r\n}\r\nadmin_login = {&quot;value&quot;: &quot;citus&quot;}\r\nadmin_passwd = {&quot;value&quot;: &quot;your_citus_password&quot;}\r\n\r\ntemplate = arm_template_location\r\nparams = arm_template_params_location\r\n\r\nif template:\r\n    with open(template, &quot;r&quot;) as json_file_template:\r\n        template = json.load(json_file_template)\r\n\r\nif params:\r\n    with open(params, &quot;r&quot;) as json_file_template:\r\n        params = json.load(json_file_template)\r\n        params = {k: v for k, v in params&#x5B;&quot;parameters&quot;].items()}\r\n        
params&#x5B;&quot;firewallRules&quot;]&#x5B;&quot;value&quot;]&#x5B;&quot;rules&quot;]&#x5B;1] = external_ip_params\r\n        params&#x5B;&quot;administratorLogin&quot;] = admin_login\r\n        params&#x5B;&quot;administratorLoginPassword&quot;] = admin_passwd\r\n        resource_group_location = params&#x5B;&quot;location&quot;]&#x5B;&quot;value&quot;]\r\n\r\n\r\n# Tenant ID for your Azure Subscription\r\nTENANT_ID = &quot;123a27e2-7777-4d30-9ca2-ce960d430ef8&quot;\r\n\r\n# Your Service Principal App ID\r\nCLIENT = &quot;ae277f4e-882d-4f03-a0b5-b69275046123&quot;\r\n\r\n# Your Service Principal Password\r\nKEY = &quot;e7bbf0ed-d461-48c7-aace-c6a4822a123e&quot;\r\n\r\n# Your Azure Subscription ID\r\nsubscription_id = &quot;1236a74c-dfd8-4b4d-b0b9-a355d2ec793e&quot;\r\n\r\nLOGIN_ENDPOINT = AZURE_PUBLIC_CLOUD.endpoints.active_directory\r\nRESOURCE = AZURE_PUBLIC_CLOUD.endpoints.active_directory_resource_id\r\n\r\ncontext = adal.AuthenticationContext(LOGIN_ENDPOINT + &quot;\/&quot; + TENANT_ID)\r\ncredentials = AdalAuthentication(\r\n    context.acquire_token_with_client_credentials, RESOURCE, CLIENT, KEY\r\n)\r\n\r\nclient = ResourceManagementClient(credentials, subscription_id)\r\n\r\n\r\ndef create_citus_instance(resource_group):\r\n\r\n    deployment_properties = {\r\n        &quot;mode&quot;: DeploymentMode.incremental,\r\n        &quot;template&quot;: template,\r\n        &quot;parameters&quot;: params,\r\n    }\r\n\r\n    deployment_async_operation = client.deployments.create_or_update(\r\n        resource_group, deployment_name, deployment_properties\r\n    )\r\n\r\n    deployment_async_operation.wait()\r\n\r\n\r\ndef main():\r\n    print(&quot;\\nCreating Azure Resource Group...&quot;)\r\n    start = timer()\r\n    client.resource_groups.create_or_update(\r\n        resource_group, {&quot;location&quot;: resource_group_location}\r\n    )\r\n    print(&quot;Creating Citus Cluster...&quot;)\r\n    create_citus_instance(resource_group)\r\n    end = timer()\r\n 
   time = round(end - start, 1)\r\n    print(&quot;Total Elapsed Deployment Time = {t} seconds&quot;.format(\r\n        t=format(time, &quot;.2f&quot;)))\r\n\r\n\r\nif __name__ == &quot;__main__&quot;:\r\n    main()\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">This file, called provision_citus_cluster.py, will be called from our Airflow DAG using PythonOperator and is the first step of our pipeline. The instance creation process should take a few minutes to provision and once created we can continue on and generate Citus cluster schema with all the required objects.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">In order to populate Citus database with the flat files data we first need to create the desired schema (in this case called tpc_ds) and all the required objects. Even though this process will not be populating all tables (there are only thirteen flat files to be loaded), the script will create all TPC-DS benchmark objects using a simple SQL file containing all necessary DDL statements. The actual SQL file can be viewed and downloaded from my OneDrive folder so to keep this post concise, I will only include the Python script which parses the DDL SQL statements and executes those, creating all required tables. 
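Before diving into the full script, the splitting convention it relies on can be shown in miniature: each DDL section in the SQL file is preceded by a marker line starting with four dashes followed by the object name, and the parser turns the file into a {name: statement} dictionary. A simplified, single-pass sketch of the same idea (with a tiny inline sample standing in for the real schema file):

```python
def split_sql_sections(text):
    # Each section starts with a '----<name>' marker line; everything up to
    # the next marker belongs to that section.
    sections, name, buf = {}, None, []
    for line in text.splitlines():
        if line.startswith("----"):
            if name is not None:
                sections[name] = "\n".join(buf)
            name, buf = line[4:].strip(), []
        else:
            buf.append(line)
    if name is not None:
        sections[name] = "\n".join(buf)
    return sections


# Two-table sample in the same format as the real tpc_ds_schema.sql file
sample = """----dbgen_version
CREATE TABLE dbgen_version (dv_version varchar(16));
----customer_address
CREATE TABLE customer_address (ca_address_sk integer NOT NULL);"""

sections = split_sql_sections(sample)
print(sorted(sections))
```

The production script below implements the same split slightly differently (it keeps the marker text with each statement and reads the file twice), but the resulting dictionary is used the same way: iterate over the values and execute each DDL statement in turn.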
This operation (called from create_citus_schema.py file) will generate a second object in our DAG&#8217;s chain of tasks.<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\nfrom pathlib import Path, PurePosixPath\r\nfrom sys import platform\r\nfrom psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT\r\nimport psycopg2\r\nimport sys\r\n\r\nsql_schema = PurePosixPath(\r\n    &quot;\/home\/airflow\/airflow\/dags\/azure_citus_db_dag_deps\/sql\/tpc_ds_schema.sql&quot;\r\n)\r\n\r\npg_args = {\r\n    &quot;user&quot;: &quot;citus&quot;,\r\n    &quot;password&quot;: &quot;your_citus_password&quot;,\r\n    &quot;host&quot;: &quot;citussvrgroup-c.postgres.database.azure.com&quot;,\r\n    &quot;port&quot;: &quot;5432&quot;,\r\n    &quot;database&quot;: &quot;citus&quot;,\r\n    &quot;sslmode&quot;: &quot;require&quot;,\r\n}\r\n\r\npg_schema_name = &quot;tpc_ds&quot;\r\n\r\n\r\ndef get_sql(sql_schema):\r\n    &quot;&quot;&quot;\r\n    Source operation types from the 'create_sqlite_schema' SQL file.\r\n    Each operation is denoted by the use of four dash characters\r\n    and a corresponding table DDL statement and store them in a dictionary\r\n    (referenced in the main() function).\r\n    &quot;&quot;&quot;\r\n    table_name = &#x5B;]\r\n    query_sql = &#x5B;]\r\n\r\n    with open(sql_schema, &quot;r&quot;) as f:\r\n        for i in f:\r\n            if i.startswith(&quot;----&quot;):\r\n                i = i.replace(&quot;----&quot;, &quot;&quot;)\r\n                table_name.append(i.rstrip(&quot;\\n&quot;))\r\n\r\n    temp_query_sql = &#x5B;]\r\n    with open(sql_schema, &quot;r&quot;) as f:\r\n        for i in f:\r\n            temp_query_sql.append(i)\r\n        l = &#x5B;i for i, s in enumerate(temp_query_sql) if &quot;----&quot; in s]\r\n        l.append((len(temp_query_sql)))\r\n        for first, second in zip(l, l&#x5B;1:]):\r\n            query_sql.append(&quot;&quot;.join(temp_query_sql&#x5B;first:second]))\r\n    sql = 
dict(zip(table_name, query_sql))\r\n    return sql\r\n\r\n\r\ndef build_schema(sql_schema, pg_schema_name, **kwargs):\r\n    connection = None\r\n    try:\r\n        connection = psycopg2.connect(**kwargs)\r\n        cursor = connection.cursor()\r\n        cursor.execute(&quot;SELECT version();&quot;)\r\n        record = cursor.fetchone()\r\n        if record:\r\n            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)\r\n            cursor.execute(\r\n                &quot;CREATE SCHEMA IF NOT EXISTS {s};&quot;.format(\r\n                    s=pg_schema_name)\r\n            )\r\n            conn_params = connection.get_dsn_parameters()\r\n            if conn_params&#x5B;&quot;dbname&quot;] == &quot;citus&quot;:\r\n                sql = get_sql(sql_schema)\r\n                ddls = sql.values()\r\n                for s in ddls:\r\n                    try:\r\n                        cursor.execute(s)\r\n                        connection.commit()\r\n                    except (Exception, psycopg2.DatabaseError) as error:\r\n                        print(&quot;Error while creating nominated database object&quot;, error)\r\n                        sys.exit(1)\r\n            else:\r\n                raise Exception(&quot;Failed to connect to citus database.&quot;)\r\n\r\n    except (Exception, psycopg2.Error) as error:\r\n        print(&quot;Could not connect to your PostgreSQL instance.&quot;, error)\r\n    finally:\r\n        if connection:\r\n            cursor.close()\r\n            connection.close()\r\n\r\n\r\ndef main():\r\n    build_schema(sql_schema, pg_schema_name, **pg_args)\r\n\r\n\r\nif __name__ == &quot;__main__&quot;:\r\n    main()\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Up next is our third task which loads text files data into the newly created database. 
The following Python code (solution file is called load_data_into_citus.py) is responsible for taking each of the staged flat files and copying those into the corresponding tables (in a sequential order).<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\nimport psycopg2\r\nimport os\r\nimport sys\r\nfrom pathlib import Path, PurePosixPath\r\nfrom psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT\r\n\r\n\r\ntpc_ds_files = PurePosixPath(\r\n    &quot;\/home\/airflow\/airflow\/dags\/azure_citus_db_dag_deps\/tpc_ds_data\/&quot;\r\n)\r\n\r\npg_schema_name = &quot;tpc_ds&quot;\r\nencoding = &quot;iso-8859-1&quot;\r\npg_args = {\r\n    &quot;user&quot;: &quot;citus&quot;,\r\n    &quot;password&quot;: &quot;your_citus_password&quot;,\r\n    &quot;host&quot;: &quot;citussvrgroup-c.postgres.database.azure.com&quot;,\r\n    &quot;port&quot;: &quot;5432&quot;,\r\n    &quot;database&quot;: &quot;citus&quot;,\r\n    &quot;sslmode&quot;: &quot;require&quot;,\r\n}\r\n\r\n\r\ndef load_data(tpc_ds_files, pg_schema_name, **kwargs):\r\n    connection = None\r\n    files = &#x5B;f for f in os.listdir(tpc_ds_files) if f.endswith(&quot;.csv&quot;)]\r\n    try:\r\n        connection = psycopg2.connect(**kwargs)\r\n        cursor = connection.cursor()\r\n        cursor.execute(&quot;SELECT version();&quot;)\r\n        record = cursor.fetchone()\r\n        if record:\r\n            connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)\r\n            for file in files:\r\n                schema_table_name = pg_schema_name + &quot;.&quot; + file&#x5B;:-4]\r\n                with open(\r\n                    os.path.join(tpc_ds_files, file), &quot;r&quot;, encoding=encoding\r\n                ) as f:\r\n                    # next(f)  # Skip the header row\r\n                    print(\r\n                        &quot;Truncating table {table_name}...&quot;.format(\r\n                            table_name=schema_table_name\r\n                        )\r\n          
          )\r\n                    cursor.execute(\r\n                        &quot;TRUNCATE TABLE {table_name};&quot;.format(\r\n                            table_name=schema_table_name\r\n                        )\r\n                    )\r\n                    print(\r\n                        &quot;Copying file {file_name} into {table_name} table...\\n&quot;.format(\r\n                            file_name=file, table_name=schema_table_name\r\n                        )\r\n                    )\r\n                    cursor.copy_from(f, schema_table_name, sep=&quot;|&quot;, null=&quot;&quot;)\r\n                    file_row_rounts = sum(\r\n                        1\r\n                        for line in open(\r\n                            os.path.join(tpc_ds_files, file),\r\n                            encoding=encoding,\r\n                            newline=&quot;&quot;,\r\n                        )\r\n                    )\r\n                    cursor.execute(\r\n                        &quot;SELECT COUNT(1) FROM {tbl}&quot;.format(\r\n                            tbl=schema_table_name)\r\n                    )\r\n                    record = cursor.fetchone()\r\n                    db_row_counts = record&#x5B;0]\r\n\r\n                    if file_row_rounts != db_row_counts:\r\n                        raise Exception(\r\n                            &quot;Table {tbl} failed to load correctly as record counts do not match: flat file: {ff_ct} vs database: {db_ct}.\\\r\n                            Please troubleshoot!&quot;.format(\r\n                                tbl=schema_table_name,\r\n                                ff_ct=file_row_rounts,\r\n                                db_ct=db_row_counts,\r\n                            )\r\n                        )\r\n\r\n    except Exception as e:\r\n        print(e)\r\n    finally:\r\n        if connection:\r\n            cursor.close()\r\n            connection.close()\r\n\r\n\r\ndef main():\r\n    
load_data(tpc_ds_files, pg_schema_name, **pg_args)\r\n\r\n\r\nif __name__ == &quot;__main__&quot;:\r\n    main()\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">As you may expect, each one of the aforementioned operations needs to be executed sequentially as there are clear dependencies across each of the steps outlined above, i.e. database objects cannot be populated without the schema being created in the first place and, likewise, the database schema cannot be created without the database instance being present. However, when it comes to SQL query execution, Citus was designed to crunch large volumes of data concurrently. Our next set of operations will calculate and generate four separate SQL result sets and stage those as CSV files inside the &#8216;queries_output&#8217; folder. Please also note that this post does not look into Citus cluster performance and is not concerned with the queries&#8217; execution times. We are not attempting to tune the way the data is distributed across the nodes or adjust any configuration parameters as the main purpose of this post is to highlight Airflow functionality and design patterns for the above scenarios.<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\nimport psycopg2\r\nimport os\r\nimport sys\r\nimport csv\r\nfrom pathlib import Path, PurePosixPath\r\n\r\n\r\nsql_queries_file = PurePosixPath(&quot;\/home\/airflow\/airflow\/dags\/azure_citus_db_dag_deps\/sql\/queries.sql&quot;)\r\nsql_output_files_dir = PurePosixPath(\r\n        &quot;\/home\/airflow\/airflow\/dags\/azure_citus_db_dag_deps\/queries_output\/&quot;\r\n    )\r\n\r\npg_schema_name = &quot;tpc_ds&quot;\r\nencoding = &quot;iso-8859-1&quot;\r\npg_args = {\r\n    &quot;user&quot;: &quot;citus&quot;,\r\n    &quot;password&quot;: &quot;your_citus_password&quot;,\r\n    &quot;host&quot;: &quot;citussvrgroup-c.postgres.database.azure.com&quot;,\r\n    &quot;port&quot;: &quot;5432&quot;,\r\n    &quot;database&quot;: &quot;citus&quot;,\r\n    
&quot;sslmode&quot;: &quot;require&quot;,\r\n}\r\n\r\n\r\ndef get_sql(sql_queries_file):\r\n    query_number = &#x5B;]\r\n    query_sql = &#x5B;]\r\n\r\n    with open(sql_queries_file, &quot;r&quot;) as f:\r\n        for i in f:\r\n            if i.startswith(&quot;----&quot;):\r\n                i = i.replace(&quot;----&quot;, &quot;&quot;)\r\n                query_number.append(i.rstrip(&quot;\\n&quot;))\r\n    temp_query_sql = &#x5B;]\r\n    with open(sql_queries_file, &quot;r&quot;) as f:\r\n        for i in f:\r\n            temp_query_sql.append(i)\r\n        l = &#x5B;i for i, s in enumerate(temp_query_sql) if &quot;----&quot; in s]\r\n        l.append((len(temp_query_sql)))\r\n        for first, second in zip(l, l&#x5B;1:]):\r\n            query_sql.append(&quot;&quot;.join(temp_query_sql&#x5B;first:second]))\r\n    sql = dict(zip(query_number, query_sql))\r\n    return sql\r\n\r\n\r\ndef main():\r\n    query_sql = sql.get(param).rstrip()\r\n    query_sql = query_sql&#x5B;:-1] # remove last semicolon\r\n    output_file_name = param.lower() + &quot;_results.csv&quot;\r\n    try:\r\n        connection = psycopg2.connect(**pg_args)\r\n        cursor = connection.cursor()\r\n        cursor.execute(&quot;SELECT version();&quot;)\r\n        record = cursor.fetchone()\r\n        if record:\r\n            sql_for_file_output = &quot;COPY ({0}) TO STDOUT WITH CSV DELIMITER '|';&quot;.format(query_sql)\r\n            with open(os.path.join(sql_output_files_dir, output_file_name), &quot;w&quot;) as output_file:\r\n                cursor.copy_expert(sql_for_file_output, output_file)\r\n    except Exception as e:\r\n        print(e)\r\n    finally:\r\n        if connection:\r\n            cursor.close()\r\n            connection.close()\r\n\r\n\r\nif __name__ == &quot;__main__&quot;:\r\n    if len(sys.argv&#x5B;1:]) == 1:\r\n        sql = get_sql(sql_queries_file)\r\n        param = sys.argv&#x5B;1]\r\n        query_numbers = &#x5B;q for q in sql]\r\n        if param 
not in query_numbers:\r\n            raise ValueError(\r\n                &quot;Incorrect argument given. Choose from the following numbers: {q}&quot;.format(\r\n                    q=&quot; or &quot;.join(query_numbers)\r\n                )\r\n            )\r\n        else:\r\n            param = sys.argv&#x5B;1]\r\n            main()\r\n    else:\r\n        raise ValueError(\r\n            &quot;Too many arguments given. Looking for &lt;query number&gt; numerical value.&quot;\r\n        )\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">The above code is executed using BashOperator rather than PythonOperator, as each query is run with a parameter passed to the executing script, indicating the SQL query number. Additionally, each query output (the result set) is staged as a CSV file on the local drive so that at the end of the pipeline execution we should end up with four flat files in the queries_output folder.<\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Last but not least, we will initialize the &#8216;clean-up&#8217; step, where the Citus cluster resource group and all its corresponding services will be terminated to avoid incurring additional cost, and send an email to a nominated email address, notifying the administrator of successful pipeline completion. 
The following Python code terminates and deletes all resources created in this process.<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\nimport adal\r\nfrom timeit import default_timer as timer\r\nfrom sys import platform\r\nfrom msrestazure.azure_active_directory import AdalAuthentication\r\nfrom msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD\r\nfrom azure.mgmt.resource.resources.models import (\r\n    DeploymentMode,\r\n    DeploymentProperties,\r\n    Deployment,\r\n)\r\nfrom azure.mgmt.resource import ResourceManagementClient\r\n\r\nresource_group = &quot;citus_svr_resource_group&quot;\r\n\r\n\r\n# Tenant ID for your Azure Subscription\r\nTENANT_ID = &quot;123a27e2-7777-4d30-9ca2-ce960d430ef8&quot;\r\n\r\n# Your Service Principal App ID\r\nCLIENT = &quot;ae277f4e-882d-4f03-a0b5-b69275046123&quot;\r\n\r\n# Your Service Principal Password\r\nKEY = &quot;e7bbf0ed-d461-48c7-aace-c6a4822a123e&quot;\r\n\r\n# Your Azure Subscription ID\r\nsubscription_id = &quot;1236a74c-dfd8-4b4d-b0b9-a355d2ec793e&quot;\r\n\r\nLOGIN_ENDPOINT = AZURE_PUBLIC_CLOUD.endpoints.active_directory\r\nRESOURCE = AZURE_PUBLIC_CLOUD.endpoints.active_directory_resource_id\r\n\r\ncontext = adal.AuthenticationContext(LOGIN_ENDPOINT + &quot;\/&quot; + TENANT_ID)\r\ncredentials = AdalAuthentication(\r\n    context.acquire_token_with_client_credentials, RESOURCE, CLIENT, KEY\r\n)\r\n\r\nclient = ResourceManagementClient(credentials, subscription_id)\r\n\r\n\r\ndef main():\r\n    client.resource_groups.delete(resource_group)\r\n\r\n\r\nif __name__ == &quot;__main__&quot;:\r\n    main()\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">There are many ways one can script out email notification dispatch using Python, however, Airflow provides out-of-the-box functionality in the form of EmailOperator. 
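By way of comparison, a hand-rolled notification using the standard library&#8217;s smtplib and email modules might look like the sketch below (the addresses, subject and credentials are placeholders, not values from this solution); EmailOperator simply wraps this kind of SMTP plumbing for you:

```python
import smtplib
from email.message import EmailMessage


def build_notification(sender, recipient, subject, body):
    # Assemble the message; sending is kept separate so the construction
    # can be exercised without a live SMTP server.
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = recipient
    msg["Subject"] = subject
    msg.set_content(body)
    return msg


msg = build_notification(
    "airflow@example.com",            # placeholder sender
    "operator@example.com",           # placeholder recipient
    "Azure_Citus_Database_Job finished",
    "All pipeline tasks completed successfully.",
)

# To actually dispatch (requires a reachable SMTP server and, for Gmail,
# an App Password as described below):
# with smtplib.SMTP("smtp.gmail.com", 587) as server:
#     server.starttls()
#     server.login("your_gmail_email_address@gmail.com", "your_app_password")
#     server.send_message(msg)
```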
For this demonstration I decided to use my personal Gmail address, which required a slight change to the airflow.cfg (Airflow configuration) file as well as creating a Google account App Password. More on the process of creating an App Password can be found <a href=\"https:\/\/support.google.com\/accounts\/answer\/185833?hl=en\" target=\"_blank\" rel=\"noopener noreferrer\">HERE<\/a>. The following changes to the Airflow configuration file are required (in addition to the DAG-specific email task definition) to enable EmailOperator functionality with a Gmail-specific email address.<\/p>\n<pre class=\"brush: bash; title: ; notranslate\" title=\"\">\r\nsmtp_host = smtp.gmail.com\r\nsmtp_user = your_gmail_email_address@gmail.com\r\nsmtp_password = your_app_password\r\nsmtp_port = 587\r\nsmtp_mail_from = Airflow\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">The final piece of the puzzle is the actual DAG file, responsible for the individual task definitions as per my previous introductory post to Airflow <a href=\"http:\/\/bicortex.com\/kicking-the-tires-on-airflow-apaches-workflow-management-platform-architecture-overview-installation-and-sample-azure-cloud-deployment-pipeline-in-python-part-1\" target=\"_blank\" rel=\"noopener noreferrer\">HERE<\/a>. The following Python code is responsible for defining DAG arguments, individual tasks and the order they should be executed in. Notice that some Python tasks are executed using BashOperator instead of PythonOperator. That&#8217;s because those scripts are run with an argument passed to them via Python&#8217;s sys.argv list, denoting the query number the script is to run. Also, those tasks need to include the full path to where the script is located. 
In contrast, PythonOperator only requires a Python callable to be included.<\/p>\n<pre class=\"brush: python; title: ; notranslate\" title=\"\">\r\nfrom airflow import DAG\r\nfrom airflow.operators.bash_operator import BashOperator\r\nfrom airflow.operators.python_operator import PythonOperator\r\nfrom datetime import datetime, timedelta\r\nfrom airflow.operators.email_operator import EmailOperator\r\nfrom azure_citus_db_dag_deps import create_citus_schema\r\nfrom azure_citus_db_dag_deps import load_data_into_citus\r\nfrom azure_citus_db_dag_deps import provision_citus_cluster\r\nfrom azure_citus_db_dag_deps import destroy_citus_cluster\r\n\r\n\r\ndefault_args = {\r\n    'owner': 'Airflow',\r\n    'depends_on_past': False,\r\n    'start_date': datetime(2019, 6, 1),\r\n    'email': &#x5B;'airflow@example.com'],\r\n    'email_on_failure': False,\r\n    'email_on_retry': False,\r\n    'retries': 0,\r\n    # 'retry_delay': timedelta(minutes=5),\r\n    # 'queue': 'bash_queue',\r\n    # 'pool': 'backfill',\r\n    # 'priority_weight': 10,\r\n    # 'end_date': datetime(2016, 1, 1),\r\n}\r\n\r\nwith DAG('Azure_Citus_Database_Job', default_args=default_args, schedule_interval=None) as dag:\r\n    provisioning_citus_cluster = PythonOperator(\r\n        task_id=&quot;Provision_Citus_Cluster&quot;,\r\n        python_callable=provision_citus_cluster.main)\r\n    creating_citus_schema = PythonOperator(\r\n        task_id=&quot;Create_DB_Schema&quot;, python_callable=create_citus_schema.main)\r\n    loading_csv_data = PythonOperator(\r\n        task_id=&quot;Load_Data&quot;, python_callable=load_data_into_citus.main)\r\n    executing_sql_query_5 = BashOperator(\r\n        task_id=&quot;Run_Query_5&quot;, bash_command='python ~\/airflow\/dags\/azure_citus_db_dag_deps\/query_data_in_citus.py Query5'\r\n    )\r\n    executing_sql_query_13 = BashOperator(\r\n        task_id=&quot;Run_Query_13&quot;, bash_command='python ~\/airflow\/dags\/azure_citus_db_dag_deps\/query_data_in_citus.py 
Query13'\r\n    )\r\n    executing_sql_query_47 = BashOperator(\r\n        task_id=&quot;Run_Query_47&quot;, bash_command='python ~\/airflow\/dags\/azure_citus_db_dag_deps\/query_data_in_citus.py Query47'\r\n    )\r\n    executing_sql_query_57 = BashOperator(\r\n        task_id=&quot;Run_Query_57&quot;, bash_command='python ~\/airflow\/dags\/azure_citus_db_dag_deps\/query_data_in_citus.py Query57'\r\n    )\r\n    destroying_citus_cluster = PythonOperator(\r\n        task_id=&quot;Destroy_Citus_Cluster&quot;, python_callable=destroy_citus_cluster.main)\r\n\r\n    emailing_operator = EmailOperator(\r\n        task_id=&quot;Email_Operator&quot;,\r\n        to=&quot;your_gmail_email_address@gmail.com&quot;,\r\n        subject=&quot;Airflow Message&quot;,\r\n        html_content=&quot;&quot;&quot;&lt;h1&gt;Congratulations! All your tasks are now completed.&lt;\/h1&gt;&quot;&quot;&quot;\r\n    )\r\n\r\n# Fan the four query tasks out in parallel, then fan back in\r\nprovisioning_citus_cluster &gt;&gt; creating_citus_schema &gt;&gt; loading_csv_data &gt;&gt; &#x5B;\r\n    executing_sql_query_5,\r\n    executing_sql_query_13,\r\n    executing_sql_query_47,\r\n    executing_sql_query_57,\r\n] &gt;&gt; destroying_citus_cluster &gt;&gt; emailing_operator\r\n<\/pre>\n<p class=\"Standard\" style=\"text-align: justify;\">Now that we have everything in place, we can kick this DAG off and wait for the pipeline to go through its individual execution steps, concluding with the query output files staged in the nominated folder as well as the email sent to the Gmail address we specified.<\/p>\n<p class=\"Standard\" 
style=\"text-align: justify;\">When finished, Airflow can provide us with the Gantt chart breakdown on how long individual tasks took which helps with pinpointing slow-running processes and gives us a good overview of time consumed across each step.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2020\/04\/Airflow_DAG_Execution_Gantt_Chart.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter size-full wp-image-4066\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2020\/04\/Airflow_DAG_Execution_Gantt_Chart.png\" alt=\"\" width=\"580\" height=\"275\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Airflow_DAG_Execution_Gantt_Chart.png 580w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Airflow_DAG_Execution_Gantt_Chart-300x142.png 300w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<p class=\"Standard\" style=\"text-align: justify;\">Additionally, we can now see the DAG tree view completion status (green = success, red = failure) for each run and each step, the four output files staged in the nominated directory with expected queries result sets persisted and finally, the email notification sent out on pipeline successful completion.<\/p>\n<p><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2020\/04\/Airflow_DAG_Tree_View.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-4100\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2020\/04\/Airflow_DAG_Tree_View.png\" alt=\"\" width=\"580\" height=\"254\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Airflow_DAG_Tree_View.png 787w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Airflow_DAG_Tree_View-300x132.png 300w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Airflow_DAG_Tree_View-768x337.png 768w\" sizes=\"auto, (max-width: 580px) 100vw, 
580px\" \/><\/a><a href=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2020\/04\/Airflow_Job_Completion_Email.png\"><img loading=\"lazy\" decoding=\"async\" class=\"aligncenter wp-image-4102\" src=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/\/2020\/04\/Airflow_Job_Completion_Email.png\" alt=\"\" width=\"580\" height=\"139\" srcset=\"http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Airflow_Job_Completion_Email.png 732w, http:\/\/bicortex.com\/bicortex\/wp-content\/post_content\/2020\/04\/Airflow_Job_Completion_Email-300x72.png 300w\" sizes=\"auto, (max-width: 580px) 100vw, 580px\" \/><\/a><\/p>\n<h3 style=\"text-align: center;\">Conclusion<\/h3>\n<p class=\"Standard\" style=\"text-align: justify;\">This concludes this short series outlining some of the core functionality of Airflow workflow management system. From the short time I spent with Airflow it seems like it fills the void for a robust, open-source, configuration-as-code platform which can be used across many different application, not just ETL\/ELT e.g. automating DevOps operations, machine learning pipelines, scheduler replacement etc. Its documentation is passable, it has a fairly mild learning curve, it&#8217;s fairly extensible and scalable, has a good management interface and its adoption rate, particularly in the startup community, is high as it is slowly becoming the de facto platform for programmatic workloads authoring\/scheduling. Data engineering field has been changing rapidly over the last decade, competition is rife and I&#8217;m not sure whether Airflow will be able to sustain its momentum in years to come. 
However, given its current rise to stardom, easily superseding tools such as Apache Oozie or Luigi, anyone in the field of Data Engineering should be at least familiar with it.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Airflow Sample Workflow Continued&#8230; Note: Part 1 can be found HERE and all files used in this tutorial can be downloaded from HERE. In the first part to this short series on Apache Airflow I outlined it its core architecture, installation process and created a sample example of a very rudimentary DAG (Directed Acyclic Graph). [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[61,72,56,5],"tags":[65,62,79,81,73,24,41,49],"class_list":["post-4027","post","type-post","status-publish","format-standard","hentry","category-cloud-computing","category-mpp-rdbms","category-programming","category-sql","tag-azure","tag-cloud-computing","tag-data-warehouse","tag-etl","tag-mpp-rdbms","tag-programming","tag-python","tag-sql"],"aioseo_notices":[],"jetpack_featured_media_url":"","_links":{"self":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/4027","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/comments?post=4027"}],"version-history":[{"count":72,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/4027\/revisions"}],"predecessor-version":[{"id":4369,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/posts\/4027\/revisions\/4369"}],"wp:attachment":[{"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/media?parent=4027"}],"wp:term
":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/categories?post=4027"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/bicortex.com\/bicortex\/wp-json\/wp\/v2\/tags?post=4027"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}