Insights from paper: Google Vortex: A Stream-oriented Storage Engine For Big Data Analytics

Hemant Gupta
12 min read · Aug 23, 2024


1. Abstract

Most organizations are looking for data analytics over continuous data streams.

It enables them to build interactive applications and reduces the time to get insights from the data.

This paper presents Google Vortex.

It is a storage system/engine that is built inside Google BigQuery.

It supports real-time analytics over both streaming and batch data.

Vortex supports petabyte-scale data ingestion with sub-second data freshness and query latency.

2. Introduction

Users are looking to extract continuous insights from streaming data.

Data warehouses and lakehouses provide managed data analysis services that are scalable, cost-effective, and easy to use.

These systems often rely on storage systems such as distributed file systems or cloud object stores. Data is typically stored in columnar storage formats.

The challenge is with streaming data. How do we push the data to a remote distributed storage as quickly as possible?

Most systems solve the above problem by buffering data locally or in a transient storage system. This data is then loaded into an analytic storage system to enable processing over it.

There are many problems with this solution, as follows:

  1. This offline batch ingestion sacrifices data freshness.
  2. The data-generating applications may have limited storage for local buffering.
  3. The data is copied multiple times.

The paper has the following contribution to solving the problem:

  1. It guarantees ACID properties.
  2. It offers a single unified API with streaming and batch data support.
  3. It implements a fully distributed data and control plane.
  4. The Vortex API offers sub-second tail write latencies.

3. Related Work

Pravega organizes data into streams, similar to topics in other messaging systems. It supports arbitrarily large atomic transactions.

Rockset is a real-time analytics database that is similar to Vortex. It offers both batch and streaming ingestion.

4. Background

BigQuery is a fully managed, serverless data warehouse. It enables scalable analytics over petabytes of data.

BigQuery architecture

BigQuery is based on the fundamental principle of separation of storage and compute.

The compute nodes can process data stored on various distributed storage systems.

BigQuery also decouples data shuffle from compute by building on the disaggregated distributed memory.

4.1 Query Execution Engine

Google Dremel is a distributed query execution engine that powers BigQuery. Dremel supports interactive queries for analyzing petabyte-scale datasets.

BigQuery supports semistructured data.

When a query is submitted, it is routed to one of the Query Coordinator nodes. The node parses the query and creates a Logical Query Plan. After that, the node converts the Logical Query Plan to a distributed query execution plan.

The query plan can be described as a DAG (directed acyclic graph) of stages. Each stage is replicated across several workers, each running the same operation on different pieces of data.

The leaf stages of the query plan DAG are dispatched in parallel. The leaf stage has a scan operation over the data source with filters and computations.

4.2 Storage Engine

Dremel is a general-purpose distributed query execution engine.

The team created the storage engine for data management, transactional consistency, and data governance features.

BigQuery storage provides a global namespace of overall data.

Data is organized into regional containers called datasets.

A dataset contains tables, logical views, materialized views, search indexes, stored procedures, machine learning models, etc.

BigQuery offers APIs to bulk import data from object storage systems into its managed storage.

The write API allows data ingestion and analysis in real time.

The read API allows the analysis of BigQuery tables from other data analytic engines, such as Google Cloud Dataflow and Apache Spark.

BigQuery’s managed storage offering is a layer on top of BigQuery’s Core Storage Engine.

BigQuery tables can be partitioned and clustered by a set of columns in the table.

BigQuery-managed storage stores data in a proprietary columnar storage format called Capacitor.

BigLake Managed Tables (BLMTs) offer a fully managed experience with BigQuery tables. These tables store data in customer-owned cloud storage buckets using open file formats.

BigQuery stores coarse-grained metadata in Spanner, Google’s distributed, scalable, and synchronously replicated database.

BigQuery also uses a columnar index over table metadata, called Big Metadata.

We will focus on Vortex, the BigQuery Core Storage Engine.

5. Storage Concepts and API

BigQuery’s model supports tables with semi-structured data with repeated (ARRAY) and nested (STRUCT) columns.

Vortex provides a stream abstraction on top of a BigQuery table.

5.1 Streams

A Vortex Stream is an append-only entity: rows can only be added at its end.

Each row is identified by the Stream’s identifier and row’s offset within the Stream.

Readers can concurrently read a Stream at different row offsets.

A table is an unordered collection of streams.

The BigQuery users create one or more Streams to write into a Table.

5.2 API

5.2.1 Stream creation

The Vortex client first creates a Stream on the table. There are three types of streams: UNBUFFERED, BUFFERED, and PENDING.

UNBUFFERED — The input rows provided in the request have been durably committed to Vortex. These rows are guaranteed to be present in any subsequent reads of this table.

BUFFERED — The input rows have been written to Vortex but are not yet committed, so they are not visible to subsequent reads until the Stream is flushed.

PENDING — The rows are not visible until the Stream is committed.

Creating a Vortex Stream:

CreateStreamOptions options;
options.set_stream_type(STREAM_TYPE_UNBUFFERED);
Stream s = CreateStream(table_name, options);

5.2.2 Appending rows

The client serializes the structured or semi-structured input data into a binary format and appends it using the returned Stream.

Vortex supports multiple data formats, such as Protocol buffers and Avro, and is extensible to other formats, such as Apache Arrow.

Appending to a Vortex Stream:

RowSet row_set;
// Populate row_set from the input data and the schema.
// ...
// Append data to the end of a Stream.
AppendStreamResponse response = AppendStream(s, row_set, [row_offset]);

The optional row_offset parameter allows the client to specify the offset in the Stream at which it expects the rows to be appended.
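
A minimal sketch of how a client might use row_offset to make retries idempotent. The retry loop and the ok() and row_count() accessors below are illustrative, not part of the documented API:

int64_t next_offset = 0;  // offset at which the client expects its next rows to land
AppendStreamResponse response = AppendStream(s, row_set, next_offset);
if (!response.ok()) {
  // Retrying with the same offset is safe: if the first attempt actually committed,
  // the retry fails with an offset mismatch instead of appending the rows twice.
  response = AppendStream(s, row_set, next_offset);
}
next_offset += row_set.row_count();  // advance the expected offset for the next append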

A high-level architecture of BigQuery Storage

5.2.3 Flushing a stream:

The data written to a BUFFERED stream is only committed once it is flushed. Flushing is achieved by invoking the FlushStream API.

Flushing a BUFFERED Stream:

Status status = FlushStream(s, row_offset);

If FlushStream returns success, it indicates that all rows up to row_offset have been committed. The FlushStream operation is idempotent.

If the current length of the Stream is less than row_offset, FlushStream returns an error.
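
Putting these pieces together, a BUFFERED Stream workflow might look like the following sketch (same pseudo-API as above; row_count() is illustrative and the Stream is assumed to start empty):

CreateStreamOptions options;
options.set_stream_type(STREAM_TYPE_BUFFERED);
Stream s = CreateStream(table_name, options);
RowSet row_set;
// Populate row_set from the input data, then append; rows are written but not yet visible.
AppendStreamResponse response = AppendStream(s, row_set);
// Commit (and make visible) everything up to the appended offset.
// FlushStream is idempotent, so it can safely be retried on transient errors.
Status status = FlushStream(s, row_set.row_count());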

5.2.4 Committing a stream:

In batch processing, multiple workers write to the table concurrently and independently.

To achieve atomicity for these parallel distributed writes, each worker creates a PENDING Stream, writes all of its data to that Stream, and reports completion to a coordinator node.

When the coordinator receives success from all the workers, it issues a batch commit request to Vortex to commit all the Streams atomically.

Committing PENDING Streams:

std::vector<Stream> streams = GetStreams();
Status status = BatchCommitStreams(streams);

5.2.5 Finalizing a stream:

A client can write to the Stream once it is created.

When the client has finished writing, the Stream can be finalized to prevent further appending.

Status status = FinalizeStream(s);
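
Combining PENDING Streams, finalization, and the batch commit, a distributed batch load might look roughly like the sketch below. Only the Vortex calls come from the paper; the worker and coordinator helpers are illustrative:

// Worker side: each worker writes its share of the data into its own PENDING Stream.
Stream WorkerLoad(const std::string& table_name, const std::vector<RowSet>& batches) {
  CreateStreamOptions options;
  options.set_stream_type(STREAM_TYPE_PENDING);
  Stream s = CreateStream(table_name, options);
  for (const RowSet& row_set : batches) {
    AppendStream(s, row_set);
  }
  FinalizeStream(s);  // no further appends allowed
  return s;           // reported back to the coordinator as "done"
}
// Coordinator side: once every worker reports success, commit all Streams atomically.
Status CoordinatorCommit(const std::vector<Stream>& streams) {
  return BatchCommitStreams(streams);
}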

5.2.6 Mutations:

Vortex supports mutations using the AppendStream API.

Vortex defines a unique virtual column called _CHANGE_TYPE in the table schema.

It takes three values: INSERT, UPSERT, and DELETE.

This column can be specified for each row in the row set supplied to an AppendStream request.

When a user uses only the UPSERT and DELETE change types, this construction enforces the uniqueness of primary keys in the table.
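
As a sketch, a row set carrying mutations might be populated as follows. Only the _CHANGE_TYPE column and its INSERT, UPSERT, and DELETE values come from the paper; MakeRow, set_change_type, and add_row are illustrative helpers, and s is an UNBUFFERED Stream created as in 5.2.1:

RowSet row_set;
Row upsert = MakeRow(/*key=*/42, /*value=*/"new-value");
upsert.set_change_type("UPSERT");   // insert or replace the row with this primary key
row_set.add_row(upsert);
Row removal = MakeRow(/*key=*/7);
removal.set_change_type("DELETE");  // delete the row with this primary key
row_set.add_row(removal);
AppendStreamResponse response = AppendStream(s, row_set);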

The Vortex write API is available externally as the BigQuery Storage Write API. It uses gRPC transport for high throughput transfer.

6. Architecture

6.1 Metadata concepts

Streamlets:

A BigQuery region consists of two or more Borg clusters. Each append to a Stream is durably written to two clusters before success is reported to the client.

Data for a Stream can reside in any two of the available clusters in a region.

A Streamlet is a contiguous slice of rows in the Stream, all present in the same two clusters.

A Stream is an ordered list of one or more Streamlets.

Fragments:

Each Streamlet is further split into contiguous blocks of rows called Fragments.

Fragments typically are a range of rows inside a log file.

Log files are stored in Colossus, Google's distributed file system.
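
The relationships can be summarized in a minimal sketch of the physical metadata entities. The field names are illustrative; the hierarchy is from the paper:

// Sketch of Vortex's physical metadata hierarchy.
struct Fragment {
  std::string log_file;  // Colossus log file holding the rows
  int64_t first_row;     // contiguous range of rows within the Streamlet
  int64_t last_row;
};
struct Streamlet {
  std::string cluster_a;             // the two clusters that hold this slice of rows
  std::string cluster_b;
  std::vector<Fragment> fragments;   // contiguous blocks of rows
};
struct Stream {
  std::string id;
  std::vector<Streamlet> streamlets; // ordered list; their concatenation forms the Stream
};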

Data formats:

BigQuery operates in two formats.

Vortex’s append API writes data to the write-optimized storage format (WOS).

The read-optimized storage format (ROS) is the format in which data is optimized for data processing.

BigQuery Managed Storage Tables use Capacitor as ROS.

BigLake Managed Tables use Parquet as ROS.

Streamlets and Fragments are internal physical metadata entities. These aren’t visible to Vortex users.

The high-level architecture of Vortex

6.2 Control Plane

The Stream Metadata Server (SMS) is the control plane of Vortex.

It manages the physical metadata of Streams, Streamlets, and Fragments.

The table’s logical metadata includes the table schema and user-defined properties such as data partitioning and clustering.

Streams provide append points into a table. Multiple clients can append to the table, and each client (typically) appends to its own Stream.

The SMS assigns a Streamlet to a specific Stream Server. The Stream Server maintains the set of fragments for the Streamlet.

6.2.1 Sharding the control plane

The Vortex SMS is deployed in multiple Borg clusters in a GCP region.

Each customer workload is assigned a primary and secondary cluster.

Vortex transparently fails over the management of the table to the secondary cluster when required.

Multiple SMS tasks exist within a single cluster. Each active table’s metadata within the cluster is managed by a single SMS task.

6.3 Data Plane

The Stream Server is the data plane of Vortex.

It owns a set of Streamlets and creates Fragments for those Streamlets.

The Stream Server keeps in-memory metadata about its Streamlets and Fragments. This metadata is persisted by writing to a transaction log and periodically writing checkpoints.

Each cluster often contains hundreds of Stream Servers.

A given Stream Server in a cluster can host Streamlets for any table that uses the cluster as its primary.

6.4 Client Library

Vortex is accessed through a client library, which supports reading from and writing to Vortex.

The library can retry failed read and write operations.

6.4.1 Schema Evolution

Table schema changes are applied at the SMS.

When the table schema changes, the SMS first notifies the Stream Servers with currently writable Streamlets about the new schema version via heartbeat.

The Stream Server then relays this information to the clients.

6.4.2 Unary and bi-directional RPC

The Vortex client library can adaptively switch between using a single-directional (unary) short-lived connection and a bidirectional long-lived connection.

The short-lived unary connection type supports a request-response mechanism.

The long-lived bi-directional connection type supports streaming RPCs.

6.4.3 Lifetime of Data in a Stream

When a client requests that the SMS create a new Stream, the SMS generates a unique random ID for the Stream and the first Streamlet in that Stream.

The SMS attaches the stream to the table by persisting it into Spanner.

The SMS sends an RPC to the Stream Server to create a Streamlet with the provided ID.

The SMS responds to the client that it can start appending to the Streamlet with this ID on that Stream Server.
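
That control flow can be sketched as follows (SMS side). All helper names besides the Vortex concepts are illustrative:

void HandleCreateStream(const std::string& table_name) {
  std::string stream_id = GenerateUniqueRandomId();     // unique ID for the Stream
  std::string streamlet_id = GenerateUniqueRandomId();  // and for its first Streamlet
  PersistStreamToSpanner(table_name, stream_id);        // attach the Stream to the table
  StreamServer server = PickStreamServer();             // choose a data-plane task
  server.CreateStreamlet(streamlet_id);                 // RPC to create the Streamlet
  // Tell the client it can start appending to this Streamlet on that Stream Server.
  RespondToClient(stream_id, streamlet_id, server.address());
}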

Life of a Vortex Append request

6.4.4 Fragment File Format

Each Fragment begins with a header that contains the File Map.

The File Map lists the committed size and record ranges of all previous Fragments in the same Streamlet.

The Stream Server buffers up to 2MB of records into a single write to a Fragment.

The write includes a header specifying a single server-assigned TrueTime timestamp for all rows in the write.
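
A minimal sketch of what a Fragment header and a per-write header might carry, based on the description above (field names are illustrative):

// Each Fragment starts with a header containing the File Map.
struct FileMapEntry {
  std::string fragment_file;  // a previous Fragment in the same Streamlet
  int64_t committed_size;     // its committed size in bytes
  int64_t first_row;          // the record range it covers
  int64_t last_row;
};
struct FragmentHeader {
  std::vector<FileMapEntry> file_map;  // all previous Fragments in the Streamlet
};
// Each buffered write (up to ~2MB of records) carries one server-assigned timestamp.
struct WriteHeader {
  int64_t truetime_timestamp_micros;   // applies to all rows in this write
};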

6.4.5 Compression and Encryption

The Stream Server uses the Snappy compressor.

The typical compression ratio is 4:1 but can be 10:1 if values of string fields are common between many rows.

After compressing the data, the Stream Server encrypts it before writing it to the Fragment.

Vortex also uses an end-to-end CRC to protect row data as it is sent from the client to the Stream Server, and from the Stream Server to Colossus.
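
A simplified sketch of that write path, assuming the open-source Snappy C++ API; Encrypt, Crc32c, and AppendToFragment are placeholders for Google-internal mechanisms, and in reality the end-to-end CRC originates at the client rather than at write time:

#include <snappy.h>
#include <cstdint>
#include <string>
// Compress with Snappy, then encrypt, then write the block to the Fragment.
void WriteBlock(const std::string& serialized_rows) {
  std::string compressed;
  snappy::Compress(serialized_rows.data(), serialized_rows.size(), &compressed);
  std::string encrypted = Encrypt(compressed);  // placeholder: encrypt before writing
  uint32_t crc = Crc32c(serialized_rows);       // end-to-end checksum over the row data
  AppendToFragment(encrypted, crc);             // placeholder: durable write to Colossus
}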

6.4.6 Cross cluster/region read

The Vortex client library allows readers in one region or cluster to read data stored in another.

BigQuery always prefers to run compute in the region where the data is.

6.5 Heartbeat and Load balancing

The Stream Server sends a heartbeat to each SMS every few seconds.

It informs the SMS about changes to Streamlet metadata. These changes include creating new log files, increasing the size of existing log files, and changing the column properties of data in the log files.

The Stream Server also sends its current load information (CPU and memory) to the SMS.

6.6 Disaster Resilience

The Stream Server performs synchronous replication.

It simultaneously writes fragments to two Colossus clusters before returning success to the client.

This replication provides resilience against failures of entire clusters.

The log files the Stream Server writes are identical in both clusters. If the Stream Server cannot continue writing to the Streamlet after local retries, it reports the failure to the client library.

The client library informs the SMS and starts a reconciliation process.

7. Data Management

7.1 Storage Optimization

A background service continuously optimizes data in Vortex as it is written.

Vortex maintains an LSM tree of Fragments, starting with Fragments in WOS at the deepest level of the tree, with progressively more optimized ROS versions as we climb up the tree.

The first optimization step is to convert data in the WOS to ROS.

To track the lifetime of Fragments, each Fragment maintains two timestamps: a creation_timestamp and a deletion_timestamp.
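
A minimal sketch of how those two timestamps could gate visibility during WOS-to-ROS conversion. The visibility rule here is an inferred illustration, not spelled out in the paper:

struct FragmentLifetime {
  int64_t creation_timestamp;  // set when the Fragment is created
  int64_t deletion_timestamp;  // set (non-zero) when an optimized ROS version supersedes it
};
// A reader at snapshot time t sees a Fragment only while it is live, so a WOS
// Fragment and its ROS replacement are never both visible at the same snapshot.
bool VisibleAt(const FragmentLifetime& f, int64_t t) {
  return f.creation_timestamp <= t &&
         (f.deletion_timestamp == 0 || t < f.deletion_timestamp);
}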

Conversion of Write-Optimized Storage (WOS) to Read-Optimized Storage (ROS)

7.2 Metadata Management

Vortex continuously tracks metadata for Streams, Streamlets, and Fragments.

The source of truth for Streamlet and Fragment metadata is persisted in the Vortex Stream Server's log.

7.3 Data Verification

Vortex continuously traces requests to detect data correctness issues such as missing or duplicated records.

The system tracks all calls to the client library and saves aggregated information back to Vortex.

Vortex has data verification pipelines to validate the correctness of these operations continuously.

7.4 BigLake Managed Tables(BLMTs)

The WOS for BLMTs resides in Colossus, in the same way as for BigQuery Managed Storage Tables.

Metadata and data management rely on the same underlying service.

Converting data from WOS to ROS involves writing Parquet files to the customer-provided cloud storage buckets.

Queries over BLMTs read the union of the WOS data and the Parquet files.

8. Data Analytics

When the Vortex SMS receives a read request, it returns the union of the data in WOS and ROS.

8.1 Reading data

Query processing in BigQuery reads data in Vortex directly from Colossus without contacting the Stream Server.

8.2 Partition Elimination

Partition elimination/pruning is a common technique to improve query performance.

The query coordinator inspects the filter condition and eliminates the scan of the partitions that cannot possibly satisfy the filter condition.
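
A simplified sketch of partition pruning against per-partition value ranges (the metadata layout and a numeric partitioning column are assumptions for illustration):

struct Partition {
  int64_t min_value;  // min/max of the partitioning column within this partition
  int64_t max_value;
};
// For a filter like `col BETWEEN lo AND hi`, only overlapping partitions are scanned.
std::vector<Partition> PrunePartitions(const std::vector<Partition>& partitions,
                                       int64_t lo, int64_t hi) {
  std::vector<Partition> to_scan;
  for (const Partition& p : partitions) {
    if (p.max_value >= lo && p.min_value <= hi) to_scan.push_back(p);
  }
  return to_scan;
}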

8.3 Mutations

Vortex allows a range of rows in a Fragment or Streamlet to be marked as deleted.

A DELETE statement first determines the candidate rows to be marked deleted and, at commit time, persists a deletion mask to the Streamlet or Fragment metadata.
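
A minimal sketch of a deletion mask over row ranges. The paper only states that a mask over a range of rows is persisted to Streamlet or Fragment metadata at commit time; the structure below is illustrative:

struct RowRange {
  int64_t first_row;
  int64_t last_row;  // inclusive
};
struct DeletionMask {
  std::vector<RowRange> deleted;  // persisted to Streamlet/Fragment metadata at commit
};
// Readers skip any row covered by a deleted range.
bool IsDeleted(const DeletionMask& mask, int64_t row) {
  for (const RowRange& r : mask.deleted) {
    if (row >= r.first_row && row <= r.last_row) return true;
  }
  return false;
}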

8.4 Exactly-once Processing

Vortex allows analytic engines such as BigQuery and Dataflow to achieve end-to-end exactly-once processing for both ingress and egress.

The paper provides an example of this using Google Dataflow. Curious readers should explore that, but I am omitting it as it requires an understanding of Dataflow.

8.5 Unifying batch and streaming

Apache Beam introduced a unified model for defining parallel data processing pipelines for streaming and batch analysis.

The Vortex API brings that same unification to data ingestion into a data warehouse.

Vortex guarantees ACID semantics on arbitrarily large transactions using PENDING streams.

9. Conclusion

Vortex provides highly scalable ingestion and low-latency retrieval of streaming data for real-time analysis.

Vortex supports both long-running coarse-grained and short-running fine-grained transactions.

The team first built the streaming system and then tuned it for the cost trade-offs of batch scenarios.

Vortex also supports semi-structured data types.

References

Google Spanner paper post

Google Dremel paper post

Apache Spark paper post

Slicer: Auto-Sharding for Datacenter Applications

Big Metadata: When Metadata is Big Data

Rockset Concepts, Design and Architecture

Pravega: A Tiered Storage System for Data Streams

A peek behind Colossus, Google’s file system

Inside Capacitor, BigQuery’s Next-Generation Columnar Storage Format

Large-scale cluster management at Google with Borg


Hemant Gupta

https://www.linkedin.com/in/hkgupta/ Working on the challenge to create insights for 100 software eng papers. #100PapersChallenge, AVP Engineering at CoinDCX