Ads 468x60px

##EasyReadMore##

05 8月, 2016

MQtt public & subscrib

範例如下:

#include <stdio.h>
#include <errno.h>  
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include "clientMqtt.h"

#define STATUS_CONNECTING 0
#define STATUS_CONNACK_RECVD 1
#define STATUS_WAITING 2
static int status = STATUS_CONNECTING;
static int mid_sent = 0;
static int last_mid = -1;
static int last_mid_sent = -1;
static bool disconnect_sent = true;
char *topic;
char* userName;
char* password;
int qos;
static bool connected = true;
const void *message;
long msglen = 0;
int retain = 0;

//struct and callback for the message
message__mqtt_t state_message;
message_mqtt_cb_t state_callback_message;

ClientMqtt newClientMqtt(char* _ip, char* _port, char* _client, char* _topic, int _qos)
{
        ClientMqtt clientMqtt;
        clientMqtt.ip = _ip;
        clientMqtt.port = _port;
        clientMqtt.client = _client;
        clientMqtt.topic = _topic;
 clientMqtt.qos = _qos;
        return clientMqtt;
}

int message_mqtt_state_cb( message_mqtt_cb_t cb){

        state_callback_message=cb;
}

void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
      printf("CLIENTMQTT: callback  my_message_callback\n");
        char bufferJson[1024];
        if(message->payloadlen){
            state_message.auth_sts=0;
            memset(state_message.message,0,1024);
            int cp = 0;
            sprintf(state_message.message, "%s", message->payload);          
            printf("billy[%s:%d,%s] message = %s\n", __FILE__, __LINE__, __FUNCTION__,state_message.message);
            state_callback_message(&state_message);

        }else{
             printf("CLIENTMQTT: %s (null)\n", message->topic);
        }
        fflush(stdout);
}

void my_subscribe_username(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
        mosquitto_username_pw_set(mosq, userName, password); 
}


void my_sub_connect_callback(struct mosquitto *mosq, void *obj, int result)
{
        int i;
        struct ClientMqtt *ud;
        ud = (struct ClientMqtt *)obj;

        if(!result){
            for(i=0; i<1; i++){
                mosquitto_subscribe(mosq, NULL, topic, qos);
            }
        }else{
            if(result  ){
                fprintf(stderr, "%s\n", mosquitto_connack_string(result));
            }
        }
}

void my_pub_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
        int rc = MOSQ_ERR_SUCCESS;
        bool quiet = true;
        status = STATUS_CONNACK_RECVD;
}

void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
        int i;
//         printf("CLIENTMQTT: Subscribed (mid: %d): %d", mid, granted_qos[0]);
        for(i=1; i<qos_count; i++){
//             printf("CLIENTMQTT: , %d", granted_qos[i]);
        }
//         printf("CLIENTMQTT: \n");
}

void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str)
{
        printf("CLIENTMQTT: %s\n", str);
}

void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{
      //printf("CLIENTMQTT: richiamato publish callback\n");
        connected = false;
        last_mid_sent = mid;

        if(mid == last_mid){
            mosquitto_disconnect(mosq);
            disconnect_sent = true;
        }
        else if(disconnect_sent == false){
            mosquitto_disconnect(mosq);
            disconnect_sent = true;
        }
}

void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc)
{
        connected = false;
}


int client_subscribe(ClientMqtt sender, char* _userName, char* _password){
        char id[30];
        int i;
        char *host = sender.ip;
        char *client = sender.client;
        int port = atoi(sender.port);//8000;
        topic = sender.topic;
        qos = sender.qos;
        userName = _userName;
        password = _password;
        int keepalive = 60;
        bool clean_session = false;
        struct mosquitto *mosq ;

        mosquitto_lib_init();

        mosq = mosquitto_new(client, clean_session, &sender);
        if(!mosq){
//                 printf("CLIENTMQTT: Error: Impossible create struct mosquitto.\n");
                return ERROR_MOSQUITTO;
        }
        mosquitto_username_pw_set(mosq, userName, password) ;
        mosquitto_connect_callback_set(mosq, my_sub_connect_callback);
        mosquitto_message_callback_set(mosq, my_message_callback); 
        mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);

        if(mosquitto_connect(mosq, host, port, keepalive)){
//                 printf("CLIENTMQTT: Unable to connect.\n");
                return ERROR_CONNECT;
        }

        int lp;

        mosquitto_loop_forever(mosq, -1, 1);       
        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
        return SUBSCRIBE_CORRECT;
}

int client_publish(ClientMqtt sender, char *_message, char* _userName, char* _password){
        char id[30];
        int i;
        char *host = sender.ip;
        char *client = sender.client;
        int port = atoi(sender.port);//1883;
        topic = sender.topic;
        qos = sender.qos;
        userName = _userName;
        password = _password;      
        int keepalive = 60;
        connected = true;
        int debug = 0;
        message = _message;
        msglen = strlen(message);
        retain = 1;
        bool clean_session = true;
        struct mosquitto *mosq ;

        mosquitto_lib_init();
        mosq = mosquitto_new(client, true, NULL);
        if(!mosq){
//                 printf("CLIENTMQTT: Error: Impossible create struct mosquitto.\n");
                return ERROR_MOSQUITTO;
        }

        if(debug==1)
           mosquitto_log_callback_set(mosq, my_log_callback);
        mosquitto_username_pw_set(mosq, userName, password);// 
        int max_inflight = 20;
//        mosquitto_max_inflight_messages_set(mosq, max_inflight);
        mosquitto_connect_callback_set(mosq, my_pub_connect_callback);
        mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
        mosquitto_publish_callback_set(mosq, my_publish_callback);

        int rc = mosquitto_connect(mosq, host, port, keepalive);
            if(rc == MOSQ_ERR_ERRNO){
//                 printf("CLIENTMQTT: Error to connect\n");
                return ERROR_CONNECT;
            }
        int lp=1;
        int p = -1;
        int rc1=1;
        do{
            if(p!=0){
                p = mosquitto_publish( mosq, &lp, topic,strlen(message),(uint8_t *)message, qos, 0);
                if(p){
//                     printf( "Error: Publish returned %d, disconnecting.\n", p);
                    mosquitto_disconnect(mosq);
                }/*else{
                    printf( "Error: Impossible publish the message.\n");  
                    mosquitto_disconnect(mosq);
                    return ERROR_PUBLISH;
                }*/

            }
            
            rc1 = mosquitto_loop(mosq, -1, 1);
            usleep(100000);
        }while(rc1 == MOSQ_ERR_SUCCESS && connected);     
 
        mosquitto_loop_stop(mosq, false);
//         printf("CLIENTMQTT: Publish at the topic=%s with qos=%d\n",topic,qos);
        mosquitto_disconnect(mosq);

        mosquitto_destroy(mosq);
        mosquitto_lib_cleanup();
//         printf("CLIENTMQTT: Publish End\n");
        return PUBLISH_CORRECT;
}        

static pthread_t g_MQttSub_td = 0;
//char *g_mqttIP = "172.25.13.94";
char *g_mqttIP = "iotdemo.ecs.com.tw";
char *g_mqttPort = "1883";
char *g_userName = "";
char *g_password = "";

static void *MQttSub(void *prama){
 char *topic = "Registry";
 int rc = -1;
 char cid[128];

 sprintf(cid, "%d", pthread_self());
 printf("subscribe pid [%d],tid [%s]\n", getpid(), cid);
 ClientMqtt sender = newClientMqtt( g_mqttIP, g_mqttPort, cid, topic, 0);
 rc = client_subscribe(sender, g_userName, g_password);
 printf("billy[%s:%d,%s] rc = %d\n", __FILE__, __LINE__, __FUNCTION__,rc);
}

int sub(){
 char *topic = "Registry";
 int rc = -1;
 
 ClientMqtt sender = newClientMqtt( g_mqttIP, g_mqttPort, "307792512", topic, 0);
 rc = client_subscribe(sender, g_userName, g_password);
 printf("billy[%s:%d,%s] rc = %d\n", __FILE__, __LINE__, __FUNCTION__,rc);
}

int main(int ac, char **av)
{
// char *msg = "{\"IP\":\"172.25.67.142\",\"data\":[{\"dl\":[{\"ab\":\"4\",\"sid\":\"SwitchDevice\",\"st\":\"3\"}],\"dn\":\"WZB-SMG02\",\"uid\":\"Zigbee-CID1-00124B0004261503\"},{\"dl\":[{\"ab\":\"4\",\"sid\":\"Gaso_Detector\",\"st\":\"18\"}],\"dn\":\"SG-02-GS-H1\",\"uid\":\"Zigbee-CID1-00124B000426161A\"},{\"dl\":[{\"ab\":\"4\",\"sid\":\"Smoke\",\"st\":\"5\"}],\"dn\":\"SG-01-SK-HB\",\"uid\":\"Zigbee-CID1-00124B000426156D\"},{\"dl\":[{\"ab\":\"1\",\"sid\":\"CO2\",\"st\":\"c\"}],\"dn\":\"SG-02-CO2-H1\",\"uid\":\"Zigbee-CID1-00124B00078341BB\"},{\"dl\":[{\"ab\":\"4\",\"sid\":\"PIR_Motion\",\"st\":\"6\"}],\"dn\":\"WZB-SPM05\",\"uid\":\"Zigbee-CID1-00124B0005A7BCB8\"},{\"dl\":[{\"ab\":\"1\",\"sid\":\"Power_Meter_U\",\"st\":\"11\"},{\"ab\":\"1\",\"sid\":\"Power_Meter_I\",\"st\":\"10\"},{\"ab\":\"1\",\"sid\":\"Power_Meter_PF\",\"st\":\"15\"},{\"ab\":\"3\",\"sid\":\"PowerPlug\",\"st\":\"17\"}],\"dn\":\"A11-H1\",\"uid\":\"Zigbee-CID1-00124B0004261318\"},{\"dl\":[{\"ab\":\"1\",\"sid\":\"Thermometer\",\"st\":\"1\"},{\"ab\":\"1\",\"sid\":\"Humidity\",\"st\":\"2\"}],\"dn\":\"STH-03ZB\",\"uid\":\"Zigbee-CID1-00124B00049BEB39\"},{\"dl\":[{\"ab\":\"1\",\"sid\":\"Thermometer\",\"st\":\"1\"},{\"ab\":\"1\",\"sid\":\"Humidity\",\"st\":\"2\"}],\"dn\":\"STH-03ZB\",\"uid\":\"Zigbee-CID1-00124B0005A7B6A9\"},{\"dl\":[{\"ab\":\"1\",\"sid\":\"Power_Meter_U\",\"st\":\"11\"},{\"ab\":\"1\",\"sid\":\"Power_Meter_I\",\"st\":\"10\"},{\"ab\":\"1\",\"sid\":\"Power_Meter_PF\",\"st\":\"15\"},{\"ab\":\"3\",\"sid\":\"PowerPlug\",\"st\":\"17\"}],\"dn\":\"A11-H1\",\"uid\":\"Zigbee-CID1-00124B000426135B\"},{\"dl\":[{\"ab\":\"4\",\"sid\":\"PIR_Motion\",\"st\":\"6\"}],\"dn\":\"WZB-SPM05\",\"uid\":\"Zigbee-CID1-00124B0005A7BDC8\"},{\"dl\":[{\"ab\":\"4\",\"sid\":\"Smoke\",\"st\":\"5\"}],\"dn\":\"SG-01-SK-HB\",\"uid\":\"Zigbee-CID1-00124B000782F08D\"},{\"dl\":[{\"ab\":\"4\",\"sid\":\"SwitchDevice\",\"st\":\"3\"}],\"dn\":\"WZB-SMG02\",\"uid\":\"Zigbee-CID1-00124B0004261536\"}],\"date\":\"2016-07-27 17:24:21\",\"mac\":\"b8aeedd177ef\",\"sn\":\"001\"}";
 char *msg = "hello! I'm billy";
 char *topic = "Registry";
 int rc = 0;
 char pwd[128];
 
 /*public message*/
 ClientMqtt sender = newClientMqtt( g_mqttIP, g_mqttPort, "307792512", topic, 0);
 client_publish(sender, msg, g_userName, g_password);
 
 /*subscrib message*/
// if( pthread_create(&g_MQttSub_td,NULL,&MQttSub,NULL)){
//  printf("billy[%s:%d,%s] error\n", __FILE__, __LINE__, __FUNCTION__);
// }
// sub();
 return 0;
}

編譯時記得加上:
-lmosquitto

smartlab/reference-realtime/linux/libraries/SDPlibC at 99c2b292c36d9650b4cb618dbe13e26e81efe8ef · csipiemonte/smartlab - https://goo.gl/dYU67B

0 意見:

張貼留言

 
Blogger Templates