May 23, 2024

PyFlink x Datorios: A Comprehensive Introduction

Colten Pilgreen
apache flink and python, pyflink

Observability, troubleshooting, and job insights are some of the things that drive the team at Datorios every day. From the beginning, they’ve set out to deliver a better experience for understanding the inner workings of Flink pipelines, and with their latest release, why should PyFlink be any different? If you said it shouldn’t be, then you’ve come to the right place!

Pie and Flink, match made in heaven (Datorios happy hour)

The ability to use PyFlink opens many doors for you, your team, and especially the data scientists and analysts who already know and love Python. PyFlink lets you bring a bigger base of users into building Flink pipelines, and Python already has a whole host of packages to help with processing data. So why wouldn’t you want to get started with PyFlink?

Setting Up The Environment

Before we get started, we’ll need to make sure we’re using the latest version of Datorios so that it supports the use of PyFlink when analyzing your jobs!

  1. Navigate to the Datorios App and make sure to sign into your Organization
  2. Click the Guide Icon in the bottom left of the screen to bring up the Welcome Guide
  3. Scroll down to the list item labeled “Download the Datorios client tar.gz file for your specific version”
    • Follow the Welcome Guide to make the datorios.sh file executable and to configure your .env file so your jobs folder is mounted correctly. I have my jobs mounted to /jobs in the containers, so I’ll be using that path for this article.
  4. Congrats! You’ve just updated your Datorios CLI to the latest version.
    • Make sure you have a cluster up and running so we can submit our PyFlink job once we’re done writing it!

For this article we’re just going to be writing a simple PyFlink pipeline, but don’t let this limit your imagination! You can refer to the PyFlink documentation to learn more about PyFlink and all of the things it is capable of doing (spoiler, it’s a lot).

The PyFlink pipeline that we will be writing today is a simple windowing pipeline that creates metrics for the messages sent between users. We will start by creating a DataStream from a collection, then watermark the messages, window them into 5-second windows where we count the number of messages each user sent, and finally print the results to standard out. Let’s get started!

First, we need to set up the environment to tell Flink how we want this pipeline processed in Python. We’ll be using the StreamExecutionEnvironment in streaming mode.

from pyflink.common import WatermarkStrategy, Duration, Time
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.window import TumblingEventTimeWindows

# Create the StreamExecutionEnvironment in Streaming Mode
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
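
One optional tweak while we’re here: if you’d like the print sink’s output later in the article to show up as a single, easy-to-read stream while you experiment, you can pin the parallelism to 1. This is purely a convenience for the walkthrough, not a requirement of the pipeline.

# Optional: keep this example's output on a single task for readability
env.set_parallelism(1)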

Creating a DataStream of Messages from a Collection

Now that we’ve gotten the StreamExecutionEnvironment set up, we need to have some sort of data to actually process against. Thankfully we can create a DataStream from a collection just the same as you can in the Java APIs.


The DataStream we will be creating will be a tuple of 4 elements: id, message, username, and timestamp. The DataStream represents messages sent back and forth between users in a messaging system. The DataStream is created using the from_collection method on the StreamExecutionEnvironment we created earlier.

# Create a DataStream from a collection of Tuples
messages = env.from_collection(
    [
        (1, 'Hi', 'Colten', 1716378540000),
        (2, 'Hello', 'Alex', 1716378541000),
        (3, 'How are you?', 'Colten', 1716378542000),
        (4, 'Good, what is your name?', 'Alex', 1716378544000),
        (5, 'Colten, who are you?', 'Colten', 1716378546000),
        (6, 'Alex, nice to meet you!', 'Alex', 1716378548000),
        (7, 'Likewise.', 'Colten', 1716378549000)
    ]
)
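
By default, PyFlink treats these records as pickled Python objects, which works fine for this walkthrough. If you’d rather have Flink use its own tuple serializers, from_collection also accepts an optional type_info argument. Here’s a minimal sketch of that variant, where the field types are my assumption based on the sample data above:

from pyflink.common import Types

# Sketch: the same collection, but with explicit type information so Flink
# serializes the records as typed tuples instead of pickled Python objects
messages = env.from_collection(
    [
        (1, 'Hi', 'Colten', 1716378540000),
        # ... the rest of the messages from above ...
    ],
    type_info=Types.TUPLE([Types.INT(), Types.STRING(), Types.STRING(), Types.LONG()])
)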

Watermarking a DataStream

Now that we’ve gotten some data to process, we’ll need to watermark the records using the timestamp element in the message tuple. If you are already familiar with the Java API this will look very similar, we must supply a WatermarkStrategy to apply to the DataStream and create a TimestampAssigner to choose which element in the tuple to use for the watermark.

# The TimestampAssigner to pull the timestamp element out of the tuple
class UserMessageTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, message, record_timestamp):
        return message[3]

# Apply a bounded-out-of-orderness WatermarkStrategy of 5 seconds to the
# messages stream, and then supply our UserMessageTimestampAssigner
watermarked_messages = (
    messages
    .assign_timestamps_and_watermarks(
        WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(5))
        .with_timestamp_assigner(UserMessageTimestampAssigner())
    )
)
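
As a side note, the sample messages happen to arrive in timestamp order, so a 5-second out-of-orderness bound is more tolerance than this particular collection needs. If your data is strictly in order, a sketch of an alternative strategy (reusing the same timestamp assigner) could look like this:

# Sketch: a watermark strategy for strictly in-order data
watermarked_messages = (
    messages
    .assign_timestamps_and_watermarks(
        WatermarkStrategy
        .for_monotonous_timestamps()
        .with_timestamp_assigner(UserMessageTimestampAssigner())
    )
)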

Windowing & Reducing the Messages to a Metric

Now that we’ve gotten our Messages watermarked, we can now convert our messages tuple into something that can be aggregated inside of a reduce function, then key by the username tied to the message, then apply a TumblingEventTime window to the stream, then reduce the metrics to a singular window output!

# Convert our message tuple into a new tuple to reduce (username, count)
unaggregated_metrics = (
    watermarked_messages
    .map(lambda message: (message[2], 1))
)

# Window the unaggregated metrics and then reduce them
message_metrics = (
    unaggregated_metrics
    .key_by(lambda metric: metric[0])
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(lambda a, b: (a[0], a[1] + b[1]))
)

# Send out MessageMetrics to stdout, then execute the pipeline
message_metrics.print()
env.execute('UserMessageMetrics')

Congrats! You’ve just written your first PyFlink pipeline, you can see the full file with all of the snippets put together in my flink-snippets repo. Now we can get started with running this PyFlink pipeline in the Datorios cluster so we can see what’s going on inside of the job.

The great thing about PyFlink is that we don’t need to compile our pipeline into a JAR; we can simply submit the job to the cluster and the Flink engine will do all of the work for us. So let’s get started!

  1. The first thing to do is copy your Python file into the jobs folder you configured in your .env file. My Python file for this example is called main.py, and the folder is /jobs on my local machine, mounted to /jobs in the cluster containers
  2. Open a Terminal and navigate to the location of your Datorios CLI folder (if you haven’t added Datorios to your PATH environment variable)
  3. Make sure you have a Datorios Cluster running and ready to receive your job
    • ./datorios.sh colten start (I’ve named my cluster “colten”)
  4. Now it’s time to submit your PyFlink job to the cluster. If you’ve copied the Python file correctly, you should be able to run the following command and see the job’s information, and eventually the results once it finishes running
    • ./datorios.sh colten flink run --python /jobs/main.py
    • The command to run a job is the same as for the Java API, except you need to add --python before the location of the job you’re submitting to let Flink know it should interpret the job as a PyFlink pipeline
  5. After a couple of seconds you should see a line like the one below; it holds the Job ID so you can navigate to the job in the Datorios UI and see the magic for yourself!
    • Job has been submitted with JobID …
    • My Job’s ID is 3d6824174933b2224b1e2c31cd2d496e
  6. In another couple of seconds you’ll see that the pipeline has finished executing, which means we’re ready to see the results in the Datorios UI (or you can find them in your TaskManager’s stdout logs; a rough sketch of what that output should look like follows this list)
  7. You’re officially a PyFlinker!
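
If you’d like to sanity-check the results before opening the UI, the TaskManager’s stdout should contain lines roughly like the ones below. This is my own derivation from the sample data: the 5-second tumbling windows align to epoch boundaries, so the first window holds two messages from each user and the second holds two from Colten and one from Alex. The exact ordering, and any task prefix in front of each line, may differ on your setup.

('Colten', 2)
('Alex', 2)
('Colten', 2)
('Alex', 1)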

Visually Analyzing with Datorios

Now that we’ve seen the power of PyFlink, we can dive into the Datorios UI that we all know and love. You’ll notice we get the same view we’re used to with Java Flink, but tailored to this PyFlink pipeline.

We can see our Job Graph on the right, along with the Timeline of events flowing through it. You’ll notice that this PyFlink pipeline has a few more operators than we’d expect from a Java API pipeline; that’s because, under the hood, PyFlink runs on the Java environment, and between operators there is a translation layer between Java and the Python Beam runtime that executes our Python code. Pretty neat, right?

Along with an overview of the pipeline, we can also dig into individual records and see their content and metadata! Here we can see the aggregated message metric: in this 5-second window Alex sent 2 messages, which we can confirm by looking at the source data.

You can also do the same Window Investigation we’re used to with Java API pipelines and view the state held in the window operator.

Conclusion

Today we’ve written a simple PyFlink pipeline to give you a little insight into just how easy it is to get started by leveraging the power of Flink alongside the ease of use of Python. The power of PyFlink comes from its lower barrier to entry for people who may not be familiar or comfortable with Java, and its ability to bring other Python libraries into processing data with Flink.

We hope that with today’s introduction you’ll feel empowered to go out and discover the use cases where you and your team can harness the power of PyFlink. Until next time! 👋🏻
