RxJS и реактивное программирование в JavaScript

В Javascript широко распространено асинхронное программирование (async/await). Асинхронность, с одной стороны, позволяет решать какие-то задачи отдельно от основного потока, а с другой — требует хорошего понимания процессов и контроля над ними для эффективного управления ресурсами. Одновременное выполнение сложных задач может привести к ухудшению производительности приложения. Основная проблема заключается в том, что асинхронным программированием сложно управлять, но здесь на помощь можно призвать RxJS.

Проблемы, которые решает RxJS

Операции ввода-вывода выполняются намного медленнее, чем обычные вычисления и манипуляции с данными.

Работа Javascript с внешними интерфейсами может приводить к большому количеству соединений веб-браузера с сервером и множеству обратных вызовов для обработки ответов. Этот путь фактически идет вразрез с практиками программирования, потому что не известно, сколько времени продлится процесс, нет возможности его полностью контролировать, но нужно реагировать на его завершение. В простых приложениях можно спокойно обходиться обратными вызовами (callback).

Promises в ES6 Javascript

makeHttpCall('/items',
   items => {
     for (itemId of items) {
       makeHttpCall(`/items/${itemId}/info`,
         itemInfo => {
           makeHttpCall(`/items/${itemInfo.pic}`,
             img => {
               showImg(img);
           });
        });
      }
});
beginUiRendering();

В этом коде на первый взгляд всё не так плохо, но если внутри цикла for будет больше действий, он станет трудно читаемым и поддерживать его (что-то изменять, добавлять, убирать) станет сложнее.

В какой-то мере облегчить ситуацию могут Promises. Они помогают обеспечить плавный способ последовательной реакции на изменения и обрабатывать их с помощью обратного вызова then(). Такой код будет выглядеть лаконичнее и проще для понимания:

makeHttpCall('/items')
   .then(itemId => makeHttpCall(`/items/${itemId}/info`))
   .then(itemInfo => makeHttpCall(`/items/${itemInfo}.pic}`))
   .then(showImg);

Но вернёмся к RxJS, что это?

RxJS — это библиотека Javascript для решения асинхронных задач, она расширяет возможности библиотеки Reactive Extension, в ней оптимизировано применение шаблона Observer (наблюдатель) для функционального программирования. Observer — это шаблон, в котором субъект хранит список своих зависимостей-наблюдателей и автоматически уведомляет их о любых изменениях состояний. После знакомства с этим, можно спокойно попрощаться с обратными вызовами.

Потоки в RxJS

Поток (Stream) — это серия событий, происходящих за определенный период времени. Поток может использоваться для обработки любого типа событий: клики и скролл мыши, ввод с клавиатуры, обработка данных и т.д. Если представить поток, как переменную, она будет реагировать на любые изменения ее значений.

Переменная и поток — одинаково динамические по сути, но у них есть некоторые отличия. Рассмотрим простой пример:

var a = 2;
var b = 4;
var c = a + b;
console.log(c); //-> 6
a = 10;  // переопределим a
console.log(c); //-> по-прежнему 6

Несмотря на изменение значения переменной a = 10, зависимая от неё переменная c останется с прежним значением. Это главное отличие от потоков, в которых наоборот можно отслеживать изменения в зависимостях и реагировать на них. Т.е. для потоков это будет работать так:

var A$ = 2;
var B$ = 4;
var C$ = A$ + B$;
console.log(C$); //-> 6
A$ = 10;
console.log(C$); //->  16

В приведенном выше примере переопределяется поведение динамических переменных. Значение переменной C$ результат операции сложения A$ и B$, но если изменить значение переменной A$, переменная C$ немедленно получает новый результат от сложения: 16. Этот пример очень простой, основная идея здесь пояснить, как в потоках событий изменяются значения переменных.

Observable в RxJS

Объект Observable (наблюдаемый), наверное, самая важная часть в RxJS. Он используется для обработки цепочек событий, например, манипуляции с мышью (клики, курсор, скролл) или клавиатурой, перебор значений (числа, строки, объекты или массивы), которые помогают управлять этапами прохождения событий. Примитивной формой наблюдаемого объекта может быть одиночная переменная, например:

var streamA$ = Rx.Observable.of(2);

Вот так будет выглядеть приведённый ранее пример с реальными командами RxJS и его API:

const streamA$ = Rx.Observable.of(2);
const streamB$ = Rx.Observable.of(4);
const streamC$ = Rx.Observable.concat(streamA$, streamB$)
     .reduce((x, y) => x  + y);
streamC$.subscribe(console.log); // будет 6

Конечный результат вернёт значение 6. Однажды объявленная переменная потока streamA$ не может поменять своё значение (как в примере ранее). Поэтому нужно объявить новую переменную для потока. Поскольку переменная потока является неизменяемым типом данных, используем ES6 const:

const streamA$ = Rx.Observable.of(2, 10)
...
streamC$.subscribe(console.log); // будет 16

Теперь, подписавшись на streamC$, получим ожидаемое значение 16. Как упоминалось ранее, поток — это серия событий в течение определенного периода времени.

Методы Observable

Вот некоторые методы:

of(arg)
Преобразует любое переданное ему значение или значения, разделённые запятой, в объект Observable
from(iterable)
Преобразует итерируемые значения в объект Observable
fromPromise(promise)
Преобразует Promise в объект Observable
fromEvent(element, eventName)
Создаёт объект Observable, добавляет наблюдателя элементу и слушает события, например, DOM-events

Программирование потоков иначе обрабатывает возникновение события, Observable реализует «ленивое поведение», то есть вызов или подписка будет изолированной операцией: каждый вызов функций вызывает отдельный процесс, а каждая Observable-подписка запускает свой отдельный процесс.

Observer

Observer (наблюдатель) является получателем данных, он обрабатывает и реагирует на значения, передаваемые из Observable. API для Observer простой, функция next() используется для каждой итерации в последовательности. Она вызывается, когда Observable передаёт событие.

В предыдущем примере streamC$.subscribe(console.log) — это упрощенная версия, которая фактически использует базовую концепцию Observer. Итак, как создать Observer:

const observer = Rx.Observer.create(
  function next(val) {
    console.log(val);
  },
  function error(err) {
    ; // выполнится при возникновении ошибки
  },
  function complete() {
    ; // выполнится при завершении события
  }
);

У Observer есть API обработки ошибок, который будет реагировать на возникновение ошибки во время выполнения. Все методы в наблюдателе необязательные и доступны для подписки. В методе next() следует поместить соответствующее для обработки значений поведение.

Итоги

Выше рассмотрена базовая концепция реактивного программирования: stream, observable и observer. Реактивное программирование предназначено главным образом для реализации асинхронного приложения на основе событий посредством наблюдаемых последовательностей. Эта концепция на самом деле похожа на приложение, с которым большинство знакомо — Microsoft Excel. Например, ячейка для вычисления суммы реагирует на изменение значений в любой ячейке, от которой зависит результат.