Демоны — очереди сообщений

Всем привет!

В предыдущих выпусках мы уже узнали как можно создавать простых демонов, как правильно посылать демонам сигналы, и эти же самые, принятые сигналы, корректно обрабатывать. Теперь мы вплотную подошли к моменту, когда наш демон должен научиться общаться с другими программами, или сам с собой, но в разных экземплярах.

В мире unix, существует немыслимое множество способов передать команду или сообщение от демона к веб-скрипту и наборот. Но сейчас я хочу рассказать только про очереди сообщений — «System V IPC Messages Queues».

Внимание, в статье требуется php расширения pcntl и что бы php был собран с опцией —enable-sysvmsg

Должно быть, вам в детстве мама уже рассказывала о том, что очереди очереди могут быть как в реализации System V IPC, так и в реализации Posix. Я же хочу затронуть тему только System V, как близкую мне лично. А тему posix, по возможности, буду стараться избегать.

Итак, приступим. Очереди работают на уровне «нормальной» операционной системы, сохраняются в памяти и представляют из себя структуру данных, которая доступна всем системным программам. В очередях, подобно тому, как и в файловой системе, есть возможность настраивать права доступа и размер сообщения. Обычно сообщение в очереди устанавливают небольшого размера, не более 8 килобайт.

На этом вводная часть заканчивается, переходим к практике.

Отправляем сообщения

queue-send.php

// Конечно же, ключ генерируется функцией ftok на основе переданного файла, но можно и так тоже
$key = 555;

// Создаём очередь, в качестве ключа мы должны использовать любое целочисленное значение
$queue = msg_get_queue($key);

// Отправляем сообщение. Заметьте, что все обязательные поля уже заполнены, 
// но порой требуется и сериализовать объект или поставить на сообщение блокировку
// Обратите внимание, что мы указываем разный тип. Тип - это некая группировка в очереди.
msg_send($queue, 1, 'message, type 1');
msg_send($queue, 2, 'message, type 2');
msg_send($queue, 3, 'message, type 3');
msg_send($queue, 1, 'message, type 1');

echo "send 4 messages\n";

Принимаем сообщения

queue-receive.php

$key = 555;
$queue = msg_get_queue($key);

// Перебираем очередь по типам сообщений
for ($i = 1; $i <= 3; $i++) {
    echo "type: {$i}\n";
    
    // Перебираем очередь, прочитанные сообщения из очереди удаляются
    // Тут мы указали константу MSG_IPC_NOWAIT, без неё очередь будет крутиться вечно
    while ( msg_receive($queue, $i, $msgtype, 4096, $message, false, MSG_IPC_NOWAIT) ) {
        echo "type: {$i}, msgtype: {$msgtype}, message: {$message}\n";
    }    
}

Давайте запустим по очереди сперва файл queue-send.php, а затем queue-receive.php.

u% php queue-send.php   
send 4 messages
u% php queue-receive.php
type: 1
type: 1, msgtype: 1, message: s:15:"message, type 1";
type: 1, msgtype: 1, message: s:15:"message, type 1";
type: 2
type: 2, msgtype: 2, message: s:15:"message, type 2";
type: 3
type: 3, msgtype: 3, message: s:15:"message, type 3";

Можно заметить, что сообщения были сгруппированы, и сперва было выведено 2 сообщения первого типа, а затем все остальные.
Если же, мы бы указали получать сообщения типа 0, то получили бы все сообщения вне зависимости от типа.

while ( msg_receive($queue, $i, $msgtype, 4096, $message, false, MSG_IPC_NOWAIT) ) {
// ...

Так же, тут стоит заметить ещё одну особенность очереди: если мы уберём константу MSG_IPC_NOWAIT, уберём из скрипта лишнее, и запустим в одном терминале файл queue-receive.php, а в другом будем периодически запускать файл queue-send.php, то мы увидим как демон может эффективно использовать очередь, ожидая из неё задания.

queue-receive-wait.php

$key = 555;
$queue = msg_get_queue($key);

// Перебираем очередь по типам сообщений
// Перебираем очередь, прочитанные сообщения из очереди удаляются
while ( msg_receive($queue, 0, $msgtype, 4096, $message) ) {
    echo "msgtype: {$msgtype}, message: {$message}\n";
}

Собственно на этом всё самое интересное про очереди я уже рассказал. Ещё есть функции получения статистики очереди, удаление и проверка на существование очереди.

Давайте теперь попробуем написать демона слушающего очередь:
queue-daemon.php

// Форкаем процесс
$pid    = pcntl_fork();
$key    = 555;
$queue  = msg_get_queue($key);

if ($pid == -1) {
    // Ошибка 
    die('could not fork' . PHP_EOL);
} else if ($pid) {
    // Родительский процесс, убиваем
    die('die parent process' . PHP_EOL);
} else {
    // Новый процесс, запускаем главный цикл
    while ( msg_receive($queue, 0, $msgtype, 4096, $message) ) {
        echo "msgtype: {$msgtype}, message: {$message}\n";
    }    
}
// Отцепляемся от терминала
posix_setsid();