SCD2 works by marking the current version of a record as active and storing historical versions when changes occur. Instead of simply updating the existing record, SCD2 adds a new record each time an update happens, ensuring that historical data is never lost. Here is how this plays out for a single record:
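For example, if a customer moves from New York to Boston, the dimension ends up holding two rows for that customer: the old row with City = New York, CurrentFlag = N, and an EffectiveToDate closed out at the time of the change, and a new row with City = Boston, CurrentFlag = Y, and an EffectiveFromDate starting at that same moment. Queries that need the current state filter on the active row; queries that need history read all versions.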
Implementing SCD2 using **PySpark** and **Delta Lake** allows for efficient processing of large datasets while ensuring that historical data is preserved. Delta Lake adds ACID transactions and schema enforcement on top of Parquet files, which makes overwrite-style transformations like the one below safe and repeatable at scale. Here's how you can implement SCD2 with PySpark:
Import the necessary libraries: We import pyspark.sql.functions for the Spark SQL functions we will use, SparkSession to initialize the Spark session, and DeltaTable for working with Delta tables.
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
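The snippets below assume a Delta-enabled SparkSession named spark and a path_to_table variable pointing at the Delta table location. A minimal sketch of that setup (the application name and path are placeholders) could look like this:
# Assumed setup: a SparkSession with the Delta Lake extensions enabled.
# This expects the Delta Lake jars to already be on the classpath.
spark = (
    SparkSession.builder
    .appName("scd2-demo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
path_to_table = "/tmp/scd2_customers"  # placeholder location for the customer dimension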
Create and save the initial data: We define the initial customer data with timestamp columns for EffectiveFromDate and EffectiveToDate, plus a CurrentFlag and an IsActive column indicating each record's status.
initial_data = [(1, "Alice", "New York", "Y", "2023-01-01 00:00:00", "2999-12-31 23:59:59"),
(2, "Bob", "Los Angeles", "Y", "2023-01-01 00:00:00", "2999-12-31 23:59:59"),
(3, "Charlie", "Chicago", "Y", "2023-01-01 00:00:00", "2999-12-31 23:59:59")]
initial_df = spark.createDataFrame(
initial_data,
schema="CustomerID INT, Name STRING, City STRING, CurrentFlag STRING, EffectiveFromDate STRING, EffectiveToDate STRING"
).select(
F.col("CustomerID"),
F.col("Name"),
F.col("City"),
F.col("CurrentFlag"),
F.to_timestamp(F.col("EffectiveFromDate")).alias("EffectiveFromDate"),
F.to_timestamp(F.col("EffectiveToDate")).alias("EffectiveToDate"),
F.when(F.col("CurrentFlag") == "Y", True).otherwise(False).alias("IsActive")
)
initial_df.write.format("delta").mode("overwrite").save(path_to_table)
Load new source data: We create a DataFrame for the new source data that will be used for the full load operation.
source_data = [(1, "Alice", "Boston"),
(2, "Bob", "Los Angeles"),
(4, "David", "Seattle")]
source_df = spark.createDataFrame(
source_data,
schema="CustomerID INT, Name STRING, City STRING"
)
current_timestamp = F.current_timestamp()
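Note that F.current_timestamp() returns a column expression rather than a fixed Python value; it is evaluated when a query actually runs, so all rows stamped with it in the single write at the end share a consistent timestamp.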
Read the existing Delta table: We load the existing data from the Delta table to compare and make necessary updates.
delta_table = DeltaTable.forPath(spark, path_to_table)
Mark old records as inactive: We identify records whose City has changed by joining the source data with the existing data, then mark the old versions as inactive (IsActive = False) and close out their EffectiveToDate at the current timestamp.
inactive_records = delta_table.toDF().alias("existing").join(
source_df.alias("source"),
on="CustomerID",
how="inner"
).filter(
F.col("source.City") != F.col("existing.City")
).select(
F.col("existing.CustomerID"),
F.col("existing.Name"),
F.col("existing.City"),
F.col("existing.EffectiveFromDate"),
current_timestamp.alias("EffectiveToDate"),
F.lit("N").alias("CurrentFlag"),
F.lit(False).alias("IsActive")
)
Insert the new version of updated records: For each changed record, we insert a new version with the updated values, while the old version is kept as the inactive history row created above.
new_version_records = source_df.alias("source").join(
delta_table.toDF().alias("existing"),
on="CustomerID",
how="inner"
).filter(
F.col("source.City") != F.col("existing.City")
).select(
F.col("source.CustomerID"),
F.col("source.Name"),
F.col("source.City"),
current_timestamp.alias("EffectiveFromDate"),
F.lit("2999-12-31 23:59:59").alias("EffectiveToDate"),
F.lit("Y").alias("CurrentFlag"),
F.lit(True).alias("IsActive")
)
Insert new records: We identify new records that do not exist in the existing data and add them to the table.
new_records = source_df.alias("source").join(
delta_table.toDF().alias("existing"),
on="CustomerID",
how="left_anti"
).select(
F.col("source.CustomerID"),
F.col("source.Name"),
F.col("source.City"),
current_timestamp.alias("EffectiveFromDate"),
F.lit("2999-12-31 23:59:59").alias("EffectiveToDate"),
F.lit("Y").alias("CurrentFlag"),
F.lit(True).alias("IsActive")
)
Mark records that no longer exist in the load as inactive: We identify records that are present in the existing data but missing from the new source data and mark them as inactive.
deleted_records = delta_table.toDF().alias("existing").join(
source_df.alias("source"),
on="CustomerID",
how="left_anti"
).select(
F.col("existing.CustomerID"),
F.col("existing.Name"),
F.col("existing.City"),
F.col("existing.EffectiveFromDate"),
current_timestamp.alias("EffectiveToDate"),
F.lit("N").alias("CurrentFlag"),
F.lit(False).alias("IsActive")
)
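Carry forward unchanged records: Because the final write below overwrites the entire table, records that exist in both the table and the source but did not change (Bob, in this example) also need to be carried forward, or they would be lost. A minimal sketch of this step, reusing the DataFrames defined above (unchanged_records is just an illustrative name):
# Rows present in both the table and the source with no attribute change;
# they are re-written as-is so the overwrite does not drop them.
unchanged_records = delta_table.toDF().alias("existing").join(
    source_df.alias("source"),
    on="CustomerID",
    how="inner"
).filter(
    F.col("source.City") == F.col("existing.City")
).select(
    F.col("existing.CustomerID"),
    F.col("existing.Name"),
    F.col("existing.City"),
    F.col("existing.EffectiveFromDate"),
    F.col("existing.EffectiveToDate"),
    F.col("existing.CurrentFlag"),
    F.col("existing.IsActive")
)
If the table already contains historical versions from earlier loads, those rows (IsActive = False) should be carried forward unchanged as well, and the change-detection joins above should be restricted to the currently active rows.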
Combine all updates into the final DataFrame: We combine the inactive records, new version records, new records, deleted records, and the unchanged records carried forward above into the final DataFrame and write it back to the Delta table.
final_df = inactive_records.union(new_version_records).union(new_records).union(deleted_records).union(unchanged_records)
final_df.write.option("mergeSchema", "true").format("delta").mode("overwrite").save(path_to_table)
Validate the results: Finally, we load the data from the Delta table and display it, ordered by CustomerID and EffectiveFromDate to verify our operations.
spark.read.format("delta").load(path_to_table).orderBy(F.col("CustomerID"), F.col("EffectiveFromDate")).show(truncate=False)
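With the sample data above, the output should show two rows for CustomerID 1 (the New York version closed out and a new active Boston version), the unchanged active row for CustomerID 2, the CustomerID 3 row marked inactive because it is missing from the source load, and a single new active row for CustomerID 4.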
With this implementation, you can efficiently track changes, preserve historical records, and ensure that your data is always up to date. SCD2 in combination with Delta Lake provides a robust framework for handling time-sensitive and evolving data with high performance and reliability.