Creating an External Scaler for KEDA

What is KEDA?

  1. Scaling Agent: The primary goal of the Scaling Agent is to activate and deactivate deployments.
  2. Metrics Server: The Metrics Server acts as a Kubernetes Metrics Server. It exposes scaling metrics, such as queue length or the number of messages in a stream, to the Horizontal Pod Autoscaler (HPA) so that the HPA can initiate scaling.
  3. Scaler: Scalers are responsible for collecting metrics from an event source. The scale handler talks to the scaler to get the event metrics. Depending on the event metrics reported by the scaler, the scale handler scales the deployment or job. A scaler acts as a bridge between KEDA and an external event source. KEDA provides many built-in scalers; Kafka, RabbitMQ, AWS Kinesis, Azure Service Bus, and NATS are some of them.
  4. ScaledObject: ScaledObject is a Kubernetes custom resource, defined by a Custom Resource Definition (CRD). We deploy a ScaledObject to scale a Deployment. It defines the deployment name, the minimum and maximum replica counts, and the metadata for the scaler.

What is Apache Pulsar?

Developing custom scaler for KEDA

External scaler for Apache Pulsar

http://<server>:<port>/admin/v2/<persistent/non-persistent>/<tenant>/<namespace>/<topic>/stats
http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats
{
"msgRateIn":0.0,
"msgThroughputIn":0.0,
"msgRateOut":0.0,
"msgThroughputOut":0.0,
"averageMsgSize":0.0,
"storageSize":0,
"backlogSize":0,
"publishers":[

],
"subscriptions":{
"first-subscription":{
"msgRateOut":0.0,
"msgThroughputOut":0.0,
"msgRateRedeliver":0.0,
"msgBacklog":0,
"msgBacklogNoDelayed":0,
"blockedSubscriptionOnUnackedMsgs":false,
"msgDelayed":0,
"unackedMessages":0,
"type":"Exclusive",
"activeConsumerName":"dec52",
"msgRateExpired":0.0,
"lastExpireTimestamp":0,
"lastConsumedFlowTimestamp":1590687726940,
"lastConsumedTimestamp":0,
"lastAckedTimestamp":0,
"consumers":[
{
"msgRateOut":0.0,
"msgThroughputOut":0.0,
"msgRateRedeliver":0.0,
"consumerName":"dec52",
"availablePermits":1000,
"unackedMessages":0,
"blockedConsumerOnUnackedMsgs":false,
"lastAckedTimestamp":0,
"lastConsumedTimestamp":0,
"metadata":{

},
"address":"/192.168.86.36:53688",
"connectedSince":"2020-05-28T17:42:06.93Z",
"clientVersion":"2.5.1"
}
],
"isReplicated":false
}
},
"replication":{

},
"deduplicationStatus":"Disabled",
"bytesInCounter":0,
"msgInCounter":0
}

External gRPC server

  • New(ctx context.Context, newRequest *pb.NewRequest) (*empty.Empty, error) - Notifies the external scaler that a new scaler has been created and initialized. It takes two parameters: ctx is a context.Context for the request, and newRequest is an instance of the NewRequest structure defined in the protocol file. NewRequest contains ScaledObjectRef and Metadata. ScaledObjectRef holds the name and namespace of the ScaledObject defined in the YAML file. Metadata is a map containing the metadata defined in the YAML file, and it is the member of particular interest to us: here we retrieve and save the user-defined metadata and initialize any resources we need. The function returns a pointer to an empty message and an error. If the error is not nil, KEDA will not load our scaler.
  • GetMetricSpec(ctx context.Context, in *pb.ScaledObjectRef) (*pb.GetMetricSpecResponse, error) - Returns a spec containing the metric name and target size. KEDA uses this spec to create the HPA. The function receives two parameters, a context and a pointer to ScaledObjectRef. It returns a pointer to a GetMetricSpecResponse instance, which contains a list of MetricSpec. Return a non-nil error if something went wrong; otherwise return nil.
  • IsActive(ctx context.Context, in *pb.ScaledObjectRef) (*pb.IsActiveResponse, error) - KEDA calls this function to check whether it is OK to scale the deployment to zero. The function should return a pointer to the IsActiveResponse structure. The boolean member Result indicates whether the deployment must stay active. A value of true indicates there are pending items to process, so KEDA should not scale to zero. A value of false indicates there are no pending items, so KEDA can safely scale to zero. Normally we fetch the number of pending messages; if there are pending messages we return true, otherwise we return false. Return a non-nil error if something went wrong; otherwise return nil.
  • GetMetrics(ctx context.Context, in *pb.GetMetricsRequest) (*pb.GetMetricsResponse, error) - KEDA calls this function to get the metrics. The function should return a list of MetricValue structures, each containing the name of a metric and its value. Here we fetch the number of pending events and return it. Return a non-nil error if something went wrong; otherwise return nil.
  • Close(ctx context.Context, scaledObjectRef *pb.ScaledObjectRef) (*empty.Empty, error) - Notifies the scaler that the deployment has been removed and closed. We can do any cleanup here. Return a non-nil error if something went wrong; otherwise return nil.
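
To make the contract above concrete, here is a minimal sketch that mirrors the five calls as a plain Go interface, without gRPC or context. Everything in it is an assumption made for illustration: the local ScaledObjectRef, MetricSpec and MetricValue types stand in for the generated protobuf types, and fixedScaler is a hypothetical scaler that reports a fixed number of pending items. The call order in main is only a plausible sequence, not a description of KEDA's internals.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the generated protobuf types (hypothetical; they
// exist only so that this sketch compiles without gRPC).
type ScaledObjectRef struct{ Name, Namespace string }

type MetricSpec struct {
	MetricName string
	TargetSize int64
}

type MetricValue struct {
	MetricName  string
	MetricValue int64
}

// Scaler mirrors the five calls described above.
type Scaler interface {
	New(ref ScaledObjectRef, metadata map[string]string) error
	GetMetricSpec(ref ScaledObjectRef) ([]MetricSpec, error)
	IsActive(ref ScaledObjectRef) (bool, error)
	GetMetrics(ref ScaledObjectRef, metricName string) ([]MetricValue, error)
	Close(ref ScaledObjectRef) error
}

// fixedScaler reports a fixed number of pending items.
type fixedScaler struct {
	metric  string
	target  int64
	pending int64
}

// New saves the user-defined metadata; a non-nil error means "do not load".
func (s *fixedScaler) New(ref ScaledObjectRef, metadata map[string]string) error {
	topic, ok := metadata["topic"]
	if !ok {
		return errors.New(`metadata key "topic" is required`)
	}
	s.metric = topic
	return nil
}

// GetMetricSpec returns the metric name and the target size per replica.
func (s *fixedScaler) GetMetricSpec(ref ScaledObjectRef) ([]MetricSpec, error) {
	return []MetricSpec{{MetricName: s.metric, TargetSize: s.target}}, nil
}

// IsActive is true while there is pending work, i.e. do not scale to zero.
func (s *fixedScaler) IsActive(ref ScaledObjectRef) (bool, error) {
	return s.pending > 0, nil
}

// GetMetrics returns the current value of the requested metric.
func (s *fixedScaler) GetMetrics(ref ScaledObjectRef, metricName string) ([]MetricValue, error) {
	return []MetricValue{{MetricName: metricName, MetricValue: s.pending}}, nil
}

// Close is where cleanup would happen; there is nothing to clean up here.
func (s *fixedScaler) Close(ref ScaledObjectRef) error { return nil }

func main() {
	var s Scaler = &fixedScaler{target: 10, pending: 30}
	ref := ScaledObjectRef{Name: "pulsar-consumer-scaledobject", Namespace: "default"}

	if err := s.New(ref, map[string]string{"topic": "my-topic"}); err != nil {
		fmt.Println("scaler not loaded:", err)
		return
	}
	specs, _ := s.GetMetricSpec(ref)
	active, _ := s.IsActive(ref)
	values, _ := s.GetMetrics(ref, specs[0].MetricName)

	// Prints: my-topic 10 true 30
	fmt.Println(specs[0].MetricName, specs[0].TargetSize, active, values[0].MetricValue)
	_ = s.Close(ref)
}
```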
syntax = "proto3";

option go_package = "github.com/krvarma/pulsar-ext-scaler/externalscaler";

package externalscaler;

import "google/protobuf/empty.proto";

service ExternalScaler {
rpc New(NewRequest) returns (google.protobuf.Empty) {}
rpc IsActive(ScaledObjectRef) returns (IsActiveResponse) {}
rpc GetMetricSpec(ScaledObjectRef) returns (GetMetricSpecResponse) {}
rpc GetMetrics(GetMetricsRequest) returns (GetMetricsResponse) {}
rpc Close(ScaledObjectRef) returns (google.protobuf.Empty) {}
}

message ScaledObjectRef {
string name = 1;
string namespace = 2;
}

message NewRequest {
ScaledObjectRef scaledObjectRef = 1;
map<string, string> metadata = 2;
}

message IsActiveResponse {
bool result = 1;
}

message GetMetricSpecResponse {
repeated MetricSpec metricSpecs = 1;
}

message MetricSpec {
string metricName = 1;
int64 targetSize = 2;
}

message GetMetricsRequest {
ScaledObjectRef scaledObjectRef = 1;
string metricName = 2;
}

message GetMetricsResponse {
repeated MetricValue metricValues = 1;
}

message MetricValue {
string metricName = 1;
int64 metricValue = 2;
}
protoc --go_out=plugins=grpc:. --go_opt=paths=source_relative externalscaler/externalscaler.proto
package main

import (
"context"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"strconv"
"strings"

log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"

"github.com/golang/protobuf/ptypes/empty"
pb "github.com/krvarma/pulsar-ext-scaler/externalscaler"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

type externalScalerServer struct {
scaledObjectRef map[string][]*pb.ScaledObjectRef
}

// Pulsar Context
type PulsarContext struct {
pulsarserver string // Pulsar URL
isPersistent bool // persistent or non-persistent
tenant string // tenant
namespace string // namespace
topic string // topic name
subscription string // subscription name
metricname string // Metric Name
targetsize int // Target size
serverurl string // Full URL of the topic stats endpoint
}

// Constant variables
const (
pulsarurlpattern = "%v/admin/v2/%v/%v/%v/%v/stats" // Pulsar URL format string
pulsarmsgbacklogpattern = "subscriptions.%v.msgBacklog" // JSON Tag format string
)

// Pulsar context
var (
pc PulsarContext
)

// Get environment variable, if it is not found return default value
func getEnv(key string, defvalue string) string {
value := os.Getenv(key)

if len(value) <= 0 {
value = defvalue
}

return value
}

// getPulsarAdminStatsUrl constructs and returns the Pulsar admin URL for the topic stats.
// The URL is built from the parameters.
func getPulsarAdminStatsUrl(server string, isPersistent bool, tenant string, namespace string, topic string) string {
pstring := "persistent"

if !isPersistent {
pstring = "non-persistent"
}

if !strings.HasPrefix(server, "https") && !strings.HasPrefix(server, "http") {
server = "http://" + server
}

return fmt.Sprintf(pulsarurlpattern, server, pstring, tenant, namespace, topic)
}

// getMsgBackLogTag returns the JSON path of the subscription's message backlog
func getMsgBackLogTag(subscription string) string {
return fmt.Sprintf(pulsarmsgbacklogpattern, subscription)
}

// getMsgBackLog calls the stats admin API and returns the backlog reported
// by the server for the given subscription. It returns 0 on any failure.
func getMsgBackLog(pulsarurl string, subscription string) int {
	log.Printf("Server URL %v", pulsarurl)

	response, err := http.Get(pulsarurl)
	if err != nil {
		log.Error(err)
		return 0
	}
	// Close the body so the underlying connection can be reused.
	defer response.Body.Close()

	responseData, err := ioutil.ReadAll(response.Body)
	if err != nil {
		// Log instead of calling log.Fatal: one failed read should not
		// terminate the gRPC server.
		log.Error(err)
		return 0
	}

	tag := getMsgBackLogTag(subscription)
	value := gjson.Get(string(responseData), tag)

	log.Printf("Backlog %v", value)

	// An empty result means the subscription was not found in the response.
	if value.String() == "" {
		return 0
	}

	backlog, err := strconv.Atoi(value.String())
	if err != nil {
		log.Error(err)
		return 0
	}

	return backlog
}

// parsePulsarMetadata reads the trigger metadata of the ScaledObject into pc.
func parsePulsarMetadata(metadata map[string]string) {
	pc.pulsarserver = metadata["server"]

	// Use the parsed value when it is valid; otherwise default to a persistent topic.
	ispersistent, err := strconv.ParseBool(metadata["persistent"])
	if err == nil {
		pc.isPersistent = ispersistent
	} else {
		pc.isPersistent = true
	}

	// Use the parsed backlog target when it is valid; otherwise default to 10.
	ts, err := strconv.Atoi(metadata["backlog"])
	if err == nil {
		pc.targetsize = ts
	} else {
		pc.targetsize = 10
	}

	pc.tenant = metadata["tenant"]
	pc.namespace = metadata["namespace"]
	pc.topic = metadata["topic"]
	pc.subscription = metadata["subscription"]
	pc.metricname = pc.tenant + "-" + pc.namespace + "-" + pc.topic
	pc.serverurl = getPulsarAdminStatsUrl(pc.pulsarserver, pc.isPersistent, pc.tenant, pc.namespace, pc.topic)
}

// NewRequest
func (s *externalScalerServer) New(ctx context.Context, newRequest *pb.NewRequest) (*empty.Empty, error) {
out := new(empty.Empty)

parsePulsarMetadata(newRequest.Metadata)

return out, nil
}

// Close
func (s *externalScalerServer) Close(ctx context.Context, scaledObjectRef *pb.ScaledObjectRef) (*empty.Empty, error) {
out := new(empty.Empty)

return out, nil
}

// IsActive
func (s *externalScalerServer) IsActive(ctx context.Context, in *pb.ScaledObjectRef) (*pb.IsActiveResponse, error) {
backlog := getMsgBackLog(pc.serverurl, pc.subscription)

return &pb.IsActiveResponse{
Result: backlog > 0,
}, nil
}

func (s *externalScalerServer) GetMetricSpec(ctx context.Context, in *pb.ScaledObjectRef) (*pb.GetMetricSpecResponse, error) {
log.Info("Getting Metric Spec...")
spec := pb.MetricSpec{
MetricName: pc.metricname,
TargetSize: int64(pc.targetsize),
}

log.Printf("GetMetricSpec() method completed for %s", spec.MetricName)

return &pb.GetMetricSpecResponse{
MetricSpecs: []*pb.MetricSpec{&spec},
}, nil
}

func (s *externalScalerServer) GetMetrics(ctx context.Context, in *pb.GetMetricsRequest) (*pb.GetMetricsResponse, error) {
backlog := getMsgBackLog(pc.serverurl, pc.subscription)

m := pb.MetricValue{
MetricName: pc.metricname,
MetricValue: int64(backlog),
}

log.Printf("GetMetrics() method completed for %s", m.MetricName)

return &pb.GetMetricsResponse{
MetricValues: []*pb.MetricValue{&m},
}, nil
}

func newServer() *externalScalerServer {
s := &externalScalerServer{}

return s
}

func main() {
	//url := getPulsarAdminStatsUrl("localhost:8080", true, "public", "default", "my-topic")
	//value := getMsgBackLog(url, "first-subscription")

	//log.Printf("Backlog %v\n", value)

	port := getEnv("EXTERNAL_SCALER_GRPC_PORT", "8091")

	lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%v", port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	log.Printf("gRPC server running on %v", port)

	var opts []grpc.ServerOption
	grpcServer := grpc.NewServer(opts...)
	reflection.Register(grpcServer)
	pb.RegisterExternalScalerServer(grpcServer, newServer())

	// Serve blocks until the server stops; report the reason if it fails.
	if err := grpcServer.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
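
The server above keeps its configuration in the single package-level variable pc, so each call to New overwrites whatever the previous call stored. If one scaler instance has to serve several ScaledObjects, one option is to key the parsed configuration by namespace and name. The following is a minimal sketch of that idea, not the implementation used in this article; scalerConfig and configStore are hypothetical names, and the stats URL in main is just an example value.

```go
package main

import (
	"fmt"
	"sync"
)

// scalerConfig holds what New would parse from the trigger metadata.
type scalerConfig struct {
	StatsURL     string
	Subscription string
	TargetSize   int64
}

// configStore maps "namespace/name" of a ScaledObject to its configuration.
// The mutex makes it safe to use from concurrent gRPC handlers.
type configStore struct {
	mu      sync.RWMutex
	configs map[string]scalerConfig
}

func newConfigStore() *configStore {
	return &configStore{configs: make(map[string]scalerConfig)}
}

func storeKey(namespace, name string) string {
	return namespace + "/" + name
}

// Put would be called from New.
func (s *configStore) Put(namespace, name string, cfg scalerConfig) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.configs[storeKey(namespace, name)] = cfg
}

// Get would be called from IsActive, GetMetricSpec and GetMetrics.
func (s *configStore) Get(namespace, name string) (scalerConfig, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	cfg, ok := s.configs[storeKey(namespace, name)]
	return cfg, ok
}

// Delete would be called from Close.
func (s *configStore) Delete(namespace, name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.configs, storeKey(namespace, name))
}

func main() {
	store := newConfigStore()
	store.Put("default", "pulsar-consumer-scaledobject", scalerConfig{
		StatsURL:     "http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats",
		Subscription: "first-subscription",
		TargetSize:   10,
	})

	if cfg, ok := store.Get("default", "pulsar-consumer-scaledobject"); ok {
		fmt.Println(cfg.Subscription, cfg.TargetSize)
	}

	store.Delete("default", "pulsar-consumer-scaledobject")
	_, ok := store.Get("default", "pulsar-consumer-scaledobject")
	fmt.Println("still present:", ok)
}
```

With a store like this, the handlers would look up the configuration through the name and namespace carried in ScaledObjectRef instead of reading a global.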

Build the external scaler and create a docker image

FROM golang:alpine AS builder
RUN mkdir /app
ADD . /app/
WORKDIR /app
RUN go build -o pulsar-ext-scaler .

FROM alpine
RUN mkdir /app
WORKDIR /app
COPY --from=builder /app/pulsar-ext-scaler .

CMD ["./pulsar-ext-scaler"]
docker buildx build --platform linux/amd64 --push -t <docker username>/pulsar-ext-scaler .
apiVersion: apps/v1
kind: Deployment
metadata:
  name: keda-pulsar-ext-scaler
  namespace: default
spec:
  selector:
    matchLabels:
      service: keda-pulsar-ext-scaler
  replicas: 1
  template:
    metadata:
      labels:
        service: keda-pulsar-ext-scaler
    spec:
      containers:
        - image: <replace with your external scaler image>
          name: scaler
          env:
            - name: EXTERNAL_SCALER_GRPC_PORT
              value: "8091"
---
apiVersion: v1
kind: Service
metadata:
  name: pulsar-ext-scaler-service
  namespace: default
spec:
  selector:
    service: keda-pulsar-ext-scaler
  type: ClusterIP
  ports:
    - protocol: TCP
      port: 8091
kubectl apply -f pulsar-ext-scaler.yml

Sample Apache Pulsar consumer

package main

import (
"context"
"fmt"
"log"
"os"

"github.com/apache/pulsar-client-go/pulsar"
)

func getEnv(key string, defvalue string) string {
value := os.Getenv(key)

if len(value) <= 0 {
value = defvalue
}

return value
}

func main() {
	server := getEnv("PULSAR_SERVER", "pulsar://localhost:6650")
	topic := getEnv("PULSAR_TOPIC", "my-topic")
	subscriptionName := getEnv("PULSAR_SUBSCRIPTION_NAME", "first-subscription")

	log.Printf("Server: %v", server)
	log.Printf("Topic: %v", topic)
	log.Printf("Subscription: %v", subscriptionName)

	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: server})
	if err != nil {
		log.Fatal(err)
	}

	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            topic,
		SubscriptionName: subscriptionName,
		Type:             pulsar.Shared,
	})

	if err != nil {
		log.Fatal(err)
	}

	defer consumer.Close()

	ctx := context.Background()

	// Listen indefinitely on the topic
	for {
		msg, err := consumer.Receive(ctx)
		if err != nil {
			log.Fatal(err)
		}

		// Do something with the message. Here the processing step is just
		// printing; err now holds the result of that step.
		_, err = fmt.Printf("Message Received: %v\n", string(msg.Payload()))

		if err == nil {
			// Message processed successfully
			consumer.Ack(msg)
		} else {
			// Failed to process the message; ask for redelivery
			consumer.Nack(msg)
		}
	}
}
FROM golang:alpine AS builder
RUN mkdir /app
ADD . /app/
WORKDIR /app
RUN go build -o consumer .

FROM alpine
RUN mkdir /app
WORKDIR /app
COPY --from=builder /app/consumer .

ENV PULSAR_SERVER <replace with your pulsar server url>
ENV PULSAR_TOPIC my-topic
ENV PULSAR_SUBSCRIPTION_NAME first-subscription

CMD ["./consumer"]
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pulsar-consumer
  namespace: default
spec:
  selector:
    matchLabels:
      app: pulsar-consumer
  replicas: 1
  template:
    metadata:
      labels:
        app: pulsar-consumer
    spec:
      containers:
        - name: pulsar-consumer
          image: <insert your image name>
          imagePullPolicy: Always
---
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
  name: pulsar-consumer-scaledobject
  namespace: default
  labels:
    deploymentName: pulsar-consumer
spec:
  pollingInterval: 10 # Optional. Default: 30 seconds
  cooldownPeriod: 100 # Optional. Default: 300 seconds
  minReplicaCount: 0 # Optional. Default: 0
  maxReplicaCount: 30 # Optional. Default: 100
  scaleTargetRef:
    deploymentName: pulsar-consumer
  triggers:
    - type: external
      metadata:
        scalerAddress: pulsar-ext-scaler-service:8091
        server: "http://<server>:8080"
        persistent: "true"
        backlog: "10"
        tenant: "public"
        namespace: "default"
        topic: "my-topic"
        subscription: "first-subscription"
  • server:- The Pulsar admin server endpoint.
  • persistent:- "true" or "false"; specifies whether the topic is persistent or non-persistent. The scaler assumes a persistent topic if the value is missing or invalid.
  • backlog:- The target number of pending messages per pod; scaling of our deployment depends on this number. If you have 30 pending messages and the backlog is 10, then KEDA will create three pods to process these messages. The scaler uses 10 if the value is missing or invalid.
  • tenant:- Name of the tenant
  • namespace:- Name of the namespace
  • topic:- Topic name
  • subscription:- Name of the subscription
kubectl apply -f pulsar-consumer-keda.yml
package main

import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/apache/pulsar-client-go/pulsar"
)

func getEnv(key string, defvalue string) string {
value := os.Getenv(key)

if value == "" {
value = defvalue
}

return value
}

func main() {
	server := getEnv("PULSAR_SERVER", "pulsar://localhost:6650")
	message := getEnv("PULSAR_MESSAGE", "Sample Message")
	topic := getEnv("PULSAR_TOPIC", "my-topic")

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               server,
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})

	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}

	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: topic,
	})

	// Check the error before using the producer; it is nil on failure.
	if err != nil {
		log.Fatalf("Could not create Pulsar producer: %v", err)
	}

	defer producer.Close()

	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
		Payload: []byte(message),
	})

	if err != nil {
		fmt.Println("Failed to publish message", err)
		return
	}

	fmt.Println("Published message")
}
