Filter

Rx.filter works like the filter method of JavaScript arrays. Each value is passed to a predicate function: if the predicate returns true the value is emitted, otherwise it is ignored.

// Only pass through values from 'source' that start with the letter 'b'
const f = Rx.filter(source, v => v.startsWith(`b`));

Drop

Rx.drop is the opposite of filter. If the predicate returns true, the value will be ignored.

// All values except those starting with `b` are passed through
const f = Rx.drop(source, v => v.startsWith(`b`));
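
To make the relationship between the two concrete, here is a minimal sketch using plain arrays rather than ixfx reactives. The helper names `keep` and `dropWhere` are hypothetical and not part of ixfx. They only illustrate that drop behaves like filter with the predicate inverted.

```javascript
// Hypothetical helpers over plain arrays, not the ixfx API
const keep = (values, predicate) => values.filter(v => predicate(v));
const dropWhere = (values, predicate) => values.filter(v => !predicate(v));

const words = [`apple`, `banana`, `berry`, `cherry`];
const startsWithB = v => v.startsWith(`b`);

// Same predicate, opposite results
const kept = keep(words, startsWithB);         // [ 'banana', 'berry' ]
const dropped = dropWhere(words, startsWithB); // [ 'apple', 'cherry' ]
```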

Field

Rx.field lets you pluck out a single field from a reactive stream of objects.

// Create a stream of 'name' properties from objects emitted by 'source'
const names = Rx.field(source, `name`);

field lets you specify fallback options if the field is missing:

// Use 'olaf' as a fallback name
const names = Rx.field(source, `name`, { fallbackFieldValue: `olaf` });
// Or read 'name' from a fallback object
const names = Rx.field(source, `name`, { fallbackObject: { name: `olaf` } });
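
As a rough illustration of what the two fallback options mean, the sketch below plucks a field from plain objects. `pluck` is a hypothetical helper, not ixfx code, and the order in which it tries the fallbacks is an assumption made for the example.

```javascript
// Hypothetical helper, not the ixfx implementation
function pluck(obj, fieldName, options = {}) {
  // Field is present: use it
  if (fieldName in obj) return obj[fieldName];
  // Assumption: try the fallback object first...
  if (options.fallbackObject && fieldName in options.fallbackObject) {
    return options.fallbackObject[fieldName];
  }
  // ...then the fallback value (undefined if none was given)
  return options.fallbackFieldValue;
}

const people = [{ name: `ada` }, { age: 30 }];
const withValue = people.map(p => pluck(p, `name`, { fallbackFieldValue: `olaf` }));
const withObject = people.map(p => pluck(p, `name`, { fallbackObject: { name: `olaf` } }));
// Both give: [ 'ada', 'olaf' ]
```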

Example

Here is a complete example using filter operators:

import * as Rx from 'ixfx/rx.js';
const moveRx = Rx.run(
  Rx.From.event(document, `pointermove`),
  // Ignore events from the non-primary pointer
  Rx.Ops.filter(evt => evt.isPrimary),
  // Just grab the 'pointerType' field
  Rx.Ops.field(`pointerType`),
  // Drop all pen events
  Rx.Ops.drop(pointerType => pointerType === `pen`)
);
moveRx.onValue(pointerType => {
  // 'pointerType' will be a string such as mouse or touch (pen has been dropped)
});

Math

  • Rx.average emits the running average value. Non-numerical values are ignored.

  • Rx.min / Rx.max emit the running minimum/maximum. Non-numerical values are ignored.

  • Rx.sum emits the running numerical sum. Non-numerical values are ignored.

  • Rx.tally emits the count of the number of values that have been emitted.

  • Rx.count emits an incrementing number for every input value, starting at 0.
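
The idea of a 'running' value that skips non-numerical input can be sketched with plain arrays. The helpers below are hypothetical and are not how ixfx implements these operators. In particular, treating NaN as non-numerical is an assumption.

```javascript
// Assumption: only real numbers count; NaN is treated as non-numerical
const isNumber = v => typeof v === `number` && !Number.isNaN(v);

// Hypothetical helper: one output per numerical input, the sum so far
function runningSum(values) {
  const out = [];
  let total = 0;
  for (const v of values) {
    if (!isNumber(v)) continue; // ignored
    total += v;
    out.push(total);
  }
  return out;
}

// Hypothetical helper: one output per numerical input, the average so far
function runningAverage(values) {
  const out = [];
  let total = 0;
  let seen = 0;
  for (const v of values) {
    if (!isNumber(v)) continue; // ignored
    total += v;
    seen++;
    out.push(total / seen);
  }
  return out;
}

const input = [10, `x`, 20, null, 30];
const sums = runningSum(input);         // [ 10, 30, 60 ]
const averages = runningAverage(input); // [ 10, 15, 20 ]
```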

Interpolation

Rx.interpolate interpolates toward the last value emitted from the source.

const i = Rx.interpolate(source, { amount: 0.1 }); // interpolate by 10%

The returned reactive only emits a value when the source emits a value. Because of this, you don’t get the desired interpolation effect on its own: each time the source emits, a new target is set, but nothing moves the output closer to that target over time. What you need to do is ping it at some interval so it keeps emitting values. See the example given for using Rx.timeoutPing.
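
The arithmetic behind this can be sketched without ixfx. The snippet assumes simple linear interpolation and a starting value of 0. Both are assumptions for illustration, and `step` is a hypothetical helper. It shows why a single emission only moves part of the way, while repeated pings converge on the target.

```javascript
// One interpolation step: move `amount` (0..1) of the way from current to target
const step = (current, target, amount) => current + (target - current) * amount;

const target = 100;
let current = 0;

// The source emits once: a single step covers only 10% of the distance
current = step(current, target, 0.1);
const afterOnePing = current; // 10

// Pinging repeatedly (eg. from a timer) keeps closing the gap
for (let i = 0; i < 50; i++) {
  current = step(current, target, 0.1);
}
// 'current' is now close to, but still below, 100
```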

Timing

Rx.debounce ensures some minimum waiting period between emitted values. It allows you to only emit a value after there is a mandated pause from the upstream sender.

For example, if the debounce time is 100ms, a value is only emitted once there has been a 100ms gap between values from the source. If the source keeps sending messages faster than that, debounce won’t let any messages be emitted. It’s useful for working with the latest value from what you expect to be a burst of values, without worrying about the ones that came before.

Rx.throttle limits the rate of values, ensuring that there is only one emitted value per throttle time. For example, if the throttle time is 100ms, only one value will be emitted every 100ms. Values are not queued: it’s always the latest value which is sent.
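
The difference between the two is easier to see on a fixed list of timestamped events. The simulation below is a sketch in plain JavaScript with hypothetical helper names. It is not the ixfx implementation, and it assumes a 'leading edge' throttle (the first value in each window is passed), which may differ from how Rx.throttle behaves.

```javascript
// Each event: the time it arrived (in milliseconds) and its value
const events = [
  { at: 0, value: `a` },
  { at: 50, value: `b` },
  { at: 90, value: `c` },
  { at: 300, value: `d` }
];

// Debounce: a value survives only if nothing else arrives within `ms` of it
function debounceSim(list, ms) {
  return list
    .filter((e, i) => i === list.length - 1 || list[i + 1].at - e.at >= ms)
    .map(e => e.value);
}

// Throttle (leading edge, an assumption): pass a value, then ignore
// everything until `ms` has elapsed since that value
function throttleSim(list, ms) {
  const out = [];
  let lastEmit = -Infinity;
  for (const e of list) {
    if (e.at - lastEmit >= ms) {
      out.push(e.value);
      lastEmit = e.at;
    }
  }
  return out;
}

const debounced = debounceSim(events, 100); // [ 'c', 'd' ]
const throttled = throttleSim(events, 100); // [ 'a', 'd' ]
```

With debounce, the burst a, b, c collapses to its final value. With throttle, the burst is represented by whichever value is allowed through in that 100ms window.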

Ranking

Rx.min and Rx.max work great for numerical values, but nothing else. Rx.rank allows you to do a similar kind of thing - finding the ‘biggest’ or ‘smallest’ using a custom ranking function.

The ranking function must return a string: ‘a’ or ‘b’ to denote which parameter ranks higher, or ‘eq’ if they rank the same.

const ranker = (a, b) => {
  if (a.size > b.size) return `a`;
  else if (a.size < b.size) return `b`;
  return `eq`;
};
// Emits 'best' value whenever it changes
const rx = Rx.rank(source, ranker);
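
To show how such a ranking function picks out the 'best' value, here is a sketch over a plain array. `rankSim` is a hypothetical helper, not the ixfx implementation, and it assumes that a value which ranks equal to the current best does not replace it.

```javascript
// Ranking function in the shape described above
const ranker = (a, b) => {
  if (a.size > b.size) return `a`;
  else if (a.size < b.size) return `b`;
  return `eq`;
};

// Hypothetical helper: collects each value that became the new 'best'
function rankSim(values, rankFn) {
  const emitted = [];
  let best;
  for (const v of values) {
    // Assumption: 'eq' does not count as a change
    if (best === undefined || rankFn(v, best) === `a`) {
      best = v;
      emitted.push(v);
    }
  }
  return emitted;
}

const boxes = [{ size: 2 }, { size: 5 }, { size: 3 }, { size: 5 }, { size: 9 }];
const bestSizes = rankSim(boxes, ranker).map(b => b.size); // [ 2, 5, 9 ]
```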

Combining reactives

Rx.combineLatestToArray takes several input reactives and outputs the latest values from all sources as a combined array whenever one of them changes. Emitted arrays correspond in index to the sources.

// Eg. two sources that generate random numbers at intervals of 100 and 200ms.
const sources = [
  Rx.From.func(Math.random, { loop: true, interval: 100 }),
  Rx.From.func(Math.random, { loop: true, interval: 200 })
];
const r = Rx.combineLatestToArray(sources);
r.onValue(value => {
  // Value will be an array of last value from each source:
  // [ number, number ]
});

Rx.combineLatestToObject works the same way, but lets you use an object to ‘label’ the sources and the resulting values.

// Sources now labelled via the object
const sources = {
  fast: Rx.From.func(Math.random, { loop: true, interval: 100 }),
  slow: Rx.From.func(Math.random, { loop: true, interval: 200 })
};
const r = Rx.combineLatestToObject(sources);
r.onValue(({ fast, slow }) => {
  // We get back 'labelled' values. 'fast' and 'slow' are numbers.
});
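
The 'combine latest' behaviour can be sketched with a tiny hand-rolled source. Everything here (`makeSource`, `combineLatestToObjectSim`) is hypothetical and only mimics the idea. How ixfx reports a source that has not emitted yet is not covered above, so the sketch simply uses undefined.

```javascript
// Hypothetical minimal source: set() emits a value, onValue() listens
function makeSource() {
  const listeners = [];
  return {
    onValue: fn => { listeners.push(fn); },
    set: v => { for (const fn of listeners) fn(v); }
  };
}

// Hypothetical helper: emits a labelled snapshot whenever any source emits
function combineLatestToObjectSim(sources) {
  const latest = {};
  const out = makeSource();
  for (const [label, source] of Object.entries(sources)) {
    latest[label] = undefined; // Assumption: undefined until the first value
    source.onValue(v => {
      latest[label] = v;
      out.set({ ...latest }); // Copy, so earlier snapshots are not mutated
    });
  }
  return { onValue: out.onValue };
}

const fast = makeSource();
const slow = makeSource();
const seen = [];
combineLatestToObjectSim({ fast, slow }).onValue(v => seen.push(v));

fast.set(1); // { fast: 1, slow: undefined }
slow.set(2); // { fast: 1, slow: 2 }
fast.set(3); // { fast: 3, slow: 2 }
```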

Etc

Rx.cloneFromFields takes an input object and outputs a clone made by copying its fields. This is sometimes needed when passing EventArgs between streams: a simple destructure does not clone them properly, so important properties go missing. If you do use it, place it directly after the operator/source that produces the EventArgs so the subsequent operators have data.

Rx.computeWithPrevious emits a value computed from the current value and the previous value. This is used under the hood by Rx.interpolate.

Rx.elapsed emits the elapsed time in milliseconds since the last message.