Одна удивительная вещь в Node.js 12, которая стала основной линейкой релизов с долгосрочной поддержкой в октябре 2019 года, — это стабильная поддержка рабочих потоков.
Рабочие изначально являются функцией Интернета, где они позволяют разработчикам запускать фоновые задачи, не блокируя поток рендеринга браузера в течение длительного времени. Точно так же в Node.js запуск синхронного кода JavaScript в течение длительного времени считается плохой практикой, поскольку он не позволяет циклу обработки событий обрабатывать ввод-вывод и, например, выполнять операции ввода-вывода. другие ожидающие HTTP-запросы.
Традиционно решение проблемы ресурсоемких задач в Node.js заключалось в создании нескольких дочерних процессов, которые обрабатывают запросы параллельно и управляются с помощью диспетчеров процессов, таких как pm2, или встроенного кластерного модуля Node.js. Рабочие не заменяют эту модель, но они предоставляют альтернативу для случаев использования, в которых желательна более простая связь между различными задачами. Например, они предоставляют способы совместного использования или передачи типизированных массивов, когда это полезно.
Те, кто использовал Web Workers, найдут знакомый API в Node.js. Некоторые из отличий включают в себя то, что Node.js не имеет эквивалента EventTarget, а вместо него Worker использует более привычный API EventEmitter — например, сообщения изнутри Worker принимаются через `, а не через worker.on('message ', callback);`worker.onmessage = callback;`. `
Итак, давайте на самом деле соберем конкретный пример: допустим, мы хотим предоставить API, который позволяет разработчикам отправлять ему нерешенные судоку и получать обратно решенные. Решение судоку не занимает много времени — что-то порядка 40 мс на моем ноутбуке — но этого достаточно, чтобы сказать, что, возможно, мы не хотим, чтобы все остальное в процессе ждало его завершения.
Полный код, используемый в этом сообщении в блоге, также доступен в виде репозитория Github по адресу https://github.com/addaleax/workers-sudoku.
Базовое общение
Предположим, мы уже написали решатель судоку, который принимает массив из 81 поля со значениями 1–9 внутри массива для полей с фиксированным значением и 0, где мы не знаем числа в поле, и нам нужен этот решатель. для запуска в своем собственном потоке. Вот как это может выглядеть:
'use strict'; const { parentPort } = require('worker_threads'); // See the Github repo for the full sudoku solving code const { solveSudoku } = require('./solve-sudoku.js'); // parentPort is the Worker's way of communicating with the parent, similar to // window.onmessage in Web Workers. parentPort.on('message', (sudokuData) => { const solution = solveSudoku(sudokuData); parentPort.postMessage(solution); });
В браузерах рабочие процессы выполняются в очень разных средах с доступом к гораздо меньшему количеству API, чем основной поток, но в Node.js рабочие процессы ведут себя гораздо ближе к стандартным сценариям Node.js; например, работает так же, как и в Node.js, и доступны все встроенные модули. Node.js также не добавляет специальные методы к глобальному объекту для Workers, поэтому, например, для связи из основного потока метод , который в Node.js имеет и ` require( )`` Объект parentPort` необходимо загрузить из модуля worker_threads. Это экземпляр API `MessagePort`.on(‘message’)` ` .postMessage()`, который можно использовать, как показано выше. `
Теперь, в качестве второго шага, нам нужно как-то поговорить с этим рабочим потоком из нашего основного приложения. Предполагая, что мы сохранили приведенный выше код как worker.js, вот как это может выглядеть:
'use strict'; const { Worker } = require('worker_threads'); // Example Sudoku based on the one on Wikipedia's 'Sudoku' page: const sudoku = new Uint8Array([ 5, 3, 0, 0, 7, 0, 0, 0, 0, 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 8, 0, 0, 0, 0, 6, 0, 8, 0, 0, 0, 6, 0, 0, 0, 3, 4, 0, 0, 8, 0, 3, 0, 0, 1, 7, 0, 0, 0, 2, 0, 0, 0, 6, 0, 6, 0, 0, 0, 0, 2, 8, 0, 0, 0, 0, 4, 1, 9, 0, 0, 5, 0, 0, 0, 0, 8, 0, 0, 7, 9, ]); const worker = new Worker('./worker.js'); worker.postMessage(sudoku); worker.once('message', (solution) => { console.log(solution); // Let the Node.js main thread exit, even though the Worker // is still running: worker.unref(); });
Это печатает
$ node local-test.js Uint8Array [ 5, 3, 2, 1, 7, 6, 9, 4, 8, 6, 7, 4, 3, 9, 8, 5, 1, 2, 1, 9, 8, 2, 4, 5, 3, 6, 7, 8, 5, 9, 7, 6, 1, 4, 2, 3, 4, 2, 6, 8, 5, 3, 7, 9, 1, 7, 1, 3, 9, 2, 4, 8, 5, 6, 9, 6, 1, 5, 3, 7, 2, 8, 4, 2, 8, 7, 4, 1, 9, 6, 3, 5, 3, 4, 5, 6, 8, 2, 1, 7, 9 ]
на консоль, так что это действительно работает. Ура! Опять же, сам объект и представляет другой конец канала связи, к которому ` .on('message')` ` .postMessage()` API также доступны на рабочем parentPort``.
Здесь мы используем a, хотя ` Uint8Array`` Array` также подойдет: причина в том, что это немного более эффективно передавать типизированные массивы между потоками, чем обычные массивы, но об этом позже.
Объединение рабочих
Это работает, но оказывается, что у Node Workers есть две стороны столь же мощных, как они есть: поскольку они в основном являются полнофункциональными экземплярами Node.js, запуск одного из них занимает несколько миллисекунд каждый раз, поэтому на практике лучше держите несколько экземпляров Worker в так называемом пуле Worker, готовых отвечать на запросы. Существует ряд пакетов npm, которые реализуют пулы рабочих процессов, и хотя их рекомендуется использовать на практике, для этого примера мы реализуем пул рабочих процессов самостоятельно. (Это также означает, что мы пропустим реализацию расширенных функций, таких как правильное отслеживание асинхронных операций — пакеты npm для пулов рабочих потоков должны реализовать их.)
Итак, давайте соберем реальный HTTP-сервер, который принимает HTTP-запросы с полезной нагрузкой JSON, содержащей нерешенные судоку, и возвращает решенную в ответе. Мы будем использовать пул рабочих с фиксированным размером, а это означает, что когда нам нужно запустить задачу, мы берем рабочего из пула, когда он доступен, а в противном случае ждем, пока он не станет доступным.
'use strict'; const http = require('http'); const { Worker } = require('worker_threads'); const workerPool = [ // Start a pool of four workers new Worker('./worker.js'), new Worker('./worker.js'), new Worker('./worker.js'), new Worker('./worker.js'), ]; const waiting = []; http.createServer((req, res) => { let body = ''; req.setEncoding('utf8'); // Receive strings rather than binary data req.on('data', chunk => body += chunk); req.on('end', () => { let dataAsUint8Array; try { dataAsUint8Array = new Uint8Array(JSON.parse(body)); // Fix the length at 81 = 9*9 fields so that we are // not DoS'ed through overly long input data. dataAsUint8Array = dataAsUint8Array.slice(0, 81); } catch (err) { res.writeHead(400); res.end(`Failed to parse body: ${err}`); return; } res.writeHead(200, { 'Content-Type': 'application/json' }); if (workerPool.length > 0) { handleRequest(res, dataAsUint8Array, workerPool.shift()); } else { waiting.push((worker) => handleRequest(res, dataAsUint8Array, worker)); } }); }).listen(3000); function handleRequest(res, sudokuData, worker) { worker.postMessage(sudokuData); worker.once('message', (solutionData) => { res.end(JSON.stringify([...solutionData])); // Put the Worker back in the queue. if (waiting.length > 0) waiting.shift()(worker); else workerPool.push(worker); }); }
Запуск ` node server.js` запускает HTTP-сервер на порту 3000:
$ curl -d '[5,3,0,0,7,0,0,0,0,6,0,0,0,0,0,0,0,0,0,9,8,0,0,0,0,6,0,8,0,0,0,6,0,0,0,3,4,0,0,8,0,3,0,0,1,7,0,0,0,2,0,0,0,6,0,6,0,0,0,0,2,8,0,0,0,0,4,1,9,0,0,5,0,0,0,0,8,0,0,7,9]'https://localhost:3000/ [5,3,2,1,7,6,9,4,8,6,7,4,3,9,8,5,1,2,1,9,8,2,4,5,3,6,7,8,5,9,7,6,1,4,2,3,4,2,6,8,5,3,7,9,1,7,1,3,9,2,4,8,5,6,9,6,1,5,3,7,2,8,4,2,8,7,4,1,9,6,3,5,3,4,5,6,8,2,1,7,9]
Таким образом, мы можем отправить неразгаданную судоку и получить обратно правильное решение!
Перенос данных
Для дальнейшей оптимизации этого приложения мы можем заменить
worker.postMessage(sudokuData); worker.postMessage(sudokuData, [sudokuData.buffer]]);
То есть мы можем использовать тот факт, что мы сохранили это, добавив базовый элемент в «список передачи», список объектов, которые перемещаются на принимающую сторону канала связи, а не копируются. В настоящее время только и поддерживаются как передаваемые. После отправки передаваемого объекта с помощью . его больше нельзя использовать в потоке отправки — например, ` sudokuData` как типизированный массив, и вместо его содержимого мы can ArrayBuffer`` ArrayBuffer`` MessagePort`` ` postMessage()` sudokuData`` после вызова будет отображаться как пустой массив.
Применив эту оптимизацию в серверном и рабочем коде, мы можем практически полностью избавиться от копирования данных! Хотя эта оптимизация может не сэкономить много времени при отправке 81 байта туда и обратно для головоломки судоку, для более крупных объектов это становится весьма заметным.
Еще один трюк, который позволяет достичь аналогичного результата, заключается в том, чтобы фактически использовать память, а не перемещать ее между потоками; для этого мы будем использовать код, подобный следующему:
const sudokuDataShared = new Uint8Array(new SharedBuffer(sudokuData.length)); // Copy into the new Uint8Array sudokuDataShared.set(sudokuData); worker.postMessage(sudokuDataShared);
Таким образом, оба потока смогут одновременно обращаться к одному и тому же типизированному массиву — это очень мощная функция, хотя получить право на параллельный доступ к данным может быть непросто.
Первоначально опубликовано на https://www.nearform.com 12 ноября 2019 г.