Skip to main content

MQTT客户端

简介

  • MQTT全称为消息队列遥测传输(英语:Message Queuing Telemetry Transport),是ISO 标准(ISO/IEC PRF 20922)下基于发布 (Publish) /订阅 (Subscribe)范式的消息协议,工作在 TCP/IP协议族上。
  • MQTT最大优点在于,可以用极少的数据和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
  • MQTT 协议定义了两种网络实体:消息代理(message broker)与客户端(client)。其中,消息代理用于接收来自客户端的消息并转发至目标客户端,也被称为MQTT服务器。MQTT 客户端可以是任何运行有 MQTT 库并通过网络连接至消息代理的设备,例如微型控制器或大型服务器。
  • 在MQTT 协议中,消息的传输是通过主题(topic)管理的,消息的发布者和订阅者均为客户端(client)。消息由主题(Topic)和负载(payload)两部分组成。当某个客户端有需要分发的数据时,会向连接的消息代理(MQTT服务器)发送指定主题的携带有负载的消息。服务器收到消息后,会向订阅此主题的客户端分发此消息。发布者不需要配置订阅者的相关信息和具体位置;同样,订阅者也不需要配置发布者的相关信息和具体位置。

驱动配置

一般情况下,如果实例配置、设备表配置、表记录配置中存在相同的字段,优先级依次升高,不填时默认继承上一级的配置。

  1. 打开设备监控-驱动管理, 点击添加驱动, 选择driver-mqtt-client image.png

  2. 配置实例和设备表参数 image.png

    配置项说明:
    1. MQTT服务器:填写要连接的mqtt服务器的地址。
    格式为 tcp://服务器IP或域名:服务器端口,开头的协议部分根据服务器不同,可填tcp,mqtt,mqtts,ssl,ws,wss
    例如:tcp://127.0.0.1:1883
    2. 用户名、密码:填写MQTT服务器的用户名和密码。
    3. 只使用资产配置中的topic:勾选后,实际只订阅表记录中填写的topic
    4. 客户端:mqtt客户端id,一般不填写,驱动使用随机字符串作为客户端id
    5. 服务质量: 即mqtt协议中的QOS,可填0/1/2
    6. SSL/TLS: 当服务器地址为ssl或mqtts时,需要选true,自认证服务器需要添加证书信息
    7. 订阅消息主题:填写该模型要订阅的消息主题,可使用通配符#和+。
    Topic是个utf-8编码的字符串,可由多个'/'分割开。
    +:为单级通配符,如:check/+/baseline
    #:为多级通配符,必须放在topic的最后,前一个字符必须是'/'
    Topic中不要包含空格符。
    8. 通讯监控参数:平台通用配置,当某个资产数据点中的最新上数时间距离当前时间超过此参数时,平台判定资产掉线。
    9. 自定义脚本:由于mqtt协议中未规定具体的消息内容格式,因此需要用户编写自定义脚本,用于解析消息和组装要发送的消息内容。

    注:实例配置和驱动配置基本相同,实际使用中,相同的字段按优先级覆盖。
  3. 配置实例和设备表参数 image.png 设备匹配规则: 驱动优先匹配表记录配置里的设备标识,然后匹配资产编号

由于MQTT协议通过主题区分不同种类的消息,因此在使用时,不同的主题建议建立不同的设备表,下边介绍脚本的编写方法。

<<<<<<< HEAD (4)通讯监控参数:平台通用配置,当某个资产数据点中的最新上数时间距离当前时间超过此参数时,平台判定资产掉线。 (5)自定义脚本:由于mqtt协议中未规定具体的消息内容格式,因此需要用户编写自定义脚本,用于解析消息和组装要发送的消息内容。
脚本编辑后可进行解析调试或指令调试。

  • 解析调试:输入参数主题和消息内容,点击执行调试按钮,可进行脚本调试,结果将显示在回执结果栏中
    image.png
  • 指令调试:通过选择设备和指令进行调试 image.png

使用示例

由于MQTT协议通过主题区分不同种类的消息,因此在使用时,不同的主题需要建立不同的模型,下边以网关常用的MQTT格式为例,介绍脚本的编写方法。

消息格式

数据采集网关发布的主题的为 box/网关唯一编号/data
假设网关唯一编号为8位,例:box/abcd1234/data
消息内容:{"data":{"d1":100},"time":1644290863000}
=======
#### 脚本说明(示例参考下面的使用示例)
>>>>>>> 3cf92429d1baae61a56ccbb996edcdeda59af30b

脚本语言采用的是JavaScript,一部分为固定格式:

```javascript
// 解析接收数据
// 该服务器和主题收到的消息会调用此方法进行解析,消息主题是方法参数topic,消息内容是方法参数package.
// topic类型为字符串,考虑到部分设备的消息内容为二进制数据,package类型为字节数组.
ParseHandle = function (topic, package) {

// 解析数据
let id = msg.id //id是资产编号,平台通过id判断数据和资产的对应关系,必需
let values = msg.values // values是一个对象,例如{"d1": 100},key为数据点标识,值就是采集的具体数值,必需
// let time = 1644290863000 // 数据点采集时间,UNIX毫秒时间戳,没有time时,默认采用服务器时间
return [ //可返回多个对象,对应多个资产
{id, values} //返回的对象中的key的名字必须为id,values,time, 有时间戳时, 返回{id, values, time}
]
}

// 构建发送数据
// topic为指令上配置的发送指令, id为发送指令时选择的资产的资产编号, op
CommandHandle = function (topic, id, op) {
// 构建发送数据
let sendTopic = topic
let sendData = {"d1": 200}
return {sendTopic, sendData} //返回的对象中的key的名字必须为sendTopic,sendData
}

脚本调试功能(测试功能)

最新版本的mqtt驱动增加了脚本调试功能, 可以分别调试在脚本输入框中编写的解析脚本和指令脚本 image.png image.png

使用示例

示例1-默认脚本

当新建驱动实例或新建设备表时,脚本中默认会有脚本,该脚本对应的报文格式为:

{"id":"表记录编号","values":{"数据点标识":数据点值}}

例如: {"id":"node01","values":{"d1":100,"d2":200}}

脚本:

// 解析接收数据
// 该服务器和主题收到的消息会调用此方法进行解析,消息主题是方法参数topic,消息内容是方法参数package.
// topic类型为字符串;考虑到部分设备的消息内容为二进制数据,package类型为字节数组,
// 当上传数据为json字符串时,首先使用JSON.parse(package)将收到的数据转为json对象.

ParseHandle = function (topic, package) {

let msg = JSON.parse(package)

let id = msg.id
let values = msg.values

return [
{ id, values }
]
}

// 构建发送数据
CommandHandle = function (topic, id, op) {

// 构建发送数据
let sendTopic = topic
let c = JSON.stringify([{ "abc": op.value }])
let sendData = Buffer.from(c)
return { sendTopic, sendData } //返回的对象中的key的名字必须为sendTopic,sendData
}

设备表中的配置如下图所示: image.png

数据点配置:

image.png

添加表记录:

image.png

未配置表记录中设备配置,驱动将按照表记录编号匹配资产.

平台配置完成后,点击重启驱动.

使用mqttx模拟设备,连接mqtt服务器,使用/test/01主题,发布消息{"id":"node01","values":{"d1":100,"d2":200}}

image.png

可在数据点上查看收到的数据

image.png

示例2-JSON类型1

消息格式

某数据采集网关发布的主题的为 /sys/网关id/up, 收到的消息内容如下:
{
"timeStamp": 1514764896,
"version": "5.0",
"messageId": 24,
"devices": [{
"deviceId": "Slave01",
"deviceState": 0,
"deviceData": {
"40001": 0
}
}]
}
其中,devices为数组类型,每个对象对应一个设备及其数据点值. 本示例中,设备id为Slave01,deviceData中为采集的数据
因为平台数据点表上不支持纯数字,因此在脚本中,将报文的中的数据点标识增加一个x字母作为前缀.如将40001改为x40001.
脚本如下:

脚本:

// 解析接收数据
let ParseHandle = function (topic, package) {
const msg = JSON.parse(package.toString());

const res = []

msg.devices.map(el => {
const id = el.deviceId
const values = {}
const entries = Object.entries(el.deviceData)
entries.map(([key, value]) => {
values["x" + key] = Number(value)
})
const time = msg.timeStamp * 1000
res.push({ id, values, time })
})

return res
}
// 构建发送数据
CommandHandle = function (topic, id, op) {

let sendTopic = `box/${id}/command`
let sendData = `{"${op.tag}":${op.value}}`
return {sendTopic, sendData}
}

示例2-JSON类型2

消息格式

收到的topic
收到的消息内容如下:
{
"d":
[
{ "tag": "PLC1:风机未开", "value": 0 },
{ "tag": "PLC1:泵未开", "value": 0 },
{ "tag": "PLC1:阀未开", "value": 0 },
{ "tag": "PLC1:实时功率", "value": 300 },
{ "tag": "PLC2:风机未开", "value": 0 },
{ "tag": "PLC2:泵未开", "value": 1 },
{ "tag": "PLC2:阀未开", "value": 0 },
{ "tag": "PLC2:实时功率", "value": 500 }
],
"ts": "1691573552353"
}

其中,d为数组类型,每个对象对应一个设备的其数据点标识和值. 以{ "tag": "PLC1:实时功率", "value": 300 }为例,
表示设备PLC1的实时功率数据点的值为300.因此,经过脚本处理后,整理为以下平台接收的格式,分别匹配2个设备:
[
{
id: 'PLC1',
values: { '风机未开': 0, '泵未开': 0, '阀未开': 0, '实时功率': 300 },
time: 1691573552353
},
{
id: 'PLC2',
values: { '风机未开': 0, '泵未开': 1, '阀未开': 0, '实时功率': 500 },
time: 1691573552353
}
]

脚本:

// 解析接收数据

ParseHandle = function (topic, package) {
try {
let msg = JSON.parse(package.toString());

let res = {}

// 解析数据为一个对象,key为资产id,值为属于该资产的数据点
// res示例: {"WH_PLC300":{"LT3003":130,"LT5002":181,"LT5001":365}}
if (msg.d) {
for (let i = 0; i < msg.d.length; i++) {
const el = msg.d[i];
let splits = el.tag.split(":")
if (splits.length === 2) {
let did = splits[0]
let tagId = splits[1]
if (!res[did]) {
res[did] = {}
}
res[did][tagId] = Number(el.value)
}
}
}

console.log(JSON.stringify(res))

// 时间解析为毫秒时间戳
// let date = moment(msg.ts)
let time = Number(msg.ts)

// 将res转为平台要求的返回格式
let arr = []
for (const k in res) {
if (Object.hasOwnProperty.call(res, k)) {
const v = res[k];
arr.push({ id: k, values: v, time })
}
}

return arr;

} catch (e) {
console.error("解析脚本错误:", e)
}
return [];
}
// 构建发送数据
CommandHandle = function (topic, id, op) {

let sendTopic = `box/${id}/command`
let sendData = `{"${op.tag}":${op.value}}`
return {sendTopic, sendData}
}

示例3-非json格式的消息解析

消息格式

收到的topic
收到的消息内容如下:
dt01:00000 04201 00077 -11467 00126 0.24 7.11 -76738
消息是一个字符串, 收到的topic是设备id, 冒号后为7个数据点的值,其中最后有一个值缩小100倍,
处理后的格式为
[{"id":"","values":{"d1":"04201","d2":"00077","d3":"-11467","d4":"00126","d5":"0.24","d6":"7.11","d7":-767.38}}]

脚本:

// 解析接收数据
ParseHandle = function (topic, package) {
let msg = package.toString()
let newPkg = msg.split(' ')
let id = topic
console.log(id)
let values = {
"d1": newPkg[1],
"d2": newPkg[2],
"d3": newPkg[3],
"d4": newPkg[4],
"d5": newPkg[5],
"d6": newPkg[6],
"d7": newPkg[7] * 0.01,
}

return [
{ 'id': id, 'values': values }
]
}
// 构建发送数据
CommandHandle = function (topic, id, op) {

let sendTopic = `box/${id}/command`
let sendData = `{"${op.tag}":${op.value}}`
return {sendTopic, sendData}
}

内置对象

日志logger
ParseHandle = function (topic, package) {

logger.Debug(topic)
logger.Info(topic)
logger.Warn(topic)
logger.Error(topic)

let id = msg.id
let values = msg.values
return [
{id, values}
]
}
脚本内mqtt客户端mqttClient
ParseHandle = function (topic, package) {

// 发送到该设备表配置的mqtt服务器
mqttClient.publish("要发送的主题","要发送的消息")

let id = msg.id
let values = msg.values
return [
{id, values}
]
}
脚本内置平台客户端iotClient
ParseHandle = function (topic, package) {

// 指令日志


let id = msg.id
let values = msg.values
return [
{id, values}
]
}

新建服务器方法

1. Windows

1.1 新建一个mqtt的服务器 (Windows)

在软件安装的目录下找到mosquitto文件夹复制并重命名,重命名后在文件夹下找到mosquitto.conf文件以记事本方式打开,在211行找到port 1883。将端口进行修改 例如1884 然后在文件夹窗口下输入cmd进入命令窗口输入mosquitto.exe -c mosquitto.conf运行。 图片.png

1.2 在软件平台中添加mqtt的服务器

进入到运维平台127.0.0.1:13030中,点击服务管理—— 添加服务——高级添加应用名称例如:mqtt2, 命令:mosquitto.exe -c mosquitto.conf 目录:../mqtt2 (mqtt2为本文档一中复制的文件夹重命名后名称) 图片.png

2. Linux

2.1 Linux平台启用mqtt server

找到docker-compose.yml文件位置 图片.png 鼠标右键,选择用记事本编辑 图片.png 图片.png 将下面这句复制到docker-compose.yml里。 mqttserver: container_name: mqttserver environment:

  • RABBITMQ_DEFAULT_USER=admin
  • RABBITMQ_DEFAULT_PASS=public image: airiot/rabbitmq:3.8.3-management-alpine logging: driver: json-file options: max-size: 100m max-file: "1" networks:
  • backend ports:
  • 1884:1883 restart: always ulimits: nproc: 40960 nofile: soft: 10240 hard: 30720 volumes:
  • /etc/localtime:/etc/localtime:ro 图片.png 配置完成后需要进入平台安装目录下 创建并启动服务:docker-compose up -d 查看mqtt服务是否启动:docker ps | grep mqtt