Skip to content

Instantly share code, notes, and snippets.

@eddieparker
Last active July 8, 2022 20:50
Show Gist options
  • Save eddieparker/3bf9c22a6c4f60e4b6d5b51ec03da45c to your computer and use it in GitHub Desktop.
Save eddieparker/3bf9c22a6c4f60e4b6d5b51ec03da45c to your computer and use it in GitHub Desktop.
Format postgresql's blocked pids into a tree so you can see which root query is blocking which children.
'''
Generates a tree on the command line showing you which pid and query is blocking which. Looks like:
Blocking and Blocked Query Tree
===============================
2022-06-09 21:14:06.707790 [15257]: (SELECT item.id AS item_id, item.itemid AS item_id)
-> 2022-06-09 21:14:17.785848 [15536]: (UPDATE item SET foo_id=187 WHERE item.id = 37454797)
-> 2022-06-09 21:14:07.364804 [15280]: (UPDATE item SET foo_id=187 WHERE item.id = 37454797)
-> 2022-06-09 21:14:09.445913 [15344]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570)
-> 2022-06-09 21:14:07.059882 [15275]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570)
-> 2022-06-09 21:14:08.791996 [15327]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570)
-> 2022-06-09 21:14:06.279527 [15237]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570)
-> 2022-06-09 21:14:17.325280 [15534]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570)
-> 2022-06-09 21:08:33.299530 [14518]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570)
-> 2022-06-09 21:14:07.445441 [15282]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570)
-> 2022-06-09 21:14:11.560278 [15434]: (UPDATE item SET foo_id=1 WHERE item.id = 37460570)
-> 2022-06-09 21:03:40.118221 [14231]: (DELETE FROM scraper_work_queue_item WHERE scraper_work_queue_item.id = 9170918)
-> 2022-06-09 21:14:06.444947 [15245]: (UPDATE item SET foo_id=1 WHERE item.id = 37465692)
-> 2022-06-09 21:14:11.062618 [15426]: (UPDATE item SET foo_id=1 WHERE item.id = 37465692)
-> 2022-06-09 21:14:13.644536 [15475]: (UPDATE item SET foo_id=1 WHERE item.id = 37465692)
-> 2022-06-09 21:14:06.008387 [15233]: (UPDATE item SET foo_id=1 WHERE item.id = 37465692)
-> 2022-06-09 21:14:14.211249 [15496]: (UPDATE item SET foo_id=1 WHERE item.id = 37465692)
'''
import datetime
import os
import sys
import json
# Query run against the server to discover blocking relationships.
# pg_stat_activity is LEFT JOINed to itself through pg_blocking_pids(), so a
# session appears once per session that blocks it, and sessions that are not
# blocked still appear with blocking_pid / blocking_query as NULL.
# start_time is backend_start converted to epoch seconds; rows are ordered by
# backend_start, oldest first.
g_sql = '''
SELECT
activity.pid,
extract(epoch from activity.backend_start) as start_time,
blocking.pid AS blocking_pid,
activity.application_name,
activity.wait_event_type,
activity.wait_event,
activity.query,
blocking.query AS blocking_query
FROM pg_stat_activity AS activity
LEFT JOIN pg_stat_activity AS blocking ON blocking.pid = ANY(pg_blocking_pids(activity.pid))
ORDER BY activity.backend_start ASC
;
'''
import psycopg2
# Optional local configuration: when settings.json exists in the current
# working directory, every key/value pair in it is exported into os.environ
# (overriding any value already present) before the connection variables
# are read.
g_settings_filename = 'settings.json'
if os.path.exists(g_settings_filename):
    with open(g_settings_filename, 'r', encoding='utf-8') as f:
        d = json.load(f)
    for key, value in d.items():
        if value is None:
            # A JSON null means "not configured"; leave the variable alone so
            # a missing value is still reported as missing later on.
            continue
        # os.environ only accepts str values, while JSON may hold numbers or
        # booleans; convert instead of crashing with a TypeError.
        os.environ[key] = value if isinstance(value, str) else str(value)
# Cleared by try_get_env_variable as soon as any requested variable is missing.
g_query_db = True


def try_get_env_variable(env_name):
    """Return the value of environment variable *env_name*, or None if unset.

    When the variable is missing, a message is printed and the module-level
    flag ``g_query_db`` is set to False so the caller can abort after having
    reported every missing variable.
    """
    global g_query_db
    try:
        return os.environ[env_name]
    except KeyError:
        g_query_db = False
        print(f'Failed to get environment variable {env_name}.')
        return None
# Read the connection settings. Every missing variable is reported by
# try_get_env_variable, which also clears g_query_db, so all problems are
# listed before aborting.
dbname = try_get_env_variable('BLOCKED_PIDS_DBNAME')
dbuser = try_get_env_variable('BLOCKED_PIDS_DBUSER')
dbhost = try_get_env_variable('BLOCKED_PIDS_DBHOST')
dbpass = try_get_env_variable('BLOCKED_PIDS_DBPASS')
if not g_query_db:
    print('One or more environment variable was not setup. Aborting.')
    sys.exit(-1)

# The connection is built from keyword arguments only; the previously
# assembled (unused and malformed) DSN string has been dropped so the
# password is not kept around in an extra global.
connection = psycopg2.connect(
    host=dbhost,
    database=dbname,
    user=dbuser,
    password=dbpass,
)
try:
    # The cursor context manager closes the cursor on exit; the connection
    # itself must be closed explicitly.
    with connection.cursor() as cursor:
        cursor.execute(g_sql)
        # cursor.description holds one entry per result column; element 0 of
        # each entry is the column name.
        header = [column[0] for column in cursor.description]
        # One dict per result row, keyed by column name.
        data = [dict(zip(header, row)) for row in cursor]
finally:
    connection.close()
class QueryNode:
    """A single result row, linked to its blocker and to the nodes it blocks.

    ``datum`` is a dict keyed by the result column names: ``pid``,
    ``start_time``, ``blocking_pid``, ``application_name``,
    ``wait_event_type``, ``wait_event`` and ``query``.
    """

    def __init__(self, datum):
        self.m_datum = datum
        # Node that blocks this one; None until resolved (or if unblocked).
        self.m_blocking_node = None
        # Nodes that are directly blocked by this one.
        self.m_blocked_child_nodes = set()

    def get_pid(self):
        """Return the pid of this row."""
        return self.m_datum['pid']

    def get_blocking_pid(self):
        """Return the pid blocking this row, or None when there is none.

        None, the literal string "NULL" and the empty string all count as
        "not blocked".
        """
        blocking_pid = self.m_datum['blocking_pid']
        if blocking_pid is None or blocking_pid in ("NULL", ''):
            return None
        return blocking_pid

    def get_query(self):
        """Return the raw query text of this row."""
        return self.m_datum['query']

    def set_blocking_node(self, node):
        """Record *node* as the node blocking this one."""
        self.m_blocking_node = node

    def get_blocking_node(self):
        """Return the node blocking this one, or None."""
        return self.m_blocking_node

    def add_blocked_child_node(self, node):
        """Register *node* as being blocked by this node."""
        self.m_blocked_child_nodes.add(node)

    def get_blocked_child_nodes(self):
        """Return the set of nodes directly blocked by this node."""
        return self.m_blocked_child_nodes

    def get_start_time(self):
        """Return ``start_time`` (epoch seconds) as a local naive datetime."""
        return datetime.datetime.fromtimestamp(float(self.m_datum['start_time']))

    def get_app_name(self):
        """Return the application name of this row."""
        return self.m_datum['application_name']

    def get_wait_info(self):
        """Return ``"<wait_event_type>:<wait_event>"``, or '' when not waiting."""
        t = self.m_datum['wait_event_type']
        w = self.m_datum['wait_event']
        if t is None:
            return ''
        return f'{t}:{w}'

    def __str__(self):
        """Return a one-line summary: app name, pid, wait info and query.

        The query is flattened onto a single line: every run of whitespace
        (newlines, tabs, repeated spaces) becomes one space and surrounding
        whitespace is dropped. Only the first 100 characters are shown. A
        missing (None) query is rendered as an empty string.
        """
        query_one_line = ' '.join((self.get_query() or '').split())
        return f'[{self.get_app_name()}:{self.get_pid()}]: ({self.get_wait_info()}) ({query_one_line[:100]})'
# Split the rows into nodes that report a blocker and nodes that do not.
# NOTE(review): both dicts are keyed by pid, so if the result contains
# several rows for the same pid (one per blocker), the later row replaces
# the earlier one and only one blocker is kept per pid.
blocked_nodes_by_pid = {}
unblocked_nodes_by_pid = {}
for datum in data:
    node = QueryNode(datum)
    if node.get_blocking_pid() is None:
        unblocked_nodes_by_pid[node.get_pid()] = node
    else:
        blocked_nodes_by_pid[node.get_pid()] = node

# Resolve lookups for all nodes to see which are blocked by what
for node in blocked_nodes_by_pid.values():
    # The blocker may itself be blocked, so look there first.
    blocking_node = blocked_nodes_by_pid.get(node.get_blocking_pid())
    if blocking_node is None:
        blocking_node = unblocked_nodes_by_pid.get(node.get_blocking_pid())
    if blocking_node is None:
        print(f'Node {node} says it is blocked but we can\'t find the node that\'s blocking it?')
        continue
    node.set_blocking_node(blocking_node)
    blocking_node.add_blocked_child_node(node)

# Walk up from every blocked node to the top of its chain; those tops are
# the roots of the printed trees.
root_blockers = set()
for node in blocked_nodes_by_pid.values():
    path = []
    # Stop either at a node with no blocker (a real root) or when the walk
    # comes back to a node already on the path (the links form a cycle);
    # without the second check a cycle would loop forever.
    while node.get_blocking_node() is not None and node not in path:
        path.append(node)
        node = node.get_blocking_node()
    if node in path:
        # Cycle: there is no natural root, so pick the member with the
        # smallest pid. Every walk that ends in this cycle picks the same
        # node, so the cycle is reported once.
        node = min(path[path.index(node):], key=lambda member: member.get_pid())
    root_blockers.add(node)
# Print out root blockers and iterate through their children.
def recursively_print_node(node, recursion, visited_pids):
    """Print *node* and, indented below it, every node it blocks.

    Args:
        node: object exposing get_pid(), get_start_time() and
            get_blocked_child_nodes(); it is printed via str().
        recursion: current depth, 0 for a root. Controls the indentation
            and the ``[depth]->`` marker, and aborts the descent once it
            exceeds 50.
        visited_pids: set of pids already printed in this tree. It is
            updated in place so that a pid reached a second time is
            reported as a circular loop instead of being printed again.
    """
    tab = ' '*recursion
    prefix = tab
    if recursion > 50:
        print(f'{prefix}!!!MAX RECURSION REACHED! QUITTING!!!')
        return
    if recursion > 0:
        prefix += f' [{recursion+1}]-> '
    pid = node.get_pid()
    if pid in visited_pids:
        print(f"{prefix}!!!CIRCULAR LOOP DETECTED!!!")
        return # Circular loop
    # Remember this pid; without this the check above could never trigger.
    visited_pids.add(pid)
    print(f'{node.get_start_time()} {prefix}{node}')
    # Print children
    for child in node.get_blocked_child_nodes():
        recursively_print_node(child, recursion + 1, visited_pids)
# Report: a header, then one tree per root blocker (oldest start time
# first) followed by a blank line, or a placeholder when there are no roots.
print("Blocking and Blocked Query Tree")
print("===============================")
roots_oldest_first = sorted(root_blockers, key=lambda x: x.get_start_time())
for root in roots_oldest_first:
    # Each tree starts at depth 0 with its own, empty set of visited pids.
    recursively_print_node(root, 0, set())
    print()
if not roots_oldest_first:
    print("None found.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment