В Javascript широко распространено асинхронное программирование (async/await). Асинхронность, с одной стороны, позволяет решать какие-то задачи отдельно от основного потока, а с другой – требует хорошего понимания процессов и контроля над ними для эффективного управления ресурсами. Одновременное выполнение сложных задач может привести к ухудшению производительности приложения. Основная проблема заключается в том, что асинхронным программированием сложно управлять, но здесь на помощь можно призвать RxJS.
Проблемы, которые решает RxJS
Операции ввода-вывода выполняются намного медленнее, чем обычные вычисления и манипуляции с данными.
Работа Javascript с внешними интерфейсами может приводить к большому количеству соединений веб-браузера с сервером и множеству обратных вызовов для обработки ответов. Этот путь фактически идет вразрез с практиками программирования, потому что не известно, сколько времени продлится процесс, нет возможности его полностью контролировать, но нужно реагировать на его завершение. В простых приложениях можно спокойно обходиться обратными вызовами (callback).
Promises в ES6 Javascript
makeHttpCall('/items',
items => {
for (itemId of items) {
makeHttpCall(`/items/${itemId}/info`,
itemInfo => {
makeHttpCall(`/items/${itemInfo.pic}`,
img => {
showImg(img);
});
});
}
});
beginUiRendering();
В этом коде на первый взгляд всё не так плохо, но если внутри цикла for
будет больше действий, он станет трудно читаемым и поддерживать его (что-то изменять, добавлять, убирать) станет сложнее.
В какой-то мере облегчить ситуацию могут Promises. Они помогают обеспечить плавный способ последовательной реакции на изменения и обрабатывать их с помощью обратного вызова then()
. Такой код будет выглядеть лаконичнее и проще для понимания:
makeHttpCall('/items')
.then(itemId => makeHttpCall(`/items/${itemId}/info`))
.then(itemInfo => makeHttpCall(`/items/${itemInfo}.pic}`))
.then(showImg);
Но вернёмся к RxJS, что это?
RxJS – это библиотека Javascript для решения асинхронных задач, она расширяет возможности библиотеки Reactive Extension, в ней оптимизировано применение шаблона Observer (наблюдатель) для функционального программирования. Observer – это шаблон, в котором субъект хранит список своих зависимостей-наблюдателей и автоматически уведомляет их о любых изменениях состояний. После знакомства с этим, можно спокойно попрощаться с обратными вызовами.
Потоки в RxJS
Поток (Stream) – это серия событий, происходящих за определенный период времени. Поток может использоваться для обработки любого типа событий: клики и скролл мыши, ввод с клавиатуры, обработка данных и т.д. Если представить поток, как переменную, она будет реагировать на любые изменения ее значений.
Переменная и поток – одинаково динамические по сути, но у них есть некоторые отличия. Рассмотрим простой пример:
var a = 2;
var b = 4;
var c = a + b;
console.log(c); //-> 6
a = 10; // переопределим a
console.log(c); //-> по-прежнему 6
Несмотря на изменение значения переменной a = 10
, зависимая от неё переменная c
останется с прежним значением. Это главное отличие от потоков, в которых наоборот можно отслеживать изменения в зависимостях и реагировать на них. Т.е. для потоков это будет работать так:
var A$ = 2;
var B$ = 4;
var C$ = A$ + B$;
console.log(C$); //-> 6
A$ = 10;
console.log(C$); //-> 16
В приведенном выше примере переопределяется поведение динамических переменных. Значение переменной C$
результат операции сложения A$
и B$
, но если изменить значение переменной A$
, переменная C$
немедленно получает новый результат от сложения: 16
. Этот пример очень простой, основная идея здесь пояснить, как в потоках событий изменяются значения переменных.
Observable в RxJS
Объект Observable (наблюдаемый), наверное, самая важная часть в RxJS. Он используется для обработки цепочек событий, например, манипуляции с мышью (клики, курсор, скролл) или клавиатурой, перебор значений (числа, строки, объекты или массивы), которые помогают управлять этапами прохождения событий. Примитивной формой наблюдаемого объекта может быть одиночная переменная, например:
var streamA$ = Rx.Observable.of(2);
Вот так будет выглядеть приведённый ранее пример с реальными командами RxJS и его API:
const streamA$ = Rx.Observable.of(2);
const streamB$ = Rx.Observable.of(4);
const streamC$ = Rx.Observable.concat(streamA$, streamB$)
.reduce((x, y) => x + y);
streamC$.subscribe(console.log); // будет 6
Конечный результат вернёт значение 6
. Однажды объявленная переменная потока streamA$
не может поменять своё значение (как в примере ранее). Поэтому нужно объявить новую переменную для потока. Поскольку переменная потока является неизменяемым типом данных, используем ES6 const
:
const streamA$ = Rx.Observable.of(2, 10)
...
streamC$.subscribe(console.log); // будет 16
Теперь, подписавшись на streamC$
, получим ожидаемое значение 16
. Как упоминалось ранее, поток – это серия событий в течение определенного периода времени.
Методы Observable
Вот некоторые методы:
of(arg)
- Преобразует любое переданное ему значение или значения, разделённые запятой, в объект Observable
from(iterable)
- Преобразует итерируемые значения в объект Observable
fromPromise(promise)
- Преобразует Promise в объект Observable
fromEvent(element, eventName)
- Создаёт объект Observable, добавляет наблюдателя элементу и слушает события, например, DOM-events
Программирование потоков иначе обрабатывает возникновение события, Observable реализует “ленивое поведение”, то есть вызов или подписка будет изолированной операцией: каждый вызов функций вызывает отдельный процесс, а каждая Observable-подписка запускает свой отдельный процесс.
Observer
Observer (наблюдатель) является получателем данных, он обрабатывает и реагирует на значения, передаваемые из Observable. API для Observer простой, функция next()
используется для каждой итерации в последовательности. Она вызывается, когда Observable передаёт событие.
В предыдущем примере streamC$.subscribe(console.log)
– это упрощенная версия, которая фактически использует базовую концепцию Observer. Итак, как создать Observer:
const observer = Rx.Observer.create(
function next(val) {
console.log(val);
},
function error(err) {
; // выполнится при возникновении ошибки
},
function complete() {
; // выполнится при завершении события
}
);
У Observer есть API обработки ошибок, который будет реагировать на возникновение ошибки во время выполнения. Все методы в наблюдателе необязательные и доступны для подписки. В методе next()
следует поместить соответствующее для обработки значений поведение.
Итоги
Выше рассмотрена базовая концепция реактивного программирования: stream
, observable
и observer
. Реактивное программирование предназначено главным образом для реализации асинхронного приложения на основе событий посредством наблюдаемых последовательностей. Эта концепция на самом деле похожа на приложение, с которым большинство знакомо – Microsoft Excel. Например, ячейка для вычисления суммы реагирует на изменение значений в любой ячейке, от которой зависит результат.