!C99Shell v. 2.5 [PHP 8 Update] [24.05.2025]!

Software: Apache/2.4.41 (Ubuntu). PHP/8.0.30 

uname -a: Linux apirnd 5.4.0-204-generic #224-Ubuntu SMP Thu Dec 5 13:38:28 UTC 2024 x86_64 

uid=33(www-data) gid=33(www-data) groups=33(www-data) 

Safe-mode: OFF (not secure)

/netdata/aclk/legacy/   drwxr-xr-x
Free 13.19 GB of 57.97 GB (22.76%)
Home    Back    Forward    UPDIR    Refresh    Search    Buffer    Encoder    Tools    Proc.    FTP brute    Sec.    SQL    PHP-code    Update    Self remove    Logout    


Viewing file:     agent_cloud_link.c (45.94 KB)      -rw-r--r--
Select action/file-type:
(+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
// SPDX-License-Identifier: GPL-3.0-or-later

#include "libnetdata/libnetdata.h"
#include "agent_cloud_link.h"
#include "aclk_lws_https_client.h"
#include "aclk_query.h"
#include "aclk_common.h"
#include "aclk_stats.h"
#include "../aclk_collector_list.h"

#ifdef ENABLE_ACLK
#include <libwebsockets.h>
#endif

int aclk_shutting_down = 0;

// Other global state
static int aclk_subscribed = 0;
static char *aclk_username = NULL;
static char *aclk_password = NULL;

static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_force_reconnect = 0;       // Indication from lower layers

static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;

#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)

void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
void aclk_lws_wss_destroy_context();

char *create_uuid()
{
    uuid_t uuid;
    char *uuid_str = mallocz(36 + 1);

    uuid_generate(uuid);
    uuid_unparse(uuid, uuid_str);

    return uuid_str;
}

int legacy_cloud_to_agent_parse(JSON_ENTRY *e)
{
    struct aclk_request *data = e->callback_data;

    switch (e->type) {
        case JSON_OBJECT:
        case JSON_ARRAY:
            break;
        case JSON_STRING:
            if (!strcmp(e->name, "msg-id")) {
                data->msg_id = strdupz(e->data.string);
                break;
            }
            if (!strcmp(e->name, "type")) {
                data->type_id = strdupz(e->data.string);
                break;
            }
            if (!strcmp(e->name, "callback-topic")) {
                data->callback_topic = strdupz(e->data.string);
                break;
            }
            if (!strcmp(e->name, "payload")) {
                if (likely(e->data.string)) {
                    size_t len = strlen(e->data.string);
                    data->payload = mallocz(len+1);
                    if (!url_decode_r(data->payload, e->data.string, len + 1))
                        strcpy(data->payload, e->data.string);
                }
                break;
            }
            break;
        case JSON_NUMBER:
            if (!strcmp(e->name, "version")) {
                data->version = e->data.number;
                break;
            }
            if (!strcmp(e->name, "min-version")) {
                data->min_version = e->data.number;
                break;
            }
            if (!strcmp(e->name, "max-version")) {
                data->max_version = e->data.number;
                break;
            }

            break;

        case JSON_BOOLEAN:
            break;

        case JSON_NULL:
            break;
    }
    return 0;
}


static RSA *aclk_private_key = NULL;
static int create_private_key()
{
    if (aclk_private_key != NULL)
        RSA_free(aclk_private_key);
    aclk_private_key = NULL;
    char filename[FILENAME_MAX + 1];
    snprintfz(filename, FILENAME_MAX, "%s/cloud.d/private.pem", netdata_configured_varlib_dir);

    long bytes_read;
    char *private_key = read_by_filename(filename, &bytes_read);
    if (!private_key) {
        error("Claimed agent cannot establish ACLK - unable to load private key '%s' failed.", filename);
        return 1;
    }
    debug(D_ACLK, "Claimed agent loaded private key len=%ld bytes", bytes_read);

    BIO *key_bio = BIO_new_mem_buf(private_key, -1);
    if (key_bio==NULL) {
        error("Claimed agent cannot establish ACLK - failed to create BIO for key");
        goto biofailed;
    }

    aclk_private_key = PEM_read_bio_RSAPrivateKey(key_bio, NULL, NULL, NULL);
    BIO_free(key_bio);
    if (aclk_private_key!=NULL)
    {
        freez(private_key);
        return 0;
    }
    char err[512];
    ERR_error_string_n(ERR_get_error(), err, sizeof(err));
    error("Claimed agent cannot establish ACLK - cannot create private key: %s", err);

biofailed:
    freez(private_key);
    return 1;
}

/*
 * After a connection failure -- delay in milliseconds
 * When a connection is established, the delay function
 * should be called with
 *
 * mode 0 to reset the delay
 * mode 1 to calculate sleep time [0 .. ACLK_MAX_BACKOFF_DELAY * 1000] ms
 *
 */
unsigned long int aclk_reconnect_delay(int mode)
{
    static int fail = -1;
    unsigned long int delay;

    if (!mode || fail == -1) {
        srandom(time(NULL));
        fail = mode - 1;
        return 0;
    }

    delay = (1 << fail);

    if (delay >= ACLK_MAX_BACKOFF_DELAY) {
        delay = ACLK_MAX_BACKOFF_DELAY * 1000;
    } else {
        fail++;
        delay *= 1000;
        delay += (random() % (MAX(1000, delay/2)));
    }

    return delay;
}

// This will give the base topic that the agent will publish messages.
// subtopics will be sent under the base topic e.g.  base_topic/subtopic
// This is called during the connection, we delete any previous topic
// in-case the user has changed the agent id and reclaimed.

char *create_publish_base_topic()
{
    char *agent_id = is_agent_claimed();
    if (unlikely(!agent_id))
        return NULL;

    ACLK_LOCK;

    if (global_base_topic)
        freez(global_base_topic);
    char tmp_topic[ACLK_MAX_TOPIC + 1], *tmp;

    snprintf(tmp_topic, ACLK_MAX_TOPIC, ACLK_TOPIC_STRUCTURE, agent_id);
    tmp = strchr(tmp_topic, '\n');
    if (unlikely(tmp))
        *tmp = '\0';
    global_base_topic = strdupz(tmp_topic);

    ACLK_UNLOCK;
    freez(agent_id);
    return global_base_topic;
}

/*
 * Build a topic based on sub_topic and final_topic
 * if the sub topic starts with / assume that is an absolute topic
 *
 */

char *get_topic(char *sub_topic, char *final_topic, int max_size)
{
    int rc;

    if (likely(sub_topic && sub_topic[0] == '/'))
        return sub_topic;

    if (unlikely(!global_base_topic))
        return sub_topic;

    rc = snprintf(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
    if (unlikely(rc >= max_size))
        debug(D_ACLK, "Topic has been truncated to [%s] instead of [%s/%s]", final_topic, global_base_topic, sub_topic);

    return final_topic;
}

/* Avoids the need to scan trough all RRDHOSTS
 * every time any Query Thread Wakes Up
 * (every time we need to check child popcorn expiry)
 * call with legacy_aclk_shared_state_LOCK held
 */
void aclk_update_next_child_to_popcorn(void)
{
    RRDHOST *host;
    int any = 0;

    rrd_rdlock();
    rrdhost_foreach_read(host) {
        if (unlikely(host == localhost || rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED)))
            continue;

        rrdhost_aclk_state_lock(host);
        if (!ACLK_IS_HOST_POPCORNING(host)) {
            rrdhost_aclk_state_unlock(host);
            continue;
        }

        any = 1;

        if (unlikely(!legacy_aclk_shared_state.next_popcorn_host)) {
            legacy_aclk_shared_state.next_popcorn_host = host;
            rrdhost_aclk_state_unlock(host);
            continue;
        }

        if (legacy_aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update)
            legacy_aclk_shared_state.next_popcorn_host = host;

        rrdhost_aclk_state_unlock(host);
    }
    if(!any)
        legacy_aclk_shared_state.next_popcorn_host = NULL;

    rrd_unlock();
}

/* If popcorning bump timer.
 * If popcorning or initializing (host not stable) return 1
 * Otherwise return 0
 */
static int aclk_popcorn_check_bump(RRDHOST *host)
{
    time_t now = now_monotonic_sec();
    int updated = 0, ret;
    legacy_aclk_shared_state_LOCK;
    rrdhost_aclk_state_lock(host);

    ret = ACLK_IS_HOST_INITIALIZING(host);
    if (unlikely(ACLK_IS_HOST_POPCORNING(host))) {
        if(now != host->aclk_state.t_last_popcorn_update) {
            updated = 1;
            info("Restarting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
        }
        host->aclk_state.t_last_popcorn_update = now;
        rrdhost_aclk_state_unlock(host);

        if (host != localhost && updated)
            aclk_update_next_child_to_popcorn();

        legacy_aclk_shared_state_UNLOCK;
        return ret;
    }

    rrdhost_aclk_state_unlock(host);
    legacy_aclk_shared_state_UNLOCK;
    return ret;
}

inline static int aclk_host_initializing(RRDHOST *host)
{
    rrdhost_aclk_state_lock(host);
    int ret = ACLK_IS_HOST_INITIALIZING(host);
    rrdhost_aclk_state_unlock(host);
    return ret;
}

static void aclk_start_host_popcorning(RRDHOST *host)
{
    usec_t now = now_monotonic_sec();
    info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
    legacy_aclk_shared_state_LOCK;
    rrdhost_aclk_state_lock(host);
    if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) {
        errno = 0;
        error("Localhost is allowed to do popcorning only once after startup!");
        rrdhost_aclk_state_unlock(host);
        legacy_aclk_shared_state_UNLOCK;
        return;
    }

    host->aclk_state.state = ACLK_HOST_INITIALIZING;
    host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
    host->aclk_state.t_last_popcorn_update = now;
    rrdhost_aclk_state_unlock(host);
    if (host != localhost)
        aclk_update_next_child_to_popcorn();
    legacy_aclk_shared_state_UNLOCK;
}

static void aclk_stop_host_popcorning(RRDHOST *host)
{
    legacy_aclk_shared_state_LOCK;
    rrdhost_aclk_state_lock(host);
    if (!ACLK_IS_HOST_POPCORNING(host)) {
        rrdhost_aclk_state_unlock(host);
        legacy_aclk_shared_state_UNLOCK;
        return;
    }
    
    info("Host Disconnected before ACLK popcorning finished. Canceling. Host \"%s\" GUID:\"%s\"", host->hostname, host->machine_guid);
    host->aclk_state.t_last_popcorn_update = 0;
    host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
    rrdhost_aclk_state_unlock(host);

    if(host == legacy_aclk_shared_state.next_popcorn_host) {
        legacy_aclk_shared_state.next_popcorn_host = NULL;
        aclk_update_next_child_to_popcorn();
    }
    legacy_aclk_shared_state_UNLOCK;
}

/*
 * Add a new collector to the list
 * If it exists, update the chart count
 */
void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
    struct _collector *tmp_collector;
    if (unlikely(!netdata_ready)) {
        return;
    }

    COLLECTOR_LOCK;

    tmp_collector = _add_collector(host->machine_guid, plugin_name, module_name);

    if (unlikely(tmp_collector->count != 1)) {
        COLLECTOR_UNLOCK;
        return;
    }

    COLLECTOR_UNLOCK;

    if(aclk_popcorn_check_bump(host))
        return;

    if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
        debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
}

/*
 * Delete a collector from the list
 * If the chart count reaches zero the collector will be removed
 * from the list by calling del_collector.
 *
 * This function will release the memory used and schedule
 * a cloud update
 */
void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
    struct _collector *tmp_collector;
    if (unlikely(!netdata_ready)) {
        return;
    }

    COLLECTOR_LOCK;

    tmp_collector = _del_collector(host->machine_guid, plugin_name, module_name);

    if (unlikely(!tmp_collector || tmp_collector->count)) {
        COLLECTOR_UNLOCK;
        return;
    }

    debug(
        D_ACLK, "DEL COLLECTOR [%s:%s] -- charts %u", plugin_name ? plugin_name : "*", module_name ? module_name : "*",
        tmp_collector->count);

    COLLECTOR_UNLOCK;

    _free_collector(tmp_collector);

    if (aclk_popcorn_check_bump(host))
        return;

    if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
        debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
}

static void aclk_graceful_disconnect()
{
    size_t write_q, write_q_bytes, read_q;
    time_t event_loop_timeout;

    // Send a graceful disconnect message
    BUFFER *b = buffer_create(512);
    aclk_create_header(b, "disconnect", NULL, 0, 0, legacy_aclk_shared_state.version_neg);
    buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}");
    aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
    buffer_free(b);

    event_loop_timeout = now_realtime_sec() + 5;
    write_q = 1;
    while (write_q && event_loop_timeout > now_realtime_sec()) {
        _link_event_loop();
        lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
    }

    aclk_shutting_down = 1;
    _link_shutdown();
    aclk_lws_wss_mqtt_layer_disconnect_notif();

    write_q = 1;
    event_loop_timeout = now_realtime_sec() + 5;
    while (write_q && event_loop_timeout > now_realtime_sec()) {
        _link_event_loop();
        lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
    }
    aclk_shutting_down = 0;
}

#ifndef __GNUC__
#pragma region Incoming Msg Parsing
#endif

struct dictionary_singleton {
    char *key;
    char *result;
};

int json_extract_singleton(JSON_ENTRY *e)
{
    struct dictionary_singleton *data = e->callback_data;

    switch (e->type) {
        case JSON_OBJECT:
        case JSON_ARRAY:
            break;
        case JSON_STRING:
            if (!strcmp(e->name, data->key)) {
                data->result = strdupz(e->data.string);
                break;
            }
            break;
        case JSON_NUMBER:
        case JSON_BOOLEAN:
        case JSON_NULL:
            break;
    }
    return 0;
}

#ifndef __GNUC__
#pragma endregion
#endif


#ifndef __GNUC__
#pragma region Challenge Response
#endif

// Base-64 decoder.
// Note: This is non-validating, invalid input will be decoded without an error.
//       Challenges are packed into json strings so we don't skip newlines.
//       Size errors (i.e. invalid input size or insufficient output space) are caught.
size_t base64_decode(unsigned char *input, size_t input_size, unsigned char *output, size_t output_size)
{
    static char lookup[256];
    static int first_time=1;
    if (first_time)
    {
        first_time = 0;
        for(int i=0; i<256; i++)
            lookup[i] = -1;
        for(int i='A'; i<='Z'; i++)
            lookup[i] = i-'A';
        for(int i='a'; i<='z'; i++)
            lookup[i] = i-'a' + 26;
        for(int i='0'; i<='9'; i++)
            lookup[i] = i-'0' + 52;
        lookup['+'] = 62;
        lookup['/'] = 63;
    }
    if ((input_size & 3) != 0)
    {
        error("Can't decode base-64 input length %zu", input_size);
        return 0;
    }
    size_t unpadded_size = (input_size/4) * 3;
    if ( unpadded_size > output_size )
    {
        error("Output buffer size %zu is too small to decode %zu into", output_size, input_size);
        return 0;
    }
    // Don't check padding within full quantums
    for (size_t i = 0 ; i < input_size-4 ; i+=4 )
    {
        uint32_t value = (lookup[input[0]] << 18) + (lookup[input[1]] << 12) + (lookup[input[2]] << 6) + lookup[input[3]];
        output[0] = value >> 16;
        output[1] = value >> 8;
        output[2] = value;
        //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
        output += 3;
        input += 4;
    }
    // Handle padding only in last quantum
    if (input[2] == '=') {
        uint32_t value = (lookup[input[0]] << 6) + lookup[input[1]];
        output[0] = value >> 4;
        //error("Decoded %c %c %c %c -> %02x", input[0], input[1], input[2], input[3], output[0]);
        return unpadded_size-2;
    }
    else if (input[3] == '=') {
        uint32_t value = (lookup[input[0]] << 12) + (lookup[input[1]] << 6) + lookup[input[2]];
        output[0] = value >> 10;
        output[1] = value >> 2;
        //error("Decoded %c %c %c %c -> %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1]);
        return unpadded_size-1;
    }
    else
    {
        uint32_t value = (input[0] << 18) + (input[1] << 12) + (input[2]<<6) + input[3];
        output[0] = value >> 16;
        output[1] = value >> 8;
        output[2] = value;
        //error("Decoded %c %c %c %c -> %02x %02x %02x", input[0], input[1], input[2], input[3], output[0], output[1], output[2]);
        return unpadded_size;
    }
}

size_t base64_encode(unsigned char *input, size_t input_size, char *output, size_t output_size)
{
    uint32_t value;
    static char lookup[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                           "abcdefghijklmnopqrstuvwxyz"
                           "0123456789+/";
    if ((input_size/3+1)*4 >= output_size)
    {
        error("Output buffer for encoding size=%zu is not large enough for %zu-bytes input", output_size, input_size);
        return 0;
    }
    size_t count = 0;
    while (input_size>3)
    {
        value = ((input[0] << 16) + (input[1] << 8) + input[2]) & 0xffffff;
        output[0] = lookup[value >> 18];
        output[1] = lookup[(value >> 12) & 0x3f];
        output[2] = lookup[(value >> 6) & 0x3f];
        output[3] = lookup[value & 0x3f];
        //error("Base-64 encode (%04x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]);
        output += 4;
        input += 3;
        input_size -= 3;
        count += 4;
    }
    switch (input_size)
    {
        case 2:
            value = (input[0] << 10) + (input[1] << 2);
            output[0] = lookup[(value >> 12) & 0x3f];
            output[1] = lookup[(value >> 6) & 0x3f];
            output[2] = lookup[value & 0x3f];
            output[3] = '=';
            //error("Base-64 encode (%06x) -> %c %c %c %c\n", (value>>2)&0xffff, output[0], output[1], output[2], output[3]); 
            count += 4;
            break;
        case 1:
            value = input[0] << 4;
            output[0] = lookup[(value >> 6) & 0x3f];
            output[1] = lookup[value & 0x3f];
            output[2] = '=';
            output[3] = '=';
            //error("Base-64 encode (%06x) -> %c %c %c %c\n", value, output[0], output[1], output[2], output[3]); 
            count += 4;
            break;
        case 0:
            break;
    }
    return count;
}



int private_decrypt(unsigned char * enc_data, int data_len, unsigned char *decrypted)
{
    int  result = RSA_private_decrypt( data_len, enc_data, decrypted, aclk_private_key, RSA_PKCS1_OAEP_PADDING);
    if (result == -1) {
        char err[512];
        ERR_error_string_n(ERR_get_error(), err, sizeof(err));
        error("Decryption of the challenge failed: %s", err);
    }
    return result;
}

void aclk_get_challenge(char *aclk_hostname, int port)
{
    char *data_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    debug(D_ACLK, "Performing challenge-response sequence");
    if (aclk_password != NULL)
    {
        freez(aclk_password);
        aclk_password = NULL;
    }
    // curl http://cloud-iam-agent-service:8080/api/v1/auth/node/00000000-0000-0000-0000-000000000000/challenge
    // TODO - target host?
    char *agent_id = is_agent_claimed();
    if (agent_id == NULL)
    {
        error("Agent was not claimed - cannot perform challenge/response");
        goto CLEANUP;
    }
    char url[1024];
    sprintf(url, "/api/v1/auth/node/%s/challenge", agent_id);
    info("Retrieving challenge from cloud: %s %d %s", aclk_hostname, port, url);
    if(aclk_send_https_request("GET", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, NULL))
    {
        error("Challenge failed: %s", data_buffer);
        goto CLEANUP;
    }
    struct dictionary_singleton challenge = { .key = "challenge", .result = NULL };

    debug(D_ACLK, "Challenge response from cloud: %s", data_buffer);
    if ( json_parse(data_buffer, &challenge, json_extract_singleton) != JSON_OK)
    {
        freez(challenge.result);
        error("Could not parse the json response with the challenge: %s", data_buffer);
        goto CLEANUP;
    }
    if (challenge.result == NULL ) {
        error("Could not retrieve challenge from auth response: %s", data_buffer);
        goto CLEANUP;
    }


    size_t challenge_len = strlen(challenge.result);
    unsigned char decoded[512];
    size_t decoded_len = base64_decode((unsigned char*)challenge.result, challenge_len, decoded, sizeof(decoded));

    unsigned char plaintext[4096]={};
    int decrypted_length = private_decrypt(decoded, decoded_len, plaintext);
    freez(challenge.result);
    char encoded[512];
    size_t encoded_len = base64_encode(plaintext, decrypted_length, encoded, sizeof(encoded));
    encoded[encoded_len] = 0;
    debug(D_ACLK, "Encoded len=%zu Decryption len=%d: '%s'", encoded_len, decrypted_length, encoded);

    char response_json[4096]={};
    sprintf(response_json, "{\"response\":\"%s\"}", encoded);
    debug(D_ACLK, "Password phase: %s",response_json);
    // TODO - host
    sprintf(url, "/api/v1/auth/node/%s/password", agent_id);
    if(aclk_send_https_request("POST", aclk_hostname, port, url, data_buffer, NETDATA_WEB_RESPONSE_INITIAL_SIZE, response_json))
    {
        error("Challenge-response failed: %s", data_buffer);
        goto CLEANUP;
    }

    debug(D_ACLK, "Password response from cloud: %s", data_buffer);

    struct dictionary_singleton password = { .key = "password", .result = NULL };
    if ( json_parse(data_buffer, &password, json_extract_singleton) != JSON_OK)
    {
        freez(password.result);
        error("Could not parse the json response with the password: %s", data_buffer);
        goto CLEANUP;
    }

    if (password.result == NULL ) {
        error("Could not retrieve password from auth response");
        goto CLEANUP;
    }
    if (aclk_password != NULL )
        freez(aclk_password);
    aclk_password = password.result;
    if (aclk_username != NULL)
        freez(aclk_username);
    aclk_username = agent_id;
    agent_id = NULL;

CLEANUP:
    if (agent_id != NULL)
        freez(agent_id);
    freez(data_buffer);
    return;
}

#ifndef __GNUC__
#pragma endregion
#endif

static void aclk_try_to_connect(char *hostname, int port)
{
    int rc;

// this is useful for developers working on ACLK
// allows connecting agent to any MQTT broker
// for debugging, development and testing purposes
#ifndef ACLK_DISABLE_CHALLENGE
    if (!aclk_private_key) {
        error("Cannot try to establish the agent cloud link - no private key available!");
        return;
    }
#endif

    info("Attempting to establish the agent cloud link");
#ifdef ACLK_DISABLE_CHALLENGE
    error("Agent built with ACLK_DISABLE_CHALLENGE. This is for testing "
          "and development purposes only. Warranty void. Won't be able "
          "to connect to Netdata Cloud.");
    if (aclk_password == NULL)
        aclk_password = strdupz("anon");
#else
    aclk_get_challenge(hostname, port);
    if (aclk_password == NULL)
        return;
#endif

    aclk_connecting = 1;
    create_publish_base_topic();

    legacy_aclk_shared_state_LOCK;
    legacy_aclk_shared_state.version_neg = 0;
    legacy_aclk_shared_state.version_neg_wait_till = 0;
    legacy_aclk_shared_state_UNLOCK;

    rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password);
    if (unlikely(rc)) {
        error("Failed to initialize the agent cloud link library");
    }
}

// Sends "hello" message to negotiate ACLK version with cloud
static inline void aclk_hello_msg()
{
    BUFFER *buf = buffer_create(NETDATA_WEB_RESPONSE_HEADER_SIZE);

    char *msg_id = create_uuid();

    legacy_aclk_shared_state_LOCK;
    legacy_aclk_shared_state.version_neg = 0;
    legacy_aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
    legacy_aclk_shared_state_UNLOCK;

    //Hello message is versioned separately from the rest of the protocol
    aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
    buffer_sprintf(buf, ",\"min-version\":%d,\"max-version\":%d}", ACLK_VERSION_MIN, ACLK_VERSION_MAX);
    aclk_send_message(ACLK_METADATA_TOPIC, buf->buffer, msg_id);
    freez(msg_id);
    buffer_free(buf);
}

/**
 * Main agent cloud link thread
 *
 * This thread will simply call the main event loop that handles
 * pending requests - both inbound and outbound
 *
 * @param ptr is a pointer to the netdata_static_thread structure.
 *
 * @return It always returns NULL
 */
void *legacy_aclk_main(void *ptr)
{
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
    struct aclk_query_threads query_threads;
    struct aclk_stats_thread *stats_thread = NULL;
    time_t last_periodic_query_wakeup = 0;

    query_threads.thread_list = NULL;

    // This thread is unusual in that it cannot be cancelled by cancel_main_threads()
    // as it must notify the far end that it shutdown gracefully and avoid the LWT.
    netdata_thread_disable_cancelability();

#if defined( DISABLE_CLOUD ) || !defined( ENABLE_ACLK)
    info("Killing ACLK thread -> cloud functionality has been disabled");
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
    return NULL;
#endif

#ifndef LWS_WITH_SOCKS5
    ACLK_PROXY_TYPE proxy_type;
    aclk_get_proxy(&proxy_type);
    if(proxy_type == PROXY_TYPE_SOCKS5) {
        error("Disabling ACLK due to requested SOCKS5 proxy.");
        static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
        return NULL;
    }
#endif

    info("Waiting for netdata to be ready");
    while (!netdata_ready) {
        sleep_usec(USEC_PER_MS * 300);
    }

    info("Waiting for Cloud to be enabled");
    while (!netdata_cloud_setting) {
        sleep_usec(USEC_PER_SEC * 1);
        if (netdata_exit) {
            static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
            return NULL;
        }
    }

    query_threads.count = MIN(processors/2, 6);
    query_threads.count = MAX(query_threads.count, 2);
    query_threads.count = config_get_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
    if(query_threads.count < 1) {
        error("You need at least one query thread. Overriding configured setting of \"%d\"", query_threads.count);
        query_threads.count = 1;
        config_set_number(CONFIG_SECTION_CLOUD, "query thread count", query_threads.count);
    }

    //start localhost popcorning
    aclk_start_host_popcorning(localhost);

    aclk_stats_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "statistics", CONFIG_BOOLEAN_YES);
    if (aclk_stats_enabled) {
        stats_thread = callocz(1, sizeof(struct aclk_stats_thread));
        stats_thread->thread = mallocz(sizeof(netdata_thread_t));
        stats_thread->query_thread_count = query_threads.count;
        netdata_thread_create(
            stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_stats_main_thread,
            stats_thread);
    }

    char *aclk_hostname = NULL; // Initializers are over-written but prevent gcc complaining about clobbering.
    int port_num = 0;
    info("Waiting for netdata to be claimed");
    while(1) {
        char *agent_id = is_agent_claimed();
        while (likely(!agent_id)) {
            sleep_usec(USEC_PER_SEC * 1);
            if (netdata_exit)
                goto exited;
            agent_id = is_agent_claimed();
        }
        freez(agent_id);
        // The NULL return means the value was never initialised, but this value has been initialized in post_conf_load.
        // We trap the impossible NULL here to keep the linter happy without using a fatal() in the code.
        char *cloud_base_url = appconfig_get(&cloud_config, CONFIG_SECTION_GLOBAL, "cloud base url", NULL);
        if (cloud_base_url == NULL) {
            error("Do not move the cloud base url out of post_conf_load!!");
            goto exited;
        }
        if (aclk_decode_base_url(cloud_base_url, &aclk_hostname, &port_num))
            error("Agent is claimed but the configuration is invalid, please fix");
        else if (!create_private_key() && !_mqtt_lib_init())
                break;

        for (int i=0; i<60; i++) {
            if (netdata_exit)
                goto exited;

            sleep_usec(USEC_PER_SEC * 1);
        }
    }

    usec_t reconnect_expiry = 0; // In usecs

    while (!netdata_exit) {
        static int first_init = 0;
 /*       size_t write_q, write_q_bytes, read_q;
        lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);*/

        if (aclk_disable_runtime && !aclk_connected) {
            sleep(1);
            continue;
        }

        if (aclk_kill_link) {                       // User has reloaded the claiming state
            aclk_kill_link = 0;
            aclk_graceful_disconnect();
            create_private_key();
            continue;
        }

        if (aclk_force_reconnect) {
            aclk_lws_wss_destroy_context();
            aclk_force_reconnect = 0;
        }
        if (unlikely(!netdata_exit && !aclk_connected && !aclk_force_reconnect)) {
            if (unlikely(!first_init)) {
                aclk_try_to_connect(aclk_hostname, port_num);
                first_init = 1;
            } else {
                if (aclk_connecting == 0) {
                    if (reconnect_expiry == 0) {
                        unsigned long int delay = aclk_reconnect_delay(1);
                        reconnect_expiry = now_realtime_usec() + delay * 1000;
                        info("Retrying to establish the ACLK connection in %.3f seconds", delay / 1000.0);
                    }
                    if (now_realtime_usec() >= reconnect_expiry) {
                        reconnect_expiry = 0;
                        aclk_try_to_connect(aclk_hostname, port_num);
                    }
                    sleep_usec(USEC_PER_MS * 100);
                }
            }
            if (aclk_connecting) {
                _link_event_loop();
                sleep_usec(USEC_PER_MS * 100);
            }
            continue;
        }

        _link_event_loop();
        if (unlikely(!aclk_connected || aclk_force_reconnect))
            continue;
        /*static int stress_counter = 0;
        if (write_q_bytes==0 && stress_counter ++ >5)
        {
            aclk_send_stress_test(8000000);
            stress_counter = 0;
        }*/

        if (unlikely(!aclk_subscribed)) {
            aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 1);
            aclk_hello_msg();
        }

        if (unlikely(!query_threads.thread_list)) {
            legacy_aclk_query_threads_start(&query_threads);
        }

        time_t now = now_monotonic_sec();
        if(aclk_connected && last_periodic_query_wakeup < now) {
            // to make `legacy_aclk_queue_query()` param `run_after` work
            // also makes per child popcorning work
            last_periodic_query_wakeup = now;
            LEGACY_QUERY_THREAD_WAKEUP;
        }
    } // forever
exited:
    // Wakeup query thread to cleanup
    LEGACY_QUERY_THREAD_WAKEUP_ALL;

    freez(aclk_username);
    freez(aclk_password);
    freez(aclk_hostname);
    if (aclk_private_key != NULL)
        RSA_free(aclk_private_key);

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    char *agent_id = is_agent_claimed();
    if (agent_id && aclk_connected) {
        freez(agent_id);
        // Wakeup thread to cleanup
        LEGACY_QUERY_THREAD_WAKEUP;
        aclk_graceful_disconnect();
    }

    legacy_aclk_query_threads_cleanup(&query_threads);

    _reset_collector_list();
    freez(collector_list);

    if(aclk_stats_enabled) {
        netdata_thread_join(*stats_thread->thread, NULL);
        legacy_aclk_stats_thread_cleanup();
        freez(stats_thread->thread);
        freez(stats_thread);
    }

    /*
     * this must be last -> if all static threads signal
     * THREAD_EXITED rrdengine will dealloc the RRDSETs
     * and RRDDIMs that are used by still running stat thread.
     * see netdata_cleanup_and_exit() for reference
     */
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
    return NULL;
}

/*
 * Send a message to the cloud, using a base topic and sib_topic
 * The final topic will be in the form <base_topic>/<sub_topic>
 * If base_topic is missing then the global_base_topic will be used (if available)
 *
 */
int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id)
{
    int rc;
    int mid;
    char topic[ACLK_MAX_TOPIC + 1];
    char *final_topic;

    UNUSED(msg_id);

    if (!aclk_connected)
        return 0;

    if (unlikely(!message))
        return 0;

    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);

    if (unlikely(!final_topic)) {
        errno = 0;
        error("Unable to build outgoing topic; truncated?");
        return 1;
    }

    ACLK_LOCK;
    rc = _link_send_message(final_topic, message, len, &mid);
    // TODO: link the msg_id with the mid so we can trace it
    ACLK_UNLOCK;

    if (unlikely(rc)) {
        errno = 0;
        error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
    }

    return rc;
}

int aclk_send_message(char *sub_topic, char *message, char *msg_id)
{
    return aclk_send_message_bin(sub_topic, message, strlen(message), msg_id);
}

/*
 * Subscribe to a topic in the cloud
 * The final subscription will be in the form
 * /agent/claim_id/<sub_topic>
 */
int aclk_subscribe(char *sub_topic, int qos)
{
    int rc;
    char topic[ACLK_MAX_TOPIC + 1];
    char *final_topic;

    final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
    if (unlikely(!final_topic)) {
        errno = 0;
        error("Unable to build outgoing topic; truncated?");
        return 1;
    }

    if (!aclk_connected) {
        error("Cannot subscribe to %s - not connected!", topic);
        return 1;
    }

    ACLK_LOCK;
    rc = _link_subscribe(final_topic, qos);
    ACLK_UNLOCK;

    // TODO: Add better handling -- error will flood the logfile here
    if (unlikely(rc)) {
        errno = 0;
        error("Failed subscribe to command topic %d (%s)", rc, _link_strerror(rc));
    }

    return rc;
}

// This is called from a callback when the link goes up
void aclk_connect()
{
    info("Connection detected (%u queued queries)", aclk_query_size());

    legacy_aclk_stats_upd_online(1);

    aclk_connected = 1;
    aclk_reconnect_delay(0);

    LEGACY_QUERY_THREAD_WAKEUP;
    return;
}

// This is called from a callback when the link goes down
void aclk_disconnect()
{
    if (likely(aclk_connected))
        info("Disconnect detected (%u queued queries)", aclk_query_size());

    legacy_aclk_stats_upd_online(0);

    aclk_subscribed = 0;
    rrdhost_aclk_state_lock(localhost);
    localhost->aclk_state.metadata = ACLK_METADATA_REQUIRED;
    rrdhost_aclk_state_unlock(localhost);
    aclk_connected = 0;
    aclk_connecting = 0;
    aclk_force_reconnect = 1;
}

inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version)
{
    uuid_t uuid;
    char uuid_str[36 + 1];

    if (unlikely(!msg_id)) {
        uuid_generate(uuid);
        uuid_unparse(uuid, uuid_str);
        msg_id = uuid_str;
    }

    if (ts_secs == 0) {
        ts_us = now_realtime_usec();
        ts_secs = ts_us / USEC_PER_SEC;
        ts_us = ts_us % USEC_PER_SEC;
    }

    buffer_sprintf(
        dest,
        "{\t\"type\": \"%s\",\n"
        "\t\"msg-id\": \"%s\",\n"
        "\t\"timestamp\": %ld,\n"
        "\t\"timestamp-offset-usec\": %llu,\n"
        "\t\"connect\": %ld,\n"
        "\t\"connect-offset-usec\": %llu,\n"
        "\t\"version\": %d",
        type, msg_id, ts_secs, ts_us, aclk_session_sec, aclk_session_us, version);

    debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", version, msg_id, type, ts_secs);
}


/*
 * This will send alarm information which includes
 *    configured alarms
 *    alarm_log
 *    active alarms
 */
void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);

void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
{
    BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);

    char *msg_id = create_uuid();
    buffer_flush(local_buffer);
    local_buffer->contenttype = CT_APPLICATION_JSON;

    debug(D_ACLK, "Metadata alarms start");

    // on_connect messages are sent on a health reload, if the on_connect message is real then we
    // use the session time as the fake timestamp to indicate that it starts the session. If it is
    // a fake on_connect message then use the real timestamp to indicate it is within the existing
    // session.

    if (metadata_submitted == ACLK_METADATA_SENT)
        aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
    else
        aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg);
    buffer_strcat(local_buffer, ",\n\t\"payload\": ");


    buffer_sprintf(local_buffer, "{\n\t \"configured-alarms\" : ");
    health_alarms2json(localhost, local_buffer, 1);
    debug(D_ACLK, "Metadata %s with configured alarms has %zu bytes", msg_id, local_buffer->len);
    //    buffer_sprintf(local_buffer, ",\n\t \"alarm-log\" : ");
    //   health_alarm_log2json(localhost, local_buffer, 0);
    //   debug(D_ACLK, "Metadata %s with alarm_log has %zu bytes", msg_id, local_buffer->len);
    buffer_sprintf(local_buffer, ",\n\t \"alarms-active\" : ");
    health_active_log_alarms_2json(localhost, local_buffer);
    //debug(D_ACLK, "Metadata message %s", local_buffer->buffer);



    buffer_sprintf(local_buffer, "\n}\n}");
    aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);

    freez(msg_id);
    buffer_free(local_buffer);
}

/*
 * This will send the agent metadata
 *    /api/v1/info
 *    charts
 */
int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host)
{
    BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);

    debug(D_ACLK, "Metadata /info start");

    char *msg_id = create_uuid();
    buffer_flush(local_buffer);
    local_buffer->contenttype = CT_APPLICATION_JSON;

    // on_connect messages are sent on a health reload, if the on_connect message is real then we
    // use the session time as the fake timestamp to indicate that it starts the session. If it is
    // a fake on_connect message then use the real timestamp to indicate it is within the existing
    // session.
    if (metadata_submitted == ACLK_METADATA_SENT)
        aclk_create_header(local_buffer, "update", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
    else
        aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg);
    buffer_strcat(local_buffer, ",\n\t\"payload\": ");

    buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
    web_client_api_request_v1_info_fill_buffer(host, local_buffer);
    debug(D_ACLK, "Metadata %s with info has %zu bytes", msg_id, local_buffer->len);

    buffer_sprintf(local_buffer, ", \n\t \"charts\" : ");
    charts2json(host, local_buffer, 1, 0);
    buffer_sprintf(local_buffer, "\n}\n}");
    debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);

    aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);

    freez(msg_id);
    buffer_free(local_buffer);
    return 0;
}

int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd)
{
    BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    local_buffer->contenttype = CT_APPLICATION_JSON;

    if(legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
        fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, legacy_aclk_shared_state.version_neg);

    debug(D_ACLK, "Sending Child Disconnect");

    char *msg_id = create_uuid();

    aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);

    buffer_strcat(local_buffer, ",\"payload\":");

    buffer_sprintf(local_buffer, "{\"guid\":\"%s\",\"claim_id\":", host->machine_guid);
    rrdhost_aclk_state_lock(host);
    if(host->aclk_state.claimed_id)
        buffer_sprintf(local_buffer, "\"%s\"}}", host->aclk_state.claimed_id);
    else
        buffer_strcat(local_buffer, "null}}");

    rrdhost_aclk_state_unlock(host);

    aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);

    freez(msg_id);
    buffer_free(local_buffer);
    return 0;
}

void legacy_aclk_host_state_update(RRDHOST *host, int connect)
{
#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
    if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
        return;
#else
#warning "This check became unnecessary. Remove"
#endif

    if (unlikely(aclk_host_initializing(localhost)))
        return;

    if (connect) {
        debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid);
        aclk_start_host_popcorning(host);
        legacy_aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT);
    } else {
        debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid);
        aclk_stop_host_popcorning(host);
        legacy_aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT);
    }
}

void aclk_send_stress_test(size_t size)
{
    char *buffer = mallocz(size);
    if (buffer != NULL)
    {
        for(size_t i=0; i<size; i++)
            buffer[i] = 'x';
        buffer[size-1] = 0;
        time_t time_created = now_realtime_sec();
        sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created);
        buffer[strlen(buffer)] = '"';
        buffer[size-2] = '}';
        buffer[size-3] = '"';
        aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
        error("Sending stress of size %zu at time %ld", size, time_created);
    }
    free(buffer);
}

// Send info metadata message to the cloud if the link is established
// or on request
int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host)
{
    legacy_aclk_send_info_metadata(state, host);

    if(host == localhost)
        legacy_aclk_send_alarm_metadata(state);

    return 0;
}

// Triggered by a health reload, sends the alarm metadata
void legacy_aclk_alarm_reload()
{
    if (unlikely(aclk_host_initializing(localhost)))
        return;

    if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
        if (likely(aclk_connected)) {
            errno = 0;
            error("ACLK failed to queue on_connect command on alarm reload");
        }
    }
}
//rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)

int aclk_send_single_chart(RRDHOST *host, char *chart)
{
    RRDSET *st = rrdset_find(host, chart);
    if (!st)
        st = rrdset_find_byname(host, chart);
    if (!st) {
        info("FAILED to find chart %s", chart);
        return 1;
    }

    BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    char *msg_id = create_uuid();
    buffer_flush(local_buffer);
    local_buffer->contenttype = CT_APPLICATION_JSON;

    aclk_create_header(local_buffer, "chart", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
    buffer_strcat(local_buffer, ",\n\t\"payload\": ");

    rrdset2json(st, local_buffer, NULL, NULL, 1);
    buffer_sprintf(local_buffer, "\t\n}");

    aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id);

    freez(msg_id);
    buffer_free(local_buffer);
    return 0;
}

int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create)
{
#ifndef ENABLE_ACLK
    UNUSED(host);
    UNUSED(chart_name);
    return 0;
#else
    if (unlikely(!netdata_ready))
        return 0;

    if (!netdata_cloud_setting)
        return 0;

    if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost)
        return 0;

    if (aclk_host_initializing(localhost))
        return 0;

    if (unlikely(aclk_disable_single_updates))
        return 0;

    if (aclk_popcorn_check_bump(host))
        return 0;

    if (unlikely(legacy_aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, create ? ACLK_CMD_CHART : ACLK_CMD_CHARTDEL))) {
        if (likely(aclk_connected)) {
            errno = 0;
            error("ACLK failed to queue chart_update command");
        }
    }

    return 0;
#endif
}

int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
    BUFFER *local_buffer = NULL;

    if (unlikely(!netdata_ready))
        return 0;

    if (host != localhost)
        return 0;

    if(unlikely(aclk_host_initializing(localhost)))
        return 0;

    /*
     * Check if individual updates have been disabled
     * This will be the case when we do health reload
     * and all the alarms will be dropped and recreated.
     * At the end of the health reload the complete alarm metadata
     * info will be sent
     */
    if (unlikely(aclk_disable_single_updates))
        return 0;

    local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
    char *msg_id = create_uuid();

    buffer_flush(local_buffer);
    aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
    buffer_strcat(local_buffer, ",\n\t\"payload\": ");

    netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
    health_alarm_entry2json_nolock(local_buffer, ae, host);
    netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);

    buffer_sprintf(local_buffer, "\n}");

    if (unlikely(legacy_aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
        if (likely(aclk_connected)) {
            errno = 0;
            error("ACLK failed to queue alarm_command on alarm_update");
        }
    }

    freez(msg_id);
    buffer_free(local_buffer);

    return 0;
}

char *legacy_aclk_state(void)
{
    BUFFER *wb = buffer_create(1024);
    char *ret;

    buffer_strcat(wb,
        "ACLK Available: Yes\n"
        "ACLK Implementation: Legacy\n"
        "Claimed: "
    );

    char *agent_id = is_agent_claimed();
    if (agent_id == NULL)
        buffer_strcat(wb, "No\n");
    else {
        buffer_sprintf(wb, "Yes\nClaimed Id: %s\n", agent_id);
        freez(agent_id);
    }

    buffer_sprintf(wb, "Online: %s", aclk_connected ? "Yes" : "No");

    ret = strdupz(buffer_tostring(wb));
    buffer_free(wb);
    return ret;
}

char *legacy_aclk_state_json(void)
{
    BUFFER *wb = buffer_create(1024);
    char *agent_id = is_agent_claimed();

    buffer_sprintf(wb,
        "{\"aclk-available\":true,"
        "\"aclk-implementation\":\"Legacy\","
        "\"agent-claimed\":%s,"
        "\"claimed-id\":",
        agent_id ? "true" : "false"
    );

    if (agent_id) {
        buffer_sprintf(wb, "\"%s\"", agent_id);
        freez(agent_id);
    } else
        buffer_strcat(wb, "null");

    buffer_sprintf(wb, ",\"online\":%s}", aclk_connected ? "true" : "false");

    return strdupz(buffer_tostring(wb));
}

:: Command execute ::

Enter:
 
Select:
 

:: Search ::
  - regexp 

:: Upload ::
 
[ Read-Only ]

:: Make Dir ::
 
[ Read-Only ]
:: Make File ::
 
[ Read-Only ]

:: Go Dir ::
 
:: Go File ::
 

--[ c99shell v. 2.5 [PHP 8 Update] [24.05.2025] | Generation time: 0.0173 ]--