Это третья и последняя часть из серии под названием Демистификация RxJS, в которой мы создаем нашу собственную миниатюрную версию RxJS, чтобы получить глубокое фундаментальное понимание того, как он работает. Если вы прочитали Часть II, вы можете продолжить с того места, на котором остановились. Или вы можете начать здесь, а затем создать ответвление CodeSandbox, содержащего завершенный код для Части II. Или посмотрите этот CodeSandbox, содержащий полную миниатюрную библиотеку.
В последних двух частях этой серии мы создали нашу собственную миниатюрную версию RxJS с наблюдаемыми объектами и операторами. В этой заключительной части мы собираемся немного углубиться и поговорить о планировщиках. Это, так сказать, ближе к металлу в RxJS, чем Observables и операторы. Особенно, если вы в основном используете RxJS через Angular, вы можете получить все необходимое, просто прочитав Часть I и Часть II. Однако, если вам нужно еще больше понять, как работает RxJS, или вы боретесь с чем-то в своем коде, которое, по вашему мнению, связано с точным временем, в которое Observable испускает значение, прочтите на.
Планировщики - это механизм, который дает вам детальный контроль над тем, когда именно генерируются значения из Observable, при этом самому коду наблюдателя не нужно знать детали за это время. Я обнаружил, что это особенно важно, когда вам нужно убедиться, что при разработке API, которые возвращают Observables, вы случайно не инициируете высвобождение демонического порождения ада, предназначенного поставить человечество на колени и организовать гибель жизни в этом Вселенная, какой мы ее знаем.
Позволь мне объяснить.
Рассмотрим этот пример службы API на основе Observable, которая извлекает данные из конечной точки и кэширует эти данные в памяти после их возврата, используя свой URI в качестве ключа:
const cache = new Map<string, any>(); function get<T>(endpoint: string): Observable<T> { if (cache.has(endpoint)) { return of(cache.get(endpoint) as T); } return http.get(endpoint).pipe( tap(data => { cache.set(endpoint, data); }), shareReplay(1), // So we only ever set to the cache once ); }
Подобный сервис может быть полезен, если вы разрабатываете аналитическое приложение, в котором клиент может внести несколько изменений на страницу, которые теоретически вызовут несколько запросов API, но возвращаемые данные вряд ли изменятся в течение одного сеанса.
Теперь предположим, что мы используем API так:
function fetch() { console.log('[fetch] Before subscribe'); get('/data').subscribe(data => console.log('Response:', data)); console.log('[fetch] After subscribe'); } console.log('Initiating first fetch'); fetch(); // Then, sometime later... console.log('Initiating second fetch'); fetch();
Вот что код выдаст при первом использовании:
Initiating first fetch [fetch] Before subscribe [fetch] After subscribe [fetch] Response: ...
Но как насчет второго? Вы бы подумали, что это будет то же самое, что и первый, не так ли? Увы:
Initiating second fetch [fetch] Before subscribe [fetch] Response: ... [fetch] After subscribe
Во второй подписке подписка запускает промежуточные журналы консоли до и после, тогда как для первой подписки она запускается после двух журналов.
Это имеет смысл, если мы вспомним, как мы реализовали of()
в Части I. Обратите внимание, как, если у нас нет ответа в нашем кеше, мы вызываем from(fetch(...))
. fetch
возвращает обещание, которое всегда оценивается асинхронно после завершения текущего выполняющегося стека вызовов. Однако для of()
, когда мы подписываемся, мы просто перебираем все заданные значения и вызываем next()
, передавая каждое из этих значений синхронно. Таким образом, в то время как from
, когда ему дано обещание, всегда будет выдавать значение из обещания асинхронно, of
всегда будет выдавать свои значения синхронно.
На мой взгляд, это серьезная проблема с точки зрения согласованности API. Если вы читали статью Исаака Шлютера Проектирование API-интерфейсов для асинхронности (а если нет, и вы разрабатываете JS-API, я настоятельно рекомендую вам это сделать), то вы можете вспомнить один из наиболее важных - если не самые важные - отрывки из этой статьи:
Если у вас есть API, который принимает обратный вызов,
и иногда этот обратный вызов вызывается немедленно,
и в других случаях этот обратный вызов вызывается в какой-то момент в будущем,
тогда вы будете визуализировать любой код, использующий этот API, о котором невозможно рассуждать, и вызовете выпуск Zalgo.
К сожалению, учитывая то, как сейчас работают наши Observables, наша логика подписки иногда вызывается немедленно, а иногда нет. Поэтому он выпускает Zalgo. И я думаю, мы все можем согласиться с тем, что это не идеальная ситуация.
Итак, как мы можем это предотвратить? Один из способов - всегда использовать from
и заключать наши кешированные ответы в обещание. Ничего страшного, но довольно неуклюже. Что нам действительно нужно, так это способ дать нашему Observable
классу лучший контроль над тем, когда запланированы выбросы данных. Следовательно, возникает потребность в планировщиках.
Чтобы продолжить создание нашей собственной миниатюрной версии RxJS, давайте сами реализуем базовую версию планировщиков. В него войдут:
- Два планировщика, один из которых генерирует значения немедленно, а другой - асинхронно с использованием
setTimeout
. - Поддержка планировщиков в нашем классе
Observable
- Оператор
observeOn
, подобный найденному в RxJS, который создает новый Observable, эквивалентный наблюдаемому исходному объекту, но с использованием указанного планировщика.
Построив это, мы можем увидеть, как планировщики позволяют нам легко контролировать время выбросов от наблюдаемых объектов, и, следовательно, могут помочь нам исправить проблему несогласованности с нашим кодом API выше (и, следовательно, сохранить Zalgo там, где он принадлежит: далеко-далеко от нашего Вселенная).
Интерфейс и экземпляры планировщика
Давайте начнем с создания Scheduler
интерфейса, похожего на RxJS, но - как и большинство того, что мы написали в этой серии - намного проще. Введите следующее чуть выше определения класса Observable
:
В нашем интерфейсе планировщика будет просто одна функция, schedule()
, которая принимает work
обратный вызов, который будет выполняться по усмотрению планировщика.
Теперь давайте создадим два конкретных экземпляра этого интерфейса: syncScheduler
, который просто выполняет work
немедленно, и asyncScheduler
, который откладывает выполнение работы с помощью setTimeout()
вызова. Введите следующее прямо под определением интерфейса планировщика:
Если вы не понимаете, как setTimeout вызывает отсрочку выполнения кода JavaScript, я настоятельно рекомендую прочитать Статью Джона Ресига о таймерах JS, которая выдержала испытание временем как одна из лучших статей о том, как Функции времени JavaScript работают.
Наблюдаемые модификации
Теперь, когда у нас есть планировщики, нам нужно будет использовать их в наших Observables. Измените конструктор Observable, а также метод subscribe()
, чтобы он выглядел следующим образом:
Обратите внимание, как в этом коде, когда мы создаем нашего наблюдателя внутри subscribe, мы создаем прокси-наблюдателя, который вызывает this.scheduler.schedule
в своих интерфейсных методах, делегируя фактические обратные вызовы внутри этого метода. Это разделение позволяет создать дополнительный промежуточный уровень, на котором наблюдатель по-прежнему отвечает за сигнализацию событий, касающихся асинхронного потока данных, но scheduler
отвечает за определение именно того, когда эти события прибывают к слушателям.
Теперь мы можем начать использовать наши разные планировщики в наших Observables, но для этого нам потребуется всегда использовать конструктор Observable, что немного неудобно. Вместо этого давайте создадим более приятный API, но добавим оператор ObservOn (), который мы можем использовать в качестве аргумента для .pipe (), чтобы заставить результирующую наблюдаемую функцию использовать правильный планировщик. Введите следующее в самом низу файла кода:
Мы можем доказать, что это работает, как и для наших операторов в Части II, написав простую тестовую программу:
После этого мы видим, что наблюдаемые значения всегда выводятся после just after subscribe
журнала консоли.
Ключевым выводом здесь является то, как планировщики могут использоваться для обеспечения детального контроля над выполнением наблюдаемой логики подписки. Это можно использовать в ситуациях, когда необходимо точно указать, как должно произойти событие. В случае нашего примера кода API RxJS мы можем использовать планировщики, чтобы гарантировать, что время подписки API всегда согласовано, передав asapScheduler - планировщик, используемый для Observables, который имеет дело с обещаниями, в качестве последнего аргумента для of()
:
return of(cache.get(endpoint) as T, asapScheduler)
Заключение
К настоящему времени вы построили почти все, что составляет основу RxJS. Я надеюсь, что, сделав это, вы развили фундаментальное понимание того, как работают все примитивные механизмы в RxJS. Я искренне надеюсь, что у вас не должно возникнуть проблем с эффективным использованием библиотеки.
Для дальнейшего изучения я предлагаю ознакомиться с https://rxjs.dev/guide/subject и руководством разработчиков по планировщикам. Кроме того, если вам понравилось работать над этой версией RxJS и вы хотите работать над настоящей версией, я уверен, что они с радостью примут участие :)