Created
June 2, 2009 23:27
import logging
import md5
import os
import urllib
import urllib2

import simplejson as json_
from pylons import config, request, response, session, tmpl_context as c
from pylons.controllers.util import abort, redirect_to, url_for
from sqlalchemy import func, or_

from pylons_openid.lib.base import BaseController, render
from pylons_openid.model import User, UserOpenId, meta

log = logging.getLogger(__name__)
sa = meta.Session
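class AuthController(BaseController):
    """Password login, logout and RPX (hosted OpenID relay) sign-in actions.

    Review notes that apply to the whole controller:

    * Local passwords are stored and compared as unsalted MD5 hex digests.
      MD5 is fast to brute-force; moving to a salted, slow password hash
      needs a data migration and is therefore not done inside this class.
    * The session id that existed before authentication is kept afterwards.
      NOTE(review): regenerating the id at login would close session
      fixation -- check what the session library in use offers.
    * NOTE(review): Pylons presumably exposes every method without a leading
      underscore as a routable action, which would include ``on_login``,
      ``on_logout`` and ``get_loginform`` -- confirm against the routes.
    """

    # Every session key written at login; logout has to clear all of them,
    # otherwise the ACL and the display name outlive the logout.
    _SESSION_KEYS = ('user_id', 'user_acl', 'user_login', 'user_name')

    def on_login(self):
        """Hook for a successful login (empty; nothing in this class calls it)."""

    def on_logout(self, username):
        """Hook for a logout (empty; nothing in this class calls it)."""

    def get_loginform(self, username, msg="Enter login information", from_page="/"):
        """Render the login page with the given username, alert text and origin."""
        c.username = username
        c.alert = msg
        # NOTE(review): from_page can come straight from the request (see
        # login); if the template ever redirects to it, restrict it to
        # local paths first.
        c.from_page = from_page
        return render("/login.html")

    def _client_ip(self):
        """Return the address recorded on the user row at login."""
        # NOTE(review): a WSGI server normally exposes the header as
        # HTTP_X_FORWARDED_FOR, so this key is presumably never present and
        # REMOTE_ADDR is what gets stored -- confirm. If the lookup is ever
        # switched to the HTTP_ variant, trust it only behind a proxy that
        # overwrites the header; a client can send any value.
        return request.environ.get("X_FORWARDED_FOR", request.environ["REMOTE_ADDR"])

    def _md5_hex(self, value):
        """Return the MD5 hex digest of ``value`` in the format the User table stores."""
        # Request parameters may arrive as unicode; md5 would try an implicit
        # ASCII encode and raise on any non-ASCII character, turning such a
        # password into a server error. ASCII input hashes exactly as before.
        if isinstance(value, unicode):
            value = value.encode('utf-8')
        return md5.new(value).hexdigest()

    def _to_login(self):
        """Redirect to the login action."""
        redirect_to(url_for(controller='auth', action='login'))

    def _start_session(self, user):
        """Record the login on the user row and mark the web session as logged in."""
        user.ip = self._client_ip()
        user.dateLogin = func.now()
        user.sessionId = session.id
        # NOTE(review): no flush or commit follows these assignments here;
        # presumably the SQLAlchemy session setup persists them -- confirm.
        session['user_id'] = int(user.id)
        session['user_acl'] = str(user.acl)
        session['user_login'] = user.username
        session['user_name'] = user.name
        session.save()

    def login(self):
        """Authenticate a local account from the posted username and password."""
        username = request.params.get("username", None)
        password = request.params.get("password", None)
        from_page = request.params.get("from_page", None)
        if not username or not password:
            return self.get_loginform("")

        # One query instead of count() followed by one().
        user = sa.query(User).\
            filter(User.username == username).\
            filter(User.password == self._md5_hex(password)).\
            first()
        if user is None:
            # Same message for an unknown user and a wrong password, so the
            # form does not reveal which usernames exist.
            return self.get_loginform(username, "Invalid Credentials", from_page)

        self._start_session(user)
        redirect_to('/root/index')

    def logout(self):
        """Drop every login-related key from the session and go to the login page."""
        for key in self._SESSION_KEYS:
            if key in session:
                del session[key]
        session.save()
        redirect_to("/auth/login")

    def rpx_token_url(self, *args, **kargs):
        """Finish an RPX sign-in: exchange the posted token for a profile and log in.

        The profile returned by the RPX ``auth_info`` call is the only proof
        of identity on this path, so the account is selected by the
        provider-issued ``identifier`` and never by fields the end user can
        edit at the provider.
        """
        token = request.params.get('token', None)
        if not token:
            self._to_login()
        if isinstance(token, unicode):
            token = token.encode('utf-8')

        # The token comes from the request: URL-encode it so it cannot add
        # or override query parameters (such as apiKey) of the API call.
        query = urllib.urlencode([('token', token),
                                  ('apiKey', config.get('rpx_token'))])
        # NOTE(review): urllib2 on Python older than 2.7.9 does not verify
        # the server certificate, so a network attacker could forge this
        # response; no timeout is set either.
        reply = None
        try:
            reply = json_.loads(
                urllib2.urlopen("https://rpxnow.com/api/v2/auth_info?" + query).read())
        except (IOError, ValueError):
            # The URL carries the API key and the token; keep it out of the
            # log message.
            log.exception("RPX auth_info lookup failed")

        if not isinstance(reply, dict) or reply.get('stat') != "ok":
            # something bad happened
            self._to_login()

        profile = reply.get("profile") or {}
        identifier = profile.get('identifier')
        verified_email = profile.get('verifiedEmail')
        if not identifier:
            self._to_login()

        # Select the stored OpenID row by the provider-issued identifier.
        # Matching on verifiedEmail OR preferredUsername let anyone who
        # picked a victim's preferredUsername at the same provider sign in
        # as the victim, and one() raised when the OR matched several rows.
        openid = sa.query(UserOpenId).\
            filter(UserOpenId.identifier == identifier).\
            filter(UserOpenId.providerName == profile.get('providerName')).\
            first()

        user = None
        if openid is not None:
            user = openid.user
        elif verified_email:
            # No OpenID row yet: attach to an existing local account only on
            # one exact username match with the provider-verified email.
            # NOTE(review): this trusts the provider's email verification.
            query = sa.query(User).filter(User.username == verified_email)
            if query.count() == 1:
                user = query.one()

        if user is None:
            if not verified_email:
                # No verified email means no username to create the account
                # under; the original raised KeyError here.
                self._to_login()
            # Random placeholder password: it used to be the MD5 of the
            # profile text, which anyone who knows the profile could rebuild
            # and use on the password login form.
            # NOTE(review): 'Cusomer' looks like a typo of 'Customer'; it is
            # a stored ACL value, so it is left as is -- check what the ACL
            # checks compare against.
            user = User(username=verified_email,
                        password=md5.new(os.urandom(32)).hexdigest(),
                        acl='Cusomer',
                        name=profile.get('displayName'))
            sa.save(user)
            sa.flush()

        if openid is None:
            openid = UserOpenId(
                verifiedEmail=verified_email,
                displayName=profile.get('displayName'),
                preferredUsername=profile.get('preferredUsername'),
                providerName=profile.get('providerName'),
                identifier=identifier,
                email=profile.get('email'),
                user_id=user.id)
            sa.save(openid)
            sa.flush()

        self._start_session(user)
        redirect_to("/root/index")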