Skip to content

Instantly share code, notes, and snippets.

@AdheipSingh
Created November 12, 2020 13:16
Show Gist options
  • Save AdheipSingh/db009db6122dabd763fa70e0b1d9385b to your computer and use it in GitHub Desktop.
Save AdheipSingh/db009db6122dabd763fa70e0b1d9385b to your computer and use it in GitHub Desktop.
Clonset MM Autoscaling
package main
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/exec"
"strings"
"time"
ctrl "sigs.k8s.io/controller-runtime"
kruiseapps "github.com/openkruise/kruise-api/apps/v1alpha1"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)
var NAMESPACE = os.Getenv("NAMESPACE")
var ROUTER_SVC = os.Getenv("ROUTER_SVC")
var STORAGE_CLASS = os.Getenv("STORAGE_CLASS")
func main() {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
if err != nil {
fmt.Println(err, "unable to start manager")
os.Exit(1)
}
/*
conf, err := clientcmd.BuildConfigFromFlags("", os.Getenv("HOME")+"/.kube/config")
if err != nil {
log.Infof("error in getting Kubeconfig: %v", err)
}
*/
conf, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
k8sClient, err := kubernetes.NewForConfig(conf)
if err != nil {
panic(err)
}
_ = kruiseapps.AddToScheme(mgr.GetScheme())
apiClient, err := client.New(conf, client.Options{Scheme: mgr.GetScheme()})
svcList, err := k8sClient.CoreV1().Services(NAMESPACE).List(context.TODO(), v1.ListOptions{
LabelSelector: "app=mm",
})
svc := make([]*corev1.Service, 0)
for i := range svcList.Items {
svc = append(svc, &svcList.Items[i])
}
uniqueString := getUniqueString()
boolP := GetPendingTasks("http://" + ROUTER_SVC + ":8888/druid/indexer/v1/pendingTasks")
if boolP == true {
err = createMMClonset(apiClient, k8sClient, uniqueString)
if err != nil {
log.Errorf("Error creating MM Cloneset %s", err)
}
}
for _, s := range svc {
boolM := GetMMTasks("http://" + s.Name + ":8091/druid/worker/v1/tasks")
if boolM == true {
log.Infof("MM is [%s] in namespace [%s] is not running any tasks, preparing for scale down", s.Name, s.Namespace)
cloneSet := makeCloneSetEmptyObj()
apiClient.Get(context.TODO(), types.NamespacedName{
Name: s.Name,
Namespace: NAMESPACE,
}, cloneSet)
apiClient.Delete(context.TODO(), cloneSet, &client.DeleteOptions{})
k8sClient.CoreV1().Services(NAMESPACE).Delete(context.TODO(), s.Name, v1.DeleteOptions{})
}
}
}
func makeCloneSetEmptyObj() *kruiseapps.CloneSet {
return &kruiseapps.CloneSet{
TypeMeta: v1.TypeMeta{
APIVersion: "apps.kruise.io/v1alpha1",
Kind: "CloneSet",
},
}
}
func createMMClonset(apiClient client.Client, k8sClient *kubernetes.Clientset, uniqueString string) error {
var err error
mmCloneset := getMMCloneSet(uniqueString)
err = apiClient.Create(context.TODO(), mmCloneset)
if err != nil {
return err
}
kubeclient := k8sClient.CoreV1().Services(NAMESPACE)
svcMM := getMMService(uniqueString)
_, err = kubeclient.Create(context.TODO(), svcMM, v1.CreateOptions{})
if err != nil {
return err
}
log.Infof("New MM Node added [%s]", mmCloneset.Name)
log.Info("Waiting for 120 seconds for task to get started")
time.Sleep(120)
return nil
}
func getMMCloneSet(uniqueString string) *kruiseapps.CloneSet {
var replica int32 = 1
clone := &kruiseapps.CloneSet{
TypeMeta: v1.TypeMeta{
APIVersion: "apps.kruise.io/v1alpha1",
Kind: "CloneSet",
},
ObjectMeta: v1.ObjectMeta{
Name: "mm" + "-" + uniqueString,
Namespace: NAMESPACE,
Labels: map[string]string{
"app": "mm",
"mm": uniqueString,
},
},
Spec: kruiseapps.CloneSetSpec{
Replicas: &replica,
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{
"app": "mm",
"mm": uniqueString,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
"app": "mm",
"mm": uniqueString,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "druid",
Image: "apache/druid:0.19.0",
Command: []string{"/druid.sh", "middleManager"},
VolumeMounts: []corev1.VolumeMount{
{
Name: "common-mm",
ReadOnly: true,
MountPath: "/opt/druid/conf/druid/cluster/_common",
},
{
Name: "node-mm",
ReadOnly: true,
MountPath: "/opt/druid/conf/druid/cluster/data/middleManager",
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "common-mm",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: "common-mm",
},
},
},
},
{
Name: "node-mm",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: "node-mm",
},
},
},
},
},
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{
{
TypeMeta: v1.TypeMeta{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
},
ObjectMeta: v1.ObjectMeta{
Name: "mm" + "-" + uniqueString,
Labels: map[string]string{
"app": "mm",
"mm": uniqueString,
},
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{"ReadWriteOnce"},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
"storage": *resource.NewQuantity(10737418240, resource.BinarySI),
},
},
StorageClassName: ptrstring(STORAGE_CLASS),
},
},
},
},
}
return clone
}
func ptrstring(p string) *string {
return &p
}
func getMMService(uniqueString string) *corev1.Service {
svc := &corev1.Service{
TypeMeta: v1.TypeMeta{
Kind: "Service",
APIVersion: "v1",
},
ObjectMeta: v1.ObjectMeta{
Name: "mm" + "-" + uniqueString,
Labels: map[string]string{
"app": "mm",
},
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Protocol: corev1.Protocol("TCP"),
Port: 8091,
TargetPort: intstr.IntOrString{
Type: intstr.Type(0),
IntVal: 8091,
},
NodePort: 0,
},
},
Selector: map[string]string{
"mm": uniqueString,
},
ClusterIP: "None",
},
}
return svc
}
func getUniqueString() string {
var err error
cmd := exec.Command("sh", "bash.sh")
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr
err = cmd.Run()
if err != nil {
log.Errorf("cmd.Run() failed with %s\n", err)
}
rs, _ := string(stdout.Bytes()), string(stderr.Bytes())
uniqueString := strings.TrimSuffix(rs, "\n")
return uniqueString
}
func GetPendingTasks(url string) bool {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatalf("NewRequest construct error : %d", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatalf("GET request error on URL specified : %d", err)
}
defer resp.Body.Close()
respData, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatalf("Reading Body Error : %d", err)
}
if string(respData) == "[]" {
log.Info("No task in pending state")
return false
} else {
return true
}
}
func GetMMTasks(url string) bool {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatalf("NewRequest construct error : %d", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatalf("GET request error on URL specified : %d", err)
}
defer resp.Body.Close()
respData, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatalf("Reading Body Error : %d", err)
}
if string(respData) == "[]" {
return true
} else {
return false
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment