Overview: Incremental Parquet Snapshot Ingestion in Microsoft Fabric Lakehouse
Managing large datasets in Microsoft Fabric often means handling recurring snapshots, such as weekly or monthly inventory updates. This article shows an incremental ingestion pattern for Fabric Lakehouse that appends new Parquet snapshots to an existing Delta dataset, avoiding the out-of-memory failures that can appear in Dataflow Gen2 as file counts grow over time.
For finance teams, the practical benefit is a shorter and more predictable refresh window, which reduces reporting delays and avoids last-minute pipeline failures during close or forecast cycles.
Problem: Dataflow Gen2 Out-of-Memory Errors When Loading Snapshot Files
When working with datasets that involve regular snapshots, many users run into out-of-memory errors during processing. This is especially common with Dataflow Gen2 in Microsoft Fabric, which can struggle as the number of small files grows over time. Even when the total data size stays relatively small, the ever-increasing file count can overwhelm the refresh, making workflows inefficient and hard to scale. Overwriting the full dataset for each new snapshot only adds to the cost, since every refresh reprocesses data that has not changed.
The need for a more efficient, incremental load strategy arises here—one that allows us to append new data to existing datasets without reprocessing historical data or consuming excessive memory resources.
Goal: Append-Only Snapshot Loads to a Delta Table in Fabric Lakehouse
The goal of this blog is to introduce a method for automating incremental data loads in Microsoft Fabric. Instead of reprocessing the full dataset with Dataflow Gen2, where many data engineers and analysts hit out-of-memory errors, the approach appends new Parquet files to the existing Delta dataset so that data integration happens without overloading system resources. This keeps your data workflows performing well and offers a practical answer to the challenges posed by growing data volumes.
Why Use an Append-Only Incremental Load for Snapshot Data in Fabric?
Handling snapshot data, such as inventory updates received weekly or monthly, can be challenging once the processing layer starts hitting memory limits. Although the data volume may seem small, a continuously growing number of files can lead to out-of-memory errors, especially in Dataflow Gen2, which is often used to load and transform these datasets.
The main issue is the repeated full refresh for each new snapshot. As the number of snapshot files grows, Dataflow Gen2 can struggle to process the full set consistently, even when total data size is not extreme. Appending new snapshots instead of overwriting the entire dataset avoids reprocessing historical data and reduces the chance of memory-related failures.
Solution Overview: Append Parquet Snapshots to a Partitioned Delta Table in Fabric
To handle these incremental loads efficiently, the solution involves appending new snapshot files to an existing dataset, rather than overwriting the previous snapshots. Parquet files, known for their efficient storage and quick processing, are ideal for this approach. Below is an overview of how the solution works:
- Folder Structure: You store weekly or monthly snapshot files in a Lakehouse folder (an example layout follows this list).
- File Handling: The process dynamically lists all files in the folder and extracts their metadata, including modification dates, so that only the most recent snapshot file is processed.
- Sorting and Selecting Latest File: After listing the files, the system identifies the latest snapshot based on modification time. This ensures you are working with the most recent data.
- Reading Parquet Files: The latest Parquet file is read, and relevant metadata such as the week number is added to the data. This allows you to partition the data by week, making it easier to query and analyze over time.
- Appending Data: If historical data already exists (saved in Delta format), the new data is appended to the existing dataset. If no historical data is found, the system creates a new Delta table and stores the initial snapshot.
- Cleaning Data: To avoid issues with special characters in column names, the system dynamically cleans up column names before writing the data to the Delta table.
This approach leverages Delta Lake’s append and partitioning capabilities so you can add new snapshots without reloading historical data. Over time, that keeps refresh behavior predictable and reduces the operational risk that comes from repeatedly processing an ever-growing file set.
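To make the assumptions concrete, the folder might look something like the layout below. The names are illustrative; the only thing the code later relies on is that each snapshot's name ends with its date in YYYY-MM-DD format, whether the snapshot is a single Parquet file or a Spark-written Parquet folder.
Files/Snapshots/Inventory_Snapshots/Weekly/
    Inventory_2025-01-27
    Inventory_2025-02-03
    Inventory_2025-02-10   <- most recent snapshot, selected by modification time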
Why Not Use Incremental Refresh in Fabric? Query Folding Limitations
While Microsoft Fabric offers the option of incremental refresh, this approach has limitations, especially if your transformation steps break query folding. Query folding is a process where certain transformations are passed to the data source for efficient processing. If the transformations in your pipeline are too complex, query folding may not work, and you’ll run into performance issues.
Incremental refresh can be effective, but it depends heavily on query folding. When transformation steps break folding, refresh performance degrades and becomes harder to predict. In those cases, handling incremental loads in a notebook, as outlined here, gives you more control over file selection, partitioning, and append behavior, even when transformations are complex.
Implementation: PySpark Notebook for Snapshot Ingestion in Fabric Lakehouse
The core of this solution revolves around PySpark and Delta Lake, utilizing PySpark’s powerful file handling capabilities to manage and process snapshot data effectively. Below is a breakdown of the code used to implement the solution:
Step 1: List Files and Select the Latest Snapshot in OneLake Folder
folder_path = "Files/Snapshots/Inventory_Snapshots/Weekly/"

file_list = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).listStatus(
    spark._jvm.org.apache.hadoop.fs.Path(folder_path)
)

files_with_dates = [(file.getPath().toString(), file.getModificationTime()) for file in file_list]
latest_file = sorted(files_with_dates, key=lambda x: x[1], reverse=True)[0]
latest_file_path = latest_file[0]

print(f"Latest file: {latest_file_path}")
This section lists all the files in the specified folder and sorts them by modification time to identify the most recent file.
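One edge case worth guarding against: if the folder is empty, sorted(files_with_dates, ...)[0] raises an IndexError. A minimal, equivalent variation that fails with a clearer message, using the same variables as above:
# Stop early with a readable error if the snapshot folder is empty
if not files_with_dates:
    raise FileNotFoundError(f"No snapshot files found in {folder_path}")

# max() on modification time is equivalent to sorting descending and taking the first entry
latest_file_path = max(files_with_dates, key=lambda x: x[1])[0]
print(f"Latest file: {latest_file_path}")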
Step 2: Read Parquet and Add Week Metadata for Partitioning
from pyspark.sql.functions import lit
import re
from datetime import datetime

df_latest = spark.read.parquet(latest_file_path)

# Extract the snapshot date from the end of the file name, e.g. "..._2025-02-10"
date_str = re.search(r'_(\d{4}-\d{2}-\d{2})$', latest_file_path)
if date_str:
    snapshot_date = datetime.strptime(date_str.group(1), '%Y-%m-%d')
    week_number = snapshot_date.strftime('%Y-W%U')
    df_latest = df_latest.withColumn("week", lit(week_number))
Here, we read the latest Parquet file and extract the date from the filename. The week number is calculated and added as a new column, which helps in partitioning the data.
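Note that %U numbers weeks with Sunday as the first day and can start at W00, which is not the same as ISO week numbering. A quick sanity check of the format, using an arbitrary date:
from datetime import datetime

# '%Y-W%U' produces Sunday-based week numbers (00-53)
print(datetime(2025, 2, 10).strftime('%Y-W%U'))   # 2025-W06
print(datetime(2025, 2, 10).isocalendar()[1])     # 7 - the ISO week differs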
Step 3: Append Snapshot to Historical Delta Data (Create if Missing)
historical_data_path = "Files/Test/Inventory_Snapshots/"

try:
    historical_df = spark.read.format("delta").load(historical_data_path)
    combined_df = historical_df.union(df_latest)
    print("Successfully appended new data.")
except Exception as e:
    print(f"No existing data found, creating a new Delta table. Error: {e}")
    combined_df = df_latest
    # First run only: create the Delta table from the initial snapshot
    combined_df.write.format("delta").mode("overwrite").partitionBy("week").save(historical_data_path)
This section checks for existing historical data in Delta format. If it exists, it appends the new data; if not, it creates a new Delta table.
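If you prefer an explicit existence check over catching a generic exception, Delta Lake's Python API provides DeltaTable.isDeltaTable. A sketch of that variant, using the same paths and DataFrames as above:
from delta.tables import DeltaTable

# Explicitly check whether a Delta table already exists at the target path
if DeltaTable.isDeltaTable(spark, historical_data_path):
    historical_df = spark.read.format("delta").load(historical_data_path)
    combined_df = historical_df.union(df_latest)
else:
    combined_df = df_latest
    combined_df.write.format("delta").mode("overwrite").partitionBy("week").save(historical_data_path)
If there is any chance the column order differs between snapshots, unionByName (optionally with allowMissingColumns=True) is a safer choice than union, which matches columns by position.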
Step 4: Clean Column Names and Write a Partitioned Delta Table
cleaned_df = combined_df.select(
    [col(c).alias(c.replace(" ", "_").replace(",", "_").replace(";", "_")) for c in combined_df.columns]
)
cleaned_df.write.format("delta").mode("append").partitionBy("week").saveAsTable("InventorySnapTest")
Finally, the column names are cleaned, and the data is written back to the Delta table, ensuring it is partitioned by the week column for optimized querying.
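If your column names can contain characters beyond spaces, commas, and semicolons, a slightly more general cleanup, shown here as an illustrative variant rather than part of the original notebook, is to replace everything that is not alphanumeric or an underscore:
import re
from pyspark.sql.functions import col

# Replace any character that is not a letter, digit, or underscore with '_'
cleaned_df = combined_df.select(
    [col(c).alias(re.sub(r'[^0-9A-Za-z_]', '_', c)) for c in combined_df.columns]
)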
Full PySpark Example: Fabric Lakehouse Snapshot Ingestion Notebook
from pyspark.sql.functions import lit, col
import re
from datetime import datetime

# Define the path where the weekly snapshot files are stored in the Lakehouse
folder_path = "Files/Snapshots/Inventory_Snapshots/Weekly/"

# List all files in the folder (Spark-based file handling)
file_list = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration()).listStatus(
    spark._jvm.org.apache.hadoop.fs.Path(folder_path)
)

# Extract file paths and their modification times
files_with_dates = [(file.getPath().toString(), file.getModificationTime()) for file in file_list]

# Sort the files by modification time to get the latest one
latest_file = sorted(files_with_dates, key=lambda x: x[1], reverse=True)[0]

# Extract the path of the latest file
latest_file_path = latest_file[0]

# Show the latest file (for debugging purposes)
print(f"Latest file: {latest_file_path}")

# Read the latest Parquet file
df_latest = spark.read.parquet(latest_file_path)

# Capture the date part after the last underscore in the file name
date_str = re.search(r'_(\d{4}-\d{2}-\d{2})$', latest_file_path)

# If the regex fails to match the date, print an error message
if date_str:
    snapshot_date = datetime.strptime(date_str.group(1), '%Y-%m-%d')
else:
    print("Error: Could not extract the date from the filename.")
    snapshot_date = None  # Handle the case where no date is found

if snapshot_date:
    # Calculate the week number for partitioning
    week_number = snapshot_date.strftime('%Y-W%U')  # Format: '2025-W06' for the 6th week of 2025

    # Add the week column dynamically based on the snapshot date of the latest file
    df_latest = df_latest.withColumn("week", lit(week_number))

    # Placeholder for the combined data
    combined_df = None

    # Historical data is assumed to be saved in Delta format, partitioned by 'week'
    historical_data_path = "Files/Test/Inventory_Snapshots/"

    # Check if the Delta table exists; if not, create it
    try:
        historical_df = spark.read.format("delta").load(historical_data_path)
        combined_df = historical_df.union(df_latest)  # Append the new data to the existing data
        print("Successfully appended new data.")
    except Exception as e:
        print(f"No existing data found, creating a new Delta table. Error: {e}")
        # If no data exists, initialize the DataFrame with the latest file's data
        combined_df = df_latest
        # Create the Delta table by writing the initial data for the first time
        combined_df.write.format("delta").mode("overwrite").partitionBy("week").save(historical_data_path)
        print("Delta table created successfully.")

    # Show the combined DataFrame with the correct week column for the latest file
    combined_df.show()

    # Clean the column names to remove spaces or special characters (if any)
    cleaned_df = combined_df.select(
        [col(c).alias(c.replace(" ", "_").replace(",", "_").replace(";", "_")) for c in combined_df.columns]
    )

    # Write the cleaned data to Delta (this avoids issues with special characters in column names)
    cleaned_df.write.format("delta").mode("append").partitionBy("week").saveAsTable("InventorySnapTest")

    # Enable column mapping for the Delta table (if necessary)
    spark.sql(f"""
        ALTER TABLE delta.`{historical_data_path}`
        SET TBLPROPERTIES (
            'delta.columnMapping.mode' = 'name',
            'delta.minReaderVersion' = '2',
            'delta.minWriterVersion' = '5'
        )
    """)
else:
    print("Error: No valid snapshot date found.")
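The ALTER TABLE at the end of the listing enables Delta column mapping by name, which is why the minimum reader and writer protocol versions are raised to 2 and 5; column mapping lets the table tolerate column names that Delta would otherwise reject. If you want to confirm the properties were applied, a quick check along these lines should work:
# Inspect the table's Delta properties and protocol versions after the ALTER TABLE
spark.sql(f"DESCRIBE DETAIL delta.`{historical_data_path}`") \
    .select("properties", "minReaderVersion", "minWriterVersion") \
    .show(truncate=False)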
Step 5: Automating the Notebook with a Scheduled Fabric Pipeline
Once the solution is implemented, the final step is to automate the process by scheduling it within a pipeline. Here’s how you can do it:
- Create a Pipeline: In Microsoft Fabric, create a new pipeline where you will add the notebook that contains the code.
- Schedule the Pipeline: Set the schedule to match the frequency of your data updates. For example, if you’re dealing with weekly snapshots, you can schedule the pipeline to run every week, automatically processing the latest snapshot data.
- Monitor and Adjust: Once the pipeline is scheduled, monitor its performance and adjust the schedule as needed. If your data grows, consider adjusting the pipeline’s resources or processing time to maintain efficiency.
By scheduling the notebook in a pipeline, you can ensure that your incremental data loads run automatically, keeping your dataset up-to-date without manual intervention.
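If you want the pipeline to control which folder gets processed, one common pattern is to mark a notebook cell as a parameter cell and let the pipeline's notebook activity override its values through base parameters. A minimal sketch, where the parameter names are assumptions for this example:
# Parameter cell (toggle "Parameter cell" on this cell in the Fabric notebook).
# The pipeline's notebook activity can override these values via base parameters.
folder_path = "Files/Snapshots/Inventory_Snapshots/Weekly/"
snapshot_frequency = "Weekly"  # hypothetical parameter, e.g. "Weekly" or "Monthly"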
Conclusion: A Scalable, Append-Only Snapshot Ingestion Pattern for Finance Reporting
By adopting this incremental load strategy using PySpark and Delta Lake, you can efficiently handle the ingestion and appending of Parquet files in Microsoft Fabric without encountering memory errors. This approach avoids the Dataflow Gen2 failure pattern that can appear as snapshot file counts grow, while keeping ingestion scalable and straightforward. Whether snapshots arrive weekly, monthly, or ad hoc, the append-only Delta pattern helps keep refresh windows predictable and reduces the operational effort required to keep historical data current.
With this solution in place, you’ll have a robust framework to manage your snapshot data effectively, avoiding the need for full refreshes and reducing the strain on system resources.
For finance teams, the outcome is simple: fewer refresh failures, clearer data lineage over time, and reporting that stays on schedule when it matters most.