
Deploying Flink

Introduction

Apache Flink is a powerful, open-source framework for distributed stream processing and batch analytics. Built for high-throughput, low-latency data processing, Flink provides stateful computations over unbounded and bounded data streams with exactly-once semantics, event-time processing, and sophisticated state management. Used by companies like Netflix, Alibaba, and Uber, Flink excels at real-time analytics, complex event processing, ETL pipelines, and machine learning workflows.

Deploying Flink on Klutch.sh gives you automatic Dockerfile detection, persistent storage for checkpoints and savepoints, HTTP traffic support for the Flink Dashboard and REST API, and seamless scaling capabilities. This guide provides detailed instructions for deploying both Flink JobManager and TaskManager with production-ready configuration, checkpoint persistence, metrics integration, and best practices for running stateful stream processing workloads.

  • Automatic Dockerfile Detection: Klutch.sh detects your Flink Dockerfile and handles container orchestration automatically
  • Persistent State Storage: Attach volumes for checkpoints, savepoints, and RocksDB state backend to ensure fault tolerance
  • Scalable Architecture: Deploy separate JobManager and TaskManager instances with independent scaling
  • Built-in HTTPS: Secure access to Flink Dashboard and REST API with automatic SSL certificates
  • Environment Variable Management: Securely configure Flink settings, JVM options, and state backends through the dashboard
  • Custom Domain Support: Map your own domain to your Flink cluster for professional deployments
  • High Availability: Deploy multiple JobManagers with ZooKeeper for production-grade high availability
  • Monitoring Ready: Integrate with Prometheus, Grafana, and other monitoring tools via Flink’s metrics system
  • Cost-Effective: Pay only for the compute and storage resources you use with flexible scaling options

Prerequisites

Before deploying Flink on Klutch.sh, ensure you have:

  • A Klutch.sh account
  • A GitHub account with a repository for your project
  • Docker installed locally for testing (optional but recommended)
  • Basic familiarity with Docker and stream processing concepts
  • Understanding of Flink’s architecture (JobManager, TaskManager, state management)
  • Optional: Apache Kafka deployed for streaming data sources (see Kafka guide)

Flink follows a distributed master-worker architecture designed for scalable stream processing:

Core Components:

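  • JobManager: Master process that coordinates job execution, schedules tasks, manages checkpoints, and recovers from failures. It hosts the Dispatcher, the ResourceManager, and one JobMaster per running job
  • TaskManager: Worker processes that execute tasks, maintain state, and exchange data streams
  • Dispatcher: Component of the JobManager that accepts job submissions through the REST interface, starts a JobMaster for each job, and runs the Web UI
  • ResourceManager: Component of the JobManager that manages TaskManager slots and resource allocation
  • State Backend: Pluggable storage for operator state (HashMapStateBackend or EmbeddedRocksDBStateBackend; the older MemoryStateBackend and FsStateBackend are deprecated)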

Ports and Networking:

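  • 8081: Flink Dashboard and REST API (HTTP, rest.port)
  • 6123: JobManager RPC port (internal communication, jobmanager.rpc.port)
  • 6122: TaskManager RPC port (internal communication, taskmanager.rpc.port)
  • 6124: Blob server port for distributing job JARs (internal communication, blob.server.port)
  • TaskManager data exchange uses taskmanager.data.port, which is random by default
  • 9249: Prometheus metrics exporter (optional)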

Directory Structure:

/opt/flink/
├── bin/ # Flink binaries and scripts
├── conf/
│ ├── flink-conf.yaml # Main configuration file
│ ├── log4j.properties # Logging configuration
│ └── masters # JobManager hosts
├── lib/ # Flink JARs and connectors
├── plugins/ # Plugin JARs (metrics, state backends)
├── log/ # Log files
├── checkpoints/ # Checkpoint storage
├── savepoints/ # Savepoint storage
└── state/ # Local state (RocksDB)

Installation Steps

Step 1: Create a GitHub Repository

Create a new GitHub repository for your Flink deployment:

mkdir flink-deployment
cd flink-deployment
git init

Step 2: Create the Dockerfile

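Create a Dockerfile in your repository root based on the official Apache Flink image:

FROM flink:1.18.1-scala_2.12-java11
# Set working directory
WORKDIR /opt/flink
# Install additional dependencies (netcat-openbsd provides `nc` for the entrypoint script)
USER root
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
      curl \
      wget \
      netcat-openbsd \
      procps \
      vim \
    && rm -rf /var/lib/apt/lists/*
# Copy custom configuration files. Flink reads log4j-console.properties when it runs
# in the foreground, as it does in a container, so install the logging config under both names
COPY --chown=flink:flink flink-conf.yaml /opt/flink/conf/flink-conf.yaml
COPY --chown=flink:flink log4j.properties /opt/flink/conf/log4j.properties
COPY --chown=flink:flink log4j.properties /opt/flink/conf/log4j-console.properties
# Install the custom entrypoint under a different name: it calls the base image's
# own /docker-entrypoint.sh, which must stay in place
COPY --chown=flink:flink docker-entrypoint.sh /opt/flink/custom-entrypoint.sh
RUN chmod +x /opt/flink/custom-entrypoint.sh
# Create directories for state, checkpoints, savepoints, uploads, and HA metadata
RUN mkdir -p /opt/flink/checkpoints /opt/flink/savepoints /opt/flink/state /opt/flink/uploads /opt/flink/ha && \
    chown -R flink:flink /opt/flink/checkpoints /opt/flink/savepoints /opt/flink/state /opt/flink/uploads /opt/flink/ha
# Add custom connectors (optional - example with Kafka). Since Flink 1.17 the Kafka
# connector is released separately and versioned as <connector version>-<Flink minor>
# RUN wget -P /opt/flink/lib/ \
#   https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar
# Switch back to flink user
USER flink
# Expose ports
# 8081: Flink Dashboard and REST API
# 6123: JobManager RPC
# 6122: TaskManager RPC
# 6124: Blob server
# 9249: Prometheus metrics
EXPOSE 8081 6123 6122 6124 9249
# Health check: only the JobManager serves the REST API, so TaskManagers check for their process instead
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
  CMD if [ "$FLINK_MODE" = "taskmanager" ]; then pgrep -f TaskManagerRunner > /dev/null; else curl -fs http://localhost:8081/overview > /dev/null; fi || exit 1
ENTRYPOINT ["/opt/flink/custom-entrypoint.sh"]
# Default arguments, passed to the base image entrypoint when FLINK_MODE is not set
CMD ["help"]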

Step 3: Create the Flink Configuration

Create flink-conf.yaml for Flink configuration:

# ==============================================================================
# Flink Configuration
# ==============================================================================
# JobManager Configuration
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 2048m
jobmanager.memory.jvm-metaspace.size: 256m
# TaskManager Configuration
taskmanager.bind-host: 0.0.0.0
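# taskmanager.host is the address each TaskManager advertises to the JobManager.
# Leave it unset so every TaskManager reports its own address; localhost only
# works when all processes share one container
# taskmanager.host: localhost
taskmanager.rpc.port: 6122
taskmanager.memory.process.size: 4096m
# With task heap and managed memory set explicitly, these components plus network
# memory, JVM metaspace (256m) and JVM overhead (192m-1g) must fit inside process.size
taskmanager.memory.framework.heap.size: 256m
taskmanager.memory.task.heap.size: 1536m
taskmanager.memory.managed.size: 1024m
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb
taskmanager.numberOfTaskSlots: 4
# Parallelism Configuration
parallelism.default: 2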
# High Availability (disabled by default - enable for production)
# high-availability: zookeeper
# high-availability.storageDir: file:///opt/flink/ha
# high-availability.zookeeper.quorum: zookeeper:2181
# high-availability.zookeeper.path.root: /flink
# high-availability.cluster-id: /flink-cluster
# State Backend Configuration
state.backend: filesystem
state.checkpoints.dir: file:///opt/flink/checkpoints
state.savepoints.dir: file:///opt/flink/savepoints
state.backend.incremental: false
# For RocksDB state backend (production recommended):
# state.backend: rocksdb
# state.backend.rocksdb.localdir: /opt/flink/state/rocksdb
# state.backend.incremental: true
# Checkpoint Configuration
execution.checkpointing.interval: 60000
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 600000
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 5000
execution.checkpointing.tolerable-failed-checkpoints: 3
# Restart Strategy
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10s
# Web Dashboard Configuration
web.submit.enable: true
web.cancel.enable: true
web.upload.dir: /opt/flink/uploads
rest.port: 8081
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0
# Logging: levels and appenders are set in the log4j*.properties files;
# env.log.dir sets the directory Flink's startup scripts write logs to
env.log.dir: /opt/flink/log
# Metrics Configuration
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
# Execution Configuration
execution.attached: false
execution.shutdown-on-attached-exit: false
# Blob Server (for distributing JARs to TaskManagers)
blob.server.port: 6124
blob.service.cleanup.interval: 3600
# Security (uncomment and configure for production)
# security.ssl.enabled: true
# security.ssl.rest.enabled: true
# security.ssl.keystore: /opt/flink/conf/flink.keystore
# security.ssl.keystore-password: changeit
# security.ssl.key-password: changeit
# security.ssl.truststore: /opt/flink/conf/flink.truststore
# security.ssl.truststore-password: changeit
# Classloader Resolution
classloader.resolve-order: child-first
classloader.check-leaked-classloader: true
# Akka Configuration (internal communication)
akka.ask.timeout: 60s
akka.tcp.timeout: 60s
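The TaskManager memory values in this file have to fit together. When task heap and managed memory are both set explicitly, Flink derives network memory as a fraction of total Flink memory and treats whatever is left of taskmanager.memory.process.size as JVM overhead, and it refuses to start if that leftover falls outside the overhead range. The Java sketch below is a simplified model of that arithmetic, not Flink's own code; it assumes Flink 1.18 defaults for everything the file does not set (128 MB framework off-heap, no task off-heap, 256 MB metaspace, JVM overhead between 192 MB and 1 GB). With a 4096m process size, 256m framework heap and 1024m managed memory, a 2048m task heap leaves essentially no JVM overhead, while 1536m leaves roughly 569 MB.

```java
// Simplified model of Flink's TaskManager memory derivation when task heap and
// managed memory are set explicitly together with taskmanager.memory.process.size.
// Assumed defaults (Flink 1.18): framework off-heap 128 MB, task off-heap 0 MB,
// JVM metaspace 256 MB, network fraction 0.1 clamped to [64 MB, 1024 MB],
// JVM overhead allowed range [192 MB, 1024 MB].
final class TaskManagerMemoryCheck {
    static final double FRAMEWORK_OFF_HEAP_MB = 128;
    static final double TASK_OFF_HEAP_MB = 0;
    static final double METASPACE_MB = 256;

    // Network memory is a fraction of *total* Flink memory, so it is derived from
    // the other components with an inverse fraction and then clamped to its range.
    static double networkMb(double otherFlinkMb) {
        double fraction = 0.1;
        double derived = otherFlinkMb * fraction / (1 - fraction);
        return Math.max(64, Math.min(1024, derived));
    }

    // Whatever remains of process.size after Flink memory and metaspace is JVM overhead.
    static double jvmOverheadMb(double processMb, double frameworkHeapMb,
                                double taskHeapMb, double managedMb) {
        double other = frameworkHeapMb + FRAMEWORK_OFF_HEAP_MB
                + taskHeapMb + TASK_OFF_HEAP_MB + managedMb;
        double totalFlink = other + networkMb(other);
        return processMb - totalFlink - METASPACE_MB;
    }

    // Flink rejects the configuration when the leftover overhead is outside its range.
    static boolean fits(double processMb, double frameworkHeapMb,
                        double taskHeapMb, double managedMb) {
        double overhead = jvmOverheadMb(processMb, frameworkHeapMb, taskHeapMb, managedMb);
        return overhead >= 192 && overhead <= 1024;
    }

    public static void main(String[] args) {
        for (double taskHeap : new double[] {2048, 1536}) {
            System.out.printf("task heap %.0f MB -> JVM overhead %.1f MB, valid: %b%n",
                    taskHeap, jvmOverheadMb(4096, 256, taskHeap, 1024),
                    fits(4096, 256, taskHeap, 1024));
        }
    }
}
```

If you change process.size (for example through TASKMANAGER_MEMORY), re-check the explicit values, or drop task heap and managed memory and let Flink derive them.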

Step 4: Create Log4j Configuration

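Create log4j.properties for logging configuration. Flink 1.11 and later use Log4j 2, so the file uses Log4j 2 properties syntax. When Flink runs in the foreground, as it does inside the official Docker image, it reads conf/log4j-console.properties, so install this file under that name as well: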

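# ==============================================================================
# Flink Log4j 2 Configuration
# ==============================================================================
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
# Console Appender (shows up in the container logs)
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Rolling File Appender (log.file is set by Flink's startup scripts)
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
# Suppress excessive logging from specific packages
logger.kafka.name = org.apache.kafka
logger.kafka.level = WARN
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = WARN
logger.pekko.name = org.apache.pekko
logger.pekko.level = WARN
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = WARN
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = ERROR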

Step 5: Create Docker Entrypoint Script

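Create docker-entrypoint.sh for flexible JobManager/TaskManager startup. The script hands over to the base image's own /docker-entrypoint.sh, so install it in the image under a different path (for example /opt/flink/custom-entrypoint.sh) and set that path as the ENTRYPOINT: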

#!/bin/bash
set -e
CONF_FILE=/opt/flink/conf/flink-conf.yaml
FLINK_JOBMANAGER_PORT="${FLINK_JOBMANAGER_PORT:-6123}"
# Wait for JobManager to be ready (for TaskManager)
wait_for_jobmanager() {
  echo "Waiting for JobManager at ${FLINK_JOBMANAGER_HOST}:${FLINK_JOBMANAGER_PORT}..."
  while ! nc -z "${FLINK_JOBMANAGER_HOST}" "${FLINK_JOBMANAGER_PORT}"; do
    sleep 2
  done
  echo "JobManager is ready!"
}
# Update flink-conf.yaml with environment variables (replace the whole line, whatever its current value)
update_config() {
  if [ -n "$FLINK_JOBMANAGER_HOST" ]; then
    sed -i "s|^jobmanager.rpc.address:.*|jobmanager.rpc.address: ${FLINK_JOBMANAGER_HOST}|" "$CONF_FILE"
    # The base image entrypoint can also set jobmanager.rpc.address from
    # JOB_MANAGER_RPC_ADDRESS, so keep both in sync
    export JOB_MANAGER_RPC_ADDRESS="${FLINK_JOBMANAGER_HOST}"
  fi
  if [ -n "$TASKMANAGER_SLOTS" ]; then
    sed -i "s|^taskmanager.numberOfTaskSlots:.*|taskmanager.numberOfTaskSlots: ${TASKMANAGER_SLOTS}|" "$CONF_FILE"
  fi
  if [ -n "$CHECKPOINT_DIR" ]; then
    sed -i "s|^state.checkpoints.dir:.*|state.checkpoints.dir: ${CHECKPOINT_DIR}|" "$CONF_FILE"
  fi
  if [ -n "$SAVEPOINT_DIR" ]; then
    sed -i "s|^state.savepoints.dir:.*|state.savepoints.dir: ${SAVEPOINT_DIR}|" "$CONF_FILE"
  fi
}
# Update configuration
update_config
# Start based on FLINK_MODE environment variable; every branch hands over to
# the base image's /docker-entrypoint.sh
case "$FLINK_MODE" in
  jobmanager)
    echo "Starting Flink JobManager..."
    exec /docker-entrypoint.sh jobmanager
    ;;
  taskmanager)
    echo "Starting Flink TaskManager..."
    wait_for_jobmanager
    exec /docker-entrypoint.sh taskmanager
    ;;
  standalone)
    # Application mode: pass arguments such as --job-classname as the container command
    echo "Starting Flink in standalone (application) mode..."
    exec /docker-entrypoint.sh standalone-job "$@"
    ;;
  *)
    echo "FLINK_MODE not set or invalid. Use 'jobmanager', 'taskmanager', or 'standalone'"
    exec /docker-entrypoint.sh "$@"
    ;;
esac

Make the script executable:

chmod +x docker-entrypoint.sh
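The wait_for_jobmanager loop is a plain TCP readiness probe: it retries nc -z until something accepts connections on the JobManager RPC port, and it loops forever if the host is wrong. If you want a check that gives up instead (for example in a custom health or init tool), a minimal Java equivalent might look like the sketch below; the class name PortWaiter and its retry parameters are illustrative and not part of Flink or Klutch.sh.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Java sketch of the wait_for_jobmanager loop: open a TCP connection to the
// JobManager RPC port, retrying with a fixed delay, and give up after maxAttempts.
final class PortWaiter {
    static boolean waitForPort(String host, int port, int maxAttempts,
                               long delayMillis, int connectTimeoutMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), connectTimeoutMillis);
                return true; // something is listening, like `nc -z` succeeding
            } catch (IOException e) {
                // connection refused, unresolvable host, or timeout: retry after a pause
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis);
                }
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        String host = System.getenv().getOrDefault("FLINK_JOBMANAGER_HOST", "localhost");
        int port = Integer.parseInt(System.getenv().getOrDefault("FLINK_JOBMANAGER_PORT", "6123"));
        boolean ready = waitForPort(host, port, 30, 2000, 1000);
        System.out.println(ready ? "JobManager is ready!" : "Gave up waiting for JobManager");
    }
}
```

With 30 attempts and a 2-second delay, this waits about a minute before reporting failure, which surfaces a wrong FLINK_JOBMANAGER_HOST quickly instead of hanging.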

Step 6: Create Environment Variables File

Create .env.example with all environment variables. Keep comments on their own lines: docker run --env-file and many dashboards treat everything after the = sign, including a trailing # comment, as part of the value. The entrypoint script applies only some of these variables; the others (such as TASKMANAGER_MEMORY, STATE_BACKEND, LOG_LEVEL, and the S3, HA, SSL, and metrics settings) are placeholders that you need to map into flink-conf.yaml yourself, for example through the official image's FLINK_PROPERTIES variable.

# ==============================================================================
# Flink Environment Variables
# ==============================================================================
# Deployment Mode: jobmanager, taskmanager, or standalone
FLINK_MODE=jobmanager
# JobManager Configuration
# Hostname/IP of JobManager
FLINK_JOBMANAGER_HOST=flink-jobmanager
# JobManager RPC port
FLINK_JOBMANAGER_PORT=6123
# TaskManager Configuration
# Number of task slots per TaskManager
TASKMANAGER_SLOTS=4
# TaskManager memory
TASKMANAGER_MEMORY=4096m
# State Backend Configuration: filesystem or rocksdb
STATE_BACKEND=filesystem
CHECKPOINT_DIR=file:///opt/flink/checkpoints
SAVEPOINT_DIR=file:///opt/flink/savepoints
# For S3 state backend (production):
# STATE_BACKEND=rocksdb
# CHECKPOINT_DIR=s3://my-bucket/flink/checkpoints
# SAVEPOINT_DIR=s3://my-bucket/flink/savepoints
# S3_ACCESS_KEY=your-access-key
# S3_SECRET_KEY=your-secret-key
# S3_ENDPOINT=https://s3.amazonaws.com
# JVM Configuration: extra JVM flags only. Flink derives -Xms/-Xmx from its
# memory settings, so size the heap in flink-conf.yaml instead
JVM_ARGS=-XX:+UseG1GC
# High Availability (optional - requires ZooKeeper)
# ENABLE_HA=true
# ZOOKEEPER_QUORUM=zookeeper:2181
# HA_STORAGE_DIR=file:///opt/flink/ha
# HA_CLUSTER_ID=flink-cluster
# Metrics and Monitoring
METRICS_REPORTER=prometheus
PROMETHEUS_PORT=9249
# Security (optional)
# ENABLE_SSL=true
# KEYSTORE_PATH=/opt/flink/conf/flink.keystore
# KEYSTORE_PASSWORD=changeit
# TRUSTSTORE_PATH=/opt/flink/conf/flink.truststore
# TRUSTSTORE_PASSWORD=changeit
# Kafka Configuration (if using Kafka connector)
# KAFKA_BOOTSTRAP_SERVERS=kafka:9092
# KAFKA_GROUP_ID=flink-consumer
# Logging
LOG_LEVEL=INFO

Step 7: Create README and Push to GitHub

Create a README.md:

# Apache Flink Deployment
Production-ready Apache Flink deployment with JobManager and TaskManager configuration.
## Features
- Flink 1.18.1 with Scala 2.12 and Java 11
- Configurable state backends (Filesystem, RocksDB)
- Checkpoint and savepoint persistence
- Prometheus metrics integration
- Docker-based deployment
- High availability support (optional)
## Prerequisites
- Docker
- GitHub account
- Klutch.sh account
## Local Development
### Start JobManager
```bash
docker build -t flink-cluster .
docker network create flink-net
docker run --name flink-jobmanager --network flink-net \
  -e FLINK_MODE=jobmanager \
  -e FLINK_JOBMANAGER_HOST=flink-jobmanager \
  -p 8081:8081 flink-cluster
```
### Start TaskManager
```bash
docker run --network flink-net \
  -e FLINK_MODE=taskmanager \
  -e FLINK_JOBMANAGER_HOST=flink-jobmanager \
  flink-cluster
```
## Deployment to Klutch.sh
See deployment instructions in the main guide.
## Configuration
All configuration is managed through environment variables and flink-conf.yaml.
## Monitoring
Prometheus metrics are exposed on port 9249.

Commit and push to GitHub:

git add .
git commit -m "Initial Flink deployment setup"
git branch -M main
git remote add origin https://github.com/yourusername/flink-deployment.git
git push -u origin main

Deploying to Klutch.sh

Deploy JobManager

    1. Log in to Klutch.sh

      Navigate to klutch.sh/app and sign in to your account.

    2. Create a New Project

      Go to Create Project and give your project a meaningful name (e.g., “Flink Cluster”).

    3. Create JobManager App

      Navigate to Create App and configure:

      • App Name: flink-jobmanager
      • Repository: Select your GitHub repository
      • Branch: Choose main or your preferred branch
      • Klutch.sh will automatically detect your Dockerfile
    4. Configure Traffic Type

      • Traffic Type: Select HTTP (Flink Dashboard and REST API)
      • Internal Port: Set to 8081 (Flink’s default web port)
    5. Set JobManager Environment Variables

      Add the following environment variables:

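      FLINK_MODE=jobmanager
      FLINK_JOBMANAGER_HOST=flink-jobmanager.klutch.sh
      TASKMANAGER_SLOTS=4
      STATE_BACKEND=filesystem
      CHECKPOINT_DIR=file:///opt/flink/checkpoints
      SAVEPOINT_DIR=file:///opt/flink/savepoints
      LOG_LEVEL=INFO
      METRICS_REPORTER=prometheus
      PROMETHEUS_PORT=9249
      JVM_ARGS=-XX:+UseG1GC

      Set FLINK_JOBMANAGER_HOST to the same hostname the TaskManagers use to reach the JobManager (update it once the app URL is known). Flink's RPC layer expects to be addressed by the hostname configured in jobmanager.rpc.address, so 0.0.0.0 does not work here; listening on all interfaces is already handled by jobmanager.bind-host: 0.0.0.0.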
    6. Attach Persistent Volumes for JobManager

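      Critical for state persistence. With file:// paths, the checkpoint and savepoint directories must be shared by the JobManager and every TaskManager, because TaskManagers write state files there directly; with volumes attached only to the JobManager, checkpoints of anything but very small state may complete but cannot be restored. For multi-container deployments, point CHECKPOINT_DIR and SAVEPOINT_DIR at shared storage such as S3: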

      • Mount Path: /opt/flink/checkpoints
      • Size: 20GB (adjust based on checkpoint frequency and state size)

      Add another volume:

      • Mount Path: /opt/flink/savepoints
      • Size: 10GB (adjust based on savepoint usage)

      Add another volume (optional for HA):

      • Mount Path: /opt/flink/ha
      • Size: 5GB
    7. Configure Compute Resources for JobManager

      • Region: Select closest to your data sources
      • CPU: 2 cores minimum (4 cores for production)
      • Memory: 4GB minimum (8GB recommended for production)
      • Instances: 1 (or 3 for high availability with ZooKeeper)
    8. Deploy JobManager

      Click “Create” to deploy. Klutch.sh will:

      • Detect and build your Dockerfile
      • Mount persistent volumes
      • Start the JobManager
      • Provide a URL like flink-jobmanager.klutch.sh

Deploy TaskManager(s)

    1. Create TaskManager App

      Navigate to Create App in the same project:

      • App Name: flink-taskmanager-1
      • Repository: Same GitHub repository
      • Branch: Same branch as JobManager
    2. Configure Traffic Type

      • Traffic Type: Select TCP (TaskManager needs internal RPC communication)
      • Internal Port: Set to 6122 (TaskManager RPC port)
    3. Set TaskManager Environment Variables

      Add the following environment variables:

      FLINK_MODE=taskmanager
      FLINK_JOBMANAGER_HOST=flink-jobmanager.klutch.sh
      FLINK_JOBMANAGER_PORT=6123
      TASKMANAGER_SLOTS=4
      TASKMANAGER_MEMORY=4096m
      STATE_BACKEND=filesystem
      LOG_LEVEL=INFO
      JVM_ARGS=-XX:+UseG1GC

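      Important: Replace flink-jobmanager.klutch.sh with your actual JobManager hostname from step 8 above. Besides the Dashboard, the TaskManager must reach the JobManager's RPC port (6123) and blob server port (6124), and the JobManager and other TaskManagers must reach this TaskManager's RPC and data ports. Confirm that your Klutch.sh networking allows this traffic between apps, since the JobManager app is configured for HTTP traffic on port 8081.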

    4. Attach Persistent Volumes for TaskManager

      For local state (RocksDB):

      • Mount Path: /opt/flink/state
      • Size: 50GB (adjust based on state size and number of slots)
    5. Configure Compute Resources for TaskManager

      • Region: Same region as JobManager
      • CPU: 4 cores minimum (8 cores for production)
      • Memory: 8GB minimum (16GB recommended for production)
      • Instances: Start with 2 (scale based on parallelism needs)
    6. Deploy TaskManager

      Click “Create” to deploy. The TaskManager will automatically connect to the JobManager.

    7. Scale TaskManagers (Optional)

      Deploy additional TaskManagers by repeating steps 1-6 with different app names (flink-taskmanager-2, flink-taskmanager-3, etc.).

Post-Deployment Configuration

  1. Navigate to your JobManager URL (e.g., https://flink-jobmanager.klutch.sh)
  2. You’ll see the Flink Dashboard with:
    • Running Jobs
    • Available TaskManagers
    • Task Slots
    • Checkpoints and Savepoints
    • Job metrics

Verify TaskManager Registration

  1. In the Flink Dashboard, go to Task Managers
  2. Verify all deployed TaskManagers are registered
  3. Check that the total number of task slots matches your configuration

Submit Your First Job

Using the REST API:

# Using REST API to submit a job
curl -X POST \
https://flink-jobmanager.klutch.sh/jars/upload \
-H "Expect:" \
-F "jarfile=@/path/to/your-flink-job.jar"
# Get the JAR ID from the response, then submit
curl -X POST \
https://flink-jobmanager.klutch.sh/jars/<jar-id>/run \
-H "Content-Type: application/json" \
-d '{
"entryClass": "com.example.YourJobClass",
"parallelism": 4,
"programArgs": "--input kafka --output database"
}'
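The two REST calls are linked by the JAR id: the upload response contains a filename field with the server-side path of the stored JAR, and the last path segment of that path is the <jar-id> used in /jars/<jar-id>/run. The hypothetical helper below extracts it in Java; a regex keeps the sketch dependency-free, but a real client should use a proper JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Helper for the two-step REST submission: pull the JAR id out of the
// POST /jars/upload response and build the matching run URL.
final class JarSubmission {
    // Matches "filename": "<server-side path>" in the upload response body
    private static final Pattern FILENAME =
            Pattern.compile("\"filename\"\\s*:\\s*\"([^\"]+)\"");

    static String jarIdFromUploadResponse(String json) {
        Matcher m = FILENAME.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("No filename in upload response: " + json);
        }
        String path = m.group(1);
        // The JAR id is the last path segment of the stored file
        return path.substring(path.lastIndexOf('/') + 1);
    }

    static String runUrl(String baseUrl, String jarId) {
        // Strip trailing slashes so the URL has exactly one separator
        return baseUrl.replaceAll("/+$", "") + "/jars/" + jarId + "/run";
    }
}
```

For example, an illustrative response of {"filename":"/tmp/flink-web-1234/flink-web-upload/5c1f_your-flink-job.jar","status":"success"} yields the id 5c1f_your-flink-job.jar.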

Or use the Flink Dashboard:

  1. Go to Submit New Job
  2. Upload your JAR file
  3. Set entry class and program arguments
  4. Click Submit

Configure Custom Domain

  1. In your Klutch.sh app settings, go to Custom Domains
  2. Add your domain (e.g., flink.yourcompany.com)
  3. Update your DNS records as instructed
  4. SSL certificates are automatically provisioned

Here’s a simple Flink streaming application to get started:

Maven Dependencies (pom.xml)

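<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>flink-streaming-job</artifactId>
  <version>1.0-SNAPSHOT</version>
  <properties>
    <flink.version>1.18.1</flink.version>
    <!-- Since Flink 1.17 the Kafka connector is versioned separately as <connector version>-<Flink minor> -->
    <kafka.connector.version>3.1.0-1.18</kafka.connector.version>
    <scala.binary.version>2.12</scala.binary.version>
    <java.version>11</java.version>
    <maven.compiler.release>${java.version}</maven.compiler.release>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- Kafka Connector (bundled into the job JAR) -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${kafka.connector.version}</version>
    </dependency>
    <!-- JSON Serialization -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.5.0</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <!-- Drop dependency signature files, which are invalid inside a shaded JAR -->
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <!-- Merge META-INF/services files so connector factories stay discoverable -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>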

Simple Word Count Job (Java)

package com.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountJob {
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing (every 10 seconds)
        env.enableCheckpointing(10000);
        // Read text from a socket. "localhost" is resolved on the TaskManager that runs
        // the source, so on a cluster point this at a host that TaskManager can reach
        DataStream<String> text = env.socketTextStream("localhost", 9999);
        // Parse and count words
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);
        // Print results to the TaskManager's stdout (visible in its logs)
        counts.print();
        // Execute the job
        env.execute("Word Count Streaming Job");
    }

    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Lowercase and split on runs of non-word characters
            String[] words = value.toLowerCase().split("\\W+");
            for (String word : words) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
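Because keyBy(...).sum(1) is a streaming aggregation, the job emits an updated count every time a word arrives rather than one final total. The stdlib-only Java model below (no Flink involved; WordCountModel is a hypothetical name) reproduces the Tokenizer logic and the per-key running sum, so you can see what counts.print() will show for a given input line.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of WordCountJob: the same tokenization as Tokenizer,
// followed by a per-key running sum like keyBy(value -> value.f0).sum(1).
final class WordCountModel {
    static List<String> process(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>(); // plays the role of keyed state
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (word.length() > 0) {
                    int updated = counts.merge(word, 1, Integer::sum);
                    // One output record per input word, formatted like Tuple2.toString()
                    emitted.add("(" + word + "," + updated + ")");
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(process(List.of("To be, or not to be")));
    }
}
```

For the input line "To be, or not to be" it returns (to,1), (be,1), (or,1), (not,1), (to,2), (be,2). On a real cluster, updates for different words can interleave differently across parallel subtasks, but the updates for any single word stay in order.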

Kafka Streaming Job (Java)

package com.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class KafkaStreamingJob {
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing for fault tolerance
        env.enableCheckpointing(30000); // checkpoint every 30 seconds
        // Configure Kafka source; "kafka:9092" must be a broker address the TaskManagers can reach
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("input-topic")
                .setGroupId("flink-consumer")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        // Create data stream from Kafka (processing-time windows need no watermarks)
        DataStream<String> stream = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // Process stream: count messages per 1-minute window
        // (windowAll runs with parallelism 1, whatever parallelism the job is submitted with)
        DataStream<Long> counts = stream
                .map(value -> 1L)
                .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .sum(0);
        // Print results
        counts.print();
        // Execute the job
        env.execute("Kafka Streaming Job");
    }
}
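The 1-minute tumbling windows in this job are aligned to the epoch, not to the moment the job starts. The sketch below models how a tumbling assigner maps a timestamp to its window, using the same remainder arithmetic as Flink's TimeWindow.getWindowStartWithOffset; for processing-time windows the timestamp is the TaskManager's clock when the record is processed. TumblingWindowModel is a hypothetical name.

```java
// Model of tumbling window assignment: a window [start, start + size) that
// contains the timestamp, aligned to the epoch plus an optional offset.
final class TumblingWindowModel {
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = (timestamp - offset) % size;
        // Java's % is negative for timestamps before the offset; shift back into range
        return remainder < 0 ? timestamp - (remainder + size) : timestamp - remainder;
    }

    static long windowEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size; // end is exclusive
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long ts = 125_000L; // 2 minutes 5 seconds after the epoch
        System.out.println("[" + windowStart(ts, 0, oneMinute) + ", "
                + windowEnd(ts, 0, oneMinute) + ")");
    }
}
```

With a one-minute size and zero offset, a record processed at 125,000 ms falls into the window [120000, 180000), whichever minute the job was started in.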

Build and Submit Job

# Build the JAR
mvn clean package
# Submit to Flink cluster
curl -X POST \
https://flink-jobmanager.klutch.sh/jars/upload \
-H "Expect:" \
-F "jarfile=@target/flink-streaming-job-1.0-SNAPSHOT.jar"
# Run the job (replace <jar-id> with the ID from upload response)
curl -X POST \
https://flink-jobmanager.klutch.sh/jars/<jar-id>/run \
-H "Content-Type: application/json" \
-d '{
"entryClass": "com.example.KafkaStreamingJob",
"parallelism": 4
}'

Production Best Practices

State Management

  1. Use RocksDB State Backend for Production

    Update flink-conf.yaml:

    state.backend: rocksdb
    state.backend.rocksdb.localdir: /opt/flink/state/rocksdb
    state.backend.incremental: true

    Mount persistent volume at /opt/flink/state for each TaskManager.

  2. Configure S3 for Checkpoints and Savepoints

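    state.checkpoints.dir: s3://my-bucket/flink/checkpoints
    state.savepoints.dir: s3://my-bucket/flink/savepoints
    s3.access-key: your-access-key
    s3.secret-key: your-secret-key
    s3.endpoint: https://s3.amazonaws.com

    Flink's S3 filesystems read credentials from the s3.* options above rather than from the S3_ACCESS_KEY, S3_SECRET_KEY, and S3_ENDPOINT variables, so store those values as secrets and map them into flink-conf.yaml (for example through FLINK_PROPERTIES). S3 paths also need an S3 filesystem plugin; the official image can enable a bundled one with ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-presto-1.18.1.jar (Flink recommends the Presto-based plugin for checkpointing).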
  3. Tune Checkpoint Intervals

    Balance between fault tolerance and performance:

    execution.checkpointing.interval: 60000 # 1 minute for high throughput
    execution.checkpointing.timeout: 600000 # 10 minutes
    execution.checkpointing.max-concurrent-checkpoints: 1
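The interval interacts with execution.checkpointing.min-pause (5000 ms in the main configuration): with one concurrent checkpoint, a checkpoint that runs long pushes the next one back, so the job always gets at least the min-pause of checkpoint-free processing. The sketch below is a simplified model of that scheduling, assuming the next checkpoint starts at the later of the regular interval and the previous completion plus min-pause; it ignores failures, timeouts, and the initial delay, and CheckpointSchedule is a hypothetical name.

```java
import java.util.Arrays;

// Simplified model of checkpoint trigger times with max-concurrent-checkpoints = 1.
// All times are in milliseconds; the first checkpoint is assumed to start at 0.
final class CheckpointSchedule {
    static long nextTrigger(long prevStart, long prevDuration, long interval, long minPause) {
        long byInterval = prevStart + interval;                 // regular cadence
        long byMinPause = prevStart + prevDuration + minPause;  // pause after a slow checkpoint
        return Math.max(byInterval, byMinPause);
    }

    static long[] startTimes(long[] durations, long interval, long minPause) {
        long[] starts = new long[durations.length];
        for (int i = 1; i < durations.length; i++) {
            starts[i] = nextTrigger(starts[i - 1], durations[i - 1], interval, minPause);
        }
        return starts;
    }

    public static void main(String[] args) {
        long[] durations = {10_000, 58_000, 20_000};
        System.out.println(Arrays.toString(startTimes(durations, 60_000, 5_000)));
    }
}
```

With a 60 s interval and 5 s min-pause, checkpoints lasting 10 s and then 58 s start at 0 s, 60 s, and 123 s; persistently long checkpoints therefore stretch the effective interval.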

High Availability

  1. Deploy Multiple JobManagers

    Deploy 3 JobManager instances for production HA.

  2. Configure ZooKeeper

    In flink-conf.yaml:

    high-availability: zookeeper
    high-availability.storageDir: s3://my-bucket/flink/ha
    high-availability.zookeeper.quorum: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
    high-availability.zookeeper.path.root: /flink
    high-availability.cluster-id: /production-cluster
  3. Set Environment Variables

    ENABLE_HA=true
    ZOOKEEPER_QUORUM=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
    HA_STORAGE_DIR=s3://my-bucket/flink/ha
    HA_CLUSTER_ID=production-cluster

Resource Optimization

  1. TaskManager Memory Configuration

    taskmanager.memory.process.size: 8192m
    taskmanager.memory.task.heap.size: 4096m
    taskmanager.memory.managed.size: 2048m
    taskmanager.memory.network.fraction: 0.1
  2. Task Slots per TaskManager

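    Set slots based on parallelism requirements. A common starting point is one slot per CPU core, for example 4 slots on a 4-core machine:

    TASKMANAGER_SLOTS=4
  3. JVM Tuning

    Flink sets the heap size (-Xms/-Xmx) from the taskmanager.memory.* options, so use JVM_ARGS only for additional flags such as GC settings:

    JVM_ARGS=-XX:+UseG1GC -XX:MaxGCPauseMillis=200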

Monitoring and Metrics

  1. Enable Prometheus Metrics

    Configure the reporter in flink-conf.yaml:

    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9249
  2. Monitor Key Metrics

    • Checkpoint duration and size
    • Backpressure indicators
    • Task failures and restarts
    • State size growth
    • Watermark lag
  3. Set Up Alerts

    Monitor for:

    • Failed checkpoints
    • High backpressure
    • TaskManager disconnections
    • Increasing watermark lag
    • State size exceeding thresholds

Security

  1. Enable SSL/TLS

    In flink-conf.yaml:

    security.ssl.enabled: true
    security.ssl.rest.enabled: true
    security.ssl.keystore: /opt/flink/conf/flink.keystore
    security.ssl.keystore-password: changeit
  2. Secure Environment Variables

    Mark sensitive values as secrets in Klutch.sh:

    • Database passwords
    • API keys
    • S3 credentials
    • Keystore passwords
  3. Network Security

    • Use private networking for JobManager-TaskManager communication
    • Restrict Dashboard access to specific IPs
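    • Protect the REST API: Flink has no built-in user authentication for it, so use mutual TLS (security.ssl.rest.authentication-enabled: true) or an authenticating reverse proxy, and consider web.submit.enable: false, which disables JAR upload and run through the Dashboard and the /jars REST endpoints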

Backup and Recovery

  1. Regular Savepoints

    Create savepoints before upgrades:

    curl -X POST \
    https://flink-jobmanager.klutch.sh/jobs/<job-id>/savepoints \
    -H "Content-Type: application/json" \
    -d '{"target-directory": "s3://my-bucket/flink/savepoints"}'
  2. Automated Savepoint Scheduling

    Use cron jobs or scheduled tasks to create periodic savepoints.

  3. Test Recovery Procedures

    Regularly test job recovery from savepoints:

    curl -X POST \
    https://flink-jobmanager.klutch.sh/jars/<jar-id>/run \
    -H "Content-Type: application/json" \
    -d '{
    "entryClass": "com.example.YourJob",
    "savepointPath": "s3://my-bucket/flink/savepoints/savepoint-123"
    }'
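Savepoint triggering through the REST API is asynchronous: POST /jobs/<job-id>/savepoints returns a request-id, and the result is read from GET /jobs/<job-id>/savepoints/<request-id> until its status id is COMPLETED. For automated scheduling, a small Java client could build these requests as sketched below. SavepointRequests is a hypothetical name, the regex parsing stands in for a real JSON library, and actually sending the requests (for example with java.net.http.HttpClient from a ScheduledExecutorService or a cron job) is left out so the sketch runs offline.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Builds the savepoint trigger/status requests and parses their responses.
final class SavepointRequests {
    private static final Pattern REQUEST_ID =
            Pattern.compile("\"request-id\"\\s*:\\s*\"([^\"]+)\"");
    private static final Pattern STATUS_ID =
            Pattern.compile("\"status\"\\s*:\\s*\\{\\s*\"id\"\\s*:\\s*\"([A-Z_]+)\"");

    // POST /jobs/<job-id>/savepoints with the target directory; the job keeps running
    static HttpRequest trigger(String baseUrl, String jobId, String targetDirectory) {
        String body = "{\"target-directory\": \"" + targetDirectory + "\", \"cancel-job\": false}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // GET /jobs/<job-id>/savepoints/<request-id> to poll for completion
    static HttpRequest status(String baseUrl, String jobId, String triggerId) {
        return HttpRequest.newBuilder(
                        URI.create(baseUrl + "/jobs/" + jobId + "/savepoints/" + triggerId))
                .GET()
                .build();
    }

    static String requestId(String triggerResponseJson) {
        return firstGroup(REQUEST_ID, triggerResponseJson);
    }

    // Returns e.g. IN_PROGRESS or COMPLETED
    static String statusId(String statusResponseJson) {
        return firstGroup(STATUS_ID, statusResponseJson);
    }

    private static String firstGroup(Pattern pattern, String json) {
        Matcher m = pattern.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("Unexpected response: " + json);
        }
        return m.group(1);
    }
}
```

A scheduler would send the trigger request, read the request-id, and poll the status request until statusId returns COMPLETED, then record the savepoint location from the response.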

Troubleshooting

JobManager Not Starting

Symptoms: JobManager container exits or health check fails

Solutions:

  1. Check logs: docker logs <container-id>
  2. Verify environment variables are set correctly
  3. Ensure persistent volumes are properly mounted
  4. Check JVM memory settings don’t exceed container limits
  5. Verify port 8081 is not already in use

TaskManager Not Registering

Symptoms: TaskManagers don’t appear in Dashboard

Solutions:

  1. Verify FLINK_JOBMANAGER_HOST points to correct JobManager URL
  2. Check network connectivity between TaskManager and JobManager
  3. Ensure JobManager RPC port (6123) is accessible
  4. Review TaskManager logs for connection errors
  5. Verify TaskManager has sufficient memory allocated

Checkpoint Failures

Symptoms: Jobs fail with checkpoint timeout errors

Solutions:

  1. Increase checkpoint timeout:

    execution.checkpointing.timeout: 900000 # 15 minutes
  2. Verify persistent volume has sufficient space

  3. Check S3 credentials and bucket permissions

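  4. With incremental RocksDB checkpoints, use a shorter interval so each checkpoint uploads less changed state; if backpressure delays checkpoint barriers, consider unaligned checkpoints:

    execution.checkpointing.unaligned: true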

  5. Enable incremental checkpoints for RocksDB:

    state.backend.incremental: true

Out of Memory Errors

Symptoms: TaskManagers crash with OOM errors

Solutions:

  1. Increase TaskManager memory:

    TASKMANAGER_MEMORY=8192m
  2. Reduce task slots per TaskManager:

    TASKMANAGER_SLOTS=2
  3. Tune memory configuration in flink-conf.yaml

  4. Use RocksDB state backend to offload state to disk

  5. Monitor state size growth and implement TTL

High Backpressure

Symptoms: Slow processing, increasing lag

Solutions:

  1. Scale out TaskManagers for more parallelism

  2. Optimize job logic and operators

  3. Increase network buffer size:

    taskmanager.memory.network.max: 2gb
  4. Use async I/O for external lookups

  5. Review operator chaining and chain breaks (startNewChain(), disableChaining())

Job Submission Failures

Symptoms: Cannot submit JAR via REST API

Solutions:

  1. Verify JAR file is not corrupted
  2. Check the upload size limit (rest.server.max-content-length, 100 MB by default) and raise it for large JARs
  3. Ensure entry class exists and is correct
  4. Review classpath conflicts
  5. Check Dashboard logs for detailed error messages

Advanced Configuration

Operator State and Keyed State

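import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keyed state: apply this function after keyBy(...) so each key gets its own counter.
// Event and Result are your own types.
public class StatefulJob extends RichFlatMapFunction<Event, Result> {
    private transient ValueState<Long> counterState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("counter", Long.class);
        counterState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Event event, Collector<Result> out) throws Exception {
        // value() returns null the first time a key is seen
        Long current = counterState.value();
        long count = (current == null ? 0L : current) + 1;
        counterState.update(count);
        out.collect(new Result(event.getKey(), count));
    }
}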

Event Time Processing

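import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event, Result, source (for example a KafkaSource<Event>) and MyWindowFunction are your own
DataStream<Result> results = env
    .fromSource(source,
        WatermarkStrategy
            // tolerate events arriving up to 5 seconds out of order
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            // use the event's own timestamp (epoch milliseconds)
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp()),
        "Event Source")
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new MyWindowFunction());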
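forBoundedOutOfOrderness tracks the highest timestamp seen so far and emits a watermark that lags it by the allowed delay minus 1 ms; an event-time window [start, end) fires once the watermark reaches end - 1. The standalone model below follows that rule, in the spirit of Flink's built-in bounded-out-of-orderness generator, to show when the 5-minute windows above close; the class name is hypothetical and this is not Flink code.

```java
// Model of a bounded-out-of-orderness watermark and of when a window fires.
final class BoundedOutOfOrdernessModel {
    private final long maxDelayMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessModel(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
        // Start low enough that the first watermark is Long.MIN_VALUE without overflow
        this.maxTimestamp = Long.MIN_VALUE + maxDelayMillis + 1;
    }

    // Late or out-of-order events never move the watermark backwards
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long currentWatermark() {
        return maxTimestamp - maxDelayMillis - 1;
    }

    // A window [start, end) fires when the watermark passes its last millisecond
    boolean windowFires(long windowEndExclusive) {
        return currentWatermark() >= windowEndExclusive - 1;
    }
}
```

With a 5-second bound, events at 10 s and 7 s give a watermark of 4,999 ms, so the window [0, 300000) fires only after an event at 305 s or later arrives. A source partition that stops sending events also stops advancing the watermark.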

Custom Metrics

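import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

// Event and Result are your own types. Both metrics are registered in this
// operator's metric group and appear in the Dashboard and the Prometheus reporter.
public class MetricsJob extends RichMapFunction<Event, Result> {
    private transient Counter counter;
    private transient Meter meter;

    @Override
    public void open(Configuration parameters) {
        this.counter = getRuntimeContext()
                .getMetricGroup()
                .counter("myCounter");
        // Events per second, averaged over the last 60 seconds
        this.meter = getRuntimeContext()
                .getMetricGroup()
                .meter("myMeter", new MeterView(60));
    }

    @Override
    public Result map(Event event) {
        counter.inc();
        meter.markEvent();
        return new Result(event);
    }
}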
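MeterView(60) does not count events per second directly: at each update (every 5 seconds by default) it stores the counter value in a ring buffer and reports (count now - count 60 seconds ago) / 60. The model below mirrors that calculation so you can predict what the Dashboard shows, including the lower readings during the first minute; MeterViewModel is a hypothetical name, and calling update() stands in for Flink's periodic metric update.

```java
// Model of MeterView's rate calculation over a fixed time span.
final class MeterViewModel {
    private static final int UPDATE_INTERVAL_SECONDS = 5;
    private final long[] values;      // counter snapshots, one per update interval
    private final int timeSpanSeconds;
    private int time = 0;
    private long count = 0;
    private double currentRate = 0;

    MeterViewModel(int timeSpanSeconds) {
        // Round the span down to a multiple of the update interval (at least one interval)
        this.timeSpanSeconds = Math.max(
                timeSpanSeconds - (timeSpanSeconds % UPDATE_INTERVAL_SECONDS),
                UPDATE_INTERVAL_SECONDS);
        this.values = new long[this.timeSpanSeconds / UPDATE_INTERVAL_SECONDS + 1];
    }

    void markEvent(long n) {
        count += n;
    }

    // Called once per update interval
    void update() {
        time = (time + 1) % values.length;
        values[time] = count;
        // Oldest snapshot is the next slot in the ring buffer
        currentRate = ((double) (values[time] - values[(time + 1) % values.length]))
                / timeSpanSeconds;
    }

    double getRate() {
        return currentRate;
    }
}
```

At a steady 60 events per second (300 per 5-second interval), the first update reports 5.0 and the rate only reaches 60.0 after a full minute of updates.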

Horizontal Scaling

  1. Add More TaskManagers

    Deploy additional TaskManager apps following the deployment steps.

  2. Increase Parallelism

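    Update job parallelism when submitting. The job only starts if the cluster offers enough free slots, for example four TaskManagers with four slots each for a parallelism of 16:

    curl -X POST \
    https://flink-jobmanager.klutch.sh/jars/<jar-id>/run \
    -H "Content-Type: application/json" \
    -d '{"parallelism": 16}'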
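  3. Dynamic Scaling

    Use Flink's reactive mode (requires Flink 1.13+). Reactive mode works only with standalone application clusters (the standalone mode of the entrypoint script), not with a session cluster that accepts jobs over REST; the job then always uses all available TaskManager slots:

    scheduler-mode: reactive
    execution.attached: false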
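Scaling decisions come down to slot arithmetic: each TaskManager offers taskmanager.numberOfTaskSlots slots, and with Flink's default slot sharing a job needs as many slots as its highest operator parallelism. A hypothetical helper for that calculation, assuming one job and the default slot sharing group:

```java
// Slot arithmetic for horizontal scaling.
final class SlotPlanner {
    static int availableSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    static int taskManagersNeeded(int maxParallelism, int slotsPerTaskManager) {
        // Ceiling division: a partially used TaskManager still has to be deployed
        return (maxParallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int parallelism = 16, slotsPerTm = 4, deployedTms = 2;
        System.out.println("available slots: " + availableSlots(deployedTms, slotsPerTm));
        System.out.println("TaskManagers needed: " + taskManagersNeeded(parallelism, slotsPerTm));
    }
}
```

For a parallelism of 16 with four slots per TaskManager, you need four TaskManagers; the two deployed earlier offer only eight slots.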

Vertical Scaling

  1. Increase TaskManager Resources

    In Klutch.sh app settings:

    • Increase CPU cores
    • Increase memory allocation
    • Adjust task slots accordingly
  2. Optimize Memory Distribution

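    taskmanager.memory.process.size: 16384m
    taskmanager.memory.managed.size: 4096m
    # Leave task heap unset so Flink derives it from the remaining memory; also setting
    # it to 8192m would leave more JVM overhead than the default 1 GB maximum allows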

Integration Examples

Kafka Integration

See Kafka guide for Kafka deployment, then configure Flink connector:

KafkaSource<Event> source = KafkaSource.<Event>builder()
.setBootstrapServers("kafka.klutch.sh:8000")
.setTopics("events")
.setGroupId("flink-consumer")
.setValueOnlyDeserializer(new EventDeserializer())
.build();

Database Sink

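import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// Requires the flink-connector-jdbc dependency and the PostgreSQL JDBC driver in the job JAR.
// results is a DataStream<Result>; Result is your own type and getTimestamp() returns java.sql.Timestamp.
results.addSink(JdbcSink.sink(
    "INSERT INTO results (id, value, timestamp) VALUES (?, ?, ?)",
    (statement, result) -> {
        statement.setString(1, result.getId());
        statement.setDouble(2, result.getValue());
        statement.setTimestamp(3, result.getTimestamp());
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(5)
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://postgres.klutch.sh:8000/database")
        .withDriverName("org.postgresql.Driver")
        .withUsername("user")
        .withPassword("password")
        .build()
));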
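The JdbcExecutionOptions above control buffering: records are collected and written as one JDBC batch when 1,000 are waiting or when the 200 ms flush timer fires, whichever comes first, and the sink also flushes on checkpoints. The simplified single-threaded model below shows that behaviour with retries omitted; BatchBufferModel is a hypothetical name, not part of the JDBC connector.

```java
import java.util.ArrayList;
import java.util.List;

// Model of size- and timer-triggered batching, as configured by
// withBatchSize(...) and withBatchIntervalMs(...).
final class BatchBufferModel<T> {
    private final int batchSize;
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> flushed = new ArrayList<>();

    BatchBufferModel(int batchSize) {
        this.batchSize = batchSize;
    }

    // Called for every record the sink receives
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Called every batch interval by a timer, and when a checkpoint is taken
    void onTimerOrCheckpoint() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        // In the real sink this is one JDBC batch execution, retried on failure
        flushed.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    List<List<T>> flushedBatches() {
        return flushed;
    }

    int pending() {
        return buffer.size();
    }
}
```

A larger batch size means fewer round trips to the database, while the interval bounds how long a record can wait when traffic is low.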

Elasticsearch Sink

ElasticsearchSink.Builder<Event> builder = new ElasticsearchSink.Builder<>(
List.of(new HttpHost("elasticsearch.klutch.sh", 9200, "http")),
new ElasticsearchSinkFunction<Event>() {
@Override
public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
builder.setBulkFlushMaxActions(1000);
stream.addSink(builder.build());


Production Deployment Checklist

Before going to production, verify:

  • RocksDB state backend configured with incremental checkpoints
  • Checkpoints and savepoints stored in persistent S3/cloud storage
  • High availability enabled with ZooKeeper (3+ JobManagers)
  • SSL/TLS enabled for all communications
  • Prometheus metrics configured and integrated with monitoring
  • Automated alerting set up for failures and performance issues
  • Regular savepoint schedule implemented
  • Backup and recovery procedures tested
  • TaskManager resources scaled for expected load
  • Network security and firewall rules configured
  • Logging centralized and retained appropriately
  • Job restart strategy configured
  • Resource quotas and limits set
  • Custom domains configured for JobManager
  • Documentation updated for operations team
  • Disaster recovery plan documented and tested
  • Performance benchmarks completed
  • Security audit performed
  • Compliance requirements met
  • Monitoring dashboards created
  • On-call procedures established

Congratulations! You now have a production-ready Apache Flink cluster running on Klutch.sh with JobManager and TaskManager separation, persistent state management, and monitoring integration. Your cluster is ready to process real-time data streams with fault tolerance and exactly-once semantics.