From a13292584866b76d2b62bfd90333dc9f6d9b2977 Mon Sep 17 00:00:00 2001 From: imi415 Date: Sat, 23 Oct 2021 02:31:54 +0800 Subject: [PATCH] Implemented MQTT routine. --- .clang-format | 2 +- include/impl/user_mqtt_impl.h | 7 +- lib/mqtt_influx/mqtt_influx.c | 4 + lib/mqtt_influx/mqtt_influx.h | 2 + misc/agent_config.cfg.example | 13 +- src/impl/user_mqtt_impl.c | 264 +++++++++++++++++++++++++++++++++- src/tasks/user_mqtt_task.c | 12 +- src/utils/user_system_util.c | 4 + 8 files changed, 295 insertions(+), 13 deletions(-) diff --git a/.clang-format b/.clang-format index 09d20c8..cab91f2 100644 --- a/.clang-format +++ b/.clang-format @@ -54,7 +54,7 @@ BreakConstructorInitializersBeforeComma: false BreakConstructorInitializers: BeforeColon BreakAfterJavaFieldAnnotations: false BreakStringLiterals: true -ColumnLimit: 80 +ColumnLimit: 160 CommentPragmas: '^ IWYU pragma:' CompactNamespaces: false ConstructorInitializerAllOnOneLineOrOnePerLine: false diff --git a/include/impl/user_mqtt_impl.h b/include/impl/user_mqtt_impl.h index 67933de..27dbfb4 100644 --- a/include/impl/user_mqtt_impl.h +++ b/include/impl/user_mqtt_impl.h @@ -6,14 +6,19 @@ #include "mqtt_influx.h" +#define USER_MQTT_IMPL_TOPIC_MAX_LEN 31 + typedef struct { struct mosquitto *mosq; - char topic[32]; + char topic[USER_MQTT_IMPL_TOPIC_MAX_LEN + 1]; + bool ready; } user_mqtt_impl_t; int user_mqtt_impl_init(user_mqtt_impl_t *handle); int user_mqtt_impl_deinit(user_mqtt_impl_t *handle); mqtt_influx_ret_t user_mqtt_get_nsec_timestamp_cb(user_mqtt_impl_t *handle, char *timestamp_string); mqtt_influx_ret_t user_mqtt_publish_message_cb(user_mqtt_impl_t *handle, char *data); +bool user_mqtt_ready_cb(user_mqtt_impl_t *handle); +int user_mqtt_network_loop(user_mqtt_impl_t *handle); #endif \ No newline at end of file diff --git a/lib/mqtt_influx/mqtt_influx.c b/lib/mqtt_influx/mqtt_influx.c index 9ecd8b2..3f9ad50 100644 --- a/lib/mqtt_influx/mqtt_influx.c +++ b/lib/mqtt_influx/mqtt_influx.c @@ -32,6 +32,10 @@ mqtt_influx_ret_t mqtt_influx_publish_measurement(mqtt_influx_t *influx, char mqtt_buf[MAX_MQTT_REPORT_SIZE]; char ns_ts[32]; + if(!influx->cb.ready_cb(influx->user_data)) { + return MQTT_INFLUX_ERROR; + } + if(influx->cb.get_nsec_timestamp_cb(influx->user_data, ns_ts) != MQTT_INFLUX_OK) { return MQTT_INFLUX_ERROR; diff --git a/lib/mqtt_influx/mqtt_influx.h b/lib/mqtt_influx/mqtt_influx.h index e06261a..6348435 100644 --- a/lib/mqtt_influx/mqtt_influx.h +++ b/lib/mqtt_influx/mqtt_influx.h @@ -2,6 +2,7 @@ #define MQTT_INFLUX_H #include +#include typedef enum { MQTT_INFLUX_OK, @@ -11,6 +12,7 @@ typedef enum { typedef struct { mqtt_influx_ret_t (*get_nsec_timestamp_cb)(void *handle, char *timestamp_string); mqtt_influx_ret_t (*publish_message_cb)(void *handle, char *data); + bool (*ready_cb)(void *handle); } mqtt_influx_cb_t; typedef struct { diff --git a/misc/agent_config.cfg.example b/misc/agent_config.cfg.example index d8c252b..e3b7d46 100644 --- a/misc/agent_config.cfg.example +++ b/misc/agent_config.cfg.example @@ -73,15 +73,22 @@ agent: { logging_enabled = true; }; mqtt: { + server: { + host = "127.0.0.1"; + port = 1883; + keepalive = 30; + topic = ""; + }; client: { id_prefix = ""; - server_host = "127.0.0.1"; - server_port = 1883; + username = ""; + password = ""; }; tls: { enabled = false; psk = ""; - ca_file = ""; + ca_file = ""; # Either ca_file or ca_path must not be null. + ca_path = ""; client_cert_file = ""; client_key_file = ""; }; diff --git a/src/impl/user_mqtt_impl.c b/src/impl/user_mqtt_impl.c index 66cea44..2e89b54 100644 --- a/src/impl/user_mqtt_impl.c +++ b/src/impl/user_mqtt_impl.c @@ -1,14 +1,223 @@ +#include #include #include #include -#include -#include "user_common.h" +#include "drivers/user_config_driver.h" #include "impl/user_mqtt_impl.h" +#include "user_common.h" #include "utils/user_log_util.h" +#define USER_MQTT_CONFIG_COMMON "agent.libraries.mqtt." + // TODO: Add callbacks here as static functions. +extern user_config_t g_config; + +/** + * @brief Mosquitto connect callback function. + * + * @param mosq mosquitto object pointer + * @param obj user pointer passed to mosquitto_new() + * @param rc connect return value, see section 3.2.2.3 of + * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html + */ +static void user_moqtt_impl_cb_on_connect(struct mosquitto *mosq, void *obj, int rc) { + switch(rc) { + case 0: + USER_LOG(USER_LOG_INFO, "MQTT connection accepted."); + break; + case 1: + USER_LOG(USER_LOG_WARN, "MQTT connection refused, unacceptable protocol version."); + break; + case 2: + USER_LOG(USER_LOG_WARN, "MQTT connection refused, identifier rejected."); + break; + case 3: + USER_LOG(USER_LOG_WARN, "MQTT connection refused, server unavailable."); + break; + case 4: + USER_LOG(USER_LOG_WARN, "MQTT connection refused, bad username or password."); + break; + case 5: + USER_LOG(USER_LOG_WARN, "MQTT connection refused, not authorized."); + break; + default: + USER_LOG(USER_LOG_WARN, "MQTT connection callback: unknown rc (%d).", rc); + break; + } + + if(rc == 0) { + user_mqtt_impl_t *handle = obj; + handle->ready = true; + // If we are about to subscribe some topics, do it here. + } +} + +/** + * @brief Mosquitto disconnect event callback function, + * used to determine whether this disconnect is expected or not. + * + * @param mosq mosquitto object pointer + * @param obj user pointer passed to mosquitto_new() + * @param rc disconnect reason, 0 is expected. + */ +static void user_mqtt_impl_cb_on_disconnect(struct mosquitto *mosq, void *obj, int rc) { + if(rc != 0) { + USER_LOG(USER_LOG_WARN, "MQTT received unexpected disconnect event (%d).", rc); + } else { + USER_LOG(USER_LOG_INFO, "Disconnected successfully from MQTT broker."); + } +} + +/** + * @brief Mosquitto logging callback function. + * + * @param mosq mosquitto object pointer + * @param obj user pointer passed to mosquitto_new() + * @param level log level + * @param str log message + */ +static void user_mqtt_impl_cb_log(struct mosquitto *mosq, void *obj, int level, const char *str) { + user_log_level_t log_level; + + switch(level) { + case MOSQ_LOG_DEBUG: + log_level = USER_LOG_DEBUG; + break; + case MOSQ_LOG_INFO: + case MOSQ_LOG_NOTICE: + log_level = USER_LOG_INFO; + break; + case MOSQ_LOG_WARNING: + log_level = USER_LOG_WARN; + break; + case MOSQ_LOG_ERR: + log_level = USER_LOG_ERROR; + break; + default: + log_level = USER_LOG_DEBUG; + break; + } + + USER_LOG(log_level, "mosquitto: %s", str); +} + +/** + * @brief Mosquitto config (mainly TLS related) + * + * @param handle user_mqtt_impl_t handle pointer + * @return 0 if success, negative value if error. + */ +static inline int user_mqtt_impl_config(user_mqtt_impl_t *handle) { + char *server_host = user_config_lookup_string(&g_config, USER_MQTT_CONFIG_COMMON "server.host"); + if(!server_host) { + USER_LOG(USER_LOG_ERROR, "failed to lookup MQTT host string."); + return -1; + } + + int server_port = 0; + if(user_config_lookup_int(&g_config, USER_MQTT_CONFIG_COMMON "server.port", &server_port) != 0) { + USER_LOG(USER_LOG_ERROR, "failed to lookup MQTT port."); + return -1; + } + + bool use_tls = false; + if(user_config_lookup_bool(&g_config, USER_MQTT_CONFIG_COMMON "tls.enabled", &use_tls) != 0) { + USER_LOG(USER_LOG_ERROR, "failed to get TLS config."); + return -1; + } + + if(use_tls) { + char *psk = user_config_lookup_string(&g_config, USER_MQTT_CONFIG_COMMON "tls.psk"); + if(!psk || (psk[0] == '\0')) { + // Use client certificate + char *ca_path = user_config_lookup_string(&g_config, USER_MQTT_CONFIG_COMMON "tls.ca_path"); + char *ca_file = user_config_lookup_string(&g_config, USER_MQTT_CONFIG_COMMON "tls.ca_file"); + if(!ca_path || (ca_path[0] == '\0')) ca_path = NULL; + if(!ca_file || (ca_file[0] == '\0')) ca_file = NULL; + if(!ca_file && !ca_path) { + USER_LOG(USER_LOG_ERROR, "failed to set CA path or file."); + return -2; + } + + char *client_cert = user_config_lookup_string(&g_config, USER_MQTT_CONFIG_COMMON "tls.client_cert_file"); + char *client_key = user_config_lookup_string(&g_config, USER_MQTT_CONFIG_COMMON "tls.client_key_file"); + if(!client_cert || (client_cert[0] == '\0')) client_cert = NULL; + if(!client_key || (client_key[0] == '\0')) client_key = NULL; + if(client_cert == NULL && client_key != NULL) { + USER_LOG(USER_LOG_ERROR, "must provide both certificate and private key."); + return -2; + } else if(client_cert != NULL && client_key == NULL) { + USER_LOG(USER_LOG_ERROR, "must provide both certificate and private key."); + return -2; + } + + if(mosquitto_tls_set(handle->mosq, ca_file, ca_path, client_cert, client_key, NULL) != MOSQ_ERR_SUCCESS) { + return -2; + } + } else { + if(mosquitto_tls_psk_set(handle->mosq, psk, "IDENTITY", NULL) != MOSQ_ERR_SUCCESS) { + return -2; + } + } + } + + return 0; +} + +/** + * @brief Set mosquitto username and password, connect + * to server. + * + * @param handle user_mqtt_impl_t handle + * @return 0 if success, negative value if error. + */ +static inline int user_mqtt_connect(user_mqtt_impl_t *handle) { + char *mqtt_user = user_config_lookup_string(&g_config, USER_MQTT_CONFIG_COMMON "client.username"); + char *mqtt_pass = user_config_lookup_string(&g_config, USER_MQTT_CONFIG_COMMON "client.password"); + if(mqtt_user && (mqtt_user[0] != '\0')) { + if(!mqtt_pass || (mqtt_pass[0] == '\0')) mqtt_pass = NULL; + mosquitto_username_pw_set(handle->mosq, mqtt_user, mqtt_pass); + USER_LOG(USER_LOG_DEBUG, "set MQTT username and password."); + } + + char *mqtt_host = user_config_lookup_string(&g_config, USER_MQTT_CONFIG_COMMON "server.host"); + if(!mqtt_host) { + USER_LOG(USER_LOG_ERROR, "failed to get MQTT server."); + return -1; + } + int mqtt_port = 0; + if(user_config_lookup_int(&g_config, USER_MQTT_CONFIG_COMMON "server.port", &mqtt_port) != 0) { + USER_LOG(USER_LOG_ERROR, "failed to get MQTT port."); + return -1; + } + int mqtt_keepalive = 0; + if(user_config_lookup_int(&g_config, USER_MQTT_CONFIG_COMMON "server.keepalive", &mqtt_keepalive) != 0) { + USER_LOG(USER_LOG_INFO, "No MQTT keepalive defined, use default."); + mqtt_keepalive = 30; + } + + mosquitto_log_callback_set(handle->mosq, user_mqtt_impl_cb_log); + mosquitto_connect_callback_set(handle->mosq, user_moqtt_impl_cb_on_connect); + mosquitto_disconnect_callback_set(handle->mosq, user_mqtt_impl_cb_on_disconnect); + + int result = mosquitto_connect(handle->mosq, mqtt_host, mqtt_port, mqtt_keepalive); + + if(result != MOSQ_ERR_SUCCESS) { + USER_LOG(USER_LOG_ERROR, "mosquitto connect() failed, result is %s.", mosquitto_strerror(result)); + return -2; + } + + return 0; +} + +/** + * @brief Init MQTT implementation + * + * @param handle user_mqtt_impl_t handle pointer + * @return 0 if success, negative value if error. + */ int user_mqtt_impl_init(user_mqtt_impl_t *handle) { int mosq_major, mosq_minor, mosq_revision; mosquitto_lib_init(); @@ -16,6 +225,20 @@ int user_mqtt_impl_init(user_mqtt_impl_t *handle) { mosquitto_lib_version(&mosq_major, &mosq_minor, &mosq_revision); USER_LOG(USER_LOG_INFO, "libmosquitto library version %d.%d rev. %d.", mosq_major, mosq_minor, mosq_revision); + char *topic = user_config_lookup_string(&g_config, USER_MQTT_CONFIG_COMMON "server.topic"); + if(!topic || (topic[0] == '\0')) { + USER_LOG(USER_LOG_ERROR, "MQTT topic undefined."); + return -1; + } + + int topic_len = strlen(topic); + if(topic_len > USER_MQTT_IMPL_TOPIC_MAX_LEN) { + USER_LOG(USER_LOG_WARN, "MQTT topic too long, cap to %d bytes.", USER_MQTT_IMPL_TOPIC_MAX_LEN); + topic_len = USER_MQTT_IMPL_TOPIC_MAX_LEN; + } + + strncpy(handle->topic, topic, topic_len); + // Init mosquitto instance. handle->mosq = mosquitto_new("ID", false, handle); if(handle->mosq == NULL) { @@ -27,9 +250,25 @@ int user_mqtt_impl_init(user_mqtt_impl_t *handle) { // Enable multi-thread support. mosquitto_threaded_set(handle->mosq, true); + if(user_mqtt_impl_config(handle) != 0) { + USER_LOG(USER_LOG_ERROR, "MQTT config failed."); + return -2; + } + + if(user_mqtt_connect(handle) != 0) { + USER_LOG(USER_LOG_ERROR, "MQTT connect failed."); + return -3; + } + return 0; } +/** + * @brief De-initialize MQTT implementation + * + * @param handle user_mqtt_impl_t handle pointer + * @return 0 if success, negative value if error. + */ int user_mqtt_impl_deinit(user_mqtt_impl_t *handle) { mosquitto_destroy(handle->mosq); mosquitto_lib_cleanup(); @@ -37,8 +276,23 @@ int user_mqtt_impl_deinit(user_mqtt_impl_t *handle) { return 0; } +/** + * @brief MQTT implementation main network loop, run this forever. + * + * @param handle user_mqtt_impl_t handle pointer + * @return 0 if success, negative value if error. + */ int user_mqtt_network_loop(user_mqtt_impl_t *handle) { - return mosquitto_loop(handle->mosq, 1000, 1); + int status = mosquitto_loop(handle->mosq, 1000, 1); + if(status == MOSQ_ERR_SUCCESS) { + return 0; + } else if(status == MOSQ_ERR_NO_CONN) { + USER_LOG(USER_LOG_DEBUG, "Mosquitto loop not connected"); + return 0; + } else { + USER_LOG(USER_LOG_ERROR, "Mosquitto loop returned error: %s.", mosquitto_strerror(status)); + return -1; + } } mqtt_influx_ret_t user_mqtt_get_nsec_timestamp_cb(user_mqtt_impl_t *handle, char *timestamp_string) { @@ -57,4 +311,8 @@ mqtt_influx_ret_t user_mqtt_publish_message_cb(user_mqtt_impl_t *handle, char *d mosquitto_publish(handle->mosq, NULL, handle->topic, strlen(data), data, 1, false); return MQTT_INFLUX_OK; +} + +bool user_mqtt_ready_cb(user_mqtt_impl_t *handle) { + return handle->ready; } \ No newline at end of file diff --git a/src/tasks/user_mqtt_task.c b/src/tasks/user_mqtt_task.c index 8a50e74..f7e87f1 100644 --- a/src/tasks/user_mqtt_task.c +++ b/src/tasks/user_mqtt_task.c @@ -5,10 +5,11 @@ static user_mqtt_impl_t s_mqtt_impl; mqtt_influx_t g_mqtt_influx = { - .cb = {.get_nsec_timestamp_cb = (mqtt_influx_ret_t(*)( - void *, char *))user_mqtt_get_nsec_timestamp_cb, - .publish_message_cb = (mqtt_influx_ret_t(*)( - void *, char *))user_mqtt_publish_message_cb}, + .cb = { + .get_nsec_timestamp_cb = (mqtt_influx_ret_t(*)(void *, char *))user_mqtt_get_nsec_timestamp_cb, + .publish_message_cb = (mqtt_influx_ret_t(*)(void *, char *))user_mqtt_publish_message_cb, + .ready_cb = (bool(*)(void *))user_mqtt_ready_cb, + }, .user_data = &s_mqtt_impl, .hostname = "SystemAgent", }; @@ -47,7 +48,8 @@ void *user_mqtt_task(void *arguments) { mqtt_influx_init(&g_mqtt_influx); while(g_running) { - sleep(1); + user_mqtt_network_loop(&s_mqtt_impl); + usleep(5 * 1000); } return NULL; diff --git a/src/utils/user_system_util.c b/src/utils/user_system_util.c index d461a86..a3ab95c 100644 --- a/src/utils/user_system_util.c +++ b/src/utils/user_system_util.c @@ -16,3 +16,7 @@ int user_system_get_systemd_unique_id(char *uuid) { return 0; } + +int user_system_get_primary_mac_address(char *mac_addr) { + return 0; +} \ No newline at end of file