[CS] IPC message queue 사용해보기

윤동환·2023년 4월 3일
0

Computer Science

목록 보기
10/10

ipc_msg_q.h

#ifndef IPC_MSG_Q_H
# define IPC_MSG_Q_H

# include <cstdio>
# include <cstdlib>
# include <cstring>
# include <sys/msg.h>
# include <cerrno>
# include <ctime>
# include <unistd.h>

# define MSG_SZ 80
# define KEY 1 //for msgid instead of ftok()

/*//////////msgflg//////////
//시스템마다 처리하는 값이 다를 수 있어서 따로 정의하는 것은 좋지 않음

# ifndef IPC_NOWAIT 
#  define IPC_NOWAIT 2048
# endif

# ifndef MSG_NOERROR
#  define MSG_NOERROR 4096
# endif

# ifndef MSG_EXCEPT
#  define MSG_EXCEPT 8192
# endif

*////////////////////////////

typedef struct s_msg_buf {
        long mtype;
        char mtext[MSG_SZ];
} t_msg_buf;

#endif

t_msg_buf 구조체의 mtype 변수는 양의 정수를 가져야만 합니다.

send.c

#include "ipc_msg_q.h"

//argv : [path][block/nonblock][msg][count]
int main(int argc, char *argv[]) {
        t_msg_buf m_buf;

        int msgid = msgget(KEY, IPC_CREAT | 0644);
        if (msgid < 0) {
                printf("Error msgget : %s\n", strerror(errno));
        }


        m_buf.mtype = 1; //using by recv
        strcpy(m_buf.mtext, "===test msg===");

        int msgflg = 0; //initialize to block
        if (argc >= 2 && !(memcmp(argv[1], "nowait", 6)))       {
                msgflg = IPC_NOWAIT; //2048
                printf("Send : setting nowait\n");
        }

        if (argc >= 3) {
                memcpy(m_buf.mtext, argv[2], MSG_SZ);
        }

        int count = 1;
        if (argc >= 4) //if argv[3] isn't type int -> atoi return 0 and send nothhing
                count = atoi(argv[3]); //if count is big size that waste resource

        int chk_send = 0;
        for (int i = 0; i < count; ++i) {
                chk_send = msgsnd(msgid, (char *)&m_buf, MSG_SZ, msgflg);
                if (chk_send < 0) {
                        printf("Error msgget : %s\n", strerror(errno));
                }
        }
        return (0);
}

msgget

고유한 key를 넣으면 그에 대한 식별자(msgid)를 리턴해줍니다.
key값은 ftok() 함수를 사용해서 만들 수도 있습니다.
이 msgid는 커널에서 다루는 msg queue를 식별하는 식별자입니다.

msgsnd

 int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

버퍼에 담긴 데이터를 커널의 message queue에 메모리를 복사합니다.

만약 대기열이 충분하지 않다면 msgsnd()의 기본 동작은 공간이 사용가능해질 때 까지 차단합니다. IPC_NOWAIT가 msgflg에 지정된 경우 대신 EAGAIN 오류와 함께호출 실패합니다.

성공적으로 완료시 message queue 의 data structure는 다음과 같이 업데이트 됩니다.

  • msg_lspid는 호출 프로세스의 프로세스 ID로 설정됩니다.
  • msg_qnum은 1씩 증가합니다.
  • msg_stime은 현재 시간으로 설정됩니다.

recv.c

#include "ipc_msg_q.h"

//argv : [path][msgflg]
int main(int argc, char *argv[]) {
        t_msg_buf m_buf;

        int msgid = msgget(KEY, 0); //if already exist identifer by the key, return same identifier
        if (msgid < 0) {
                printf("Error msgget : %s\n", strerror(errno));
        }

        int msgflg = 0;
        if (argc >= 2) {
                if (!(memcmp(argv[1], "nowait", 6))) {
                        msgflg = IPC_NOWAIT;
                        printf("Recv : setting nowait\n");
                }
                else if (!(memcmp(argv[1], "noerror", 7))) {
                        msgflg = MSG_NOERROR;
                        printf("Recv : setting noerror\n");
                }
                else if (!(memcmp(argv[1], "except", 6))) {
                        msgflg = MSG_EXCEPT;
                        printf("Recv : setting except\n");
                }
                else
                        printf("first arg is invalid flag");
        }

  	//4 args set 0 : read from the first queue
    int len = 0;
    while (len = msgrcv(msgid, (char *)&m_buf, MSG_SZ, 0, msgflg) >= 0) {
     	printf("receive msg : %s, len : %d\n", m_buf.mtext, len);
    }
    if (len < 0) {
        printf("Error msgrev : %s\n", strerror(errno));
    }
        return (0);
}

msgrcv

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
                      int msgflg);
  • msqid는 들어온 id로 message queue를 찾습니다.
  • 찾은 데이터를 msgp라는 buffer에 옮겨 담습니다.
  • msgflg는 send에서 보낸 mtype을 기준으로 어떻게 처리할지 정하는 flag입니다.
    음수 : 해당 음수의 절대값보다 같거나 작은 값을 우선적으로 처리
    0 : 들어온 순서대로 처리
    양수 : 일치하는 msg만 처리

monitor.c

#include "ipc_msg_q.h"

void printTime(time_t *target_time) {
        time_t timer = time(target_time);
        struct tm *t = localtime(&timer);

        printf("%d년 %d월 %d일 %d시 %d분 %d초\n", 
                        t->tm_year + 1900, t->tm_mon + 1, t->tm_mday, t->tm_hour, t->tm_min, t->tm_sec);
}

int main() {
        struct msqid_ds msqstat;
        int msgid = msgget(KEY, 0);
        if (msgid < 0) {
                printf("[monitor] Error megget : %s\n", strerror(errno));
        }

        int chk_msqstat = msgctl(msgid, IPC_STAT, &msqstat);
        if (chk_msqstat < 0) {
                printf("[monitor] Error msgctr : %s\n", strerror(errno));
        }


        printf("대기열의 최대 바이트 수 : %lu\n", msqstat.msg_qbytes);
        printf("대기열에 있는 바이트 수 : %lu\n", msqstat.msg_cbytes);
        printf("매세지 큐에 저장된 매세지 개수 : %lu\n", (msqstat.msg_qnum));
        printf("마지막으로 매세지 보낸 시간 :"); printTime(&msqstat.msg_stime);
        printf("마지막으로 매세지 받은 시간 :"); printTime(&msqstat.msg_rtime);
        printf("매세지 생성 시간 혹은 수정 시간:"); printTime(&msqstat.msg_ctime);
        printf("마지막으로 매세지 보낸 pid %d\n", (int)msqstat.msg_lspid);
        printf("마지막으로 매세지 받은 pid %d\n", (int)msqstat.msg_lrpid);
        printf("%s\n", msqstat.msg_perm);

        return (0);
}

msgctl

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

cmd에는 몇몇의 옵션이 들어올 수 있으며, 옵션을 기준으로 buf에 값을 정합니다.

cmd

  • IPC_STAT : msqid와 관련된 커널 데이터 구조의 정보를 buf가 가리키는 msqid_ds 구조로 복사합니다. 발신자 메시지 대기열에 대한 읽기 권한이 있어야 합니다. - IPC_SET : buf가 가리키는 msqid_ds 구조의 일부 구성원 값을 이 메시지 큐와 연관된 커널 데이터 구조에 기록하고 msg_ctime 구성원도 업데이트합니다.
  • IPC_RMID : 대기중인 reader, writer 프로세스를 깨우고 즉시 해당 메시지 대기열을 지웁니다.
    오류 반환 및 errno가 다음으로 설정됩니다(EIDRM).
    메세지 큐의경우 3번째 인자는 무시됩니다.
  • MSG_STAT : IPC_STAT를 위한 msqid_ds 구조체를 반환한다. 하지만, 그러나 msqid 인수는 대기열 식별자가 아니라 대기열에 대한 인덱스입니다. 시스템의 모든 메시지 대기열에 대한 정보를 유지 관리하는 커널의 내부 배열입니다.

msqid_dz

ipc_perm

추가 용어

MSGMNI: Maximum number of message queue identifiers (system wide).
MSGMNB: Maximum number of bytes per message queue.

간단한 테스트

컴파일
gcc -o revc recv.c
gcc -o send send.c
gcc -o monitor monitor.c

monitor할 msg queue 채우고 확인하기
./send block testsetsetsetsetestsetsetset 999999999
./monitor
./recv

결과
1.send의 첫번째 인자값을 nowait으로 하면 msg queue가 다 차더라도 block에 걸리지 않아 에러를 출력합니다.
2.block으로 걸려있다면 출력하지않고 대기합니다.
3.block으로 걸려있을 때 보낼 msg가 남은상태에서 recv로 큐를 비워주면 다시 send를 합니다.

1번 
[ydh@krujyit1 ~/msgq]$ ./monitor 
대기열의 최대 바이트 수 : 536862720
대기열에 있는 바이트 수 : 536862720
매세지 큐에 저장된 매세지 개수 : 6710784

2번
[ydh@krujyit1 ~/msgq]$ ./monitor 
대기열의 최대 바이트 수 : 536862720
대기열에 있는 바이트 수 : 440936240
매세지 큐에 저장된 매세지 개수 : 5511703
:2024년 2월 23일 11시 23분 37초

3번
[ydh@krujyit1 ~/msgq]$ ./monitor 
대기열의 최대 바이트 수 : 536862720
대기열에 있는 바이트 수 : 77127280
매세지 큐에 저장된 매세지 개수 : 964091
:2024년 2월 23일 11시 23분 41초

위의 출력문은 block 테스트 상황에서 monitor로 확인한 값입니다.
send process를 띄우고 msg가 다 차있는 상태에서 recv process를 띄우니
대기열의 바이트 수가 줄어들기 시작했습니다. 1번과 2번의 결과를 보면 줄어드는 차이가 크지 않았고,
그 이유는 recv가 비워주는 족족 send가 채우고있었기 때문입니다.
3번은 send process를 종료시킨후 바로 확인한 결과입니다. 3~4초만에 거의 모든 메시지가 처리된 것을 확인할 수 있습니다.

profile
모르면 공부하고 알게되면 공유하는 개발자

0개의 댓글