Kafka to Flink Integration for the First Time: Actionable Insights
Continuing on my Apache Flink journey, it’s time for some real-world use cases. Now that I have discussed my initial Flink experiences (see part 1) and setting up a source using Apache Kafka (see part 2), I thought it was best to discuss the T in ETL or ELT. Consider data transformation a critical stage in any data pipeline; without it, you have only raw data, which alone may not yield correct or actionable insights. One common transformation task is deduplication. Below I will outline some common transformation tasks, what and why to dedupe, typical use cases, and finally some code examples.
Data transformations are vital for maximizing data’s potential. They are critical for improving data quality, ensuring consistency, enriching content, and aligning data with business and analytical needs. Effective transformation, particularly when performed close to the data source, ensures that the data supports accurate and insightful analysis.
Deduplication of streaming data is a vital technique in data processing that ensures the uniqueness of records within a dataset. This process is crucial for maintaining data quality and integrity, especially in real-time data processing systems where data comes from various sources and might contain duplicates due to retries, system errors, or inherent characteristics of the data source, such as an oversampling sensor that produces more data than the business needs.
Implementing deduplication involves identifying and removing duplicate records from your data streams. Typical approaches include leveraging unique identifiers, timestamps, or custom logic that stores state on particular fields or columns. You can achieve even more advanced deduplication through techniques such as hash-based deduplication, sequence numbering, or features of stream processing frameworks like Apache Flink or Apache Kafka’s exactly-once semantics.
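To make the hash-based idea concrete, here is a minimal, framework-free Python sketch; the record shape and the choice of hashing the whole record with SHA-256 are illustrative assumptions, not something from this article.

import hashlib
import json

def record_fingerprint(record: dict) -> str:
    # Hash the fields that define uniqueness (here, the whole record, canonically serialized)
    canonical = json.dumps(record, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def deduplicate(records):
    # Yield only the first occurrence of each fingerprint
    seen = set()
    for record in records:
        fingerprint = record_fingerprint(record)
        if fingerprint not in seen:
            seen.add(fingerprint)
            yield record

readings = [{"sensor": "a", "value": 1}, {"sensor": "a", "value": 1}, {"sensor": "b", "value": 2}]
print(list(deduplicate(readings)))  # the duplicate reading from sensor "a" is dropped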
There are several reasons to consider deduplicating your streaming data. Deduplication is essential across many fields for ensuring data integrity and efficiency, especially in real-time processing. By streamlining data processing and maintaining data uniqueness, it enhances system reliability and user experience, supporting informed decision-making and a competitive edge.
Implementing deduplication in Apache Flink requires careful consideration of your data characteristics, volume, and processing requirements. With those in mind, you can design an efficient and scalable deduplication strategy that ensures high-quality data processing in your Flink applications.
Apache Flink’s capabilities in managing state, handling time, and windowing are fundamental to building sophisticated, real-time data processing applications and are needed to accurately accomplish deduplication in the real world.
Apache Flink’s Windowing is a powerful feature that allows developers to group events into windows for aggregated analysis. Windows can be defined based on time (e.g., processing time, event time), count, or even custom triggers. This flexibility enables a wide range of use cases, from simple aggregations (like summing values over a 10-minute window) to complex event pattern recognition.
Flink supports various window types, including tumbling, sliding, and session windows, each serving different analytical needs. Tumbling windows are fixed-sized, non-overlapping windows ideal for discrete, non-continuous analyses. Sliding windows overlap and are perfect for continuous analysis over a period. Session windows dynamically adjust based on data patterns, making them suitable for user session analysis.
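As a rough sketch of how these three window types are declared in PyFlink (the window sizes and the keyed_stream placeholder are illustrative assumptions, not values from this article):

from pyflink.common import Time
from pyflink.datastream.window import (
    TumblingEventTimeWindows,
    SlidingEventTimeWindows,
    EventTimeSessionWindows,
)

# Tumbling: fixed, non-overlapping 10-minute buckets
tumbling = TumblingEventTimeWindows.of(Time.minutes(10))

# Sliding: 10-minute windows that advance every minute, so they overlap
sliding = SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(1))

# Session: a window closes after a 30-second gap with no events for the key
session = EventTimeSessionWindows.with_gap(Time.seconds(30))

# A window assigner is then applied to a keyed stream, for example:
# keyed_stream.window(tumbling).reduce(lambda a, b: a + b)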
In the world of stream processing, accurately handling time is crucial, especially given the out-of-order nature of real-world data streams. Flink’s event time processing capabilities ensure that events are processed based on when they actually occurred, rather than when they were received. This distinction is critical for accurate windowing and time-based computations.
Flink’s Watermark mechanism plays a pivotal role here, allowing the system to handle late-arriving data gracefully. You can use watermarks to signal the progress of time in the data stream, enabling Flink to know when it’s safe to perform computations that depend on time.
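Here is a small sketch of how a watermark strategy is wired up in PyFlink; it mirrors what the full example later in this article does, assuming each record is a (key, epoch_millis) tuple.

from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.watermark_strategy import TimestampAssigner

class TupleTimestampAssigner(TimestampAssigner):
    # Assumes each record is a (key, epoch_millis) tuple
    def extract_timestamp(self, value, record_timestamp):
        return value[1]

# Tolerate events that arrive up to 5 seconds out of order
strategy = (
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(5))
    .with_timestamp_assigner(TupleTimestampAssigner())
)

# stream.assign_timestamps_and_watermarks(strategy) attaches it to a DataStream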
Stateful operations are at the heart of Flink’s ability to process unbounded streams of data. Unlike stateless operations, which treat each incoming data item in isolation, stateful operations remember information about past events. This capability enables Flink to perform sophisticated processing like deduplication, pattern detection, and machine learning scoring.
Flink offers rich state management APIs that developers can use to manage custom state, including keyed state and operator state. Keyed state is useful for scenarios where state needs to be maintained per key (e.g., counting events per user), while operator state is more suited for operator-wide state (e.g., a global count).
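As a quick illustration of keyed state, here is a hypothetical PyFlink operator that counts events per key with a ValueState; the function and state names are my own and are separate from the deduplication example below.

from pyflink.common import Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class PerKeyCounter(KeyedProcessFunction):
    # Counts how many events have been seen for each key

    def open(self, runtime_context: RuntimeContext):
        # Flink keeps one copy of this state per key
        self.count_state = runtime_context.get_state(
            ValueStateDescriptor("event_count", Types.LONG())
        )

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        current = (self.count_state.value() or 0) + 1
        self.count_state.update(current)
        yield ctx.get_current_key(), current

# Usage: stream.key_by(lambda e: e[0]).process(PerKeyCounter())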
I decided to give a Python example because of the shortage of Python ones out there; I will link to a Git repo with a Java example.
This Python example generates random numbers to stream from as the data source, assigning a timestamp and watermark to each event. Any duplicate arriving within the configured deduplication window (10 seconds, i.e. 10,000 ms, in the code below) is removed, and the unique results are printed. The ExampleEvent wrapper and its timestamp assigner are defined inline so the listing is self-contained.
import random
import time
import math
import typing

from pyflink.common import WatermarkStrategy, Duration, Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor, ValueState


class ExampleEvent:
    # Simple event wrapper: an id used as the dedup key and an event-time timestamp in ms
    # (its shape is inferred from how it is used below)
    def __init__(self, id: int, timestamp: int):
        self.id = id
        self.timestamp = timestamp

    def __repr__(self):
        return f"ExampleEvent(id={self.id}, timestamp={self.timestamp})"


class ExampleEventTimestampAssigner(TimestampAssigner):
    # Tells Flink which field carries the event-time timestamp
    def extract_timestamp(self, value: ExampleEvent, record_timestamp: int) -> int:
        return value.timestamp


class DeduplicationOperator(KeyedProcessFunction):
    # Keeps only the first event seen per key within time_range milliseconds
    def __init__(self, time_range: int):
        self.unique_event: typing.Union[None, ValueState] = None
        self.time_range = time_range

    def open(self, runtime_context: RuntimeContext):
        # Keyed state: one slot per key holding the first event seen for that key
        self.unique_event = runtime_context.get_state(
            ValueStateDescriptor("UniqueEvent", Types.PICKLED_BYTE_ARRAY())
        )

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # Store only the first event per key; later duplicates are ignored
        if self.unique_event.value() is None:
            self.unique_event.update(value)
            ctx.timer_service().register_event_time_timer(ctx.timestamp() + self.time_range)

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # When the dedup window expires, emit the stored event and clear the state
        if self.unique_event.value() is not None:
            yield self.unique_event.value()
            self.unique_event.clear()


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Generate (value, timestamp) pairs; the small value range guarantees duplicates
    rand = random.Random()
    random_numbers = [
        (rand.randint(1, 10), math.floor(index + (time.time() * 1000)))
        for index in range(10000)
    ]
    random_source = env.from_collection(random_numbers)
    random_examples = random_source.map(lambda event: ExampleEvent(event[0], event[1]))

    # Assign event-time timestamps and allow events up to 5 seconds out of order
    watermarked_examples = random_examples.assign_timestamps_and_watermarks(
        WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(5))
        .with_timestamp_assigner(ExampleEventTimestampAssigner())
    )

    # Key by event id and drop duplicates within a 10-second (10,000 ms) range
    unique_events = (
        watermarked_examples
        .key_by(lambda event: event.id)
        .process(DeduplicationOperator(10000))
    )

    values = unique_events.execute_and_collect()
    for i in values:
        print(i)


if __name__ == '__main__':
    main()
Even though there were plenty of examples out there on using SQL to remove duplicates, I decided to try it out. This code reads from an input CSV and removes duplicates based on one or more columns, keeping either the first or the last occurrence in the CSV: ordering by ASC keeps the first row, ordering by DESC keeps the last row. Once duplicates are removed, the output is saved to a new CSV. This uses the Flink Table API and SQL. Below is a working code example:
Add dependencies:
from pyflink.table import TableEnvironment, EnvironmentSettings
Create your table environment and define the input and output files.
# Create Table Environment
t_env = TableEnvironment.create(environment_settings=EnvironmentSettings.in_batch_mode())
# Define the source and output paths
input_path = '/path/input.csv' # Define your input path and csv here
output_path = '/path/output.csv' # Define your output path and csv here
Create your source table from the CSV, defining column names and types that match your input CSV, then execute the DDL to create the source table.
# Create Source table from CSV and Define the columns names and types
source_ddl = f"""
create table source_table (
column1 INT,
column2 STRING,
column3 BOOLEAN,
column4 INT,
column5 BIGINT
) with (
'connector' = 'filesystem',
'path' = '{input_path}',
'format' = 'csv'
)
"""

# Execute to create the source table
t_env.execute_sql(source_ddl)
Remove duplicates via a SQL query.
# SQL to remove duplicates
# Change the column names to match your table
# The PARTITION BY column(s) define what counts as a duplicate; you can list more than one column
# Ordering the partition ASC keeps the first matching row and drops the rest (use DESC to keep the last)
result = t_env.sql_query("""
    SELECT column1, column2, column3, column4, column5
    FROM (
        SELECT column1, column2, column3, column4, column5,
               ROW_NUMBER() OVER (PARTITION BY column2 ORDER BY column1 ASC) AS rn
        FROM source_table
    ) t
    WHERE rn = 1
""")
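For completeness, here is the same query sketched with descending order, which keeps the last occurrence instead of the first (same placeholder columns as above).

# Keep the LAST matching row per duplicate key by reversing the ordering
result_last = t_env.sql_query("""
    SELECT column1, column2, column3, column4, column5
    FROM (
        SELECT column1, column2, column3, column4, column5,
               ROW_NUMBER() OVER (PARTITION BY column2 ORDER BY column1 DESC) AS rn
        FROM source_table
    ) t
    WHERE rn = 1
""")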
Create the sink table for your output, execute the DDL to create it, and then insert the data.
# Create the Sink Table
# Change column names to match your output table
# If you have fewer or different columns than the source CSV, change the SELECT statement above to select only the columns you want in your output CSV
sink_ddl = f"""
create table sink_table (
column1 INT,
column2 STRING,
column3 BOOLEAN,
column4 INT,
column5 BIGINT
) with (
'connector' = 'filesystem',
'path' = '{output_path}',
'format' = 'csv'
)
"""
# Execute to create new CSV
t_env.execute_sql(sink_ddl)
# Add rows
result.execute_insert("sink_table").wait()
Wrapping things up, I can’t stress enough how crucial deduplication is when it comes to getting streaming data ready for the big leagues — analysis, storage, and those all-important decision-making moments. It’s like giving your data a quality check, trimming down unnecessary expenses, and putting a turbo booster on your data processing engines. Honestly, deduplication can either be a walk in the park or a bit of a puzzle, depending entirely on the quirks of your data, how time-sensitive your tasks are, and what you specifically need to achieve. I’ve navigated through these waters myself, and I’m hopeful that sharing my journey will make your future adventures into the world of dedupe transformations a little less daunting and a lot more exciting.
Don’t forget about the Apache mailing list and Flink’s Slack channel to ask questions when you run into any problems. I have found a lot of Flink users have run into similar issues.
Feel free to share your comments and ideas! Are you using Kafka with Flink? What are your thoughts?
Find here some resources I used throughout my research:
Part 1: my first impressions of Apache Flink
Part 2: setting up Apache Kafka as a source for Flink