AWS IoT Greengrass Subscribe/Publish Component

markyang92·2021년 4월 23일
0

AWS

목록 보기
7/10

Subscribe Component

  • Subscribe Component = recipe + artifact
  • IoT Core(AWS server) <----> Greengrass(Greengrass Core)

recipe

# recipes/com.example.subscribe-1.0.0.json
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.subscribe",	# 컴포넌트 이름
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "Subscribe topic",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {	
# accessControl: Authorization, IoT Core의 메시지 구독하기 위한 권한 부여 받음
# IoT Core와 Greengrass의 통신은 AWS Device SDK를 사용하며 이때 IPC 서비스 identifier는
# 아래의 aws.greengrass.ipc.mqttproxy이다.
        "aws.greengrass.ipc.mqttproxy": {
          "com.example.subscribe:pubsub:2": {
            "policyDescription": "Allows access to subscribe to my/topic.",
# operations은 구독(aws.greengrass#SubscribeToloTCore)와 
# 발행(aws.greengrass#PublishToloTCore)이 있으며 모두 허용하는 방식(*)도 있다.     
            "operations": [	
              "aws.greengrass#SubscribeToIoTCore" # 구독 컴포넌트 이므로 구독만 허용
            ],
            "resources": [
              "my/topic"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Platform": {
        "os": "linux"
      },
      "Lifecycle": {
        "Run": "python3 -u {artifacts:path}/subscribe.py"
      },
      "Artifacts":[
        {
          "URI": "s3://s3 버킷 이름/artifacts/com.example.subscribe/1.0.0/subscribe.py"
        }
      ]
    }
  ]
}

artifact

main (subscribe.py)

# subscribe.py
import queue
import os
import socket
import sys
import time
import json
# === AWS IoT Device SDK내 AWS IoT Greengrass Core IPC 라이브러리 ===== #
import awsiot.greengrasscoreipc  
import awsiot.greengrasscoreipc.client as client
from awsiot.greengrasscoreipc.model import (
    IoTCoreMessage,
    QOS,
    SubscribeToIoTCoreRequest
)
# =================================================================== # 
queue = queue.Queue()	# 구독한 메시지를 저장하는 큐
TIMEOUT = 10
topic = "my/topic"	# 구독할 토픽
qos = QOS.AT_LEAST_ONCE	# MQTT QoS 설정
 
 
# Greengrass Core IPC 서비스에 대한 연결 생성
# IPC 클라이언트를 만들고 연결을 설정한다.
ipc_client = awsiot.greengrasscoreipc.connect()

# ================== 구독 request를 설정한다. ========================= #
request = SubscribeToIoTCoreRequest()
request.topic_name = topic
request.qos = qos
handler = StreamHandler()	# StreamHandler() 클래스

# ======= 지정한 topic에 대해 AWS IoT Core의 MQTT 메시지 구독 시작 ======= #
# ======== 컴포넌트의 life-cycle이 종료되면 구독이 제거 된다 ============== #
operation = ipc_client.new_subscribe_to_iot_core(handler)
future = operation.activate(request)
future.result(TIMEOUT)
# =================================================================== # 

# ========== 구독 컴포넌트의 life-cycle이 끝나지 않도록 한다. ============= #
# queue의 내용을 지속적으로 확인, 받은 메시지를 send_to_device() 함수로 전달  #

while True:
    if queue.qsize()>0:
        data = queue.get()
        send_to_device(data)

StreamHandler 클래스 및 로그 함수(subscribe.py)

class StreamHandler(client.SubscribeToIoTCoreStreamHandler):
    def __init__(self):
        super().__init__()
 
    def on_stream_event(self, event: IoTCoreMessage) -> None:
        global queue
        message = json.loads(event.message.payload.decode())	# IoT Core로 부터 받은 
        							#json 파일을 파싱하여
        queue.put(message)					# queue에 저장한다.
        logging('/tmp/Greengrass_Subscribe.log', f"Push Data {message} to Q")
 
    def on_stream_error(self, error: Exception) -> bool:
        logging('/tmp/Greengrass_Subscribe.log', "Error!")
        return True
 
    def on_stream_closed(self) -> None:
        pass
 
def logging(f_name, log):
    with open(f_name, 'a') as f:
        print(log, file=f, flush=True)

send_to_device() 함수(subscribe.py)

  • AWS IoT Core로 부터 넘어오는 JSON 데이터는 다음과 같은 형식과 의미를 갖는다.
    • `{"content": 1, "device_1": 1, "device_2": 1, "device_3": 0}
    • "content"의 value는 device가 출력할 이미지의 index를 의미한다.
    • "device_n"이 1인 경우 n번째 device의 화면에 해당 이미지를 띄우는 것을 의미하며, 0인 경우 변화가 없다.
def IP_map(i):
    if i == 1:
        return '10.177.xxx.xxx', 25380  # device 1's IP and PORT
    elif i == 2:
        return '10.177.xxx.xxx', 25381  # device 2's IP and PORT
    elif i == 3:
        return '10.177.xxx.xxx', 25382  # device 3's IP and PORT
    else:
        # Handle Error



def send_to_device(data):
    n = 3 # number of devices
# Greengrass core device에 IoT device가 3개 있다고 가정한다.
# - Core device에 연결되는 device 수를 늘리면 코드를 변경하고 재배포해야 하는 문제
# - 다른 device 인덱스 파일을 만들어 연결되는 전체 device 수와 각 device의 IP, PORT를
#   기입 하고, 본 함수에서 해당 파일을 읽어오도록 코드를 수정하면 이 문제를 해결할 수 있다.
# - 사용자는 device의 인덱스 파일만 수정하면 되므로 컴포넌트 코드를 수정할 필요가 없다.   



# 이미지를 변경할 device들을 검사하고 해당 device들의 IP와 PORT를 이용해 send_msg() 함수를 수행    
    for i in range(1,n+1):
        dev_key = "device_" + str(i)
        if dev_key in data and data[dev_key] == 1:
            IP, PORT = IP_map(i)
            send_msg(IP, PORT, data)
# ========================================================================== # 
 
def send_msg(HOST_IP, HOST_PORT, data):
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# device와 TCP/IP 소켓 통신을 하기 위해 클라이언트를 생성한다.
# "Subscribe" 컴포넌트가 client socket, device가 server socket이 된다.


# =========================== 서버(device) 연결 시도 ========================= #
    while True:
        try:
            client_socket.connect((HOST_IP, HOST_PORT))
            break
        except Exception as ex:
            logging('/tmp/Greengrass_Subscribe.log', f"Failed connect to server, {ex}")
            logging('/tmp/Greengrass_Subscribe.log', "retry after 3 sec")
            time.sleep(3)
 # ========================================================================= #
 
 
    client_socket.sendall(json.dumps(data).encode('utf-8'))
    # AWS IoT Core로 부터 받은 메시지를 device로 전달한다.
    
    logging('/tmp/Greengrass_Subscribe.log', f"Send Data {data} to RPI")
    reply = client_socket.recv(1024)
    reply = reply.decode()
    logging('/tmp/Greengrass_Subscribe.log', f"RPI reply {reply}")
    client_socket.close()

Publish component

  • Publish: Device들로 부터 topic을 받으면 이를 IoT Core에 발행하는 역할
    • Device들과 소켓 통신시 서버 소켓 역할
      • 여러 Device Client 처리를 위해 멀티스레드 서버 방식 사용

recipe

  • com.example.publish-1.0.0.json
{
  "RecipeFormatVersion": "2020-01-25",
  "ComponentName": "com.example.publish",
  "ComponentVersion": "1.0.0",
  "ComponentDescription": "publish topic to IoT Core.",
  "ComponentPublisher": "Amazon",
  "ComponentConfiguration": {
    "DefaultConfiguration": {
      "accessControl": {
        "aws.greengrass.ipc.mqttproxy": {	# IoT Core로 Topic 발행 권한 부여
          "com.example.IPC:pubsub:1": {
            "policyDescription": "Allows access to publish to device/data.",
            "operations": [
              "aws.greengrass#PublishToIoTCore"	# Publish!!!
            ],
            "resources": [
              "device/data"
            ]
          }
        }
      }
    }
  },
  "Manifests": [
    {
      "Platform": {
        "os": "linux"
      },
      "Lifecycle": {
        "Run": "python3 -u {artifacts:path}/publish.py"
      },
      "Artifacts":[
        {
          "URI": "s3://s3 버킷 이름/artifacts/com.example.publish/1.0.0/publish.py"
        }
      ]
    }
  ]
}

artifact

publish.py

import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (
    QOS,
    PublishToIoTCoreRequest
)
import socket
import threading
import time
import json
 
HOST = '10.177.xxx.xxx' # Core device's IP
PORT = 8888             # Core device's PORT
 
listen_device()

def listen_device():
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((HOST,PORT))
    server_socket.listen(5)
    while True:
        client_socket, addr = server_socket.accept()
        t=threading.Thread(target=handle_client, args=(client_socket,addr))
        t.daemon = True
        t.start()
    server_socket.close()
    
def handle_client(client_socket, addr):
    ipc_client = awsiot.greengrasscoreipc.connect()
    while True:
        data = client_socket.recv(1024)
        if not data:
            logging('/tmp/Greengrass_Publish.log', 'disconnected')
            break
        client_socket.sendall(data) #Reply to Device
        data = json.loads(data.decode())
        logging('/tmp/Greengrass_Publish.log', f"Received Data:{data}")
        publish_to_iot_core(ipc_client, data)	# device로 부터 받은 걸
        					# IoT Core로 Publish!!
    client_socket.close()
 
def publish_to_iot_core(ipc_client, data):
    TIMEOUT = 10
    topic = "device/data"	# IoT 코어 <--> Greengrass Core 토픽
    qos = QOS.AT_LEAST_ONCE
    message = json.dumps(data).encode('utf-8')
    request = PublishToIoTCoreRequest()
    request.topic_name = topic
    request.payload = message
    request.qos = qos
    operation = ipc_client.new_publish_to_iot_core()
    operation.activate(request)
    future = operation.get_response()
    future.result(TIMEOUT)    
profile
pllpokko@alumni.kaist.ac.kr

0개의 댓글