# -*- coding: utf-8 -*-
# Description: example netdata python.d module
# Authors: facetoe, dangtranhoang
# SPDX-License-Identifier: GPL-3.0-or-later

from copy import deepcopy

try:
    import psycopg2
    from psycopg2 import extensions
    from psycopg2.extras import DictCursor
    from psycopg2 import OperationalError
    PSYCOPG2 = True
except ImportError:
    PSYCOPG2 = False

from bases.FrameworkServices.SimpleService import SimpleService


DEFAULT_PORT = 5432
DEFAULT_USER = 'postgres'
DEFAULT_CONNECT_TIMEOUT = 2       # seconds
DEFAULT_STATEMENT_TIMEOUT = 5000  # ms


CONN_PARAM_DSN = 'dsn'
CONN_PARAM_HOST = 'host'
CONN_PARAM_PORT = 'port'
CONN_PARAM_DATABASE = 'database'
CONN_PARAM_USER = 'user'
CONN_PARAM_PASSWORD = 'password'
CONN_PARAM_CONN_TIMEOUT = 'connect_timeout'
CONN_PARAM_STATEMENT_TIMEOUT = 'statement_timeout'
CONN_PARAM_SSL_MODE = 'sslmode'
CONN_PARAM_SSL_ROOT_CERT = 'sslrootcert'
CONN_PARAM_SSL_CRL = 'sslcrl'
CONN_PARAM_SSL_CERT = 'sslcert'
CONN_PARAM_SSL_KEY = 'sslkey'


QUERY_NAME_WAL = 'WAL'
QUERY_NAME_ARCHIVE = 'ARCHIVE'
QUERY_NAME_BACKENDS = 'BACKENDS'
QUERY_NAME_TABLE_STATS = 'TABLE_STATS'
QUERY_NAME_INDEX_STATS = 'INDEX_STATS'
QUERY_NAME_DATABASE = 'DATABASE'
QUERY_NAME_BGWRITER = 'BGWRITER'
QUERY_NAME_LOCKS = 'LOCKS'
QUERY_NAME_DATABASES = 'DATABASES'
QUERY_NAME_STANDBY = 'STANDBY'
QUERY_NAME_REPLICATION_SLOT = 'REPLICATION_SLOT'
QUERY_NAME_STANDBY_DELTA = 'STANDBY_DELTA'
QUERY_NAME_REPSLOT_FILES = 'REPSLOT_FILES'
QUERY_NAME_IF_SUPERUSER = 'IF_SUPERUSER'
QUERY_NAME_SERVER_VERSION = 'SERVER_VERSION'
QUERY_NAME_AUTOVACUUM = 'AUTOVACUUM'
QUERY_NAME_DIFF_LSN = 'DIFF_LSN'
QUERY_NAME_WAL_WRITES = 'WAL_WRITES'

METRICS = {
    QUERY_NAME_DATABASE: [
        'connections',
        'xact_commit',
        'xact_rollback',
        'blks_read',
        'blks_hit',
        'tup_returned',
        'tup_fetched',
        'tup_inserted',
        'tup_updated',
        'tup_deleted',
        'conflicts',
        'temp_files',
        'temp_bytes',
        'size'
    ],
    QUERY_NAME_BACKENDS: [
        'backends_active',
        'backends_idle'
    ],
    QUERY_NAME_INDEX_STATS: [
        'index_count',
        'index_size'
    ],
    QUERY_NAME_TABLE_STATS: [
        'table_size',
        'table_count'
    ],
    QUERY_NAME_WAL: [
        'written_wal',
        'recycled_wal',
        'total_wal'
    ],
    QUERY_NAME_WAL_WRITES: [
        'wal_writes'
    ],
    QUERY_NAME_ARCHIVE: [
        'ready_count',
        'done_count',
        'file_count'
    ],
    QUERY_NAME_BGWRITER: [
        'checkpoint_scheduled',
        'checkpoint_requested',
        'buffers_checkpoint',
        'buffers_clean',
        'maxwritten_clean',
        'buffers_backend',
        'buffers_alloc',
        'buffers_backend_fsync'
    ],
    QUERY_NAME_LOCKS: [
        'ExclusiveLock',
        'RowShareLock',
        'SIReadLock',
        'ShareUpdateExclusiveLock',
        'AccessExclusiveLock',
        'AccessShareLock',
        'ShareRowExclusiveLock',
        'ShareLock',
        'RowExclusiveLock'
    ],
    QUERY_NAME_AUTOVACUUM: [
        'analyze',
        'vacuum_analyze',
        'vacuum',
        'vacuum_freeze',
        'brin_summarize'
    ],
    QUERY_NAME_STANDBY_DELTA: [
        'sent_delta',
        'write_delta',
        'flush_delta',
        'replay_delta'
    ],
    QUERY_NAME_REPSLOT_FILES: [
        'replslot_wal_keep',
        'replslot_files'
    ]
}

NO_VERSION = 0
DEFAULT = 'DEFAULT'
V96 = 'V96'
V10 = 'V10'
V11 = 'V11'


QUERY_WAL = {
    DEFAULT: """
SELECT
    count(*) as total_wal,
    count(*) FILTER (WHERE type = 'recycled') AS recycled_wal,
    count(*) FILTER (WHERE type = 'written') AS written_wal
FROM
    (SELECT
        wal.name,
        pg_walfile_name(
          CASE pg_is_in_recovery()
            WHEN true THEN NULL
            ELSE pg_current_wal_lsn()
          END ),
        CASE
          WHEN wal.name > pg_walfile_name(
            CASE pg_is_in_recovery()
              WHEN true THEN NULL
              ELSE pg_current_wal_lsn()
            END ) THEN 'recycled'
          ELSE 'written'
        END AS type
    FROM pg_catalog.pg_ls_dir('pg_wal') AS wal(name)
    WHERE name ~ '^[0-9A-F]{24}$'
    ORDER BY
        (pg_stat_file('pg_wal/'||name)).modification,
        wal.name DESC) sub;
""",
    V96: """
SELECT
    count(*) as total_wal,
    count(*) FILTER (WHERE type = 'recycled') AS recycled_wal,
    count(*) FILTER (WHERE type = 'written') AS written_wal
FROM
    (SELECT
        wal.name,
        pg_xlogfile_name(
          CASE pg_is_in_recovery()
            WHEN true THEN NULL
            ELSE pg_current_xlog_location()
          END ),
        CASE
          WHEN wal.name > pg_xlogfile_name(
            CASE pg_is_in_recovery()
              WHEN true THEN NULL
              ELSE pg_current_xlog_location()
            END ) THEN 'recycled'
          ELSE 'written'
        END AS type
    FROM pg_catalog.pg_ls_dir('pg_xlog') AS wal(name)
    WHERE name ~ '^[0-9A-F]{24}$'
    ORDER BY
        (pg_stat_file('pg_xlog/'||name)).modification,
        wal.name DESC) sub;
""",
}

QUERY_ARCHIVE = {
    DEFAULT: """
SELECT
    CAST(COUNT(*) AS INT) AS file_count,
    CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)),0) AS INT) AS ready_count,
    CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)),0) AS INT) AS done_count
FROM
    pg_catalog.pg_ls_dir('pg_wal/archive_status') AS archive_files (archive_file);
""",
    V96: """
SELECT
    CAST(COUNT(*) AS INT) AS file_count,
    CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.ready$$r$ as INT)),0) AS INT) AS ready_count,
    CAST(COALESCE(SUM(CAST(archive_file ~ $r$\.done$$r$ AS INT)),0) AS INT) AS done_count
FROM
    pg_catalog.pg_ls_dir('pg_xlog/archive_status') AS archive_files (archive_file);

""",
}

QUERY_BACKEND = {
    DEFAULT: """
SELECT
    count(*) - (SELECT  count(*)
                FROM pg_stat_activity
                WHERE state = 'idle')
      AS backends_active,
    (SELECT count(*)
     FROM pg_stat_activity
     WHERE state = 'idle')
      AS backends_idle
FROM pg_stat_activity;
""",
}

QUERY_TABLE_STATS = {
    DEFAULT: """
SELECT
    ((sum(relpages) * 8) * 1024) AS table_size,
    count(1)                     AS table_count
FROM pg_class
WHERE relkind IN ('r', 't');
""",
}

QUERY_INDEX_STATS = {
    DEFAULT: """
SELECT
    ((sum(relpages) * 8) * 1024) AS index_size,
    count(1)                     AS index_count
FROM pg_class
WHERE relkind = 'i';
""",
}

QUERY_DATABASE = {
    DEFAULT: """
SELECT
    datname AS database_name,
    numbackends AS connections,
    xact_commit AS xact_commit,
    xact_rollback AS xact_rollback,
    blks_read AS blks_read,
    blks_hit AS blks_hit,
    tup_returned AS tup_returned,
    tup_fetched AS tup_fetched,
    tup_inserted AS tup_inserted,
    tup_updated AS tup_updated,
    tup_deleted AS tup_deleted,
    conflicts AS conflicts,
    pg_database_size(datname) AS size,
    temp_files AS temp_files,
    temp_bytes AS temp_bytes
FROM pg_stat_database
WHERE datname IN %(databases)s ;
""",
}

QUERY_BGWRITER = {
    DEFAULT: """
SELECT
    checkpoints_timed AS checkpoint_scheduled,
    checkpoints_req AS checkpoint_requested,
    buffers_checkpoint * current_setting('block_size')::numeric buffers_checkpoint,
    buffers_clean * current_setting('block_size')::numeric buffers_clean,
    maxwritten_clean,
    buffers_backend * current_setting('block_size')::numeric buffers_backend,
    buffers_alloc * current_setting('block_size')::numeric buffers_alloc,
    buffers_backend_fsync
FROM pg_stat_bgwriter;
""",
}

QUERY_LOCKS = {
    DEFAULT: """
SELECT
    pg_database.datname as database_name,
    mode,
    count(mode) AS locks_count
FROM pg_locks
INNER JOIN pg_database
    ON pg_database.oid = pg_locks.database
GROUP BY datname, mode
ORDER BY datname, mode;
""",
}

QUERY_DATABASES = {
    DEFAULT: """
SELECT
    datname
FROM pg_stat_database
WHERE
    has_database_privilege(
      (SELECT current_user), datname, 'connect')
    AND NOT datname ~* '^template\d ';
""",
}

QUERY_STANDBY = {
    DEFAULT: """
SELECT
    application_name
FROM pg_stat_replication
WHERE application_name IS NOT NULL
GROUP BY application_name;
""",
}

QUERY_REPLICATION_SLOT = {
    DEFAULT: """
SELECT slot_name
FROM pg_replication_slots;
"""
}

QUERY_STANDBY_DELTA = {
    DEFAULT: """
SELECT
    application_name,
    pg_wal_lsn_diff(
      CASE pg_is_in_recovery()
        WHEN true THEN pg_last_wal_receive_lsn()
        ELSE pg_current_wal_lsn()
      END,
    sent_lsn) AS sent_delta,
    pg_wal_lsn_diff(
      CASE pg_is_in_recovery()
        WHEN true THEN pg_last_wal_receive_lsn()
        ELSE pg_current_wal_lsn()
      END,
    write_lsn) AS write_delta,
    pg_wal_lsn_diff(
      CASE pg_is_in_recovery()
        WHEN true THEN pg_last_wal_receive_lsn()
        ELSE pg_current_wal_lsn()
      END,
    flush_lsn) AS flush_delta,
    pg_wal_lsn_diff(
      CASE pg_is_in_recovery()
        WHEN true THEN pg_last_wal_receive_lsn()
        ELSE pg_current_wal_lsn()
      END,
    replay_lsn) AS replay_delta
FROM pg_stat_replication
WHERE application_name IS NOT NULL;
""",
    V96: """
SELECT
    application_name,
    pg_xlog_location_diff(
      CASE pg_is_in_recovery()
        WHEN true THEN pg_last_xlog_receive_location()
        ELSE pg_current_xlog_location()
      END,
    sent_location) AS sent_delta,
    pg_xlog_location_diff(
      CASE pg_is_in_recovery()
        WHEN true THEN pg_last_xlog_receive_location()
        ELSE pg_current_xlog_location()
      END,
    write_location) AS write_delta,
    pg_xlog_location_diff(
      CASE pg_is_in_recovery()
        WHEN true THEN pg_last_xlog_receive_location()
        ELSE pg_current_xlog_location()
      END,
    flush_location) AS flush_delta,
    pg_xlog_location_diff(
      CASE pg_is_in_recovery()
        WHEN true THEN pg_last_xlog_receive_location()
        ELSE pg_current_xlog_location()
      END,
    replay_location) AS replay_delta
FROM pg_stat_replication
WHERE application_name IS NOT NULL;
""",
}

QUERY_REPSLOT_FILES = {
    DEFAULT: """
WITH wal_size AS (
  SELECT
    setting::int AS val
  FROM pg_settings
  WHERE name = 'wal_segment_size'
  )
SELECT
    slot_name,
    slot_type,
    replslot_wal_keep,
    count(slot_file) AS replslot_files
FROM
    (SELECT
        slot.slot_name,
        CASE
            WHEN slot_file <> 'state' THEN 1
        END AS slot_file ,
        slot_type,
        COALESCE (
          floor(
            (pg_wal_lsn_diff(pg_current_wal_lsn (),slot.restart_lsn)
             - (pg_walfile_name_offset (restart_lsn)).file_offset) / (s.val)
          ),0) AS replslot_wal_keep
    FROM pg_replication_slots slot
    LEFT JOIN (
        SELECT
            slot2.slot_name,
            pg_ls_dir('pg_replslot/' || slot2.slot_name) AS slot_file
        FROM pg_replication_slots slot2
        ) files (slot_name, slot_file)
        ON slot.slot_name = files.slot_name
    CROSS JOIN wal_size s
    ) AS d
GROUP BY
    slot_name,
    slot_type,
    replslot_wal_keep;
""",
    V10: """
WITH wal_size AS (
  SELECT
    current_setting('wal_block_size')::INT * setting::INT AS val
  FROM pg_settings
  WHERE name = 'wal_segment_size'
  )
SELECT
    slot_name,
    slot_type,
    replslot_wal_keep,
    count(slot_file) AS replslot_files
FROM
    (SELECT
        slot.slot_name,
        CASE
            WHEN slot_file <> 'state' THEN 1
        END AS slot_file ,
        slot_type,
        COALESCE (
          floor(
            (pg_wal_lsn_diff(pg_current_wal_lsn (),slot.restart_lsn)
             - (pg_walfile_name_offset (restart_lsn)).file_offset) / (s.val)
          ),0) AS replslot_wal_keep
    FROM pg_replication_slots slot
    LEFT JOIN (
        SELECT
            slot2.slot_name,
            pg_ls_dir('pg_replslot/' || slot2.slot_name) AS slot_file
        FROM pg_replication_slots slot2
        ) files (slot_name, slot_file)
        ON slot.slot_name = files.slot_name
    CROSS JOIN wal_size s
    ) AS d
GROUP BY
    slot_name,
    slot_type,
    replslot_wal_keep;
""",
}

QUERY_SUPERUSER = {
    DEFAULT: """
SELECT current_setting('is_superuser') = 'on' AS is_superuser;
""",
}

QUERY_SHOW_VERSION = {
    DEFAULT: """
SHOW server_version_num;
""",
}

QUERY_AUTOVACUUM = {
    DEFAULT: """
SELECT
    count(*) FILTER (WHERE query LIKE 'autovacuum: ANALYZE%%') AS analyze,
    count(*) FILTER (WHERE query LIKE 'autovacuum: VACUUM ANALYZE%%') AS vacuum_analyze,
    count(*) FILTER (WHERE query LIKE 'autovacuum: VACUUM%%'
                       AND query NOT LIKE 'autovacuum: VACUUM ANALYZE%%'
                       AND query NOT LIKE '%%to prevent wraparound%%') AS vacuum,
    count(*) FILTER (WHERE query LIKE '%%to prevent wraparound%%') AS vacuum_freeze,
    count(*) FILTER (WHERE query LIKE 'autovacuum: BRIN summarize%%') AS brin_summarize
FROM pg_stat_activity
WHERE query NOT LIKE '%%pg_stat_activity%%';
""",
}

QUERY_DIFF_LSN = {
    DEFAULT: """
SELECT
    pg_wal_lsn_diff(
      CASE pg_is_in_recovery()
        WHEN true THEN pg_last_wal_receive_lsn()
        ELSE pg_current_wal_lsn()
      END,
    '0/0') as wal_writes ;
""",
    V96: """
SELECT
    pg_xlog_location_diff(
      CASE pg_is_in_recovery()
        WHEN true THEN pg_last_xlog_receive_location()
        ELSE pg_current_xlog_location()
      END,
    '0/0') as wal_writes ;
""",
}


def query_factory(name, version=NO_VERSION):
    if name == QUERY_NAME_BACKENDS:
        return QUERY_BACKEND[DEFAULT]
    elif name == QUERY_NAME_TABLE_STATS:
        return QUERY_TABLE_STATS[DEFAULT]
    elif name == QUERY_NAME_INDEX_STATS:
        return QUERY_INDEX_STATS[DEFAULT]
    elif name == QUERY_NAME_DATABASE:
        return QUERY_DATABASE[DEFAULT]
    elif name == QUERY_NAME_BGWRITER:
        return QUERY_BGWRITER[DEFAULT]
    elif name == QUERY_NAME_LOCKS:
        return QUERY_LOCKS[DEFAULT]
    elif name == QUERY_NAME_DATABASES:
        return QUERY_DATABASES[DEFAULT]
    elif name == QUERY_NAME_STANDBY:
        return QUERY_STANDBY[DEFAULT]
    elif name == QUERY_NAME_REPLICATION_SLOT:
        return QUERY_REPLICATION_SLOT[DEFAULT]
    elif name == QUERY_NAME_IF_SUPERUSER:
        return QUERY_SUPERUSER[DEFAULT]
    elif name == QUERY_NAME_SERVER_VERSION:
        return QUERY_SHOW_VERSION[DEFAULT]
    elif name == QUERY_NAME_AUTOVACUUM:
        return QUERY_AUTOVACUUM[DEFAULT]
    elif name == QUERY_NAME_WAL:
        if version < 100000:
            return QUERY_WAL[V96]
        return QUERY_WAL[DEFAULT]
    elif name == QUERY_NAME_ARCHIVE:
        if version < 100000:
            return QUERY_ARCHIVE[V96]
        return QUERY_ARCHIVE[DEFAULT]
    elif name == QUERY_NAME_STANDBY_DELTA:
        if version < 100000:
            return QUERY_STANDBY_DELTA[V96]
        return QUERY_STANDBY_DELTA[DEFAULT]
    elif name == QUERY_NAME_REPSLOT_FILES:
        if version < 110000:
            return QUERY_REPSLOT_FILES[V10]
        return QUERY_REPSLOT_FILES[DEFAULT]
    elif name == QUERY_NAME_DIFF_LSN:
        if version < 100000:
            return QUERY_DIFF_LSN[V96]
        return QUERY_DIFF_LSN[DEFAULT]

    raise ValueError('unknown query')


ORDER = [
    'db_stat_temp_files',
    'db_stat_temp_bytes',
    'db_stat_blks',
    'db_stat_tuple_returned',
    'db_stat_tuple_write',
    'db_stat_transactions',
    'db_stat_connections',
    'database_size',
    'backend_process',
    'index_count',
    'index_size',
    'table_count',
    'table_size',
    'wal',
    'wal_writes',
    'archive_wal',
    'checkpointer',
    'stat_bgwriter_alloc',
    'stat_bgwriter_checkpoint',
    'stat_bgwriter_backend',
    'stat_bgwriter_backend_fsync',
    'stat_bgwriter_bgwriter',
    'stat_bgwriter_maxwritten',
    'replication_slot',
    'standby_delta',
    'autovacuum'
]

CHARTS = {
    'db_stat_transactions': {
        'options': [None, 'Transactions on db', 'transactions/s', 'db statistics', 'postgres.db_stat_transactions',
                    'line'],
        'lines': [
            ['xact_commit', 'committed', 'incremental'],
            ['xact_rollback', 'rolled back', 'incremental']
        ]
    },
    'db_stat_connections': {
        'options': [None, 'Current connections to db', 'count', 'db statistics', 'postgres.db_stat_connections',
                    'line'],
        'lines': [
            ['connections', 'connections', 'absolute']
        ]
    },
    'db_stat_blks': {
        'options': [None, 'Disk blocks reads from db', 'reads/s', 'db statistics', 'postgres.db_stat_blks', 'line'],
        'lines': [
            ['blks_read', 'disk', 'incremental'],
            ['blks_hit', 'cache', 'incremental']
        ]
    },
    'db_stat_tuple_returned': {
        'options': [None, 'Tuples returned from db', 'tuples/s', 'db statistics', 'postgres.db_stat_tuple_returned',
                    'line'],
        'lines': [
            ['tup_returned', 'sequential', 'incremental'],
            ['tup_fetched', 'bitmap', 'incremental']
        ]
    },
    'db_stat_tuple_write': {
        'options': [None, 'Tuples written to db', 'writes/s', 'db statistics', 'postgres.db_stat_tuple_write', 'line'],
        'lines': [
            ['tup_inserted', 'inserted', 'incremental'],
            ['tup_updated', 'updated', 'incremental'],
            ['tup_deleted', 'deleted', 'incremental'],
            ['conflicts', 'conflicts', 'incremental']
        ]
    },
    'db_stat_temp_bytes': {
        'options': [None, 'Temp files written to disk', 'KiB/s', 'db statistics', 'postgres.db_stat_temp_bytes',
                    'line'],
        'lines': [
            ['temp_bytes', 'size', 'incremental', 1, 1024]
        ]
    },
    'db_stat_temp_files': {
        'options': [None, 'Temp files written to disk', 'files', 'db statistics', 'postgres.db_stat_temp_files',
                    'line'],
        'lines': [
            ['temp_files', 'files', 'incremental']
        ]
    },
    'database_size': {
        'options': [None, 'Database size', 'MiB', 'database size', 'postgres.db_size', 'stacked'],
        'lines': [
        ]
    },
    'backend_process': {
        'options': [None, 'Current Backend Processes', 'processes', 'backend processes', 'postgres.backend_process',
                    'line'],
        'lines': [
            ['backends_active', 'active', 'absolute'],
            ['backends_idle', 'idle', 'absolute']
        ]
    },
    'index_count': {
        'options': [None, 'Total indexes', 'index', 'indexes', 'postgres.index_count', 'line'],
        'lines': [
            ['index_count', 'total', 'absolute']
        ]
    },
    'index_size': {
        'options': [None, 'Indexes size', 'MiB', 'indexes', 'postgres.index_size', 'line'],
        'lines': [
            ['index_size', 'size', 'absolute', 1, 1024 * 1024]
        ]
    },
    'table_count': {
        'options': [None, 'Total Tables', 'tables', 'tables', 'postgres.table_count', 'line'],
        'lines': [
            ['table_count', 'total', 'absolute']
        ]
    },
    'table_size': {
        'options': [None, 'Tables size', 'MiB', 'tables', 'postgres.table_size', 'line'],
        'lines': [
            ['table_size', 'size', 'absolute', 1, 1024 * 1024]
        ]
    },
    'wal': {
        'options': [None, 'Write-Ahead Logs', 'files', 'wal', 'postgres.wal', 'line'],
        'lines': [
            ['written_wal', 'written', 'absolute'],
            ['recycled_wal', 'recycled', 'absolute'],
            ['total_wal', 'total', 'absolute']
        ]
    },
    'wal_writes': {
        'options': [None, 'Write-Ahead Logs', 'KiB/s', 'wal_writes', 'postgres.wal_writes', 'line'],
        'lines': [
            ['wal_writes', 'writes', 'incremental', 1, 1024]
        ]
    },
    'archive_wal': {
        'options': [None, 'Archive Write-Ahead Logs', 'files/s', 'archive wal', 'postgres.archive_wal', 'line'],
        'lines': [
            ['file_count', 'total', 'incremental'],
            ['ready_count', 'ready', 'incremental'],
            ['done_count', 'done', 'incremental']
        ]
    },
    'checkpointer': {
        'options': [None, 'Checkpoints', 'writes', 'checkpointer', 'postgres.checkpointer', 'line'],
        'lines': [
            ['checkpoint_scheduled', 'scheduled', 'incremental'],
            ['checkpoint_requested', 'requested', 'incremental']
        ]
    },
    'stat_bgwriter_alloc': {
        'options': [None, 'Buffers allocated', 'KiB/s', 'bgwriter', 'postgres.stat_bgwriter_alloc', 'line'],
        'lines': [
            ['buffers_alloc', 'alloc', 'incremental', 1, 1024]
        ]
    },
    'stat_bgwriter_checkpoint': {
        'options': [None, 'Buffers written during checkpoints', 'KiB/s', 'bgwriter',
                    'postgres.stat_bgwriter_checkpoint', 'line'],
        'lines': [
            ['buffers_checkpoint', 'checkpoint', 'incremental', 1, 1024]
        ]
    },
    'stat_bgwriter_backend': {
        'options': [None, 'Buffers written directly by a backend', 'KiB/s', 'bgwriter',
                    'postgres.stat_bgwriter_backend', 'line'],
        'lines': [
            ['buffers_backend', 'backend', 'incremental', 1, 1024]
        ]
    },
    'stat_bgwriter_backend_fsync': {
        'options': [None, 'Fsync by backend', 'times', 'bgwriter', 'postgres.stat_bgwriter_backend_fsync', 'line'],
        'lines': [
            ['buffers_backend_fsync', 'backend fsync', 'incremental']
        ]
    },
    'stat_bgwriter_bgwriter': {
        'options': [None, 'Buffers written by the background writer', 'KiB/s', 'bgwriter',
                    'postgres.bgwriter_bgwriter', 'line'],
        'lines': [
            ['buffers_clean', 'clean', 'incremental', 1, 1024]
        ]
    },
    'stat_bgwriter_maxwritten': {
        'options': [None, 'Too many buffers written', 'times', 'bgwriter', 'postgres.stat_bgwriter_maxwritten',
                    'line'],
        'lines': [
            ['maxwritten_clean', 'maxwritten', 'incremental']
        ]
    },
    'autovacuum': {
        'options': [None, 'Autovacuum workers', 'workers', 'autovacuum', 'postgres.autovacuum', 'line'],
        'lines': [
            ['analyze', 'analyze', 'absolute'],
            ['vacuum', 'vacuum', 'absolute'],
            ['vacuum_analyze', 'vacuum analyze', 'absolute'],
            ['vacuum_freeze', 'vacuum freeze', 'absolute'],
            ['brin_summarize', 'brin summarize', 'absolute']
        ]
    },
    'standby_delta': {
        'options': [None, 'Standby delta', 'KiB', 'replication delta', 'postgres.standby_delta', 'line'],
        'lines': [
            ['sent_delta', 'sent delta', 'absolute', 1, 1024],
            ['write_delta', 'write delta', 'absolute', 1, 1024],
            ['flush_delta', 'flush delta', 'absolute', 1, 1024],
            ['replay_delta', 'replay delta', 'absolute', 1, 1024]
        ]
    },
    'replication_slot': {
        'options': [None, 'Replication slot files', 'files', 'replication slot', 'postgres.replication_slot', 'line'],
        'lines': [
            ['replslot_wal_keep', 'wal keeped', 'absolute'],
            ['replslot_files', 'pg_replslot files', 'absolute']
        ]
    }
}


class Service(SimpleService):
    def __init__(self, configuration=None, name=None):
        SimpleService.__init__(self, configuration=configuration, name=name)
        self.order = list(ORDER)
        self.definitions = deepcopy(CHARTS)
        self.do_table_stats = configuration.pop('table_stats', False)
        self.do_index_stats = configuration.pop('index_stats', False)
        self.databases_to_poll = configuration.pop('database_poll', None)
        self.configuration = configuration
        self.conn = None
        self.conn_params = dict()
        self.server_version = None
        self.is_superuser = False
        self.alive = False
        self.databases = list()
        self.secondaries = list()
        self.replication_slots = list()
        self.queries = dict()
        self.data = dict()

    def reconnect(self):
        return self.connect()

    def build_conn_params(self):
        conf = self.configuration

        # connection URIs: https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
        if conf.get(CONN_PARAM_DSN):
            return {'dsn': conf[CONN_PARAM_DSN]}

        params = {
            CONN_PARAM_HOST: conf.get(CONN_PARAM_HOST),
            CONN_PARAM_PORT: conf.get(CONN_PARAM_PORT, DEFAULT_PORT),
            CONN_PARAM_DATABASE: conf.get(CONN_PARAM_DATABASE),
            CONN_PARAM_USER: conf.get(CONN_PARAM_USER, DEFAULT_USER),
            CONN_PARAM_PASSWORD: conf.get(CONN_PARAM_PASSWORD),
            CONN_PARAM_CONN_TIMEOUT: conf.get(CONN_PARAM_CONN_TIMEOUT, DEFAULT_CONNECT_TIMEOUT),
            'options': '-c statement_timeout={0}'.format(
                conf.get(CONN_PARAM_STATEMENT_TIMEOUT, DEFAULT_STATEMENT_TIMEOUT)),
        }

        # https://www.postgresql.org/docs/current/libpq-ssl.html
        ssl_params = dict(
            (k, v) for k, v in {
                CONN_PARAM_SSL_MODE: conf.get(CONN_PARAM_SSL_MODE),
                CONN_PARAM_SSL_ROOT_CERT: conf.get(CONN_PARAM_SSL_ROOT_CERT),
                CONN_PARAM_SSL_CRL: conf.get(CONN_PARAM_SSL_CRL),
                CONN_PARAM_SSL_CERT: conf.get(CONN_PARAM_SSL_CERT),
                CONN_PARAM_SSL_KEY: conf.get(CONN_PARAM_SSL_KEY),
            }.items() if v)

        if CONN_PARAM_SSL_MODE not in ssl_params and len(ssl_params) > 0:
            raise ValueError("mandatory 'sslmode' param is missing, please set")

        params.update(ssl_params)

        return params

    def connect(self):
        if self.conn:
            self.conn.close()
            self.conn = None

        try:
            self.conn = psycopg2.connect(**self.conn_params)
            self.conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            self.conn.set_session(readonly=True)
        except OperationalError as error:
            self.error(error)
            self.alive = False
        else:
            self.alive = True

        return self.alive

    def check(self):
        if not PSYCOPG2:
            self.error("'python-psycopg2' package is needed to use postgres module")
            return False

        try:
            self.conn_params = self.build_conn_params()
        except ValueError as error:
            self.error('error on creating connection params : {0}', error)
            return False

        if not self.connect():
            self.error('failed to connect to {0}'.format(hide_password(self.conn_params)))
            return False

        try:
            self.check_queries()
        except Exception as error:
            self.error(error)
            return False

        self.populate_queries()
        self.create_dynamic_charts()

        return True

    def get_data(self):
        if not self.alive and not self.reconnect():
            return None

        try:
            cursor = self.conn.cursor(cursor_factory=DictCursor)

            self.data.update(zero_lock_types(self.databases))

            for query, metrics in self.queries.items():
                self.query_stats(cursor, query, metrics)

        except OperationalError:
            self.alive = False
            return None

        cursor.close()

        return self.data

    def query_stats(self, cursor, query, metrics):
        cursor.execute(query, dict(databases=tuple(self.databases)))

        for row in cursor:
            for metric in metrics:
                #  databases
                if 'database_name' in row:
                    dimension_id = '_'.join([row['database_name'], metric])
                #  secondaries
                elif 'application_name' in row:
                    dimension_id = '_'.join([row['application_name'], metric])
                # replication slots
                elif 'slot_name' in row:
                    dimension_id = '_'.join([row['slot_name'], metric])
                #  other
                else:
                    dimension_id = metric

                if metric in row:
                    if row[metric] is not None:
                        self.data[dimension_id] = int(row[metric])
                elif 'locks_count' in row:
                    if metric == row['mode']:
                        self.data[dimension_id] = row['locks_count']

    def check_queries(self):
        cursor = self.conn.cursor()

        self.server_version = detect_server_version(cursor, query_factory(QUERY_NAME_SERVER_VERSION))
        self.debug('server version: {0}'.format(self.server_version))

        self.is_superuser = check_if_superuser(cursor, query_factory(QUERY_NAME_IF_SUPERUSER))
        self.debug('superuser: {0}'.format(self.is_superuser))

        self.databases = discover(cursor, query_factory(QUERY_NAME_DATABASES))
        self.debug('discovered databases {0}'.format(self.databases))
        if self.databases_to_poll:
            to_poll = self.databases_to_poll.split()
            self.databases = [db for db in self.databases if db in to_poll] or self.databases

        self.secondaries = discover(cursor, query_factory(QUERY_NAME_STANDBY))
        self.debug('discovered secondaries: {0}'.format(self.secondaries))

        if self.server_version >= 94000:
            self.replication_slots = discover(cursor, query_factory(QUERY_NAME_REPLICATION_SLOT))
            self.debug('discovered replication slots: {0}'.format(self.replication_slots))

        cursor.close()

    def populate_queries(self):
        self.queries[query_factory(QUERY_NAME_DATABASE)] = METRICS[QUERY_NAME_DATABASE]
        self.queries[query_factory(QUERY_NAME_BACKENDS)] = METRICS[QUERY_NAME_BACKENDS]
        self.queries[query_factory(QUERY_NAME_LOCKS)] = METRICS[QUERY_NAME_LOCKS]
        self.queries[query_factory(QUERY_NAME_BGWRITER)] = METRICS[QUERY_NAME_BGWRITER]
        self.queries[query_factory(QUERY_NAME_DIFF_LSN, self.server_version)] = METRICS[QUERY_NAME_WAL_WRITES]
        self.queries[query_factory(QUERY_NAME_STANDBY_DELTA, self.server_version)] = METRICS[QUERY_NAME_STANDBY_DELTA]

        if self.do_index_stats:
            self.queries[query_factory(QUERY_NAME_INDEX_STATS)] = METRICS[QUERY_NAME_INDEX_STATS]
        if self.do_table_stats:
            self.queries[query_factory(QUERY_NAME_TABLE_STATS)] = METRICS[QUERY_NAME_TABLE_STATS]

        if self.is_superuser:
            self.queries[query_factory(QUERY_NAME_ARCHIVE, self.server_version)] = METRICS[QUERY_NAME_ARCHIVE]

            if self.server_version >= 90400:
                self.queries[query_factory(QUERY_NAME_WAL, self.server_version)] = METRICS[QUERY_NAME_WAL]

            if self.server_version >= 100000:
                self.queries[query_factory(QUERY_NAME_REPSLOT_FILES, self.server_version)] = METRICS[QUERY_NAME_REPSLOT_FILES]

        if self.server_version >= 90400:
            self.queries[query_factory(QUERY_NAME_AUTOVACUUM)] = METRICS[QUERY_NAME_AUTOVACUUM]

    def create_dynamic_charts(self):
        for database_name in self.databases[::-1]:
            dim = [
                database_name + '_size',
                database_name,
                'absolute',
                1,
                1024 * 1024,
            ]
            self.definitions['database_size']['lines'].append(dim)
            for chart_name in [name for name in self.order if name.startswith('db_stat')]:
                    add_database_stat_chart(
                        order=self.order,
                        definitions=self.definitions,
                        name=chart_name,
                        database_name=database_name,
                    )
            add_database_lock_chart(
                order=self.order,
                definitions=self.definitions,
                database_name=database_name,
            )

        for application_name in self.secondaries[::-1]:
            add_replication_delta_chart(
                order=self.order,
                definitions=self.definitions,
                name='standby_delta',
                application_name=application_name,
            )

        for slot_name in self.replication_slots[::-1]:
            add_replication_slot_chart(
                order=self.order,
                definitions=self.definitions,
                name='replication_slot',
                slot_name=slot_name,
            )


def discover(cursor, query):
    cursor.execute(query)
    result = list()
    for v in [value[0] for value in cursor]:
        if v not in result:
            result.append(v)
    return result


def check_if_superuser(cursor, query):
    cursor.execute(query)
    return cursor.fetchone()[0]


def detect_server_version(cursor, query):
    cursor.execute(query)
    return int(cursor.fetchone()[0])


def zero_lock_types(databases):
    result = dict()
    for database in databases:
        for lock_type in METRICS['LOCKS']:
            key = '_'.join([database, lock_type])
            result[key] = 0

    return result


def hide_password(config):
    return dict((k, v if k != 'password' else '*****') for k, v in config.items())


def add_database_lock_chart(order, definitions, database_name):
    def create_lines(database):
        result = list()
        for lock_type in METRICS['LOCKS']:
            dimension_id = '_'.join([database, lock_type])
            result.append([dimension_id, lock_type, 'absolute'])
        return result

    chart_name = database_name + '_locks'
    order.insert(-1, chart_name)
    definitions[chart_name] = {
            'options':
            [None, 'Locks on db: ' + database_name, 'locks', 'db ' + database_name, 'postgres.db_locks', 'line'],
            'lines': create_lines(database_name)
            }


def add_database_stat_chart(order, definitions, name, database_name):
    def create_lines(database, lines):
        result = list()
        for line in lines:
            new_line = ['_'.join([database, line[0]])] + line[1:]
            result.append(new_line)
        return result

    chart_template = CHARTS[name]
    chart_name = '_'.join([database_name, name])
    order.insert(0, chart_name)
    name, title, units, _, context, chart_type = chart_template['options']
    definitions[chart_name] = {
               'options': [name, title + ': ' + database_name,  units, 'db ' + database_name, context,  chart_type],
               'lines': create_lines(database_name, chart_template['lines'])}


def add_replication_delta_chart(order, definitions, name, application_name):
    def create_lines(standby, lines):
        result = list()
        for line in lines:
            new_line = ['_'.join([standby, line[0]])] + line[1:]
            result.append(new_line)
        return result

    chart_template = CHARTS[name]
    chart_name = '_'.join([application_name, name])
    position = order.index('database_size')
    order.insert(position, chart_name)
    name, title, units, _, context, chart_type = chart_template['options']
    definitions[chart_name] = {
               'options': [name, title + ': ' + application_name,  units, 'replication delta', context,  chart_type],
               'lines': create_lines(application_name, chart_template['lines'])}


def add_replication_slot_chart(order, definitions, name, slot_name):
    def create_lines(slot, lines):
        result = list()
        for line in lines:
            new_line = ['_'.join([slot, line[0]])] + line[1:]
            result.append(new_line)
        return result

    chart_template = CHARTS[name]
    chart_name = '_'.join([slot_name, name])
    position = order.index('database_size')
    order.insert(position, chart_name)
    name, title, units, _, context, chart_type = chart_template['options']
    definitions[chart_name] = {
               'options': [name, title + ': ' + slot_name,  units, 'replication slot files', context,  chart_type],
               'lines': create_lines(slot_name, chart_template['lines'])}