Реактивные потоки - отличный инструмент. Они позволяют нам легко писать высокопроизводительный и асинхронный код. Они помогают нам сосредоточиться на том, что делать с данными, а не на том, как это делать. И их легко понять. Ну сначала.

Большинство из нас начинают с представления потоков как серии значений, излучаемых с течением времени. Этой простой модели достаточно, чтобы подписаться на потоки и применить к ним базовые операторы, такие как map. Но как только нам нужно добавить более сложные операторы или если что-то не работает должным образом, идея потоков создания ценности перестает быть полезной.

Итак, позвольте мне предложить другую ментальную модель, которая больше фокусируется на том, как работают потоки: потоки - это просто функции.

Небольшое примечание: я использую JavaScript в своих примерах и стараюсь сделать его как можно более простым, чтобы его было легко понять даже разработчикам без навыков JavaScript.

Источник

Когда вы применяете оператор к наблюдаемому, вы создаете новый наблюдаемый объект с исходным в качестве родительского. Источник - это наблюдаемое, у которого нет родителя. Он только излучает ценности.

function source(subscriber) {
  subscriber(1);
  subscriber(2);
}

Функция источника ожидает обратного вызова (подписчик) и выдает значения, вызывая подписчика. Вместо выдачи значений «вручную» мы могли бы также перебирать массив. Давайте подпишемся на нашу «наблюдаемую».

function subscriber(value) {
  console.log(value);
}
source(subscriber);
//Prints 1 and 2 in the console

Вы можете возразить, что это не позволяет использовать асинхронные потоки. Вот так:

function source(subscriber) {
  setTimeout(function() {
    subscriber(1);
  }, 1000);
  setTimeout(function() {
    subscriber(2);
  }, 3000);
}
source(subscriber);
//Prints 1 after 1 second and 2 after 3 seconds

Функция setTimeout вызывает подписчика по истечении заданного периода времени. Если вы попробуете это сделать, вы заметите, что значения выдаются через несколько секунд.

Оператор

Оператор - это простая функция, которая принимает значение и вызывает подписчика.

function operator(value, subscriber) {
  let newValue = "Value: " + value;
  subscriber(newValue);
}

Секретное оружие оператора в том, что он не возвращает новое значение, а вместо этого вызывает абонента. Это делает его чрезвычайно мощным. Мы можем решить, когда будет отправлено значение (например, debounce), сколько значений будет отправлено (например, flatMap) или будет ли вообще выбрано какое-либо значение (например, filter).

Клей

Конечно, мы не хотим вызывать оператор напрямую, а применяем его к другому наблюдаемому. Итак, давайте создадим функцию канала:

function pipe(source, operator) {
  function newSource(subscriber) {
    function operatorSubscriber(value) {
      operator(value, subscriber);
    }
    return source(operatorSubscriber);
  }
  return newSource;
}

Это выглядит немного сложнее из-за этих вложенных функций. Давайте пройдемся по ним. Начнем с самого сокровенного оператора Subscriber. Источник принимает только подписчиков, поэтому мы переносим оператор в другую функцию. Источник может вызвать оператора operatorSubscriber, который вызывает оператора, который, в свою очередь, может вызвать фактического абонента.

Когда мы применяем оператор к потоку, мы создаем новый поток. Таким образом, функция pipe должна возвращать единицу. Я назвал его operatorSource (хотя на самом деле это не источник). Когда кто-то «подписывается» на новый источник, этот запрос передается исходному источнику путем вызова функции источника.

Наш рабочий процесс работает так:

  • Абонент вызывает operatorSource (который обертывает оператора)
  • OperatorSource вызывает источник
  • Источник звонит оператору Абоненту повторно
  • Оператор Абонент звонит абоненту повторно

По сути, это шаблон запроса / ответа.

Подписка

Мы уже успешно подписались на поток, но мы также хотим отменить подписку. Давайте добавим функцию отказа от подписки:

function source(subscriber) {
  subscriber(1);
  let id1 = setTimeout(function() {
             subscriber(2);
            }, 1000);
  let id2 = setTimeout(function() {
              subscriber(3);
            }, 3000);
  function unsubscribe() {
    clearTimeout(id1);
    clearTimeout(id2);
  }
  return unsubscribe;
}
let unsubscribe = source(subscriber);
unsubscribe();

ClearTimeout отменяет setTimeout. Мы вызываем отказ от подписки сразу после подписки, поэтому мы не получаем асинхронные значения. Но мы получаем первое значение, потому что оно отправляется до того, как мы сможем отказаться от подписки.

Вывод

Для полноценной реализации требуется немного больше, чем мы рассмотрели, но у нас есть некоторые базовые строительные блоки для потоков. И мы узнали, что поток - это просто серия вызовов функций. На самом деле, две серии: от подписчика к источнику, когда мы подписываемся, и от источника к подписчику, когда отправляются значения.

И теперь мы также знаем, почему нам нужно активно подписываться на поток: нам нужно вызвать функцию источника.

В следующий раз, когда поток ведет себя не так, как ожидалось, попробуйте выяснить, какая функция была вызвана, а не вызвана и почему.