All Articles

MQTT5のGolang実装を公開した

MQTT5の仕様策定が進んでいたので実装した

随分前にMQTT5の仕様策定があることに気づいてポチポチと実装していたけど、このエントリ によると2019/03/22に正式公開日が決まったらしいので仕上げて公開した。

(2019/3/22更新 MQTTv5の正式公開日が3/7となりました。 とあるけどググってもソースが見当たらない。どこにあるんだろう?)

リポジトリはこちら: ysugimoto/gqtt

一部新規機能を除いて Broker / Client 両方実装してある…が、認証系やRetainの保存にDB使えるようにしたいなど、まだ手を入れないといけない点はあるのでUnder Development。

MQTT5

MQTT5は以前のv3系に加えて User PropertyAuthnetication といった機構が新規に組み込まれてた、エラー処理が詳細になったなど、より使いやすくなっている印象。

とはいえ、これらのメッセージを載せないといけないのでMessage Specは拡張され、v3とは互換性が無い。v3系のメッセージも実装していたけど、思い切って外してv5だけの実装とした。

OASISによるSpecは MQTT Version 5.0 - OASIS Standard ここにある。

個人的には User Propery が期待できる。ユーザ定義のパラメータを載せることでアプリケーション固有の実装がしやすくなりそう。

すでに Mosquitto あたりが実装を公開している。

Usage

Githubに書いてあるのを写しただけだけど、Brokerは

package main

import (
  "context"
  "github.com/ysugimoto/gqtt"
  "github.com/ysugimoto/gqtt/message"
)

func main() {
  server := gqtt.NewBroker(":9999")
  ctx := context.Background()
  // Start server inside goroutine
  go server.ListenAndServe(ctx)

  // Hooks of messages
  for evt := range server.MessageEvent {
    switch e := evt.(type) {
      // Client subscribed
      case *message.Subscribe:
        log.Println("Received SUBSCRIBE event: ", e.GetType())
          // Client connected
      case *message.Connect:
          log.Println("Received CONNECT event", e.GetType())
            // Client published message
      case *message.Publish:
            log.Println("Received PUBLISH event", e.GetType())
    }
  }
  <-ctx.Done()
}

で起動する。Brokerは基本的にクライアントとのコミュニケーションとメッセージ管理が主だけど、Subscribe / Publish / Connect あたりはフックイベントを用意してアプリケーションから操作できるようにしてある。 これに対してクライアントは、

package main

import (
  "log"
  "os"
  "time"
  "context"
  
  "github.com/ysugimoto/gqtt"
  "github.com/ysugimoto/gqtt/message"
)

func main() {
  client := gqtt.NewClient("mqtt://localhost:9999")
  defer client.Disconnect()

  ctx := context.Background()

  // Connect with authenticate
  auth := gqtt.WithLoginAuth("admin", "admin")
  if err := client.Connect(ctx, auth); err != nil {
    log.Fatal(err)
  }

  // Subscribe topic
  if err := client.Subscribe("gqtt/example", message.QoS2); err != nil {
    log.Fatal(err)
  }

  ticker := time.NewTicker(3 * time.Second)

  for {
    select {
    case <-client.Closed:
      log.Println("connection closed")
      return
    case <-ctx.Done():
      log.Println("context canceled")
      return
    case msg := <-client.Message:
      log.Printf("published message received: %s\n", string(msg.Body))
    case <-ticker.C:
      log.Printf("message publish")
      if err := client.Publish("gqtt/example", []byte("Hello, MQTT5!"), gqtt.WithQoS(message.QoS2)); err != nil {
        return
      }
    }
  }
}

という感じでConnect -> Authneticate -> Subscribe -> Publishするような実装ができる。

双方向TCP通信

Duplex TCP (と言って良いのかな?)のためにClient - Brokerを一つのTCP接続で送受信しているが、Goで実装している際にRead/Writeを愚直に回すと一部データが送信されないケースがあった。 これは自分の実装が悪いのかもしれないけど、送信/受信のタイミングで1ms程度Sleepを入れることで解決したけど、これは内部のシステムコールに関連していそうな気がしないでもなく、でも詳しく調べていない。

いつもこういう実装する時に困るけど、送信/受信でそれぞれTCP接続を張ったほうが良いんだろうか…ベストプラクティスを詳しい人に教えてもらいたいです。

まとめ

また一つ平成にやりかけで放置していたものを供養(公開)できてよかった。 仕様の決まっているプロトコルのEncode/Decodeの実装するのってめっちゃ楽しいですよね。テストもちゃんと書けるし。

現場からは以上です。