Использование RabbitMQ и AMQP для распределенных рабочих очередей в Node.js
Это последняя статья из серии Work Queue Patterns. В прошлой статье мы рассмотрели, как управлять распределенными очередями работы, используя Redis в качестве службы очередей. В этой статье я объясню, как можно заменить эту реализацию Redis на реализацию RabbitMQ.
Помимо Redis, Rabbit-MQ - еще один сервер с открытым исходным кодом, который реализует постоянные очереди. В отличие от Redis, единственная цель RabbitMQ - предоставить надежное и масштабируемое решение для обмена сообщениями со многими функциями, которых нет или которые сложно реализовать в Redis. RabbitMQ можно использовать не только для рабочих очередей - он позволяет использовать его в качестве общей платформы обмена сообщениями, - но в целях этой главы мы просто собираемся использовать его для координации работы между производителями и потребителями работ.
RabbitMQ - это сервер, который работает локально или на каком-то узле в сети. Клиенты могут быть производителями работы, потребителями работы или обоими, и они будут общаться с сервером, используя протокол под названием Advanced Messaging Queuing Protocol (AMQP). Для Node.js есть несколько библиотек, реализующих этот протокол. Мы выбрали ampqlib
как наиболее удобный в использовании.
Установка RabbitMQ
Чтобы установить RabbitMQ, вам следует зайти на официальный сайт (https://www.rabbitmq.com) и прочитать инструкции по установке для вашей конкретной платформы:
После того, как вы установили Rabbit, вам нужно запустить его. Способ запуска зависит от вашей операционной системы и способа ее установки. Для Homebrew на MacOS:
Установка ваших зависимостей
Теперь вы должны создать каталог проекта, в котором будет находиться код для ваших клиентов RabbitMQ. Внутри этого каталога создайте package.json
манифест, в котором вы укажете свои зависимости:
package.json:
{
"name": "05-amqp_queue",
"version": "0.1.0",
"dependencies": {
"amqplib": "^0.3.0"
}
}
На данный момент нам нужна только зависимость amqplib
. Тогда давайте установим его:
$ npm install
[email protected] node_modules/amqplib
├── [email protected]
├── [email protected]
├── [email protected]
└── [email protected] ([email protected], [email protected], [email protected], [email protected])
Соединять
Чтобы отправлять или получать сообщения, нам нужно установить соединение и канал с сервером RabbitMQ. Мы собираемся поместить код, общий для производителей и потребителей работы, в общий файл с именем channel.js
.
Чтобы создавать сообщения, вам сначала нужно подключиться к заданному серверу RabbitMQ, указав URL-адрес.
channel.js:
var amqp = require('amqplib/callback_api');
var url = process.env.AMQP_URL || 'amqp://guest:guest@localhost:5672';
module.exports = createQueueChannel;
function createQueueChannel(queue, cb) { amqp.connect(url, onceConnected);
function onceConnected(err, conn) { if (err) { cb(err); } else { console.log('connected'); } } }
Во-первых, нам нужна amqplib
библиотека. Здесь мы конкретно требуем API на основе обратного вызова (вместо API на основе обещаний).
Затем мы получаем URL-адрес подключения AMQP из среды процесса. Если переменная AMQP_URL
environment отсутствует, по умолчанию выполняется подключение к localhost через порт по умолчанию с использованием имени пользователя учетной записи guest
с тем же паролем (который используется по умолчанию для RabbitMQ). Это последнее значение по умолчанию может быть полезно только во время разработки на своем компьютере. В других средах вам следует изменить имя пользователя и пароль учетной записи RabbitMQ и убедиться, что они правильно отражены в переменной среды AMQP_URL
.
Затем вы должны подключиться, используя URL-адрес, который вы только что вычислили с помощью amqp.connect
, также передав функцию обратного вызова, когда соединение будет успешным. Этот обратный вызов получает ошибку, если не может подключиться к серверу. Если это возможно, он получит объект подключения по второму аргументу.
Следующим шагом будет создание канала после подключения:
channel.js:
// ...
function onceConnected(err, conn) { if (err) { console.error('Error connecting:', err.stack); } else { console.log('connected'); conn.createChannel(onceChannelCreated); }
function onceChannelCreated(err, channel) { if (err) { cb(err); } else { console.log('channel created'); } } }
Теперь нам нужно убедиться, что очередь создана на RabbitMQ. Для этого мы можем использовать channel.assertQueue()
:
// ...
function onceChannelCreated(err, channel) { if (err) { cb(err); } else { channel.assertQueue(queue, {durable: true}, onceQueueCreated); }
function onceQueueCreated(err) { if (err) { cb(err); } else { cb(null, channel, conn); } } }
Убедившись, что очередь с таким именем существует, мы, наконец, перезваниваем без ошибок, передавая канал и объекты соединения.
Создавать сообщения
Теперь, когда у вас есть канал для RabbitMQ и вы убедились, что очередь создана на RabbitMQ, вы, наконец, можете опубликовать сообщение в эту очередь:
Producer.js:
var Channel = require('./channel');
var queue = 'queue';
Channel(queue, function(err, channel, conn) { if (err) { console.error(err.stack); } else { console.log('channel and queue created'); var work = 'make me a sandwich'; channel.sendToQueue(queue, encode(work), { persistent: true }); setImmediate(function() { channel.close(); conn.close(); }); } });
function encode(doc) { return new Buffer(JSON.stringify(doc)); }
Здесь мы отправляем конкретное сообщение в очередь, используя channel.sentToQueue()
. Эта функция принимает имя очереди в качестве первого аргумента. Второй аргумент ожидает буфер, который мы создаем с помощью функции encode
. Вы можете отправить любое двоичное сообщение в RabbitMQ, и здесь мы решили закодировать произвольный документ JavaScript как строку JSON, а затем создать из него двоичный буфер. Третий аргумент channel.sentToQueue()
содержит несколько параметров. Здесь мы говорим RabbitMQ сохранять сообщение в постоянном хранилище, позволяя ему пережить сбои и повторную инициализацию процесса RabbitMQ.
Затем мы закрываем соединение. Мы не закрываем его сразу, чтобы дать каналу возможность отправлять эти сообщения в сеть.
Обычно вы не хотите этого делать, поскольку вы, вероятно, будете создавать эти сообщения на сервере, обслуживающем несколько запросов. В этом случае у вас может быть общее соединение и вы можете отправлять все эти сообщения, используя это же соединение, и вы закроете соединение только после выключения сервера.
Теперь вы можете использовать наш producer.js
скрипт для создания рабочих запросов из командной строки:
$ node producer
Каждый раз, когда вы запускаете сценарий producer.js
, вы вставляете заказ на работу («сделай мне бутерброд») в очередь с именем «очередь».
Используя разные имена очередей, один сервер или кластер RabbitMQ может поддерживать несколько различных типов работы. Например, у вас может быть очередь с именем «send-email» для отправки электронных писем и другая с именем «profile-update» для обновления сторонней службы с помощью обновлений профиля пользователя.
Потребляйте сообщения
Теперь, когда у нас есть работа, ожидающая в нашей очереди, нам нужно, чтобы ее выполняли рабочие:
worker.js:
var Channel = require('./channel');
var queue = 'queue';
Channel(queue, function(err, channel, conn) { if (err) { console.error(err.stack); } else { console.log('channel and queue created'); consume(); }
function consume() { channel.get(queue, {}, onConsume);
function onConsume(err, msg) { if (err) { console.warn(err.message); } else if (msg) { console.log('consuming %j', msg.content.toString()); setTimeout(function() { channel.ack(msg); consume(); }, 1e3); } else { console.log('no message, waiting...'); setTimeout(consume, 1e3); } } } });
Здесь мы начинаем с создания канала. Как только мы его установили, мы можем начать попытки получать сообщения внутри функции consume()
. Здесь мы пытаемся получить сообщение с помощью функции channel.get
, передав имя очереди. Второй аргумент предполагает несколько вариантов. В нашем случае нам ничего передавать не нужно. Последний аргумент - это функция обратного вызова (onConsume
). При успешной или неудачной выборке сообщения вызывается обратный вызов onConsume
. Если ошибки нет, мы проверяем, не было ли нас ожидающее сообщение. Если для нас не было сообщения, мы немного ждем, а затем снова пытаемся получить другое (в нашем случае через одну секунду).
Если есть ожидающее сообщение, мы обрабатываем его, выполняя все, что является работой исполнителя (отправка электронной почты, связь с удаленным сервером и т. Д.), А затем подтверждаем сообщение и пытаемся принять следующее сообщение. Подтверждая сообщение, мы сообщаем RabbitMQ, что это сообщение было обработано, удаляя его из списка ожидающих обработки.
Если ваш рабочий процесс умирает, канал будет отключен. Поскольку RabbitMQ хранит список неподтвержденных сообщений для каждого канала, каждое из этих сообщений в конечном итоге будет повторено, как только его получит другой рабочий процесс.
Это последняя статья на тему Рабочие очереди, опубликованная для YLD.
Вы можете найти все предыдущие сообщения по этой теме здесь:
- Представляем очереди в Node.js
- Использование постоянной локальной очереди в Node.js
- Распределенная очередь работ