Insights from Paper — Google F1: A Distributed SQL Database That Scales

Hemant Gupta
15 min read · Apr 22, 2023


Introduction

F1 is a fault-tolerant, globally distributed relational OLTP and OLAP database built at Google.

F1 is a hybrid database that combines the high availability and scalability of NoSQL systems like Bigtable with the consistency and usability of traditional SQL databases.

It was designed to replace a sharded MySQL implementation that could not meet growing scalability and reliability requirements.

F1 was built as the new storage system for Google’s AdWords system.

The key design goals of F1 are:

  1. Scalability
  2. Availability
  3. Consistency
  4. Usability

The general understanding is that these design goals cannot all be achieved at once. This paper shows how Google achieved all of them in F1’s design, and where trade-offs and sacrifices were made.

F1 is built on Google Spanner, which provides extremely scalable data storage, synchronous replication, and strong consistency and ordering properties. You can read my post on the Google Spanner paper here.

F1 adds a few more features like:

  1. Distributed SQL queries
  2. Transactionally consistent secondary indexes
  3. Asynchronous schema changes
  4. Optimistic transactions
  5. Automatic change history recording and publishing

Google’s design choices in F1 result in higher latency for typical reads and writes. The team has developed techniques to hide that increased latency.

The F1 system has been managing all AdWords advertising campaign data in production since early 2012. The AdWords database is over 100 TB, serves up to hundreds of thousands of requests per second, and runs SQL queries that scan tens of trillions of data rows daily.

Basic Architecture

The diagram below shows the basic architecture of the F1 system.

The Basic Architecture of F1 System

Users interact with F1 through the F1 client library. The client sends requests to one of many F1 servers, which are responsible for reading
and writing data from remote data sources and coordinating query execution.

F1 servers are typically co-located in the same set of data centers as the Spanner servers storing the data. In turn, the Spanner servers in each data center retrieve their data from the Colossus File System (CFS) in the same data center. CFS is the next generation of the Google File System (GFS).

Read my posts in the reference section to learn more about the GFS paper and other papers.

CFS is not a globally replicated service; therefore, Spanner servers will never communicate with remote CFS instances.

F1 servers are primarily stateless. The one exception is when a client uses pessimistic transactions and must hold locks.

An F1 cluster has several additional components.

The shared slave pool consists of F1 processes that exist only to execute parts of distributed query plans on behalf of regular F1 servers. The F1 master maintains slave pool membership.

F1 also supports large-scale data processing through Google’s MapReduce framework. MapReduce workers can communicate directly with Spanner servers to extract data in bulk.

The throughput of the entire system can be scaled up by adding more Spanner servers, F1 servers, or F1 slaves.

The data is synchronously replicated across multiple data centers. The result is that the commit latencies are relatively high (50–150 ms).

Google Spanner

Spanner handles lower-level storage issues like persistence, caching, replication, fault tolerance, data sharding, movement, location lookups, and transactions.

In Spanner, data rows are partitioned into clusters called directories using ancestry relationships in the schema. Groups store a collection of directory fragments. Each group typically has one replica tablet per data center.

Data is replicated synchronously using the Paxos algorithm. One replica tablet is a leader. The leader handles all requests. Groups may have read-only replicas which do not vote in the Paxos algorithm for leader election.

Spanner provides serializable pessimistic transactions using strict two-phase locking. Spanner supports transactions across multiple groups, using a two-phase commit (2PC) protocol on top of Paxos.

Spanner has very strong consistency and timestamp semantics. Spanner uses TrueTime API to pick globally ordered timestamps. Spanner uses these
timestamps to provide multi-versioned consistent reads, including snapshot reads of current data, without taking read locks.

Data Model

Hierarchical Schema

Logically, F1 has a relational schema similar to that of a traditional RDBMS.

Tables in the F1 schema can be organized into a hierarchy. The child table must have a foreign key to its parent table as a prefix of its primary key.

For example, the AdWords schema contains a table Customer with primary key (CustomerId), which has a child table Campaign with primary key (CustomerId, CampaignId), which in turn has a child table AdGroup with primary key (CustomerId, CampaignId, AdGroupId).
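
To make the key structure concrete, here is a minimal sketch of the three tables in generic SQL DDL. The paper does not show F1’s actual schema language, so the syntax below is illustrative only; the point is that each child table’s primary key starts with its parent’s primary key.

-- Illustrative generic DDL (not F1's actual schema language): each child
-- table's primary key begins with its parent table's primary key.
CREATE TABLE Customer (
  CustomerId BIGINT NOT NULL,
  PRIMARY KEY (CustomerId)
);

CREATE TABLE Campaign (
  CustomerId BIGINT NOT NULL,
  CampaignId BIGINT NOT NULL,
  PRIMARY KEY (CustomerId, CampaignId),
  FOREIGN KEY (CustomerId) REFERENCES Customer (CustomerId)
);

CREATE TABLE AdGroup (
  CustomerId BIGINT NOT NULL,
  CampaignId BIGINT NOT NULL,
  AdGroupId  BIGINT NOT NULL,
  PRIMARY KEY (CustomerId, CampaignId, AdGroupId),
  FOREIGN KEY (CustomerId, CampaignId) REFERENCES Campaign (CustomerId, CampaignId)
);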

A row of the root table in the hierarchy is called a root row.

All child table rows corresponding to a root row are clustered together with that root row in a single Spanner directory.

Child rows are stored under their parent row, ordered by the primary key. See the below diagram.

Logical and physical schema compared for the F1 database and a traditional RDBMS

The hierarchically clustered physical schema has several advantages over the relational schema.

In the traditional schema, fetching all Campaign and AdGroup records corresponding to a given CustomerId would take two sequential steps because there is no direct way to retrieve AdGroup records by CustomerId.

In the F1 schema, the hierarchical primary keys allow the fetches of Campaign and AdGroup records to be started in parallel because CustomerId keys are in both tables.
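
As a sketch, both of the following queries can be issued concurrently for a given customer (the CustomerId value is made up). In the traditional schema, the AdGroup fetch would first have to wait for the CampaignIds returned by the Campaign query.

-- Both range scans share the leading key CustomerId and can run in parallel.
SELECT * FROM Campaign WHERE CustomerId = 123;
SELECT * FROM AdGroup  WHERE CustomerId = 123;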

Hierarchical clustering is especially useful for updates. Each root row and its descendant rows are stored in a single Spanner directory, so transactions restricted to a single root usually will not need 2PC.

Limiting the number of roots involved is essential because adding
more participants increases latency and decreases the likelihood of a successful commit.

Hierarchical clustering is not mandatory in F1.

Protocol Buffers

The F1 data model supports table columns that contain structured data types.

The structured types use the Protocol Buffers library for schema and binary encoding format. Protocol Buffers have typed fields that can be required, optional, or repeated. Fields can also be nested.

F1 schema designs often use repeated fields instead of child tables when the number of child records has a low upper bound. Many tables in an F1 schema consist of just a single Protocol Buffer column.

Other tables split their data across several columns, partitioning the fields according to access patterns.

Tables can be partitioned into columns:

  1. To group together fields that are usually accessed together
  2. To separate fields with static and frequently updated data
  3. To allow specifying different read/write permissions per column
  4. To allow concurrent updates to different columns.

Using fewer columns generally improves performance in Spanner, where there can be high per-column overhead.

Indexing

All indexes in F1 are transactional and fully consistent. Indexes are stored as separate tables in Spanner.

The composite key of the index is a concatenation of the index key and the indexed table’s primary key.

There are two physical storage layouts for F1 indexes: local and global.
Local index keys must contain the root row primary key as a prefix.

Global index keys do not include the root row primary key as a prefix and hence cannot be co-located with the rows they index.
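
A hedged sketch of the difference, using generic CREATE INDEX syntax (the paper does not show F1’s index DDL) and a hypothetical CampaignName column:

-- Local index: the root row key (CustomerId) is a prefix of the index key,
-- so index entries live in the same Spanner directory as the indexed rows.
CREATE INDEX CampaignsByNameLocal ON Campaign (CustomerId, CampaignName);

-- Global index: no root key prefix, so entries are sharded across many
-- directories and index updates can add extra 2PC participants.
CREATE INDEX CampaignsByNameGlobal ON Campaign (CampaignName);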

Global indexes can cause scaling problems for large transactions. Consider a transaction that inserts 1000 rows, where each row requires adding one or more global index entries. Those index entries could be arbitrarily spread across hundreds of index directories, meaning the 2PC transaction would have hundreds of participants, making it slower and more error-prone.

Schema Changes

F1 is designed to make all schema changes fully non-blocking. Several aspects make non-blocking schema changes particularly challenging:

  1. F1 is a massively distributed system with servers in multiple data centers in distinct geographic regions.
  2. Each F1 server has a schema locally in memory. It is not practical to make an update occur atomically across all servers.
  3. Queries and transactions must continue on all tables, even those undergoing schema changes.
  4. System availability and latency must not be negatively impacted during schema changes.

F1 schema changes are applied asynchronously, which means two F1 servers may update the database concurrently using different schemas. If those schemas are not compatible, this could lead to anomalies, including database corruption.

F1 implements a schema change algorithm that prevents such anomalies using two key ideas:

  1. F1 servers can have at most two different schemas active. Each server uses either the current or the next schema.
  2. Each schema change is subdivided into multiple phases, where each pair of consecutive phases is mutually compatible and cannot cause anomalies.

Transactions

One of the most critical properties of F1 is complete transactional consistency. F1 implements three types of transactions, all built on top of
Spanner’s transaction support:

  1. Snapshot transactions — These are read-only transactions with snapshot semantics. Snapshot transactions are usually read at Spanner’s global safe timestamp, typically 5–10 seconds old, and from a local Spanner replica.
  2. Pessimistic transactions — These transactions are mapped directly onto Spanner transactions. Pessimistic transactions require holding locks. All requests in a pessimistic transaction are directed to the same F1 server.
  3. Optimistic transactions — These consist of a long read phase (without locks) and a short write phase.

Let’s understand the steps involved in optimistic transactions:

(1) F1 returns the last modification timestamp with each row it reads. This timestamp is stored in a hidden lock column of the row. The new commit timestamp is automatically written into the lock column on every update.

(2) At commit time, the client passes these timestamps back to the F1 server.

(3) The F1 server creates a short-lived Spanner pessimistic transaction and re-reads the last modification timestamps for all read rows.

(4) If any of the re-read timestamps differ from what was passed in by the client, there was a conflicting update, and F1 aborts the transaction. Otherwise, F1 sends the writes on to Spanner to finish the commit.
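
A rough sketch of this flow in SQL terms is shown below, assuming a hypothetical hidden lock column exposed here as _version; the column name, the Budget column, and the key values are made up, and the actual check is performed internally by the F1 server rather than by user SQL.

-- Read phase (snapshot, no locks): the client remembers each row's
-- last-modification timestamp from the hidden lock column.
SELECT CampaignId, Budget, _version
FROM Campaign
WHERE CustomerId = 123;

-- Write phase (a short pessimistic Spanner transaction run by the F1 server):
-- re-read only the timestamps of the rows that were read.
SELECT CampaignId, _version
FROM Campaign
WHERE CustomerId = 123 AND CampaignId IN (5, 6);

-- If every re-read _version matches the value seen in the read phase, the
-- buffered writes are sent to Spanner and committed; otherwise F1 aborts.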

By default, F1 clients use optimistic transactions. They have several benefits:

  • Long-lasting transactions
  • Server-side ability to re-try
  • Server failover
  • Speculative writes

F1 users can arbitrarily mix optimistic and pessimistic transactions and preserve ACID semantics. All F1 writes update the last modification timestamp on every relevant lock column.

Flexible Locking Granularity

F1 provides row-level locking by default. Each F1 row contains one default lock column that covers all columns in the same row.

Users can increase concurrency within a row by defining additional lock columns, each covering a subset of the row’s columns. In the extreme case, a subset can be a single column.

One common use for column-level locking is in tables with concurrent writers, where each updates a different set of columns.

Users can also selectively reduce concurrency by using a lock column in a parent table to cover columns in a child table.

Change History

In F1, Change History is a first-class feature at the database level.

Every transaction in F1 creates one or more ChangeBatch Protocol Buffers. They include the primary key and before and after values of changed columns for each updated row.

ChangeBatches are written into normal F1 tables that exist as children of each root table.

The primary key of the ChangeBatch table includes the associated root table key and the transaction commit timestamp. When a transaction updates data under multiple root rows, one ChangeBatch is written for each distinct root row.

F1’s ChangeHistory mechanism has a variety of uses. The most common use is in applications that want to be notified of changes and then do some incremental processing. Change History also gets used in exciting ways for caching.
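
For example, an incremental-processing consumer might scan the ChangeBatch child table of one root row from its last checkpoint onward. This is only a sketch: the post does not list the ChangeBatch columns, so CommitTimestamp and the @last_checkpoint parameter are hypothetical names.

-- Read all changes under root row CustomerId = 123 since the last checkpoint.
-- ChangeBatch is a normal child table keyed by the root key and the
-- transaction commit timestamp.
SELECT *
FROM ChangeBatch
WHERE CustomerId = 123
  AND CommitTimestamp > @last_checkpoint
ORDER BY CommitTimestamp;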

Client Design

Simplified ORM

Client application code written using the original ORM library exhibited several common ORM anti-patterns:

  1. Obscuring database operations from developers.
  2. Serial reads, including for loops that do one query per
    iteration.
  3. Implicit traversals

For F1, the team replaced the ORM layer with a new, stripped-down API that forcibly avoids these anti-patterns.

The new ORM layer does not use any joins. All object loading is explicit. The layer exposes APIs for parallel and asynchronous read access.

NoSQL Interface

F1 supports a NoSQL key/value-based interface for easy access to rows.

Read requests can include any set of tables, any specific columns, and key ranges for each.

Write requests specify inserts, updates, and deletes by primary key, with any new column values, for any set of tables.

The ORM layer under the hood uses this interface.

This API allows for batched retrieval of rows from multiple tables in a
single call.

SQL Interface

F1 also provides a fully fledged SQL interface, which is used for low-latency OLTP queries, large OLAP queries, and everything in between.

F1 supports joining data from its Spanner data store with other data sources, including Bigtable, CSV files, and aggregated analytical data warehouses.

Updates are also supported using SQL data manipulation statements, including updates to individual fields inside Protocol Buffers.
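
As a hedged illustration (the exact F1 DML syntax for Protocol Buffer fields is not shown in the post), an update that touches a single field inside a proto-valued column might look like the following, reusing the Customer.Info column that appears in the SQL examples later in this post:

-- Illustrative only: update one field inside the Protocol Buffer valued
-- column Info for a single customer row.
UPDATE Customer
SET Info.country_code = 'CA'
WHERE CustomerId = 123;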

Query Processing

The F1 SQL query processing system has the following key properties:

  1. Queries are executed either as low-latency centralized queries or as distributed queries with high parallelism.
  2. All data is remote and batching is used heavily to mitigate network latency.
  3. All input data and internal data are arbitrarily partitioned.
  4. Queries use many hash-based repartitioning steps.
  5. Individual query plan operators are designed to stream
    data to later operators as soon as possible.
  6. Hierarchically clustered tables have optimized access methods.
  7. Query data can be consumed in parallel.
  8. Protocol Buffer-valued columns provide first-class support for structured data types.
  9. Spanner’s snapshot consistency model provides globally consistent results.

Central and Distributed Queries

Centralized execution is used for short OLTP-style queries, and the entire query runs on one F1 server node.

Distributed execution is used for OLAP-style queries and spreads the query workload over worker tasks in the F1 slave pool.

Distributed queries always use snapshot transactions. The query optimizer uses heuristics to determine which execution mode is appropriate for a given query.

Distributed Query Example

SELECT agcr.CampaignId, click.Region, cr.Language, SUM(click.Clicks)
FROM AdClick click
JOIN AdGroupCreative agcr
USING (AdGroupId, CreativeId)
JOIN Creative cr
USING (CustomerId, CreativeId)
WHERE click.Date = '2013-03-23'
GROUP BY agcr.CampaignId, click.Region, cr.Language

Let’s understand the AdWords schema.

AdGroup — a collection of ads. Creative — the actual ad text.

AdGroupCreative — This table is a link table between AdGroup and Creative.

Multiple AdGroups can share creatives.

Each AdClick records the Creative that the user was shown and the AdGroup from which the Creative was chosen.

This query takes all AdClicks on a specific date, finds the corresponding AdGroupCreative, and then the Creative. It then aggregates to find the number of clicks grouped by campaign, region, and language.

A possible query plan for this query is shown in the below diagram.

A distributed query plan. Rounded boxes represent processes running on separate machines. Arrows show data flow within a process or over the network in RPCs.

Data is streamed bottom-up through each operator until the aggregation operator.

The deepest operator performs a scan of the AdClick table. In
the same worker node, the data from the AdClick scan flows into a lookup join operator, which looks up AdGroupCreative records using a secondary index key.

The plan then repartitions the data stream by a hash of the CustomerId and CreativeId and performs a lookup in a hash table that is partitioned similarly (a distributed hash join).

After the distributed hash join, the data is once again repartitioned, this time by a hash of the CampaignId, Region, and Language fields, and then fed into an aggregation operator that groups by those same fields (a distributed aggregation).

Remote Data

SQL query processing and join processing pose interesting challenges in F1, because F1’s central data store, Spanner, is a remote data source, and F1 SQL can also access other remote data sources. These remote data accesses involve highly variable network latency.

Network latency and disk latency are fundamentally different in two ways.

First, network latency can be mitigated by batching or pipelining data accesses. F1 uses extensive batching to mitigate network latency.

Second, F1’s network-based storage is typically distributed over many disks, so scheduling multiple data accesses in parallel often results in near-linear speedup until the underlying storage system is overloaded.

Distributed Execution Overview

The structure of a distributed query plan is as follows.

A full query plan consists of potentially tens of plan parts, each representing several workers executing the same query sub-plan.

The plan parts are organized as a directed acyclic graph (DAG), with data flowing up from the leaves of the DAG to a single root node.

The root node, also called the query coordinator, is executed by the
server that receives the incoming SQL query request from a client.

The query coordinator plans the query for execution, receives results from the penultimate plan parts, performs any final aggregation, sorting, or filtering, and then streams the results back to the client.

Hierarchical Table Joins

The F1 data model supports hierarchically clustered tables, where the rows of a child table are interleaved in the parent table.

For instance, consider the join of table Customer with table Campaign:

SELECT *
FROM Customer JOIN Campaign
USING (CustomerId)

F1 performs this join using a single request to Spanner, in which F1 requests the data from both tables. Spanner will return the data to F1 in interleaved order (a pre-order depth-first traversal), ordered by primary key.

Customer(3)
Campaign(3,5)
Campaign(3,6)
Customer(4)
Campaign(4,2)
Campaign(4,4)

F1 uses a merge-join-like algorithm called cluster join. The cluster join operator only needs to buffer one row from each table and returns the joined results in a streaming fashion as the Spanner input data is received.

Partitioned Consumers

There are two possible bottlenecks on the data-consumption end.

F1 queries can produce vast amounts of data; pushing this data through a single query coordinator can be a bottleneck.

Also, a single client process receiving all the data can be a bottleneck.

F1 allows multiple client processes to consume sharded streams of data from the same query in parallel. This feature is used for partitioned consumers like MapReduce.

Queries with Protocol Buffers

The F1 SQL dialect treats protocol buffer values as first-class objects. It provides full access to all of the data contained in the object.

For example, the following query requests the CustomerId and the entire Protocol Buffer valued column Info for each customer whose country
code is US.

SELECT c.CustomerId, c.Info
FROM Customer AS c
WHERE c.Info.country_code = 'US'

First, queries use path expressions to extract individual fields (e.g., c.Info.country_code in the query above).

Second, F1 SQL allows for querying and passing around entire Protocol Buffers (c.Info).

Protocol Buffers also allow repeated fields, which may have zero or more instances, i.e., they can be regarded as variable-length arrays.

F1 SQL supports access to repeated fields using PROTO JOIN.
F1 SQL also allows subqueries on repeated fields in Protocol Buffers.
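
A hedged sketch of both forms is shown below, assuming a hypothetical repeated field feature (with a status subfield) inside the Customer Info message; the field names are illustrative and the syntax approximates the PROTO JOIN construct described above.

-- PROTO JOIN: join each customer row against every instance of the
-- repeated field Info.feature (hypothetical field names).
SELECT c.CustomerId, f.status
FROM Customer AS c
PROTO JOIN c.Info.feature AS f
WHERE f.status = 'ENABLED';

-- Subquery over a repeated field: keep customers that have at least one
-- enabled feature.
SELECT c.CustomerId
FROM Customer AS c
WHERE EXISTS (
  SELECT * FROM c.Info.feature AS f
  WHERE f.status = 'ENABLED'
);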

Deployment

In this section, the paper describes the F1 deployment for AdWords. F1 uses five data centers spread out across the mainland US. The Spanner configuration uses 5-way Paxos replication to ensure high availability.

In the team’s experience, 3-way replication is not sufficient for high availability: with only three replicas, when one data center is down, both surviving replicas must remain available for F1 to commit transactions, because a Paxos commit must succeed on a majority of replicas. If a second data center goes down, the entire database becomes completely unavailable.

Spanner’s Paxos implementation designates one of the replicas as a leader. All transactional reads and commits must be routed to the leader replica.

Transaction latency is best when clients and F1 servers are co-located with Spanner leader replicas. F1 designates one of the data centers as a preferred leader location. Spanner locates leader replicas in the preferred leader location whenever possible.

Latency And Throughput

F1 users see read latencies of 5–10 ms and commit latencies of 50–150 ms.

Commit latency is largely determined by network latency between data centers. A commit requires a round trip from the leader to the two nearest replicas. Multi-group commits require 2PC, which typically doubles the minimum latency.

Despite the higher database latency, overall user-facing latency for the main interactive AdWords web application averages about 200ms.

For non-interactive applications that apply bulk updates, F1 optimizes for throughput rather than latency. Such applications are typically structured to issue many small transactions, scoped to single Spanner directories when possible, and to use parallelism to achieve high throughput.

Related Work

F1’s relational query execution techniques are similar to those in the shared-nothing database literature.

F1’s NoSQL capabilities share properties with other well-described scalable key-value stores, including Bigtable, HBase, and Dynamo.

The hierarchical schema and clustering properties are similar to Megastore.

Optimistic transactions have been used in previous systems, including Percolator and Megastore.

MDCC suggests some Paxos optimizations that could reduce the overhead of multi-participant transactions.

Conclusions

Conventional engineering wisdom holds that if you need a highly scalable, high-throughput data store, the only viable option is a NoSQL key/value store, and that you must then work around the lack of ACID transactional guarantees and of conveniences like secondary indexes and SQL yourself.

Google built F1, a distributed relational database system that combines high availability, the throughput and scalability of NoSQL systems, and the
functionality, usability, and consistency of traditional relational databases, including ACID transactions and SQL queries.

References

My posts

Dynamo Part I: https://medium.com/@hemant-gupta/insights-from-paper-part-i-dynamo-amazons-highly-available-key-value-store-a9dd3485b51b

Dynamo Part II: https://medium.com/@hemant-gupta/insights-from-paper-part-ii-dynamo-amazons-highly-available-key-value-store-80a53e3f62da

DynamoDB Part I: https://medium.com/@hemant-gupta/insights-from-paper-part-i-amazon-dynamodb-a-scalable-predictably-performant-and-fully-6d93dfbfe2fb

DynamoDB Part II: https://medium.com/@hemant-gupta/insights-from-paper-part-ii-amazon-dynamodb-a-scalable-predictably-performant-and-fully-162fbc0ced93

Cassandra: https://medium.com/@hemant-gupta/insights-from-paper-cassandra-a-decentralized-structured-storage-system-e822d53215a4

Google Bigtable: https://medium.com/@hemant-gupta/insights-from-paper-bigtable-a-distributed-storage-system-for-structured-data-1eea26ee0f3a

The Google File System: https://medium.com/@hemant-gupta/insights-from-paper-the-google-file-system-1c9d7b32c8cc

Google Spanner: https://medium.com/@hemant-gupta/insights-from-paper-spanner-googles-globally-distributed-database-2da23c245722

Google Dremel: https://medium.com/@hemant-gupta/insights-from-paper-google-dremel-interactive-analysis-of-webscale-datasets-b9466d77bfa1

Google Percolator: https://hemant-gupta.medium.com/insights-from-paper-google-percolator-large-scale-incremental-processing-using-distributed-b8f3ac98da5d

Paper references

Google Megastore: https://research.google/pubs/pub36971.pdf

Google Bigtable: https://research.google.com/archive/bigtable-osdi06.pdf

GFS: https://research.google.com/archive/gfs-sosp2003.pdf

Google Spanner: https://research.google.com/archive/spanner-osdi2012.pdf

Protocol Buffers: https://developers.google.com/protocol-buffers/

MapReduce: https://research.google.com/archive/mapreduce-osdi04.pdf

Dynamo: https://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf

MDCC: http://mdcc.cs.berkeley.edu/mdcc.pdf

Written by Hemant Gupta

https://www.linkedin.com/in/hkgupta/ Working on the challenge to create insights for 100 software engineering papers. #100PapersChallenge. AVP Engineering at CoinDCX.