Annotate values with output from the annotation
function.
Returned values will be in the form { value:TIn, annotation:TAnnotation }
Optional
options: Partial<OpMathOptions>Accumulate a chunk of values, emitted as an array
Pluck and emit a single field from values
Throws away values that don't match predicate
Optional
options: Partial<OpMathOptions>Optional
options: Partial<OpMathOptions>Listen for values
Converts one source stream into two, with values being emitted by both
Optional
options: Partial<SplitOptions>Creates new streams for each case
Optional
options: Partial<OpMathOptions>Creates new streams for each case, sending values to the stream if they match the filter predicate
Emits values when this stream and any additional streams produce a value. The resulting stream is thus an array of values, each source at a given index. Waits to output a value until each stream has produced a value. Thus, the pace is determined by the slowest stream.
Optional
options: Partial<Rx.SyncOptions>Optional
options: { Optional
options: Partial<TallyOptions>Rest
...ops: ReactiveOp<TIn, TOut>[]Taps the stream, passing values to one or more 'processor' functions. This processing essentially happens in parallel, not affecting the main stream.
// Stream of pointermove events with {x:0,y:0} as default
const move = Rx.From.event(document.body, `pointermove`, {x:0,y:0});
// Wrap it for fluent access
const ptr = Rx.wrap(move)
.tapProcess(
// Create a string representation
v => `${v.x},${v.y}`
// Set to DOM
v => {
document.getElementById(`coords`).innerText = v;
}
)
.onValue(value => {
// 'value' will be original PointerEvent, since .tapProcess happened in parallel,
// not affecting stream
});
Only allow values through if a minimum of time has elapsed. Throws away values. Ie. converts a fast stream into a slower one.
'Pings' reactive (if it supports it) if a value is not received within a given interval. Behaviour can be stopped using an abort signal.
Emits a value if source
does not emit a value after interval
has elapsed. This can be useful to reset a reactive to some
'zero' state if nothing is going on.
If source
emits faster than the interval
, it won't get triggered.
Default for 'timeout': 1000s.
// Emit 'hello' if 'source' doesn't emit a value after 1 minute
const r = Rx.timeoutValue(source, { value: 'hello', interval: { mins: 1 } });
Can also emit results from a function or generator
// Emits a random number if 'source' doesn't emit a value after 500ms
const r = Rx.timeoutValue(source, { fn: Math.random, interval: 500 });
If immediate
option is true (default), the timer starts from stream initialisation.
Otherwise it won't start until it observes the first value from source
.
Copies values from source into an array.
Copies values from source into an array, throwing an error if expected number of items is not reached
Transforms all values
Wrapped Reactive for object-oriented access