Data Warehouse Engineering
Design, build, and maintain data warehouses and data lakehouses including ETL/ELT pipelines, schema design, data modeling, and query optimization. Use when architecting data platforms, building ETL pipelines, designing star schemas, optimizing query perform...
Design, build, and maintain data warehouses, data lakes, and ETL/ELT pipelines for analytics and reporting.
Workflow
1. Data Warehouse Architecture
MODERN DATA PLATFORM ARCHITECTURE
═══════════════════════════════════════
SOURCE SYSTEMS:
→ Transactional DBs (PostgreSQL, MySQL, Oracle)
→ SaaS apps (Salesforce, HubSpot, NetSuite)
→ Event streams (Kafka, Kinesis)
→ File systems (S3, GCS, Azure Blob)
→ APIs (REST, GraphQL)
INGESTION LAYER:
→ Batch: Fivetran, Stitch, Airbyte (scheduled via Airflow)
→ Streaming: Kafka, Debezium, Flink
→ CDC: Debezium, AWS DMS, Striim
DATA LAKE (Raw Zone):
→ Format: Parquet/Delta Lake/Iceberg
→ Storage: S3/GCS with lifecycle policies
→ Schema: Raw/landing (as-is from source)
DATA WAREHOUSE (Curated Zone):
→ Engine: Snowflake / BigQuery / Redshift / Databricks
→ Schema: Star schema (dimensional modeling)
→ Zones: Staging → Data Marts → Aggregates
SERVICE LAYER:
→ Semantic layer: dbt, Cube, LookML
→ APIs: dbt + FastAPI, Hasura
→ Materialized views for high-frequency queries
CONSUMPTION LAYER:
→ BI: Tableau, Looker, Power BI, Metabase
→ Ad-hoc: SQL workbench, Redash, Metabase
→ ML: Feature store, model training pipelines
2. Data Modeling (Dimensional)
STAR SCHEMA DESIGN
═══════════════════════════════════════
FACT TABLE: fact_sales
═══════════════════════════════════════
Column            Type            Source               Description
──────────────────────────────────────────────────────────────────────────
sales_key         BIGINT          PK (surrogate)       Unique row ID
date_key          DATE            FK → dim_date        Transaction date
customer_key      INT             FK → dim_customer    Customer ID
product_key       INT             FK → dim_product     Product ID
channel_key       INT             FK → dim_channel     Sales channel
store_key         INT             FK → dim_store       Store location
salesperson_key   INT             FK → dim_emp         Sales rep
quantity          INT             -                    Order quantity
unit_price        DECIMAL(10,2)   -                    Price at sale
discount_amount   DECIMAL(10,2)   -                    Discount given
total_amount      DECIMAL(10,2)   -                    Line total
tax_amount        DECIMAL(10,2)   -                    Tax charged
cost_amount       DECIMAL(10,2)   -                    COGS
profit_amount     DECIMAL(10,2)   -                    Line profit
currency_code     CHAR(3)         -                    Transaction currency
source_system     VARCHAR(20)     -                    Originating system
DIMENSION TABLES:
═══════════════════════════════════════
dim_date:
date_key, year, quarter, month, day, day_name, week_number,
is_weekend, is_holiday, fiscal_year, fiscal_quarter, fiscal_month
dim_customer:
customer_key, customer_id, name, segment, region, country,
created_date, is_active, lifetime_value, acquisition_channel,
effective_date, expiry_date (SCD Type 2)
dim_product:
product_key, product_id, name, category, subcategory, brand,
sku, unit_cost, list_price, weight, launch_date, is_active
dim_channel:
channel_key, channel_id, channel_name, channel_type, region
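As a rough illustration of how the dim_date columns listed above could be populated, here is a minimal Python sketch. The function name build_dim_date and the fiscal_start_month parameter are assumptions made for this example, fiscal years are labeled by the calendar year in which they end (one common convention, not the only one), and is_holiday is left out because it needs an external holiday calendar.

```python
from datetime import date, timedelta

DAY_NAMES = ("Monday", "Tuesday", "Wednesday", "Thursday",
             "Friday", "Saturday", "Sunday")


def build_dim_date(start: date, end: date, fiscal_start_month: int = 1) -> list:
    """Return one dim_date row (as a dict) per calendar day in [start, end]."""
    rows = []
    current = start
    while current <= end:
        # Months elapsed since the fiscal year began (0-11).
        fiscal_offset = (current.month - fiscal_start_month) % 12
        # Assumption: a fiscal year is labeled by the calendar year it ends in.
        if fiscal_start_month == 1 or current.month < fiscal_start_month:
            fiscal_year = current.year
        else:
            fiscal_year = current.year + 1
        rows.append({
            "date_key": current,
            "year": current.year,
            "quarter": (current.month - 1) // 3 + 1,
            "month": current.month,
            "day": current.day,
            "day_name": DAY_NAMES[current.weekday()],
            "week_number": current.isocalendar()[1],  # ISO 8601 week
            "is_weekend": current.weekday() >= 5,
            "fiscal_year": fiscal_year,
            "fiscal_quarter": fiscal_offset // 3 + 1,
            "fiscal_month": fiscal_offset + 1,
        })
        current += timedelta(days=1)
    return rows
```

Generating the table once for a wide date range (for example 20 years) and loading it as a static dimension is usually simpler than deriving these attributes at query time.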
SLOWLY CHANGING DIMENSIONS (SCD):
═══════════════════════════════════════
Type 0: Historical (no change) — never update
Type 1: Overwrite — replace old value (no history)
Type 2: Add row — new row with effective/expiry dates (full history)
Type 3: Add column — track previous value (limited history)
Implementation (Type 2):
═══════════════════════════════════════
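CREATE TABLE dim_customer (
    -- Surrogate key must be generated, since the INSERT below does not supply it.
    -- Identity syntax is engine-specific (e.g. AUTOINCREMENT in Snowflake).
    customer_key   INT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    customer_id    VARCHAR(50) NOT NULL,   -- natural (business) key from the source
    name           VARCHAR(200),
    segment        VARCHAR(50),
    region         VARCHAR(50),
    country        VARCHAR(50),
    effective_date DATE NOT NULL,
    expiry_date    DATE DEFAULT '9999-12-31',
    is_current     BOOLEAN DEFAULT TRUE
);
-- On update: expire the current row, then insert the new version.
-- Run both statements in one transaction so readers never see zero or two current rows.
-- Validity is the half-open range [effective_date, expiry_date).
BEGIN;
UPDATE dim_customer SET expiry_date = CURRENT_DATE, is_current = FALSE
WHERE customer_id = 'C001' AND is_current = TRUE;
INSERT INTO dim_customer (customer_id, name, segment, region, country, effective_date)
VALUES ('C001', 'New Name', 'Enterprise', 'NA', 'US', CURRENT_DATE);
COMMIT;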
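To see the Type 2 pattern end to end, the following sketch runs the expire-then-insert step against an in-memory SQLite database and then looks up the version that was valid on a given date. SQLite is used only so the example is self-contained; the helper names apply_scd2_change and customer_as_of are hypothetical, and the table is trimmed to a few columns.

```python
import sqlite3


def apply_scd2_change(conn, customer_id, new_attrs, change_date):
    """Expire the current row for customer_id and insert a new current row."""
    with conn:  # both statements commit together, or both roll back
        conn.execute(
            "UPDATE dim_customer SET expiry_date = ?, is_current = 0 "
            "WHERE customer_id = ? AND is_current = 1",
            (change_date, customer_id),
        )
        conn.execute(
            "INSERT INTO dim_customer (customer_id, name, segment, effective_date) "
            "VALUES (?, ?, ?, ?)",
            (customer_id, new_attrs["name"], new_attrs["segment"], change_date),
        )


def customer_as_of(conn, customer_id, as_of):
    """Return (name, segment) valid on as_of, using [effective_date, expiry_date)."""
    return conn.execute(
        "SELECT name, segment FROM dim_customer "
        "WHERE customer_id = ? AND effective_date <= ? AND ? < expiry_date",
        (customer_id, as_of, as_of),
    ).fetchone()


conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE dim_customer (
        customer_key   INTEGER PRIMARY KEY,  -- SQLite assigns this automatically
        customer_id    TEXT,
        name           TEXT,
        segment        TEXT,
        effective_date TEXT,                 -- ISO dates compare correctly as text
        expiry_date    TEXT DEFAULT '9999-12-31',
        is_current     INTEGER DEFAULT 1
    )
""")
apply_scd2_change(conn, "C001", {"name": "Old Name", "segment": "SMB"}, "2023-01-01")
apply_scd2_change(conn, "C001", {"name": "New Name", "segment": "Enterprise"}, "2024-06-01")
```

Facts loaded before the change keep pointing at the old customer_key, so historical reports continue to show the segment that applied at the time of sale.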
3. ETL/ELT Pipeline Design
PIPELINE ARCHITECTURE
═══════════════════════════════════════
EXTRACT → STAGE → TRANSFORM → LOAD → VALIDATE
STAGING LAYER (RAW):
═══════════════════════════════════════
→ All source data loaded as-is
→ Table naming: stg_<source_system>_<entity>
→ Include: source_timestamp, extracted_at, batch_id
→ No transformations applied
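The staging conventions above can be sketched in a few lines of Python. The helper names staging_table_name and add_audit_columns are hypothetical, and the example assumes source records arrive as dictionaries with one field holding the source system's own modification timestamp.

```python
import uuid
from datetime import datetime, timezone


def staging_table_name(source_system: str, entity: str) -> str:
    """Build a name following the stg_<source_system>_<entity> convention."""
    return f"stg_{source_system.lower()}_{entity.lower()}"


def add_audit_columns(records, source_timestamp_field, batch_id=None):
    """Attach load metadata to raw records without altering source fields."""
    batch_id = batch_id or uuid.uuid4().hex
    extracted_at = datetime.now(timezone.utc).isoformat()
    staged = []
    for record in records:
        row = dict(record)  # copy, so the source payload stays as-is
        row["source_timestamp"] = record.get(source_timestamp_field)
        row["extracted_at"] = extracted_at
        row["batch_id"] = batch_id
        staged.append(row)
    return staged
```

Sharing one batch_id across every row of a run makes it possible to delete and reload a single failed batch without touching the rest of the staging table.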
TRANSFORMATION LAYER (dbt):
═══════════════════════════════════════
→ Clean and standardize data
→ Apply business logic
→ Create derived metrics
→ Handle SCD logic
→ Table naming:
    int_<description> (intermediate)
    dim_<entity> (dimensions)
    fact_<entity> (facts)
    mrt_<description> (marts)
INCREMENTAL LOADING STRATEGY:
═══════════════════════════════════════
Full Load: Daily/weekly for small tables (< 1M rows)
Incremental: Based on updated_at timestamp or CDC
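A minimal sketch of the decision and of a timestamp-based watermark follows. The function names are hypothetical, the 1M-row threshold is taken from the guideline above, and updated_at values are assumed to be ISO-8601 strings (or any mutually comparable type).

```python
def choose_load_strategy(row_count, has_updated_at, has_cdc,
                         full_load_max_rows=1_000_000):
    """Pick 'full' for small tables, 'incremental' when changes can be tracked."""
    if row_count < full_load_max_rows:
        return "full"
    if has_cdc or has_updated_at:
        return "incremental"
    return "full"  # large table with no change tracking: no cheaper option


def rows_since_watermark(rows, watermark):
    """Return rows modified after the last loaded updated_at value."""
    if watermark is None:  # first run: take everything
        return list(rows)
    return [r for r in rows if r["updated_at"] > watermark]


def next_watermark(rows, previous):
    """Advance the watermark to the newest updated_at that was loaded."""
    newest = max((r["updated_at"] for r in rows), default=None)
    return previous if newest is None else newest
```

A strict greater-than comparison can miss rows that share the watermark timestamp but were committed later, so some teams re-read a small overlap window and rely on an idempotent merge to absorb the repeats.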
dbt incremental model:
═══════════════════════════════════════
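{{ config(materialized='incremental', unique_key='sales_key') }}
WITH source AS (
    -- Assumes the staging table is a dbt model; use source() if it is loaded outside dbt
    SELECT * FROM {{ ref('stg_salesforce_opportunities') }}
    {% if is_incremental() %}
    -- Only pull rows changed since the last successful run
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
    {% endif %}
)
SELECT
    -- The key must be deterministic so re-processed rows match on unique_key;
    -- a random ID would never match and would create duplicates.
    -- generate_surrogate_key requires the dbt_utils package.
    {{ dbt_utils.generate_surrogate_key(['opportunity_id']) }} AS sales_key,
    opportunity_id,
    account_id,
    amount,
    stage,
    close_date,
    created_at,
    updated_at
FROM source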
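The effect of unique_key in an incremental model can be pictured as an upsert: incoming rows replace existing rows with the same key and new keys are appended. The Python sketch below is only a mental model of that behavior, with a hypothetical merge_incremental helper; what dbt actually executes depends on the adapter and the configured incremental strategy.

```python
def merge_incremental(target, new_rows, unique_key):
    """Upsert new_rows into target, matching rows on unique_key."""
    merged = {row[unique_key]: row for row in target}
    for row in new_rows:
        merged[row[unique_key]] = row  # replace if present, append if not
    return list(merged.values())


target = [
    {"opportunity_id": "A", "amount": 100},
    {"opportunity_id": "B", "amount": 200},
]
new_rows = [
    {"opportunity_id": "B", "amount": 250},  # changed since last run
    {"opportunity_id": "C", "amount": 300},  # created since last run
]
merged = merge_incremental(target, new_rows, "opportunity_id")
```

Because matching is by key, running the same batch twice leaves the table unchanged, which only holds if the key is derived deterministically from the source row.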
DATA QUALITY CHECKS:
═══════════════════════════════════════
→ Row count validation (vs source)
→ Null checks on critical columns
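→ Referential integrity (every fact FK resolves to a dimension row; most cloud warehouses do not enforce FK constraints, so test for it explicitly)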
→ Value range checks (price > 0, quantity > 0)
→ Duplicate detection (unique key violations)
→ Freshness check (data within SLA window)
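The checks listed above can each be expressed as a small function over rows. This is a plain-Python sketch with hypothetical function names, assuming rows are dictionaries; in practice the same logic would more likely live in dbt tests, Great Expectations, or Soda Core.

```python
from datetime import datetime, timedelta, timezone


def row_counts_match(source_count, target_count, tolerance=0.0):
    """True if target is within a relative tolerance of the source count."""
    if source_count == 0:
        return target_count == 0
    return abs(source_count - target_count) / source_count <= tolerance


def null_violations(rows, columns):
    """Rows where any critical column is missing or None."""
    return [r for r in rows if any(r.get(c) is None for c in columns)]


def orphan_keys(fact_rows, fk, dim_keys):
    """Non-null foreign key values with no matching dimension row."""
    return {r[fk] for r in fact_rows if r[fk] is not None} - set(dim_keys)


def range_violations(rows, column, minimum):
    """Rows whose value is not strictly greater than minimum."""
    return [r for r in rows if r.get(column) is not None and r[column] <= minimum]


def duplicate_keys(rows, key):
    """Key values that appear more than once."""
    seen, dupes = set(), set()
    for r in rows:
        if r[key] in seen:
            dupes.add(r[key])
        seen.add(r[key])
    return dupes


def is_fresh(last_loaded_at, sla, now=None):
    """True if the last load finished within the SLA window."""
    now = now or datetime.now(timezone.utc)
    return now - last_loaded_at <= sla
```

Returning the offending rows or keys, rather than a bare pass/fail flag, makes it easier to include examples in an alert.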
4. Query Optimization
QUERY PERFORMANCE OPTIMIZATION
═══════════════════════════════════════
COMMON ISSUES AND SOLUTIONS:
═══════════════════════════════════════
Issue 1: Full Table Scan on Large Fact Table
═══════════════════════════════════════
Slow query:
SELECT customer_key, SUM(total_amount)
FROM fact_sales
WHERE date_key BETWEEN '2024-01-01' AND '2024-12-31'
GROUP BY customer_key
Optimizations:
→ Partition fact_sales by date_key (monthly)
→ Cluster by customer_id
→ Create materialized view for common aggregation
Materialized view:
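CREATE MATERIALIZED VIEW mrt_customer_monthly_sales AS
SELECT customer_key, date_trunc('month', date_key) AS sale_month,
       SUM(total_amount) AS total_sales, SUM(quantity) AS total_qty,
       COUNT(DISTINCT product_key) AS unique_products
FROM fact_sales
GROUP BY customer_key, date_trunc('month', date_key);
-- Refresh is engine-specific. PostgreSQL needs a scheduled
--   REFRESH MATERIALIZED VIEW mrt_customer_monthly_sales;
-- Snowflake and BigQuery maintain materialized views automatically,
-- but restrict which aggregates are allowed, so check COUNT(DISTINCT) support.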
Issue 2: Cross-Join Performance
═══════════════════════════════════════
Slow query (missing join condition):
SELECT * FROM fact_sales f, dim_customer c, dim_product p
Fix:
→ Add explicit JOIN conditions
→ Join on key columns (indexed, sorted, or clustered where the engine supports it)
→ Push down filters before JOIN
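The cost of a missing join condition is easy to demonstrate on a toy dataset. The sketch below uses an in-memory SQLite database with made-up rows; table and column names follow the star schema in this document.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_customer (customer_key INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE dim_product  (product_key  INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE fact_sales (
        sales_key INTEGER PRIMARY KEY,
        customer_key INTEGER,
        product_key INTEGER,
        total_amount REAL
    );
    INSERT INTO dim_customer VALUES (1, 'Acme'), (2, 'Globex'), (3, 'Initech');
    INSERT INTO dim_product  VALUES (10, 'Widget'), (20, 'Gadget');
    INSERT INTO fact_sales VALUES
        (100, 1, 10, 50.0), (101, 2, 20, 75.0),
        (102, 3, 10, 20.0), (103, 1, 20, 35.0);
""")

# Comma-separated tables with no condition: every fact row is paired with
# every customer and every product (4 * 3 * 2 rows).
cartesian_rows = conn.execute(
    "SELECT COUNT(*) FROM fact_sales f, dim_customer c, dim_product p"
).fetchone()[0]

# Explicit join conditions: one output row per fact row.
joined_rows = conn.execute("""
    SELECT COUNT(*)
    FROM fact_sales f
    JOIN dim_customer c ON c.customer_key = f.customer_key
    JOIN dim_product  p ON p.product_key  = f.product_key
""").fetchone()[0]
```

The row count of the unconstrained query grows as the product of the table sizes, which is why the same mistake on a billion-row fact table rarely finishes at all.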
Issue 3: Subquery in WHERE Clause
═══════════════════════════════════════
Potentially slow (many engines rewrite this as a semi-join, so check the query plan first):
WHERE customer_key IN (SELECT customer_key FROM dim_customer WHERE segment = 'Enterprise')
Optimized:
→ Use JOIN instead of subquery
→ Use EXISTS for existence checks
→ Materialize frequently-used subqueries
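The three formulations are interchangeable when the join key is unique in the dimension, which the sketch below checks on a small SQLite dataset with made-up rows. It assumes the fact table references the dimension through customer_key, as in the star schema above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_customer (customer_key INTEGER PRIMARY KEY, segment TEXT);
    CREATE TABLE fact_sales (customer_key INTEGER, total_amount REAL);
    INSERT INTO dim_customer VALUES (1, 'Enterprise'), (2, 'SMB'), (3, 'Enterprise');
    INSERT INTO fact_sales VALUES (1, 100.0), (2, 40.0), (3, 60.0), (1, 25.0);
""")

queries = {
    "in": """
        SELECT SUM(total_amount) FROM fact_sales
        WHERE customer_key IN (
            SELECT customer_key FROM dim_customer WHERE segment = 'Enterprise')
    """,
    "join": """
        SELECT SUM(f.total_amount) FROM fact_sales f
        JOIN dim_customer c ON c.customer_key = f.customer_key
        WHERE c.segment = 'Enterprise'
    """,
    "exists": """
        SELECT SUM(f.total_amount) FROM fact_sales f
        WHERE EXISTS (
            SELECT 1 FROM dim_customer c
            WHERE c.customer_key = f.customer_key AND c.segment = 'Enterprise')
    """,
}
totals = {name: conn.execute(sql).fetchone()[0] for name, sql in queries.items()}
```

If the join column is not unique in the dimension (for example the natural key of a Type 2 dimension), the JOIN form multiplies fact rows while IN and EXISTS do not, so the rewrite is only safe on a unique key.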
COST-BASED OPTIMIZATION:
═══════════════════════════════════════
Snowflake:
→ Warehouse size: Match to query complexity
→ Clustering keys: On frequently filtered columns of moderate cardinality (very high-cardinality keys are costly to maintain)
→ Result caching: Leverage for repeated queries (24hr cache)
→ Micro-partitions: Automatic pruning by filter predicates
BigQuery:
→ Partition tables: BY date column
→ Cluster columns: High-cardinality filter columns
→ Table wildcards: `project.dataset.table_*` for legacy date-sharded tables (prefer partitioned tables for new designs)
→ Limit data scanned: Use partition filters
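Why partition filters matter for cost can be shown with simple arithmetic. The sketch below assumes monthly partitions and a hypothetical bytes_scanned helper that sums the size of every partition overlapping the filter range; real engines prune at their own granularity, so treat the numbers as illustrative.

```python
from datetime import date


def bytes_scanned(partition_sizes, start, end):
    """Sum sizes of monthly partitions that overlap the range [start, end].

    partition_sizes maps the first day of each month to that partition's bytes.
    """
    first_month = date(start.year, start.month, 1)
    return sum(size for month, size in partition_sizes.items()
               if first_month <= month <= end)


GB = 10 ** 9
# Two years of data, 10 GB per monthly partition (made-up sizes).
partition_sizes = {date(y, m, 1): 10 * GB
                   for y in (2023, 2024) for m in range(1, 13)}

full_scan = sum(partition_sizes.values())
q4_scan = bytes_scanned(partition_sizes, date(2024, 10, 1), date(2024, 12, 31))
```

With these sizes a query filtered to the last quarter reads 3 of 24 partitions, one eighth of the bytes an unfiltered scan would touch.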
5. Data Pipeline Orchestration
AIRFLOW DAG EXAMPLE — Daily ETL
═══════════════════════════════════════
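from datetime import datetime, timedelta

from airflow import DAG

dag = DAG(
    'daily_etl_pipeline',
    schedule_interval='0 6 * * *',  # 6 AM UTC daily (named `schedule` in Airflow 2.4+)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        'owner': 'data-engineering',
        # retries and retry_delay are task-level settings, so they belong
        # in default_args rather than on the DAG constructor
        'retries': 2,
        'retry_delay': timedelta(minutes=30),
        'email_on_failure': True,
        'email': ['[email protected]'],
        'sla': timedelta(hours=4),
    },
)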
# Task dependencies:
extract_salesforce >> stage_salesforce >> transform_sales >> validate_sales >> load_marts
extract_erp >> stage_erp >> transform_finance >> validate_finance >> load_marts
extract_web_analytics >> stage_analytics >> transform_analytics >> load_marts
load_marts >> notify_complete
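The dependency chains above imply an execution order that can be worked out without Airflow. The sketch below uses the standard library's graphlib (Python 3.9+) to group tasks into waves that could run in parallel; build_graph and execution_waves are hypothetical helper names, and real scheduling also depends on pool and concurrency settings.

```python
from graphlib import TopologicalSorter

CHAINS = [
    ["extract_salesforce", "stage_salesforce", "transform_sales",
     "validate_sales", "load_marts"],
    ["extract_erp", "stage_erp", "transform_finance",
     "validate_finance", "load_marts"],
    ["extract_web_analytics", "stage_analytics", "transform_analytics",
     "load_marts"],
    ["load_marts", "notify_complete"],
]


def build_graph(chains):
    """Map each task to the set of tasks that must finish before it."""
    graph = {}
    for chain in chains:
        graph.setdefault(chain[0], set())
        for upstream, downstream in zip(chain, chain[1:]):
            graph.setdefault(downstream, set()).add(upstream)
    return graph


def execution_waves(graph):
    """Group tasks into waves; tasks within a wave have no mutual dependencies."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(build_graph(CHAINS))
```

The result makes the fan-in explicit: load_marts runs only after all three branches finish, so one slow source delays every mart.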
NOTIFICATION TASKS:
═══════════════════════════════════════
→ Success: Slack notification to #data-alerts
→ Failure: PagerDuty alert + Slack + Email
→ SLA breach: Escalation to data engineering lead
PIPELINE MONITORING:
═══════════════════════════════════════
Metrics tracked:
→ DAG duration: Average and P95
→ Task success rate: Per DAG, per month
→ Data freshness: Last successful run timestamp
→ Row counts: Source vs target (data quality)
→ Compute costs: Snowflake/BigQuery credits consumed
→ Error rates: By task, by day
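A few of these metrics can be computed directly from run history. The sketch below assumes each run is a dictionary with hypothetical duration_s and state fields, and uses the nearest-rank definition of P95; monitoring tools may interpolate instead and report slightly different values.

```python
def p95(values):
    """Nearest-rank 95th percentile of a non-empty list."""
    ordered = sorted(values)
    rank = (95 * len(ordered) + 99) // 100  # ceil(0.95 * n) in integer math
    return ordered[rank - 1]


def summarize_runs(runs):
    """Average duration, P95 duration, and success rate for a list of runs."""
    durations = [r["duration_s"] for r in runs]
    successes = sum(1 for r in runs if r["state"] == "success")
    return {
        "avg_duration_s": sum(durations) / len(durations),
        "p95_duration_s": p95(durations),
        "success_rate": successes / len(runs),
    }
```

Tracking P95 alongside the average helps surface occasional slow runs that an average alone would hide.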
Edge Cases
- Cross-cloud data: Federated queries (Databricks, BigQuery federated)
- Real-time requirements: Streaming pipelines (Kafka + Flink + Materialize)
- Data privacy: PII masking, GDPR compliance, data residency
- Schema evolution: Handle breaking changes in source systems
- Multi-tenant: Row-level security, schema isolation
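For the schema evolution case, one simple safeguard is to compare the columns a pipeline expects with what the source currently exposes before loading. The sketch below uses a hypothetical detect_schema_drift helper and treats removed or retyped columns as breaking and added columns as non-breaking, which is an assumption that may not suit every pipeline.

```python
def detect_schema_drift(expected, actual):
    """Compare two {column_name: type} mappings and classify the differences."""
    added = sorted(set(actual) - set(expected))
    removed = sorted(set(expected) - set(actual))
    retyped = sorted(col for col in set(expected) & set(actual)
                     if expected[col] != actual[col])
    return {
        "added": added,
        "removed": removed,
        "retyped": retyped,
        # Assumption: new columns are safe to ignore; lost or retyped ones are not.
        "breaking": bool(removed or retyped),
    }


expected = {"opportunity_id": "VARCHAR", "amount": "DECIMAL", "stage": "VARCHAR"}
actual = {"opportunity_id": "VARCHAR", "amount": "VARCHAR", "owner_id": "VARCHAR"}
drift = detect_schema_drift(expected, actual)
```

Failing the run on breaking drift, and only warning on additions, keeps bad data out of the curated zone while still letting sources grow.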
Integration Points
- Cloud data platforms: Snowflake, BigQuery, Redshift, Databricks
- Orchestration: Airflow, Dagster, Prefect, dbt Cloud
- Ingestion: Fivetran, Airbyte, Debezium, Kafka Connect
- BI tools: Tableau, Looker, Power BI, Metabase
- Data quality: Great Expectations, dbt tests, Soda Core
- Monitoring: Datadog, PagerDuty, Slack, email
Output
Data Warehouse Status
DATA WAREHOUSE STATUS — Daily Report
═══════════════════════════════════════
Pipeline status: 18/18 DAGs completed (100%)
Data freshness: All tables updated within SLA
Data volume:
Raw zone: 2.4 TB
Curated zone: 860 GB
Aggregates: 120 GB
Query performance:
Avg query time: 3.2 seconds
P95 query time: 12.8 seconds
Queries/day: 2,400
Cost: $8,200/month (Snowflake + storage)
Compute: $6,800
Storage: $1,400