Skip to content

Instantly share code, notes, and snippets.

@oleewere
Created July 6, 2022 17:20
Show Gist options
  • Save oleewere/895bc4c2ecdca1c3997364cf91f8e7a9 to your computer and use it in GitHub Desktop.
Save oleewere/895bc4c2ecdca1c3997364cf91f8e7a9 to your computer and use it in GitHub Desktop.
checkipaconsistency v2
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Tool to check consistency across FreeIPA servers
Author: Peter Pakos <peter.pakos@wandisco.com>
Copyright (C) 2017 WANdisco
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
from __future__ import absolute_import, print_function
import os
import sys
import json
import argparse
from prettytable import PrettyTable
import dns.resolver
from collections import OrderedDict
try:
import configparser
except ImportError:
import ConfigParser as configparser
from pplogger import get_logger
from .__version__ import __version__
from .freeipaserver import FreeIPAServer
class Checks(object):
def __init__(self):
pass
class Main(object):
def __init__(self):
self._app_name = os.path.basename(sys.modules['__main__'].__file__)
self._app_dir = os.path.dirname(os.path.realpath(__file__))
self._parse_args()
self._log = get_logger(debug=self._args.debug, quiet=self._args.quiet, verbose=self._args.verbose,
file_level='DEBUG' if self._args.log_file else False,
log_file=self._args.log_file if self._args.log_file else False)
self._log.debug(self._args)
self._log.debug('Initialising...')
self._domain = None
self._hosts = []
self._binddn = 'cn=Directory Manager'
self._bindpw = None
self._load_config()
if self._args.domain:
self._log.debug('Domain set by argument')
self._domain = self._args.domain
if not self._domain:
self._log.critical('IPA domain not set')
exit(1)
else:
self._log.debug('IPA domain: %s' % self._domain)
if self._args.hosts:
self._log.debug('Server list set by argument')
self._hosts = self._args.hosts
for i, host in enumerate(self._hosts):
if not host or ' ' in host:
self._log.critical('Incorrect server name: %s' % host)
exit(1)
if not self._hosts:
self._log.debug('Searching for IPA servers in DNS')
record = '_ldap._tcp.%s' % self._domain
answers = []
try:
answers = dns.resolver.query(record, 'SRV')
except (dns.resolver.NXDOMAIN, dns.resolver.NoNameservers):
self._log.critical('IPA servers not set, also failed to find any in DNS')
exit(1)
for answer in answers:
self._hosts.append(str(answer).split(' ')[3].rstrip('.'))
self._log.debug('IPA servers: %s' % ', '.join(self._hosts))
if self._args.binddn:
self._log.debug('Bind DN set by argument')
self._binddn = self._args.binddn
if not self._binddn:
self._log.critical('Bind DN not set')
exit(1)
if self._args.bindpw:
self._log.debug('Bind password set by argument')
self._bindpw = self._args.bindpw
if not self._bindpw:
self._log.critical('Bind password not set')
exit(1)
self._servers = OrderedDict()
for host in self._hosts:
self._servers[host] = FreeIPAServer(host, self._domain, self._binddn, self._bindpw)
self._checks = OrderedDict([
('users', 'Active Users'),
('susers', 'Stage Users'),
('pusers', 'Preserved Users'),
('hosts', 'Hosts'),
('services', 'Services'),
('ugroups', 'User Groups'),
('hgroups', 'Host Groups'),
('ngroups', 'Netgroups'),
('hbac', 'HBAC Rules'),
('sudo', 'SUDO Rules'),
('zones', 'DNS Zones'),
('certs', 'Certificates'),
('conflicts', 'LDAP Conflicts'),
('ghosts', 'Ghost Replicas'),
('bind', 'Anonymous BIND'),
('msdcs', 'Microsoft ADTrust'),
('replicas', 'Replication Status')
])
def _parse_args(self):
parser = argparse.ArgumentParser(description='Tool to check consistency across FreeIPA servers', add_help=False)
parser.add_argument('-H', '--hosts', nargs='*', dest='hosts', help='list of IPA servers')
parser.add_argument('-d', '--domain', nargs='?', dest='domain', help='IPA domain')
parser.add_argument('-D', '--binddn', nargs='?', dest='binddn', help='Bind DN (default: cn=Directory Manager)')
parser.add_argument('-W', '--bindpw', nargs='?', dest='bindpw', help='Bind password')
parser.add_argument('--help', action='help', help='show this help message and exit')
parser.add_argument('--version', action='version',
version='%s %s' % (os.path.basename(sys.argv[0]), __version__))
parser.add_argument('--debug', action='store_true', dest='debug', help='debugging mode')
parser.add_argument('--verbose', action='store_true', dest='verbose', help='verbose mode')
parser.add_argument('--quiet', action='store_true', dest='quiet', help='do not log to console')
parser.add_argument('-l', '--log-file', nargs='?', dest='log_file', default='not_set',
help='log to file (./%s.log by default)' % self._app_name)
parser.add_argument('-o', '--output', dest='output_mode', help='Set output mode. (table | json | metrics, default: table)', default='table')
parser.add_argument('--no-header', action='store_true', dest='disable_header', help='disable table header')
parser.add_argument('--no-border', action='store_true', dest='disable_border', help='disable table border')
parser.add_argument('-n', nargs='?', dest='nagios_check', help='Nagios plugin mode', default='not_set',
choices=['', 'all', 'users', 'susers', 'pusers', 'hosts', 'services', 'ugroups', 'hgroups',
'ngroups', 'hbac', 'sudo', 'zones', 'certs', 'conflicts', 'ghosts', 'bind',
'msdcs', 'replicas'])
parser.add_argument('-w', '--warning', type=int, dest='warning',
default=1, help='number of failed checks before warning (default: %(default)s)')
parser.add_argument('-c', '--critical', type=int, dest='critical',
default=2, help='number of failed checks before critical (default: %(default)s)')
args = parser.parse_args()
if args.log_file == 'not_set':
args.log_file = None
elif not args.log_file:
args.log_file = self._app_name + '.log'
if args.nagios_check == 'not_set':
args.nagios_check = None
elif not args.nagios_check:
args.nagios_check = 'all'
self._args = args
def _load_config(self):
config = configparser.ConfigParser()
file_dir = os.path.expanduser(os.environ.get('XDG_CONFIG_HOME', '~/.config'))
if not os.path.exists(file_dir):
self._log.debug('Config directory %s does not exist, creating' % file_dir)
os.makedirs(file_dir)
config_file = os.path.join(
file_dir,
os.path.splitext(__name__)[0]
)
if not os.path.isfile(config_file):
self._log.debug('Config file not found at %s' % config_file)
config.add_section('IPA')
config.set('IPA', 'DOMAIN', 'ipa.example.com')
config.set('IPA', 'HOSTS', 'ipa01, ipa02, ipa03, ipa04, ipa05, ipa06')
config.set('IPA', 'BINDDN', 'cn=Directory Manager')
config.set('IPA', 'BINDPW', 'example123')
with open(config_file, 'w') as cfgfile:
config.write(cfgfile)
self._log.info('Initial config saved to %s - PLEASE EDIT IT!' % config_file)
return
self._log.debug('Loading configuration file %s' % config_file)
if 'example' in open(config_file).read():
self._log.debug('Initial config found in %s - PLEASE EDIT IT!' % config_file)
return
config.read(config_file)
if not config.has_section('IPA'):
self._log.debug('Config file has no IPA section')
return
if config.has_option('IPA', 'DOMAIN'):
self._domain = config.get('IPA', 'DOMAIN')
self._log.debug('DOMAIN = %s' % self._domain)
else:
self._log.debug('IPA.DOMAIN not set')
if config.has_option('IPA', 'HOSTS'):
self._hosts = config.get('IPA', 'HOSTS')
self._log.debug('HOSTS = %s' % self._hosts)
self._hosts = self._hosts.replace(',', ' ').split()
else:
self._log.debug('IPA.SERVERS not set')
if config.has_option('IPA', 'BINDDN'):
self._binddn = config.get('IPA', 'BINDDN')
self._log.debug('BINDDN = %s' % self._binddn)
else:
self._log.debug('IPA.BINDDN not set')
if config.has_option('IPA', 'BINDPW'):
self._bindpw = config.get('IPA', 'BINDPW')
self._log.debug('BINDPW = ********')
else:
self._log.debug('IPA.BINDPW not set')
def run(self):
self._log.debug('Starting...')
if self._args.nagios_check:
self._log.debug('Nagios plugin mode')
self._nagios_plugin(self._args.nagios_check)
elif self._args.output_mode == "json":
self._log.debug('JSON output mode')
self._print_json()
elif self._args.output_mode == "metrics":
self._log.debug('OpenMetrics output mode')
self._print_metrics()
else:
self._log.debug('Table output mode')
self._print_table()
self._log.debug('Finishing...')
def _print_metrics(self):
object = self._to_object(True)
metrics_output = ""
for key in object:
vals = object[key]
state=vals["state"]
state_val = 0 if state == "OK" else 1
metrics_state_record = "cipa_%s_state{cipa_info=\"%s\"} %d\n" % (key, state, state_val)
server_values=vals["values"]
for server_key in server_values:
metric_val = server_values[server_key]
if metric_val.isdigit():
metrics_output+="cipa_%s{cipa_server=\"%s\"} %s\n" % (key, server_key, metric_val)
elif metric_val == "True":
metrics_output+="cipa_%s{cipa_server=\"%s\"} 0\n" % (key, server_key)
elif metric_val == "False":
metrics_output+="cipa_%s{cipa_server=\"%s\"} 1\n" % (key, server_key)
else:
metrics_output+="cipa_%s{cipa_server=\"%s\", cipa_server=\"%s\"} 0\n" % (key, server_key, metric_val)
metrics_output+=metrics_state_record
self._log.info(metrics_output.rstrip())
def _print_json(self):
json_output = self._to_object()
json_output = json.dumps(json_output)
self._log.info(json_output)
def _to_object(self, formatted_names=False):
json_output = {}
for check in self._checks:
json_output = OrderedDict(json_output)
name=self._checks[check]
if formatted_names:
name=name.replace(" ", "_").lower()
state = 'OK'if self._is_consistent(check, [getattr(server, check) for server in self._servers.values()]) else 'FAIL'
json_output[name]={}
json_output[name]['state']=state
json_output[name]['values']={}
for server in self._servers.values():
hostname_short = getattr(server, 'hostname_short')
json_output[name]['values'][hostname_short] = str(getattr(server, check)).replace("\n", ",")
return json_output
def _print_table(self):
table = PrettyTable(
['FreeIPA servers:'] + [getattr(server, 'hostname_short') for server in self._servers.values()] + ['STATE'],
header=not self._args.disable_header,
border=not self._args.disable_border
)
table.align = 'l'
for check in self._checks:
state = 'OK' if self._is_consistent(check, [getattr(server, check) for server in self._servers.values()])\
else 'FAIL'
table.add_row(
[self._checks[check]] +
[getattr(server, check) for server in self._servers.values()] +
[state]
)
self._log.info(table)
def _is_consistent(self, check, check_results):
if check == 'conflicts':
conflicts = [getattr(server, 'conflicts') for server in self._servers.values()]
if conflicts.count(conflicts[0]) == len(conflicts) and conflicts[0] == 0:
return True
else:
return False
elif check == 'ghosts':
ghosts = [getattr(server, 'ghosts') for server in self._servers.values()]
if ghosts.count(ghosts[0]) == len(ghosts) and ghosts[0] == 0:
return True
else:
return False
elif check == 'replicas':
healths = [getattr(server, 'healthy_agreements') for server in self._servers.values()]
if healths.count(healths[0]) == len(healths) and healths[0]:
return True
else:
return False
if check_results.count(check_results[0]) == len(check_results) and None not in check_results:
return True
else:
return False
def _nagios_plugin(self, check):
self._log.debug('Running check: %s' % check)
if check == 'all':
checks_no = len(self._checks)
oks = 0
for check in self._checks:
if self._is_consistent(check, [getattr(server, check) for server in self._servers.values()]):
oks += 1
fails = checks_no - oks
if 0 <= fails < self._args.warning:
msg = 'OK'
code = 0
elif self._args.warning <= fails < self._args.critical:
msg = 'WARNING'
code = 1
elif fails >= self._args.critical:
msg = 'CRITICAL'
code = 2
else:
msg = 'UNKNOWN'
code = 3
self._log.info('%s - %s/%s checks passed' % (msg, oks, checks_no))
exit(code)
else:
if self._is_consistent(check, [getattr(server, check) for server in self._servers.values()]):
msg = 'OK'
code = 0
else:
msg = 'CRITICAL'
code = 2
self._log.info('%s - %s' % (msg, self._checks[check]))
exit(code)
def main():
try:
Main().run()
except KeyboardInterrupt:
print('\nTerminating...')
exit(130)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment