RxJS Observables & Operators
RxJS Observables & Operators — quick reference covering core concepts (Observable, Observer, Subject, BehaviorSubject, Operator), creation and subscription, and key operators.
RxJS Observables & Operators
Core Concepts
Observable — lazy stream of values over time; nothing happens until subscribed
Observer — { next, error, complete } callbacks
Subject — both Observable and Observer; multicast (share one stream with many subscribers)
BehaviorSubject — Subject that requires an initial value and stores the current value; emits that current value immediately to each new subscriber
Operator — pure function that transforms one Observable into another (map, filter, switchMap...)
Creation & Subscription
import {
Observable, Subject, BehaviorSubject, ReplaySubject,
of, from, interval, timer, fromEvent, forkJoin, combineLatest, merge,
EMPTY, NEVER,
} from 'rxjs';
import {
map, filter, take, takeUntil, switchMap, mergeMap, concatMap, exhaustMap,
debounceTime, throttleTime, distinctUntilChanged, catchError, retry,
tap, shareReplay, startWith, withLatestFrom, combineLatestWith,
} from 'rxjs/operators';
// Creating observables
of(1, 2, 3) // emits 1, 2, 3 then completes
from([1, 2, 3]) // from an array/iterable: emits each element, then completes
from(fetch('/api/users')) // from Promise — note fetch() is called right here, not when someone subscribes
interval(1000) // 0, 1, 2... every second (never completes)
timer(2000, 1000) // starts after 2s, then every 1s
fromEvent(document, 'click') // DOM event stream
EMPTY // completes immediately
NEVER // never emits, never completes
// Custom observable: emits 1 and 2 synchronously on subscribe, then 3 after
// one second, and completes.
const obs$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  // Keep the timer handle so the teardown below can cancel it.
  const timerId = setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
  // Teardown logic: runs when the subscription ends (unsubscribe, complete or
  // error). Clearing the timer prevents it from lingering after an early
  // unsubscribe; clearing an already-fired timer is a harmless no-op.
  return () => {
    clearTimeout(timerId);
    console.log('cleanup');
  };
});
// Subscribe by passing an observer object (next / error / complete handlers)
const logObserver = {
  next: (val: number) => console.log('Value:', val),
  error: (err: unknown) => console.error('Error:', err),
  complete: () => console.log('Done'),
};
const sub = obs$.subscribe(logObserver);
// Shorthand when only `next` is needed: obs$.subscribe(val => console.log(val));
// Always unsubscribe to prevent memory leaks
sub.unsubscribe();
Key Operators
// Transformation and filtering
obs$.pipe(map(x => x * 2)) // transform each value
obs$.pipe(filter(x => x > 0)) // let through only values that satisfy the predicate
obs$.pipe(take(5)) // take first 5, then complete
obs$.pipe(takeUntil(stop$)) // complete when stop$ emits
// FlatMap variants (map each value to an inner observable and flatten the results)
// switchMap — cancel previous inner, start new (search autocomplete)
// mergeMap — all inner run concurrently (no cancel)
// concatMap — queue; one at a time in order (sequential operations)
// exhaustMap — ignore new while current inner is active (submit button)

// Type-ahead search: emits the parsed JSON response for the most recent query.
const search$ = fromEvent<InputEvent>(input, 'input').pipe(
  map(e => (e.target as HTMLInputElement).value),
  debounceTime(300), // wait 300ms after last keystroke
  distinctUntilChanged(), // ignore if value didn't change
  filter(q => q.length >= 2),
  switchMap(q =>
    // encodeURIComponent keeps characters such as & # + or spaces in the
    // user's input from corrupting the query string.
    // Note: switchMap drops the stale result, but a Promise-based fetch is not
    // aborted; fromFetch (from 'rxjs/fetch') aborts the request on unsubscribe.
    from(fetch(`/api/search?q=${encodeURIComponent(q)}`).then(r => r.json())).pipe(
      // Catch on the INNER observable. A catchError on the outer pipe would
      // replace the whole stream with EMPTY after the first failed request,
      // so later keystrokes would never trigger a search again.
      catchError(err => { console.error(err); return EMPTY; })
    )
  )
);
// Combining streams
// combineLatest: emits only after every source has emitted at least once,
// then again whenever any source emits (here: user merged with its permissions)
combineLatest([user$, permissions$]).pipe(
map(([user, perms]) => ({ ...user, permissions: perms }))
)
forkJoin([obs1$, obs2$, obs3$]) // wait for all to complete, emit last values
merge(click$, keypress$) // interleave events from both
// Side effects (don't use tap for business logic)
obs$.pipe(tap(val => console.log('debug:', val)))
// Error handling
// Order matters: retry resubscribes to the source first; catchError only
// sees the error once the retries are used up.
obs$.pipe(
retry(3), // retry up to 3 times on error
catchError(err => of(defaultValue)) // recover with fallback
)
// Sharing (prevent re-subscription / multiple HTTP calls)
// shareReplay(1): one shared subscription; the latest value is replayed to late subscribers
const users$ = from(api.getUsers()).pipe(shareReplay(1));
// Subjects
const subject = new Subject<string>();
// Subscribe BEFORE pushing: a plain Subject does not replay, so a value sent
// before any subscriber is attached is never delivered.
subject.subscribe(console.log); // subscribe
subject.next('hello'); // push value — received by the subscriber above

// BehaviorSubject is constructed with an initial value and always holds a current one
const state$ = new BehaviorSubject({ count: 0 });
state$.next({ count: 1 });
state$.getValue(); // get current value synchronously