March 02, 2024

Kafka to Flink Integration for the First-Time: Actionable Insights

Mitch Gray
Mitch Gray
twitter facebook linkedin

In this follow-up article (see part 1), building on my initial explorations with Apache Flink, I aim to dive into Flink sources, with a focus on Apache Kafka and its role as both a data source and a sink. Kafka is essential for streaming use cases but Kafka by itself is not enough. While being a state of the art data bus for streaming data, it lacks the ability to do the processing part which is needed to turn streaming data into something of actual value. Despite the widespread adoption of Kafka among streaming data organizations, and my initial assumption that a Kafka Flink integration would be straightforward, my experience, echoed by numerous community inquiries, suggests that the reality is a bit more complex. So, with this context, let’s embark on a detailed examination of Kafka within the Flink ecosystem.

Understanding Sources In Apache Flink

So, what exactly is a source in the context of Apache Flink? Sources are the components that enable you to ingest your data into your Flink job for processing from various storage types or systems. Flink processes data streams at a high throughput and low latency in a fault-tolerant manner and these can be unbounded (meaning the data has no defined start or end, which theoretically means an infinity number of records will be processed) or bounded (meaning there are a finite number of records that will be processed). Flink has many predefined sources such as reading from sockets, files, collections, and io pipes. It also has bundled connectors which provide code for interfacing with third-party systems.

As of Flink 1.18 there are many popular connectors available such as Kafka, RabbitMQ, JDBC, Elasticsearch, Google PubSub and various Amazon streams and databases, with a Kafka to Flink integration being a key aspect of stream processing. Are you wanting to connect to a database? No problem, Flink has a Table API and SQL Sources which allow a high-level abstraction making it easier to work with structured data such as databases and numerous file formats. Lastly, you can even build a custom source if it’s not covered with one of the built-in connectors. Now that you know what an Apache Flink Source is, let’s move on to why Kafka.

Why Kafka? Unleashing The Power Of Stream-Processing

Apache Kafka is the titan of data streaming. According to Kafka it is “The most popular open-source stream-processing software for collecting, processing, storing, and analyzing data at scale. Most known for its excellent performance, low latency, fault tolerance, and high throughput, it’s capable of handling thousands of messages per second.” It is trusted by giants in the industry such as Adidas, Airbnb, Cisco, CloudFlare, Hotels.com, Linkedin, Netflix, Oracle, Salesforce and many more. According to one source in 2024 over 29,545 companies and more than 80% of all Fortune 100 companies are using it. Kafka’s power lies in its ability to seamlessly connect multiple data sources and consumers, making it the cornerstone of modern data architectures. Use this ability to subscribe to, store, and process streams of records in a fault-tolerant way is why it is widely used for building real time streaming data pipelines and applications.

A Kaleidoscope Of Use Cases

While new use cases are popping up every day there are many common ones that you see across various companies. Messaging being one of the top contenders. It can be used to send and receive messages and can be a replacement for more traditional message brokers such as RabbitMQ. Obviously, we can’t leave out stream processing since this is why we are using Flink in the first place. With the help of a Kafka Consumer we can ingest our data into our Flink pipeline for further transformation / processing. Log Aggregation is another popular use case. It can aggregate logs from different sources and make them available to multiple consumers effectively simplifying log processing and analytics. On that same line real-time Analytics with things like data aggregation, real-time reporting, and triggering event-based events.

According to Kafka, the original use case was for website activity tracking. A user’s actions on a site such as searches, pages views, and other common website activity would be published to a topic in Kafka. From there things like reporting, monitoring, or analytics would subscribe to these topic/topics for consumption. Kafka is also a great tool for aggregating metrics for distributed applications and external commit logs for re-syncing failed nodes.

As you can see Apache Kafka is essential to data streaming and has a wide range of uses. Everything from IoT data processing to microservices communication Kafka is highly flexible and can handle large scale, real-time data streams across a broad range of applications and industries. Kafka flexibility is unmatched and this is why I wanted to explore Kafka and a source in my Flink journey.

Looking to see how other companies are using Kafka Flink Integration? Apache has some real business use cases here.

My Journey With Apache Kafka And Flink

Since Kafka is either a source or sink for most Flink users, I wanted to try and build a pipeline streaming data from Kafka. I used Upstash for my managed kafka solution since it’s free for a certain number of messages per day, requires little setup, and has instant gratification. For my Kafka to Flink integration, I first needed to send streaming data to my Kafka topic. I decided to use my Home Assistant setup to generate some fake sensor data that uses a webhook to push events over to my Kafka topic. Basically, the sensors will randomly change and push events on each change to my Upstash’s webhook. Upstash has a nice dashboard that gives you multiple ways to produce or consume your data. Now with unbounded data being published to my Kafka topic I was ready to try and consume it as a source in my Flink pipeline.

Home Assistant Dashboard with Sensor Data
Upstash Dashboard

Setting Up Kafka As A Source

Java:

Between blogs, tutorials, stackoverflow, and my personal experience, Java has ample examples of using Kafka as a source with Flink, and for once, Flink’s documentation was helpful. Java seems to be widely used and well documented. Flink provides a builder class for constructing an instance of a KafkaSource. You will need a few things to get your source set up:

  • Connection information/properties/security, (which I got from my Upstash dashboard and Flink)
  • Topic you set up in Kafka
  • Group Id (consumer group — this must be unique across all consumers)
  • Offset — such as OffsetsInitializer.earliest(). See link for all options
  • Deserializer — this defines how your Kafka message will be deserialized. See link for options
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

(Flink Java Code Snippet for Kafka Source)

There are additional options you can set depending on your use case, but that’s the basics to get you streaming data.

Python:

Python offered its own nuances with the critical addition of the Kafka jar. Other than that it will look very similar with slightly different syntax. In my research, I didn’t find a lot of tutorials or code examples of an actual working Kafka source in PyFlink so I wanted to provide that. One thing that was only mentioned in a few places, and completely left out of all AI generations tools I tried, was the Java Kafka Jar. You will need to add this jar as a dependency in your Python code. Whatever version of Python you are using you will need that jar (my Flink version is 1.18 and the jar is here). Add the jar to your code via env.add_jar(file location)

# Create a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# Adding the jar to my streming environment.
env.add_jars("file:///home/pythonProject/UseCases/flink-sql-connector-kafka.jar")

(code snippet of adding Kafka jar to my Python code)

Once you have the jar you can add your Kafka source similarly to the above Java code snippet.

from pyflink.common import WatermarkStrategy, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import KafkaSource
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
import uuid

(PyFlink Libraries I used)

# Create a StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

# Adding the jar to my streming environment.
env.add_jars("file:///home/pythonProject/UseCases/flink-sql-connector-kafka.jar")

properties = {
'bootstrap.servers': 'your-bootstrap-url:9092',
'sasl.mechanism': 'SCRAM-SHA-256',
'security.protocol': 'SASL_SSL',
'sasl.jaas.config': "org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required \
username='your-username' \
password='your-password';",
'group.id': 'observability',
}

earliest = False
offset = KafkaOffsetsInitializer.earliest() if earliest else KafkaOffsetsInitializer.latest()

# Create a Kafka Source
kafka_source = KafkaSource.builder() \
.set_topics("iot") \
.set_properties(properties) \
.set_starting_offsets(offset) \
.set_value_only_deserializer(SimpleStringSchema())\
.build()

# Create a DataStream from the Kafka source and assign timestamps and watermarks
data_stream = env.from_source(kafka_source, WatermarkStrategy.for_monotonous_timestamps(), "Kafka Source")

# Print line for readablity in the console
print("start reading data from kafka")

# Different ways of printing each event
# Standard Print
#data_stream.print()

# Print with a unique ID for each event
#data_stream.map(lambda x: "Processed" + str(uuid.uuid4()) + " : " + x, output_type=Types.STRING()).print()

# Print in a more readable format
data_stream.map(lambda x: "\n" + x, output_type=Types.STRING()).print()

# Execute the Flink pipeline
env.execute("Kafka Source Example")

(PyFlink Kafka Source)

As the same with Java code you will need:

  • Connection information/properties/security, (which I got from my Upstash dashboard and Flink)
  • Topic you set up in Kafka
  • Group Id (consumer group) if you have one
  • Offset — such as KafkaOffsetsInitializer.earliest(). See link for all options
  • Deserializer — this defines how your Kafka message will be deserialized. See link for options

This should get you streaming a Kafka topic in PyFlink. I added a few ways to print the results so it was a readable format and on with a unique Id for each event to help troubleshoot.

Kafka to Flink Integration: The Future Is Bright

As you can see Flink and Kafka can be a powerful solution together. You can easily add Kafka as a source or sink in both Java, Scala, and Python with a few lines of code. The biggest issue I ran into was getting real streaming data to send to Kafka, adding the Kafka jar to Python (not needed for Java), and getting the syntax correct for the builder. Once I overcame that the data poured in. I hope the above code will save you some time in the future.

As far as the Kafka Flink integration future, I think they will both see even further adoption outside of the fortune 100 company list, and we’ll see them even more popular, together and separately. Since they are fault tolerant, have the ability for high throughput, scalable,reliable, and with the added bonus of low latency it really is a match made in heaven. Pairing Kafka and Flink together not only enhances your ability to process large streams of data efficiently but also enables deeper, faster insights into your data, driving immediate and informed decisions across your organization. Embrace this dynamic duo and unlock the full potential of your real-time data streams.

Don’t forget about the Apache mailing list and Flink’s Slack channel to ask questions when you run into any problems. I have found a lot of Flink users have run into similar issues.

What’s next: I will continue to blog about my experiences with Apache Flink.

Feel free to share your comments and ideas! Are you using Kafka with Flink? What are your thoughts?

Here are some resources I used throughout my research:

Related Articles

See The Data Behind Your Data

Start Visualizing
Join Today

Fill out the short form below