Back to blog apache flink and scala

Leveraging the Power of Scala with Apache Flink

apache flink and scala
twitter facebook linkedin

Have you ever felt like your Flink Applications needed something more? Maybe you have wanted to take a more functional approach when writing your ETL Pipelines? Maybe you have noticed the side effects that can occur from passing around mutable data throughout your applications? Today’s topic will help you dip your toes into the world of Apache Flink and Scala to show you how a more functional approach to your streaming pipeline can lead to cleaner code, less side effects, and a better testing plan.

What is Scala?

If you were to travel over to the Scala website, you will find that Scala is an Expressive, Scalable, and Safe language that runs on top of the JVM – This means that it already plays nicely with the existing code that you have written utilizing Flink’s Java API. It is also very popular in the Data Processing space, which you may have already known if you are popular with other data frameworks like Spark.

  • Expressive – “Scala lets you write less to do more. As a high-level language, its modern features increase productivity and lead to more readable code. With Scala, you can combine both functional and object-oriented programming styles to help structure programs.”
  • Scalable – “Scala is well suited to building fast, concurrent, and distributed systems with its JVM, JavaScript and Native runtimes. Scala prioritizes interoperability, giving easy access to many ecosystems of industry-proven libraries.”
  • Safe – “Scala’s static types help you to build safe systems by default. Smart built-in checks and actionable error messages, combined with thread-safe data structures and collections, prevent many tricky bugs before the program first runs.”

Now you may be asking yourself, what exactly do I get out of using Apache Flink and Scala? Why would I choose to add another layer on top of my existing Flink and Java code? As an avid Scala and Java user I can attest that life with Scala can oftentimes lead to much simpler code, a more fluently expressed code structure, and a better developer testing experience.

  • Case Classes – Scala has a new type of class called case class, which is nearly the same as a Java Class, but it has a simplified constructor and is immutable by default. This means you won’t run into a scenario where you pass an instance of a case class into a method and the method modifies the existing object instead of returning a new or different instance. When a method alters an input variable it is called a “side effect” and can often lead to unexpected behavior when writing large stateful applications.
  • Objects – Scala has another new type called object, these are static types which can hold methods, attributes, and other constant values. Objects can also access private values that are held within a class or case class if they are named the same as the class or case class, which makes them powerful tools in extending the functionality of those types.
  • Options – One of the most powerful tools we get access to as a Scala user is the Option type. It is a wrapper for a value type and is seen as the “nullable safety” guard to help us from encountering all of those ugly NullPointerExceptions that we often see when we’re writing anything in Java. We also get the added bonus that options are treated like a “quasi-collection” type, so we can use functional methods on them like filter, map, flatMap, and foreach!
  • Better Collections – If you often handle collections (Array, List, Map) in Java and you want to access the functional methods, you have to include StreamSupport or use the .toStream method (if it’s actually exposed). With Scala, you get these for free!

How do I start using Scala?


If you’re already using Flink with Gradle, then you’re already a majority of the way there! Adding Scala is as simple as adding a few lines to your build.gradle file.

  • The first line you’ll need to add is under the plugins section:
plugins {
    // Place the scala plugin first to enable scala build support
    id 'scala'
    id 'application'
}
  • The second line you’ll need to add is under your dependencies section:
dependencies {
  // Scala Library
  implementation "org.scala-lang:scala-library:2.13.12"

  ... Your Other Project Dependencies ... 
}

Now you’re ready to start writing Scala!

Code Examples

Okay, now that we’re ready to start writing Scala, we’ll need a code example to get us started. Let’s start by writing a simple WordFrequency pipeline. We’ll take a couple of sentences and get the most seen word across all of them that fall within a time window.


First we’ll need to create our case class that we’ll use to represent our WordFrequency data.

// Our case class that will be used to reduce down our words into their counts
case class WordFrequency(word: String, count: Int)

Now we’ll need to set up an object that has a main function, so the JVM knows which place we want to start when running our code. Let’s call it WordFrequencyApp.

object WordFrequencyApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
  }
}

Now that we have our main function with our StreamExecutionEnvironment set up for us, we now need a few sentences to start working around with. This is a perfect job for ChatGPT, so I asked it to “Generate 5 sentences with less than 10 words that have word overlap in them” so that we’ll have some good test data for our use case.

def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Our test sentences produced by ChatGPT
  val sentences = env.fromElements(
    "The cat quietly prowled through the dark alley",
    "Raindrops softly tapped against the windowpane",
    "Birds chirped melodiously in the early morning light",
    "The river gently flowed beneath the old stone bridge",
    "Leaves rustled softly in the autumn breeze"
  )
}

Now that we have our input data, we can go ahead and start writing the transformations that we need in order to get us the result that we need. The transformations we need are:

  • For each sentence, we want to get the entire sentence as lowercase for more accurate word matching
  • Then we want to split each word apart by the space character
  • Transform each word into a WordFrequency instance with a count of 1, so we can aggregate them later
  • Key our WordFrequency instances by the word
  • Reduce each WordFrequency to a singular count (by summing each word frequency)
val wordFrequency = sentences
      // Take our sentences, lowercase them, split them apart by spaces,
      // then collect them out as a WordFrequency with a count of 1
      .flatMap(
        (sentence: String, out: Collector[WordFrequency]) => {
          sentence
            .toLowerCase()
            .split(' ')
            .map(word => WordFrequency(word, count = 1))
            .foreach(out.collect)
        },
        // We need to supply the TypeInformation of the class we're collecting
        TypeInformation.of(classOf[WordFrequency])
      )
      // Key the stream by the word of the WordFrequency case class
      .keyBy((frequency: WordFrequency) => frequency.word)
      // Window the stream into 10 second windows using TumblingProcessingTime
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      // Reduce each WordFrequency down by summing the count attribute with an accumulator
      .reduce((accumulator: WordFrequency, frequency: WordFrequency) =>
        accumulator.copy(count = accumulator.count + frequency.count)
      )

Hooray! You’ve just transformed a list of sentences into aggregated WordFrequency results. Let’s talk about what we have done differently versus how we would have done it in Java.

  • The Flat Map – This operator is fairly close, but you may notice something after we split the sentence into words. The .map is not something that’s provided on a Java Array type, we would have had to store the everything up to the split as a variable, then passed it into StreamSupport or Arrays.stream to unlock the power of the .map operator, but since we’re using Scala we can express it as one fluent chain of operations.
  • The Reduce – This operator is simplified compared to how we would express it in the Java version, the first win we see is that we’re able to pass in a single function to handle the entire ReduceFunction. The second win we see is that we’re using a case class so instead of instantiating a whole new object and passing in every attribute to it, we can use the .copy method on our WordFrequency case class and then only pass in the attribute we are changing, in this case we’re adding the accumulator and current frequency’s count attribute’s together.

Now that we see the differences, we now need to print out the results to our Console, which we can do by executing the pipeline and collecting its results, then printing a string representing our WordFrequency data

wordFrequency
      // Execute the code and collect it to an iterator, then convert it as a Scala Seq
      .executeAndCollect().asScala.toSeq
      // We only want words that were seen more than once, then sort descending by count
      .filter(_.count > 1)
      .sortBy(- _.count)
      .foreach { frequency =>
        Console.println(s"""Word "${frequency.word}" was seen ${frequency.count} times.""")
      }

If you put all of these segments together, and then run the code on your FlinkCluster, you’ll see the following output

Word "the" was seen 7 times.
Word "in" was seen 2 times.
Word "softly" was seen 2 times.

If you wish to see all of the code working together, please feel free to checkout the “scala-example” module inside of my Flink Snippets repo, feel free to open any issues or reach out if you need help understanding any portion of the code.

Summary

I hope that after today’s topic you have a little bit of a better understanding of how to use Apache Flink with Scala. This example is just a minor fraction of what can be done using Scala’s powerful built in functional tools and robust type system. Scala is behind a large portion of the Data Processing that is all around us, and it helps you create more expressive pipelines that lead to fewer side effects. I hope you enjoyed getting your toes wet with Apache Flink and Scala. I look forward to introducing more concepts to you in the future.

Related Articles

See The Data Behind Your Data

Start visualizing
Join Today

Fill out the short form below