Viewing file: gearman.chart.py (6.5 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
# Description: dovecot netdata python.d module # Author: Kyle Agronick (agronick) # SPDX-License-Identifier: GPL-3.0+
# Gearman Netdata Plugin
from copy import deepcopy
from bases.FrameworkServices.SocketService import SocketService
CHARTS = { 'total_workers': { 'options': [None, 'Total Jobs', 'Jobs', 'Total Jobs', 'gearman.total_jobs', 'line'], 'lines': [ ['total_pending', 'Pending', 'absolute'], ['total_running', 'Running', 'absolute'], ] }, }
def job_chart_template(job_name): return { 'options': [None, job_name, 'Jobs', 'Activity by Job', 'gearman.single_job', 'stacked'], 'lines': [ ['{0}_pending'.format(job_name), 'Pending', 'absolute'], ['{0}_idle'.format(job_name), 'Idle', 'absolute'], ['{0}_running'.format(job_name), 'Running', 'absolute'], ] }
def build_result_dict(job): """ Get the status for each job :return: dict """
total, running, available = job['metrics']
idle = available - running pending = total - running
return { '{0}_pending'.format(job['job_name']): pending, '{0}_idle'.format(job['job_name']): idle, '{0}_running'.format(job['job_name']): running, }
def parse_worker_data(job): job_name = job[0] job_metrics = job[1:]
return { 'job_name': job_name, 'metrics': job_metrics, }
class GearmanReadException(BaseException): pass
class Service(SocketService): def __init__(self, configuration=None, name=None): super(Service, self).__init__(configuration=configuration, name=name) self.request = "status\n" self._keep_alive = True
self.host = self.configuration.get('host', 'localhost') self.port = self.configuration.get('port', 4730)
self.tls = self.configuration.get('tls', False) self.cert = self.configuration.get('cert', None) self.key = self.configuration.get('key', None)
self.active_jobs = set() self.definitions = deepcopy(CHARTS) self.order = ['total_workers']
def _get_data(self): """ Format data received from socket :return: dict """
try: active_jobs = self.get_active_jobs() except GearmanReadException: return None
found_jobs, job_data = self.process_jobs(active_jobs) self.remove_stale_jobs(found_jobs) return job_data
def get_active_jobs(self): active_jobs = []
for job in self.get_worker_data(): parsed_job = parse_worker_data(job)
# Gearman does not clean up old jobs # We only care about jobs that have # some relevant data if not any(parsed_job['metrics']): continue
active_jobs.append(parsed_job)
return active_jobs
def get_worker_data(self): """ Split the data returned from Gearman into a list of lists
This returns the same output that you would get from a gearadmin --status command.
Example output returned from _get_raw_data(): prefix generic_worker4 78 78 500 generic_worker2 78 78 500 generic_worker3 0 0 760 generic_worker1 0 0 500
:return: list """
try: raw = self._get_raw_data() except (ValueError, AttributeError): raise GearmanReadException()
if raw is None: self.debug("Gearman returned no data") raise GearmanReadException()
workers = list()
for line in raw.splitlines()[:-1]: parts = line.split() if not parts: continue
name = '_'.join(parts[:-3]) try: values = [int(w) for w in parts[-3:]] except ValueError: continue
w = [name] w.extend(values) workers.append(w)
return workers
def process_jobs(self, active_jobs):
output = { 'total_pending': 0, 'total_idle': 0, 'total_running': 0, } found_jobs = set()
for parsed_job in active_jobs:
job_name = self.add_job(parsed_job) found_jobs.add(job_name) job_data = build_result_dict(parsed_job)
for sum_value in ('pending', 'running', 'idle'): output['total_{0}'.format(sum_value)] += job_data['{0}_{1}'.format(job_name, sum_value)]
output.update(job_data)
return found_jobs, output
def remove_stale_jobs(self, active_job_list): """ Removes jobs that have no workers, pending jobs, or running jobs :param active_job_list: The latest list of active jobs :type active_job_list: iterable :return: None """
for to_remove in self.active_jobs - active_job_list: self.remove_job(to_remove)
def add_job(self, parsed_job): """ Adds a job to the list of active jobs :param parsed_job: A parsed job dict :type parsed_job: dict :return: None """
def add_chart(job_name): """ Adds a new job chart :param job_name: The name of the job to add :type job_name: string :return: None """
job_key = 'job_{0}'.format(job_name) template = job_chart_template(job_name) new_chart = self.charts.add_chart([job_key] + template['options']) for dimension in template['lines']: new_chart.add_dimension(dimension)
if parsed_job['job_name'] not in self.active_jobs: add_chart(parsed_job['job_name']) self.active_jobs.add(parsed_job['job_name'])
return parsed_job['job_name']
def remove_job(self, job_name): """ Removes a job to the list of active jobs :param job_name: The name of the job to remove :type job_name: string :return: None """
def remove_chart(job_name): """ Removes a job chart :param job_name: The name of the job to remove :type job_name: string :return: None """
job_key = 'job_{0}'.format(job_name) self.charts[job_key].obsolete() del self.charts[job_key]
remove_chart(job_name) self.active_jobs.remove(job_name)
|