From 11a985abebed8e939bb3760927ea9402507169f0 Mon Sep 17 00:00:00 2001 From: imi415 Date: Sat, 23 Oct 2021 04:19:46 +0800 Subject: [PATCH] Updated InfluxDB line protocol driver. --- lib/mqtt_influx/mqtt_influx.c | 14 +++++++++++--- lib/mqtt_influx/mqtt_influx.h | 7 ++++++- misc/agent_config.cfg.rpi4 | 21 +++++++++++++++++++++ src/impl/user_lvgl_impl.c | 6 ++---- src/impl/user_mqtt_impl.c | 9 +++++++++ src/main.c | 2 +- src/tasks/user_dht_task.c | 24 ++++++++++++++++++++---- src/tasks/user_mqtt_task.c | 10 ++++++++-- 8 files changed, 78 insertions(+), 15 deletions(-) diff --git a/lib/mqtt_influx/mqtt_influx.c b/lib/mqtt_influx/mqtt_influx.c index 3f9ad50..bcf010f 100644 --- a/lib/mqtt_influx/mqtt_influx.c +++ b/lib/mqtt_influx/mqtt_influx.c @@ -45,13 +45,21 @@ mqtt_influx_ret_t mqtt_influx_publish_measurement(mqtt_influx_t *influx, mqtt_buf[i] = '\0'; } + mqtt_influx_measure_item_t *item_ptr = meas->first_item; + concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, meas->measurement); concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, ",hostname="); concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, influx->hostname); concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, " "); - concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, meas->key); - concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, "="); - concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, meas->value); + do { + concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, item_ptr->key); + concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, "="); + concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, item_ptr->value); + item_ptr = item_ptr->next; + if(item_ptr) { + concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, ","); + } + } while(item_ptr != (void *)0); concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, " "); concat_string(mqtt_buf, MAX_MQTT_REPORT_SIZE, ns_ts); diff --git a/lib/mqtt_influx/mqtt_influx.h b/lib/mqtt_influx/mqtt_influx.h index 6348435..89f614b 100644 --- a/lib/mqtt_influx/mqtt_influx.h +++ b/lib/mqtt_influx/mqtt_influx.h @@ -16,9 +16,14 @@ typedef struct { } mqtt_influx_cb_t; typedef struct { - char *measurement; char *key; char *value; + void *next; +} mqtt_influx_measure_item_t; + +typedef struct { + char *measurement; + mqtt_influx_measure_item_t *first_item; } mqtt_influx_measure_t; typedef struct { diff --git a/misc/agent_config.cfg.rpi4 b/misc/agent_config.cfg.rpi4 index 7f980e3..40156a7 100644 --- a/misc/agent_config.cfg.rpi4 +++ b/misc/agent_config.cfg.rpi4 @@ -72,6 +72,27 @@ agent: { fs_base = "lvgl_fs"; logging_enabled = false; }; + mqtt: { + server: { + host = "127.0.0.1"; + port = 1883; + keepalive = 30; + topic = "test_topic"; + }; + client: { + id_prefix = "SA_"; + username = "test_account"; + password = "test_password"; + }; + tls: { + enabled = true; + psk = ""; + ca_file = ""; # Either ca_file or ca_path must not be null. + ca_path = "/etc/ca-certificates"; + client_cert_file = ""; + client_key_file = ""; + }; + }; }; theme: { diff --git a/src/impl/user_lvgl_impl.c b/src/impl/user_lvgl_impl.c index 2804a4c..3908ad6 100644 --- a/src/impl/user_lvgl_impl.c +++ b/src/impl/user_lvgl_impl.c @@ -144,13 +144,11 @@ lv_fs_res_t user_lvgl_impl_fs_close_cb(lv_fs_drv_t *drv, void *file_p) { int fd = *(int *)file_p; if(fd > 0) { + USER_LOG(USER_LOG_DEBUG, "Free'd fd %d@%p", fd, file_p); free(file_p); - USER_LOG(USER_LOG_DEBUG, "Free'd fd %d@%p", fd, file_p); - - close(fd); - USER_LOG(USER_LOG_DEBUG, "Called close() on fd %d", fd); + close(fd); return LV_FS_RES_OK; } diff --git a/src/impl/user_mqtt_impl.c b/src/impl/user_mqtt_impl.c index 2e89b54..c9565a0 100644 --- a/src/impl/user_mqtt_impl.c +++ b/src/impl/user_mqtt_impl.c @@ -23,6 +23,8 @@ extern user_config_t g_config; * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html */ static void user_moqtt_impl_cb_on_connect(struct mosquitto *mosq, void *obj, int rc) { + UNUSED(mosq); + switch(rc) { case 0: USER_LOG(USER_LOG_INFO, "MQTT connection accepted."); @@ -63,6 +65,9 @@ static void user_moqtt_impl_cb_on_connect(struct mosquitto *mosq, void *obj, int * @param rc disconnect reason, 0 is expected. */ static void user_mqtt_impl_cb_on_disconnect(struct mosquitto *mosq, void *obj, int rc) { + UNUSED(mosq); + UNUSED(obj); + if(rc != 0) { USER_LOG(USER_LOG_WARN, "MQTT received unexpected disconnect event (%d).", rc); } else { @@ -79,6 +84,9 @@ static void user_mqtt_impl_cb_on_disconnect(struct mosquitto *mosq, void *obj, i * @param str log message */ static void user_mqtt_impl_cb_log(struct mosquitto *mosq, void *obj, int level, const char *str) { + UNUSED(mosq); + UNUSED(obj); + user_log_level_t log_level; switch(level) { @@ -270,6 +278,7 @@ int user_mqtt_impl_init(user_mqtt_impl_t *handle) { * @return 0 if success, negative value if error. */ int user_mqtt_impl_deinit(user_mqtt_impl_t *handle) { + mosquitto_disconnect(handle->mosq); mosquitto_destroy(handle->mosq); mosquitto_lib_cleanup(); diff --git a/src/main.c b/src/main.c index e9de1e8..8355cfa 100644 --- a/src/main.c +++ b/src/main.c @@ -32,7 +32,7 @@ int main(int argc, const char *argv[]) { USER_LOG(USER_LOG_INFO, "Application started."); int signal_arr[] = { SIGINT, SIGTERM }; - for(uint8_t i = 0; i < sizeof(signal_arr) / sizeof(int); i++) { + for(unsigned int i = 0; i < sizeof(signal_arr) / sizeof(int); i++) { if(signal(signal_arr[i], signal_handler) == SIG_ERR) { USER_LOG(USER_LOG_FATAL, "Cannot register signal handler %s.", strsignal(signal_arr[i])); return -1; diff --git a/src/tasks/user_dht_task.c b/src/tasks/user_dht_task.c index 62c0814..b7dff32 100644 --- a/src/tasks/user_dht_task.c +++ b/src/tasks/user_dht_task.c @@ -58,15 +58,31 @@ void *user_dht_task(void *arguments) { bme280_result_t res; mqtt_influx_measure_t meas; - char value_buf[32]; + mqtt_influx_measure_item_t meas_item[3]; + char temperature_buf[32]; + char humidity_buf[32]; + char pressure_buf[32]; meas.measurement = "dht"; + meas.first_item = meas_item; + + meas_item[0].key = "temperature"; + meas_item[0].next = &meas_item[1]; + + meas_item[1].key = "humidity"; + meas_item[1].next = &meas_item[2]; + + meas_item[2].key = "pressure"; + meas_item[2].next = NULL; while(g_running) { bme280_measure(&bme, &res); - meas.key = "temperature"; - snprintf(value_buf, 32, "%.02f", res.temperature); - meas.value = value_buf; + snprintf(temperature_buf, 32, "%.02f", res.temperature); + snprintf(humidity_buf, 32, "%.02f", res.humidity); + snprintf(pressure_buf, 32, "%.02f", res.pressure); + meas_item[0].value = temperature_buf; + meas_item[1].value = humidity_buf; + meas_item[2].value = pressure_buf; mqtt_influx_publish_measurement(&g_mqtt_influx, &meas); sleep(1); } diff --git a/src/tasks/user_mqtt_task.c b/src/tasks/user_mqtt_task.c index f7e87f1..01688fd 100644 --- a/src/tasks/user_mqtt_task.c +++ b/src/tasks/user_mqtt_task.c @@ -44,11 +44,17 @@ int user_mqtt_task_deinit(void) { void *user_mqtt_task(void *arguments) { UNUSED(arguments); - user_mqtt_impl_init(&s_mqtt_impl); + if(user_mqtt_impl_init(&s_mqtt_impl) != 0) { + USER_LOG(USER_LOG_ERROR, "MQTT implementation init failed."); + return NULL; + } + mqtt_influx_init(&g_mqtt_influx); while(g_running) { - user_mqtt_network_loop(&s_mqtt_impl); + if(user_mqtt_network_loop(&s_mqtt_impl) !=0) { + USER_LOG(USER_LOG_ERROR, "MQTT network loop returned failure."); + } usleep(5 * 1000); }