Cloud-Agnostic Big Data Processing with Kubernetes, Spark and Minio

Introduction

In this article I discuss how to build a cloud-agnostic Big Data processing and storage solution running entirely in Kubernetes. This design avoids vendor lock-in by using open-source technologies and eschewing cloud-managed products such as Amazon S3 and Amazon EMR in favour of MinIO and Apache Spark.

Cloud-managed solutions lower the barrier for organizations to build a Big Data platform because their pay-as-you-go model requires little or no upfront expenditure. In addition, managed products abstract away the technical details and require fewer specialized technical staff to manage. They especially suit startups where one or two data engineers are expected to manage the entire data tech stack.

Pay As You Go needs little to no upfront cost

However, as an organization’s data requirements grow in volume, velocity and variety, it starts discovering the limits of cloud-managed products. It might find its cloud bill sky-rocketing as the volume of data processed grows from gigabytes to petabytes. It might find itself hitting API call or timeout limits. Or worse, it may even discover bugs in the product that are only visible at scale.

Enter the cloud-agnostic data architecture designed to be portable to any major cloud vendor. I made several assumptions

  • There must be enough engineers who are subject matter experts in the tech to effectively manage the platform
  • There is sufficient scale for the design to compete with cloud-managed tools in terms of cost and performance
  • We can scale out by adding nodes to the Kubernetes cluster and increasing replicas in MinIO and Spark.

The primary technologies used in my design are

  1. Kubernetes is a container orchestration platform on which the next two technologies are hosted.
  2. MinIO is a high-performance, S3-compatible object store. We will use this as our data storage solution.
  3. Apache Spark is a unified engine for large-scale analytics.

These three are all open-source technologies which we will run on Amazon EC2, a VM hosting solution. I consider this setup cloud-agnostic since Azure Virtual Machines and Google Compute Engine are close substitutes and the OS I use is Ubuntu 18.04.6, an open-source Linux distro.

Virtual machines

Pre-requisites

You will need to set up an EC2 instance loaded with Docker, Minikube, kubectl and Helm.

Refer to each tool’s official documentation for instructions on how to install them.

Since we will be running multiple applications including Spark in our Minikube cluster, I recommend using a large instance such as a t3a.2xlarge (8 vCPUs and 32 GiB of RAM) to avoid hitting core and memory limits.

Start up Minikube

Go to the AWS console and start up your EC2 instance with Minikube installed. If you don’t have one, refer to Pre-requisites. Take note of the instance’s public IP and start an SSH session using the following

ssh -i {private-key-file} ubuntu@{ec2-public-ip}

Let’s start Minikube using the docker driver. We add the --cpus="max" option to override the default value of 2. We need all cores available here.

minikube start --cpus="max" --driver=docker

Install Minio

We will now set up the storage layer by installing MinIO in Minikube using the official Helm chart from charts.min.io.

Helm charts make installing applications on Kubernetes clusters easy by grouping a set of k8s manifests together as a chart. Installing an application is as simple as pulling a chart and installing it as a release. You can find thousands of charts at artifacthub.io, including applications such as Prometheus and Trino.

We will be installing Minio in the minio namespace. First we must create the namespace

kubectl create namespace minio

Next, we add charts.min.io as a chart repository. Helm will search this repository when trying to pull a chart. The repository alias will be minio.

helm repo add minio https://charts.min.io/

Let’s pull the minio chart from the minio repository. The chart will be retrieved as a tar file. We specify the --untar option to unpack the files into a folder named minio

helm pull minio/minio --untar
The minio folder is created in the user home directory

Let’s examine the contents of the helm chart.

ls -ltr ~/minio

The following explanation is abridged from helm.sh. The minio chart includes only the files shown below.

minio/
  Chart.yaml          # A YAML file containing information about the chart
  README.md           # OPTIONAL: A human-readable README file
  values.yaml         # The default configuration values for this chart
  templates/          # A directory of templates that, when combined with values, will generate valid Kubernetes manifest files.

Read helm.sh for an explanation of all possible file types in a helm chart.

Contents of the minio helm chart

In particular, we are interested in values.yaml, which we will have to customize for our small Minikube cluster. Let’s take a look at the file in vim.

vim ~/minio/values.yaml

We see a list of YAML key-value pairs. As mentioned previously, these values are used in conjunction with the templates in /templates to create k8s manifests.

We will customize a few values

  1. rootUser is the username for the root user in MinIO
  2. rootPassword is the password for the root user in MinIO
  3. mode refers to the MinIO mode. standalone is the simplest MinIO mode which consists of a single MinIO process and drive. In production, you would use distributed mode for better availability and data redundancy
  4. replicas is the number of MinIO containers running. We set it to 1 to minimize resource usage but it would be much higher in production to maximize IO throughput
  5. resources.requests.memory refers to the amount of memory allocated per pod. We only assign 1Gi here but it would be much higher in production
  6. persistence.size defines the size of the Persistent Volume Claim. The standard storage class in Minikube will provision a hostPath volume to which the PVC will be bound.

These settings are not intended for production but as a proof of concept in Minikube

We use the helm install command to install the minio/minio chart as the myminio release in the minio namespace together with multiple --set options with custom values.

helm install --namespace minio  \
--set rootUser=root,rootPassword=password123 \
--set mode=standalone \
--set replicas=1 \
--set resources.requests.memory=1Gi \
--set persistence.size=5Gi \
myminio minio/minio
Output of using helm to install MinIO

Let’s deep dive into the Kubernetes resources created by the helm chart installation.

kubectl -n minio get all

Notice that only a single pod was created but there are two services which expose it.

The myminio service exposes port 9000, on which the S3-compatible API runs. Spark (and other applications) will connect to this port to perform operations such as ListObjects, GetObject and PutObject.

The myminio-console service exposes port 9001, on which the MinIO console runs. The MinIO console is a graphical web app that can be used to perform CRUD operations on objects and monitor the status of MinIO.

Notice that both services are of type ClusterIP, which means they are only accessible from within the cluster. We will overcome this with port-forwarding later.

But what about the persistent storage? Let’s examine the Persistent Volume Claims.

kubectl -n minio get pvc

We see a single 5GiB PVC which has been bound to a volume. We can examine the Persistent Volume to which the PVC is bound.

kubectl -n minio get pv

We can observe the PV-PVC relationship, where a single PVC is bound to a single PV. But how was the Persistent Volume created? Let’s get more information about the PersistentVolume

kubectl -n minio describe pv/{persistent-volume-name}
Detailed information about the MinIO Persistent Volume

Notice that its StorageClass is standard, its Source.Type is HostPath and its Source.Path is a directory on the VM filesystem. You can inspect the contents by entering the Minikube VM using minikube ssh and then navigating to the directory.

Why is the Persistent Volume of type HostPath? We can examine the standard storage class in Minikube.

kubectl get sc

We can observe that the standard storage class is backed by the k8s.io/minikube-hostpath provisioner. To sum up, the myminio PersistentVolumeClaim makes a request to the standard storage class, which provisions a 5Gi hostPath PersistentVolume to which the PVC binds.

Great, now that we have an understanding of how MinIO was installed in Kubernetes, let’s actually use it. We will start by accessing the MinIO console.

We will first need to port forward the console service running on port 9001. We add the --address 0.0.0.0 option to expose the service on port 9001 of the EC2 instance and make it accessible to the Internet.

kubectl -n minio port-forward --address 0.0.0.0 svc/myminio-console 9001
Port-forwarding the MinIO console service to the internet

Use your web browser to access the console on the EC2 public IP address on port 9001. Log in with the credentials root and password123.

http://{ec2-public-ip}:9001/login
Login page of the MinIO console

Logging in will drop you into a list of buckets. At this point, there are none. Click on Create Bucket + to begin creating a bucket

There are no buckets in MinIO (yet)

Name the bucket datalake and click on Create Bucket

Creating the datalake bucket

Wonderful, we have created the datalake bucket in MinIO. It’s empty right now but we will soon fix that.

An empty datalake bucket
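The console is not the only way to interact with MinIO. As described earlier, the myminio service exposes the S3-compatible API on port 9000, so any S3 client can talk to it. As a quick illustration of my own (not part of the original walkthrough), the following Python sketch uses boto3 to confirm that the datalake bucket exists and is empty; it assumes boto3 is installed on the EC2 instance and that you have port-forwarded the myminio API service, e.g. kubectl -n minio port-forward svc/myminio 9000.

# Illustrative sketch only: verify the datalake bucket through MinIO's
# S3-compatible API using boto3. Assumes svc/myminio has been port-forwarded
# to localhost:9000 and boto3 is installed (pip install boto3).
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",  # port-forwarded myminio service
    aws_access_key_id="root",              # rootUser set during helm install
    aws_secret_access_key="password123",   # rootPassword set during helm install
)

# List buckets; we expect to see the datalake bucket created in the console
print([b["Name"] for b in s3.list_buckets()["Buckets"]])

# List objects in the datalake bucket; there are none yet
response = s3.list_objects_v2(Bucket="datalake")
print(response.get("KeyCount", 0), "objects in datalake")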

We have created the data storage layer of our architecture. Now let’s prepare the data processing layer.

Setup Spark

In this section we will build a Spark Docker image to run in Minikube.

I have previously written an article, Set up Spark and Hive for Data warehousing and processing, where I installed Spark and used YARN as the cluster manager. In this article, however, Kubernetes takes the place of YARN and MinIO takes the place of HDFS. Hive is entirely absent.

To accommodate these differences, we will need to download the AWS S3 driver jars and build a Spark image that can be deployed to the Kubernetes cluster.

First, download the Spark 3.1.1 distribution pre-built for Hadoop 3.2 from archive.apache.org

wget https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

Unpack the tar file into a directory of the same name

tar xzf spark-3.1.1-bin-hadoop3.2.tgz

Remove the tar file

rm spark-3.1.1-bin-hadoop3.2.tgz

Move the directory into /opt and create a symlink /opt/spark that points to it.

sudo mv -f spark-3.1.1-bin-hadoop3.2 /opt
sudo ln -s spark-3.1.1-bin-hadoop3.2 /opt/spark

Now, change directory to /opt/spark/jars. Here we have a list of jars which contain the classes available to Spark at runtime.

cd /opt/spark/jars
ls -ltr

By default, Spark does not contain the classes needed to access the S3 API (with which MinIO is compatible). We will need to download the jars into /opt/spark/jars. When we build the Spark Docker image later, these jars will be copied into the image and the classes will be available at runtime.

wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.2/hadoop-aws-3.1.2.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.11.534/aws-java-sdk-1.11.534.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.534/aws-java-sdk-core-1.11.534.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.534/aws-java-sdk-dynamodb-1.11.534.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.534/aws-java-sdk-kms-1.11.534.jar
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.534/aws-java-sdk-s3-1.11.534.jar
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.3/httpclient-4.5.3.jar
wget https://repo1.maven.org/maven2/joda-time/joda-time/2.9.9/joda-time-2.9.9.jar
Downloading a long list of jars into /opt/spark/jars

We need to set a number of environment variables in ~/.profile for Spark and the Docker image build script to run correctly.

vim ~/.profile
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin
export PYSPARK_PYTHON=python3

Then execute the .profile script.

source ~/.profile

Spark also requires a Java runtime; we install the OpenJDK 11 JRE using apt.

sudo apt install openjdk-11-jre-headless

Launch Spark Shell

In this section, we will build a Spark image and use it to create a spark-shell on Minikube. This section takes an example from Jacek Laskowski's excellent demo: Spark Shell on Minikube. I encourage you to read his online book The Internals of Spark on Kubernetes.

One distinction to make is that there are two Docker engines: the one running on the host machine and the one running inside of Minikube. Minikube itself runs as a container in the host machine’s Docker engine. You can observe this by listing Docker containers

docker ps
The host machine’s Docker containers

Notice there is a single container running Minikube. We need to build the Spark image and push it to the Docker image repository running inside of Minikube. To do this, we must first configure the environment to use Minikube’s Docker daemon instead.

eval $(minikube -p minikube docker-env)

Now, try running the same command to list Docker containers

Containers running in Minikube’s Docker engine

This time, we see many more containers including the Kubernetes Apiserver, Controller and Scheduler. Minikube is running the Kubernetes cluster daemons as Docker containers. Any pods created as part of an application will also launch new Docker containers.

Now, we’re ready to build the Spark docker image. Starting with version 2.3, Spark provides a Dockerfile under kubernetes/dockerfiles and a script named bin/docker-image-tool.sh used to build and publish the Docker image. This image is used by Kubernetes to deploy the containers inside of pods.

$SPARK_HOME/bin/docker-image-tool.sh \
  -m \
  -t v3.2.1 \
  build
Building the Spark image

You can examine the Dockerfile in $SPARK_HOME/kubernetes/dockerfiles/spark/Dockerfile. One of the steps is to copy the contents of $SPARK_HOME/jars into the image so the resulting Spark container can connect to the S3/MinIO filesystem.

Check that the image is built and published to your local image repository.

docker images spark

We will be running the Spark executors in a separate namespace called spark-shell. Let’s create it.

kubectl create ns spark-shell

One thing to understand here is how spark-submit (and by extension spark-shell) works in Kubernetes. The client makes a request to the K8s API server running on port 8443. The K8s scheduler then creates a driver pod, which in turn creates executor pods. A full explanation with a diagram is available in the official Running Spark on Kubernetes documentation.

In spark-shell’s case, the difference is that the driver runs on the host machine but communicates with the K8s API server to create executor pods. When we kill the driver on the host machine, the executors are also terminated.

We must first export the hostname and port number of the K8s API server.

K8S_SERVER=$(kubectl config view --output=jsonpath='{.clusters[].cluster.server}')
Viewing the K8s API server URI

We’re now ready to start the spark-shell by specifying the K8s API server URI as the master and using the Spark image we just built as the container image. We also add the context and the namespace.

$SPARK_HOME/bin/spark-shell \
  --master k8s://$K8S_SERVER \
  --conf spark.kubernetes.container.image=spark:v3.2.1 \
  --conf spark.kubernetes.context=minikube \
  --conf spark.kubernetes.namespace=spark-shell

After some time, it will drop you into the scala> prompt with the Spark session and Spark context initialized, awaiting input. You can experiment by running a few Spark commands here, but we’re more interested in looking at the K8s resources created.

Open another SSH session into the EC2 instance and get pods in the spark-shell namespace

kubectl -n spark-shell get pod
Two Spark executors running in the spark-shell namespace

Notice that two pods named *-exec-1 and *-exec-2 have been created. These are the Spark executors which accept connections from the driver, read and process data, and return results back to the driver. The driver itself is not visible here since it runs outside of Kubernetes and on the host machine.

A consequence of this is that kubectl port-forward does not work for the driver’s WebUI. Instead, we will create an SSH tunnel to forward port 4040 from the EC2 instance to your local machine so we can view the Spark WebUI.

First, exit the current SSH session but leave the spark-shell running in its own session. Now open the SSH tunnel from your local machine and forward port 4040

ssh -i {private-key-file} -L 4040:localhost:4040 ubuntu@{public-ip-of-ec2-instance}

You will now be able to see the Spark WebUI by entering the following URI in your web browser.

http://localhost:4040/jobs

A complete explanation of the WebUI and all its tabs is outside the scope of this article, but having a thorough understanding of it is invaluable for debugging and optimizing Spark applications.

Refer to official Spark documentation for a detailed treatment.

Jobs tab
Environment tab
Executors tab

Run a PySpark job

Now let’s put everything together.

We will create a PySpark job that reads parquet files from MinIO, filters the data, then writes the result back into MinIO. This is a common ETL workflow in Spark, where MinIO is interchangeable with object stores such as S3, ABFS or Google Cloud Storage.

Overview of Spark ETL job

We need to first obtain the source data from Teradata’s kylo repository

https://github.com/Teradata/kylo/tree/master/samples/sample-data/parquet

There are 5 parquet files, each containing 5000 records of mock user data.
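If you would like to sanity-check the files locally before uploading them, a short pandas sketch of my own (not part of the original workflow) can show their shape and columns; it assumes the files have been downloaded to the current directory and that pandas with the pyarrow engine is installed.

# Illustrative sketch only: inspect one of the downloaded sample files.
# Assumes userdata1.parquet is in the current directory and that pandas
# and pyarrow are installed (pip install pandas pyarrow).
import pandas as pd

df = pd.read_parquet("userdata1.parquet")
print(df.shape)          # rows and columns in the sample file
print(list(df.columns))  # includes first_name, last_name, gender, country
print(df[["first_name", "last_name", "gender", "country"]].head())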

Use the Minio console and upload the parquet files to datalake/userdata. You will need to port-forward the MinIO console service again if you stopped the previous command.

 kubectl -n minio port-forward --address 0.0.0.0 svc/myminio-console 9001

The datalake bucket is accessible in your local browser at

http://{ec2-public-ip}:9001/buckets/datalake/browse

At the end, your datalake/userdata MinIO directory should look like this.
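If you prefer to script the upload rather than click through the console, the sketch below (again my own addition) uses boto3 against the port-forwarded myminio API service; it assumes the five parquet files are in the current directory on the EC2 instance and that svc/myminio is forwarded to localhost:9000 as in the earlier boto3 sketch.

# Illustrative sketch only: upload the five sample files to datalake/userdata
# through the S3 API instead of the console. Assumes the files are in the
# current directory and svc/myminio is port-forwarded to localhost:9000.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="root",
    aws_secret_access_key="password123",
)

for i in range(1, 6):
    filename = f"userdata{i}.parquet"
    # Keys land under the userdata/ prefix so the PySpark job can read
    # them from s3a://datalake/userdata/
    s3.upload_file(filename, "datalake", f"userdata/{filename}")
    print("uploaded", filename)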

Now, we will write the PySpark job which will contain the ETL logic. Returning to the SSH session with the EC2 instance, we use vim to create a Python file under /home/ubuntu/pyspark-project

vim /home/ubuntu/pyspark-project/run-job.py

Copy the following into the file and save.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .config("spark.hadoop.fs.s3a.endpoint", "http://myminio.minio.svc.cluster.local:9000") \
        .config("spark.hadoop.fs.s3a.access.key", "root") \
        .config("spark.hadoop.fs.s3a.secret.key", "password123") \
        .config("spark.hadoop.fs.s3a.path.style.access", True) \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .getOrCreate()

userdata = spark.read.parquet("s3a://datalake/userdata/")
userdata.show()
print(userdata.columns)

male_users = userdata.where("gender = 'Male'") \
                      .select("first_name", "last_name", "country")

male_users.write.mode("overwrite").parquet("s3a://datalake/maleuserdata/")

The information needed to connect to MinIO as a filesystem is passed to the SparkSession builder. The configurations are as follows

  1. spark.hadoop.fs.s3a.endpoint refers to the URL of the S3 filesystem. In this case, we give the internal DNS of the myminio service which follows the K8s convention of {service-name}.{namespace}.svc.cluster.local:{port}
  2. spark.hadoop.fs.s3a.access.key refers to the MinIO root user name
  3. spark.hadoop.fs.s3a.secret.key refers to the MinIO root user password
  4. spark.hadoop.fs.s3a.path.style.access is set to true so that requests use path-style addressing ({endpoint}/{bucket}/{key}) instead of virtual-hosted-style addressing ({bucket}.{endpoint}/{key}), which is what MinIO expects when served from a single endpoint
  5. spark.hadoop.fs.s3a.impl gives the class which implements the s3a protocol.

The remaining code is simple Spark SQL to read from MinIO, filter the data and write the result back to MinIO. See the DataFrame API for a complete explanation.
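If you want the job to verify its own output, you could append a short read-back at the end of run-job.py. This is an optional sanity check of my own, not part of the original script; it simply reads the freshly written parquet data back from MinIO using the same Spark session.

# Optional sanity check (not in the original script): read the output back
# from MinIO and confirm the row count and schema of the filtered data.
result = spark.read.parquet("s3a://datalake/maleuserdata/")
print("rows written:", result.count())
result.printSchema()
result.show(5)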

Previously, we used our Minikube credentials on the host machine to launch the spark-shell driver and its executor pods in Minikube. This time, we are launching the Spark application in cluster mode and the driver pod itself needs permissions to create and destroy pods.

We will accomplish this by creating a spark-job namespace, then creating a service account with the right permissions. The Spark job will use this service account at runtime.

First, create the spark-job namespace

kubectl create namespace spark-job

Now create the spark service account in the spark-job namespace

kubectl -n spark-job create serviceaccount spark

Next, we create a ClusterRoleBinding called spark-role. This ClusterRoleBinding grants the ClusterRole edit to the spark service account which includes permissions to create, edit, and delete pods. By assuming the spark service account, the Spark job driver is granted the same permissions.

kubectl -n spark-job create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark-job:spark

We will now need to build another Spark image. Why another, you may ask? The previous image did not contain the bindings for Python. Fortunately, Spark provides a Dockerfile with Spark-Python bindings under kubernetes/dockerfiles/spark/bindings/python/Dockerfile

$SPARK_HOME/bin/docker-image-tool.sh \
    -m \
    -t v1.0.0 \
    -p kubernetes/dockerfiles/spark/bindings/python/Dockerfile \
    build

The build process is similar; in fact, it produces the base spark image first, then installs Python 3, pip, etc. The final image is published as spark-py:v1.0.0

Now we will create a directory to store dependencies so that they are accessible to both Spark drivers and executors at runtime. During spark-submit, /home/ubuntu/pyspark-project/run-job.py will be uploaded here, and the directory will be mounted as a hostPath volume in the Spark pods.

Create the directory

mkdir /home/ubuntu/spark-upload

Mount the host machine directory into the Minikube VM under the same path using the minikube mount command.

minikube mount /home/ubuntu/spark-upload:/home/ubuntu/spark-upload

Open another SSH session. You should have multiple sessions open at this point, including one port-forwarding the MinIO console and another keeping the minikube mount process alive.

Now for the moment of truth, we will submit the PySpark script to the Kubernetes cluster.

K8S_SERVER=$(kubectl config view --output=jsonpath='{.clusters[].cluster.server}')
$SPARK_HOME/bin/spark-submit \
  --deploy-mode cluster \
  --name normans-app \
  --master k8s://$K8S_SERVER \
  --files /home/ubuntu/pyspark-project/run-job.py \
  --conf spark.kubernetes.file.upload.path=/home/ubuntu/spark-upload \
  --conf spark.kubernetes.driver.volumes.hostPath.dependencies.mount.path=/home/ubuntu/spark-upload \
  --conf spark.kubernetes.driver.volumes.hostPath.dependencies.options.path=/home/ubuntu/spark-upload \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.container.image=spark-py:v1.0.0 \
  --conf spark.kubernetes.driver.pod.name=pysparkjob \
  --conf spark.kubernetes.context=minikube \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.namespace=spark-job \
  /home/ubuntu/pyspark-project/run-job.py

There are multiple options to explain here so I will concentrate on the most interesting ones.

--deploy-mode cluster means both the driver and the executors run in the cluster. This contrasts with the previous spark-shell application, where only the executors ran in the cluster while the driver ran on the client machine.

--files /home/ubuntu/pyspark-project/run-job.py specifies the list of files to be uploaded to the cluster

--conf spark.kubernetes.file.upload.path=/home/ubuntu/spark-upload gives the location to which dependencies listed in --files are uploaded
--conf spark.kubernetes.driver.volumes.hostPath.dependencies.mount.path=/home/ubuntu/spark-upload gives the path inside the driver pod at which the hostPath volume named dependencies is mounted
--conf spark.kubernetes.driver.volumes.hostPath.dependencies.options.path=/home/ubuntu/spark-upload gives the directory on the Minikube node that backs that hostPath volume

Submitting the Spark job in cluster mode
1 driver pod and 2 executor pods are created

You can view the logs in the Spark driver pod

kubectl -n spark-job logs -f pysparkjob
Logs of the driver pod starting up
Logs in the driver pod showing the DataFrame being processed
Logs indicating the write to MinIO is complete.

Let’s view the result of our write to datalake/maleuserdata in the MinIO console

Enter the following URL in your local machine and navigate to datalake/maleuserdata

http://{ec2-public-ip}:9001/buckets/datalake/browse/
Parquet files written to MinIO by the PySpark job

We can also view the Spark WebUI. Unlike the previous case with spark-shell, since the driver pod runs in Minikube, we can simply use kubectl port-forward.

kubectl -n spark-job port-forward --address 0.0.0.0 pod/pysparkjob 4040
Port-forwarding port 4040 on the Spark driver pod

View the Spark Web UI in your local browser.

http://{ec2-public-ip}:4040
Spark Web UI of the PySpark job

We can kill the Spark job by using the following command

$SPARK_HOME/bin/spark-submit \
        --master k8s://$K8S_SERVER \
        --kill spark-job:pysparkjob

See that no pods remain in the spark-job namespace

kubectl -n spark-job get pod
No pods in the spark-job namespace

Stop the Minikube virtual machine.

minikube stop

Congratulations, you’ve successfully executed a PySpark ETL job running in Kubernetes.

Conclusion

In this article we built a proof-of-concept, cloud-agnostic data platform: MinIO provided S3-compatible object storage, Apache Spark provided the processing engine, and Kubernetes (via Minikube) acted as the orchestration layer tying them together. Because every component is open source and runs on plain virtual machines, the same design can be ported to any major cloud vendor and scaled out by adding nodes and replicas.

References

MinIO Helm Chart on Github https://github.com/minio/minio/tree/master/helm/minio

Running Spark on Kubernetes https://spark.apache.org/docs/latest/running-on-kubernetes.html

Demo: spark-shell on minikube https://jaceklaskowski.github.io/spark-kubernetes-book/demo/spark-shell-on-minikube/

Demo: Running PySpark Application on minikube https://jaceklaskowski.github.io/spark-kubernetes-book/demo/running-pyspark-application-on-minikube/

Demo: Spark and Local Filesystem in minikube https://jaceklaskowski.github.io/spark-kubernetes-book/demo/spark-and-local-filesystem-in-minikube/

Teradata’s Kylo on Github https://github.com/Teradata/kylo
