Apache Flink and Scala: A Functional Approach to Streaming Pipelines
Have you ever felt like your Flink applications needed something more? Maybe you have wanted to take a more functional approach when writing your ETL pipelines? Maybe you have noticed the side effects that can creep in from passing mutable data around your applications? Today’s topic will help you dip your toes into the world of Apache Flink and Scala, showing how a more functional approach to your streaming pipeline can lead to cleaner code, fewer side effects, and a better testing plan.
If you travel over to the Scala website, you will find that Scala is described as an expressive, scalable, and safe language that runs on top of the JVM, which means it already plays nicely with the existing code you have written using Flink’s Java API. It is also very popular in the data processing space, which you may already know if you are familiar with other data frameworks like Spark.
Now you may be asking yourself, what exactly do I get out of using Apache Flink and Scala? Why would I choose to add another layer on top of my existing Flink and Java code? As an avid Scala and Java user I can attest that life with Scala can oftentimes lead to much simpler code, a more fluently expressed code structure, and a better developer testing experience.
If you’re already using Flink with Gradle, then you’re already a majority of the way there! Adding Scala is as simple as adding a few lines to your build.gradle file.
plugins {
    // Place the scala plugin first to enable Scala build support
    id 'scala'
    id 'application'
}

dependencies {
    // Scala Library
    implementation "org.scala-lang:scala-library:2.13.12"

    ... Your Other Project Dependencies ...
}
Now you’re ready to start writing Scala!
Okay, now that we’re ready to start writing Scala, we’ll need a code example to get us started. Let’s write a simple WordFrequency pipeline: we’ll take a handful of sentences and count how many times each word appears across all of the sentences that fall within a time window.
First we’ll need to create our case class that we’ll use to represent our WordFrequency data.
// Our case class that will be used to reduce down our words into their counts
case class WordFrequency(word: String, count: Int)
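Before wiring this into Flink, it’s worth a quick look at what the case class buys us. The snippet below is purely illustrative (the values are made up), but the copy call and the structural equality it demonstrates are exactly what the reduce step later in the pipeline relies on:

val one = WordFrequency(word = "flink", count = 1)

// copy returns a brand new instance; the original value is never mutated
val two = one.copy(count = one.count + 1)

assert(one == WordFrequency("flink", 1)) // structural equality comes for free
assert(two == WordFrequency("flink", 2))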
Now we’ll need to set up an object with a main function, so the JVM knows where to start when running our code. Let’s call it WordFrequencyApp.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object WordFrequencyApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
  }
}
Now that we have our main function with our StreamExecutionEnvironment set up, we need a few sentences to start working with. This is a perfect job for ChatGPT, so I asked it to “Generate 5 sentences with less than 10 words that have word overlap in them” so that we would have some good test data for our use case.
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Our test sentences produced by ChatGPT
  val sentences = env.fromElements(
    "The cat quietly prowled through the dark alley",
    "Raindrops softly tapped against the windowpane",
    "Birds chirped melodiously in the early morning light",
    "The river gently flowed beneath the old stone bridge",
    "Leaves rustled softly in the autumn breeze"
  )
}
Now that we have our input data, we can start writing the transformations that will get us to our result:
// Imports needed for this step (place them at the top of WordFrequencyApp.scala)
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

val wordFrequency = sentences
  // Take our sentences, lowercase them, split them apart by spaces,
  // then collect them out as a WordFrequency with a count of 1
  .flatMap(
    (sentence: String, out: Collector[WordFrequency]) => {
      sentence
        .toLowerCase()
        .split(' ')
        .map(word => WordFrequency(word, count = 1))
        .foreach(out.collect)
    },
    // We need to supply the TypeInformation of the class we're collecting
    TypeInformation.of(classOf[WordFrequency])
  )
  // Key the stream by the word of the WordFrequency case class
  .keyBy((frequency: WordFrequency) => frequency.word)
  // Window the stream into 10 second windows using TumblingProcessingTime
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  // Reduce each WordFrequency down by summing the count attribute with an accumulator
  .reduce((accumulator: WordFrequency, frequency: WordFrequency) =>
    accumulator.copy(count = accumulator.count + frequency.count)
  )
Hooray! You’ve just transformed a list of sentences into aggregated WordFrequency results. Let’s talk about what we have done differently versus how we would have done it in Java.
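The biggest differences are on the data side and in the ceremony around the functions. The WordFrequency case class is a single immutable line, where the Java route would usually mean a full POJO (fields, getters, setters, equals and hashCode) or falling back to a Tuple2, and the reduce step accumulates by building a new instance with copy rather than mutating a field on an accumulator object. The function arguments stay lightweight too: below is a rough, purely illustrative sketch (the javaStyleFrequencies name is made up and is not part of the repo) of how the first flatMap reads when you spell out the FlatMapFunction as an explicit anonymous class, the style you will most often see against the Java API. It behaves the same as the lambda version above.

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.util.Collector

// The anonymous-class style: the same lowercase/split/collect logic,
// just with the FlatMapFunction interface written out by hand
val javaStyleFrequencies = sentences.flatMap(
  new FlatMapFunction[String, WordFrequency] {
    override def flatMap(sentence: String, out: Collector[WordFrequency]): Unit =
      sentence
        .toLowerCase()
        .split(' ')
        .foreach(word => out.collect(WordFrequency(word, count = 1)))
  },
  TypeInformation.of(classOf[WordFrequency])
)

Either version works from Scala; the lambda form simply keeps the focus on the transformation logic rather than the interface plumbing.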
Now that we have seen the differences, we need to print the results to our console, which we can do by executing the pipeline, collecting its results, and printing a string for each WordFrequency:
// Needed for the .asScala conversion below (place at the top of WordFrequencyApp.scala)
import scala.jdk.CollectionConverters._

wordFrequency
  // Execute the pipeline and collect the results into an iterator, then convert it to a Scala Seq
  .executeAndCollect().asScala.toSeq
  // We only want words that were seen more than once, then sort descending by count
  .filter(_.count > 1)
  .sortBy(- _.count)
  .foreach { frequency =>
    Console.println(s"""Word "${frequency.word}" was seen ${frequency.count} times.""")
  }
If you put all of these segments together and run the code on your Flink cluster, you’ll see the following output:
Word "the" was seen 7 times.
Word "in" was seen 2 times.
Word "softly" was seen 2 times.
If you wish to see all of the code working together, please feel free to check out the “scala-example” module inside my Flink Snippets repo. Open an issue or reach out if you need help understanding any portion of the code.
I hope that after today’s topic you have a little better understanding of how to use Apache Flink with Scala. This example is just a small fraction of what can be done using Scala’s powerful built-in functional tools and robust type system. Scala is behind a large portion of the data processing happening all around us, and it helps you create more expressive pipelines that lead to fewer side effects. I hope you enjoyed getting your toes wet with Apache Flink and Scala. I look forward to introducing more concepts to you in the future.