Operators

Operators work by subscribing to values from an upstream source reactive and emitting values.

Operators are usually used with Rx.run or Rx.wrap, discussed in Usage.

You can also use operators manually. Give an operator a source and, usually, some options. It returns a new reactive which can be subscribed to or processed further.

Changing shape

Rx.transform transforms values one-by-one, like the ‘map’ function on an array.

For example, let’s say we have a source which emits objects that contain many properties, including a ‘name’, but we want a stream of just the name:

// Create a reactive that only emits the name value of objects
const rx = Rx.transform(source, v => v.name);

If you’re working with a stream of objects and you only want to append to them, rather than transform, Rx.annotate might be preferred. In a type-hinted way, it allows you to attach some extra properties using an annotation function.

let count = 0;
// Attach an incrementing 'count' field to all values
// (parentheses are needed so the arrow function returns an object literal)
const annotated = Rx.annotate(source, v => ({ count: count++ }));

Rx.annotateWithOp has a similar purpose, but rather than using the result of an annotation function, it uses the output of another operator. Emitted values have the shape { value: TIn, annotation: TAnnotation }: the original value is stored under .value, and the annotation under .annotation.

const r1 = Rx.run(
  // Emit values from an array
  Rx.From.array([ 1, 2, 3 ]),
  // Annotate with 'sum' operator
  Rx.Ops.annotateWithOp(Rx.Ops.sum())
);
const data = await Rx.toArray(r1);
// Data = [ { value: 1, annotation: 1 }, { value: 2, annotation: 3 }, { value: 3, annotation: 6 } ]

Rx.chunk consumes several values from the source, emitting them as an array. Its options let you set the chunk size by quantity and/or elapsed time, and whether to discard values that don’t complete a chunk.
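To make the chunking behaviour concrete, here is a plain-JavaScript sketch that chunks an array by quantity. It is not the ixfx implementation and does not use Rx.chunk's real options: chunkByQuantity, quantity and keepRemainder are hypothetical names made up for this illustration, and time-based chunking is left out.

```javascript
// Standalone sketch, not the ixfx implementation.
// 'quantity' and 'keepRemainder' are hypothetical names for illustration.
function chunkByQuantity(values, quantity, keepRemainder = true) {
  const chunks = [];
  let current = [];
  for (const v of values) {
    current.push(v);
    // A chunk is complete once it holds 'quantity' values
    if (current.length === quantity) {
      chunks.push(current);
      current = [];
    }
  }
  // Left-over values that didn't fill a chunk are kept or discarded
  if (keepRemainder && current.length > 0) chunks.push(current);
  return chunks;
}

const chunked = chunkByQuantity([ 1, 2, 3, 4, 5 ], 2);
// chunked = [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
const chunkedStrict = chunkByQuantity([ 1, 2, 3, 4, 5 ], 2, false);
// chunkedStrict = [ [ 1, 2 ], [ 3, 4 ] ]
```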

Filtering

Rx.filter works as you’d expect of a filter function. Values are passed through a predicate function. If the predicate returns true for a value, it is emitted, otherwise it is ignored.

// Create a reactive which only emits values from 'source' that start with 'b'
const f = Rx.filter(source, v => v.startsWith(`b`));

Rx.drop is the opposite of Rx.filter, instead ignoring values for which the predicate returns true.
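The relationship between the two can be sketched with a plain array instead of a reactive. This only illustrates the predicate logic described above, it does not call the library:

```javascript
// Plain-JavaScript sketch of the predicate logic, not the ixfx implementation
const startsWithB = v => v.startsWith(`b`);
const input = [ `apple`, `banana`, `blueberry`, `cherry` ];

// 'filter' behaviour: keep values where the predicate returns true
const kept = input.filter(startsWithB);
// kept = [ 'banana', 'blueberry' ]

// 'drop' behaviour: ignore values where the predicate returns true
const remaining = input.filter(v => !startsWithB(v));
// remaining = [ 'apple', 'cherry' ]
```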

Rx.field lets you pluck out a single field from a reactive stream of objects.

// Create a stream of 'name' properties from objects emitted by 'source'
const names = Rx.field(source, `name`);

field lets you specify fallback options if the field is missing:

// Use 'olaf' as a fallback name
const names = Rx.field(source, `name`, { fallbackFieldValue: `olaf` });
// Or read 'name' from a fallback object
const names = Rx.field(source, `name`, { fallbackObject: { name: `olaf` } });

Math

  • Rx.average emits the running average value. Non-numerical values are ignored.

  • Rx.min / Rx.max emit the running min/max. Non-numerical values are ignored.

  • Rx.sum emits the running numerical sum. Non-numerical values are ignored.

  • Rx.tally emits the count of the number of values that have been emitted.

  • Rx.count emits an incrementing number, for every input value, starting at 0.
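As a rough picture of what 'running' means here, the sketch below computes a running sum, average, min and max over a plain array, skipping non-numerical values. It is a standalone illustration, not the ixfx implementation. runningStats is a hypothetical helper, and producing one result per numeric input is an assumption of the sketch.

```javascript
// Standalone sketch, not the ixfx implementation.
// 'runningStats' is a hypothetical helper for illustration.
function runningStats(values) {
  let sum = 0;
  let seen = 0;
  let min = Number.POSITIVE_INFINITY;
  let max = Number.NEGATIVE_INFINITY;
  const results = [];
  for (const v of values) {
    // Non-numerical values are ignored
    if (typeof v !== `number` || Number.isNaN(v)) continue;
    seen++;
    sum += v;
    min = Math.min(min, v);
    max = Math.max(max, v);
    results.push({ sum, average: sum / seen, min, max });
  }
  return results;
}

const stats = runningStats([ 1, `x`, 3, 2 ]);
// stats = [
//  { sum: 1, average: 1, min: 1, max: 1 },
//  { sum: 4, average: 2, min: 1, max: 3 },
//  { sum: 6, average: 2, min: 1, max: 3 }
// ]
```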

Interpolation

Rx.interpolate interpolates toward the last value emitted from the source.

const i = Rx.interpolate(source, { amount: 0.1 }); // interpolate by 10%

The returned reactive only emits a value when the source emits a value. On its own, this does not give the desired interpolation effect: each time the source emits, we set a new target, but we don’t get closer to it over time. What you need to do is ping it at some interval so it emits values. See the example given for using Rx.timeoutPing.
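The idea of 'set a target, then move toward it on each ping' can be sketched without any reactive machinery. This is a standalone illustration and not how the library is implemented. createInterpolator, setTarget and ping are hypothetical names, and the starting value of 0 is an assumption.

```javascript
// Standalone sketch, not the ixfx implementation.
// 'createInterpolator', 'setTarget' and 'ping' are hypothetical names.
function createInterpolator(amount, initial = 0) {
  let current = initial;
  let target = initial;
  return {
    // A new source value only changes where we are heading
    setTarget(v) { target = v; },
    // Each ping moves 'amount' of the remaining distance toward the target
    ping() {
      current += (target - current) * amount;
      return current;
    }
  };
}

const interp = createInterpolator(0.5); // interpolate by 50%
interp.setTarget(10);
const steps = [ interp.ping(), interp.ping(), interp.ping() ];
// steps = [ 5, 7.5, 8.75 ]
```

Without the repeated ping calls, the value would stay wherever it was when the target last changed.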

Timing

Rx.debounce ensures some minimum waiting period between emitted values. It allows you to only emit a value after there is a mandated pause from the upstream sender.

For example, if the debounce time is 100ms, it’s only when there’s a 100ms gap between values from the source that a value will be emitted. If the source keeps sending messages faster than that, debounce won’t let any messages be emitted. It’s useful for working with the latest value from what you expect to be a burst of values, without worrying about the ones that came before.

Rx.throttle limits the rate of values, ensuring that there is only one emitted value per throttle time. For example, if the throttle time is 100ms, only one value will be emitted every 100ms. Values are not queued; it’s always the latest value which is sent.
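The difference between the two is easier to see on a fixed timeline. The sketch below replays timestamped events through simplified versions of each strategy. It is a standalone simulation, not the ixfx implementation: simulateDebounce and simulateThrottle are hypothetical helpers, and the throttle here is assumed to let a value through immediately and then ignore everything else until the interval has passed, which the text above does not spell out.

```javascript
// Standalone simulation, not the ixfx implementation.
// Events are { at: milliseconds, value } sorted by time.
function simulateDebounce(events, waitMs) {
  const emitted = [];
  for (let i = 0; i < events.length; i++) {
    const next = events[i + 1];
    // A value gets through only if nothing newer arrives within waitMs
    if (next === undefined || next.at - events[i].at >= waitMs) {
      emitted.push(events[i].value);
    }
  }
  return emitted;
}

// Assumption: values pass straight away, then the gate closes for intervalMs
function simulateThrottle(events, intervalMs) {
  const emitted = [];
  let lastEmitAt = Number.NEGATIVE_INFINITY;
  for (const e of events) {
    if (e.at - lastEmitAt >= intervalMs) {
      emitted.push(e.value);
      lastEmitAt = e.at;
    }
  }
  return emitted;
}

// A burst of three values, a pause, then a burst of two
const timeline = [
  { at: 0, value: `a` }, { at: 50, value: `b` }, { at: 80, value: `c` },
  { at: 300, value: `d` }, { at: 320, value: `e` }
];
const debounced = simulateDebounce(timeline, 100);
// debounced = [ 'c', 'e' ] (the last value of each burst)
const throttled = simulateThrottle(timeline, 100);
// throttled = [ 'a', 'd' ] (at most one value per 100ms)
```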

Ranking

Rx.min and Rx.max work great for numerical values, but nothing else. Rx.rank allows you to do a similar kind of thing - finding the ‘biggest’ or ‘smallest’ using a custom ranking function.

The ranking function must return a string to denote whether the ‘a’ or ‘b’ parameter is highest, or ‘eq’ if they are the same.

const ranker = (a, b) => {
  if (a.size > b.size) return `a`;
  else if (a.size < b.size) return `b`;
  return `eq`;
}
// Emits 'best' value whenever it changes
const rx = Rx.rank(source, ranker);
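To show how a ranking function like this decides what gets emitted, here is a standalone sketch over a plain array. It is not the ixfx implementation. rankValues is a hypothetical helper, and it assumes the incoming value is passed as ‘a’, the current best as ‘b’, and that an ‘eq’ result does not replace the current best.

```javascript
// Standalone sketch, not the ixfx implementation.
// 'rankValues' is a hypothetical helper for illustration.
function rankValues(values, ranker) {
  const emitted = [];
  let best;
  for (const v of values) {
    // Assumption: new value is 'a', current best is 'b'; 'eq' keeps the current best
    if (best === undefined || ranker(v, best) === `a`) {
      best = v;
      emitted.push(v);
    }
  }
  return emitted;
}

const bySize = (a, b) => {
  if (a.size > b.size) return `a`;
  else if (a.size < b.size) return `b`;
  return `eq`;
};

const bests = rankValues([ { size: 2 }, { size: 1 }, { size: 5 }, { size: 5 } ], bySize);
// bests = [ { size: 2 }, { size: 5 } ]
```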

Combining reactives

Rx.combineLatestToArray takes several input reactives, outputting the latest values from all sources as a combined array whenever one changes. Emitted arrays correspond in index to the sources.

// Eg. two sources that generate random numbers at intervals of 100 and 200ms.
const sources = [
  Rx.From.func(Math.random, { loop: true, interval: 100 }),
  Rx.From.func(Math.random, { loop: true, interval: 200 })
];
const r = Rx.combineLatestToArray(sources);
r.onValue(value => {
  // Value will be an array of last value from each source:
  // [ number, number ]
});

combineLatestToObject is the same, but lets you use objects to ‘label’ the sources and resulting values.

// Sources now labelled via the object
const sources = {
  fast: Rx.From.func(Math.random, { loop: true, interval: 100 }),
  slow: Rx.From.func(Math.random, { loop: true, interval: 200 })
};
const r = Rx.combineLatestToObject(sources);
// Destructured parameters need to be wrapped in parentheses
r.onValue(({ fast, slow }) => {
  // We get back 'labelled' values. 'fast' and 'slow' are numbers.
});

Etc

Rx.cloneFromFields takes an input object and outputs a clone of it, copying its fields. This is sometimes needed when passing EventArgs between streams. A simple destructure does not clone these properly, so you miss important properties. If you do use it, place it directly after the operator/source that produces the EventArgs so the subsequent operators have data.

Rx.computeWithPrevious emits a value computed from the current value and the previous value. This is used under-the-hood by Rx.interpolate.
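The general pattern of computing from the current and previous value can be sketched over a plain array. This is a standalone illustration, not the ixfx implementation. pairwiseCompute is a hypothetical helper, and skipping the first value (which has no previous value) is an assumption of this sketch, not documented library behaviour.

```javascript
// Standalone sketch, not the ixfx implementation.
// 'pairwiseCompute' is a hypothetical helper for illustration.
function pairwiseCompute(values, fn) {
  const results = [];
  let previous;
  let hasPrevious = false;
  for (const current of values) {
    // Assumption: nothing is produced until there is a previous value
    if (hasPrevious) results.push(fn(previous, current));
    previous = current;
    hasPrevious = true;
  }
  return results;
}

// Difference between each value and the one before it
const deltas = pairwiseCompute([ 1, 4, 9 ], (previous, current) => current - previous);
// deltas = [ 3, 5 ]
```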

Rx.elapsed emits the elapsed time in milliseconds since the last message.