Torsten Müller

Create custom Rx.js operator: Aggregating events for a length of time

published Feb 29th, 2020

Rx.js offers a lot of operators out of the box, but sometimes, you just have to roll your own.

That happened to me when I needed functionality to handle navigation in an Angular application via key commands. Specifically, I needed to listen to multi-key commands such as rev, which were supposed to reverse the order in a table, for example. The relevant part of the currently implemented Rx chain looks as follows:

abstract-keypress.ts
this.obsRef = this.keyPressObservable
  .pipe(
    filter(this.permitKey),
    map(this.convertToString)
  )
  .subscribe(
    this.reactToKeyPress.bind(this),
  );

In short, it listens for key press events, filters out unwanted keys and then converts the information about the key in question, the KeypressEvent, into a unique string identifier. It performed those actions for every keypress.

The envisioned aggregation of events over a period of time shares characteristics of the scan() operator, which keeps aggregating events, the debounceTime() operator, which suppresses events that follow each other within a time span the developer can specify and the pairwise() operator, which takes subsequent pairs of events and passes them on as an array with two entries (the pair). In a marble diagram, the functionality of our aggregate operator looks like this:

Marble diagram for the aggregate() operator under development

Setting up the operator infrastructure

To start our implementation, we can use the following shell:

aggregate.operator.ts
export function aggregate<T>(stopAggregationIn: number): (source: Observable<T>) => Observable<T[]> {
  return function(source: Observable<T>): Observable<T[]> {
    return new Observable( (subscriber: Subscriber<T[]>) => {
      // Functionality here
    });
  };
}

We define a new operator, aggregate(), which takes a number as an argument, in analogy to the debounceTime() operator, which only passes on an event if it occurs after a specified amount of time has passed between events (Rx documentation for debounceTime()).

aggregate() is a curried function that returns a function accepting the source observable and returning an observable of a different type, here an array with the aggregated Rx events, on line 3. This is an example of immutability, since the observable does simply change the value of the source observable, but instead creates a new observable, which will contain the result of our operation.

With this basic implementation out of the way, we can get to the meat of the implementation.

Aggregating values

In a first step, I’m going to emulate the functionality of the scan operator, sort of: every event passed in will be aggregated in an array indefinitely — the scan() operator in contrast passes any new event to a function also accepting the current value and returns a new value to be “memoized” within the scan() operator.

With this functionality, our custom operator might look like this:

aggregate.operator.ts
export function aggregate<T>(stopAggregationIn: number): (source: Observable<T>) => Observable<T[]> {

  let aggregatedEventValues: T[] = [];

  return function(source: Observable<T>): Observable<T[]> {
    return new Observable( (subscriber: Subscriber<T[]>) => {
      source.subscribe({
        next(value) {
          if (value !== undefined && value !== null) {
            aggregatedEventValues.push(value);
            subscriber.next(aggregatedEventValues);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        }
      });
    });
  };
}

The new functionality is in lines 3 and 7-20. On line 3, we define an array of type T, which will be the home for the aggregated events as they come in.

The real implementation occurs starting on line 7, where we subscribe to the source observable with an object for the three operator functions provided by Rx.js. The onError and onCompleted() events simply patch through to the next observable in the chain (or the subscription, if we’re the last operator in the pipe).

In the onNext method, we ascertain that we have a valid value, and if we do, we add it to the agggregatedEventValues property and pass it on to the next observable on line 11.

To recap: Our implementation so far

  1. listens for events
  2. pushes (aggregates) the events in an array
  3. passes on the augmented array containing the new value to the next operator in the Rx chain.
  4. passes through any error or completion events

Aggregating values for a set period of time

With the implementation in current state, we would be aggregating forever, always passing on an ever bigger array of events, as shown in this marble diagram:

Event aggregation in perpetuity

To limit the period we’re aggregating events, we need to refactor the implementation to use a timer which after elapsing sends the aggregate value, i.e. the array of events, on to the next observer and clears the aggregating array for the next sequence of events.

With those additional pieces, our final operator looks like this:

aggregate.operator.ts
export function aggregate<T>(stopAggregationIn: number): (source: Observable<T>) => Observable<T[]> {

  let aggregatedEventValues: T[] = [];
  let timerRef = null;

  const handleTimeout = (subscriber: Subscriber<T[]>): TimerHandler => {
    return (): void => {
      const keyEventsCopy = aggregatedEventValues.slice(0);
      aggregatedEventValues = [];
      subscriber.next(keyEventsCopy);
    };
  };

  return function(source: Observable<T>): Observable<T[]> {
    return new Observable( (subscriber: Subscriber<T[]>) => {
      source.subscribe({
        next(value) {
          if (value !== undefined && value !== null) {
            if (timerRef) clearTimeout(timerRef);
            timerRef = setTimeout(handleTimeout(subscriber), stopAggregationIn);

            aggregatedEventValues.push(value);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        }
      });
    });
  };
}

The timeout functionality is implemented in the onNext method on lines 19 and 20. On line 4 we define a property timerRef, which will keep a reference to the currently active timer instance. Before we call setTimeout(), we first check whether a timer exists and stop it using the JavaScript clearTimeout() function.

Immediately following the clearing, we restart a new timer with the duration passed to the aggregate() operator. As the method to call at that time, we defined a new function handleTimeout on lines 6-12. This also is a curried function which takes an instance of a subscriber of type Subscriber<T[]> and returns a function which

  1. makes a copy of the array holding all previous events (line 8)
  2. empties the original list so we reset the operator for another burst of keypresses (line 9) and, finally,
  3. provides the copy of the collected events as an onNext() event on the current observable (line 10) to its subscriber.

Compared to the previous state of the implementation, this one moves the call to onNext() from directly after adding a new item to the events array into a method which gets called after the timer expired. Thus, we have implemented an asynchronous mechanism for this operator, as it takes input and at some point in the future triggers an event — when the user has stopped typing for stopAggregationIn milliseconds, the parameter to our operator.

Using the operator

This whole thing came about because I needed to implement a mechanism for an Angular app that allows the use of key shortcuts for navigation and the execution of commands. In that implementation, I used an Rx chain like this:

abstract-keypress.ts
  public ngOnInit() {
    this.obsRef = this.keyServiceRef.keyEventObs
      .pipe(
        filter(this.permitKey),
        map(this.convertToString.bind(this)),
      )
      .subscribe(
        this.reactToKeyPress.bind(this),
      );
  }

In short, it filters out unwanted keys and then builds a custom identifier to be used in a component to trigger actions. This works well for single-key instructions/commands, but now I’m going to extend this so that a user can blindly type a key combination and have that key combination trigger some kind of action.

So I need to add the new aggregate() operator (line 3) into the pipe, with a collection timeout of 300 ms between keystrokes, much like the debounceTime() operator:

some.component.ts
  .pipe(
    filter(this.permitKey),
    aggregate(300),
    map(this.convertToString.bind(this)),
  )

and modify the convertToString() method to now accept an array of KeyboardEvents instead of a single event. Now the implementation looks like this:

abstract-keypress.ts
  public convertToString(keyEventList: KeyboardEvent[]): string {
    if (keyEventList.length > 1) {
      return this.generateMultiKeystrokeString(keyEventList);
    } else {
      const modifiers = this.modifierKeysToString(keyEventList[0]);
      return `${ modifiers }-${ keyEventList[0].code }`;
    }
  }

  public generateMultiKeystrokeString(keyEventList: KeyboardEvent[]): string {
    const prefix = this.modifierKeysToString(keyEventList[0], 's-');
    let keySequence = '';
    for (const event of keyEventList) {
      keySequence += event.key.toLowerCase();
    }
    return `${prefix}-${keySequence}`;
  }

  private modifierKeysToString(keypress: KeyboardEvent, prefix = 'k-'): string {
    const modifierKeys = ['altKey', 'ctrlKey', 'shiftKey'];
    let keyCode = prefix;
    for (const code of modifierKeys) {
      if (keypress[code]) keyCode += code.substr(0, 1);
    }
    return keyCode;
  }

You might have noticed the this.convertToString.bind(this) call in the pipe(). The bind() changes the execution context for the function passed to map() to the class containing the called methods in the second listing, generateMultiKeyStrokeString() and modifierKeysToString(). Without this bind(), the this references in the previous listing wouldn’t work.

This functionality is quickly explained:

  1. The convertToString() method now distinguishes between a multi-key command and single-key command, where multi refers to multiple keys in sequence, not keys like Ctrl, which are pressed at the same time.
  2. When generating the ID string in generateMultiKeystrokeString(), we prefix the resulting string with s- and then proceed to parse the modifier keys and build a string of the keys having been used.
  3. The modifierKeysToString() method is unchanged from its previous incarnation.

In this implementation, it is noteworthy that there is an assumption that the special keys on the first key press are also valid for all subsequent events, as shown in this line (11):

const prefix = this.modifierKeysToString(keyEventList[0], 's-');

It passes only the first event in the array to analyse the special keys and passes the s- prefix.

The rest of the code in the last listing should be self-explanatory.

Summary

In this post, I have shown how to implement an asynchronous Rx.js operator, which aggregates events in the Rx chain into an array with a provided timeout and then releases an event containing an array with all the buffered events in an onNext() event.

I have shown how to use that operator in a previous Rx.js-based implementation converting keypress events into unique strings.

The code for this implementation can be found on bitbucket.