Skip to content

Instantly share code, notes, and snippets.

@dahlia
Created October 29, 2012 15:38
Show Gist options
  • Save dahlia/3974241 to your computer and use it in GitHub Desktop.
Save dahlia/3974241 to your computer and use it in GitHub Desktop.
Flask-DebugToolbar without Flask-SQLAlchemy
""":mod:`crosspop.web.debugtoolbar` --- :pypi:`Flask-DebugToolbar` customization
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Some of the objects in this module are copied from the :pypi:`Flask-SQLAlchemy`
extension. Most of the customization consists of removing dependencies on
:pypi:`Flask-SQLAlchemy` and making it use the internal database modules
(:mod:`crosspop.orm` and :mod:`crosspop.web.db`) instead.
This module is full of monkey patches! :-( Although it works well anyway.
Hong Minhee wrote a post about this (dirty) effort:
https://plus.google.com/113125643284871850887/posts/dbSsnMoTY5d
"""
import hashlib
import operator
import re
import sys
import time
from flask import current_app, g, json
from flask.ext.debugtoolbar.panels import sqlalchemy
from flask.ext.debugtoolbar.panels.sqlalchemy import SQLAlchemyDebugPanel
from flask.ext.debugtoolbar.utils import format_fname, format_sql
from sqlalchemy.interfaces import ConnectionProxy
from .db import get_database_engine
# Names exported by ``from <this module> import *``.
__all__ = ('ConnectionDebugProxy', 'DebugQueryTuple', 'MockAlchemy',
           'PopAlchemyDebugPanel')
class MockAlchemy(object):
    """Stand-in used for monkey patching ``SQLAlchemy().get_engine(app)``.

    It exposes only :meth:`get_engine`, which hands back the engine
    produced by :func:`get_database_engine` from the sibling ``db`` module.

    """

    def get_engine(self, app):
        """Return the application's database engine.

        :param app: accepted for signature compatibility only; it is
                    not used
        :returns: whatever :func:`get_database_engine` returns

        """
        engine = get_database_engine()
        return engine
# Monkey patch: rebind the ``SQLAlchemy`` name inside the debug toolbar's
# ``panels.sqlalchemy`` module to the mock above.  Presumably the toolbar
# calls ``SQLAlchemy().get_engine(app)`` there to obtain an engine --
# verify against the installed Flask-DebugToolbar version.
sqlalchemy.SQLAlchemy = MockAlchemy
class DebugQueryTuple(tuple):
    """Customized quintuple to record each issued query.

    The five items are, in order: ``statement``, ``parameters``,
    ``start_time``, ``end_time`` and ``context``.  Each of them is also
    readable through the property of the same name.

    Copied from :pypi:`Flask-SQLAlchemy` extension.

    """

    statement = property(operator.itemgetter(0))
    parameters = property(operator.itemgetter(1))
    start_time = property(operator.itemgetter(2))
    end_time = property(operator.itemgetter(3))
    context = property(operator.itemgetter(4))

    @property
    def duration(self):
        """Elapsed time of the query: ``end_time - start_time``."""
        return self.end_time - self.start_time

    def __repr__(self):
        # The closing ``>`` was missing, which left the angle-bracketed
        # representation unterminated.
        fmt = '<query statement={0!r} parameters={1!r} duration={2:.3f}>'
        return fmt.format(self.statement, self.parameters, self.duration)
class PopAlchemyDebugPanel(SQLAlchemyDebugPanel):
    """Customized subclass of
    :class:`flask.ext.debugtoolbar.panels.sqlalchemy.SQLAlchemyDebugPanel`.

    The queries it displays are read from the ``logged_queries``
    attribute of :data:`flask.g` (filled by :class:`ConnectionDebugProxy`)
    rather than obtained from :pypi:`Flask-SQLAlchemy`.

    """

    def get_debug_queries(self):
        """Return the list of queries recorded on :data:`flask.g`, or
        an empty list when nothing has been recorded.

        """
        return getattr(g, 'logged_queries', [])

    @property
    def has_content(self):
        """``True`` if at least one query has been recorded."""
        return bool(self.get_debug_queries())

    def nav_subtitle(self):
        """Return the number of recorded queries as ``'1 query'`` or
        ``'N queries'``.

        """
        count = len(self.get_debug_queries())
        if count == 1:
            return '{0} query'.format(count)
        return '{0} queries'.format(count)

    def content(self):
        """Render ``panels/sqlalchemy.html`` with one entry per recorded
        query.

        :returns: the result of :meth:`render`

        """
        queries = self.get_debug_queries()
        select_re = re.compile(r'^\s*SELECT', re.IGNORECASE)
        data = []
        for query in queries:
            is_select = bool(select_re.match(query.statement))
            try:
                j_params = json.dumps(query.parameters)
            except TypeError:
                # Parameters aren't JSON serializable.  Fall back to an
                # empty string: previously ``j_params`` stayed unbound
                # (NameError on the first query) or silently kept the
                # parameters of the previous query.
                j_params = ''
            # Digest over secret key + statement + serialized parameters.
            # NOTE(review): presumably the toolbar's own SQL views
            # recompute this to validate requests, so the input must not
            # change shape -- confirm against Flask-DebugToolbar.
            digest = hashlib.sha1(current_app.secret_key + query.statement +
                                  j_params).hexdigest()
            data.append({
                'duration': query.duration,
                'sql': format_sql(query.statement, query.parameters),
                'raw_sql': query.statement,
                'hash': digest,
                'params': j_params,
                'is_select': is_select,
                'context_long': query.context,
                'context': format_fname(query.context)
            })
        return self.render('panels/sqlalchemy.html', {'queries': data})
class ConnectionDebugProxy(ConnectionProxy):
    """:class:`~sqlalchemy.interfaces.ConnectionProxy` implementation
    which records all issued queries and stores them in the
    ``logged_queries`` list on :data:`flask.g`.

    """

    def cursor_execute(self, execute, cursor, statement, parameters,
                       context, executemany):
        """Run the statement through ``execute`` and record it.

        A :class:`DebugQueryTuple` holding the statement, its parameters,
        the start/end timestamps and the calling context is appended to
        ``g.logged_queries`` whether or not ``execute`` raises.

        :returns: whatever ``execute(cursor, statement, parameters,
                  context)`` returns

        """
        if sys.platform == 'win32':
            # ``time.clock`` was removed in Python 3.8; prefer its
            # replacement when available and keep ``time.clock`` only as
            # the fallback for interpreters that predate perf_counter.
            now = getattr(time, 'perf_counter', None) or time.clock
        else:
            now = time.time
        start = now()
        try:
            return execute(cursor, statement, parameters, context)
        finally:
            end = now()
            try:
                queries = g.logged_queries
            except AttributeError:
                # First query seen on this ``g``: create the list.
                queries = g.logged_queries = []
            queries.append(
                DebugQueryTuple((statement, parameters, start, end,
                                 self.calling_context()))
            )

    def calling_context(self):
        """Describe where in the application the current query came from.

        Walks the call stack outwards, starting at the caller's frame.

        :returns: ``'filename:lineno'`` for a frame whose globals carry
                  ``__jinja_template__`` and no ``__name__``;
                  ``'filename:lineno (funcname)'`` for the first frame
                  belonging to ``crosspop`` or one of its submodules
                  (other than this module); ``'<unknown>'`` if no such
                  frame is found

        """
        frm = sys._getframe(1)
        # NOTE(review): the outermost frame (the one without ``f_back``)
        # is never inspected by this loop.
        while frm.f_back is not None:
            name = frm.f_globals.get('__name__')
            if name is None and '__jinja_template__' in frm.f_globals:
                # Map the line of the compiled template code back to the
                # line of the template source.
                tpl = frm.f_globals['__jinja_template__']
                lineno = tpl.get_corresponding_lineno(frm.f_lineno)
                return '{0}:{1}'.format(frm.f_code.co_filename, lineno)
            elif (name and name != __name__ and
                  (name == 'crosspop' or name.startswith('crosspop.'))):
                funcname = frm.f_code.co_name
                return '{0}:{1} ({2})'.format(
                    frm.f_code.co_filename,
                    frm.f_lineno,
                    funcname
                )
            frm = frm.f_back
        return '<unknown>'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment