Last active
May 27, 2024 22:19
-
-
Save eladn/254c68c2f222593cabf92ad34c485bc3 to your computer and use it in GitHub Desktop.
Python util for representing exception information as primitive values including tracebacks and local variables
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__author__ = "Elad Nachmias" | |
__email__ = "eladnah@gmail.com" | |
__date__ = "2023-10-18" | |
import dataclasses | |
import itertools | |
import sys | |
import traceback | |
import linecache | |
from traceback import StackSummary, walk_tb, FrameSummary | |
from dataclasses import dataclass | |
from typing import Type, Optional, Dict, Tuple, List, Union | |
@dataclass(frozen=True) | |
class SerFrameSummary: | |
filename: str | |
function_name: str | |
lineno: int | |
locals: Dict[str, str] | |
line: str | |
@property | |
def name(self) -> str: | |
return self.function_name | |
@classmethod | |
def from_frame_summary(cls, frame_summary: FrameSummary) -> 'SerFrameSummary': | |
ser_frame_summary = SerFrameSummary( | |
filename=frame_summary.filename, | |
function_name=frame_summary.name, | |
lineno=frame_summary.lineno, | |
locals={} if frame_summary.locals is None else dict(frame_summary.locals), | |
line=frame_summary.line, | |
).finilize() | |
return ser_frame_summary | |
def finilize(self) -> 'SerFrameSummary': | |
as_dict = dataclasses.asdict(self) | |
# Note: here we can potentially fill missing fields. | |
ret = SerFrameSummary(**as_dict) | |
return ret | |
@classmethod | |
def from_dict(cls, as_dict: Dict) -> 'SerFrameSummary': | |
as_dict = dict(as_dict) | |
as_dict['function_name'] = as_dict.get('function_name', as_dict.get('name', None)) | |
res = cls(**as_dict).finilize() | |
return res | |
class SerStackSummary(list): | |
@classmethod | |
def from_list(cls, lst: List[Union[SerFrameSummary, FrameSummary, dict]]) -> 'SerStackSummary': | |
res = cls() | |
for item in lst: | |
if isinstance(item, SerFrameSummary): | |
frame = item | |
elif isinstance(item, FrameSummary): | |
frame = SerFrameSummary.from_frame_summary(item) | |
elif isinstance(item, dict): | |
frame = SerFrameSummary.from_dict(item) | |
else: | |
raise ValueError(f"Unsupported type {type(item)}") | |
res.append(frame) | |
return res | |
def to_list(self) -> List[dict]: | |
ret = [ | |
dataclasses.asdict(frame) | |
for frame in self | |
] | |
return ret | |
@dataclass(frozen=True) | |
class ExceptionInfo: | |
""" | |
Stores information for a raised exception. | |
Useful for storing this information as primitive values which are always pickable. | |
""" | |
exception_type_name: str | |
exception_str: str | |
traceback_str: str | |
traces: SerStackSummary | |
exception: Optional[Exception] = None | |
@classmethod | |
def create_from_exception( | |
cls, | |
e: Exception, | |
store_original_exception: bool = True, | |
limit: Optional[int] = None, | |
capture_locals: bool = True, | |
) -> 'ExceptionInfo': | |
traceback_info = sys.exc_info()[-1] # get only the traceback from the returned triplet | |
traces = StackSummary.extract( | |
walk_tb(traceback_info), limit=limit, capture_locals=capture_locals, lookup_lines=True | |
) | |
traces = SerStackSummary.from_list(traces) | |
last_meaningful_stack_frame = next( | |
(frame for frame in reversed(traces) if not traces[-1].name.lstrip('_').startswith('assert')), | |
traces[-1] | |
) | |
func_name = last_meaningful_stack_frame.function_name | |
line_nr = last_meaningful_stack_frame.lineno | |
exception_str = f'{type(e).__name__}: {e} -- [line {line_nr} @ `{func_name}()`]' | |
return cls( | |
exception=e if store_original_exception else None, | |
exception_type_name=str(type(e).__name__), | |
exception_str=exception_str, | |
traceback_str=traceback.format_exc(), | |
traces=traces, | |
) | |
@classmethod | |
def create_from_exception_type(cls, exception_type: Type[Exception], store_original_exception: bool = True) -> 'ExceptionInfo': | |
""" | |
Simulates an exception raise and process its information. | |
""" | |
try: | |
raise exception_type() | |
except exception_type as e: | |
return ExceptionInfo.create_from_exception(e=e, store_original_exception=store_original_exception) | |
def without_original_exception(self) -> 'ExceptionInfo': | |
if self.exception is None: | |
return self | |
return dataclasses.replace(self, exception=None) | |
def to_dict(self) -> dict: | |
as_dict = dataclasses.asdict(self) | |
as_dict['exception'] = None | |
as_dict['traces'] = self.traces.to_list() | |
return as_dict | |
@classmethod | |
def from_dict(cls, as_dict: dict) -> 'ExceptionInfo': | |
as_dict = dict(as_dict) | |
as_dict['traces'] = SerStackSummary.from_list(as_dict['traces']) | |
exception_info = cls(**as_dict) | |
return exception_info | |
def test_exception_info(): | |
def f1(n: int = 0): | |
z = f'value {n}' | |
if n == 10: | |
a = 'some local var value..' | |
raise ValueError( | |
'whattt' | |
) | |
f1(n=n + 1) | |
exception_info = None | |
try: | |
var_in_try = 'aaa' | |
f1() | |
except Exception as e: | |
exception_info = ExceptionInfo.create_from_exception(e=e) | |
assert exception_info is not None | |
assert len(exception_info.traces) >= 12 | |
assert exception_info.traces[-12].line == 'f1()' | |
assert exception_info.traces[-12].locals['var_in_try'] == "'aaa'" | |
assert all(exception_info.traces[i].line == 'f1(n=n + 1)' for i in range(-11, -2 + 1)) | |
assert all('a' not in exception_info.traces[i].locals for i in range(-11, -2 + 1)) | |
assert exception_info.traces[-1].line == "raise ValueError(" | |
assert exception_info.traces[-1].locals['a'] == "'some local var value..'" | |
assert all(exception_info.traces[-(10 - n + 1)].locals['z'] == f"'value {n}'" for n in range(0, 10 + 1)) | |
if __name__ == '__main__': | |
test_exception_info() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment