Last active
August 28, 2023 05:55
-
-
Save onefoursix/3704e6e09cd4157f2f8b72c91bbbf389 to your computer and use it in GitHub Desktop.
Python script to get pipeline and engine metrics for StreamSets platform
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
'''
This script writes a rolling log file that contains running Pipeline names and record counts,
along with SDC CPU usage and JVM heap memory metrics for all Data Collectors registered
with StreamSets Platform that match the specified set of Labels.

The script writes a sdc-resource-metrics.log as a rolling log file with the pipeline and SDC metrics
as well as a sdc-resource-metrics-messages-and-errors.log file that shows the SDCs that are discovered
and whether connections to them are successful.

Prerequisites:

- Python 3.6+

- StreamSets Platform SDK for Python v5 or v6
  See: https://docs.streamsets.com/platform-sdk/latest/learn/installation.html

- Control Hub API Credentials

- Set the following variables in the script:

    # Comma delimited list of SDC Engine Labels to get metrics for
    target_engine_label_list = 'label1,label2,label3'

    # The output directory to write metrics and logs to
    output_dir = '/path/to/sdc-metrics'

    # Control Hub API credentials
    cred_id = '<your CRED ID>'
    cred_token = '<your CRED TOKEN>'

    # Control Hub URL
    sch_url = 'https://na02.hub.streamsets.com'

    # Set to True if WebSocket Communication is enabled
    # Set to False if Direct REST APIs are used
    websockets_enabled = False

    # How often to capture SDC metrics
    metrics_capture_interval_seconds = 5 * 60  # five minutes

    # How often to query Control Hub to refresh the list of SDCs
    sdc_list_refresh_seconds = 5 * 60  # five minutes

    # Rolling Logfile config
    max_bytes_pre_log_file = 100 * 1024 * 1024  # 100MB
    number_of_rolling_logfiles = 5

- Run the script

    $ python get-sdc-metrics.py

- To run the script as a background process, launch the script using a command like this:

    $ nohup python get-sdc-metrics.py > /dev/null 2>&1 &

- Sample metrics in the rolling log file looks like this:

    {"timestamp": "2023-08-27 22:12:37", "sdc_url": "https://sequoia.onefoursix.com:11119",
    "pipeline_name": "Get Weather Events", "pipeline_state": "RUNNING",
    "input_record_count": 135, "output_record_count": 135, "error_record_count": 0,
    "cpu_load_percentage": 46, "heap_memory_used": 523244216, "heap_memory_max": 32125353984,
    "heap_memory_percentage": 1}

    {"timestamp": "2023-08-27 22:12:37", "sdc_url": "https://sequoia.onefoursix.com:11119",
    "pipeline_name": "Weather Raw to Refined", "pipeline_state": "RUNNING",
    "input_record_count": 134, "output_record_count": 134, "error_record_count": 0,
    "cpu_load_percentage": 33, "heap_memory_used": 591783112, "heap_memory_max": 32125353984,
    "heap_memory_percentage": 1}

    {"timestamp": "2023-08-27 22:12:37", "sdc_url": "https://sequoia.onefoursix.com:11119",
    "pipeline_name": "Weather to S3", "pipeline_state": "RUNNING",
    "input_record_count": 132, "output_record_count": 132, "error_record_count": 0,
    "cpu_load_percentage": 28, "heap_memory_used": 653891480, "heap_memory_max": 32125353984,
    "heap_memory_percentage": 2}

    ...
'''
# Imports (stdlib first, then third-party, one import per line)
import json
import logging
import os
import sys
import time
from datetime import datetime
from logging.handlers import RotatingFileHandler

# NOTE(review): DataCollector is imported but not referenced anywhere in this
# script -- kept for backward compatibility in case external code relies on it.
from streamsets.sdk import ControlHub, DataCollector
# ---------------------------------------------------------------------------
# User configuration -- edit these values before running the script.
# ---------------------------------------------------------------------------

# Comma delimited list of SDC Engine Labels to get metrics for
target_engine_label_list = 'label1,label2,label3'

# The output directory to write metrics and logs to (must already exist)
output_dir = '/path/to/sdc-metrics'

# Control Hub API credentials
cred_id = '<your CRED ID>'
cred_token = '<your CRED TOKEN>'

# Control Hub URL
sch_url = 'https://na02.hub.streamsets.com'

# Set to True if WebSocket Communication is enabled
# Set to False if Direct REST APIs are used
websockets_enabled = False

# How often to capture SDC metrics
metrics_capture_interval_seconds = 5 * 60  # five minutes

# How often to query Control Hub to refresh the list of SDCs
sdc_list_refresh_seconds = 5 * 60  # five minutes

# Rolling Logfile config
# NOTE(review): the name probably intends "per_log_file"; kept as-is because
# other code in this file references it by this exact name.
max_bytes_pre_log_file = 100 * 1024 * 1024  # 100MB
number_of_rolling_logfiles = 5
# Method to create a rolling log file
def create_rotating_log(log_dir=None, max_bytes=None, backup_count=None):
    """Create (or fetch) the logger that writes the rolling metrics log.

    Args:
        log_dir: Directory for the log file; defaults to the module-level
            ``output_dir``.
        max_bytes: Size at which the file rolls over; defaults to the
            module-level ``max_bytes_pre_log_file``.
        backup_count: Number of rolled files kept; defaults to the
            module-level ``number_of_rolling_logfiles``.

    Returns:
        A ``logging.Logger`` whose single handler is a ``RotatingFileHandler``
        writing to ``<log_dir>/sdc-resource-metrics.log``.
    """
    if log_dir is None:
        log_dir = output_dir
    if max_bytes is None:
        max_bytes = max_bytes_pre_log_file
    if backup_count is None:
        backup_count = number_of_rolling_logfiles

    log_file = os.path.join(log_dir, 'sdc-resource-metrics.log')
    logger = logging.getLogger("Rotating Log")
    logger.setLevel(logging.INFO)

    # getLogger() returns a process-wide singleton; guard against attaching a
    # duplicate handler (and duplicated output) if this is ever called twice.
    if not logger.handlers:
        handler = RotatingFileHandler(log_file, maxBytes=max_bytes, backupCount=backup_count)
        logger.addHandler(handler)
    return logger
def log_message(message):
    """Append a timestamped line to the messages-and-errors log and flush.

    Writes to the module-level file object ``messages_and_errors_log``; the
    flush makes entries visible immediately when tailing the file.
    """
    messages_and_errors_log.write(datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ' ' + message + '\n')
    messages_and_errors_log.flush()
# Confirm the logging directory exists. We can't use log_message() yet --
# both log files live inside output_dir -- so report problems to stderr.
# (The original code called an undefined log_event() here, which raised a
# NameError instead of printing the intended error.)
if not os.path.isdir(output_dir):
    print('Error: the directory \'' + output_dir + '\' does not exist', file=sys.stderr)
    print('Please create that directory in advance', file=sys.stderr)
    sys.exit(-1)

# Open the event messages and errors log BEFORE the first log_message() call.
messages_and_errors_log_file_name = 'sdc-resource-metrics-messages-and-errors.log'
messages_and_errors_log = open(os.path.join(output_dir, messages_and_errors_log_file_name), mode='a')

# Parse the label list, dropping blank entries (e.g. from a trailing comma).
# Note str.split(',') never returns an empty list, so checking len()==0 --
# as the original code did -- could never detect a missing label list.
target_engine_labels = [label for label in target_engine_label_list.split(',') if label.strip()]
if not target_engine_labels:
    log_message('Error: no engine labels provided')
    sys.exit(-1)

# Log the target engine labels
log_message('Starting metrics collection for SDCs with labels ' + str(target_engine_labels))

# Create the rotating metrics log file
metrics_logger = create_rotating_log()
# Connect to Control Hub.
# NOTE(review): sch_url is configured above but not passed to ControlHub();
# the SDK appears to resolve the endpoint from the credentials -- confirm
# against the installed SDK version if a specific regional URL must be forced.
sch = None
try:
    sch = ControlHub(credential_id=cred_id, token=cred_token, use_websocket_tunneling=websockets_enabled)
except Exception as e:
    log_message('Error: Could not connect to Control Hub.')
    log_message('Check your API credentials and the Control Hub URL')
    log_message('Exception: ' + str(e))
    sys.exit(-1)
# A method to filter the SDCs to get metrics from, from the full set of
# SDCs registered with Control Hub. You can implement your own filtering logic
# here. For this example, only SDCs whose engine labels overlap the configured
# label list are included.
def get_sdcs_to_get_metrics_for(sch):
    """Return the Data Collectors whose labels match ``target_engine_labels``.

    Iterates ``sch.data_collectors``; an SDC is included when any of its
    labels or reported labels appears in the module-level
    ``target_engine_labels`` list. SDCs that raise while being inspected are
    logged and excluded. Both the included and excluded URL lists are written
    to the messages log for operator visibility.
    """
    log_message('Refreshing the list of Data Collectors to get metrics for')
    sdcs_to_include = []
    sdcs_to_exclude = []
    for sdc in sch.data_collectors:
        try:
            # Build a combined label list WITHOUT mutating the SDK object
            # (the original code extended sdc.labels in place).
            engine_labels = list(sdc.labels) + list(sdc.reported_labels)
            if any(label in target_engine_labels for label in engine_labels):
                sdcs_to_include.append(sdc)
            else:
                sdcs_to_exclude.append(sdc)
        except Exception:
            # Narrowed from a bare "except:" so KeyboardInterrupt/SystemExit
            # still propagate.
            log_message('Error connecting to SDC with URL: ' + sdc.engine_url)
            sdcs_to_exclude.append(sdc)
    log_message('SDCs that will be included in metrics collection:')
    for sdc in sdcs_to_include:
        log_message(sdc.engine_url)
    log_message('SDCs that will NOT be included in metrics collection:')
    for sdc in sdcs_to_exclude:
        log_message(sdc.engine_url)
    return sdcs_to_include
# The initial list of SDCs to get metrics for
sdcs = get_sdcs_to_get_metrics_for(sch)

# The time when we last refreshed the list of SDCs from Control Hub
sdc_refresh_time = time.time()

# The time when the current metrics-capture pass started.
# Initialized to 0 so the first capture runs immediately.
metrics_capture_time = 0

try:
    # Loop forever
    while True:

        # Refresh the SDC list from Control Hub if it's been longer than
        # sdc_list_refresh_seconds since the last refresh
        if time.time() > sdc_refresh_time + sdc_list_refresh_seconds:
            sdcs = get_sdcs_to_get_metrics_for(sch)
            sdc_refresh_time = time.time()

        # Record when this metrics collection pass started
        metrics_capture_time = time.time()

        # For every SDC this script is getting metrics for
        for sdc in sdcs:
            sdc_instance = None

            # Try to connect directly to the SDC
            try:
                sdc_instance = sdc._instance
            except Exception:
                # Narrowed from a bare "except:" so Ctrl-C still works
                log_message('Error connecting to SDC with URL: ' + sdc.engine_url)

            # If we connected to the SDC..
            if sdc_instance is not None:
                try:
                    # For each pipeline on the SDC
                    for pipeline in sdc_instance.pipelines:

                        # Get pipeline state
                        pipeline_status = sdc_instance.get_pipeline_status(pipeline)
                        pipeline_state = json.loads(pipeline_status.response.text)['status']

                        # Only get metrics for pipelines that should be running
                        if pipeline_state not in ['EDITED', 'FINISHED', 'STOPPED']:
                            metrics = {}
                            metrics['timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                            metrics['sdc_url'] = sdc.engine_url
                            metrics['pipeline_name'] = pipeline.title
                            metrics['pipeline_state'] = pipeline_state

                            # Get pipeline record-count metrics
                            pipeline_metrics = sdc_instance.get_pipeline_metrics(pipeline)
                            metrics['input_record_count'] = pipeline_metrics.pipeline.input_record_count
                            metrics['output_record_count'] = pipeline_metrics.pipeline.output_record_count
                            metrics['error_record_count'] = pipeline_metrics.pipeline._data['counters']['pipeline.batchErrorRecords.counter']['count']

                            # Get CPU metrics from the SDC's JMX endpoint
                            jmx_metrics = sdc_instance.get_jmx_metrics()
                            cpu_metrics = jmx_metrics.get('java.lang:type=OperatingSystem')
                            metrics['cpu_load_percentage'] = int(cpu_metrics['SystemCpuLoad'] * 100)

                            # Get heap metrics from the pipeline's JVM gauges
                            metrics['heap_memory_used'] = pipeline_metrics.pipeline._data['gauges']['jvm.memory.heap.used']['value']
                            metrics['heap_memory_max'] = pipeline_metrics.pipeline._data['gauges']['jvm.memory.heap.max']['value']
                            metrics['heap_memory_percentage'] = int((metrics['heap_memory_used'] / metrics['heap_memory_max']) * 100)

                            # Write metrics to the rolling logfile as one JSON
                            # object per line
                            data = json.dumps(metrics)
                            metrics_logger.info(data)
                except Exception as e:
                    # Fixed: the original concatenated the URL and the
                    # exception text with no separator.
                    log_message('Error getting metrics for SDC URL: ' + sdc.engine_url + ' ' + str(e))

        # Sleep until the next capture interval is due
        end_metrics_capture_time = time.time()
        if end_metrics_capture_time < metrics_capture_time + metrics_capture_interval_seconds:
            sleep_seconds = int(metrics_capture_time + metrics_capture_interval_seconds - end_metrics_capture_time)
            log_message('Metrics capture sleeping for ' + str(sleep_seconds) + ' seconds')
            time.sleep(sleep_seconds)
            log_message('Metrics capture resuming')
except Exception as e:
    log_message('Error getting metrics ' + str(e))
finally:
    # Always close the messages log, even when the loop dies on an error
    messages_and_errors_log.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment