#!/usr/bin/env bash
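# This file is a bash/python polyglot: bash executes the block below to locate a
# usable python interpreter (python3, python, or python2, overridable with
# '-p<path>'), then re-executes this very file with it. Python, in turn, parses
# the whole bash block as a discarded triple-quoted string and skips over it.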
'''':;
pybinary=$(which python3 || which python || which python2)
filtered=()
for arg in "$@"
do
    case $arg in
        -p*) pybinary=${arg:2}
            shift 1 ;;
        *) filtered+=("$arg") ;;
    esac
done
if [ "$pybinary" = "" ]
then
    echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM"
    exit 1
fi
exec "$pybinary" "$0" "${filtered[@]}" # '''
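
# Command line (as passed by the netdata agent or typed by hand), reconstructed
# from the '-p' handling above and parse_command_line() below:
#
#   python.d.plugin [-p<python-binary>] [<update_every>] [debug] [trace] [nolock] [<module> ...]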

# -*- coding: utf-8 -*-
# Description: netdata python modules supervisor (python.d.plugin)
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later

import collections
import copy
import gc
import json
import os
import pprint
import re
import sys
import threading
import time
import types

try:
    from queue import Queue
except ImportError:
    from Queue import Queue

PY_VERSION = sys.version_info[:2]  # only (major, minor), e.g. (3, 7)

if PY_VERSION > (3, 1):
    from importlib.machinery import SourceFileLoader
else:
    from imp import load_source as SourceFileLoader

ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_USER_PLUGINS_DIRS = 'NETDATA_USER_PLUGINS_DIRS'
ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
ENV_NETDATA_LOCK_DIR = 'NETDATA_LOCK_DIR'


def add_pythond_packages():
    pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__))
    pythond = os.path.abspath(pluginsd + '/../python.d')
    packages = os.path.join(pythond, 'python_modules')
    sys.path.append(packages)


add_pythond_packages()
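
# The 'bases' and 'third_party' imports below resolve against the
# 'python_modules' directory that add_pythond_packages() just appended to
# sys.path; these helpers ship with the plugin rather than being installed
# system-wide.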

from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config
from third_party import filelock

try:
    from collections import OrderedDict
except ImportError:
    from third_party.ordereddict import OrderedDict


def dirs():
    # the '@..._POST@' tokens are build-time placeholders, substituted with
    # real paths when this template is processed during the netdata build
    var_lib = os.getenv(
        ENV_NETDATA_LIB_DIR,
        '@varlibdir_POST@',
    )
    plugin_user_config = os.getenv(
        ENV_NETDATA_USER_CONFIG_DIR,
        '@configdir_POST@',
    )
    plugin_stock_config = os.getenv(
        ENV_NETDATA_STOCK_CONFIG_DIR,
        '@libconfigdir_POST@',
    )
    pluginsd = os.getenv(
        ENV_NETDATA_PLUGINS_DIR,
        os.path.dirname(__file__),
    )
    locks = os.getenv(
        ENV_NETDATA_LOCK_DIR,
        os.path.join('@varlibdir_POST@', 'lock')
    )
    modules_user_config = os.path.join(plugin_user_config, 'python.d')
    modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
    modules = os.path.abspath(pluginsd + '/../python.d')
    user_modules = [os.path.join(p, 'python.d') for p in
                    os.getenv(ENV_NETDATA_USER_PLUGINS_DIRS, "").split(" ") if
                    p]

    Dirs = collections.namedtuple(
        'Dirs',
        [
            'plugin_user_config',
            'plugin_stock_config',
            'modules_user_config',
            'modules_stock_config',
            'modules',
            'user_modules',
            'var_lib',
            'locks',
        ]
    )
    return Dirs(
        plugin_user_config,
        plugin_stock_config,
        modules_user_config,
        modules_stock_config,
        modules,
        user_modules,
        var_lib,
        locks,
    )


DIRS = dirs()

IS_ATTY = sys.stdout.isatty() or sys.stderr.isatty()

MODULE_SUFFIX = '.chart.py'


def find_available_modules(*directories):
    AvailableModule = collections.namedtuple(
        'AvailableModule',
        [
            'filepath',
            'name',
        ]
    )
    available = list()
    for d in directories:
        try:
            if not os.path.isdir(d):
                continue
            files = sorted(os.listdir(d))
        except OSError:
            continue
        modules = [m for m in files if m.endswith(MODULE_SUFFIX)]
        available.extend([AvailableModule(os.path.join(d, m), m[:-len(MODULE_SUFFIX)]) for m in modules])

    return available
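
# For example, given a directory d that contains 'apache.chart.py',
# find_available_modules(d) returns
# [AvailableModule(filepath=os.path.join(d, 'apache.chart.py'), name='apache')].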


def available_modules():
    obsolete = (
        'apache_cache',  # replaced by web_log
        'cpuidle',  # rewritten in C
        'cpufreq',  # rewritten in C
        'gunicorn_log',  # replaced by web_log
        'linux_power_supply',  # rewritten in C
        'nginx_log',  # replaced by web_log
        'mdstat',  # rewritten in C
        'sslcheck',  # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624
        'unbound',  # rewritten in Go
    )

    stock = [m for m in find_available_modules(DIRS.modules) if m.name not in obsolete]
    user = find_available_modules(*DIRS.user_modules)

    # user modules come first, so a user module shadows a stock module of the same name
    available, seen = list(), set()
    for m in user + stock:
        if m.name in seen:
            continue
        seen.add(m.name)
        available.append(m)

    return available


AVAILABLE_MODULES = available_modules()

JOB_BASE_CONF = {
    'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'penalty': True,
    'name': str(),
}
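
# Any of these defaults may be overridden globally or per job in a module's
# YAML config, e.g. in python.d/<module>.conf (the job name 'local' is
# hypothetical):
#
#   update_every: 5
#   local:
#     update_every: 10
#     autodetection_retry: 30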

PLUGIN_BASE_CONF = {
    'enabled': True,
    'default_run': True,
    'gc_run': True,
    'gc_interval': 300,
}
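
# The plugin-level switches above map onto python.d.conf, e.g.:
#
#   enabled: yes
#   default_run: yes
#   gc_run: yes
#   gc_interval: 300
#   example: no    # per-module on/off switches live in the same file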


def multi_path_find(name, *paths):
    for path in paths:
        abs_name = os.path.join(path, name)
        if os.path.isfile(abs_name):
            return abs_name
    return str()


def load_module(name, filepath):
    module = SourceFileLoader('pythond_' + name, filepath)
    if isinstance(module, types.ModuleType):
        # python 2: imp.load_source() already returned the loaded module
        return module
    # python 3: SourceFileLoader() returned a loader, load the module from it
    return module.load_module()


class ModuleConfig:
    def __init__(self, name, config=None):
        self.name = name
        self.config = config or OrderedDict()

    def load(self, abs_path):
        self.config.update(load_config(abs_path) or dict())

    def defaults(self):
        keys = (
            'update_every',
            'priority',
            'autodetection_retry',
            'chart_cleanup',
            'penalty',
        )
        return dict((k, self.config[k]) for k in keys if k in self.config)

    def create_job(self, job_name, job_config=None):
        job_config = job_config or dict()

        config = OrderedDict()
        config.update(job_config)
        config['job_name'] = job_name
        for k, v in self.defaults().items():
            config.setdefault(k, v)

        return config

    def job_names(self):
        # any top-level key whose value is a dict is treated as a job definition
        return [v for v in self.config if isinstance(self.config.get(v), dict)]

    def single_job(self):
        return [self.create_job(self.name, self.config)]

    def multi_job(self):
        return [self.create_job(n, self.config[n]) for n in self.job_names()]

    def create_jobs(self):
        return self.multi_job() or self.single_job()
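
# For example, a loaded config {'update_every': 5, 'local': {'url': 'http://127.0.0.1'}}
# yields one job named 'local' inheriting update_every=5 (multi_job), while a flat
# config with no nested dicts yields a single job named after the module (single_job).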


class JobsConfigsBuilder:
    def __init__(self, config_dirs):
        self.config_dirs = config_dirs
        self.log = PythonDLogger()
        self.job_defaults = None
        self.module_defaults = None
        self.min_update_every = None

    def load_module_config(self, module_name):
        name = '{0}.conf'.format(module_name)
        self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs))
        config = ModuleConfig(module_name)

        abs_path = multi_path_find(name, *self.config_dirs)
        if not abs_path:
            self.log.warning("[{0}] '{1}' was not found".format(module_name, name))
            return config

        self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path))
        try:
            config.load(abs_path)
        except Exception as error:
            self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error)))
            return None

        self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path))
        return config

    @staticmethod
    def apply_defaults(jobs, defaults):
        if defaults is None:
            return
        for k, v in defaults.items():
            for job in jobs:
                job.setdefault(k, v)

    @staticmethod
    def set_min_update_every(jobs, min_update_every):
        if min_update_every is None:
            return
        for job in jobs:
            if 'update_every' in job and job['update_every'] < min_update_every:
                job['update_every'] = min_update_every

    def build(self, module_name):
        config = self.load_module_config(module_name)
        if config is None:
            return None

        configs = config.create_jobs()
        self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs)))

        self.apply_defaults(configs, self.module_defaults)
        self.apply_defaults(configs, self.job_defaults)
        self.set_min_update_every(configs, self.min_update_every)

        return configs


JOB_STATUS_ACTIVE = 'active'
JOB_STATUS_RECOVERING = 'recovering'
JOB_STATUS_DROPPED = 'dropped'
JOB_STATUS_INIT = 'initial'


class Job(threading.Thread):
    inf = -1

    def __init__(self, service, module_name, config):
        threading.Thread.__init__(self)
        self.daemon = True
        self.service = service
        self.module_name = module_name
        self.config = config
        self.real_name = config['job_name']
        self.actual_name = config['override_name'] or self.real_name
        self.autodetection_retry = config['autodetection_retry']
        self.checks = self.inf
        self.job = None
        self.status = JOB_STATUS_INIT

    def is_inited(self):
        return self.job is not None

    def init(self):
        self.job = self.service(configuration=copy.deepcopy(self.config))

    def full_name(self):
        return self.job.name

    def check(self):
        ok = self.job.check()
        # burn one retry on every failed check, unless retries are unlimited (inf)
        self.checks -= self.checks != self.inf and not ok
        return ok

    def create(self):
        self.job.create()

    def need_to_recheck(self):
        return self.autodetection_retry != 0 and self.checks != 0

    def run(self):
        self.job.run()


class ModuleSrc:
    def __init__(self, m):
        self.name = m.name
        self.filepath = m.filepath
        self.src = None

    def load(self):
        self.src = load_module(self.name, self.filepath)

    def get(self, key):
        return getattr(self.src, key, None)

    def service(self):
        return self.get('Service')

    def defaults(self):
        keys = (
            'update_every',
            'priority',
            'autodetection_retry',
            'chart_cleanup',
            'penalty',
        )
        return dict((k, self.get(k)) for k in keys if self.get(k) is not None)

    def is_disabled_by_default(self):
        return bool(self.get('disabled_by_default'))


class JobsStatuses:
    def __init__(self):
        self.items = OrderedDict()

    def dump(self):
        return json.dumps(self.items, indent=2)

    def get(self, module_name, job_name):
        if module_name not in self.items:
            return None
        return self.items[module_name].get(job_name)

    def has(self, module_name, job_name):
        return self.get(module_name, job_name) is not None

    def from_file(self, path):
        with open(path) as f:
            data = json.load(f)
            return self.from_json(data)

    @staticmethod
    def from_json(items):
        if not isinstance(items, dict):
            raise Exception('items obj has wrong type : {0}'.format(type(items)))
        if not items:
            return JobsStatuses()

        v = OrderedDict()
        for mod_name in sorted(items):
            if not items[mod_name]:
                continue
            v[mod_name] = OrderedDict()
            for job_name in sorted(items[mod_name]):
                v[mod_name][job_name] = items[mod_name][job_name]

        rv = JobsStatuses()
        rv.items = v
        return rv

    @staticmethod
    def from_jobs(jobs):
        v = OrderedDict()
        for job in jobs:
            status = job.status
            if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING):
                continue
            if job.module_name not in v:
                v[job.module_name] = OrderedDict()
            v[job.module_name][job.real_name] = status

        rv = JobsStatuses()
        rv.items = v
        return rv
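
# The resulting on-disk dump (pythond-jobs-statuses.json) looks like this
# (module and job names are hypothetical):
#
#   {
#     "example": {
#       "local": "active"
#     }
#   }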


class StdoutSaver:
    @staticmethod
    def save(dump):
        print(dump)


class CachedFileSaver:
    def __init__(self, path):
        self.last_save_success = False
        self.last_saved_dump = str()
        self.path = path

    def save(self, dump):
        if self.last_save_success and self.last_saved_dump == dump:
            return
        try:
            with open(self.path, 'w') as out:
                out.write(dump)
        except Exception:
            self.last_save_success = False
            raise
        self.last_saved_dump = dump
        self.last_save_success = True


class PluginConfig(dict):
    def __init__(self, *args):
        dict.__init__(self, *args)

    def is_module_explicitly_enabled(self, module_name):
        return self._is_module_enabled(module_name, True)

    def is_module_enabled(self, module_name):
        return self._is_module_enabled(module_name, False)

    def _is_module_enabled(self, module_name, explicit):
        if module_name in self:
            return self[module_name]
        if explicit:
            return False
        return self['default_run']
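
# For example, with python.d.conf containing {'default_run': True, 'example': False}:
# is_module_enabled('example') is False (the explicit setting wins),
# is_module_enabled('other') is True (falls back to default_run), and
# is_module_explicitly_enabled('other') is False (no explicit setting).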


class FileLockRegistry:
    def __init__(self, path):
        self.path = path
        self.locks = dict()

    @staticmethod
    def rename(name):
        # the go implementation of this module is named 'docker', not 'dockerd';
        # use the go name so both plugins contend for the same lock file
        if name.startswith("dockerd"):
            name = "docker" + name[7:]
        return name

    def register(self, name):
        name = self.rename(name)
        if name in self.locks:
            return
        file = os.path.join(self.path, '{0}.collector.lock'.format(name))
        lock = filelock.FileLock(file)
        lock.acquire(timeout=0)
        self.locks[name] = lock

    def unregister(self, name):
        name = self.rename(name)
        if name not in self.locks:
            return
        lock = self.locks[name]
        lock.release()
        del self.locks[name]


class DummyRegistry:
    def register(self, name):
        pass

    def unregister(self, name):
        pass


class Plugin:
    config_name = 'python.d.conf'
    jobs_status_dump_name = 'pythond-jobs-statuses.json'

    def __init__(self, modules_to_run, min_update_every, registry):
        self.modules_to_run = modules_to_run
        self.min_update_every = min_update_every
        self.config = PluginConfig(PLUGIN_BASE_CONF)
        self.log = PythonDLogger()
        self.registry = registry
        self.started_jobs = collections.defaultdict(dict)
        self.jobs = list()
        self.saver = None
        self.runs = 0

    def load_config_file(self, filepath, expected):
        self.log.debug("looking for '{0}'".format(filepath))
        if not os.path.isfile(filepath):
            log = self.log.info if not expected else self.log.error
            log("'{0}' was not found".format(filepath))
            return dict()
        try:
            config = load_config(filepath)
        except Exception as error:
            self.log.error("error on loading '{0}' : {1}".format(filepath, repr(error)))
            return dict()
        self.log.debug("'{0}' is loaded".format(filepath))
        return config

    def load_config(self):
        user_config = self.load_config_file(
            filepath=os.path.join(DIRS.plugin_user_config, self.config_name),
            expected=False,
        )
        stock_config = self.load_config_file(
            filepath=os.path.join(DIRS.plugin_stock_config, self.config_name),
            expected=True,
        )
        # stock first, user second: user settings override stock ones
        self.config.update(stock_config)
        self.config.update(user_config)

    def load_job_statuses(self):
        self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib))
        abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib)
        if not abs_path:
            self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name))
            return

        self.log.debug("loading '{0}'".format(abs_path))
        try:
            statuses = JobsStatuses().from_file(abs_path)
        except Exception as error:
            self.log.error("error on loading '{0}' : {1}".format(abs_path, repr(error)))
            return None
        self.log.debug("'{0}' is loaded".format(abs_path))
        return statuses

    def create_jobs(self, job_statuses=None):
        paths = [
            DIRS.modules_user_config,
            DIRS.modules_stock_config,
        ]

        builder = JobsConfigsBuilder(paths)
        builder.job_defaults = JOB_BASE_CONF
        builder.min_update_every = self.min_update_every

        jobs = list()
        for m in self.modules_to_run:
            if not self.config.is_module_enabled(m.name):
                self.log.info("[{0}] is disabled in the configuration file, skipping it".format(m.name))
                continue

            src = ModuleSrc(m)
            try:
                src.load()
            except Exception as error:
                self.log.warning("[{0}] error on loading source : {1}, skipping it".format(m.name, repr(error)))
                continue
            self.log.debug("[{0}] loaded module source : '{1}'".format(m.name, m.filepath))

            if not (src.service() and callable(src.service())):
                self.log.warning("[{0}] has no callable Service object, skipping it".format(m.name))
                continue

            if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(m.name):
                self.log.info("[{0}] is disabled by default, skipping it".format(m.name))
                continue

            builder.module_defaults = src.defaults()
            configs = builder.build(m.name)
            if not configs:
                self.log.info("[{0}] has no job configs, skipping it".format(m.name))
                continue

            for config in configs:
                config['job_name'] = re.sub(r'\s+', '_', config['job_name'])
                config['override_name'] = re.sub(r'\s+', '_', config.pop('name'))

                job = Job(src.service(), m.name, config)

                was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name)
                if was_previously_active and job.autodetection_retry == 0:
                    self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format(
                        job.module_name, job.real_name))
                    # give a previously working job a finite number of rechecks
                    # instead of dropping it after the first failed check
                    job.checks = 11
                    job.autodetection_retry = 30

                jobs.append(job)

        return jobs

    def setup(self):
        self.load_config()

        if not self.config['enabled']:
            self.log.info('disabled in the configuration file')
            return False

        statuses = self.load_job_statuses()

        self.jobs = self.create_jobs(statuses)
        if not self.jobs:
            self.log.info('no jobs to run')
            return False

        if not IS_ATTY:
            abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name)
            self.saver = CachedFileSaver(abs_path)
        return True

    def start_jobs(self, *jobs):
        for job in jobs:
            if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING):
                continue

            if job.actual_name in self.started_jobs[job.module_name]:
                self.log.info('{0}[{1}] : already served by another job, skipping it'.format(
                    job.module_name, job.real_name))
                job.status = JOB_STATUS_DROPPED
                continue

            if not job.is_inited():
                try:
                    job.init()
                except Exception as error:
                    self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                        job.module_name, job.real_name, repr(error)))
                    job.status = JOB_STATUS_DROPPED
                    continue

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    job.module_name, job.real_name, repr(error)))
                job.status = JOB_STATUS_DROPPED
                continue
            if not ok:
                self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name))
                job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED
                continue
            self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name))

            try:
                self.registry.register(job.full_name())
            except filelock.Timeout as error:
                self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format(
                    job.module_name, job.real_name, error))
                job.status = JOB_STATUS_DROPPED
                continue
            except Exception as error:
                self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format(
                    job.module_name, job.real_name, error))
                job.status = JOB_STATUS_DROPPED
                continue

            try:
                job.create()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format(
                    job.module_name, job.real_name, repr(error)))
                job.status = JOB_STATUS_DROPPED
                try:
                    self.registry.unregister(job.full_name())
                except Exception as error:
                    self.log.warning('{0}[{1}] : deregistration failed: {2}'.format(
                        job.module_name, job.real_name, error))
                continue

            # record the name in the per-module dict (assigning the bare string
            # here would replace the dict and break the membership check above)
            self.started_jobs[job.module_name][job.actual_name] = True
            job.status = JOB_STATUS_ACTIVE
            job.start()

    @staticmethod
    def keep_alive():
        # when attached to the agent, an empty line signals that the plugin is alive
        if not IS_ATTY:
            safe_print('\n')

    def garbage_collection(self):
        if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0:
            v = gc.collect()
            self.log.debug('GC collection run result: {0}'.format(v))

    def restart_recovering_jobs(self):
        for job in self.jobs:
            if job.status != JOB_STATUS_RECOVERING:
                continue
            if self.runs % job.autodetection_retry != 0:
                continue
            self.start_jobs(job)

    def cleanup_jobs(self):
        self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED]

    def have_alive_jobs(self):
        return next(
            (True for job in self.jobs if job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)),
            False,
        )

    def save_job_statuses(self):
        if self.saver is None:
            return
        if self.runs % 10 != 0:
            return
        dump = JobsStatuses().from_jobs(self.jobs).dump()
        try:
            self.saver.save(dump)
        except Exception as error:
            self.log.error("error on saving jobs statuses dump : {0}".format(repr(error)))

    def serve_once(self):
        if not self.have_alive_jobs():
            self.log.info('no jobs to serve')
            return False

        time.sleep(1)
        self.runs += 1

        self.keep_alive()
        self.garbage_collection()
        self.cleanup_jobs()
        self.restart_recovering_jobs()
        self.save_job_statuses()
        return True

    def serve(self):
        while self.serve_once():
            pass

    def run(self):
        self.start_jobs(*self.jobs)
        self.serve()


def parse_command_line():
    opts = sys.argv[1:]

    debug = False
    trace = False
    nolock = False
    update_every = 1
    modules_to_run = list()

    def find_first_positive_int(values):
        return next((v for v in values if v.isdigit() and int(v) >= 1), None)

    u = find_first_positive_int(opts)
    if u is not None:
        update_every = int(u)
        opts.remove(u)
    if 'debug' in opts:
        debug = True
        opts.remove('debug')
    if 'trace' in opts:
        trace = True
        opts.remove('trace')
    if 'nolock' in opts:
        nolock = True
        opts.remove('nolock')
    if opts:
        modules_to_run = list(opts)

    cmd = collections.namedtuple(
        'CMD',
        [
            'update_every',
            'debug',
            'trace',
            'nolock',
            'modules_to_run',
        ])
    return cmd(
        update_every,
        debug,
        trace,
        nolock,
        modules_to_run,
    )


def guess_module(modules, *names):
    def guess(n):
        found = None
        for i, _ in enumerate(n):
            cur = [x for x in modules if x.startswith(n[:i + 1])]
            if not cur:
                return found
            found = cur
        return found

    guessed = list()
    for name in names:
        name = name.lower()
        m = guess(name)
        if m:
            guessed.extend(m)
    return sorted(set(guessed))
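
# For example, guess_module({'apache', 'apache_cache', 'nginx'}, 'apach') returns
# ['apache', 'apache_cache']: the longest prefix of the given name that still
# matches anything selects every module sharing that prefix.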


def disable():
    # tell the agent to disable the plugin (so it is not restarted) before exiting
    if not IS_ATTY:
        safe_print('DISABLE')
    exit(0)


def get_modules_to_run(cmd):
    if not cmd.modules_to_run:
        return AVAILABLE_MODULES

    modules_to_run, seen = list(), set()
    for m in AVAILABLE_MODULES:
        if m.name not in cmd.modules_to_run or m.name in seen:
            continue
        seen.add(m.name)
        modules_to_run.append(m)

    return modules_to_run


def main():
    cmd = parse_command_line()
    log = PythonDLogger()

    if cmd.debug:
        log.logger.severity = 'DEBUG'
    if cmd.trace:
        log.log_traceback = True

    log.info('using python v{0}'.format(PY_VERSION[0]))

    if DIRS.locks and not cmd.nolock:
        registry = FileLockRegistry(DIRS.locks)
    else:
        registry = DummyRegistry()

    unique_avail_module_names = set([m.name for m in AVAILABLE_MODULES])
    unknown = set(cmd.modules_to_run) - unique_avail_module_names
    if unknown:
        log.error('unknown modules : {0}'.format(sorted(list(unknown))))
        guessed = guess_module(unique_avail_module_names, *cmd.modules_to_run)
        if guessed:
            log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1)))
        return

    p = Plugin(
        get_modules_to_run(cmd),
        cmd.update_every,
        registry,
    )

    # cheap attempt to reduce the chance of a python.d job running before go.d
    # TODO: better implementation needed
    if not IS_ATTY:
        time.sleep(1.5)

    try:
        if not p.setup():
            return
        p.run()
    except KeyboardInterrupt:
        pass
    log.info('exiting from main...')


if __name__ == "__main__":
    main()
    disable()