March 02, 2024

Apache Flink Deduplication: Key Strategies

Mitch Gray

Continuing my Apache Flink journey, it’s time for some real-world use cases. Now that I have discussed my initial Flink experiences (see part 1) and setting up a source using Apache Kafka (see part 2), I thought it best to discuss the T in ETL or ELT. Consider data transformation a critical stage in any data pipeline: without it, you have only raw data, which on its own may not yield correct or actionable insights. One common transformation task is deduplication. Below I will outline some common transformation types, what deduplication is and why to do it, use cases, and finally some code examples.

Common Types of Data Transformations

Data transformations are vital for maximizing data’s potential and can be broken down into a few key types:

  • Validation & Cleaning: Removes duplicates, corrects errors, and fills missing values, ensuring data’s accuracy and reliability.
  • Normalization/Formatting: Standardizes formats and values for consistency, crucial for integrating diverse data sources.
  • Enrichment: Enhances data by integrating additional sources, like demographic or weather information, for deeper analysis.
  • Aggregation: Summarizes data (through grouping, counting, averaging) to highlight trends and patterns, making it more manageable.
  • Compliance & Security: Anonymizes sensitive data to meet privacy laws and security policies, protecting personal information.
  • Performance Optimization: Adjusts data structure for better query performance and storage efficiency in the target system.
  • Business Rules Application: Applies specific business rules to align data with current business needs, enhancing its immediate usability.
  • Advanced Analytics/Machine Learning Preparation: Prepares data for advanced analytics and machine learning, including automated tasks such as anomaly detection and further enrichment.

Transformations are critical for improving data quality, ensuring consistency, enriching content, and aligning data with business and analytical needs. Effective transformation, particularly when performed close to the data source, ensures that the data supports accurate and insightful analysis.

What and Why Dedupe?

Deduplication of streaming data is a vital data processing technique that ensures each record within a dataset is unique. It is crucial for maintaining data quality and integrity, especially in real-time systems where data comes from various sources and may contain duplicates due to retries, system errors, or inherent characteristics of the source, such as an oversampling sensor that produces more data than the business needs.

Implementing deduplication involves identifying and removing duplicate records from your data streams. Typical approaches include leveraging unique identifiers or timestamps, or implementing custom logic that stores state on particular fields or columns. More advanced deduplication can be achieved through techniques such as hash-based deduplication, sequence numbering, or the features of stream processing frameworks like Apache Flink, or Apache Kafka’s exactly-once semantics.
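Here is a rough illustration of hash-based deduplication. This is a plain-Python sketch, not Flink code. It fingerprints only the fields that define a duplicate and drops any record whose fingerprint has already been seen. The field names `user_id`, `action` and `ts` are hypothetical. The unbounded `seen` set is a simplification: in a Flink job this bookkeeping would live in keyed state with some form of expiry.

```python
import hashlib
import json
from typing import Iterable, Iterator, List, Set


def fingerprint(record: dict, fields: List[str]) -> str:
    """Hash only the fields that decide whether two records are duplicates."""
    key = json.dumps([record.get(f) for f in fields], default=str)
    return hashlib.sha256(key.encode("utf-8")).hexdigest()


def dedupe_by_hash(records: Iterable[dict], fields: List[str]) -> Iterator[dict]:
    """Yield each record the first time its fingerprint appears."""
    seen: Set[str] = set()  # unbounded here; a real job needs expiry (e.g., state TTL)
    for record in records:
        fp = fingerprint(record, fields)
        if fp not in seen:
            seen.add(fp)
            yield record


events = [
    {"user_id": 1, "action": "click", "ts": 100},
    {"user_id": 1, "action": "click", "ts": 100},  # an exact retry of the first event
    {"user_id": 2, "action": "view", "ts": 105},
]
unique = list(dedupe_by_hash(events, ["user_id", "action", "ts"]))
print(len(unique))  # prints 2
```

Hashing the chosen fields, rather than the whole record, lets you decide which differences matter. For example, a retry that carries a new ingestion timestamp can still be treated as a duplicate by leaving that field out of `fields`.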

Here are several reasons why you should consider deduplicating your streaming data:

  • Improve Data Quality
  • Reduce Storage Costs
  • Enhance Processing Speed
  • Ensure Consistent Data for Analytics
  • Support Compliance and Data Governance
  • Improve User Experience
  • Train More Accurate Machine Learning Models

Use Cases

Deduplication is essential across various fields for ensuring data integrity and efficiency, especially in real-time processing. Here’s a condensed look at where it makes a big difference:

  1. Real-time Analytics: It ensures every event, like website traffic or sensor data, is counted once, enabling accurate trend and anomaly detection.
  2. E-commerce: Deduplication provides precise metrics on user actions, aiding in inventory management and personalized recommendations.
  3. Financial Services: It’s key for transaction accuracy, preventing issues like double-charging and maintaining trust and compliance.
  4. Log Aggregation: Ensures each log entry or error is recorded once, crucial for diagnosing issues and system health.
  5. Messaging Systems: Ensures each message is processed only once, even when it is redelivered, which is critical for integrity in order processing and notification services.
  6. Data Lakes: Helps keep data repositories clean for accurate analytics, machine learning, and visualizations.
  7. IoT: Removes duplicate sensor data, enabling accurate real-time monitoring and decision-making.
  8. CRM: Prevents multiple profiles for the same customer, ensuring accurate customer information and service.

By streamlining data processing and maintaining data uniqueness, deduplication enhances system reliability and user experience, supporting informed decision-making and a competitive edge.

Best Practices for Deduplication in Flink

Implementing deduplication in Apache Flink requires careful consideration of your data characteristics, volume, and processing requirements. By following these best practices, you can design an efficient and scalable deduplication strategy that ensures high-quality data processing in your Flink applications.

  • State Management: Efficiently manage state to prevent memory bloat. Consider using state time-to-live (TTL) configurations to automatically purge old entries.
  • Use Flink’s Stateful Operations: Leverage Flink’s stateful operators to keep track of seen events. You can use keyed state or operator state to store identifiers of processed events, thereby preventing the processing of duplicate events.
  • Scaling Considerations: Design your deduplication strategy with scalability in mind. Ensure that your approach can handle increasing volumes of data without compromising performance.
  • Monitoring and Maintenance: Regularly monitor the performance of your deduplication logic, and be prepared to adjust parameters and strategies as your data patterns evolve.
  • Exploit Time Windows: In many use cases, deduplication can be scoped within a certain time window because events are only considered duplicates if they occur close together in time. Using Flink’s window functions can help manage and reduce the state size by only keeping track of events within a specific window.
  • Consistent Checkpoints for Fault Tolerance: Ensure that your deduplication logic works well with Flink’s checkpointing mechanism to provide fault tolerance. State used for deduplication should be consistently checkpointed to prevent data loss or incorrect deduplication after a failure.
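The state-TTL and time-window practices can be combined in a small plain-Python model. It simulates the idea and is not Flink’s TTL implementation. Each key remembers when its entry was written. An entry older than `ttl_ms` counts as expired, so a later event with that key is treated as new again. A `purge` step keeps the state from growing without bound. The class name `TtlDedupState` and the millisecond timestamps are assumptions made for illustration.

```python
from typing import Dict, Hashable


class TtlDedupState:
    """Toy model of 'seen' state whose entries expire after ttl_ms (hypothetical helper)."""

    def __init__(self, ttl_ms: int):
        self.ttl_ms = ttl_ms
        self._seen: Dict[Hashable, int] = {}  # key -> timestamp when the entry was written

    def is_duplicate(self, key: Hashable, ts: int) -> bool:
        """True if key was written less than ttl_ms ago; otherwise store it and return False."""
        written = self._seen.get(key)
        if written is not None and ts - written < self.ttl_ms:
            return True  # duplicates do not refresh the entry's timestamp
        self._seen[key] = ts  # first sighting, or the previous entry has expired
        return False

    def purge(self, now: int) -> None:
        """Drop expired entries so state does not grow without bound."""
        self._seen = {k: t for k, t in self._seen.items() if now - t < self.ttl_ms}

    def __len__(self) -> int:
        return len(self._seen)


state = TtlDedupState(ttl_ms=5_000)
stream = [("order-1", 0), ("order-1", 1_000), ("order-2", 2_000), ("order-1", 6_000)]
kept = [(k, ts) for k, ts in stream if not state.is_duplicate(k, ts)]
print(kept)  # [('order-1', 0), ('order-2', 2000), ('order-1', 6000)]
state.purge(now=8_000)
print(len(state))  # 1 (only the order-1 entry written at 6000 is still live)
```

In this sketch, duplicates do not extend an entry’s lifetime; only writes do. Flink’s `StateTtlConfig` lets you choose between refreshing on create-and-write or on read-and-write. Refreshing on reads would keep a constantly repeating key suppressed indefinitely, which may or may not be what you want.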

Stateful, Windowing, Time-based

Apache Flink’s capabilities in managing state, handling time, and windowing are fundamental to building sophisticated, real-time data processing applications and are needed to accurately accomplish deduplication in the real world.

Windowing

Apache Flink’s windowing is a powerful feature that allows developers to group events into windows for aggregated analysis. Windows can be defined based on time (e.g., processing time, event time), count, or custom logic through custom window assigners and triggers. This flexibility enables a wide range of use cases, from simple aggregations (like summing values over a 10-minute window) to complex event pattern recognition.

Flink supports various window types, including tumbling, sliding, and session windows, each serving different analytical needs. Tumbling windows are fixed-size, non-overlapping windows, ideal for discrete, non-continuous analyses. Sliding windows also have a fixed size, but they overlap because a new window starts every slide interval, which makes them well suited to continuous, rolling analysis. Session windows have no fixed size: a session stays open while events keep arriving and closes after a configured gap of inactivity, making them suitable for user session analysis.
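To make the three window types concrete, here is a plain-Python sketch of how timestamps map to windows. It mirrors the general idea rather than Flink’s exact code; for instance, it ignores window offsets, and its session grouping is simplified. All sizes are arbitrary millisecond values chosen for the example.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # [start, end) in milliseconds


def tumbling_window(ts: int, size: int) -> Window:
    """The single fixed, non-overlapping window that contains ts."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts: int, size: int, slide: int) -> List[Window]:
    """Every window of length `size` (one starts every `slide` ms) that contains ts."""
    windows = []
    start = ts - ts % slide  # latest window start at or before ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return windows


def session_windows(timestamps: List[int], gap: int) -> List[List[int]]:
    """Group timestamps into sessions; a silence longer than `gap` starts a new one."""
    sessions: List[List[int]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions


print(tumbling_window(12_345, 5_000))             # (10000, 15000)
print(sliding_windows(12_345, 10_000, 5_000))     # [(10000, 20000), (5000, 15000)]
print(session_windows([1, 2, 3, 10, 11], gap=5))  # [[1, 2, 3], [10, 11]]
```

The sliding case shows why sliding windows cost more state: with a 10-second size and a 5-second slide, every event belongs to two windows.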

Time-based

In the world of stream processing, accurately handling time is crucial, especially given the out-of-order nature of real-world data streams. Flink’s event time processing capabilities ensure that events are processed based on when they actually occurred, rather than when they were received. This distinction is critical for accurate windowing and time-based computations.

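Flink’s watermark mechanism plays a pivotal role here, letting the system handle out-of-order and late-arriving data gracefully. A watermark carrying time t signals that event time in the stream has progressed to t, so no more events with timestamps at or before t are expected. This tells Flink when it is safe to perform computations that depend on time, such as closing a window or firing an event-time timer.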
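A bounded-out-of-orderness watermark, like the one created by `WatermarkStrategy.for_bounded_out_of_orderness`, can be modeled in a few lines of plain Python. This is a simplified sketch with three rules:

- The watermark trails the largest timestamp seen so far by the allowed delay, minus one extra millisecond, following Flink’s built-in bounded-out-of-orderness generator.
- It is recomputed after every event instead of periodically.
- An event at or below the current watermark is flagged as late.

```python
from typing import List, Optional, Tuple


def flag_late_events(
    timestamps: List[int], max_out_of_orderness_ms: int
) -> Tuple[List[bool], Optional[int]]:
    """Return a late/on-time flag per event plus the final watermark."""
    max_ts: Optional[int] = None
    watermark: Optional[int] = None  # no watermark until the first event is seen
    late_flags: List[bool] = []
    for ts in timestamps:
        # Late means event time has already been declared to have passed this event
        late_flags.append(watermark is not None and ts <= watermark)
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - max_out_of_orderness_ms - 1
    return late_flags, watermark


flags, final_wm = flag_late_events([1_000, 3_000, 2_000, 9_000, 2_500], 5_000)
print(flags)     # [False, False, False, False, True]
print(final_wm)  # 3999
```

The event at 2,000 arrives out of order but within the 5-second tolerance, so it is on time. The event at 2,500 arrives after the event at 9,000 has pushed the watermark to 3,999, so it is late.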

State

Stateful operations are at the heart of Flink’s ability to process unbounded streams of data. Unlike stateless operations, which treat each incoming data item in isolation, stateful operations remember information about past events. This capability enables Flink to perform sophisticated processing like deduplication, pattern detection, and machine learning scoring.

Flink offers rich state management APIs for managing custom state, including keyed state and operator state. Keyed state is maintained per key (e.g., counting events per user). Operator state is scoped to a parallel operator instance rather than to a key; the Kafka source, for example, uses operator state to store its partition offsets.
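One way to picture keyed state is as a separate small store per key. Flink routes each record to the state belonging to its key, so code that reads or writes "the" value touches only the current key’s value. The plain-Python model below imitates that scoping with a dictionary, using the count-events-per-user example. The class and method names are hypothetical and are not Flink’s API.

```python
from typing import Dict, Hashable, Optional


class ToyKeyedValueState:
    """Hypothetical stand-in for a keyed ValueState: one value per key."""

    def __init__(self):
        self._values: Dict[Hashable, object] = {}
        self.current_key: Optional[Hashable] = None  # set by the "runtime" before each record

    def value(self):
        return self._values.get(self.current_key)

    def update(self, new_value) -> None:
        self._values[self.current_key] = new_value

    def snapshot(self) -> Dict[Hashable, object]:
        return dict(self._values)


count_state = ToyKeyedValueState()
clicks = [("alice", "home"), ("bob", "cart"), ("alice", "checkout")]
for user, page in clicks:
    count_state.current_key = user  # in Flink, key_by does this routing for you
    count_state.update((count_state.value() or 0) + 1)

print(count_state.snapshot())  # {'alice': 2, 'bob': 1}
```

Deduplication uses exactly this scoping. The per-key value is the first event (or a "seen" flag), so a duplicate finds its own key’s value already set.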

Implementing Deduplication in Apache Flink

Java

I decided to give a Python example because of the shortage of Python examples out there; for Java, I will link to a Git repo with an example.

Python

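This Python example generates random numbers as the data source and assigns a timestamp and watermark to each event, tolerating events that arrive up to 5 seconds out of order. Events are keyed by their number. The first event for each number starts a 10-second (10,000 ms) event-time timer, and any repeat of that number that arrives before the timer fires is dropped. The surviving events are printed.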

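```python
import math
import random
import time
import typing

from pyflink.common import Duration, Types, WatermarkStrategy
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import KeyedProcessFunction, RuntimeContext, StreamExecutionEnvironment
from pyflink.datastream.state import ValueState, ValueStateDescriptor


class ExampleEvent:
    """A simple event: an id (the deduplication key) and an event-time timestamp in ms."""

    def __init__(self, id: int, timestamp: int):
        self.id = id
        self.timestamp = timestamp

    def __repr__(self):
        return f"ExampleEvent(id={self.id}, timestamp={self.timestamp})"


class ExampleEventTimestampAssigner(TimestampAssigner):
    """Uses the event's own timestamp as its event time."""

    def extract_timestamp(self, value, record_timestamp: int) -> int:
        return value.timestamp


class DeduplicationOperator(KeyedProcessFunction):
    """Keeps the first event per key and drops repeats until an event-time timer fires."""

    def __init__(self, time_range: int):
        self.unique_event: typing.Union[None, ValueState] = None
        self.time_range = time_range  # deduplication window in milliseconds

    def open(self, runtime_context: RuntimeContext):
        # Keyed state: holds at most one stored event per key
        self.unique_event = runtime_context.get_state(
            ValueStateDescriptor("UniqueEvent", Types.PICKLED_BYTE_ARRAY())
        )

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # Store only the first event for this key; later duplicates are ignored
        if self.unique_event.value() is None:
            self.unique_event.update(value)
            # Fire once the watermark passes first-seen timestamp + time_range
            ctx.timer_service().register_event_time_timer(ctx.timestamp() + self.time_range)

    def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # Emit the retained event, then clear state so the key can be seen again
        if self.unique_event.value() is not None:
            yield self.unique_event.value()
            self.unique_event.clear()


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    rand = random.Random()

    # 10,000 (id, timestamp_ms) pairs: ids 1-10, timestamps roughly 1 ms apart from now
    random_numbers = [
        (rand.randint(1, 10), math.floor(index + (time.time() * 1000)))
        for index in range(10000)
    ]

    random_source = env.from_collection(random_numbers)
    random_examples = random_source.map(lambda event: ExampleEvent(event[0], event[1]))
    watermarked_examples = random_examples.assign_timestamps_and_watermarks(
        WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(5))
        .with_timestamp_assigner(ExampleEventTimestampAssigner())
    )

    unique_events = (
        watermarked_examples
        .key_by(lambda event: event.id)
        .process(DeduplicationOperator(10000))  # 10,000 ms deduplication window
    )

    # execute_and_collect() returns a closeable iterator; the with-block closes it
    with unique_events.execute_and_collect() as results:
        for event in results:
            print(event)


if __name__ == '__main__':
    main()
```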
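To reason about what `DeduplicationOperator` emits without starting a Flink job, its logic can be replayed in plain Python. This is a simplified model and not PyFlink:

- Events arrive in order as hypothetical `(key, timestamp_ms)` pairs.
- The watermark is recomputed after every event as the maximum timestamp minus the out-of-orderness bound minus 1 ms.
- A timer fires once the watermark reaches it.
- The end of the bounded input fires every remaining timer.

```python
from typing import Dict, Hashable, List, Optional, Tuple

Event = Tuple[Hashable, int]  # (key, event-time timestamp in ms)


def simulate_dedup(events: List[Event], time_range_ms: int, out_of_orderness_ms: int) -> List[Event]:
    state: Dict[Hashable, Event] = {}  # plays the role of the UniqueEvent ValueState
    timers: Dict[Hashable, int] = {}   # key -> registered event-time timer
    emitted: List[Event] = []

    def fire_timers(watermark: float) -> None:
        # Like on_timer: in timestamp order, emit the stored event and clear the key
        for key, timer_ts in sorted(timers.items(), key=lambda kv: kv[1]):
            if timer_ts <= watermark:
                emitted.append(state.pop(key))
                del timers[key]

    max_ts: Optional[int] = None
    for key, ts in events:
        if key not in state:  # like process_element: keep only the first event per key
            state[key] = (key, ts)
            timers[key] = ts + time_range_ms
        max_ts = ts if max_ts is None else max(max_ts, ts)
        fire_timers(max_ts - out_of_orderness_ms - 1)

    fire_timers(float("inf"))  # end of a bounded input: the final watermark fires everything
    return emitted


events = [("a", 0), ("a", 1_000), ("b", 16_000), ("a", 17_000), ("a", 18_000)]
print(simulate_dedup(events, time_range_ms=10_000, out_of_orderness_ms=5_000))
# [('a', 0), ('b', 16000), ('a', 17000)]
```

The model makes two properties of the operator visible:

- Each surviving event is emitted only when its timer fires, not when it arrives.
- A repeat is dropped for as long as the first event is still buffered, that is, until the watermark passes the timer. The cut-off is therefore not exactly `time_range` after the first occurrence in arrival order.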


Python using Table API

Even though there are plenty of examples out there on using SQL to remove duplicates, I decided to try it out. This code reads an input CSV and removes duplicates based on one or more columns, keeping either the first or the last occurrence. Which row survives is decided by the ORDER BY column inside ROW_NUMBER(). Ordering ASC keeps the row with the smallest value, which is the first occurrence if that column reflects arrival order. Ordering DESC keeps the row with the largest value, which is the last occurrence. Once duplicates are removed, the output is saved as CSV. This uses the Flink Table API and SQL in batch mode. Below is a working code example:
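The keep-first/keep-last rule can be checked with a plain-Python emulation of `ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...)` followed by `WHERE rn = 1`. This models only the SQL semantics and is not Flink code. The row fields `email`, `seq` and `plan` are hypothetical, with `seq` standing in for whatever column records arrival order.

```python
from typing import Dict, Hashable, List


def keep_one_per_key(rows: List[dict], partition_by: str, order_by: str, ascending: bool = True) -> List[dict]:
    """Emulate ROW_NUMBER() ... WHERE rn = 1: keep the top-ranked row in each partition."""
    best: Dict[Hashable, dict] = {}
    for row in rows:
        key = row[partition_by]
        current = best.get(key)
        if current is None:
            best[key] = row
        elif ascending and row[order_by] < current[order_by]:
            best[key] = row  # ASC: a smaller ordering value ranks first
        elif not ascending and row[order_by] > current[order_by]:
            best[key] = row  # DESC: a larger ordering value ranks first
    return list(best.values())


rows = [
    {"email": "a@example.com", "seq": 1, "plan": "free"},
    {"email": "b@example.com", "seq": 2, "plan": "free"},
    {"email": "a@example.com", "seq": 3, "plan": "pro"},
]
print(keep_one_per_key(rows, "email", "seq", ascending=True))   # a@example.com keeps plan "free"
print(keep_one_per_key(rows, "email", "seq", ascending=False))  # a@example.com keeps plan "pro"
```

In SQL, ROW_NUMBER() breaks ties in the ORDER BY column arbitrarily. If two rows can share an ordering value, add a tiebreaker column to ORDER BY. Without one, the surviving row may not be reproducible.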

Import the dependencies:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment
```

Create your table environment and define the input and output paths.

```python
# Create a Table Environment in batch mode
t_env = TableEnvironment.create(environment_settings=EnvironmentSettings.in_batch_mode())

# Define the source and output paths
input_path = '/path/input.csv'    # Define your input CSV path here
output_path = '/path/output.csv'  # The filesystem sink treats this path as a directory and writes part files into it
```

Create your source table from the CSV, defining column names and types that match your input CSV. Then execute the DDL to create the source table.

```python
# Create the source table from the CSV and define the column names and types
source_ddl = f"""
create table source_table (
    column1 INT,
    column2 STRING,
    column3 BOOLEAN,
    column4 INT,
    column5 BIGINT
) with (
    'connector' = 'filesystem',
    'path' = '{input_path}',
    'format' = 'csv'
)
"""

# Execute to create the source table
t_env.execute_sql(source_ddl)
```
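If you do not have a file handy, you can generate a small input matching the five-column schema above with the standard library. This helper is a sketch that is not part of the original job, and its assumptions are:

- The values are made up.
- Duplicates are deliberately repeated on `column1`.
- `column5` increases row by row so it can act as an ordering column.
- Booleans are written as lowercase `true`/`false`.
- No header row is written, since, as far as I know, Flink’s `csv` format has no option to skip a header line.

```python
import csv
import os
import random
import tempfile


def write_sample_input(path: str, n_rows: int = 20, n_keys: int = 5, seed: int = 42) -> None:
    """Write n_rows CSV rows (no header) whose column1 values repeat across n_keys keys."""
    rng = random.Random(seed)
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        for i in range(n_rows):
            writer.writerow([
                rng.randint(1, n_keys),                      # column1 INT (duplicates on purpose)
                f"name-{i}",                                 # column2 STRING
                "true" if rng.random() < 0.5 else "false",   # column3 BOOLEAN
                rng.randint(0, 100),                         # column4 INT
                1_700_000_000_000 + i,                       # column5 BIGINT, increasing per row
            ])


path = os.path.join(tempfile.mkdtemp(), "input.csv")
write_sample_input(path)
```

Point `input_path` at the generated file. With 20 rows spread over 5 keys, some `column1` values are guaranteed to repeat, so the deduplication has something to remove.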

Remove duplicates with a SQL query:

```python
# SQL to remove duplicates; change the column names to match your table.
# PARTITION BY lists the column(s) that define a duplicate. column1 is a stand-in for
# your key column, and you can list more than one column.
# ORDER BY decides which row survives. column5 is a stand-in for a column that reflects
# arrival order (e.g., a sequence number or timestamp): ASC keeps the first row, DESC the last.
result = t_env.sql_query("""
    SELECT column1, column2, column3, column4, column5
    FROM (
        SELECT column1, column2, column3, column4, column5,
               ROW_NUMBER() OVER (PARTITION BY column1 ORDER BY column5 ASC) AS rn
        FROM source_table
    ) t
    WHERE rn = 1
""")
```

Create the sink table for your output, execute the DDL to create it, and then insert the data.

```python
# Create the sink table
# Change the column names to match your output table.
# If you want fewer or different columns than the source CSV, change the SELECT statement
# above so it returns only the columns you want in your output CSV.
sink_ddl = f"""
create table sink_table (
    column1 INT,
    column2 STRING,
    column3 BOOLEAN,
    column4 INT,
    column5 BIGINT
) with (
    'connector' = 'filesystem',
    'path' = '{output_path}',
    'format' = 'csv'
)
"""

# Execute to create the sink table
t_env.execute_sql(sink_ddl)

# Insert the deduplicated rows and wait for the batch job to finish
result.execute_insert("sink_table").wait()
```
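The filesystem sink writes one or more part files inside the `output_path` directory rather than a single CSV file. A quick way to inspect the result is therefore to read every visible file in that directory. The standard-library sketch below assumes that hidden or in-progress files start with a dot, which is why it skips them. The file names in the demo are made up.

```python
import csv
import os
import tempfile
from typing import List


def read_sink_output(output_dir: str) -> List[List[str]]:
    """Collect CSV rows from every non-hidden file under output_dir."""
    rows: List[List[str]] = []
    for root, dirs, files in os.walk(output_dir):
        dirs[:] = sorted(d for d in dirs if not d.startswith("."))  # skip hidden directories
        for name in sorted(files):
            if name.startswith("."):  # skip hidden or in-progress files
                continue
            with open(os.path.join(root, name), newline="") as f:
                rows.extend(csv.reader(f))
    return rows


# Demo with made-up file names standing in for the sink's part files
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "part-0"), "w") as f:
    f.write("1,name-0,true,5,1700000000000\n")
with open(os.path.join(demo_dir, ".part-1.inprogress"), "w") as f:
    f.write("2,name-1,false,7,1700000000001\n")
print(read_sink_output(demo_dir))  # [['1', 'name-0', 'true', '5', '1700000000000']]
```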

Conclusion

Wrapping things up, I can’t stress enough how crucial deduplication is when it comes to getting streaming data ready for the big leagues — analysis, storage, and those all-important decision-making moments. It’s like giving your data a quality check, trimming down unnecessary expenses, and putting a turbo booster on your data processing engines. Honestly, deduplication can either be a walk in the park or a bit of a puzzle, depending entirely on the quirks of your data, how time-sensitive your tasks are, and what you specifically need to achieve. I’ve navigated through these waters myself, and I’m hopeful that sharing my journey will make your future adventures into the world of dedupe transformations a little less daunting and a lot more exciting.

Don’t forget about the Apache Flink mailing lists and Flink’s Slack channel for asking questions when you run into problems. I have found that many Flink users have run into similar issues.

Feel free to share your comments and ideas! Are you using Kafka with Flink? What are your thoughts?

Find here some resources I used throughout my research:
