DevOps

Apache Flink Stream Processing with Java: Real-Time ETL, Windows, Stateful Processing & Production Patterns 2026

Apache Flink is the industry standard for stateful, fault-tolerant stream processing at scale. This comprehensive guide covers everything from the DataStream API fundamentals to advanced windowing strategies, RocksDB-backed stateful computations, exactly-once semantics, Kafka integration, and production-grade Kubernetes deployment — giving you the complete toolkit to build high-throughput, low-latency streaming pipelines in Java.

Md Sanwar Hossain
April 11, 2026
23 min read
Stream Processing Apache Flink Java

TL;DR

Use Apache Flink for stateful stream processing requiring exactly-once guarantees, event-time windowing, or complex CEP patterns. Flink's RocksDB state backend scales to terabytes. Use Kafka as source/sink. Deploy on Kubernetes with Flink Operator. Always configure checkpointing for fault tolerance.

Apache Flink architecture: sources through JobManager and TaskManagers to state backend and sinks

3. DataStream API: Sources, Transformations & Sinks

The DataStream API is Flink's core API for processing unbounded streams of data. It provides a rich set of transformation operators that compose into a dataflow DAG executed by the Flink runtime.

Maven Dependencies (Flink 1.19)

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.19.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <!-- Kafka connector releases are versioned per Flink minor version -->
        <version>3.2.0-1.19</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.19.0</version>
    </dependency>
</dependencies>

Complete Word Count Streaming Example

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

public class WordCountStream {

    public static void main(String[] args) throws Exception {
        // 1. Get execution environment
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // 2. Configure Kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("input-topic")
            .setGroupId("flink-wordcount-group")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

        // 3. Build the DataStream pipeline
        DataStream<String> lines = env.fromSource(
            source,
            WatermarkStrategy.noWatermarks(),
            "Kafka Source"
        );

        DataStream<Tuple2<String, Integer>> wordCounts = lines
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String word : line.split("\\s+")) {
                    if (!word.isEmpty()) out.collect(Tuple2.of(word.toLowerCase(), 1));
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))
            .keyBy(t -> t.f0)
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            .sum(1);

        // 4. Configure Kafka sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("kafka:9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
            .build();

        wordCounts
            .map(t -> t.f0 + ":" + t.f1)
            .sinkTo(sink);

        // 5. Execute
        env.execute("Word Count Stream Job");
    }
}

Core Transformation Operators

  • map(MapFunction) — one-to-one transformation, same number of output records as input.
  • flatMap(FlatMapFunction) — one-to-many; emit zero or more output records per input.
  • filter(FilterFunction) — retain only records satisfying a predicate.
  • keyBy(KeySelector) — partition stream by key; required before keyed windows and keyed state.
  • process(ProcessFunction) — low-level operator with access to timers, state, and side outputs.
  • union(DataStream...) — merge multiple streams of the same type.
  • connect(DataStream) — connect two streams of different types for co-processing.

4. Event Time vs Processing Time: Watermarks & Late Events

One of Flink's most powerful and nuanced features is its support for three distinct notions of time. Choosing the right time characteristic is critical for correctness in streaming applications.

Three Time Notions

  • Event Time — the timestamp embedded in the event data itself (e.g., when a user clicked a button). Provides correct results regardless of out-of-order arrival or network delays. The right choice for analytics and billing.
  • Ingestion Time — the wall-clock time when the event enters the Flink source operator. Simpler than event time but cannot handle reprocessing correctly (and is no longer a first-class setting in recent Flink versions).
  • Processing Time — the current wall-clock time on the TaskManager processing the event. Fastest and simplest, but produces non-deterministic results during replay.

Watermarks: Progress Markers for Event Time

Watermarks are special records injected into the stream to signal "we've seen all events with timestamp ≤ W". They allow Flink to advance event-time windows and emit results. The trade-off: a larger out-of-orderness tolerance improves completeness but increases latency.

// Bounded out-of-orderness watermark (handles events up to 5s late)
WatermarkStrategy<UserEvent> strategy = WatermarkStrategy
    .<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner(
        (event, recordTimestamp) -> event.getEventTimestampMs()
    );

// Monotonous timestamps (no late events expected)
WatermarkStrategy<UserEvent> monoStrategy = WatermarkStrategy
    .<UserEvent>forMonotonousTimestamps()
    .withTimestampAssigner(
        (event, recordTimestamp) -> event.getEventTimestampMs()
    );

DataStream<UserEvent> events = env.fromSource(
    kafkaSource, strategy, "Kafka User Events"
);

Handling Late Events

Even with a generous watermark strategy, some events arrive after the window has been triggered and closed. Flink provides two mechanisms:

// Define late-data side output tag
OutputTag<UserEvent> lateTag = new OutputTag<UserEvent>("late-events") {};

SingleOutputStreamOperator<PageViewCount> result = events
    .keyBy(UserEvent::getPageId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(2))          // keep window state 2 extra minutes
    .sideOutputLateData(lateTag)               // late events beyond allowed lateness
    .aggregate(new PageViewAggregator());

// Main output: on-time results (+ late updates within allowedLateness)
result.print("on-time");

// Side output: events too late even for allowedLateness
DataStream<UserEvent> lateEvents = result.getSideOutput(lateTag);
lateEvents.sinkTo(deadLetterSink);

Idle Partitions Problem

When a Kafka partition receives no data, its watermark stalls and blocks downstream window progress. Fix with:

WatermarkStrategy.<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withIdleness(Duration.ofMinutes(1)); // treat idle source as not blocking watermark
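
A bounded-out-of-orderness generator is just a small piece of bookkeeping. The sketch below is an illustrative stand-in for Flink's BoundedOutOfOrdernessWatermarks (not the actual class); it shows why the watermark trails the highest timestamp seen and never regresses:

```java
// Illustrative model of a bounded-out-of-orderness watermark generator.
// The watermark trails the max timestamp by the tolerance; the -1 makes the
// contract "all events with timestamp <= W have (probably) arrived".
public class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    public BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called for every event; only the maximum timestamp matters
    public void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    // Late (out-of-order) events never move the watermark backwards
    public long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrdernessMs - 1;
    }
}
```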

5. Windowing Strategies: Tumbling, Sliding & Session Windows

Windows are the primary mechanism for grouping infinite streams into finite, processable chunks. Flink supports three fundamental window types — each with event-time and processing-time variants — plus composable triggers and evictors for advanced use cases.

Tumbling Windows (Non-Overlapping Fixed Windows)

Each event belongs to exactly one window. The classic choice for per-minute/per-hour aggregations where you don't need overlap.

// Count page views per page per 1-minute tumbling window
DataStream<PageViewCount> tumblingResult = events
    .keyBy(UserEvent::getPageId)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .aggregate(new PageViewCountAggregator(), new PageViewWindowFunction());

// AggregateFunction (incremental, memory-efficient)
public class PageViewCountAggregator
    implements AggregateFunction<UserEvent, Long, Long> {
    @Override public Long createAccumulator() { return 0L; }
    @Override public Long add(UserEvent v, Long acc) { return acc + 1; }
    @Override public Long getResult(Long acc) { return acc; }
    @Override public Long merge(Long a, Long b) { return a + b; }
}
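
Tumbling assignment itself is one line of arithmetic. A hedged sketch, mirroring the offset-0 case of Flink's TimeWindow.getWindowStartWithOffset and valid for non-negative timestamps:

```java
// Illustrative tumbling-window assignment (offset 0, non-negative timestamps):
// an event's window start is its timestamp aligned down to the window size.
public class TumblingAssignSketch {
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static long windowEnd(long timestampMs, long windowSizeMs) {
        // Windows are half-open: [start, start + size)
        return windowStart(timestampMs, windowSizeMs) + windowSizeMs;
    }
}
```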

Sliding Windows (Overlapping Windows)

Each event may belong to multiple windows. A window of size 5 minutes sliding every 1 minute assigns each event to 5 overlapping windows, so per-key window state is duplicated accordingly. Good for continuously updating moving-average metrics.

// 5-minute window sliding every 1 minute
DataStream<MetricSnapshot> slidingResult = metricsStream
    .keyBy(Metric::getServiceId)
    .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
    .process(new SlidingWindowMetricProcessor());

// ProcessWindowFunction gives access to window context and state
public class SlidingWindowMetricProcessor
    extends ProcessWindowFunction<Metric, MetricSnapshot, String, TimeWindow> {
    @Override
    public void process(String key,
                        Context ctx,
                        Iterable<Metric> elements,
                        Collector<MetricSnapshot> out) {
        long count = 0;
        double sum = 0.0;
        for (Metric m : elements) {   // single pass over the (possibly RocksDB-backed) elements
            count++;
            sum += m.getValue();
        }
        double avg = count == 0 ? 0.0 : sum / count;
        out.collect(new MetricSnapshot(key, ctx.window().getStart(),
                                       ctx.window().getEnd(), count, avg));
    }
}
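
The overlap math can be made concrete. This illustrative helper (not Flink's actual assigner, but the same offset-0 arithmetic for non-negative timestamps) lists the window starts a single event falls into:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sliding-window assignment: an event belongs to size/slide
// overlapping windows, walking back from the most recent window start.
public class SlidingAssignSketch {
    public static List<Long> windowStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slideMs); // most recent window containing ts
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```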

Session Windows (Activity-Based)

Session windows group events separated by a period of inactivity. No fixed size — they grow as long as events keep arriving within the gap. Perfect for user session analysis.

// Session window: gap of 30 minutes inactivity closes the session
DataStream<UserSession> sessionResult = clickStream
    .keyBy(ClickEvent::getUserId)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .aggregate(new SessionAggregator(), new SessionWindowFunction());
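
Under the hood, each event opens a candidate window [ts, ts + gap), and overlapping candidates are merged into one session. A minimal sketch of that merge rule (illustrative only, not Flink's MergingWindowAssigner internals):

```java
// Session windows as half-open intervals: two candidates merge iff they overlap.
public class SessionMergeSketch {
    // True if [aStart,aEnd) and [bStart,bEnd) overlap and belong to one session
    public static boolean shouldMerge(long aStart, long aEnd, long bStart, long bEnd) {
        return aStart < bEnd && bStart < aEnd;
    }

    // The merged session spans both candidates
    public static long[] merge(long aStart, long aEnd, long bStart, long bEnd) {
        return new long[] { Math.min(aStart, bStart), Math.max(aEnd, bEnd) };
    }
}
```

For example, with a 30-minute gap, events at t=0 and t=1,000,000 ms open overlapping candidates and collapse into one session; an event 2,000,000 ms later starts a new one.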

Custom Triggers and Evictors

  • CountTrigger — fire window when N elements accumulate, regardless of time.
  • PurgingTrigger — wraps another trigger and purges (clears) window state after each firing.
  • Evictor — pre/post-processing hook to remove elements from a window before or after the window function is applied.

// Fire every 1000 events, then purge state
stream.keyBy(Event::getId)
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountTrigger.of(1000)))
    .aggregate(new CountAgg());

6. Stateful Processing: Keyed State & RocksDB Backend

State is what elevates Flink beyond simple ETL. Keyed state allows individual operators to maintain per-key storage that persists across events, enabling use cases like fraud detection, session management, and running aggregations.

Keyed State Types

public class FraudDetectionFunction
    extends KeyedProcessFunction<String, Transaction, Alert> {

    // ValueState: single value per key
    private ValueState<Long> lastTransactionTime;
    // MapState: key-value map per stream-key
    private MapState<String, Long> transactionsByMerchant;
    // ListState: accumulate events
    private ListState<Transaction> recentTransactions;

    @Override
    public void open(Configuration params) throws Exception {
        lastTransactionTime = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-tx-time", Long.class));

        transactionsByMerchant = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("merchant-counts",
                TypeInformation.of(String.class),
                TypeInformation.of(Long.class)));

        recentTransactions = getRuntimeContext().getListState(
            new ListStateDescriptor<>("recent-txns",
                TypeInformation.of(Transaction.class)));
    }

    @Override
    public void processElement(Transaction tx, Context ctx,
                                Collector<Alert> out) throws Exception {
        Long lastTime = lastTransactionTime.value();

        if (lastTime != null) {
            long gap = tx.getTimestamp() - lastTime;
            // Rapid successive transactions — potential fraud
            if (gap < 5000 && tx.getAmount() > 1000) {
                out.collect(new Alert(tx.getUserId(),
                    "Rapid high-value transactions detected", tx));
            }
        }

        // Update state
        lastTransactionTime.update(tx.getTimestamp());

        // MapState has no getOrDefault; get() returns null for absent keys
        Long merchantCount = transactionsByMerchant.get(tx.getMerchantId());
        transactionsByMerchant.put(tx.getMerchantId(),
            (merchantCount == null ? 0L : merchantCount) + 1);

        recentTransactions.add(tx);

        // Register event-time timer for cleanup
        ctx.timerService().registerEventTimeTimer(
            tx.getTimestamp() + Duration.ofHours(1).toMillis());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
                        Collector<Alert> out) throws Exception {
        // Clean up old state
        recentTransactions.clear();
        transactionsByMerchant.clear();
    }
}

Choosing a State Backend

  • HashMapStateBackend — state lives on the JVM heap. Fastest for small state. Use in development or when total state is under a few GB per TaskManager; checkpoints serialize the full state to distributed storage.
  • EmbeddedRocksDBStateBackend — state lives in a local RocksDB instance (off-heap, on disk). Can handle terabytes of state per TaskManager. Slightly higher read latency because every access (de)serializes. The standard choice for production pipelines with large state.

// Configure RocksDB state backend
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints
env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/flink-checkpoints/");

// RocksDB memory tuning in flink-conf.yaml
// state.backend.rocksdb.memory.managed: true
// state.backend.rocksdb.memory.fixed-per-slot: 512mb
// state.backend: rocksdb
// state.backend.incremental: true

State TTL Configuration

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .cleanupFullSnapshot()       // clean on checkpoint
    .cleanupIncrementally(1000, true) // check up to 1000 entries per state access
    .build();

ValueStateDescriptor<UserProfile> descriptor =
    new ValueStateDescriptor<>("user-profile", UserProfile.class);
descriptor.enableTimeToLive(ttlConfig);

ValueState<UserProfile> profileState =
    getRuntimeContext().getState(descriptor);
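
To make the TTL semantics concrete: with OnCreateAndWrite the entry's clock resets on every write, and with NeverReturnExpired an entry is hidden the moment its TTL passes, even if background cleanup has not removed it yet. A toy expiry check under those assumptions:

```java
// Toy model of state-TTL expiry: an entry expires ttl ms after its last write.
// With NeverReturnExpired, reads must hide expired entries immediately.
public class TtlSketch {
    public static boolean isExpired(long lastWriteMs, long ttlMs, long nowMs) {
        return nowMs - lastWriteMs >= ttlMs;
    }

    // What a read would return under NeverReturnExpired (null = hidden)
    public static <T> T visibleValue(T value, long lastWriteMs, long ttlMs, long nowMs) {
        return isExpired(lastWriteMs, ttlMs, nowMs) ? null : value;
    }
}
```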

Large State Best Practices

  • Prefer MapState<K, V> over ValueState<Map<K, V>> — Flink can serialize individual entries lazily.
  • Avoid putting large objects in ListState without TTL — state grows unboundedly.
  • Use @TypeInfo annotations or custom TypeSerializer to avoid slow Kryo serialization for complex types.
  • Enable RocksDB compaction statistics and monitor via Prometheus metrics.

7. Exactly-Once Semantics: Checkpointing & Two-Phase Commit

Exactly-once processing is Flink's killer feature for data-critical applications. It means each record affects the application state and outputs exactly once — even in the presence of failures, restarts, and network partitions.

Checkpointing Configuration

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing every 60 seconds
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

CheckpointConfig config = env.getCheckpointConfig();
// Maximum time a checkpoint may take before being discarded
config.setCheckpointTimeout(120_000);
// Only one checkpoint at a time
config.setMaxConcurrentCheckpoints(1);
// Minimum time between end of last checkpoint and start of next
config.setMinPauseBetweenCheckpoints(30_000);
// Retain externalized checkpoints when the job is cancelled (manual recovery)
config.setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setTolerableCheckpointFailureNumber(2);

// Configure checkpoint storage (S3)
config.setCheckpointStorage("s3://my-flink-bucket/checkpoints/job-name/");

How Checkpoint Barriers Work

Flink implements an asynchronous barrier snapshotting variant of the Chandy-Lamport algorithm for distributed snapshots:

  1. The JobManager triggers a checkpoint by injecting a special barrier record into each source partition.
  2. Barriers flow downstream through the dataflow graph. When an operator receives barriers from all its input channels, it takes a snapshot of its state.
  3. Once all operators have acknowledged their state snapshot to the JobManager, the checkpoint is complete and durable.
  4. On failure, Flink restores all operator state from the last completed checkpoint and replays events from that point.
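
Step 2, barrier alignment, boils down to a few lines of bookkeeping: an operator may snapshot only once the barrier for a given checkpoint has arrived on every input channel. This is an illustrative model, not Flink's actual barrier handler:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative barrier alignment for one checkpoint: track which input
// channels have delivered the barrier; snapshot when all have.
public class BarrierAlignerSketch {
    private final int numChannels;
    private final Set<Integer> arrived = new HashSet<>();

    public BarrierAlignerSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    // Returns true exactly when the barrier has arrived on all channels,
    // i.e., the operator can now snapshot its state
    public boolean onBarrier(int channel) {
        arrived.add(channel);
        return arrived.size() == numChannels;
    }
}
```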

Savepoints vs Checkpoints

Aspect       | Checkpoint                  | Savepoint
Purpose      | Automatic fault tolerance   | Manual operational snapshot
Triggered by | Flink runtime automatically | Operator (CLI / REST API)
Lifecycle    | Auto-deleted on job success | Retained until manually deleted
Use case     | Crash recovery              | Upgrades, A/B testing, scaling
Format       | Optimized internal format   | Portable canonical format

Two-Phase Commit for External Sinks

To achieve end-to-end exactly-once with external systems (e.g., Kafka, databases), Flink uses a two-phase commit (2PC) protocol — historically via TwoPhaseCommitSinkFunction, and in the newer Sink API via two-phase committing sinks such as KafkaSink. The sink pre-commits during each checkpoint and commits only when the checkpoint completes.

// Kafka exactly-once sink configuration
KafkaSink<String> exactlyOnceSink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-topic")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-txn-myapp")
    .setKafkaProducerConfig(producerProperties) // transaction.timeout.ms=120000
    .build();

Restart Strategy

// Fixed delay: retry 3 times with 10 second delay
env.setRestartStrategy(
    RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));

// Exponential delay: more sophisticated backoff
env.setRestartStrategy(
    RestartStrategies.exponentialDelayRestart(
        Time.seconds(1),      // initial delay
        Time.minutes(5),      // max delay
        2.0,                  // multiplier
        Time.minutes(10),     // reset interval
        0.1                   // jitter factor
    ));
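
Ignoring the jitter factor, the exponential strategy above produces delays following delay_n = min(initial x multiplier^n, maxDelay). A small illustrative helper (not Flink's implementation) makes the schedule explicit:

```java
// Illustrative exponential-backoff schedule (jitter omitted):
// delay_n = min(initial * multiplier^attempt, maxDelay)
public class ExponentialDelaySketch {
    public static long delayMs(long initialMs, long maxMs,
                               double multiplier, int attempt) {
        double d = initialMs * Math.pow(multiplier, attempt);
        return (long) Math.min(d, (double) maxMs); // cap at the max delay
    }
}
```

With the configuration above (1s initial, 5min max, multiplier 2.0), the first restarts wait 1s, 2s, 4s, ..., capping at 5 minutes around the ninth attempt.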
Flink checkpointing mechanism: barrier injection, state snapshots, and two-phase commit for exactly-once processing with Kafka

8. Flink-Kafka Integration: Source, Sink & Offset Management

Kafka is the most common data source and sink for production Flink pipelines. The flink-connector-kafka library provides first-class, transactionally-aware integration with full exactly-once semantics when combined with Flink checkpointing.

KafkaSource Configuration

// Full KafkaSource with custom deserializer
KafkaSource<UserEvent> kafkaSource = KafkaSource.<UserEvent>builder()
    .setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
    .setTopics("user-events")
    // Or pattern-based topic discovery:
    // .setTopicPattern(Pattern.compile("user-events-.*"))
    .setGroupId("flink-consumer-group")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.EARLIEST)) // fall back to earliest if no committed offset
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(
        new JsonDeserializationSchema<>(UserEvent.class)))
    .setProperty("partition.discovery.interval.ms", "30000") // auto-discover new partitions
    .build();

Custom Deserialization Schema

public class UserEventDeserializationSchema
    implements KafkaRecordDeserializationSchema<UserEvent> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record,
                             Collector<UserEvent> out) throws IOException {
        UserEvent event = MAPPER.readValue(record.value(), UserEvent.class);
        // Enrich with Kafka metadata
        event.setKafkaOffset(record.offset());
        event.setKafkaPartition(record.partition());
        out.collect(event);
    }

    @Override
    public TypeInformation<UserEvent> getProducedType() {
        return TypeInformation.of(UserEvent.class);
    }
}

KafkaSink Configuration

Properties producerProps = new Properties();
producerProps.setProperty("transaction.timeout.ms", "120000");
producerProps.setProperty("acks", "all");
producerProps.setProperty("compression.type", "lz4");

KafkaSink<ProcessedEvent> kafkaSink = KafkaSink.<ProcessedEvent>builder()
    .setBootstrapServers("kafka-broker-1:9092,kafka-broker-2:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("processed-events")
        .setKeySerializationSchema(event ->
            event.getUserId().getBytes(StandardCharsets.UTF_8))
        .setValueSerializationSchema(new JsonSerializationSchema<>())
        .build())
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-txn-processed")
    .setKafkaProducerConfig(producerProps)
    .build();

Offset Management with Exactly-Once

When using DeliveryGuarantee.EXACTLY_ONCE, Flink manages Kafka consumer offsets as part of the checkpoint state — not via Kafka's consumer group commit protocol. This is important to understand:

  • Offsets are stored in Flink's checkpoint, not in Kafka's __consumer_offsets topic.
  • The consumer group lag shown in Kafka monitoring tools may appear high because Kafka-committed offsets lag behind actual consumption.
  • On restart from checkpoint, Flink seeks Kafka to the saved offsets directly, bypassing the group coordinator.
  • Transactional producers are opened per checkpoint interval — set transaction.timeout.ms > checkpoint interval.

Monitoring Kafka Source Metrics

  • KafkaSourceReader.currentOffset — current read offset per partition.
  • KafkaSourceReader.committedOffset — last committed offset (lags behind by design).
  • numRecordsInPerSecond — throughput metric to detect backpressure from slow Kafka.
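
This is also why Kafka dashboards over-report lag for exactly-once Flink jobs: they compare the log end offset against the committed offset, which trails what Flink has actually read. A hypothetical helper (names are illustrative) computing lag from per-partition offsets:

```java
// Hypothetical lag computation: sum of (log end offset - consumer offset)
// across partitions. Substituting committedOffset for currentOffset here is
// exactly what makes external dashboards look worse than reality.
public class ConsumerLagSketch {
    public static long totalLag(long[] logEndOffsets, long[] consumerOffsets) {
        long lag = 0;
        for (int i = 0; i < logEndOffsets.length; i++) {
            // Per-partition lag never goes below zero
            lag += Math.max(0, logEndOffsets[i] - consumerOffsets[i]);
        }
        return lag;
    }
}
```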

9. Flink on Kubernetes with Flink Operator

The Apache Flink Kubernetes Operator (version 1.8.0) is the recommended way to deploy and manage Flink applications on Kubernetes. It provides a FlinkDeployment custom resource that manages the full lifecycle: submission, scaling, savepoints, and upgrades.

Installing the Flink Kubernetes Operator

# Add Helm repository
helm repo add flink-operator-repo \
  https://downloads.apache.org/flink/flink-kubernetes-operator-1.8.0/

# Install operator with CRDs
helm install flink-kubernetes-operator \
  flink-operator-repo/flink-kubernetes-operator \
  --namespace flink-operator \
  --create-namespace \
  --set webhook.create=true

FlinkDeployment Custom Resource

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: fraud-detection-job
  namespace: flink-jobs
spec:
  flinkVersion: v1_19
  image: my-registry/fraud-detection-flink:1.0.0
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    state.backend: rocksdb
    state.backend.incremental: "true"
    state.checkpoints.dir: s3://my-flink-bucket/checkpoints/
    state.savepoints.dir: s3://my-flink-bucket/savepoints/
    execution.checkpointing.interval: "60000"
    execution.checkpointing.mode: EXACTLY_ONCE
    high-availability: kubernetes
    high-availability.storageDir: s3://my-flink-bucket/ha/
    kubernetes.cluster-id: fraud-detection-job
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
    replicas: 3
  job:
    jarURI: local:///opt/flink/usrlib/fraud-detection.jar
    entryClass: com.example.FraudDetectionJob
    args: ["--kafka.bootstrap", "kafka:9092"]
    parallelism: 12
    upgradeMode: savepoint
    savepointTriggerNonce: 0

Kubernetes Native HA Configuration

# Additional flinkConfiguration entries for HA
high-availability.type: kubernetes
high-availability.storageDir: s3://my-flink-bucket/ha/
kubernetes.cluster-id: fraud-detection-ha
# JobManager leader election runs automatically via Kubernetes ConfigMap leases

Job Upgrades with Savepoints

The Flink Operator supports three upgrade modes:

  • savepoint — triggers a savepoint before stopping the old job, restarts new version from savepoint. Zero data loss, requires compatible state schema.
  • last-state — restarts from the most recent successful checkpoint without triggering a new savepoint. Faster but less safe for code changes.
  • stateless — discards all state and restarts fresh. Use only when state compatibility is broken.

Reactive Autoscaling

# Enable reactive mode for dynamic parallelism scaling
# (note: reactive mode requires standalone deployment mode; with the operator,
#  its built-in autoscaler is usually the better fit)
scheduler-mode: reactive

# Or use KEDA with Kafka lag-based scaling:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: flink-taskmanager-scaler
spec:
  scaleTargetRef:
    name: fraud-detection-job
  minReplicaCount: 2
  maxReplicaCount: 20
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: kafka:9092
        consumerGroup: flink-consumer-group
        topic: user-events
        lagThreshold: "10000"

Accessing the Flink UI

# Port-forward to Flink Web UI
kubectl port-forward svc/fraud-detection-job-rest 8081:8081 -n flink-jobs
# Open http://localhost:8081 for job monitoring, checkpoints, backpressure

10. Performance Tuning, Monitoring & Production Checklist

Running Flink in production requires proactive performance tuning and comprehensive monitoring. Backpressure, checkpoint failures, and uncontrolled state growth are the three most common production issues.

Backpressure Detection & Resolution

Backpressure occurs when a downstream operator cannot keep up with its upstream source. The Flink Web UI's Backpressure tab shows backpressure ratio per operator. The busyTimeMsPerSecond metric is the most reliable indicator:

  • busyTimeMsPerSecond > 800 — the operator is busy more than 80% of each second and is likely the bottleneck.
  • backPressuredTimeMsPerSecond > 200 — operator is waiting on downstream >20% of the time.

Checkpoint Optimization

# flink-conf.yaml / flinkConfiguration in operator CRD
state.backend: rocksdb
state.backend.incremental: "true"       # only upload changed SST files
state.backend.rocksdb.memory.managed: "true"
state.backend.rocksdb.memory.fixed-per-slot: "512mb"
state.backend.local-recovery: "true"    # recover from local TaskManager disk first
execution.checkpointing.interval: "60000"
execution.checkpointing.timeout: "120000"
execution.checkpointing.max-concurrent-checkpoints: "1"
execution.checkpointing.min-pause: "30000"

Parallelism and Resource Tuning

  • Set job parallelism to match the number of Kafka partitions for the source topic — prevents some subtasks from idling.
  • Set taskmanager.numberOfTaskSlots equal to the number of CPU cores per TaskManager pod.
  • Allocate TaskManager memory: JVM heap (~30%), managed memory for RocksDB (~40%), network buffers (~10%), remaining for overhead.
  • Increase taskmanager.memory.network.fraction (default 0.1) if you see network buffer exhaustion.
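
The suggested memory split can be sketched as simple arithmetic. The percentages are this article's rule of thumb, not Flink's exact memory model:

```java
// Rule-of-thumb TaskManager memory split from the list above:
// ~30% JVM heap, ~40% managed (RocksDB), ~10% network, rest overhead.
public class MemorySplitSketch {
    public static long[] splitMb(long totalMb) {
        long heap    = totalMb * 30 / 100;
        long managed = totalMb * 40 / 100;
        long network = totalMb * 10 / 100;
        long overhead = totalMb - heap - managed - network;
        return new long[] { heap, managed, network, overhead };
    }
}
```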

Key Prometheus Metrics to Monitor

Metric                       | Warning Threshold       | Critical Threshold
numRecordsInPerSecond        | Near 0 (source stalled) | —
busyTimeMsPerSecond          | >700                    | >900
backPressuredTimeMsPerSecond | >200                    | >500
lastCheckpointDuration       | >30 s                   | >60 s
lastCheckpointSize           | Growing >10%/hr         | Exceeds storage quota
numberOfFailedCheckpoints    | >0                      | >2
numberOfRestarts             | >0 in 1hr               | >3 in 1hr
Kafka consumer lag           | >10k records            | >100k records

Production Checklist

  • ✅ Checkpointing enabled with EXACTLY_ONCE mode and external storage (S3/GCS)
  • ✅ EmbeddedRocksDBStateBackend with incremental checkpoints for production state
  • ✅ Kafka source/sink using DeliveryGuarantee.EXACTLY_ONCE with transactional prefix
  • ✅ Restart strategy configured (exponential or fixed delay, not fail-fast)
  • ✅ Kubernetes HA mode enabled (JobManager leader election via ConfigMap)
  • ✅ Resource requests and limits set on all pods (OOMKill prevention)
  • ✅ Prometheus metrics exported and Grafana dashboards configured
  • ✅ State TTL configured on long-lived keyed state to prevent unbounded growth
  • ✅ Late event handling with allowedLateness and side output dead-letter sink
  • ✅ Idle source partitions handled with .withIdleness() in WatermarkStrategy
  • ✅ Savepoint created before every production deployment (upgradeMode: savepoint)
  • ✅ Throughput and latency SLAs defined and alerted on (P99 end-to-end latency)

Grafana Dashboard Sample Queries (PromQL)

# End-to-end throughput
rate(flink_taskmanager_job_task_operator_numRecordsOutPerSecond[1m])

# Average checkpoint duration
avg(flink_jobmanager_job_lastCheckpointDuration) by (job_name)

# Backpressure ratio
flink_taskmanager_job_task_backPressuredTimeMsPerSecond / 1000

# Kafka consumer lag (from Kafka Exporter)
sum(kafka_consumer_lag_sum) by (consumergroup, topic)

Tags

Apache Flink Flink Java DataStream API Flink windowing stateful processing RocksDB state backend exactly-once semantics Flink Kafka Flink Kubernetes stream processing real-time ETL Flink checkpointing

Md Sanwar Hossain

Senior Software Engineer specializing in Java, Spring Boot, Apache Flink, Kafka, Kubernetes, and AWS. Passionate about building high-throughput, fault-tolerant stream processing pipelines and cloud-native microservices. Connect on LinkedIn or GitHub.


Last updated: April 11, 2026