0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-03-13 04:53:16 +00:00

Prevent user with lesser role assigning role with more privileges ()

This commit is contained in:
Nick Satterly 2019-03-22 00:35:43 +01:00 committed by GitHub
parent b7f5abf9f3
commit 75c707716f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 14 deletions
alerta/views
tests

View file

@ -6,6 +6,7 @@ from alerta.auth.decorators import permission
from alerta.auth.utils import not_authorized
from alerta.exceptions import ApiError
from alerta.models.enums import Scope
from alerta.models.permission import Permission
from alerta.models.user import User
from alerta.utils.audit import admin_audit_trail, write_audit_trail
from alerta.utils.response import jsonp
@ -30,6 +31,12 @@ def create_user():
if User.find_by_email(email=user.email):
raise ApiError('username already exists', 409)
want_scopes = Permission.lookup(login=user.email, groups=user.roles)
for want_scope in want_scopes:
if not Permission.is_in_scope(want_scope, have_scopes=g.scopes):
raise ApiError("Requested scope '{}' not in existing scopes: {}".format(
want_scope, ','.join(g.scopes)), 403)
try:
user = user.create()
except Exception as e:
@ -130,6 +137,13 @@ def update_user(user_id):
if user_by_email and user_by_email.id != user.id:
raise ApiError('user with email already exists', 409)
if request.json.get('roles'):
want_scopes = Permission.lookup(login='', groups=request.json['roles'])
for want_scope in want_scopes:
if not Permission.is_in_scope(want_scope, have_scopes=g.scopes):
raise ApiError("Requested scope '{}' not in existing scopes: {}".format(
want_scope, ','.join(g.scopes)), 403)
admin_audit_trail.send(current_app._get_current_object(), event='user-updated', message='', user=g.user,
customers=g.customers, scopes=g.scopes, resource_id=user.id, type='user', request=request)

View file

@ -3,6 +3,8 @@ import json
import unittest
from alerta.app import create_app, db
from alerta.models.enums import Scope
from alerta.models.key import ApiKey
class UserTestCase(unittest.TestCase):
@ -11,11 +13,34 @@ class UserTestCase(unittest.TestCase):
test_config = {
'TESTING': True,
'AUTH_REQUIRED': False
'AUTH_REQUIRED': True,
'ADMIN_USERS': ['admin@alerta.io'],
'ALLOWED_EMAIL_DOMAINS': ['alerta.io', 'doe.com']
}
self.app = create_app(test_config)
self.client = self.app.test_client()
self.alert = {
'event': 'Foo',
'resource': 'Bar',
'environment': 'Production',
'service': ['Quux']
}
with self.app.test_request_context('/'):
self.app.preprocess_request()
self.api_key = ApiKey(
user='admin@alerta.io',
scopes=[Scope.admin, Scope.read, Scope.write],
text='demo-key'
)
self.api_key.create()
self.headers = {
'Authorization': 'Key %s' % self.api_key.key,
'Content-type': 'application/json'
}
def tearDown(self):
db.destroy()
@ -28,13 +53,10 @@ class UserTestCase(unittest.TestCase):
'roles': ['operator'],
'text': 'devops user'
}
headers = {
'Content-type': 'application/json'
}
# create user
response = self.client.post('/user', data=json.dumps(payload), headers=headers)
self.assertEqual(response.status_code, 201)
response = self.client.post('/user', data=json.dumps(payload), headers=self.headers)
self.assertEqual(response.status_code, 201, response.data)
data = json.loads(response.data.decode('utf-8'))
self.assertEqual(data['user']['name'], 'John Doe')
self.assertEqual(data['user']['email'], 'john@doe.com')
@ -48,12 +70,12 @@ class UserTestCase(unittest.TestCase):
}
# modify user (assign different role)
response = self.client.put('/user/' + user_id, data=json.dumps(payload), headers=headers)
response = self.client.put('/user/' + user_id, data=json.dumps(payload), headers=self.headers)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data.decode('utf-8'))
# get user
response = self.client.get('/user/' + user_id, headers=headers)
response = self.client.get('/user/' + user_id, headers=self.headers)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data.decode('utf-8'))
self.assertEqual(data['user']['name'], 'John Doe')
@ -66,12 +88,12 @@ class UserTestCase(unittest.TestCase):
}
# modify user (assign multiple roles)
response = self.client.put('/user/' + user_id, data=json.dumps(payload), headers=headers)
response = self.client.put('/user/' + user_id, data=json.dumps(payload), headers=self.headers)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data.decode('utf-8'))
# get user
response = self.client.get('/user/' + user_id, headers=headers)
response = self.client.get('/user/' + user_id, headers=self.headers)
self.assertEqual(response.status_code, 200)
data = json.loads(response.data.decode('utf-8'))
self.assertEqual(data['user']['name'], 'John Doe')
@ -88,12 +110,9 @@ class UserTestCase(unittest.TestCase):
'roles': ['operator'],
'text': 'devops user'
}
headers = {
'Content-type': 'application/json'
}
# create user
response = self.client.post('/user', data=json.dumps(payload), headers=headers)
response = self.client.post('/user', data=json.dumps(payload), headers=self.headers)
self.assertEqual(response.status_code, 201)
data = json.loads(response.data.decode('utf-8'))
self.assertEqual(data['user']['name'], 'John Doe')