This is my MQTT client library for Contiki.
It is completely asynchronous, starting a new process to handle communication with the message broker. It supports subscribing, publishing, authentication, will messages, keep alive pings and all three QoS levels. In short, it should be a fully functional client, though some areas haven’t been well tested yet.
See the example below for usage.
Files
Browse: repo
Example
#include "mqtt-service.h" PROCESS_THREAD(mqtt_client_process, ev, data) { static uip_ip6addr_t server_address; // Allocate buffer space for the MQTT client static uint8_t in_buffer[64]; static uint8_t out_buffer[64]; // Setup an mqtt_connect_info_t structure to describe // how the connection should behave static mqtt_connect_info_t connect_info = { .client_id = "contiki", .username = NULL, .password = NULL, .will_topic = NULL, .will_message = NULL, .keepalive = 60, .will_qos = 0, .will_retain = 0, .clean_session = 1 }; // The list of topics that we want to subscribe to static const char* topics[] = { "0", "1", "2", "3", "4", "5", NULL }; PROCESS_BEGIN(); // Set the server address uip_ip6addr(&server_address, 0xbbbb, 0, 0, 0, 0, 0, 0, 0x110); // Initialise the MQTT client mqtt_init(in_buffer, sizeof(in_buffer, out_buffer, sizeof(out_buffer)); // Ask the client to connect to the server // and wait for it to complete. mqtt_connect(&server_address, UIP_HTONS(1883), 1, &connect_info); PROCESS_WAIT_EVENT_UNTIL(ev == mqtt_event); if(mqtt_connected()) { static int i; for(i = 0; topics[i] != NULL; ++i) { // Ask the client to subscribe to the topic // and wait for it to complete mqtt_subscribe(topics[i]); PROCESS_WAIT_EVENT_UNTIL(ev == mqtt_event); } // Loop waiting for events from the MQTT client while(1) { PROCESS_WAIT_EVENT_UNTIL(ev == mqtt_event); // If we've received a publish event then print // out the topic and message if(mqtt_event_is_publish(data)) { const char* topic = mqtt_svc_event_get_topic(data); const char* message = mqtt_svc_event_get_message(data); int level = 0; printf("%s = %s\n", topic, message); // Relay the received message to a new topic mqtt_publish("new_topic", message, 0, 1); } } } else printf("mqtt service connect failed\n"); PROCESS_END(); }
High-Level API
// Must be called before any other function is called. // // Initialises the MQTT client library and associates the // provided buffers with it. The buffer memory must remain // valid throughout the use of the API. void mqtt_init(uint8_t* in_buffer, int in_buffer_length, uint8_t* out_buffer, int out_buffer_length); // Starts an asynchronous connect to the server specified by // the address and port. If auto_reconnect is non-zero then // the it will keep trying to connect indefinitely and if the // connection drops it will attempt to reconnect. // The info structure provides other connection details // such as username/password, will topic/message, etc. // The memory pointed to by info must remain valid // throughout the use of the API. // // The calling process will receive an mqtt_event of type // MQTT_EVENT_CONNECTED when the operation is complete. // Or an event of type MQTT_EVENT_DISCONNECTED if the // connect attempt fails. int mqtt_connect(uip_ip6addr_t* address, uint16_t port, int auto_reconnect, mqtt_connect_info* info); // Starts an asynchronous disconnect from the server. // The calling process will receive a mqtt_event of type // MQTT_EVENT_TYPE_EXITED when the operation is complete. int mqtt_disconnect() // Starts an asynchronous subscribe to the specified topic // The calling process will receive a mqtt_event of type // MQTT_EVENT_TYPE_SUBSCRIBE when the servers reply has // been received. int mqtt_subscribe(const char* topic) // Starts an asynchronous unsubscribe of the specified topic. // The calling process will receive a mqtt_event of type // MQTT_EVENT_TYPE_UNSUBSCRIBED when the server's reply // has been received. int mqtt_unsubscribe(const char* topic) // Starts an asynchronous publish of the specified data to // the specified topic. // The calling process will receive a mqtt_event of type // MQTT_EVENT_TYPE_PUBLISHED when the operation is complete int mqtt_publish(const char* topic, const char* data, int qos, int retain) // Same as mqtt_publish() but the data doesn't have to be // NULL terminated as a length is supplied instead. int mqtt_publish_with_length(const char* topic, const char* data, int data_length, int qos, int retain) // The following functions are helpers for use when the calling // process receives an mqtt_event. // Returns non-zero if the client is currently connected int mqtt_connected() // Returns zon-zero if the client is ready to accept a // new operation, or zero if it is still busy with a // previous request. int mqtt_ready() // Tests whether the event data passed to the calling process // is for an MQTT_EVENT_TYPE_CONNECTED event. int mqtt_event_is_connected(void* data) // Tests whether the event data passed to the calling process // is for an MQTT_EVENT_TYPE_DISCONNECTED event. int mqtt_event_is_disconnected(void* data) // Tests whether the event data passed to the calling process // is for an MQTT_EVENT_TYPE_SUBSCRIBED event. int mqtt_event_is_subscribed(void* data) // Tests whether the event data passed to the calling process // is for an MQTT_EVENT_TYPE_UNSUBSCRIBED event. int mqtt_event_is_unsubscribed(void* data) // Tests whether the event data passed to the calling process // is for an MQTT_EVENT_TYPE_PUBLISH event. int mqtt_event_is_publish(void* data) // Tests whether the event data passed to the calling process // is for an MQTT_EVENT_TYPE_PUBLISHED event. int mqtt_event_is_published(void* data) // Tests whether the event data passed to the calling process // is for an MQTT_EVENT_TYPE_EXITED event. int mqtt_event_is_exited(void* data) // Retrieves the topic from the data returned to the // calling process. The topic data is guaranteed to be // NULL terminated. const char* mqtt_event_get_topic(void* data) // Retrieves the length of the topic from the data returned // to the calling process. uint16_t mqtt_event_get_topic_length(void* data) // Retrieves the message data from the data returned to the // calling process. The data is guaranteed to be NULL // terminated. const char* mqtt_event_get_data(void* data) // Retrieves the length of the message data from the data // returned to the calling process. This is needed if the // message data contains NULLs. uint16_t mqtt_event_get_data_length(void* data)
Low-Level Message Reading/Writing
This is the low-level message handling API used internally by the library.
Initialisation
// Must be called once before any other function is called. // // Initialises the connection structure and associates the // provided buffer with it. The buffer is where created // messages are written and the memory must remain // valid throughout the use of the API. void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, int buffer_length);
Message Reading
// Returns the type of the message in the buffer int mqtt_get_type(uint8_t* buffer); // Returns the DUP flag from he message in the buffer int mqtt_get_dup(uint8_t* buffer); // Returns the QoS level of the message in the buffer int mqtt_get_qos(uint8_t* buffer); // Returns the retain flag from the message in the buffer int mqtt_get_retain(uint8_t* buffer); // Returns a pointer to the topic string within the buffer, // or NULL if there is an error parsing the message. // On calling, length must contain the total length of the // buffer. On return, length will contain the length // of the topic. const char* mqtt_get_publish_topic(uint8_t* buffer, int* length); // Returns a pointer to the message body within the buffer, // or NULL if there is an error parsing the message. // On calling, length must contain the total length of the // buffer. On return, length will contain the length // of the message body. const char* mqtt_get_publish_message(uint8_t* buffer, int* length); // Returns the message ID of the message in the buffer, // or zero if there is an error parsing the message. // when callin, length must be set to the total length // of the buffer. uint16_t mqtt_get_message_id(uint8_t* buffer, int length)
Message Creation
uint8_t* mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info); uint8_t* mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* message, int qos); uint8_t* mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id); uint8_t* mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id); uint8_t* mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id); uint8_t* mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id); uint8_t* mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id); uint8_t* mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic); uint8_t* mqtt_msg_ping(mqtt_connection_t* connection); uint8_t* mqtt_msg_ping_reply(mqtt_connection_t* connection); uint8_t* mqtt_msg_disconnect(mqtt_connection_t* connection);
Hello,
I have tried to test your source code above but the Contiki simulation still do not recognise the mqtt-service.h header.
Any suggestion please.
The error message is as bellow:
user@instant-contiki:~/contiki/examples/rime$ sudo make TARGET=sky mscproject
CC mscproject.c
mscproject.c:12:28: fatal error: mqtt-s-service.h: No such file or directory
compilation terminated.
make: *** [mscproject.co] Error 1
user@instant-contiki:~/contiki/examples/rime$ sudo make TARGET=sky mscproject
CC mscproject.c
mscproject.c:12:28: fatal error: mqtt-s-service.h: No such file or directory
compilation terminated.
make: *** [mscproject.co] Error 1
user@instant-contiki:~/contiki/examples/rime$ sudo make TARGET=sky syed-example-broadcast
CC syed-example-broadcast.c
syed-example-broadcast.c:11:28: fatal error: mqtt-s-service.h: No such file or directory
compilation terminated.
make: *** [syed-example-broadcast.co] Error 1
user@instant-contiki:~/contiki/examples/rime$
Hi Esar
Just wanting to know, is this project still active?
Are you planning to maintain the software / get it accepted into Contiki core?
Cheers
Ron
Hi Ron,
I’m not using it in any of my current projects, which has led me to mostly ignore it, but if anyone submits bug fixes and/or improvements then I will try to get them merged in. However due to not using it I don’t have a good environment to test any submissions, so I’m likely to be a bit slow.
I don’t have any plans to try and get it accepted into Contiki core, but if anyone else wants to try then that’s fine with me.
Apologies, I didn’t see your reply until just now. Thanks, that all seems reasonable. So I guess it’s to be taken effectively as a very solid leg-up towards a production application. Best wishes, Ron.
Hi, how this repository is included in Contiki?
I really need to run the MQTT protocol but do not know the Contiki OS,
Please, could you help me?
I haven’t done anything with Contiki for a few years, so all I can really do is point you to this bit of the contiki-mqtt readme:
> To use this library, create a mqtt-service directory in
> the Contiki apps directory and place these library files
> within it. Then add mqtt-service to the APPS variable
> in your application’s makefile.
To see an example of this in use you can look at the Makefile for one of my projects that used it:
https://github.com/esar/myha/blob/master/devices/rako-bridge/Makefile