For simplicity, I'm using the following "mqttsimple" example from wolfMQTT, with some modifications to use TLS and connect to Azure.
/* mqttsimple.c
*
* Copyright (C) 2006-2021 wolfSSL Inc.
*
* This file is part of wolfMQTT.
*
* wolfMQTT is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* wolfMQTT is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335, USA
*/
/* Standalone Example */
/* Include the autoconf generated config.h */
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif
#include "wolfmqtt/mqtt_client.h"
#include "mqttsimple.h"
/* Requires BSD Style Socket */
#ifdef HAVE_SOCKET
#ifndef ENABLE_MQTT_TLS
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <unistd.h>
#endif
/* Configuration */
/* Broker host name; also reused as the first component of MQTT_USERNAME. */
#define MQTT_HOST "wolfMQTT.azure-devices.net"
/* QoS level used for both the subscription and the test publish. */
#define MQTT_QOS MQTT_QOS_1
/* Keep-alive interval sent in the MQTT CONNECT packet, in seconds. */
#define MQTT_KEEP_ALIVE_SEC 60
/* Timeout for each MqttClient_WaitMessage_ex() call in the receive loop. */
#define MQTT_CMD_TIMEOUT_MS 30000
/* Timeout handed to MqttClient_Init() and MqttClient_NetConnect(). */
#define MQTT_CON_TIMEOUT_MS 5000
/* Client identifier; also embedded in MQTT_USERNAME. */
#define MQTT_CLIENT_ID "demoDevice"
/* Topic used for both subscribe and publish.
 * NOTE(review): Azure IoT Hub is understood to accept only its own device
 * topics (devices/{id}/messages/events/ and
 * devices/{id}/messages/devicebound/#) - confirm "TestTopic" is usable. */
#define MQTT_TOPIC_NAME "TestTopic"
/* Payload of the single test publish. */
#define MQTT_PUBLISH_MSG "TestPublish"
/* User name built by literal concatenation: <host>/<client id>/?api-version=... */
#define MQTT_USERNAME MQTT_HOST "/" MQTT_CLIENT_ID "/?api-version=2021-04-12"
/* Placeholder for the generated SAS token.
 * NOTE(review): avoid committing or logging a real token. */
#define MQTT_PASSWORD "SharedAccessSignature sr=wolfMQTT.azure-devices.net + other generated information"
/* Port and TLS flag follow the ENABLE_MQTT_TLS build option. */
#ifdef ENABLE_MQTT_TLS
#define MQTT_USE_TLS 1
#define MQTT_PORT 8883
#else
#define MQTT_USE_TLS 0
#define MQTT_PORT 1883
#endif
/* Size in bytes of each of the static send and read buffers. */
#define MQTT_MAX_PACKET_SZ 1024
/* Sentinel stored in the socket context while no socket is open. */
#define INVALID_SOCKET_FD -1
/* Maximum number of topic/payload bytes echoed per PRINTF in the callback. */
#define PRINT_BUFFER_SIZE 80
/* BaltimoreCyberTrustRoot.crt.pem
 * PEM-encoded root certificate loaded as the only trust anchor by
 * mqtt_tls_cb() (via wolfSSL_CTX_load_verify_buffer).
 * NOTE(review): confirm this root is still within its validity period and is
 * still the CA that signs the target Azure endpoint's chain; Azure has been
 * reported to be moving IoT Hub to a DigiCert root - verify before relying
 * on it. */
static const char* root_ca =
"-----BEGIN CERTIFICATE-----\n"
"MIIDdzCCAl+gAwIBAgIEAgAAuTANBgkqhkiG9w0BAQUFADBaMQswCQYDVQQGEwJJ\n"
"RTESMBAGA1UEChMJQmFsdGltb3JlMRMwEQYDVQQLEwpDeWJlclRydXN0MSIwIAYD\n"
"VQQDExlCYWx0aW1vcmUgQ3liZXJUcnVzdCBSb290MB4XDTAwMDUxMjE4NDYwMFoX\n"
"DTI1MDUxMjIzNTkwMFowWjELMAkGA1UEBhMCSUUxEjAQBgNVBAoTCUJhbHRpbW9y\n"
"ZTETMBEGA1UECxMKQ3liZXJUcnVzdDEiMCAGA1UEAxMZQmFsdGltb3JlIEN5YmVy\n"
"VHJ1c3QgUm9vdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKMEuyKr\n"
"mD1X6CZymrV51Cni4eiVgLGw41uOKymaZN+hXe2wCQVt2yguzmKiYv60iNoS6zjr\n"
"IZ3AQSsBUnuId9Mcj8e6uYi1agnnc+gRQKfRzMpijS3ljwumUNKoUMMo6vWrJYeK\n"
"mpYcqWe4PwzV9/lSEy/CG9VwcPCPwBLKBsua4dnKM3p31vjsufFoREJIE9LAwqSu\n"
"XmD+tqYF/LTdB1kC1FkYmGP1pWPgkAx9XbIGevOF6uvUA65ehD5f/xXtabz5OTZy\n"
"dc93Uk3zyZAsuT3lySNTPx8kmCFcB5kpvcY67Oduhjprl3RjM71oGDHweI12v/ye\n"
"jl0qhqdNkNwnGjkCAwEAAaNFMEMwHQYDVR0OBBYEFOWdWTCCR1jMrPoIVDaGezq1\n"
"BE3wMBIGA1UdEwEB/wQIMAYBAf8CAQMwDgYDVR0PAQH/BAQDAgEGMA0GCSqGSIb3\n"
"DQEBBQUAA4IBAQCFDF2O5G9RaEIFoN27TyclhAO992T9Ldcw46QQF+vaKSm2eT92\n"
"9hkTI7gQCvlYpNRhcL0EYWoSihfVCr3FvDB81ukMJY2GQE/szKN+OMY3EU/t3Wgx\n"
"jkzSswF07r51XgdIGn9w/xZchMB5hbgF/X++ZRGjD8ACtPhSNzkE1akxehi/oCr0\n"
"Epn3o0WC4zxe9Z2etciefC7IpJ5OCBRLbf1wbWsaY71k5h+3zvDyny67G7fyUIhz\n"
"ksLi4xaNmjICq44Y3ekQEe5+NauQrz4wlHrQMz2nZQ/1/I6eYs9HRCwBXbsdtTLS\n"
"R9I4LtD+gdwyah617jzV/OeBHRnDJELqYzmp\n"
"-----END CERTIFICATE-----";
/* Local definition of struct timeval, filled in by setup_timeout() and passed
 * to setsockopt() for SO_RCVTIMEO / SO_SNDTIMEO.
 * NOTE(review): this will clash with the system definition if <sys/time.h>
 * or the socket headers are also in scope, and its field types must match
 * the layout the platform's setsockopt() expects - confirm for the target. */
struct timeval {
long tv_sec; /* whole seconds */
long tv_usec; /* additional microseconds */
};
/* Local Variables */
static MqttClient mClient;
static MqttNet mNetwork;
static int mSockFd = INVALID_SOCKET_FD;
static byte mSendBuf[MQTT_MAX_PACKET_SZ];
static byte mReadBuf[MQTT_MAX_PACKET_SZ];
static volatile word16 mPacketIdLast;
/* Local Functions */
/* Publish-message callback registered with MqttClient_Init().
 *
 * Prints the topic (on the first callback of a message) and the payload
 * chunk currently held in msg->buffer. At most PRINT_BUFFER_SIZE bytes of
 * each are printed.
 *
 * client:   unused.
 * msg:      message being delivered
 *             msg->total_len:  payload total length
 *             msg->buffer:     payload buffer
 *             msg->buffer_len: payload buffer length
 *             msg->buffer_pos: payload buffer position
 * msg_new:  non-zero on the first data callback of a message.
 * msg_done: non-zero on the last data callback of a message.
 *
 * Returns MQTT_CODE_SUCCESS; returning a negative value would terminate
 * publish processing.
 */
static int mqtt_message_cb(MqttClient *client, MqttMessage *msg,
    byte msg_new, byte msg_done)
{
    byte buf[PRINT_BUFFER_SIZE+1];
    word32 len;

    (void)client;

    if (msg_new) {
        /* Determine min size to dump */
        len = msg->topic_name_len;
        if (len > PRINT_BUFFER_SIZE) {
            len = PRINT_BUFFER_SIZE;
        }
        XMEMCPY(buf, msg->topic_name, len);
        buf[len] = '\0'; /* Make sure its null terminated */

        /* Print incoming message */
        PRINTF("MQTT Message: Topic %s, Qos %d, Len %u",
            buf, msg->qos, (unsigned int)msg->total_len);
    }

    /* Print message payload */
    len = msg->buffer_len;
    if (len > PRINT_BUFFER_SIZE) {
        len = PRINT_BUFFER_SIZE;
    }
    XMEMCPY(buf, msg->buffer, len);
    buf[len] = '\0'; /* Make sure its null terminated */

    /* The positions are unsigned, so print them with %u (explicitly cast so
     * the argument type matches the conversion on every platform). */
    PRINTF("Payload (%u - %u): %s",
        (unsigned int)msg->buffer_pos,
        (unsigned int)(msg->buffer_pos + len), buf);

    if (msg_done) {
        PRINTF("MQTT Message: Done");
    }

    /* Return negative to terminate publish processing */
    return MQTT_CODE_SUCCESS;
}
/* Convert a millisecond timeout into a struct timeval.
 *
 * tv:         output; receives seconds and microseconds.
 * timeout_ms: timeout in milliseconds.
 *
 * A zero or negative timeout is replaced by a minimum of 100 microseconds so
 * the resulting value is never zero or negative.
 */
static void setup_timeout(struct timeval* tv, int timeout_ms)
{
    if (timeout_ms <= 0) {
        /* Nothing positive to split: fall back to the minimum value */
        tv->tv_sec = 0;
        tv->tv_usec = 100;
        return;
    }

    tv->tv_sec = timeout_ms / 1000;
    tv->tv_usec = (timeout_ms % 1000) * 1000;
}
/* Fetch the pending error (SO_ERROR) of a socket.
 *
 * sockFd: socket descriptor to query.
 *
 * Returns the value reported by getsockopt(), or 0 when getsockopt() itself
 * fails, because its result is not checked and the value starts out as 0.
 */
static int socket_get_error(int sockFd)
{
    int pending = 0;
    socklen_t pending_sz = sizeof(pending);

    (void)getsockopt(sockFd, SOL_SOCKET, SO_ERROR, &pending, &pending_sz);

    return pending;
}
/* Network connect callback: resolve `host` to an IPv4 address and open a
 * blocking TCP connection to it.
 *
 * context:    pointer to the int that receives the socket descriptor.
 * host:       host name or address to resolve.
 * port:       TCP port, host byte order.
 * timeout_ms: unused; connect() blocks for the system default.
 *
 * Returns MQTT_CODE_SUCCESS with *context set to the new descriptor,
 * MQTT_CODE_ERROR_BAD_ARG when context is NULL, or MQTT_CODE_ERROR_NETWORK
 * when resolution, socket creation or connect fails. On failure *context is
 * left untouched and no descriptor is leaked.
 */
static int mqtt_net_connect(void *context, const char* host, word16 port,
    int timeout_ms)
{
    int rc;
    int sockFd;
    int *pSockFd = (int*)context;
    int found = 0;
    struct sockaddr_in addr;
    struct addrinfo *result = NULL;
    struct addrinfo *res;
    struct addrinfo hints;

    if (pSockFd == NULL) {
        return MQTT_CODE_ERROR_BAD_ARG;
    }

    (void)timeout_ms;

    /* get address */
    XMEMSET(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    XMEMSET(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;

    /* getaddrinfo() reports failure with any non-zero value; the sign of the
     * EAI_* codes differs between platforms, so test against zero only. */
    rc = getaddrinfo(host, NULL, &hints, &result);
    if (rc != 0 || result == NULL) {
        return MQTT_CODE_ERROR_NETWORK;
    }

    /* Walk the list with a separate cursor so that `result` keeps pointing
     * at the head, which is what freeaddrinfo() must be given. */
    for (res = result; res != NULL; res = res->ai_next) {
        if (res->ai_family == AF_INET && res->ai_addr != NULL) {
            addr.sin_port = htons(port);
            addr.sin_family = AF_INET;
            addr.sin_addr =
                ((struct sockaddr_in*)(res->ai_addr))->sin_addr;
            found = 1;
            break;
        }
    }
    freeaddrinfo(result);

    if (!found) {
        return MQTT_CODE_ERROR_NETWORK;
    }

    sockFd = socket(addr.sin_family, SOCK_STREAM, 0);
    if (sockFd < 0) {
        return MQTT_CODE_ERROR_NETWORK;
    }

    /* Start connect */
    rc = connect(sockFd, (struct sockaddr*)&addr, sizeof(addr));
    if (rc < 0) {
        /* Query the socket that just failed, not the (still unset) one
         * stored in the context. */
        PRINTF("NetConnect_simple: Error %d (Sock Err %d)",
            rc, socket_get_error(sockFd));
        close(sockFd);
        return MQTT_CODE_ERROR_NETWORK;
    }

    /* save socket number to context */
    *pSockFd = sockFd;

    return MQTT_CODE_SUCCESS;
}
/* Network read callback: read up to buf_len bytes from the socket.
 *
 * context:    pointer to the int holding the socket descriptor.
 * buf:        destination buffer.
 * buf_len:    number of bytes wanted.
 * timeout_ms: receive timeout, applied through SO_RCVTIMEO.
 *
 * Returns the number of bytes read (may be fewer than buf_len when the
 * timeout expires or the peer closes after sending some data),
 * MQTT_CODE_ERROR_TIMEOUT when nothing arrived in time,
 * MQTT_CODE_ERROR_NETWORK on a socket error or when the peer closed the
 * connection before any data was read, or MQTT_CODE_ERROR_BAD_ARG when
 * context is NULL.
 */
static int mqtt_net_read(void *context, byte* buf, int buf_len, int timeout_ms)
{
    int rc;
    int *pSockFd = (int*)context;
    int bytes = 0;
    struct timeval tv;

    if (pSockFd == NULL) {
        return MQTT_CODE_ERROR_BAD_ARG;
    }

    /* Setup timeout */
    setup_timeout(&tv, timeout_ms);
    setsockopt(*pSockFd, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv));

    /* Loop until buf_len has been read, error or timeout */
    while (bytes < buf_len) {
        rc = (int)recv(*pSockFd, &buf[bytes], buf_len - bytes, 0);
        if (rc < 0) {
            rc = socket_get_error(*pSockFd);
            if (rc == 0) {
                break; /* timeout */
            }
            PRINTF("NetRead: Error %d", rc);
            return MQTT_CODE_ERROR_NETWORK;
        }
        if (rc == 0) {
            /* recv() returns 0 once the peer has closed the connection.
             * Without this check the loop would spin forever, because
             * `bytes` would never advance. */
            if (bytes == 0) {
                PRINTF("NetRead: Connection closed by peer");
                return MQTT_CODE_ERROR_NETWORK;
            }
            break; /* hand back what was received before the close */
        }
        bytes += rc; /* Data */
    }

    if (bytes == 0) {
        return MQTT_CODE_ERROR_TIMEOUT;
    }

    return bytes;
}
/* Network write callback: send buf_len bytes from buf on the socket.
 *
 * context:    pointer to the int holding the socket descriptor.
 * buf:        data to send.
 * buf_len:    number of bytes to send.
 * timeout_ms: send timeout, applied through SO_SNDTIMEO.
 *
 * Returns the byte count reported by send(), MQTT_CODE_ERROR_NETWORK when
 * send() fails, or MQTT_CODE_ERROR_BAD_ARG when context is NULL.
 */
static int mqtt_net_write(void *context, const byte* buf, int buf_len,
    int timeout_ms)
{
    int *fd_ref = (int*)context;
    struct timeval send_to;
    int sent;

    if (fd_ref == NULL) {
        return MQTT_CODE_ERROR_BAD_ARG;
    }

    /* Apply the per-call send timeout before writing */
    setup_timeout(&send_to, timeout_ms);
    setsockopt(*fd_ref, SOL_SOCKET, SO_SNDTIMEO, (char *)&send_to,
        sizeof(send_to));

    sent = (int)send(*fd_ref, buf, buf_len, 0);
    if (sent >= 0) {
        return sent;
    }

    PRINTF("NetWrite: Error %d (Sock Err %d)",
        sent, socket_get_error(*fd_ref));
    return MQTT_CODE_ERROR_NETWORK;
}
/* Network disconnect callback: close the socket and mark the context as
 * holding no socket.
 *
 * context: pointer to the int holding the socket descriptor.
 *
 * Returns MQTT_CODE_SUCCESS, or MQTT_CODE_ERROR_BAD_ARG when context is NULL.
 */
static int mqtt_net_disconnect(void *context)
{
    int *fd_slot = (int*)context;
    int status = MQTT_CODE_ERROR_BAD_ARG;

    if (fd_slot != NULL) {
        close(*fd_slot);
        *fd_slot = INVALID_SOCKET_FD;
        status = MQTT_CODE_SUCCESS;
    }

    return status;
}
#ifdef ENABLE_MQTT_TLS
/* Certificate verify callback installed by mqtt_tls_cb().
 *
 * preverify: pre-verification result passed in by the TLS library (logged).
 * store:     verification context; its error code and domain are logged.
 *
 * Always returns 1, so the handshake continues even when store->error is
 * non-zero.
 * NOTE(review): accepting every certificate defeats peer verification;
 * return 0 on error once debugging is finished.
 */
static int mqtt_tls_verify_cb(int preverify, WOLFSSL_X509_STORE_CTX* store)
{
    char err_text[WOLFSSL_MAX_ERROR_SZ];
    const char* err_desc = "none";
    const int failed = (store->error != 0);

    if (failed) {
        err_desc = wolfSSL_ERR_error_string(store->error, err_text);
    }

    PRINTF("MQTT TLS Verify Callback: PreVerify %d, Error %d (%s)",
        preverify, store->error, err_desc);
    PRINTF(" Subject's domain name is %s", store->domain);

    if (failed) {
        /* Allowing to continue */
        /* Should check certificate and return 0 if not okay */
        PRINTF(" Allowing cert anyways");
    }

    return 1;
}
/* TLS setup callback passed to MqttClient_NetConnect().
 *
 * Creates a TLS 1.2 client context in client->tls.ctx, enables peer
 * verification with mqtt_tls_verify_cb, loads root_ca as the trusted CA and,
 * when wolfSSL is built with HAVE_SNI, sets MQTT_HOST as the Server Name
 * Indication host name.
 *
 * client: MQTT client whose tls.ctx is populated.
 *
 * Returns the wolfSSL result of the last step performed: WOLFSSL_SUCCESS
 * when everything succeeded, WOLFSSL_FAILURE when the context could not be
 * created, otherwise the failing call's error code.
 */
static int mqtt_tls_cb(MqttClient* client)
{
    int rc = WOLFSSL_FAILURE;

    client->tls.ctx = wolfSSL_CTX_new(wolfTLSv1_2_client_method());
    if (client->tls.ctx) {
        wolfSSL_CTX_set_verify(client->tls.ctx, WOLFSSL_VERIFY_PEER,
            mqtt_tls_verify_cb);

        /* Load CA certificate buffer */
        rc = wolfSSL_CTX_load_verify_buffer(client->tls.ctx,
            (const byte*)root_ca, (long)XSTRLEN(root_ca),
            WOLFSSL_FILETYPE_PEM);

#ifdef HAVE_SNI
        /* Send the host name in the ClientHello. Endpoints that serve many
         * host names from one address need it to select the certificate;
         * Azure IoT Hub's TLS documentation lists SNI as required. */
        if (rc == WOLFSSL_SUCCESS) {
            rc = wolfSSL_CTX_UseSNI(client->tls.ctx, WOLFSSL_SNI_HOST_NAME,
                MQTT_HOST, (word16)XSTRLEN(MQTT_HOST));
        }
#endif
    }

    PRINTF("MQTT TLS Setup (%d)", rc);

    return rc;
}
#else
/* TLS setup callback used when the build has no TLS support: there is
 * nothing to configure, so it ignores the client and returns 0. */
static int mqtt_tls_cb(MqttClient* client)
{
    const int status = 0;

    (void)client;

    return status;
}
#endif /* ENABLE_MQTT_TLS */
static word16 mqtt_get_packetid(void)
{
/* Check rollover */
if (mPacketIdLast >= MAX_PACKET_ID) {
mPacketIdLast = 0;
}
return ++mPacketIdLast;
}
/* Public Function */
/* Run the simple MQTT client demo.
 *
 * Steps: initialise the client with the socket callbacks in this file,
 * open the network (and TLS, when enabled) connection to MQTT_HOST, send
 * MQTT CONNECT with the configured client id / user name / password,
 * subscribe to MQTT_TOPIC_NAME, publish MQTT_PUBLISH_MSG to it, then wait
 * for incoming messages, sending a ping each time the wait times out.
 *
 * The wait loop only ends on an error, so the function returns the first
 * non-success MQTT_CODE_* value encountered. Before returning it closes the
 * MQTT session, the network connection and the client so that no socket or
 * TLS state is left behind.
 */
int mqttsimple_test(void)
{
    int rc = 0;
    int client_ready = 0; /* MqttClient_Init() succeeded */
    int session_up = 0;   /* MQTT CONNECT was acknowledged */
    MqttObject mqttObj;
    MqttTopic topics[1];

    /* Initialize MQTT client */
    XMEMSET(&mNetwork, 0, sizeof(mNetwork));
    mNetwork.connect = mqtt_net_connect;
    mNetwork.read = mqtt_net_read;
    mNetwork.write = mqtt_net_write;
    mNetwork.disconnect = mqtt_net_disconnect;
    mNetwork.context = &mSockFd;
    rc = MqttClient_Init(&mClient, &mNetwork, mqtt_message_cb,
        mSendBuf, sizeof(mSendBuf), mReadBuf, sizeof(mReadBuf),
        MQTT_CON_TIMEOUT_MS);
    if (rc != MQTT_CODE_SUCCESS) {
        goto exit;
    }
    client_ready = 1;
    PRINTF("MQTT Init Success");

    /* Connect to broker */
    rc = MqttClient_NetConnect(&mClient, MQTT_HOST, MQTT_PORT,
        MQTT_CON_TIMEOUT_MS, MQTT_USE_TLS, mqtt_tls_cb);
    if (rc != MQTT_CODE_SUCCESS) {
        goto exit;
    }
    PRINTF("MQTT Network Connect Success: Host %s, Port %d, UseTLS %d",
        MQTT_HOST, MQTT_PORT, MQTT_USE_TLS);

    /* Send Connect and wait for Ack */
    XMEMSET(&mqttObj, 0, sizeof(mqttObj));
    mqttObj.connect.keep_alive_sec = MQTT_KEEP_ALIVE_SEC;
    mqttObj.connect.client_id = MQTT_CLIENT_ID;
    mqttObj.connect.username = MQTT_USERNAME;
    mqttObj.connect.password = MQTT_PASSWORD;
    rc = MqttClient_Connect(&mClient, &mqttObj.connect);
    if (rc != MQTT_CODE_SUCCESS) {
        goto exit;
    }
    session_up = 1;
    /* The password is a credential (SAS token), so it is not logged. The
     * user name is a string literal and can never be NULL. */
    PRINTF("MQTT Broker Connect Success: ClientID %s, Username %s",
        MQTT_CLIENT_ID, MQTT_USERNAME);

    /* Subscribe and wait for Ack */
    XMEMSET(&mqttObj, 0, sizeof(mqttObj));
    XMEMSET(topics, 0, sizeof(topics)); /* no indeterminate fields */
    topics[0].topic_filter = MQTT_TOPIC_NAME;
    topics[0].qos = MQTT_QOS;
    mqttObj.subscribe.packet_id = mqtt_get_packetid();
    mqttObj.subscribe.topic_count = sizeof(topics) / sizeof(MqttTopic);
    mqttObj.subscribe.topics = topics;
    rc = MqttClient_Subscribe(&mClient, &mqttObj.subscribe);
    if (rc != MQTT_CODE_SUCCESS) {
        goto exit;
    }
    PRINTF("MQTT Subscribe Success: Topic %s, QoS %d",
        MQTT_TOPIC_NAME, MQTT_QOS);

    /* Publish */
    XMEMSET(&mqttObj, 0, sizeof(mqttObj));
    mqttObj.publish.qos = MQTT_QOS;
    mqttObj.publish.topic_name = MQTT_TOPIC_NAME;
    mqttObj.publish.packet_id = mqtt_get_packetid();
    mqttObj.publish.buffer = (byte*)MQTT_PUBLISH_MSG;
    mqttObj.publish.total_len = XSTRLEN(MQTT_PUBLISH_MSG);
    rc = MqttClient_Publish(&mClient, &mqttObj.publish);
    if (rc != MQTT_CODE_SUCCESS) {
        goto exit;
    }
    PRINTF("MQTT Publish: Topic %s, Qos %d, Message %s",
        mqttObj.publish.topic_name, mqttObj.publish.qos,
        mqttObj.publish.buffer);

    /* Wait for messages */
    while (1) {
        rc = MqttClient_WaitMessage_ex(&mClient, &mqttObj,
            MQTT_CMD_TIMEOUT_MS);
        if (rc == MQTT_CODE_ERROR_TIMEOUT) {
            /* send keep-alive ping */
            rc = MqttClient_Ping_ex(&mClient, &mqttObj.ping);
            if (rc != MQTT_CODE_SUCCESS) {
                break;
            }
            PRINTF("MQTT Keep-Alive Ping");
        }
        else if (rc != MQTT_CODE_SUCCESS) {
            break;
        }
    }

exit:
    if (rc != MQTT_CODE_SUCCESS) {
        PRINTF("MQTT Error %d: %s", rc, MqttClient_ReturnCodeToString(rc));
    }

    /* Best-effort teardown; results are ignored so that `rc` keeps
     * reporting the original failure. */
    if (session_up) {
        (void)MqttClient_Disconnect(&mClient);
    }
    if (client_ready) {
        /* Also covers a network connect that failed part-way through */
        (void)MqttClient_NetDisconnect(&mClient);
        MqttClient_DeInit(&mClient);
    }

    return rc;
}
#endif /* HAVE_SOCKET */
#ifndef NO_MAIN_DRIVER
/* Program entry point: runs mqttsimple_test() when socket support is
 * compiled in. Exits with 0 when the test returned 0, otherwise (including
 * builds without HAVE_SOCKET, where nothing is run) with EXIT_FAILURE. */
int main(int argc, char** argv)
{
    int status = -1;

    (void)argv;
    (void)argc;

#ifdef HAVE_SOCKET
    status = mqttsimple_test();
#endif

    if (status != 0) {
        return EXIT_FAILURE;
    }
    return 0;
}
#endif /* !NO_MAIN_DRIVER */
It waits indefinitely after entering ProcessReply, without receiving anything from the server. Unfortunately, I could not see any logs from the broker, since the Azure logs don't cover the TLS step. I also can't use tools like Wireshark to monitor packets, since my device is on a different network, connected through a SIM card.