/* MQTT publish test This example code is in the Public Domain (or CC0 licensed, at your option.) Unless required by applicable law or agreed to in writing, this software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */ #include #include #include #include #include "esp_wifi.h" #include "esp_system.h" #include "nvs_flash.h" #include "esp_event_loop.h" #include "freertos/FreeRTOS.h" #include "freertos/task.h" #include "freertos/semphr.h" #include "freertos/queue.h" #include "freertos/event_groups.h" #include "lwip/sockets.h" #include "lwip/dns.h" #include "lwip/netdb.h" #include "esp_log.h" #include "mqtt_client.h" static const char *TAG = "PUBLISH_TEST"; static EventGroupHandle_t wifi_event_group; static EventGroupHandle_t mqtt_event_group; const static int CONNECTED_BIT = BIT0; static esp_mqtt_client_handle_t mqtt_client = NULL; static char *expected_data = NULL; static char *actual_data = NULL; static size_t expected_size = 0; static size_t expected_published = 0; static size_t actual_published = 0; static int qos_test = 0; static esp_err_t wifi_event_handler(void *ctx, system_event_t *event) { switch (event->event_id) { case SYSTEM_EVENT_STA_START: esp_wifi_connect(); break; case SYSTEM_EVENT_STA_GOT_IP: xEventGroupSetBits(wifi_event_group, CONNECTED_BIT); break; case SYSTEM_EVENT_STA_DISCONNECTED: esp_wifi_connect(); xEventGroupClearBits(wifi_event_group, CONNECTED_BIT); break; default: break; } return ESP_OK; } static void wifi_init(void) { tcpip_adapter_init(); wifi_event_group = xEventGroupCreate(); ESP_ERROR_CHECK(esp_event_loop_init(wifi_event_handler, NULL)); wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT(); ESP_ERROR_CHECK(esp_wifi_init(&cfg)); ESP_ERROR_CHECK(esp_wifi_set_storage(WIFI_STORAGE_RAM)); wifi_config_t wifi_config = { .sta = { .ssid = CONFIG_WIFI_SSID, .password = CONFIG_WIFI_PASSWORD, }, }; ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA)); ESP_ERROR_CHECK(esp_wifi_set_config(ESP_IF_WIFI_STA, &wifi_config)); ESP_LOGI(TAG, "start the WIFI SSID:[%s]", CONFIG_WIFI_SSID); ESP_ERROR_CHECK(esp_wifi_start()); ESP_LOGI(TAG, "Waiting for wifi"); xEventGroupWaitBits(wifi_event_group, CONNECTED_BIT, false, true, portMAX_DELAY); } #if CONFIG_BROKER_CERTIFICATE_OVERRIDDEN == 1 static const uint8_t iot_eclipse_org_pem_start[] = "-----BEGIN CERTIFICATE-----\n" CONFIG_BROKER_CERTIFICATE_OVERRIDE "\n-----END CERTIFICATE-----"; #else extern const uint8_t iot_eclipse_org_pem_start[] asm("_binary_iot_eclipse_org_pem_start"); #endif extern const uint8_t iot_eclipse_org_pem_end[] asm("_binary_iot_eclipse_org_pem_end"); static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) { esp_mqtt_client_handle_t client = event->client; static int msg_id = 0; static int actual_len = 0; // your_context_t *context = event->context; switch (event->event_id) { case MQTT_EVENT_CONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); xEventGroupSetBits(mqtt_event_group, CONNECTED_BIT); msg_id = esp_mqtt_client_subscribe(client, CONFIG_SUBSCIBE_TOPIC, qos_test); ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); printf("DATA=%.*s\r\n", event->data_len, event->data); printf("ID=%d, total_len=%d, data_len=%d, current_data_offset=%d\n", event->msg_id, event->total_data_len, event->data_len, event->current_data_offset); if (event->topic) { actual_len = event->data_len; msg_id = event->msg_id; } else { actual_len += event->data_len; // check consisency with msg_id across multiple data events for single msg if (msg_id != event->msg_id) { ESP_LOGI(TAG, "Wrong msg_id in chunked message %d != %d", msg_id, event->msg_id); abort(); } } memcpy(actual_data + event->current_data_offset, event->data, event->data_len); if (actual_len == event->total_data_len) { if (0 == memcmp(actual_data, expected_data, expected_size)) { printf("OK!"); memset(actual_data, 0, expected_size); actual_published ++; if (actual_published == expected_published) { printf("Correct pattern received exactly x times\n"); ESP_LOGI(TAG, "Test finished correctly!"); } } else { printf("FAILED!"); abort(); } } break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); break; default: ESP_LOGI(TAG, "Other event id:%d", event->event_id); break; } return ESP_OK; } static void mqtt_app_start(void) { mqtt_event_group = xEventGroupCreate(); const esp_mqtt_client_config_t mqtt_cfg = { .event_handle = mqtt_event_handler, .cert_pem = (const char *)iot_eclipse_org_pem_start, }; ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size()); mqtt_client = esp_mqtt_client_init(&mqtt_cfg); } static void get_string(char *line, size_t size) { int count = 0; while (count < size) { int c = fgetc(stdin); if (c == '\n') { line[count] = '\0'; break; } else if (c > 0 && c < 127) { line[count] = c; ++count; } vTaskDelay(10 / portTICK_PERIOD_MS); } } void app_main() { char line[256]; char pattern[32]; char transport[32]; int repeat = 0; ESP_LOGI(TAG, "[APP] Free memory: %d bytes", esp_get_free_heap_size()); ESP_LOGI(TAG, "[APP] IDF version: %s", esp_get_idf_version()); esp_log_level_set("*", ESP_LOG_INFO); esp_log_level_set("MQTT_CLIENT", ESP_LOG_VERBOSE); esp_log_level_set("TRANSPORT_TCP", ESP_LOG_VERBOSE); esp_log_level_set("TRANSPORT_SSL", ESP_LOG_VERBOSE); esp_log_level_set("TRANSPORT", ESP_LOG_VERBOSE); esp_log_level_set("OUTBOX", ESP_LOG_VERBOSE); nvs_flash_init(); wifi_init(); mqtt_app_start(); while (1) { get_string(line, sizeof(line)); sscanf(line, "%s %s %d %d %d", transport, pattern, &repeat, &expected_published, &qos_test); ESP_LOGI(TAG, "PATTERN:%s REPEATED:%d PUBLISHED:%d\n", pattern, repeat, expected_published); int pattern_size = strlen(pattern); free(expected_data); free(actual_data); actual_published = 0; expected_size = pattern_size * repeat; expected_data = malloc(expected_size); actual_data = malloc(expected_size); for (int i = 0; i < repeat; i++) { memcpy(expected_data + i * pattern_size, pattern, pattern_size); } printf("EXPECTED STRING %.*s, SIZE:%d\n", expected_size, expected_data, expected_size); esp_mqtt_client_stop(mqtt_client); if (0 == strcmp(transport, "tcp")) { ESP_LOGI(TAG, "[TCP transport] Startup.."); esp_mqtt_client_set_uri(mqtt_client, CONFIG_BROKER_TCP_URI); } else if (0 == strcmp(transport, "ssl")) { ESP_LOGI(TAG, "[SSL transport] Startup.."); esp_mqtt_client_set_uri(mqtt_client, CONFIG_BROKER_SSL_URI); } else if (0 == strcmp(transport, "ws")) { ESP_LOGI(TAG, "[WS transport] Startup.."); esp_mqtt_client_set_uri(mqtt_client, CONFIG_BROKER_WS_URI); } else if (0 == strcmp(transport, "wss")) { ESP_LOGI(TAG, "[WSS transport] Startup.."); esp_mqtt_client_set_uri(mqtt_client, CONFIG_BROKER_WSS_URI); } else { ESP_LOGE(TAG, "Unexpected transport"); abort(); } xEventGroupClearBits(mqtt_event_group, CONNECTED_BIT); esp_mqtt_client_start(mqtt_client); ESP_LOGI(TAG, "Note free memory: %d bytes", esp_get_free_heap_size()); xEventGroupWaitBits(mqtt_event_group, CONNECTED_BIT, false, true, portMAX_DELAY); for (int i = 0; i < expected_published; i++) { int msg_id = esp_mqtt_client_publish(mqtt_client, CONFIG_PUBLISH_TOPIC, expected_data, expected_size, qos_test, 0); ESP_LOGI(TAG, "[%d] Publishing...", msg_id); } } }