Insights from paper: Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores

Hemant Gupta
13 min read · Aug 20, 2024


1. Abstract

Public cloud providers offer object (blob) storage services such as Amazon S3, Azure Blob Storage, and Google Cloud Storage.

These are among the largest and most cost-effective storage systems available, which makes them a natural fit for storing large data warehouses and data lakes.

The problem is that they are key-value stores, so achieving ACID transactions and high performance on top of them is difficult.

The paper presents Delta Lake, an open-source ACID table storage layer over cloud object stores initially developed at Databricks.

Delta Lake uses a transaction log that is compacted into Apache Parquet format.

This log provides ACID properties, time travel, and significantly faster metadata operations for large tabular datasets.

This design is also leveraged to provide high-level features such as automatic data layout optimization, upserts, caching, and audit logs.

Delta Lake tables can be accessed from Apache Spark, Hive, Presto, Redshift, and other systems.

2. Introduction

Cloud object stores are good because they allow independent (decoupled) scaling of computing and storage.

Let’s understand this decoupling with an example. A user can store a petabyte of data but does not need to keep compute running for all of it; they can spin up a cluster for a few hours only when a query needs to run.

The major open-source Big Data systems, such as Apache Spark, Hive, and Presto, support reading and writing to cloud object stores.

They usually use file formats such as Apache Parquet and ORC.

Similarly, commercial systems, such as AWS Athena, Google BigQuery, and Redshift Spectrum, can read directly from cloud object storage using these open file formats.

The most common way to store relational datasets in cloud object stores is in columnar file formats such as Apache Parquet and ORC.

These formats are suitable, but they create correctness and performance challenges for more complex workloads.

In the first few years of its cloud service (roughly 2014–2016), around half of Databricks' support escalations were due to data corruption, consistency, or performance problems caused by these challenges.

Delta Lake was designed to overcome these challenges.

Delta Lake is an ACID table storage layer over cloud object stores.

It was open-sourced in 2019.

Delta Lake maintains information about Delta table objects in a write-ahead log. The objects themselves are encoded in Parquet.

This design allows clients to update multiple objects at once or replace a subset with another. This can all be done while achieving high parallel read and write performance from the objects.

The log also contains metadata such as min/max statistics for each data file for faster metadata searches.

Delta Lake stores all the metadata in the underlying object store, so there is no need for separate servers or systems to maintain that.

Delta Lake uses optimistic concurrency protocols against the underlying object store to avoid any other dependency.

Based on this design, Delta Lake provides some unique features that are not present in traditional data warehouses or data lakes.

  • Time travel: It lets users query point-in-time snapshots or roll back erroneous updates to their data.
  • UPSERT, DELETE, and MERGE: These operations efficiently rewrite the relevant objects, which helps in compliance workflows.
  • Efficient streaming I/O: Streaming writers can add small objects to the table at low latency, and consumers can efficiently tail newly added data, so a Delta table can be used as a message bus.
  • Caching: Because data and log objects are immutable, cluster nodes can safely cache them on local storage.
  • Data layout optimization: Delta Lake automatically optimizes the size of objects in a table and the clustering of data records without impacting running queries.
  • Schema evolution: The system allows continued reading of old Parquet files without rewriting them if a table’s schema changes.
  • Audit logging: The system provides it by exposing the transaction log.

The team observed that many Databricks customers could simplify their overall data architectures with Delta Lake by replacing previously separate data lake, data warehouse, and streaming storage systems with Delta tables.

The diagram below shows an example.

A data pipeline that includes object storage, a message queue, and two data warehouses is replaced with just Delta tables on object storage. It runs ETL using Delta’s streaming I/O and performance features.

Figure: A data pipeline implemented with Delta tables on object storage, replacing separate object storage, message queue, and data warehouse systems

The open-source Delta Lake project includes connectors to Apache Spark (batch or streaming), Hive, Presto, AWS Athena, Redshift, and Snowflake. It can run over multiple cloud object stores or HDFS.

3. Motivation: Characteristics and Challenges of Object Stores

3.1 Object Store APIs

Cloud object stores offer a simple key-value store interface that is easy to scale. These systems allow users to create buckets that store multiple objects.

A string key identifies each object.

Cloud object stores provide metadata APIs, such as S3’s LIST operations. These metadata APIs are generally expensive.

Cloud object stores usually support byte-range requests for reading the objects. Updating objects usually requires rewriting the whole object at once.

3.2 Consistency Properties

The most popular cloud object stores provide eventual consistency for each key.

There are no consistency guarantees across keys.

There are two inconsistency scenarios to remember:

  1. After a client uploads a new object, other clients are not guaranteed to see the object in LIST or read operations immediately.
  2. Updates to an existing object may take time to be visible to other clients.

Each cloud storage provider has its own consistency model, so the exact guarantees vary between providers.

3.3 Performance Characteristics

We have seen that reading a sequential byte range is the most granular operation available. Each read operation usually incurs at least 5–10 ms of base latency.

LIST operations require significant parallelism to list large sets of objects quickly.

Write operations generally have to replace a whole object. If a table is expected to receive point updates, its objects should be kept small for better performance.

There are three considerations for analytical workloads:

  1. Keep frequently accessed data close together sequentially.
  2. Make objects large, but not too large.
  3. Avoid LIST operations.

3.4 Existing Approaches for Table Storage

There are three significant approaches to managing tabular datasets:

3.4.1 Directories of Files

The first approach is to store the table as a collection of objects in a columnar format such as Parquet.

The records may be partitioned into directories based on one or more attributes.

Partitioning reduces the cost of LIST operations and reads for queries that only access a few partitions.

Apache Hive and some other systems use this approach.

There are a few challenges in this approach:

  1. There is no atomicity across multiple objects, so a multi-object update can leave the table in a partially updated state.
  2. The store is only eventually consistent, so readers may temporarily see a mix of old and new objects.
  3. Performance is poor for some operations, such as listing large numbers of objects or reading per-file metadata.
  4. There is no management functionality, such as table versioning or audit logs.

3.4.2 Custom Storage Engines

A few storage engines (not open-sourced) handle the consistency challenges of cloud object stores by keeping metadata in a separate, strongly consistent service.

This approach requires running a highly available service to manage the metadata.

This approach also has a few challenges:

  1. All I/O operations to a table need to contact the metadata service.
  2. Connectors to existing computing engines require more engineering work to implement.
  3. The proprietary metadata service ties users to a specific service provider.

Apache Hive ACID implements a similar approach on top of HDFS.

3.4.3 Metadata in Object Stores

Delta Lake’s approach is to store a transaction log and metadata directly within the cloud object store.

It uses a set of protocols over object store operations to achieve serializability.

The data within a table is stored in Parquet format.

Apache Hudi and Apache Iceberg also support this approach.

4. Delta Lake Storage Format and Access Protocols

A Delta Lake table is a directory on a cloud object store or file system that holds data objects with the table contents and a log of transaction operations.

Clients update these data structures using optimistic concurrency control protocols.

4.1 Storage Format

The diagram below shows the storage format for a Delta table.

Objects stored in a sample Delta table

Each table is stored within a file system directory (mytable here).

The table is partitioned by the date column, so the data objects are stored in a separate directory for each date.

Each data object in Delta has a unique name, which the writer chooses by generating a GUID.
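
For orientation, here is an illustrative (not literal) layout of such a directory; the exact object names and the number of log records will differ in practice:

```
mytable/
  _delta_log/
    000000.json                <- log records with zero-padded, increasing IDs
    000001.json
    000002.json
    000003.json
    000003.parquet             <- checkpoint summarizing the log up to record 3
    _last_checkpoint           <- points at the latest checkpoint ID
  date=2020-01-01/
    <GUID-1>.parquet           <- data objects for this partition
  date=2020-01-02/
    <GUID-2>.parquet
```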

4.1.1 Data Objects

The table contents are stored in Apache Parquet objects.

4.1.2 Log

The log is stored in the _delta_log subdirectory within the table. It contains a sequence of JSON objects with increasing, zero-padded numerical IDs that hold the log records, plus occasional checkpoints that summarize the log up to a given point.

Each log record object (e.g., 000003.json) contains an array of actions to apply to the previous version of the table to produce the next one. Some of the important actions are listed here, with a short parsing sketch after the list:

  1. Change Metadata
  2. Add or Remove Files
  3. Data Change
  4. Protocol Evolution
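
As a rough illustration of how these actions compose, here is a minimal Python sketch that replays a table's JSON log records to compute its current set of live data objects. Checkpoint handling, schema metadata, and error handling are omitted; the action field names follow Delta's open log format.

```python
import json
from pathlib import Path

def live_data_objects(table_path):
    """Replay the JSON log records in _delta_log and return the set of
    data object paths that are live in the latest table version."""
    log_dir = Path(table_path) / "_delta_log"
    live = set()
    # Zero-padded, increasing IDs mean lexicographic order equals commit order.
    for record in sorted(log_dir.glob("*.json")):
        with open(record) as f:
            for line in f:                      # one JSON action per line
                action = json.loads(line)
                if "add" in action:             # data object added to the table
                    live.add(action["add"]["path"])
                elif "remove" in action:        # data object logically removed
                    live.discard(action["remove"]["path"])
                # other action types: metaData, txn, protocol, commitInfo, ...
    return live
```

Real clients start from the latest checkpoint instead of replaying the log from record 0, as described next.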

4.1.3 Log Checkpoints

Checkpoints store all the non-redundant actions in the table’s log up to a certain log record ID.

The result of the checkpointing process is a Parquet file. The file contains an add record for each object still in the table, remove records for deleted objects that need to be retained, and a small number of other records such as txn, protocol, and change metadata.

Any client may attempt to create a checkpoint up to a given log record ID. It should be written as a .parquet file named after the corresponding ID. By default, clients write a checkpoint every 10 transactions.

After writing a checkpoint, the writer updates the _delta_log/_last_checkpoint file with the new checkpoint's ID if it is newer than the ID currently recorded there.

4.2 Access Protocols

Delta Lake’s access protocols are designed to allow clients to make serializable transactions.

A log record object, such as 000003.json, is the “root” data structure a client needs to know to read a specific table version. Given this object’s content, the client can then query for other objects from the object store.

For transactions that perform writes, clients need a way to ensure that only a single writer can create the next log record; they can then use this primitive to implement optimistic concurrency control.

4.2.1 Reading from Tables

  1. Read the _last_checkpoint object.
  2. Use a LIST operation whose start key is the last checkpoint ID (if present) to find any newer .json and .parquet files. This provides a list of files that can be used to reconstruct the table’s state starting from a recent checkpoint.
  3. To reconstruct the table's state, use the checkpoint (if present) and subsequent log records identified in the previous step.
  4. Use the statistics to identify the data object files relevant to the read query.
  5. Query the object store to read the relevant data objects, possibly in parallel across a cluster.
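
A minimal sketch of steps 1–3, assuming a hypothetical object store client with `get_object` and `list_keys` helpers (these are illustrative, not a real SDK API):

```python
import json

def find_table_state(store, table_path):
    """Steps 1-3: locate the newest checkpoint and the log records after it."""
    log_prefix = f"{table_path}/_delta_log/"

    # Step 1: read _last_checkpoint (it may be missing, or stale under
    # eventual consistency; the LIST in step 2 recovers newer records).
    try:
        last = json.loads(store.get_object(log_prefix + "_last_checkpoint"))
        start_id = last["version"]
    except FileNotFoundError:
        start_id = 0

    # Step 2: LIST keys starting at the checkpoint ID (inclusive here) to find
    # newer .json log records and .parquet checkpoints.
    keys = store.list_keys(prefix=log_prefix,
                           start_key=f"{log_prefix}{start_id:020d}")
    checkpoints = sorted(k for k in keys if k.endswith(".parquet"))
    log_records = sorted(k for k in keys if k.endswith(".json"))

    # Step 3: table state = newest checkpoint plus the subsequent log records.
    newest_checkpoint = checkpoints[-1] if checkpoints else None
    return newest_checkpoint, log_records
```

Steps 4–5 then prune data objects using the min/max statistics in the add records and read the surviving Parquet objects, typically in parallel across a cluster.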

4.2.2 Write Transactions

  1. Identify a recent log record ID, say r, using steps 1–2 of the read protocol.
  2. Read data at table version r.
  3. Write any data objects the transaction aims to add to the table into new files. This step can happen in parallel.
  4. Attempt to write the transaction’s log record into the r + 1 .json log object. This step needs to be atomic. If the step fails, the transaction can be retried.
  5. Optionally, write a new .parquet checkpoint for log record r + 1.

Let’s understand how step 4, adding log records atomically, works (a sketch of the resulting commit loop follows the list below).

Only one client should succeed in creating the object with that name.

  1. Google Cloud Storage and Azure Blob Store support atomic put-if-absent operations, so Delta Lake uses them.
  2. On distributed filesystems such as HDFS, Delta Lake uses an atomic rename to move a temporary file to the target name (e.g., 000004.json), failing if it already exists.
  3. Databricks service deployments use a separate lightweight coordination service to ensure that only one client can add a record with each log ID.
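
Putting the pieces together, here is a hedged sketch of the commit step (step 4), assuming a hypothetical `put_if_absent` primitive like the ones described above; logical conflict checking is reduced to a comment.

```python
import json

class CommitConflict(Exception):
    """Raised by the (hypothetical) store when the log record already exists."""

def commit(store, table_path, read_version, actions, max_attempts=10):
    """Try to write `actions` as the next log record after `read_version`,
    retrying optimistically if another writer wins the race."""
    version = read_version
    body = "\n".join(json.dumps(a) for a in actions)     # one action per line
    for _ in range(max_attempts):
        target = f"{table_path}/_delta_log/{version + 1:020d}.json"
        try:
            store.put_if_absent(target, body)            # atomic: one winner per ID
            return version + 1                           # commit succeeded
        except CommitConflict:
            # Another writer created version + 1 first. Re-read its record,
            # verify our reads/writes do not logically conflict with it,
            # then retry against the next log record ID.
            version += 1
    raise RuntimeError("commit failed after repeated conflicts")
```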

4.3 Available Isolation Levels

All transactions that perform writes are serializable.

Read transactions can achieve either snapshot isolation or serializability.

At the time the paper was written, Delta Lake only supported transactions within a single table.

4.4 Transaction Rates

The latency of the put-if-absent operations used to write new log records limits Delta Lake’s write transaction rate.

In practice, the latency of writes to object stores can be tens to hundreds of milliseconds. The team found this rate sufficient for virtually all current Delta Lake applications.

5. Higher Level Features in Delta

5.1 Time Travel and Rollbacks

Delta Lake’s data objects and logs are immutable, making it straightforward to query a past snapshot of the data.

A client needs to read the table state based on an older log record ID.

Delta Lake allows users to configure a per-table data retention interval. It supports SQL AS OF timestamp and VERSION AS OF commit_id syntax for reading past snapshots.

Users can use time travel to fix errors in data pipelines.
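
For example, with PySpark (paths, table names, and timestamps are illustrative; this assumes a SparkSession configured with the Delta Lake package):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumes Delta Lake is on the classpath

# Read an older snapshot with the DataFrame reader options.
old_snapshot = (spark.read.format("delta")
                .option("versionAsOf", 1024)        # or .option("timestampAsOf", "2024-08-01")
                .load("s3://my-bucket/mytable"))

# Equivalent SQL time-travel syntax, as described in the paper.
spark.sql("SELECT * FROM mytable TIMESTAMP AS OF '2024-08-01 00:00:00'").show()
```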

5.2 Efficient UPSERT, DELETE and MERGE

Analytical datasets in enterprises often need to be modified over time.

In traditional data lake storage formats, performing updates without stopping concurrent readers is difficult.

With Delta Lake, updates can be executed transactionally, replacing any updated objects through new add and remove records in the Delta log.

Delta Lake supports standard SQL UPSERT, DELETE, and MERGE syntax.
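
For instance, a compliance-style delete and an upsert from a staging table might look like this in Spark SQL (table and column names are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumes Delta Lake is configured

# Compliance-style delete: Delta rewrites only the objects containing this user.
spark.sql("DELETE FROM customers WHERE customer_id = 42")

# Upsert changes from a staging table into the target table.
spark.sql("""
    MERGE INTO customers AS t
    USING customer_updates AS u
    ON t.customer_id = u.customer_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
```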

5.3 Streaming Ingest and Consumption

Many data teams need to deploy streaming pipelines. With traditional cloud data lakes, they typically need a separate message bus such as Apache Kafka or Amazon Kinesis.

Delta Lake is designed so that both data producers and consumers can use a table, via its log, as a message queue.

Write Compaction

Delta Lake allows users to run a background process that compacts small data objects transactionally without affecting readers.

Such compaction transactions can set the dataChange flag to false, which lets streaming consumers ignore these compaction operations.

Exactly-Once Streaming Writes

Writers can use the txn action type in log records to keep track of which data they have already written into a Delta Lake table.

Stream processing systems that write to external stores need some mechanism to make their writes idempotent, either by giving each record a unique key (for overwrites) or by atomically updating a “last version written” record together with each write (for appends).

Delta Lake facilitates the latter pattern by allowing applications to update an (appId, version) pair atomically with each transaction.
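
In recent open-source Delta releases this mechanism is exposed as idempotent-write options on the DataFrame writer; the option names below reflect that API as I understand it and should be verified against your Delta version:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
micro_batch = spark.range(100)   # stand-in for one micro-batch of a streaming job

# Record an (appId, version) pair with the transaction; re-running the same
# (appId, version) pair later becomes a no-op, giving exactly-once appends.
(micro_batch.write.format("delta")
    .mode("append")
    .option("txnAppId", "orders_etl")   # assumed option names; verify for your Delta version
    .option("txnVersion", 42)
    .save("s3://my-bucket/orders"))
```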

Efficient Log Tailing

The log's storage format, a series of .json objects with lexicographically increasing IDs, allows consumers to efficiently discover new writes.

A consumer can issue object store LIST operations starting at the last log record ID it has seen to discover new records.

Users can implement low-latency streaming pipelines by combining all three strategies.
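
A minimal Structured Streaming sketch that uses one Delta table as a streaming source and another as the sink (paths and the trigger interval are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumes Delta Lake is configured

# Tail the source table's log for newly committed data objects.
raw = spark.readStream.format("delta").load("s3://my-bucket/raw_events")

# Continuously write the stream into another Delta table.
query = (raw.writeStream.format("delta")
         .option("checkpointLocation", "s3://my-bucket/etl/_chk/clean_events")
         .trigger(processingTime="30 seconds")
         .start("s3://my-bucket/clean_events"))
```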

5.4 Data Layout Optimization

Delta Lake can transactionally update the data structures that represent a table.

This means it can support a variety of layout optimizations without affecting concurrent operations.

Let’s see how Delta Lake takes advantage of this property to implement several data layout optimization features:

OPTIMIZE Command: Users can manually run an OPTIMIZE command on a table that compacts small objects without affecting ongoing transactions.

Z-Ordering by Multiple Attributes: Delta Lake supports reorganizing the records in a table in Z-order. The Z-order curve is an easy-to-compute space-filling curve that creates locality in given dimensions. Users can set a Z-order specification on a table and then run OPTIMIZE to move a desired subset of the data into Z-ordered objects.
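
For example, with the SQL syntax available on Databricks and in recent open-source Delta releases (table and column names are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumes Delta Lake is configured

# Compact small objects and cluster records by two frequently filtered columns.
spark.sql("OPTIMIZE events ZORDER BY (eventType, deviceId)")
```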

5.5 Caching

Databricks built a feature on top of Delta Lake to cache data on clusters. Caching is safe because data, log, and checkpoint objects in Delta Lake tables are immutable.

5.6 Audit Logging

Delta Lake’s transaction log can be used for audit logging.

Databricks offers an execution mode for Spark clusters in which user-defined functions cannot access cloud storage directly, which helps ensure that log entries accurately reflect who performed each operation. Users can view the history of a Delta Lake table using the DESCRIBE HISTORY command, as shown in the diagram below:

DESCRIBE HISTORY output for a Delta Lake table
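
The same history can also be retrieved programmatically, for example:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumes Delta Lake is configured
spark.sql("DESCRIBE HISTORY events").show(truncate=False)  # 'events' is an illustrative table name
```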

5.7 Schema Evolution and Enforcement

Delta Lake can perform schema changes transactionally and, if needed, update the underlying objects along with the schema change.

Delta Lake keeps the history of schema updates in the transaction log. This allows using older Parquet objects without rewriting them for certain schema changes.

Delta clients ensure that newly written data follows the table’s schema.
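
A short sketch of both behaviors with the DataFrame writer; `mergeSchema` is the commonly used option for additive schema changes, but verify it against your Delta version:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumes Delta Lake is configured
new_rows = spark.createDataFrame([(1, "click", "web")], ["id", "event", "channel"])

# Enforcement: by default, an append whose schema conflicts with the table's is rejected.
# Evolution: mergeSchema allows compatible new columns to be added to the table schema.
(new_rows.write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("s3://my-bucket/mytable"))
```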

5.8 Connectors to Query and ETL Engines

Delta Lake provides full-fledged connectors to Spark SQL and Structured Streaming using Apache Spark’s data source API. For more details, read my post on the Apache Structured Streaming Paper.

Delta Lake currently provides read-only integrations with several other systems: Apache Hive, Presto, AWS Athena, AWS Redshift, and Snowflake.

6. Delta Lake Use Cases

Delta Lake is currently in active use by thousands of Databricks customers. It processes exabytes of data per day.

The data types stored in Delta Lake include Change Data Capture (CDC) logs from enterprise OLTP systems, application logs, time series data, graphs, aggregate tables for reporting, and image or feature data for machine learning (ML).

Applications running on this data include SQL workloads, business intelligence, streaming, data science, machine learning, and graph analytics.

6.1 Data Engineering and ETL

Many organizations are migrating ETL/ELT and data warehousing workloads to the cloud to simplify their management.

Some are augmenting traditional enterprise data sources with much larger data streams from other sources for downstream data and machine learning applications.

Delta Lake’s ACID transactions, UPSERT/MERGE support, and time travel features allow these organizations to reuse existing SQL queries to perform their ETL process directly on the object store.

6.2 Data Warehousing and BI

To enable interactive query workloads such as business intelligence (BI), traditional data warehouse systems combine ETL/ELT functionality with efficient querying capabilities.

Delta Lake supports these features directly on tables stored in a cloud object store.

Most Delta Lake users run ad-hoc query and BI workloads against their lakehouse datasets through SQL directly or BI software such as Tableau.

6.3 Compliance and Reproducibility

Users leverage the audit logging feature for data governance. Delta Lake’s time travel support is useful for reproducible data science and machine learning.

6.4 Specialized Use Cases

There are also some highly industry-specific use cases. A few of them are listed below:

  • Computer System Event Data
  • Bioinformatics
  • Media Datasets for Machine Learning

7. Discussions and Limitations

Delta Lake’s design is attractive because it does not require any other heavyweight system to mediate access to cloud storage.

Delta Lake’s support for ACID enables other powerful performance and management features.

There are a few limitations as of now:

  1. Delta Lake currently only provides serializable transactions within a single table.
  2. Delta Lake is limited by the latency of the underlying cloud object store for streaming workloads.
  3. Delta Lake does not currently support secondary indexes.

8. Related Work

The “Building a Database on S3” paper explored building an OLTP database system over S3.

The “Bolt-on Causal Consistency” paper implements causal consistency on top of eventually consistent key-value stores.

Google BigQuery, AWS Redshift Spectrum, and Snowflake are OLAP DBMSes that can scale computing clusters separately from storage and can read data from cloud object stores.

The closest systems to Delta Lake’s design and goals are Apache Hudi and Apache Iceberg. Both define data formats and access protocols to implement transactional operations on cloud object stores.

Apache Hive ACID also implements transactions over object stores or distributed file systems.

C-Store combines high-performance transactional and analytical processing.

References

Google Dremel paper post

C-Store paper post

Meta Presto paper post

Apache Structured Streaming paper post

Spark SQL: Relational Data Processing in Spark

Bolt-on Causal Consistency

The Snowflake Elastic Data Warehouse

Hive — A Petabyte Scale Data Warehouse Using Hadoop

Amazon Aurora: Design considerations for high throughput cloud-native relational databases

Building a Database on S3
