RDC ETL Control with NiFi
This document describes the methods we use to control the RDC ETL job using NiFi.
Requirements to perform ETL control tasks
Access and appropriate privileges on the RDC postgres databases.
Configuration Databases:
- i2rdc3-dev-pg-flex-config.dev.i2rdc3.wustl.edu (RW)
- i2rdc3-qa-pg-flex-config.qa.i2rdc3.wustl.edu (RW)
- i2rdc3-prod-pg-flex-config.prod.i2rdc3.wustl.edu (RW)
RDC OMOP Databases:
- postgresql.dev.i2rdc3.wustl.edu (RO, pg_stat_activity view)
- postgresql.qa.i2rdc3.wustl.edu (RO, pg_stat_activity view)
- postgresql.prod.i2rdc3.wustl.edu (RO, pg_stat_activity view)
SQL tools - RIS compute cluster, DBeaver, or pgAdmin
NOTE: The RIS compute cluster can be used; it has a native psql tool and access to a unified logging space.
See RIS compute setup document on how to use RIS compute cluster.
Access to the NiFi Administrator's Group for the appropriate environment.
Starting RDC ETL:
Review the current state of the ETL job using the configuration database
-- use the following query to review the existing state of the ETL job.
select * from etl_run.config_main cm where department_name = 'rdc3';
Field Definitions to pay attention to:
job_id - identifier of the currently or previously running ETL job
department_name - department identifier for the ETL job (e.g. 'rdc3')
last_run_date - date and time of the last successfully completed ETL job
run_schedule - schedule for running the ETL job in crontab syntax; NiFi checks this every 5 minutes
active - toggles whether the ETL job is active; the job will not run unless set to TRUE
currently_running - shows whether the ETL job is currently running (TRUE) or not (FALSE)
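For example, a minimal sketch (using the same config_main row queried above) to re-enable a job that was left inactive:
-- re-enable the ETL job so the next matching run_schedule window can start it
update etl_run.config_main set active = true where department_name = 'rdc3' and active = false;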
Checking the progress of a running ETL job
-- used to monitor the primary stage (parent_stage) progression of the ETL job
with a as
(
select
max(job_id) id
from
etl_run.controller_generated_sql cgs
)
select
count(case when parent_stage = 'Ingress' and complete = true then 1 end) ingress_done,
count(case when parent_stage = 'Ingress' and complete = false then 1 end) ingress_left,
(sum(case when parent_stage = 'Ingress' and complete = true then 1.0 end)/
sum(case when parent_stage = 'Ingress' then 1.0 end)*100)::decimal(5,2) || '%' ingress_status,
count(case when parent_stage = 'Stage.SH' and complete = true then 1 end) stage_done,
count(case when parent_stage = 'Stage.SH' and complete = false then 1 end) stage_left,
(sum(case when parent_stage = 'Stage.SH' and complete = true then 1.0 end)/
sum(case when parent_stage = 'Stage.SH' then 1.0 end)*100)::decimal(5,2) || '%' stage_status,
count(case when parent_stage = 'omop_staging/MERGE_OMOP' and complete = true then 1 end) mergeomop_done,
count(case when parent_stage = 'omop_staging/MERGE_OMOP' and complete = false then 1 end) mergeomop_left,
(sum(case when parent_stage = 'omop_staging/MERGE_OMOP' and complete = true then 1.0 end)/
sum(case when parent_stage = 'omop_staging/MERGE_OMOP' then 1.0 end) * 100)::decimal(5,2)|| '%' mergeomop_status,
count(case when parent_stage = 'OmopStage.SH' and complete = true then 1 end) omopstage_done,
count(case when parent_stage = 'OmopStage.SH' and complete = false then 1 end) omopstage_left,
(sum(case when parent_stage = 'OmopStage.SH' and complete = true then 1.0 end)/
sum(case when parent_stage = 'OmopStage.SH' then 1.0 end)*100)::decimal(5,2) || '%' omopstage_status,
max(job_id) job_id
from
etl_run.controller_generated_sql cgs2
where job_id = (select id from a);
Since the steps are calculated in real time, the numbers will likely change as the ETL job progresses.
Order of stages as the ETL job progresses:
- Ingress
- Stage
- Merge OMOP
- OMOP Stage
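As a compact alternative to the per-stage query above, a sketch that summarizes every parent_stage of the latest job in one pass (same table and columns; stage order is inferred from the generated row ids):
-- per-stage totals for the latest job, in generation order
select
parent_stage,
count(*) steps_total,
count(case when complete = true then 1 end) steps_done
from etl_run.controller_generated_sql
where job_id = (select max(job_id) from etl_run.controller_generated_sql)
group by parent_stage
order by min(id);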
Checking what is running currently on the RDC OMOP databases
-- this will show the SQL currently running against the RDC OMOP database
select * from pg_stat_activity where usename = 'rdc_etl_user' and state = 'active' order by state desc;
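If a statement looks stuck, a sketch that adds how long each active statement has been running (pid, query_start, and query are standard pg_stat_activity columns):
-- show run time per active ETL statement, longest first
select pid, now() - query_start as run_time, left(query, 80) as query_snippet
from pg_stat_activity
where usename = 'rdc_etl_user' and state = 'active'
order by run_time desc;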
Setting up the ETL jobs to run on a schedule
QA and Production ETL job scheduling
This type of scheduling requires no steps beyond those described below.
-- QA: 6-8pm Sun-Thu CST (cron times are in UTC; -6:00 hour offset for CST)
update etl_run.config_main set run_schedule = '0 0-2 * * 1-5' where department_name = 'rdc3';
-- prod: 4-5pm Sun-Thu CST (cron times are in UTC; -6:00 hour offset for CST)
update etl_run.config_main set run_schedule = '0 22-23 * * 0-4' where department_name = 'rdc3';
-- validate run_schedule has been set correctly in config_main table
select * from etl_run.config_main cm where department_name = 'rdc3';
-- if unsure, check with the data warehousing developers
NOTE: Certain types of ETL runs will set the active column to FALSE; the ETL will not run unless active = TRUE.
-- In some cases this query is needed to continue ETL reprocessing; if unsure, check with the data warehousing developers
update etl_run.config_main set currently_running = false, active = true where department_name = 'rdc3';
NOTE: Check the last_run_date column; ETLs that have not run for over a week can take more than 48 hours to complete. This can impact Databricks ingestion on Sunday.
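A sketch for spotting that situation ahead of time, using the same config_main row (the 7-day threshold mirrors the note above):
-- flag an ETL that has not completed successfully in over a week
select department_name, last_run_date, now() - last_run_date as since_last_run,
(now() - last_run_date > interval '7 days') as stale
from etl_run.config_main where department_name = 'rdc3';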
Ad-hoc run of the ETL job
Type of scheduling used for ad-hoc ETL runs
-- run every hour; NiFi checks the schedule every 5 minutes, so the job starts within 5 minutes after the top of the hour
update etl_run.config_main set run_schedule = '0 0-23 * * *' where department_name = 'rdc3';
-- hold the ETL via an unattainable cron setting (hours 25-26 never occur, so the job never fires)
update etl_run.config_main set run_schedule = '0 25-26 * * *' where department_name = 'rdc3';
-- validate run_schedule has been set correctly in config_main table
select * from etl_run.config_main cm where department_name = 'rdc3';
Using NiFi to trigger the ETL job within 5 minutes
Navigate to NiFi for the correct environment:
Navigate through the controller groups: WUSM-->RDC3-->RDC3_Main:
Find the "ManualRun" controller (top-left of panel):
Right-click to Enable (if not already enabled):
Right-click to Run-Once:
Right-click to Disable:
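To confirm the manual run was picked up, a sketch against the configuration database (currently_running and job_id are the fields defined earlier):
-- within ~5 minutes, currently_running should flip to TRUE and job_id should advance
select job_id, currently_running, last_run_date from etl_run.config_main where department_name = 'rdc3';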
Additional NiFi resources
Working with ETL errors and failures
Monitoring Errors
Monitoring is provided through the "Azure I2 RDC"-->"RDC Nifi Alerts" channel in Teams.
The job_id will need to be referenced to determine which environment the alert is for.
-- use the following query to review the job_id for each environment.
select * from etl_run.config_main cm where department_name = 'rdc3';
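Once the environment is identified, a sketch that pulls the error history for its current job (history_run_errors is covered under "Other helpful queries" below; this assumes config_main holds a single row for 'rdc3'):
-- errors recorded for the job currently referenced in config_main
select * from etl_run.history_run_errors
where job_id = (select job_id from etl_run.config_main where department_name = 'rdc3');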
Other helpful queries
-- restart failed etl jobs
select * from etl_run.controller_reprocessing cr where main_id = 1 and manual_retry_dtm is null;
-- error history of etl jobs
select * from etl_run.history_run_errors where job_id = 605;
-- silent errors (network errors)
select * from etl_run.controller_omop;
-- update table count to force restart
--update etl_run.controller_reprocessing set reprocessing_ready = true
--where main_id = 1
--and manual_retry_dtm is null;
--
-- check current run
select * from etl_run.config_ingestion_artifacts cia where active = true order by delta_column_current_value desc;
-- check last run
select ci.* from etl_run.history_ingestion as ci where job_id = 605 order by job_id desc;
-- individual step by job_id
-- good for finding the last step that failed
select *, end_time - start_time duration from etl_run.history_generated_sql hgs where job_id = 605 order by id desc;
-- current plan by job
-- moved to history at stage boundaries
select * from etl_run.controller_generated_sql where job_id = 605 order by id desc;