MQTT始めました
リアルタイムWebという言葉をたまに聞きますが、WebSocketから始まり、WebRTCもやったし、次はMQTTやろうと思ったので、環境構築と簡単な動作メモを残します。
MQTTって何?
他のエントリで詳しく書かれているので、そちらを参照してください。↓のリンクは大体の人が見てると思います。
以前WebRTCの発表をした時にもワードは観測していたのですが、ここ最近良く聞く用になってきたし、モバイル向けの配信基盤として有用なんじゃないかなって思います。
今回は、MQTTブローカーとしてRabbitMQ、Publish/SubscribeクライアントとしてPahoのgolangクライアントを使いました。
PahoはGolang以外にもJavaScriptやPython、C++もあるので、色んな環境で使えそうなクライアントですね。
ブローカーサーバはUbuntu14.10 amd64をVagrantで立てました。
RabbitMQでブローカーサーバを立てる
Vagrantの設定はおいておいて、起動したUbuntuインスタンスにRabbitMQをインストールします。ちゃんとInstallationのページに手法が書いてあります。
aptリポジトリへ追加
apt-get upgradeしたらリストに出てきたので、そのままインストールした。出ない場合はInstallationに従ってdebを追加しましょう。
$ sudo apt-get install rabbitmq-server
インストールすると、rabbitmq-serverとrabbitmqctl、rabbitmq-pluginコマンドが実行可能になります。
$ sudo rabbitmqctl
rabbitmq-server -detachでも起動できるけど、基本的にrabbitmqctlから管理するほうが良さそうな感じです。
MQTTの有効化と起動
RabbitMQにおけるMQTTはプラグインで動作可能になっているので、有効化しないといけないようです。
$ sudo rabbitmq-plugins enable rabbitmq_mqtt
ついでに状態監視につかえるWebインターフェイスも有効化しておくと良さそうです。
$ sudo rabbitmq-plugins enable rabbitmq_management
起動してみます。
$ sudo rabbitmqctl start_app
>> Starting node 'rabbit@vagrant-ubuntu-trusty' ...
...done.
起動しました。起動状況を確認してみます。
$ sudo netstat -lnp
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN 1013/beam
...
tcp 0 0 0.0.0.0:15672 0.0.0.0:* LISTEN 1013/beam
tcp6 0 0 :::5672 :::* LISTEN 1013/beam
...
tcp6 0 0 :::1883 :::* LISTEN 1013/beam
beamというのがerlnagのデーモンプロセスのようで(詳しく知らない)、1883番ポートがMQTTをListenしてて、15672番ポートがWebインターフェースになります。アクセスしてみましょう:
ログイン画面が出ました。公式サイトによるとデフォルトアカウントはguest/guestらしいのですが、ログインできません。これは、
の通り、localhostからの接続しかできなくなっているためのようです。なので、新規にリモートアクセスできるユーザを作成してやればいいみたいです。こちらもrabbitmqctlから実行。
// ユーザ名sugimoto、パスワードsugimotoで作成
$ sudo rabbitmqctl add_user sugimoto sugimoto
// 権限設定
$ sudo rabbitmqctl set_permissions sugimoto ".*" ".*" ".*"
// ロール指定
$ sudo rabbitmqctl set_user_tags sugimoto administrator
これでWebインターフェースにログインできるようになります。ログイン後の画面がこちら。
これでブローカー側はひと通り準備OKです。
PahoのGolangクライアントを使ってブローカーに接続する
PahoのGolangクライアントを使います。go getでインストール。
$ go get git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git
あとはPub/Subのコードをサンプルに従って書け…ばいいのですが、サンプルのコードはバージョンが古いのか、一部インターフェースが変わっててそのままでは動かせませんでした。で、書いたのは以下のコード。
publish.go
package main
import (
"fmt"
MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
"time"
)
func main() {
opts := MQTT.NewClientOptions()
opts.AddBroker("tcp://192.168.33.10:1883")
opts.SetUsername("sugimoto")
opts.SetPassword("sugimoto")
client := MQTT.NewClient(opts)
if _, err := client.Start(); err != nil {
fmt.Printf("%v¥n", err)
return
}
i := 0
for {
timer := time.NewTimer(time.Second)
<-timer.C
i++
fmt.Printf("Ticker executed. Run times: %d¥n", i)
message := MQTT.NewMessage([]byte(fmt.Sprintf("Hello paho, ticker:%d", i)))
message.SetQoS(MQTT.QOS_ZERO)
receipt := client.PublishMessage("example/topic", message)
<-receipt
}
}
接続IPはローカルのIP:Portで、ユーザはさっき作ったものを指定しています。これで1秒ごとにQoS0でtickしてメッセージをexample/topic向けにPublishします。 次はこれを受け取るSubscrier側。
subscribe.go
package main
import (
"fmt"
MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
"time"
)
var f MQTT.MessageHandler = func(client *MQTT.MqttClientundefined msg MQTT.Message) {
fmt.Printf("TOPIC: %s¥n"undefined msg.Topic())
fmt.Printf("MSG: %s¥n"undefined msg.Payload())
}
func main() {
opts := MQTT.NewClientOptions()
opts.AddBroker("tcp://192.168.33.10:1883")
opts.SetUsername("sugimoto")
opts.SetPassword("sugimoto")
client := MQTT.NewClient(opts)
if _undefined err := client.Start(); err != nil {
fmt.Printf("%v¥n"undefined err)
return
}
filterundefined _ := MQTT.NewTopicFilter("example/topic"undefined 0)
if _undefined err := client.StartSubscription(fundefined filter); err != nil {
panic(err)
} else {
println("Wainting message...")
}
for {
time.Sleep(1 * time.Nanosecond)
}
}
subscribe側は接続を切ってしまうといけないので、time.Sleepでループしてメッセージを待ち受けます。では実行してみましょう。
右ペインで起動したPublisherが1秒ごとにメッセージを送信して、左ペインで待ち受けているSubscriberが受信してますね!
これで基本の接続ができました。あとはこれをQoSを変えてみたり、トピックを指定したものだけを受信したり…というカスタマイズができそうです。
このエントリのサンプルファイルはGithubに上げましたので、よければ参考にどうぞ。
まとめ
MQTTの基本的なブローカーサーバの立ち上げからメッセージの送受信までをやってみました。思ってたよりも簡単でした。 PCでやってるならWebSocket接続とあまり変わりないですが、モバイル環境だともっといい感じできるのかな。
あと、PahoにはObjective-CとかSwiftのクライアントがないので、内部の挙動を勉強してクライアント作ってみようかなって思います。 C++があるのでバインディング作れば動かせそうですけどね。
またひとつリアルタイム通信の手法を学びました。 現場からは以上です。