• May 2, 2025

Databricks – DLT – expect, pytest vs dbt test

Databricks Delta Live Tables (DLT) is a framework for building reliable, scalable ETL pipelines in Databricks. You declare the tables you want and the transformations that produce them; DLT works out the dependencies between the tables, orchestrates the runs, and tracks data quality along the way.

Databricks DLT is somewhat similar to dbt in that both allow you to manage your ETL workflows in a more declarative way (though dbt is more SQL-centric, while DLT is built into the Databricks environment and supports both Python and SQL).

Here’s how you can set up a Delta Live Tables (DLT) pipeline and how you might approach testing it, akin to a dbt setup.

1. Setting up a DLT Pipeline in Databricks

To get started, you need to set up a DLT pipeline in Databricks.

a) Create a Delta Live Tables Pipeline

  1. Create a New Pipeline:
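    • In the Databricks workspace, open the Delta Live Tables page (under “Workflows” in the sidebar; the exact menu name varies between workspace versions).
    • Click “Create pipeline”.
    • You will be prompted to configure the pipeline: its name, the notebooks or files that contain the source code, the target schema for the output tables, and the compute settings.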
  2. Write Your Transformation Logic:
    • Delta Live Tables can use both SQL and Python for transformations.
    • The key to DLT is using the @dlt.table decorator (in Python) or CREATE LIVE TABLE SQL commands.
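
The same configuration can also be written down as settings instead of being entered in the UI. The sketch below builds a settings payload of the kind accepted by the Databricks Pipelines REST API (POST /api/2.0/pipelines). The pipeline name, notebook path, and target schema are hypothetical placeholders, and nothing is sent over the network.

```python
import json


def build_pipeline_settings(name, notebook_path, target_schema):
    """Return a minimal DLT pipeline settings dict (all values are placeholders)."""
    return {
        "name": name,
        # Source code for the pipeline: one notebook holding the table definitions.
        "libraries": [{"notebook": {"path": notebook_path}}],
        # Schema (database) in which the pipeline publishes its tables.
        "target": target_schema,
        # Triggered (not continuous) execution, in development mode.
        "continuous": False,
        "development": True,
    }


settings = build_pipeline_settings(
    name="example_dlt_pipeline",                  # hypothetical
    notebook_path="/Repos/example/dlt_pipeline",  # hypothetical
    target_schema="example_schema",               # hypothetical
)
print(json.dumps(settings, indent=2))
```

Keeping the settings in code makes it possible to review and version them together with the pipeline notebooks.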

b) Example of a Simple DLT Pipeline in Python

import dlt
from pyspark.sql.functions import col

@dlt.table
def raw_data():
    # Simulate raw data loading (e.g., from a Bronze table)
    return spark.read.csv('/mnt/raw_data/*', header=True, inferSchema=True)

@dlt.table
def cleaned_data():
    # Perform transformations (e.g., remove duplicates, clean data)
    return (
        dlt.read("raw_data")
        .filter(col("id").isNotNull())
        .dropDuplicates(["id"])
    )

@dlt.table
def enriched_data():
    # Further transformations or enrichments
    return (
        dlt.read("cleaned_data")
        .withColumn("enriched_col", col("existing_col") * 2)
    )
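
Note that the code never states the order in which the three tables are built: each function only names the table it reads from. A declarative framework can recover the execution order from those references. The sketch below models that idea with Python's standard graphlib module. It is a conceptual illustration using the table names from the example, not a description of how DLT is implemented internally.

```python
from graphlib import TopologicalSorter

# Each table maps to the set of tables it reads from (its upstream dependencies).
dependencies = {
    "raw_data": set(),
    "cleaned_data": {"raw_data"},
    "enriched_data": {"cleaned_data"},
}

# A topological sort yields an order in which every table is built
# only after the tables it depends on.
build_order = list(TopologicalSorter(dependencies).static_order())
print(build_order)  # ['raw_data', 'cleaned_data', 'enriched_data']
```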

c) Example of a Simple DLT Pipeline in SQL

-- Create a live table for raw data
CREATE LIVE TABLE raw_data
AS
SELECT *
FROM json.`/mnt/raw_data/*`;

-- Create a live table for cleaned data
CREATE LIVE TABLE cleaned_data
AS
SELECT *
FROM LIVE.raw_data
WHERE id IS NOT NULL;

-- Create a live table for enriched data
CREATE LIVE TABLE enriched_data
AS
SELECT *, existing_col * 2 AS enriched_col
FROM LIVE.cleaned_data;

2. Testing Delta Live Tables (DLT)

Testing DLT pipelines means verifying data correctness and transformation logic and making sure the pipeline runs reliably. DLT has no direct equivalent of the dbt test command, which runs tests declared alongside dbt models, but you can combine DLT's own data quality features, Python testing frameworks such as pytest, and plain assertions.

a) Testing in Databricks with Python

You can write tests that check the intermediate tables and the final table data.

Unit Tests: You can test individual transformations in Databricks notebooks or with an external test framework such as pytest. The tests below run against the tables the pipeline has written:

import pytest
from pyspark.sql import SparkSession

@pytest.fixture
def spark():
    return SparkSession.builder.getOrCreate()

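def test_cleaned_data(spark):
    # Load the cleaned data. DLT writes Delta tables, so read the output
    # as Delta; the path is a placeholder (or point it at a mock table).
    cleaned_df = spark.read.format("delta").load("/mnt/cleaned_data/")

    # Check for nulls in essential columns
    assert cleaned_df.filter("id IS NULL").count() == 0

    # Check for duplicate ids (the pipeline deduplicates on id, so compare
    # the row count with the number of distinct ids, not distinct rows)
    assert cleaned_df.count() == cleaned_df.select("id").distinct().count()

def test_enriched_data(spark):
    # Load the enriched data (placeholder path)
    enriched_df = spark.read.format("delta").load("/mnt/enriched_data/")

    # Check that the enriched column is populated and equals existing_col * 2
    assert enriched_df.filter("enriched_col IS NOT NULL").count() > 0
    assert enriched_df.filter("enriched_col <> existing_col * 2").count() == 0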
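
A duplicate check depends on what is being compared. Comparing the row count with a distinct() count over whole rows only detects rows that are identical in every column, while a pipeline that deduplicates on id needs the comparison on the id column alone. The plain-Python sketch below (made-up rows, no Spark required) shows the difference.

```python
rows = [
    {"id": 1, "existing_col": 10},
    {"id": 1, "existing_col": 20},  # same id, different payload
    {"id": 2, "existing_col": 30},
]

total = len(rows)
# Distinct over whole rows: every column takes part in the comparison.
distinct_rows = len({tuple(sorted(r.items())) for r in rows})
# Distinct over the key only.
distinct_ids = len({r["id"] for r in rows})

whole_row_check_passes = total == distinct_rows  # no two rows are fully identical
key_check_passes = total == distinct_ids         # id 1 appears twice
print(whole_row_check_passes, key_check_passes)  # True False
```

The whole-row check passes even though id 1 occurs twice, which is why a uniqueness test should name the key column explicitly.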


b) Testing Data Quality with Constraints

Another way to test DLT pipelines is to enforce data quality constraints directly within the pipeline itself. This can be done using @dlt.expect to add expectations to your DLT tables. These are similar to dbt tests.

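import dlt
from pyspark.sql.functions import col, count

# Expectations take a name and a SQL boolean expression given as a string.
@dlt.table
@dlt.expect_or_fail("id_not_null", "id IS NOT NULL")
def cleaned_data():
    return (
        dlt.read("raw_data")
        .filter(col("id").isNotNull())
        .dropDuplicates(["id"])
    )

# Expectations are evaluated row by row, so uniqueness is checked
# on an aggregate: one row per id with its number of occurrences.
@dlt.table
@dlt.expect_or_fail("unique_id", "id_count = 1")
def cleaned_data_id_counts():
    return (
        dlt.read("cleaned_data")
        .groupBy("id")
        .agg(count("*").alias("id_count"))
    )

@dlt.table
def enriched_data():
    return (
        dlt.read("cleaned_data")
        .withColumn("enriched_col", col("existing_col") * 2)
    )

With @dlt.expect_or_fail, the pipeline update fails if any row violates the condition (i.e., if id is null in cleaned_data or if an id occurs more than once). Plain @dlt.expect only records violations in the pipeline's data quality metrics and keeps the rows, and @dlt.expect_or_drop drops the violating rows and lets the update continue.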
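
The expectation decorators @dlt.expect, @dlt.expect_or_drop, and @dlt.expect_or_fail differ in what happens to a row that violates the condition. The sketch below simulates the three behaviours in plain Python on a list of dicts. The function apply_expectation and its action names ("warn", "drop", "fail") are made up for illustration and are not part of the dlt module.

```python
class ExpectationFailed(Exception):
    """Raised by the simulated 'fail' action."""


def apply_expectation(rows, name, predicate, action="warn"):
    """Simulate one expectation over a list of dict rows.

    action: "warn" keeps every row, "drop" removes violating rows,
    "fail" raises on the first violation.
    Returns (kept_rows, violation_count).
    """
    if action not in ("warn", "drop", "fail"):
        raise ValueError(f"unknown action: {action}")
    kept, violations = [], 0
    for row in rows:
        if predicate(row):
            kept.append(row)
            continue
        violations += 1
        if action == "fail":
            raise ExpectationFailed(f"{name} violated by {row}")
        if action == "warn":
            kept.append(row)  # the row is kept, only the violation count changes
    return kept, violations


rows = [{"id": 1}, {"id": None}, {"id": 3}]
not_null = lambda row: row["id"] is not None

warn_rows, warn_violations = apply_expectation(rows, "id_not_null", not_null, "warn")
drop_rows, drop_violations = apply_expectation(rows, "id_not_null", not_null, "drop")
print(len(warn_rows), warn_violations)  # 3 1
print(len(drop_rows), drop_violations)  # 2 1
```

In this model "warn" corresponds to @dlt.expect, "drop" to @dlt.expect_or_drop, and "fail" to @dlt.expect_or_fail.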

c) Testing in SQL with Expectations

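In SQL, expectations are declared as constraints on the table:

-- Fail the update if any row has a null id
CREATE LIVE TABLE cleaned_data (
  CONSTRAINT id_not_null EXPECT (id IS NOT NULL) ON VIOLATION FAIL UPDATE
)
AS
SELECT *
FROM LIVE.raw_data
WHERE id IS NOT NULL;

-- Expectations are row-level, so check for duplicate ids on an aggregate
CREATE LIVE TABLE cleaned_data_id_counts (
  CONSTRAINT unique_id EXPECT (id_count = 1) ON VIOLATION FAIL UPDATE
)
AS
SELECT id, COUNT(*) AS id_count
FROM LIVE.cleaned_data
GROUP BY id;

CREATE LIVE TABLE enriched_data
AS
SELECT *,
       existing_col * 2 AS enriched_col
FROM LIVE.cleaned_data;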

3. dbt-like Setup with DLT

If you’re coming from a dbt background, you can think of DLT as a way to define your data models with full support for incremental processing, data quality, and monitoring. However, unlike dbt, DLT integrates directly with Databricks’ environment and can be triggered as part of your Databricks jobs.

The dbt setup often includes:

  • Model definitions (like cleaned_data or enriched_data)
  • Tests on models (as shown above with expect)
  • Documentation and metadata

With DLT, you can use a similar approach for data transformation but rely on the Databricks UI or the Databricks REST API to manage your pipeline instead of dbt’s CLI.
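
As a rough counterpart to running dbt from the command line, a pipeline update can be started through the REST API (POST /api/2.0/pipelines/{pipeline_id}/updates). The sketch below only builds the request with the standard library and does not send it. The workspace URL, token, and pipeline id are hypothetical placeholders.

```python
import json
import urllib.request


def build_start_update_request(host, token, pipeline_id, full_refresh=False):
    """Build (but do not send) a request that starts a pipeline update."""
    url = f"{host.rstrip('/')}/api/2.0/pipelines/{pipeline_id}/updates"
    body = json.dumps({"full_refresh": full_refresh}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


request = build_start_update_request(
    host="https://example.cloud.databricks.com",  # hypothetical workspace URL
    token="example-token",                        # placeholder; never hard-code real tokens
    pipeline_id="example-pipeline-id",            # placeholder
)
print(request.get_method(), request.full_url)
```

Passing the request to urllib.request.urlopen would start the update, given a real workspace URL, pipeline id, and a valid token.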

Conclusion

Delta Live Tables (DLT) in Databricks is a powerful way to build ETL pipelines that automatically track the state of your data, with declarative transformations, built-in data quality checks, and monitoring. You can think of it as Databricks’ answer to the dbt workflow, with some differences in how things are managed (for example, its integration with Spark and its Python and SQL APIs).

To test DLT pipelines:

  • Use Python-based tests or pytest for unit tests.
  • Use DLT’s built-in expectations (@dlt.expect, @dlt.expect_or_drop, @dlt.expect_or_fail) for inline data quality checks.
  • Incorporate assertions and data integrity checks directly into your pipeline design.
