Want to learn how to autoscale your Kinesis Data Streams consumer applications on Kubernetes to save costs and improve resource efficiency? This blog provides a step-by-step guide on how to do so.
When you leverage Kubernetes to autoscale your Kinesis consumer applications, you can benefit from built-in features such as the horizontal pod autoscaler. If you’re not familiar with Kubernetes, see Nigel Poulton’s article, “What is Kubernetes and why should I learn?”
What are Amazon Kinesis and Kinesis Data Streams?
Amazon Kinesis is a platform for real-time data processing, ingestion, and analysis. Kinesis Data Streams is a serverless streaming data service that enables developers to collect, process, and analyze large amounts of data in real time from various sources such as social media, IoT devices, and logs. It is part of the Kinesis Streaming Data Platform, along with Kinesis Data Firehose, Kinesis Video Streams, and Kinesis Data Analytics.
Kinesis Data Streams can flexibly and continuously adapt and scale to changes in data ingestion and stream consumption rates. You can use it to build real-time data analysis applications, real-time dashboards, and real-time data pipelines.
Let’s start with an overview of some of the key concepts of Kinesis Data Streams.
Kinesis Data Streams: High-level architecture
- A Kinesis data stream is a set of shards, and each shard contains a sequence of data records.
- Producers continuously push data to Kinesis Data Streams, and consumers process that data in real time (a producer sketch follows this list).
- A partition key is used to group data by shard within a stream. Kinesis Data Streams segregates the data records belonging to a stream into multiple shards, and it uses the partition key associated with each data record to determine which shard a given record belongs to.
- Consumers retrieve records from Kinesis Data Streams, process them, and store the results in destinations such as Amazon DynamoDB, Amazon Redshift, or Amazon S3. These consumers are also known as Kinesis Data Streams applications. One way to develop custom consumer applications that process data from a Kinesis data stream is to use the Kinesis Client Library (KCL).
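To make the producer and partition-key concepts concrete, here is a minimal, illustrative producer sketch using the AWS SDK for Java v2. This is not code from this post's repository; the stream name, partition key, and payload are placeholders. Records that share a partition key always hash to the same shard.
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;

public class SampleProducer {
    public static void main(String[] args) {
        // Placeholder stream name; replace with your own stream
        String streamName = "kinesis-keda-demo";

        try (KinesisClient kinesis = KinesisClient.builder().region(Region.US_EAST_1).build()) {
            PutRecordRequest request = PutRecordRequest.builder()
                    .streamName(streamName)
                    // All records with this partition key land on the same shard
                    .partitionKey("user1@example.com")
                    .data(SdkBytes.fromUtf8String("{\"name\":\"user1\", \"city\":\"new york\"}"))
                    .build();

            PutRecordResponse response = kinesis.putRecord(request);
            System.out.println("Wrote record to shard " + response.shardId());
        }
    }
}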
How do Kinesis consumer applications scale horizontally?
The Kinesis Client Library ensures that there is a record processor running for every shard, processing data from that shard. KCL takes care of many of the complex tasks associated with distributed computing and scalability that come with consuming and processing data from Kinesis data streams: it connects to the data stream, enumerates the shards within the stream, and uses leases to coordinate shard associations with its consumer applications.
A record processor is instantiated for every shard KCL manages. KCL pulls data records from the data stream, pushes the records to the corresponding record processor, and checkpoints processed records. More importantly, it balances shard-worker associations (leases) when the number of worker instances changes or when the data stream is resharded (shards are split or merged). This means you can scale your Kinesis Data Streams application simply by adding more instances, since KCL automatically balances the shards across them.
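KCL invokes an application-supplied record processor for each shard. As a rough, hedged sketch of the KCL 2.x ShardRecordProcessor contract (illustrative class name and logic, not the actual code from this post's repository), such a processor might look like the following; the real application additionally persists each record to DynamoDB:
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class UserRecordProcessor implements ShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        // Called once when KCL assigns this processor to a shard
        System.out.println("Initialized processor for shard " + initializationInput.shardId());
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        // KCL pushes batches of records pulled from the shard to this method
        processRecordsInput.records().forEach(r -> {
            String payload = java.nio.charset.StandardCharsets.UTF_8.decode(r.data()).toString();
            // In the sample app, this is where the record would be written to the DynamoDB "users" table
            System.out.println("Processing record: " + payload);
        });
        try {
            // Checkpoint so that, after a lease is reassigned, processing resumes from here
            processRecordsInput.checkpointer().checkpoint();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        // The lease for this shard was taken over by another worker (for example, after a scale-out)
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            // The shard was closed (for example, after resharding); checkpoint to mark it fully processed
            shardEndedInput.checkpointer().checkpoint();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}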
However, you still need a way to scale the application itself as the load increases. You could, of course, do this manually or build a custom solution.
This is where Kubernetes event-driven autoscaling (KEDA) comes into play. KEDA is a Kubernetes-based event-driven autoscaling component that can monitor event sources such as Kinesis and scale the underlying deployment (and pods) based on the number of events that need to be processed.
To see autoscaling in action, we will work with a Java application that uses the Kinesis Client Library (KCL) 2.x to consume data from a Kinesis data stream. It is deployed to a Kubernetes cluster on Amazon EKS and scales automatically using KEDA. The application includes a ShardRecordProcessor implementation that processes data from the Kinesis stream and persists it to a DynamoDB table. We will use the AWS CLI to produce data to the Kinesis stream and watch the application scale.
Before we get into the main topic, let me give you a brief overview of KEDA.
KEDA is an open source CNCF project built on native Kubernetes primitives, such as the horizontal pod autoscaler, and can be added to any Kubernetes cluster. Below is an overview of its main components (see the KEDA documentation for more information).
- The keda-operator-metrics-apiserver component acts as a Kubernetes metrics server that exposes metrics for the horizontal pod autoscaler.
- A KEDA scaler integrates with an external system (such as Redis) to fetch these metrics (such as the length of a list) and drives autoscaling of containers in Kubernetes based on the number of events that need to be processed.
- The keda-operator component is responsible for activating and deactivating deployments, that is, scaling them to and from zero.
You will see the AWS Kinesis Stream KEDA scaler in action; it scales based on the number of shards in a Kinesis data stream.
Now let’s move on to the practical part of this post.
In addition to an AWS account, you must have AWS CLI, kubectl, Docker, Java 11, and Maven installed.
Set up an EKS cluster and create a DynamoDB table and Kinesis Data Stream
There are various ways to create an Amazon EKS cluster; I prefer using the eksctl CLI for convenience. To create an EKS cluster using eksctl, you can simply run:
eksctl create cluster --name <cluster name> --region <region e.g. us-east-1>
For more information, see Getting Started with Amazon EKS – eksctl.
Create a DynamoDB table to persist the application data. You can use the AWS CLI to create the table with the following command:
aws dynamodb create-table \
    --table-name users \
    --attribute-definitions AttributeName=email,AttributeType=S \
    --key-schema AttributeName=email,KeyType=HASH \
    --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
Create a Kinesis stream with two shards using the AWS CLI.
aws kinesis create-stream --stream-name kinesis-keda-demo --shard-count 2
Clone this GitHub repository and change to the appropriate directory.
git clone <GitHub repository URL>
cd kinesis-keda-autoscaling
Setting up and configuring KEDA on EKS
This tutorial uses YAML files to deploy KEDA. However, you can also use Helm charts.
Install KEDA.
# update version 2.8.2 if required
kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.8.2/keda-2.8.2.yaml
# check Custom Resource Definitions
kubectl get crd
# check KEDA Deployments
kubectl get deployment -n keda
# check KEDA operator logs
kubectl logs -f $(kubectl get pod -l=app=keda-operator -o jsonpath='{.items[0].metadata.name}' -n keda) -n keda
The KEDA operator and the Kinesis consumer application both need to invoke AWS APIs. Since both run as deployments on EKS, we use IAM Roles for Service Accounts (IRSA) to provide the necessary permissions.
In this particular scenario:
- The KEDA operator must be able to obtain the shard count for the Kinesis stream; it does so using the DescribeStreamSummary API.
- The application (the KCL library, to be specific) needs to interact with Kinesis and DynamoDB, and requires a broader set of IAM permissions to do so.
Configure IRSA for KEDA operators
Set your AWS account ID and OIDC ID provider as environment variables.
ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
#update the cluster name and region as required
export EKS_CLUSTER_NAME=demo-eks-cluster
export AWS_REGION=us-east-1
OIDC_PROVIDER=$(aws eks describe-cluster --name $EKS_CLUSTER_NAME --query "cluster.identity.oidc.issuer" --output text | sed -e "s/^https:\/\///")
Create a JSON file containing trusted entities for your role.
read -r -d '' TRUST_RELATIONSHIP <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::$ACCOUNT_ID:oidc-provider/$OIDC_PROVIDER"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "$OIDC_PROVIDER:aud": "sts.amazonaws.com",
          "$OIDC_PROVIDER:sub": "system:serviceaccount:keda:keda-operator"
        }
      }
    }
  ]
}
EOF
echo "$TRUST_RELATIONSHIP" > trust_keda.json
Now create an IAM role and attach the policy (see the policy_kinesis_keda.json file for details).
export ROLE_NAME=keda-operator-kinesis-role
aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust_keda.json --description "IRSA for kinesis KEDA scaler on EKS"
aws iam create-policy --policy-name keda-kinesis-policy --policy-document file://policy_kinesis_keda.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::$ACCOUNT_ID:policy/keda-kinesis-policy
Associate an IAM role with a service account.
kubectl annotate serviceaccount -n keda keda-operator eks.amazonaws.com/role-arn=arn:aws:iam::$ACCOUNT_ID:role/$ROLE_NAME
# verify the annotation
kubectl describe serviceaccount/keda-operator -n keda
You must restart your KEDA operator deployment for this to take effect.
kubectl rollout restart deployment.apps/keda-operator -n keda
# to verify, confirm that the KEDA operator has the right environment variables
kubectl describe pod -n keda $(kubectl get po -l=app=keda-operator -n keda --output=jsonpath='{.items..metadata.name}') | grep "^\s*AWS_"
# expected output
AWS_STS_REGIONAL_ENDPOINTS: regional
AWS_DEFAULT_REGION: us-east-1
AWS_REGION: us-east-1
AWS_ROLE_ARN: arn:aws:iam::<AWS_ACCOUNT_ID>:role/keda-operator-kinesis-role
AWS_WEB_IDENTITY_TOKEN_FILE: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
Configure IRSA for KCL consumer applications
First, create a Kubernetes service account.
kubectl apply -f - <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kcl-consumer-app-sa
EOF
Create a JSON file containing trusted entities for your role.
read -r -d '' TRUST_RELATIONSHIP <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::$ACCOUNT_ID:oidc-provider/$OIDC_PROVIDER"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "$OIDC_PROVIDER:aud": "sts.amazonaws.com",
          "$OIDC_PROVIDER:sub": "system:serviceaccount:default:kcl-consumer-app-sa"
        }
      }
    }
  ]
}
EOF
echo "$TRUST_RELATIONSHIP" > trust.json
Now create an IAM role and attach the policy (see the policy.json file for details):
export ROLE_NAME=kcl-consumer-app-role
aws iam create-role --role-name $ROLE_NAME --assume-role-policy-document file://trust.json --description "IRSA for KCL consumer app on EKS"
aws iam create-policy --policy-name kcl-consumer-app-policy --policy-document file://policy.json
aws iam attach-role-policy --role-name $ROLE_NAME --policy-arn=arn:aws:iam::$ACCOUNT_ID:policy/kcl-consumer-app-policy
Associate an IAM role with a service account.
kubectl annotate serviceaccount -n default kcl-consumer-app-sa eks.amazonaws.com/role-arn=arn:aws:iam::$ACCOUNT_ID:role/$ROLE_NAME
# verify the annotation
kubectl describe serviceaccount/kcl-consumer-app-sa
The core infrastructure is now ready. Prepare and deploy your consumer application.
Deploy the KCL consumer application to EKS
First, you need to build a Docker image and push it to Amazon Elastic Container Registry (ECR) (see Dockerfile for more information).
Build the Docker image and push it to ECR
# create runnable JAR file
mvn clean compile assembly:single
# build docker image
docker build -t kcl-consumer-app .
AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
# create a private ECR repo
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com
aws ecr create-repository --repository-name kcl-consumer-app --region us-east-1
# tag and push the image
docker tag kcl-consumer-app:latest $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
docker push $AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
Deploy the consumer application
Update consumer.yaml to include the Docker image you just pushed to ECR. The rest of the manifest remains the same.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kcl-consumer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kcl-consumer
  template:
    metadata:
      labels:
        app: kcl-consumer
    spec:
      serviceAccountName: kcl-consumer-app-sa
      containers:
        - name: kcl-consumer
          image: AWS_ACCOUNT_ID.dkr.ecr.us-east-1.amazonaws.com/kcl-consumer-app:latest
          imagePullPolicy: Always
          env:
            - name: STREAM_NAME
              value: kinesis-keda-demo
            - name: TABLE_NAME
              value: users
            - name: APPLICATION_NAME
              value: kinesis-keda-demo
            - name: AWS_REGION
              value: us-east-1
            - name: INSTANCE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
kubectl apply -f consumer.yaml
# verify Pod transition to Running state
kubectl get pods -w
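For context on how the environment variables in the manifest above are consumed: a KCL 2.x consumer typically wires everything together with ConfigsBuilder and a Scheduler. The following is a hedged sketch under that assumption (class and factory names are illustrative, not the repository's actual code). Note that the application name passed to ConfigsBuilder is what becomes the name of the KCL lease (control) table in DynamoDB.
import java.util.UUID;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;

public class ConsumerApp {
    public static void main(String[] args) {
        // Values injected via the Deployment manifest shown above
        String streamName = System.getenv("STREAM_NAME");           // kinesis-keda-demo
        String applicationName = System.getenv("APPLICATION_NAME"); // becomes the DynamoDB lease/control table name
        Region region = Region.of(System.getenv("AWS_REGION"));

        KinesisAsyncClient kinesisClient =
                KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamName,
                applicationName,
                kinesisClient,
                dynamoClient,
                cloudWatchClient,
                UUID.randomUUID().toString(),        // unique worker identifier for this pod
                UserRecordProcessor::new);           // factory for the ShardRecordProcessor sketched earlier

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig());

        // Scheduler implements Runnable; it runs the KCL worker loop (lease coordination, fetching, dispatching)
        new Thread(scheduler).start();
    }
}
The TABLE_NAME and INSTANCE_NAME variables are not part of the KCL configuration; they would be read by the record processor when persisting records (see the persistence sketch later in this post).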
How autoscaling works for KCL apps using KEDA
Now that the consumer application is deployed, the KCL library should get to work. The first thing it does is create a "control table" in DynamoDB; its name is the same as the KCL application name (in this case, kinesis-keda-demo).
It may take a few minutes for the initial coordination to happen and the table to be created. You can check the consumer application's logs to track the progress.
kubectl logs -f $(kubectl get po -l=app=kcl-consumer --output=jsonpath='{.items..metadata.name}')
Once the lease allocation is complete, check the table and note the leaseOwner attribute:
aws dynamodb describe-table --table-name kinesis-keda-demo
aws dynamodb scan --table-name kinesis-keda-demo
Next, let's send some data to the Kinesis stream using the AWS CLI:
export KINESIS_STREAM=kinesis-keda-demo
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user1", "city":"new york"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user2", "city":"tel aviv"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user3", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user4", "city":"seattle"}' | base64)
The KCL application persists each record to the target DynamoDB table (named users in this case). You can scan the table to check the records.
aws dynamodb scan --table-name users
Notice the value of the processed attribute? It is the same as the name of the KCL consumer pod. This makes it easier to validate the end-to-end autoscaling process.
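For illustration only (a hedged sketch, not the repository's actual code), here is how the record processor could set the processed attribute to the pod name it reads from the INSTANCE_NAME environment variable when writing each user record to the table, which is what makes the attribute useful for seeing which pod handled which record:
import java.util.Map;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

public class UserItemWriter {

    private final DynamoDbClient dynamoDb = DynamoDbClient.create();
    private final String tableName = System.getenv("TABLE_NAME");   // users
    private final String podName = System.getenv("INSTANCE_NAME");  // injected from metadata.name in the manifest

    public void save(String email, String name, String city) {
        PutItemRequest request = PutItemRequest.builder()
                .tableName(tableName)
                .item(Map.of(
                        "email", AttributeValue.builder().s(email).build(),  // partition key of the users table
                        "name", AttributeValue.builder().s(name).build(),
                        "city", AttributeValue.builder().s(city).build(),
                        "processed", AttributeValue.builder().s(podName).build())) // which pod handled the record
                .build();
        dynamoDb.putItem(request);
    }
}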
Create a KEDA scaler for Kinesis
Here is the ScaledObject definition. Note that it targets the kcl-consumer deployment (the one we created earlier) and that shardCount is set to 1:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: aws-kinesis-stream-scaledobject
spec:
  scaleTargetRef:
    name: kcl-consumer
  triggers:
    - type: aws-kinesis-stream
      metadata:
        # Required
        streamName: kinesis-keda-demo
        # Required
        awsRegion: "us-east-1"
        shardCount: "1"
        identityOwner: "operator"
Create a KEDA Kinesis scaler.
kubectl apply -f keda-kinesis-scaler.yaml
Check autoscaling for KCL applications
We started with one pod of the KCL application, but thanks to KEDA, a second pod should now come up.
kubectl get pods -l=app=kcl-consumer -w
# check logs of the new pod
kubectl logs -f <enter Pod name>
The application was able to autoscale to two pods because shardCount is set to "1" in the ScaledObject definition, which means there will be one pod per shard in the Kinesis stream (KEDA feeds the shard count to the horizontal pod autoscaler, which scales the deployment to roughly the number of open shards divided by shardCount, rounded up).
Check the kinesis-keda-demo control table in DynamoDB; you should see that the leaseOwner attributes have been updated.
Let’s send some more data to a Kinesis stream.
export KINESIS_STREAM=kinesis-keda-demo
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user5", "city":"new york"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user6", "city":"tel aviv"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user7", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user8", "city":"seattle"}' | base64)
Check the value of the processed attribute. Since we scaled out to two pods, each pod processes a subset of the records from the Kinesis stream, so the value should differ across records.
Increase the capacity of your Kinesis streams
Let’s scale out the number of shards from 2 to 3 and continue to monitor the autoscaling of the KCL application.
aws kinesis update-shard-count --stream-name kinesis-keda-demo --target-shard-count 3 --scaling-type UNIFORM_SCALING
Once the Kinesis resharding is complete, the KEDA scaler starts working and scales out the KCL application to three pods.
kubectl get pods -l=app=kcl-consumer -w
As before, verify that the Kinesis shard leases have been rebalanced: check the leaseOwner attribute in the kinesis-keda-demo control table in DynamoDB.
Continue sending more data to the Kinesis stream. As expected, the pods share the record processing, which is reflected in the processed attribute of the users table.
export KINESIS_STREAM=kinesis-keda-demo
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user9", "city":"new york"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user10", "city":"tel aviv"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user11", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user12", "city":"seattle"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user14", "city":"tel aviv"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user15", "city":"new delhi"}' | base64)
aws kinesis put-record --stream-name $KINESIS_STREAM --partition-key [email protected] --data $(echo -n '{"name":"user16", "city":"seattle"}' | base64)
Scale down: so far, we've only scaled in one direction. What happens when you reduce the shard capacity of the Kinesis stream? Try this yourself: reduce the shard count from 3 to 2 and watch what happens to the KCL application.
Once you have confirmed your end-to-end solution, you will need to clean up your resources to avoid incurring additional charges.
Delete the EKS cluster, Kinesis stream, and DynamoDB table.
eksctl delete cluster --name keda-kinesis-demo
aws kinesis delete-stream --stream-name kinesis-keda-demo
aws dynamodb delete-table --table-name users
In this post, you learned how to use KEDA to autoscale a KCL application that consumes data from Kinesis streams.
You can configure the KEDA scaler according to your application's requirements. For example, you could set shardCount to 3 to get one pod for every three shards in the Kinesis stream. If you prefer a one-to-one mapping, setting shardCount to 1 ensures that each pod handles a single shard (and thus runs a single record processor instance), with KCL handling the distributed coordination and lease allocation. This is an effective approach that lets you scale your Kinesis stream processing pipeline to meet the demands of your applications.