MQTT**(**消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件。
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
主题(Topic),负载(payload)和QoS
“至多一次”(QoS0):消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。即是推送之后就完事了,至于对方有没有收到,收到是什么,数据有没有丢失,都不管。
“至少一次”(QoS1):确保消息到达,但消息重复可能会发生。即是你收到推送后,你还得返回一个puback给对方,告诉对方收到了,不然对方会以为你没收到,隔一段时间后重新给你推送,直到你给对方返回一个Puback为止。
“只有一次”(QoS2):确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
使用 Last Will 和 Testament 特性通知有关各方客户端。
MQTT服务器有很多种,且部署方式也不一样。
安装命令:
curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
sudo apt-get install emqx :安装
sudo systemctl start emqx :启动
下载链接:https://www.emqx.io/zh/downloads?os=Ubuntu
Docker:https://emqx.io/zh/downloads?os=Docker
Linux(centos):https://www.emqx.io/zh/downloads?os=CentOS
windows:https://www.emqx.io/zh/downloads?os=Windows
测试:确保emqx已正常运行后,可在Linux本地浏览器中输入: http://127.0.0.1:18083。
注意:未登录需修改密码,两次保持一致。
局域网其他电脑访问须知Linux的IP地址。
#代表通配符,代表订阅所有该类型的topic。
依赖地址:https://gitee.com/openharmony-tpc/ohos_mqtt。
有所不同的是我用的npm安装的依赖npm install @ohos/mqtt。
创建mqtt客户端并建立连接:
const clientOptions: MqttClientOptions = {
url: '192.168.xxx.xxx/:1883',
clientId: 'client_id_' + new Date().getTime(),
persistenceType: 1,
}
const connectOptions: MqttConnectOptions = {
userName: '',
password: '',
connectTimeout: 30,
}
this.mqClient = MqttAsync.createMqtt(this.clientOptions);
this.mqClient.connect(this.connectOptions, (data: MqttResponse) => {
console.log(TAG+" data: "+JSON.stringify(data));
if (data.code == 0) {
this.messageArrived();
this.subscribe('主设备号/#');
}
});
//接收消息,使用此接口后,当订阅的主题有消息发布时,会自动接收到消息。
public messageArrived(): void {
this.mqClient.messageArrived((err, data) => {
console.log(TAG+"messageArrived!!!!!!!!!!!");
console.log(TAG+"messageArrived data:"+JSON.stringify(data));
});
}
// 订阅消息
public subscribe(topic: string, qos: QoS = 1): void {
const subscribeOption: MqttSubscribeOptions = { topic, qos };
this.mqClient.subscribe(subscribeOption, (err, data)=>{
this.handleMessage(data)
});
}
// 发布消息
public publish<T>(topic: string, payload: string | Record<string, any>, qos: QoS = 0): void {
if (typeof payload !== 'string') {
payload = JSON.stringify(payload)
}
const payloadLen = payload.length;
const publishOption: MqttPublishOptions = { topic, payload, qos, payloadLen };
console.log(TAG, 'publishOption data: ' + JSON.stringify(publishOption));
this.mqClient.publish(publishOption, (err, data)=>{
console.log(TAG+"publish!!!!!!!!!!!");
console.log(TAG+"publish data:"+JSON.stringify(data));
}));
}
handleMessage(data:any){
console.log(TAG+"subscribe!!!!!!!!!!!");
console.log(TAG+"subscribe data:"+JSON.stringify(data));
//根据data的不同进行不同的处理
}
mqtt与MQ中间件的关系。
消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统。
MQTT 与消息队列有一定的区别,队列是一种先进先出的数据结构,消息队列常用于应用服务层面,实现参考如 RabbitMQ Kafka RocketMQ。
MQTT 是传输协议,绝大部分 MQTT Broker 不保证消息顺序(Queue),常用语物联网、消息传输等。