Insights from paper: ClickHouse — Lightning Fast Analytics for Everyone

Hemant Gupta
13 min read · Aug 29, 2024


1. Abstract

What is ClickHouse?

It is a popular open-source OLAP database.

It is designed for high-performance analytics over petabyte-scale data sets with high ingestion rates. Its storage layer is based on log-structured merge (LSM) trees.

On top of this, ClickHouse uses a novel technique for continuously transforming historical data in the background.

It supports a feature-rich SQL dialect and is powered by a state-of-the-art vectorized query execution engine.

ClickHouse also aggressively prunes irrelevant data.

The benchmarks show that ClickHouse is one of the fastest analytical databases on the market.

2. Introduction

ClickHouse was started in 2009 and open-sourced in 2016.

Initially, it was built as a filter and aggregation engine for large-scale log files.

The diagram below shows the feature history of ClickHouse.

ClickHouse timeline

ClickHouse was designed to handle the following five challenges:

  1. Massive data sets with high ingestion rates
  2. Many simultaneous queries with an expectation of low latencies
  3. Diverse landscapes of data stores, storage locations, and formats
  4. A convenient query language with support for performance introspection
  5. Industry-grade robustness and versatile deployment

3. Architecture

The ClickHouse engine is split into three main layers:

  1. The query processing layer
  2. The storage layer
  3. The integration layer

In addition to these layers, ClickHouse has an access layer to manage user sessions and communication with applications via different protocols.

ClickHouse is built in C++ as a single binary without any dependencies.

The query processing layer

This layer parses incoming queries, builds and optimizes logical and physical query plans, and executes them.

ClickHouse uses a vectorized execution model similar to MonetDB/X100.

Queries can be written in a feature-rich SQL dialect, PRQL, or Kusto’s KQL.

The storage layer

This layer consists of different table engines that encapsulate the format and location of table data.

There are three types of table engines:

  1. The MergeTree family of table engines represents the primary persistence format. It is based on the idea of LSM trees. The tables are split into horizontal, sorted parts, which a background process continuously merges. Within this family, individual MergeTree* table engines take different approaches to combining the rows from their input parts: for example, rows can be aggregated or replaced if they are outdated. (A minimal table definition follows this list.)
  2. Special-purpose table engines speed up or distribute query execution. This type of engine includes in-memory key-value table engines called dictionaries. A dictionary caches the result of a query periodically executed against an internal or external data source. Other examples of special-purpose table engines include a pure in-memory engine used for temporary tables and a Distributed table engine for transparent data sharding.
  3. Virtual table engines are designed for bidirectional data exchange with external systems such as relational databases, publish/subscribe systems, or key/value stores. They can also interact with data lakes or files in object storage.
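
To make the MergeTree family concrete, here is a minimal sketch of a table definition in ClickHouse SQL. The hits table and its columns mirror the example query used later in this article; the exact schema is illustrative, not taken from the paper:

-- A minimal MergeTree table; parts are sorted by the ORDER BY (primary key) columns.
CREATE TABLE hits
(
    EventTime DateTime,
    RegionID  UInt32,
    URL       String,
    Latency   UInt32
)
ENGINE = MergeTree
ORDER BY (RegionID, EventTime);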

ClickHouse supports sharding and replication of tables across multiple cluster nodes.

Clients can read and write table shards directly. The Distributed special-purpose table engine provides a global view of all table shards.

A shard can be replicated across multiple nodes for tolerance against node failure.

Each MergeTree* table engine has a corresponding ReplicatedMergeTree* engine.

ReplicatedMergeTree* engines use a multi-master coordination scheme based on the Raft consensus algorithm to guarantee that every shard has a configurable number of replicas. The team has implemented Keeper, a C++ drop-in replacement for Apache ZooKeeper, for this coordination.

The ClickHouse database engine can be operated in on-premise, cloud, standalone, or in-process modes.

An in-process chDB mode has been developed for interactive data analysis use cases, such as Jupyter notebooks with Pandas data frames. It is inspired by DuckDB.

4. Storage Layer

It is time to go down the rabbit hole. Let’s understand how the MergeTree* table engines work.

4.1 On-Disk Format

Each table backed by a MergeTree* table engine is organized as a collection of immutable table parts.

Parts are self-contained. They include all metadata required to interpret their content.

A background merge job periodically combines multiple smaller parts into a larger part to keep the number of parts per table low. By default, parts are merged until they reach a maximum size of 150 GB.

Rows within a part are sorted by the table’s primary key columns. Merges use an efficient k-way merge sort.

Rows can be inserted in two modes:

Synchronous insert mode: Each INSERT statement creates a new part and appends it to the table. Database clients are encouraged to insert tuples in bulk (e.g., 20,000 rows at once).

Asynchronous insert mode: In this mode, ClickHouse buffers rows from multiple incoming INSERTs into the same table and creates a new part only after the buffer size exceeds a configurable threshold or a timeout expires.
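
As a hedged sketch of the two insert modes (the hits table is the illustrative one defined earlier; async_insert is the server setting that enables asynchronous buffering):

-- Synchronous insert: every INSERT creates a new part, so batch many rows per statement.
INSERT INTO hits (EventTime, RegionID, URL, Latency) VALUES
    ('2024-08-29 10:00:00', 1, 'https://clickhouse.com', 42),
    ('2024-08-29 10:00:01', 2, 'https://clickhouse.com/docs', 17);

-- Asynchronous insert: the server buffers small inserts and creates a part
-- only when the buffer threshold or a timeout is reached.
INSERT INTO hits (EventTime, RegionID, URL, Latency)
SETTINGS async_insert = 1
VALUES ('2024-08-29 10:00:02', 3, 'https://clickhouse.com/blog', 23);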

The diagram below shows four synchronous and two asynchronous inserts into a MergeTree*-engine table.

Inserts and merges for MergeTree*-engine tables

ClickHouse treats all parts equally instead of arranging them in a hierarchy as some other LSM trees do. It writes inserts directly to disk, while other LSM-tree-based stores typically use write-ahead logging (WAL).

A part corresponds to a directory on disk. There is one file for each column. The columns of a small part (smaller than 10 MB by default) are stored consecutively in a single file.

The rows of a part are logically divided into groups of 8192 records, called granules.

A granule represents the smallest indivisible data unit processed by the scan and index lookup operators.

Reads and writes of on-disk data are done on the block level. A block combines multiple neighboring granules within a column.

Blocks are compressed before storing on the disk and decompressed on the fly when loaded from the disk into memory.

The tables can be range, hash, or round-robin partitioned using arbitrary partitioning expressions.
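
As a short sketch of a partitioning expression on the illustrative hits table (monthly range-style partitions; any deterministic expression can be used):

-- Partition parts by month of EventTime; the sort order within parts is unchanged.
CREATE TABLE hits_partitioned
(
    EventTime DateTime,
    RegionID  UInt32,
    URL       String,
    Latency   UInt32
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(EventTime)
ORDER BY (RegionID, EventTime);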

4.2 Data Pruning

ClickHouse supports three data pruning techniques:

  1. Users can define a primary key index for a table. The primary key columns determine the sort order. This way, the index is locally clustered. For every part, a mapping from the primary key column values of each granule’s first row to the granule’s ID is also stored. This resulting data structure is sparse. Only 1000 entries are required to index 8.1 million rows.

The diagram in the paper shows a primary key index on the EventTime column of a table. Granules that match the range predicate in the query can be found by binary-searching the primary key index.

2. Users can create table projections. Projections are alternative versions of a table that contain the same rows sorted by a different primary key. They speed up queries that filter on columns other than the table’s primary key.

3. Skipping indices provide a lightweight alternative to projections. They can be created for arbitrary index expressions with a configurable granularity. Examples of skipping index types are min-max, set, and Bloom filter indices. (A combined sketch of projections and skipping indices follows this list.)
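
A combined, hedged sketch of the second and third pruning techniques on the illustrative hits table (the projection and index names are made up for the example):

-- Projection: the same rows, re-sorted by URL, to speed up filters on URL.
ALTER TABLE hits ADD PROJECTION by_url (SELECT * ORDER BY URL);
ALTER TABLE hits MATERIALIZE PROJECTION by_url;

-- Skipping index: a min-max index over Latency, one index entry per 4 granules.
ALTER TABLE hits ADD INDEX latency_minmax Latency TYPE minmax GRANULARITY 4;
ALTER TABLE hits MATERIALIZE INDEX latency_minmax;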

4.3 Merge-time Data Transformation

ClickHouse allows a continuous incremental transformation of existing data using different merge strategies.

  1. Replacing merges retain only the most recently inserted version of a tuple, based on the creation timestamp of its containing part.
  2. Aggregating merges collapse rows with equal primary key column values into an aggregated row. Non-primary key columns must be in a partial aggregation state that holds the summary values. Aggregating merges are typically used in materialized views instead of normal tables. ClickHouse does not periodically refresh materialized views with the entire content of the source table. When a new part is inserted into the source table, materialized views are updated incrementally as a result of the transformation query.
A materialized view defined on a table

3. TTL (time-to-live) merges provide aging for historical data. They process only one part at a time and are defined by rules with triggers and actions. Possible actions include: (1) move the part to another volume, (2) re-compress the part, (3) delete the part, or (4) roll it up. (See the sketch below.)
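
As a hedged sketch of aggregating and TTL merges on the illustrative hits table (the table, view, and column names are made up; avgState and avgMerge keep and finalize the partial aggregation state):

-- Target table whose parts are combined by aggregating merges.
CREATE TABLE latency_by_region
(
    RegionID   UInt32,
    AvgLatency AggregateFunction(avg, UInt32)
)
ENGINE = AggregatingMergeTree
ORDER BY RegionID;

-- Incrementally updated whenever a new part is inserted into hits.
CREATE MATERIALIZED VIEW latency_by_region_mv TO latency_by_region AS
SELECT RegionID, avgState(Latency) AS AvgLatency
FROM hits
GROUP BY RegionID;

-- TTL merge rule: delete rows older than 90 days during merges.
ALTER TABLE hits MODIFY TTL EventTime + INTERVAL 90 DAY DELETE;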

4.4 Updates and Deletes

MergeTree* table engines are designed for append-only workloads.

There are two approaches to updating or deleting existing data. Neither approach blocks parallel inserts.

Mutations rewrite all parts of a table in place. They guarantee that the data is physically changed at the end of the operation.

ClickHouse also offers lightweight deletes, which only update an internal bitmap column indicating whether a row has been deleted.

The SELECT queries use an additional filter based on this bitmap column to exclude deleted rows.
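
A hedged sketch of both approaches on the illustrative hits table:

-- Mutations: rewrite the affected parts in the background.
ALTER TABLE hits UPDATE Latency = 0 WHERE URL = '';
ALTER TABLE hits DELETE WHERE EventTime < '2020-01-01';

-- Lightweight delete: only flips the internal bitmap column;
-- SELECTs filter on that bitmap until merges remove the rows physically.
DELETE FROM hits WHERE RegionID = 42;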

4.5 Idempotent Inserts

Let me ask one question: How should clients handle connection timeouts after sending data to the server for insertion into a table?

The simple answer is to resend the data from the client to the server and rely on the primary key or unique constraints to reject duplicate inserts.

However, this solution is prohibitive for large data sets and high ingest rates.

ClickHouse provides a lightweight alternative approach.

It is based on the fact that each insert eventually creates a part.

The server maintains hashes of the last N inserted parts (say, N = 100) and ignores re-inserts of parts with a known hash.

For replicated tables, these hashes are stored in Keeper; for non-replicated tables, they are stored locally.

In this way, inserts are idempotent in ClickHouse.
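
A hedged sketch of the deduplication window, assuming the MergeTree setting non_replicated_deduplication_window (replicated tables use the analogous replicated_deduplication_window, whose hashes live in Keeper):

-- Keep hashes of the last 100 inserted parts and ignore re-inserts with a known hash.
CREATE TABLE events
(
    EventTime DateTime,
    Payload   String
)
ENGINE = MergeTree
ORDER BY EventTime
SETTINGS non_replicated_deduplication_window = 100;

-- Retrying the exact same insert block after a connection timeout is then a no-op.
INSERT INTO events VALUES ('2024-08-29 10:00:00', 'retry-safe batch');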

4.6 Data Replication

ClickHouse manages replication based on the notion of table states.

Table states consist of a set of table parts and table metadata.

Nodes advance the state of a table using three operations:

1. Inserts add a new part to the state

2. Merges add a new part and delete existing parts to/from the state

3. Mutations and DDL statements add parts, and/or delete parts, and/or change table metadata

Operations are performed locally on a single node and recorded in a global replication log as a state transition sequence.

The Keeper process maintains the replication log using the Raft consensus algorithm. The diagram below shows an example scenario.

Replication in a cluster of three nodes

All cluster nodes initially point to the same position in the replication log. While nodes execute operations locally, the replication log is replayed asynchronously on all other nodes.
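
As a sketch, replication is enabled per table by using the replicated engine variant; the Keeper path and replica macro below are illustrative placeholders:

-- Each replica registers itself under the given Keeper path.
CREATE TABLE hits_replicated
(
    EventTime DateTime,
    RegionID  UInt32,
    URL       String,
    Latency   UInt32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/hits', '{replica}')
ORDER BY (RegionID, EventTime);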

4.7 ACID Compliance

ClickHouse uses snapshot isolation, which is implemented as an MVCC variant.

Queries are executed against a snapshot of all parts in all involved tables. It ensures that new parts inserted by parallel INSERTs or merges do not participate in the execution.

The reference count of the processed parts is incremented for the duration of the query to prevent parts from being modified or removed simultaneously.

Beyond this snapshot isolation, however, statements are generally not ACID-compliant; ClickHouse favors performance over strict transactional guarantees.

5. Query Processing Layer

ClickHouse parallelizes queries at the level of data elements, data chunks, and table shards. It is shown in the diagram below:

Parallelization across SIMD units, cores, and nodes

Multiple data elements can be processed within operators at once using SIMD instructions.

Within a single node, the query engine executes operators simultaneously in multiple threads. If a source table is split into disjoint table shards, multiple nodes can scan the shards in parallel.

5.1 SIMD Parallelization

Vectorization is either manually written or auto-generated by the compiler.

First, to benefit from vectorization, the code is compiled into several compute kernels.

A kernel can be non-vectorized, auto-vectorized for AVX2, or manually vectorized for AVX-512.

At runtime, ClickHouse then selects the fastest available kernel based on the CPU capabilities reported by the cpuid instruction. The ClickHouse team has an excellent blog post about CPU dispatch.

5.2 Multi-Core Parallelization

As in most other systems, SQL queries are transformed into a directed graph of physical plan operators.

Special source operators represent the input of the operator plan.

The physical operator plan is unfolded at query compilation time into independent execution lanes.

These lanes decompose the data to be processed by parallel operators into non-overlapping ranges.

In the diagram below, Node 1 shows the operator graph of a typical OLAP query against a table.

Query for the diagram:

SELECT RegionID, avg(Latency) AS AvgLatency
FROM hits
WHERE URL = 'https://clickhouse.com'
GROUP BY RegionID
ORDER BY AvgLatency DESC
LIMIT 3

In the first stage, three disjoint ranges of the source table are filtered simultaneously.

A Repartition exchange operator dynamically routes result chunks between the first and second stages to keep the processing threads evenly utilized.

In the second stage, the rows that survived the filter are grouped by RegionID.

The third stage starts once the aggregation result has been fully computed. The result groups are first divided by a Distribute exchange operator into three equally large disjoint partitions, then sorted by AvgLatency.

Sorting is performed in three steps:

  1. ChunkSort operators sort the individual chunks of each partition.
  2. StreamSort operators maintain a local sorted result and combine it with incoming sorted chunks using 2-way merge sort.
  3. A MergeSort operator combines the local results using k-way merge sort to obtain the final result.
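
To observe these lanes and operators on a running server, the article’s example query can be prefixed with EXPLAIN PIPELINE; the max_threads setting here is just an illustrative way to pin the plan to three lanes:

EXPLAIN PIPELINE
SELECT RegionID, avg(Latency) AS AvgLatency
FROM hits
WHERE URL = 'https://clickhouse.com'
GROUP BY RegionID
ORDER BY AvgLatency DESC
LIMIT 3
SETTINGS max_threads = 3;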

5.3 Multi-Node Parallelization

If a query’s source table is sharded, the query optimizer on the node that receives the query tries to perform as much work as possible on other nodes.

Results from other nodes can be integrated at different points of the query plan.

Remote nodes may either:

1. Stream raw source table columns to the initiator node

2. Filter the source columns and send the surviving rows

3. Execute filter and aggregation steps and send local result groups with partial aggregation states

4. Run the entire query, including filters, aggregation, and sorting.

In the previous diagram, nodes 2…N hold shards of the hits table and execute such plan fragments.
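
A hedged sketch of how such a sharded setup is exposed to clients; the cluster name, database, and sharding key are illustrative:

-- Global view over the shard-local hits tables on cluster 'analytics_cluster'.
CREATE TABLE hits_distributed AS hits
ENGINE = Distributed(analytics_cluster, default, hits, rand());

-- The initiator pushes filtering and partial aggregation to the shards.
SELECT RegionID, avg(Latency) AS AvgLatency
FROM hits_distributed
WHERE URL = 'https://clickhouse.com'
GROUP BY RegionID;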

5.4 Holistic Performance Optimization

Query optimization

The first set of optimizations is applied to a semantic query representation obtained from the query’s AST.

The optimized semantic query representation is subsequently transformed into a logical operator plan.

The logical plan is then optimized, for example by pushing down filters and reordering function evaluation and sorting steps.

Finally, the logical query plan is transformed into a physical operator plan.

Query compilation

ClickHouse uses query compilation based on LLVM to fuse adjacent plan operators dynamically.

ClickHouse also employs compilation to evaluate multiple aggregation functions simultaneously and for sorting with more than one sort key.

Primary key index evaluation

ClickHouse evaluates WHERE conditions using the primary key index.

The primary key index is analyzed left-to-right on lexicographically sorted ranges of key values.

Filter clauses on primary key columns are evaluated using ternary logic: all, none, or some of the rows in a range match.

Data skipping

ClickHouse tries to avoid data reads at query runtime, as we saw in the Data Pruning section.

Only data chunks that contain at least one matching row are passed to the following predicate.

This gradually decreases the amount of read data and the number of computations performed from predicate to predicate.

Hash tables

ClickHouse instantiates various hash tables from a generic template, with the hash function, allocator, cell type, and resize policy as variation points.

After selecting the fastest hash table variant, the following optimizations may be applied:

  • A two-level layout with 256 sub-tables to support huge key sets
  • String hash tables with four sub-tables and different hash functions for different string lengths
  • Lookup tables that use the key directly as bucket index
  • Values with embedded hashes for faster collision resolution when comparison is expensive
  • Creation of hash tables based on predicted sizes from runtime statistics to avoid unnecessary resizes
  • Allocation of multiple small hash tables with the same creation/destruction lifecycle on a single memory slab
  • Instant clearing of hash tables for reuse using per-hash-map and per-cell version counters

Joins

ClickHouse supports all join types available in SQL. It also supports different join algorithms, such as hash and sort-merge joins.

It provides parallel variants of the classic join algorithms.
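
As a hedged sketch, the join algorithm can be chosen per query via the join_algorithm setting; the regions table here is a hypothetical dimension table, not part of the article’s schema:

SELECT h.RegionID, count() AS Hits
FROM hits AS h
INNER JOIN regions AS r ON h.RegionID = r.RegionID
GROUP BY h.RegionID
SETTINGS join_algorithm = 'parallel_hash';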

5.5 Workload Isolation

ClickHouse provides concurrency control, memory usage limits, and I/O scheduling so that users can isolate queries into different workload classes.

ClickHouse tracks the byte sizes of memory allocations at the server, user, and query levels, allowing flexible memory usage limits.
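
A hedged sketch of query- and user-level memory limits using the max_memory_usage settings; the byte values are illustrative:

SET max_memory_usage = 10000000000;           -- ~10 GB per query
SET max_memory_usage_for_user = 30000000000;  -- ~30 GB across all of a user's queries

-- Limits can also be attached to a single statement.
SELECT RegionID, count() FROM hits GROUP BY RegionID
SETTINGS max_memory_usage = 2000000000;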

6. Integration Layer

There are two approaches to making external data available in an OLAP database.

Push-based model

A third-party component bridges the database with external data stores. For example, ETL tools can push remote data to the destination system.

Pull-based model

The database connects to remote data sources and pulls data for querying into local tables or exports data to remote systems.

We will be discussing pull-based data integration methods in ClickHouse.

External Connectivity: ClickHouse provides 50+ integration table functions and engines for connectivity with external systems and storage locations.

Data Formats: ClickHouse supports 90+ formats, including CSV, JSON, Parquet, Avro, ORC, Arrow, and Protobuf.
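
A hedged sketch of pull-based access through integration table functions; the URLs and the column structure are placeholders:

-- Query Parquet files in object storage in place.
SELECT count()
FROM s3('https://example-bucket.s3.amazonaws.com/data/*.parquet', 'Parquet');

-- Pull a remote CSV file and inspect a few rows.
SELECT *
FROM url('https://example.com/export.csv', 'CSV', 'id UInt32, name String')
LIMIT 10;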

8. Related Work

Early systems, such as Sybase IQ, Teradata, Vertica, and Greenplum, were characterized by expensive batch ETL jobs and limited elasticity due to their on-premise nature.

In the early 2010s, Snowflake, BigQuery, and Redshift dramatically reduced the cost and complexity of analytics for organizations while benefiting from the cloud’s high availability and automatic resource scaling.

Analytical execution kernels such as Photon and Velox offer commoditized data processing for different analytical, streaming, and machine-learning applications.

Regarding goals and design principles, the most similar databases to ClickHouse are Apache Druid and Apache Pinot.

DuckDB is likewise meant to be embedded in a host process, while also providing query optimization and transactions.

9. Conclusion

ClickHouse is an open-source, high-performance OLAP database.

It has a write-optimized storage layer and a state-of-the-art vectorized query engine.

It merges and transforms the data asynchronously in the background to support high ingestion rates.

ClickHouse efficiently decouples data maintenance from parallel inserts.

It has a heavily optimized query processing layer.

It provides seamless integration for multiple data management systems and data formats.

References

Snowflake paper post

Amazon Redshift paper post

Databricks Photon paper post

Meta Velox paper post

Apache Druid paper post

Apache Pinot paper post

ClickHouse Evolution

ClickHouse GitHub

MonetDB/X100: Hyper-Pipelining Query Execution

PRQL

Kusto Query Language

The Log-Structured Merge-Tree (LSM-Tree)

ClickHouse Keeper

chDB — an embedded OLAP SQL Engine

Sybase IQ Multiplex — Designed For Analytics

Teradata Database


Hemant Gupta

https://www.linkedin.com/in/hkgupta/ Working on the challenge to create insights for 100 software eng papers. #100PapersChallenge, AVP Engineering at CoinDCX