MQTT5の仕様策定が進んでいたので実装した
随分前にMQTT5の仕様策定があることに気づいてポチポチと実装していたけど、このエントリ によると2019/03/22に正式公開日が決まったらしいので仕上げて公開した。
(2019/3/22更新 MQTTv5の正式公開日が3/7となりました。 とあるけどググってもソースが見当たらない。どこにあるんだろう?)
リポジトリはこちら: ysugimoto/gqtt
一部新規機能を除いて Broker / Client 両方実装してある…が、認証系やRetainの保存にDB使えるようにしたいなど、まだ手を入れないといけない点はあるのでUnder Development。
MQTT5
MQTT5は以前のv3系に加えて User Property や Authnetication といった機構が新規に組み込まれてた、エラー処理が詳細になったなど、より使いやすくなっている印象。
とはいえ、これらのメッセージを載せないといけないのでMessage Specは拡張され、v3とは互換性が無い。v3系のメッセージも実装していたけど、思い切って外してv5だけの実装とした。
OASISによるSpecは MQTT Version 5.0 - OASIS Standard ここにある。
個人的には User Propery が期待できる。ユーザ定義のパラメータを載せることでアプリケーション固有の実装がしやすくなりそう。
すでに Mosquitto あたりが実装を公開している。
- [MQTT5 Test Relase - Eclipse Mosquitto]](https://mosquitto.org/blog/2019/02/mqtt5-test-release/)
Usage
Githubに書いてあるのを写しただけだけど、Brokerは
package main
import (
"context"
"github.com/ysugimoto/gqtt"
"github.com/ysugimoto/gqtt/message"
)
func main() {
server := gqtt.NewBroker(":9999")
ctx := context.Background()
// Start server inside goroutine
go server.ListenAndServe(ctx)
// Hooks of messages
for evt := range server.MessageEvent {
switch e := evt.(type) {
// Client subscribed
case *message.Subscribe:
log.Println("Received SUBSCRIBE event: ", e.GetType())
// Client connected
case *message.Connect:
log.Println("Received CONNECT event", e.GetType())
// Client published message
case *message.Publish:
log.Println("Received PUBLISH event", e.GetType())
}
}
<-ctx.Done()
}
で起動する。Brokerは基本的にクライアントとのコミュニケーションとメッセージ管理が主だけど、Subscribe / Publish / Connect あたりはフックイベントを用意してアプリケーションから操作できるようにしてある。 これに対してクライアントは、
package main
import (
"log"
"os"
"time"
"context"
"github.com/ysugimoto/gqtt"
"github.com/ysugimoto/gqtt/message"
)
func main() {
client := gqtt.NewClient("mqtt://localhost:9999")
defer client.Disconnect()
ctx := context.Background()
// Connect with authenticate
auth := gqtt.WithLoginAuth("admin", "admin")
if err := client.Connect(ctx, auth); err != nil {
log.Fatal(err)
}
// Subscribe topic
if err := client.Subscribe("gqtt/example", message.QoS2); err != nil {
log.Fatal(err)
}
ticker := time.NewTicker(3 * time.Second)
for {
select {
case <-client.Closed:
log.Println("connection closed")
return
case <-ctx.Done():
log.Println("context canceled")
return
case msg := <-client.Message:
log.Printf("published message received: %s\n", string(msg.Body))
case <-ticker.C:
log.Printf("message publish")
if err := client.Publish("gqtt/example", []byte("Hello, MQTT5!"), gqtt.WithQoS(message.QoS2)); err != nil {
return
}
}
}
}
という感じでConnect -> Authneticate -> Subscribe -> Publishするような実装ができる。
双方向TCP通信
Duplex TCP (と言って良いのかな?)のためにClient - Brokerを一つのTCP接続で送受信しているが、Goで実装している際にRead/Writeを愚直に回すと一部データが送信されないケースがあった。 これは自分の実装が悪いのかもしれないけど、送信/受信のタイミングで1ms程度Sleepを入れることで解決したけど、これは内部のシステムコールに関連していそうな気がしないでもなく、でも詳しく調べていない。
いつもこういう実装する時に困るけど、送信/受信でそれぞれTCP接続を張ったほうが良いんだろうか…ベストプラクティスを詳しい人に教えてもらいたいです。
まとめ
また一つ平成にやりかけで放置していたものを供養(公開)できてよかった。 仕様の決まっているプロトコルのEncode/Decodeの実装するのってめっちゃ楽しいですよね。テストもちゃんと書けるし。
現場からは以上です。