Operators
Operators work by subscribing to values from an upstream source reactive and emitting values.
Operators are usually used with Rx.run or Rx.wrap, discussed in Usage.
You can also use operators manually. Give an operator a source and, usually, some options. It returns a new reactive which can be subscribed to or processed further.
Changing shape
Rx.transform transforms values one-by-one, like the ‘map’ function on an array.
For example, let’s say we have a source which emits objects that contain many properties, including a ‘name’, but we want a stream of just the name.
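As a minimal sketch of the idea, the following uses a plain array and map rather than a reactive, so it does not call the library. The Person type and the sample values are made up for illustration.

```typescript
// Hypothetical shape of the values emitted by the source
type Person = { name: string; age: number; city: string };

// Stand-in for values arriving one-by-one from a source
const people: Person[] = [
  { name: "Ada", age: 36, city: "London" },
  { name: "Grace", age: 45, city: "New York" },
];

// The transform function: called once per value, returns the new shape
const toName = (p: Person): string => p.name;

const names = people.map(toName);
// names is ["Ada", "Grace"]
```

A transform function of this kind is what would be handed to the operator; the operator takes care of calling it for each value the source emits.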
If you’re working with a stream of objects and you only want to append to them, rather than transform, Rx.annotate might be preferred. In a type-hinted way, it allows you to attach some extra properties using an annotation function.
Rx.annotateWithOp has a similar purpose, but rather than using the result of an annotation function, it uses the output of another operator, which is paired with each object. Emitted values have the shape { value: TIn, annotation: TAnnotation }, meaning that the original value is stored under .value and the annotation under .annotation.
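To make that shape concrete, here is a small sketch that pairs each number with a running sum. The running-sum helper is a hypothetical stand-in for "another op", and nothing here calls the library.

```typescript
// Shape described in the text above
type Annotated<TIn, TAnnotation> = { value: TIn; annotation: TAnnotation };

// Hypothetical stand-in for another operator: a running sum over numbers
const makeRunningSum = () => {
  let total = 0;
  return (v: number): number => (total += v);
};

const sumOp = makeRunningSum();

// Each input is kept under .value, the op's output goes under .annotation
const annotated: Annotated<number, number>[] = [1, 2, 3].map((value) => ({
  value,
  annotation: sumOp(value),
}));
// annotated[2] is { value: 3, annotation: 6 }
```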
Rx.chunk consumes several values from the source, emitting them as an array. Its options let you set the chunk size by quantity and/or elapsed time, and whether to discard values that don’t complete a chunk.
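The quantity-based behaviour can be sketched over a plain array. The option names below (quantity, keepRemainder) are invented for this sketch and are not the library's option names; chunking by elapsed time is left out.

```typescript
// Hypothetical options for this sketch only
type ChunkSketchOptions = { quantity: number; keepRemainder: boolean };

const chunkByQuantity = <T>(values: readonly T[], opts: ChunkSketchOptions): T[][] => {
  const chunks: T[][] = [];
  let current: T[] = [];
  for (const v of values) {
    current.push(v);
    // Emit the chunk as soon as it reaches the requested size
    if (current.length === opts.quantity) {
      chunks.push(current);
      current = [];
    }
  }
  // Values that never completed a chunk are either kept or discarded
  if (current.length > 0 && opts.keepRemainder) chunks.push(current);
  return chunks;
};

const kept = chunkByQuantity([1, 2, 3, 4, 5], { quantity: 2, keepRemainder: true });
// kept is [[1, 2], [3, 4], [5]]
const discarded = chunkByQuantity([1, 2, 3, 4, 5], { quantity: 2, keepRemainder: false });
// discarded is [[1, 2], [3, 4]]
```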
Filtering
Rx.filter works as you’d expect of a filter function. Values are passed through a predicate function. If the predicate returns true for a value, it is emitted, otherwise it is ignored.
Rx.drop is the opposite of Rx.filter, instead ignoring values for which the predicate returns true.
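The relationship between the two can be sketched with the same predicate applied to a plain array. This is an analogy only and does not call the library.

```typescript
// A predicate: returns true for values of interest
const isEven = (v: number): boolean => v % 2 === 0;

const incoming = [1, 2, 3, 4, 5, 6];

// filter-style: values pass through when the predicate returns true
const filtered = incoming.filter(isEven);
// filtered is [2, 4, 6]

// drop-style: values are ignored when the predicate returns true
const remaining = incoming.filter((v) => !isEven(v));
// remaining is [1, 3, 5]
```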
Rx.field lets you pluck out a single field from a reactive stream of objects. It also lets you specify fallback options for when the field is missing.
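As a rough illustration of plucking with a fallback, the sketch below works on a plain array. The Reading type, the nameOf helper and its fallback parameter are hypothetical; the library's actual option names are not shown here.

```typescript
// Hypothetical object shape where the field of interest may be missing
type Reading = { name?: string; level: number };

// Returns the field if present, otherwise the supplied fallback value
const nameOf = (r: Reading, fallback: string): string => r.name ?? fallback;

const readings: Reading[] = [{ name: "a", level: 1 }, { level: 2 }];
const plucked = readings.map((r) => nameOf(r, "unknown"));
// plucked is ["a", "unknown"]
```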
Math
- Rx.average emits the running average value. Non-numerical values are ignored.
- Rx.min / Rx.max emit the running minimum/maximum. Non-numerical values are ignored.
- Rx.sum emits the running numerical sum. Non-numerical values are ignored.
- Rx.tally emits the count of values that have been emitted.
- Rx.count emits an incrementing number for every input value, starting at 0.
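What "running" means for these operators can be sketched over a plain array. This sketch assumes that an ignored value produces no output at all and that NaN counts as non-numerical; both are assumptions, and the code does not call the library.

```typescript
type Stats = { sum: number; average: number; min: number; max: number };

const runningStats = (values: readonly unknown[]): Stats[] => {
  const snapshots: Stats[] = [];
  let sum = 0;
  let seen = 0;
  let min = Infinity;
  let max = -Infinity;
  for (const v of values) {
    // Assumption: non-numerical values (and NaN) are skipped entirely
    if (typeof v !== "number" || Number.isNaN(v)) continue;
    seen++;
    sum += v;
    min = Math.min(min, v);
    max = Math.max(max, v);
    // One snapshot per numerical value, reflecting everything seen so far
    snapshots.push({ sum, average: sum / seen, min, max });
  }
  return snapshots;
};

const stats = runningStats([10, "skip", 20, null, 60]);
// stats[2] is { sum: 90, average: 30, min: 10, max: 60 }
```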
Interpolation
Rx.interpolate interpolates toward the last value emitted from the source.
The returned reactive only emits a value when the source emits a value. Because of this, you don’t really get the desired interpolation effect on its own: when the source emits, a new target is set, but the output does not get closer to it over time. What you need to do is ping it at some interval so it emits values. See the example given for using Rx.timeoutPing.
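Why pinging matters can be seen in a small sketch where each ping moves the output part of the way toward the target. The step function and the fixed amount of 0.5 are assumptions made for illustration; the library's interpolation may behave differently.

```typescript
// Moves `from` a fraction of the remaining distance toward `to`
const step = (from: number, to: number, amount: number): number =>
  from + (to - from) * amount;

let position = 0;
const goal = 100; // stand-in for the last value emitted by the source
const pinged: number[] = [];

// Each loop iteration stands in for one ping; without pings, nothing moves
for (let ping = 0; ping < 3; ping++) {
  position = step(position, goal, 0.5);
  pinged.push(position);
}
// pinged is [50, 75, 87.5]
```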
Timing
Rx.debounce ensures some minimum waiting period between emitted values. It allows you to only emit a value after there is a mandated pause from the upstream sender.
For example, if the debounce time is 100ms, it’s only when there’s a 100ms gap between values from the source that a value will be emitted. If the source keeps sending messages faster than that, debounce won’t let any messages be emitted. It’s useful for working with the latest value from what you expect to be a burst of values, without worrying about the ones that came before.
Rx.throttle limits the rate of values, ensuring that there is only one emitted value per throttle time. For example, if the throttle time is 100ms, only one value will be emitted every 100ms. Values are not queued; it’s always the latest value which is sent.
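The difference between the two is easier to see on a fixed timeline. The sketch below replays timestamped values through simplified versions of each idea. It assumes the source has finished (so the final value counts as followed by a pause) and that throttling lets the first value of each window through; neither function is the library's implementation.

```typescript
type Timed<T> = { at: number; value: T }; // `at` is in milliseconds

// Debounce: a value is emitted only if no newer value arrives within `ms`
const debounceSketch = <T>(events: readonly Timed<T>[], ms: number): T[] =>
  events
    .filter((e, i) => {
      const next = events[i + 1];
      return next === undefined || next.at - e.at >= ms;
    })
    .map((e) => e.value);

// Throttle: at most one value per `ms`; values arriving too soon are discarded, not queued
const throttleSketch = <T>(events: readonly Timed<T>[], ms: number): T[] => {
  const passed: T[] = [];
  let lastEmit = -Infinity;
  for (const e of events) {
    if (e.at - lastEmit >= ms) {
      passed.push(e.value);
      lastEmit = e.at;
    }
  }
  return passed;
};

const burst: Timed<string>[] = [
  { at: 0, value: "a" },
  { at: 30, value: "b" },
  { at: 60, value: "c" },
  { at: 300, value: "d" },
];
const debounced = debounceSketch(burst, 100); // ["c", "d"]
const throttled = throttleSketch(burst, 100); // ["a", "d"]
```

With debounce, only the value at the end of the burst survives; with throttle, the burst is thinned to one value per window.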
Ranking
Rx.min and Rx.max work great for numerical values, but nothing else. Rx.rank allows you to do a similar kind of thing - finding the ‘biggest’ or ‘smallest’ using a custom ranking function.
The ranking function must return a string to denote whether the ‘a’ or ‘b’ parameter is highest, or ‘eq’ if they are the same.
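A ranking function of that form might look like the sketch below, which ranks hypothetical Shape objects by size. The runningHighest helper only illustrates how such a function could be used to track a leader; the order in which the library passes the new and current values is an assumption.

```typescript
type RankResult = "a" | "b" | "eq";
type Shape = { label: string; size: number };

// Ranking function: reports which of the two parameters is highest
const bySize = (a: Shape, b: Shape): RankResult =>
  a.size > b.size ? "a" : a.size < b.size ? "b" : "eq";

// Records each value that outranks the current leader (ties do not replace it)
const runningHighest = <T>(values: readonly T[], rank: (a: T, b: T) => RankResult): T[] => {
  const leaders: T[] = [];
  for (const v of values) {
    // Assumption: the new value is passed as `a`, the current leader as `b`
    if (leaders.length === 0 || rank(v, leaders[leaders.length - 1] as T) === "a") {
      leaders.push(v);
    }
  }
  return leaders;
};

const shapes: Shape[] = [
  { label: "s1", size: 2 },
  { label: "s2", size: 5 },
  { label: "s3", size: 3 },
  { label: "s4", size: 5 },
  { label: "s5", size: 9 },
];
const leaderLabels = runningHighest(shapes, bySize).map((s) => s.label);
// leaderLabels is ["s1", "s2", "s5"]
```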
Combining reactives
Rx.combineLatestToArray takes several input reactives, outputting the latest values from all sources as a combined array whenever one changes. Emitted arrays correspond in index to the sources.
Rx.combineLatestToObject is the same, but lets you use objects to ‘label’ the sources and resulting values.
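The array form can be sketched by replaying a list of updates, each tagged with the index of the source it came from. This sketch assumes a source that has not yet emitted shows up as undefined; how the library treats that case is not covered here, and the Update type is invented for illustration.

```typescript
// An update from one of the sources, identified by its index
type Update<T> = { source: number; value: T };

const combineLatestSketch = <T>(
  sourceCount: number,
  updates: readonly Update<T>[]
): (T | undefined)[][] => {
  const latest: (T | undefined)[] = new Array<T | undefined>(sourceCount).fill(undefined);
  const combined: (T | undefined)[][] = [];
  for (const u of updates) {
    latest[u.source] = u.value;
    // Copy, so later updates do not alter arrays that were already emitted
    combined.push([...latest]);
  }
  return combined;
};

const combinedValues = combineLatestSketch<number>(2, [
  { source: 0, value: 1 },
  { source: 1, value: 10 },
  { source: 0, value: 2 },
]);
// combinedValues is [[1, undefined], [1, 10], [2, 10]]
```

The object form follows the same logic, with the array indexes replaced by the labels given to each source.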
Etc
Rx.cloneFromFields takes an input object and outputs a clone of it by copying its fields. This is sometimes needed when passing EventArgs between streams. These are not fully cloned by a simple destructure, so important properties go missing. If you do use it, place it directly after the operator/source that produces the EventArgs so that subsequent operators have the data.
Rx.computeWithPrevious emits a value computed from the current value and the previous value. This is used under-the-hood by Rx.interpolate.
Rx.elapsed emits the elapsed time in milliseconds since the last message.