Insights from Paper: Apache Flink: Stream and Batch Processing in a Single Engine

Hemant Gupta
10 min readAug 4, 2024

--

1. Introduction

Typically, there are two kinds of data processing: stream processing (event processing) and batch processing. These are programmed using two different programming models and APIs and executed by different systems.

Apache Flink follows a paradigm that embraces the above processing using only one unifying model. This approach will work for real-time analysis, continuous streams, and batch processing on both the programming model and execution engine sides.

Flink’s stream processing programs do not distinguish between processing the latest events in real-time and processing terabytes of historical data. In the same operation, Flink programs can compute both early and approximate results and delayed and accurate results.

Flink supports different notions of time (event time, ingestion time, processing time) to give programmers great flexibility in defining how events should be correlated.

Batch programs are special cases of streaming programs in which the stream is finite, and the order and time of records do not matter.

Flink also has a specialized API for processing static data sets, uses specialized data structures and algorithms for the batch versions of operators like join or grouping, and uses dedicated scheduling strategies.

The contributions of the paper:

  1. The paper argues for a unified stream and batch data processing architecture.
  2. The paper shows how streaming, batch, iterative, and interactive analytics can be represented as fault-tolerant streaming dataflow.
  3. The paper discusses how we can build a full-fledged stream analytics system with a flexible windowing mechanism and a full-fledged batch processor on top of these dataflows.

2. System Architecture

The Flink’s architecture has four main layers: deployment, core, APIs, and libraries.

The Flink software stack.

Flink’s Runtime and APIs

The core of Flink is the distributed dataflow engine, which executes dataflow programs. A Flink runtime program is a DAG of stateful operators connected with data streams.

There are two core APIs in Flink: the DataSet API for processing finite data sets (batch processing) and the DataStream API for processing potentially unbounded data streams (stream processing).

Flink’s core runtime engine can be seen as a streaming dataflow engine, and both the DataSet and DataStream APIs create runtime programs that the engine can execute.

Flink bundles domain-specific libraries and APIs that generate DataSet and DataStream API programs. FlinkML is used for machine learning, Gelly for graph processing, and Table for SQL-like operations.

The Flink process model

Flink cluster comprises three processes: the client, the Job Manager, and at least one Task Manager.

The client takes the program code, transforms it into a dataflow graph, and submits that to the JobManager.

The JobManager coordinates the distributed execution of the dataflow. It tracks each operator’s and stream’s state and progress, schedules new operators, and coordinates checkpoints and recovery.

In a high-availability setup, the JobManager persists a minimal set of metadata at each checkpoint to a fault-tolerant storage, such that a standby JobManager can reconstruct the checkpoint and recover the dataflow execution from there.

The actual data processing takes place in the Task Managers. A Task Manager executes one or more operators that produce streams and report their status to the Job Manager.

The TaskManagers maintain the buffer pools to buffer or materialize the streams and the network connections to exchange the data streams between operators.

3. The common Fabric: Streaming Dataflow

Users can write Flink programs using different APIs. All Flink programs eventually compile down to a common representation: the dataflow graph. Flink’s runtime engine executes the dataflow graph.

3.1 Dataflow Graphs

The dataflow graph shown in the diagram below is a directed acyclic graph (DAG) that consists of (i) stateful operators and (ii) data streams that represent data produced by an operator and are available for consumption by operators.

A simple dataflow graph

Dataflow graphs are executed in a data-parallel fashion. Operators are parallelized into one or more parallel instances called subtasks, and streams are split into one or more stream partitions (one partition per producing subtask).

Streams distribute data between producing and consuming operators in various patterns, such as point-to-point, broadcast, re-partition, fan-out, and merge.

3.2 Data Exchange

Flink’s intermediate data streams are the core abstraction for operator data exchange. An intermediate data stream represents a logical handle to the data produced by an operator and consumed by one or more operators.

Pipelined Exchange — Flink uses pipelined streams to avoid materialization in continuous streaming programs and many parts of batch dataflows.

Blocked Exchange — Blocking streams are applied to bounded data streams. A blocking stream buffers all of the producing operator’s data before making it available for consumption, separating the producing and consuming operators into different execution stages.

Balancing latency and throughput — The diagram below shows the effect of buffer timeouts on the throughput and latency of delivering records in a simple streaming grep job on 30 machines (120 cores).

Flink can achieve an observable 99th-percentile latency of 20 ms. The corresponding throughput is 1.5 million events per second.

At a buffer timeout of 50 ms, the cluster reaches a throughput of more than 80 million events per second with a 99th-percentile latency of 50 ms.

Control Events

Apart from exchanging data, streams in Flink communicate different types of control events. These are special events injected into the data stream by operators and are delivered in order along with all other data records and events within a stream partition.

The receiving operators react to these events by performing certain actions upon arrival. Flink uses many special types of control events, including checkpoint barriers, watermarks, and iteration barriers.

Fault-Tolerance

Flink offers reliable execution with strict exactly-once-processing consistency guarantees and deals with failures via checkpointing and partial re-execution.

The checkpointing mechanism of Apache Flink builds on the notion of distributed consistent snapshots to achieve exact-once-processing guarantees.

To limit recovery time, Flink regularly snapshots the state of operators, including the current position of the input streams.

The core challenge lies in taking a consistent snapshot of all parallel operators without halting the execution of the topology. In essence, the snapshot of all operators should refer to the same logical time in the computation.

The mechanism used in Flink is called Asynchronous Barrier Snapshotting. Barriers are control records injected into the input streams that correspond to a logical time and logically separate the stream from the part whose effects will be included in the current snapshot and the part that will be snapshotted later.

An operator receives barriers from upstream and first performs an alignment phase, ensuring that all inputs’ barriers have been received. Then, the operator writes its state to durable storage.

The operator forwards the barrier downstream once the state has been backed up. Eventually, all operators will register a snapshot of their state, and a global snapshot will be completed.

Recovery from failures reverts all operator states to their respective states taken from the last successful snapshot. It restarts the input streams starting from the latest barrier for which there is a snapshot.

Partial recovery of a failed subtask is possible by replaying unprocessed records buffered at the immediate upstream subtasks.

Asynchronous Barrier Snapshotting

Iterative Dataflow

Incremental processing and iterations are crucial for graph processing and machine learning applications.

Support for iterations in data-parallel processing platforms typically relies on submitting a new job for each iteration or adding additional nodes to a running DAG or feedback edges.

In Flink, iterations are implemented as iteration steps with special operators that can contain an execution graph, as shown in the diagram below.

The iteration model of Apache Flink

To maintain the DAG-based runtime and scheduler, Flink allows for iteration “head” and “tail” tasks that are implicitly connected with feedback edges.

The role of these tasks is to establish an active feedback channel to the iteration step and provide coordination for processing data records in transit within this feedback channel.

4. Stream Analytics on top of dataflow

Flink’s DataStream API implements a full-stream analytics framework.

These APIs include mechanisms to manage time, such as out-of-order event processing, defining windows, and maintaining and updating user-defined states.

The APIs are based on the notion of a DataStream, a (possibly unbounded) immutable collection of elements of a given type.

Flink’s runtime already supports pipelined data transfers, continuous stateful operators, and a fault-tolerance mechanism for consistent state updates. Overlaying a stream processor on top of it essentially involves implementing a windowing system and a state interface.

Notion of Time

Flink distinguishes between two notions of time:

i) event-time

ii) processing-time

The system regularly inserts special events called low watermarks that mark a global progress measure. The watermarks aid the execution engine in processing events in the correct event order and serializing operations.

Watermarks originate at the sources of a topology, where we can determine the time inherent in future elements. The watermarks propagate from the sources throughout the other data flow operators.

Operators decide how they react to watermarks. Simple operations, such as maps or filters, just forward the watermarks they receive, while more complex operators do calculations based on watermarks.

Stateful Stream Processing

While most of Flink’s DataStream API operators look functional and side-effect-free, they support efficient stateful computations. Depending on the use case, there are many different types of states.

For example, the state can be as simple as a counter or a sum or more complex, such as a classification tree or a large sparse matrix.

Stream windows are stateful operators that assign records to continuously updated buckets kept in memory as part of the operator state.

In Flink, the state is made explicit and is incorporated into the API by providing the following:

i) operator interfaces or annotations

ii) an operator-state abstraction

Users can also configure how the state is stored and checkpointed using the StateBackend abstractions provided by the system.

Flink’s checkpointing mechanism guarantees that any registered state is durable with exactly once update semantics.

Stream windows

Windows are continuously evolving logical views. Any incremental computations over unbounded streams are evaluated over windows.

Apache Flink incorporates windowing within a stateful operator that is configured via a flexible declaration composed of three core functions:

  1. The assigner is responsible for assigning each record to logical windows.
  2. An optional trigger defines when the operation associated with the window definition is performed.
  3. An optional evictor determines which records to retain within each window.

Flink’s window assignment process can cover all known window types, such as periodic time-and-count windows, punctuation, landmark, session, and delta windows.

Let’s take an example. In the code below, a sliding window is implemented.

stream
.window(SlidingTimeWindows.of(Time.of(6, SECONDS), Time.of(2, SECONDS))
.trigger(EventTimeTrigger.create())

The window has a range of 6 seconds and slides every 2 seconds (the assigner). The window results are computed once the watermark passes the end of the window (the trigger).

5. Batch Analytics on top of dataflow

A bounded data set is a special case of an unbounded data stream.

A streaming program inserting all input data in a window can form a batch program.

Flink approaches batch processing as follows:

  • The same runtime executes batch computations as streaming computations.
  • Periodic snapshotting is turned off when its overhead is high.
  • Blocking operators are simply operator implementations that happen to block until they have consumed their entire input. The runtime is not aware of whether an operator is stopping or not.
  • A dedicated DataSet API provides familiar abstractions for batch computations, namely a bounded fault-tolerant DataSet data structure and transformations on DataSets (e.g., joins, aggregations, iterations).
  • A query optimization layer transforms a DataSet program into an efficient executable.

Memory Management

Flink serializes data into memory segments instead of allocating objects in the JVMs heap to represent buffered in-flight data records.

Operations such as sorting and joining operate as much as possible on the binary data directly, keeping the serialization and deserialization overhead at a minimum and partially spilling data to disk when needed.

To handle arbitrary objects, Flink uses type inference and custom serialization mechanisms.

6. Related Work

Apache Hadoop is one of the most popular open-source systems for batch processing. It is based on the MapReduce.

Dryad introduced embedded user-defined functions in general DAG-based dataflows. SCOPE enhanced this idea further.

Apache Tez can be seen as an open-source implementation of the ideas proposed in Dryad.

Apache Drill and Impala restrict their API to SQL variants only.

Apache Spark is a data-processing framework that implements a DAG-based execution engine.

In-stream Processing, many alternatives like SEEP, Naiad, Microsoft StreamInsight, and IBM Streams exist.

Recent approaches, like Apache Storm and Samza, enable horizontal scalability and compositional dataflow operators with weaker state consistency guarantees.

Out-of-order processing (OOP) has recently become famous and is used in Google MillWheel.

7. Conclusion

Apache Flink implements a universal dataflow engine to perform stream and batch analytics.

Flink’s dataflow engine treats operator state and logical intermediate results as first-class citizens. It is used by both the batch and data stream APIs with different parameters.

The streaming API, built on top of Flink’s streaming dataflow engine, provides the means to keep recoverable state and partition, transform, and aggregate data stream windows.

Batch computations are a special case of streaming computations. Flink treats them especially by optimizing their execution using a query optimizer and by implementing blocking operators that gracefully spill to the disk without memory.

References

Apache Spark paper post

Google MillWheel paper post

Apache Storm paper post

Dryad: distributed data-parallel programs from sequential building blocks

Incorporating Partitioning and Parallel Plans into the SCOPE Optimizer

Apache Tez: A unifying framework for modeling and building data processing applications

Apache Drill: Interactive Ad-Hoc Analysis at Scale

Apache Impala: A Modern, Open-Source SQL Engine for Hadoop

Samza: stateful scalable stream processing at LinkedIn

Trill: a high-performance incremental query processor for diverse analytics

Out-of-order processing: a new architecture for high-performance stream systems

--

--

Hemant Gupta
Hemant Gupta

Written by Hemant Gupta

https://www.linkedin.com/in/hkgupta/ Working on the challenge to create insights for 100 software eng papers. #100PapersChallenge, AVP Engineering at CoinDCX

No responses yet