Created
October 29, 2012 15:38
-
-
Save dahlia/3974241 to your computer and use it in GitHub Desktop.
Flask-DebugToolbar without Flask-SQLAlchemy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""":mod:`crosspop.web.debugtoolbar` --- :pypi:`Flask-DebugToolbar` customization | |
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
Some of the objects in this module are copied from the :pypi:`Flask-SQLAlchemy`
extension.  Most of the customization consists of removing dependencies on
:pypi:`Flask-SQLAlchemy` and making it use the internal database modules
(:mod:`crosspop.orm` and :mod:`crosspop.web.db`) instead.
This module is full of monkey patches! :-(  It works well anyway, though.
Hong Minhee wrote a post about this (dirty) effort:
https://plus.google.com/113125643284871850887/posts/dbSsnMoTY5d | |
""" | |
import hashlib | |
import operator | |
import re | |
import sys | |
import time | |
from flask import current_app, g, json | |
from flask.ext.debugtoolbar.panels import sqlalchemy | |
from flask.ext.debugtoolbar.panels.sqlalchemy import SQLAlchemyDebugPanel | |
from flask.ext.debugtoolbar.utils import format_fname, format_sql | |
from sqlalchemy.interfaces import ConnectionProxy | |
from .db import get_database_engine | |
# Names exported by ``from <this module> import *``.
__all__ = (
    'ConnectionDebugProxy',
    'DebugQueryTuple',
    'MockAlchemy',
    'PopAlchemyDebugPanel',
)
class MockAlchemy(object):
    """Replacement for the ``SQLAlchemy`` name used by the toolbar panel.

    Only ``get_engine(app)`` is provided, which is the call being
    monkey patched.

    """

    def get_engine(self, app):
        """Return the engine given by :func:`get_database_engine`.

        The *app* argument is accepted for signature compatibility only;
        it is not used.

        """
        engine = get_database_engine()
        return engine


# Monkey patch: make the toolbar's SQLAlchemy panel module resolve
# ``SQLAlchemy`` to the mock defined above.
sqlalchemy.SQLAlchemy = MockAlchemy
class DebugQueryTuple(tuple):
    """Customized quintuple to record each issued query.

    Copied from :pypi:`Flask-SQLAlchemy` extension.

    The five items are, in order: ``statement``, ``parameters``,
    ``start_time``, ``end_time`` and ``context``.  Each one is also
    available as a read-only property of the same name.

    """

    # Positional accessors: property N reads tuple item N.
    statement = property(operator.itemgetter(0))
    parameters = property(operator.itemgetter(1))
    start_time = property(operator.itemgetter(2))
    end_time = property(operator.itemgetter(3))
    context = property(operator.itemgetter(4))

    @property
    def duration(self):
        """``end_time - start_time``."""
        return self.end_time - self.start_time

    def __repr__(self):
        # The trailing ``>`` closes the ``<query ...`` bracket; it was
        # missing, which left the representation unbalanced.
        fmt = '<query statement={0!r} parameters={1!r} duration={2:0.03f}>'
        return fmt.format(self.statement, self.parameters, self.duration)
class PopAlchemyDebugPanel(SQLAlchemyDebugPanel):
    """Customized subclass of
    :class:`flask.ext.debugtoolbar.panels.sqlalchemy.SQLAlchemyDebugPanel`.

    It reads the recorded queries from ``flask.g.logged_queries``.

    """

    def get_debug_queries(self):
        """Return ``g.logged_queries``, or an empty list when it is unset."""
        return getattr(g, 'logged_queries', [])

    @property
    def has_content(self):
        """``True`` when at least one query has been recorded."""
        return bool(self.get_debug_queries())

    def nav_subtitle(self):
        """Return ``'1 query'`` or ``'N queries'`` for the recorded queries."""
        count = len(self.get_debug_queries())
        if count == 1:
            return '{0} query'.format(count)
        return '{0} queries'.format(count)

    def content(self):
        """Render ``panels/sqlalchemy.html`` with one dict per query."""
        queries = self.get_debug_queries()
        select_re = re.compile(r'^\s*SELECT', re.IGNORECASE)
        data = []
        for query in queries:
            is_select = bool(select_re.match(query.statement))
            try:
                j_params = json.dumps(query.parameters)
            except TypeError:
                # Parameters aren't JSON serializable.  Fall back to an
                # empty string so the name is always bound for this query;
                # previously it was left unbound on the first query
                # (NameError) or stale from the preceding one (wrong
                # params and hash).
                j_params = ''
            # Digest over secret key + statement + serialized parameters.
            # NOTE(review): presumably checked by the toolbar when the
            # statement is posted back -- confirm against its views.
            digest = hashlib.sha1(current_app.secret_key + query.statement +
                                  j_params).hexdigest()
            data.append({
                'duration': query.duration,
                'sql': format_sql(query.statement, query.parameters),
                'raw_sql': query.statement,
                'hash': digest,
                'params': j_params,
                'is_select': is_select,
                'context_long': query.context,
                'context': format_fname(query.context)
            })
        return self.render('panels/sqlalchemy.html', {'queries': data})
class ConnectionDebugProxy(ConnectionProxy):
    """:class:`~sqlalchemy.interfaces.ConnectionProxy` implementation
    which records all issued queries and stores them in the
    ``flask.g.logged_queries`` context local.

    """

    def cursor_execute(self, execute, cursor, statement, parameters,
                       context, executemany):
        """Run *execute* and record the query as a :class:`DebugQueryTuple`.

        The query is recorded whether *execute* returns or raises.  The
        *executemany* argument is not used here.

        :returns: whatever ``execute(cursor, statement, parameters,
                  context)`` returns

        """
        now = time.clock if sys.platform == 'win32' else time.time
        start = now()
        try:
            return execute(cursor, statement, parameters, context)
        finally:
            end = now()
            try:
                queries = g.logged_queries
            except AttributeError:
                # First query recorded on this ``g``: create the list.
                queries = g.logged_queries = []
            except RuntimeError:
                # ``g`` is unusable outside of a Flask context.  Skip the
                # bookkeeping instead of letting this debugging aid mask
                # the query result (or the original exception).
                queries = None
            if queries is not None:
                queries.append(
                    DebugQueryTuple((statement, parameters, start, end,
                                     self.calling_context()))
                )

    def calling_context(self):
        """Describe the nearest calling frame that belongs to ``crosspop``.

        Walks outward from the caller's frame and returns
        ``'filename:lineno'`` for a Jinja template frame,
        ``'filename:lineno (function)'`` for a frame of the ``crosspop``
        package (other than this module), or ``'<unknown>'`` when no
        frame matches.

        """
        frm = sys._getframe(1)
        # Walk until the chain is exhausted so that the outermost frame
        # is inspected as well (``frm.f_back is not None`` skipped it).
        while frm is not None:
            name = frm.f_globals.get('__name__')
            if name is None and '__jinja_template__' in frm.f_globals:
                tpl = frm.f_globals['__jinja_template__']
                # Map the line of the compiled template code back to the
                # template source line.
                lineno = tpl.get_corresponding_lineno(frm.f_lineno)
                return '{0}:{1}'.format(frm.f_code.co_filename, lineno)
            elif (name and name != __name__ and
                  (name == 'crosspop' or name.startswith('crosspop.'))):
                funcname = frm.f_code.co_name
                return '{0}:{1} ({2})'.format(
                    frm.f_code.co_filename,
                    frm.f_lineno,
                    funcname
                )
            frm = frm.f_back
        return '<unknown>'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment