!C99Shell v. 2.5 [PHP 8 Update] [24.05.2025]!

Software: Apache/2.4.41 (Ubuntu). PHP/8.0.30 

uname -a: Linux apirnd 5.4.0-204-generic #224-Ubuntu SMP Thu Dec 5 13:38:28 UTC 2024 x86_64 

uid=33(www-data) gid=33(www-data) groups=33(www-data) 

Safe-mode: OFF (not secure)

/netdata/exporting/pubsub/   drwxr-xr-x
Free 13.27 GB of 57.97 GB (22.89%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Self remove    Logout    


Viewing file:     pubsub.c (6.52 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
// SPDX-License-Identifier: GPL-3.0-or-later

#include "pubsub.h"

/**
 * Initialize Pub/Sub connector instance
 *
 * @param instance an instance data structure.
 * @return Returns 0 on success, 1 on failure.
 */
int init_pubsub_instance(struct instance *instance)
{
    instance->worker = pubsub_connector_worker;

    instance->start_batch_formatting = NULL;
    instance->start_host_formatting = format_host_labels_json_plaintext;
    instance->start_chart_formatting = NULL;


    if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
        instance->metric_formatting = format_dimension_collected_json_plaintext;
    else
        instance->metric_formatting = format_dimension_stored_json_plaintext;

    instance->end_chart_formatting = NULL;
    instance->end_host_formatting = flush_host_labels;
    instance->end_batch_formatting = NULL;

    instance->prepare_header = NULL;
    instance->check_response = NULL;

    instance->buffer = (void *)buffer_create(0);
    if (!instance->buffer) {
        error("EXPORTING: cannot create buffer for Pub/Sub exporting connector instance %s", instance->config.name);
        return 1;
    }
    uv_mutex_init(&instance->mutex);
    uv_cond_init(&instance->cond_var);

    struct pubsub_specific_data *connector_specific_data = callocz(1, sizeof(struct pubsub_specific_data));
    instance->connector_specific_data = (void *)connector_specific_data;

    struct pubsub_specific_config *connector_specific_config =
        (struct pubsub_specific_config *)instance->config.connector_specific_config;
    char error_message[ERROR_LINE_MAX + 1] = "";
    if (pubsub_init(
            (void *)connector_specific_data, error_message, instance->config.destination,
            connector_specific_config->credentials_file, connector_specific_config->project_id,
            connector_specific_config->topic_id)) {
        error(
            "EXPORTING: Cannot initialize a Pub/Sub publisher for instance %s: %s",
            instance->config.name, error_message);
        return 1;
    }

    return 0;
}

/**
 * Clean a PubSub connector instance
 *
 * @param instance an instance data structure.
 */
void clean_pubsub_instance(struct instance *instance)
{
    info("EXPORTING: cleaning up instance %s ...", instance->config.name);

    struct pubsub_specific_data *connector_specific_data =
        (struct pubsub_specific_data *)instance->connector_specific_data;
    pubsub_cleanup(connector_specific_data);
    freez(connector_specific_data);

    buffer_free(instance->buffer);

    struct pubsub_specific_config *connector_specific_config =
        (struct pubsub_specific_config *)instance->config.connector_specific_config;
    freez(connector_specific_config->credentials_file);
    freez(connector_specific_config->project_id);
    freez(connector_specific_config->topic_id);
    freez(connector_specific_config);

    info("EXPORTING: instance %s exited", instance->config.name);
    instance->exited = 1;

    return;
}

/**
 * Pub/Sub connector worker
 *
 * Runs in a separate thread for every instance.
 *
 * @param instance_p an instance data structure.
 */
void pubsub_connector_worker(void *instance_p)
{
    struct instance *instance = (struct instance *)instance_p;
    struct pubsub_specific_config *connector_specific_config = instance->config.connector_specific_config;
    struct pubsub_specific_data *connector_specific_data = instance->connector_specific_data;

    while (!instance->engine->exit) {
        struct stats *stats = &instance->stats;
        char error_message[ERROR_LINE_MAX + 1] = "";

        uv_mutex_lock(&instance->mutex);
        while (!instance->data_is_ready)
            uv_cond_wait(&instance->cond_var, &instance->mutex);
        instance->data_is_ready = 0;


        if (unlikely(instance->engine->exit)) {
            uv_mutex_unlock(&instance->mutex);
            break;
        }

        // reset the monitoring chart counters
        stats->received_bytes =
        stats->sent_bytes =
        stats->sent_metrics =
        stats->lost_metrics =
        stats->receptions =
        stats->transmission_successes =
        stats->transmission_failures =
        stats->data_lost_events =
        stats->lost_bytes =
        stats->reconnects = 0;

        BUFFER *buffer = (BUFFER *)instance->buffer;
        size_t buffer_len = buffer_strlen(buffer);

        stats->buffered_bytes = buffer_len;

        if (pubsub_add_message(instance->connector_specific_data, (char *)buffer_tostring(buffer))) {
            error("EXPORTING: Instance %s: Cannot add data to a message", instance->config.name);

            stats->data_lost_events++;
            stats->lost_metrics += stats->buffered_metrics;
            stats->lost_bytes += buffer_len;

            goto cleanup;
        }

        debug(
            D_BACKEND, "EXPORTING: pubsub_publish(): project = %s, topic = %s, buffer = %zu",
            connector_specific_config->project_id, connector_specific_config->topic_id, buffer_len);

        if (pubsub_publish((void *)connector_specific_data, error_message, stats->buffered_metrics, buffer_len)) {
            error("EXPORTING: Instance: %s: Cannot publish a message: %s", instance->config.name, error_message);

            stats->transmission_failures++;
            stats->data_lost_events++;
            stats->lost_metrics += stats->buffered_metrics;
            stats->lost_bytes += buffer_len;

            goto cleanup;
        }

        stats->sent_bytes = buffer_len;
        stats->transmission_successes++;

        size_t sent_metrics = 0, lost_metrics = 0, sent_bytes = 0, lost_bytes = 0;

        if (unlikely(pubsub_get_result(
                connector_specific_data, error_message, &sent_metrics, &sent_bytes, &lost_metrics, &lost_bytes))) {
            // oops! we couldn't send (all or some of the) data
            error("EXPORTING: %s", error_message);
            error(
                "EXPORTING: failed to write data to service '%s'. Willing to write %zu bytes, wrote %zu bytes.",
                instance->config.destination, lost_bytes, sent_bytes);

            stats->transmission_failures++;
            stats->data_lost_events++;
            stats->lost_metrics += lost_metrics;
            stats->lost_bytes += lost_bytes;
        } else {
            stats->receptions++;
            stats->sent_metrics = sent_metrics;
        }

    cleanup:
        send_internal_metrics(instance);

        buffer_flush(buffer);
        stats->buffered_metrics = 0;

        uv_mutex_unlock(&instance->mutex);

#ifdef UNIT_TESTING
        return;
#endif
    }

    clean_pubsub_instance(instance);
}

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.5 [PHP 8 Update] [24.05.2025] | Generation time: 0.01 ]--