Software (mqtt.org)中mqtt客户端的c库里面有一个叫MQTT-C的库,就2个实现文件,算比较简单的了,实现了基本的mqtt客户端功能,移植一下试试。
我的移植代码放在我的资源里面:https://download.csdn.net/download/oushaojun2/87281533?spm=1001.2014.3001.5501
进到这个项目的github仓库地址:https://github.com/LiamBindle/MQTT-C
src里面有两个文件mqtt.c和mqtt_pal.c,第一个是mqtt的实现,第二个是移植需要的文件。移植文件里面主要包括了常见平台的socket接收和发送函数的封装,假如移植到自己的平台可能需要修改这个文件里面的代码,目前的移植是想要在visual studio里面移植,里面已经有了移植接口了。
移植到visual studio里面的步骤如下:
1 将MQTT-C的代码增加到visual studio的一个空白工程里面。需要的文件如下,记得删掉创建工程是自带的文件和修改文件包含路径:
2 修改mqtt_pal.h,128行增加一行:#pragma comment(lib,"ws2_32.lib"),为了在win32平台下链接到ws2_32.lib库,否则编译
3 修改posix_sockets.h内容,虽然这个头文件是按照socket标准接口来调用的,但是win32的socket接口跟linux的接口有些不一样,例如close在win32里面是没有的,gai_strerror在win32里面没效果,win32需要调用WSAStartup函数。修改如下:
#if !defined(__POSIX_SOCKET_TEMPLATE_H__)
#define __POSIX_SOCKET_TEMPLATE_H__
#include <stdio.h>
#include <sys/types.h>
#if !defined(WIN32)
#include <sys/socket.h>
#include <netdb.h>
#else
#include <ws2tcpip.h>
#define close closesocket
#endif
#if defined(__VMS)
#include <ioctl.h>
#endif
#include <fcntl.h>
/*
A template for opening a non-blocking POSIX socket.
*/
int open_nb_socket(const char* addr, const char* port);
int open_nb_socket(const char* addr, const char* port) {
struct addrinfo hints = {0};
hints.ai_family = AF_UNSPEC; /* IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* Must be TCP */
int sockfd = -1;
int rv;
struct addrinfo *p, *servinfo;
#if defined(WIN32)
{
WSADATA wsaData;
int iResult;
iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iResult != NO_ERROR) {
printf("WSAStartup failed: %d\n", iResult);
return 1;
}
}
#endif
/* get address information */
rv = getaddrinfo(addr, port, &hints, &servinfo);
if(rv != 0) {
#if defined(__UNIX__)
fprintf(stderr, "Failed to open socket (getaddrinfo): %s\n", gai_strerror(rv));
#else
fprintf(stderr, "Failed to open socket (getaddrinfo): %s\n", gai_strerrorA(rv));
#endif
return -1;
}
/* open the first possible socket */
for(p = servinfo; p != NULL; p = p->ai_next) {
sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if (sockfd == -1) continue;
/* connect to server */
rv = connect(sockfd, p->ai_addr, p->ai_addrlen);
if(rv == -1) {
close(sockfd);
sockfd = -1;
continue;
}
break;
}
/* free servinfo */
freeaddrinfo(servinfo);
/* make non-blocking */
#if !defined(WIN32)
if (sockfd != -1) fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL) | O_NONBLOCK);
#else
if (sockfd != INVALID_SOCKET) {
int iMode = 1;
ioctlsocket(sockfd, FIONBIO, &iMode);
}
#endif
#if defined(__VMS)
/*
OpenVMS only partially implements fcntl. It works on file descriptors
but silently fails on socket descriptors. So we need to fall back on
to the older ioctl system to set non-blocking IO
*/
int on = 1;
if (sockfd != -1) ioctl(sockfd, FIONBIO, &on);
#endif
/* return the new socket fd */
return sockfd;
}
#endif
4 修改simple_publisher.c文件,这个文件的接口都是posix接口,在win32环境下有些要修改,例如建立线程函数等。修改如下:
/**
* @file
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
#if defined(__unix__)
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#else
#include <stdlib.h>
#include <stdio.h>
#pragma warning(disable : 4996)
#endif
#include <mqtt.h>
#include "posix_sockets.h"
/**
* @brief The function that would be called whenever a PUBLISH is received.
*
* @note This function is not used in this example.
*/
void publish_callback(void** unused, struct mqtt_response_publish *published);
/**
* @brief The client's refresher. This function triggers back-end routines to
* handle ingress/egress traffic to the broker.
*
* @note All this function needs to do is call \ref __mqtt_recv and
* \ref __mqtt_send every so often. I've picked 100 ms meaning that
* client ingress/egress traffic will be handled every 100 ms.
*/
#if defined(__UNIX__)
void* client_refresher(void* client);
#else
DWORD WINAPI client_refresher(LPVOID client);
#endif
/**
* @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit.
*/
#if defined(__unix__)
void exit_example(int status, int sockfd, pthread_t *client_daemon);
#else
void exit_example(int status, int sockfd, HANDLE client_daemon);
#endif
/**
* A simple program to that publishes the current time whenever ENTER is pressed.
*/
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
/* get address (argv[1] if present) */
if (argc > 1) {
addr = argv[1];
} else {
addr = "test.mosquitto.org";
}
/* get port number (argv[2] if present) */
if (argc > 2) {
port = argv[2];
} else {
port = "1883";
}
/* get the topic name to publish */
if (argc > 3) {
topic = argv[3];
} else {
topic = "datetime";
}
/* open the non-blocking TCP socket (connecting to the broker) */
int sockfd = open_nb_socket(addr, port);
if (sockfd == -1) {
perror("Failed to open socket: ");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* setup a client */
struct mqtt_client client;
uint8_t sendbuf[2048]; /* sendbuf should be large enough to hold multiple whole mqtt messages */
uint8_t recvbuf[1024]; /* recvbuf should be large enough any whole mqtt message expected to be received */
mqtt_init(&client, sockfd, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf), publish_callback);
/* Create an anonymous session */
const char* client_id = NULL;
/* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
mqtt_connect(&client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400);
/* check that we don't have any errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, NULL);
}
/* start a thread to refresh the client (handle egress and ingree client traffic) */
#if defined(__UNIX__)
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
#else
HANDLE client_daemon;
DWORD dwThreadIdArray;
client_daemon = CreateThread(
NULL, // default security attributes
0, // use default stack size
client_refresher, // thread function name
&client, // argument to thread function
0, // use default creation flags
&dwThreadIdArray); // returns the thread identifier
#endif
/* start publishing the time */
printf("%s is ready to begin publishing the time.\n", argv[0]);
printf("Press ENTER to publish the current time.\n");
printf("Press CTRL-D (or any other key) to exit.\n\n");
while(fgetc(stdin) == '\n') {
/* get the current time */
time_t timer;
time(&timer);
struct tm* tm_info = localtime(&timer);
char timebuf[26];
strftime(timebuf, 26, "%Y-%m-%d %H:%M:%S", tm_info);
/* print a message */
char application_message[256];
snprintf(application_message, sizeof(application_message), "The time is %s", timebuf);
printf("%s published : \"%s\"", argv[0], application_message);
/* publish the time */
mqtt_publish(&client, topic, application_message, strlen(application_message) + 1, MQTT_PUBLISH_QOS_0);
/* check for errors */
if (client.error != MQTT_OK) {
fprintf(stderr, "error: %s\n", mqtt_error_str(client.error));
exit_example(EXIT_FAILURE, sockfd, &client_daemon);
}
}
/* disconnect */
printf("\n%s disconnecting from %s\n", argv[0], addr);
#if defined(__UNIX__)
sleep(1);
#else
Sleep(1000);
#endif
/* exit */
exit_example(EXIT_SUCCESS, sockfd, &client_daemon);
}
#if defined(__UNIX__)
void exit_example(int status, int sockfd, pthread_t *client_daemon)
{
if (sockfd != -1) close(sockfd);
if (client_daemon != NULL) pthread_cancel(*client_daemon);
exit(status);
}
#else
void exit_example(int status, int sockfd, HANDLE client_daemon)
{
if (sockfd != -1) close(sockfd);
if (client_daemon != NULL) CloseHandle(client_daemon);
exit(status);
}
#endif
void publish_callback(void** unused, struct mqtt_response_publish *published)
{
/* not used in this example */
}
#if defined(__UNIX__)
void* client_refresher(void* client)
#else
DWORD WINAPI client_refresher(LPVOID client)
#endif
{
while(1)
{
mqtt_sync((struct mqtt_client*) client);
#if defined(__UNIX__)
usleep(100000U);
#else
Sleep(100);
#endif
}
return NULL;
}
点击visual studio编译后运行,这个程序会去连接test.mosquitto.org的1883接口,然后订阅datetime主题,当用户在命令行点击换行后将发布消息到datetime主题,消息内容为当前时间。
在另外一个mqtt客户端订阅这个主题后会收到发布的消息: