Deploying Flink
Introduction
Apache Flink is a powerful, open-source framework for distributed stream processing and batch analytics. Built for high-throughput, low-latency data processing, Flink provides stateful computations over unbounded and bounded data streams with exactly-once semantics, event-time processing, and sophisticated state management. Used by companies like Netflix, Alibaba, and Uber, Flink excels at real-time analytics, complex event processing, ETL pipelines, and machine learning workflows.
Deploying Flink on Klutch.sh gives you automatic Dockerfile detection, persistent storage for checkpoints and savepoints, HTTP traffic support for the Flink Dashboard and REST API, and seamless scaling capabilities. This guide provides detailed instructions for deploying both Flink JobManager and TaskManager with production-ready configuration, checkpoint persistence, metrics integration, and best practices for running stateful stream processing workloads.
Why Deploy Flink on Klutch.sh?
- Automatic Dockerfile Detection: Klutch.sh detects your Flink Dockerfile and handles container orchestration automatically
- Persistent State Storage: Attach volumes for checkpoints, savepoints, and RocksDB state backend to ensure fault tolerance
- Scalable Architecture: Deploy separate JobManager and TaskManager instances with independent scaling
- Built-in HTTPS: Secure access to Flink Dashboard and REST API with automatic SSL certificates
- Environment Variable Management: Securely configure Flink settings, JVM options, and state backends through the dashboard
- Custom Domain Support: Map your own domain to your Flink cluster for professional deployments
- High Availability: Deploy multiple JobManagers with ZooKeeper for production-grade high availability
- Monitoring Ready: Integrate with Prometheus, Grafana, and other monitoring tools via Flink’s metrics system
- Cost-Effective: Pay only for the compute and storage resources you use with flexible scaling options
Prerequisites
Before deploying Flink on Klutch.sh, ensure you have:
- A Klutch.sh account
- A GitHub account with a repository for your project
- Docker installed locally for testing (optional but recommended)
- Basic familiarity with Docker and stream processing concepts
- Understanding of Flink’s architecture (JobManager, TaskManager, state management)
- Optional: Apache Kafka deployed for streaming data sources (see Kafka guide)
Understanding Flink Architecture
Flink follows a distributed master-worker architecture designed for scalable stream processing:
Core Components:
- JobManager: Master process that coordinates job execution, schedules tasks, manages checkpoints, and recovers from failures
- TaskManager: Worker processes that execute tasks, maintain state, and exchange data streams
- Dispatcher: REST API endpoint for job submission and cluster management
- ResourceManager: Manages TaskManager slots and resource allocation
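- State Backend: Pluggable storage for operator state. Flink 1.18 provides two backends:
  - HashMapStateBackend keeps state on the JVM heap and replaces the older MemoryStateBackend and FsStateBackend.
  - EmbeddedRocksDBStateBackend keeps state in RocksDB on local disk.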
Ports and Networking:
- 8081: Flink Dashboard and REST API (HTTP)
- 6123: JobManager RPC port (internal communication)
- 6122: TaskManager RPC port (internal communication)
- 6124: BLOB server port for distributing job JARs (internal communication)
- 9249: Prometheus metrics exporter (optional)
Directory Structure:
```
/opt/flink/
├── bin/                    # Flink binaries and scripts
├── conf/
│   ├── flink-conf.yaml     # Main configuration file
│   ├── log4j.properties    # Logging configuration
│   └── masters             # JobManager hosts
├── lib/                    # Flink JARs and connectors
├── plugins/                # Plugin JARs (metrics reporters, file systems such as S3)
├── log/                    # Log files
├── checkpoints/            # Checkpoint storage
├── savepoints/             # Savepoint storage
└── state/                  # Local state (RocksDB)
```
Installation Steps
Step 1: Create Your Flink Repository
Create a new GitHub repository for your Flink deployment:
```bash
mkdir flink-deployment
cd flink-deployment
git init
```
Step 2: Create the Dockerfile
Create a Dockerfile in your repository root with Apache Flink:
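```dockerfile
FROM flink:1.18.1-scala_2.12-java11

# Set working directory
WORKDIR /opt/flink

# Install additional dependencies (netcat-openbsd provides the nc used by the entrypoint script)
USER root
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        curl \
        wget \
        netcat-openbsd \
        procps \
        vim \
    && rm -rf /var/lib/apt/lists/*

# Copy custom configuration files. Processes started in the foreground by the
# image's entrypoint read log4j-console.properties, so install the file under that name.
COPY flink-conf.yaml /opt/flink/conf/flink-conf.yaml
COPY log4j.properties /opt/flink/conf/log4j-console.properties

# Install the custom entrypoint under a new name so it does not replace the
# image's /docker-entrypoint.sh, which the script delegates to
COPY docker-entrypoint.sh /opt/flink/custom-entrypoint.sh
RUN chmod +x /opt/flink/custom-entrypoint.sh

# Create directories for state and checkpoints
RUN mkdir -p /opt/flink/checkpoints /opt/flink/savepoints /opt/flink/state && \
    chown -R flink:flink /opt/flink/checkpoints /opt/flink/savepoints /opt/flink/state

# Add custom connectors (optional - example with Kafka). The Kafka connector is
# released separately from Flink, so use a version built for Flink 1.18.
# RUN wget -P /opt/flink/lib/ \
#     https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar

# Switch back to flink user
USER flink

# Expose ports
# 8081: Flink Dashboard and REST API
# 6123: JobManager RPC
# 6121-6125: TaskManager RPC (6122), BLOB server (6124), and other internal ports
# 9249: Prometheus metrics
EXPOSE 8081 6121 6122 6123 6124 6125 9249

# Health check for the JobManager (TaskManagers do not serve the REST API on 8081)
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
    CMD curl -f http://localhost:8081/overview || exit 1

# The custom entrypoint picks the role from FLINK_MODE; CMD is only used when FLINK_MODE is unset
ENTRYPOINT ["/opt/flink/custom-entrypoint.sh"]
CMD ["help"]
```
Step 3: Create Flink Configuration File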
Create flink-conf.yaml for Flink configuration:
```yaml
# ==============================================================================
# Flink Configuration
# ==============================================================================

# JobManager Configuration
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.bind-host: 0.0.0.0
jobmanager.memory.process.size: 2048m
jobmanager.memory.jvm-metaspace.size: 256m

# TaskManager Configuration
taskmanager.bind-host: 0.0.0.0
# taskmanager.host is the address the TaskManager advertises to the JobManager.
# Leave it unset so Flink determines it automatically; localhost only works
# when every Flink process runs in the same container.
# taskmanager.host: localhost
taskmanager.rpc.port: 6122
taskmanager.memory.process.size: 4096m
taskmanager.memory.framework.heap.size: 256m
# Task heap is sized so that heap, managed, network, and metaspace memory still
# leave the minimum JVM overhead (192m by default) inside the 4096m process size
taskmanager.memory.task.heap.size: 1536m
taskmanager.memory.managed.size: 1024m
taskmanager.numberOfTaskSlots: 4

# Parallelism Configuration
parallelism.default: 2

# Network Memory
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 64mb
taskmanager.memory.network.max: 1gb

# High Availability (disabled by default - enable for production)
# high-availability: zookeeper
# high-availability.storageDir: file:///opt/flink/ha
# high-availability.zookeeper.quorum: zookeeper:2181
# high-availability.zookeeper.path.root: /flink
# high-availability.cluster-id: /flink-cluster

# State Backend Configuration
state.backend: filesystem
state.checkpoints.dir: file:///opt/flink/checkpoints
state.savepoints.dir: file:///opt/flink/savepoints
state.backend.incremental: false

# For RocksDB state backend (production recommended):
# state.backend: rocksdb
# state.backend.rocksdb.localdir: /opt/flink/state/rocksdb
# state.backend.incremental: true

# Checkpoint Configuration
execution.checkpointing.interval: 60000
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 600000
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 5000
execution.checkpointing.tolerable-failed-checkpoints: 3

# Restart Strategy
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10s

# Web Dashboard Configuration
web.submit.enable: true
web.cancel.enable: true
web.upload.dir: /opt/flink/uploads
rest.port: 8081
rest.address: 0.0.0.0
rest.bind-address: 0.0.0.0

# Logging: levels and appenders are set in log4j.properties;
# env.log.dir sets the log directory used by Flink's scripts
env.log.dir: /opt/flink/log

# Metrics Configuration (Prometheus reporter, loaded through its factory class)
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249

# Execution Configuration
execution.attached: false
execution.shutdown-on-attached-exit: false

# Blob Server (for distributing JARs to TaskManagers)
blob.server.port: 6124
blob.service.cleanup.interval: 3600

# Security (uncomment and configure for production)
# security.ssl.enabled: true
# security.ssl.rest.enabled: true
# security.ssl.keystore: /opt/flink/conf/flink.keystore
# security.ssl.keystore-password: changeit
# security.ssl.key-password: changeit
# security.ssl.truststore: /opt/flink/conf/flink.truststore
# security.ssl.truststore-password: changeit

# Classloader Resolution
classloader.resolve-order: child-first
classloader.check-leaked-classloader: true

# Akka Configuration (internal communication)
akka.ask.timeout: 60s
akka.tcp.timeout: 60s
```
Step 4: Create Log4j Configuration
Create log4j.properties for logging configuration:
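```properties
# ==============================================================================
# Flink Log4j Configuration (Log4j 2 properties syntax; Flink 1.11+ uses Log4j 2)
# ==============================================================================

rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender

# Console Appender
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# File Appender (rolls at 100MB, keeps 10 files)
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.fileName = /opt/flink/log/flink.log
appender.rolling.filePattern = /opt/flink/log/flink.log.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10

# Suppress excessive logging from specific packages
logger.kafka.name = org.apache.kafka
logger.kafka.level = WARN
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = WARN
logger.akka.name = akka
logger.akka.level = WARN
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = WARN
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = ERROR
```
Step 5: Create Docker Entrypoint Script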
Create docker-entrypoint.sh for flexible JobManager/TaskManager startup:
```bash
#!/bin/bash
set -e

# Default JobManager RPC port when FLINK_JOBMANAGER_PORT is not set
FLINK_JOBMANAGER_PORT="${FLINK_JOBMANAGER_PORT:-6123}"

# Wait for JobManager to be ready (for TaskManager)
wait_for_jobmanager() {
    echo "Waiting for JobManager at ${FLINK_JOBMANAGER_HOST}:${FLINK_JOBMANAGER_PORT}..."
    while ! nc -z "${FLINK_JOBMANAGER_HOST}" "${FLINK_JOBMANAGER_PORT}"; do
        sleep 2
    done
    echo "JobManager is ready!"
}

# Update flink-conf.yaml with environment variables
update_config() {
    if [ -n "$FLINK_JOBMANAGER_HOST" ]; then
        sed -i "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${FLINK_JOBMANAGER_HOST}/" /opt/flink/conf/flink-conf.yaml
        # The image's /docker-entrypoint.sh may also set jobmanager.rpc.address
        # from JOB_MANAGER_RPC_ADDRESS, so keep both in sync
        export JOB_MANAGER_RPC_ADDRESS="${FLINK_JOBMANAGER_HOST}"
    fi

    if [ -n "$TASKMANAGER_SLOTS" ]; then
        sed -i "s/taskmanager.numberOfTaskSlots: 4/taskmanager.numberOfTaskSlots: ${TASKMANAGER_SLOTS}/" /opt/flink/conf/flink-conf.yaml
    fi

    if [ -n "$CHECKPOINT_DIR" ]; then
        sed -i "s|state.checkpoints.dir: file:///opt/flink/checkpoints|state.checkpoints.dir: ${CHECKPOINT_DIR}|" /opt/flink/conf/flink-conf.yaml
    fi

    if [ -n "$SAVEPOINT_DIR" ]; then
        sed -i "s|state.savepoints.dir: file:///opt/flink/savepoints|state.savepoints.dir: ${SAVEPOINT_DIR}|" /opt/flink/conf/flink-conf.yaml
    fi
}

# Update configuration
update_config

# Start based on FLINK_MODE environment variable
case "$FLINK_MODE" in
    jobmanager)
        echo "Starting Flink JobManager..."
        exec /docker-entrypoint.sh jobmanager
        ;;
    taskmanager)
        echo "Starting Flink TaskManager..."
        wait_for_jobmanager
        exec /docker-entrypoint.sh taskmanager
        ;;
    standalone)
        echo "Starting Flink in standalone mode..."
        # Pass job arguments (e.g. --job-classname) as container arguments
        exec /docker-entrypoint.sh standalone-job "$@"
        ;;
    *)
        echo "FLINK_MODE not set or invalid. Use 'jobmanager', 'taskmanager', or 'standalone'"
        # Fall back to the image's own entrypoint with the container arguments
        exec /docker-entrypoint.sh "$@"
        ;;
esac
```
Make the script executable:
```bash
chmod +x docker-entrypoint.sh
```
Step 6: Create Environment Variables File
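Create .env.example with all environment variables. As written, docker-entrypoint.sh reads only FLINK_MODE, FLINK_JOBMANAGER_HOST, FLINK_JOBMANAGER_PORT, TASKMANAGER_SLOTS, CHECKPOINT_DIR, and SAVEPOINT_DIR. JVM_ARGS is read by Flink's own startup scripts. The remaining variables record the intended settings and have no effect until you extend update_config or set the matching keys in flink-conf.yaml: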
```
# ==============================================================================
# Flink Environment Variables
# ==============================================================================

# Deployment Mode: jobmanager, taskmanager, or standalone
FLINK_MODE=jobmanager

# JobManager Configuration
# Hostname/IP of JobManager
FLINK_JOBMANAGER_HOST=flink-jobmanager
# JobManager RPC port
FLINK_JOBMANAGER_PORT=6123

# TaskManager Configuration
# Number of task slots per TaskManager
TASKMANAGER_SLOTS=4
# TaskManager memory
TASKMANAGER_MEMORY=4096m

# State Backend Configuration: filesystem or rocksdb
STATE_BACKEND=filesystem
CHECKPOINT_DIR=file:///opt/flink/checkpoints
SAVEPOINT_DIR=file:///opt/flink/savepoints

# For S3 checkpoint storage with RocksDB (production):
# STATE_BACKEND=rocksdb
# CHECKPOINT_DIR=s3://my-bucket/flink/checkpoints
# SAVEPOINT_DIR=s3://my-bucket/flink/savepoints
# S3_ACCESS_KEY=your-access-key
# S3_SECRET_KEY=your-secret-key
# S3_ENDPOINT=https://s3.amazonaws.com

# JVM Configuration
JVM_ARGS=-Xms512m -Xmx2048m -XX:+UseG1GC

# High Availability (optional - requires ZooKeeper)
# ENABLE_HA=true
# ZOOKEEPER_QUORUM=zookeeper:2181
# HA_STORAGE_DIR=file:///opt/flink/ha
# HA_CLUSTER_ID=flink-cluster

# Metrics and Monitoring
METRICS_REPORTER=prometheus
PROMETHEUS_PORT=9249

# Security (optional)
# ENABLE_SSL=true
# KEYSTORE_PATH=/opt/flink/conf/flink.keystore
# KEYSTORE_PASSWORD=changeit
# TRUSTSTORE_PATH=/opt/flink/conf/flink.truststore
# TRUSTSTORE_PASSWORD=changeit

# Kafka Configuration (if using Kafka connector)
# KAFKA_BOOTSTRAP_SERVERS=kafka:9092
# KAFKA_GROUP_ID=flink-consumer

# Logging
LOG_LEVEL=INFO
```
Step 7: Create README and Push to GitHub
Create a README.md:
````markdown
# Apache Flink Deployment

Production-ready Apache Flink deployment with JobManager and TaskManager configuration.

## Features

- Flink 1.18.1 with Scala 2.12 and Java 11
- Configurable state backends (Filesystem, RocksDB)
- Checkpoint and savepoint persistence
- Prometheus metrics integration
- Docker-based deployment
- High availability support (optional)

## Prerequisites

- Docker
- GitHub account
- Klutch.sh account

## Local Development

### Start JobManager

```bash
docker build -t flink-cluster .
# A shared Docker network lets the TaskManager reach the JobManager by container name
docker network create flink-net
docker run --name flink-jobmanager --network flink-net \
  -e FLINK_MODE=jobmanager \
  -e FLINK_JOBMANAGER_HOST=flink-jobmanager \
  -p 8081:8081 -p 9249:9249 \
  flink-cluster
```

### Start TaskManager

```bash
docker run --network flink-net \
  -e FLINK_MODE=taskmanager \
  -e FLINK_JOBMANAGER_HOST=flink-jobmanager \
  flink-cluster
```

## Deployment to Klutch.sh

See deployment instructions in the main guide.

## Configuration

All configuration is managed through environment variables and flink-conf.yaml.

## Monitoring

- Flink Dashboard: http://localhost:8081
- Prometheus metrics: http://localhost:9249/metrics
````
Commit and push to GitHub:
```bash
git add .
git commit -m "Initial Flink deployment setup"
git branch -M main
git remote add origin https://github.com/yourusername/flink-deployment.git
git push -u origin main
```
Deploying to Klutch.sh
Deploy JobManager
1. Log in to Klutch.sh
Navigate to klutch.sh/app and sign in to your account.
2. Create a New Project
Go to Create Project and give your project a meaningful name (e.g., “Flink Cluster”).
3. Create JobManager App
Navigate to Create App and configure:
- App Name: flink-jobmanager
- Repository: Select your GitHub repository
- Branch: Choose main or your preferred branch
- Klutch.sh will automatically detect your Dockerfile

4. Configure Traffic Type
- Traffic Type: Select HTTP (Flink Dashboard and REST API)
- Internal Port: Set to 8081 (Flink’s default web port)

5. Set JobManager Environment Variables
Add the following environment variables:
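```
FLINK_MODE=jobmanager
FLINK_JOBMANAGER_HOST=flink-jobmanager.klutch.sh
TASKMANAGER_SLOTS=4
STATE_BACKEND=filesystem
CHECKPOINT_DIR=file:///opt/flink/checkpoints
SAVEPOINT_DIR=file:///opt/flink/savepoints
LOG_LEVEL=INFO
METRICS_REPORTER=prometheus
PROMETHEUS_PORT=9249
JVM_ARGS=-Xms512m -Xmx2048m -XX:+UseG1GC
```
Set FLINK_JOBMANAGER_HOST to the same hostname the TaskManager apps use to reach the JobManager. The entrypoint writes this value to jobmanager.rpc.address, and Flink's RPC layer drops messages addressed to a different hostname, so 0.0.0.0 does not work here. Listening on all interfaces is already handled by jobmanager.bind-host: 0.0.0.0 in flink-conf.yaml.

6. Attach Persistent Volumes for JobManager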
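Critical for state persistence. With file:// checkpoint and savepoint paths, the directory must be shared by the JobManager and every TaskManager: TaskManagers write state files there, and the JobManager writes the checkpoint metadata. Volumes attached to individual apps are not shared, so a multi-app cluster should point CHECKPOINT_DIR and SAVEPOINT_DIR at shared storage such as S3 (see Production Best Practices). Attach a checkpoint volume: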
- Mount Path: /opt/flink/checkpoints
- Size: 20GB (adjust based on checkpoint frequency and state size)

Add another volume:
- Mount Path: /opt/flink/savepoints
- Size: 10GB (adjust based on savepoint usage)

Add another volume (optional for HA):
- Mount Path: /opt/flink/ha
- Size: 5GB

7. Configure Compute Resources for JobManager
- Region: Select closest to your data sources
- CPU: 2 cores minimum (4 cores for production)
- Memory: 4GB minimum (8GB recommended for production)
- Instances: 1 (or 3 for high availability with ZooKeeper)
8. Deploy JobManager
Click “Create” to deploy. Klutch.sh will:
- Detect and build your Dockerfile
- Mount persistent volumes
- Start the JobManager
- Provide a URL like flink-jobmanager.klutch.sh
Deploy TaskManager(s)
1. Create TaskManager App
Navigate to Create App in the same project:
- App Name: flink-taskmanager-1
- Repository: Same GitHub repository
- Branch: Same branch as JobManager

2. Configure Traffic Type
- Traffic Type: Select TCP (TaskManager needs internal RPC communication)
- Internal Port: Set to 6122 (TaskManager RPC port)

3. Set TaskManager Environment Variables
Add the following environment variables:
```
FLINK_MODE=taskmanager
FLINK_JOBMANAGER_HOST=flink-jobmanager.klutch.sh
FLINK_JOBMANAGER_PORT=6123
TASKMANAGER_SLOTS=4
TASKMANAGER_MEMORY=4096m
STATE_BACKEND=filesystem
LOG_LEVEL=INFO
JVM_ARGS=-Xms1g -Xmx4g -XX:+UseG1GC
```
Important: Replace flink-jobmanager.klutch.sh with your actual JobManager hostname from the Deploy JobManager step above.

4. Attach Persistent Volumes for TaskManager
For local state (RocksDB):
- Mount Path: /opt/flink/state
- Size: 50GB (adjust based on state size and number of slots)

5. Configure Compute Resources for TaskManager
- Region: Same region as JobManager
- CPU: 4 cores minimum (8 cores for production)
- Memory: 8GB minimum (16GB recommended for production)
- Instances: Start with 2 (scale based on parallelism needs)
6. Deploy TaskManager
Click “Create” to deploy. The TaskManager will automatically connect to the JobManager.
7. Scale TaskManagers (Optional)
Deploy additional TaskManagers with different app names (flink-taskmanager-2, flink-taskmanager-3, etc.). For each one, repeat the steps above from Create TaskManager App through Deploy TaskManager.
Post-Deployment Configuration
Access Flink Dashboard
- Navigate to your JobManager URL (e.g., https://flink-jobmanager.klutch.sh)
- You’ll see the Flink Dashboard with:
- Running Jobs
- Available TaskManagers
- Task Slots
- Checkpoints and Savepoints
- Job metrics
Verify TaskManager Registration
- In the Flink Dashboard, go to Task Managers
- Verify all deployed TaskManagers are registered
- Check that the total number of task slots matches your configuration
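The expected slot count is the number of TaskManagers multiplied by the slots per TaskManager. If all of a job's operators stay in the default slot sharing group, the job needs as many slots as its highest operator parallelism. The plain-Java sketch below does this arithmetic. It assumes every TaskManager uses the same TASKMANAGER_SLOTS value, and the SlotPlanner class is a hypothetical helper, not part of Flink:

```java
import java.util.Arrays;

/** Hypothetical helper for sanity-checking slot counts; not part of Flink. */
final class SlotPlanner {

    /** Slots the Dashboard should report: TaskManagers x slots per TaskManager. */
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    /**
     * Slots a job needs when all operators stay in the default slot sharing group:
     * the highest parallelism of any operator.
     */
    static int requiredSlots(int... operatorParallelisms) {
        return Arrays.stream(operatorParallelisms).max().orElse(0);
    }

    /** TaskManagers needed to offer at least the required slots (ceiling division). */
    static int taskManagersNeeded(int requiredSlots, int slotsPerTaskManager) {
        if (slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("slotsPerTaskManager must be positive");
        }
        return (requiredSlots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // Two TaskManager instances with TASKMANAGER_SLOTS=4, as in the deployment steps
        System.out.println("Expected slots: " + totalSlots(2, 4));
        // A job whose widest operators run with parallelism 16 and whose sink runs with parallelism 1
        int needed = requiredSlots(16, 16, 1);
        System.out.println("TaskManagers needed: " + taskManagersNeeded(needed, 4));
    }
}
```

With two TaskManager instances at TASKMANAGER_SLOTS=4, the Dashboard should list 8 slots. A job submitted with a parallelism of 16 would need four such TaskManagers.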
Submit Your First Job
Using the Flink CLI or REST API:
```bash
# Using REST API to submit a job
curl -X POST \
  https://flink-jobmanager.klutch.sh/jars/upload \
  -H "Expect:" \
  -F "jarfile=@/path/to/your-flink-job.jar"

# Get the JAR ID from the response, then submit
curl -X POST \
  https://flink-jobmanager.klutch.sh/jars/<jar-id>/run \
  -H "Content-Type: application/json" \
  -d '{
    "entryClass": "com.example.YourJobClass",
    "parallelism": 4,
    "programArgs": "--input kafka --output database"
  }'
```
Or use the Flink Dashboard:
- Go to Submit New Job
- Upload your JAR file
- Set entry class and program arguments
- Click Submit
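If you script the REST flow instead of using the Dashboard, the jar ID for /jars/<jar-id>/run is the file name at the end of the filename field returned by /jars/upload. The Java sketch below extracts that ID and builds the run URL. FlinkJarIds is a hypothetical helper, and it uses a regular expression rather than a JSON library so that it needs no dependencies:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that reads the jar ID from a Flink /jars/upload response. */
final class FlinkJarIds {

    // Matches "filename": "<path>" in the JSON response body
    private static final Pattern FILENAME =
            Pattern.compile("\"filename\"\\s*:\\s*\"([^\"]+)\"");

    static String jarIdFromUploadResponse(String json) {
        Matcher matcher = FILENAME.matcher(json);
        if (!matcher.find()) {
            throw new IllegalArgumentException("No \"filename\" field in response: " + json);
        }
        String path = matcher.group(1);
        // The jar ID is the last path segment, e.g. "<uuid>_flink-streaming-job-1.0-SNAPSHOT.jar"
        return path.substring(path.lastIndexOf('/') + 1);
    }

    /** Run URL for a jar ID, given the JobManager's base URL. */
    static String runUrl(String restBase, String jarId) {
        String base = restBase.endsWith("/")
                ? restBase.substring(0, restBase.length() - 1)
                : restBase;
        return base + "/jars/" + jarId + "/run";
    }

    public static void main(String[] args) {
        // Illustrative response; the temp directory and UUID differ on a real cluster
        String response = "{\"filename\":\"/tmp/flink-web-upload/1b2c3d4e_flink-streaming-job-1.0-SNAPSHOT.jar\","
                + "\"status\":\"success\"}";
        String jarId = jarIdFromUploadResponse(response);
        System.out.println(runUrl("https://flink-jobmanager.klutch.sh", jarId));
    }
}
```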
Configure Custom Domain
- In your Klutch.sh app settings, go to Custom Domains
- Add your domain (e.g., flink.yourcompany.com)
- Update your DNS records as instructed
- SSL certificates are automatically provisioned
Sample Flink Application
Here’s a simple Flink streaming application to get started:
Maven Dependencies (pom.xml)
```xml
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>flink-streaming-job</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.18.1</flink.version>
        <!-- The Kafka connector is versioned separately from Flink; use a release built for Flink 1.18 -->
        <kafka.connector.version>3.1.0-1.18</kafka.connector.version>
        <scala.binary.version>2.12</scala.binary.version>
        <java.version>11</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Kafka Connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${kafka.connector.version}</version>
        </dependency>

        <!-- JSON Serialization -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
Simple Word Count Job (Java)
```java
package com.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountJob {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing (every 10 seconds)
        env.enableCheckpointing(10000);

        // Read text from socket
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // Parse and count words
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);

        // Print results to console
        counts.print();

        // Execute the job
        env.execute("Word Count Streaming Job");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] words = value.toLowerCase().split("\\W+");
            for (String word : words) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
```
Kafka Streaming Job (Java)
```java
package com.example;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class KafkaStreamingJob {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing for fault tolerance
        env.enableCheckpointing(30000); // checkpoint every 30 seconds

        // Configure Kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("input-topic")
                .setGroupId("flink-consumer")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Create data stream from Kafka
        DataStream<String> stream = env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Process stream: count messages per 1-minute window
        DataStream<Long> counts = stream
                .map(value -> 1L)
                .windowAll(TumblingProcessingTimeWindows.of(Time.minutes(1)))
                .sum(0);

        // Print results
        counts.print();

        // Execute the job
        env.execute("Kafka Streaming Job");
    }
}
```
Build and Submit Job
```bash
# Build the JAR
mvn clean package

# Submit to Flink cluster
curl -X POST \
  https://flink-jobmanager.klutch.sh/jars/upload \
  -H "Expect:" \
  -F "jarfile=@target/flink-streaming-job-1.0-SNAPSHOT.jar"

# Run the job (replace <jar-id> with the ID from upload response)
curl -X POST \
  https://flink-jobmanager.klutch.sh/jars/<jar-id>/run \
  -H "Content-Type: application/json" \
  -d '{
    "entryClass": "com.example.KafkaStreamingJob",
    "parallelism": 4
  }'
```
Production Best Practices
State Management
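1. Use RocksDB State Backend for Production
Update flink-conf.yaml:
```yaml
state.backend: rocksdb
state.backend.rocksdb.localdir: /opt/flink/state/rocksdb
state.backend.incremental: true
```
Mount a persistent volume at /opt/flink/state for each TaskManager.

2. Configure S3 for Checkpoints and Savepoints
```yaml
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
state.savepoints.dir: s3://my-bucket/flink/savepoints
```
Flink can only write s3:// paths when an S3 file system plugin is installed. The official image can enable its bundled plugin at startup with ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-presto-1.18.1.jar (or flink-s3-fs-hadoop-1.18.1.jar). Add AWS credentials as environment variables, marked as secrets:
```
S3_ACCESS_KEY=your-access-key
S3_SECRET_KEY=your-secret-key
S3_ENDPOINT=https://s3.amazonaws.com
```
The entrypoint script does not read these variables as written. The S3 plugin takes its credentials from the s3.access-key, s3.secret-key, and s3.endpoint keys in flink-conf.yaml, so extend update_config to write the values there.

3. Tune Checkpoint Intervals
Balance between fault tolerance and performance:
```yaml
execution.checkpointing.interval: 60000   # 1 minute for high throughput
execution.checkpointing.timeout: 600000   # 10 minutes
execution.checkpointing.max-concurrent-checkpoints: 1
```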
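The Java sketch below shows how the interval, min-pause, and one-at-a-time checkpointing interact by approximating when successive checkpoints start. In this model, the next checkpoint begins at the later of two times: the previous start plus the interval, or the previous end plus execution.checkpointing.min-pause. It is a simplified model rather than Flink's scheduler: CheckpointSchedule is hypothetical, it ignores the timeout, and it measures times from the first checkpoint's start:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of checkpoint start times (not Flink's scheduler). */
final class CheckpointSchedule {

    /**
     * Start times in ms, relative to the first checkpoint, assuming
     * max-concurrent-checkpoints: 1 and that every checkpoint completes.
     */
    static List<Long> startTimes(long intervalMs, long minPauseMs, long[] durationsMs) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (long duration : durationsMs) {
            starts.add(start);
            long end = start + duration;
            // The next checkpoint waits for the interval and for the min-pause after this one ends
            start = Math.max(start + intervalMs, end + minPauseMs);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 60 s interval from the snippet above, 5 s min-pause from flink-conf.yaml
        System.out.println(startTimes(60_000, 5_000, new long[] {10_000, 70_000, 10_000}));
        // prints [0, 60000, 135000]
    }
}
```

In this model, a checkpoint that starts at 60 s and runs for 70 s delays the next one from 120 s to 135 s. Slow checkpoints therefore lower the effective checkpoint frequency.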
High Availability
1. Deploy Multiple JobManagers
Deploy 3 JobManager instances for production HA.

2. Configure ZooKeeper
In flink-conf.yaml:
```yaml
high-availability: zookeeper
high-availability.storageDir: s3://my-bucket/flink/ha
high-availability.zookeeper.quorum: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /production-cluster
```

3. Set Environment Variables
```
ENABLE_HA=true
ZOOKEEPER_QUORUM=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
HA_STORAGE_DIR=s3://my-bucket/flink/ha
HA_CLUSTER_ID=production-cluster
```
docker-entrypoint.sh does not read these variables as written. HA is enabled by the flink-conf.yaml keys above, so extend update_config if you want to drive them from the environment.
Resource Optimization
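1. TaskManager Memory Configuration
```yaml
taskmanager.memory.process.size: 8192m
taskmanager.memory.task.heap.size: 4096m
taskmanager.memory.managed.size: 2048m
taskmanager.memory.network.fraction: 0.1
```

2. Task Slots per TaskManager
Set slots based on parallelism requirements (for example, 4 slots on a 4-core machine):
```
TASKMANAGER_SLOTS=4
```

3. JVM Tuning
Flink's startup scripts derive -Xmx and -Xms from the memory options above and add them after JVM_ARGS, so heap flags set here are typically overridden. Use JVM_ARGS for GC settings instead:
```
JVM_ARGS=-XX:+UseG1GC -XX:MaxGCPauseMillis=200
```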
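When the process size, task heap, and managed memory are all set, Flink derives network memory and JVM overhead from whatever remains. Startup fails if the overhead falls outside its allowed range (192m to 1g by default). The Java sketch below is a simplified model of that budget, not Flink's implementation, and the class name is hypothetical. Values are in MB, and it assumes these defaults:

- Framework off-heap: 128m
- Task off-heap: 0
- JVM metaspace: 256m
- Network memory: fraction 0.1, between 64m and 1g

```java
/**
 * Hypothetical, simplified model of the TaskManager memory budget when
 * process size, task heap, and managed memory are all set explicitly. Values in MB.
 */
final class TaskManagerMemoryBudget {

    static final double FRAMEWORK_OFF_HEAP = 128;
    static final double TASK_OFF_HEAP = 0;
    static final double JVM_METASPACE = 256;
    static final double NETWORK_FRACTION = 0.1;
    static final double NETWORK_MIN = 64, NETWORK_MAX = 1024;
    static final double OVERHEAD_MIN = 192, OVERHEAD_MAX = 1024;

    /** Network memory sized to be NETWORK_FRACTION of total Flink memory, then capped. */
    static double networkMb(double otherFlinkMemoryMb) {
        double derived = otherFlinkMemoryMb * NETWORK_FRACTION / (1 - NETWORK_FRACTION);
        return Math.max(NETWORK_MIN, Math.min(NETWORK_MAX, derived));
    }

    /** What remains for JVM overhead once every other component is accounted for. */
    static double derivedJvmOverheadMb(double processMb, double frameworkHeapMb,
                                       double taskHeapMb, double managedMb) {
        double other = frameworkHeapMb + taskHeapMb + managedMb + FRAMEWORK_OFF_HEAP + TASK_OFF_HEAP;
        double totalFlinkMemory = other + networkMb(other);
        return processMb - totalFlinkMemory - JVM_METASPACE;
    }

    /** True if the derived overhead lies inside the default JVM overhead range. */
    static boolean fits(double processMb, double frameworkHeapMb, double taskHeapMb, double managedMb) {
        double overhead = derivedJvmOverheadMb(processMb, frameworkHeapMb, taskHeapMb, managedMb);
        return overhead >= OVERHEAD_MIN && overhead <= OVERHEAD_MAX;
    }

    public static void main(String[] args) {
        // Values from TaskManager Memory Configuration, plus the 256m framework heap in flink-conf.yaml
        double overhead = derivedJvmOverheadMb(8192, 256, 4096, 2048);
        System.out.printf("overhead %.1f MB, fits=%b%n", overhead, fits(8192, 256, 4096, 2048));
    }
}
```

For the TaskManager Memory Configuration values above, about 683 MB remains for JVM overhead, which is inside the default range. A 4096m process with a 2048m task heap, 1024m managed memory, and a 256m framework heap leaves about 0 MB, so Flink would reject that combination.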
Monitoring and Metrics
1. Enable Prometheus Metrics
Enable the reporter in flink-conf.yaml. Flink 1.18 loads reporters through their factory class:
```yaml
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249
```

2. Monitor Key Metrics
- Checkpoint duration and size
- Backpressure indicators
- Task failures and restarts
- State size growth
- Watermark lag
3. Set Up Alerts
Monitor for:
- Failed checkpoints
- High backpressure
- TaskManager disconnections
- Increasing watermark lag
- State size exceeding thresholds
Security
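1. Enable SSL/TLS
In flink-conf.yaml:
```yaml
security.ssl.enabled: true
security.ssl.rest.enabled: true
security.ssl.keystore: /opt/flink/conf/flink.keystore
security.ssl.keystore-password: changeit
security.ssl.key-password: changeit
security.ssl.truststore: /opt/flink/conf/flink.truststore
security.ssl.truststore-password: changeit
```
Replace changeit with your own passwords.

2. Secure Environment Variables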
Mark sensitive values as secrets in Klutch.sh:
- Database passwords
- API keys
- S3 credentials
- Keystore passwords
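3. Network Security
- Use private networking for JobManager-TaskManager communication
- Restrict Dashboard access to specific IPs
- Protect the REST API. Flink has no built-in user login, so enable mutual TLS (security.ssl.rest.authentication-enabled: true) or place an authenticating proxy in front of it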
Backup and Recovery
1. Regular Savepoints
Create savepoints before upgrades:
```bash
curl -X POST \
  https://flink-jobmanager.klutch.sh/jobs/<job-id>/savepoints \
  -H "Content-Type: application/json" \
  -d '{"target-directory": "s3://my-bucket/flink/savepoints"}'
```

2. Automated Savepoint Scheduling
Use cron jobs or scheduled tasks to create periodic savepoints.

3. Test Recovery Procedures
Regularly test job recovery from savepoints:
```bash
curl -X POST \
  https://flink-jobmanager.klutch.sh/jars/<jar-id>/run \
  -H "Content-Type: application/json" \
  -d '{"entryClass": "com.example.YourJob", "savepointPath": "s3://my-bucket/flink/savepoints/savepoint-123"}'
```
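The Java sketch below automates the savepoint step without cron. It triggers POST /jobs/<job-id>/savepoints on a fixed schedule using the JDK's HttpClient and a ScheduledExecutorService. A few assumptions apply:

- SavepointScheduler and its methods are hypothetical.
- The base URL, job ID, and target directory are placeholders.
- The target directory is assumed to contain no characters that need JSON escaping.
- A production version would also poll the returned request-id to confirm that each savepoint completed.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that triggers Flink savepoints through the REST API on a schedule. */
final class SavepointScheduler {

    /** POST target for triggering a savepoint of one job. */
    static URI savepointUri(String restBase, String jobId) {
        String base = restBase.endsWith("/")
                ? restBase.substring(0, restBase.length() - 1)
                : restBase;
        return URI.create(base + "/jobs/" + jobId + "/savepoints");
    }

    /** Request body; cancel-job=false keeps the job running after the savepoint. */
    static String requestBody(String targetDirectory) {
        return "{\"target-directory\": \"" + targetDirectory + "\", \"cancel-job\": false}";
    }

    static HttpRequest triggerRequest(String restBase, String jobId, String targetDirectory) {
        return HttpRequest.newBuilder(savepointUri(restBase, jobId))
                .header("Content-Type", "application/json")
                .timeout(Duration.ofSeconds(30))
                .POST(HttpRequest.BodyPublishers.ofString(requestBody(targetDirectory)))
                .build();
    }

    /** Triggers a savepoint now and then every periodHours until the executor is shut down. */
    static ScheduledExecutorService schedule(String restBase, String jobId,
                                             String targetDirectory, long periodHours) {
        HttpClient client = HttpClient.newHttpClient();
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            try {
                HttpResponse<String> response = client.send(
                        triggerRequest(restBase, jobId, targetDirectory),
                        HttpResponse.BodyHandlers.ofString());
                // The body contains a "request-id"; poll /jobs/<job-id>/savepoints/<request-id> for status
                System.out.println("Savepoint trigger: HTTP " + response.statusCode() + " " + response.body());
            } catch (IOException e) {
                // Log instead of rethrowing: an uncaught exception would cancel all later runs
                System.err.println("Savepoint trigger failed: " + e.getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 0, periodHours, TimeUnit.HOURS);
        return executor;
    }
}
```

For example, `SavepointScheduler.schedule("https://flink-jobmanager.klutch.sh", jobId, "s3://my-bucket/flink/savepoints", 6)` triggers a savepoint immediately and then every six hours until the returned executor is shut down.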
Troubleshooting
JobManager Not Starting
Symptoms: JobManager container exits or health check fails
Solutions:
- Check logs: docker logs <container-id>
- Verify environment variables are set correctly
- Ensure persistent volumes are properly mounted
- Check JVM memory settings don’t exceed container limits
- Verify port 8081 is not already in use
TaskManager Not Registering
Symptoms: TaskManagers don’t appear in Dashboard
Solutions:
- Verify FLINK_JOBMANAGER_HOST points to the correct JobManager hostname
- Check network connectivity between TaskManager and JobManager
- Ensure JobManager RPC port (6123) is accessible
- Review TaskManager logs for connection errors
- Verify TaskManager has sufficient memory allocated
Checkpoint Failures
Symptoms: Jobs fail with checkpoint timeout errors
Solutions:
1. Increase checkpoint timeout:
```yaml
execution.checkpointing.timeout: 900000  # 15 minutes
```
2. Verify persistent volume has sufficient space
3. Check S3 credentials and bucket permissions
4. With incremental RocksDB checkpoints, shorten the checkpoint interval: less state changes between checkpoints, so each checkpoint is smaller
5. Enable incremental checkpoints for RocksDB:
```yaml
state.backend.incremental: true
```
Out of Memory Errors
Symptoms: TaskManagers crash with OOM errors
Solutions:
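1. Increase TaskManager memory. docker-entrypoint.sh does not read TASKMANAGER_MEMORY=8192m as written. Instead, raise taskmanager.memory.process.size in flink-conf.yaml together with the app's memory allocation, and scale task heap and managed memory with it as in the Resource Optimization example.
2. Reduce task slots per TaskManager:
```
TASKMANAGER_SLOTS=2
```
3. Tune memory configuration in flink-conf.yaml
4. Use RocksDB state backend to offload state to disk
5. Monitor state size growth and implement TTL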
High Backpressure
Symptoms: Slow processing, increasing lag
Solutions:
1. Scale out TaskManagers for more parallelism
2. Optimize job logic and operators
3. Increase network buffer size:
```yaml
taskmanager.memory.network.max: 2gb
```
4. Use async I/O for external lookups
5. Review operator chaining and chain breaks (startNewChain(), disableChaining())
Job Submission Failures
Symptoms: Cannot submit JAR via REST API
Solutions:
- Verify JAR file is not corrupted
- Check the REST upload size limit (rest.server.max-content-length, 100 MB by default) and raise it for large JARs
- Ensure entry class exists and is correct
- Review classpath conflicts
- Check Dashboard logs for detailed error messages
Advanced Configuration
Operator State and Keyed State
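```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Event and Result are your own types. Keyed state requires a keyBy(...) upstream,
// e.g. stream.keyBy(Event::getKey).flatMap(new StatefulJob())
public class StatefulJob extends RichFlatMapFunction<Event, Result> {

    private transient ValueState<Long> counterState;

    @Override
    public void open(Configuration parameters) {
        // Without a default value, value() returns null for a key not seen before
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("counter", Long.class);
        counterState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Event event, Collector<Result> out) throws Exception {
        Long current = counterState.value();
        long count = (current == null ? 0L : current) + 1;
        counterState.update(count);
        out.collect(new Result(event.getKey(), count));
    }
}
```
Event Time Processing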
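Event-time windows fire based on watermarks rather than wall-clock time. The plain-Java sketch below mirrors the arithmetic behind forBoundedOutOfOrderness and TumblingEventTimeWindows, which the next snippet uses. EventTimeMath is hypothetical, not Flink code. The rules it models are:

- The watermark trails the highest timestamp seen so far by the allowed out-of-orderness plus 1 ms.
- A window [start, start + size) fires once the watermark reaches its last millisecond.
- Flink emits such watermarks periodically, every 200 ms by default (pipeline.auto-watermark-interval).

```java
/** Hypothetical plain-Java model of bounded out-of-orderness watermarks and tumbling windows. */
final class EventTimeMath {

    private final long outOfOrdernessMs;
    private long maxTimestamp;

    EventTimeMath(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        // Chosen so the initial watermark is exactly Long.MIN_VALUE without underflow
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
    }

    /** Track the highest event timestamp seen so far; late events never move it back. */
    void onEvent(long eventTimestampMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMs);
    }

    /** Watermark = highest timestamp - allowed out-of-orderness - 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    /** Start of the tumbling window (zero offset) that contains the timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** A window [start, start + size) fires once the watermark reaches start + size - 1. */
    static boolean windowFired(long windowStartMs, long windowSizeMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + windowSizeMs - 1;
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60_000L;
        EventTimeMath watermarks = new EventTimeMath(5_000); // 5 s out-of-orderness
        watermarks.onEvent(600_000);
        long wm = watermarks.currentWatermark();
        long start = windowStart(599_999, fiveMinutes);
        System.out.println("watermark=" + wm + ", window start=" + start
                + ", fired=" + windowFired(start, fiveMinutes, wm));
        // prints watermark=594999, window start=300000, fired=false
    }
}
```

With 5 s of allowed out-of-orderness, the 5-minute window covering [300000, 600000) ms fires only after an event with a timestamp of at least 605000 ms has been seen.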
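```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Event, Result, source, and MyWindowFunction (a ProcessWindowFunction) are your own definitions
DataStream<Result> results = env
        .fromSource(
                source,
                WatermarkStrategy
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((event, timestamp) -> event.getTimestamp()),
                "Event Source")
        .keyBy(Event::getKey)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .process(new MyWindowFunction());
```
Custom Metrics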
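```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

// Event and Result are your own types
public class MetricsJob extends RichMapFunction<Event, Result> {

    private transient Counter counter;
    private transient Meter meter;

    @Override
    public void open(Configuration parameters) {
        this.counter = getRuntimeContext()
                .getMetricGroup()
                .counter("myCounter");
        // Events per second, averaged over the last 60 seconds
        this.meter = getRuntimeContext()
                .getMetricGroup()
                .meter("myMeter", new MeterView(60));
    }

    @Override
    public Result map(Event event) {
        counter.inc();
        meter.markEvent();
        return new Result(event);
    }
}
```
Scaling Your Flink Cluster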
Horizontal Scaling
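1. Add More TaskManagers
Deploy additional TaskManager apps following the deployment steps.

2. Increase Parallelism
Update job parallelism when submitting:
```bash
curl -X POST \
  https://flink-jobmanager.klutch.sh/jars/<jar-id>/run \
  -H "Content-Type: application/json" \
  -d '{"parallelism": 16}'
```

3. Dynamic Scaling
Use Flink’s reactive mode (requires Flink 1.13+). Reactive mode works only for standalone application clusters (FLINK_MODE=standalone in this setup), not for session clusters that accept jobs over the REST API. In reactive mode, Flink scales the job to use all available TaskManager slots:
```yaml
scheduler-mode: reactive
execution.attached: false
```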
Vertical Scaling
1. Increase TaskManager Resources
In Klutch.sh app settings:
- Increase CPU cores
- Increase memory allocation
- Adjust task slots accordingly
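2. Optimize Memory Distribution
```yaml
taskmanager.memory.process.size: 16384m
taskmanager.memory.task.heap.size: 8192m
# Managed memory is sized so the derived JVM overhead stays within its default 192m-1g range
taskmanager.memory.managed.size: 6144m
```
Keep taskmanager.memory.process.size equal to the memory allocated to the app.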
Integration Examples
Kafka Integration
See Kafka guide for Kafka deployment, then configure Flink connector:
```java
// EventDeserializer is your own DeserializationSchema<Event>
KafkaSource<Event> source = KafkaSource.<Event>builder()
        .setBootstrapServers("kafka.klutch.sh:8000")
        .setTopics("events")
        .setGroupId("flink-consumer")
        .setValueOnlyDeserializer(new EventDeserializer())
        .build();
```
Database Sink
```java
// Requires the Flink JDBC connector and the PostgreSQL driver on the classpath.
// Result is your own type; getTimestamp() is assumed to return java.sql.Timestamp.
stream.addSink(JdbcSink.<Result>sink(
        "INSERT INTO results (id, value, timestamp) VALUES (?, ?, ?)",
        (statement, result) -> {
            statement.setString(1, result.getId());
            statement.setDouble(2, result.getValue());
            statement.setTimestamp(3, result.getTimestamp());
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .withMaxRetries(5)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://postgres.klutch.sh:8000/database")
                .withDriverName("org.postgresql.Driver")
                .withUsername("user")
                .withPassword("password")
                .build()));
```
Elasticsearch Sink
```java
ElasticsearchSink.Builder<Event> builder = new ElasticsearchSink.Builder<>(
        List.of(new HttpHost("elasticsearch.klutch.sh", 9200, "http")),
        new ElasticsearchSinkFunction<Event>() {
            @Override
            public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                // createIndexRequest is your own helper that builds an IndexRequest
                indexer.add(createIndexRequest(element));
            }
        });

// Flush after every 1000 buffered actions
builder.setBulkFlushMaxActions(1000);
stream.addSink(builder.build());
```
Additional Resources
- Apache Flink Official Documentation
- DataStream API Guide
- State Backends Documentation
- Checkpointing Guide
- Flink Connectors Ecosystem
- Flink GitHub Repository
- Flink Community Resources
Production Deployment Checklist
Before going to production, verify:
- RocksDB state backend configured with incremental checkpoints
- Checkpoints and savepoints stored in persistent S3/cloud storage
- High availability enabled with ZooKeeper (3+ JobManagers)
- SSL/TLS enabled for all communications
- Prometheus metrics configured and integrated with monitoring
- Automated alerting set up for failures and performance issues
- Regular savepoint schedule implemented
- Backup and recovery procedures tested
- TaskManager resources scaled for expected load
- Network security and firewall rules configured
- Logging centralized and retained appropriately
- Job restart strategy configured
- Resource quotas and limits set
- Custom domains configured for JobManager
- Documentation updated for operations team
- Disaster recovery plan documented and tested
- Performance benchmarks completed
- Security audit performed
- Compliance requirements met
- Monitoring dashboards created
- On-call procedures established
Congratulations! You now have a production-ready Apache Flink cluster running on Klutch.sh with JobManager and TaskManager separation, persistent state management, and monitoring integration. Your cluster is ready to process real-time data streams with fault tolerance and exactly-once semantics.