Schema

탁가이버 · March 1, 2025

Grok3


Below, I'll provide examples of creating and orchestrating data pipelines and performing data quality validation over large, complex data sources and schemas. These examples will build on the previous EHR-based context, simulating a real-world healthcare scenario with large datasets. I'll use Python with popular tools like Apache Airflow (for orchestration), Pandas (for data manipulation), and Great Expectations (for data quality validation). The examples will include code snippets and explanations.


Scenario

We’re managing EHR data from multiple hospitals stored in a PostgreSQL database. The data includes patient demographics, visits, and lab results—each with complex schemas and millions of records. We need to:
1. Create a data pipeline to extract, transform, and load (ETL) this data into a unified analytics table.
2. Orchestrate the pipeline to run daily.
3. Validate data quality to ensure completeness, consistency, and accuracy.


1. Creation of the Data Pipeline

Tools Used

  • PostgreSQL: Source database with large, complex schemas.
  • Pandas: For data transformation.
  • SQLAlchemy: For database connectivity.

Example Schema

  • patients: (patient_id, name, dob, gender, hospital_id)
  • visits: (visit_id, patient_id, visit_date, diagnosis, treatment, hospital_id)
  • lab_results: (lab_id, visit_id, test_name, result_value, test_date)
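
For reference, the same schema can be expressed as SQLAlchemy Core table definitions, which the ETL script below could reuse for metadata-aware queries. This is only an illustrative sketch; the column types are assumptions rather than the source system's actual DDL.

from sqlalchemy import (
    Table, Column, Integer, String, Date, Float, MetaData, ForeignKey
)

metadata = MetaData()

# Patient demographics (one row per patient)
patients = Table(
    "patients", metadata,
    Column("patient_id", Integer, primary_key=True),
    Column("name", String),
    Column("dob", Date),
    Column("gender", String),
    Column("hospital_id", Integer),
)

# Hospital visits (many per patient)
visits = Table(
    "visits", metadata,
    Column("visit_id", Integer, primary_key=True),
    Column("patient_id", Integer, ForeignKey("patients.patient_id")),
    Column("visit_date", Date),
    Column("diagnosis", String),
    Column("treatment", String),
    Column("hospital_id", Integer),
)

# Lab results (many per visit)
lab_results = Table(
    "lab_results", metadata,
    Column("lab_id", Integer, primary_key=True),
    Column("visit_id", Integer, ForeignKey("visits.visit_id")),
    Column("test_name", String),
    Column("result_value", Float),
    Column("test_date", Date),
)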

Python Script for ETL Pipeline

import pandas as pd
from sqlalchemy import create_engine
import datetime
from typing import Dict

# Database connection
engine = create_engine('postgresql://user:password@localhost:5432/ehr_db')

# Extract function
def extract_data(start_date: str, end_date: str) -> Dict[str, pd.DataFrame]:
    """Extract data from source tables within a date range."""
    queries = {
        "patients": "SELECT * FROM patients",
        "visits": f"""
            SELECT * FROM visits 
            WHERE visit_date BETWEEN '{start_date}' AND '{end_date}'
        """,
        "lab_results": f"""
            SELECT * FROM lab_results 
            WHERE test_date BETWEEN '{start_date}' AND '{end_date}'
        """
    }
    dataframes = {key: pd.read_sql(query, engine) for key, query in queries.items()}
    return dataframes

# Transform function
def transform_data(dataframes: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Transform and merge data into a unified analytics table."""
    # Merge visits with patients
    df_visits = dataframes["visits"].merge(
        dataframes["patients"][["patient_id", "name", "dob", "gender", "hospital_id"]],
        on="patient_id",
        how="left"
    )
    
    # Merge with lab results
    df_combined = df_visits.merge(
        dataframes["lab_results"][["visit_id", "test_name", "result_value"]],
        on="visit_id",
        how="left"
    )
    
    # Calculate patient age
    df_combined["age"] = df_combined["dob"].apply(
        lambda x: (datetime.datetime.now() - pd.to_datetime(x)).days // 365
    )
    
    # Add a derived column: has_abnormal_result
    df_combined["has_abnormal_result"] = df_combined["result_value"].apply(
        lambda x: 1 if pd.notnull(x) and (x < 0 or x > 1000) else 0
    )
    
    return df_combined

# Load function
def load_data(df: pd.DataFrame, target_table: str):
    """Load transformed data into a target table."""
    df.to_sql(target_table, engine, if_exists="append", index=False)
    print(f"Loaded {len(df)} rows into {target_table}")

# Pipeline execution
if __name__ == "__main__":
    # Define date range (e.g., last 7 days)
    end_date = datetime.datetime.now().strftime("%Y-%m-%d")
    start_date = (datetime.datetime.now() - datetime.timedelta(days=7)).strftime("%Y-%m-%d")
    
    # Run ETL
    raw_data = extract_data(start_date, end_date)
    transformed_data = transform_data(raw_data)
    load_data(transformed_data, "analytics_ehr")

Explanation

  • Extract: Pulls data from three tables with millions of rows, filtered by a date range to manage volume.
  • Transform: Merges tables, calculates derived fields (e.g., age, abnormal results), and handles NULLs.
  • Load: Writes the unified data to an analytics table for downstream use.
  • Scalability: For larger datasets, you could chunk the extraction (e.g., pd.read_sql with chunksize) or use distributed frameworks like Apache Spark; a chunked-extraction sketch follows below.
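
As a rough illustration of the chunked-extraction idea, here is a minimal sketch. It reuses the engine and visits query from above; the 50,000-row chunk size and the commented usage are arbitrary examples, not tuned values.

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:password@localhost:5432/ehr_db')

def extract_visits_chunked(start_date: str, end_date: str, chunksize: int = 50_000):
    """Yield the visits table in fixed-size chunks to keep memory usage bounded."""
    query = f"""
        SELECT * FROM visits
        WHERE visit_date BETWEEN '{start_date}' AND '{end_date}'
    """
    # With chunksize set, pd.read_sql returns an iterator of DataFrames
    for chunk in pd.read_sql(query, engine, chunksize=chunksize):
        yield chunk

# Example usage: transform and load each chunk independently
# for chunk in extract_visits_chunked("2025-02-22", "2025-03-01"):
#     chunk.to_sql("analytics_ehr", engine, if_exists="append", index=False)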

2. Orchestration of the Data Pipeline with Apache Airflow

Tools Used

  • Apache Airflow: Workflow orchestration tool to schedule and monitor the pipeline.

Airflow DAG Example

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from etl_script import extract_data, transform_data, load_data  # Import from above

# Default arguments for the DAG
default_args = {
    "owner": "ehr_team",
    "depends_on_past": False,
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG
with DAG(
    "ehr_etl_pipeline",
    default_args=default_args,
    description="Daily EHR data pipeline",
    schedule_interval="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:

    # Define tasks
    def extract_task():
        end_date = datetime.now().strftime("%Y-%m-%d")
        start_date = (datetime.now() - timedelta(days=7)).strftime("%Y-%m-%d")
        return extract_data(start_date, end_date)

    def transform_task(ti):
        raw_data = ti.xcom_pull(task_ids="extract_data")
        return transform_data(raw_data)

    def load_task(ti):
        transformed_data = ti.xcom_pull(task_ids="transform_data")
        load_data(transformed_data, "analytics_ehr")

    # Task definitions
    t1 = PythonOperator(
        task_id="extract_data",
        python_callable=extract_task,
    )
    t2 = PythonOperator(
        task_id="transform_data",
        python_callable=transform_task,
    )
    t3 = PythonOperator(
        task_id="load_data",
        python_callable=load_task,
    )

    # Task dependencies
    t1 >> t2 >> t3

Explanation

  • DAG: Defines a daily pipeline with three tasks: extract, transform, and load.
  • Task Dependencies: t1 >> t2 >> t3 ensures sequential execution.
  • XCom: Airflow’s cross-task communication passes data between tasks (for very large DataFrames, writing intermediate results to storage and passing a reference is usually preferable to raw XCom).
  • Scheduling: Runs daily, with retries for robustness.
  • Scalability: For larger pipelines, you could add parallel tasks (e.g., process each hospital separately) using Airflow’s TaskGroup, as sketched below.
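
A minimal sketch of that per-hospital parallelism, assuming a hypothetical run_etl_for_hospital(hospital_id) helper and example hospital IDs (neither is defined in the code above):

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

HOSPITAL_IDS = ["hosp_a", "hosp_b", "hosp_c"]  # example values only

def run_etl_for_hospital(hospital_id: str):
    """Hypothetical helper: run extract/transform/load filtered to one hospital."""
    ...

with DAG(
    "ehr_etl_per_hospital",
    schedule_interval="@daily",
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    # One independent task per hospital, grouped for readability in the UI
    with TaskGroup(group_id="per_hospital_etl") as per_hospital_etl:
        for hospital_id in HOSPITAL_IDS:
            PythonOperator(
                task_id=f"etl_{hospital_id}",
                python_callable=run_etl_for_hospital,
                op_kwargs={"hospital_id": hospital_id},
            )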

3. Data Quality Validation with Great Expectations

Tools Used

  • Great Expectations: Open-source library for data validation.

Validation Example

import great_expectations as ge
import pandas as pd
from sqlalchemy import create_engine

# Connect to database
engine = create_engine('postgresql://user:password@localhost:5432/ehr_db')

# Load data to validate (e.g., from the analytics table)
df = pd.read_sql("SELECT * FROM analytics_ehr LIMIT 1000000", engine)
ge_df = ge.from_pandas(df)

# Define expectations
ge_df.expect_column_values_to_not_be_null("patient_id")
ge_df.expect_column_values_to_be_of_type("visit_date", "datetime64")
ge_df.expect_column_values_to_be_between("age", min_value=0, max_value=150)
ge_df.expect_table_row_count_to_be_between(min_value=1000, max_value=10000000)
ge_df.expect_column_values_to_be_in_set("gender", ["M", "F", "Other"])
ge_df.expect_column_mean_to_be_between("result_value", min_value=0, max_value=500)

# Validate data
validation_results = ge_df.validate()

# Output results
if validation_results["success"]:
    print("Data quality validation passed!")
else:
    print("Data quality issues found:")
    for result in validation_results["results"]:
        if not result["success"]:
            print(f"- {result['expectation_config']['expectation_type']}: {result['result']}")

# Save the expectation suite and the validation results
ge_df.save_expectation_suite("ehr_expectations.json")
with open("validation_report.json", "w") as f:
    f.write(str(validation_results))  # str() gives a readable dump, not strict JSON

Explanation

  • Expectations:
    • not_be_null: Ensures no missing patient IDs.
    • be_of_type: Verifies date columns are in datetime format.
    • be_between: Checks age and result values are within reasonable ranges.
    • be_in_set: Ensures gender values are valid.
    • table_row_count: Confirms the dataset size is reasonable.
  • Validation: Runs checks against a million-row sample (adjustable for scale).
  • Output: Reports pass/fail status and details issues for investigation.
  • Scalability: For larger datasets, validate in batches or integrate with a data lake (e.g., S3); a batch-validation sketch follows below.
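
One way to approximate batch validation with the same pandas-based API is to stream the analytics table in chunks and validate each one. The sketch below is illustrative; the chunk size and the subset of expectations are arbitrary.

import great_expectations as ge
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:password@localhost:5432/ehr_db')

def validate_in_batches(chunksize: int = 100_000) -> bool:
    """Validate the analytics table chunk by chunk; return overall pass/fail."""
    all_passed = True
    for chunk in pd.read_sql("SELECT * FROM analytics_ehr", engine, chunksize=chunksize):
        ge_chunk = ge.from_pandas(chunk)
        ge_chunk.expect_column_values_to_not_be_null("patient_id")
        ge_chunk.expect_column_values_to_be_between("age", min_value=0, max_value=150)
        results = ge_chunk.validate()
        if not results["success"]:
            all_passed = False
            print(f"Validation failed for a batch of {len(chunk)} rows")
    return all_passed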

Integration into Pipeline

You can add the validation step to the Airflow DAG:

def validate_task(ti):
    # Assumes `import great_expectations as ge` is present at the top of the DAG file
    transformed_data = ti.xcom_pull(task_ids="transform_data")
    ge_df = ge.from_pandas(transformed_data)
    ge_df.expect_column_values_to_not_be_null("patient_id")
    results = ge_df.validate()
    if not results["success"]:
        raise ValueError("Data quality validation failed!")

t4 = PythonOperator(
    task_id="validate_data",
    python_callable=validate_task,
)
t1 >> t2 >> t3 >> t4  # Add validation after load

Key Features Demonstrated

  1. Complex Schemas: Handles joins across multiple tables with different structures.
  2. Large Datasets: Optimized queries and batch processing for scalability.
  3. Orchestration: Automated, scheduled execution with error handling.
  4. Data Quality: Robust validation to ensure reliability.

Let me know if you’d like to dive deeper into any part (e.g., Spark for bigger data, more complex transformations, or additional validations)!
