Develop

[c] 메시지큐(Message Queue) 설명.. (joinc)

by hooni posted Apr 23, 2013
?

단축키

Prev이전 문서

Next다음 문서

ESC닫기

크게 작게 위로 아래로 댓글로 가기 인쇄

1 메시지 큐란 

메시지큐는 메시지를 queue 데이타 구조 형태로 관리한다. 큐(:12)는 선입선출(먼저 들어간게 먼저 나오는) 데이타 구조를 말하며, 보통의 은행창구 혹은 일반적인 줄서기를 생각하면 된다. 이것은 흔히 FIFO(First in First Out)라고 불리운다(IPC 의 FIFO 설비와 혼동하지 말자). 이것과 반대되는 데이타 구조를 stack 이라고 하며, 큐와 반대로 가장 나중에 들어온게(가장 최근데이타) 먼저 나오는 형태를 가진다.

일반적인 배열을 접근방법에 따라 특수하게 분류한것이라고 생각하기 바란다.

메시지큐는 커널에서 전역적으로 관리되며(이를테면 커널 전역변수형태로), 모든 프로세스에서 접근가능하도록 되어있으므로, 하나의 메시지큐 서버가 커널에 요청해서 메시지큐를 작성하게 되면, 메시지큐의 접근자(식별자)를 아는 모든 프로세서는 동일한 메시지큐에 접근함으로써, 데이타를 공유할수 있게 된다.

메시지큐의 IPC로써의 특징은 다른 공유방식에 비해서 사용방법이 매우 직관적이고 간단하다라는데 있다. 다른 코드의 수정없이 단지 몇줄만의 코드를 추가시킴으로써 간단하게 메시지큐에 접근할수 있다.

또한 뒤에서 자세히 다루겠지만 메시지의 type 에 의해서 여러종류의 메시지를 효과적으로 다룰수 있는 장점을 가지고 있다. 여러개의 프로세스가 하나의 메시지큐를 엑세스 할때, 각 메시지에 type 를 줌으로써 각 프로세스에게 필요로하는 메시지만을 가져가게 할수 있는 상당히 편리한 기능을 제공한다.

또한 구조체를 몽땅 넘길수 있고, 이는 데이타의 사용을 매우 편하게 만들어준다.

2 System V 메시지 큐 

2.1 메시지큐의 생성, 사용, 제어 

메시지큐를 생성하고, 이를 이용 및 조작하기 위해서 전통적인 Unix(:12) 시스템은 다음과 같은 함수들을 제공한다. 앞으로의 설명은 아래의 함수들을 기준으로 이루어지게 될것이다.

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget (key_t key, int msgflg)
int msgsnd (int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg)
ssize_t msgrcv (int msqid, struct  msgbuf  *msgp,  size_t msgsz, 
                long msgtyp, int msgflg) 
Unix 커널은 메시지큐 정보를 유지하기 위해서 msqid_ds 라는 별도의 구조체를 유지한다. msqid_ds 구조체는 /usr/include/bits/msq.h 에 선언되어 있으며 대충 다음과 같은 구조를 가진다

이건 linux os 기준이며, Unix 에 따라 약간씩 다를수 있다
struct msqid_ds
{
    struct ipc_perm msg_perm; /* structure describing operation permission */
    __time_t msg_stime;       /* time of last msgsnd command */
    unsigned long int __unused1;
    __time_t msg_rtime;       /* time of last msgrcv command */
    unsigned long int __unused2;
    __time_t msg_ctime;       /* time of last change */
    unsigned long int __unused3;
    unsigned long int __msg_cbytes; /* current number of bytes on queue */
    msgqnum_t msg_qnum;       /* number of messages currently on queue */
    msglen_t msg_qbytes;      /* max number of bytes allowed on queue */
    __pid_t msg_lspid;        /* pid of last msgsnd() */
    __pid_t msg_lrpid;        /* pid of last msgrcv() */
    unsigned long int __unused4;
    unsigned long int __unused5;
};
최초 msgget 를 이용해서 커널에 메시지큐를 요청하면, 커널은 해당 메시지큐를 위해 메모리를 할당하고, 메모리 관리와 그밖의 정보 관리를 위해 위의 구조체를 세팅하게 된다.

2.2 메시지큐 생성 

메시지큐의 생성과 기존에 있던 메시지큐의 참조는 msgget(2) 를 이용해서 이루어진다. 첫번째 아규먼트인 key 는 kernel 에서 유일한 메시지큐를 만들고 참조하기 위해서 사용하는 식별번호이며, msgflg 는 메시지큐를 어떻게 생성하고 참조할지 행동양 식을 정해주기 위한 아규먼트이다. key 는 적당하게 유일한 int 형의 숫자를 정해주면 된다. msgflg 에는 IPC_CREAT와 IPC_EXCL등의 시작동작을 정해줄수 있으며, 퍼미션을 지정해 줄수도 있다.
IPC_CREAT 메시지큐를 새로 생성하기 위해서 사용한다. 만약 기존에 key 로 생선된 메시지큐가 있다면 해당 메시지큐의 식별자를 되돌려준다. IPC_EXCL IPC_CREAT 와 함께 쓰이며, IPC_EXCL이 지정되어 있을경우 이미 key 로 존재하는 메시지큐가 있다면, -1 을 리턴하고 errno 를 세팅한다. 

msgget 는 성공할경우 메시지큐에 접근할수 있는 int 형의 메시지큐 식별자를 되돌려주며, 이후로는 이 메시지큐 식별자를 통해서 필요한 작업을 하게 된다.

2.3 메시지큐에 데이타 쓰기 

메시지를 보내기 위해서는 msgsnd(2) 를 사용한다. 첫번째 아규먼트는 msgget 를 통해서 얻어온 메시지큐 식별자이며, 2번째 아규먼트는 메시지큐에 넘기고자하는 구조체, 3번째 아규먼트는 2번째 아규먼트인 구조체의 크기, 마지막 아규먼트는 메시지전달 옵션으로 봉쇄할것인지 아니면 비봉쇄로 메시지를 결정하기 위해서 사용된다.

2번째 아규먼트가 메시지큐로 전달할 메시지라고 했는데, 이것은 구조체로 전달 되며, 다음과 같은 모습을 가지게 된다.
struct msgbuf
{
    long mtype;
    char mtext[255];
}
위의 모습은 메시지 구조체의 매우 전형적인 모습으로 사실멤버변수는 필요에 따라서 얼마든지 변경될수 있다. 다만 long mtype 만이 필수요소이다.

mtype 는 메시지의 타입으로 반드시 0보다 더큰 정수이여야 한다. 우리는 이 mtype 를 각각 다르게 줌으로써, 특정 프로세스에서 특정 메시지를 참조할수 있도록 만들수 있다. 예를 들어 A 라는 프로세스가 A 라는 메시 타입을 참조해야 하고 B 는 B 로 참조하도록 만들어야 한다면, msgbuf 를 만들때, mtype 에 A 은 1 B 은 2 이런식으로 메시지 타입을 정의 하고 A 는 mtype 가 1인것을 B는 mtype 이 2인것을 가지고 가도록 만들면 된다.


위의 그림에서 처럼 mtype 을 이용해서 자신이 원하는 메시지에만 선택 적으로 접근이 가능하다. 이특성을 잘 이용하면 매우 유용하게 사용할수 있을것이다.

msgsz 은 구조체의 크기이니 그냥 넘어가고, msgflg 에 대해서 설명하겠다. msgflg 에는 IPC_NOWAIT를 설정할수 있으며 이 값을 이용해서 봉쇄형으로 할것인지 비봉쇄형으로 할것인지 결정할수 있다. IPC_NOWAIT를 설정하면, 메시지가 성공적으로 보내지게 될때까지 해당영역에서 봉쇄(block)되며, 설정하지 않을경우 에는 바로 return 하게 된다.

2.4 메시지큐의 데이타 가져오기 

데이타는 msgrcv(2) 함수를 이용해서 가져올수 있다. 1번째 아규먼트는 메시지큐 식별자이며, 2번째가 가져올 데이타가 저장될 구조체, 3번째는 구조체의 크기, 4번째는 가져올 메시지 타입, 5번째는 세부 조종 옵션이다.

다른것들은 굳이 설명할 필요가 없는 간단한 것들이고, 다만 4번째 메시지 타입인 msgtyp에 대해서 상세히 설명하고, msgflg 를 간단히 설명하는 정도로 넘어가도록 하겠다. 우리는 메시지를 보낼 구조체를 만들때 mtype 라는것을 정의 해서, 메시지를 분류해서 접근할수 있도록 한다는것을 알고 있다. 메시지를 가져올때는 바로 msgtyp 를 통해서 자기가 원하는 msgtyp 의 메시지 구조체에 접근할수 있게 된다.
msgtyp == 0
메시지 큐의 첫번째 데이타를 돌려준다.
msgtyp > 0
메시지의 mtype 가 msgtyp 와 같은 첫번째 데이타를 돌려준다.
msgtyp < 0 메시지의 mtype 이 msgtyp 의 절대값보다 작거나 같은 첫번째 데이타를 돌려준다. 

msgflg 는 msgrcv 의 메시지 가져오는 형태를 봉쇄로 할것인지 비 봉쇄로 할것인지 지정하기 위해서 사용한다. IPC_NOWAIT 를 설정할경우 가져올 메시지가 없더라도 해당 영역에서 봉쇄되지 않고 바로 error 코드를 넘겨주고 리턴한다. 그렇군


2.5 예제를 통해 알아본 메시지큐 

여기에는 총 2개의 예제프로그램이 만들어진다. 하나는 메시지큐 생산자로써, 메시지큐를 생성하고 메시지를 만들어서 메시지큐에 보내는(msgsnd) 일을 하고 다른 하나는 소비자로써 메시지큐에 있는 데이타를 받아오는 일을 한다. 다음은 메시지큐 생산자 이다.
#include <sys/types.h
#include <sys/ipc.h> 
#include <sys/msg.h> 
#include <sys/stat.h> 

struct msgbuf
{
    long msgtype;
    char mtext[256];
    char myname[16];
    int  seq;
};

int main()
{
    key_t key_id;
    int i;
    struct msgbuf mybuf, rcvbuf;

    key_id = msgget((key_t)1234, IPC_CREAT|0666);
    if (key_id == -1)
    {
        perror("msgget error : ");
        exit(0);
    }

    printf("Key is %d\n", key_id);

    memset(mybuf.mtext, 0x00, 256); 
    memset(mybuf.myname, 0x00, 16); 
    memcpy(mybuf.mtext, "hello world 4", 13);
    memcpy(mybuf.myname, "yundream", 8);
    mybuf.seq = 0;
    i = 0;

    while(1)
    {
        // 짝수일경우 메시지 타입이 4
        // 홀수일경우에는 메시지 타입이 3
        if (i % 2 == 0)
            mybuf.msgtype = 4;
        else 
            mybuf.msgtype = 3;
        mybuf.seq = i;

        // 메시지를 전송한다. 
        if (msgsnd( key_id, (void *)&mybuf, sizeof(struct msgbuf), IPC_NOWAIT) == -1)
        {
            perror("msgsnd error : ");
            exit(0);
        } 
        printf("send %d\n", i);
        i++;
        sleep(1);
    }

    printf("%d \n", rcvbuf.msgtype);
    printf("%s \n", rcvbuf.mtext);
    printf("%s \n", rcvbuf.myname);
    exit(0);
}
프로그램은 간단하다. mybuf 란 구조체를 만들어서 메시지를 전송하는데, 이때 메시지 타입을 i % 2 가 0일경우 4로 그렇지 않을경우 3으로 해서 전송을 하도록 만들었다.

#include <sys/types.h> 
#include <sys/ipc.h> 
#include <sys/msg.h> 
#include <sys/stat.h> 
struct msgbuf  //구조체도 헤더파일에서 선언된 것과 겹치는 현상이 일어나서 컴파일이 되질 않습니다. 구조체 이름도 바꾸어야합니다.
{
    long msgtype;
    char mtext[256];
    char myname[16];
    int  seq;
};

int main(int argc, char **argv)
{
    key_t key_id;
    struct msgbuf mybuf;
    int msgtype;//전역변수로 빼야 됩니다.

    // 아규먼트가 있을경우 msgtype 가 3인 메시지를 받아오고(홀수) 
    // 아규먼트가 없을경우 msgtype 가 4인 메시지를 받아온다(짝수)  
    if (argc == 2)
        msgtype = 3;
    else 
        msgtype = 4;

    key_id = msgget(1234, IPC_CREAT|0666);
    if (key_id < 0)
    {
        perror("msgget error : ");
        exit(0);
    }
    while(1)
    {
        if (msgrcv( key_id, (void *)&mybuf, sizeof(struct msgbuf), msgtype, 0) == -1)
        {
            perror("msgrcv error : ");
            exit(0);    
        }
        printf("%d\n", mybuf.seq);
    }
    exit(0);
}
이 예제는 더 간단하다. 아규먼트가 있으면 메시지타입이 3인 메시지를 아규먼트가 없으면 메시지 타입이 4인 메시지를 가져오도록 한다.

프로그램을 컴파일후 테스트를 해보면 ./msgrcv 1을 (아규먼트를 주고 실행) 실행시키면 msgtype 가 4인 메시지를 받아오고 그렇지 않을경우 msgtype 가 3인 메시지를 받아옴을 알수 있을것이다. ./msgsnd, ./msgrcv, ./msgrcv 1 을 동시에 띄워서 테스트하면 된다.

2.6 메시지큐의 제어 

msgctl(2)함수를 이용한다. 첫번째 아규먼트인 msqid 는 메시지 식별자이며, 2번째 아규먼트인 cmd 는 해당 작동명령, 그리고 마지막 아규먼트는 msqid_ds 구조체 이다. 우리는 cmd 를 통해서 해당 메시지식별자가 가르키는 메시지큐를 제어할수 있다. cmd 에는 아래와 같은 종류의 명령을 사용할수 있다.

IPC_STAT 메시지큐의 정보를 원할때 사용한다. 해당 메시지큐의 정보는 3번째 아규먼트인 msqid_ds 구조체를 통해 넘어오게 된다. 

IPC_SET msqid_ds 구조체 정보를 변경하고자 할때 사용한다. 주로 퍼미션 정보를 바꾸기 위해서 사용한다. IPCRMID 현재 메시지큐를 제거한다. 

2.7 정리 

이상 메시지큐에 대해서 간단히 알아보았다. 지금까지 설명에서 처럼 메시지큐는 내부 프로세스간 통신을 위한 상당히 유연한 방법을 제공하고 있음을 알수 있다. 반면 단점이 있는데, 제어하기가 상당히 까다롭다는 점이다.

우선 메시지큐에 들어갈수 있는 데이타의 수가 고정되어 있는데, 메시지큐가 어떤 이유로 꽉찼을 경우 이를 알수 있는 방법이 애매하다. 위의 예제에서 ./msgrcv 와 ./msgrcv 1 이 메시지를 계속적으로 소비하도록 되어 있는데, 만약 둘중 하나가 이상작동을 해서 메시지를 받아오지 못할경우 결국 메시지큐가 꽉 차버리게 되고, 더이상 정상적인 작동을 못하게 될것이다. 또한 커널의 영향을 많이 받으며, 잘못된 메시지큐의 사용은 전체 시스템에 영향을 미칠수도 있게 만든다. 이는 전체 시스템에서 사용할수 있는 메시지큐의 수와 크기에 제한이 있기 때문으로, 메시지큐를 사용하기 위해서는 조심해서 사용해야될 필요성이 있다.

또한 커널은 몇개의 프로세스가 현재 메시지큐를 참조하는지를 알려주는 참조계수를 제공하지 않는다. 그러므로 프로세스에 어떤 문제가 생겼을때, 해당 프로세스에 정확하게 어떤 문제가 발생했는지 알아내는게 상당히 까다롭다.

그러므로 메시지큐를 짧은 시간에 다량의 정보를 전달하기 위한 목적으로 사용하는 데에는 적당치 않다. 그리 많지 않은 정보를 프로세스간 교환하기 위한 용도로 사용하기에 적당한 IPC 설비이다.

3 POSIX 메시지 큐 

POSIX 메시지 큐는 System V 메시지 큐의 새로운 버전이다. 함수의 이름과 종류는 다르지만 하는일은 비슷하다. 비교적 최근의 표준인 만큼 System V 기반의 메시지 큐 함수보다, 더 직관적이고 더 쓰기 편하다.

3.1 메세지 큐 만들기 

리눅스에서 메세지 큐는 가상 파일 시스템에 만들수 있다. 다음과 같이 만들면 된다. 파일 시스템을 사용하지 않을 경우, 커널이 관리한다. 파일 시스템을 만들어서 사용하면, ls, cat, rm 과 같은 명령을 이용해서 쉽게 큐를 관리할 수 있으므로 추천하는 방식이다.
$ mkdir /dev/mqueue
$ mount -t mqueue none /dev/mqueue
생성된 디렉토리는 자동으로 sticky bit가 설정된다.

이제 mq_open(:12)함수로 메세지 큐를 생성 요청을 하면 /dev/mqueue아래에 파일 형태로 만들어진다. 일반 파일이므로 ls(1), rm(1)등으로 관리할 수 있다.
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>

mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode,
                     struct mq_attr *attr);
mq_open 함수는 name이름을 가지는 메시지 큐를 만든다. 매개변수 oflag는 큐의 생성방식을 정의 하기 위해서 사용한다.mode는 권한을 설정하기 위해서 사용한다. 자세한 내용은 man page 문서를 참고하자.

mq_open 함수는 메시지 큐를 가리키는 지정번호를 반환한다. 이 지정번호는 파일 지정번호와 매우 유사하며, mq_send(:3), mq_receive(:3)와 같은 함수에서 사용한다. 실패하면 -1을 반환한다.

attr은 만들 메시지큐의 특성을 설정하기 위해서 사용한다.
struct mq_attr
{
    long mq_flags;            /* 0 or O_NONBLOCK */
    long mq_maxmsg;           /* 메시지 큐의 크기. 들어갈 수 있는 아이템의 수 */
    long mq_msgsize;           /* 메시지 크기로 바이트 단위 */
    long mq_curmsgs;           /*
}


cat으로 메세지큐의 내용을 확인할 수 있다.
$ cat /dev/mqueue/q.123
QSIZE:0          NOTIFY:0     SIGNO:0     NOTIFY_PID:0     
각 필드에 대해서 살펴보자.
  • QSIZE : 큐에 있는 모든 메시지의 바이트 크기
  • NOTIFY_PID :
  • NOTIFY :
  • SIGNO :

3.2 예제 

3.2.1 mq_server 

#include <stdlib.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    struct mq_attr attr;
    attr.mq_maxmsg = 10;
    attr.mq_msgsize = 4;
    int value;

    mqd_t mfd;

    mfd = mq_open("/my_mq", O_RDWR | O_CREAT,  0666, &attr);
    if (mfd == -1)
    {
            perror("open error");
            exit(0);
    }

    while(1)
    {
            if((mq_receive(mfd, (char *)&value, attr.mq_msgsize,NULL)) == -1)
            {
                    exit(-1);
            }
            printf("Read Data %d\n", value);
    }
}         

3.2.2 mq_client 

#include <stdlib.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    struct mq_attr attr;
    attr.mq_maxmsg = 10;
    attr.mq_msgsize = 4;
    int value = 0;

    mqd_t mfd;

    mfd = mq_open("/my_mq", O_WRONLY, 0666, &attr);
    if(mfd == -1)
    {
        perror("open error");
        exit(0);
    }

    while(1)
    {
        if((mq_send(mfd, (char *)&value, attr.mq_msgsize, 1)) == -1)
        {
            perror("send error");
            exit(-1);
        }
        sleep(1);
        value ++;
    }
}


[출처] http://www.joinc.co.kr/modules/moniwiki/wiki.php/Site/system_programing/IPC/MessageQueue


TAG •