mirror of
https://projects.torsion.org/witten/borgmatic.git
synced 2025-01-20 06:59:10 +00:00
201 lines
6.9 KiB
Python
201 lines
6.9 KiB
Python
import fnmatch
|
|
import os
|
|
|
|
import jsonschema
|
|
import ruamel.yaml
|
|
|
|
import borgmatic.config
|
|
from borgmatic.config import constants, environment, load, normalize, override
|
|
|
|
|
|
def schema_filename():
|
|
'''
|
|
Path to the installed YAML configuration schema file, used to validate and parse the
|
|
configuration.
|
|
|
|
Raise FileNotFoundError when the schema path does not exist.
|
|
'''
|
|
schema_path = os.path.join(os.path.dirname(borgmatic.config.__file__), 'schema.yaml')
|
|
|
|
with open(schema_path):
|
|
return schema_path
|
|
|
|
|
|
def format_json_error_path_element(path_element):
|
|
'''
|
|
Given a path element into a JSON data structure, format it for display as a string.
|
|
'''
|
|
if isinstance(path_element, int):
|
|
return str(f'[{path_element}]')
|
|
|
|
return str(f'.{path_element}')
|
|
|
|
|
|
def format_json_error(error):
|
|
'''
|
|
Given an instance of jsonschema.exceptions.ValidationError, format it for display as a string.
|
|
'''
|
|
if not error.path:
|
|
return f'At the top level: {error.message}'
|
|
|
|
formatted_path = ''.join(format_json_error_path_element(element) for element in error.path)
|
|
return f"At '{formatted_path.lstrip('.')}': {error.message}"
|
|
|
|
|
|
class Validation_error(ValueError):
|
|
'''
|
|
A collection of error messages generated when attempting to validate a particular
|
|
configuration file.
|
|
'''
|
|
|
|
def __init__(self, config_filename, errors):
|
|
'''
|
|
Given a configuration filename path and a sequence of string error messages, create a
|
|
Validation_error.
|
|
'''
|
|
self.config_filename = config_filename
|
|
self.errors = errors
|
|
|
|
def __str__(self):
|
|
'''
|
|
Render a validation error as a user-facing string.
|
|
'''
|
|
return (
|
|
f'An error occurred while parsing a configuration file at {self.config_filename}:\n'
|
|
+ '\n'.join(error for error in self.errors)
|
|
)
|
|
|
|
|
|
def apply_logical_validation(config_filename, parsed_configuration):
|
|
'''
|
|
Given a parsed and schematically valid configuration as a data structure of nested dicts (see
|
|
below), run through any additional logical validation checks. If there are any such validation
|
|
problems, raise a Validation_error.
|
|
'''
|
|
repositories = parsed_configuration.get('repositories')
|
|
check_repositories = parsed_configuration.get('check_repositories', [])
|
|
for repository in check_repositories:
|
|
if not any(
|
|
repositories_match(repository, config_repository) for config_repository in repositories
|
|
):
|
|
raise Validation_error(
|
|
config_filename,
|
|
(f'Unknown repository in "check_repositories": {repository}',),
|
|
)
|
|
|
|
|
|
def parse_configuration(config_filename, schema_filename, overrides=None, resolve_env=True):
|
|
'''
|
|
Given the path to a config filename in YAML format, the path to a schema filename in a YAML
|
|
rendition of JSON Schema format, a sequence of configuration file override strings in the form
|
|
of "option.suboption=value", return the parsed configuration as a data structure of nested dicts
|
|
and lists corresponding to the schema. Example return value:
|
|
|
|
{
|
|
'source_directories': ['/home', '/etc'],
|
|
'repository': 'hostname.borg',
|
|
'keep_daily': 7,
|
|
'checks': ['repository', 'archives'],
|
|
}
|
|
|
|
Also return a set of loaded configuration paths and a sequence of logging.LogRecord instances
|
|
containing any warnings about the configuration.
|
|
|
|
Raise FileNotFoundError if the file does not exist, PermissionError if the user does not
|
|
have permissions to read the file, or Validation_error if the config does not match the schema.
|
|
'''
|
|
config_paths = set()
|
|
|
|
try:
|
|
config = load.load_configuration(config_filename, config_paths)
|
|
schema = load.load_configuration(schema_filename)
|
|
except (ruamel.yaml.error.YAMLError, RecursionError) as error:
|
|
raise Validation_error(config_filename, (str(error),))
|
|
|
|
override.apply_overrides(config, schema, overrides)
|
|
constants.apply_constants(config, config.get('constants') if config else {})
|
|
|
|
if resolve_env:
|
|
environment.resolve_env_variables(config)
|
|
|
|
logs = normalize.normalize(config_filename, config)
|
|
|
|
try:
|
|
validator = jsonschema.Draft7Validator(schema)
|
|
except AttributeError: # pragma: no cover
|
|
validator = jsonschema.Draft4Validator(schema)
|
|
validation_errors = tuple(validator.iter_errors(config))
|
|
|
|
if validation_errors:
|
|
raise Validation_error(
|
|
config_filename, tuple(format_json_error(error) for error in validation_errors)
|
|
)
|
|
|
|
apply_logical_validation(config_filename, config)
|
|
|
|
return config, config_paths, logs
|
|
|
|
|
|
def normalize_repository_path(repository):
|
|
'''
|
|
Given a repository path, return the absolute path of it (for local repositories).
|
|
'''
|
|
# A colon in the repository could mean that it's either a file:// URL or a remote repository.
|
|
# If it's a remote repository, we don't want to normalize it. If it's a file:// URL, we do.
|
|
if ':' not in repository:
|
|
return os.path.abspath(repository)
|
|
elif repository.startswith('file://'):
|
|
return os.path.abspath(repository.partition('file://')[-1])
|
|
else:
|
|
return repository
|
|
|
|
|
|
def glob_match(first, second):
|
|
'''
|
|
Given two strings, return whether the first matches the second. Globs are
|
|
supported.
|
|
'''
|
|
if first is None or second is None:
|
|
return False
|
|
|
|
return fnmatch.fnmatch(first, second) or fnmatch.fnmatch(second, first)
|
|
|
|
|
|
def repositories_match(first, second):
|
|
'''
|
|
Given two repository dicts with keys "path" (relative and/or absolute),
|
|
and "label", two repository paths as strings, or a mix of the two formats,
|
|
return whether they match. Globs are supported.
|
|
'''
|
|
if isinstance(first, str):
|
|
first = {'path': first, 'label': first}
|
|
if isinstance(second, str):
|
|
second = {'path': second, 'label': second}
|
|
|
|
return glob_match(first.get('label'), second.get('label')) or glob_match(
|
|
normalize_repository_path(first.get('path')), normalize_repository_path(second.get('path'))
|
|
)
|
|
|
|
|
|
def guard_configuration_contains_repository(repository, configurations):
|
|
'''
|
|
Given a repository path and a dict mapping from config filename to corresponding parsed config
|
|
dict, ensure that the repository is declared at least once in all of the configurations. If no
|
|
repository is given, skip this check.
|
|
|
|
Raise ValueError if the repository is not found in any configurations.
|
|
'''
|
|
if not repository:
|
|
return
|
|
|
|
count = len(
|
|
tuple(
|
|
config_repository
|
|
for config in configurations.values()
|
|
for config_repository in config['repositories']
|
|
if repositories_match(config_repository, repository)
|
|
)
|
|
)
|
|
|
|
if count == 0:
|
|
raise ValueError(f'Repository "{repository}" not found in configuration files')
|