mirror of
https://github.com/alerta/alerta.git
synced 2025-01-24 09:19:40 +00:00
124 lines
4.7 KiB
Python
124 lines
4.7 KiB
Python
import saml2.client # pylint: disable=import-error
|
|
import saml2.config # pylint: disable=import-error
|
|
import saml2.entity # pylint: disable=import-error
|
|
import saml2.metadata # pylint: disable=import-error
|
|
import saml2.saml # pylint: disable=import-error
|
|
from flask import (Response, current_app, make_response, render_template,
|
|
request)
|
|
from flask_cors import cross_origin
|
|
|
|
from alerta.auth.utils import create_token, get_customers, not_authorized
|
|
from alerta.exceptions import ApiError
|
|
from alerta.models.permission import Permission
|
|
from alerta.models.user import User
|
|
from alerta.utils.audit import auth_audit_trail
|
|
from alerta.utils.collections import merge
|
|
from alerta.utils.response import absolute_url
|
|
|
|
from . import auth
|
|
|
|
|
|
def get_saml2_config():
|
|
config = saml2.config.Config()
|
|
|
|
saml_settings = {
|
|
'metadata': {
|
|
'remote': [{
|
|
'url': current_app.config['SAML2_METADATA_URL'],
|
|
}]
|
|
},
|
|
'entityid': absolute_url(),
|
|
'service': {
|
|
'sp': {
|
|
'endpoints': {
|
|
'assertion_consumer_service': [
|
|
(absolute_url('/auth/saml'), saml2.BINDING_HTTP_POST)
|
|
]
|
|
},
|
|
'allow_unsolicited': True,
|
|
'authn_requests_signed': False,
|
|
'want_assertions_signed': True,
|
|
'want_response_signed': False
|
|
}
|
|
}
|
|
}
|
|
if current_app.config['SAML2_ENTITY_ID']:
|
|
saml_settings['entityid'] = current_app.config['SAML2_ENTITY_ID']
|
|
|
|
if current_app.config['SAML2_CONFIG'].get('metadata'):
|
|
saml_settings['metadata'] = current_app.config['SAML2_CONFIG']['metadata']
|
|
|
|
merge(saml_settings, current_app.config['SAML2_CONFIG']) # allow settings override
|
|
|
|
config.load(saml_settings)
|
|
config.allow_unknown_attributes = True
|
|
return config
|
|
|
|
|
|
def saml_client():
|
|
return saml2.client.Saml2Client(
|
|
config=get_saml2_config()
|
|
)
|
|
|
|
|
|
@auth.route('/auth/saml', methods=['GET'])
|
|
def saml_redirect_to_idp():
|
|
(session_id, result) = saml_client().prepare_for_authenticate(relay_state=request.referrer)
|
|
return make_response('', 302, result['headers'])
|
|
|
|
|
|
@auth.route('/auth/saml', methods=['OPTIONS', 'POST'])
|
|
@cross_origin(supports_credentials=True)
|
|
def saml_response_from_idp():
|
|
origin = request.form['RelayState']
|
|
|
|
authn_response = saml_client().parse_authn_request_response(
|
|
xmlstr=request.form['SAMLResponse'],
|
|
binding=saml2.entity.BINDING_HTTP_POST
|
|
)
|
|
identity = authn_response.get_identity()
|
|
subject = authn_response.get_subject()
|
|
|
|
name = current_app.config['SAML2_USER_NAME_FORMAT'].format(**dict(map(lambda x: (x[0], x[1][0]), identity.items())))
|
|
login = subject.text
|
|
email = identity[current_app.config['SAML2_EMAIL_ATTRIBUTE']][0]
|
|
|
|
# Create user if not yet there
|
|
user = User.find_by_username(username=email)
|
|
if not user:
|
|
user = User(name=name, login=login, password='', email=email,
|
|
roles=current_app.config['USER_ROLES'], text='SAML2 user', email_verified=True)
|
|
try:
|
|
user = user.create()
|
|
except Exception as e:
|
|
ApiError(str(e), 500)
|
|
|
|
if user.status != 'active':
|
|
raise ApiError(f'User {email} is not active', 403)
|
|
|
|
groups = identity.get('groups', [])
|
|
if not_authorized('ALLOWED_SAML2_GROUPS', groups) or not_authorized('ALLOWED_EMAIL_DOMAINS', groups=[user.domain]):
|
|
message = {'status': 'error', 'message': f'User {email} is not authorized'}
|
|
return render_template('auth/saml2.html', message=message, origin=origin), 403
|
|
|
|
user.update_last_login()
|
|
|
|
scopes = Permission.lookup(login=user.email, roles=user.roles + groups)
|
|
customers = get_customers(login=user.email, groups=groups + ([user.domain] if user.domain else []))
|
|
|
|
auth_audit_trail.send(current_app._get_current_object(), event='saml2-login', message='user login via SAML2',
|
|
user=user.email, customers=customers, scopes=scopes, roles=user.roles, groups=groups,
|
|
resource_id=user.id, type='user', request=request)
|
|
|
|
token = create_token(user_id=user.id, name=user.name, login=user.email, provider='saml2',
|
|
customers=customers, scopes=scopes, roles=user.roles, groups=groups,
|
|
email=user.email, email_verified=user.email_verified)
|
|
|
|
message = {'status': 'ok', 'token': token.tokenize()}
|
|
return render_template('auth/saml2.html', message=message, origin=origin), 200
|
|
|
|
|
|
@auth.route('/auth/saml/metadata.xml', methods=['GET'])
|
|
def saml_metadata():
|
|
descriptor = saml2.metadata.entity_descriptor(get_saml2_config())
|
|
return Response(str(descriptor), content_type='text/xml; charset=utf-8')
|