Aggregating Flight Pricing Data with Apache Kafka and ksqlDB

Introduction

Lately, I decided to explore some other notable open source projects. Apache Kafka is a project I heard about years ago, but I never encountered work that warranted its use. After I shifted into the software development world and became more accustomed to working with large datasets, specifically through web scraping, Apache Kafka came across my radar once again. Apache Kafka, as defined by the Apache Foundation, is an open source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. Kafka offers extremely high throughput and scales out across large clusters of brokers. The core of Apache Kafka is written in Java, and its well-known Streams API is written in Java as well. At first, I had a sinking feeling that to use Apache Kafka well, I would have to learn Java. Fortunately, there are many open source Kafka clients/bindings written in other languages, like Python. For this project, I chose Confluent’s Kafka Python Client.

One of my personal projects that I find really interesting to work on is a Google Flights price scraper. Even though some people see web scraping as dull, I find collecting data and making decisions from it quite interesting, especially when it revolves around travel. One of my favorite destinations to travel to is Japan. For this project, I chose to incorporate Apache Kafka components into my existing Google Flights scraper. In the end, I want to achieve basic streaming and use ksqlDB to interact with the flight data as it flows in real time.

Use Case Diagram

To help illustrate this use case, I created a basic diagram in LibreOffice Draw. I hope this diagram explains the use case in an easy way:

How It Works

The Google Flights price scraper is called FlightScraper. FlightScraper is written in Python and uses requests to fetch HTML content from flights.google.com. Once the HTML has been fetched, Beautiful Soup parses the flight information out of the page. To keep Google’s bot protections from interfering with data collection, all HTTP requests go through a rotating proxy endpoint from a third-party provider. Only one node performs requests, using a multithreaded approach, and the request load is small compared to what a large, malicious botnet would generate. DISCLAIMER: Please act responsibly when scraping data from a website.

The processing steps in the diagram are numbered from start to finish. Step 1 is FlightScraper sending the HTTP request to the external proxy endpoint. The proxy endpoint then completes the request against flights.google.com. Once flights.google.com processes the request, the response is sent back to the proxy endpoint that originated it, and from there the proxy relays the response to the threaded worker that started the whole process. FlightScraper then parses the data with Beautiful Soup and builds a JSON object that represents the flight.
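To make that flow concrete, here is a minimal sketch of what one request/parse cycle could look like. The proxy URL, the search URL, and the CSS selectors are placeholders I made up for illustration; the real FlightScraper parses Google Flights’ actual markup, which changes often.

import json
import requests
from bs4 import BeautifulSoup

# Hypothetical rotating-proxy endpoint from a third-party provider.
PROXIES = {
    "http": "http://user:pass@proxy.example.com:8080",
    "https": "http://user:pass@proxy.example.com:8080",
}

def fetch_flights(url):
    # 1) The request goes to the proxy, which completes it against flights.google.com.
    response = requests.get(url, proxies=PROXIES, timeout=30)
    response.raise_for_status()

    # 2) Parse the returned HTML with Beautiful Soup.
    soup = BeautifulSoup(response.text, "html.parser")

    flights = []
    # The selectors below are stand-ins; the real page structure differs.
    for row in soup.select("div.flight-row"):
        flights.append({
            "airline": row.select_one(".airline").get_text(strip=True),
            "price": row.select_one(".price").get_text(strip=True),
        })
    return flights

if __name__ == "__main__":
    # Placeholder search URL for a CLE -> HND query.
    print(json.dumps(fetch_flights("https://flights.google.com/"), indent=2))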

How Kafka Is Involved

Once the flight data JSON has been created, FlightScraper produces a message to the configured Kafka topic; in this case, the topic is called flight-data. The message is sent to the topic, which lives on the broker. Like many distributed systems, Kafka can scale to hundreds or even thousands of brokers. The distributed storage platform Ceph operates in a similar fashion. For example, if a user wants to add storage to expand the cluster and increase redundancy, the administrator of the Ceph cluster can add an OSD (Object Storage Daemon), which increases capacity and causes the cluster to rebalance data. This background process ensures that, if one OSD fails, the data remains protected. When I read through Kafka’s documentation, I noticed these similarities to Ceph. Much like Ceph’s self-healing and self-balancing capabilities, Kafka replicates topic partitions across brokers so that messages survive broker failures.

Messages sent to a broker are also ordered within a partition, which means consumers read them in the order they were produced, so real-time applications built around Kafka aren’t handed data that may be “old”. If data changes frequently, then even a message from a minute ago might be considered outdated. Finally, in order to leverage and manipulate the incoming data, ksqlDB is used. ksqlDB is a project that works with Apache Kafka to provide a SQL-like layer on top of Kafka data. If a user knows SQL, they can use that knowledge to interact with incoming data. In the following examples, ksqlDB will be used to filter out incoming flights that don’t match certain criteria.
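For reference, producing one flight message with Confluent’s Kafka Python Client looks roughly like the sketch below. The broker address and the example flight dictionary are assumptions for illustration; only the topic name (flight-data) comes from the setup above.

import json
from confluent_kafka import Producer

# Assumes a single local broker; adjust bootstrap.servers for your cluster.
producer = Producer({"bootstrap.servers": "localhost:9092"})

def delivery_report(err, msg):
    # Called once per message to confirm delivery or report an error.
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}]")

# A hypothetical flight record built by FlightScraper's parser.
flight = {"origin": "CLE", "destination": "HND", "airline": "Example Air", "price": 980}

producer.produce("flight-data",
                 value=json.dumps(flight).encode("utf-8"),
                 callback=delivery_report)
producer.poll(0)   # serve the delivery callback
producer.flush()   # block until outstanding messages are delivered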

FlightScraper in Action

Now that FlightScraper’s architecture has been outlined, let’s actually start looking at the data it produces. FlightScraper has additional abilities, like generating date ranges based on how long a trip will last. In the following examples, I will have FlightScraper generate 10-day date ranges for flights from Cleveland (CLE) to Haneda (HND), starting from today (i.e. 9/2/24) and extending 180 ranges into the future (each range covering 10 days). As the data flows into the Kafka broker, ksqlDB will process it and EMIT the changes as they arrive in real time.
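Here is a rough sketch of how those date ranges could be generated. I’m assuming each range is simply a departure date paired with a return date 10 days later, stepping forward one day at a time; the real FlightScraper may slice its ranges differently.

from datetime import date, timedelta

def ten_day_ranges(start, count=180):
    """Yield (departure, return) pairs, each spanning 10 days."""
    for offset in range(count):
        depart = start + timedelta(days=offset)
        yield depart, depart + timedelta(days=10)

# Starting from 9/2/24, print all 180 ranges.
for depart, ret in ten_day_ranges(date(2024, 9, 2)):
    print(depart.isoformat(), "->", ret.isoformat())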

In order to interact with the Kafka data via ksqlDB, we’ll need to create a stream based on the Kafka topic. Here’s an example of a query that creates a stream based on incoming data to the Kafka broker:

CREATE STREAM flights (
    origin VARCHAR,
    destination VARCHAR,
    embarkDate VARCHAR,
    embarkTime VARCHAR,
    airline VARCHAR,
    arrivalTime VARCHAR,
    departDate VARCHAR,
    duration VARCHAR,
    flightType VARCHAR,
    price INTEGER,
    pricePerWay DOUBLE,
    timestamp VARCHAR
) WITH (kafka_topic='flight-data', value_format='json', partitions=3);

Before creating the preceding stream, we need to spin up ksqlDB. You can start ksqlDB by running the following command:

sudo /usr/bin/ksql-server-start /etc/ksqldb/ksql-server.properties
NOTE: Many of the command examples can be found in ksqlDB’s documentation. I’m using ksqlDB via the standalone Debian package, so that’s the installation method I’m referencing, and with that method you’ll need to configure the ksql-server.properties file first. If you’re using ksqlDB in a container or via another method, Confluent provides documentation for that too.
Now, let’s connect to ksqlDB. In my environment, I’m running ksqlDB on the same node as FlightScraper (i.e. localhost):
/usr/bin/ksql http://0.0.0.0:8088

Now, for the grand finale, let’s see how the data gets represented:

(Example Using Just a Simple SELECT *)

As I mentioned before, ksqlDB allows users to leverage SQL-like functionality to manipulate and change how the data stream is represented. In the next example, I’m going to filter the price and only return flights that are less than or equal to $1,050:

(Example Using a <= Comparison)
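The screenshot above comes from the ksql CLI, but the same push query can also be issued over ksqlDB’s REST API. Below is a rough sketch using the /query endpoint on localhost:8088; the offset-reset property is just one reasonable choice, not something the setup above requires.

import requests

# Push query: only emit flights priced at or below $1,050.
payload = {
    "ksql": "SELECT * FROM flights WHERE price <= 1050 EMIT CHANGES;",
    "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"},
}

response = requests.post(
    "http://localhost:8088/query",
    headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
    json=payload,
    stream=True,  # rows are streamed back continuously
)

for line in response.iter_lines():
    if line:
        print(line.decode("utf-8"))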

The preceding examples are just basic use cases of Apache Kafka and ksqlDB. From my experience with this project, I can now think of many different use cases where streaming data can be built into real-time applications. Apache Kafka and ksqlDB make a great pair for building dynamic, real-time platforms. If you haven’t checked out Apache Kafka or ksqlDB, you should!

Credits:

Apache Kafka in 100 Seconds: https://www.youtube.com/watch?v=uvb00oaa3k8

ksqlDB SQL Quick Reference: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/quick-reference/

apache/kafka: https://hub.docker.com/r/apache/kafka

ksqlDB Quick Start: https://ksqldb.io/quickstart.html

Fun with Flask: Creating Simple GET Endpoints

Python is a great programming language to build web applications with. Not only is the barrier to entry lower than with other languages, but there’s also a wide variety of web frameworks to choose from (e.g. Flask, Django).

My personal favorite is Flask. Flask is easy to use and can scale out (e.g. via Blueprints). When building my personal website, I wanted to keep things simple. The design is minimal and serves the intended purpose. With the endpoints, however, I wanted to be more creative. Currently, there are two endpoints: /skills and /education. Both accept only two HTTP methods: GET and OPTIONS. Later on, I’m going to create some cooler endpoints that integrate with other Python libraries. For now, though, I think querying endpoints for data is just as cool.

card.mcclunetechnologies.net is my personal website. The site is essentially my virtual business card. I want to have endpoints that are under-the-radar and return more information about myself. Right now, you can query /skills and /education by navigating to the endpoints directly. You can also send a GET request using a tool like curl.

Both endpoints return JSON responses. When you query an endpoint like /skills, Python opens a connection to a remote MySQL database and fetches everything in the appropriate table. As the OPTIONS response describes, /skills will return the following:

user@debian:~$ curl -s -XOPTIONS https://card.mcclunetechnologies.net/skills
Supported Methods for /skills: GET
Provides the following information: skill_name (string), skill_description (string), years_of_experience (integer), and comfort_level (string; can either be low, medium, or high)

Sending a GET request to /skills will return the MySQL fetchall() response dumped into JSON:

user@debian:~$ curl -s -XGET https://card.mcclunetechnologies.net/skills
[
    [
        "Active Directory",
        "Active Directory (AD) is a directory service developed by Microsoft for Windows domain networks. It is included in most Windows Server operating systems as a set of processes and services. Initially, Active Directory was only in charge of centralized domain management. However, Active Directory became an umbrella title for a broad range of directory-based identity-related services.",
        7,
        "medium"
    ],
    [
        "Ansible",
        "Ansible is an open-source software provisioning, configuration management, and application-deployment tool enabling infrastructure as code. It runs on many Unix-like systems, and can configure both Unix-like systems as well as Microsoft Windows.",
        2,
        "medium"
    ],
    [
        "Apache CloudStack",
        "CloudStack is open-source cloud computing software for creating, managing, and deploying infrastructure cloud services.",
        3,
        "medium"
    ],
    [
        "Bash",
        "Bash is a Unix shell and command language written by Brian Fox for the GNU Project as a free software replacement for the Bourne shell. First released in 1989, it has been used as the default login shell for most Linux distributions and all releases of Apple's macOS prior to macOS Catalina.",
        4,
        "medium"
    ],
    [
        "Cisco IOS",
        "Cisco Internetwork Operating System (IOS) is a family of network operating systems used on many Cisco Systems routers and current Cisco network switches.",
        7,
        "medium"
    ],
    [
        "Git",
        "Git is a distributed version-control system for tracking changes in any set of files, originally designed for coordinating work among programmers cooperating on source code during software development.",
        4,
        "medium"
    ],
    [
        "Linux",
        "Linux is a family of open-source Unix-like operating systems based on the Linux kernel, an operating system kernel first released on September 17, 1991, by Linus Torvalds. Linux is typically packaged in a Linux distribution.",
        7,
        "high"
    ],
    [
        "Nagios",
        "Nagios Core, formerly known as Nagios, is a free and open-source computer-software application that monitors systems, networks and infrastructure.",
        6,
        "medium"
    ],
    [
        "Python",
        "Python is an interpreted, high-level and general-purpose programming language. Python's design philosophy emphasizes code readability with its notable use of significant whitespace. Its language constructs and object-oriented approach aim to help programmers write clear, logical code for small and large-scale projects.",
        2,
        "medium"
    ],
    [
        "Technical Support",
        "Technical support (often shortened to tech support) refers to services that entities provide to users of technology products or services. In general, technical support provides help regarding specific problems with a product or service, rather than providing training, provision or customization of the product, or other support services.",
        7,
        "high"
    ]
]

Here’s the /skills endpoint within my Flask view:

import json

from flask import request

# `about` (the Blueprint), mysqlConn(), and skillsOptions are defined elsewhere in the app.

@about.route("/skills", methods=["GET", "OPTIONS"])
def skills():
    if request.method == 'GET':
        # Pull every row from the skills table and return it as JSON.
        conn = mysqlConn()
        skillsCursor = conn.cursor()
        skillsCursor.execute("SELECT * FROM skills")
        skillsInfo = skillsCursor.fetchall()
        skillsCursor.close()
        conn.close()
        return json.dumps(skillsInfo, indent=4)
    elif request.method == 'OPTIONS':
        # Static description of the endpoint's supported methods and fields.
        return skillsOptions

/skills and /education don’t have filtering capabilities; however, you can use a tool like jq to achieve similar results. One example is filtering out just the skill names:

user@debian:~$ curl -s -XGET https://card.mcclunetechnologies.net/skills | jq .[][0]
"Active Directory"
"Ansible"
"Apache CloudStack"
"Bash"
"Cisco IOS"
"Git"
"Linux"
"Nagios"
"Python"
"Technical Support"

Explaining & Illustrating curl

curl is an exceptionally useful program. As described on the project homepage (https://curl.se/), curl is a tool to transfer data from or to a server, using one of the supported protocols. curl can be used to send & receive data with the following protocols:

  • DICT
  • FILE
  • FTP
  • FTPS
  • GOPHER
  • HTTP
  • HTTPS
  • IMAP
  • IMAPS
  • LDAP
  • LDAPS
  • POP3
  • POP3S
  • RTMP
  • RTSP
  • SCP
  • SFTP
  • SMB
  • SMBS
  • SMTP
  • SMTPS
  • TELNET
  • TFTP

To better explain curl, I will demo curl on Ubuntu 14.04. First, I will execute curl http://gitlab.com

The command executes very quickly. However, a lot is actually being performed in the background.

curl http://gitlab.com

When the user executes curl http://gitlab.com, a request is sent from the application (i.e. curl) to the kernel.

The kernel acts as a middle man between the system’s software applications and hardware. curl needs to talk to some of the hardware components, including the CPU, memory, and network adapter. Given that curl is a network application, the kernel definitely needs to talk with the network adapter. When curl http://gitlab.com is executed, the user is telling curl to send some data over HTTP, in hopes of a response.

In order to visualize the HTTP data being sent to http://gitlab.com, I will use Wireshark to sniff the outgoing packets. Below is the packet capture performed while executing curl http://gitlab.com:

My computer is currently using Google’s public DNS server (8.8.4.4). Since gitlab.com is out on the Internet, my computer has to send a DNS request to Google so the domain name can be translated to an IP address. The domain gitlab.com appears to be at 52.167.219.168. Now that my computer knows the IP address of gitlab.com, HTTP requests can be sent. The curl HTTP requests go from my computer, to my ISP gateway, and out to 52.167.219.168. The process appears to have completed, and data was received.
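If you want to see that resolution step without a packet capture, a quick Python sketch asks the system resolver for gitlab.com’s current address (which may no longer be the IP captured above):

import socket

# Resolve gitlab.com the same way curl's resolver step does; the answer
# depends on whichever DNS server the system is configured to use.
print(socket.gethostbyname("gitlab.com"))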

Let’s see what Wireshark collected from the HTTP request:

The text highlighted in red is the request my computer sent to gitlab.com’s server. The text highlighted in blue is what was returned from gitlab.com’s server. When the request was sent, gitlab.com returned a 301 status. The HTTP 301 status code means Moved Permanently; it is usually returned when a user accesses a URL and the web server redirects them to another. To prove this holds true, let’s open a web browser and go to http://gitlab.com.

When launching the HTTP request from my web browser, the server redirected me to this:

The URL http://gitlab.com takes you to the home page of gitlab.com:
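For anyone who would rather confirm the 301 without a browser or Wireshark, here is a small Python check with redirects disabled; the exact status code and Location value may differ from what gitlab.com returned back in 2017:

import requests

# Ask for http://gitlab.com but do not follow the redirect, so the 301 stays visible.
response = requests.get("http://gitlab.com", allow_redirects=False)
print(response.status_code)               # e.g. 301
print(response.headers.get("Location"))   # where the server redirects to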

Hard to believe all of that happens just by executing one small command! That was the whole curl process: from the kernel, out over the network, and completed on gitlab.com’s server.

(Originally posted on June 8th, 2017. Updated on December 29th, 2020)

Please Bring Back Cub Linux!

Cub Linux (formerly Chromixium) is a great Linux distribution that mixes the Chrome and Ubuntu experiences. Cub Linux’s development has officially stopped; however, there is hope that Cub Linux will carry on. There is talk that a fork of Cub Linux is in development. The forked project is called Phoenix Linux. For more information, please visit this open issue:

https://github.com/CubLinux/one/issues/4

I commented on the issue:

I just want to say that I continue to use Cub Linux everyday! I love Cub Linux! 

I don’t have a great amount of development experience within furthering Linux OS features. However, if there is anything I can do to help, please let me know! I have taken Cub Linux (Ubuntu 14.04) and upgraded it to Ubuntu 16.04. There were some features that broke (going from 14.04 to 16.04). However, it still works okay for me.

Very eager to see Phoenix Linux! 

I have to speak my mind on this project because Cub Linux needs to continue. I understand that in the open source community, there can be developers that feel unappreciated. I am writing this to say that every project in the open source community is welcome and appreciated! No matter what a project’s purpose is, everyone should be welcome to contribute to open source applications.

Thank you to anyone reading this post! Please spread the word about Cub Linux! Here are some resources to get you acquainted with what Cub Linux is, if you don’t know already:

https://en.wikipedia.org/wiki/Cub_Linux

http://www.makeuseof.com/tag/replicate-chrome-os-laptop-cub-linux/

https://github.com/CubLinux

(Originally posted on August 5th, 2017. Updated on December 29th, 2020)