По мере того как наблюдаемые объекты становятся все более популярными в JavaScript, мы стремимся выполнять наши повседневные задачи, используя их и оценивая, действительно ли они стоят всей этой шумихи. Одна из задач, которую вы можете выполнять, - это опрос серверной части, чтобы узнать, завершена ли более длительная задача.
Мы рассмотрим пример такого сценария и реализуем решение с использованием RxJS. По пути мы изучим некоторые базовые операторы для RxJS и несколько методов, а также узнаем, как избежать одной или двух ловушек. В конце я представлю реальный пример, чтобы показать вам, как реализовать то, что мы узнали, в конкретном сценарии.
Вы должны иметь базовое понимание Streams / Observables, а также прочную основу в JavaScript, чтобы насладиться этим постом. В оставшейся части этого поста я буду рассматривать Stream и Observable как взаимозаменяемые слова для одного и того же. Хотя мы рассмотрим множество основных вещей, они в основном будут относиться к RxJS и не касаться основ Streams. Если вы ищете общее введение, рассмотрите суть заголовка Введение в реактивное программирование, которое вам не хватало.
Код для этого поста был протестирован с использованием RxJS 6.2.0.
Сценарий
Допустим, у нас есть бэкэнд, который предоставляет конечную точку / tasks / [taskId], которую вы можете запросить, чтобы узнать о статусе конкретной задачи. Он возвращает такой объект:
{
// Whether the task is still running
processing: boolean;
// A unique ID for this task
taskId: string;
}
Как только мы начнем опрос, мы хотим получать текущее состояние этой задачи дважды в секунду и останавливать опрос после обработки === false.
Программное решение
Для начала мы рассмотрим программное решение этой проблемы.
Здесь мы просто вызываем новый тайм-аут каждый раз, когда серверная часть все еще обрабатывает.
Использование RxJS
Теперь мы собираемся добиться того же поведения с помощью RxJS.
Прежде всего нам нужно что-то, чтобы генерировать событие каждые x раз. RxJS предоставляет для этого две функции:
- интервал
- таймер
В то время как interval генерирует первое событие через заданное время, а затем непрерывно с тем же интервалом, timer запускается через заданное время, чтобы генерировать события каждые x время. Для наших двух обновлений в секунду мы можем начать с помощью таймера (0, 500). Это приведет к запуску событий сразу же после этого, а затем дважды в секунду.
Давайте сначала увидим это в действии, записав что-нибудь в консоль.
import { timer } from 'rxjs' timer(0, 500) .subscribe(() => console.log('polling'))
Вы должны увидеть, как ваша консоль печатает «опрос» дважды в секунду.
Позаботьтесь о импорте из правильного пакета (rxjs или rxjs / operator). К сожалению, документация RxJS может не соответствовать используемой вами версии.
Затем мы хотим превратить эти «тики» в запросы к нашему бэкэнду. Мы собираемся использовать ту же самую выборку сверху, но на этот раз превратить обещание в Observable. К счастью, RxJS предоставляет для этого удобные функции, а именно from. Используя это, мы теперь можем создать Observable (или поток), представляющий запрос к бэкэнду на каждом тике, и продолжить работу с ним.
import { timer, from } from 'rxjs'
import { map } from 'rxjs/operators'
timer(0, 500)
.pipe(from(fetch(`/tasks/${taskId}`)).pipe(map(response => response.json())))
.pipe - это способ RxJS указать, что теперь в потоке будет выполняться преобразование. Благодаря извлечению операторов в их собственный импорт, RxJS обеспечивает лучшее древовидное построение, чем когда-либо могло быть в перегруженной реализации Observable, см. Это объяснение для получения дополнительной информации.
Pipe - это оболочка RxJS для преобразований, которые применяются к Stream of Event.
Результатом этого будет поток потоков. Каждое испускаемое значение само будет наблюдаемым. Чтобы справиться с хаосом, мы можем передать его через concatMap, который сведет все потоки в один, содержащий вложенные значения.
import { timer, from } from 'rxjs'
import { map, concatMap } from 'rxjs/operators'
timer(0, 500)
.pipe(concatMap(() => from(fetch(`/tasks/${taskId}`))
.pipe(map(response => response.json())))
)
Завершить опрос
Наконец, мы действительно заботимся о получении события, которое сообщает нам, что серверная часть завершила обработку, что наш опрос завершен. Мы можем добиться этого, отфильтровав события, в которых серверная часть больше не обрабатывает, и взяв только первое из них. Используя take (1), мы указываем, что нам нужно только одно (первое) событие, сообщающее нам, что обработка завершена. Это остановит наш опрос, как только серверная часть закончит обработку задачи.
import { timer, from } from 'rxjs' import { map, concatMap, filter, take } from 'rxjs/operators'
timer(0, 500) .pipe(concatMap(() => from(fetch(`/tasks/${taskId}`)) .pipe(map(response => response.json()))) ) .pipe(filter(backendData => backendData.processing === false)
) .pipe(take(1))
Собираем все вместе
Пришло время собрать все воедино и заменить нашу функцию сверху, используя новый код на основе RxJS. Последний штрих - использовать subscribe в конце нашего Stream для работы с единственным событием, которое генерирует наш Stream.
import { timer, from } from 'rxjs' import { map, concatMap, filter, take } from 'rxjs/operators'
pollUntilTaskFinished(taskId) { timer(0, 500) .pipe(concatMap(() => from(fetch(`/tasks/${taskId}`)) .pipe(map(response => response.json()))) ) .pipe(filter(backendData => backendData.processing === false)
) .pipe(take(1)) .subscribe(() => pollingFinishedFor(taskId)) }
Возможно, вы не захотите вызывать функцию после того, как закончите, но используйте вывод своего Observable для визуализации пользовательского интерфейса. Благодаря использованию слияния, которое объединяет два потока вместе, мы можем сопоставить наш опрос с двумя состояниями и использовать выходные данные непосредственно для нашего пользовательского интерфейса.
Для этого мы объединим наш поток сверху вместе с начальным значением, которое мы преобразуем в Stream, используя of.
import { timer, from, merge, of } from 'rxjs' import { map, concatMap, filter, take } from 'rxjs/operators'
const loadingEmoji = merge( of(true), timer(0, 500) .pipe(concatMap(() => from(fetch(`/tasks/${taskId}`)) .pipe(map(response => response.json()))) ) .pipe(filter(backendData => backendData.processing === false)
) ) .pipe(take(2)).pipe(map(processing => processing ? '⏳' : '✅'));
После того, как мы сопоставим ответ от нашей серверной части с атрибутом обработки с помощью map, мы, в свою очередь, можем сопоставить результат с эмодзи для отображения нашим пользователям.
Пример из реального мира
Теория всегда хороша, но реальный мир обычно представляет собой другую проблему, чем хорошо написанный и содержательный учебник. Позвольте мне представить вам решение проблемы, с которой мы столкнулись при формировании наших знаний об опросах с использованием RxJS.
Ситуация: У нас есть приложение на Angular, для которого мы используем NGXS в качестве менеджера состояний. Подобно Redux, он использует Действия для представления событий, изменяющих состояние.
Как оказалось, NGXS предоставляет поток всех действий, отправленных как Observable, к которым мы можем подключиться. Вот наше окончательное решение для опроса серверной части на предмет состояний обработки для каждого документа, который добавляется к состоянию, и обновления состояния после завершения обработки серверной частью.
Несколько примечаний:
- environment - это среда Angular, обеспечивающая конфигурацию для нашего приложения.
- backend - это служба, обеспечивающая соединение с нашей серверной частью. Он вставляет несколько необходимых заголовков и тому подобное.
- Здесь используется TypeScript, поэтому polledDocument: Document описывает переменную с именем «polledDocument», которая следует за типом «Document».
Сложность здесь в том, что нам нужно создать новый «поток опроса» для каждого документа, добавляемого в наше состояние. Сначала мы попытались вывести логику на один уровень, но это закончилось тем, что мы смогли опрашивать только один документ на каждую загрузку страницы, поскольку take (1) блокирует Stream для всех будущих опросов.
Подведение итогов
Сегодня мы создали нашу первую логику опроса, используя RxJS, попутно изучая эту замечательную библиотеку. Мы также рассмотрели реальный пример и увидели, насколько выразительным он может сделать наш код.
А теперь идите и примените свои новообретенные знания.
Другие полезные ресурсы
Https://blog.strongbrew.io/rxjs-polling/
Https://www.sitepoint.com/angular-rxjs-create-api-service-rest-backend/