(摘) 基于Golang的MQTT学习

声明:内容源自网络,版权归原作者所有。若有侵权请在网页聊天中联系我

因为涉及到物联网项目(手环类),学习一下MQTT。

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。OASIS现在已经发布了官方的MQTT v5.0标准。

特点

开放消息协议,简单易实现
发布订阅模式,一对多消息发布
基于TCP/IP网络连接,提供有序,无损,双向连接。
1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量。
消息QoS支持,可靠传输保证

MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

用自己的话来说,就是:代理是中间人,提供信息中转。而发布者发信息,订阅者收信息。

MQTT服务质量 qos

为了满足不同的场景,MQTT支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:

0. 最多发送一次(发完就忘),也就是不确认
1. 至少发送一次,需要进行确认
2. 正好发送一次,要进行 4 步握手

关于质量,知乎网友有讲得更详细

更详细的就看网站吧,它们解释的更具体。

服务端

找到了一个golang的服务端: https://github.com/DrmagicE/gmqtt 他实现了MQTT V3.1.1和V5协议。 这里还有最喜欢的中文说明

go get 后,cd cmd/gmqttd, go run . start -c default_config.yml 可以启动gmqtt,监听1883端口提供TCP服务和8883端口提供websocket服务。Gmqtt默认配置没有启用鉴权,客户端不需配置鉴权可以直接连接。

客户端

go get github.com/eclipse/paho.mqtt.golang

在cmd目录中找一个示例学习一下

示例1:

在cmd/simple目录下的示例

package main

import (
	"fmt"
	"log"
	"os"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("主题: %s\n", msg.Topic())
	fmt.Printf("信息: %s\n", msg.Payload())
}

func main() {
	//mqtt.DEBUG = log.New(os.Stdout, "", 0)
	mqtt.ERROR = log.New(os.Stdout, "", 0)
	opts := mqtt.NewClientOptions().AddBroker("tcp://127.0.0.1:1883").SetClientID("gotrivial")
	opts.SetKeepAlive(2 * time.Second)
	// 注入一个client收到消息后对消息处理的方法
	opts.SetDefaultPublishHandler(f)
	opts.SetPingTimeout(1 * time.Second)

	// 启动一个链接
	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	// 订阅一个topic
	if token := c.Subscribe("EaseSample", 0, nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	// 向topic发布消息, SetDefaultPublishHandler里收消息
	for i := 0; i < 5; i++ {
		text := fmt.Sprintf("this is msg #%d!", i)
		token := c.Publish("EaseSample", 0, false, text)
		token.Wait()
	}

	time.Sleep(6 * time.Second)

	// 取消订阅
	if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	c.Disconnect(250)

	time.Sleep(1 * time.Second)
}

运行成功,自发自收。但修改的A发B收却没成功。继续学习。

示例2:

在cmd/smaple目录下找到示例运行。以下示例中,兼具发布和接收两个身份。成功完成了收发。

发布:go run main.go -broker tcp://127.0.0.1:1883 -id EasePub -action pub -message TestInfo4 -topic EaseSample

订阅:gr main.go -broker tcp://127.0.0.1:1883 -id EaseSub -action sub -topic EaseSample

package main

import (
	"flag"
	"fmt"
	"os"

	MQTT "github.com/eclipse/paho.mqtt.golang"
)

func main() {
	topic := flag.String("topic", "", "The topic name to/from which to publish/subscribe")
	broker := flag.String("broker", "tcp://iot.eclipse.org:1883", "The broker URI. ex: tcp://10.10.1.1:1883")
	password := flag.String("password", "", "The password (optional)")
	user := flag.String("user", "", "The User (optional)")
	id := flag.String("id", "testgoid", "The ClientID (optional)")
	cleansess := flag.Bool("clean", false, "Set Clean Session (default false)")
	qos := flag.Int("qos", 0, "The Quality of Service 0,1,2 (default 0)")
	num := flag.Int("num", 1, "The number of messages to publish or subscribe (default 1)")
	payload := flag.String("message", "", "The message text to publish (default empty)")
	action := flag.String("action", "", "Action publish or subscribe (required)")
	store := flag.String("store", ":memory:", "The Store Directory (default use memory store)")
	flag.Parse()

	if *action != "pub" && *action != "sub" {
		fmt.Println("Invalid setting for -action, must be pub or sub")
		return
	}

	if *topic == "" {
		fmt.Println("Invalid setting for -topic, must not be empty")
		return
	}

	fmt.Printf("Sample Info:\n")
	fmt.Printf("\taction:    %s\n", *action)
	fmt.Printf("\tbroker:    %s\n", *broker)
	fmt.Printf("\tclientid:  %s\n", *id)
	fmt.Printf("\tuser:      %s\n", *user)
	fmt.Printf("\tpassword:  %s\n", *password)
	fmt.Printf("\ttopic:     %s\n", *topic)
	fmt.Printf("\tmessage:   %s\n", *payload)
	fmt.Printf("\tqos:       %d\n", *qos)
	fmt.Printf("\tcleansess: %v\n", *cleansess)
	fmt.Printf("\tnum:       %d\n", *num)
	fmt.Printf("\tstore:     %s\n", *store)

	opts := MQTT.NewClientOptions()
	opts.AddBroker(*broker)
	opts.SetClientID(*id)
	opts.SetUsername(*user)
	opts.SetPassword(*password)
	opts.SetCleanSession(*cleansess)
	if *store != ":memory:" {
		opts.SetStore(MQTT.NewFileStore(*store))
	}

	if *action == "pub" {
		client := MQTT.NewClient(opts)
		if token := client.Connect(); token.Wait() && token.Error() != nil {
			panic(token.Error())
		}
		fmt.Println("Sample Publisher Started")
		for i := 0; i < *num; i++ {
			fmt.Println("---- doing publish ----")
			token := client.Publish(*topic, byte(*qos), false, *payload)
			token.Wait()
		}

		client.Disconnect(250)
		fmt.Println("Sample Publisher Disconnected")
	} else {
		receiveCount := 0
		choke := make(chan [2]string)

		opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
			choke <- [2]string{msg.Topic(), string(msg.Payload())}
		})

		client := MQTT.NewClient(opts)
		if token := client.Connect(); token.Wait() && token.Error() != nil {
			panic(token.Error())
		}

		if token := client.Subscribe(*topic, byte(*qos), nil); token.Wait() && token.Error() != nil {
			fmt.Println(token.Error())
			os.Exit(1)
		}

		for receiveCount < *num {
			incoming := <-choke
			fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
			receiveCount++
		}

		client.Disconnect(250)
		fmt.Println("Sample Subscriber Disconnected")
	}
}

至此,基本流程跑通。还存在一些未解决的问题:

  1. 授权:现在谁都可以收发消息)。在示例2中也有收发消息用的帐号和密码参数可用。

Gmqtt内置了基于username/password的简单鉴权机制。 Gmqtt默认配置没有开启鉴权,可以通过修改配置文件来加载鉴权插件。

它是以插件的形式解决的。auth插件

在配置文件 default_config.yml 中可以进行设置。

而保存用户密码的文件 gmqtt_password.yml 样式如下:

- username: u1
  password: p1
- username: u2
  password: p2

看配置文件中,密码还支持多种密文格式。

  1. 消息存储:上文中提到的服务器暂时使用内存存储消息,虽然也支持redis。而在实际中,我们可能会用到sqlite3小型数据库,或者mysql/postgresql专业数据库。不知道它的插件机制能否很快的解决这个需求。

最后,我还好奇的试了中文信息的收发,没问题,正确收发。 比如象这样:go run main.go -broker tcp://127.0.0.1:1883 -id Ease -action pub -message 中文信息发送 -topic 测试

在收取多条消息时,似乎总存在:接收端不能收到之前发送的消息。原因待查。(根据学习的情况分析,是关于信息质量QOS的设置)


测试用户名密码,防止随意接入:

  1. 在配置文件中启用auth,并配置auth的设置文件,例如:
# 设置加密方式及配置文件
  auth:
    # Password hash type. (plain | md5 | sha256 | bcrypt)
    # Default to MD5. 
    hash: plain
    # The file to store password. Default to $HOME/gmqtt_password.yml
    password_file: ./gmqtt_password.yml

# plugin loading orders
plugin_order:
  # Uncomment auth to enable authentication.
  - auth
  - prometheus
  - admin
  1. 发布:

go run main.go -broker tcp://127.0.0.1:1883 -id EasePub -action pub -message 中文信息发送 -topic 测试 -user ease -password 123456

  1. 订阅:

go run main.go -broker tcp://127.0.0.1:1883 -id EaseSub -action sub -message 1中文信息发送 -topic 测试 -user ease -password 123456

当没有设置帐号密码时,代理端显示错误信息,且不能发布和订阅。


admin插件

客户信息列表,包括了发布和订阅都算是客户。这里jq是用于输出格式化: curl 127.0.0.1:8083/v1/clients | jq

指定客户信息,区分大小写: curl 127.0.0.1:8083/v1/clients/Ease | jq

筛选订阅: curl 127.0.0.1:8083/v1/filter_subscriptions?filter_type=1,2,3&match_type=1&topic_name=测试

发布讯息: curl -X POST 127.0.0.1:8083/v1/publish -d ‘{“topic_name”:“a”,“payload”:“test”,“qos”:1}’