クラウド事業部エンジニアの川勝です。
前回 AWS IoT Core にRaspberry Piから赤外線モーションセンサのログを送るの続きになります。
目次
あらすじ
弊社の会議室の予定はgoogleカレンダーで管理されています。
使用者が随時カレンダーに登録しているのですが、急な予定や変更があった場合カレンダーと使用状況が一致しないこともあります。
今までは同一フロア内にあったのでちょっと見てくれば会議室に人がいるのかすぐわかったのですが、先月のオフィス増床(オフィス増床のお知らせ・会議室を移転しました!)につき今まですぐとなりにあった会議室が別室となったことでちょっと使用状況の確認が不便になってしまいました…
するとある日、CTOから人感センサーとかで人がいるか確認できるようにしたらいいんじゃない?という無茶振り依頼があったので作ってみることとなったのでした。
前回Raspberry Piから赤外線センサを使用してAWS IoT Coreにログを送ることができたので、今回はWEBブラウザで使用中かどうかを閲覧できるようにしていきたいと思います!
構成
最終的な構成はこうなりました。
各処理の説明の前に簡単に要件と全体の流れを説明します。
要件
- ブラウザで会議室の使用状態がわかる
- 使用状態は自動で更新される
- 状態は保持しない。(ブラウザ初回アクセス時は最初の使用状態メッセージが送信されてくるのを待機する)
全体の流れ
- AWS IoT に送信されてきたログはDynamoDBへ保存する
- DynamoDBストリームからLambdaを起動して使用状態を判定。判定方法は一定時間内にセンサONのログがきているか
- LambdaはDynamoDBからWebsocketのコネクション一覧を取得し、それらにメッセージを送信する。
- ブラウザはWebsocketでAPI Gatewayと接続し、メッセージを受信したら表示を更新する
仕様状態の判定条件について
今回使用しているセンサは検知ONの後、一定時間でOFF状態になります。そのためそのままON, OFFを単純に人がいる、いないと判定することができません。
そこでセンサがONになるたびAWS IoT にメッセージを送信しつつ、一定間隔でOFFメッセージも送信するようにしました。
そしてLambdaではDynamoDB ストリームから15秒毎に送信されてきたメッセージをチェックし、その中にセンサONがあれば使用中、OFFしかなけれな空きと判定することとしました。
ちなみにLambdaのストリームイベントの実行は新規メッセージが存在しないと実行されません。したがって今回の場合だと15秒間なにもデバイスからの送信がなければブラウザに空き状態であるというメッセージ送信ができません。
対策としてセンサの検知以外に15秒毎にOFFメッセージを送信するようにして毎回Lambdaが実行されるようにしました。
このあたりはセンサ側のプログラムで判定をしてもいいかとも思いますが、センサはできるだけそのままにしておいて、受け手側でハンドリングをしたかったのでこのようにしています。
このあたりはもっといい方法があるかもしれませんね。(そもそも赤外線センサじゃないほうがいいかもという話も…)
センサ側の最終的なプログラムはこちら。
https://github.com/kawakattsun/iot-motion-sensor-go/blob/master/cmd/iot-motion-sensor-go/main.go
定期的にOFFを送信しているのはこの辺です。
1go func() {
2 ticker := time.NewTicker(time.Second * 30)
3 defer ticker.Stop()
4 for {
5 select {
6 case <-ticker.C:
7 msg := fmt.Sprintf(messageTemplate, "off", time.Now().Format(time.RFC3339))
8 token := c.Publish(endpoint, 0, false, msg)
9 if token.Wait() && token.Error() != nil {
10 fmt.Printf("error: %+v", token.Error())
11 } else {
12 fmt.Print("Message Publish Success. timeTicker.\n")
13 }
14 }
15 }
16}()
goroutine起動してtickerをセットして30秒毎にOFFメッセージを送信させています。これでメインのセンサのメッセージと平行に処理させることができました。このあたりgoだと簡単にできていいですね。
それではAWS IoTからDynamoDBの処理から詳しく説明していきます。
AWS IoT Core からDynamoDBへメッセージ保存
AWS IoTではルールを作成しアクションを設定することで各種AWSのサービスまたは HTTPSで外部へ送信することができます。
まずは、ACT > ルール から新しいルールを作成します。
名前、概要は任意のものを入力します。
ポイントはルールクエリステートメントです。
これはAWS IoTに送信されたメッセージ(トピック)を取得するSQLになるのですが、実は前回はAWS側で予約されているトピックに送信していました。
https://docs.aws.amazon.com/ja_jp/iot/latest/developerguide/topics.html
$aws/things/<thingName>/shadow/update
こちらはルールからは取得できないトピックのようなのでセンサから送信するトピックを変更する必要があります。
プログラムだと以下のendpointという箇所で実行時の引数で設定しています。
例えば hoge/fuga に送信していれば、ルールクエリステートメントでは
SELECT * FROM ‘hoge/fuga’
で取得することができます。
続いてアクションの選択です。
「DynamoDBテーブルにメッセージを挿入する」を選択します。
その他のアクションについては以下を参照してください。
アクションの設定をします。
パーティションキーは必ず設定する必要があります。値は ${timestamp()} としておくとメッセージが作成されたtimestampが自動で挿入されます。
その他の定義されている関数などsqlの仕様はこちら
センサから送信されたメッセージは「この列にメッセージデータを書き込む」で設定したカラム名で保存されます。
その他テーブルを書き込むためのロールもこの画面で自動で作成ができるので簡単に設定することができます。
ちなみに当初はDynamoDBではなくAmazon Kinesis ストリームに流そうと思っていたのですが、Kinesisだとシャード/時間 で課金されるため1シャード作成しただけで課金がはじまってしまいます。
今回の用途だと大量のメッセージ送信は発生しないのでDynamoDB の低いキャパシティなら無料枠内でおさまるのでDynamoDBを選択しました。
さてこれでDynamoDBにデータ投入ができました。つづいてこのデータをつかってLambdaでハンドリングを実行できるようにします。
AWS Lambda メッセージのハンドリング
プログラムはこちら
https://github.com/kawakattsun/meeting-room-notify/blob/master/internal/handlers/iotmessage.go
1func IoTMessage(event events.DynamoDBEvent) error {
2 msg := sensorOff
3 doSendMessage := false
4 // バッチウィンドウ: 15
5 // バッチサイズ: 120
6 // としているのでevent.Recordsには最大120レコード入ってきます。
7 for _, r := range event.Records {
8 fmt.Printf("eventID: %s, eventName: %s, eventSourceARN: %s\n",
9 r.EventID,
10 r.EventName,
11 r.EventSourceArn,
12 )
13 // INSERT以外のeventも入っているのでswitchでさばきます。
14 // 特に処理したレコードは削除しているのでDELETEもいっぱい入ってます
15 switch r.EventName {
16 case "INSERT":
17 fmt.Print("Event execute.\n")
18 doSendMessage = true
19 item := r.Change.NewImage
20 fmt.Printf("item: %+v\n", item)
21 // リトライさせたくないのでレコードきたら最初に削除
22 if v, ok := item[detectedAtKey]; ok {
23 fmt.Printf("delete dynamodb record. table: %s, detected_at: %s\n", iotMessageTableName, v.String())
24 if err := dynamodb.Delete(iotMessageTableName, detectedAtKey, v.String()); err != nil {
25 fmt.Printf("error: delete dynamodb record. %+v\n", err)
26 }
27
28 }
29 // メッセージにONがきていたらdoSendMessageフラグ立てる
30 if v, ok := item[sensorKey]; ok {
31 sensor := v.Map()
32 fmt.Printf("sensor: %+v\n", sensor["sensor"].String())
33 if msg != sensorOn && sensor["sensor"].String() == sensorOn {
34 fmt.Print("Detected sensor.\n")
35 msg = sensorOn
36 }
37 }
38 default:
39 fmt.Print("Not executable event.\n")
40 }
41 }
42
43 // doSendMessageフラグが立っていたらメッセージ送信
44 if doSendMessage {
45 if err := sendMessage(msg); err != nil {
46 fmt.Printf("error: sendMessage %s. %+v\n", msg, err)
47 }
48 }
49
50 return nil
51}
ハンドリング箇所を抜粋すると上記になります。
ポイントとしてはLambdaからエラーはあえて返さないようにしています。ストリームイベントはエラーを返すとデータの有効期限が切れるまで無限にリトライされます。
今回の要件だと1メッセージを損失したところで問題はないのでリトライはさせないようにしています。
はじめちゃんとエラーを返していたらプログラムに問題があって必ずエラーになったときに延々とリトライされた…という対策でもありますw
実際は設定でリトライ回数はハンドリングできます。
- レコードの最長有効期間: 60
- 再試行: なし
このあたりの設定がエラーハンドリングに関わってきます。
あとはメッセージ送信などの後続の処理でなにかしらのエラーが起きない可能性はないため、処理後に対象レコードを削除していると処理対象レコードが残ったままになる可能性があるので最初に削除しています。
データ損失に問題がある要件だとこのあたりのエラーハンドリングはちゃんとしておかないといけないところですね。
つづいてAPI Gatewayにメッセージを送信している箇所です。
1func sendMessage(msg string) error {
2 // API Gatewayとのセッション作成
3 config := &aws.Config{
4 Region: aws.String(os.Getenv("AWS_REGION")),
5 }
6 newSession, err := session.NewSession(config)
7 if err != nil {
8 fmt.Print("error: New aws session.\n")
9 return err
10 }
11
12 svc := apigatewaymanagementapi.New(newSession)
13 svc.Endpoint = webSocketURI
14
15 // 現在Websocketで接続されているコネクションを取得
16 connections, err := repositories.GetAllConnection()
17 if err != nil {
18 fmt.Print("error: DynamoDB GetAllConnection.\n")
19 return err
20 }
21
22 // コネクション毎にメッセージを送信
23 for _, connection := range connections {
24 connectionID := connection.ConnectionID
25 _, err := svc.PostToConnection(&apigatewaymanagementapi.PostToConnectionInput{
26 ConnectionId: &connectionID,
27 Data: []byte(fmt.Sprintf(`{"message": "%s"}`, msg)),
28 })
29 if err != nil {
30 fmt.Printf("error: PostToConnection. %+v\n", err)
31 }
32 }
33
34 return nil
35}
API Gatewayに対してConnectionIDとDataを送信するだけでOKです。
簡単ですね。
API Gatewayについても少し触れておきます。
Amazon API Gateway のWebSocketを使う
API GatewayのWebsocketは初めて使ってみたのですが、非常に簡単で取り回しもしやすい印象でした。
protocolをWebsocketを選択して立ち上げて、接続と切断時に実行するLambdaを作成して設定します。connectionはデータベースに保存する必要があるのでDynamoDBを使用しました。
実装内容は下記のsampleのnode.jsを
goで書いたものになっています。
https://github.com/kawakattsun/meeting-room-notify/tree/master/internal/handlers
接続onconnect.goと切断ondisconnect.goです。
得につまったところはなかったのですが、「WebSocket API の接続時間」に「2時間」という制限があります。
https://docs.aws.amazon.com/ja_jp/apigateway/latest/developerguide/limits.html
ブラウザは最初に開いたらほったらかしの想定だったので、javascript側でwebsocketのconnectionが切れたら再接続する、、という実装が必要でした。
https://github.com/kawakattsun/meeting-room-notify-app/blob/master/src/App.tsx#L20
1 useEffect(() => {
2 const init = () => {
3 const ws = new WebSocket(wssEndpoint)
4 ws.onopen = () => {
5 console.log('info: ws connected.')
6 }
7 // メッセージを受信したらstateを更新
8 ws.onmessage = event => {
9 const response = JSON.parse(event.data)
10 if (response.message === 'on' || response.message === 'off') {
11 setAvailability(response.message)
12 }
13 }
14 // oncloseが呼ばれたら再接続
15 ws.onclose = event => {
16 console.log('info: ws closed. code:' + event.code)
17 setTimeout(() => { client = init() }, 3000);
18 }
19 return ws
20 }
21 let client = init()
22
23 return () => {
24 client.close()
25 };
26 }, []);
AWSリソースのデプロイ
API Gateway, Lambda, DynamoDBはまとめてsam-cliでデプロイしています。(構成図から抜けていますが…)
長くなってきたので中身の説明はしませんがtemplate.ymlはこのようになっています。
https://github.com/kawakattsun/meeting-room-notify/blob/master/template.yml
これでsam deployするだけで基本的にはOK。
ただAWS IoT Coreも組み込めたかもしれませんがそこだけ手動でつくっています。(なのでスクリーンショットが充実しているんですね)
ブラウザ表示のフロントアプリケーション
https://github.com/kawakattsun/meeting-room-notify-app
他のS3+Cloudfront アプリケーションはAWS CDKを使ったりしていたのですが、今回試しにAWS Amplify Console をつかってみました。
詳しい説明はまたの機会にしたいですが、Bitbucketと連携が可能でポチポチ設定していくだけでgit pushするとデプロイまでやってくれて非常に便利でした。
内部でCloudfrontも使用しているようなので、単純なS3+Cloudfrontな静的WEBホスティングであればAWS Amplify Consoleを使用するのがいいなと思いました。
Lambda使わなくてもBasic認証ができるのも利点です。(代わりにIP制限はできなさそう)
1点困ったことはビルド失敗時のログの出ている場所がすぐ見つけれられなかってちょっとハマってしまいましたw
設置
まとめ
実際この会議室使用状況表示が活用されるのかはともかくとしてw
使っていなかったAWSサービスを使用して活用の幅が広がった気がします。
今回の一連の作業で得た個人的な知見
- ストリームといえばKinesisというイメージを持っていたが、DynamoDBのストリームも使える。
- Lambdaでストリームイベントを使用するときは特にリトライをどうするか考えよう。
- API GatewayのWebsocketの構築が簡単。ただ制限は最初に確認しておこう。
- Amplify Consoleも簡単・便利!エラーログの確認の仕方を把握しておこう。
- goの並行処理が噂通りよい。最近go押し。
各種サービスの制限は使用前に目を通しておくべきですね。結構手をだしてから、制限くらって困るパターンはある気がします。
AWS IoT Coreに関して
IoTデバイスとサーバサイドの間にこのサービスがあることで、お互いが疎結合になるなという印象を持ちました。
デバイスからはAWS IoTに送信するだけ。あとどうするかはAWS IoTでハンドリング可能。
例えば今回の用にKinesisからDynamoDBへ変更すると行った場合でも、デバイス側の変更は必要なくAWS IoTの設定変更だけで実現できました。
サーバサイドからAWS IoTを介してデバイスにメッセージを送信も可能なので、また次の機会にそういった仕組みを使ったものも試したいと思います。