Last active
September 1, 2023 19:50
-
-
Save chriswgerber/0967ca35311848de938387f2ba31b88c to your computer and use it in GitHub Desktop.
Pi Monitor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Forked from https://raw.githubusercontent.com/Bunn/pi_monitor/master/pi-monitor.py""" | |
from dataclasses import dataclass, asdict | |
from http.server import BaseHTTPRequestHandler, HTTPServer | |
import json | |
import re | |
import sys | |
import subprocess | |
@dataclass | |
class Partition(object): | |
mount_point: str | |
size: int = 0 | |
used: int = 0 | |
available: int = 0 | |
utilized: int = 0 | |
def to_json(self): | |
return asdict(self) | |
class Monitor: | |
def get_thermal_temperature(self): | |
"""Alternate way of checking CPU temp""" | |
thermal = subprocess.check_output( | |
"cat /sys/class/thermal/thermal_zone0/temp", shell=True).decode("utf8") | |
return float(thermal) / 1000.0 | |
def get_soc_temperature(self): | |
"""returns the temperature of the SoC as measured by the on-board temperature sensor""" | |
temp = subprocess.check_output( | |
"vcgencmd measure_temp", shell=True).decode("utf8") | |
return float(re.findall(r'\d+\.\d+', temp)[0]) | |
def get_uptime(self): | |
"""uptime in seconds""" | |
uptime = subprocess.check_output( | |
"cat /proc/uptime", shell=True).decode("utf8") | |
return float(uptime.split(" ")[0]) | |
def get_load_average(self): | |
"""returns load averages for 1, 5, and 15 minutes""" | |
uptime = subprocess.check_output("uptime", shell=True).decode("utf8") | |
load_average = uptime.split("load average:")[1].split(",") | |
return list(map(float, load_average)) | |
def get_kernel_release(self): | |
return subprocess.check_output("uname -r", shell=True).decode("utf8").strip() | |
def get_disk_metrics(self): | |
"""Return mount metrics in MB""" | |
disks = list() | |
result = json.loads(subprocess.check_output("lsblk -o NAME,MOUNTPOINT -I 179 -J", shell=True).decode("utf8")) | |
for dev in result.get('blockdevices', []): | |
for item in dev.get('children', []): | |
disk = Partition(item.get('mountpoint', '')) | |
if disk.mount_point: | |
data = subprocess.check_output(f"df -BM | grep -E '.*{disk.mount_point}$'", shell=True).decode("utf8") | |
mnt_metrics = data.split() | |
disk.size = mnt_metrics[1][:-1] | |
disk.used = mnt_metrics[2][:-1] | |
disk.available = mnt_metrics[3][:-1] | |
disk.utilized = mnt_metrics[4][:-1] | |
disks.append(disk.to_json()) | |
return disks | |
def get_memory_usage(self): | |
"""returns total, free and available memory in kB""" | |
meminfo = subprocess.check_output("cat /proc/meminfo", shell=True).decode("utf8").strip() | |
memory_usage = meminfo.split("\n") | |
total_memory = [x for x in memory_usage if 'MemTotal' in x][0] | |
free_memory = [x for x in memory_usage if 'MemFree' in x][0] | |
available_memory = [x for x in memory_usage if 'MemAvailable' in x][0] | |
total_memory = re.findall(r'\d+', total_memory)[0] | |
free_memory = re.findall(r'\d+', free_memory)[0] | |
available_memory = re.findall(r'\d+', available_memory)[0] | |
data = { | |
"total_memory": int(total_memory), | |
"free_memory": int(free_memory), | |
"available_memory": int(available_memory) | |
} | |
return data | |
def get_json(self): | |
data = { | |
"soc_temperature": self.get_soc_temperature(), | |
"uptime": self.get_uptime(), | |
"load_average": self.get_load_average(), | |
"kernel_release": self.get_kernel_release(), | |
"memory": self.get_memory_usage(), | |
"disk": self.get_disk_metrics(), | |
} | |
return json.dumps(data) | |
class MonitorServer(BaseHTTPRequestHandler): | |
def set_response(self): | |
self.send_response(200) | |
self.send_header('Content-type', 'application/json') | |
self.end_headers() | |
def do_GET(self): | |
self.set_response() | |
print(self.path) | |
if self.path == "/monitor.json" or self.path == "/monitor": | |
response = Monitor().get_json().encode() | |
self.wfile.write(response) | |
def main(): | |
port = 8088 | |
port_argument = sys.argv[-1] | |
if port_argument.isdigit(): | |
port = int(port_argument) | |
httpd = HTTPServer(('', port), MonitorServer) | |
httpd.serve_forever() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment