All Articles

RabbitMQ + Paho-Golang-ClientでMQTTを始める

MQTT始めました

リアルタイムWebという言葉をたまに聞きますが、WebSocketから始まり、WebRTCもやったし、次はMQTTやろうと思ったので、環境構築と簡単な動作メモを残します。

MQTTって何?

他のエントリで詳しく書かれているので、そちらを参照してください。↓のリンクは大体の人が見てると思います。

MQTTについてのまとめ -そこはかとなく書くよん。

以前WebRTCの発表をした時にもワードは観測していたのですが、ここ最近良く聞く用になってきたし、モバイル向けの配信基盤として有用なんじゃないかなって思います。

今回は、MQTTブローカーとしてRabbitMQ、Publish/SubscribeクライアントとしてPahoのgolangクライアントを使いました。

Rabbit MQ

Paho

PahoはGolang以外にもJavaScriptやPython、C++もあるので、色んな環境で使えそうなクライアントですね。

ブローカーサーバはUbuntu14.10 amd64をVagrantで立てました。

RabbitMQでブローカーサーバを立てる

Vagrantの設定はおいておいて、起動したUbuntuインスタンスにRabbitMQをインストールします。ちゃんとInstallationのページに手法が書いてあります。

aptリポジトリへ追加

apt-get upgradeしたらリストに出てきたので、そのままインストールした。出ない場合はInstallationに従ってdebを追加しましょう。

$ sudo apt-get install rabbitmq-server

インストールすると、rabbitmq-serverrabbitmqctlrabbitmq-pluginコマンドが実行可能になります。

$ sudo rabbitmqctl

rabbitmq-server -detachでも起動できるけど、基本的にrabbitmqctlから管理するほうが良さそうな感じです。

MQTTの有効化と起動

RabbitMQにおけるMQTTはプラグインで動作可能になっているので、有効化しないといけないようです。

$ sudo rabbitmq-plugins enable rabbitmq_mqtt

ついでに状態監視につかえるWebインターフェイスも有効化しておくと良さそうです。

$ sudo rabbitmq-plugins enable rabbitmq_management

起動してみます。

$ sudo rabbitmqctl start_app
>> Starting node 'rabbit@vagrant-ubuntu-trusty' ...
...done.

起動しました。起動状況を確認してみます。

$ sudo netstat -lnp
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:25672           0.0.0.0:*               LISTEN      1013/beam
...
tcp        0      0 0.0.0.0:15672           0.0.0.0:*               LISTEN      1013/beam
tcp6       0      0 :::5672                 :::*                    LISTEN      1013/beam
...
tcp6       0      0 :::1883                 :::*                    LISTEN      1013/beam

beamというのがerlnagのデーモンプロセスのようで(詳しく知らない)、1883番ポートがMQTTをListenしてて、15672番ポートがWebインターフェースになります。アクセスしてみましょう:

ログイン画面が出ました。公式サイトによるとデフォルトアカウントはguest/guestらしいのですが、ログインできません。これは、

RabbitMQ3.3.xでweb管理ツールにログインできないときの対処法 -Qiita

の通り、localhostからの接続しかできなくなっているためのようです。なので、新規にリモートアクセスできるユーザを作成してやればいいみたいです。こちらもrabbitmqctlから実行。

// ユーザ名sugimoto、パスワードsugimotoで作成
$ sudo rabbitmqctl add_user sugimoto sugimoto
// 権限設定
$ sudo rabbitmqctl set_permissions sugimoto ".*" ".*" ".*"
// ロール指定
$ sudo rabbitmqctl set_user_tags sugimoto administrator

これでWebインターフェースにログインできるようになります。ログイン後の画面がこちら。

これでブローカー側はひと通り準備OKです。

PahoのGolangクライアントを使ってブローカーに接続する

PahoのGolangクライアントを使います。go getでインストール。

Paho Go Client

$ go get git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git

あとはPub/Subのコードをサンプルに従って書け…ばいいのですが、サンプルのコードはバージョンが古いのか、一部インターフェースが変わっててそのままでは動かせませんでした。で、書いたのは以下のコード。

publish.go

package main

import (
    "fmt"
    MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
    "time"
)

func main() {

    opts := MQTT.NewClientOptions()
    opts.AddBroker("tcp://192.168.33.10:1883")
    opts.SetUsername("sugimoto")
    opts.SetPassword("sugimoto")

    client := MQTT.NewClient(opts)
    if _, err := client.Start(); err != nil {
        fmt.Printf("%v¥n", err)
        return
    }

    i := 0

    for {
        timer := time.NewTimer(time.Second)
        <-timer.C

        i++

        fmt.Printf("Ticker executed. Run times: %d¥n", i)

        message := MQTT.NewMessage([]byte(fmt.Sprintf("Hello paho, ticker:%d", i)))
        message.SetQoS(MQTT.QOS_ZERO)

        receipt := client.PublishMessage("example/topic", message)
        <-receipt
    }
}

接続IPはローカルのIP:Portで、ユーザはさっき作ったものを指定しています。これで1秒ごとにQoS0でtickしてメッセージをexample/topic向けにPublishします。 次はこれを受け取るSubscrier側。

subscribe.go

package main

import (
    "fmt"
    MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
    "time"
)

var f MQTT.MessageHandler = func(client *MQTT.MqttClient, msg MQTT.Message) {
    fmt.Printf("TOPIC: %s¥n", msg.Topic())
    fmt.Printf("MSG: %s¥n", msg.Payload())
}

func main() {

    opts := MQTT.NewClientOptions()
    opts.AddBroker("tcp://192.168.33.10:1883")
    opts.SetUsername("sugimoto")
    opts.SetPassword("sugimoto")

    client := MQTT.NewClient(opts)
    if _, err := client.Start(); err != nil {
        fmt.Printf("%v¥n", err)
        return
    }

    filter, _ := MQTT.NewTopicFilter("example/topic", 0)
    if _, err := client.StartSubscription(f, filter); err != nil {
        panic(err)
    } else {
        println("Wainting message...")
    }

    for {
        time.Sleep(1 * time.Nanosecond)
    }
}

subscribe側は接続を切ってしまうといけないので、time.Sleepでループしてメッセージを待ち受けます。では実行してみましょう。

右ペインで起動したPublisherが1秒ごとにメッセージを送信して、左ペインで待ち受けているSubscriberが受信してますね!

これで基本の接続ができました。あとはこれをQoSを変えてみたり、トピックを指定したものだけを受信したり…というカスタマイズができそうです。

このエントリのサンプルファイルはGithubに上げましたので、よければ参考にどうぞ。

ysugimoto/mqtt-example

まとめ

MQTTの基本的なブローカーサーバの立ち上げからメッセージの送受信までをやってみました。思ってたよりも簡単でした。 PCでやってるならWebSocket接続とあまり変わりないですが、モバイル環境だともっといい感じできるのかな。

あと、PahoにはObjective-CとかSwiftのクライアントがないので、内部の挙動を勉強してクライアント作ってみようかなって思います。 C++があるのでバインディング作れば動かせそうですけどね。

またひとつリアルタイム通信の手法を学びました。 現場からは以上です。