Last active
August 29, 2015 13:56
-
-
Save TallGirlVanessa/8835587 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from sqlalchemy.engine import create_engine | |
from sqlalchemy.types import Integer, String, TypeDecorator | |
from sqlalchemy.sql import text | |
from sqlalchemy.schema import Column | |
from sqlalchemy.orm import sessionmaker, scoped_session | |
from sqlalchemy.ext.declarative import declarative_base | |
from nose import tools | |
# A throwaway in-memory SQLite database: nothing persists between runs.
engine = create_engine('sqlite:///:memory:')
session_factory = sessionmaker(bind=engine)
# scoped_session provides a registry so tests can share ``Session`` and
# discard the current session with ``Session.remove()``.
Session = scoped_session(session_factory)
Base = declarative_base()

# Emit the 'sqlalchemy.orm' logger's INFO records while the tests run.
logging.basicConfig()
logging.getLogger('sqlalchemy.orm').setLevel(logging.INFO)
# Plain statement: result columns keep the table's own column names.
LITERAL_SQL = 'select * from users'

# Same rows, but every result column is aliased with a ``users_`` prefix.
ALIASED_LITERAL_SQL = """
select
id as users_id,
name as users_name,
department as users_department,
battle_cry as users_battle_cry
from users
"""

# Same rows again, aliased with a ``foo_`` prefix instead of ``users_``.
MISLABELED_SQL = """
select
id as foo_id,
name as foo_name,
department as foo_department,
battle_cry as foo_battle_cry
from users
"""
class Shout(TypeDecorator):
    """String type whose values are lowercased on bind and uppercased on load.

    Because the two directions disagree on case, the case of a loaded value
    shows whether ``process_result_value`` was applied to it.
    """

    impl = String

    def process_bind_param(self, value, dialect):
        """Return ``value`` lowercased for binding; ``None`` passes through."""
        if value is None:
            return None
        return value.lower()

    def process_result_value(self, value, dialect):
        """Return ``value`` uppercased for the caller; ``None`` passes through."""
        if value is None:
            return None
        return value.upper()
class User(Base):
    """Declarative model for the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(100))
    department = Column(String(100))
    # The only column using the custom ``Shout`` type.
    battle_cry = Column(Shout(100))

    def __repr__(self):
        """Return a debug string showing name, department and battle cry."""
        return "<User(name='%s', department='%s', battle_cry='%s')>" % (
            self.name, self.department, self.battle_cry)
class _TestQueries(object):
    """Shared fixture base: rebuilds the schema and inserts two users."""

    @classmethod
    def setUpClass(cls):
        """Recreate all tables and insert the two fixture rows."""
        # Drop first so each test class starts from an empty schema.
        Base.metadata.drop_all(engine)
        Base.metadata.create_all(engine)
        Session.add_all([
            User(name='ed', department='Foo', battle_cry='ASGARD!'),
            User(name='wendy', department='Dodgeball', battle_cry='SPARTA!'),
        ])
        Session.commit()
        Session.remove()

    def tearDown(self):
        """Discard the scoped session used by the test that just ran."""
        Session.remove()
class TestORM(_TestQueries):
    """Baseline: a plain ORM query against ``User``."""

    def test_adhoc(self):
        # An ordinary ORM query is expected to return the uppercased value.
        rows = Session.query(User).all()
        tools.assert_true(rows[0].battle_cry.isupper())
class TestTAFMapping(_TestQueries):
    """Textual statements whose column types are given as a name -> type map.

    Every test expects ``battle_cry`` to come back uppercased.
    """

    @staticmethod
    def _prefixed_typemap():
        # Keys are the ``users_``-prefixed aliases used by
        # ALIASED_LITERAL_SQL; values are the matching User column types.
        return {'users_' + col.name: col.type for col in User.__table__.c}

    def test_renamed_columns(self):
        # Aliased SQL, types attached with .columns(), individual columns queried.
        taf = text(ALIASED_LITERAL_SQL).columns(**self._prefixed_typemap())
        query = Session.query(
            User.id, User.name, User.department, User.battle_cry,
        ).from_statement(taf)
        rows = query.all()
        tools.assert_true(rows[0].battle_cry.isupper())

    def test_renamed_columns_typemap(self):
        # Same as test_renamed_columns, but the types go through the
        # ``typemap`` argument of text() instead of .columns().
        # NOTE(review): ``typemap`` is believed to be a legacy text()
        # argument; confirm it exists in the SQLAlchemy version in use.
        sp_call = text(ALIASED_LITERAL_SQL, typemap=self._prefixed_typemap())
        query = Session.query(
            User.id, User.name, User.department, User.battle_cry,
        ).from_statement(sp_call)
        rows = query.all()
        tools.assert_true(rows[0].battle_cry.isupper())

    def test_renamed_model(self):
        # Aliased SQL with a prefixed typemap, whole User entity queried.
        taf = text(ALIASED_LITERAL_SQL).columns(**self._prefixed_typemap())
        rows = Session.query(User).from_statement(taf).all()
        tools.assert_true(rows[0].battle_cry.isupper())

    def test_model(self):
        # Unaliased SQL, so the typemap keys are the bare column names.
        typemap = {col.name: col.type for col in User.__table__.c}
        taf = text(LITERAL_SQL).columns(**typemap)
        rows = Session.query(User).from_statement(taf).all()
        tools.assert_true(rows[0].battle_cry.isupper())
class TestTAFLabeling(_TestQueries):
    """Textual statements whose columns are given as Column / Label objects.

    Every test expects ``battle_cry`` to come back uppercased.
    """

    @staticmethod
    def _prefixed_labels():
        # One label per User table column, named with the ``users_`` prefix
        # that ALIASED_LITERAL_SQL uses for its result columns.
        return [col.label('users_' + col.name) for col in User.__table__.c]

    def test_model(self):
        # Unaliased SQL, table columns passed positionally, whole entity queried.
        taf = text(LITERAL_SQL).columns(*User.__table__.c)
        rows = Session.query(User).from_statement(taf).all()
        tools.assert_true(rows[0].battle_cry.isupper())

    def test_renamed_model(self):
        # Aliased SQL with prefixed labels, whole entity queried.
        taf = text(ALIASED_LITERAL_SQL).columns(*self._prefixed_labels())
        rows = Session.query(User).from_statement(taf).all()
        tools.assert_true(rows[0].battle_cry.isupper())

    def test_renamed_columns(self):
        # Aliased SQL with prefixed labels, individual columns queried.
        taf = text(ALIASED_LITERAL_SQL).columns(*self._prefixed_labels())
        query = Session.query(
            User.id, User.name, User.department, User.battle_cry,
        ).from_statement(taf)
        rows = query.all()
        tools.assert_true(rows[0].battle_cry.isupper())

    def test_renamed_columns_no_textual(self):
        # NOTE(review): this body is identical to test_renamed_columns; the
        # name suggests a different variant was intended. Confirm with the
        # author before changing or removing it.
        taf = text(ALIASED_LITERAL_SQL).columns(*self._prefixed_labels())
        query = Session.query(
            User.id, User.name, User.department, User.battle_cry,
        ).from_statement(taf)
        rows = query.all()
        tools.assert_true(rows[0].battle_cry.isupper())

    def test_renamed_labels(self):
        # The same label objects are used for the statement and the query,
        # so the result attribute carries the prefixed name.
        labels = self._prefixed_labels()
        taf = text(ALIASED_LITERAL_SQL).columns(*labels)
        rows = Session.query(*labels).from_statement(taf).all()
        tools.assert_true(rows[0].users_battle_cry.isupper())

    def test_mislabeled_model(self):
        # Only 'foo_name' matches an alias in MISLABELED_SQL; the other
        # label names do not, and the column order differs from the SQL
        # (department comes before name here).
        labels = [
            User.id.property.expression.label('qux_id'),
            User.department.property.expression.label('baz_department'),
            User.name.property.expression.label('foo_name'),
            User.battle_cry.property.expression.label('bar_battle_cry'),
        ]
        taf = text(MISLABELED_SQL).columns(*labels)
        rows = Session.query(User).from_statement(taf).all()
        tools.assert_true(rows[0].battle_cry.isupper())
class TestText(_TestQueries):
    """Bare ``text()`` statements with no column or type information.

    Unlike the other test classes, every test here expects ``battle_cry``
    to come back lowercased.
    """

    def test_model(self):
        # Unaliased SQL, whole User entity queried.
        sp_call = text(LITERAL_SQL)
        rows = Session.query(User).from_statement(sp_call).all()
        tools.assert_true(rows[0].battle_cry.islower())

    def test_renamed_model(self):
        # Aliased SQL, whole User entity queried.
        sp_call = text(ALIASED_LITERAL_SQL)
        rows = Session.query(User).from_statement(sp_call).all()
        tools.assert_true(rows[0].battle_cry.islower())

    def test_renamed_columns(self):
        # Aliased SQL, individual columns queried.
        sp_call = text(ALIASED_LITERAL_SQL)
        query = Session.query(
            User.id, User.name, User.department, User.battle_cry,
        ).from_statement(sp_call)
        rows = query.all()
        tools.assert_true(rows[0].battle_cry.islower())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment