Subscribe Component
- Subscribe Component = recipe + artifact

- IoT Core(AWS server) <----> Greengrass(Greengrass Core)
recipe
# recipes/com.example.subscribe-1.0.0.json
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.subscribe", # 컴포넌트 이름
"ComponentVersion": "1.0.0",
"ComponentDescription": "Subscribe topic",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
# accessControl: Authorization, IoT Core의 메시지 구독하기 위한 권한 부여 받음
# IoT Core와 Greengrass의 통신은 AWS Device SDK를 사용하며 이때 IPC 서비스 identifier는
# 아래의 aws.greengrass.ipc.mqttproxy이다.
"aws.greengrass.ipc.mqttproxy": {
"com.example.subscribe:pubsub:2": {
"policyDescription": "Allows access to subscribe to my/topic.",
# operations에는 구독(aws.greengrass#SubscribeToIoTCore)과
# 발행(aws.greengrass#PublishToIoTCore)이 있으며, 모두 허용하는 방식(*)도 있다.
"operations": [
"aws.greengrass#SubscribeToIoTCore" # 구독 컴포넌트 이므로 구독만 허용
],
"resources": [
"my/topic"
]
}
}
}
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"Run": "python3 -u {artifacts:path}/subscribe.py"
},
"Artifacts":[
{
"URI": "s3://s3 버킷 이름/artifacts/com.example.subscribe/1.0.0/subscribe.py"
}
]
}
]
}
artifact
main (subscribe.py)
import queue
import os
import socket
import sys
import time
import json
import awsiot.greengrasscoreipc
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
IoTCoreMessage,
QOS,
SubscribeToIoTCoreRequest
)
# Entry script of the subscribe component: subscribe to an AWS IoT Core topic
# through Greengrass IPC and forward every received message to the devices.

# Hand-off buffer between StreamHandler.on_stream_event (which puts messages)
# and the forwarding loop below (which takes them).
# NOTE: this rebinds the name `queue` from the imported module to a Queue
# instance; StreamHandler relies on this module-level name.
queue = queue.Queue()
TIMEOUT = 10  # seconds to wait for the subscription to be accepted
topic = "my/topic"
qos = QOS.AT_LEAST_ONCE

ipc_client = awsiot.greengrasscoreipc.connect()

request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos

# NOTE(review): StreamHandler and send_to_device must already be defined when
# this code runs -- confirm their placement in the real subscribe.py.
handler = StreamHandler()
operation = ipc_client.new_subscribe_to_iot_core(handler)
future = operation.activate(request)
future.result(TIMEOUT)

while True:
    # Queue.get() blocks until a message is available, so the loop sleeps
    # instead of busy-polling qsize() and burning a CPU core.
    data = queue.get()
    send_to_device(data)
StreamHandler 클래스 및 로그 함수(subscribe.py)
class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
    """Stream callbacks for the IoT Core subscription.

    Each received message is decoded from JSON and pushed onto the
    module-level ``queue`` for the main loop to forward to the devices.
    """

    def __init__(self):
        super().__init__()

    def on_stream_event(self, event: IoTCoreMessage) -> None:
        """Decode the message payload as JSON and enqueue it.

        A payload that is not valid UTF-8 JSON is logged and dropped rather
        than raising out of the callback.
        """
        try:
            message = json.loads(event.message.payload.decode())
        except ValueError as ex:
            # UnicodeDecodeError and json.JSONDecodeError both derive from
            # ValueError.
            logging('/tmp/Greengrass_Subscribe.log', f"Dropped invalid payload: {ex}")
            return
        queue.put(message)
        logging('/tmp/Greengrass_Subscribe.log', f"Push Data {message} to Q")

    def on_stream_error(self, error: Exception) -> bool:
        """Log the stream error, including its details, and return True.

        NOTE(review): per the AWS IoT Device SDK documentation, returning
        True asks the SDK to close the stream -- confirm this is intended.
        """
        logging('/tmp/Greengrass_Subscribe.log', f"Error! {error}")
        return True

    def on_stream_closed(self) -> None:
        """Nothing to clean up when the stream closes."""
        pass
def logging(f_name, log):
    """Append ``log`` as one line to the file at ``f_name``.

    The file is opened in append mode for every call and flushed before it
    is closed.
    """
    line = str(log) + "\n"
    with open(f_name, 'a') as log_file:
        log_file.write(line)
        log_file.flush()
send_to_device() 함수(subscribe.py)
- AWS IoT Core로부터 넘어오는 JSON 데이터는 다음과 같은 형식과 의미를 갖는다.
- `{"content": 1, "device_1": 1, "device_2": 1, "device_3": 0}`
- "content"의 value는 device가 출력할 이미지의 index를 의미한다.
- "device_n"이 1인 경우 n번째 device의 화면에 해당 이미지를 띄우는 것을 의미하며, 0인 경우 변화가 없다.
# TCP endpoint (host, port) of each device, keyed by its 1-based index.
_DEVICE_ENDPOINTS = {
    1: ('10.177.xxx.xxx', 25380),
    2: ('10.177.xxx.xxx', 25381),
    3: ('10.177.xxx.xxx', 25382),
}


def IP_map(i):
    """Return the ``(host, port)`` endpoint of device number ``i``.

    Args:
        i: 1-based device index (1, 2 or 3).

    Returns:
        A ``(host, port)`` tuple for the device.

    Raises:
        ValueError: if ``i`` is not a known device index.
    """
    try:
        return _DEVICE_ENDPOINTS[i]
    except (KeyError, TypeError):
        # The original ended in an empty ``else:`` branch; fail loudly so the
        # caller's tuple unpacking never receives None.
        raise ValueError(f"Unknown device index: {i!r}") from None
def send_to_device(data):
    """Forward ``data`` to every device whose ``device_<n>`` flag equals 1.

    Devices 1 through 3 are checked in order; a device whose key is missing
    or whose value is not 1 is skipped.
    """
    device_count = 3
    for index in range(1, device_count + 1):
        flag_key = f"device_{index}"
        if flag_key not in data or data[flag_key] != 1:
            continue
        host, port = IP_map(index)
        send_msg(host, port, data)
def send_msg(HOST_IP, HOST_PORT, data):
    """Send ``data`` as JSON to a device over TCP and log its reply.

    The connection is retried every 3 seconds, indefinitely, until it
    succeeds. After sending, up to 1024 bytes of reply are read and logged.

    Args:
        HOST_IP: address of the device to connect to.
        HOST_PORT: TCP port of the device.
        data: JSON-serializable object to send.
    """
    log_path = '/tmp/Greengrass_Subscribe.log'

    while True:
        # Use a fresh socket per attempt: the state of a socket after a
        # failed connect() is not reliable enough to retry on.
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            client_socket.connect((HOST_IP, HOST_PORT))
            break
        except OSError as ex:
            client_socket.close()
            logging(log_path, f"Failed connect to server, {ex}")
            logging(log_path, "retry after 3 sec")
            time.sleep(3)

    # The context manager closes the socket even if sendall()/recv() raises.
    with client_socket:
        client_socket.sendall(json.dumps(data).encode('utf-8'))
        logging(log_path, f"Send Data {data} to RPI")
        reply = client_socket.recv(1024).decode()
        logging(log_path, f"RPI reply {reply}")
Publish component

- Publish: Device들로부터 topic을 받으면 이를 IoT Core에 발행하는 역할
- Device들과 소켓 통신시 서버 소켓 역할
- 여러 Device Client 처리를 위해 멀티스레드 서버 방식 사용
recipe
- com.example.publish-1.0.0.json
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.publish",
"ComponentVersion": "1.0.0",
"ComponentDescription": "publish topic to IoT Core.",
"ComponentPublisher": "Amazon",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.IPC:pubsub:1": {
"policyDescription": "Allows access to publish to device/data.",
"operations": [
"aws.greengrass#PublishToIoTCore"
],
"resources": [
"device/data"
]
}
}
}
}
},
"Manifests": [
{
"Platform": {
"os": "linux"
},
"Lifecycle": {
"Run": "python3 -u {artifacts:path}/publish.py"
},
"Artifacts":[
{
"URI": "s3://s3 버킷 이름/artifacts/com.example.publish/1.0.0/publish.py"
}
]
}
]
}
artifact
publish.py
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
QOS,
PublishToIoTCoreRequest
)
import socket
import threading
import time
import json
# Address and TCP port that listen_device() binds the server socket to.
# NOTE(review): 'xxx' looks like a redacted placeholder -- replace with the real address.
HOST = '10.177.xxx.xxx'
PORT = 8888
# Start the server loop.
# NOTE(review): this call can only work once listen_device and the functions
# it uses are defined -- confirm its placement in the real publish.py.
listen_device()
def listen_device():
    """Accept device connections forever, handling each one in its own thread.

    Binds a TCP server socket to ``(HOST, PORT)`` and starts one daemon
    thread running ``handle_client`` per accepted connection. The function
    only returns by raising; the listening socket is closed when that
    happens.
    """
    # The original close() after the infinite loop was unreachable; the
    # context manager guarantees the socket is released on any exit.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((HOST, PORT))
        server_socket.listen(5)
        while True:
            client_socket, addr = server_socket.accept()
            worker = threading.Thread(
                target=handle_client,
                args=(client_socket, addr),
                daemon=True,
            )
            worker.start()
def handle_client(client_socket, addr):
    """Serve one device connection: echo each chunk and publish it to IoT Core.

    Every received chunk (up to 1024 bytes) is echoed back to the device,
    parsed as JSON and published through ``publish_to_iot_core``. The loop
    ends when the device closes the connection.

    Args:
        client_socket: connected socket for this device.
        addr: remote address of the device, used in log messages.
    """
    log_path = '/tmp/Greengrass_Publish.log'

    # The context manager closes the client socket on every exit path,
    # including exceptions raised by the IPC connection or the publish call.
    with client_socket:
        ipc_client = awsiot.greengrasscoreipc.connect()
        while True:
            data = client_socket.recv(1024)
            if not data:
                logging(log_path, 'disconnected')
                break
            client_socket.sendall(data)
            # NOTE(review): this assumes each recv() returns exactly one
            # complete JSON document -- TCP does not guarantee that.
            try:
                data = json.loads(data.decode())
            except ValueError as ex:
                # Skip a malformed chunk instead of killing the thread.
                logging(log_path, f"Invalid JSON from {addr}: {ex}")
                continue
            logging(log_path, f"Received Data:{data}")
            publish_to_iot_core(ipc_client, data)
def publish_to_iot_core(ipc_client, data):
    """Publish ``data`` as a JSON payload to the ``device/data`` topic.

    Args:
        ipc_client: Greengrass IPC client used to create the publish operation.
        data: JSON-serializable object to publish.

    Waits up to 10 seconds for the publish response.
    """
    response_timeout = 10
    payload = json.dumps(data).encode('utf-8')

    publish_request = PublishToIoTCoreRequest()
    publish_request.topic_name = "device/data"
    publish_request.payload = payload
    publish_request.qos = QOS.AT_LEAST_ONCE

    publish_op = ipc_client.new_publish_to_iot_core()
    publish_op.activate(publish_request)
    response = publish_op.get_response()
    response.result(response_timeout)