С операторами мы можем делать больше с помощью наблюдаемых RxJS. Они полезны для выполнения сложных операций с асинхронным кодом.

Операторы - это функции. Есть 2 типа операторов:

  • Конвейерные операторы - их можно передать по конвейеру в Observables с помощью метода pipe, доступного в Observables. Они возвращают новый Observable, поэтому несколько конвейерных операторов могут быть объединены в цепочку.
  • Операторы создания - это отдельные функции для создания нового Observable. Например, of(1,2,3) выдаст 1, 2 и 3.

Использование операторов

Мы можем импортировать операторы из RxJS, чтобы использовать их для управления нашими наблюдаемыми результатами.

Например, мы можем отображать значения из Observables в другой, используя функцию map:

import { of } from "rxjs";
import { map } from "rxjs/operators";
map(x => x * 2)(of(1, 2, 3)).subscribe(val => console.log(val));

Мы создали Observable, который испускает 1, 2 и 3 с помощью функции of, а затем вызвали map с обратным вызовом, чтобы мы могли сопоставить значения, которая возвращает функцию, в которую мы можем передать Observable, который мы создали, и вернуть новый Observable.

map(x => x * 2)(of(1, 2, 3))

возвращает новый Observable, на который мы можем подписаться. Затем мы получаем значения 2, 4 и 6 в функции обратного вызова subscribe.

Трубопровод

Мы можем использовать pipe метод Observable для объединения нескольких операций в одну.

В качестве аргументов он принимает несколько операций, что намного проще, чем их вложение.

Например, мы можем использовать его, чтобы переписать приведенный выше пример:

import { of, pipe } from "rxjs";
import { map } from "rxjs/operators";
of(1, 2, 3)
  .pipe(map(x => x * 2))
  .subscribe(val => console.log(val));

Мы получаем тот же результат, но намного чище.

Кроме того, мы можем передать более одной операции методу pipe:

of(1, 2, 3)
  .pipe(
    map(x => x * 2),
    map(x => x * 3)
  )
  .subscribe(val => console.log(val));

Затем мы сначала умножаем каждое значение, испускаемое Observable, на 2, затем снова умножаем возвращаемое значение на 3. В результате мы возвращаем 6, 12 и 18 в console.log.

Операторы создания

Операторы создания - это функции для создания Observable с нуля или объединения других Observable вместе.

Например, у нас есть функция interval для выдачи значения от 0 и выше в указанном нами интервале:

import { interval } from "rxjs";
interval(5000).subscribe(val => console.log(val));

Приведенный выше код будет выводить целое число от 0 и выше каждые 5 секунд.

Наблюдаемые высшего порядка

Наблюдаемые более высокого порядка - это наблюдаемые или наблюдаемые. Есть несколько вещей, которые мы можем сделать с разными наблюдаемыми.

RxJS имеет оператор concatAll(), который подписывается на каждую внутреннюю наблюдаемую и копирует все испущенные значения до завершения внешней наблюдаемой.

Например, мы можем использовать его следующим образом:

import { of, pipe } from "rxjs";
import { concatAll, map } from "rxjs/operators";
of(1, 2, 3)
  .pipe(
    map(outerVal => {
      console.log(`outerVal ${outerVal}`);
      return of(4, 5, 6);
    }),
    concatAll()
  )
  .subscribe(innerVal => console.log(`innerVal ${innerVal}`));

У нас должен получиться результат:

outerVal 1
innerVal 4
innerVal 5
innerVal 6
outerVal 2
innerVal 4
innerVal 5
innerVal 6
outerVal 3
innerVal 4
innerVal 5
innerVal 6

Как мы видим, мы сначала получаем первое значение из of(1,2,3) Observable, а затем все значения из второго. Затем мы получаем второе значение из of(1,2,3), а затем все значения из второго и так далее.

Другие функции оператора включают:

  • mergeAll() - подписывается на каждый внутренний Observable по мере его поступления и выдает каждое значение по мере его поступления.
  • switchAll() - подписывается на первый внутренний Observable, когда он поступает, а затем выдает каждое значение по мере его поступления. Он отписывается от предыдущей, а затем подписывается на новую.
  • exhaust() - подписывается на первый внутренний Observable, когда он прибывает, затем излучает каждое значение по мере его поступления, отбрасывая все вновь прибывающие Observable по мере его завершения и ожидая следующего внутреннего Observable.

Все они дают тот же результат, что и concatAll(), но разница ниже.

Мы можем создавать новые Observables, составляя несколько Observables или операторов.

Например:

of(1, 2, 3)
  .pipe(
    map(x => x * 2),
    map(x => x * 3)
  )

- это Observable, который умножает каждое значение из Observableof(1, 2, 3) на 6.

В RxJS есть встроенные операторы, которые мы можем использовать для создания и управления наблюдаемыми значениями. Кроме того, мы можем создавать новые Observables и комбинировать их по-разному.

Observables также могут быть вложенными, и значения всех вложенных Observables могут быть получены с помощью concatAll(), mergeAll(), switchAll() или exhaust().