Skip to content

Instantly share code, notes, and snippets.

@dharmab
Created January 31, 2022 17:40
Show Gist options
  • Save dharmab/e31af8a6c28a023b39409e3fafbb3ce8 to your computer and use it in GitHub Desktop.
Save dharmab/e31af8a6c28a023b39409e3fafbb3ce8 to your computer and use it in GitHub Desktop.
k8s-proxy.py
from typing import Optional, Any, Union, Dict, List, Set
import json
import kubernetes.client
import urllib3.response
from yarl import URL
import functools
import operator
import datetime
import dataclasses
class ProxiedResponse(urllib3.response.HTTPResponse):
    """
    Wraps an existing HTTPResponse and adds a raise_for_status() method.

    Only ``_base_response`` and ``raise_for_status`` are resolved on the
    wrapper itself; every other attribute lookup is forwarded to the wrapped
    response object.
    """

    def __init__(self, base_response: urllib3.response.HTTPResponse):
        self._base_response = base_response
        # NOTE(review): the wrapper passes itself as the parent initializer's
        # first positional argument. Presumably this only exists to satisfy
        # the base class constructor -- confirm before changing it.
        super().__init__(self)

    def raise_for_status(self) -> None:
        """
        Raise urllib3.exceptions.HTTPError if the wrapped response's status
        is in the range 400-599; otherwise do nothing.

        https://requests.readthedocs.io/en/master/api/#requests.Response.raise_for_status
        """
        wrapped = self._base_response
        if not 400 <= wrapped.status < 600:
            return
        raise urllib3.exceptions.HTTPError(
            f"{wrapped.status} Error: {wrapped.reason} for url: {wrapped.geturl()}"
        )

    def __getattribute__(self, item: str) -> Any:
        # https://docs.python.org/3/reference/datamodel.html#object.__getattribute__
        # Names owned by the wrapper are looked up normally; anything else is
        # read from the wrapped response.
        handled_locally = item in ("_base_response", "raise_for_status")
        if not handled_locally:
            return getattr(self._base_response, item)
        return object.__getattribute__(self, item)
# Loose alias for a value that can be sent as a JSON request body: a scalar,
# None, or an (unparameterized) dict or list.
JSONType = Union[str, int, float, bool, None, Dict, List]
def proxy_request(
    *,
    method: str,
    core_client: kubernetes.client.CoreV1Api,
    namespace: str,
    service: str,
    path: str,
    params: Optional[Dict[str, Union[list, str]]] = None,
    timeout: Optional[int] = None,
    headers: Optional[Dict] = None,
    json_request_body: Optional[JSONType] = None,
    data: Optional[Union[str, Dict]] = None,
) -> ProxiedResponse:
    """
    Send an HTTP request to a service by proxying it through the Kubernetes API server.

    When both ``data`` and ``json_request_body`` are given, ``data`` wins and
    the JSON body is not sent.

    :param method: HTTP method as string ("GET", "POST")
    :param core_client: Kubernetes core client
    :param namespace: Namespace of the target Kubernetes service
    :param service: Target service in format "serviceName:port", e.g. "web:8080"
    :param path: URL path on the target service; leading slashes are ignored
    :param params: Target URL parameters; a value may be a single string or a list of values
    :param timeout: Request timeout
    :param headers: Request HTTP headers; these override the headers computed here
    :param json_request_body: JSON request data
    :param data: Non-JSON request data; a str is sent as the body, a dict as form fields.
        See: https://requests.readthedocs.io/en/master/user/quickstart/#more-complicated-post-requests
    :return: Response from the target service
    :raises TypeError: if ``data`` is neither None, a str nor a dict
    """
    headers = {} if headers is None else headers
    params = {} if params is None else params

    # Taken from kubernetes.client.CoreV1Api.connect_get_namespaced_service_proxy_with_path_with_http_info
    base_path = URL("/api/v1/namespaces") / namespace / "services" / service / "proxy"

    # core_client.api_client.call_api requires that the path have no queries on it.
    full_url = URL(f"{base_path}/{path.lstrip('/')}")
    resource_path = str(full_url.with_query({}))

    # core_client.api_client.call_api prefers lists of tuples over dictionaries. Using a dictionary leads to
    # serialization issues in the path. Multi-valued parameters become one tuple per value.
    query_params: List[tuple] = []
    for key, value in params.items():
        if isinstance(value, str):
            query_params.append((key, value))
        else:
            query_params.extend((key, item) for item in value)

    api_client = core_client.api_client

    # See kubernetes.client.CoreV1Api.connect_get_namespaced_service_proxy_with_path_with_http_info
    header_params = {"Accept": api_client.select_header_accept(["*/*"])}

    body: Optional[Union[JSONType, Union[str, Dict]]] = None
    post_params = None
    if data is None:
        if json_request_body is not None:
            header_params["Content-Type"] = api_client.select_header_content_type(["*/*"])
            body = json_request_body
    elif isinstance(data, str):
        header_params["Content-Type"] = "application/octet-stream"
        body = data
    elif isinstance(data, dict):
        header_params["Content-Type"] = "application/x-www-form-urlencoded"
        post_params = list(data.items())
    else:
        raise TypeError(f"Cannot POST data with type {type(data)}")

    # Caller-supplied headers take precedence over the computed ones.
    header_params = {**header_params, **headers}

    raw_response = api_client.call_api(
        resource_path=resource_path,
        method=method,
        query_params=query_params,
        header_params=header_params,
        post_params=post_params,
        response_type="str",
        auth_settings=["BearerToken"],
        async_req=False,
        _return_http_data_only=True,
        _preload_content=False,
        collection_formats={},
        _request_timeout=timeout,
        body=body,
    )
    return ProxiedResponse(raw_response)
# Usage example: Types and functions to manage Alertmanager silences
@dataclasses.dataclass(frozen=True, eq=True)
class Matcher:
    """
    A single condition that an Alertmanager silence uses to select alerts.

    Instances are immutable and compare (and hash) by their field values.
    """

    # Name of the alert label the condition applies to.
    name: str
    # Value (or pattern) the label is compared against.
    value: str
    # Whether ``value`` is a regular expression. The spelling of this field
    # is part of the constructor's keyword interface.
    is_value_regular_expresion: bool

    def as_dict(self) -> Dict[str, Any]:
        """Return the matcher as a dict with "name", "value" and "isRegex" keys."""
        payload: Dict[str, Any] = {}
        payload["name"] = self.name
        payload["value"] = self.value
        payload["isRegex"] = self.is_value_regular_expresion
        return payload

    def as_string(self) -> str:
        """Return the matcher rendered as ``name="value"``."""
        label, expected = self.name, self.value
        return f'{label}="{expected}"'
class AlertNameMatcher(Matcher):
    """
    A Matcher which filters for the alerts with the given names.

    The names are combined into one regular-expression alternation of the
    form "(a|b|c)" on the "alertname" label. Names are inserted verbatim,
    without regex escaping.
    """

    def __init__(self, alert_names: Set[str]):
        """
        :param alert_names: Names of the alerts to match
        """
        # Sets have no defined iteration order, so sort the names first.
        # Equal sets then always yield the same value, which keeps equality
        # and hashing of the resulting matchers consistent.
        super().__init__(
            name="alertname",
            is_value_regular_expresion=True,
            value="({})".format("|".join(sorted(alert_names))),
        )
class AlertLabelMatcher(Matcher):
    """
    A Matcher selecting alerts by one label name and the value it must match.
    """

    def __init__(self, label_name: str, label_value: str):
        """
        :param label_name: Name of the alert label
        :param label_value: Value for the label; it is flagged as a regular expression
        """
        matcher_fields = {
            "name": label_name,
            "value": label_value,
            # NOTE(review): the value is always marked as a regex -- confirm
            # that callers never rely on a literal comparison.
            "is_value_regular_expresion": True,
        }
        super().__init__(**matcher_fields)
def _format_time(time: datetime.datetime) -> str:
"""
Format the given time in the format expected by the Alertmanager API.
"""
return time.replace(tzinfo=None).isoformat(timespec="seconds")
def silence_alertmanager(
    *,
    duration: datetime.timedelta,
    matchers: Set[Matcher],
    core_client: kubernetes.client.CoreV1Api,
    namespace: str = "monitoring",
    service: str = "alertmanager-main:9093",
    created_by: str = "k8s-infrastructure",
    comment: str = "k8s-infrastructure automated silence",
    timeout: Optional[int] = 10,
) -> Optional[str]:
    """
    Attempt to create an alert silence.

    The silence starts now and lasts for ``duration``. Silence creation may
    fail, e.g. if Alertmanager is down. If that occurs, the failure is logged
    and this function returns None instead of raising.

    :param duration: Silence lifetime
    :param matchers: Matcher conditions which the silence will use to select alerts
    :param core_client: Kubernetes API client
    :param namespace: Namespace of the Alertmanager service
    :param service: Alertmanager service in format "serviceName:port"
    :param created_by: Author recorded on the silence
    :param comment: Comment recorded on the silence
    :param timeout: Request timeout
    :return: The created silence's ID, or None if the creation failed
    """
    import logging

    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated.
    start_time = datetime.datetime.now(datetime.timezone.utc)
    end_time = start_time + duration
    try:
        response = proxy_request(
            method="POST",
            core_client=core_client,
            namespace=namespace,
            path="api/v2/silences",
            service=service,
            timeout=timeout,
            json_request_body={
                "matchers": [m.as_dict() for m in matchers],
                "startsAt": _format_time(start_time),
                "endsAt": _format_time(end_time),
                "createdBy": created_by,
                "comment": comment,
            },
        )
        response.raise_for_status()
        return json.loads(response.read())["silenceID"]
    except Exception:
        # Best-effort by contract: report the failure but do not propagate it.
        logging.getLogger(__name__).warning(
            "Failed to create Alertmanager silence", exc_info=True
        )
        return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment