/* Standard library */
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* ESP drivers */
#include "esp_log.h"
#include "esp_system.h"
#include "esp_tls.h"

/* FreeRTOS */
#include "freertos/FreeRTOS.h"
#include "freertos/queue.h"
#include "freertos/task.h"

/* Cert bundle */
#include "esp_crt_bundle.h"

/* MQTT client */
#include "mqtt_client.h"

/* Private */
#include "app_wifi.h"

#define APP_LOG_TAG "APP_MQTT"

/* Number of items each of the command / response queues can hold. */
#define APP_MQTT_QUEUE_DEPTH 4
/* Stack depth handed to xTaskCreate() for the command task. */
#define APP_MQTT_TASK_STACK 2048
/* FreeRTOS priority of the command task. */
#define APP_MQTT_TASK_PRIORITY 2U
/* How long the event handler waits for room in the response queue. */
#define APP_MQTT_RESPONSE_SEND_TIMEOUT_MS 500

/* Commands accepted by the MQTT task through the command queue. */
typedef enum {
    APP_MQTT_CMD_PUBLISH,
    APP_MQTT_CMD_SUBSCRIBE,
    APP_MQTT_CMD_UNSUBSCRIBE,
} app_mqtt_queue_cmd_t;

/*
 * Item carried (by value) through both queues.
 * `topic` and `payload` are heap buffers owned by whoever currently holds the
 * item: the sender until the item is queued, the receiver afterwards.
 * `payload` is NULL when `payload_len` is 0.
 */
typedef struct {
    app_mqtt_queue_cmd_t cmd;
    char *topic;
    uint8_t *payload;
    uint32_t payload_len;
} app_mqtt_queue_item_t;

/* Client certificate / key blobs referenced through linker-provided symbols. */
extern const char mqtt_client_cert_start[] asm("_binary_client_crt_start");
extern const char mqtt_client_cert_end[] asm("_binary_client_crt_end");
extern const char mqtt_client_key_start[] asm("_binary_client_key_start");
extern const char mqtt_client_key_end[] asm("_binary_client_key_end");

static void app_mqtt_event_handler(void *arg, esp_event_base_t event_base, int32_t event_id, void *event_data);
static void app_mqtt_task(void *pvParameters);

static QueueHandle_t s_app_mqtt_command_queue;
static QueueHandle_t s_app_mqtt_response_queue;

/* Return a heap copy of `topic`, or NULL if `topic` is NULL or malloc fails. */
static char *app_mqtt_copy_topic(const char *topic)
{
    if (topic == NULL) {
        return NULL;
    }

    const size_t size = strlen(topic) + 1;
    char *copy = malloc(size);
    if (copy != NULL) {
        memcpy(copy, topic, size);
    }
    return copy;
}

/* Release the heap buffers owned by `item` and reset its fields. */
static void app_mqtt_free_item(app_mqtt_queue_item_t *item)
{
    free(item->topic);
    free(item->payload);
    item->topic = NULL;
    item->payload = NULL;
    item->payload_len = 0;
}

/* Post `item` to the command queue, waiting for room; fails if the queue does not exist. */
static BaseType_t app_mqtt_enqueue(const app_mqtt_queue_item_t *item)
{
    if (s_app_mqtt_command_queue == NULL) {
        return pdFAIL;
    }
    return xQueueSend(s_app_mqtt_command_queue, item, portMAX_DELAY);
}

/* Queue a command that only carries a topic (subscribe / unsubscribe). */
static int app_mqtt_enqueue_topic_cmd(app_mqtt_queue_cmd_t cmd, const char *topic)
{
    /* Zero-initialise so payload/payload_len are well defined for the receiver. */
    app_mqtt_queue_item_t item = { .cmd = cmd };

    item.topic = app_mqtt_copy_topic(topic);
    if (item.topic == NULL) {
        return -1;
    }

    if (app_mqtt_enqueue(&item) != pdPASS) {
        app_mqtt_free_item(&item);
        return -2;
    }
    return 0;
}

/**
 * Create the command and response queues and start the MQTT task.
 *
 * Anything created before a failing step is released again, so a failed call
 * leaves no queues behind.
 *
 * @return 0 on success, -1 if the command queue could not be created,
 *         -2 if the response queue could not be created,
 *         -3 if the task could not be created.
 */
int app_mqtt_init(void)
{
    int rc;

    s_app_mqtt_command_queue = xQueueCreate(APP_MQTT_QUEUE_DEPTH, sizeof(app_mqtt_queue_item_t));
    if (s_app_mqtt_command_queue == NULL) {
        return -1;
    }

    s_app_mqtt_response_queue = xQueueCreate(APP_MQTT_QUEUE_DEPTH, sizeof(app_mqtt_queue_item_t));
    if (s_app_mqtt_response_queue == NULL) {
        rc = -2;
        goto fail_command_queue;
    }

    if (xTaskCreate(app_mqtt_task, "MQ_TASK", APP_MQTT_TASK_STACK, NULL, APP_MQTT_TASK_PRIORITY, NULL) != pdPASS) {
        rc = -3;
        goto fail_response_queue;
    }

    return 0;

fail_response_queue:
    vQueueDelete(s_app_mqtt_response_queue);
    s_app_mqtt_response_queue = NULL;
fail_command_queue:
    vQueueDelete(s_app_mqtt_command_queue);
    s_app_mqtt_command_queue = NULL;
    return rc;
}

/**
 * Queue a publish request for the MQTT task.
 *
 * Topic and payload are copied, so the caller keeps ownership of its buffers.
 * Blocks while the command queue is full.
 *
 * @param topic        NUL-terminated topic string.
 * @param payload      Payload bytes; may be NULL only when payload_len is 0.
 * @param payload_len  Number of payload bytes (0 publishes an empty message).
 * @return 0 on success, -1 if the topic is NULL or could not be copied,
 *         -2 if the payload is NULL with a non-zero length or could not be
 *         copied, -3 if the request could not be queued.
 */
int app_mqtt_publish(char *topic, uint8_t *payload, uint32_t payload_len)
{
    app_mqtt_queue_item_t item = { .cmd = APP_MQTT_CMD_PUBLISH };

    item.topic = app_mqtt_copy_topic(topic);
    if (item.topic == NULL) {
        return -1;
    }

    /*
     * Only allocate for non-empty payloads: malloc(0) may return NULL (a
     * spurious failure) and a non-NULL buffer with length 0 would make
     * esp_mqtt_client_publish() measure the data with strlen().
     */
    if (payload_len > 0U) {
        if (payload == NULL) {
            app_mqtt_free_item(&item);
            return -2;
        }

        item.payload = malloc(payload_len);
        if (item.payload == NULL) {
            app_mqtt_free_item(&item);
            return -2;
        }

        memcpy(item.payload, payload, payload_len);
        item.payload_len = payload_len;
    }

    if (app_mqtt_enqueue(&item) != pdPASS) {
        app_mqtt_free_item(&item);
        return -3;
    }

    return 0;
}

/**
 * Queue a subscribe request for the MQTT task.
 *
 * @param topic  NUL-terminated topic string; it is copied.
 * @return 0 on success, -1 if the topic is NULL or could not be copied,
 *         -2 if the request could not be queued.
 */
int app_mqtt_subscribe(char *topic)
{
    return app_mqtt_enqueue_topic_cmd(APP_MQTT_CMD_SUBSCRIBE, topic);
}

/**
 * Queue an unsubscribe request for the MQTT task.
 *
 * @param topic  NUL-terminated topic string; it is copied.
 * @return 0 on success, -1 if the topic is NULL or could not be copied,
 *         -2 if the request could not be queued.
 */
int app_mqtt_unsubscribe(char *topic)
{
    return app_mqtt_enqueue_topic_cmd(APP_MQTT_CMD_UNSUBSCRIBE, topic);
}

/**
 * Wait for a received message on the response queue.
 *
 * On success the caller takes ownership of *topic and *payload and must
 * free() both. *payload is NULL when *payload_len is 0.
 *
 * @param topic        Receives the NUL-terminated topic.
 * @param payload      Receives the payload buffer.
 * @param payload_len  Receives the payload length in bytes.
 * @param timeout      Maximum wait in milliseconds.
 * @return 0 when a message was returned, -1 on timeout, when an output
 *         pointer is NULL, or when the response queue does not exist.
 */
int app_mqtt_poll(char **topic, uint8_t **payload, uint32_t *payload_len, uint32_t timeout)
{
    /* Validate before receiving so a dequeued item is never dropped (and leaked). */
    if (topic == NULL || payload == NULL || payload_len == NULL || s_app_mqtt_response_queue == NULL) {
        return -1;
    }

    app_mqtt_queue_item_t item;
    if (xQueueReceive(s_app_mqtt_response_queue, &item, pdMS_TO_TICKS(timeout)) != pdPASS) {
        return -1;
    }

    *topic = item.topic;
    *payload = item.payload;
    *payload_len = item.payload_len;

    return 0;
}

/*
 * Task body: creates the MQTT client, waits for Wi-Fi, starts the client and
 * then executes the commands posted to the command queue, freeing each item's
 * buffers once it has been handed to the client.
 */
static void app_mqtt_task(void *pvParameters)
{
    (void)pvParameters;

    const esp_mqtt_client_config_t mqtt_cfg = {
        .uri = CONFIG_APP_MQTT_BROKER_ADDR,
        .client_cert_pem = mqtt_client_cert_start,
        .client_key_pem = mqtt_client_key_start,
        .clientkey_password = CONFIG_APP_MQTT_TLS_CLIENT_PASSPHRASE,
        .clientkey_password_len = strlen(CONFIG_APP_MQTT_TLS_CLIENT_PASSPHRASE),
        .crt_bundle_attach = esp_crt_bundle_attach,
    };

    esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);
    if (client == NULL) {
        /*
         * Nothing can be done without a client. A task function must not
         * return, so the task deletes itself. Commands queued afterwards are
         * never processed.
         */
        ESP_LOGE(APP_LOG_TAG, "Failed to create MQTT client");
        vTaskDelete(NULL);
        return; /* not reached */
    }

    if (esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, app_mqtt_event_handler, NULL) != ESP_OK) {
        ESP_LOGE(APP_LOG_TAG, "Failed to register MQTT event handler");
    }

    app_wifi_wait_ready(portMAX_DELAY);

    if (esp_mqtt_client_start(client) != ESP_OK) {
        ESP_LOGE(APP_LOG_TAG, "Failed to start MQTT client");
    }

    app_mqtt_queue_item_t item;

    for (;;) {
        if (xQueueReceive(s_app_mqtt_command_queue, &item, portMAX_DELAY) != pdPASS) {
            continue;
        }

        switch (item.cmd) {
        case APP_MQTT_CMD_PUBLISH:
            /* payload is NULL when payload_len is 0, which publishes an empty message. */
            if (esp_mqtt_client_publish(client, item.topic, (char *)item.payload, item.payload_len, 0, 0) < 0) {
                ESP_LOGW(APP_LOG_TAG, "Publish to %s failed", item.topic);
            }
            /* This is alloc'ed by us. */
            app_mqtt_free_item(&item);
            break;

        case APP_MQTT_CMD_SUBSCRIBE:
            if (esp_mqtt_client_subscribe(client, item.topic, 0) < 0) {
                ESP_LOGW(APP_LOG_TAG, "Subscribe to %s failed", item.topic);
            }
            app_mqtt_free_item(&item);
            break;

        case APP_MQTT_CMD_UNSUBSCRIBE:
            if (esp_mqtt_client_unsubscribe(client, item.topic) < 0) {
                ESP_LOGW(APP_LOG_TAG, "Unsubscribe from %s failed", item.topic);
            }
            app_mqtt_free_item(&item);
            break;

        default:
            break;
        }
    }
}

/*
 * MQTT client event callback. For MQTT_EVENT_DATA the topic and data are
 * copied to the heap and posted to the response queue, where app_mqtt_poll()
 * picks them up. All other events are ignored.
 *
 * NOTE(review): each MQTT_EVENT_DATA event is forwarded as-is. If the client
 * splits large messages across several events, they are not reassembled here.
 * Confirm whether that matters for the expected payload sizes.
 */
static void app_mqtt_event_handler(void *arg, esp_event_base_t event_base, int32_t event_id, void *event_data)
{
    (void)arg;
    (void)event_base;

    esp_mqtt_event_handle_t event = event_data;

    switch ((esp_mqtt_event_id_t)event_id) {
    case MQTT_EVENT_DATA: {
        if (event == NULL || event->topic_len < 0 || event->data_len < 0) {
            break;
        }

        /* cmd is not used on the response queue; zero-init keeps the copied item fully defined. */
        app_mqtt_queue_item_t item = { 0 };

        item.topic = malloc((size_t)event->topic_len + 1);
        if (item.topic == NULL) {
            ESP_LOGE(APP_LOG_TAG, "Failed to allocate topic");
            return;
        }
        if (event->topic_len > 0) {
            memcpy(item.topic, event->topic, (size_t)event->topic_len);
        }
        item.topic[event->topic_len] = '\0';

        /* Empty payloads are passed on as NULL / 0 instead of relying on malloc(0). */
        if (event->data_len > 0) {
            item.payload = malloc((size_t)event->data_len);
            if (item.payload == NULL) {
                ESP_LOGE(APP_LOG_TAG, "Failed to allocate data");
                app_mqtt_free_item(&item);
                return;
            }
            memcpy(item.payload, event->data, (size_t)event->data_len);
            item.payload_len = (uint32_t)event->data_len;
        }

        if (xQueueSend(s_app_mqtt_response_queue, &item, pdMS_TO_TICKS(APP_MQTT_RESPONSE_SEND_TIMEOUT_MS)) != pdTRUE) {
            ESP_LOGE(APP_LOG_TAG, "Response queue is full...");
            app_mqtt_free_item(&item);
        }
        break;
    }

    default:
        break;
    }
}