Skip to content

Instantly share code, notes, and snippets.

@tos-kamiya
Created September 15, 2024 18:26
Show Gist options
  • Save tos-kamiya/4665a763fc17b3d4fb6134a69a575159 to your computer and use it in GitHub Desktop.
Save tos-kamiya/4665a763fc17b3d4fb6134a69a575159 to your computer and use it in GitHub Desktop.
A script to monitor CPU/GPU RAM usage of a command on NVIDIA systems, with process search by keyword and JSON support.
#!/usr/bin/env python3
import argparse
import json
import os
import subprocess
import sys
import time
from typing import List, Tuple, Dict
def monitor_processes(command: List[str], keywords: List[str]) -> Tuple[int, Dict[int, Tuple[str, int, int]], float]:
    """
    Run a command and sample the memory usage of it and of keyword-matched processes.

    While the command is running, the GPU and CPU memory of the command itself and of
    every process whose command line matches one of the keywords is sampled every
    0.5 seconds, and the highest value seen per process is kept.

    Args:
        command (List[str]): The command and its arguments to be executed.
        keywords (List[str]): Keywords used to search for additional target processes.

    Returns:
        Tuple[int, Dict[int, Tuple[str, int, int]], float]:
            A tuple containing:
            - The PID of the executed command.
            - A dictionary mapping each monitored PID (executed or target) to a tuple
              of (command line, peak GPU memory, peak CPU memory).
            - The elapsed time of the execution of the command, in seconds.
    """
    # A monotonic clock keeps the elapsed time correct even if the wall clock is
    # adjusted while the command is running.
    start_time = time.monotonic()
    process = subprocess.Popen(command)

    executed_process_cmdline = " ".join(command)

    # Key: PID, Value: (command line, peak GPU memory, peak CPU memory).
    # The executed process is registered up front so it is reported even when it
    # finishes before the first sample is taken.
    monitored_processes: Dict[int, Tuple[str, int, int]] = {
        process.pid: (executed_process_cmdline, 0, 0),
    }

    try:
        while process.poll() is None:  # Loop until the process finishes
            # Collect the command line of every process matching any keyword.
            pid_to_cmdline: Dict[int, str] = {}
            for keyword in keywords:
                pid_to_cmdline.update(find_target_processes(keyword))
            # The executed process is always monitored and always reported with the
            # command line it was launched with, even if a keyword matches it too.
            pid_to_cmdline[process.pid] = executed_process_cmdline

            pids = sorted(pid_to_cmdline)
            gpu_memories = get_gpu_memory(pids)
            cpu_memories = get_cpu_memory(pids)

            # Fold the current sample into the recorded peaks.
            for pid in pids:
                cmdline = pid_to_cmdline[pid]
                _, peak_gpu, peak_cpu = monitored_processes.get(pid, (cmdline, 0, 0))
                monitored_processes[pid] = (
                    cmdline,
                    max(peak_gpu, gpu_memories.get(pid, 0)),
                    max(peak_cpu, cpu_memories.get(pid, 0)),
                )

            time.sleep(0.5)  # Wait for 0.5 seconds before the next check
    except KeyboardInterrupt:
        # Report on stderr so the message never ends up inside the text/JSON
        # results that are printed to stdout.
        print("Monitoring interrupted.", file=sys.stderr)
    finally:
        # Only signal the command if it is still running (interrupt or error);
        # after a normal exit there is nothing left to terminate.
        if process.poll() is None:
            process.terminate()
        process.wait()

    elapsed_time = time.monotonic() - start_time
    return process.pid, monitored_processes, elapsed_time
def get_gpu_memory(pids: List[int]) -> Dict[int, int]:
    """
    Retrieves the VRAM usage for a list of PIDs by querying ``nvidia-smi``.

    Args:
        pids (List[int]): List of process IDs.

    Returns:
        Dict[int, int]: A dictionary where the keys are PIDs and the values are VRAM
        usage in MiB. PIDs that ``nvidia-smi`` does not list are omitted. If a PID is
        listed on several rows, the values are added up. An empty dictionary is
        returned when ``nvidia-smi`` is missing or fails.
    """
    wanted_pids = set(pids)  # O(1) membership tests while scanning the rows
    if not wanted_pids:
        return {}

    try:
        output = subprocess.check_output(
            ["nvidia-smi", "--query-compute-apps=pid,used_gpu_memory", "--format=csv,noheader"],
            # Discard stderr so warnings are never parsed as CSV rows.
            stderr=subprocess.DEVNULL,
            universal_newlines=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing or non-executable nvidia-smi binary; GPU
        # monitoring is best-effort, so report nothing rather than crash.
        return {}

    gpu_memory_usage: Dict[int, int] = {}
    for line in output.splitlines():
        fields = [field.strip() for field in line.split(",")]
        if len(fields) != 2:
            continue  # Blank or unexpected row
        pid_text, memory_text = fields
        # Skip rows whose memory is not reported as "<number> MiB" (e.g. "[N/A]").
        if not pid_text.isdigit() or not memory_text.endswith(" MiB"):
            continue
        process_pid = int(pid_text)
        if process_pid not in wanted_pids:
            continue
        try:
            gpu_memory = int(memory_text[: -len(" MiB")])
        except ValueError:
            continue
        # NOTE(review): a PID is expected to appear once per GPU it uses, so the
        # rows are summed to get the process total — confirm on a multi-GPU host.
        gpu_memory_usage[process_pid] = gpu_memory_usage.get(process_pid, 0) + gpu_memory
    return gpu_memory_usage
def get_cpu_memory(pids: List[int]) -> Dict[int, int]:
    """
    Retrieves the RAM (CPU memory) usage for a list of PIDs.

    The value is read from the ``VmRSS`` line of ``/proc/<pid>/status``, so this
    only produces results on Linux; on other platforms an empty dictionary is
    returned.

    Args:
        pids (List[int]): List of process IDs.

    Returns:
        Dict[int, int]: A dictionary where the keys are PIDs and the values are RAM
        usage in MiB. A PID whose status file cannot be read or parsed is reported
        as 0; a PID whose status file has no ``VmRSS`` line is omitted.
    """
    cpu_memory_usage: Dict[int, int] = {}

    # The platform cannot change between iterations, so check it once.
    if not sys.platform.startswith("linux"):
        return cpu_memory_usage

    for pid in pids:
        try:
            with open(f"/proc/{pid}/status", "r") as status_file:
                for line in status_file:
                    if line.startswith("VmRSS:"):
                        # Line looks like "VmRSS:    1234 kB"; the second token is
                        # the size in kB.
                        ram_kb = int(line.split()[1])
                        cpu_memory_usage[pid] = ram_kb // 1024  # Convert kB to MiB
                        break
        except (OSError, ValueError, IndexError):
            # The process may be gone, may vanish mid-read, or may be unreadable;
            # treat all of these as "no memory in use" instead of aborting.
            cpu_memory_usage[pid] = 0
    return cpu_memory_usage
def find_target_processes(keyword: str) -> Dict[int, str]:
    """
    Retrieve the PIDs and command lines of all processes matching the specified keyword,
    skipping the current script itself.

    The keyword is matched against the command line only, never against the PID
    column or the header row of the ``ps`` output.

    Args:
        keyword (str): The keyword to search for in the process command line.

    Returns:
        Dict[int, str]: A dictionary where the keys are PIDs (integers) and the values
        are the corresponding command lines (strings, with runs of whitespace collapsed
        to single spaces). Empty if ``ps`` is unavailable or fails.
    """
    current_pid = os.getpid()  # Get the current script's PID
    try:
        output = subprocess.check_output(
            ["ps", "ax", "-o", "pid,command"],
            universal_newlines=True,
            # Command lines may contain bytes that are invalid in the locale
            # encoding; substitute them instead of raising while decoding.
            errors="replace",
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing or non-executable ps binary.
        return {}

    target_pids: Dict[int, str] = {}
    for line in output.splitlines():
        fields = line.split(None, 1)  # [pid, rest of the line]
        if not fields or not fields[0].isdigit():
            continue  # Header row ("PID COMMAND") or an unexpected line
        pid = int(fields[0])
        raw_cmdline = fields[1] if len(fields) > 1 else ""
        if pid == current_pid or keyword not in raw_cmdline:
            continue
        target_pids[pid] = " ".join(raw_cmdline.split())
    return target_pids
def output_text_results(
    executed_process_pid: int,
    pids: List[int],
    monitored_processes: Dict[int, Tuple[str, int, int]],
    elapsed_time: float,
    output_file: str,
) -> None:
    """
    Outputs the monitoring results to a file or standard output in plain text format.

    Args:
        executed_process_pid (int): The PID of the executed command.
        pids (List[int]): List of PIDs to display, in display order. Every PID must
            be a key of ``monitored_processes``.
        monitored_processes (Dict[int, Tuple[str, int, int]]): Dictionary mapping a PID
            to (command line, peak GPU memory, peak CPU memory).
        elapsed_time (float): The total elapsed time for the monitored command.
        output_file (str): The file to write the results to. When empty or None, the
            results are printed to standard output instead.
    """
    lines = [
        "----------",
        f"Executed process ID: {executed_process_pid}",
        f"Elapsed time: {elapsed_time:.2f} seconds",
        "",
    ]
    for pid in pids:
        cmdline, gpu_memory, cpu_memory = monitored_processes[pid]
        lines.extend(
            [
                f"PID: {pid}",
                f"Command: {cmdline}",
                f"Peak GPU memory usage: {gpu_memory} MiB",
                f"Peak CPU memory usage: {cpu_memory} MiB",
                "",
            ]
        )
    # Every line, including the last blank separator, is newline-terminated.
    report = "\n".join(lines) + "\n"

    # Write to file if specified, otherwise print to stdout
    if output_file:
        # Command lines may contain non-ASCII characters; use an explicit encoding
        # so the file content does not depend on the locale.
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(report)
    else:
        print(report, end="")
def output_json_results(
    executed_process_pid: int,
    pids: List[int],
    monitored_processes: Dict[int, Tuple[str, int, int]],
    elapsed_time: float,
    output_file: str,
) -> None:
    """
    Outputs the monitoring results to a file or standard output in JSON format.

    The document has the keys ``executed_process_pid``, ``elapsed_time`` and
    ``processes``; each entry of ``processes`` has the keys ``pid``, ``command``,
    ``peak_gpu_memory_mib`` and ``peak_cpu_memory_mib``.

    Args:
        executed_process_pid (int): The PID of the executed command.
        pids (List[int]): List of PIDs to display, in display order. Every PID must
            be a key of ``monitored_processes``.
        monitored_processes (Dict[int, Tuple[str, int, int]]): Dictionary mapping a PID
            to (command line, peak GPU memory, peak CPU memory).
        elapsed_time (float): The total elapsed time for the monitored command.
        output_file (str): The file to write the results to. When empty or None, the
            results are printed to standard output instead.
    """
    processes = []
    for pid in pids:
        cmdline, gpu_memory, cpu_memory = monitored_processes[pid]
        processes.append(
            {
                "pid": pid,
                "command": cmdline,
                "peak_gpu_memory_mib": gpu_memory,
                "peak_cpu_memory_mib": cpu_memory,
            }
        )
    results = {
        "executed_process_pid": executed_process_pid,
        "elapsed_time": elapsed_time,
        "processes": processes,
    }
    # Terminate the document with a newline so the file and stdout receive exactly
    # the same content.
    output_data = json.dumps(results, indent=4) + "\n"

    # Write to file if specified, otherwise print to stdout
    if output_file:
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(output_data)
    else:
        print(output_data, end="")
def main() -> int:
    """
    Command-line entry point: run the given command, monitor it, and report the results.

    Returns:
        int: The exit status of the script: 0 after the results have been written,
        1 when no command to execute was given.
    """
    parser = argparse.ArgumentParser(description="Monitor the memory usage of a process and related processes.")
    parser.add_argument("-w", "--keyword", action="append", help="Keyword to monitor target processes", required=False)
    parser.add_argument("-o", "--output", help="Output file to store the results", required=False)
    parser.add_argument("--json", action="store_true", help="Output in JSON format", required=False)
    parser.add_argument("command", nargs=argparse.REMAINDER, help="The command to execute")
    args = parser.parse_args()

    command = args.command
    # argparse.REMAINDER can leave a leading "--" separator in the list; it is not
    # part of the command and must not be passed on as the program name.
    if command and command[0] == "--":
        command = command[1:]
    if not command:
        # Usage errors belong on stderr, not in the result stream.
        print("Error: A command to execute must be specified.", file=sys.stderr)
        return 1

    keywords = args.keyword if args.keyword else []
    pid, monitored_processes, elapsed_time = monitor_processes(command, keywords)

    # List the invoked process first, followed by the other PIDs in ascending order.
    pids = [pid] + sorted(other for other in monitored_processes if other != pid)

    # Output results based on the format
    write_results = output_json_results if args.json else output_text_results
    write_results(pid, pids, monitored_processes, elapsed_time, args.output)
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment