文章目录
- 一、emqx安装
-
- 1.1 emqx启动
-
- 打开MQTT.fx进行emqx服务器的mqtt客户端测试:
- 1.2 基本命令
- 二、分布式集群
-
- 2.1 Erlang/OTP 分布式编程
-
- 节点(Node)
- 2.2 EMQ X 分布集群设计
-
- 主题树(Topic Trie)与路由表(Route Table)
- 订阅(Subscription)与消息派发
- 节点发现与自动集群
- 手动(manual) 方式管理集群介绍
- 2.3 防火墙设置
- 三、EMQX目录结构
-
- 3.1 bin 目录
- 四、COAP插件的使用
-
- 4.1 开启 emqx_coap 插件
- 4.2 安装libcoap
- 4.3 开启libcoap客户端 订阅
-
- libcoap使用的示例格式
- 开启一个libcoap监听订阅消息的client
- 开启一个libcoap发布消息的client并发布消息
- 在主机打开mqtt.fx实验
一、emqx安装
参考官网教程:官网安装教程
https://www.emqx.cn/downloads#broker
1.1 emqx启动
安装完成后,执行以下命令尝试运行:
//
# 启动emqx
./bin/emqx start
//or
emqx start# 检查运行状态
./bin/emqx_ctl status
//oremqx_ctl status# 停止emqx
./bin/emqx stop
//or
emqx stop
开启之后可以打开控制台查看具体信息,控制台地址: http://127.0.0.1:18083
,默认用户: admin
,密码:public
打开控制台之后界面如下:
左侧可以对EMQX设置,其中plugins可以管理插件,具体内容自行查看
https://developer.emqx.io/docs/emq/v3/cn/plugins.html
或者
https://docs.emqx.io/en/broker/latest/
可以用./bin/emqx_ctl plugins load
插件名字安装插件
打开MQTT.fx进行emqx服务器的mqtt客户端测试:
打开MQTT.fx后,进入设置界面。
输入emqx服务器的地址,本emqx是运行在虚拟机上的,Broker Address地址为192.168.237.131
,Broker Port 为1883
其他配置随便,然后Apply之后,返回主页点击Connect 。
Connect成功后,说明emqx服务器开始成功。
1.2 基本命令
二、分布式集群
2.1 Erlang/OTP 分布式编程
EMQ X 使用Erlang/OTP 平台。Erlang/OTP 最初是为了开发电信设备系统而设计的编程语言平台,在设计时就将集群特性作为一项重要的特性纳入考虑,以 Erlang/OTP 实现的分布式系统在集群上基本都是简单易用而又高效可靠,EMQ X 也不例外。
当某个客户端连接到 EMQ X 集群的一个节点后,他所订阅的主题会被通知到其他节点,他所发布的消息也会被所有节点上订阅了相应主题的客户端收到。用户无需关心其中的实现细节。对于用户而言,这一切都是透明和自动的。
Erlang/OTP 语言平台的分布式程序,由分布互联的 Erlang 运行系统组成,每个 Erlang 运行系统被称为节点(Node),节点(Node) 间通过 TCP 互联,消息传递的方式通信:
节点(Node)
集群中的每台 EMQ X 服务器就是一个节点。?????
在集群内部的通讯中,节点由一个唯一的节点名标识。节点名的格式为 name@host,其中name由用户指定,host是IP地址或者全程域名(FQDN,Fully Qualified Domain Name)。比如:
emqx1@192.168.1.165
emqx2@broker1.emqx.io
节点间通过名称进行通信寻址。 例如在本机启动四个 Erlang 节点,节点名称分别为:
erl -name node1@127.0.0.1
erl -name node2@127.0.0.1
erl -name node3@127.0.0.1
erl -name node4@127.0.0.1
node1@127.0.0.1 控制台下建立与其他节点的连接:
(node1@127.0.0.1)1> net_kernel:connect_node('node2@127.0.0.1').
true
(node1@127.0.0.1)2> net_kernel:connect_node('node3@127.0.0.1').
true
(node1@127.0.0.1)3> net_kernel:connect_node('node4@127.0.0.1').
true
(node1@127.0.0.1)4> nodes().
['node2@127.0.0.1','node3@127.0.0.1','node4@127.0.0.1']
2.2 EMQ X 分布集群设计
EMQ X 消息服务器集群基于 Erlang/OTP 分布式设计,集群原理可简述为下述两条规则:
- MQTT 客户端订阅主题时,所在节点订阅成功后广播通知其他节点:某个主题(Topic)被本节点订阅。
- MQTT 客户端发布消息时,所在节点会根据消息主题(Topic),检索订阅 并 路由消息到相关节点。
EMQ X 消息服务器同一集群的所有节点,都会复制一份主题(Topic) -> 节点(Node)
映射的路由表,例如:
topic1 -> node1, node2
topic2 -> node3
topic3 -> node2, node4
主题树(Topic Trie)与路由表(Route Table)
EMQ X 消息服务器每个集群节点,都保存一份主题树(Topic Trie)和路由表。
例如下述主题订阅关系:
最终会生成如下主题树(Topic Trie)和路由表(Route Table):
订阅(Subscription)与消息派发
客户端的主题订阅(Subscription)关系,只保存在客户端所在节点,用于本节点内派发消息到客户端。
例如client1向主题’t/a’发布消息,消息在节点间的路由与派发流程:
title: Message Route and Deliverclient1 -> node1: Publish[t/a]node1 --> node2: Route[t/#]node2 --> client2: Deliver[t/#]node1 --> node3: Route[t/a]node3 --> client3: Deliver[t/a]
节点发现与自动集群
EMQ X 支持基于 Ekka 库的集群自动发现 (Autocluster)。Ekka 是为 Erlang/OTP 应用开发的集群管理库,支持 Erlang 节点自动发现 (Service Discovery)、自动集群 (Autocluster)、脑裂自动愈合 (Network Partition Autoheal)、自动删除宕机节点 (Autoclean)。
手动(manual) 方式管理集群介绍
假设要在两台服务器 s1.emqx.io, s2.emqx.io 上部署 EMQ X 集群:
注意: 节点名格式为 Name@Host
, Host 必须是 IP 地址或 FQDN (主机名。域名)
- 配置 emqx@s1.emqx.io 节点
emqx/etc/emqx.conf:node.name = emqx@s1.emqx.io 或 node.name = emqx@192.168.0.10
也可通过环境变量:export EMQX_NODE_NAME=emqx@s1.emqx.io && ./bin/emqx start
注意: 节点启动加入集群后,节点名称不能变更。 - 配置 emqx@s2.emqx.io 节点
emqx/etc/emqx.conf:
- 节点加入集群
启动两台节点后,在 s2.emqx.io 上执行:$ ./bin/emqx_ctl cluster join emqx@s1.emqx.io
注意: s2.emqx.io加入集群后会清除本身全部的数据,同步s1.emqx.io节点的数据。如果还有s3.emqx.io节点,那么需要在s3.emqx.io节点去执行命令加入emqx@s1.emqx.io或者emqx@s2.emqx.io, 已经在集群的节点不能在join到其他节点,否则会退出当前集群和join的节点组成一个新的集群 - 在任意节点上查询集群状态:
命令:$ ./bin/emqx_ctl cluster status
- 退出集群
节点退出集群,两种方式:
1.leave: 让本节点退出集群
2.force-leave: 从集群删除其他节点
让 emqx@s2.emqx.io 主动退出集群:$ ./bin/emqx_ctl cluster leave
或在 s1.emqx.io 上,从集群删除 emqx@s2.emqx.io 节点:$ ./bin/emqx_ctl cluster force-leave emqx@s2.emqx.io
2.3 防火墙设置
若预先设置了环境变量 WITH_EPMD=1, 启动 emqx 时会使用启动 epmd (监听端口 4369) 做节点发现。称为 epmd 模式
。 若环境变量 WITH_EPMD 没有设置,则启动 emqx 时不启用 epmd,而使用 emqx ekka 的节点发现,这也是 4.0 之后的默认节点发现方式。称为 ekka 模式
。
epmd 模式: 如果集群节点间存在防火墙,防火墙需要开启 TCP 4369 端口和一个 TCP 端口段。4369 由 epmd 端口映射服务使用,TCP 端口段用于节点间建立连接与通信。
防火墙设置后,需要在 emqx/etc/emqx.conf
中配置相同的端口段:
## Distributed node port range
node.dist_listen_min = 6369
node.dist_listen_max = 7369
ekka 模式(4.0 版本之后的默认模式):
如果集群节点间存在防火墙,默认情况下,只需要开启 TCP 4370 端口。
但如果 node.name 配置制定的节点名字里,带有数字后缀(Offset),则需要开启 4370 + Offset 端口。
比如:
node.name = emqx-1@192.168.0.12
则需要开启 4371 端口。
三、EMQX目录结构
不同安装方式得到的 EMQ X 其目录结构会有所不同,具体如下:
以上目录中,用户经常接触与使用的是 bin
、etc
、data
、log
目录。
3.1 bin 目录
emqx、emqx.cmd
EMQ X 的可执行文件,具体使用可以查看 基本命令。
emqx_ctl、emqx_ctl.cmd
EMQ X 管理命令的可执行文件,具体使用可以查看
四、COAP插件的使用
参考页面:https://github.com/emqx/emqx-coap
https://docs.emqx.cn/cn/broker/latest/modules/coap_protocol.html#%E4%BD%BF%E7%94%A8%E7%A4%BA%E4%BE%8B
4.1 开启 emqx_coap 插件
首先需要安装好emqx,并运行emqx。
打开Dashboard,端口号为18083
开启 emqx_coap 插件时,需要同时开启 emqx_lwm2m 插件,然后再关闭emqx_lwm2m,不然会出现网络不可达的错误(可能是个人原因)
4.2 安装libcoap
安装如下步骤操作即可。
使用上述的 ./configure --enable-documentation=no --enable-tests=no
命令会遇到需要依赖tls,下面命令取消安装tls相关的内容。
./configure --enable-documentation=no --enable-tests=no --prefix=$(pwd)/ISVP_lib --disable-dtls
4.3 开启libcoap客户端 订阅
libcoap使用的示例格式
libcoap使用的示例格式如下:
- Publish example:
libcoap/examples/coap-client -m put -e 1234 "coap://127.0.0.1/mqtt/topic1?c=client1&u=tom&p=secret"
topic name is “topic1”, NOT “/topic1”
client id is client1
username is tom
password is secret
payload is a text string “1234”
A mqtt message with topic=“topic1”, payload=“1234” has been published. Any mqtt client or coap client, who has subscribed this topic could receive this message immediately. - Subscribe example:
libcoap/examples/coap-client -m get -s 10 "coap://127.0.0.1/mqtt/topic1?c=client1&u=tom&p=secret"
topic name is “topic1”, NOT “/topic1”
client id is client1
username is tom
password is secret
subscribe time is 10 seconds
And you will get following result if any mqtt client or coap client sent message with text “1234567” to “topic1”: - Publish、Subscribe 最后一个字段的格式
coap://localhost/mqtt/{topicname}?c={clientid}&u={username}&p={password}
路径中的 "mqtt "为必填项
将 {topicname}、{clientid}、{username} 和 {password} 替换为你的真实值
{topicname} 和 {clientid} 为必填项
如果 clientid 不存在,将返回 “bad_request”
URI 中的 {topicname} 应该用 percent-encoded,以防止特殊字符,如 + 和 #
{username} 和 {password} 是可选的
如果 {username} 和 {password} 不正确,将返回一个 uauthorized 错误
订阅的 QoS 等级恒定为 1
开启一个libcoap监听订阅消息的client
./coap-client -m get -s 10000 "coap://127.0.0.1/mqtt/topic1?c=client1"
博主这里是打开了一个coap CLient,监听订阅报文的时间为10000s。
主题为topic1
,(注意了,是topic1
,不是/topic1
)
client为client1,(这个client不需要预先定义,直接使用就行)
开启一个libcoap发布消息的client并发布消息
./coap-client -m put -e 1234 "coap://127.0.0.1/mqtt/topic1?c=client1"
发布消息的内容为 1234
发送的主题为topic1
clientId为client1
,clientId相同也无冲突!!!
发布消息后,可以观察到订阅topic1
的客户端接收到了发布的消息。
在主机打开mqtt.fx实验
这就利用emqx 的 emqx_coap 插件 实现了coap协议的报文转换成mqtt类型的报文。
./coap-client -m get -s 10000 "coap://47.111.181.157/mqtt/topic/lonn?c=client1"./coap-client -m put -e 1234 "coap://47.111.181.157/mqtt/topic/lonn?c=client001"