Setup a Kafka cluster on Amazon EC2

This is the fourth article in a series to build a Big Data development environment in AWS. Before you proceed further, please read the first and second and third articles first.

What is Kafka

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

kafka.apache.org

Kafka is capable of capturing data from producers in near real time in the form of messages, storing it and then allowing consumers to read that data in order that the message were sent.

Examples of data producers are application log handlers, IOT devices, the binary logs of MySQL database or bookings on a ride-hailing application etc.

In this article, I will demonstrate how to set up a Kafka Broker on a single EC2 instance. We will first setup and configure Zookeeper and the Kafka Broker, then I will demonstrate how to create topics, publish and consume logs. Finally, I will demonstrate an example of publishing application logfiles to a Kafka topic and then consuming from the same topic.

Those of you familiar with AWS may be a familiar with AWS managed data streaming offerings such as Amazon Managed Streaming for Apache Kafka (MSK) and Amazon Kinesis. These are both quality products from Amazon designed for production environments. However, today we are building things from scratch using open-source Kafka to maximize our learning.

Amazon’s Managed Streaming offerings

Ready? Let’s go

A primer on Streaming terminology

First, a quick run-down on terms used when discussing Kafka technology.

An event is a record of a real world event. It may also be called a message or a record. The most important components of an event are its key, value, timestamp and metadata.

A producer is a client application which writes events to Kafka

A consumer is a client application which reads events from Kafka.

Events are persisted in topics once they are published. Topics are durable collections of ordered events. Events are not deleted once they are consumed and can be read multiple times as long as they are not older than the retention period.

Topics are partitioned where the topic’s messages are spread out over multiple brokers. Partitioning is crucial to ensuring scalability and allowing multiple clients to read and write to brokers at the same time.

This list is not comprehensive and I recommend reading kafka.apache.org’s excellent documentation.

Download and setup Kafka

Start up your EC2 instance, if you don’t know what this means, please refer to Create a single node Hadoop cluster

I am using a t2.xlarge instance in the ap-southeast-1 region.

You will require approximately 2 hours to complete this tutorial and the estimated cost is $0.46 USD.

The first thing we have to do is SSH into your EC2 instance, go to your AWS console and get the public IP

ssh -i {your-public-key} ubuntu@{your-public-ip}

Now let’s download the Kafka 2.6.1 tarball for Scala 2.12 to our home directory using wget

wget https://archive.apache.org/dist/kafka/2.6.1/kafka_2.12-2.6.1.tgz

Now let’s untar and decompress the archive and move it the /opt directory

tar xzf kafka_2.12-2.6.1.tgz
sudo mv -f kafka_2.12-2.6.1 /opt

We will create a softlink from /opt/kafka to the Kafka files. This will later be used in PATH

sudo ln -s kafka_2.12-2.6.1 /opt/kafka

Add the following lines to ~/.profile using the text editor of your choice.

I’m using vim. These lines set the KAFKA_HOME and PATH environment variables

vim ~/.profile
# Add these 2 lines to ~/.profile
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:${KAFKA_HOME}/bin

Run the .profile script to set the environment variable.

Do this before proceeding to the next step.

source ~/.profile

Configure Zookeeper and the Kafka Broker

We will now set up Zookeeper and the Kafka Broker.

Let’s first take a look at /opt/kafka/config/zookeeper.properties

This is where zookeeper’s configurations are stored

Here’s a brief explanation of the default configurations

dataDir=/tmp/zookeeper

dataDir is directory where ZooKeeper stores the in-memory snapshots of the data as well as the log of updates to the database

clientPort=2181

clientPort is the port at where the ZooKeeper server listens for new client connections

maxClientCnxns=0

maxClientCnxns is maximum no. of connections which a client is allowed to a single member of the ZooKeeper cluster. Default is 10 but we have disable the limit since this is a non-production setting.

admin.enableServer=false

admin.enableServer allows access to the AdminServer which is an embedded Jetty server that provides an HTTP interface to the four letter word commands.

For a complete explanation of all properties. See Confluent’s documentation

We can go ahead and start the zookeeper daemon. We also tell it to use /opt/kafka/config/zookeeper.properties as a config file.

zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

Let’s check that zookeeper is running correctly

First we use the ps command to find the zookeeper PID amongst the system’s running processes

We combine it with grep to filter only lines containing the zookeeper keyword

ps -ef | grep zookeeper

Next we check that zookeeper is listening on port 2181 using the netstat command

netstat -tulpn flag is equivalent to “netstat –tcp –udp –listening –program –numeric”

We are looking for programs listening on tcp or udp ports and want to list their PID and numeric addresses

grep is used to filter lines containing the zookeeper port number

netstat -tulpn | grep 2181

Finally, we use telnet to establish a connection to port 2181

telnet localhost 2181
# use CTRL+] to close the connection
# type close to exit the telnet application

Start the Kafka cluster

In an earlier tutorial, we created the ~/start_all.sh helper to start all Hadoop, HDFS, Yarn, Hive metastore processes in single script

To streamline cluster management, we add the following lines to script to start Zookeeper and the Kafka broker as well.

zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
kafka-server-start.sh -daemon /opt/kafka/config/server.properties
My ~/.start_all.sh file looks like this. I start the HDFS and Yarn processes first, followed by the Hive metastore in a Docker MySQL container, then finally we start Zookeeper and Kafka

Conversely, we have the ~/stop_all.sh script to stop application process to make it safe to shutdown the EC2 instance.

We add the following lines to ~/stop_all.sh to stop the Kafka Broker and Zookeeper process

kafka-server-stop.sh
zookeeper-server-stop.sh
The contents of my ~/stop_all.sh file

First let’s test by running the stop_all.sh script

source stop_all.sh
Notice that output “No kafka server to stop”. That’s because we didn’t start the Kafka broker (only Zookeeper)

Let’s try starting up all the process using the start_all.sh script

source start_all.sh

Once again we can validate that Zookeeper and the Kafka Broker are up by connecting to their application ports

telnet localhost 2181
A successful connection to port 2181 indicates Zookeeper is running
telnet localhost 9092
A successful connection to port 9092 indicates the Kafka broker is running

Using the Kafka CLI

Now that your single node Kafka cluster is up, we can proceed to send and receive messages.

We must first create a topic to stpre the messages.

In a production environment, your topic should replication factor of at least 3.

You should also have multiple partitions for scalability. A detailed treatment of partitioning is discussed here

However, this is a single-node Kafka cluster meant for development, as such we will use a replication factor of 1 and a single partition.

The topic shall be called kafkamytopic

kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic kafkamytopic
Creating a new topic in the Kafka-CLI

We can also list topics on the cluster

kafka-topics.sh --list \
  --zookeeper localhost:2181
Listing topics using Kafka-CLI

We might choose to find a single topic called “kafkamytopic

kafka-topics.sh --list \
  --zookeeper localhost:2181 \
  --topic kafkamytopic
Listing a single topic using Kafka-CLI

We can also use the describe flag to get more information about the kafkamytopic topic

kafka-topics.sh --describe \
  --zookeeper localhost:2181 \
  --topic kafkamytopic
Getting the details of a single topic using Kafka-CLI

Having created the kafkamytopic topic, we can also delete it

kafka-topics.sh --delete \
  --zookeeper localhost:2181 \
  --topic kafkamytopic
Deleting a topic using the Kafka-CLI

Let’s re-create the kafkamytopic topic so we can send and receive messages

kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic kafkamytopic

We will now send a series of messages to the kafkamytopic topic interactively in the shell

Line breaks will send one message and await another. Use CTRL+C to terminate the shell

kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic kafkamytopic
I sent 4 messages in the Kafka-CLI producer shell

We consume those same messages by opening a consumer shell

kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic kafkamytopic \
  --from-beginning
We receive the same 4 messages using the Kafka-CLI consumer shell.

Generate and send logs to Kafka

We will now download and run a log generator script from https://github.com/frenoid/log_generator

It generates records simulating logs from a supermarket webserver. We will send this log file to Kafka for where it consumed, stored and can be ready by any number of consumers

First, let’s clone the repo to the home directory

cd ~/
git clone https://github.com/frenoid/log_generator.git

We should then move the contents to /opt/gen_logs alongside the Spark and Hadoop binaries

sudo mv -f log_generator /opt/gen_logs
Contents of /opt

We can list the contents of /opt/gen_logs

Notice the make_logs.sh, follow_logs.sh and cease_logs.sh scripts. We will use these later

Contents of /opt/gen_logs

Let’s remove the directory created by the git clone command

rm -rf log_generator
The contents of ~/ after removing the log_generator directory

Let’s also change the ownership of /opt/gen_logs to ubuntu so we can read and write to the directory

sudo chown -R ${USER} /opt/gen_logs
Detailed listing of /opt. Notice that gen_logs is owned by the ubuntu user

Now let’s add /opt/gen_logs to PATH so we can execute the shell scripts even outside the diretory

Add this line to ~/.profile

export PATH=$PATH:/opt/gen_logs
Contents of ~/.profile after adding /opt/gen_logs to PATH

We’re now ready to run the log generator.

Run the make_logs.sh script. It starts the log generator which writes a line to the /opt/gen_logs/logs/webserver.log every 1.5 seconds

make_logs.sh
Running the make_logs.sh script

Use the follow_logs.sh script to tail the webserver.log file

follow_logs.sh
The contents of webserver.log

Observe that each log line consists of an IP address an IP address, a timestamp, the request method, request PATH and a user agent.

You might want to ingest the logs using Kafka where it can be made available for further analysis of web traffic.

Now we can stop the script from producing output by running the cease_logs.sh script. This kills the log producing script by PID.

cease_log.sh
Running the cease_logs.sh

You can view the log file in /opt/gen_logs/logs

ls -ltr /opt/gen_logs/logs
Detailed listing of /opt/gen_logs/logs

Notice that the file’s owner is ubuntu and the file size is 11,720 bytes. Logfiles can grow to large sizes in a long-running application and it’s necessary to rotate them to keep some free disk space on the server

Now, we are going to send the log generator output to a Kafka topic called supermarket

First, we use the Kafka-CLI to create the supermarket Kafka topic

kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic supermarket
Created the supermarket topic

Check the topic is created by using the list command

kafka-topics.sh --list \
  --zookeeper localhost:2181 \
  --topic supermarket
Listed the supermarket topic

You can also check its properties by using the describe command

kafka-topics.sh --describe \
  --zookeeper localhost:2181 \
  --topic supermarket
Viewing the properties of the supermarket topic

Then, let’s start up the log generator. This will simulate an active application writing logs

make_logs.sh
Start up the log generator

We will now stream the contents of webserver.log to the Kafka broker where it will be consumed and stored in the supermarket topic.

We do this by piping the output of follow_logs.sh to the Kafka CLI producer command

/opt/gen_logs/follow_logs.sh| \
  kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic supermarket
Sending the output of follow_logs.sh to the Kafka producer API.
Notice the progress bar at the bottom

The logs are now being published to the supermarket topic. Note that this is a long-running process, Kafka will continually ingest the logs as the log generator produces them.

Use CTRL+C to terminate the Kafka CLI command.

We now have events stored in the Kafka supermarket topic. What if we want to view them to analyze website traffic?

Well, we can use the Kafka CLI to consume the events of the supermarket topic. Note the use of the –from-beginning flag, this means we want Kafka to give us all events in the topic from earliest timestamp. You can also use the –offset flag to start reading from a particular timestamp

kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic supermarket \
  --from-beginning
Consuming the supermarket topic.

Note that the consumer is a long-running process and it will continually receive new events as they arrive in the topic. Type CTRL+C to terminate the consumer process

Now, let’s stop the log generator process to conserve disk space

cease_logs.sh

We should also stop all HDFS, Yarn, Zookeeper and Kafka processes. This prevents data corruption and loss when we terminate the EC2 instance.

source stop_all.sh

To avoid spending unnecessary money, you should now terminate your EC2 instance via the AWS console.

Conclusion

Congratulations, you’ve successfully setup and configured a single-node Kafka cluster on an EC2 instance, published the logfile as an eventlog to the broker and even consumed the eventlog from topic.

2 thoughts on “Setup a Kafka cluster on Amazon EC2

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s