0
0
Fork 0
mirror of https://github.com/alerta/alerta.git synced 2025-04-14 01:29:13 +00:00

Fix linting errors ()

* Use flake8 instead of pylint and black

* Fix type errors
This commit is contained in:
Nick Satterly 2021-11-21 17:26:37 +01:00 committed by GitHub
parent 4bbfc923a1
commit 0cb430d2ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 51 additions and 59 deletions

View file

@ -27,7 +27,7 @@ repos:
- id: requirements-txt-fixer
- id: trailing-whitespace
- repo: https://gitlab.com/pycqa/flake8
rev: 3.7.7
rev: 3.9.2
hooks:
- id: flake8
- repo: https://github.com/asottile/pyupgrade

View file

@ -50,8 +50,8 @@ Alerta can be run in "development mode" from the command-line:
$ alertad run
```
Test that Alerta API is working by `curl`-ing the API endpoint:
Test that Alerta API is working by `curl`-ing the API endpoint:
```bash
$ curl http://localhost:8080/
```
```

View file

@ -3,9 +3,8 @@
VENV=venv
PYTHON=$(VENV)/bin/python
PIP=$(VENV)/bin/pip --disable-pip-version-check
PYLINT=$(VENV)/bin/pylint
FLAKE8=$(VENV)/bin/flake8
MYPY=$(VENV)/bin/mypy
BLACK=$(VENV)/bin/black
TOX=$(VENV)/bin/tox
PYTEST=$(VENV)/bin/pytest
DOCKER_COMPOSE=docker-compose
@ -32,15 +31,12 @@ all: help
$(PIP):
python3 -m venv $(VENV)
$(PYLINT): $(PIP)
$(PIP) install pylint==2.11.1
$(FLAKE8): $(PIP)
$(PIP) install flake8==4.0.1
$(MYPY): $(PIP)
$(PIP) install mypy==0.812
$(BLACK): $(PIP)
$(PIP) install black==21.11b1
$(TOX): $(PIP)
$(PIP) install tox
@ -60,22 +56,17 @@ ifdef TOXENV
toxparams?=-e $(TOXENV)
endif
## install - Install dependencies.
## install - Install dependencies.
install: $(PIP)
$(PIP) install -r requirements.txt
## format - Code formatter.
format: $(BLACK)
$(BLACK) -l120 -S -v $(PROJECT)
## hooks - Run pre-commit hooks.
hooks: $(PRE_COMMIT)
$(PRE_COMMIT) run --all-files --show-diff-on-failure
## lint - Lint and type checking.
lint: $(PYLINT) $(BLACK) $(MYPY)
$(PYLINT) --rcfile pylintrc $(PROJECT)
$(BLACK) -l120 -S --check -v $(PROJECT) || true
lint: $(FLAKE8) $(MYPY)
$(FLAKE8) $(PROJECT)/
$(MYPY) $(PROJECT)/
## test - Run all tests.
@ -91,7 +82,7 @@ test.integration: $(PYTEST)
$(DOCKER_COMPOSE) -f docker-compose.ci.yml up -d
$(PYTEST) tests/integration $(toxparams)
## test.forwarder - Run forwarder tests.
## test.forwarder - Run forwarder tests.
test.forwarder:
$(DOCKER_COMPOSE) -f tests/integration/fixtures/docker-compose.yml pull
$(DOCKER_COMPOSE) -f tests/integration/fixtures/docker-compose.yml up

View file

@ -60,7 +60,7 @@ def signup():
token = create_token(user_id=user.id, name=user.name, login=user.login, provider='basic',
customers=customers, scopes=scopes, roles=user.roles, groups=groups,
email=user.email, email_verified=user.email_verified)
return jsonify(token=token.tokenize)
return jsonify(token=token.tokenize())
def login():
@ -101,7 +101,7 @@ def login():
token = create_token(user_id=user.id, name=user.name, login=user.login, provider='basic',
customers=customers, scopes=scopes, roles=user.roles, groups=groups,
email=user.email, email_verified=user.email_verified)
return jsonify(token=token.tokenize)
return jsonify(token=token.tokenize())
@auth.route('/auth/confirm/<hash>', methods=['OPTIONS', 'POST'])

View file

@ -168,4 +168,4 @@ def login():
token = create_token(user_id=user.id, name=user.name, login=user.email, provider='ldap',
customers=customers, scopes=scopes, roles=user.roles, groups=groups,
email=user.email, email_verified=user.email_verified)
return jsonify(token=token.tokenize)
return jsonify(token=token.tokenize())

View file

@ -80,4 +80,4 @@ def github():
token = create_token(user_id=subject, name=name, login=login, provider='github',
customers=customers, scopes=scopes, roles=user.roles, orgs=organizations,
email=email, email_verified=email_verified, picture=picture)
return jsonify(token=token.tokenize)
return jsonify(token=token.tokenize())

View file

@ -164,4 +164,4 @@ def openid():
token = create_token(user_id=subject, name=name, login=login, provider=current_app.config['AUTH_PROVIDER'],
customers=customers, scopes=scopes, **custom_claims,
email=email, email_verified=email_verified, picture=picture)
return jsonify(token=token.tokenize)
return jsonify(token=token.tokenize())

View file

@ -114,7 +114,7 @@ def saml_response_from_idp():
customers=customers, scopes=scopes, roles=user.roles, groups=groups,
email=user.email, email_verified=user.email_verified)
message = {'status': 'ok', 'token': token.tokenize}
message = {'status': 'ok', 'token': token.tokenize()}
return render_template('auth/saml2.html', message=message, origin=origin), 200

View file

@ -2,6 +2,7 @@ import json
import re
from collections import namedtuple
from datetime import datetime
from typing import Any, Dict, List
import pytz
from pyparsing import ParseException
@ -213,8 +214,8 @@ class Blackouts(QueryBuilder):
@staticmethod
def from_params(params: ImmutableMultiDict, customers=None, query_time=None):
query = dict()
params = MultiDict(params)
query = dict() # type: Dict[str, Any]
params = MultiDict(params) # type: ignore
# customer
if customers:
@ -223,7 +224,7 @@ class Blackouts(QueryBuilder):
customer_query = None # type: ignore
# status
status = params.poplist('status')
status = params.poplist('status') # type: List[str]
if status:
query['$or'] = list()
if BlackoutStatus.Active in status:
@ -265,8 +266,8 @@ class Heartbeats(QueryBuilder):
@staticmethod
def from_params(params: ImmutableMultiDict, customers=None, query_time=None):
query = dict()
params = MultiDict(params)
query = dict() # type: Dict[str, Any]
params = MultiDict(params) # type: ignore
# customer
if customers:
@ -308,8 +309,8 @@ class ApiKeys(QueryBuilder):
@staticmethod
def from_params(params: MultiDict, customers=None, query_time=None):
query = dict()
params = MultiDict(params)
query = dict() # type: Dict[str, Any]
params = MultiDict(params) # type: ignore
# customer
if customers:
@ -359,8 +360,8 @@ class Users(QueryBuilder):
@staticmethod
def from_params(params: MultiDict, customers=None, query_time=None):
query = dict()
params = MultiDict(params)
query = dict() # type: Dict[str, Any]
params = MultiDict(params) # type: ignore
# filter, sort-by, group-by
query = QueryBuilder.filter_query(params, Users.VALID_PARAMS, query)
@ -382,8 +383,8 @@ class Groups(QueryBuilder):
@staticmethod
def from_params(params: MultiDict, customers=None, query_time=None):
query = dict()
params = MultiDict(params)
query = dict() # type: Dict[str, Any]
params = MultiDict(params) # type: ignore
# filter, sort-by, group-by
query = QueryBuilder.filter_query(params, Groups.VALID_PARAMS, query)
@ -405,8 +406,8 @@ class Permissions(QueryBuilder):
@staticmethod
def from_params(params: MultiDict, customers=None, query_time=None):
query = dict()
params = MultiDict(params)
query = dict() # type: Dict[str, Any]
params = MultiDict(params) # type: ignore
# filter, sort-by, group-by
query = QueryBuilder.filter_query(params, Permissions.VALID_PARAMS, query)
@ -427,8 +428,8 @@ class Customers(QueryBuilder):
@staticmethod
def from_params(params: MultiDict, customers=None, query_time=None):
query = dict()
params = MultiDict(params)
query = dict() # type: Dict[str, Any]
params = MultiDict(params) # type: ignore
# filter, sort-by, group-by
query = QueryBuilder.filter_query(params, Customers.VALID_PARAMS, query)

View file

@ -359,7 +359,7 @@ class Users(QueryBuilder):
def from_params(params: MultiDict, customers=None, query_time=None):
query = ['1=1']
qvars = dict()
qvars = dict() # type: Dict[str, Any]
params = MultiDict(params)
# filter, sort-by
@ -383,7 +383,7 @@ class Groups(QueryBuilder):
def from_params(params: MultiDict, customers=None, query_time=None):
query = ['1=1']
qvars = dict()
qvars = dict() # type: Dict[str, Any]
params = MultiDict(params)
# filter, sort-by
@ -407,7 +407,7 @@ class Permissions(QueryBuilder):
def from_params(params: MultiDict, customers=None, query_time=None):
query = ['1=1']
qvars = dict()
qvars = dict() # type: Dict[str, Any]
params = MultiDict(params)
# filter, sort-by
@ -430,7 +430,7 @@ class Customers(QueryBuilder):
def from_params(params: MultiDict, customers=None, query_time=None):
query = ['1=1']
qvars = dict()
qvars = dict() # type: Dict[str, Any]
params = MultiDict(params)
# filter, sort-by

View file

@ -75,7 +75,7 @@ class Group:
self.id = kwargs.get('id') or str(uuid4())
self.name = name
self.text = text or ''
self.count = kwargs.get('count')
self.count = kwargs.get('count') # type: ignore
@classmethod
def parse(cls, json: JSON) -> 'Group':

View file

@ -20,7 +20,7 @@ class ApiKeyStatus(str, Enum):
class ApiKey:
def __init__(self, user: str, scopes: List[Scope], text: str = '', expire_time: datetime = None, customer: str = None, **kwargs) -> None:
def __init__(self, user: str, scopes: List[str], text: str = '', expire_time: datetime = None, customer: str = None, **kwargs) -> None:
self.id = kwargs.get('id') or str(uuid4())
self.key = kwargs.get('key', None) or key_helper.generate()
@ -28,7 +28,7 @@ class ApiKey:
self.scopes = scopes or key_helper.user_default_scopes
self.text = text
self.expire_time = expire_time or datetime.utcnow() + timedelta(days=key_helper.api_key_expire_days)
self.count = kwargs.get('count', 0)
self.count = kwargs.get('count', 0) # type: ignore
self.last_used_time = kwargs.get('last_used_time', None)
self.customer = customer

View file

@ -46,7 +46,7 @@ class Jwt:
token,
key=key or current_app.config['SECRET_KEY'],
options={'verify_signature': verify},
algorithms=algorithm,
algorithms=[algorithm],
audience=current_app.config['OAUTH2_CLIENT_ID'] or current_app.config['SAML2_ENTITY_ID'] or absolute_url()
)
except (DecodeError, ExpiredSignatureError, InvalidAudienceError):
@ -114,7 +114,6 @@ class Jwt:
data['oid'] = self.oid
return data
@property
def tokenize(self, algorithm: str = 'HS256') -> str:
return jwt.encode(self.serialize, key=current_app.config['SECRET_KEY'], algorithm=algorithm)

View file

@ -19,7 +19,7 @@ class TimeoutPolicy(PluginBase):
return alert
def post_receive(self, alert: 'Alert', **kwargs) -> Optional['Alert']:
return
return None
def status_change(self, alert: 'Alert', status: str, text: str, **kwargs) -> Any:
return

View file

@ -69,8 +69,9 @@ USER_DEFAULT_SCOPES = ['read', 'write'] # Note: 'write' scope implicitly includ
DEFAULT_GUEST_ROLE = 'guest'
GUEST_ROLES = [DEFAULT_GUEST_ROLE]
GUEST_DEFAULT_SCOPES = ['read:alerts']
CUSTOM_SCOPES = []
DELETE_SCOPES = [] # Set to "delete:alerts" to prevent users with "write:alerts" scope being able to delete alerts
CUSTOM_SCOPES = [] # type: List[str]
# Set DELETE_SCOPES to "delete:alerts" to prevent users with "write:alerts" scope being able to delete alerts
DELETE_SCOPES = [] # type: List[str]
CUSTOMER_VIEWS = False
ALLOW_READONLY = False
@ -112,7 +113,7 @@ LDAP_CACERT = '' # Path to CA certificate to verify LDAPS connection against
LDAP_ALLOW_SELF_SIGNED_CERT = False
LDAP_DOMAINS = {
# 'planetexpress.com': 'cn=%s,ou=people,dc=planetexpress,dc=com'
}
} # type: Dict[str, Any]
LDAP_BIND_USERNAME = '' # required if using LDAP_SEARCH_QUERY eg. uid=admin,ou=users,dc=domain,dc=com
LDAP_BIND_PASSWORD = '' # required if using LDAP_BIND_USERNAME
LDAP_USER_BASEDN = '' # BASEDN for user search (default: LDAP_BASEDN)

View file

@ -11,7 +11,7 @@ from alerta.models.alert import Alert
from alerta.models.enums import Scope
def assign_customer(wanted: str = None, permission: Scope = Scope.admin_alerts) -> Optional[str]:
def assign_customer(wanted: str = None, permission: str = Scope.admin_alerts) -> Optional[str]:
customers = g.get('customers', [])
if wanted:
if Scope.admin in g.scopes or permission in g.scopes:

View file

@ -19,7 +19,7 @@ class ApiKeyHelper:
def init_app(self, app: Flask) -> None:
self.secret_key = app.config['SECRET_KEY']
self.admin_users = app.config['ADMIN_USERS']
self.user_default_scopes = [Scope(s) for s in app.config['USER_DEFAULT_SCOPES']]
self.user_default_scopes = [Scope(s) for s in app.config['USER_DEFAULT_SCOPES']] # type: List[str]
self.api_key_expire_days = app.config['API_KEY_EXPIRE_DAYS']
def generate(self) -> str:
@ -27,13 +27,13 @@ class ApiKeyHelper:
digest = hmac.new(self.secret_key.encode('utf-8'), msg=random, digestmod=hashlib.sha256).digest()
return base64.urlsafe_b64encode(digest).decode('utf-8')[:40]
def scopes_to_type(self, scopes: List[Scope]) -> str:
def scopes_to_type(self, scopes: List[str]) -> str:
for scope in scopes:
if scope.startswith(Scope.write) or scope.startswith(Scope.admin):
return 'read-write'
return 'read-only'
def type_to_scopes(self, user: str, key_type: str) -> List[Scope]:
def type_to_scopes(self, user: str, key_type: str) -> List[str]:
if user in self.admin_users:
return ADMIN_SCOPES
if key_type == 'read-write':

View file

@ -46,7 +46,7 @@ def receive():
alert.customer = assign_customer(wanted=alert.customer)
def audit_trail_alert(event: str):
write_audit_trail.send(current_app._get_current_object(), event=event, message=alert.text, user=g.login,
write_audit_trail.send(current_app._get_current_object(), event=event, message=alert.text, user=g.login, # type: ignore
customers=g.customers, scopes=g.scopes, resource_id=alert.id, type='alert', request=request)
try:

View file

@ -47,7 +47,7 @@ def custom(webhook, path):
alert.customer = assign_customer(wanted=alert.customer)
def audit_trail_alert(event: str):
write_audit_trail.send(current_app._get_current_object(), event=event, message=alert.text, user=g.login,
write_audit_trail.send(current_app._get_current_object(), event=event, message=alert.text, user=g.login, # type: ignore
customers=g.customers, scopes=g.scopes, resource_id=alert.id, type='alert',
request=request)