Understanding Slowly Changing Dimensions Type 2 (SCD2)

In the world of data management, tracking changes over time is crucial for maintaining the accuracy and integrity of data. **SCD Type 2 (SCD2)** is a technique used in data warehousing to handle historical data, allowing businesses to keep track of changes in a record over time while preserving the original versions of that record. This is particularly useful when working with data that evolves, such as customer information, product pricing, or any other dataset where historical accuracy is important.

How SCD2 Works: The Key Concepts

SCD2 works by marking the current version of a record as active and storing historical versions when changes occur. Instead of simply updating the existing record, SCD2 adds a new record each time an update happens, ensuring that the historical data is never lost. Below is a breakdown of how SCD2 functions:


Capturing Changes Over Time: Every time a change is detected in a record, rather than updating the existing entry, SCD2 creates a new version of that record. For example, if a customer changes their address, a new record with the updated address is created, and the old one is marked as inactive. This ensures that the historical state of the data is preserved for future analysis.

Tracking the Active Version: Each record is flagged as either active or inactive. A flag called **CurrentFlag** is typically used to indicate whether the record is the active one or a historical version. For active records, the flag is set to "Y", and for historical records, it is set to "N". This makes it easy to distinguish the current version from past ones.

Handling Inactive Records: When a record is no longer current, its **CurrentFlag** is updated to "N" and its **EffectiveToDate** is set to the moment it was superseded. This marks the record as inactive and no longer valid for current use, so historical versions are preserved while only the most current data is used in day-to-day operations.

Managing New and Updated Records: When a new record comes in, such as a new customer, it is simply added as an active record. If an existing record has updated information, the old record is made inactive, and the new record with the updated information is inserted as the active version. This approach allows data scientists and analysts to perform accurate historical analyses.
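
To make these concepts concrete, consider the customer example used later in this article: Alice moves from New York to Boston. After the change, the dimension holds two versions of her record (the dates here are illustrative):

| CustomerID | Name  | City     | CurrentFlag | EffectiveFromDate | EffectiveToDate |
|------------|-------|----------|-------------|-------------------|-----------------|
| 1          | Alice | New York | N           | 2023-01-01        | 2024-06-01      |
| 1          | Alice | Boston   | Y           | 2024-06-01        | 2999-12-31      |

The far-future date 2999-12-31 is a conventional open-ended value for the active version; queries that only need the current state simply filter on CurrentFlag = "Y".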

Implementing SCD2 with PySpark and Delta Lake

Implementing SCD2 using **PySpark** and **Delta Lake** allows for efficient processing of large datasets while preserving historical data. The Delta Lake format provides ACID transactions and schema enforcement, which keeps large-scale data transformations consistent and reliable. Here's how you can implement SCD2 with PySpark:
Import necessary libraries: We import pyspark.sql.functions for Spark SQL functions, SparkSession for initializing the Spark session, and DeltaTable for working with Delta tables.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

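The snippets below assume an active SparkSession named spark with the Delta Lake extensions enabled, and a path_to_table variable pointing at the location of the Delta table. Neither is shown explicitly in the steps, so here is a minimal, illustrative setup using the delta-spark package; the application name and table path are placeholders to adapt to your environment:

from delta import configure_spark_with_delta_pip

# Illustrative setup: the app name and table path are placeholders, not part of the original example
builder = (SparkSession.builder.appName("scd2-demo")
           .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
           .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"))
spark = configure_spark_with_delta_pip(builder).getOrCreate()

path_to_table = "/tmp/delta/customers_scd2"  # example location for the Delta table
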
Create and save initial data: We define the initial customer data with timestamp fields for EffectiveFromDate and EffectiveToDate, a CurrentFlag, and an IsActive column indicating the record's status.
initial_data = [(1, "Alice", "New York", "Y", "2023-01-01 00:00:00", "2999-12-31 23:59:59"),
                (2, "Bob", "Los Angeles", "Y", "2023-01-01 00:00:00", "2999-12-31 23:59:59"),
                (3, "Charlie", "Chicago", "Y", "2023-01-01 00:00:00", "2999-12-31 23:59:59")]
initial_df = spark.createDataFrame(
    initial_data,
    schema="CustomerID INT, Name STRING, City STRING, CurrentFlag STRING, EffectiveFromDate STRING, EffectiveToDate STRING"
).select(
    F.col("CustomerID"),
    F.col("Name"),
    F.col("City"),
    F.col("CurrentFlag"),
    F.to_timestamp(F.col("EffectiveFromDate")).alias("EffectiveFromDate"),
    F.to_timestamp(F.col("EffectiveToDate")).alias("EffectiveToDate"),
    F.when(F.col("CurrentFlag") == "Y", True).otherwise(False).alias("IsActive")
)

initial_df.write.format("delta").mode("overwrite").save(path_to_table)

Load new source data: We create a DataFrame for the new source data that will be used for the full load operation.

source_data = [(1, "Alice", "Boston"),
               (2, "Bob", "Los Angeles"),
               (4, "David", "Seattle")]
source_df = spark.createDataFrame(
    source_data,
    schema="CustomerID INT, Name STRING, City STRING"
)
current_timestamp = F.current_timestamp()

Read the existing Delta table: We load the existing data from the Delta table to compare and make necessary updates.
delta_table = DeltaTable.forPath(spark, path_to_table)

Mark old records as inactive: We identify currently active records whose values have changed by joining the source and existing data, and mark them as inactive (CurrentFlag = "N", IsActive = False).

inactive_records = delta_table.toDF().alias("existing").join(
    source_df.alias("source"),
    on="CustomerID",
    how="inner"
).filter(
    # Only the currently active version of a changed customer gets closed out
    (F.col("existing.CurrentFlag") == "Y") & (F.col("source.City") != F.col("existing.City"))
).select(
    F.col("existing.CustomerID"),
    F.col("existing.Name"),
    F.col("existing.City"),
    F.col("existing.EffectiveFromDate"),
    current_timestamp.alias("EffectiveToDate"),
    F.lit("N").alias("CurrentFlag"),
    F.lit(False).alias("IsActive")
)

Insert new version of updated records: For records that have been updated, we insert the new version with the updated values while keeping the old version as inactive.
new_version_records = source_df.alias("source").join(
    delta_table.toDF().alias("existing"),
    on="CustomerID",
    how="inner"
).filter(
    # Compare the incoming row against the currently active version only
    (F.col("existing.CurrentFlag") == "Y") & (F.col("source.City") != F.col("existing.City"))
).select(
    F.col("source.CustomerID"),
    F.col("source.Name"),
    F.col("source.City"),
    current_timestamp.alias("EffectiveFromDate"),
    F.to_timestamp(F.lit("2999-12-31 23:59:59")).alias("EffectiveToDate"),
    F.lit("Y").alias("CurrentFlag"),
    F.lit(True).alias("IsActive")
)

Insert new records: We identify new records that do not exist in the existing data and add them to the table.
new_records = source_df.alias("source").join(
    delta_table.toDF().alias("existing"),
    on="CustomerID",
    how="left_anti"
).select(
    F.col("source.CustomerID"),
    F.col("source.Name"),
    F.col("source.City"),
    current_timestamp.alias("EffectiveFromDate"),
    F.to_timestamp(F.lit("2999-12-31 23:59:59")).alias("EffectiveToDate"),
    F.lit("Y").alias("CurrentFlag"),
    F.lit(True).alias("IsActive")
)

Mark records as inactive that no longer exist in the load: We identify active records in the existing data that are no longer present in the new source data and mark them as inactive.
deleted_records = delta_table.toDF().alias("existing").join(
    source_df.alias("source"),
    on="CustomerID",
    how="left_anti"
).filter(
    # Only close out rows that are still flagged as current
    F.col("existing.CurrentFlag") == "Y"
).select(
    F.col("existing.CustomerID"),
    F.col("existing.Name"),
    F.col("existing.City"),
    F.col("existing.EffectiveFromDate"),
    current_timestamp.alias("EffectiveToDate"),
    F.lit("N").alias("CurrentFlag"),
    F.lit(False).alias("IsActive")
)

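One caveat before combining everything: the final write below overwrites the whole table, so any existing rows that are not superseded by this load (for example Bob, whose city did not change, and historical versions from earlier runs) have to be carried over explicitly or they would be lost. Here is a minimal sketch of that carry-over, reusing the DataFrames defined above; the names superseded_ids and untouched_records are ours, not part of the original walkthrough:

# Customers whose active row is closed out in this load (changed or deleted)
superseded_ids = inactive_records.select("CustomerID").union(deleted_records.select("CustomerID"))

existing_df = delta_table.toDF()
# Keep every historical row, plus active rows that are not superseded by this load
untouched_records = existing_df.filter(F.col("CurrentFlag") == "N").unionByName(
    existing_df.filter(F.col("CurrentFlag") == "Y").join(superseded_ids, "CustomerID", "left_anti")
)
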
Combine all updates into the final DataFrame: We combine the untouched records with the inactive records, new version records, new records, and deleted records, and write the result back to the Delta table.

final_df = (untouched_records.unionByName(inactive_records).unionByName(new_version_records)
            .unionByName(new_records).unionByName(deleted_records))
final_df.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(path_to_table)

Validate the results: Finally, we reload the data from the Delta table and display it, ordered by CustomerID and EffectiveFromDate, to verify our operations.

spark.read.format("delta").load(path_to_table).orderBy("CustomerID", "EffectiveFromDate").show(truncate=False)

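With the sample data above, the reloaded table should contain five rows: Alice's original New York record closed out and flagged "N", her new Boston record flagged "Y", Bob's unchanged Los Angeles record still flagged "Y", Charlie's record closed out and flagged "N" (he is absent from the new load), and a new active record for David in Seattle. The exact EffectiveFromDate and EffectiveToDate values depend on when the load runs.
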
With this implementation, you can efficiently track changes, preserve historical records, and ensure that your data is always up to date. SCD2 in combination with Delta Lake provides a robust framework for handling time-sensitive and evolving data with high performance and reliability.