
This is the fourth article in a series on building a Big Data development environment in AWS. Before you proceed, please read the first, second, and third articles.
What is Kafka
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
kafka.apache.org
Kafka is capable of capturing data from producers in near real time in the form of messages, storing it, and then allowing consumers to read that data in the order in which the messages were sent.
Examples of data producers are application log handlers, IoT devices, the binary logs of a MySQL database, or bookings on a ride-hailing application.
In this article, I will demonstrate how to set up a Kafka broker on a single EC2 instance. We will first set up and configure ZooKeeper and the Kafka broker, then I will demonstrate how to create topics and publish and consume messages. Finally, I will demonstrate an example of publishing application log files to a Kafka topic and then consuming from the same topic.
Those of you familiar with AWS may already be familiar with AWS managed data streaming offerings such as Amazon Managed Streaming for Apache Kafka (MSK) and Amazon Kinesis. These are both quality products from Amazon designed for production environments. However, today we are building things from scratch using open-source Kafka to maximize our learning.

Ready? Let’s go
A primer on Streaming terminology
First, a quick run-down on terms used when discussing Kafka technology.
An event is a record of something that happened in the real world. It may also be called a message or a record. The most important components of an event are its key, value, timestamp and metadata.
A producer is a client application which writes events to Kafka.
A consumer is a client application which reads events from Kafka.
Events are persisted in topics once they are published. Topics are durable collections of ordered events. Events are not deleted once they are consumed and can be read multiple times as long as they are not older than the retention period.
Topics are partitioned, meaning a topic’s messages are spread out over multiple brokers. Partitioning is crucial for scalability and allows multiple clients to read from and write to brokers at the same time.
This list is not comprehensive and I recommend reading kafka.apache.org’s excellent documentation.
Download and setup Kafka
Start up your EC2 instance. If you don’t know what this means, please refer to Create a single node Hadoop cluster.
I am using a t2.xlarge instance in the ap-southeast-1 region.
You will require approximately 2 hours to complete this tutorial and the estimated cost is $0.46 USD.
The first thing we have to do is SSH into the EC2 instance. Go to your AWS console and get the public IP.
ssh -i {your-public-key} ubuntu@{your-public-ip}

Now let’s download the Kafka 2.6.1 tarball for Scala 2.12 to our home directory using wget
wget https://archive.apache.org/dist/kafka/2.6.1/kafka_2.12-2.6.1.tgz

Now let’s untar and decompress the archive and move it to the /opt directory
tar xzf kafka_2.12-2.6.1.tgz

sudo mv -f kafka_2.12-2.6.1 /opt

We will create a softlink from /opt/kafka to the Kafka files. This will later be used in PATH
sudo ln -s kafka_2.12-2.6.1 /opt/kafka

Add the following lines to ~/.profile using the text editor of your choice.
I’m using vim. These lines set the KAFKA_HOME and PATH environment variables
vim ~/.profile
# Add these 2 lines to ~/.profile
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:${KAFKA_HOME}/bin

Source the .profile script to set the environment variables.
Do this before proceeding to the next step.
source ~/.profile
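As a quick optional sanity check, you can confirm the variables are set and that the Kafka scripts are on your PATH:
echo $KAFKA_HOME
which kafka-topics.sh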

Configure Zookeeper and the Kafka Broker
We will now set up Zookeeper and the Kafka Broker.
Let’s first take a look at /opt/kafka/config/zookeeper.properties
This is where ZooKeeper’s configuration is stored.
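You can print the file to follow along:
cat /opt/kafka/config/zookeeper.properties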


Here’s a brief explanation of the default configurations
dataDir=/tmp/zookeeper
dataDir is the directory where ZooKeeper stores the in-memory snapshots of the data as well as the transaction log of updates to the database
clientPort=2181
clientPort is the port on which the ZooKeeper server listens for new client connections
maxClientCnxns=0
maxClientCnxns is the maximum number of connections a single client is allowed to make to a single member of the ZooKeeper ensemble. The ZooKeeper default is 60, but we disable the limit (0) since this is a non-production setting.
admin.enableServer=false
admin.enableServer controls the AdminServer, an embedded Jetty server that provides an HTTP interface to the four-letter-word commands. We disable it here since we will not use it.
For a complete explanation of all properties, see Confluent’s documentation.
We can go ahead and start the ZooKeeper daemon, telling it to use /opt/kafka/config/zookeeper.properties as its config file.
zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties

Let’s check that zookeeper is running correctly
First we use the ps command to find the zookeeper PID amongst the system’s running processes
We combine it with grep to filter only lines containing the zookeeper keyword
ps -ef | grep zookeeper

Next we check that zookeeper is listening on port 2181 using the netstat command
The netstat -tulpn flags are equivalent to “netstat --tcp --udp --listening --program --numeric”
We are looking for programs listening on tcp or udp ports and want to list their PID and numeric addresses
grep is used to filter lines containing the zookeeper port number
netstat -tulpn | grep 2181

Finally, we use telnet to establish a connection to port 2181
telnet localhost 2181
# use CTRL+] to close the connection
# type close to exit the telnet application
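As an optional extra check (if nc is installed), you can send ZooKeeper the srvr four-letter-word command with netcat. In ZooKeeper 3.5+, srvr is whitelisted by default, while other commands such as stat may require adding them to 4lw.commands.whitelist:
echo srvr | nc localhost 2181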


Start the Kafka cluster
In an earlier tutorial, we created the ~/start_all.sh helper to start the Hadoop HDFS, YARN and Hive metastore processes in a single script.
To streamline cluster management, we add the following lines to the script to start ZooKeeper and the Kafka broker as well.
zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
kafka-server-start.sh -daemon /opt/kafka/config/server.properties
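For orientation, here is a rough sketch of what ~/start_all.sh might now contain, assuming the standard Hadoop and Hive start commands from the earlier articles (your exact script may differ). Note that ZooKeeper must be started before the Kafka broker:
#!/bin/bash
# Start HDFS and YARN daemons (from the earlier tutorials -- adjust to your setup)
start-dfs.sh
start-yarn.sh
# Start the Hive metastore in the background (assumption -- use whatever command your script already has)
nohup hive --service metastore > /dev/null 2>&1 &
# Start ZooKeeper first, then the Kafka broker, which registers itself in ZooKeeper
zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
kafka-server-start.sh -daemon /opt/kafka/config/server.properties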

Conversely, we have the ~/stop_all.sh script to stop all application processes so that it is safe to shut down the EC2 instance.
We add the following lines to ~/stop_all.sh to stop the Kafka broker and ZooKeeper processes
kafka-server-stop.sh
zookeeper-server-stop.sh

First let’s test by running the stop_all.sh script
source stop_all.sh

Let’s try starting up all the processes using the start_all.sh script
source start_all.sh

Once again we can validate that Zookeeper and the Kafka Broker are up by connecting to their application ports
telnet localhost 2181

telnet localhost 9092
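Another way to verify the broker, beyond a raw TCP connection, is to query it over the Kafka protocol; if it prints the API versions it supports, the broker is up and answering:
kafka-broker-api-versions.sh --bootstrap-server localhost:9092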

Using the Kafka CLI
Now that your single-node Kafka cluster is up, we can proceed to send and receive messages.
We must first create a topic to store the messages.
In a production environment, your topic should have a replication factor of at least 3.
You should also have multiple partitions for scalability. A detailed treatment of partitioning is discussed here
However, this is a single-node Kafka cluster meant for development, so we will use a replication factor of 1 and a single partition.
The topic shall be called kafkamytopic
kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic kafkamytopic
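As an aside, from Kafka 2.2 onwards kafka-topics.sh can also talk to the broker directly via --bootstrap-server instead of ZooKeeper. The equivalent form of the command above would be (don’t run both; the topic would already exist):
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic kafkamytopic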

We can also list topics on the cluster
kafka-topics.sh --list \
--zookeeper localhost:2181

We can also filter the list for a single topic called kafkamytopic
kafka-topics.sh --list \
--zookeeper localhost:2181 \
--topic kafkamytopic

We can also use the describe flag to get more information about the kafkamytopic topic
kafka-topics.sh --describe \
--zookeeper localhost:2181 \
--topic kafkamytopic

Having created the kafkamytopic topic, we can also delete it
kafka-topics.sh --delete \
--zookeeper localhost:2181 \
--topic kafkamytopic

Let’s re-create the kafkamytopic topic so we can send and receive messages
kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic kafkamytopic
We will now send a series of messages to the kafkamytopic topic interactively in the shell.
Each line break sends one message and awaits the next. Use CTRL+C to terminate the producer shell.
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic kafkamytopic
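By default the console producer sends events with null keys. If you want to experiment with keyed events (the key determines which partition an event lands in), the console producer accepts the parse.key and key.separator properties; typing key1:hello would then send the key key1 with the value hello:
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic kafkamytopic \
--property parse.key=true \
--property key.separator=: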

We consume those same messages by opening a consumer shell
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic kafkamytopic \
--from-beginning
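If you also want to see keys and timestamps alongside the values, the console consumer supports the print.key and print.timestamp properties:
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic kafkamytopic \
--from-beginning \
--property print.key=true \
--property print.timestamp=true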

Generate and send logs to Kafka
We will now download and run a log generator script from https://github.com/frenoid/log_generator
It generates records simulating logs from a supermarket webserver. We will send this log file to Kafka, where it will be ingested, stored, and can be read by any number of consumers.
First, let’s clone the repo to the home directory
cd ~/
git clone https://github.com/frenoid/log_generator.git

We should then copy the contents to /opt/gen_logs, alongside the Spark and Hadoop binaries
sudo cp -rf log_generator /opt/gen_logs

We can list the contents of /opt/gen_logs
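ls -l /opt/gen_logs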
Notice the make_logs.sh, follow_logs.sh and cease_logs.sh scripts. We will use these later

Let’s remove the directory created by the git clone command
rm -rf log_generator

Let’s also change the ownership of /opt/gen_logs to ubuntu so we can read and write to the directory
sudo chown -R ${USER} /opt/gen_logs

Now let’s add /opt/gen_logs to PATH so we can execute the shell scripts even outside the directory.
Add this line to ~/.profile
export PATH=$PATH:/opt/gen_logs
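As before, reload the profile so the new PATH entry takes effect in your current shell:
source ~/.profile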

We’re now ready to run the log generator.
Run the make_logs.sh script. It starts the log generator, which appends a line to /opt/gen_logs/logs/webserver.log every 1.5 seconds.
make_logs.sh

Use the follow_logs.sh script to tail the webserver.log file
follow_logs.sh

Observe that each log line consists of an IP address, a timestamp, the request method, the request path, and a user agent.
You might want to ingest these logs into Kafka, where they can be made available for further analysis of web traffic.
Now we can stop the script from producing output by running the cease_logs.sh script, which kills the log-producing script by PID.
cease_logs.sh

You can view the log file in /opt/gen_logs/logs
ls -ltr /opt/gen_logs/logs

Notice that the file’s owner is ubuntu and the file size is 11,720 bytes. Log files can grow large in a long-running application, and it’s necessary to rotate them to keep some free disk space on the server.
Now, we are going to send the log generator output to a Kafka topic called supermarket
First, we use the Kafka CLI to create the supermarket Kafka topic
kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic supermarket

Check the topic is created by using the list command
kafka-topics.sh --list \
--zookeeper localhost:2181 \
--topic supermarket

You can also check its properties by using the describe command
kafka-topics.sh --describe \
--zookeeper localhost:2181 \
--topic supermarket

Then, let’s start up the log generator. This will simulate an active application writing logs
make_logs.sh

We will now stream the contents of webserver.log to the Kafka broker where it will be consumed and stored in the supermarket topic.
We do this by piping the output of follow_logs.sh to the Kafka CLI producer command
/opt/gen_logs/follow_logs.sh | \
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic supermarket

Notice the progress bar at the bottom
The logs are now being published to the supermarket topic. Note that this is a long-running process; Kafka will continually ingest the logs as the log generator produces them.
Use CTRL+C to terminate the Kafka CLI command.
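Optionally, you can check how many events have landed in each partition of the topic using the GetOffsetShell tool bundled with Kafka (in Kafka 2.x it takes a --broker-list flag):
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic supermarket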
We now have events stored in the Kafka supermarket topic. What if we want to view them to analyze website traffic?
Well, we can use the Kafka CLI to consume the events of the supermarket topic. Note the use of the --from-beginning flag; this tells Kafka to give us all events in the topic starting from the earliest offset. You can also use the --offset flag (together with --partition) to start reading from a particular offset
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic supermarket \
--from-beginning

Note that the consumer is a long-running process and will continually receive new events as they arrive in the topic. Use CTRL+C to terminate the consumer process
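If several independent applications need to read the supermarket topic, each can consume under its own consumer group; the group name log-analyzers below is just an illustrative choice:
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic supermarket \
--group log-analyzers \
--from-beginning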
Now, let’s stop the log generator process to conserve disk space
cease_logs.sh

We should also stop all HDFS, Yarn, Zookeeper and Kafka processes. This prevents data corruption and loss when we terminate the EC2 instance.
source stop_all.sh

To avoid spending unnecessary money, you should now terminate your EC2 instance via the AWS console.
Conclusion
Congratulations, you’ve successfully set up and configured a single-node Kafka cluster on an EC2 instance, published a log file as a stream of events to the broker, and consumed those events from the topic.