- Introduction
- Architecture Overview
- Setting Up the Environment
  - Azure Setup
  - Snowflake Setup
  - DBT Setup
  - Python Environment Setup
- Data Ingestion with Azure Data Factory
- Medallion Architecture Implementation
  - Bronze Layer: Raw Data in Snowflake
  - Silver Layer: Cleansed and Conformed Data
  - Gold Layer: Business-Level Aggregates
- DBT Models and Transformations
- Python Scripts for Orchestration and Additional Processing
- Scheduling and Monitoring
- Testing and Quality Assurance
- Performance Optimization
- Security and Compliance
- Best Practices and Tips
- Conclusion
This tutorial will guide you through building a robust data pipeline using Azure, Snowflake, DBT, and Python, following the medallion architecture. This setup is designed for teams looking to implement a scalable, maintainable, and efficient data solution.
Our architecture consists of:
- Azure Data Factory for data ingestion and orchestration
- Azure Blob Storage for storing raw data
- Snowflake as our cloud data warehouse
- DBT for data transformation and modeling
- Python for custom processing and orchestration
We'll follow the medallion architecture:
- Bronze: Raw data ingested from source systems (created by Azure Data Factory)
- Silver: Cleansed and conformed data (transformed by DBT)
- Gold: Business-level aggregates (created by DBT)
- Create an Azure account and set up a resource group.
- Set up Azure Data Factory:
```shell
az datafactory create \
  --name "YourDataFactory" \
  --resource-group "YourResourceGroup" \
  --location "EastUS"
```
- Create an Azure Blob Storage account:
```shell
az storage account create \
  --name "YourStorageAccount" \
  --resource-group "YourResourceGroup" \
  --location "EastUS" \
  --sku Standard_LRS
```
- Sign up for a Snowflake account.
- Create a Snowflake warehouse, database, and schema:
```sql
CREATE WAREHOUSE IF NOT EXISTS compute_wh
  WITH WAREHOUSE_SIZE = 'XSMALL'
       AUTO_SUSPEND = 60
       AUTO_RESUME = TRUE;

CREATE DATABASE IF NOT EXISTS your_database;
USE DATABASE your_database;
CREATE SCHEMA IF NOT EXISTS your_schema;
```
- Install DBT with Snowflake adapter:
```shell
pip install dbt-snowflake
```
- Initialize a DBT project:
```shell
dbt init your_project_name
```
- Configure `profiles.yml` for the Snowflake connection:

```yaml
your_project_name:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: your_account  # account identifier only, without .snowflakecomputing.com
      user: your_username
      password: your_password
      role: your_role
      database: your_database
      warehouse: compute_wh
      schema: your_schema
      threads: 4
```
- Set up a virtual environment:
```shell
python -m venv data_pipeline_env
source data_pipeline_env/bin/activate  # On Windows: data_pipeline_env\Scripts\activate
```
- Install required packages:
```shell
pip install azure-storage-blob azure-identity snowflake-connector-python pandas
```
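With the connector installed, it helps to keep Snowflake credentials out of source files from the start. A minimal sketch that reads connection settings from environment variables; the `SNOWFLAKE_*` variable names are a convention chosen here, not a standard:

```python
import os

def snowflake_connection_params(env=os.environ):
    """Assemble Snowflake connection settings, falling back to the
    placeholder values used throughout this tutorial."""
    return {
        "account": env.get("SNOWFLAKE_ACCOUNT", "your_account"),
        "user": env.get("SNOWFLAKE_USER", "your_username"),
        "password": env.get("SNOWFLAKE_PASSWORD", ""),
        "warehouse": env.get("SNOWFLAKE_WAREHOUSE", "compute_wh"),
        "database": env.get("SNOWFLAKE_DATABASE", "your_database"),
        "schema": env.get("SNOWFLAKE_SCHEMA", "your_schema"),
    }

# Usage with the connector (not executed here, requires live credentials):
# import snowflake.connector
# conn = snowflake.connector.connect(**snowflake_connection_params())
```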
- In Azure Data Factory, create a pipeline for data ingestion.
- Set up a copy activity to move data from your source to Azure Blob Storage.
- Use Azure Data Factory's Snowflake connector to copy data from Blob Storage to Snowflake, creating the bronze layer tables.
Example Azure Data Factory pipeline (JSON snippet):
```json
{
  "name": "IngestDataToSnowflake",
  "properties": {
    "activities": [
      {
        "name": "CopyFromSourceToBlob",
        "type": "Copy",
        "inputs": [{ "referenceName": "SourceDataset", "type": "DatasetReference" }],
        "outputs": [{ "referenceName": "AzureBlobDataset", "type": "DatasetReference" }]
      },
      {
        "name": "CopyFromBlobToSnowflake",
        "type": "Copy",
        "inputs": [{ "referenceName": "AzureBlobDataset", "type": "DatasetReference" }],
        "outputs": [{ "referenceName": "SnowflakeBronzeDataset", "type": "DatasetReference" }],
        "typeProperties": {
          "source": { "type": "BlobSource" },
          "sink": { "type": "SnowflakeSink", "tableCreation": "Auto" }
        }
      }
    ]
  }
}
```
In this setup, ADF will automatically create the bronze tables in Snowflake based on the source data structure.
Since Azure Data Factory is creating the bronze tables, we'll use DBT to document and test these tables.
Create a file `models/bronze/bronze_tables.yml`:
```yaml
version: 2

sources:
  - name: bronze
    database: your_database
    schema: your_schema
    tables:
      - name: raw_data
        description: "Bronze layer table created by Azure Data Factory"
        columns:
          - name: id
            description: "Unique identifier for each record"
            tests:
              - unique
              - not_null
          - name: name
            description: "Name field from the source data"
          - name: created_at
            description: "Timestamp of when the record was created"
```

Note that because Azure Data Factory creates this table, it is declared as a dbt *source* rather than a model; descriptions and tests therefore live under the `sources:` block.
This YAML file documents the structure of the bronze table and sets up some basic tests.
Create a model `models/silver/cleaned_data.sql`:
```sql
{{ config(materialized='table') }}

SELECT
    id,
    TRIM(LOWER(name)) AS name,
    created_at::DATE AS created_date
FROM {{ source('bronze', 'raw_data') }}
WHERE id IS NOT NULL
```
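The silver model's cleansing rules can be prototyped locally with pandas (installed earlier) before committing them to dbt. The sample records below are invented for illustration:

```python
import pandas as pd

# Invented sample mimicking the bronze raw_data table
raw = pd.DataFrame({
    "id": [1, 2, None, 3],
    "name": ["  Alice ", "BOB", "carol", "Dave"],
    "created_at": ["2024-01-01 10:00:00"] * 4,
})

# Mirror the silver model: drop null ids, trim/lowercase name, cast to date
cleaned = (
    raw[raw["id"].notna()]
    .assign(
        name=lambda df: df["name"].str.strip().str.lower(),
        created_date=lambda df: pd.to_datetime(df["created_at"]).dt.date,
    )
    .drop(columns=["created_at"])
)
```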
Create another DBT model `models/gold/user_stats.sql`:
```sql
{{ config(materialized='table') }}

SELECT
    created_date,
    COUNT(DISTINCT id) AS new_users,
    COUNT(*) AS total_records
FROM {{ ref('cleaned_data') }}
GROUP BY created_date
```
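Likewise, the gold aggregation can be sanity-checked with pandas on a few invented rows:

```python
import pandas as pd

# Invented silver-layer sample (note the duplicate id on 2024-01-01)
cleaned = pd.DataFrame({
    "id": [1, 1, 2, 3],
    "created_date": ["2024-01-01", "2024-01-01", "2024-01-01", "2024-01-02"],
})

# Mirror the gold model: distinct users and total records per day
user_stats = (
    cleaned.groupby("created_date")
    .agg(new_users=("id", "nunique"), total_records=("id", "size"))
    .reset_index()
)
```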
Organize your DBT project:
```text
models/
├── bronze/
│   └── bronze_tables.yml
├── silver/
│   └── cleaned_data.sql
└── gold/
    └── user_stats.sql
```
Run your DBT models:
```shell
dbt run
```
Create a Python script for orchestration, `orchestrate.py`:
```python
import subprocess

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient


def run_dbt():
    """Run dbt and return (success, combined stdout/stderr)."""
    result = subprocess.run(['dbt', 'run'], capture_output=True, text=True)
    return result.returncode == 0, result.stdout + result.stderr


def upload_logs_to_azure(container_name, log_content):
    credential = DefaultAzureCredential()
    blob_service_client = BlobServiceClient(
        account_url="https://yourstorageaccount.blob.core.windows.net",
        credential=credential,
    )
    blob_client = blob_service_client.get_blob_client(
        container=container_name, blob="dbt_run.log"
    )
    blob_client.upload_blob(log_content, overwrite=True)


if __name__ == "__main__":
    success, output = run_dbt()
    status = "DBT run completed successfully" if success else "DBT run failed"
    # Include dbt's own output so failures can be diagnosed from the log
    upload_logs_to_azure('logs', f"{status}\n\n{output}")
```
- Use Azure Data Factory to schedule your Python script and DBT runs.
- Set up Azure Monitor to track your pipeline's performance and set alerts.
- Implement DBT tests. In `models/silver/cleaned_data.yml`:
```yaml
version: 2

models:
  - name: cleaned_data
    columns:
      - name: id
        tests:
          - unique
          - not_null
      - name: name
        tests:
          - not_null
```
- Run DBT tests:
```shell
dbt test
```
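Schema tests can be complemented with dbt singular tests: any SQL file under `tests/` that fails if it returns rows. A sketch, with an assumed file name and an assumed business rule that no record may be dated in the future:

```sql
-- tests/assert_no_future_dates.sql
-- Fails if any cleaned_data row carries a created_date in the future
SELECT *
FROM {{ ref('cleaned_data') }}
WHERE created_date > CURRENT_DATE()
```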
- Use Snowflake's `CLUSTER BY` for frequently filtered columns:
```sql
{{ config(
    materialized='table',
    cluster_by=['created_date']
) }}

-- Your SQL model here
```
- Optimize Snowflake warehouse size and auto-scaling settings.
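As a concrete example, warehouse size and suspension behavior can be adjusted in SQL; the values below are illustrative starting points, not recommendations:

```sql
ALTER WAREHOUSE compute_wh SET
  WAREHOUSE_SIZE = 'SMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;
```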
- Use Azure Key Vault to store sensitive information.
- Implement row-level security in Snowflake for sensitive data using a row access policy (the example restricts silver-layer rows to specific roles):

```sql
CREATE OR REPLACE ROW ACCESS POLICY row_access_policy AS (id_column INT)
RETURNS BOOLEAN ->
    CURRENT_ROLE() IN ('ANALYST_ROLE', 'ADMIN_ROLE');

ALTER TABLE cleaned_data ADD ROW ACCESS POLICY row_access_policy ON (id);
```
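The Key Vault recommendation above can be sketched in Python, assuming the azure-keyvault-secrets package is installed, a vault exists, and a secret named `snowflake-password` has been created (both names are hypothetical):

```python
def vault_url(vault_name: str) -> str:
    """Standard Key Vault endpoint format for a given vault name."""
    return f"https://{vault_name}.vault.azure.net"


def get_snowflake_password(vault_name: str = "your-vault") -> str:
    """Fetch the Snowflake password from Key Vault.

    Not executed here: requires live Azure credentials and an existing
    vault; the vault and secret names are placeholders.
    """
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.secrets import SecretClient

    client = SecretClient(vault_url=vault_url(vault_name),
                          credential=DefaultAzureCredential())
    return client.get_secret("snowflake-password").value
```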
- Use DBT documentation to keep track of your data models.
- Implement CI/CD for your DBT project using Azure DevOps.
- Regularly review and optimize your SQL transformations and Snowflake resource usage.
- Use Snowflake's Time Travel for data recovery and auditing.
- Leverage Snowflake's zero-copy cloning for testing environments.
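For instance, a disposable test copy of the whole database can be created in seconds without duplicating storage (the clone name is an assumption):

```sql
CREATE DATABASE your_database_dev CLONE your_database;
```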
This comprehensive tutorial has guided you through setting up a robust data pipeline using Azure, Snowflake, DBT, and Python with a medallion architecture. Key points to remember:
- Azure Data Factory handles data ingestion and creates bronze tables.
- DBT is used for transforming data in silver and gold layers, as well as documenting and testing bronze tables.
- Python scripts can be used for additional orchestration and processing.
- Regular testing, monitoring, and optimization are crucial for maintaining a healthy data pipeline.
By following these steps, you've created a scalable, maintainable, and efficient data transformation process that separates raw data ingestion, data cleaning, and business-level aggregations. Remember to continually refine your models, optimize performance, and maintain security as your data needs evolve.