Rxjs window-операторы преобразования

Rxjs — это библиотека для реактивного программирования. Операторы создания полезны для генерации данных из различных источников данных, на которые подписчики могут подписаться.

Рассмотрим операторы windowCount, windowTime, windowToggle и windowWhen.

windowCount

Оператор windowCount разветвляет исходные Observable-значения, как вложенные Observable, каждый из которых генерирует не более windowSize событий.

Оператор принимает до 2-х аргументов. Первый аргумент — это windowSize, максимальное количество значений, которое будет отправлено каждым окном.

Второй аргумент не обязательный. Это число startWindowEvery, которое по умолчанию равно 0. Это интервал gthtl стартом нового окна. Интервал измеряется количеством элементов, отправленных источником Observable.

Например, это можно использовать так:

import { interval } from "rxjs";
import { windowCount, mergeAll, take, map, skip } from "rxjs/operators";
const nums = interval(1000).pipe(take(1000));
const result = nums.pipe(
  windowCount(3, 3),
  map(win => win.pipe(skip(1))),
  mergeAll()
);
result.subscribe(x => console.log(x));

В этом коде есть Observable nums, которое каждую секунду выдает число до 1000.

Оператор windowCount внутри pipe выдает 3 значения за раз и запускает новое окно после 3 значений, исходящих из nums.

Затем оператор map возвращает Observable win.pipe(skip (1)), которое на каждые 2 значения пропустит 1 значение.

Наконец, значения передаются в mergeAll(), чтобы объединить все Observables в один.

Таким образом, каждое третье число не выводится в консоль.

windowTime

Оператор windowTime возвращает Observable, который выдает окна элементов с периодичностью, установленной в
windowTimeSpan.

Он принимает до 2-х аргументов: первый windowTimeSpan, а второй является не обязательный, это объект scheduler.

Пример может быть таким:

import { interval } from "rxjs";
import { windowTime, mergeAll, take, map } from "rxjs/operators";
const nums = interval(1000);
const result = nums.pipe(
  windowTime(1000, 5000),
  map(win => win.pipe(take(2))),
  mergeAll()
);
result.subscribe(x => console.log(x));

В приведенном выше коде есть Observable num, который каждую секунду выдает значения, начинающиеся с 0. Затем полученные значения передаются в pipe оператору windowTime, который запускает окно каждые 5 секунд длительностью 1 секунду. Затем выбираются по 2 значения из каждого окна.

Это приведёт к пропуску 3-их значений в каждом окне, поскольку значения выдаются каждую секунду из num, но в take возвращаются только 2 значения для каждого окна.

windowToggle

windowToggle разветвляет исходные Observable значения как вложенные Observable, начиная с выдачи из openings и заканчивая выдачей closingSelector.

Оператору требуется до 2-х аргументов. Первый — это openings, то есть Observable уведомления о запуске нового окна.

Второй аргумент — closeSelector принимает значение, которое выдает openings, и возвращает Observable, который выдает сигнал next или complete, закрывающий окно.

Использовать это можно следующим образом:

import { interval, EMPTY } from "rxjs";
import { windowToggle, mergeAll } from "rxjs/operators";
const interval$ = interval(2000);
const openings = interval(2000);
const result = interval$.pipe(
  windowToggle(openings, i => (i % 3 === 0 ? interval(500) : EMPTY)),
  mergeAll()
);
result.subscribe(x => console.log(x));

Здесь всё начинается с Observable interval$, который выдаёт число каждые 2 секунды. Далее идёт такой же Observable для openings. Выдаваемые из interval$ значения передаются в pipe оператору windowToggle, у которого в качестве первого аргумента есть Observable openings, который генерируется каждые 2 секунды. Таким образом, новое окно запускается каждые 2 секунды.

Вторым аргументом в windowToggle передаётся функция:

i => (i % 3 === 0 ? interval(500) : EMPTY)

чтобы закрыть окно, когда поступившее значение не делится на 3. Это означает, что в консоль будет возвращено только каждое 3 значение, выдаваемое из interval$.

windowWhen

Оператор windowWhen разветвляет исходный Observable с помощью функции closingSelector, которая возвращает Observable, чтобы закрыть окно.

В качестве аргумента он принимает функцию closingSelector, которая принимает значение, выдаваемое openings, и возвращает Observable, который выдает сигнал next или complete, закрывающий окно.

Например, это можно использовать так:

import { interval } from "rxjs";
import { mergeAll, take, windowWhen, map } from "rxjs/operators";
const interval$ = interval(2000);
const result = interval$.pipe(
  windowWhen(() => interval(Math.random() * 4000)),
  map(win => win.pipe(take(2))),
  mergeAll()
);
result.subscribe(x => console.log(x));

В приведенном выше коде есть interval$, который выдает число каждые 2 секунды. Затем выдаваемое значение передается в pipe оператору windowWhen, у которого есть функция closingSelector:

() => interval(Math.random() * 4000)

Это означает, что окно закроется и снова откроется через Math.random () * 4000 миллисекунд.

Сделаем так, чтобы одни числа выдавались быстрее, чем другие.

Оператор windowCount разветвляет исходные Observable значения как вложенные Observable, при этом каждое из них генерирует не более windowSize событий. windowSize — это размер каждого окна.

Оператор windowTime возвращает Observable, который выдаёт окна элементов с периодичностью, установленной windowTimeSpan. windowTimeSpan устанавливает время, в течение которого окно открыто.

windowToggle разветвляет исходные Observable значения, как вложенные Observable, начиная с выдачи из openings и заканчивая выдачей closingSelector. Функции openings и closingSelector — это Observable, которые управляют открытием и закрытием окна, соответственно, для каждого Observable.

Оператор windowWhen разветвляет исходный Observable с помощью функции closingSelector, которая возвращает Observable, чтобы закрыть окно.