Below are examples of building and orchestrating data pipelines and validating data quality over large, complex data sources and schemas. They build on the previous EHR-based context, simulating a real-world healthcare scenario with large datasets, and use Python with popular tools: Apache Airflow (orchestration), Pandas (data manipulation), and Great Expectations (data quality validation). Each part includes code snippets and explanations.
We’re managing EHR data from multiple hospitals stored in a PostgreSQL database. The data includes patient demographics, visits, and lab results—each with complex schemas and millions of records. We need to:
1. Create a data pipeline to extract, transform, and load (ETL) this data into a unified analytics table.
2. Orchestrate the pipeline to run daily.
3. Validate data quality to ensure completeness, consistency, and accuracy.
The source tables (simplified schemas):

- `patients`: (patient_id, name, dob, gender, hospital_id)
- `visits`: (visit_id, patient_id, visit_date, diagnosis, treatment, hospital_id)
- `lab_results`: (lab_id, visit_id, test_name, result_value, test_date)
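For local experimentation, here is a minimal sketch that seeds tiny sample tables matching these schemas (the sample rows and the `ehr_db` connection string are made-up placeholders; a real deployment would already have these tables populated):

```python
import pandas as pd
from sqlalchemy import create_engine

# Placeholder connection string; replace with real credentials.
engine = create_engine("postgresql://user:password@localhost:5432/ehr_db")

# Tiny, made-up rows mirroring the simplified schemas above.
# Adjust the dates to recent ones so the 7-day extraction window below picks them up.
patients = pd.DataFrame({
    "patient_id": [1, 2],
    "name": ["Alice Example", "Bob Example"],
    "dob": ["1980-05-01", "1972-11-23"],
    "gender": ["F", "M"],
    "hospital_id": [10, 20],
})
visits = pd.DataFrame({
    "visit_id": [100, 101],
    "patient_id": [1, 2],
    "visit_date": ["2025-01-02", "2025-01-03"],
    "diagnosis": ["hypertension", "type 2 diabetes"],
    "treatment": ["lisinopril", "metformin"],
    "hospital_id": [10, 20],
})
lab_results = pd.DataFrame({
    "lab_id": [1000, 1001],
    "visit_id": [100, 101],
    "test_name": ["glucose", "hba1c"],
    "result_value": [95.0, 7.2],
    "test_date": ["2025-01-02", "2025-01-03"],
})

# Write the sample tables so the ETL script below has something to read.
for name, frame in {"patients": patients, "visits": visits, "lab_results": lab_results}.items():
    frame.to_sql(name, engine, if_exists="replace", index=False)
```

With the source tables in place, the first step is an ETL script that extracts the recent data, merges it, and loads it into a unified `analytics_ehr` table: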
import pandas as pd
from sqlalchemy import create_engine
import datetime
from typing import Dict
# Database connection
engine = create_engine('postgresql://user:password@localhost:5432/ehr_db')
# Extract function
def extract_data(start_date: str, end_date: str) -> Dict[str, pd.DataFrame]:
"""Extract data from source tables within a date range."""
queries = {
"patients": "SELECT * FROM patients",
"visits": f"""
SELECT * FROM visits
WHERE visit_date BETWEEN '{start_date}' AND '{end_date}'
""",
"lab_results": f"""
SELECT * FROM lab_results
WHERE test_date BETWEEN '{start_date}' AND '{end_date}'
"""
}
dataframes = {key: pd.read_sql(query, engine) for key, query in queries.items()}
return dataframes
# Transform function
def transform_data(dataframes: Dict[str, pd.DataFrame]) -> pd.DataFrame:
"""Transform and merge data into a unified analytics table."""
# Merge visits with patients
df_visits = dataframes["visits"].merge(
dataframes["patients"][["patient_id", "name", "dob", "gender", "hospital_id"]],
on="patient_id",
how="left"
)
# Merge with lab results
df_combined = df_visits.merge(
dataframes["lab_results"][["visit_id", "test_name", "result_value"]],
on="visit_id",
how="left"
)
# Calculate patient age
df_combined["age"] = df_combined["dob"].apply(
lambda x: (datetime.datetime.now() - pd.to_datetime(x)).days // 365
)
# Add a derived column: has_abnormal_result
df_combined["has_abnormal_result"] = df_combined["result_value"].apply(
lambda x: 1 if pd.notnull(x) and (x < 0 or x > 1000) else 0
)
return df_combined
# Load function
def load_data(df: pd.DataFrame, target_table: str):
"""Load transformed data into a target table."""
df.to_sql(target_table, engine, if_exists="append", index=False)
print(f"Loaded {len(df)} rows into {target_table}")
# Pipeline execution
if __name__ == "__main__":
# Define date range (e.g., last 7 days)
end_date = datetime.datetime.now().strftime("%Y-%m-%d")
start_date = (datetime.datetime.now() - datetime.timedelta(days=7)).strftime("%Y-%m-%d")
# Run ETL
raw_data = extract_data(start_date, end_date)
transformed_data = transform_data(raw_data)
load_data(transformed_data, "analytics_ehr")
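One caveat: because `load_data` uses `if_exists="append"` with a rolling 7-day window, consecutive daily runs will re-insert overlapping rows. A minimal sketch of one way to keep the load idempotent, assuming `visit_date` in `analytics_ehr` identifies the window being reloaded (the function name is hypothetical):

```python
from sqlalchemy import text

def load_data_idempotent(df: pd.DataFrame, target_table: str, start_date: str, end_date: str):
    """Delete the date window first, then append, so reruns don't duplicate rows."""
    # Assumes the target table already exists (e.g., created by an initial load).
    with engine.begin() as conn:  # single transaction: delete + insert
        conn.execute(
            text(f"DELETE FROM {target_table} WHERE visit_date BETWEEN :start AND :end"),
            {"start": start_date, "end": end_date},
        )
        df.to_sql(target_table, conn, if_exists="append", index=False)
    print(f"Reloaded {len(df)} rows into {target_table} for {start_date}..{end_date}")
```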
For very large tables, use `pd.read_sql` with `chunksize` (to process the extract in batches) or use a distributed framework like Apache Spark.
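A minimal sketch of a chunked extract, assuming the same `engine` and visits query as above (the 50,000-row chunk size is an arbitrary placeholder):

```python
def extract_visits_chunked(start_date: str, end_date: str, chunksize: int = 50_000) -> pd.DataFrame:
    """Stream the visits extract in batches instead of loading it all at once."""
    query = f"""
        SELECT * FROM visits
        WHERE visit_date BETWEEN '{start_date}' AND '{end_date}'
    """
    chunks = []
    # With chunksize set, pd.read_sql returns an iterator of DataFrames.
    for chunk in pd.read_sql(query, engine, chunksize=chunksize):
        # In practice, transform/load each chunk here instead of accumulating in memory.
        chunks.append(chunk)
    return pd.concat(chunks, ignore_index=True)
```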
Next, Apache Airflow orchestrates the pipeline to run daily:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from etl_script import extract_data, transform_data, load_data # Import from above
# Default arguments for the DAG
default_args = {
"owner": "ehr_team",
"depends_on_past": False,
"email_on_failure": False,
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
# Define the DAG
with DAG(
"ehr_etl_pipeline",
default_args=default_args,
description="Daily EHR data pipeline",
schedule_interval="@daily",
start_date=datetime(2025, 1, 1),
catchup=False,
) as dag:
# Define tasks
def extract_task():
end_date = datetime.now().strftime("%Y-%m-%d")
start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
return extract_data(start_date, end_date)
def transform_task(ti):
raw_data = ti.xcom_pull(task_ids="extract_data")
return transform_data(raw_data)
def load_task(ti):
transformed_data = ti.xcom_pull(task_ids="transform_data")
load_data(transformed_data, "analytics_ehr")
# Task definitions
t1 = PythonOperator(
task_id="extract_data",
python_callable=extract_task,
)
t2 = PythonOperator(
task_id="transform_data",
python_callable=transform_task,
)
t3 = PythonOperator(
task_id="load_data",
python_callable=load_task,
)
# Task dependencies
t1 >> t2 >> t3
The dependency chain `t1 >> t2 >> t3` ensures sequential execution. Note that XCom stores task results in the Airflow metadata database, so passing whole DataFrames between tasks is only practical for modest data volumes; at scale, pass references such as file paths or staging table names instead. For larger DAGs, related tasks can also be grouped with `TaskGroup`.
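A minimal sketch of how the same three steps could be grouped with `TaskGroup` (purely illustrative; the DAG id and group name are assumptions, and it reuses the callables and `default_args` from the DAG above):

```python
from airflow.utils.task_group import TaskGroup

# Hypothetical variant of the DAG above with the ETL steps inside one TaskGroup.
with DAG(
    "ehr_etl_pipeline_grouped",
    default_args=default_args,
    schedule_interval="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as grouped_dag:
    with TaskGroup("etl") as etl_group:
        # Task IDs inside a group are prefixed (e.g. "etl.extract_data"),
        # so any xcom_pull calls would need the prefixed IDs.
        extract = PythonOperator(task_id="extract_data", python_callable=extract_task)
        transform = PythonOperator(task_id="transform_data", python_callable=transform_task)
        load = PythonOperator(task_id="load_data", python_callable=load_task)
        extract >> transform >> load
```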
Finally, Great Expectations validates data quality on the loaded table:

import great_expectations as ge
import pandas as pd
from sqlalchemy import create_engine
# Connect to database
engine = create_engine('postgresql://user:password@localhost:5432/ehr_db')
# Load data to validate (e.g., from the analytics table)
df = pd.read_sql("SELECT * FROM analytics_ehr LIMIT 1000000", engine)
ge_df = ge.from_pandas(df)
# Define expectations
ge_df.expect_column_values_to_not_be_null("patient_id")
ge_df.expect_column_values_to_be_of_type("visit_date", "datetime64")
ge_df.expect_column_values_to_be_between("age", min_value=0, max_value=150)
ge_df.expect_table_row_count_to_be_between(min_value=1000, max_value=10000000)
ge_df.expect_column_values_to_be_in_set("gender", ["M", "F", "Other"])
ge_df.expect_column_mean_to_be_between("result_value", min_value=0, max_value=500)
# Validate data
validation_results = ge_df.validate()
# Output results
if validation_results["success"]:
print("Data quality validation passed!")
else:
print("Data quality issues found:")
for result in validation_results["results"]:
if not result["success"]:
print(f"- {result['expectation_config']['expectation_type']}: {result['result']}")
# Save validation report
ge_df.save_expectation_suite("ehr_expectations.json")
with open("validation_report.json", "w") as f:
f.write(str(validation_results))
What these expectations check:

- `not_be_null`: Ensures no missing patient IDs.
- `be_of_type`: Verifies date columns are in datetime format.
- `be_between`: Checks that age and result values are within reasonable ranges.
- `be_in_set`: Ensures gender values are valid.
- `table_row_count`: Confirms the dataset size is reasonable.

You can add the validation step to the Airflow DAG:
def validate_task(ti):
transformed_data = ti.xcom_pull(task_ids="transform_data")
ge_df = ge.from_pandas(transformed_data)
ge_df.expect_column_values_to_not_be_null("patient_id")
results = ge_df.validate()
if not results["success"]:
raise ValueError("Data quality validation failed!")
t4 = PythonOperator(
task_id="validate_data",
python_callable=validate_task,
)
t1 >> t2 >> t3 >> t4 # Add validation after load
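Column-level expectations can also be complemented with cross-table consistency checks. A minimal pandas sketch, assuming the same `engine` and source tables as above (the specific checks and the function name are illustrative, not exhaustive):

```python
def check_referential_integrity(engine) -> dict:
    """Simple consistency checks across the source tables, returning issue counts."""
    visits = pd.read_sql("SELECT visit_id, patient_id FROM visits", engine)
    patients = pd.read_sql("SELECT patient_id FROM patients", engine)
    labs = pd.read_sql("SELECT lab_id, visit_id FROM lab_results", engine)

    return {
        # Visits that reference a patient_id missing from the patients table.
        "orphan_visits": int((~visits["patient_id"].isin(patients["patient_id"])).sum()),
        # Lab results that reference a visit_id missing from the visits table.
        "orphan_lab_results": int((~labs["visit_id"].isin(visits["visit_id"])).sum()),
        # Duplicate patient IDs, which would fan out joins in the analytics table.
        "duplicate_patient_ids": int(patients["patient_id"].duplicated().sum()),
    }

print(check_referential_integrity(engine))
```

Such checks could be wrapped in another PythonOperator and raised as failures, just like the `validate_data` task above.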
Let me know if you’d like to dive deeper into any part (e.g., Spark for bigger data, more complex transformations, or additional validations)!