Source code for velruse.providers.live

"""Live Authentication Views"""
import datetime

from pyramid.httpexceptions import HTTPFound
from pyramid.security import NO_PERMISSION_REQUIRED

import requests

from ..api import (
    AuthenticationComplete,
    AuthenticationDenied,
    register_provider,
)
from ..exceptions import ThirdPartyFailure
from ..settings import ProviderSettings
from ..utils import flat_url


[docs]class LiveAuthenticationComplete(AuthenticationComplete): """Live Connect auth complete"""
[docs]def includeme(config): config.add_directive('add_live_login', add_live_login) config.add_directive('add_live_login_from_settings', add_live_login_from_settings)
[docs]def add_live_login_from_settings(config, prefix='velruse.live.'): settings = config.registry.settings p = ProviderSettings(settings, prefix) p.update('consumer_key', required=True) p.update('consumer_secret', required=True) p.update('scope') p.update('login_path') p.update('callback_path') config.add_live_login(**p.kwargs)
[docs]def add_live_login(config, consumer_key, consumer_secret, scope=None, login_path='/login/live', callback_path='/login/live/callback', name='live'): """ Add a Live login provider to the application. """ provider = LiveProvider(name, consumer_key, consumer_secret, scope) config.add_route(provider.login_route, login_path) config.add_view(provider, attr='login', route_name=provider.login_route, permission=NO_PERMISSION_REQUIRED) config.add_route(provider.callback_route, callback_path, use_global_views=True, factory=provider.callback) register_provider(config, name, provider)
class LiveProvider(object): def __init__(self, name, consumer_key, consumer_secret, scope): self.name = name self.type = 'live' self.consumer_key = consumer_key self.consumer_secret = consumer_secret self.scope = scope self.login_route = 'velruse.%s-login' % name self.callback_route = 'velruse.%s-callback' % name def login(self, request): """Initiate a Live login""" scope = request.POST.get('scope', self.scope or 'wl.basic wl.emails wl.signin') fb_url = flat_url('https://oauth.live.com/authorize', scope=scope, client_id=self.consumer_key, redirect_uri=request.route_url(self.callback_route), response_type="code") return HTTPFound(location=fb_url) def callback(self, request): """Process the Live redirect""" if 'error' in request.GET: raise ThirdPartyFailure(request.GET.get('error_description', 'No reason provided.')) code = request.GET.get('code') if not code: reason = request.GET.get('error_reason', 'No reason provided.') return AuthenticationDenied(reason, provider_name=self.name, provider_type=self.type) # Now retrieve the access token with the code access_url = flat_url( 'https://oauth.live.com/token', client_id=self.consumer_key, client_secret=self.consumer_secret, redirect_uri=request.route_url(self.callback_route), grant_type="authorization_code", code=code) r = requests.get(access_url) if r.status_code != 200: raise ThirdPartyFailure("Status %s: %s" % ( r.status_code, r.content)) data = r.json() access_token = data['access_token'] # Retrieve profile data graph_url = flat_url('https://apis.live.net/v5.0/me', access_token=access_token) r = requests.get(graph_url) if r.status_code != 200: raise ThirdPartyFailure("Status %s: %s" % ( r.status_code, r.content)) live_profile = r.json() profile = extract_live_data(live_profile) cred = {'oauthAccessToken': access_token} if 'refresh_token' in data: cred['oauthRefreshToken'] = data['refresh_token'] return LiveAuthenticationComplete(profile=profile, credentials=cred, provider_name=self.name, provider_type=self.type) def extract_live_data(data): """Extract and normalize Windows Live Connect data""" emails = data.get('emails', {}) profile = { 'accounts': [{'domain': 'live.com', 'userid': data['id']}], 'gender': data.get('gender'), 'verifiedEmail': emails.get('preferred'), 'updated': data.get('updated_time'), 'name': { 'formatted': data.get('name'), 'familyName': data.get('last_name'), 'givenName': data.get('first_name'), }, 'displayName': data.get('name'), 'emails': [], 'urls': [], } if emails.get('personal'): profile['emails'].append( {'type': 'personal', 'value': emails['personal']}) if emails.get('business'): profile['emails'].append( {'type': 'business', 'value': emails['business']}) if emails.get('preferred'): profile['emails'].append( {'type': 'preferred', 'value': emails['preferred'], 'primary': True}) if emails.get('account'): profile['emails'].append( {'type': 'account', 'value': emails['account']}) if 'link' in data: profile['urls'].append( {'type': 'profile', 'value': data['link']}) if 'birth_day' in data: try: profile['birthday'] = datetime.date( int(data['birth_year']), int(data['birth_month']), int(data['birth_day']), ).strftime('%Y-%m-%d') except ValueError: pass return profile