RDC ETL Control with Nifi

This document describes the methods we use to control the RDC ETL job using Nifi.

Requirements to perform ETL control tasks

Access and appropriate privileges on the RDC postgres databases.

Configuration Databases:

RDC OMOP Databases:

SQL tools - RIS compute cluster, DBeaver, or pgAdmin

NOTE: The RIS compute cluster can be used; it has a native psql tool and access to a unified logging space.

See the RIS compute setup document for how to use the RIS compute cluster.

Access to the Nifi Administrator's Group for the appropriate environment.

Starting RDC ETL:

Review current state of ETL job using the configuration database

-- use the following query to review the existing state of the ETL job.
select * from etl_run.config_main cm where department_name = 'rdc3'; 

Image of query output via DBeaver

Field Definitions to pay attention to:

job_id - currently or previously running etl job identifier

department_name - department identifier for the etl job (e.g. 'rdc3')

last_run_date - date and time of last successfully completed etl job

run_schedule - schedule for running the etl job, in crontab syntax; Nifi checks this every 5 minutes

active - toggles whether the etl job is active; the job will not run unless set to TRUE

currently_running - shows whether the etl job is currently running (TRUE) or not (FALSE)
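
For a quick read of just these fields, the review query above can be narrowed (a minimal sketch using the same table and columns):

-- review only the fields listed above
select job_id, department_name, last_run_date, run_schedule, active, currently_running
from etl_run.config_main
where department_name = 'rdc3';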

Checking the progress of a running ETL job

-- used to monitor the primary stage (parent_stage) progression of the etl job
with a as 
(
    select
        max(job_id) id
    from
        etl_run.controller_generated_sql cgs
)
select
    count(case when parent_stage = 'Ingress' and complete = true then 1 end) ingress_done,
    count(case when parent_stage = 'Ingress' and complete = false then 1 end) ingress_left,
    (sum(case when parent_stage = 'Ingress' and complete = true then 1.0 end)/
    sum(case when parent_stage = 'Ingress' then 1.0 end)*100)::decimal(5,2) || '%' ingress_status,
    count(case when parent_stage = 'Stage.SH' and complete = true then 1 end) stage_done,
    count(case when parent_stage = 'Stage.SH' and complete = false then 1 end) stage_left,
    (sum(case when parent_stage = 'Stage.SH' and complete = true then 1.0 end)/
    sum(case when parent_stage = 'Stage.SH' then 1.0 end)*100)::decimal(5,2) || '%' stage_status,
    count(case when parent_stage = 'omop_staging/MERGE_OMOP' and complete = true then 1 end) mergeomop_done,
    count(case when parent_stage = 'omop_staging/MERGE_OMOP' and complete = false then 1 end) mergeomop_left,
    (sum(case when parent_stage = 'omop_staging/MERGE_OMOP' and complete = true then 1.0 end)/
    sum(case when parent_stage = 'omop_staging/MERGE_OMOP' then 1.0 end) * 100)::decimal(5,2)|| '%' mergeomop_status,
    count(case when parent_stage = 'OmopStage.SH' and complete = true then 1 end) omopstage_done,
    count(case when parent_stage = 'OmopStage.SH' and complete = false then 1 end) omopstage_left,
    (sum(case when parent_stage = 'OmopStage.SH' and complete = true then 1.0 end)/
    sum(case when parent_stage = 'OmopStage.SH' then 1.0 end)*100)::decimal(5,2) || '%' omopstage_status,
    max(job_id) job_id
from
    etl_run.controller_generated_sql cgs2
where job_id = (select id from a);

Image of query output via DBeaver

Since the steps are calculated in real time, the numbers will likely change as the ETL job progresses; a compact per-stage rollup is sketched after the stage list below.

Order of stages as the ETL job progresses:

  • Ingress
  • Stage
  • Merge OMOP
  • OMOP Stage
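
As a compact alternative to the long progress query, the same counts can be rolled up per stage. This is a minimal sketch, assuming the etl_run.controller_generated_sql layout used above (job_id, parent_stage, complete):

-- per-stage rollup for the latest job; one row per parent_stage
select
    parent_stage,
    count(*) filter (where complete) as steps_done,
    count(*) filter (where not complete) as steps_left
from etl_run.controller_generated_sql
where job_id = (select max(job_id) from etl_run.controller_generated_sql)
group by parent_stage;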

Checking what is running currently on the RDC OMOP databases

-- this will show the sql currently running against the RDC OMOP database
select * from pg_stat_activity where usename = 'rdc_etl_user' and state = 'active' order by state desc;

Image of Postgres processes via DBeaver
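
A variant of the same check that adds per-statement runtime may also be useful; this sketch uses only standard pg_stat_activity columns (pid, query_start, query):

-- same check, with how long each active statement has been running
select pid, now() - query_start as runtime, left(query, 80) as query_snippet
from pg_stat_activity
where usename = 'rdc_etl_user' and state = 'active'
order by runtime desc;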

Setting up the ETL Jobs to run on a schedule

QA and Production ETL job scheduling

This type of scheduling requires no steps beyond those described below.

-- QA 6-8pm Sun-Thurs -6.00 hour offset for CST
update etl_run.config_main set run_schedule = '0 0-2 * * 1-5' where department_name = 'rdc3';

-- prod 4-5pm Sun-Thurs -6.00 hour offset for CST
update etl_run.config_main set run_schedule = '0 22-23 * * 0-4' where department_name = 'rdc3';
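
For reference, run_schedule follows standard crontab field order (minute, hour, day-of-month, month, day-of-week) and is evaluated in UTC; the production schedule above breaks down as:

-- '0 22-23 * * 0-4'
--  |  |    | |  |
--  |  |    | |  day-of-week: 0-4 = Sun-Thu
--  |  |    | month: every month
--  |  |    day-of-month: every day
--  |  hour: 22-23 UTC = 4-5pm CST
--  minute: 0 (top of the hour)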

-- validate run_schedule has been set correctly in config_main table
select * from etl_run.config_main cm where department_name = 'rdc3'; 

-- if unsure, check with the data warehousing developers

NOTE: Certain types of ETL runs will set the active column to FALSE; the ETL will not run unless active = TRUE.

-- In some cases this query is needed to continue ETL reprocessing; if unsure, check with the data warehousing developers
update etl_run.config_main set currently_running = false, active = true where department_name = 'rdc3';

NOTE: Check the last_run_date column; ETLs that have not run for over a week can take more than 48 hours to complete, which can impact Databricks ingestion on Sunday.
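
A minimal sketch for spotting those stale jobs ahead of time, using only config_main columns already shown above:

-- flag active ETL jobs whose last successful run is more than a week old
select department_name, last_run_date
from etl_run.config_main
where active = true
and last_run_date < now() - interval '7 days';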

AD-HOC run of ETL job

Type of scheduling used for AD-HOC ETL runs

-- run any day, every hour; the job starts within 5 minutes after the top of the hour (Nifi checks the schedule every 5 minutes)
update etl_run.config_main set run_schedule = '0 0-23 * * *' where department_name = 'rdc3';

-- hold the ETL via an unattainable cron setting (hours 25-26 never occur, so the job never runs)
update etl_run.config_main set run_schedule = '0 25-26 * * *' where department_name = 'rdc3';

-- validate run_schedule has been set correctly in config_main table
select * from etl_run.config_main cm where department_name = 'rdc3'; 

Using Nifi to trigger the ETL Job within 5 minutes

Navigate to Nifi for the correct environment:

Navigate through the controller groups: WUSM-->RDC3-->RDC3_Main:

Find "ManualRun" controller (top-left of panel):

Image ManualRun controller

Right-click to Enable (if not already enabled):

Image ManualRun controller

Right-click to Run-Once:

Image ManualRun controller

Right-click to Disable:

Image ManualRun controller

Additional Nifi resources

Working with ETL errors and failures

Monitoring Errors

Monitoring is provided through the "Azure I2 RDC"-->"RDC Nifi Alerts" channel in Teams.

The job_id will need to be referenced in order to determine which environment the alert is for.

-- use the following query to review the job_id for each environment.
select * from etl_run.config_main cm where department_name = 'rdc3'; 

Other helpful queries

-- list failed etl jobs awaiting reprocessing (restart candidates)
select * from etl_run.controller_reprocessing cr  where main_id = 1
and manual_retry_dtm is null;

-- error history of etl jobs
select * from etl_run.history_run_errors where job_id = 605;
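
To see which runs were noisiest, the error history can be rolled up per job. A sketch, assuming one row per error in history_run_errors:

-- error counts per job, most recent jobs first
select job_id, count(*) as error_count
from etl_run.history_run_errors
group by job_id
order by job_id desc;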

-- silent errors (network errors)
select * from etl_run.controller_omop;
-- update the table count to force a restart


-- to restart, flag the failed jobs ready for reprocessing (uncomment to run)
--update etl_run.controller_reprocessing set reprocessing_ready = true
--where main_id = 1
--and manual_retry_dtm is null;

-- check current run
select * from etl_run.config_ingestion_artifacts cia where active = true order by delta_column_current_value desc;
-- check last run
select ci.*
from etl_run.history_ingestion as ci
where job_id = 605 order by job_id desc;

-- individual step by job_id
-- good for finding the last step that failed
select
	*,
	end_time - start_time duration
from
	etl_run.history_generated_sql hgs 
where
	job_id = 605
order by id desc;
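
The duration column also makes it easy to rank slow steps; a variant of the query above, with the same assumed columns:

-- ten longest-running steps for the job
select *, end_time - start_time as duration
from etl_run.history_generated_sql
where job_id = 605
order by duration desc
limit 10;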

-- current plan by job
-- moved to history at stage boundaries
select * from etl_run.controller_generated_sql where job_id= 605 order by id desc;

Updated on August 7, 2025