Last active
July 8, 2022 20:50
-
-
Save eddieparker/3bf9c22a6c4f60e4b6d5b51ec03da45c to your computer and use it in GitHub Desktop.
Format PostgreSQL's blocked PIDs into a tree so you can see which root query is blocking which child queries.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Generates a tree on the command line showing you which pid and query is blocking which. Looks like: | |
Blocking and Blocked Query Tree | |
=============================== | |
2022-06-09 21:14:06.707790 [15257]: (SELECT item.id AS item_id, item.itemid AS item_id) | |
-> 2022-06-09 21:14:17.785848 [15536]: (UPDATE item SET foo_id=187 WHERE item.id = 37454797) | |
-> 2022-06-09 21:14:07.364804 [15280]: (UPDATE item SET foo_id=187 WHERE item.id = 37454797) | |
-> 2022-06-09 21:14:09.445913 [15344]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570) | |
-> 2022-06-09 21:14:07.059882 [15275]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570) | |
-> 2022-06-09 21:14:08.791996 [15327]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570) | |
-> 2022-06-09 21:14:06.279527 [15237]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570) | |
-> 2022-06-09 21:14:17.325280 [15534]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570) | |
-> 2022-06-09 21:08:33.299530 [14518]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570) | |
-> 2022-06-09 21:14:07.445441 [15282]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570) | |
-> 2022-06-09 21:14:11.560278 [15434]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570) | |
-> 2022-06-09 21:03:40.118221 [14231]: (DELETE FROM scraper_work_queue_item WHERE scraper_work_queue_item.id = 9170918) | |
-> 2022-06-09 21:14:06.444947 [15245]: (UPDATE item SET foo_id=1 WHERE item.id = 37465692) | |
-> 2022-06-09 21:14:11.062618 [15426]: (UPDATE item SET foo_id=1 WHERE item.id = 37465692) | |
-> 2022-06-09 21:14:13.644536 [15475]: (UPDATE item SET foo_id=1 WHERE item.id = 37465692) | |
-> 2022-06-09 21:14:06.008387 [15233]: (UPDATE item SET foo_id=1 WHERE item.id = 37465692) | |
-> 2022-06-09 21:14:14.211249 [15496]: (UPDATE item SET foo_id=1 WHERE item.id = 37465692) | |
''' | |
import datetime | |
import os | |
import sys | |
import json | |
# Query returning one row per (session, blocking session) pair.
# pg_blocking_pids() returns an array (hence "= ANY(...)"), so a session blocked
# by several backends yields several rows.  Because of the LEFT JOIN, an
# unblocked session yields a single row whose blocking_* columns are NULL.
# backend_start is returned as a Unix epoch and converted back to a datetime on
# the Python side.
# NOTE(review): per the PostgreSQL docs, many pg_stat_activity columns are NULL
# for other roles' sessions unless the connecting role is privileged enough, so
# start_time may be NULL -- confirm for the role used here.
g_sql = '''
SELECT
    activity.pid,
    extract(epoch from activity.backend_start) as start_time,
    blocking.pid AS blocking_pid,
    activity.application_name,
    activity.wait_event_type,
    activity.wait_event,
    activity.query,
    blocking.query AS blocking_query
FROM pg_stat_activity AS activity
LEFT JOIN pg_stat_activity AS blocking ON blocking.pid = ANY(pg_blocking_pids(activity.pid))
ORDER BY activity.backend_start ASC
;
'''
import psycopg2 | |
g_settings_filename = 'settings.json'


def load_settings_into_environ(filename=g_settings_filename):
    """Copy key/value pairs from an optional JSON settings file into os.environ.

    If *filename* does not exist, nothing happens.  Values from the file
    override variables already present in the environment.  os.environ only
    accepts strings, so non-string JSON values (numbers, booleans) are stored
    via str(), and JSON null entries are skipped rather than crashing.

    Returns the parsed settings dict, or an empty dict when the file is absent.
    """
    if not os.path.exists(filename):
        return {}
    with open(filename, 'r', encoding='utf-8') as settings_file:
        settings = json.load(settings_file)
    for key, value in settings.items():
        if value is None:
            # Treat null as "not provided by this file".
            continue
        os.environ[key] = str(value)
    return settings


load_settings_into_environ()
# Cleared by try_get_env_variable() whenever a required variable is missing, so
# every missing variable can be reported before the script aborts.
g_query_db = True


def try_get_env_variable(env_name):
    """Return the value of environment variable *env_name*, or None if unset.

    A missing variable does not raise.  Instead, it is reported on stderr and
    the module-level ``g_query_db`` flag is cleared.  This lets the caller look
    up every required variable first and then abort once.
    """
    global g_query_db
    value = os.environ.get(env_name)
    if value is None:
        g_query_db = False
        # stderr keeps diagnostics out of the tree printed on stdout.
        print(f'Failed to get environment variable {env_name}.', file=sys.stderr)
    return value
# Look up every connection setting before bailing out, so all missing variables
# are reported in a single run (try_get_env_variable clears g_query_db on a miss).
dbname = try_get_env_variable('BLOCKED_PIDS_DBNAME')
dbuser = try_get_env_variable('BLOCKED_PIDS_DBUSER')
dbhost = try_get_env_variable('BLOCKED_PIDS_DBHOST')
dbpass = try_get_env_variable('BLOCKED_PIDS_DBPASS')
if not g_query_db:
    print('One or more environment variable was not setup. Aborting.', file=sys.stderr)
    sys.exit(-1)
# Run g_sql once and turn each result row into a dict keyed by column name.
# The connection is closed as soon as the rows are in memory; everything after
# this point works on `data` only.
connection = psycopg2.connect(
    host=dbhost,
    database=dbname,
    user=dbuser,
    password=dbpass,
)
try:
    # psycopg2 cursors close themselves when the `with` block exits.
    with connection.cursor() as cursor:
        cursor.execute(g_sql)
        header = [column[0] for column in cursor.description]
        data = [dict(zip(header, row)) for row in cursor]
finally:
    # A psycopg2 connection's own `with` block only ends the transaction,
    # so close it explicitly.
    connection.close()
class QueryNode:
    """One result row of g_sql wrapped as a node in the blocking tree.

    ``m_datum`` is the row as a dict keyed by column name.  The keys read here
    are pid, start_time, blocking_pid, application_name, wait_event_type,
    wait_event and query.  ``m_blocking_node`` is the node this session waits
    on (None if none was linked).  ``m_blocked_child_nodes`` holds the nodes
    waiting on this one.
    """

    # Maximum number of characters of the one-line query text shown by __str__.
    MAX_QUERY_CHARS = 100

    def __init__(self, datum):
        self.m_datum = datum
        self.m_blocking_node = None
        self.m_blocked_child_nodes = set()

    def get_pid(self):
        return self.m_datum['pid']

    def get_blocking_pid(self):
        """Return the pid blocking this session, or None if it is not blocked.

        SQL NULL arrives as None.  The strings "NULL" and '' are also treated as
        "no blocker" (presumably for rows that came from a text source).
        """
        blocking_pid = self.m_datum['blocking_pid']
        if blocking_pid in ("NULL", ''):
            return None
        return blocking_pid

    def get_query(self):
        """Return the session's query text, or '' when the column is NULL."""
        query = self.m_datum['query']
        return '' if query is None else query

    def set_blocking_node(self, node):
        self.m_blocking_node = node

    def get_blocking_node(self):
        return self.m_blocking_node

    def add_blocked_child_node(self, node):
        self.m_blocked_child_nodes.add(node)

    def get_blocked_child_nodes(self):
        return self.m_blocked_child_nodes

    def get_start_time(self):
        """Return the backend start time as a naive local datetime.

        Returns None when start_time is NULL, instead of letting float(None)
        raise.
        """
        start_time = self.m_datum['start_time']
        if start_time is None:
            return None
        # float() also accepts Decimal, which numeric columns may come back as.
        return datetime.datetime.fromtimestamp(float(start_time))

    def get_app_name(self):
        return self.m_datum['application_name']

    def get_wait_info(self):
        """Return "wait_event_type:wait_event", or '' when not waiting."""
        t = self.m_datum['wait_event_type']
        w = self.m_datum['wait_event']
        if t is None:
            return ''
        return f'{t}:{w}'

    def __str__(self):
        # str.split() with no argument splits on any whitespace run (newlines,
        # tabs, repeated spaces) and drops leading/trailing whitespace, so
        # multi-line SQL prints on one line.
        query_one_line = ' '.join(self.get_query().split())
        return (
            f'[{self.get_app_name()}:{self.get_pid()}]: '
            f'({self.get_wait_info()}) ({query_one_line[:self.MAX_QUERY_CHARS]})'
        )
# Bucket every session by whether it is waiting on another backend.
# NOTE(review): g_sql emits one row per blocker, so a pid blocked by several
# backends appears several times in `data`.  Each later row replaces the earlier
# node for that pid, so only the last blocker ends up in the tree.
blocked_nodes_by_pid = {}
unblocked_nodes_by_pid = {}
for row in data:
    query_node = QueryNode(row)
    if query_node.get_blocking_pid() is not None:
        target = blocked_nodes_by_pid
    else:
        target = unblocked_nodes_by_pid
    target[query_node.get_pid()] = query_node
# Link each blocked node to the node blocking it, in both directions.  The
# blocker may itself be blocked or unblocked, so both maps are searched.  If it
# cannot be found, warn and leave the node without a parent; the root-finding
# pass then treats that node as a root.
for node in blocked_nodes_by_pid.values():
    blocking_pid = node.get_blocking_pid()
    blocking_node = blocked_nodes_by_pid.get(blocking_pid)
    if blocking_node is None:
        blocking_node = unblocked_nodes_by_pid.get(blocking_pid)
    if blocking_node is None:
        print(f"Node {node} says it is blocked but we can't find the node that's blocking it?",
              file=sys.stderr)
        continue
    node.set_blocking_node(blocking_node)
    blocking_node.add_blocked_child_node(node)
def find_root_blocker(node):
    """Follow blocking links upward from *node* and return its root blocker.

    Normally the root is the first ancestor with no blocking node.  If the
    links form a cycle (sessions waiting on each other -- presumably a
    deadlock the server has not broken yet), no such ancestor exists.  The
    original `while True` walk would spin forever in that case.  Here the
    lowest-pid node on the cycle is returned instead, so every member of one
    cycle maps to the same root and the cycle is printed only once.
    """
    path = []
    on_path = set()
    while node not in on_path:
        parent = node.get_blocking_node()
        if parent is None:
            return node
        path.append(node)
        on_path.add(node)
        node = parent
    # `node` is the first node seen twice, so it lies on the cycle; the cycle
    # is the tail of the path starting from it.
    cycle = path[path.index(node):]
    return min(cycle, key=lambda member: member.get_pid())


root_blockers = {find_root_blocker(node) for node in blocked_nodes_by_pid.values()}
# Tree printing: a root blocker followed by everything it (transitively) blocks.
def recursively_print_node(node, recursion, visited_pids):
    """Print *node*, then every node it blocks, indented one level deeper.

    Args:
        node: node to print (anything with get_pid, get_start_time,
            get_blocked_child_nodes and __str__, e.g. a QueryNode).
        recursion: current depth; 0 for a root blocker.  Output stops with a
            warning once the depth exceeds 50.
        visited_pids: pids already printed on the path from the root to *node*.
            Meeting one of them again means the blocking links form a cycle.
            That is reported instead of recursing.  The caller's collection is
            never modified.
    """
    prefix = ' ' * recursion
    if recursion > 50:
        print(f'{prefix}!!!MAX RECURSION REACHED! QUITTING!!!')
        return
    if recursion > 0:
        prefix += f' [{recursion+1}]-> '
    pid = node.get_pid()
    if pid in visited_pids:
        print(f"{prefix}!!!CIRCULAR LOOP DETECTED!!!")
        return
    print(f'{node.get_start_time()} {prefix}{node}')
    # Track the current root-to-node path only (a fresh set per branch).  The
    # original never added anything to visited_pids, so loops went undetected.
    path_pids = {*visited_pids, pid}
    # Children live in a set (no stable order).  Print them oldest first, with
    # unknown (None) start times last and pid as a tie-breaker, so output is
    # deterministic.
    children = sorted(
        node.get_blocked_child_nodes(),
        key=lambda child: (child.get_start_time() is None, child.get_start_time(), child.get_pid()),
    )
    for child in children:
        recursively_print_node(child, recursion + 1, path_pids)
# Print one tree per root blocker: oldest first, then roots with an unknown
# (None) start time, with pid as a tie-breaker.  The key never compares None
# against a datetime, which would raise TypeError.
print("Blocking and Blocked Query Tree")
print("===============================")
for root_node in sorted(
    root_blockers,
    key=lambda n: (n.get_start_time() is None, n.get_start_time(), n.get_pid()),
):
    recursively_print_node(root_node, 0, set())
    print()
if not root_blockers:
    print("None found.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment