Rxjs – это библиотека для реактивного программирования. Операторы создания полезны для генерации данных из различных источников данных, на которые подписчики могут подписаться.
Рассмотрим операторы windowCount
, windowTime
, windowToggle
и windowWhen
.
windowCount
Оператор windowCount
разветвляет исходные Observable-значения, как вложенные Observable, каждый из которых генерирует не более windowSize
событий.
Оператор принимает до 2-х аргументов. Первый аргумент – это windowSize
, максимальное количество значений, которое будет отправлено каждым окном.
Второй аргумент не обязательный. Это число startWindowEvery
, которое по умолчанию равно 0. Это интервал gthtl стартом нового окна. Интервал измеряется количеством элементов, отправленных источником Observable.
Например, это можно использовать так:
import { interval } from "rxjs";
import { windowCount, mergeAll, take, map, skip } from "rxjs/operators";
const nums = interval(1000).pipe(take(1000));
const result = nums.pipe(
windowCount(3, 3),
map(win => win.pipe(skip(1))),
mergeAll()
);
result.subscribe(x => console.log(x));
В этом коде есть Observable nums
, которое каждую секунду выдает число до 1000.
Оператор windowCount
внутри pipe
выдает 3 значения за раз и запускает новое окно после 3 значений, исходящих из nums
.
Затем оператор map
возвращает Observable win.pipe(skip (1))
, которое на каждые 2 значения пропустит 1 значение.
Наконец, значения передаются в mergeAll()
, чтобы объединить все Observables в один.
Таким образом, каждое третье число не выводится в консоль.
windowTime
Оператор windowTime
возвращает Observable, который выдает окна элементов с периодичностью, установленной в
windowTimeSpan
.
Он принимает до 2-х аргументов: первый windowTimeSpan
, а второй является не обязательный, это объект scheduler
.
Пример может быть таким:
import { interval } from "rxjs";
import { windowTime, mergeAll, take, map } from "rxjs/operators";
const nums = interval(1000);
const result = nums.pipe(
windowTime(1000, 5000),
map(win => win.pipe(take(2))),
mergeAll()
);
result.subscribe(x => console.log(x));
В приведенном выше коде есть Observable num
, который каждую секунду выдает значения, начинающиеся с 0. Затем полученные значения передаются в pipe
оператору windowTime
, который запускает окно каждые 5 секунд длительностью 1 секунду. Затем выбираются по 2 значения из каждого окна.
Это приведёт к пропуску 3-их значений в каждом окне, поскольку значения выдаются каждую секунду из num
, но в take
возвращаются только 2 значения для каждого окна.
windowToggle
windowToggle
разветвляет исходные Observable значения как вложенные Observable, начиная с выдачи из openings
и заканчивая выдачей closingSelector
.
Оператору требуется до 2-х аргументов. Первый – это openings
, то есть Observable уведомления о запуске нового окна.
Второй аргумент – closeSelector
принимает значение, которое выдает openings
, и возвращает Observable, который выдает сигнал next
или complete
, закрывающий окно.
Использовать это можно следующим образом:
import { interval, EMPTY } from "rxjs";
import { windowToggle, mergeAll } from "rxjs/operators";
const interval$ = interval(2000);
const openings = interval(2000);
const result = interval$.pipe(
windowToggle(openings, i => (i % 3 === 0 ? interval(500) : EMPTY)),
mergeAll()
);
result.subscribe(x => console.log(x));
Здесь всё начинается с Observable interval$
, который выдаёт число каждые 2 секунды. Далее идёт такой же Observable для openings
. Выдаваемые из interval$
значения передаются в pipe
оператору windowToggle
, у которого в качестве первого аргумента есть Observable openings
, который генерируется каждые 2 секунды. Таким образом, новое окно запускается каждые 2 секунды.
Вторым аргументом в windowToggle
передаётся функция:
i => (i % 3 === 0 ? interval(500) : EMPTY)
чтобы закрыть окно, когда поступившее значение не делится на 3. Это означает, что в консоль будет возвращено только каждое 3 значение, выдаваемое из interval$
.
windowWhen
Оператор windowWhen
разветвляет исходный Observable с помощью функции closingSelector
, которая возвращает Observable, чтобы закрыть окно.
В качестве аргумента он принимает функцию closingSelector
, которая принимает значение, выдаваемое openings
, и возвращает Observable, который выдает сигнал next
или complete
, закрывающий окно.
Например, это можно использовать так:
import { interval } from "rxjs";
import { mergeAll, take, windowWhen, map } from "rxjs/operators";
const interval$ = interval(2000);
const result = interval$.pipe(
windowWhen(() => interval(Math.random() * 4000)),
map(win => win.pipe(take(2))),
mergeAll()
);
result.subscribe(x => console.log(x));
В приведенном выше коде есть interval$
, который выдает число каждые 2 секунды. Затем выдаваемое значение передается в pipe
оператору windowWhen
, у которого есть функция closingSelector
:
() => interval(Math.random() * 4000)
Это означает, что окно закроется и снова откроется через Math.random () * 4000
миллисекунд.
Сделаем так, чтобы одни числа выдавались быстрее, чем другие.
Оператор windowCount
разветвляет исходные Observable значения как вложенные Observable, при этом каждое из них генерирует не более windowSize
событий. windowSize
– это размер каждого окна.
Оператор windowTime
возвращает Observable, который выдаёт окна элементов с периодичностью, установленной windowTimeSpan
. windowTimeSpan
устанавливает время, в течение которого окно открыто.
windowToggle
разветвляет исходные Observable значения, как вложенные Observable, начиная с выдачи из openings
и заканчивая выдачей closingSelector
. Функции openings
и closingSelector
– это Observable, которые управляют открытием и закрытием окна, соответственно, для каждого Observable.
Оператор windowWhen
разветвляет исходный Observable с помощью функции closingSelector
, которая возвращает Observable, чтобы закрыть окно.