ETL Pipeline Documentation

The ETL (Extract, Transform, Load) pipeline processes raw data from multiple sources into a unified dataset suitable for machine learning analysis.

Overview

The pipeline processes data from three main sources:

  • Air Quality Data: Pollutant measurements with station metadata
  • Health Data: Respiratory disease mortality and life expectancy statistics
  • Socioeconomic Data: GDP (PIB) and population data

All data covers Spanish provinces from 2000-2021 and is transformed into a single, clean dataset.

Pipeline Architecture & Data Flow

The pipeline follows a modular design with 8 sequential steps, each inheriting from the ETLStep base class:

flowchart TD
    subgraph "Input Data"
        A1[Air Quality CSVs<br/>Station measurements]
        A2[Health CSVs<br/>Province mortality/life expectancy] 
        A3[Socioeconomic CSVs<br/>PIB/Population by province]
    end

    subgraph "ETL Processing"
        B1[<b>DataExtractionStep</b><br/>Extract & Load Raw DataFrames]
        B2[<b>DataTransformationStep</b><br/>Transform & Standardize]
        B3[<b>DataMergingStep</b><br/>Merge on Province+Year]
        B4[<b>FeatureEngineeringStep</b><br/>Create calculated features]
        B5[<b>DataCleaningStep</b><br/>Clean & filter data]
        B6[<b>DataValidationStep</b><br/>Ensure data quality]
        B7[<b>DataExportStep</b><br/>Save final dataset]
        B8[<b>DataQualityReportStep</b><br/>Generate reports]
    end

    subgraph "Output"
        C1[Unified Dataset<br/>Province-Year-Pollutant records]
        C2[Quality Reports<br/>HTML/JSON]
        C3[Metadata & Dictionary<br/>CSV/JSON]
    end

    A1 --> B1
    A2 --> B1
    A3 --> B1
    B1 --> B2
    B2 --> B3
    B3 --> B4
    B4 --> B5
    B5 --> B6
    B6 --> B7
    B7 --> B8
    B7 --> C1
    B7 --> C3
    B8 --> C2
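
The step classes themselves are not shown in this documentation, so the following is only a minimal sketch of what the described interface could look like: a shared ETLStep base class and a runner that chains the eight steps sequentially. All names other than ETLStep are assumptions.

```python
from abc import ABC, abstractmethod

import pandas as pd


class ETLStep(ABC):
    """Base class that each of the 8 pipeline steps inherits from."""

    @abstractmethod
    def run(self, data: dict[str, pd.DataFrame]) -> dict[str, pd.DataFrame]:
        """Take the current pipeline state, return the updated state."""


class Pipeline:
    """Runs the steps in order, feeding each step's output to the next."""

    def __init__(self, steps: list[ETLStep]) -> None:
        self.steps = steps

    def run(self) -> dict[str, pd.DataFrame]:
        data: dict[str, pd.DataFrame] = {}
        for step in self.steps:
            data = step.run(data)
        return data
```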

ETL Steps in Detail

1. DataExtractionStep

Purpose: Read raw CSV files from different data sources

graph LR
    A[data/air_quality/raw/] --> D[Air Quality DataFrames]
    B[data/health/raw/] --> E[Health DataFrames]
    C[data/socioeconomic/raw/] --> F[Socioeconomic DataFrames]

    style D fill:#e1f5fe
    style E fill:#f3e5f5
    style F fill:#e8f5e8

  • Loads CSV files from data/{source}/raw/ directories
  • Creates separate DataFrames for each data source
  • Performs initial data type detection and basic validation
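
A hypothetical sketch of the extraction logic, assuming one DataFrame per CSV file keyed by file name; the directory layout matches the diagram above, but the function name and return shape are assumptions.

```python
from pathlib import Path

import pandas as pd

SOURCES = ("air_quality", "health", "socioeconomic")


def extract_raw(base_dir: str = "data") -> dict[str, dict[str, pd.DataFrame]]:
    """Read every CSV under data/{source}/raw/ into a DataFrame."""
    frames: dict[str, dict[str, pd.DataFrame]] = {}
    for source in SOURCES:
        raw_dir = Path(base_dir) / source / "raw"
        frames[source] = {
            csv.stem: pd.read_csv(csv) for csv in sorted(raw_dir.glob("*.csv"))
        }
    return frames
```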

2. DataTransformationStep

Purpose: Transform and standardize data from each source

graph TD
    A[Air Quality DataFrames] --> B[Air Quality Transformer]
    C[Health DataFrames] --> D[Health Transformer]
    E[Socioeconomic DataFrames] --> F[Socioeconomic Transformer]

    B --> G[Standardized Air Quality Data]
    D --> H[Standardized Health Data]
    F --> I[Standardized Socioeconomic Data]

    style G fill:#e1f5fe
    style H fill:#f3e5f5
    style I fill:#e8f5e8

Key transformations:

  • Air Quality:

    • Clean invalid Province values
    • Classify air quality levels using pollutant thresholds
  • Health:

    • Rename columns (Provincias→Province, Periodo→Year, Total→target metrics)
    • Strip thousands separators (commas/dots) from respiratory disease totals
    • Convert to numeric types
  • Socioeconomic:

    • Transform GDP from wide to long format
    • Rename columns (value→pib, Provincia→Province, anio→Year)
    • Remove dots from population numbers
    • Convert date formats

Common transformations applied to all sources:

  • Province name standardization using unified mapping system
  • Date/year column standardization to consistent format
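
To make two of the listed transformations concrete, here is an illustrative sketch of the health column renames with numeric conversion, and the GDP wide-to-long reshape. The actual transformer classes are not shown in this doc, so the function names and exact column handling are assumptions.

```python
import pandas as pd


def transform_health(df: pd.DataFrame) -> pd.DataFrame:
    df = df.rename(columns={"Provincias": "Province", "Periodo": "Year"})
    # Strip thousands separators before converting totals to numbers.
    totals = df["Total"].astype(str).str.replace(r"[.,]", "", regex=True)
    df["respiratory_diseases_total"] = pd.to_numeric(totals, errors="coerce")
    return df.drop(columns=["Total"])


def transform_gdp(df: pd.DataFrame) -> pd.DataFrame:
    # Wide to long: one row per (Provincia, anio) instead of one column per year.
    long = df.melt(id_vars=["Provincia"], var_name="anio", value_name="value")
    return long.rename(
        columns={"value": "pib", "Provincia": "Province", "anio": "Year"}
    )
```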

Province Name Standardization

Each data source uses its own province naming convention, but Province+Year is the composite key used for merging, so all sources must end up with identical province names. A unified mapping system (utils/unified_province_name.json) enforces this, as sketched below.
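
A minimal sketch of applying the unified mapping, assuming utils/unified_province_name.json maps raw source names to canonical ones; the mapping's exact structure is an assumption.

```python
import json

import pandas as pd


def standardize_provinces(df: pd.DataFrame) -> pd.DataFrame:
    """Replace raw province names with their canonical equivalents."""
    with open("utils/unified_province_name.json", encoding="utf-8") as fh:
        mapping = json.load(fh)  # assumed shape: {"raw name": "canonical name"}
    df["Province"] = df["Province"].replace(mapping)
    return df
```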

3. DataMergingStep

Purpose: Combine all data sources into a single DataFrame

graph TD
    A[Air Quality Data] --> D{Merge on<br/>Province + Year}
    B[Health Data] --> D
    C[Socioeconomic Data] --> D
    D --> E[Unified DataFrame]

Merge process:

  • Uses Province and Year as composite key
  • Maintains air quality station metadata
  • Preserves pollutant-specific measurements
  • Results in multiple rows per province-year (one per pollutant type)
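
A sketch of the merge on the Province+Year composite key. Using air quality as the left table preserves station metadata and the one-row-per-pollutant granularity; the join types are an assumption, as the doc does not specify inner vs. left joins.

```python
import pandas as pd


def merge_sources(
    air: pd.DataFrame, health: pd.DataFrame, socio: pd.DataFrame
) -> pd.DataFrame:
    keys = ["Province", "Year"]
    # Air quality on the left keeps station metadata and per-pollutant rows.
    merged = air.merge(health, on=keys, how="left")
    return merged.merge(socio, on=keys, how="left")
```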

4. FeatureEngineeringStep

Purpose: Create new calculated columns

graph TD
    A[Merged Data] --> B[Calculate respiratory_deaths_per_100k]
    B --> C[Enhanced Dataset]

Generated feature:

  • respiratory_deaths_per_100k: respiratory deaths per 100,000 inhabitants, computed from respiratory disease totals and population data (sketched below)
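
The calculation follows directly from the column names in the output schema; this is a sketch, not the repository's exact code.

```python
import pandas as pd


def add_deaths_per_100k(df: pd.DataFrame) -> pd.DataFrame:
    # Deaths per 100,000 inhabitants = total deaths / population * 100,000.
    df["respiratory_deaths_per_100k"] = (
        df["respiratory_diseases_total"] / df["population"] * 100_000
    )
    return df
```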

5. DataCleaningStep

Purpose: Clean and filter the dataset

graph TD
    A[Enhanced Dataset] --> B[Remove Metadata Columns]
    B --> C[Remove Island Observations]
    C --> D[Filter Timeframe]
    D --> E[Convert Categories to Lowercase]
    E --> F[Handle Null Values]
    F --> G[Remove Duplicates]
    G --> H[Convert Data Types]
    H --> I[Standardize Column Names]
    I --> J[Clean Dataset]

Cleaning operations:

  • Remove metadata columns (e.g., "Total Nacional", "Comunidades y Ciudades Autónomas")
  • Remove island observations (excluded regions from configuration)
  • Filter timeframe to configured date range
  • Convert categorical columns to lowercase (except Province)
  • Handle null values (<5% nulls: rows dropped; >5% nulls: kept for imputation; see the sketch after this list)
  • Remove duplicate rows
  • Convert columns to appropriate data types from feature_types.yaml
  • Standardize column names to lowercase with underscores
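
A sketch of several of the operations above, including the 5% null rule; the threshold and the excluded-region list come from configuration in the real pipeline, and this reading of the null rule (row removal per low-null column) is an assumption.

```python
import pandas as pd


def clean(
    df: pd.DataFrame, excluded_regions: set[str], null_threshold: float = 0.05
) -> pd.DataFrame:
    # Drop observations for excluded regions (e.g. islands, per config).
    df = df[~df["Province"].isin(excluded_regions)].copy()

    # Lowercase categorical columns, leaving Province untouched.
    for col in df.select_dtypes(include="object").columns:
        if col != "Province":
            df[col] = df[col].str.lower()

    # Null rule: drop rows for columns under the threshold,
    # keep nulls elsewhere for downstream imputation.
    low_null = [c for c in df.columns if 0 < df[c].isna().mean() < null_threshold]
    df = df.dropna(subset=low_null)

    df = df.drop_duplicates()

    # Standardize column names to lowercase with underscores.
    df.columns = df.columns.str.lower().str.replace(" ", "_")
    return df
```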

6. DataValidationStep

Purpose: Ensure data quality and integrity

graph TD
    A[Clean Dataset] --> B[Validate Not Empty]
    B --> C[Validate Nulls]
    C --> D[Validate Data Types]
    D --> E[Validate Duplicates]
    E --> F[Validate Required Columns]
    F --> G[Validate Business Rules]
    G --> H[Detect Statistical Anomalies]
    H --> I{Validation Results}
    I -->|Pass| J[Validated Dataset]
    I -->|Warnings| K[Log Issues & Continue]
    I -->|Errors| L[Pipeline Failure]

Validation checks:

  • Not Empty: Ensure the DataFrame is not empty (contains rows and columns)
  • Nulls: Check null percentage against configured thresholds
  • Data Types: Verify columns match feature_types.yaml schema
  • Duplicates: Detect duplicate rows (configurable allowance)
  • Required Columns: Ensure all required columns are present
  • Business Rules: Validate year ranges and positive pollution levels
  • Statistical Anomalies: Detect outliers using the IQR method (a warning is raised when more than 10% of a column's values are flagged; sketched after this list)
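
A sketch of the IQR check: flag values outside [Q1 - 1.5·IQR, Q3 + 1.5·IQR] and warn when more than 10% of a column is flagged. The 1.5 multiplier is the conventional choice and an assumption here.

```python
import logging

import pandas as pd

logger = logging.getLogger(__name__)


def detect_anomalies(df: pd.DataFrame, warn_ratio: float = 0.10) -> None:
    for col in df.select_dtypes(include="number").columns:
        q1, q3 = df[col].quantile([0.25, 0.75])
        iqr = q3 - q1
        outliers = (df[col] < q1 - 1.5 * iqr) | (df[col] > q3 + 1.5 * iqr)
        if outliers.mean() > warn_ratio:
            logger.warning("%s: %.1f%% outliers flagged", col, 100 * outliers.mean())
```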

7. DataExportStep

Purpose: Save the final dataset to storage

graph TD
    A[Validated Dataset] --> B[Export to CSV]
    A --> C[Generate Metadata]
    A --> D[Create Data Dictionary]
    B --> E[data/output/dataset.csv]
    C --> F[data/output/metadata.json]
    D --> G[data/output/data_dictionary.csv]

Export outputs:

  • Main dataset: data/output/dataset.csv
  • Metadata file with processing statistics
  • Data dictionary with column descriptions
  • Processing logs and timestamps
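
A sketch of the export step writing two of the artifacts shown in the diagram; the metadata fields beyond basic processing statistics are assumptions.

```python
import json
from datetime import datetime, timezone
from pathlib import Path

import pandas as pd


def export(df: pd.DataFrame, out_dir: str = "data/output") -> None:
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    df.to_csv(out / "dataset.csv", index=False)
    metadata = {  # assumed fields: row count, columns, timestamp
        "rows": len(df),
        "columns": list(df.columns),
        "exported_at": datetime.now(timezone.utc).isoformat(),
    }
    (out / "metadata.json").write_text(json.dumps(metadata, indent=2))
```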

8. DataQualityReportStep

Purpose: Generate quality reports

graph TD
    A[Final Dataset] --> B[Statistical Summary]
    A --> C[Missing Data Report]
    A --> D[Distribution Analysis]
    A --> E[Geographic Analysis]
    B --> F[Quality Report HTML]
    C --> F
    D --> F
    E --> F
    F --> G[data/output/reports/]

Report contents:

  • Descriptive statistics for all variables
  • Missing data patterns and percentages
  • Pollutant distribution analysis
  • Geographic coverage and station distribution
  • Data quality scores and recommendations
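
As a minimal example of one report ingredient, the following computes per-column missing-data percentages and writes them to the reports directory; the real step produces richer HTML/JSON output.

```python
from pathlib import Path

import pandas as pd


def missing_data_report(
    df: pd.DataFrame, out_dir: str = "data/output/reports"
) -> pd.DataFrame:
    # Percentage of nulls per column, rounded for readability.
    report = (df.isna().mean() * 100).round(2).rename("missing_pct").to_frame()
    Path(out_dir).mkdir(parents=True, exist_ok=True)
    report.to_json(Path(out_dir) / "missing_data.json")
    return report
```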

Configuration

The pipeline uses YAML configuration files:

  • config/pipeline_config.yaml: Main pipeline settings
  • utils/unified_province_name.json: Province name mappings
  • ../common/feature_types.yaml: Feature type definitions
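
Loading the main settings file is a one-liner with PyYAML; the key accessed below is an assumption, since this doc only names the files, not their contents.

```python
import yaml

with open("config/pipeline_config.yaml", encoding="utf-8") as fh:
    config = yaml.safe_load(fh)

excluded_regions = config.get("excluded_regions", [])  # hypothetical key
```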

Error Handling & Recovery

The pipeline includes built-in error recovery mechanisms:

  • Validation warnings: Continue processing with logged warnings
  • Missing data: Apply configured imputation strategies
  • Processing errors: Retry with alternative approaches
  • Critical failures: Stop pipeline with detailed error reporting
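
A hedged sketch of the retry-then-fail behaviour described above; the exception handling, retry count, and logging are all assumptions.

```python
import logging

logger = logging.getLogger(__name__)


def run_step_with_recovery(step, data, retries: int = 1):
    for attempt in range(retries + 1):
        try:
            return step.run(data)
        except Exception as exc:
            if attempt < retries:
                logger.warning("%s failed (%s); retrying", type(step).__name__, exc)
            else:
                logger.error("%s failed permanently", type(step).__name__)
                raise
```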

Testing

The test suite covers all pipeline components:

# Run all ETL tests
pytest tests/

# Run specific test categories
pytest tests/extract_tests/
pytest tests/transform_tests/
pytest tests/load_tests/

Output Dataset Schema

The final dataset contains the following columns:

| Column | Type | Description |
| --- | --- | --- |
| air_pollutant | string | Pollutant type (no2, pm25, pm10, so2, o3) |
| air_pollutant_description | string | Full pollutant description |
| data_aggregation_process | string | How data was aggregated (e.g., "annual mean / 1 calendar year") |
| year | date | Measurement year (YYYY-01-01 format) |
| air_pollution_level | float | Pollutant concentration value |
| unit_of_air_pollution_level | string | Measurement unit (typically μg/m³) |
| air_quality_station_type | string | Station type (traffic, background, industrial) |
| air_quality_station_area | string | Area type (urban, suburban, rural) |
| longitude | float | Station longitude coordinate |
| latitude | float | Station latitude coordinate |
| altitude | float | Station altitude (meters) |
| province | string | Standardized Spanish province name |
| quality | string | Data quality classification (buena, etc.) |
| respiratory_diseases_total | integer | Total respiratory disease deaths |
| life_expectancy_total | float | Life expectancy at birth |
| pib | float | PIB (GDP) in billions of euros |
| population | integer | Province population |
| respiratory_deaths_per_100k | float | Respiratory deaths per 100,000 population |