// exporting_engine.h
// SPDX-License-Identifier: GPL-3.0-or-later
// Include guard; closed by the matching #endif at the end of this header.
#ifndef NETDATA_EXPORTING_ENGINE_H
#define NETDATA_EXPORTING_ENGINE_H 1
#include "daemon/common.h"
#include <uv.h>
/*
 * Convenience wrappers around the expconfig_get*() family that always read
 * from the exporting engine's own configuration object, exporting_config.
 * `section`, `name` and `value` are forwarded unchanged (presumably the config
 * section, the option name and the default value -- confirm against
 * expconfig_get()).
 */
#define exporter_get(section, name, value) \
    expconfig_get(&exporting_config, section, name, value)
#define exporter_get_number(section, name, value) \
    expconfig_get_number(&exporting_config, section, name, value)
#define exporter_get_boolean(section, name, value) \
    expconfig_get_boolean(&exporting_config, section, name, value)
// Configuration object read by the exporter_get*() macros; defined in another translation unit.
extern struct config exporting_config;
// Name of the configuration option holding the exporting update interval.
#define EXPORTING_UPDATE_EVERY_OPTION_NAME "update every"
// Default for that option (presumably in seconds -- confirm against the config reader).
#define EXPORTING_UPDATE_EVERY_DEFAULT 10
/*
 * Bit flags stored in instance_config.options.
 * Every non-zero enumerator occupies its own bit, so values can be OR-ed
 * together and tested with a bitwise AND.
 */
typedef enum exporting_options {
    EXPORTING_OPTION_NON = 0, // no flag set

    // Data source bits (grouped by EXPORTING_OPTIONS_SOURCE_BITS).
    EXPORTING_SOURCE_DATA_AS_COLLECTED = (1 << 0),
    EXPORTING_SOURCE_DATA_AVERAGE = (1 << 1),
    EXPORTING_SOURCE_DATA_SUM = (1 << 2),

    // Label and transport flags.
    EXPORTING_OPTION_SEND_CONFIGURED_LABELS = (1 << 3),
    EXPORTING_OPTION_SEND_AUTOMATIC_LABELS = (1 << 4),
    EXPORTING_OPTION_USE_TLS = (1 << 5),

    // Note the gap: bits 6..15 are unused by this enum.
    EXPORTING_OPTION_SEND_NAMES = (1 << 16)
} EXPORTING_OPTIONS;
// Mask of the three EXPORTING_SOURCE_DATA_* bits.
#define EXPORTING_OPTIONS_SOURCE_BITS \
    (EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_SOURCE_DATA_SUM)

// Keep only the data-source bits of an options value.
// The argument is parenthesized so that expressions such as `a | b` are masked as a whole.
#define EXPORTING_OPTIONS_DATA_SOURCE(exporting_options) ((exporting_options) & EXPORTING_OPTIONS_SOURCE_BITS)
// Non-zero when the instance has at least one of the two "send labels" option bits set.
// `instance` is a pointer to a struct with a `config.options` member; it is
// parenthesized so any pointer expression (e.g. `&obj`) can be passed.
#define sending_labels_configured(instance) \
    ((instance)->config.options & (EXPORTING_OPTION_SEND_CONFIGURED_LABELS | EXPORTING_OPTION_SEND_AUTOMATIC_LABELS))
// True when `label` should be sent for `instance`:
//  - labels whose source is LABEL_SOURCE_NETDATA_CONF need EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
//  - labels from any other source need EXPORTING_OPTION_SEND_AUTOMATIC_LABELS.
// Both arguments are pointers and may be evaluated more than once, so avoid side effects.
#define should_send_label(instance, label)                                      \
    ((((instance)->config.options & EXPORTING_OPTION_SEND_CONFIGURED_LABELS) && \
      (label)->label_source == LABEL_SOURCE_NETDATA_CONF) ||                    \
     (((instance)->config.options & EXPORTING_OPTION_SEND_AUTOMATIC_LABELS) &&  \
      (label)->label_source != LABEL_SOURCE_NETDATA_CONF))
// Connector kinds known to the exporting engine.
// Values are implicit and sequential from 0, so EXPORTING_CONNECTOR_TYPE_NUM
// equals the number of entries that precede it.
typedef enum exporting_connector_types {
    EXPORTING_CONNECTOR_TYPE_UNKNOWN,                 // Invalid type
    EXPORTING_CONNECTOR_TYPE_GRAPHITE,                // Send plain text to Graphite
    EXPORTING_CONNECTOR_TYPE_GRAPHITE_HTTP,           // Send data to Graphite using HTTP API
    EXPORTING_CONNECTOR_TYPE_JSON,                    // Send data in JSON format
    EXPORTING_CONNECTOR_TYPE_JSON_HTTP,               // Send data in JSON format using HTTP API
    EXPORTING_CONNECTOR_TYPE_OPENTSDB,                // Send data to OpenTSDB using telnet API
    EXPORTING_CONNECTOR_TYPE_OPENTSDB_HTTP,           // Send data to OpenTSDB using HTTP API
    EXPORTING_CONNECTOR_TYPE_PROMETHEUS_REMOTE_WRITE, // User selected to use Prometheus backend
    EXPORTING_CONNECTOR_TYPE_KINESIS,                 // Send message to AWS Kinesis
    EXPORTING_CONNECTOR_TYPE_PUBSUB,                  // Send message to Google Cloud Pub/Sub
    EXPORTING_CONNECTOR_TYPE_MONGODB,                 // Send data to MongoDB collection
    EXPORTING_CONNECTOR_TYPE_NUM                      // Number of connector types
} EXPORTING_CONNECTOR_TYPE;
// Forward declaration: struct instance holds a pointer to its engine before struct engine is defined.
struct engine;
/*
 * Settings of a single exporting connector instance.
 * Field descriptions are inferred from the field names and types only;
 * confirm against the configuration reader before relying on them.
 */
struct instance_config {
    EXPORTING_CONNECTOR_TYPE type; // connector kind
    const char *type_name;

    const char *name; // printed by disable_instance() in its log message
    const char *destination;
    const char *username;
    const char *password;
    const char *prefix;
    const char *hostname;

    int update_every;
    int buffer_on_failures;
    long timeoutms; // presumably a timeout in milliseconds -- TODO confirm

    EXPORTING_OPTIONS options; // EXPORTING_OPTIONS bit flags
    SIMPLE_PATTERN *charts_pattern;
    SIMPLE_PATTERN *hosts_pattern;

    int initialized;

    void *connector_specific_config; // opaque; presumably one of the *_specific_config structs
};
// Configuration specific to simple connectors.
struct simple_connector_config {
    int default_port; // presumably the port used when the destination names none -- TODO confirm
};
// One node of a singly linked list of simple-connector buffers.
struct simple_connector_buffer {
    BUFFER *header;
    BUFFER *buffer;

    size_t buffered_metrics;
    size_t buffered_bytes;

    int used; // presumably non-zero while the node holds unsent data -- TODO confirm

    struct simple_connector_buffer *next; // following node in the list
};
// Size in bytes of simple_connector_data.connected_to.
#define CONNECTED_TO_MAX 1024

/*
 * Runtime state of a simple connector.
 * The SSL members exist only when ENABLE_HTTPS is defined, so the struct
 * layout depends on that build option.
 */
struct simple_connector_data {
    void *connector_specific_data;

    char connected_to[CONNECTED_TO_MAX];
    char *auth_string;

    size_t total_buffered_metrics;

    BUFFER *header;
    BUFFER *buffer;
    size_t buffered_metrics;
    size_t buffered_bytes;

    // Pointers into a list of struct simple_connector_buffer nodes.
    struct simple_connector_buffer *previous_buffer;
    struct simple_connector_buffer *first_buffer;
    struct simple_connector_buffer *last_buffer;

#ifdef ENABLE_HTTPS
    SSL *conn; // SSL connection
    int flags; // The flags for SSL connection
#endif
};
// Settings specific to the Prometheus remote write connector.
struct prometheus_remote_write_specific_config {
    char *remote_write_path;
};
// Settings specific to the AWS Kinesis connector.
struct aws_kinesis_specific_config {
    char *stream_name;
    char *auth_key_id;
    char *secure_key;
};
// Settings specific to the Google Cloud Pub/Sub connector.
struct pubsub_specific_config {
    char *credentials_file;
    char *project_id;
    char *topic_id;
};
// Settings specific to the MongoDB connector.
struct mongodb_specific_config {
    char *database;
    char *collection;
};
// Engine-wide settings; embedded by value in struct engine.
struct engine_config {
    const char *hostname;
    int update_every;
};
struct stats { collected_number buffered_metrics; collected_number lost_metrics; collected_number sent_metrics; collected_number buffered_bytes; collected_number lost_bytes; collected_number sent_bytes; collected_number received_bytes; collected_number transmission_successes; collected_number data_lost_events; collected_number reconnects; collected_number transmission_failures; collected_number receptions;
int initialized;
RRDSET *st_metrics; RRDDIM *rd_buffered_metrics; RRDDIM *rd_lost_metrics; RRDDIM *rd_sent_metrics;
RRDSET *st_bytes; RRDDIM *rd_buffered_bytes; RRDDIM *rd_lost_bytes; RRDDIM *rd_sent_bytes; RRDDIM *rd_received_bytes;
RRDSET *st_ops; RRDDIM *rd_transmission_successes; RRDDIM *rd_data_lost_events; RRDDIM *rd_reconnects; RRDDIM *rd_transmission_failures; RRDDIM *rd_receptions;
RRDSET *st_rusage; RRDDIM *rd_user; RRDDIM *rd_system; };
/*
 * One exporting connector instance: its configuration, statistics, libuv
 * thread/synchronization objects and the callbacks that format and send data.
 * Instances are chained through `next` and point back to their engine.
 */
struct instance {
    struct instance_config config;
    void *buffer;
    void (*worker)(void *instance_p); // same signature as simple_connector_worker()
    struct stats stats;

    int scheduled;
    int disabled;
    int skip_host;
    int skip_chart;

    BUFFER *labels;

    time_t after;
    time_t before;

    uv_thread_t thread;
    uv_mutex_t mutex; // released by disable_instance()
    uv_cond_t cond_var;
    int data_is_ready;

    // Formatting callbacks; each receives the instance it belongs to.
    int (*start_batch_formatting)(struct instance *instance);
    int (*start_host_formatting)(struct instance *instance, RRDHOST *host);
    int (*start_chart_formatting)(struct instance *instance, RRDSET *st);
    int (*metric_formatting)(struct instance *instance, RRDDIM *rd);
    int (*end_chart_formatting)(struct instance *instance, RRDSET *st);
    int (*end_host_formatting)(struct instance *instance, RRDHOST *host);
    int (*end_batch_formatting)(struct instance *instance);

    void (*prepare_header)(struct instance *instance);
    int (*check_response)(BUFFER *buffer, struct instance *instance);

    void *connector_specific_data;

    size_t index;
    struct instance *next; // next instance in the list
    struct engine *engine; // owning engine

    volatile sig_atomic_t exited;
};
// Top-level state of the exporting engine: its settings and the head of the instance list.
struct engine {
    struct engine_config config;

    size_t instance_num; // presumably the number of instances in the list -- TODO confirm
    time_t now;

    // Initialization flags for optional third-party libraries.
    int aws_sdk_initialized;
    int protocol_buffers_initialized;
    int mongoc_initialized;

    struct instance *instance_root; // first instance; the rest are reached via instance->next

    volatile sig_atomic_t exit;
};
// Instance pointer shared across translation units (defined elsewhere).
extern struct instance *prometheus_exporter_instance;

// Entry point of the exporting engine; the void *(void *) shape suggests a
// thread start routine -- confirm at the call site.
void *exporting_main(void *ptr);
// Configuration reading.
// read_exporting_config() takes no arguments; `(void)` makes this a real
// prototype so calls with arguments are diagnosed by the compiler.
struct engine *read_exporting_config(void);
// Maps a connector type name to its EXPORTING_CONNECTOR_TYPE value.
EXPORTING_CONNECTOR_TYPE exporting_select_type(const char *type);
// Connector initialization.
int init_connectors(struct engine *engine);
void simple_connector_init(struct instance *instance);

// Per-iteration scheduling and buffer preparation.
int mark_scheduled_instances(struct engine *engine);
void prepare_buffers(struct engine *engine);

// Copies a name from src into dst, bounded by max_len; see the implementation
// for the exact character handling and the meaning of the returned size.
size_t exporting_name_copy(char *dst, const char *src, size_t max_len);

// Filters deciding whether a host / chart is exported by the given instance.
int rrdhost_is_exportable(struct instance *instance, RRDHOST *host);
int rrdset_is_exportable(struct instance *instance, RRDSET *st);

// Computes a dimension's value from stored data; *last_timestamp is an
// in/out or output parameter -- confirm in the implementation.
calculated_number exporting_calculate_value_from_stored_data(
    struct instance *instance,
    RRDDIM *rd,
    time_t *last_timestamp);
// Engine-level formatting entry points. Their names mirror the per-instance
// callbacks declared in struct instance; presumably each one invokes the
// matching callback on the engine's instances -- confirm in the implementation.
void start_batch_formatting(struct engine *engine);
void start_host_formatting(struct engine *engine, RRDHOST *host);
void start_chart_formatting(struct engine *engine, RRDSET *st);
void metric_formatting(struct engine *engine, RRDDIM *rd);
void end_chart_formatting(struct engine *engine, RRDSET *st);
void end_host_formatting(struct engine *engine, RRDHOST *host);
void end_batch_formatting(struct engine *engine);

// Per-instance helpers with the same signatures as the start_host_formatting
// and end_batch_formatting callbacks of struct instance.
int flush_host_labels(struct instance *instance, RRDHOST *host);
int simple_connector_end_batch(struct instance *instance);
// Response handling; exporting_discard_response() has the signature of the
// check_response callback in struct instance.
int exporting_discard_response(BUFFER *buffer, struct instance *instance);
void simple_connector_receive_response(int *sock, struct instance *instance);

// Sends a header/buffer pair over *sock; `sock` and `failures` are passed by
// pointer, so the function can presumably update them -- confirm.
void simple_connector_send_buffer(
    int *sock,
    int *failures,
    struct instance *instance,
    BUFFER *header,
    BUFFER *buffer,
    size_t buffered_metrics);

// Worker routine; has the signature of the `worker` member of struct instance.
void simple_connector_worker(void *instance_p);
// Internal metrics of the exporting engine itself.
// create_main_rusage_chart() receives pointers to the handles, so it can set them for the caller.
void create_main_rusage_chart(RRDSET **st_rusage, RRDDIM **rd_user, RRDDIM **rd_system);
void send_main_rusage(RRDSET *st_rusage, RRDDIM *rd_user, RRDDIM *rd_system);
void send_internal_metrics(struct instance *instance);

// Cleanup.
extern void clean_instance(struct instance *ptr);
void simple_connector_cleanup(struct instance *instance);
static inline void disable_instance(struct instance *instance) { instance->disabled = 1; instance->scheduled = 0; uv_mutex_unlock(&instance->mutex); error("EXPORTING: Instance %s disabled", instance->config.name); }
// Connector headers are included at the end of this header, presumably because
// they use the types declared above -- confirm before reordering.
#include "exporting/prometheus/prometheus.h"
#include "exporting/opentsdb/opentsdb.h"

#if ENABLE_PROMETHEUS_REMOTE_WRITE
#include "exporting/prometheus/remote_write/remote_write.h"
#endif

#if HAVE_KINESIS
#include "exporting/aws_kinesis/aws_kinesis.h"
#endif

#endif /* NETDATA_EXPORTING_ENGINE_H */