netdata_netdata/collectors/python.d.plugin/python.d.plugin.in
Ilya Mashchenko 120518e248
python.d.plugin: py2 fix crash on macos ()
##### Summary

Fixes: 

similar issue: https://github.com/ansible/ansible/issues/32499

> This is apparently due to some new security changes made in High Sierra that are breaking lots of Python things that use fork(). Rumor has it that adding
> `export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES` before your Ansible run should clear it up. The code that's causing issues is well below Ansible in the stack.

^^

`multiprocessing` is effectively broken on macOS High Sierra and later, and python.d.plugin relies on the `multiprocessing` package, so the plugin crashes there.

Fix:
 - export `OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES` when running on macOS (see the sketch below)
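
A minimal sketch of the same workaround applied from inside Python (hypothetical; the plugin itself exports the variable in its bash preamble before exec'ing the interpreter, so it is in the environment before any Objective-C framework initializes):

```python
import os
import sys

# Hypothetical in-process variant of the fix: set the variable early,
# before anything loads an Objective-C framework, so that fork()-based
# multiprocessing does not abort child processes on macOS High Sierra+.
if sys.platform == 'darwin':
    os.environ.setdefault('OBJC_DISABLE_INITIALIZE_FORK_SAFETY', 'YES')

import multiprocessing


def ping(q):
    q.put('pong')


if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=ping, args=(q,))
    p.start()
    print(q.get())  # expect: pong
    p.join()
```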

##### Component Name

[/collectors/python.d.plugin](https://github.com/netdata/netdata/tree/master/collectors/python.d.plugin)

##### Additional Information
2019-04-11 19:46:04 +03:00


#!/usr/bin/env bash
'''':;
if [[ "$OSTYPE" == "darwin"* ]]; then
    export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES
fi
exec "$(command -v python || command -v python3 || command -v python2 ||
echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
# -*- coding: utf-8 -*-
# Description:
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later
import collections
import copy
import gc
import multiprocessing
import os
import re
import sys
import time
import threading
import types
PY_VERSION = sys.version_info[:2]
if PY_VERSION > (3, 1):
    from importlib.machinery import SourceFileLoader
else:
    from imp import load_source as SourceFileLoader

ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'


def dirs():
    user_config = os.getenv(
        ENV_NETDATA_USER_CONFIG_DIR,
        '@configdir_POST@',
    )
    stock_config = os.getenv(
        ENV_NETDATA_STOCK_CONFIG_DIR,
        '@libconfigdir_POST@',
    )
    modules_user_config = os.path.join(user_config, 'python.d')
    modules_stock_config = os.path.join(stock_config, 'python.d')
    modules = os.path.abspath(
        os.getenv(
            ENV_NETDATA_PLUGINS_DIR,
            os.path.dirname(__file__),
        ) + '/../python.d'
    )
    pythond_packages = os.path.join(modules, 'python_modules')

    return collections.namedtuple(
        'Dirs',
        [
            'user_config',
            'stock_config',
            'modules_user_config',
            'modules_stock_config',
            'modules',
            'pythond_packages',
        ]
    )(
        user_config,
        stock_config,
        modules_user_config,
        modules_stock_config,
        modules,
        pythond_packages,
    )


DIRS = dirs()

sys.path.append(DIRS.pythond_packages)
from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config

try:
    from collections import OrderedDict
except ImportError:
    from third_party.ordereddict import OrderedDict

END_TASK_MARKER = None
IS_ATTY = sys.stdout.isatty()
PLUGIN_CONF_FILE = 'python.d.conf'
MODULE_SUFFIX = '.chart.py'

OBSOLETED_MODULES = (
    'apache_cache',        # replaced by web_log
    'cpuidle',             # rewritten in C
    'cpufreq',             # rewritten in C
    'gunicorn_log',        # replaced by web_log
    'linux_power_supply',  # rewritten in C
    'nginx_log',           # replaced by web_log
    'mdstat',              # rewritten in C
    'sslcheck',            # memory leak bug https://github.com/netdata/netdata/issues/5624
)

AVAILABLE_MODULES = [
    m[:-len(MODULE_SUFFIX)] for m in sorted(os.listdir(DIRS.modules))
    if m.endswith(MODULE_SUFFIX) and m[:-len(MODULE_SUFFIX)] not in OBSOLETED_MODULES
]

PLUGIN_BASE_CONF = {
    'enabled': True,
    'default_run': True,
    'gc_run': True,
    'gc_interval': 300,
}

JOB_BASE_CONF = {
    'update_every': os.getenv(ENV_NETDATA_UPDATE_EVERY, 1),
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'penalty': True,
    'name': str(),
}


def heartbeat():
    if IS_ATTY:
        return
    safe_print('\n')
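

# An empty line printed to stdout works as a keep-alive for the netdata
# daemon: it signals that the plugin is still running without emitting any
# chart data. It is suppressed when stdout is a tty, i.e. when the plugin
# is being run by hand for debugging.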
class HeartBeat(threading.Thread):
    def __init__(self, every):
        threading.Thread.__init__(self)
        self.daemon = True
        self.every = every

    def run(self):
        while True:
            time.sleep(self.every)
            heartbeat()


def load_module(name):
    abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
    module = SourceFileLoader(name, abs_path)
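    # Under py2, SourceFileLoader here is imp.load_source, which returns the
    # loaded module itself; under py3 it is a loader object whose
    # load_module() must be called. Handle both cases below.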
    if isinstance(module, types.ModuleType):
        return module
    return module.load_module()


def multi_path_find(name, paths):
    for path in paths:
        abs_name = os.path.join(path, name)
        if os.path.isfile(abs_name):
            return abs_name
    return ''


Task = collections.namedtuple(
    'Task',
    [
        'module_name',
        'explicitly_enabled',
    ],
)

Result = collections.namedtuple(
    'Result',
    [
        'module_name',
        'jobs_configs',
    ],
)


class ModuleChecker(multiprocessing.Process):
    def __init__(
            self,
            task_queue,
            result_queue,
    ):
        multiprocessing.Process.__init__(self)
        self.log = PythonDLogger()
        self.log.job_name = 'checker'
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        self.log.info('starting...')
        HeartBeat(1).start()
        while self.run_once():
            pass
        self.log.info('terminating...')

    def run_once(self):
        task = self.task_queue.get()

        if task is END_TASK_MARKER:
            # TODO: find better solution, understand why heartbeat thread doesn't work
            heartbeat()
            self.task_queue.task_done()
            self.result_queue.put(END_TASK_MARKER)
            return False

        result = self.do_task(task)
        if result:
            self.result_queue.put(result)
        self.task_queue.task_done()
        return True

    def do_task(self, task):
        self.log.info("{0} : checking".format(task.module_name))

        # LOAD SOURCE
        module = Module(task.module_name)
        try:
            module.load_source()
        except Exception as error:
            self.log.warning("{0} : error on loading source : {1}, skipping module".format(
                task.module_name,
                error,
            ))
            return None
        else:
            self.log.info("{0} : source successfully loaded".format(task.module_name))

        if module.is_disabled_by_default() and not task.explicitly_enabled:
            self.log.info("{0} : disabled by default".format(task.module_name))
            return None

        # LOAD CONFIG
        paths = [
            DIRS.modules_user_config,
            DIRS.modules_stock_config,
        ]

        conf_abs_path = multi_path_find(
            name='{0}.conf'.format(task.module_name),
            paths=paths,
        )

        if conf_abs_path:
            self.log.info("{0} : found config file '{1}'".format(task.module_name, conf_abs_path))
            try:
                module.load_config(conf_abs_path)
            except Exception as error:
                self.log.warning("{0} : error on loading config : {1}, skipping module".format(
                    task.module_name, error))
                return None
        else:
            self.log.info("{0} : config was not found in '{1}', using default 1 job config".format(
                task.module_name, paths))

        # CHECK JOBS
        jobs = module.create_jobs()
        self.log.info("{0} : created {1} job(s) from the config".format(task.module_name, len(jobs)))

        successful_jobs_configs = list()
        for job in jobs:
            if job.autodetection_retry() > 0:
                successful_jobs_configs.append(job.config)
                self.log.info("{0}[{1}]: autodetection job, will be checked in main".format(task.module_name, job.name))
                continue

            try:
                job.init()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                    task.module_name, job.name, error))
                continue

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    task.module_name, job.name, error))
                continue

            if not ok:
                self.log.info("{0}[{1}] : check failed, skipping the job".format(task.module_name, job.name))
                continue

            self.log.info("{0}[{1}] : check successful".format(task.module_name, job.name))

            job.config['autodetection_retry'] = job.config['update_every']
            successful_jobs_configs.append(job.config)

        if not successful_jobs_configs:
            self.log.info("{0} : all jobs failed, skipping module".format(task.module_name))
            return None

        return Result(module.source.__name__, successful_jobs_configs)


class JobConf(OrderedDict):
    def __init__(self, *args):
        OrderedDict.__init__(self, *args)

    def set_defaults_from_module(self, module):
        for k in [k for k in JOB_BASE_CONF if hasattr(module, k)]:
            self[k] = getattr(module, k)

    def set_defaults_from_config(self, module_config):
        for k in [k for k in JOB_BASE_CONF if k in module_config]:
            self[k] = module_config[k]

    def set_job_name(self, name):
        self['job_name'] = re.sub(r'\s+', '_', name)

    def set_override_name(self, name):
        self['override_name'] = re.sub(r'\s+', '_', name)

    def as_dict(self):
        return copy.deepcopy(OrderedDict(self))


class Job:
    def __init__(
            self,
            service,
            module_name,
            config,
    ):
        self.service = service
        self.config = config
        self.module_name = module_name
        self.name = config['job_name']
        self.override_name = config['override_name']
        self.wrapped = None

    def init(self):
        self.wrapped = self.service(configuration=self.config.as_dict())

    def check(self):
        return self.wrapped.check()

    def post_check(self, min_update_every):
        if self.wrapped.update_every < min_update_every:
            self.wrapped.update_every = min_update_every

    def create(self):
        return self.wrapped.create()

    def autodetection_retry(self):
        return self.config['autodetection_retry']

    def run(self):
        self.wrapped.run()


class Module:
    def __init__(self, name):
        self.name = name
        self.source = None
        self.config = dict()

    def is_disabled_by_default(self):
        return bool(getattr(self.source, 'disabled_by_default', False))

    def load_source(self):
        self.source = load_module(self.name)

    def load_config(self, abs_path):
        self.config = load_config(abs_path) or dict()

    def gather_jobs_configs(self):
        job_names = [v for v in self.config if isinstance(self.config[v], dict)]

        if len(job_names) == 0:
            job_conf = JobConf(JOB_BASE_CONF)
            job_conf.set_defaults_from_module(self.source)
            job_conf.update(self.config)
            job_conf.set_job_name(self.name)
            job_conf.set_override_name(job_conf.pop('name'))
            return [job_conf]

        configs = list()
        for job_name in job_names:
            raw_job_conf = self.config[job_name]
            job_conf = JobConf(JOB_BASE_CONF)
            job_conf.set_defaults_from_module(self.source)
            job_conf.set_defaults_from_config(self.config)
            job_conf.update(raw_job_conf)
            job_conf.set_job_name(job_name)
            job_conf.set_override_name(job_conf.pop('name'))
            configs.append(job_conf)

        return configs

    def create_jobs(self, jobs_conf=None):
        return [Job(self.source.Service, self.name, conf) for conf in jobs_conf or self.gather_jobs_configs()]
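

# For reference, a parsed module config is a plain dict: scalar top-level
# keys act as module-wide defaults, while every nested dict becomes one
# job. A hypothetical example of what Module.gather_jobs_configs() above
# consumes:
#
#     {
#         'update_every': 5,                            # module default
#         'local': {'url': 'http://127.0.0.1/status'},  # job 'local'
#         'remote': {'url': 'http://10.0.0.2/status'},  # job 'remote'
#     }
#
# It yields one JobConf per job, each seeded from JOB_BASE_CONF, then the
# module defaults, then the job section itself.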


class JobRunner(threading.Thread):
    def __init__(self, job):
        threading.Thread.__init__(self)
        self.daemon = True
        self.wrapped = job

    def run(self):
        self.wrapped.run()


class PluginConf(dict):
    def __init__(self, *args):
        dict.__init__(self, *args)

    def is_module_enabled(self, module_name, explicit):
        if module_name in self:
            return self[module_name]
        if explicit:
            return False
        return self['default_run']


class Plugin:
    def __init__(
            self,
            min_update_every=1,
            modules_to_run=tuple(AVAILABLE_MODULES),
    ):
        self.log = PythonDLogger()
        self.config = PluginConf(PLUGIN_BASE_CONF)
        self.task_queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.JoinableQueue()
        self.min_update_every = min_update_every
        self.modules_to_run = modules_to_run
        self.auto_detection_jobs = list()
        self.tasks = list()
        self.results = list()
        self.checked_jobs = collections.defaultdict(list)
        self.runs = 0

    @staticmethod
    def shutdown():
        safe_print('DISABLE')
        exit(0)

    def run(self):
        jobs = self.create_jobs()
        if not jobs:
            return

        for job in self.prepare_jobs(jobs):
            self.log.info('{0}[{1}] : started in thread'.format(job.module_name, job.name))
            JobRunner(job).start()

        self.serve()

    def enqueue_tasks(self):
        for task in self.tasks:
            self.task_queue.put(task)
        self.task_queue.put(END_TASK_MARKER)

    def dequeue_results(self):
        while True:
            result = self.result_queue.get()
            self.result_queue.task_done()
            if result is END_TASK_MARKER:
                break
            self.results.append(result)

    def load_config(self):
        paths = [
            DIRS.user_config,
            DIRS.stock_config,
        ]
        self.log.info("checking for config in {0}".format(paths))
        abs_path = multi_path_find(name=PLUGIN_CONF_FILE, paths=paths)

        if not abs_path:
            self.log.warning('config was not found, using defaults')
            return True

        self.log.info("config found, loading config '{0}'".format(abs_path))
        try:
            config = load_config(abs_path) or dict()
        except Exception as error:
            self.log.error('error on loading config : {0}'.format(error))
            return False

        self.log.info('config successfully loaded')
        self.config.update(config)
        return True
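
    # NOTE: setup() below runs every module's check() in a separate
    # ModuleChecker process, talking to it over the two joinable queues;
    # presumably this keeps misbehaving module code from taking down the
    # main plugin process during the check phase.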
    def setup(self):
        self.log.info('starting setup')

        if not self.load_config():
            return False

        if not self.config['enabled']:
            self.log.info('disabled in configuration file')
            return False

        for mod in self.modules_to_run:
            if self.config.is_module_enabled(mod, False):
                task = Task(mod, self.config.is_module_enabled(mod, True))
                self.tasks.append(task)
            else:
                self.log.info("{0} : disabled in configuration file".format(mod))

        if not self.tasks:
            self.log.info('no modules to run')
            return False

        worker = ModuleChecker(self.task_queue, self.result_queue)
        self.log.info('starting checker process ({0} module(s) to check)'.format(len(self.tasks)))
        worker.start()

        # TODO: timeouts?
        self.enqueue_tasks()
        self.task_queue.join()
        self.dequeue_results()
        self.result_queue.join()
        self.task_queue.close()
        self.result_queue.close()

        self.log.info('stopping checker process')
        worker.join()

        if not self.results:
            self.log.info('no modules to run')
            return False

        self.log.info("setup complete, {0} active module(s) : '{1}'".format(
            len(self.results),
            [v.module_name for v in self.results])
        )
        return True

    def create_jobs(self):
        jobs = list()
        for result in self.results:
            module = Module(result.module_name)
            try:
                module.load_source()
            except Exception as error:
                self.log.warning("{0} : error on loading module source : {1}, skipping module".format(
                    result.module_name, error))
                continue

            module_jobs = module.create_jobs(result.jobs_configs)
            self.log.info("{0} : created {1} job(s)".format(module.name, len(module_jobs)))
            jobs.extend(module_jobs)

        return jobs

    def prepare_jobs(self, jobs):
        prepared = list()

        for job in jobs:
            check_name = job.override_name or job.name
            if check_name in self.checked_jobs[job.module_name]:
                self.log.info('{0}[{1}] : already served by another job, skipping the job'.format(
                    job.module_name, job.name))
                continue

            try:
                job.init()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue

            self.log.info("{0}[{1}] : init successful".format(job.module_name, job.name))

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    job.module_name, job.name, error))
                continue

            if not ok:
                self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.name))
                if job.autodetection_retry() > 0:
                    self.log.info('{0}[{1}] : will recheck every {2} second(s)'.format(
                        job.module_name, job.name, job.autodetection_retry()))
                    self.auto_detection_jobs.append(job)
                continue

            self.log.info('{0}[{1}] : check successful'.format(job.module_name, job.name))

            job.post_check(int(self.min_update_every))

            if not job.create():
                self.log.info('{0}[{1}] : create failed'.format(job.module_name, job.name))

            self.checked_jobs[job.module_name].append(check_name)
            prepared.append(job)

        return prepared

    def serve(self):
        gc_run = self.config['gc_run']
        gc_interval = self.config['gc_interval']

        while True:
            self.runs += 1

            # threads: main + heartbeat
            if threading.active_count() <= 2 and not self.auto_detection_jobs:
                return

            time.sleep(1)

            if gc_run and self.runs % gc_interval == 0:
                v = gc.collect()
                self.log.debug('GC collection run result: {0}'.format(v))

            self.auto_detection_jobs = [job for job in self.auto_detection_jobs if not self.retry_job(job)]

    def retry_job(self, job):
        stop_retrying = True
        retry_later = False

        if self.runs % job.autodetection_retry() != 0:
            return retry_later

        check_name = job.override_name or job.name
        if check_name in self.checked_jobs[job.module_name]:
            self.log.info("{0}[{1}]: already served by another job, give up on retrying".format(
                job.module_name, job.name))
            return stop_retrying

        try:
            ok = job.check()
        except Exception as error:
            self.log.warning("{0}[{1}] : unhandled exception on recheck : {2}, give up on retrying".format(
                job.module_name, job.name, error))
            return stop_retrying

        if not ok:
            self.log.info('{0}[{1}] : recheck failed, will retry in {2} second(s)'.format(
                job.module_name, job.name, job.autodetection_retry()))
            return retry_later

        self.log.info('{0}[{1}] : recheck successful'.format(job.module_name, job.name))

        if not job.create():
            return stop_retrying

        job.post_check(int(self.min_update_every))
        self.checked_jobs[job.module_name].append(check_name)
        JobRunner(job).start()

        return stop_retrying


def parse_cmd():
    opts = sys.argv[:][1:]
    debug = False
    trace = False
    update_every = 1
    modules_to_run = list()

    v = next((opt for opt in opts if opt.isdigit() and int(opt) >= 1), None)
    if v:
        update_every = v
        opts.remove(v)
    if 'debug' in opts:
        debug = True
        opts.remove('debug')
    if 'trace' in opts:
        trace = True
        opts.remove('trace')
    if opts:
        modules_to_run = list(opts)

    return collections.namedtuple(
        'CMD',
        [
            'update_every',
            'debug',
            'trace',
            'modules_to_run',
        ],
    )(
        update_every,
        debug,
        trace,
        modules_to_run,
    )
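

# Command line accepted by parse_cmd() above: a bare positive integer sets
# update_every, the literal arguments 'debug' and 'trace' toggle logging
# behaviour, and any remaining arguments are treated as module names to
# run, e.g. (module names here are only an example):
#
#   ./python.d.plugin 5 debug nginx postgres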


def main():
    cmd = parse_cmd()
    logger = PythonDLogger()

    if cmd.debug:
        logger.logger.severity = 'DEBUG'
    if cmd.trace:
        logger.log_traceback = True

    logger.info('using python v{0}'.format(PY_VERSION[0]))

    unknown_modules = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
    if unknown_modules:
        logger.error('unknown modules : {0}'.format(sorted(list(unknown_modules))))
        safe_print('DISABLE')
        return

    plugin = Plugin(
        cmd.update_every,
        cmd.modules_to_run or AVAILABLE_MODULES,
    )

    HeartBeat(1).start()

    if not plugin.setup():
        safe_print('DISABLE')
        return

    plugin.run()
    logger.info('exiting from main...')
    plugin.shutdown()


if __name__ == '__main__':
    main()