All topics
Frontend · Learning hub

Rxjs notes for developers

Master Rxjs with a curated set of 3 developer notes — core concepts, patterns, and interview prep. Maintained by the DevRecall team.

Save this stack to your DevRecallMore Frontend notes
Rxjs

RxJS Observables & Operators

RxJS Observables & Operators Core Concepts Observable — lazy stream of values over time; nothing happens until subscribed Observer — { next, error, complete } c

RxJS Observables & Operators

Core Concepts

  • Observable — lazy stream of values over time; nothing happens until subscribed

  • Observer — { next, error, complete } callbacks

  • Subject — both Observable and Observer; multicast (share one stream with many subscribers)

  • BehaviorSubject — Subject with current value; emits last value to new subscribers

  • Operator — pure function that transforms one Observable into another (map, filter, switchMap...)

Creation & Subscription

import {
  Observable, Subject, BehaviorSubject, ReplaySubject,
  of, from, interval, timer, fromEvent, forkJoin, combineLatest, merge,
  EMPTY, NEVER,
} from 'rxjs';
import {
  map, filter, take, takeUntil, switchMap, mergeMap, concatMap, exhaustMap,
  debounceTime, throttleTime, distinctUntilChanged, catchError, retry,
  tap, shareReplay, startWith, withLatestFrom, combineLatestWith,
} from 'rxjs/operators';

// Creating observables
of(1, 2, 3)                          // emits 1, 2, 3 then completes
from([1, 2, 3])                      // from iterable or Promise
from(fetch('/api/users'))            // from Promise
interval(1000)                       // 0, 1, 2... every second (never completes)
timer(2000, 1000)                    // starts after 2s, then every 1s
fromEvent(document, 'click')        // DOM event stream
EMPTY                                // completes immediately
NEVER                                // never emits, never completes

// Custom observable
const obs$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => { subscriber.next(3); subscriber.complete(); }, 1000);
  return () => console.log('cleanup');  // teardown logic
});

// Subscribe
const sub = obs$.subscribe({
  next: val => console.log('Value:', val),
  error: err => console.error('Error:', err),
  complete: () => console.log('Done'),
});
// Or shorthand: obs$.subscribe(val => console.log(val));

// Always unsubscribe to prevent memory leaks
sub.unsubscribe();

Key Operators

// Transformation
obs$.pipe(map(x => x * 2))
obs$.pipe(filter(x => x > 0))
obs$.pipe(take(5))                    // take first 5, then complete
obs$.pipe(takeUntil(stop$))           // complete when stop$ emits

// FlatMap variants (handle inner observables)
// switchMap  — cancel previous inner, start new (search autocomplete)
// mergeMap   — all inner run concurrently (no cancel)
// concatMap  — queue; one at a time in order (sequential operations)
// exhaustMap — ignore new while current inner is active (submit button)

const search$ = fromEvent<InputEvent>(input, 'input').pipe(
  map(e => (e.target as HTMLInputElement).value),
  debounceTime(300),               // wait 300ms after last keystroke
  distinctUntilChanged(),          // ignore if value didn't change
  filter(q => q.length >= 2),
  switchMap(q => from(fetch(`/api/search?q=${q}`).then(r => r.json()))),
  catchError(err => { console.error(err); return EMPTY; })
);

// Combining streams
combineLatest([user$, permissions$]).pipe(
  map(([user, perms]) => ({ ...user, permissions: perms }))
)
forkJoin([obs1$, obs2$, obs3$])      // wait for all to complete, emit last values
merge(click$, keypress$)             // interleave events from both

// Side effects (don't use tap for business logic)
obs$.pipe(tap(val => console.log('debug:', val)))

// Error handling
obs$.pipe(
  retry(3),                         // retry up to 3 times on error
  catchError(err => of(defaultValue)) // recover with fallback
)

// Sharing (prevent re-subscription / multiple HTTP calls)
const users$ = from(api.getUsers()).pipe(shareReplay(1));

// Subjects
const subject = new Subject<string>();
subject.next('hello');              // push value
subject.subscribe(console.log);    // subscribe

const state$ = new BehaviorSubject({ count: 0 });
state$.next({ count: 1 });
state$.getValue();                  // get current value synchronously
Rxjs

Observables & Operators

Observables & Operators RxJS is a library for reactive programming using Observables — lazy push-based streams of values over time. Nothing executes until you s

Observables & Operators

RxJS is a library for reactive programming using Observables — lazy push-based streams of values over time. Nothing executes until you subscribe.

Creating Observables

import {
  Observable, of, from, interval, timer, fromEvent, EMPTY, NEVER,
} from 'rxjs';

// of — emit a fixed sequence of values then complete
of(1, 2, 3).subscribe(console.log); // 1, 2, 3

// from — from array, iterable, or Promise
from([10, 20, 30]).subscribe(console.log);      // 10, 20, 30
from(fetch('/api/users').then(r => r.json()))   // from Promise
  .subscribe(users => console.log(users));

// interval — emit 0, 1, 2... every N ms (never completes)
interval(1000).subscribe(n => console.log(n)); // 0, 1, 2...

// timer — emit once after delay, or repeatedly after initial delay
timer(2000).subscribe(() => console.log('2s later')); // once
timer(0, 1000).subscribe(n => console.log(n));        // 0, 1, 2...

// fromEvent — DOM or Node.js EventEmitter event stream
fromEvent(document, 'click').subscribe(e => console.log(e));
fromEvent(document, 'keydown')
  .subscribe((e: Event) => console.log((e as KeyboardEvent).key));

// Custom Observable — full control over emission
const custom$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
  // Return teardown function (called on unsubscribe)
  return () => console.log('cleaned up');
});

// EMPTY — completes immediately without emitting
// NEVER  — never emits, never completes (use in tests/stubs)
EMPTY.subscribe({ complete: () => console.log('done') });

Transformation & Filtering Operators

import { of, from, interval } from 'rxjs';
import {
  map, filter, take, skip, takeLast, first, last,
  distinctUntilChanged, debounceTime, throttleTime,
  scan, reduce, tap,
} from 'rxjs/operators';

// map — transform each value
of(1, 2, 3).pipe(map(x => x * 2)).subscribe(console.log); // 2, 4, 6

// filter — drop values that don't pass predicate
of(1, 2, 3, 4, 5).pipe(filter(x => x % 2 === 0)).subscribe(console.log); // 2, 4

// take / skip / first / last
interval(500).pipe(take(3)).subscribe(console.log); // 0, 1, 2 then complete
of(1,2,3,4,5).pipe(skip(2)).subscribe(console.log); // 3, 4, 5
of(1,2,3).pipe(first()).subscribe(console.log);     // 1
of(1,2,3).pipe(last()).subscribe(console.log);      // 3

// distinctUntilChanged — skip consecutive duplicates
of(1,1,2,3,3,2).pipe(distinctUntilChanged()).subscribe(console.log); // 1,2,3,2

// debounceTime — emit only after silence for N ms (search input)
// throttleTime — emit once per N ms window (scroll handler)

// scan — like reduce but emits running total
of(1, 2, 3, 4).pipe(
  scan((acc, val) => acc + val, 0)
).subscribe(console.log); // 1, 3, 6, 10

// tap — side effects without modifying stream (logging, debugging)
of(1, 2, 3).pipe(
  tap(x => console.log('before:', x)),
  map(x => x * 10),
  tap(x => console.log('after:', x))
).subscribe();

Higher-Order Mapping Operators

import { fromEvent, from, of, interval, EMPTY } from 'rxjs';
import { switchMap, mergeMap, concatMap, exhaustMap, catchError } from 'rxjs/operators';

// All four flatten an Observable<Observable<T>> to Observable<T>
// The difference is how they handle overlapping inner observables:

// switchMap  — cancel previous inner, start new (best for search/autocomplete)
// mergeMap   — all inner run concurrently, no ordering guarantee
// concatMap  — queue: one inner at a time, in order (sequential HTTP requests)
// exhaustMap — ignore new outer values while inner is active (submit button)

const input = document.querySelector('input')!;
const searchResults$ = fromEvent(input, 'input').pipe(
  map((e: Event) => (e.target as HTMLInputElement).value),
  debounceTime(300),
  distinctUntilChanged(),
  filter(q => q.length >= 2),
  switchMap(query =>
    from(fetch(`/api/search?q=${query}`).then(r => r.json())).pipe(
      catchError(err => { console.error(err); return EMPTY; })
    )
  )
);

// mergeMap — parallel requests (order not guaranteed)
const ids = [1, 2, 3];
from(ids).pipe(
  mergeMap(id => from(fetch(`/api/item/${id}`).then(r => r.json())))
).subscribe(item => console.log(item));

// concatMap — sequential (waits for previous to complete)
from([1, 2, 3]).pipe(
  concatMap(n => of(n).pipe(delay(500)))
).subscribe(console.log); // 1 (after 0.5s), 2 (after 1s), 3 (after 1.5s)

// exhaustMap — ignore clicks while request is in flight
const submitBtn = document.querySelector('button')!;
fromEvent(submitBtn, 'click').pipe(
  exhaustMap(() => from(fetch('/api/submit').then(r => r.json())))
).subscribe(response => console.log('submitted:', response));

Combining Streams & Error Handling

import { combineLatest, forkJoin, merge, zip, of, throwError } from 'rxjs';
import { catchError, retry, retryWhen, delay, take, shareReplay, startWith } from 'rxjs/operators';

// combineLatest — emit array whenever any source emits (all must emit at least once)
const user$ = of({ name: 'Alice' });
const perms$ = of(['read', 'write']);
combineLatest([user$, perms$]).pipe(
  map(([user, perms]) => ({ ...user, permissions: perms }))
).subscribe(console.log);

// forkJoin — wait for ALL to complete, emit last value of each (like Promise.all)
forkJoin({
  users: from(fetch('/api/users').then(r => r.json())),
  config: from(fetch('/api/config').then(r => r.json())),
}).subscribe(({ users, config }) => console.log(users, config));

// merge — interleave emissions from multiple sources
const click$ = fromEvent(document, 'click');
const key$ = fromEvent(document, 'keydown');
merge(click$, key$).subscribe(e => console.log('interaction', e.type));

// Error handling
of(1, 2, 3).pipe(
  map(x => { if (x === 2) throw new Error('bad value'); return x; }),
  catchError(err => {
    console.error('caught:', err.message);
    return of(-1); // recover with fallback value
  })
).subscribe(console.log); // 1, -1

// retry — resubscribe up to N times on error
from(fetch('/api/data')).pipe(
  retry(3),
  catchError(err => of({ error: true, message: err.message }))
).subscribe(console.log);

// shareReplay — multicast + replay last N values to new subscribers
// Prevents multiple HTTP calls when multiple components subscribe
const users$ = from(fetch('/api/users').then(r => r.json())).pipe(
  shareReplay(1) // cache the last emission
);
users$.subscribe(u => console.log('component A:', u));
users$.subscribe(u => console.log('component B:', u)); // no second HTTP call
Rxjs

Subjects & Patterns

Subjects & Async Patterns Subjects bridge imperative and reactive code. They act as both Observable and Observer — you push values in imperatively and multiple

Subjects & Async Patterns

Subjects bridge imperative and reactive code. They act as both Observable and Observer — you push values in imperatively and multiple subscribers receive them reactively.

Subject Types

import { Subject, BehaviorSubject, ReplaySubject, AsyncSubject } from 'rxjs';

// Subject — hot multicast; new subscribers miss past emissions
const events$ = new Subject<string>();
events$.subscribe(e => console.log('A:', e));
events$.next('click');  // A: click
events$.subscribe(e => console.log('B:', e)); // B subscribes late
events$.next('keydown'); // A: keydown, B: keydown (both receive)

// BehaviorSubject — remembers and immediately emits current value to new subscribers
// Best for: shared mutable state (current user, theme, loading flag)
const theme$ = new BehaviorSubject<'light' | 'dark'>('light');
theme$.subscribe(t => console.log('subscriber 1:', t)); // immediately: 'light'
theme$.next('dark');
theme$.subscribe(t => console.log('subscriber 2:', t)); // immediately: 'dark'
console.log(theme$.getValue()); // 'dark' — synchronous access

// ReplaySubject(n) — replays last n emissions to new subscribers
const log$ = new ReplaySubject<string>(3); // buffer last 3
log$.next('a');
log$.next('b');
log$.next('c');
log$.next('d');
log$.subscribe(x => console.log(x)); // d, c, b (last 3 in order: b, c, d)

// AsyncSubject — only emits last value on complete (like a resolved Promise)
const result$ = new AsyncSubject<number>();
result$.subscribe(x => console.log('result:', x));
result$.next(1);
result$.next(2);
result$.next(3);
result$.complete(); // result: 3 — only last value emitted

takeUntil & Unsubscribe Patterns

import { Subject, interval, fromEvent } from 'rxjs';
import { takeUntil, takeWhile, take } from 'rxjs/operators';

// 1. takeUntil — the idiomatic Angular/RxJS pattern for cleanup
//    emit until a 'destroyer' subject emits (on component destroy)
class MyComponent {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    interval(1000).pipe(
      takeUntil(this.destroy$)
    ).subscribe(n => console.log('tick', n));

    fromEvent(window, 'resize').pipe(
      takeUntil(this.destroy$)
    ).subscribe(() => this.onResize());
  }

  ngOnDestroy() {
    this.destroy$.next(); // triggers completion of all takeUntil streams
    this.destroy$.complete();
  }

  onResize() { /* ... */ }
}

// 2. takeWhile — complete when predicate returns false
interval(500).pipe(
  takeWhile(n => n < 5)
).subscribe(console.log); // 0, 1, 2, 3, 4 then completes

// 3. Explicit unsubscribe — store and manually unsubscribe
import { Subscription } from 'rxjs';
class AnotherComponent {
  private subs = new Subscription();

  ngOnInit() {
    this.subs.add(
      interval(1000).subscribe(n => console.log(n))
    );
    this.subs.add(
      fromEvent(window, 'scroll').subscribe(() => this.onScroll())
    );
  }

  ngOnDestroy() {
    this.subs.unsubscribe(); // cleans up all at once
  }

  onScroll() { /* ... */ }
}

State Management with BehaviorSubject

import { BehaviorSubject } from 'rxjs';
import { map, distinctUntilChanged } from 'rxjs/operators';

// Lightweight store pattern (alternative to NgRx for small apps)
interface AppState {
  user: { name: string } | null;
  loading: boolean;
  count: number;
}

const initialState: AppState = { user: null, loading: false, count: 0 };

class Store {
  private state$ = new BehaviorSubject<AppState>(initialState);

  // Select a slice of state (only emits when that slice changes)
  select<K extends keyof AppState>(key: K) {
    return this.state$.pipe(
      map(state => state[key]),
      distinctUntilChanged()
    );
  }

  getState(): AppState {
    return this.state$.getValue();
  }

  // Immutable updates
  patch(partial: Partial<AppState>) {
    this.state$.next({ ...this.getState(), ...partial });
  }

  setUser(user: AppState['user']) { this.patch({ user }); }
  setLoading(loading: boolean) { this.patch({ loading }); }
  increment() { this.patch({ count: this.getState().count + 1 }); }
}

const store = new Store();
store.select('user').subscribe(u => console.log('user changed:', u));
store.select('loading').subscribe(l => console.log('loading:', l));

store.setUser({ name: 'Alice' }); // user changed: { name: 'Alice' }
store.increment();                 // count: 1 (no user/loading emission)

Async Patterns & Real-World Examples

import { fromEvent, Subject, interval } from 'rxjs';
import { debounceTime, distinctUntilChanged, switchMap, catchError, withLatestFrom, bufferTime } from 'rxjs/operators';
import { EMPTY } from 'rxjs';

// 1. Search-as-you-type with cancellation
const searchInput = document.getElementById('search') as HTMLInputElement;
const search$ = fromEvent(searchInput, 'input').pipe(
  map((e: Event) => (e.target as HTMLInputElement).value.trim()),
  debounceTime(350),        // wait for typing to pause
  distinctUntilChanged(),   // skip if same value
  filter(q => q.length > 1),
  switchMap(q =>            // cancel in-flight request on new keystroke
    from(fetch(`/api/search?q=${encodeURIComponent(q)}`).then(r => r.json())).pipe(
      catchError(() => EMPTY) // swallow errors, show nothing
    )
  )
);

// 2. Polling with start/stop control
const startPolling$ = new Subject<void>();
const stopPolling$ = new Subject<void>();

startPolling$.pipe(
  switchMap(() =>
    interval(5000).pipe(
      takeUntil(stopPolling$),
      switchMap(() => from(fetch('/api/status').then(r => r.json())))
    )
  )
).subscribe(status => console.log('status:', status));

startPolling$.next(); // begin polling
// stopPolling$.next(); // stop polling

// 3. Buffer user actions and batch-process
const userActions$ = new Subject<string>();
userActions$.pipe(
  bufferTime(2000),  // collect all actions in 2s windows
  filter(actions => actions.length > 0)
).subscribe(actions => {
  console.log('batch:', actions); // process in bulk
});

userActions$.next('view:page1');
userActions$.next('click:button1');
// After 2s: ['view:page1', 'click:button1']

Keep your Rxjs knowledge sharp.

Save this stack to your personal DevRecall — add your own notes, track what you're learning, and share what you know with the community.

Get started — free forever