mirror of
https://github.com/netdata/netdata.git
synced 2025-05-17 22:52:21 +00:00
postgres fix: detect servers version and use the right query (#4944)
* python.d postgres: use query depending on server version * python.d postgres repslot file query fix * python.d postgres minor * python.d postgres format fix * python.d postgres refactor queries * postgres query repslot files v11 fix * @anayrat patch
This commit is contained in:
parent
67608ce7c4
commit
d0de3356f3
1 changed file with 470 additions and 207 deletions
|
@ -16,12 +16,28 @@ except ImportError:
|
|||
|
||||
from bases.FrameworkServices.SimpleService import SimpleService
|
||||
|
||||
# default module values
|
||||
update_every = 1
|
||||
priority = 60000
|
||||
|
||||
WAL = 'WAL'
|
||||
ARCHIVE = 'ARCHIVE'
|
||||
BACKENDS = 'BACKENDS'
|
||||
TABLE_STATS = 'TABLE_STATS'
|
||||
INDEX_STATS = 'INDEX_STATS'
|
||||
DATABASE = 'DATABASE'
|
||||
BGWRITER = 'BGWRITER'
|
||||
LOCKS = 'LOCKS'
|
||||
DATABASES = 'DATABASES'
|
||||
STANDBY = 'STANDBY'
|
||||
REPLICATION_SLOT = 'REPLICATION_SLOT'
|
||||
STANDBY_DELTA = 'STANDBY_DELTA'
|
||||
REPSLOT_FILES = 'REPSLOT_FILES'
|
||||
IF_SUPERUSER = 'IF_SUPERUSER'
|
||||
SERVER_VERSION = 'SERVER_VERSION'
|
||||
AUTOVACUUM = 'AUTOVACUUM'
|
||||
DIFF_LSN = 'DIFF_LSN'
|
||||
WAL_WRITES = 'WAL_WRITES'
|
||||
|
||||
METRICS = {
|
||||
'DATABASE': [
|
||||
DATABASE: [
|
||||
'connections',
|
||||
'xact_commit',
|
||||
'xact_rollback',
|
||||
|
@ -37,32 +53,32 @@ METRICS = {
|
|||
'temp_bytes',
|
||||
'size'
|
||||
],
|
||||
'BACKENDS': [
|
||||
BACKENDS: [
|
||||
'backends_active',
|
||||
'backends_idle'
|
||||
],
|
||||
'INDEX_STATS': [
|
||||
INDEX_STATS: [
|
||||
'index_count',
|
||||
'index_size'
|
||||
],
|
||||
'TABLE_STATS': [
|
||||
TABLE_STATS: [
|
||||
'table_size',
|
||||
'table_count'
|
||||
],
|
||||
'WAL': [
|
||||
WAL: [
|
||||
'written_wal',
|
||||
'recycled_wal',
|
||||
'total_wal'
|
||||
],
|
||||
'WAL_WRITES': [
|
||||
WAL_WRITES: [
|
||||
'wal_writes'
|
||||
],
|
||||
'ARCHIVE': [
|
||||
ARCHIVE: [
|
||||
'ready_count',
|
||||
'done_count',
|
||||
'file_count'
|
||||
],
|
||||
'BGWRITER': [
|
||||
BGWRITER: [
|
||||
'checkpoint_scheduled',
|
||||
'checkpoint_requested',
|
||||
'buffers_checkpoint',
|
||||
|
@ -72,7 +88,7 @@ METRICS = {
|
|||
'buffers_alloc',
|
||||
'buffers_backend_fsync'
|
||||
],
|
||||
'LOCKS': [
|
||||
LOCKS: [
|
||||
'ExclusiveLock',
|
||||
'RowShareLock',
|
||||
'SIReadLock',
|
||||
|
@ -83,27 +99,34 @@ METRICS = {
|
|||
'ShareLock',
|
||||
'RowExclusiveLock'
|
||||
],
|
||||
'AUTOVACUUM': [
|
||||
AUTOVACUUM: [
|
||||
'analyze',
|
||||
'vacuum_analyze',
|
||||
'vacuum',
|
||||
'vacuum_freeze',
|
||||
'brin_summarize'
|
||||
],
|
||||
'STANDBY_DELTA': [
|
||||
STANDBY_DELTA: [
|
||||
'sent_delta',
|
||||
'write_delta',
|
||||
'flush_delta',
|
||||
'replay_delta'
|
||||
],
|
||||
'REPSLOT_FILES': [
|
||||
REPSLOT_FILES: [
|
||||
'replslot_wal_keep',
|
||||
'replslot_files'
|
||||
]
|
||||
}
|
||||
|
||||
QUERIES = {
|
||||
'WAL': """
|
||||
NO_VERSION = 0
|
||||
DEFAULT = 'DEFAULT'
|
||||
V96 = 'V96'
|
||||
V10 = 'V10'
|
||||
V11 = 'V11'
|
||||
|
||||
|
||||
QUERY_WAL = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
count(*) as total_wal,
|
||||
count(*) FILTER (WHERE type = 'recycled') AS recycled_wal,
|
||||
|
@ -111,34 +134,76 @@ SELECT
|
|||
FROM
|
||||
(SELECT
|
||||
wal.name,
|
||||
pg_{0}file_name(
|
||||
pg_walfile_name(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN NULL
|
||||
ELSE pg_current_{0}_{1}()
|
||||
ELSE pg_current_wal_lsn()
|
||||
END ),
|
||||
CASE
|
||||
WHEN wal.name > pg_{0}file_name(
|
||||
WHEN wal.name > pg_walfile_name(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN NULL
|
||||
ELSE pg_current_{0}_{1}()
|
||||
ELSE pg_current_wal_lsn()
|
||||
END ) THEN 'recycled'
|
||||
ELSE 'written'
|
||||
END AS type
|
||||
FROM pg_catalog.pg_ls_dir('pg_{0}') AS wal(name)
|
||||
FROM pg_catalog.pg_ls_dir('pg_wal') AS wal(name)
|
||||
WHERE name ~ '^[0-9A-F]{{24}}$'
|
||||
ORDER BY
|
||||
(pg_stat_file('pg_{0}/'||name)).modification,
|
||||
(pg_stat_file('pg_wal/'||name)).modification,
|
||||
wal.name DESC) sub;
|
||||
""",
|
||||
'ARCHIVE': """
|
||||
V96: """
|
||||
SELECT
|
||||
count(*) as total_wal,
|
||||
count(*) FILTER (WHERE type = 'recycled') AS recycled_wal,
|
||||
count(*) FILTER (WHERE type = 'written') AS written_wal
|
||||
FROM
|
||||
(SELECT
|
||||
wal.name,
|
||||
pg_xlogfile_name(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN NULL
|
||||
ELSE pg_current_xlog_location()
|
||||
END ),
|
||||
CASE
|
||||
WHEN wal.name > pg_xlogfile_name(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN NULL
|
||||
ELSE pg_current_xlog_location()
|
||||
END ) THEN 'recycled'
|
||||
ELSE 'written'
|
||||
END AS type
|
||||
FROM pg_catalog.pg_ls_dir('pg_xlog') AS wal(name)
|
||||
WHERE name ~ '^[0-9A-F]{{24}}$'
|
||||
ORDER BY
|
||||
(pg_stat_file('pg_xlog/'||name)).modification,
|
||||
wal.name DESC) sub;
|
||||
""",
|
||||
}
|
||||
|
||||
QUERY_ARCHIVE = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
CAST(COUNT(*) AS INT) AS file_count,
|
||||
CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)),0) AS INT) AS ready_count,
|
||||
CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)),0) AS INT) AS done_count
|
||||
FROM
|
||||
pg_catalog.pg_ls_dir('pg_{0}/archive_status') AS archive_files (archive_file);
|
||||
pg_catalog.pg_ls_dir('pg_wal/archive_status') AS archive_files (archive_file);
|
||||
""",
|
||||
'BACKENDS': """
|
||||
V96: """
|
||||
SELECT
|
||||
CAST(COUNT(*) AS INT) AS file_count,
|
||||
CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)),0) AS INT) AS ready_count,
|
||||
CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)),0) AS INT) AS done_count
|
||||
FROM
|
||||
pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);
|
||||
|
||||
""",
|
||||
}
|
||||
|
||||
QUERY_BACKEND = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
count(*) - (SELECT count(*)
|
||||
FROM pg_stat_activity
|
||||
|
@ -150,21 +215,30 @@ SELECT
|
|||
AS backends_idle
|
||||
FROM pg_stat_activity;
|
||||
""",
|
||||
'TABLE_STATS': """
|
||||
}
|
||||
|
||||
QUERY_TABLE_STATS = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
((sum(relpages) * 8) * 1024) AS table_size,
|
||||
count(1) AS table_count
|
||||
FROM pg_class
|
||||
WHERE relkind IN ('r', 't');
|
||||
""",
|
||||
'INDEX_STATS': """
|
||||
}
|
||||
|
||||
QUERY_INDEX_STATS = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
((sum(relpages) * 8) * 1024) AS index_size,
|
||||
count(1) AS index_count
|
||||
FROM pg_class
|
||||
WHERE relkind = 'i';
|
||||
""",
|
||||
'DATABASE': """
|
||||
}
|
||||
|
||||
QUERY_DATABASE = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
datname AS database_name,
|
||||
numbackends AS connections,
|
||||
|
@ -184,7 +258,10 @@ SELECT
|
|||
FROM pg_stat_database
|
||||
WHERE datname IN %(databases)s ;
|
||||
""",
|
||||
'BGWRITER': """
|
||||
}
|
||||
|
||||
QUERY_BGWRITER = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
checkpoints_timed AS checkpoint_scheduled,
|
||||
checkpoints_req AS checkpoint_requested,
|
||||
|
@ -196,7 +273,10 @@ SELECT
|
|||
buffers_backend_fsync
|
||||
FROM pg_stat_bgwriter;
|
||||
""",
|
||||
'LOCKS': """
|
||||
}
|
||||
|
||||
QUERY_LOCKS = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
pg_database.datname as database_name,
|
||||
mode,
|
||||
|
@ -207,7 +287,10 @@ INNER JOIN pg_database
|
|||
GROUP BY datname, mode
|
||||
ORDER BY datname, mode;
|
||||
""",
|
||||
'FIND_DATABASES': """
|
||||
}
|
||||
|
||||
QUERY_DATABASES = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
datname
|
||||
FROM pg_stat_database
|
||||
|
@ -216,48 +299,129 @@ WHERE
|
|||
(SELECT current_user), datname, 'connect')
|
||||
AND NOT datname ~* '^template\d ';
|
||||
""",
|
||||
'FIND_STANDBY': """
|
||||
}
|
||||
|
||||
QUERY_STANDBY = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
application_name
|
||||
FROM pg_stat_replication
|
||||
WHERE application_name IS NOT NULL
|
||||
GROUP BY application_name;
|
||||
""",
|
||||
'FIND_REPLICATION_SLOT': """
|
||||
}
|
||||
|
||||
QUERY_REPLICATION_SLOT = {
|
||||
DEFAULT: """
|
||||
SELECT slot_name
|
||||
FROM pg_replication_slots;
|
||||
""",
|
||||
'STANDBY_DELTA': """
|
||||
"""
|
||||
}
|
||||
|
||||
QUERY_STANDBY_DELTA = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
application_name,
|
||||
pg_{0}_{1}_diff(
|
||||
pg_wal_lsn_diff(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN pg_last_{0}_receive_{1}()
|
||||
ELSE pg_current_{0}_{1}()
|
||||
WHEN true THEN pg_last_wal_receive_lsn()
|
||||
ELSE pg_current_wal_lsn()
|
||||
END,
|
||||
sent_{1}) AS sent_delta,
|
||||
pg_{0}_{1}_diff(
|
||||
sent_lsn) AS sent_delta,
|
||||
pg_wal_lsn_diff(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN pg_last_{0}_receive_{1}()
|
||||
ELSE pg_current_{0}_{1}()
|
||||
WHEN true THEN pg_last_wal_receive_lsn()
|
||||
ELSE pg_current_wal_lsn()
|
||||
END,
|
||||
write_{1}) AS write_delta,
|
||||
pg_{0}_{1}_diff(
|
||||
write_lsn) AS write_delta,
|
||||
pg_wal_lsn_diff(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN pg_last_{0}_receive_{1}()
|
||||
ELSE pg_current_{0}_{1}()
|
||||
WHEN true THEN pg_last_wal_receive_lsn()
|
||||
ELSE pg_current_wal_lsn()
|
||||
END,
|
||||
flush_{1}) AS flush_delta,
|
||||
pg_{0}_{1}_diff(
|
||||
flush_lsn) AS flush_delta,
|
||||
pg_wal_lsn_diff(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN pg_last_{0}_receive_{1}()
|
||||
ELSE pg_current_{0}_{1}()
|
||||
WHEN true THEN pg_last_wal_receive_lsn()
|
||||
ELSE pg_current_wal_lsn()
|
||||
END,
|
||||
replay_{1}) AS replay_delta
|
||||
replay_lsn) AS replay_delta
|
||||
FROM pg_stat_replication
|
||||
WHERE application_name IS NOT NULL;
|
||||
""",
|
||||
'REPSLOT_FILES': """
|
||||
V96: """
|
||||
SELECT
|
||||
application_name,
|
||||
pg_xlog_location_diff(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN pg_last_xlog_receive_location()
|
||||
ELSE pg_current_xlog_location()
|
||||
END,
|
||||
sent_location) AS sent_delta,
|
||||
pg_xlog_location_diff(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN pg_last_xlog_receive_location()
|
||||
ELSE pg_current_xlog_location()
|
||||
END,
|
||||
write_location) AS write_delta,
|
||||
pg_xlog_location_diff(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN pg_last_xlog_receive_location()
|
||||
ELSE pg_current_xlog_location()
|
||||
END,
|
||||
flush_location) AS flush_delta,
|
||||
pg_xlog_location_diff(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN pg_last_xlog_receive_location()
|
||||
ELSE pg_current_xlog_location()
|
||||
END,
|
||||
replay_location) AS replay_delta
|
||||
FROM pg_stat_replication
|
||||
WHERE application_name IS NOT NULL;
|
||||
""",
|
||||
}
|
||||
|
||||
QUERY_REPSLOT_FILES = {
|
||||
DEFAULT: """
|
||||
WITH wal_size AS (
|
||||
SELECT
|
||||
setting::int AS val
|
||||
FROM pg_settings
|
||||
WHERE name = 'wal_segment_size'
|
||||
)
|
||||
SELECT
|
||||
slot_name,
|
||||
slot_type,
|
||||
replslot_wal_keep,
|
||||
count(slot_file) AS replslot_files
|
||||
FROM
|
||||
(SELECT
|
||||
slot.slot_name,
|
||||
CASE
|
||||
WHEN slot_file <> 'state' THEN 1
|
||||
END AS slot_file ,
|
||||
slot_type,
|
||||
COALESCE (
|
||||
floor(
|
||||
(pg_wal_lsn_diff(pg_current_wal_lsn (),slot.restart_lsn)
|
||||
- (pg_walfile_name_offset (restart_lsn)).file_offset) / (s.val)
|
||||
),0) AS replslot_wal_keep
|
||||
FROM pg_replication_slots slot
|
||||
LEFT JOIN (
|
||||
SELECT
|
||||
slot2.slot_name,
|
||||
pg_ls_dir('pg_replslot/' || slot2.slot_name) AS slot_file
|
||||
FROM pg_replication_slots slot2
|
||||
) files (slot_name, slot_file)
|
||||
ON slot.slot_name = files.slot_name
|
||||
CROSS JOIN wal_size s
|
||||
) AS d
|
||||
GROUP BY
|
||||
slot_name,
|
||||
slot_type,
|
||||
replslot_wal_keep;
|
||||
""",
|
||||
V10: """
|
||||
WITH wal_size AS (
|
||||
SELECT
|
||||
current_setting('wal_block_size')::INT * setting::INT AS val
|
||||
|
@ -296,13 +460,22 @@ GROUP BY
|
|||
slot_type,
|
||||
replslot_wal_keep;
|
||||
""",
|
||||
'IF_SUPERUSER': """
|
||||
}
|
||||
|
||||
QUERY_SUPERUSER = {
|
||||
DEFAULT: """
|
||||
SELECT current_setting('is_superuser') = 'on' AS is_superuser;
|
||||
""",
|
||||
'DETECT_SERVER_VERSION': """
|
||||
}
|
||||
|
||||
QUERY_SHOW_VERSION = {
|
||||
DEFAULT: """
|
||||
SHOW server_version_num;
|
||||
""",
|
||||
'AUTOVACUUM': """
|
||||
}
|
||||
|
||||
QUERY_AUTOVACUUM = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
count(*) FILTER (WHERE query LIKE 'autovacuum: ANALYZE%%') AS analyze,
|
||||
count(*) FILTER (WHERE query LIKE 'autovacuum: VACUUM ANALYZE%%') AS vacuum_analyze,
|
||||
|
@ -314,23 +487,78 @@ SELECT
|
|||
FROM pg_stat_activity
|
||||
WHERE query NOT LIKE '%%pg_stat_activity%%';
|
||||
""",
|
||||
'DIFF_LSN': """
|
||||
}
|
||||
|
||||
QUERY_DIFF_LSN = {
|
||||
DEFAULT: """
|
||||
SELECT
|
||||
pg_{0}_{1}_diff(
|
||||
pg_wal_lsn_diff(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN pg_last_{0}_receive_{1}()
|
||||
ELSE pg_current_{0}_{1}()
|
||||
WHEN true THEN pg_last_wal_receive_lsn()
|
||||
ELSE pg_current_wal_lsn()
|
||||
END,
|
||||
'0/0') as wal_writes ;
|
||||
"""
|
||||
""",
|
||||
V96: """
|
||||
SELECT
|
||||
pg_xlog_location_diff(
|
||||
CASE pg_is_in_recovery()
|
||||
WHEN true THEN pg_last_xlog_receive_location()
|
||||
ELSE pg_current_xlog_location()
|
||||
END,
|
||||
'0/0') as wal_writes ;
|
||||
""",
|
||||
}
|
||||
|
||||
|
||||
QUERY_STATS = {
|
||||
QUERIES['DATABASE']: METRICS['DATABASE'],
|
||||
QUERIES['BACKENDS']: METRICS['BACKENDS'],
|
||||
QUERIES['LOCKS']: METRICS['LOCKS']
|
||||
}
|
||||
def query_factory(name, version=NO_VERSION):
    """Return the SQL text registered for the query identifier *name*.

    Version-independent queries always get their DEFAULT variant.
    Version-dependent queries use *version* (``server_version_num`` style,
    e.g. 100000 for v10) to pick the legacy or the modern variant.

    :param name: one of the module's query-name constants (WAL, LOCKS, ...).
    :param version: numeric server version; NO_VERSION when irrelevant.
    :raises ValueError: if *name* is not a known query identifier.
    """
    # Queries with a single, version-independent form.
    version_independent = {
        BACKENDS: QUERY_BACKEND,
        TABLE_STATS: QUERY_TABLE_STATS,
        INDEX_STATS: QUERY_INDEX_STATS,
        DATABASE: QUERY_DATABASE,
        BGWRITER: QUERY_BGWRITER,
        LOCKS: QUERY_LOCKS,
        DATABASES: QUERY_DATABASES,
        STANDBY: QUERY_STANDBY,
        REPLICATION_SLOT: QUERY_REPLICATION_SLOT,
        IF_SUPERUSER: QUERY_SUPERUSER,
        SERVER_VERSION: QUERY_SHOW_VERSION,
        AUTOVACUUM: QUERY_AUTOVACUUM,
    }
    if name in version_independent:
        return version_independent[name][DEFAULT]

    # Queries whose SQL changed with the v10 renaming (xlog/location -> wal/lsn).
    if name == WAL:
        return QUERY_WAL[V96 if version < 100000 else DEFAULT]
    if name == ARCHIVE:
        return QUERY_ARCHIVE[V96 if version < 100000 else DEFAULT]
    if name == STANDBY_DELTA:
        return QUERY_STANDBY_DELTA[V96 if version < 100000 else DEFAULT]
    if name == DIFF_LSN:
        return QUERY_DIFF_LSN[V96 if version < 100000 else DEFAULT]
    # Replication-slot file accounting changed again in v11.
    if name == REPSLOT_FILES:
        return QUERY_REPSLOT_FILES[V10 if version < 110000 else DEFAULT]

    raise ValueError('unknown query')
|
||||
|
||||
|
||||
ORDER = [
|
||||
'db_stat_temp_files',
|
||||
|
@ -553,151 +781,112 @@ CHARTS = {
|
|||
class Service(SimpleService):
|
||||
def __init__(self, configuration=None, name=None):
|
||||
SimpleService.__init__(self, configuration=configuration, name=name)
|
||||
self.order = ORDER[:]
|
||||
self.order = list(ORDER)
|
||||
self.definitions = deepcopy(CHARTS)
|
||||
self.table_stats = configuration.pop('table_stats', False)
|
||||
self.index_stats = configuration.pop('index_stats', False)
|
||||
self.database_poll = configuration.pop('database_poll', None)
|
||||
|
||||
self.do_table_stats = configuration.pop('table_stats', False)
|
||||
self.do_index_stats = configuration.pop('index_stats', False)
|
||||
self.databases_to_poll = configuration.pop('database_poll', None)
|
||||
self.configuration = configuration
|
||||
self.connection = False
|
||||
|
||||
self.conn = None
|
||||
self.server_version = None
|
||||
self.data = dict()
|
||||
self.locks_zeroed = dict()
|
||||
self.is_superuser = False
|
||||
self.alive = False
|
||||
|
||||
self.databases = list()
|
||||
self.secondaries = list()
|
||||
self.replication_slots = list()
|
||||
self.queries = QUERY_STATS.copy()
|
||||
|
||||
def _connect(self):
|
||||
params = dict(user='postgres',
|
||||
database=None,
|
||||
password=None,
|
||||
host=None,
|
||||
port=5432)
|
||||
params.update(self.configuration)
|
||||
self.queries = dict()
|
||||
|
||||
if not self.connection:
|
||||
try:
|
||||
self.connection = psycopg2.connect(**params)
|
||||
self.connection.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
self.connection.set_session(readonly=True)
|
||||
except OperationalError as error:
|
||||
return False, str(error)
|
||||
return True, True
|
||||
self.data = dict()
|
||||
|
||||
def reconnect(self):
    """Re-establish the server connection; returns True on success."""
    return self.connect()
|
||||
|
||||
def connect(self):
    """(Re)open the PostgreSQL connection.

    Closes any existing connection first, then connects with the user
    configuration layered over the defaults. Sets and returns
    ``self.alive`` (True on success, False on OperationalError).
    """
    if self.conn:
        self.conn.close()
        self.conn = None

    try:
        # Defaults, overridable by the user-supplied configuration.
        params = dict(
            user='postgres',
            database=None,
            password=None,
            host=None,
            port=5432,
        )
        params.update(self.configuration)

        self.conn = psycopg2.connect(**params)
        # Autocommit read-only session: this collector only runs SELECTs.
        self.conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.conn.set_session(readonly=True)
    except OperationalError as error:
        self.error(error)
        self.alive = False
    else:
        self.alive = True

    return self.alive
|
||||
|
||||
def check(self):
|
||||
if not PSYCOPG2:
|
||||
self.error('\'python-psycopg2\' module is needed to use postgres.chart.py')
|
||||
self.error("'python-psycopg2' package is needed to use postgres module")
|
||||
return False
|
||||
result, error = self._connect()
|
||||
if not result:
|
||||
conf = dict((k, (lambda k, v: v if k != 'password' else '*****')(k, v))
|
||||
for k, v in self.configuration.items())
|
||||
self.error('Failed to connect to %s. Error: %s' % (str(conf), error))
|
||||
|
||||
if not self.connect():
|
||||
self.error('failed to connect to {0}'.format(hide_password(self.configuration)))
|
||||
return False
|
||||
|
||||
try:
|
||||
cursor = self.connection.cursor()
|
||||
self.databases = discover_databases_(cursor, QUERIES['FIND_DATABASES'])
|
||||
is_superuser = check_if_superuser_(cursor, QUERIES['IF_SUPERUSER'])
|
||||
self.secondaries = discover_secondaries_(cursor, QUERIES['FIND_STANDBY'])
|
||||
self.server_version = detect_server_version(cursor, QUERIES['DETECT_SERVER_VERSION'])
|
||||
if self.server_version >= 94000:
|
||||
self.replication_slots = discover_replication_slots_(cursor, QUERIES['FIND_REPLICATION_SLOT'])
|
||||
cursor.close()
|
||||
|
||||
if self.database_poll and isinstance(self.database_poll, str):
|
||||
self.databases = [dbase for dbase in self.databases if dbase in self.database_poll.split()] \
|
||||
or self.databases
|
||||
|
||||
self.locks_zeroed = populate_lock_types(self.databases)
|
||||
self.add_additional_queries_(is_superuser)
|
||||
self.create_dynamic_charts_()
|
||||
return True
|
||||
self.check_queries()
|
||||
except Exception as error:
|
||||
self.error(str(error))
|
||||
self.error(error)
|
||||
return False
|
||||
|
||||
def add_additional_queries_(self, is_superuser):
|
||||
self.populate_queries()
|
||||
self.create_dynamic_charts()
|
||||
|
||||
if self.server_version >= 100000:
|
||||
wal = 'wal'
|
||||
lsn = 'lsn'
|
||||
else:
|
||||
wal = 'xlog'
|
||||
lsn = 'location'
|
||||
self.queries[QUERIES['BGWRITER']] = METRICS['BGWRITER']
|
||||
self.queries[QUERIES['DIFF_LSN'].format(wal, lsn)] = METRICS['WAL_WRITES']
|
||||
self.queries[QUERIES['STANDBY_DELTA'].format(wal, lsn)] = METRICS['STANDBY_DELTA']
|
||||
return True
|
||||
|
||||
if self.index_stats:
|
||||
self.queries[QUERIES['INDEX_STATS']] = METRICS['INDEX_STATS']
|
||||
if self.table_stats:
|
||||
self.queries[QUERIES['TABLE_STATS']] = METRICS['TABLE_STATS']
|
||||
if is_superuser:
|
||||
self.queries[QUERIES['ARCHIVE'].format(wal)] = METRICS['ARCHIVE']
|
||||
if self.server_version >= 90400:
|
||||
self.queries[QUERIES['WAL'].format(wal, lsn)] = METRICS['WAL']
|
||||
if self.server_version >= 100000:
|
||||
self.queries[QUERIES['REPSLOT_FILES']] = METRICS['REPSLOT_FILES']
|
||||
if self.server_version >= 90400:
|
||||
self.queries[QUERIES['AUTOVACUUM']] = METRICS['AUTOVACUUM']
|
||||
|
||||
def create_dynamic_charts_(self):
|
||||
|
||||
for database_name in self.databases[::-1]:
|
||||
self.definitions['database_size']['lines'].append(
|
||||
[database_name + '_size', database_name, 'absolute', 1, 1024 * 1024])
|
||||
for chart_name in [name for name in self.order if name.startswith('db_stat')]:
|
||||
add_database_stat_chart_(order=self.order, definitions=self.definitions,
|
||||
name=chart_name, database_name=database_name)
|
||||
|
||||
add_database_lock_chart_(order=self.order, definitions=self.definitions, database_name=database_name)
|
||||
|
||||
for application_name in self.secondaries[::-1]:
|
||||
add_replication_delta_chart_(
|
||||
order=self.order,
|
||||
definitions=self.definitions,
|
||||
name='standby_delta',
|
||||
application_name=application_name)
|
||||
|
||||
for slot_name in self.replication_slots[::-1]:
|
||||
add_replication_slot_chart_(
|
||||
order=self.order,
|
||||
definitions=self.definitions,
|
||||
name='replication_slot',
|
||||
slot_name=slot_name)
|
||||
|
||||
def get_data(self):
    """Collect one round of metrics.

    Reconnects automatically when a previous poll marked the server dead.
    Returns the accumulated data dict, or None when the server is
    unreachable or the connection drops mid-poll.
    """
    # NOTE: this span previously retained the deleted `_get_data`
    # implementation alongside the new `get_data` (diff residue); only the
    # current implementation is kept.
    if not self.alive and not self.reconnect():
        return None

    try:
        cursor = self.conn.cursor(cursor_factory=DictCursor)

        # Reset all lock counters first so lock types absent this round
        # report 0 instead of a stale value.
        self.data.update(zero_lock_types(self.databases))

        for query, metrics in self.queries.items():
            self.query_stats(cursor, query, metrics)

    except OperationalError:
        # Connection dropped mid-poll; the next call will reconnect.
        self.alive = False
        return None

    cursor.close()

    return self.data
|
||||
|
||||
def query_stats(self, cursor, query, metrics):
|
||||
cursor.execute(query, dict(databases=tuple(self.databases)))
|
||||
|
||||
for row in cursor:
|
||||
for metric in metrics:
|
||||
# databases
|
||||
if 'database_name' in row:
|
||||
dimension_id = '_'.join([row['database_name'], metric])
|
||||
# secondaries
|
||||
elif 'application_name' in row:
|
||||
dimension_id = '_'.join([row['application_name'], metric])
|
||||
# replication slots
|
||||
elif 'slot_name' in row:
|
||||
dimension_id = '_'.join([row['slot_name'], metric])
|
||||
# other
|
||||
else:
|
||||
dimension_id = metric
|
||||
|
||||
if metric in row:
|
||||
if row[metric] is not None:
|
||||
self.data[dimension_id] = int(row[metric])
|
||||
|
@ -705,35 +894,105 @@ class Service(SimpleService):
|
|||
if metric == row['mode']:
|
||||
self.data[dimension_id] = row['locks_count']
|
||||
|
||||
def check_queries(self):
    """Probe the server once to learn version, privileges and topology.

    Populates ``server_version``, ``is_superuser``, ``databases``
    (filtered by ``databases_to_poll`` when set), ``secondaries`` and —
    on 9.4+ — ``replication_slots``.
    """
    # NOTE: a stray residual `def discover_databases_` header from the
    # pre-refactor version was embedded in this span (diff residue) and
    # has been dropped.
    cursor = self.conn.cursor()

    self.server_version = detect_server_version(cursor, query_factory(SERVER_VERSION))
    self.debug('server version: {0}'.format(self.server_version))

    self.is_superuser = check_if_superuser(cursor, query_factory(IF_SUPERUSER))
    self.debug('superuser: {0}'.format(self.is_superuser))

    self.databases = discover(cursor, query_factory(DATABASES))
    self.debug('discovered databases {0}'.format(self.databases))
    if self.databases_to_poll:
        to_poll = self.databases_to_poll.split()
        # Keep only the requested databases; fall back to all when none match.
        self.databases = [db for db in self.databases if db in to_poll] or self.databases

    self.secondaries = discover(cursor, query_factory(STANDBY))
    self.debug('discovered secondaries: {0}'.format(self.secondaries))

    if self.server_version >= 94000:
        # pg_replication_slots exists only from PostgreSQL 9.4 onwards.
        self.replication_slots = discover(cursor, query_factory(REPLICATION_SLOT))
        self.debug('discovered replication slots: {0}'.format(self.replication_slots))

    cursor.close()
|
||||
|
||||
def populate_queries(self):
    """Register the metric queries suited to this server's version/privileges.

    Must run after check_queries() so ``server_version`` and
    ``is_superuser`` are known. Keys of ``self.queries`` are SQL strings,
    values are the metric-name lists from METRICS.
    """
    queries = self.queries
    version = self.server_version

    # Always-on metrics.
    queries[query_factory(DATABASE)] = METRICS[DATABASE]
    queries[query_factory(BACKENDS)] = METRICS[BACKENDS]
    queries[query_factory(LOCKS)] = METRICS[LOCKS]
    queries[query_factory(BGWRITER)] = METRICS[BGWRITER]
    queries[query_factory(DIFF_LSN, version)] = METRICS[WAL_WRITES]
    queries[query_factory(STANDBY_DELTA, version)] = METRICS[STANDBY_DELTA]

    # Optional, user-enabled metrics.
    if self.do_index_stats:
        queries[query_factory(INDEX_STATS)] = METRICS[INDEX_STATS]
    if self.do_table_stats:
        queries[query_factory(TABLE_STATS)] = METRICS[TABLE_STATS]

    # Metrics gated by privilege or server version.
    if self.is_superuser:
        queries[query_factory(ARCHIVE, version)] = METRICS[ARCHIVE]
    if version >= 90400:
        queries[query_factory(WAL, version)] = METRICS[WAL]
    if version >= 100000:
        queries[query_factory(REPSLOT_FILES, version)] = METRICS[REPSLOT_FILES]
    if version >= 90400:
        queries[query_factory(AUTOVACUUM)] = METRICS[AUTOVACUUM]
|
||||
|
||||
def create_dynamic_charts(self):
    """Extend chart definitions with per-database, per-secondary and
    per-replication-slot dimensions discovered at check time."""
    for database_name in self.databases[::-1]:
        size_line = [
            database_name + '_size',
            database_name,
            'absolute',
            1,
            1024 * 1024,
        ]
        self.definitions['database_size']['lines'].append(size_line)

        db_stat_charts = [name for name in self.order if name.startswith('db_stat')]
        for chart_name in db_stat_charts:
            add_database_stat_chart(
                order=self.order,
                definitions=self.definitions,
                name=chart_name,
                database_name=database_name,
            )

        add_database_lock_chart(
            order=self.order,
            definitions=self.definitions,
            database_name=database_name,
        )

    for application_name in self.secondaries[::-1]:
        add_replication_delta_chart(
            order=self.order,
            definitions=self.definitions,
            name='standby_delta',
            application_name=application_name,
        )

    for slot_name in self.replication_slots[::-1]:
        add_replication_slot_chart(
            order=self.order,
            definitions=self.definitions,
            name='replication_slot',
            slot_name=slot_name,
        )
|
||||
|
||||
|
||||
def discover(cursor, query):
    """Execute *query* and return its first-column values, deduplicated,
    preserving first-seen order.

    Used to discover databases, standbys and replication slots.
    """
    # NOTE: the original span carried two successive loops over the same
    # cursor (pre- and post-refactor loop bodies retained side by side by
    # the diff); a single pass is sufficient and equivalent.
    cursor.execute(query)
    result = list()
    for row in cursor:
        value = row[0]
        if value not in result:
            result.append(value)
    return result
|
||||
|
||||
|
||||
def discover_secondaries_(cursor, query):
|
||||
cursor.execute(query)
|
||||
result = list()
|
||||
for sc in [standby[0] for standby in cursor]:
|
||||
if sc not in result:
|
||||
result.append(sc)
|
||||
return result
|
||||
|
||||
|
||||
def discover_replication_slots_(cursor, query):
|
||||
cursor.execute(query)
|
||||
result = list()
|
||||
for slot in [replication_slot[0] for replication_slot in cursor]:
|
||||
if slot not in result:
|
||||
result.append(slot)
|
||||
return result
|
||||
|
||||
|
||||
def check_if_superuser(cursor, query):
    """Execute *query* and return its single scalar result — the
    is_superuser flag of the current session."""
    # NOTE: the span retained the pre-rename `def check_if_superuser_`
    # header alongside the current one (diff residue); only the current
    # definition is kept.
    cursor.execute(query)
    return cursor.fetchone()[0]
|
||||
|
||||
|
@ -743,7 +1002,7 @@ def detect_server_version(cursor, query):
|
|||
return int(cursor.fetchone()[0])
|
||||
|
||||
|
||||
def populate_lock_types(databases):
|
||||
def zero_lock_types(databases):
|
||||
result = dict()
|
||||
for database in databases:
|
||||
for lock_type in METRICS['LOCKS']:
|
||||
|
@ -753,7 +1012,11 @@ def populate_lock_types(databases):
|
|||
return result
|
||||
|
||||
|
||||
def add_database_lock_chart_(order, definitions, database_name):
|
||||
def hide_password(config):
    """Return a copy of *config* with the 'password' value masked,
    safe for inclusion in log/error messages."""
    return {key: ('*****' if key == 'password' else value) for key, value in config.items()}
|
||||
|
||||
|
||||
def add_database_lock_chart(order, definitions, database_name):
|
||||
def create_lines(database):
|
||||
result = list()
|
||||
for lock_type in METRICS['LOCKS']:
|
||||
|
@ -770,7 +1033,7 @@ def add_database_lock_chart_(order, definitions, database_name):
|
|||
}
|
||||
|
||||
|
||||
def add_database_stat_chart_(order, definitions, name, database_name):
|
||||
def add_database_stat_chart(order, definitions, name, database_name):
|
||||
def create_lines(database, lines):
|
||||
result = list()
|
||||
for line in lines:
|
||||
|
@ -787,7 +1050,7 @@ def add_database_stat_chart_(order, definitions, name, database_name):
|
|||
'lines': create_lines(database_name, chart_template['lines'])}
|
||||
|
||||
|
||||
def add_replication_delta_chart_(order, definitions, name, application_name):
|
||||
def add_replication_delta_chart(order, definitions, name, application_name):
|
||||
def create_lines(standby, lines):
|
||||
result = list()
|
||||
for line in lines:
|
||||
|
@ -805,7 +1068,7 @@ def add_replication_delta_chart_(order, definitions, name, application_name):
|
|||
'lines': create_lines(application_name, chart_template['lines'])}
|
||||
|
||||
|
||||
def add_replication_slot_chart_(order, definitions, name, slot_name):
|
||||
def add_replication_slot_chart(order, definitions, name, slot_name):
|
||||
def create_lines(slot, lines):
|
||||
result = list()
|
||||
for line in lines:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue