Insights from paper: Citus: Distributed PostgreSQL for Data-Intensive Applications

Hemant Gupta
12 min read · Aug 28, 2024


1. Abstract

Citus is an open-source distributed database engine for PostgreSQL.

Citus is implemented as an extension for PostgreSQL. It creates a cluster of PostgreSQL servers.

Users can distribute data, queries, and transactions in Citus to cater to the needs of data-intensive applications.

2. Introduction

PostgreSQL is one of the most popular open-source database management systems.

PostgreSQL’s functionality can be extended by creating extensions.

The Citus team has developed an open-source PostgreSQL extension (plug-in) called Citus.

Citus aims to address the scalability needs within the PostgreSQL ecosystem.

There are three different approaches to offering compatibility with an existing relational database system:

  1. Build the database engine from scratch and write a layer for over-the-wire SQL compatibility.
  2. Fork an open-source database system and build new features on top of it.
  3. Provide new features through a middleware layer between the application and the database.

Citus is the first distributed database that delivers its functionality through the PostgreSQL extension APIs.

Citus distributes data across regular PostgreSQL servers and sends queries over the regular PostgreSQL protocol. This means Citus can utilize all the data access and storage capabilities offered by the underlying PostgreSQL servers.

Four different kinds of workloads benefit from scaling out.

  1. Multi-tenant / SaaS (MT)
  2. Real-time analytics (RA)
  3. High-performance CRUD (HC)
  4. Data warehousing (DW)

Let’s understand these workloads in detail.

3. Workload Requirements

The diagram below gives an overview of the approximate scale requirements in each workload pattern.

Scale requirements of workload patterns.

Each workload pattern requires a different combination of distributed database capabilities to achieve its own notion of high performance at scale.

The diagram below details these capabilities for all four workloads:

Workload patterns and required capabilities.

3.1 Multi-tenant (MT)

In a multi-tenant system, a single backend deployment serves many tenants. A typical example is Software-as-a-Service (SaaS).

One of the challenges in scaling a multi-tenant workload is that the working set is relatively large due to the large number of independent tenants.

The traditional solution to this problem is sharding, in which the data for each tenant is placed in its own database or schema/namespace.

Another solution is to use a shared schema with tenant ID columns. Tables containing tenant-specific data should all have a tenant ID column to be distributed and co-located by tenant ID.

The database system should then be able to route SQL queries by tenant ID to the appropriate server.
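As a rough sketch of the shared-schema approach (hypothetical table and column names, using the create_distributed_table and colocate_with APIs described later in Section 4.3):

-- Every tenant-specific table carries a tenant_id column.
CREATE TABLE accounts (tenant_id bigint, account_id bigint, name text,
                       PRIMARY KEY (tenant_id, account_id));
CREATE TABLE invoices (tenant_id bigint, invoice_id bigint, account_id bigint, total numeric,
                       PRIMARY KEY (tenant_id, invoice_id));

-- Distribute and co-locate both tables by tenant_id.
SELECT create_distributed_table('accounts', 'tenant_id');
SELECT create_distributed_table('invoices', 'tenant_id', colocate_with := 'accounts');

-- A query that filters on tenant_id is routed to a single worker,
-- and the join stays local because the shards are co-located.
SELECT a.name, sum(i.total)
FROM accounts a
JOIN invoices i USING (tenant_id, account_id)
WHERE tenant_id = 42
GROUP BY a.name;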

Reference tables (replicated across servers) are needed for (local) joins.

There are two more requirements from customers:

  1. They may need the flexibility to customize data for specific tenants. In this case, the recommended solution is to use the JSONB data type.
  2. They may need control over tenant placement to avoid noisy-neighbor issues. Citus provides features to identify hotspots and to isolate a tenant onto its own server (see the sketch below).
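Both ideas can be sketched against the hypothetical accounts table above (the isolate_tenant_to_new_shard UDF and its cascade option are assumptions based on recent Citus versions):

-- Flexible per-tenant fields in a JSONB column instead of per-tenant schema changes.
ALTER TABLE accounts ADD COLUMN custom_fields jsonb DEFAULT '{}';

UPDATE accounts
SET custom_fields = custom_fields || '{"plan": "enterprise", "region": "eu"}'
WHERE tenant_id = 42 AND account_id = 7;

-- Move a noisy tenant onto its own shard, which can then be placed on a dedicated node.
SELECT isolate_tenant_to_new_shard('accounts', 42, cascade_option := 'CASCADE');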

3.2 Real-time Analytics

Some applications must provide interactive analytics or search over large data streams with only a small delay.

Examples of the data streams are event data or time series data describing telemetry from devices or software.

These real-time analytics applications are typically multi-user dashboards that visualize aggregations of the data through charts.

The database must sustain a high write throughput and serve hundreds of analytical queries per second with sub-second latency.

A simple real-time analytics pipeline.

The diagram above shows an example of a typical real-time analytics pipeline.

PostgreSQL has the following capabilities for building real-time analytics applications.

  1. The heap storage format and COPY command allow high-speed data ingestion.
  2. MVCC allows analytical queries to run concurrently with writes.
  3. A versatile set of data types and index types helps represent data easily.

There are two challenges, though. The data volume can easily exceed the capacity of a single server, and most operations are single-threaded.

To scale real-time analytics workloads, the database system must be able to distribute tables across servers and support parallel bulk loading.

Parallel INSERT…SELECT is needed to incrementally pre-aggregate large volumes of incoming data into rollup tables.

Parallel distributed SELECT on the event data or the rollup tables is required to keep response times low.

Co-located distributed joins are needed for advanced analytics.

Citus supports all these capabilities required for building large-scale real-time analytics dashboards on PostgreSQL.
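A sketch of the pre-aggregation pattern (hypothetical events and rollup tables, both distributed and co-located on site_id):

CREATE TABLE events (site_id bigint, event_time timestamptz, payload jsonb);
SELECT create_distributed_table('events', 'site_id');

CREATE TABLE events_per_minute (site_id bigint, minute timestamptz, event_count bigint);
SELECT create_distributed_table('events_per_minute', 'site_id', colocate_with := 'events');

-- Incrementally roll up the previous minute of events; because source and destination
-- are co-located, Citus executes this INSERT…SELECT in parallel on each shard pair.
INSERT INTO events_per_minute (site_id, minute, event_count)
SELECT site_id, date_trunc('minute', event_time), count(*)
FROM events
WHERE event_time >= date_trunc('minute', now()) - interval '1 minute'
  AND event_time <  date_trunc('minute', now())
GROUP BY 1, 2;

-- Dashboards then query the much smaller rollup table with sub-second latency.
SELECT minute, sum(event_count)
FROM events_per_minute
WHERE minute > now() - interval '1 hour'
GROUP BY minute ORDER BY minute;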

3.3 High-performance CRUD

Applications can access the data through simple CRUD operations or complex queries across objects. These objects are usually in semi-structured formats like JSON.

PostgreSQL provides very sophisticated JSON support.

There can be two types of scalability challenges. The first type occurs when the working set is too large, and the second type occurs when the change rate is high for large objects.

The PostgreSQL MVCC model requires writing a new copy of each updated row and reclaiming the old space later (auto-vacuuming). If auto-vacuuming cannot keep up, it can cause performance degradation.

To achieve high performance on CRUD, the tables need to be distributed by key. In this way, CRUD operations can then be routed to a shard.

Another benefit is that auto-vacuuming is now parallelized across many cores.
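A sketch of a key-value style table for this pattern (hypothetical names; the object body is stored as JSONB):

CREATE TABLE objects (object_id uuid PRIMARY KEY, body jsonb);
SELECT create_distributed_table('objects', 'object_id');

-- Each statement filters on the distribution column, so it is routed to a single shard,
-- and each shard is auto-vacuumed independently, spreading the work across cores.
INSERT INTO objects VALUES ('5a2f1c3e-0000-4000-8000-000000000001', '{"state": "new"}');
SELECT body FROM objects WHERE object_id = '5a2f1c3e-0000-4000-8000-000000000001';
UPDATE objects SET body = body || '{"state": "done"}'
WHERE object_id = '5a2f1c3e-0000-4000-8000-000000000001';
DELETE FROM objects WHERE object_id = '5a2f1c3e-0000-4000-8000-000000000001';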

Citus meets all the requirements for high-performance CRUD workloads except connection scalability, which was a work in progress at the time of the paper’s writing.

3.4 Data warehousing

Queries from data warehousing applications may need to scan massive amounts of data.

PostgreSQL on its own generally lacks the scan performance and parallelism needed to execute huge scans quickly.

There are a few requirements for it:

  1. Scans must be sped up through parallel, distributed SELECT and columnar storage.
  2. Distribution columns should be chosen to maximize the number of co-located distributed joins.
  3. The database must also support efficient non-co-located joins by reshuffling or broadcasting data over the network.
  4. The query optimizer must decide on a join order that minimizes network traffic.

Citus meets most of the above requirements but has several limitations around non-co-located joins.
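For the columnar-storage requirement, recent Citus versions (10 and later) ship a columnar table access method; a minimal sketch, assuming that access method is available:

-- An append-mostly fact table in columnar format, distributed for parallel scans.
CREATE TABLE sales_facts (sold_at timestamptz, customer_id bigint, amount numeric) USING columnar;
SELECT create_distributed_table('sales_facts', 'customer_id');

-- Analytical scans read only the referenced columns and benefit from compression.
SELECT date_trunc('day', sold_at) AS day, sum(amount)
FROM sales_facts
GROUP BY 1
ORDER BY 1;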

4. Citus Architecture

A PostgreSQL extension consists of two parts:

  1. A set of SQL objects (metadata tables, functions, types, etc.)
  2. A shared library that is loaded into PostgreSQL

Other than the query parser, all database modules within PostgreSQL are extensible. The parser code is generated at build time, so it cannot be changed, but extensions are loaded at run time.
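As a concrete example, enabling Citus on a PostgreSQL server uses the standard extension mechanics (the shared library must be preloaded, then the SQL objects are created):

-- In postgresql.conf, load the Citus shared library at server start:
--   shared_preload_libraries = 'citus'

-- Then create the extension's SQL objects (metadata tables, UDFs, types) in the database:
CREATE EXTENSION citus;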

After the extension is loaded, the behavior of PostgreSQL can be altered by setting specific hooks. Citus uses the following hooks:

4.1 PostgreSQL extension APIs

User-defined functions (UDFs) — UDFs are callable from SQL queries as part of a transaction.

Planner and executor hooks — These are global function pointers that allow an extension to provide an alternative query plan and execution method.

CustomScan — It is an execution node in a PostgreSQL query plan that holds a custom state and returns tuples via custom function pointers.

Transaction callbacks — These are called at critical points in a transaction’s lifecycle, such as pre-commit, post-commit, and abort.

Utility hook — It is called after parsing any command that does not go through the regular query planner.

Background workers — These run user-supplied code in separate processes.

4.2 Citus architecture diagram

The diagram below shows that Citus deployments typically have one coordinator and zero or more worker nodes.

Example Citus deployment, showing the coordinator and two worker nodes

The coordinator stores the distributed tables’ metadata (catalogs), and clients typically connect to the coordinator.

Worker nodes store the shards that contain the actual data. The coordinator can also be a worker node.

A large coordinator node can handle hundreds of thousands of transactions per second or ingest millions of rows per second.

4.2.1 Scaling the coordinator node

Sometimes, the coordinator can be a bottleneck.

Citus can distribute the distributed table metadata and all its changes to all the nodes.

In this case, each worker node assumes the coordinator role for all distributed queries and transactions it receives.

Clients should use a load-balancing mechanism to divide connections among the workers.

When each worker node acts as a coordinator, every node may open connections to every other node. Citus caches connections for higher performance, which can lead to a connection scaling bottleneck within the cluster.

This issue can be mitigated by setting up connection pooling between the instances via PgBouncer.

4.3 Citus table types

Citus introduces two types of tables to PostgreSQL:

  1. Distributed tables
  2. Reference tables

Distributed tables

Distributed tables are hash-partitioned along a distribution column into multiple logical shards, each containing a contiguous range of hash values.

The create_distributed_table UDF converts a regular table to a distributed table.

CREATE TABLE my_table (. . . );
SELECT create_distributed_table('my_table','distribution_column');

Shards are placed on worker nodes in a round-robin fashion.
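The shard-to-node mapping can be inspected through the metadata catalogs on the coordinator (a sketch, assuming the pg_dist_shard, pg_dist_placement, and pg_dist_node catalog tables):

-- Which hash ranges make up my_table's shards, and which node holds each shard?
SELECT s.shardid, s.shardminvalue, s.shardmaxvalue, n.nodename, n.nodeport
FROM pg_dist_shard s
JOIN pg_dist_placement p USING (shardid)
JOIN pg_dist_node n USING (groupid)
WHERE s.logicalrelid = 'my_table'::regclass
ORDER BY s.shardid;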

Co-location

Citus ensures that the same ranges of (hash) values of co-located tables are always stored on the same worker node.

Relational operations like joins and foreign keys involving the distribution column of two or more co-located tables can be performed without any network traffic by operating on pairs of co-located shards.

When creating a second distributed table, co-location can be specified using the colocate_with option:

CREATE TABLE other_table (. . . );
SELECT create_distributed_table(
'other_table','distribution_column', colocate_with := 'my_table'
);

Reference tables

Reference tables are replicated to all nodes in a Citus cluster.

Joins between distributed tables and reference tables are implemented by joining each distributed table shard with the reference table’s local replica.

Users create reference tables by calling create_reference_table:

CREATE TABLE dimensions (. . . );
SELECT create_reference_table('dimensions');

4.4 Data rebalancing

Citus provides a shard rebalancer to enable even data distribution among workers. The rebalancer moves shards until it reaches an even number of shards across worker nodes.

Users can rebalance based on data size or create a custom policy.
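A sketch of the rebalancer API (the rebalance_table_shards UDF and the by_disk_size strategy are based on recent Citus versions):

-- Even out the number of shards across worker nodes (default strategy).
SELECT rebalance_table_shards();

-- Or move shards until data size, rather than shard count, is balanced.
SELECT rebalance_table_shards(rebalance_strategy := 'by_disk_size');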

4.5 Distributed Query Planner

Citus has a distributed query planner that produces a PostgreSQL query plan.

The PostgreSQL query plan contains a CustomScan node, which includes the distributed query plan.

A distributed query plan consists of tasks (queries on shards) to run on the workers. The plan can optionally have a set of sub-plans whose results need to be broadcast or re-partitioned.

Citus has planners for different classes of queries.

The paper shows a basic example query for each planner:

  1. The Fast path planner handles simple CRUD queries on a single table with a single distribution column value.
  2. The Router planner handles arbitrarily complex queries that can be scoped to one set of co-located shards.
  3. The Logical planner handles queries across shards by constructing a multi-relational algebra tree.

  3a. The Logical pushdown planner detects whether the join tree can be fully pushed down.
  3b. The Logical join order planner determines the optimal execution order for join trees involving non-co-located joins.
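A rough illustration of which planner handles which query, reusing my_table and other_table from Section 4.3 (the columns and values are hypothetical, and the distribution column is assumed to be text):

-- Fast path: simple CRUD on one table with a single distribution column value.
SELECT * FROM my_table WHERE distribution_column = 'tenant-42';

-- Router: a more complex query that is still scoped to one set of co-located shards.
SELECT count(*)
FROM my_table m
JOIN other_table o USING (distribution_column)
WHERE distribution_column = 'tenant-42';

-- Logical pushdown: grouped by the distribution column, so the aggregation is pushed
-- to every shard in parallel and the coordinator only merges and applies the LIMIT.
SELECT distribution_column, count(*)
FROM my_table
GROUP BY distribution_column
ORDER BY count(*) DESC
LIMIT 10;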

4.6 Distributed query executor

A PostgreSQL query plan is a tree of execution nodes.

We have discussed that the plans generated by the distributed query planner contain a CustomScan node. The CustomScan nodes call the distributed query executor.

If the plan was generated via the fast path or router planner, the entire plan is a single CustomScan node.

Plans generated by the logical planner may require a merge step.

Call flow for the execution of a query planned by the logical pushdown planner

When the PostgreSQL executor calls the CustomScan node, Citus hands execution over to a component called the adaptive executor, shown in the diagram above.

Adaptive executor

Citus has to handle very different kinds of workloads; the adaptive executor is what enables this.

Some query plans will have a single task, while others have many tasks.

The adaptive executor manages the parallelism vs. low latency trade-off using a technique called slow start.

At the start of the query, the executor can use one connection to each worker (n=1).

Every 10ms, the number of new connections the executor may open increases by one (n = n + 1). If a worker node has t pending tasks that are not yet assigned to a specific connection, the executor increases that worker node's connection pool by min(n, t) new connections.

Slow start increases the number of connections when tasks take a long time to complete. The executor also keeps track of the total number of connections to each worker node in shared memory to prevent it from exceeding a shared connection limit.
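These knobs are exposed as settings; a sketch, assuming the citus.executor_slow_start_interval and citus.max_shared_pool_size parameters of recent Citus versions:

-- Interval (in milliseconds) before the executor may open another batch of connections per worker.
SET citus.executor_slow_start_interval = 10;

-- Cluster-wide cap on connections to each worker node, tracked in shared memory.
-- This is a server-level setting, so it is changed via ALTER SYSTEM and a configuration reload.
ALTER SYSTEM SET citus.max_shared_pool_size = 400;
SELECT pg_reload_conf();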

4.7 Distributed transactions

A distributed transaction in Citus is composed of the following:

  1. A transaction on the coordinator (initiated by the client)
  2. One or more transactions on worker nodes (initiated by the coordinator)

If the transaction involves a single worker node, the coordinator delegates the responsibility to that worker.

For transactions that span multiple worker nodes, Citus uses a two-phase commit (2PC) for atomicity and implements distributed deadlock detection.

4.7.1 Single-node transactions

The coordinator sends commit/abort commands from the commit/abort transaction callbacks to the worker node.

The worker node provides the transactional guarantees of a single PostgreSQL server.

4.7.2 Two-phase commit protocol

If a transaction involves multiple worker nodes, the executor opens a transaction block on them and performs a 2PC across them at commit time.

PostgreSQL provides the PREPARE TRANSACTION, COMMIT PREPARED, and ROLLBACK PREPARED commands; a prepared transaction preserves its locks and survives restarts and recovery.

When the transaction on the coordinator is about to commit, the pre-commit callback sends a “prepare transaction” over all connections to worker nodes with open transaction blocks.

If all are successful, the coordinator writes a commit record for each prepared transaction to the Citus metadata and the local transaction commits.

If one or more prepared transactions fail to be committed or aborted (for example, due to a node failure), the commit records in the Citus metadata are later used to determine the transaction’s outcome and finish it.
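Under the hood, these are the standard PostgreSQL two-phase-commit commands issued on each participating worker (the transaction identifier below is hypothetical):

-- Phase one, sent from the coordinator's pre-commit callback inside each worker's open transaction block:
PREPARE TRANSACTION 'citus_12_345_6_7';

-- Phase two, after the coordinator's local transaction and commit records are durable:
COMMIT PREPARED 'citus_12_345_6_7';
-- or, if the coordinator aborted:
-- ROLLBACK PREPARED 'citus_12_345_6_7';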

4.7.3 Distributed deadlocks

Citus implements distributed deadlock detection, which aborts transactions when there is an actual deadlock.

PostgreSQL already provides deadlock detection on a single node. Citus extends this logic with a background daemon running on the coordinator node.

This daemon detects distributed deadlocks by polling all worker nodes for the edges in their lock graph. After that, it merges all processes in the graph that participate in the same distributed transaction.

If the resulting graph contains a cycle, then a cancellation is sent to the process belonging to the youngest distributed transaction in the cycle to abort the transaction.

4.8 Specialized scaling logic

Citus implements specialized scaling logic for critical PostgreSQL capabilities, including SELECT and DML statements.

DDL commands in PostgreSQL are transactional.

COPY commands append a CSV-formatted stream of data to a table.
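For example, bulk-loading a CSV file into the hypothetical events table from the Section 3.2 sketch; Citus splits the rows across shards as they arrive:

-- Append server-side CSV data to a distributed table (hypothetical file path).
COPY events (site_id, event_time, payload) FROM '/srv/data/events.csv' WITH (FORMAT csv);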

INSERT…SELECT commands between distributed tables use one of three strategies:

  1. If the SELECT requires a merge step on the coordinator, the command is internally executed as a distributed SELECT and a COPY into the destination table.
  2. If no merge step exists, but the source and destination tables are not co-located, the INSERT…SELECT performs distributed re-partitioning of the SELECT result before inserting it into the destination table.
  3. If neither of the above cases applies, the INSERT…SELECT is executed directly, in parallel, on the co-located shards.

4.9 High Availability and Backups

HA in Citus is handled primarily at the server level using existing PostgreSQL replication.

For HA, each node in the cluster has one or more hot standby nodes and replicates its write-ahead log (WAL) using synchronous, asynchronous, or quorum replication.

If a node fails, the cluster orchestrator promotes a standby and updates the Citus metadata.

Backups are also handled at the server level by creating periodic disk snapshots.

Citus supports periodically creating a consistent restore point. It is a record in each node’s WAL. The restore point is made while blocking writes to the commit records table(s) on the coordinator(s).
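A sketch, assuming the citus_create_restore_point UDF:

-- Write a named restore point into every node's WAL so that per-node backups
-- can later be restored to a mutually consistent point in time.
SELECT citus_create_restore_point('before-schema-migration');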

I am omitting the benchmark and case study sections, which are facts and figures. If necessary, curious readers can check the original paper.

5. Related Work

Vitess is a sharding solution that scales out MySQL. It is not an extension, so it must be deployed separately from the database servers.

Greenplum and Redshift are PostgreSQL-based data warehouses optimized for handling complex SQL queries that analyze large amounts of data with low concurrency.

Amazon Aurora can scale out the storage for a single PostgreSQL server to serve demanding OLTP workloads and provide fast recovery.

Spanner, CockroachDB, and Yugabyte have been developed with a focus on serializability for multi-node transactions.

TimescaleDB is a PostgreSQL extension that optimizes PostgreSQL for time series data.

6. Conclusion

Citus is a distributed database engine for PostgreSQL.

It addresses the scalability challenges of PostgreSQL.

Citus is an extension, so it has long-term compatibility with PostgreSQL.

Citus is designed as a multi-purpose database system that can handle various PostgreSQL workloads.

References

Google Spanner paper post

Amazon Redshift paper post

Amazon Aurora paper post

CockroachDB paper post

Vitess: A database clustering system for horizontal scaling of MySQL

TimescaleDB

Greenplum Database

YugabyteDB

