Skip to content

Shaping

Transform

Rx.transform transforms values one-by-one, like the ‘map’ function of arrays.

For example, lets say we have source which emits objects that contain many properties, include a ‘name’. But we want a stream of just the name:

// Create a reactive that only emits the name value of objects
const rx = Rx.transform(source, v => v.name);

While it’s usual that transform works with the input value in some capacity, there are cases where you want to ignore it. For example, you could use transform to generate a random value every time someone clicks. In this case, we don’t care about the click event itself, we’re just using it to trigger a function.

Annotate

If you’re working with a stream of objects and you only want to append to them, rather than transform, Rx.annotate might be preferred. In a type-hinted way, it allows you to attach some extra properties using an annotation function. Emitted values have the shape of { value: TIn, annotation: TAnnotation }. Meaning that the original value is stored under .value, and the annotation under .annotation

let count = 0;
// Attach an incrementing 'count' field to all values
const annotated = Rx.annotate(source, v=> { count: count++ });
// Output of 'annotated' will be: { value: <type of source>, annotation: { count: number }}

Rx.annotateWithOp has a similar purpose, but rather than using the result of an annotation function, we use the output of another operator. The operator gets the value from the source stream as input.

const r1 = Rx.run(
// Emit values from an array
Rx.From.array([ 1, 2, 3 ]),
// Annotate with 'sum' operator
Rx.Ops.annotateWithOp(Rx.Ops.sum())
);
const data = await Rx.toArray(r1);
// Data = [ { value: 1, annotation: 1 }, { value: 2, annotation: 3 }, { value: 3, annotation: 6 } ]

annotateWithOp allows you to make use of an operator without losing the original flow of values. If you want a ‘cleaner’ data structure, instead of the { value, annotation } form, you could use a transform operator afterwards to fold the annotation into the value as you like.

Chunk

Rx.chunk consumes several values from the source, emitting them as an array. Its options let you set the chunk size by quantity and/or elapsed time, and whether to discard values that don’t complete a chunk. This is useful for collecting a batch of data, easily supporting ‘change over time’ sort of processing.

Example

Here’s a complete example, using transform, annotate and chunk operators.

import * as Rx from 'ixfx/rx.js';
const moveRx = Rx.run(
// Subscribe to 'pointermove'
Rx.From.event(document, `pointermove`),
// Grab just x & y
Rx.Ops.transform(move => ({ x: move.x, y: move.y })),
// Annotate with time
Rx.Ops.annotate(v => ({ time: performance.now() })),
// Grab sets of five events
Rx.Ops.chunk({ quantity: 5 })
);
moveRx.onValue(arr => {
// arr will be an array of { value: { x:number, y:number }, annotation: { time:number }}
});

Etc.

Rx.cloneFromFields takes an input object and outputs a clone of the objects copying its fields. This is sometimes needed when passing EventArgs between streams. These are not cloned using a simple destructure, so you miss important properties. If you do use it, place it directly after the operator/source that produces the EventArgs so the subsequent operators have data.