Я уже некоторое время использую EventSource для потоковой передачи подписок GraphQL, и это здорово, но есть две причины, по которым это отстой.

  1. Он использует протокол на основе загадочного текста,
  2. Единственный способ сообщить о первоначальном запросе - это URL-адрес.

Второй начал доставлять мне проблемы. Когда мои запросы подписки GraphQL становятся слишком большими, я получаю неожиданные ошибки, которые трудно отлаживать.

Под капотом мы знаем, что EventSource - это просто потоковая выборка с использованием метода GET. Если бы только был способ выполнять потоковую выборку в виде POST, мы могли бы получить все преимущества EventSource с контролем над протоколом и возможностью отправлять запрос в теле запроса. К счастью, есть!

Читаемые потоки

Наряду с fetch(<url>).then(response => response.text()) и fetch(<url>).then(response => response.json()) есть fetch(<url>).then(response => response.body).

Свойство body предоставляет ReadableStream, который может использоваться для обработки ответа выборки по мере получения данных. Мы можем использовать это для генерации событий, которые имеют те же функции, что и EventSource. Вот функция, которую мы будем создавать.

function FetchEventTarget(input, init) {
  const eventTarget = new EventTarget()
  const jsonDecoder = makeJsonDecoder(input)
  const eventStream = makeWriteableEventStream(eventTarget)
  fetch(input, init)
    .then(response => {
      response.body
        .pipeThrough(new TextDecoderStream())
        .pipeThrough(jsonDecoder)
        .pipeTo(eventStream)
      })
    .catch(error => {
      eventTarget.dispatchEvent(
        new CustomEvent('error', { detail: error }))
    })
  return eventTarget
}

В основе этого лежит вызов fetch, из которого мы получаем body. Затем мы используем метод pipeThrough с TextDecoderStream, чтобы преобразовать входящие байты в текст. Затем нам нужно проанализировать текст как JSON и, наконец, сгенерировать события. Нам нужно проделать настоящую работу.

Следующая функция предполагает, что входящие сообщения JSON разделены строками. Он разбивает входящий текст на строки, а затем анализирует строку как JSON.

function makeJsonDecoder() {
  return new TransformStream({
    start(controller) {
      controller.buf = ''
      controller.pos = 0
    },
    transform(chunk, controller) {
      controller.buf += chunk
      while (controller.pos < controller.buf.length) {
        if (controller.buf[controller.pos] == '\n') {
          const line = controller.buf.substring(0, controller.pos)
          controller.enqueue(JSON.parse(line))
          controller.buf = controller.buf.substring(controller.pos + 1)
          controller.pos = 0
        } else {
          ++controller.pos
        }
      }
    }
  })
}

Я не буду вдаваться в алгоритм разбиения текста на строки. Ключевые моменты, на которые следует обратить внимание, - это использование класса TransformStream с предоставленным объектом, который имеет методы start и transform. Мы используем метод start для установки начального состояния, затем обрабатываем текст в строки, наконец, превращаем их в JSON и вызываем метод controller.enqueue для передачи JSON в следующий конвейер в конвейере.

Следующий шаг - превратить поток JSON в события.

function makeWriteableEventStream(eventTarget) {
  return new WritableStream({
    start(controller) {
      eventTarget.dispatchEvent(new Event('start'))
    },
    write(message, controller) {
      eventTarget.dispatchEvent(
        new MessageEvent(
          message.type,
          { data: message.data }
        )
      )
    },
    close(controller) {
      eventTarget.dispatchEvent(new CloseEvent('close'))
    },
    abort(reason) {
      eventTarget.dispatchEvent(new CloseEvent('abort', { reason }))
    }
  })
}

Мы используем класс WriteableStream, предоставляющий объект, реализующий методы отправки событий для методов его жизненного цикла: открытия, закрытия и сообщений с данными.

Теперь мы реализовали все части, необходимые для нашей исходной функции. Мы можем назвать это так:

const eventTarget = new FetchEventTarget(
  'https://example.com/events', {
    method: 'POST',
    headers: new Headers({
      'accept': 'application/json',
      'content-type': 'application/json'
    }),
    mode: 'same-origin',
    signal: abortController.signal,
    body: JSON.stringify({ query: 'Some query' })
  })
eventTarget.addEventListener('an-event-name', event => {
  console.log(event.data)
})

Теперь мы можем увидеть мощь полностью работоспособной потоковой выборки. Мы можем указать метод, заголовки, управлять CORS и передать тело. С поддержкой целевых событий у нас есть взаимодействие, которое очень похоже на подход EventSource.

Если вы хотите попробовать это, есть демонстрация Python.

$ mkdir demo
$ cd demo
demo $ python3.7 -m venv .venv
demo $ source .venv/bin/activate
(.venv) demo $ pip install bareasgi
Successfully installed bareasgi-3.2.0 baretypes-3.0.5 bareutils-3.1.0
(.venv) demo $ pip install uvicorn
Successfully installed click-7.0 h11-0.8.1 httptools-0.0.13 uvicorn-0.9.0 uvloop-0.13.0 websockets-8.0.2
(.venv) demo $ wget https://raw.githubusercontent.com/rob-blackbourn/bareasgi/master/examples/streaming_fetch.html
(.venv) demo $ wget https://raw.githubusercontent.com/rob-blackbourn/bareasgi/master/examples/streaming_fetch.py
(.venv) demo $ python streaming_fetch.py
INFO:uvicorn:Uvicorn running on https://127.0.0.1:9009 (Press CTRL+C to quit)

Перейдите к https://127.0.0.1:9009 и попробуйте.

Стандарты

В стандартах хорошо то, что есть из чего выбирать. На момент написания большинство современных браузеров поддерживают читаемые потоки, но только Chrome поддерживает методы конвейера.