Torsten Müller

Rx.js paradigm: resolving nested observables and side effects

published Apr 12th, 2020

Working with other people is great: You get to learn how other people think about problems and their solutions, learn new approaches and sometimes get to provide some insight into the way you think about certain things. That’s what happened to me a few times recently when it came to Rx.js, a technology near and dear to my heart.

In this post, I’m going to write about and discuss two approaches I saw that I consider to be problematic. I’ll describe the original code, why I see problems with it and how I would refactor it.

Nested Rx chains

In a recent class on web development, a participant asked about an Rx.js implementation he had created at his company and remarked that, in his view, Rx.js leads to unreadable and intertwined code. The example he showed me went something like this (abbreviated for clarity, with the topic changed so as not to identify the company):

v1-nested-observables.ts
let activeUser: User;
let activeBankList: Bank[];
let activeAccountInformation: Account[];

const userService = new UserService();
const bankService = new BankService();
const accountService = new AccountInformationService();

userService.getUserByEmail(userEmail)
  .subscribe( (user: User) => {
    activeUser = user;
    if (!activeUser.disabled) {
      bankService.getBanksForUser(user)
        .subscribe( (bankInfo: Bank[]) => {
          activeBankList = bankInfo;
          accountService.getAccountsForUserBanks(activeUser, bankInfo)
            .subscribe((allAccounts: Account[]) => {
              activeAccountInformation = allAccounts;
              // logic based on user here
            })
        })
    }
  });

To summarize this implementation in words:

  1. We have a UserService, which retrieves the user’s information based on an email address
  2. If the returned user has access (is not “disabled”), another API call retrieves the banking information for that user
  3. Once the banking information is retrieved, we load all the active accounts for that user.
  4. Along the way, we store every piece of data we retrieve in this chain of API calls in variables outside the chain. In this example they live in the global scope, but they could, of course, also be class properties.

It’s the Rx.js equivalent of the callback hell we used to have before Promises and Rx, where the result of one call was used to perform another, all in one long, nested structure. For several reasons, including code clarity, unit testing and the dependency of all nested calls on one another, this is less than optimal. So in a first step, let’s get rid of the nesting in the aforementioned code using Rx paradigms.

Not addressing the variable declarations for now, which stay the same, we arrive at the following implementation:

v2-flatMaps.ts
userService.getUserByEmail(userEmail)
  .pipe(
    flatMap((user: User) => {
      // activeUser has not been assigned at this point, so pass the user we just received
      if (user.disabled) throw( new DisabledUserException(user) );
      else {
        activeUser = user;
        return bankService.getBanksForUser(user);
      }
    }),
    flatMap((bankInfo: Bank[]) => {
      activeBankList = bankInfo;
      return accountService.getAccountsForUserBanks(activeUser, bankInfo)
    }),
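    catchError((err) => {
      // Handle errors, such as unauthorized users or non-existing records.
      // catchError has to return an Observable or throw; rethrowing (or throwing
      // a mapped error) passes the failure on to the subscriber.
      throw err;
    })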
  )
  .subscribe((accountInfo: Account[]) => {
    // Processing goes here
  });

Here, I’ve flattened the Rx chain and gotten rid of the nesting by using the Rx.js flatMap operator (an alias for mergeMap). I could have used the switchMap operator here as well with the same result, since the chain only ever processes one result. If you want to learn more about the difference between the two operators, you may want to read a thread about flatMap vs. switchMap on stackoverflow.com.
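To make the difference between the two operators a bit more tangible, here is a minimal sketch of what happens once the source emits more than one value. It is not part of the original code: the collect() helper and the Subject names are made up for illustration, and I use mergeMap, the operator that flatMap is an alias for.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { mergeMap, switchMap } from 'rxjs/operators';

type Lookup = (key: string) => Observable<string>;

// Hypothetical helper: pushes two keys through the given flattening operator,
// lets both inner streams answer afterwards, and records what the subscriber sees.
function collect(flatten: (lookup: Lookup) => OperatorFunction<string, string>): string[] {
  const requests = new Subject<string>();
  const responses: Record<string, Subject<string>> = {
    a: new Subject<string>(),
    b: new Subject<string>(),
  };
  const seen: string[] = [];

  requests
    .pipe(flatten((key) => responses[key]))
    .subscribe((value) => seen.push(value));

  requests.next('a');               // first "request" starts
  requests.next('b');               // second one starts before the first has answered
  responses.a.next('answer for a');
  responses.b.next('answer for b');
  return seen;
}

const merged = collect((lookup) => mergeMap(lookup));
const switched = collect((lookup) => switchMap(lookup));

console.log(merged);   // [ 'answer for a', 'answer for b' ]
console.log(switched); // [ 'answer for b' ]
```

mergeMap keeps every inner subscription alive, while switchMap unsubscribes from the previous inner observable as soon as a new source value arrives. With a single source value, as in the chain above, that difference never shows.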

With this first step, the nested implementation has become a sequence of operators within a single .pipe() call. What remains for now is the side effect of storing the data retrieved by the various service calls in global variables.

The other enhancement in this code is the addition of error handling. I introduced a catchError operator, to which we provide a function that handles any error we might receive from any of the APIs, such as 401 Unauthorized or 404 Not Found errors.
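One detail worth spelling out: the function given to catchError has to either return an Observable (to recover) or throw (to pass on a possibly translated error). The following is only a sketch under assumptions of my own: DomainError, toDomainError() and failingLookup() are hypothetical names, and the failing "API call" is simulated.

```typescript
import { Observable, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

// Hypothetical project-level error, invented for this sketch.
interface DomainError {
  code: 'UNAUTHORIZED' | 'NOT_FOUND' | 'UNKNOWN';
  cause: unknown;
}

// Translates a raw failure carrying an HTTP-like `status` into a DomainError.
function toDomainError(err: unknown): DomainError {
  const status = (err as { status?: number } | null)?.status;
  if (status === 401) return { code: 'UNAUTHORIZED', cause: err };
  if (status === 404) return { code: 'NOT_FOUND', cause: err };
  return { code: 'UNKNOWN', cause: err };
}

// Stand-in for an API call that fails with the given status.
function failingLookup(status: number): Observable<string> {
  return new Observable<string>((subscriber) => subscriber.error({ status }));
}

const outcomes: string[] = [];

// Option 1: map the raw error and rethrow it; the subscriber's error handler runs.
failingLookup(404)
  .pipe(catchError((err) => { throw toDomainError(err); }))
  .subscribe({
    next: (value) => outcomes.push(`value: ${value}`),
    error: (err: DomainError) => outcomes.push(`error: ${err.code}`),
  });

// Option 2: recover by returning a replacement Observable.
failingLookup(500)
  .pipe(catchError(() => of('fallback')))
  .subscribe((value) => outcomes.push(`value: ${value}`));

console.log(outcomes); // [ 'error: NOT_FOUND', 'value: fallback' ]
```

Which of the two options is appropriate depends on whether the rest of the chain can do anything sensible with a fallback value.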

Doing pretty well here, so now let’s get rid of the side effects. The lack of side-effects is one of the hallmarks of clean, reusable functional code, enabling us to pass functions/methods to solve the same problems in multiple locations.

With side effects, i.e. reading or writing variables outside the function’s scope, we tie the function tightly to its environment and thus prevent easy reuse of our code. Side-effect-free code is also easier to reason about and lets us reuse the same function in various situations, without first having to set up each environment with the expected properties and then having to pass the context using JavaScript’s Function.prototype.bind(). Here is the reworked example, getting rid of the side effects:

v3-populate-object.ts
class UserAccountInfo {

  public user: User;
  public bankList: Bank[];
  public accountInfo: Account[];

  constructor(user: User) {
    this.user = user;
  }
}

userService.getUserByEmail(userEmail)
  .pipe(
    map((user: User) => {
      if (user.disabled) throw( new DisabledUserException(user) );
      else return new UserAccountInfo(user);
    }),
    flatMap((userAcct: UserAccountInfo) => {
      return bankService.getBanksForUser(userAcct.user)
        .pipe(
          map((bankInfo: Bank[]) => {
            userAcct.bankList = bankInfo;
            return userAcct;
          })
        );
    }),
    flatMap((userAcct: UserAccountInfo) => {
      return accountService.getAccountsForUserBanks(userAcct.user, userAcct.bankList)
        .pipe(map((acct: Account[]) => {
          userAcct.accountInfo = acct;
          // Return the container, not the raw accounts: the subscriber expects a UserAccountInfo
          return userAcct;
        }))
    }),
    catchError((err) => {
      // Processing of any error, or mapping, occurs here.
      // catchError has to return an Observable or throw, so pass the (mapped) error on.
      throw err;
    })
  )
  .subscribe((userAccountInfo: UserAccountInfo) => {
    // Final state changes here
  });

In this segment, a few things happen:

  1. I introduced a new “data container” called UserAccountInfo, which is essentially a container for the data retrieved in this example. It contains properties for the user, the corresponding list of banks as well as all the accounts.
  2. The initial call to getUserByEmail() is not changed and returns the same data, but
  3. the following map() operator takes the information from the API call, the User object, and converts it into an object of type UserAccountInfo. If the user is disabled, it throws an error and thus bypasses all the other operators except catchError().
  4. The following flatMap executes a method that returns another Observable, passing the user attribute of our just generated UserAccountInfo object. The method then takes the passed UserAccountInfo object and augments it with the banking information for the user.
  5. The previous mechanism repeats, this time loading and storing the account information in the UserAccountInfo class.
  6. Thus far, we have had no side effects anywhere; we have only built up an object with the results of the various API calls.
  7. That object now gets passed to the method specified for the onNext event handler in the subscription. This is the place where we implement all the side-effects that we need in response to the various API calls.

One thing you will probably object to is that the functions still reach for the various services in their environment when calling methods on them, which is a side effect in its own right. We can get around that shortcoming by currying, turning something like this:

acct-side-effect.ts
const aua = function(userAcct: UserAccountInfo) {
  return accountService.getAccountsForUserBanks(userAcct.user, userAcct.bankList)
    .pipe(map((acct: Account[]) => {
      userAcct.accountInfo = acct;
      return userAcct;
    }))
}

into something like this (variable names shortened to keep lines short for readability):

acct-side-effect-free.ts
const aua = function(service: AccountService): (user: UserAccountInfo) => Observable<UserAccountInfo> {
  return (userAcct: UserAccountInfo): Observable<UserAccountInfo> => {
    return service.getAccountsForUserBanks(userAcct.user, userAcct.bankList)
      .pipe(map((acct: Account[]) => {
        userAcct.accountInfo = acct;
        return userAcct;
      }))
  }
}

This curried function accepts an instance of the API service used to retrieve accounts on line 1. It then returns a new function that accepts a UserAccountInfo object and performs an API call on the service passed as the parameter in line 1. This way, the function no longer relies on variables from its environment and thus becomes more universally usable.
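A practical payoff of this shape is that the function can be exercised in a unit test with a hand-written stub instead of the real service. The sketch below rests on assumptions of my own: the User, Bank, Account and UserAccountInfo types are simplified stand-ins, and AccountLookup is a hypothetical interface that names only the one method the function needs.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Simplified stand-ins for the types used in this post (assumed shapes).
interface User { email: string; }
interface Bank { name: string; }
interface Account { iban: string; }
interface UserAccountInfo { user: User; bankList: Bank[]; accountInfo: Account[]; }

// Hypothetical interface: the only part of the account service the function uses.
interface AccountLookup {
  getAccountsForUserBanks(user: User, banks: Bank[]): Observable<Account[]>;
}

// Same logic as the curried function above, written with arrow functions.
const aua = (service: AccountLookup) =>
  (userAcct: UserAccountInfo): Observable<UserAccountInfo> =>
    service.getAccountsForUserBanks(userAcct.user, userAcct.bankList).pipe(
      map((acct: Account[]) => {
        userAcct.accountInfo = acct;
        return userAcct;
      })
    );

// A stub that records how it was called and answers synchronously.
const calls: Array<{ user: User; banks: Bank[] }> = [];
const stubService: AccountLookup = {
  getAccountsForUserBanks(user, banks) {
    calls.push({ user, banks });
    return of([{ iban: 'TEST-1' }]);
  },
};

const input: UserAccountInfo = {
  user: { email: 'ada@example.com' },
  bankList: [{ name: 'Test Bank' }],
  accountInfo: [],
};

const results: UserAccountInfo[] = [];
aua(stubService)(input).subscribe((info) => results.push(info));

console.log(results[0].accountInfo); // [ { iban: 'TEST-1' } ]
console.log(calls.length);           // 1
```

No HTTP mocking or global setup is involved; everything the function touches is handed to it as an argument.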

Getting there! There is just one eyesore left in this code: it is not as self-explanatory as it could be. You need to read the bodies of the functions passed to the flatMap() operators to understand what they are doing. In addition, the code structure mixes various functionality in one Rx chain. Wouldn’t it be nice to have the code be more expressive in what it does?

Then take a look at this:

final.ts
userService.getUserByEmail(userEmail)
  .pipe(
    map(Ops.instantiateUserAccountInfo),
    flatMap(Ops.getBanksForUser(bankService)),
    flatMap(Ops.getAccountsForUser(accountService)),
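    catchError((err) => {
      // Intermediate error handling and mapping.
      // catchError has to return an Observable or throw; rethrowing hands the
      // (mapped) error to the error handler in the subscription below.
      throw err;
    })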
  )
  .subscribe(
    (userAccountInfo: UserAccountInfo) => { /* Process result, state change */ },
    (err) => { /* Error handling for the entire chain happens here */}
  );

Here, Ops is an object (with a short name to fit this space) which now houses the side-effect free methods we’ve developed in the previous step. So this implementation changes the following characteristics of the original code:

  1. The population of “global” variables during the processing has been replaced by passing a result object from operator to operator
  2. The result object UserAccountInfo gets passed to the subscription, where all the processing happens, including side effects on the state of the system.
  3. The passing of a results object allows us to have side-effect free and thus easily testable methods (I will return to this side-effect free point shortly)
  4. Universal error handling (or mapping) was introduced through the catchError() operator, which is intended to map raw HTTP errors to standard errors we define ourselves, in the hope that these are more explanatory, easier to interpret and in line with project standards.
  5. We are able to group the functionality the operators provide in a nicely organized class, which helps with code organization and with other people understanding what is going on. This should go a long way towards writing cleaner, more understandable and less error-prone code, and it will certainly make code reviews easier.
  6. The descriptive method names communicate even to the casual reader, or a newly onboarded developer, what is happening in this Rx chain by reading it from top to bottom — in contrast to the first version, where the user had to read every line to even get a rough idea of what’s going on.
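The post does not show Ops itself, so here is one possible shape for it, as a sketch only. Several things are assumptions of mine: the simplified types, the BankLookup and AccountLookup interfaces, the plain Error standing in for DisabledUserException, and the stub services. I also use mergeMap (the operator flatMap is an alias for) and return a new object from each step instead of mutating the one passed in, which differs from the earlier listings.

```typescript
import { Observable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Simplified stand-ins for the types used in this post (assumed shapes).
interface User { email: string; disabled: boolean; }
interface Bank { name: string; }
interface Account { iban: string; }
interface UserAccountInfo { user: User; bankList: Bank[]; accountInfo: Account[]; }

// Hypothetical interfaces describing only what the operators need.
interface BankLookup { getBanksForUser(user: User): Observable<Bank[]>; }
interface AccountLookup {
  getAccountsForUserBanks(user: User, banks: Bank[]): Observable<Account[]>;
}

const Ops = {
  // Wraps the user in the result container; disabled users abort the chain.
  instantiateUserAccountInfo(user: User): UserAccountInfo {
    if (user.disabled) throw new Error(`User ${user.email} is disabled`);
    return { user, bankList: [], accountInfo: [] };
  },
  // Curried: takes the service first, then the container flowing through the chain.
  getBanksForUser: (service: BankLookup) =>
    (info: UserAccountInfo): Observable<UserAccountInfo> =>
      service.getBanksForUser(info.user)
        .pipe(map((bankList) => ({ ...info, bankList }))),
  getAccountsForUser: (service: AccountLookup) =>
    (info: UserAccountInfo): Observable<UserAccountInfo> =>
      service.getAccountsForUserBanks(info.user, info.bankList)
        .pipe(map((accountInfo) => ({ ...info, accountInfo }))),
};

// Stub services so the sketch runs without a backend.
const bankService: BankLookup = { getBanksForUser: () => of([{ name: 'Test Bank' }]) };
const accountService: AccountLookup = {
  getAccountsForUserBanks: () => of([{ iban: 'TEST-1' }]),
};

const log: string[] = [];

function load(user: User): void {
  of(user) // stands in for userService.getUserByEmail(userEmail)
    .pipe(
      map(Ops.instantiateUserAccountInfo),
      mergeMap(Ops.getBanksForUser(bankService)),
      mergeMap(Ops.getAccountsForUser(accountService))
    )
    .subscribe({
      next: (info) => log.push(
        `${info.user.email}: ${info.bankList.length} bank(s), ${info.accountInfo.length} account(s)`
      ),
      error: (err: Error) => log.push(`failed: ${err.message}`),
    });
}

load({ email: 'ada@example.com', disabled: false });
load({ email: 'bob@example.com', disabled: true });

console.log(log);
// [ 'ada@example.com: 1 bank(s), 1 account(s)', 'failed: User bob@example.com is disabled' ]
```

Because none of the Ops functions reads or writes anything outside its arguments, each one can be tested on its own in the same way.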

Side effects in Rx chain

We’ve already seen the use of side effects in Rx chains, and one way to avoid them, but here is another one I recently read about in a blog post as a recommendation on how to avoid using array indices in Rx’s forkJoin() operator:

forkJoin-with-side-effects.ts
forkJoin(
  this.http.get('api/some-data'),
  language,
  dateRange.pipe(tap(([startDate, endDate]) => { 
      this.startDate = startDate;
      this.endDate = endDate;
    })),
  users.pipe(tap(users => this.setUsers(users))),
  this.http.get('api/chart-data')
    .pipe(tap(chart => this.chartData = chart))
)
.subscribe(([someData, lang]) => {
  this.setWelcomeMessage(someData, lang);
  // all data acquired, wire-up everything
  this.launch();
});

forkJoin subscribes to all Observables passed to it and waits for all of them to complete. Only then does it emit a single event of its own, containing the last value emitted by each of them. The marble diagram for forkJoin looks like this:

Marble diagram for the Rx join operator

The long and short of it is that we organize the various Observables in the forkJoin() in the correct order so that we can use array destructuring in our subscription to get the values we want into properly named variables.

We order the Observables such that the ones we want to use in our subscription come first. All other calls use the tap() operator to store their values in object properties as a side effect. Class state is thereby modified whenever the respective observable emits, for example when an HTTP request completes, at a point in time the programmer has no control over (see line 4 onwards and the use of the tap() operator).

This modification of class state is indicated in the following marble diagram by events that are displayed as squares:

forkJoin with side effects

I agree with the desire to not access results by numeric array index, but for exactly that purpose forkJoin also accepts an object literal (since RxJS 6.5). The results of the various calls can then be accessed through properties on the object it emits instead of through numeric indices. Here’s an example:

forkJoin-object-parameter.ts
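import { forkJoin, of, timer } from 'rxjs';

const observableTasks = {
  foo: of(1, 2, 3, 4),
  bar: Promise.resolve(8),
  baz: timer(4000),
};

// subscribe() returns a Subscription, not an Observable
const subscription = forkJoin(observableTasks)
  .subscribe(
    value => console.log(value),
  );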

// After 4 seconds delay, it logs:
// { foo: 4, bar: 8, baz: 0 }

Not only is this version more readable, it also avoids hard-to-track run-time errors that can occur in the previous implementation. Let’s say, for argument’s sake, that the API call to fetch the chart data is slow due to an overloaded server, while the dateRange observable emits much sooner. We now have an unexpected state: our date range has already been changed, whereas the matching data set has not finished loading.

The fact that forkJoin only emits an event once all observables have completed does not help here: each tap() callback, and with it the state modification, runs as soon as its own observable emits, long before forkJoin notifies the subscription.

What’s more, let’s consider the case where one of the Observables emits an error. In that case, forkJoin will not emit the data of any of the observables passed to it but will instead emit an error itself:

forkJoin with side effects when encountering an error

We see that in this constellation, the third and fourth observable streams passed to forkJoin will already have modified state before it emits its error. In other words, we have already effected a state change even though the overall operation eventually failed. The state of the system is now undefined and possibly invalid, and what’s more, this behavior is really hard to reproduce, troubleshoot and fix. The original, valid state is irrevocably lost because parts of it have been overwritten by the side effects.
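For comparison, here is a sketch of the same kind of loading step without tap(): forkJoin receives an object literal, and state is replaced in one go inside the subscription. Everything specific in it is assumed for illustration, including the DashboardState shape, the sample values and the simulated failing chart request.

```typescript
import { Observable, forkJoin, of } from 'rxjs';

// Hypothetical component state, invented for this sketch.
interface DashboardState {
  startDate: string;
  endDate: string;
  users: string[];
  chartData: number[];
}

let state: DashboardState = {
  startDate: '2020-01-01',
  endDate: '2020-01-31',
  users: ['ada'],
  chartData: [1, 2, 3],
};

// Loads everything, then swaps the state in a single step; returns what happened.
function load(chart$: Observable<number[]>): string {
  let outcome = 'pending';
  forkJoin({
    dateRange: of(['2020-02-01', '2020-02-29']),
    users: of(['ada', 'grace']),
    chart: chart$,
  }).subscribe({
    next: ({ dateRange, users, chart }) => {
      // The only place where state changes, and only once all values are there.
      state = { startDate: dateRange[0], endDate: dateRange[1], users, chartData: chart };
      outcome = 'updated';
    },
    error: () => {
      outcome = 'failed, state untouched';
    },
  });
  return outcome;
}

// Simulated chart request that fails.
const failingChart = new Observable<number[]>((subscriber) =>
  subscriber.error(new Error('chart API down'))
);

const failed = load(failingChart);
const stateAfterFailure = state;
const succeeded = load(of([4, 5, 6]));

console.log(failed, stateAfterFailure.startDate); // failed, state untouched 2020-01-01
console.log(succeeded, state.startDate);          // updated 2020-02-01
```

When the chart request fails, the old date range, users and chart data are all still in place, so the component stays in its last valid state.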

This is, admittedly, a contrived example, but it shows the benefit of not populating variables in the environment from within a running Rx chain, in particular for asynchronous operations with unknown run times such as HTTP requests. In those cases, other code could access the already changed values in the meantime. In Angular, for example, a change in the data for a property binding would cause an update of the date range in the UI, but not of the data for the graph, which arrives later. And that is not to speak of the final state change in the subscription, which might happen much later still and might be disconnected from the other changes entirely.

That’s why avoiding side effects is so critical in a functional programming context: it allows us to reason better about our code, improves the reusability of our existing functions and avoids hard-to-track errors.