# Mirror of https://github.com/alerta/alerta.git
# Synced 2025-01-25 01:39:40 +00:00
# 206 lines, 8.9 KiB, Python
import json
|
|
import unittest
|
|
|
|
from alerta.app import create_app, db, key_helper
|
|
from alerta.models.enums import Scope
|
|
from alerta.models.key import ApiKey
|
|
from alerta.models.permission import Permission
|
|
|
|
|
|
class ScopesTestCase(unittest.TestCase):
    """Tests for API key scopes, permission editing, custom scopes and role lookup."""

    def setUp(self):
        """Build the app under test and create one API key per access level.

        The created key strings are stored in ``self.api_keys_scopes``,
        keyed by the key's ``text`` label ('read-only', 'read-write', 'admin').
        """
        test_config = {
            'TESTING': True,
            'AUTH_REQUIRED': True,
            'ADMIN_USERS': ['admin@alerta.io', 'sys@alerta.io'],
            'DEFAULT_ADMIN_ROLE': 'ops',
            'ADMIN_ROLES': ['ops', 'devops'],
            'DEFAULT_USER_ROLE': 'dev',
            'USER_ROLES': ['dev'],
            'CUSTOM_SCOPES': ['admin:foo', 'write:foo.bar', 'read:foo.baz', 'delete:foo.quux']
        }
        self.app = create_app(test_config, environment='development')
        self.client = self.app.test_client()

        def make_key(user, scopes=None, key_type=None, text=''):
            """Create an ApiKey via ``ApiKey.create()`` and return its ``key`` attribute."""
            # The local parameter is ``key_type`` so the builtin ``type`` is not
            # shadowed; ApiKey itself still receives it as ``type=``.
            api_key = ApiKey(
                user=user,
                scopes=scopes,
                type=key_type,
                text=text
            )
            return api_key.create().key

        # (text label, owning user, scopes) for each key, in creation order.
        key_specs = (
            ('read-only', 'user@alerta.io', ['read']),
            ('read-write', 'user@alerta.io', ['read', 'write']),
            ('admin', 'admin@alerta.io', ['read', 'write', 'admin']),
        )

        # Keys are created inside a request context after preprocess_request();
        # presumably so the app's before-request hooks have run -- TODO confirm.
        with self.app.test_request_context('/'):
            self.app.preprocess_request()
            self.api_keys_scopes = {}
            for label, user, scopes in key_specs:
                self.api_keys_scopes[label] = make_key(user, scopes=scopes, key_type=None, text=label)

            # Disabled together with test_types below (keys defined by type
            # instead of by scopes):
            # self.api_keys_types = dict()
            # self.api_keys_types['read-only'] = make_key('user@alerta.io', scopes=None, key_type='read-only', text='read-only')
            # self.api_keys_types['read-write'] = make_key('user@alerta.io', scopes=None, key_type='read-write', text='read-write')
            # self.api_keys_types['admin'] = make_key('admin@alerta.io', scopes=None, key_type='read-write', text='admin')

    def tearDown(self):
        """Destroy the database after each test."""
        db.destroy()

    @staticmethod
    def _json(response):
        """Decode a test-client response body as UTF-8 JSON."""
        return json.loads(response.data.decode('utf-8'))

    def _admin_headers(self):
        """Return JSON request headers authenticated with the admin API key."""
        return {
            'Authorization': f"Key {self.api_keys_scopes['admin']}",
            'Content-type': 'application/json'
        }

    def test_scopes(self):
        """List keys with the read-only key and check each key's value, type and scopes."""
        response = self.client.get('/keys', headers={'Authorization': f"Key {self.api_keys_scopes['read-only']}"})
        self.assertEqual(response.status_code, 200)
        data = self._json(response)

        for key in data['keys']:
            label = key['text']
            self.assertEqual(self.api_keys_scopes[label], key['key'])
            # The 'admin' key is expected to report type 'read-write'; for the
            # other keys the label and the reported type coincide.
            expected_type = 'read-write' if label == 'admin' else label
            self.assertEqual(expected_type, key['type'])
            self.assertEqual(sorted(key_helper.type_to_scopes(key['user'], label)), sorted(key['scopes']))

    # def test_types(self):
    #     # FIXME
    #     response = self.client.get('/keys')
    #     self.assertEqual(response.status_code, 200)
    #     data = json.loads(response.data.decode('utf-8'))
    #     for key in data['keys']:
    #         self.assertEqual(self.api_keys_types[key['text']], key['key'])
    #         if key['text'] == 'admin':
    #             self.assertEqual('read-write', key['type'])
    #         else:
    #             self.assertEqual(key['text'], key['type'])
    #         self.assertEqual(sorted(key_helper.type_to_scopes(key['user'], key['text'])), sorted(key['scopes']))

    def test_is_in_scope(self):
        """Check ``Permission.is_in_scope`` against a table of wanted/held scopes."""
        # (wanted scope, scopes held, expected result)
        cases = (
            (Scope.read_customers, [Scope.read], True),
            (Scope.read_customers, [Scope.write], True),
            (Scope.read_customers, [Scope.admin], True),

            (Scope.read_heartbeats, [Scope.read_alerts], False),
            (Scope.read_heartbeats, [Scope.write_alerts], False),
            (Scope.read_heartbeats, [Scope.admin_alerts], False),

            (Scope.write_blackouts, [Scope.read], False),
            (Scope.write_blackouts, [Scope.read_blackouts, Scope.read], False),
            (Scope.write_blackouts, [Scope.read_blackouts, Scope.write_blackouts], True),
            (Scope.write_blackouts, [Scope.write_blackouts], True),
            (Scope.write_blackouts, [Scope.read_blackouts, Scope.write], True),
            (Scope.write_blackouts, [Scope.read_blackouts, Scope.admin], True),
            (Scope.write_blackouts, [Scope.read, Scope.write_keys], False),
            (Scope.write_blackouts, [Scope.read, Scope.admin_keys], False),

            (Scope.admin, [Scope.write], False),
            (Scope.admin, [Scope.read, Scope.write, Scope.admin], True),
            (Scope.read_heartbeats, [Scope.write], True),
        )

        for want, have, expected in cases:
            # subTest labels a failure with its inputs and lets the remaining
            # rows run instead of stopping at the first mismatch.
            with self.subTest(want=want, have=have):
                self.assertEqual(Permission.is_in_scope(want, have), expected)

    def test_edit_perms(self):
        """Create a permission, update its scopes and match, and reject an invalid scope."""
        headers = self._admin_headers()

        # add permission
        payload = {
            'scopes': [Scope.read],
            'match': 'read-only'
        }
        response = self.client.post('/perm', data=json.dumps(payload),
                                    content_type='application/json', headers=headers)
        self.assertEqual(response.status_code, 201)
        perm_url = '/perm/' + self._json(response)['id']

        # change scopes
        update = {
            'scopes': [Scope.write, Scope.read]
        }
        response = self.client.put(perm_url, data=json.dumps(update), headers=headers)
        self.assertEqual(response.status_code, 200)
        self.assertEqual(self._json(response)['status'], 'ok')

        # try invalid scopes
        update = {
            'scopes': ['foo:bar']
        }
        response = self.client.put(perm_url, data=json.dumps(update), headers=headers)
        self.assertEqual(response.status_code, 400)
        self.assertEqual(self._json(response)['message'], "'foo:bar' is not a valid Scope")

        # change perm
        update = {
            'match': 'read-write'
        }
        response = self.client.put(perm_url, data=json.dumps(update), headers=headers)
        self.assertEqual(response.status_code, 200)
        self.assertEqual(self._json(response)['status'], 'ok')

        # check updates worked
        response = self.client.get(perm_url, headers=headers)
        self.assertEqual(response.status_code, 200)
        permission = self._json(response)['permission']
        self.assertEqual(permission['scopes'], [Scope.write, Scope.read])
        self.assertEqual(permission['match'], 'read-write')

    def test_custom_scopes(self):
        """Check configured CUSTOM_SCOPES are listed and ``Scope.from_str`` keeps its parts."""
        headers = self._admin_headers()

        # list scopes
        response = self.client.get('/scopes', content_type='application/json', headers=headers)
        self.assertEqual(response.status_code, 200)
        listed = self._json(response)['scopes']

        # Same values as CUSTOM_SCOPES in the setUp configuration.
        for custom in ('admin:foo', 'write:foo.bar', 'read:foo.baz', 'delete:foo.quux'):
            self.assertIn(custom, listed)

        scope = Scope.from_str(action='write', resource='resource', type='type')
        self.assertEqual(scope.action, 'write')
        self.assertEqual(scope.resource, 'resource')
        self.assertEqual(scope.type, 'type')

    def test_system_roles(self):
        """Check ``Permission.lookup`` results for admin, user and guest logins."""
        # (login, roles, expected scopes)
        cases = (
            # login is listed in ADMIN_USERS
            ('admin@alerta.io', ['no-ops', 'team'],
             [Scope.admin, Scope.read, Scope.write]),
            # role 'ops' is listed in ADMIN_ROLES
            ('dev_who_wants_to_be_admin@alerta.io', ['web', 'ops'],
             [Scope.admin, Scope.read, Scope.write]),
            # role 'dev' is listed in USER_ROLES
            ('user_who_wants_default_user_scopes@alerta.io', ['dev', 'engineer'],
             [Scope.read, Scope.write]),
            # neither login nor role appears in the configured lists
            ('guest_user@alerta.io', ['guest'],
             [Scope.read_alerts]),
        )

        for login, roles, expected in cases:
            with self.subTest(login=login, roles=roles):
                # The lookup runs inside a request context; the assertion does not need one.
                with self.app.test_request_context():
                    scopes = Permission.lookup(login, roles)
                self.assertEqual(scopes, expected)
|