Created
November 12, 2020 13:16
-
-
Save AdheipSingh/db009db6122dabd763fa70e0b1d9385b to your computer and use it in GitHub Desktop.
Clonset MM Autoscaling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"context" | |
"fmt" | |
"io/ioutil" | |
"net/http" | |
"os" | |
"os/exec" | |
"strings" | |
"time" | |
ctrl "sigs.k8s.io/controller-runtime" | |
kruiseapps "github.com/openkruise/kruise-api/apps/v1alpha1" | |
log "github.com/sirupsen/logrus" | |
corev1 "k8s.io/api/core/v1" | |
"k8s.io/apimachinery/pkg/api/resource" | |
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | |
"k8s.io/apimachinery/pkg/types" | |
"k8s.io/apimachinery/pkg/util/intstr" | |
"k8s.io/client-go/kubernetes" | |
"k8s.io/client-go/rest" | |
"sigs.k8s.io/controller-runtime/pkg/client" | |
) | |
var NAMESPACE = os.Getenv("NAMESPACE") | |
var ROUTER_SVC = os.Getenv("ROUTER_SVC") | |
var STORAGE_CLASS = os.Getenv("STORAGE_CLASS") | |
func main() { | |
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) | |
if err != nil { | |
fmt.Println(err, "unable to start manager") | |
os.Exit(1) | |
} | |
/* | |
conf, err := clientcmd.BuildConfigFromFlags("", os.Getenv("HOME")+"/.kube/config") | |
if err != nil { | |
log.Infof("error in getting Kubeconfig: %v", err) | |
} | |
*/ | |
conf, err := rest.InClusterConfig() | |
if err != nil { | |
panic(err.Error()) | |
} | |
k8sClient, err := kubernetes.NewForConfig(conf) | |
if err != nil { | |
panic(err) | |
} | |
_ = kruiseapps.AddToScheme(mgr.GetScheme()) | |
apiClient, err := client.New(conf, client.Options{Scheme: mgr.GetScheme()}) | |
svcList, err := k8sClient.CoreV1().Services(NAMESPACE).List(context.TODO(), v1.ListOptions{ | |
LabelSelector: "app=mm", | |
}) | |
svc := make([]*corev1.Service, 0) | |
for i := range svcList.Items { | |
svc = append(svc, &svcList.Items[i]) | |
} | |
uniqueString := getUniqueString() | |
boolP := GetPendingTasks("http://" + ROUTER_SVC + ":8888/druid/indexer/v1/pendingTasks") | |
if boolP == true { | |
err = createMMClonset(apiClient, k8sClient, uniqueString) | |
if err != nil { | |
log.Errorf("Error creating MM Cloneset %s", err) | |
} | |
} | |
for _, s := range svc { | |
boolM := GetMMTasks("http://" + s.Name + ":8091/druid/worker/v1/tasks") | |
if boolM == true { | |
log.Infof("MM is [%s] in namespace [%s] is not running any tasks, preparing for scale down", s.Name, s.Namespace) | |
cloneSet := makeCloneSetEmptyObj() | |
apiClient.Get(context.TODO(), types.NamespacedName{ | |
Name: s.Name, | |
Namespace: NAMESPACE, | |
}, cloneSet) | |
apiClient.Delete(context.TODO(), cloneSet, &client.DeleteOptions{}) | |
k8sClient.CoreV1().Services(NAMESPACE).Delete(context.TODO(), s.Name, v1.DeleteOptions{}) | |
} | |
} | |
} | |
func makeCloneSetEmptyObj() *kruiseapps.CloneSet { | |
return &kruiseapps.CloneSet{ | |
TypeMeta: v1.TypeMeta{ | |
APIVersion: "apps.kruise.io/v1alpha1", | |
Kind: "CloneSet", | |
}, | |
} | |
} | |
func createMMClonset(apiClient client.Client, k8sClient *kubernetes.Clientset, uniqueString string) error { | |
var err error | |
mmCloneset := getMMCloneSet(uniqueString) | |
err = apiClient.Create(context.TODO(), mmCloneset) | |
if err != nil { | |
return err | |
} | |
kubeclient := k8sClient.CoreV1().Services(NAMESPACE) | |
svcMM := getMMService(uniqueString) | |
_, err = kubeclient.Create(context.TODO(), svcMM, v1.CreateOptions{}) | |
if err != nil { | |
return err | |
} | |
log.Infof("New MM Node added [%s]", mmCloneset.Name) | |
log.Info("Waiting for 120 seconds for task to get started") | |
time.Sleep(120) | |
return nil | |
} | |
func getMMCloneSet(uniqueString string) *kruiseapps.CloneSet { | |
var replica int32 = 1 | |
clone := &kruiseapps.CloneSet{ | |
TypeMeta: v1.TypeMeta{ | |
APIVersion: "apps.kruise.io/v1alpha1", | |
Kind: "CloneSet", | |
}, | |
ObjectMeta: v1.ObjectMeta{ | |
Name: "mm" + "-" + uniqueString, | |
Namespace: NAMESPACE, | |
Labels: map[string]string{ | |
"app": "mm", | |
"mm": uniqueString, | |
}, | |
}, | |
Spec: kruiseapps.CloneSetSpec{ | |
Replicas: &replica, | |
Selector: &v1.LabelSelector{ | |
MatchLabels: map[string]string{ | |
"app": "mm", | |
"mm": uniqueString, | |
}, | |
}, | |
Template: corev1.PodTemplateSpec{ | |
ObjectMeta: v1.ObjectMeta{ | |
Labels: map[string]string{ | |
"app": "mm", | |
"mm": uniqueString, | |
}, | |
}, | |
Spec: corev1.PodSpec{ | |
Containers: []corev1.Container{ | |
{ | |
Name: "druid", | |
Image: "apache/druid:0.19.0", | |
Command: []string{"/druid.sh", "middleManager"}, | |
VolumeMounts: []corev1.VolumeMount{ | |
{ | |
Name: "common-mm", | |
ReadOnly: true, | |
MountPath: "/opt/druid/conf/druid/cluster/_common", | |
}, | |
{ | |
Name: "node-mm", | |
ReadOnly: true, | |
MountPath: "/opt/druid/conf/druid/cluster/data/middleManager", | |
}, | |
}, | |
}, | |
}, | |
Volumes: []corev1.Volume{ | |
{ | |
Name: "common-mm", | |
VolumeSource: corev1.VolumeSource{ | |
ConfigMap: &corev1.ConfigMapVolumeSource{ | |
LocalObjectReference: corev1.LocalObjectReference{ | |
Name: "common-mm", | |
}, | |
}, | |
}, | |
}, | |
{ | |
Name: "node-mm", | |
VolumeSource: corev1.VolumeSource{ | |
ConfigMap: &corev1.ConfigMapVolumeSource{ | |
LocalObjectReference: corev1.LocalObjectReference{ | |
Name: "node-mm", | |
}, | |
}, | |
}, | |
}, | |
}, | |
}, | |
}, | |
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{ | |
{ | |
TypeMeta: v1.TypeMeta{ | |
APIVersion: "v1", | |
Kind: "PersistentVolumeClaim", | |
}, | |
ObjectMeta: v1.ObjectMeta{ | |
Name: "mm" + "-" + uniqueString, | |
Labels: map[string]string{ | |
"app": "mm", | |
"mm": uniqueString, | |
}, | |
}, | |
Spec: corev1.PersistentVolumeClaimSpec{ | |
AccessModes: []corev1.PersistentVolumeAccessMode{"ReadWriteOnce"}, | |
Resources: corev1.ResourceRequirements{ | |
Requests: corev1.ResourceList{ | |
"storage": *resource.NewQuantity(10737418240, resource.BinarySI), | |
}, | |
}, | |
StorageClassName: ptrstring(STORAGE_CLASS), | |
}, | |
}, | |
}, | |
}, | |
} | |
return clone | |
} | |
func ptrstring(p string) *string { | |
return &p | |
} | |
func getMMService(uniqueString string) *corev1.Service { | |
svc := &corev1.Service{ | |
TypeMeta: v1.TypeMeta{ | |
Kind: "Service", | |
APIVersion: "v1", | |
}, | |
ObjectMeta: v1.ObjectMeta{ | |
Name: "mm" + "-" + uniqueString, | |
Labels: map[string]string{ | |
"app": "mm", | |
}, | |
}, | |
Spec: corev1.ServiceSpec{ | |
Ports: []corev1.ServicePort{ | |
{ | |
Protocol: corev1.Protocol("TCP"), | |
Port: 8091, | |
TargetPort: intstr.IntOrString{ | |
Type: intstr.Type(0), | |
IntVal: 8091, | |
}, | |
NodePort: 0, | |
}, | |
}, | |
Selector: map[string]string{ | |
"mm": uniqueString, | |
}, | |
ClusterIP: "None", | |
}, | |
} | |
return svc | |
} | |
func getUniqueString() string { | |
var err error | |
cmd := exec.Command("sh", "bash.sh") | |
var stdout, stderr bytes.Buffer | |
cmd.Stdout = &stdout | |
cmd.Stderr = &stderr | |
err = cmd.Run() | |
if err != nil { | |
log.Errorf("cmd.Run() failed with %s\n", err) | |
} | |
rs, _ := string(stdout.Bytes()), string(stderr.Bytes()) | |
uniqueString := strings.TrimSuffix(rs, "\n") | |
return uniqueString | |
} | |
func GetPendingTasks(url string) bool { | |
req, err := http.NewRequest("GET", url, nil) | |
if err != nil { | |
log.Fatalf("NewRequest construct error : %d", err) | |
} | |
resp, err := http.DefaultClient.Do(req) | |
if err != nil { | |
log.Fatalf("GET request error on URL specified : %d", err) | |
} | |
defer resp.Body.Close() | |
respData, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
log.Fatalf("Reading Body Error : %d", err) | |
} | |
if string(respData) == "[]" { | |
log.Info("No task in pending state") | |
return false | |
} else { | |
return true | |
} | |
} | |
func GetMMTasks(url string) bool { | |
req, err := http.NewRequest("GET", url, nil) | |
if err != nil { | |
log.Fatalf("NewRequest construct error : %d", err) | |
} | |
resp, err := http.DefaultClient.Do(req) | |
if err != nil { | |
log.Fatalf("GET request error on URL specified : %d", err) | |
} | |
defer resp.Body.Close() | |
respData, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
log.Fatalf("Reading Body Error : %d", err) | |
} | |
if string(respData) == "[]" { | |
return true | |
} else { | |
return false | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment