mirror of
https://github.com/netdata/netdata.git
synced 2025-04-12 16:58:10 +00:00
427 lines
14 KiB
Bash
Executable file
427 lines
14 KiB
Bash
Executable file
#!/usr/bin/env bash
|
|
'''':; exec "$(command -v python || command -v python3 || command -v python2 ||
|
|
echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
|
|
|
|
# -*- coding: utf-8 -*-
|
|
# Description:
|
|
# Author: Pawel Krupa (paulfantom)
|
|
# Author: Ilya Mashchenko (l2isbad)
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
import gc
|
|
import os
|
|
import sys
|
|
import threading
|
|
|
|
from re import sub
|
|
from sys import version_info, argv
|
|
from time import sleep
|
|
|
|
GC_RUN = True
|
|
GC_COLLECT_EVERY = 300
|
|
|
|
PY_VERSION = version_info[:2]
|
|
|
|
USER_CONFIG_DIR = os.getenv('NETDATA_USER_CONFIG_DIR', '@configdir_POST@')
|
|
STOCK_CONFIG_DIR = os.getenv('NETDATA_STOCK_CONFIG_DIR', '@libconfigdir_POST@')
|
|
|
|
PLUGINS_USER_CONFIG_DIR = os.path.join(USER_CONFIG_DIR, 'python.d')
|
|
PLUGINS_STOCK_CONFIG_DIR = os.path.join(STOCK_CONFIG_DIR, 'python.d')
|
|
|
|
|
|
PLUGINS_DIR = os.path.abspath(os.getenv(
|
|
'NETDATA_PLUGINS_DIR',
|
|
os.path.dirname(__file__)) + '/../python.d')
|
|
|
|
|
|
PYTHON_MODULES_DIR = os.path.join(PLUGINS_DIR, 'python_modules')
|
|
|
|
sys.path.append(PYTHON_MODULES_DIR)
|
|
|
|
from bases.loaders import ModuleAndConfigLoader
|
|
from bases.loggers import PythonDLogger
|
|
from bases.collection import setdefault_values, run_and_exit
|
|
|
|
try:
|
|
from collections import OrderedDict
|
|
except ImportError:
|
|
from third_party.ordereddict import OrderedDict
|
|
|
|
BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
|
|
'retries': 60,
|
|
'priority': 60000,
|
|
'autodetection_retry': 0,
|
|
'chart_cleanup': 10,
|
|
'name': str()}
|
|
|
|
|
|
MODULE_EXTENSION = '.chart.py'
|
|
OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log']
|
|
|
|
|
|
def module_ok(m):
|
|
return m.endswith(MODULE_EXTENSION) and m[:-len(MODULE_EXTENSION)] not in OBSOLETE_MODULES
|
|
|
|
|
|
ALL_MODULES = [m for m in sorted(os.listdir(PLUGINS_DIR)) if module_ok(m)]
|
|
|
|
|
|
def parse_cmd():
|
|
debug = 'debug' in argv[1:]
|
|
trace = 'trace' in argv[1:]
|
|
override_update_every = next((arg for arg in argv[1:] if arg.isdigit() and int(arg) > 1), False)
|
|
modules = [''.join([m, MODULE_EXTENSION]) for m in argv[1:] if ''.join([m, MODULE_EXTENSION]) in ALL_MODULES]
|
|
return debug, trace, override_update_every, modules or ALL_MODULES
|
|
|
|
|
|
def multi_job_check(config):
|
|
return next((True for key in config if isinstance(config[key], dict)), False)
|
|
|
|
|
|
class RawModule:
|
|
def __init__(self, name, path, explicitly_enabled=True):
|
|
self.name = name
|
|
self.path = path
|
|
self.explicitly_enabled = explicitly_enabled
|
|
|
|
|
|
class Job(object):
|
|
def __init__(self, initialized_job, job_id):
|
|
"""
|
|
:param initialized_job: instance of <Class Service>
|
|
:param job_id: <str>
|
|
"""
|
|
self.job = initialized_job
|
|
self.id = job_id # key in Modules.jobs()
|
|
self.module_name = self.job.__module__ # used in Plugin.delete_job()
|
|
self.recheck_every = self.job.configuration.pop('autodetection_retry')
|
|
self.checked = False # used in Plugin.check_job()
|
|
self.created = False # used in Plugin.create_job_charts()
|
|
if self.job.update_every < int(OVERRIDE_UPDATE_EVERY):
|
|
self.job.update_every = int(OVERRIDE_UPDATE_EVERY)
|
|
|
|
def __getattr__(self, item):
|
|
return getattr(self.job, item)
|
|
|
|
def __repr__(self):
|
|
return self.job.__repr__()
|
|
|
|
def is_dead(self):
|
|
return bool(self.ident) and not self.is_alive()
|
|
|
|
def not_launched(self):
|
|
return not bool(self.ident)
|
|
|
|
def is_autodetect(self):
|
|
return self.recheck_every
|
|
|
|
|
|
class Module(object):
|
|
def __init__(self, service, config):
|
|
"""
|
|
:param service: <Module>
|
|
:param config: <dict>
|
|
"""
|
|
self.service = service
|
|
self.name = service.__name__
|
|
self.config = self.jobs_configurations_builder(config)
|
|
self.jobs = OrderedDict()
|
|
self.counter = 1
|
|
|
|
self.initialize_jobs()
|
|
|
|
def __repr__(self):
|
|
return "<Class Module '{name}'>".format(name=self.name)
|
|
|
|
def __iter__(self):
|
|
return iter(OrderedDict(self.jobs).values())
|
|
|
|
def __getitem__(self, item):
|
|
return self.jobs[item]
|
|
|
|
def __delitem__(self, key):
|
|
del self.jobs[key]
|
|
|
|
def __len__(self):
|
|
return len(self.jobs)
|
|
|
|
def __bool__(self):
|
|
return bool(self.jobs)
|
|
|
|
def __nonzero__(self):
|
|
return self.__bool__()
|
|
|
|
def jobs_configurations_builder(self, config):
|
|
"""
|
|
:param config: <dict>
|
|
:return:
|
|
"""
|
|
counter = 0
|
|
job_base_config = dict()
|
|
|
|
for attr in BASE_CONFIG:
|
|
job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr]))
|
|
|
|
if not config:
|
|
config = {str(): dict()}
|
|
elif not multi_job_check(config):
|
|
config = {str(): config}
|
|
|
|
for job_name in config:
|
|
if not isinstance(config[job_name], dict):
|
|
continue
|
|
|
|
job_config = setdefault_values(config[job_name], base_dict=job_base_config)
|
|
job_name = sub(r'\s+', '_', job_name)
|
|
config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name'])
|
|
counter += 1
|
|
job_id = 'job' + str(counter).zfill(3)
|
|
|
|
yield job_id, job_name, job_config
|
|
|
|
def initialize_jobs(self):
|
|
"""
|
|
:return:
|
|
"""
|
|
for job_id, job_name, job_config in self.config:
|
|
job_config['job_name'] = job_name
|
|
job_config['override_name'] = job_config.pop('name')
|
|
|
|
try:
|
|
initialized_job = self.service.Service(configuration=job_config)
|
|
except Exception as error:
|
|
Logger.error("job initialization: '{module_name} {job_name}' "
|
|
"=> ['FAILED'] ({error})".format(module_name=self.name,
|
|
job_name=job_name,
|
|
error=error))
|
|
continue
|
|
else:
|
|
Logger.debug("job initialization: '{module_name} {job_name}' "
|
|
"=> ['OK']".format(module_name=self.name,
|
|
job_name=job_name or self.name))
|
|
self.jobs[job_id] = Job(initialized_job=initialized_job,
|
|
job_id=job_id)
|
|
del self.config
|
|
del self.service
|
|
|
|
|
|
class Plugin(object):
|
|
def __init__(self):
|
|
self.loader = ModuleAndConfigLoader()
|
|
self.modules = OrderedDict()
|
|
self.sleep_time = 1
|
|
self.runs_counter = 0
|
|
|
|
user_config = os.path.join(USER_CONFIG_DIR, 'python.d.conf')
|
|
stock_config = os.path.join(STOCK_CONFIG_DIR, 'python.d.conf')
|
|
|
|
Logger.debug("loading '{0}'".format(user_config))
|
|
self.config, error = self.loader.load_config_from_file(user_config)
|
|
|
|
if error:
|
|
Logger.error("cannot load '{0}': {1}. Will try stock version.".format(user_config, error))
|
|
Logger.debug("loading '{0}'".format(stock_config))
|
|
self.config, error = self.loader.load_config_from_file(stock_config)
|
|
if error:
|
|
Logger.error("cannot load '{0}': {1}".format(stock_config, error))
|
|
|
|
self.do_gc = self.config.get("gc_run", GC_RUN)
|
|
self.gc_interval = self.config.get("gc_interval", GC_COLLECT_EVERY)
|
|
|
|
if not self.config.get('enabled', True):
|
|
run_and_exit(Logger.info)('DISABLED in configuration file.')
|
|
|
|
self.load_and_initialize_modules()
|
|
if not self.modules:
|
|
run_and_exit(Logger.info)('No modules to run. Exit...')
|
|
|
|
def __iter__(self):
|
|
return iter(OrderedDict(self.modules).values())
|
|
|
|
@property
|
|
def jobs(self):
|
|
return (job for mod in self for job in mod)
|
|
|
|
@property
|
|
def dead_jobs(self):
|
|
return (job for job in self.jobs if job.is_dead())
|
|
|
|
@property
|
|
def autodetect_jobs(self):
|
|
return [job for job in self.jobs if job.not_launched()]
|
|
|
|
def enabled_modules(self):
|
|
for mod in MODULES_TO_RUN:
|
|
mod_name = mod[:-len(MODULE_EXTENSION)]
|
|
mod_path = os.path.join(PLUGINS_DIR, mod)
|
|
if any(
|
|
[
|
|
self.config.get('default_run', True) and self.config.get(mod_name, True),
|
|
(not self.config.get('default_run')) and self.config.get(mod_name),
|
|
]
|
|
):
|
|
yield RawModule(
|
|
name=mod_name,
|
|
path=mod_path,
|
|
explicitly_enabled=self.config.get(mod_name),
|
|
)
|
|
|
|
def load_and_initialize_modules(self):
|
|
for mod in self.enabled_modules():
|
|
|
|
# Load module from file ------------------------------------------------------------
|
|
loaded_module, error = self.loader.load_module_from_file(mod.name, mod.path)
|
|
log = Logger.error if error else Logger.debug
|
|
log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK',
|
|
module_name=mod.name))
|
|
if error:
|
|
Logger.error("load source error : {0}".format(error))
|
|
continue
|
|
|
|
# Load module config from file ------------------------------------------------------
|
|
user_config = os.path.join(PLUGINS_USER_CONFIG_DIR, mod.name + '.conf')
|
|
stock_config = os.path.join(PLUGINS_STOCK_CONFIG_DIR, mod.name + '.conf')
|
|
|
|
Logger.debug("loading '{0}'".format(user_config))
|
|
loaded_config, error = self.loader.load_config_from_file(user_config)
|
|
if error:
|
|
Logger.error("cannot load '{0}' : {1}. Will try stock version.".format(user_config, error))
|
|
Logger.debug("loading '{0}'".format(stock_config))
|
|
loaded_config, error = self.loader.load_config_from_file(stock_config)
|
|
|
|
if error:
|
|
Logger.error("cannot load '{0}': {1}".format(stock_config, error))
|
|
|
|
# Skip disabled modules
|
|
if getattr(loaded_module, 'disabled_by_default', False) and not mod.explicitly_enabled:
|
|
Logger.info("module '{0}' disabled by default".format(loaded_module.__name__))
|
|
continue
|
|
|
|
# Module initialization ---------------------------------------------------
|
|
|
|
initialized_module = Module(service=loaded_module, config=loaded_config)
|
|
Logger.debug("module status: '{module_name}' => [{status}] "
|
|
"(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED',
|
|
module_name=initialized_module.name,
|
|
jobs_number=len(initialized_module)))
|
|
if initialized_module:
|
|
self.modules[initialized_module.name] = initialized_module
|
|
|
|
@staticmethod
|
|
def check_job(job):
|
|
"""
|
|
:param job: <Job>
|
|
:return:
|
|
"""
|
|
try:
|
|
check_ok = bool(job.check())
|
|
except Exception as error:
|
|
job.error('check() unhandled exception: {error}'.format(error=error))
|
|
return None
|
|
else:
|
|
return check_ok
|
|
|
|
@staticmethod
|
|
def create_job_charts(job):
|
|
"""
|
|
:param job: <Job>
|
|
:return:
|
|
"""
|
|
try:
|
|
create_ok = job.create()
|
|
except Exception as error:
|
|
job.error('create() unhandled exception: {error}'.format(error=error))
|
|
return False
|
|
else:
|
|
return create_ok
|
|
|
|
def delete_job(self, job):
|
|
"""
|
|
:param job: <Job>
|
|
:return:
|
|
"""
|
|
del self.modules[job.module_name][job.id]
|
|
|
|
def run_check(self):
|
|
checked = list()
|
|
for job in self.jobs:
|
|
if job.name in checked:
|
|
job.info('check() => [DROPPED] (already served by another job)')
|
|
self.delete_job(job)
|
|
continue
|
|
ok = self.check_job(job)
|
|
if ok:
|
|
job.info('check() => [OK]')
|
|
checked.append(job.name)
|
|
job.checked = True
|
|
continue
|
|
if not job.is_autodetect() or ok is None:
|
|
job.info('check() => [FAILED]')
|
|
self.delete_job(job)
|
|
else:
|
|
job.info('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every))
|
|
|
|
def run_create(self):
|
|
for job in self.jobs:
|
|
if not job.checked:
|
|
# skip autodetection_retry jobs
|
|
continue
|
|
ok = self.create_job_charts(job)
|
|
if ok:
|
|
job.debug('create() => [OK] (charts: {0})'.format(len(job.charts)))
|
|
job.created = True
|
|
continue
|
|
job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts)))
|
|
self.delete_job(job)
|
|
|
|
def start(self):
|
|
self.run_check()
|
|
self.run_create()
|
|
for job in self.jobs:
|
|
if job.created:
|
|
job.start()
|
|
|
|
while True:
|
|
if threading.active_count() <= 1 and not self.autodetect_jobs:
|
|
run_and_exit(Logger.info)('FINISHED')
|
|
|
|
sleep(self.sleep_time)
|
|
self.cleanup()
|
|
self.autodetect_retry()
|
|
|
|
# FIXME: https://github.com/netdata/netdata/issues/3817
|
|
if self.do_gc and self.runs_counter % self.gc_interval == 0:
|
|
v = gc.collect()
|
|
Logger.debug("GC full collection run result: {0}".format(v))
|
|
|
|
def cleanup(self):
|
|
for job in self.dead_jobs:
|
|
self.delete_job(job)
|
|
for mod in self:
|
|
if not mod:
|
|
del self.modules[mod.name]
|
|
|
|
def autodetect_retry(self):
|
|
self.runs_counter += self.sleep_time
|
|
for job in self.autodetect_jobs:
|
|
if self.runs_counter % job.recheck_every == 0:
|
|
checked = self.check_job(job)
|
|
if checked:
|
|
created = self.create_job_charts(job)
|
|
if not created:
|
|
self.delete_job(job)
|
|
continue
|
|
job.start()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd()
|
|
Logger = PythonDLogger()
|
|
if DEBUG:
|
|
Logger.logger.severity = 'DEBUG'
|
|
if TRACE:
|
|
Logger.log_traceback = True
|
|
Logger.info('Using python {version}'.format(version=PY_VERSION[0]))
|
|
|
|
plugin = Plugin()
|
|
plugin.start()
|