Ржавчина + TypeScript

Уже некоторое время назад я написал небольшой отчет о том, как использовать Rust в полнофункциональном веб-приложении (см. здесь). Этот пост предназначен для продолжения этого, предлагая альтернативный подход к тому, как включить Rust в приложение.

В целом можно сказать:

Самые очевидные преимущества Rust — это скорость решения ресурсоемких задач и очень эффективное обращение с памятью. Последний поставляется без необходимости в каком-либо сборщике мусора.

Какими бы прекрасными ни были эти функции, у них есть небольшой недостаток — они требуют от нас придерживаться очень строгой модели владения. Однако не всегда это можно считать недостатком, поскольку использование модели владения дает очень стабильный и удобный в сопровождении код.

Однако бывают случаи, когда скорость разработки является важным фактором в проекте. Тем более тогда желательно разбить приложение на несколько частей и использовать наиболее подходящий язык для каждой из этих частей. Одним из решений для реализации этой концепции является использование микросервисной архитектуры, и именно это мы и рассмотрим в этой статье.

Необходимыми условиями для понимания содержания этой статьи являются базовые знания Rust и TypeScript. Оба могут быть получены здесь соответственно. "здесь".

Общая цель

Чтобы получить простой пример, мы реализуем три микросервиса:

  1. main-server: Предоставляет общедоступный API и размещает небольшой клиент на основе Vue. Язык — TypeScript, и мы будем использовать очень популярный фреймворк NestJS.
  2. calc-engine: Это сервер Rust, который предоставляет методы для некоторых вычислений, интенсивно использующих процессор.
  3. rabbitmq: Это считается брокером сообщений между вышеупомянутыми службами и поддерживается RabbitMQ.

Наконец, все это будет развернуто в файле docker-compose, чтобы им было легко делиться.

В дальнейшем я ограничу описания только наиболее важными частями реализации, чтобы обеспечить как можно более приятное чтение. Весь код можно посмотреть в этом репозитории, который предназначен только для образовательных целей.

Calc-движок (Rust)

Эта часть представляет собой типичное бинарное приложение на основе грузов. Зависимости следующие:

amiquip = { version = "0.4.2", default-features = false }
serde_json = { version = "1.0.81" }
rayon = { version = "1.5.3" }
num_cpus = { version = "1.13.1" }
dotenv = { version = "0.15.0" }

Важным является amiquip, который обеспечивает связь с RabbitMQ. Крейт rayon используется для того, чтобы приложение использовало как можно больше параллельных вычислений. В частности, мы настраиваем пул потоков следующим образом:

fn setup_pool() -> rayon::ThreadPool {
    rayon::ThreadPoolBuilder::new()
        .num_threads(num_cpus::get()) 
        .build()
        .unwrap()
}

Для задач, интенсивно использующих ЦП, обычно имеет смысл не запускать параллельно больше, чем доступно ЦП.

Подключение к RabbitMQ получается так:

fn setup_connection() -> Connection {
    if let Ok(c) = Connection::insecure_open(&format!(
        "amqp://{}:{}@{}:{}",
        env::var("RABBITMQ_USER").unwrap(),
        env::var("RABBITMQ_PWD").unwrap(),
        env::var("RABBITMQ_HOST").unwrap(),
        env::var("RABBITMQ_PORT").unwrap()
    )) {
        println!("Connected to rabbitmq!");
        c
    } else {
        println!("Failed to connect to rabbitmq. Will retry in 2s.");
        std::thread::sleep(std::time::Duration::from_secs(2));
        setup_connection()
    }
}

Помимо традиционного подхода к выбору определенных значений из файла .env, мы справляемся со сбоями соединения, повторяя каждый 2s. Этот подход особенно подходит для настройки микросервисов, поскольку каждый вовлеченный сервис должен иметь возможность восстанавливаться после временного сбоя других сервисов.

В качестве стиля связи между сервисами мы будем использовать RPC (удаленный-процедурный-вызов). Не всегда это лучший вариант, но здесь этого достаточно. Нашей тяжелой задачей процессора будет вычисление шагов решения Ханойской башни. Это объяснит наименование в дальнейшем.

Первое, что мы делаем здесь, это обеспечиваем существование очереди с именем hanoi в RabbitMQ. Это делается путем открытия channel поверх connection, а затем простым объявлением queue.

Этот queue используется для создания/потребления сообщений во всех сервисах. Мы можем прослушать сообщение, передаваемое на queue, позвонив по телефону queue.receiver(). Это работает аналогично прослушиванию внутренних каналов Rust и гарантирует, что код внутри блока выполняется сообщение за сообщением.

Каждый новый message должен сопоставляться с его типом, который в принципе является либо ConsumerMessage::Delivery, либо некоторой ошибкой соединения, которую мы рассматриваем совместно в other. Помимо body в сообщении есть еще два поля, то есть reply_to и correlation_id. body представляет фактическое содержимое (здесь количество элементов в игре Hanoi), reply_to — «эксклюзивную» очередь, которая используется для отправки результата, а correlation_id используется для сопоставления запроса с ответом.

Таким образом, клиент, отправляющий сообщение в очередь hanoi, прикрепляет уникальный correlation_id, который позволяет ему позже выбрать результат, записанный сервером в очередь reply_to. Чтобы обрабатывать как можно больше CPU-задач параллельно, мы просто помещаем текущий запрос в замыкание на пул потоков. Каждое такое замыкание содержит логику для отправки результата обратно через очередь reply_to.

Отправка выполняется с использованием экземпляра Exchange::direct для канала channel_for_msg. other лечится повторной попыткой установить эту конструкцию прослушивателя каждые 2s. Опять же, это делает наш сервис невосприимчивым к сбоям подключения или перезапускам RabbitMQ.

Основной сервер (TypeScript)

Прежде всего, главный сервер предлагает общедоступный API, который позволяет клиентам запрашивать шаги решения для игры в Ханое:

@Get('/hanoi')
getHello(@Query('n', ParseIntPipe) n: number): Promise<string> {
  ...
  return this.appService.makeHanoi(n);
}

Если вы никогда не знакомились с NestJS, не проблема, это самый простой для изучения веб-фреймворк, который знакомит с его основными понятиями в этом удобном для чтения обзоре.

Как видите, конечная точка делегирует метод makeHanoi, который, в конце концов, заканчивается методом MessageService.sendMessage. Этот сервис содержит логику подключения к RabbitMQ, но теперь со стороны главного сервера:

async sendMessage(n: number): Promise<string> {
        const channel = await this.ensureChannel();
        const replyTo = await this.ensureResponseQueue(channel);
        const correlationId = this.generateUuid();
        channel.sendToQueue(
            this.HANOI_QUEUE,
            Buffer.from(`${n}`),
            {
                correlationId,
                replyTo
            });
        return lastValueFrom(
            this.queueResponse.pipe(
                filter(
                  m => m?.properties.correlationId === correlationId
                ),
                first(),
                map(m => m.content.toString())
            )
        );
}

Здесь вы можете узнать все предыдущие заданные параметры сообщения, то есть replyTo и correlationId. Поскольку мы находимся на стороне производителя, эти значения создаются на этой стороне и присоединяются к сообщению, которое отправляется в очередь hanoi.

После отправки в очередь hanoi мы регистрируем одноразовый прослушиватель в очереди, которая используется как replyTo. Более подробно, эта более поздняя очередь используется внутри для передачи значений в следующие BehaviorSubject:

private queueResponse = new BehaviorSubject<Message>(null);

Как вы видите, приведенный выше код регистрирует подписку на первом совпадении correlationId и преобразует его в Promise с помощью оператора lastValueFrom.

Если вы никогда не видели подобных вещей, это основано на популярной библиотеке реактивности rxjs.

Не вдаваясь во все подробности, давайте кратко рассмотрим реализацию ensureChannel.

На самом деле основная часть этого кода проста: connect(...) устанавливает connection для RabbitMQ, connection.createChannel использует соединение для создания channel, которое подтверждает существование очереди hanoi.

Весь этот беспорядок вокруг этого метода необходим для того, чтобы этот метод работал правильно в асинхронном контексте, в котором он выполняется. Установленный channel сохраняется в локальном поле, чтобы не создавать его заново каждый раз при отправке сообщения.

Клиент

Кроме того, на основном сервере размещено небольшое клиентское приложение, поддерживаемое Vue.js. Если вас интересует его реализация, вы можете найти весь код в вышеупомянутом репозитории. Его цель — предоставить следующий простой пользовательский интерфейс:

Докер-составить:

Все вышеописанные компоненты собираются и собираются внутри контейнера docker-compose. Ниже приводится содержание соответствующего docker-compose.yml:

version: "3"
services:
  main-server:
    build: ./main-server
    env_file: .env
    environment:
      - RABBITMQ_HOST=rabbitmq
    ports:
      - "3000:3000"
calc-engine:
    build: ./calc-engine
    env_file: .env
    environment:
      - RABBITMQ_HOST=rabbitmq
rabbitmq:
    image: rabbitmq:3-management

Если вы клонировали репозиторий и установили докер в своей системе, вы можете запустить код, выполнив следующую команду из терминала, расположенного в корневой папке репозитория:

> docker-compose up

После этого вы можете указать в своем браузере https://localhost:3000, чтобы «наслаждаться» вышеуказанным пользовательским интерфейсом.

Заключение

Это не единственный подход к разделению отдельных частей приложения на несколько единиц компиляции. Очень перспективным в контексте Node.js является использование NAPI-RS.

Это позволяет избежать накладных расходов на сообщения, и вы можете прочитать об этом подробнее здесь. С другой стороны, помещая различные части в автономный сервер (микросервер?), вы получаете возможность дифференцированной масштабируемости. Таким образом, каждая часть может быть увеличена или выполнена в зависимости от ее потребностей при работе в кластере.

Спасибо за прочтение!