Filter
Filter
Rx.filter works like the filter function of Javascript arrays. Values are passed through a predicate function. If the predicate returns true for a value, it is emitted, otherwise it is ignored.
// Only pass-through values from 'source' that start with the letter 'b'const f = Rx.filter(source, v => v.startsWith(`b`));
Drop
Rx.drop is the opposite of filter
. If the predicate returns true, the value will be ignored.
// All values except those starting with `b` are passed-throughconst f = Rx.drop(source, v => v.startsWith(`b`));
Field
Rx.field lets you pluck out a single field from a reactive stream of objects.
// Create a stream of 'name' properties from objects emitted by 'source'const names = Rx.field(source, `name`);
field
lets you specify fallback options if the field is missing:
// Use 'olaf' as a fallback nameconst names = Rx.field(source, `name`, { fallbackFieldValue: `olaf` });
// Or read 'name' from a fallback objectconst names = Rx.field(source, `name`, { fallbackObject: { name: `olaf` } });
Example
Here is a complete example using filter operators
import * as Rx from 'ixfx/rx.js';
const moveRx = Rx.run( Rx.From.event(document, `pointermove`),
// Ignore events from the non-primary pointer Rx.Ops.filter(evt => evt.isPrimary),
// Just grab the 'pointerType' field Rx.Ops.field(`pointerType`),
// Drop all pen events Rx.Ops.drop(`pen`));moveRx.onValue(pointerType => { // 'pointerType' will be a string: mouse, pen, touch etc.});
Math
-
Rx.average emits the running average value. Non numerical values are ignored.
-
Rx.min /Rx.max emits the running min/max. Non numerical values are ignored.
-
Rx.sum emits the running numerical sum. Non numerical values are ignored.
-
Rx.tally emits the count of the number of values that have been emitted
-
Rx.count emits an incrementing number, for every input value, starting at 0.
Interpolation
Rx.interpolate interpolates toward the last value emitted from the source.
const i = Rx.interpolate(source, { amount: 0.1 }); // interpolate by 10%
The returned reactive only emits a value when the source emits a value. Because of this, you don’t really get the desired interpolation effect, since when the source emits, we set a new target, but we don’t get closer to it over time. What you need to do is ping it as some interval so it emits values. See the example given for using Rx.timeoutPing.
Timing
Rx.debounce ensures some minimum waiting period between emitted values. It allows you to only emit a value after there is a mandated pause from the upstream sender.
For example, if the debounce time is 100ms, it’s only when there’s a 100ms gap between values from the source that a value will be emitted. If the source keeps sending messages faster than that, debounce won’t let any messages be emitted. It’s useful for working with the latest value from a what you expect to be a burst of values, and not worry with the ones that came before.
Rx.throttle limits the rate of values, ensuring that there is only one emitted value per throttle time. For example, if the throttle time is 100ms, only one value will be emitted every 100ms. Values are not queued, it’s always the latest value which is sent.
Ranking
Rx.min
and Rx.max
work great for numerical values, but nothing else. Rx.rank allows you to do a similar kind of thing - finding the ‘biggest’ or ‘smallest’ using a custom ranking function.
The ranking function must return a string to denote whether the ‘a’ or ‘b’ parameter is highest, or ‘eq’ if they are the same.
const ranked = (a, b) => { if (a.size > b.size) return `a`; else if (a.size < b.size) return `b`; return `eq`;}
// Emits 'best' value whenever it changesconst rx = Rx.rank(source, ranker);
Combining reactives
Rx.combineLatestToArray takes the several input reactives, outputting the latest values from all sources as a combined array whenever one changes. Emitted arrays correspond in index to the sources.
// Eg. two sources that generate random numbers at intervals of 100 and 200ms.const sources = [ Rx.From.func(Math.random, { loop: true, interval: 100 }), Rx.From.func(Math.random, { loop: true, interval: 200 })];
const r = Rx.combineLatestToArray(sources);r.onValue(value => { // Value will be an array of last value from each source: // [ number, number ]});
combineLatestToObject is the same, but lets you use objects to ‘label’ the sources and resulting values.
// Sources now labelled via the objectconst sources = { fast: Rx.From.func(Math.random, { loop: true, interval: 100 }), slow: Rx.From.func(Math.random, { loop: true, interval: 200 })];
const r = Rx.combineLatestToObject(sources);r.onValue({ fast, slow } => { // We get back 'labelled' values. 'fast' and 'slow' are numbers.});
Etc
Rx.cloneFromFields takes an input object and outputs a clone of the objects copying its fields. This is sometimes needed when passing EventArgs between streams. These are not cloned using a simple destructure, so you miss important properties. If you do use it, place it directly after the operator/source that produces the EventArgs so the subsequent operators have data.
Rx.computeWithPrevious emits a value computed from the current value and the previous value. This is used under-the-hood by Rx.interpolate
.
Rx.elapsed emits the elapsed time in milliseconds since the last message.