POSIX Message Queues

Overview

  • Message queue parameters

    • Maximum number of messages

    • Maximum size of a single message

    • Realtime guarantees (memory is never swapped out)

  • Message priorities

    • Messages are sent with a priority

    • Higher-priority messages overtake lower-priority messages

    • Linux: 0 through 32767 (POSIX guarantees at least 0 through 31)

  • Linux Specific

    • The POSIX API presents itself as distinct from file IO, although the two are very similar

    • ⟶ On Linux, message queue descriptors (mqd_t) are file descriptors

    • ⟶ Usable in event loops (man -s 2 select, man -s 2 poll, man -s 7 epoll)
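
The event-loop point above can be sketched as follows. This is a minimal illustration, not a definitive implementation: it relies on the Linux-specific behavior that an mqd_t can be passed to poll() as a file descriptor, and the queue name /poll-demo-queue as well as the helpers wait_and_receive() and poll_demo() are made up for this example.

```c
#include <fcntl.h>
#include <mqueue.h>
#include <poll.h>
#include <stdio.h>

/* Waits up to timeout_ms for q to become readable, then receives one
   message. Returns 1 if a message was received (its length is stored in
   *nread), 0 on timeout, -1 on error. */
int wait_and_receive(mqd_t q, char *buf, size_t bufsize, int timeout_ms,
                     ssize_t *nread)
{
    /* Linux-specific assumption: mqd_t is a plain file descriptor. */
    struct pollfd pfd = { .fd = q, .events = POLLIN };

    int nready = poll(&pfd, 1, timeout_ms);
    if (nready == -1) {
        perror("poll");
        return -1;
    }
    if (nready == 0)
        return 0;                       /* timeout: queue stayed empty */

    *nread = mq_receive(q, buf, bufsize, NULL);
    if (*nread == -1) {
        perror("mq_receive");
        return -1;
    }
    return 1;
}

/* Creates a scratch queue (hypothetical name), polls it while empty and
   again after one message was sent. Returns 0 if both polls behaved as
   expected, -1 otherwise. */
int poll_demo(void)
{
    const char *name = "/poll-demo-queue";
    struct mq_attr attr = { .mq_maxmsg = 4, .mq_msgsize = 16 };

    mq_unlink(name);                    /* drop leftovers; error ignored */
    mqd_t q = mq_open(name, O_CREAT|O_EXCL|O_RDWR, 0600, &attr);
    if (q == (mqd_t)-1) {
        perror("mq_open");
        return -1;
    }

    char buf[16];                       /* must be >= mq_msgsize */
    ssize_t nread = 0;
    int on_empty = wait_and_receive(q, buf, sizeof(buf), 100, &nread);
    int sent = mq_send(q, "hello", 6, 0);
    int on_filled = wait_and_receive(q, buf, sizeof(buf), 100, &nread);

    printf("empty: %d, filled: %d, bytes: %zd\n", on_empty, on_filled, nread);

    mq_close(q);
    mq_unlink(name);
    return (on_empty == 0 && sent == 0 && on_filled == 1 && nread == 6) ? 0 : -1;
}
```

Calling poll_demo() from main is enough to try it out. On older glibc versions the program may need to be linked with -lrt.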

Setup: Message Queue Creation

  • mq_open is used for creating and opening (much like man -s 2 open)

  • When flags contains O_CREAT

    • mode is required (permissions, see man -s 2 open)

    • attr must be passed too (may be NULL to get the system defaults)

#include <mqueue.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>

struct my_message
{
    int x, y;
};

int main()
{
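    /* Both limits are fixed when the queue is created. */
    struct mq_attr attr = {
        .mq_maxmsg = 10,
        .mq_msgsize = sizeof(struct my_message)
    };
    /* O_EXCL: fail with EEXIST if the queue already exists. */
    mqd_t q = mq_open("/my-queue", O_CREAT|O_EXCL|O_RDWR, 0666, &attr);
    if (q == (mqd_t)-1) {
        perror("mq_open");
        return 1;
    }

    mq_close(q); /* the queue itself persists until mq_unlink() */
    return 0;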
}
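
To check what a queue was actually created with, its attributes can be read back with mq_getattr(). The following is a minimal sketch under stated assumptions: the queue name /attr-demo-queue and the helpers print_queue_attrs() and attr_demo() are invented for this example.

```c
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>

/* Reads the attributes of an open queue into *out and prints them.
   Returns 0 on success, -1 on error. */
int print_queue_attrs(mqd_t q, struct mq_attr *out)
{
    if (mq_getattr(q, out) == -1) {
        perror("mq_getattr");
        return -1;
    }
    printf("maxmsg=%ld msgsize=%ld curmsgs=%ld\n",
           (long)out->mq_maxmsg, (long)out->mq_msgsize, (long)out->mq_curmsgs);
    return 0;
}

/* Creates a scratch queue (hypothetical name) with known limits and
   verifies that mq_getattr() reports them. Returns 0 on success. */
int attr_demo(void)
{
    const char *name = "/attr-demo-queue";
    struct mq_attr wanted = { .mq_maxmsg = 10, .mq_msgsize = 8 };

    mq_unlink(name);                    /* drop leftovers; error ignored */
    mqd_t q = mq_open(name, O_CREAT|O_EXCL|O_RDWR, 0600, &wanted);
    if (q == (mqd_t)-1) {
        perror("mq_open");
        return -1;
    }

    struct mq_attr actual;
    int rc = print_queue_attrs(q, &actual);
    if (rc == 0 && (actual.mq_maxmsg != 10 || actual.mq_msgsize != 8 ||
                    actual.mq_curmsgs != 0))
        rc = -1;                        /* limits differ from the request */

    mq_close(q);
    mq_unlink(name);
    return rc;
}
```

The reported mq_msgsize is also the minimum buffer size that mq_receive() accepts, so a consumer that does not know the message type can size its buffer from it.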

mqueue File System

  • Message queues are actually visible as files (albeit slightly unusual ones)

  • They live in a dedicated filesystem type: mqueue

  • Usually mounted on /dev/mqueue

# mkdir /dev/mqueue
# mount -t mqueue none /dev/mqueue

  • All message queues are visible in /dev/mqueue

$ ls -l /dev/mqueue/
...
-rw-r--r--. 1 jfasch jfasch 80 Nov 11 09:08 my-queue
...

  • File content: metadata ⟶ great for debugging

    • QSIZE: number of unread bytes in the queue (not the number of messages)

    • NOTIFY, SIGNO, NOTIFY_PID: historical baggage (nobody uses signals for notification anymore)

$ cat /dev/mqueue/my-queue
QSIZE:0          NOTIFY:0     SIGNO:0     NOTIFY_PID:0

Teardown: Message Queue Deletion

#include <mqueue.h>
#include <stdio.h>

int main()
{
    int error = mq_unlink("/my-queue");
    if (error) {
        perror("mq_unlink");
        return 1;
    }
    return 0;
}

  • rm would work too (on Linux)

$ rm /dev/mqueue/my-queue

Usage: Open An Existing Message Queue, And Produce Into It

  • Open for writing: flags contains O_WRONLY

  • Use mq_send to produce a message into queue ⟶ priority

  • Attention: mq_send() returns 0 on success (not the number of bytes written)

#include <mqueue.h>
#include <stdio.h>

struct my_message
{
    int x, y;
};

int main()
{
    mqd_t q = mq_open("/my-queue", O_WRONLY);
    if (q == -1) {
        perror("mq_open");
        return 1;
    }

    struct my_message msg = { 1, 2 };
    /* The struct is copied into the queue as raw bytes. */
    int error = mq_send(q, (const char*)&msg, sizeof(msg), /*prio*/0);
    if (error) {
        perror("mq_send");
        return 1;
    }

    mq_close(q);
    return 0;
}

  • ⟶ Eight bytes in queue (two 4-byte ints)

$ cat /dev/mqueue/my-queue
QSIZE:8          NOTIFY:0     SIGNO:0     NOTIFY_PID:0

Usage: Open An Existing Message Queue, And Consume From It

  • Open for reading: flags contains O_RDONLY

  • Use mq_receive to consume a message from queue ⟶ priority

#include <mqueue.h>
#include <stdio.h>

struct my_message
{
    int x, y;
};

int main()
{
    mqd_t q = mq_open("/my-queue", O_RDONLY);
    if (q == -1) {
        perror("mq_open");
        return 1;
    }

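    struct my_message msg;
    unsigned int prio;
    /* Blocks while the queue is empty. The buffer must be at least
       mq_msgsize bytes, otherwise mq_receive() fails with EMSGSIZE. */
    ssize_t nread = mq_receive(q, (char*)&msg, sizeof(msg), &prio);
    if (nread == -1) {
        perror("mq_receive");
        return 1;
    }

    printf("prio=%u, x=%d, y=%d\n", prio, msg.x, msg.y);
    mq_close(q);
    return 0;
}

  • ⟶ Queue now empty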

$ cat /dev/mqueue/my-queue
QSIZE:0          NOTIFY:0     SIGNO:0     NOTIFY_PID:0
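
The producer and consumer above only ever use priority 0. To illustrate how higher-priority messages overtake lower-priority ones, here is a small sketch that sends three messages with priorities 1, 5, 3 and receives them again. The queue name /prio-demo-queue and the helper priority_demo() are assumptions made for this example.

```c
#include <fcntl.h>
#include <mqueue.h>
#include <stdio.h>

struct my_message
{
    int x, y;
};

/* Sends three messages with priorities 1, 5, 3 into a scratch queue
   (hypothetical name), receives them back, and stores the priorities in
   the order of reception. Returns 0 on success, -1 on error. */
int priority_demo(unsigned int received_prios[3])
{
    const char *name = "/prio-demo-queue";
    struct mq_attr attr = {
        .mq_maxmsg = 3,
        .mq_msgsize = sizeof(struct my_message)
    };

    mq_unlink(name);                    /* drop leftovers; error ignored */
    mqd_t q = mq_open(name, O_CREAT|O_EXCL|O_RDWR, 0600, &attr);
    if (q == (mqd_t)-1) {
        perror("mq_open");
        return -1;
    }

    const unsigned int send_prios[3] = { 1, 5, 3 };
    int rc = 0;

    /* x records the order in which the messages were sent. */
    for (int i = 0; i < 3 && rc == 0; i++) {
        struct my_message msg = { i, 0 };
        if (mq_send(q, (const char*)&msg, sizeof(msg), send_prios[i]) == -1) {
            perror("mq_send");
            rc = -1;
        }
    }

    /* mq_receive() always hands out the highest-priority message first. */
    for (int i = 0; i < 3 && rc == 0; i++) {
        struct my_message msg;
        if (mq_receive(q, (char*)&msg, sizeof(msg), &received_prios[i]) == -1) {
            perror("mq_receive");
            rc = -1;
        }
        else
            printf("prio=%u, sent as number %d\n", received_prios[i], msg.x);
    }

    mq_close(q);
    mq_unlink(name);
    return rc;
}
```

The messages come back in the order 5, 3, 1, regardless of the order in which they were sent. Messages of equal priority are delivered in FIFO order.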

Do Not Use: mq_notify()

Obscure feature …

  • Only shown because of its obscurity

  • Its specification predates event loops

  • Guess what it is based on … SIGNALS

  • Please read man -s 3 mq_notify yourself and be disturbed!