April 24, 2024

Apache Flink Watermarks, Event Time Processing, Windowing & You!

Colten Pilgreen

So, you’re using Flink! Congratulations on taking a step into the future of real-time data processing. You’re going to love it, and we’re glad you’re here! Today we’re going to shine a little light on the pros and woes that come along with using Apache Flink watermarks and event time to process real-time data into windows. These concepts are foundational, and understanding them will help you build robust, accurate pipelines that you can base decisions on.

Before we jump into the pros and woes, we need to get started by providing a little context and definitions for what these concepts are and why they matter to us.

When you are processing unbounded data sources (meaning there is theoretically no start or end to the incoming messages), you need some context about where you are within that stream of unbounded data. This is where Apache Flink watermarks come in! A watermark marks how far event time has progressed in a Flink job; in the simplest case, it is the highest timestamp the job has seen so far. A good way to visualise this is to think of a tube of water: the water level (watermark) corresponds to the largest timestamp seen by your Flink job so far.
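To make the water-level picture concrete, here is a minimal plain-Java sketch (no Flink involved). The WatermarkTracker class and its method names are made up for illustration, and it assumes the simplest case where the watermark is exactly the highest timestamp seen:

```java
/** Toy model of a watermark: the highest event timestamp seen so far. */
final class WatermarkTracker {
    private long maxTimestamp = Long.MIN_VALUE;

    /** Observes one event timestamp and returns the (possibly unchanged) watermark. */
    long observe(long eventTimestamp) {
        // The water level only ever rises; an older timestamp leaves it where it is.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        return maxTimestamp;
    }

    long current() {
        return maxTimestamp;
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker();
        for (long ts : new long[] {1_000L, 1_500L, 1_200L, 2_000L}) {
            System.out.println("event " + ts + " -> watermark " + tracker.observe(ts));
        }
    }
}
```

Notice that the event with timestamp 1200 arrives after 1500 and does not move the watermark. That detail becomes important later on.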

Event Time Processing

When you’re processing data with Flink, there are two types of time characteristics: Processing Time and Event Time. Processing Time means time advances with the current clock time of the machine running the job, so it moves forward every second whether or not any events arrive. With Event Time, on the other hand, you’re using a timestamp held inside of the event you’re processing to increase the watermark. This means that if you set an event-time timer, window events, or use timers to expire state, none of those actions will fire until a new event comes in that increases the watermark past the trigger time.
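Here is a small plain-Java sketch of that last point. It is not Flink’s timer service, just a toy model under the assumption that the watermark equals the highest timestamp seen; the EventTimeTimers class and its methods are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model: event-time timers fire only when an incoming event advances the watermark. */
final class EventTimeTimers {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long watermark = Long.MIN_VALUE;

    void registerTimer(long fireAtTimestamp) {
        timers.add(fireAtTimestamp);
    }

    /** Advances the watermark with this event and returns the timers that fire as a result. */
    List<Long> onEvent(long eventTimestamp) {
        watermark = Math.max(watermark, eventTimestamp);
        List<Long> fired = new ArrayList<>();
        // A timer fires once the watermark has reached its timestamp.
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            fired.add(timers.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimers timers = new EventTimeTimers();
        timers.registerTimer(5_000L);
        System.out.println(timers.onEvent(4_000L)); // watermark still behind the timer
        System.out.println(timers.onEvent(5_000L)); // watermark reaches the timer
    }
}
```

No matter how much wall-clock time passes between the two calls, the timer stays pending until an event with a high enough timestamp shows up.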

Windowing

The concept of windows is fairly simple: you break up your stream of data into smaller chunks, collect the data into those time (or session) chunks, and then process the data held in the window once the watermark has passed the end of the window. Flink windows are very similar to the window concepts you will find in batch data engines (like SQL or Spark), but the nuance here is that the data is not known to the window operator at the time of job graph execution.
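For fixed-size (tumbling) windows, deciding which chunk an event belongs to is just arithmetic on its timestamp. The sketch below is plain Java rather than Flink’s own window assigner; the class name TumblingWindowMath is hypothetical, and it assumes 5 minute windows aligned to the epoch with no offset:

```java
/** Plain-Java sketch of tumbling window arithmetic for 5 minute, epoch-aligned windows. */
final class TumblingWindowMath {
    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L;

    /** Start of the window that contains the timestamp (inclusive). */
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    /** End of the window that contains the timestamp (exclusive). */
    static long windowEnd(long timestampMs) {
        return windowStart(timestampMs) + WINDOW_SIZE_MS;
    }

    /** The window can be processed once the watermark reaches its last millisecond. */
    static boolean isReadyToFire(long timestampMs, long watermarkMs) {
        return watermarkMs >= windowEnd(timestampMs) - 1;
    }

    public static void main(String[] args) {
        long ts = 1_713_110_123_456L;
        System.out.println(windowStart(ts)); // 1713110100000
        System.out.println(windowEnd(ts));   // 1713110400000
    }
}
```

Every event with a timestamp from 1713110100000 up to (but not including) 1713110400000 lands in the same window, and that window is only processed once the watermark catches up to its end.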

Now that we’ve gotten to know the concepts a little better, we can jump into our use case and work through a scenario where we would use them to process our data. Let’s start off by defining the data that we will be processing!

For our source data, we will be collecting information about Traffic Lights (Blinking Robots 🤖🚦if you’re from South Africa) and we want to see how many times the lights changed for every intersection in a 5 minute window. The schema is defined as the following attributes:

  • intersectionId: UUID
  • eventId: UUID
  • timestamp: Long (Unix Epoch Milliseconds of when the phase changed)
  • phase: Enum Int (The phase the intersection lights are currently in, ranging from 0 to 8 depending on which traffic is allowed to go and whether turns are protected. The sequence of events should cycle through phases 1-8, switching to 0 when all traffic should stop for emergency vehicles).

For the metrics we will be collecting about our Traffic Light phases, we’ll want to collect how many times the lights at an intersection changed phases, and then also collect how many times an emergency phase occurred at the intersection. The schema for the output metrics is defined as the following attributes:

  • intersectionId: UUID
  • windowStart: Long – The UNIX epoch in milliseconds of when the window started
  • windowEnd: Long – The UNIX epoch in milliseconds of when the window ended
  • phases: Integer – The total number of phase changes that occurred in the window, including the emergency phases
  • emergencyPhases: Integer – The number of times the emergency phase (0) occurred in the window

The Traffic Light phases last 25 seconds, so for a time window of 5 minutes (300 seconds) we should expect to see roughly 12 phases (300 / 25) in the metrics produced by each window. Let’s dive in and start coding our Flink job to process our sweet, sweet Traffic Light metrics!

First, we need to define our schemas as code. We’ll need our TrafficLightPhaseEvent and PhaseChangeMetric classes up and running:

public class TrafficLightPhaseEvent {
    public final UUID intersectionId;
    public final UUID eventId;
    public final Long eventTimestamp;
    public final Integer phase;

    public TrafficLightPhaseEvent(UUID intersectionId, UUID eventId, Long eventTimestamp, Integer phase) {
        this.intersectionId = intersectionId;
        this.eventId = eventId;
        this.eventTimestamp = eventTimestamp;
        this.phase = phase;
    }
}

public class PhaseChangeMetric {
  public final UUID intersectionId;
  public final Long windowStart;
  public final Long windowEnd;
  public final Integer phases;
  public final Integer emergencyPhases;

  public PhaseChangeMetric(UUID intersectionId, Integer phases, Integer emergencyPhases) {
    this.intersectionId = intersectionId;
    this.windowStart = Long.MIN_VALUE;
    this.windowEnd = Long.MAX_VALUE;
    this.phases = phases;
    this.emergencyPhases = emergencyPhases;
  }

  public PhaseChangeMetric(UUID intersectionId, Long windowStart, Long windowEnd, Integer phases, Integer emergencyPhases) {
    this.intersectionId = intersectionId;
    this.windowStart = windowStart;
    this.windowEnd = windowEnd;
    this.phases = phases;
    this.emergencyPhases = emergencyPhases;
  }
}

Now that we’ve gotten that out of the way, we can start writing the actual logic for the data flowing into and out of our Flink application. The logical flow for the data should follow this happy path:

  • We receive our TrafficLightPhaseEvent from a data source (for this example, we will be implementing a GeneratorFunction that Flink provides as part of its datagen connector)
  • Watermark the stream by the timestamp attribute
  • Take these events and then map (transform) them to an unaggregated PhaseChangeMetric object
  • Take our DataStream of PhaseChangeMetric and keyBy the intersectionId attribute so that our windows only produce data for a single intersection
  • Window our KeyedDataStream into 5 minute TumblingEventTime windows, and then apply a reduce function to aggregate the PhaseChangeMetric objects into a single output per window
  • Apply a window function to the aggregated PhaseChangeMetric objects to get the window’s start and end timestamp
  • Convert our PhaseChangeMetric objects into JSON and then sink our metrics to a FileSink

I will be skipping the implementation of TrafficLightPhaseEventGenerator, but as always, you can see the implementation by looking at the linked GitHub repository. So we’ll get started with our reduce and windowing functions. The first is the reducer function, PhaseChangeMetricReducer.

public class PhaseChangeMetricReducer implements ReduceFunction<PhaseChangeMetric> {
  @Override
  public PhaseChangeMetric reduce(PhaseChangeMetric accumulator, PhaseChangeMetric metric) {
    return new PhaseChangeMetric(
        accumulator.intersectionId,
        accumulator.phases + metric.phases,
        accumulator.emergencyPhases + metric.emergencyPhases
    );
  }
}

As you can see, the reduce function is doing a fairly simple operation: it takes an accumulator (initially the first object seen for a window) and another metric, sums together the phases and emergencyPhases attributes, and returns a new PhaseChangeMetric, which will be used as the accumulator for the next event. This reduce function is called for each new event that makes it into a window, until the watermark passes the end of the window and the window fires. At that point the output of the window is passed to our next function, the PhaseChangeMetricWindowFunction.
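Before we get to that window function, here is a quick plain-Java sketch of how the reduction plays out for the events of one window. It uses a simplified stand-in for PhaseChangeMetric that only carries the two counters; PhaseCountFold, Counts, fromPhase, and foldWindow are hypothetical names that are not part of the job:

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of the incremental reduce over one window's events. */
final class PhaseCountFold {
    /** Simplified stand-in for PhaseChangeMetric: just the two counters. */
    static final class Counts {
        final int phases;
        final int emergencyPhases;

        Counts(int phases, int emergencyPhases) {
            this.phases = phases;
            this.emergencyPhases = emergencyPhases;
        }
    }

    /** One event becomes one unaggregated metric; phase 0 is the emergency phase. */
    static Counts fromPhase(int phase) {
        return new Counts(1, phase == 0 ? 1 : 0);
    }

    /** Same shape as the reducer: sum both counters into a new accumulator. */
    static Counts reduce(Counts accumulator, Counts metric) {
        return new Counts(
            accumulator.phases + metric.phases,
            accumulator.emergencyPhases + metric.emergencyPhases);
    }

    /** The first metric seeds the accumulator; each later metric is reduced into it. */
    static Counts foldWindow(List<Integer> phasesInWindow) {
        Counts accumulator = null;
        for (int phase : phasesInWindow) {
            Counts metric = fromPhase(phase);
            accumulator = (accumulator == null) ? metric : reduce(accumulator, metric);
        }
        return accumulator;
    }

    public static void main(String[] args) {
        Counts result = foldWindow(Arrays.asList(1, 2, 0, 3));
        System.out.println(result.phases + " phases, " + result.emergencyPhases + " emergency");
    }
}
```

Because the window only ever holds one accumulator per key, state stays small no matter how many events arrive.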

public class PhaseChangeMetricWindowFunction extends ProcessWindowFunction<PhaseChangeMetric, PhaseChangeMetric, UUID, TimeWindow> {
  @Override
  public void process(UUID uuid,
                      ProcessWindowFunction<PhaseChangeMetric, PhaseChangeMetric, UUID, TimeWindow>.Context context,
                      Iterable<PhaseChangeMetric> metrics,
                      Collector<PhaseChangeMetric> out) {
    for (PhaseChangeMetric metric : metrics) {
      out.collect(new PhaseChangeMetric(
          metric.intersectionId,
          context.window().getStart(),
          context.window().getEnd(),
          metric.phases,
          metric.emergencyPhases
      ));
    }
  }
}

This function extends ProcessWindowFunction. It is called once the watermark has passed a window’s end timestamp and the window has been evaluated; because we pair it with a reduce function, the Iterable it receives holds only the single reduced PhaseChangeMetric for that window. The function has access to context about the window itself, so this is where we grab the windowStart and windowEnd attributes that are sent downstream to our FileSink.

The last function that we need to define is our Jsonifier, so that we can write the output metrics to the FileSink. It is a simple process function that accepts a single event and then uses Jackson’s ObjectMapper to write the value out as a JSON string. This function is generic, which means we could use it for any input type if we wanted to extend this job later on down the road.

public class Jsonifier<T> extends ProcessFunction<T, String> {
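  // Created in open() on each task, so it does not need to be serialized with the function.
  private transient ObjectMapper mapper;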

  @Override
  public void open(Configuration parameters) {
    mapper = new ObjectMapper();
  }

  @Override
  public void processElement(T event, ProcessFunction<T, String>.Context ctx, Collector<String> out) throws Exception {
    out.collect(mapper.writeValueAsString(event));
  }
}

Now that we have all of our pieces, let’s put them together to form our JobGraph!

public static void runFlow() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    configureCheckpoints(env);

    DataStream<TrafficLightPhaseEvent> intersectionEvents = TrafficLightPhaseEventGenerator.toSource(
        env,
        WatermarkStrategy
            .<TrafficLightPhaseEvent>forBoundedOutOfOrderness(Duration.ZERO)
            .withTimestampAssigner(
                (SerializableTimestampAssigner<TrafficLightPhaseEvent>)
                (intersectionEvent, timestamp) -> intersectionEvent.eventTimestamp
            )
    );

    intersectionEvents
        .map(event -> new PhaseChangeMetric(
            event.intersectionId,
            1,
            event.phase == 0 ? 1 : 0
        ))
        .name("IntersectionEvent-to-PhaseChangeMetric")
        .keyBy(event -> event.intersectionId)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .reduce(new PhaseChangeMetricReducer(), new PhaseChangeMetricWindowFunction())
        .process(new Jsonifier<>())
        .name("PhaseChangeMetric-to-Json")
        .sinkTo(
            FileSink
                .forRowFormat(
                    new Path("/tmp/phase-change-metrics"),
                    new SimpleStringEncoder<String>("utf-8")
                )
                .build()
        );

    env.execute("TrafficLightMetrics");
}

If we refer back to our happy path we defined, we can see that the JobGraph matches up with the steps we defined. Now all that’s left to do is to submit our job to a Flink Cluster and then start seeing the data flow in! I’m assuming that you’re already familiar with that step, but if you’re not you can refer to the Flink docs on how to submit a job to a cluster.

Now, we have our metrics! We’ve successfully written a Flink app that takes the TrafficLightPhaseEvents, computes our metrics, and then writes them out. We can see that each window’s start and end are 5 minutes (300,000 milliseconds) apart, and we have an emergency phase in one of our windows. But wait, something doesn’t seem quite right. If our phases have a duration of 25 seconds, then we should expect to see around 12 phases per window, not 7 to 9. Hmm, what could it be?

{"intersectionId":"...","windowStart":1713110100000,"windowEnd":1713110400000,"phases":7,"emergencyPhases":0}
{"intersectionId":"...","windowStart":1713110400000,"windowEnd":1713110700000,"phases":9,"emergencyPhases":1}
{"intersectionId":"...","windowStart":1713110700000,"windowEnd":1713111000000,"phases":8,"emergencyPhases":0}
{"intersectionId":"...","windowStart":1713111000000,"windowEnd":1713111300000,"phases":9,"emergencyPhases":0}
{"intersectionId":"...","windowStart":1713111300000,"windowEnd":1713111600000,"phases":8,"emergencyPhases":0}
{"intersectionId":"...","windowStart":1713111600000,"windowEnd":1713111900000,"phases":7,"emergencyPhases":0}
{"intersectionId":"...","windowStart":1713111900000,"windowEnd":1713112200000,"phases":9,"emergencyPhases":0}
{"intersectionId":"...","windowStart":1713112200000,"windowEnd":1713112500000,"phases":8,"emergencyPhases":0}
{"intersectionId":"...","windowStart":1713112500000,"windowEnd":1713112800000,"phases":9,"emergencyPhases":0}

Investigation

This is where a tool like Datorios comes in quite handy: it allows us to see more about what’s going on in our JobGraph than the information we get out of the box from Flink. Let’s go open up our job analysis tool and see if we can spot any errors in our logic.

Alright, now that we’ve learned a little more about how to peer into Flink’s Black Box using Datorios, let’s see if our findings were correct. Let’s open our runFlow method back up and take another look at our Watermarking Strategy.

    DataStream<TrafficLightPhaseEvent> intersectionEvents = TrafficLightPhaseEventGenerator.toSource(
        env,
        WatermarkStrategy
            .<TrafficLightPhaseEvent>forBoundedOutOfOrderness(Duration.ZERO)
            .withTimestampAssigner(
                (SerializableTimestampAssigner<TrafficLightPhaseEvent>)
                (intersectionEvent, timestamp) -> intersectionEvent.eventTimestamp
            )
    );

There we go, there’s our problem. We had our WatermarkStrategy set to an out-of-orderness value of zero milliseconds. If we change our forBoundedOutOfOrderness method call to something a little higher, then maybe we’ll get the results we want to see. Let’s first change the code to the following, and then re-run our Flink app and check the outputs.

.<TrafficLightPhaseEvent>forBoundedOutOfOrderness(Duration.ofMinutes(10))

{"intersectionId":"...","windowStart":1713330000000,"windowEnd":1713330300000,"phases":12,"emergencyPhases":1}
{"intersectionId":"...","windowStart":1713330300000,"windowEnd":1713330600000,"phases":12,"emergencyPhases":0}
{"intersectionId":"...","windowStart":1713330600000,"windowEnd":1713330900000,"phases":12,"emergencyPhases":0}
{"intersectionId":"...","windowStart":1713330900000,"windowEnd":1713331200000,"phases":12,"emergencyPhases":0}
{"intersectionId":"...","windowStart":1713331200000,"windowEnd":1713331500000,"phases":12,"emergencyPhases":0}
{"intersectionId":"...","windowStart":1713331500000,"windowEnd":1713331800000,"phases":12,"emergencyPhases":2}

Yeeep! That’s the output we expect, so it does look like we were correct in assuming that our WatermarkStrategy was the problem. So what caused the behaviour we observed?

Explanation

Well, since we are using a WatermarkStrategy with a TimestampAssigner, our watermark advances every time a higher timestamp is ingested into our Flink job. (There is also a configurable Flink setting that tells the job how often to re-evaluate the watermark. By default it is every 200 milliseconds, measured in Processing Time.)

Since we specified a BoundedOutOfOrderness of zero milliseconds, the watermark is always essentially the same as the highest timestamp seen so far (strictly speaking, Flink emits that timestamp minus one millisecond). Our incoming stream is out of order, so consider this: event A arrives with an event timestamp of 1000, and our Apache Flink watermark advances to roughly 1000. Now event B comes in with an event timestamp of 900, which is behind the watermark. If the watermark has already passed the end of the window that event B belongs to, event B is considered late and is dropped by the windowing operator.
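To see the effect on window counts, here is a small plain-Java simulation. It is a sketch rather than Flink’s actual operator: LatenessSimulation and countPerWindow are hypothetical names, it handles a single key, it allows no extra lateness, and it updates the watermark after every event instead of every 200 milliseconds:

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy simulation: how the out-of-orderness bound decides which events a window keeps. */
final class LatenessSimulation {
    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L;

    /** Returns the number of accepted events per window start; late events are dropped. */
    static Map<Long, Integer> countPerWindow(long[] timestampsInArrivalOrder, long outOfOrdernessMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        long maxTimestamp = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestampsInArrivalOrder) {
            long windowStart = ts - Math.floorMod(ts, WINDOW_SIZE_MS);
            long lastMillisOfWindow = windowStart + WINDOW_SIZE_MS - 1;
            // The event is late if the watermark has already passed the end of its window.
            if (lastMillisOfWindow > watermark) {
                counts.merge(windowStart, 1, Integer::sum);
            }
            // The watermark trails the highest timestamp by the bound, minus one millisecond.
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermark = maxTimestamp - outOfOrdernessMs - 1;
        }
        return counts;
    }

    public static void main(String[] args) {
        // The fourth event (250_000) arrives after an event from the next window.
        long[] events = {100_000L, 290_000L, 310_000L, 250_000L, 330_000L};
        System.out.println(countPerWindow(events, 0L));      // {0=2, 300000=2}
        System.out.println(countPerWindow(events, 60_000L)); // {0=3, 300000=2}
    }
}
```

With a bound of zero, the straggler at 250000 is dropped because the event at 310000 has already pushed the watermark past the end of the first window. With a one minute bound, the first window is still open when the straggler arrives, so it gets counted.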

When you are dealing with streams that have out of order data, the first place to start is by trying to define a Service Level Agreement (SLA) for your incoming data. This mainly involves communicating with the upstream provider to work out how quickly they will be able to send you their events. Once you have this SLA, a good way to determine your BoundedOutOfOrderness is to set it to the duration of your SLA, plus a small buffer to account for network and pipeline latency. In our example, we have worked with the city and they have noted that the Traffic Lights are controlled by an IoT device, which is also what sends the phase change event. It connects to the cellular network and sends its data within 5 minutes of the phase change event occurring, so in this case our SLA would be 5 minutes. For this scenario, I would use a BoundedOutOfOrderness set to 8 minutes. Please note that setting BoundedOutOfOrderness essentially makes the watermark lag behind the highest seen timestamp by the value set. This delays your results by the duration of the BoundedOutOfOrderness, so if possible, try to get your data sent to you as quickly as possible to have the most real-time data.
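The trade-off can be put into numbers with a little arithmetic. The sketch below is plain Java with hypothetical names (OutOfOrdernessBudget and its methods), and the 3 minute buffer is simply the difference between the 5 minute SLA and the 8 minutes suggested above:

```java
import java.time.Duration;

/** Plain-Java sketch: deriving the out-of-orderness bound and the delay it adds. */
final class OutOfOrdernessBudget {
    /** Bound = upstream SLA plus a buffer for network and pipeline latency. */
    static Duration boundedOutOfOrderness(Duration sla, Duration buffer) {
        return sla.plus(buffer);
    }

    /**
     * Smallest event timestamp that has to be seen before a window ending at
     * windowEndMs can fire, given that the watermark trails by the bound.
     */
    static long timestampNeededToFire(long windowEndMs, Duration outOfOrderness) {
        return windowEndMs + outOfOrderness.toMillis();
    }

    public static void main(String[] args) {
        Duration bound = boundedOutOfOrderness(Duration.ofMinutes(5), Duration.ofMinutes(3));
        System.out.println(bound.toMinutes()); // 8
        System.out.println(timestampNeededToFire(1_713_110_400_000L, bound)); // 1713110880000
    }
}
```

In other words, a window that ends at 1713110400000 only produces its result once an event stamped 8 minutes later has arrived, which is the price paid for not dropping stragglers.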

Conclusion

I hope that next time you’re out on a drive you have a little more appreciation for the Flink engineers who had to help write the pipelines that process your city’s Traffic Light data. Who knows, it might even be you!

Today we stepped through how we could produce metrics for a traffic light system using some of the core concepts of the Flink framework: Apache Flink Watermarks, Windows, and Event Time processing. Our step-by-step walkthrough of the metric processing underscores the importance of careful consideration when configuring Watermarking strategies, ensuring accurate and timely processing of streaming data. Ultimately, armed with a deeper understanding of Flink’s capabilities and best practices, developers are equipped to navigate the complexities of real-time data processing with confidence and precision. When you’re stumped and need the extra help along the way, I really do hope that you’ll give Datorios a try!

Until next time! 👋🏻
