Create custom Rx.js operator: Aggregating events for a length of time
Rx.js offers a lot of operators out of the box, but sometimes, you just have to roll your own.
That happened to me when I needed functionality to handle
navigation in an Angular application via key commands.
Specifically, I needed to listen to multi-key commands such as rev
, which were supposed to
reverse the order in a table, for example. The relevant part of the currently implemented
Rx chain looks as follows:
this.obsRef = this.keyPressObservable
.pipe(
filter(this.permitKey),
map(this.convertToString)
)
.subscribe(
this.reactToKeyPress.bind(this),
);
In short, it listens for key press events, filters out unwanted keys and then converts the
information about the key in question, the KeypressEvent
, into a unique string identifier.
It performed those actions for every keypress.
The envisioned aggregation of events over a period of time shares characteristics of
the scan()
operator, which keeps
aggregating events, the debounceTime()
operator, which suppresses events that follow each other within a time span the developer can
specify and the pairwise()
operator,
which takes subsequent pairs of events and passes them on as an array with two entries (the pair).
In a marble diagram, the functionality of our aggregate
operator looks like this:
Setting up the operator infrastructure
To start our implementation, we can use the following shell:
export function aggregate<T>(stopAggregationIn: number): (source: Observable<T>) => Observable<T[]> {
return function(source: Observable<T>): Observable<T[]> {
return new Observable( (subscriber: Subscriber<T[]>) => {
// Functionality here
});
};
}
We define a new operator, aggregate()
, which takes a number
as an argument, in analogy
to the debounceTime()
operator, which only passes on an event if it occurs after a specified
amount of time has passed between events (Rx documentation for debounceTime()).
aggregate()
is a curried function that returns a function accepting the source observable
and returning an observable of a different type, here an array with the aggregated Rx events,
on line 3. This is an example of immutability, since the observable does simply change the
value of the source observable, but instead creates a new observable, which will contain the
result of our operation.
With this basic implementation out of the way, we can get to the meat of the implementation.
Aggregating values
In a first step, I’m going to emulate the functionality of the scan operator, sort of: every
event passed in will be aggregated in an array indefinitely — the scan()
operator in contrast
passes any new event to a function also accepting the current value and returns a new value to
be “memoized” within the scan()
operator.
With this functionality, our custom operator might look like this:
export function aggregate<T>(stopAggregationIn: number): (source: Observable<T>) => Observable<T[]> {
let aggregatedEventValues: T[] = [];
return function(source: Observable<T>): Observable<T[]> {
return new Observable( (subscriber: Subscriber<T[]>) => {
source.subscribe({
next(value) {
if (value !== undefined && value !== null) {
aggregatedEventValues.push(value);
subscriber.next(aggregatedEventValues);
}
},
error(error) {
subscriber.error(error);
},
complete() {
subscriber.complete();
}
});
});
};
}
The new functionality is in lines 3 and 7-20. On line 3, we define an array of type T
, which
will be the home for the aggregated events as they come in.
The real implementation occurs
starting on line 7, where we subscribe to the source
observable with an object for the three
operator functions provided by Rx.js. The onError
and onCompleted()
events simply patch
through to the next observable in the chain (or the subscription, if we’re the last operator in
the pipe).
In the onNext
method, we ascertain that we have a valid value, and if we do, we add it to the
agggregatedEventValues
property and pass it on to the next observable on line 11.
To recap: Our implementation so far
- listens for events
- pushes (aggregates) the events in an array
- passes on the augmented array containing the new value to the next operator in the Rx chain.
- passes through any error or completion events
Aggregating values for a set period of time
With the implementation in current state, we would be aggregating forever, always passing on an ever bigger array of events, as shown in this marble diagram:
To limit the period we’re aggregating events, we need to refactor the implementation to use a timer which after elapsing sends the aggregate value, i.e. the array of events, on to the next observer and clears the aggregating array for the next sequence of events.
With those additional pieces, our final operator looks like this:
export function aggregate<T>(stopAggregationIn: number): (source: Observable<T>) => Observable<T[]> {
let aggregatedEventValues: T[] = [];
let timerRef = null;
const handleTimeout = (subscriber: Subscriber<T[]>): TimerHandler => {
return (): void => {
const keyEventsCopy = aggregatedEventValues.slice(0);
aggregatedEventValues = [];
subscriber.next(keyEventsCopy);
};
};
return function(source: Observable<T>): Observable<T[]> {
return new Observable( (subscriber: Subscriber<T[]>) => {
source.subscribe({
next(value) {
if (value !== undefined && value !== null) {
if (timerRef) clearTimeout(timerRef);
timerRef = setTimeout(handleTimeout(subscriber), stopAggregationIn);
aggregatedEventValues.push(value);
}
},
error(error) {
subscriber.error(error);
},
complete() {
subscriber.complete();
}
});
});
};
}
The timeout functionality is implemented in the onNext
method on lines 19 and 20. On line
4 we define a property timerRef
, which will keep a reference to the currently active timer
instance. Before we call setTimeout()
, we first check whether a timer exists and stop
it using the JavaScript clearTimeout()
function.
Immediately following the clearing, we restart a new timer with the duration passed to the
aggregate()
operator. As the method to call at that time, we defined a new function
handleTimeout
on lines 6-12. This also is a curried function which takes an instance of a
subscriber of type Subscriber<T[]>
and returns a function which
- makes a copy of the array holding all previous events (line 8)
- empties the original list so we reset the operator for another burst of keypresses (line 9) and, finally,
- provides the copy of the collected events as an
onNext()
event on the current observable (line 10) to its subscriber.
Compared to the previous state of the implementation, this one moves the call to onNext()
from directly after adding a new item to the
events array into a method which gets called after the timer expired. Thus, we have implemented
an asynchronous mechanism for this operator, as it takes input and at some point in the future
triggers an event — when the user has stopped typing for stopAggregationIn
milliseconds,
the parameter to our operator.
Using the operator
This whole thing came about because I needed to implement a mechanism for an Angular app that allows the use of key shortcuts for navigation and the execution of commands. In that implementation, I used an Rx chain like this:
public ngOnInit() {
this.obsRef = this.keyServiceRef.keyEventObs
.pipe(
filter(this.permitKey),
map(this.convertToString.bind(this)),
)
.subscribe(
this.reactToKeyPress.bind(this),
);
}
In short, it filters out unwanted keys and then builds a custom identifier to be used in a component to trigger actions. This works well for single-key instructions/commands, but now I’m going to extend this so that a user can blindly type a key combination and have that key combination trigger some kind of action.
So I need to add the new aggregate()
operator (line 3) into the pipe, with a collection
timeout of 300 ms between keystrokes, much like the debounceTime()
operator:
.pipe(
filter(this.permitKey),
aggregate(300),
map(this.convertToString.bind(this)),
)
and modify the convertToString()
method to now accept an array of KeyboardEvent
s instead
of a single event. Now the implementation looks like this:
public convertToString(keyEventList: KeyboardEvent[]): string {
if (keyEventList.length > 1) {
return this.generateMultiKeystrokeString(keyEventList);
} else {
const modifiers = this.modifierKeysToString(keyEventList[0]);
return `${ modifiers }-${ keyEventList[0].code }`;
}
}
public generateMultiKeystrokeString(keyEventList: KeyboardEvent[]): string {
const prefix = this.modifierKeysToString(keyEventList[0], 's-');
let keySequence = '';
for (const event of keyEventList) {
keySequence += event.key.toLowerCase();
}
return `${prefix}-${keySequence}`;
}
private modifierKeysToString(keypress: KeyboardEvent, prefix = 'k-'): string {
const modifierKeys = ['altKey', 'ctrlKey', 'shiftKey'];
let keyCode = prefix;
for (const code of modifierKeys) {
if (keypress[code]) keyCode += code.substr(0, 1);
}
return keyCode;
}
You might have noticed the this.convertToString.bind(this)
call in the pipe()
. The bind()
changes the execution context for the function passed to map()
to the class containing
the called methods in the second listing, generateMultiKeyStrokeString()
and
modifierKeysToString()
. Without this bind()
, the this
references in the previous listing
wouldn’t work.
This functionality is quickly explained:
- The
convertToString()
method now distinguishes between a multi-key command and single-key command, where multi refers to multiple keys in sequence, not keys like Ctrl, which are pressed at the same time. - When generating the ID string in
generateMultiKeystrokeString()
, we prefix the resulting string withs-
and then proceed to parse the modifier keys and build a string of the keys having been used. - The
modifierKeysToString()
method is unchanged from its previous incarnation.
In this implementation, it is noteworthy that there is an assumption that the special keys on the first key press are also valid for all subsequent events, as shown in this line (11):
const prefix = this.modifierKeysToString(keyEventList[0], 's-');
It passes only the
first event in the array to analyse the special keys and passes the s-
prefix.
The rest of the code in the last listing should be self-explanatory.
Summary
In this post, I have shown how to implement an asynchronous Rx.js operator, which aggregates
events in the Rx chain into an array with a provided timeout and then releases an event
containing an array with all the buffered events in an onNext()
event.
I have shown how to use that operator in a previous Rx.js-based implementation converting keypress events into unique strings.
The code for this implementation can be found on bitbucket.