Added MQTT influx library.

This commit is contained in:
imi415 2021-07-19 01:26:17 +08:00
parent 4161d3c6df
commit d107d4cf43
Signed by: imi415
GPG Key ID: 17F01E106F9F5E0A
9 changed files with 175 additions and 0 deletions

View File

@ -17,11 +17,13 @@ set(C_SOURCES
"src/impl/user_stick_impl.c"
"src/impl/user_bme280_impl.c"
"src/impl/user_ccs811_impl.c"
"src/impl/user_mqtt_impl.c"
"src/tasks/user_lvgl_task.c"
"src/tasks/user_clock_task.c"
"src/tasks/user_dht_task.c"
"src/tasks/user_tvoc_task.c"
"src/tasks/user_base_task.c"
"src/tasks/user_mqtt_task.c"
"src/utils/user_log_util.c"
"src/assets/encode_sans_sc_light_24.c"
"src/assets/encode_sans_sc_regular_32.c"
@ -34,6 +36,7 @@ set(C_INCLUDES
"lib/st7789_lcd"
"lib/bme280_dht"
"lib/ccs811_tvoc"
"lib/mqtt_influx"
"include"
)
@ -54,6 +57,7 @@ set(C_LIBRARIES
"st7789"
"bme280"
"ccs811"
"mqtt_influx"
)
add_subdirectory(lib)

View File

@ -0,0 +1,16 @@
#ifndef USER_MQTT_IMPL_H
#define USER_MQTT_IMPL_H
#include <stdint.h>
#include <mosquitto.h>
#include "mqtt_influx.h"
typedef struct {
struct mosquito *mosq;
} user_mqtt_impl_t;
mqtt_influx_ret_t user_mqtt_get_nsec_timestamp_cb(user_mqtt_impl_t *handle, char *timestamp_string);
mqtt_influx_ret_t user_mqtt_publish_message_cb(user_mqtt_impl_t *handle, char *data);
#endif

View File

@ -13,6 +13,9 @@ int user_dht_task_deinit(void);
int user_base_task_init(void);
int user_base_task_deinit(void);
int user_mqtt_task_init(void);
int user_mqtt_task_deinit(void);
int user_tvoc_task_init(void);
int user_tvoc_task_deinit(void);

View File

@ -33,3 +33,9 @@ set(CCS811_TVOC_SOURCES
add_library(ccs811 STATIC ${CCS811_TVOC_SOURCES})
target_compile_definitions(ccs811 PRIVATE ${CCS811_TVOC_DEFINES})
set(MQTT_INFLUX_SOURCES
"mqtt_influx/mqtt_influx.c"
)
add_library(mqtt_influx STATIC ${MQTT_INFLUX_SOURCES})
target_compile_definitions(mqtt_influx PRIVATE ${MQTT_INFLUX_DEFINES})

View File

@ -0,0 +1,50 @@
#include "mqtt_influx.h"
#define MAX_MQTT_REPORT_SIZE 256
static int concat_string(char *dest, int dest_length, char *append) {
int dest_ptr;
for(dest_ptr = 0; dest_ptr < dest_length; dest_ptr++) {
if(dest[dest_ptr] == '\0') break;
}
int i = 0;
while(dest_ptr + i < dest_length - 1) {
if(append[i] != '\0') {
dest[dest_ptr + i] = append[i];
dest[dest_ptr + i + 1] = '\0';
i++;
} else
break;
}
}
mqtt_influx_ret_t mqtt_influx_init(mqtt_influx_t *influx) {
return MQTT_INFLUX_OK;
}
mqtt_influx_ret_t mqtt_influx_publish_measurement(mqtt_influx_t *influx,
mqtt_influx_measure_t *meas) {
char mqtt_buf[MAX_MQTT_REPORT_SIZE];
char ns_ts[32];
if(influx->cb.get_nsec_timestamp_cb(influx->user_data, ns_ts) !=
MQTT_INFLUX_OK) {
return MQTT_INFLUX_ERROR;
}
concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, meas->measurement);
concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, ",hostname=");
concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, influx->hostname);
concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, " ");
concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, meas->key);
concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, "=");
concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, meas->value);
concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, ns_ts);
if(influx->cb.publish_message_cb(influx->user_data, mqtt_buf) != MQTT_INFLUX_OK) {
return MQTT_INFLUX_ERROR;
}
return MQTT_INFLUX_OK;
}

View File

@ -0,0 +1,31 @@
#ifndef MQTT_INFLUX_H
#define MQTT_INFLUX_H
#include <stdint.h>
typedef enum {
MQTT_INFLUX_OK,
MQTT_INFLUX_ERROR
} mqtt_influx_ret_t;
typedef struct {
mqtt_influx_ret_t (*get_nsec_timestamp_cb)(void *handle, char *timestamp_string);
mqtt_influx_ret_t (*publish_message_cb)(void *handle, char *data);
} mqtt_influx_cb_t;
typedef struct {
char *measurement;
char *key;
char *value;
} mqtt_influx_measure_t;
typedef struct {
mqtt_influx_cb_t cb;
void *user_data;
char hostname[32];
} mqtt_influx_t;
mqtt_influx_ret_t mqtt_influx_init(mqtt_influx_t *influx);
mqtt_influx_ret_t mqtt_influx_publish_measurement(mqtt_influx_t *influx, mqtt_influx_measure_t *meas);
#endif

13
src/impl/user_mqtt_impl.c Normal file
View File

@ -0,0 +1,13 @@
#include <stdio.h>
#include <string.h>
#include "impl/user_mqtt_impl.h"
mqtt_influx_ret_t user_mqtt_get_nsec_timestamp_cb(user_mqtt_impl_t *handle, char *timestamp_string) {
sprintf(timestamp_string, "00000000");
return MQTT_INFLUX_OK;
}
mqtt_influx_ret_t user_mqtt_publish_message_cb(user_mqtt_impl_t *handle, char *data) {
fprintf(stderr, data);
}

View File

@ -15,6 +15,7 @@ uint8_t g_running = 1;
user_config_t g_config;
static void signal_handler(int signo) {
if(signo == SIGINT || signo == SIGTERM) {
g_running = 0;
@ -42,6 +43,7 @@ int main(int argc, const char *argv[]) {
user_config_lookup_int(&g_config, "agent.common.log_level", (int *)&log_level);
user_log_set_level(log_level);
user_mqtt_task_init();
user_lvgl_task_init();
user_base_task_init();
user_clock_task_init();
@ -58,6 +60,7 @@ int main(int argc, const char *argv[]) {
user_clock_task_deinit();
user_base_task_deinit();
user_lvgl_task_deinit();
user_mqtt_task_deinit();
user_config_deinit(&g_config);

View File

@ -0,0 +1,49 @@
#include "tasks/user_task_lvgl_common.h"
#include "impl/user_mqtt_impl.h"
static user_mqtt_impl_t s_mqtt_impl;
mqtt_influx_t g_mqtt_influx = {
.cb = {
.get_nsec_timestamp_cb = (mqtt_influx_ret_t (*)(void *, char *))user_mqtt_get_nsec_timestamp_cb,
.publish_message_cb = (mqtt_influx_ret_t (*)(void *, char *))user_mqtt_publish_message_cb
},
.user_data = &s_mqtt_impl,
};
pthread_t user_mqtt_task_thread;
void *user_mqtt_task(void *arguments);
int user_mqtt_task_init(void) {
int ret;
ret = pthread_create(&user_mqtt_task_thread, NULL, user_mqtt_task, NULL);
if(ret) return ret;
pthread_setname_np(user_mqtt_task_thread, "MQTT");
USER_LOG(USER_LOG_INFO, "MQTT thread created.");
}
int user_mqtt_task_deinit(void) {
USER_LOG(USER_LOG_INFO, "MQTT task_deinit() called.");
pthread_join(user_mqtt_task_thread, NULL);
USER_LOG(USER_LOG_INFO, "MQTT tasks joined.");
return 0;
}
void *user_mqtt_task(void *arguments) {
mqtt_influx_init(&g_mqtt_influx);
while(g_running) {
sleep(1);
}
return NULL;
}