빅데이터 Java 개발자 교육 - 13일차 [MQTT]

Jun_Gyu·2023년 2월 26일
0
post-thumbnail

오늘은 수업 시작 전 Maven 한개를 추가하였다.

<dependency>
		<groupId>org.eclipse.paho</groupId>
		<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
		<version>1.2.5</version>
	</dependency>

위의 라이브러리는 MQTT 기능을 사용하고자 추가한 라이브러리이다.

여기서

MQTT란?

MQTT

출처

정의

MQTT는 머신 대 머신 통신에 사용되는 표준 기반 메시징 프로토콜 또는 규칙 세트이다. 스마트 센서, 웨어러블 및 기타 사물 인터넷(IoT) 디바이스는 일반적으로 리소스 제약이 있는 네트워크를 통해 제한된 대역폭으로 데이터를 전송하고 수신해야 한다. 이러한 IoT 디바이스는 MQTT를 데이터 전송에 사용하는데, 구현이 쉽고 IoT 데이터를 효율적으로 전달할 수 있기 때문이다. MQTT는 디바이스에서 클라우드로, 클라우드에서 디바이스로의 메시징을 지원한다.


그렇다면,

MQTT에는 어떤 요소들이 있을까?

1. MQTT 클라이언트

MQTT 클라이언트는 서버부터 MQTT 라이브러리를 실행하는 마이크로컨트롤러에 이르는 모든 디바이스가 될 수 있다. 클라이언트는 메시지를 보내는 경우 게시자 역할을, 메시지를 수신하는 경우 수신자 역할을 한다. 기본적으로 네트워크를 통해 MQTT를 사용하여 통신하는 모든 디바이스를 MQTT 클라이언트 디바이스라고 할 수 있다.

2. MQTT 브로커

MQTT 브로커는 여러 클라이언트 간의 메시지를 조정하는 백엔드 시스템이다. 브로커는 메시지 수신 및 필터링, 각 메시지를 구독하는 클라이언트 식별, 메시지 전송 등과 같은 작업을 담당한다. 또한 다음과 같은 다른 테스크도 처리한다.

3. MQTT 연결

MQTT 브로커는 여러 클라이언트 간의 메시지를 조정하는 백엔드 시스템이다. 브로커는 메시지 수신 및 필터링, 각 메시지를 구독하는 클라이언트 식별, 메시지 전송 등과 같은 작업을 담당한다. 또한 다음과 같은 다른 테스크도 처리한다.

  • MQTT 클라이언트 권한 부여 및 인증
  • 추가 분석을 위해 다른 시스템으로 메시지 전달
  • 누락된 메시지 및 클라이언트 세션 처리

4. MQTT 연결

클라이언트와 브로커는 MQTT 연결을 사용하여 통신을 시작한다. 클라이언트는 CONNECT 메시지를 MQTT 브로커로 보내 연결을 시작한다. 브로커는 CONNACK 메시지로 응답하여 연결이 설정되었음을 확인한다. MQTT 클라이언트와 브로커는 모두 TCP/IP 스택이 있어야 통신할 수 있다. 클라이언트는 서로 연결되지 않으며 브로커에만 연결한다.


현재 실생활에서 접할수 있는 여러 SNS서비스 그리고 엔터테인먼트 서비스들이다. 위의 사진과 같은 네트워크 방식들을 가용하고 있다.

그렇다면 오늘 위의 라이브러리를 이용하여 실습을 채팅을 할 수 있는 프로그램을 만들어보도록 하자.


먼저 8일차의 Config 클래스에 몇가지 내용을 추가해보자. 아래와 같이 미리 사용할 브로커 주소와 ID, PW를 저장해준다.

그리고 채팅기능을 활성화할 수 있도록 클래스를 만들어준다.

오늘은 실습 내용이 좀 많은 관계로 주석에 있는 설명들로 대신하겠다.

package day13;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONObject;

import day8.Config;

public class ChatClient implements MqttCallback { // 채팅클래스

	private MqttClient client = null;
	private ItemDB iDB = new ItemDBImpl(); // items 컬렉션 연동 객체 생성

	public ChatClient() { // 생성자, 서버 접속
		try {
			String clientid = "/*DB아이디*/" + System.currentTimeMillis();
			this.client = new MqttClient(Config.BROKER, clientid);

			MqttConnectOptions options = new MqttConnectOptions();
			options.setUserName(Config.CONNECTID);
			options.setPassword(Config.CONNECTPW.toCharArray());
			options.setCleanSession(true);
			options.setKeepAliveInterval(30);

			this.client.connect(options);
			this.client.setCallback(this);
			System.out.println("접속성공 => " + client);
		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("접속실패");
		}
	}

	// 구독설정 (해당 주소로 오는 내용을 확인이 가능.)
	public boolean setSubscribe() {
		try {
			client.subscribe("/*정보를 받고자 하는 주소*/");
			return true;
		} catch (Exception e) {
			return false;
		}
	}

	// 메시지 보내기(보낼토픽, 메시지)
	public boolean sendMessage(String topic, String msg) {
		try {
			MqttMessage message = new MqttMessage();
			message.setPayload(msg.getBytes());
			client.publish(topic, message);
			return true;
		} catch (Exception e) {
			e.printStackTrace();
			return false;
		}
	}

	@Override
	public void connectionLost(Throwable cause) {
		System.out.println("connectionLost");
	}

	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		try {
			// MqttMessage타입을 String 타입으로 변환 ( 변환된 String 타입을 DB로 전송하기 위함 )
			byte[] tmp = message.getPayload();
			String str = new String(tmp);

			// {"name" : "물품명", "content" : "물품내용", "price":1234, "quantity":1234};
			// price와 quantity의 경우 DB에 Int64(long타입)으로 저장되어 있기 때문에 long으로 불러올것.
			JSONObject jobj = new JSONObject(str); //받아온 문자에서 각각의 항목으로 분리.
			String name = jobj.getString("name");
			String content = jobj.getString("content");
			long price = jobj.getLong("price");
			long quantity = jobj.getLong("quantity");

			// map에 데이터 담기 (interface의 insertItemMap의 형태가 map이기 때문.)
			Map<String, Object> map = new HashMap<>(); // 분리된 항목을 map에 담음.
			map.put("name", name); 
			map.put("content", content);
			map.put("price", price);
			map.put("quantity", quantity);

			// 데이터 베이스에 추가하기
			int ret = iDB.insertItemMap(map); // (interface의) insertItemMap로 결과를 보냄.
			System.out.println("DB추가 유무 = > " + ret); // 자료가 올라가면 1 표시.

			// 데이터 베이스에서 조회
			List<Map<String, Object>> list = iDB.selecItemListMap(ret); // 
			for(Map<String, Object> map1 : list){
				System.out.println(map1.get("_id"));
				System.out.println(map1.get("name"));
				System.out.println("---------------------------");
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		}

		// System.out.println("토픽 => " + topic + "메시지 =>" + str);
	}

	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
		System.out.println("deliveryComplete");
	}
}

이후 메인에서의 코드를 넣고 수행을 시키게 되면

package day13;

import java.util.Scanner;

import org.json.JSONObject;

public class Main {

	public static void main(String[] args) {
		try {
			ChatClient chat = new ChatClient(); // 서버접속
			chat.setSubscribe(); // 구독설정

			Scanner sc = new Scanner(System.in); // 내용입력
			while (true) { // 무한반복
				System.out.print("보낼 메세지 입력 : ");
				String msg = sc.nextLine();
				if (msg.equals("exit")) {
					break; // 반복문 탈출
				}

				// ex)가나다
				JSONObject jobj = new JSONObject();
				jobj.put("name", msg);
				jobj.put("content", "내용들...");
				jobj.put("price", 1111);
				jobj.put("quantity", 1234);

				// {"name":"입력내용", "type":1}
				// {"name" : "물품명", "content" : "물품내용", "price":1234, "quantity":1234};

				
				// JSONOBject = > String => byte[] ~~~byte[] => String=>
				chat.sendMessage("/*메세지를 보내고자 하는 주소*/", jobj.toString());
				Thread.sleep(200); // 0.2초 기다림

			}
			sc.close();
			System.exit(0);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

이렇게 같은 서버에 접속한 사용자들과 함께 채팅을 이용할 수 있다. 아래 소스코드는 지금까지 배운것들을 총합해서 복습을 한 예제들이다.

package day13;

import java.util.List;
import java.util.Map;

public interface ItemDB { // 인터페이스 페이지

	// 물품 전체 목록
	public List<Map<String, Object>> selecItemListMap(int n);
	public List<Item> selectItemList(int n);
	
	// 물품 등록
	public int insertItemMap(Map<String, Object> map);
	
	// 물품에서 한개 삭제 ex)1004
	public int deleteItemOne(long no);
	
	// 물품 n개 삭제 ex){1004, 1007, 1010}
	public int deleteItemMany(long[] no);
	
	// 물품 수정 (name, content, price, quantity)를 변경하기
	public int updateItemOne(Item item);
}
package day13;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.bson.Document;
import org.bson.conversions.Bson;

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.client.result.UpdateResult;

import day8.Config;

public class ItemDBImpl implements ItemDB {

	private MongoCollection<Document> sequence = null;
	private MongoCollection<Document> items = null;

	public ItemDBImpl() {
		try {
			this.sequence = MongoClients.create(Config.URL).getDatabase(Config.DBNAME)
					.getCollection(Config.RESEQUENCECOL);
			this.items = MongoClients.create(Config.URL).getDatabase(Config.DBNAME).getCollection(Config.ITEMCOL);

		} catch (Exception e) {
			e.printStackTrace();
		}

	}

	@Override
	public List<Map<String, Object>> selecItemListMap(int n) {
		try {
			Bson sort = Filters.eq("_id", -1);
			FindIterable<Document> docs = this.items.find().sort(sort).limit(n);

			List<Map<String, Object>> list = new ArrayList<>();
			for (Document doc : docs) {
				Map<String, Object> map = new HashMap<>();
				map.put("_id", doc.getLong("_id"));
				map.put("name", doc.getString("name"));
				map.put("content", doc.getString("content"));
				map.put("price", doc.getLong("price"));
				map.put("quantity", doc.getLong("quantity"));
				map.put("regdate", doc.getDate("regdate"));

				// doc 를 map으로 변환하기
				list.add(map);
			}
			return list;
		} catch (Exception e) {
			e.printStackTrace();
			return null;
		}
	}

	@Override
	public int insertItemMap(Map<String, Object> map) { // 전달받은 map 값을 대입.
		try {
			// items의 시퀀스값을 높여주기 위함.
			Bson filter = Filters.eq("_id", "SEQ_ITEMS_NO");
			Bson update = Updates.inc("idx", 1);
			Document doc = this.sequence.findOneAndUpdate(filter, update);
			long itemCode = (long) doc.getInteger("idx"); // 현재 DB에 저장된 idx값이 int타입이기 때문에 변환이 필요.
// -------------------------------------------------------------------------
			Document saveDoc = new Document();
			saveDoc.append("_id", itemCode); // 시퀀스에서 꺼낸값
			saveDoc.append("name", map.get("name"));
			saveDoc.append("content", map.get("content"));
			saveDoc.append("price", map.get("price"));
			saveDoc.append("quantity", map.get("quantity"));
			saveDoc.append("regdate", new Date()); // 현재시간
// -------------------------------------------------------------------------
			InsertOneResult result = this.items.insertOne(saveDoc); // 저장된 위의 정보들을 DB에 저장.
			if (result.getInsertedId().asInt64().getValue() == itemCode) {
				return 1;
			}
			return 0;
		} catch (Exception e) {
			e.printStackTrace();
			return -1;

		}

	}

	@Override
	public int deleteItemOne(long no) {
		try {
			Bson filter = Filters.eq("_id", no);
			DeleteResult result = this.items.deleteOne(filter);

			return 1;
		}

		catch (Exception e) {
			e.printStackTrace();
			return -1;
		}
	}

	@Override
	public int deleteItemMany(long[] no) {
		try {
			int cnt = 0;

			// long[] no = {1,2,5,6}
			for (int i = 0; i < no.length; i++) {
				Bson filter = Filters.eq("_id", no[i]);
				DeleteResult result = this.items.deleteOne(filter);
				cnt += result.getDeletedCount();
			}
			// 삭제한 갯수 == 배열의 갯수
			if (cnt == no.length) {
				return 1;
			}
			return 0;
		} catch (Exception e) {
			e.printStackTrace();
			return -1;
		}
	}

	@Override // 물품 수정 (name, content, price, quantity)를 변경하기
	public int updateItemOne(Item item) {
		try {
			int cnt = 0;
			for (int i = 0; i < item.getNo(); i++) {
//		     	Document doc = new Document();
				Bson filter = Filters.eq("_id", item.getNo());
				Bson update1 = Updates.set("name", item.getName());
				Bson update2 = Updates.set("content", item.getContent());
				Bson update3 = Updates.set("price", item.getPrice());
				Bson update4 = Updates.set("quantity", item.getQuantity());
				Bson update = Updates.combine(update1, update2, update3, update4);
				UpdateResult result = this.items.updateOne(filter, update);
				if (result.getModifiedCount() == 1) {
					return 1;
				}
			}
			return 0;
		} catch (Exception e) {
			e.printStackTrace();
			return -1;
		}
	}

	// n개 목록 불러오기
	@Override
	public List<Item> selectItemList(int n) {
		try {
			Bson sort = Filters.eq("_id", -1);
			FindIterable<Document> docs = this.items.find().sort(sort).limit(n);

			List<Item> list = new ArrayList<>();
			for (Document doc : docs) {
				Item item = new Item();
				item.setNo(doc.getLong("_id"));
				item.setName(doc.getString("name"));
				item.setContent(doc.getString("content"));
				item.setPrice(doc.getLong("price").intValue());
				item.setQuantity(doc.getLong("quantity").intValue());
				item.setDate(doc.getDate("regdate"));
				// item 을 list로 변환하기
				list.add(item);
			}
			return list;
		} catch (Exception e)
      }
   }
}
package day13;

import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

public class Main1 {

	public static void main(String[] args) {
		ItemDB iDB = new ItemDBImpl();

//		long no[] = {328, 329};
//		int ret = iDB.deleteItemMany(no);
//		
//		System.out.println(ret);
// --------------------------------------------

//		int ret1 = iDB.updateItemOne();
//		Item item = new Item();
//		
//		short ret = 
//		System.out.println(ret1);

// ----------------------------------------------------------------------------------------------------------------

		// 문1) 가격대별 수량 구하기

		// 1 ~ 999 =>
		// 1000 ~ 9999=>
		// 10000
		// 100000 이상

		// 입력받은만큼 불러와서 가격대별로 조건문 걸어서 출력하기?
		List<Item> list = iDB.selectItemList(100);
		int[] i = new int[4];
		System.out.println("----- 문1) 가격대별 수량 구하기 ------");
		System.out.println();
		for (Item item1 : list) {// 불러온 양 만큼 반복문
			if (item1.getPrice() >= 1 && item1.getPrice() <= 999) {
				i[0]++;
			} else if (item1.getPrice() >= 1000 && item1.getPrice() <= 9999) {
				i[1]++;
			} else if (item1.getPrice() >= 10000 && item1.getPrice() <= 99999) {
				i[2]++;
			} else if (item1.getPrice() >= 100000) {
				i[3]++;
			}
		}
		System.out.println("1원 ~ 999원 : " + i[0] + "개");
		System.out.println("1000원 ~ 9999원 : " + i[1] + "개");
		System.out.println("10000원 ~ 99999원 : " + i[2] + "개");
		System.out.println("100000원 이상 : " + i[3] + "개");
		System.out.println("-------------------------------");
		System.out.println();
		System.out.println();

// -----------------------------------------------------------------------------------------------------------------

		// 문2) 시간대별 재고수량 합계 (Date to String으로 변환)

		// 0시
		// 1시
		// 2시
		// ~~~
//		// 23시
		SimpleDateFormat hour = new SimpleDateFormat("HH");
		int[] j = new int[24];
		
		System.out.println("----- 문2) 시간대별 재고수량 합계 (Date to String으로 변환) ------");
		System.out.println();
		
		for (Item item2 : list) { // 전체 반복
			String str1 = hour.format(item2.getDate());
			int h = Integer.parseInt(str1);
			j[h]++;
		}

		for (int a = 0; a < j.length; a++) {
			System.out.println(a + "시 재고수량 : " + j[a]);
		}
		System.out.println("------------------------------------------------------");
		System.out.println();
		System.out.println();

// -----------------------------------------------------------------------------------------------------------------
		// 문3) 재고수량이 1000이상인 것만 가져와서 3자리마다 콤마를 넣어서 출력
		// ex) 물품번호, 이름, 가격, 재고수량
		// 1001, 가나다, 300, 12,343,343
		DecimalFormat com = new DecimalFormat("###,###");
		String str2 = null;
		System.out.println("----- 문3) 재고수량이 1000이상인 것만 가져와서 3자리마다 콤마를 넣어서 출력 ------");
		System.out.println();
		for (Item item3 : list) {
			if (item3.getQuantity() >= 1000) {
				str2 = com.format(item3.getQuantity());
			}
			System.out.println("물품번호 : " + item3.getNo() + "   이름 : " + item3.getName() + "   가격 : " + item3.getPrice()
					+ "   재고수량" + str2);
			System.out.println();
		}
		System.out.println("-------------------------------------------------------------");

		// -----------------------------------------------------------------------------------------------------------------

	}
}

위의 메인을 실행시키면..

이렇게 나오게 된다.

오늘 문제들도 해결 ㅎㅎ

profile
시작은 미약하지만, 그 끝은 창대하리라

0개의 댓글