別太在意包裝上的照片與實物之間的差異:RxJS

前言

隨著 GitHub 的盛行,越來越多的技術都走向開源,研究原始碼可以讓我們了解相關技術運作原理,藉以修正認知上的落差,畢竟官方文件雖然豐富,但是很多概念難以透過文字描述就可以理解,另一個好處就是學習這些大神的技術。
以上說明都不是本篇的重點,要知道,看完程式碼是賢人,看懂程式碼是神人,這都不是筆者這種”閒”人可以做到的,所以我們只挑選簡單的部分來協助我們更容易的理解 RxJS。

我們可以到 GitHub 網站下載 RxJS 原始碼

Observer 只是一個介面

開啟 rxjs\src\internal\types.ts,可以發現 Observer 是一個 interface,所以說只要具備 nexterrorcomplete 3個方法的物件都可以稱為 Observer

types.ts
1
2
3
4
5
6
export interface Observer<T> {
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}

由介面可以知道只有 nexterror 可以接收到參數,我們也可以知道 Observer 的責任只決定當發生 nexterrorcomplete 時要做什麼,什麼時候發生?誰來通知它?這些都不知道。

Observable 只關心訂閱與取消

開啟 rxjs\src\internal\Observable.ts,可以看到 Observable 會實作 Subscribable ,而這個介面只有提供 Observer 訂閱(subscribe) 的功能,透過 subscribe 可以在得到一個具有 取消訂閱(unsubscribe) 的物件。

Observable.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
export class Observable<T> implements Subscribable<T> {

}

export interface Subscribable<T> {
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
error?: (error: any) => void,
complete?: () => void): Unsubscribable;
}

export interface Unsubscribable {
unsubscribe(): void;
}

Observable 透過 subscribe 來與 Observer 建立連結,也就是說我們可以透過 Observable 知道通知對象

Operator Function:Map

到目前為止我們還不知道 RxJS 的主動通知機制,我們透過 Map 這個最常使用的 RxJS Operator 來研究,開啟 rxjs\src\internal\operators\map.ts,可以看到 map 只是一個 Function,它最終會呼叫並回傳 Observable 的 lift 方法。

map.ts
1
2
3
4
5
6
7
8
export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
return function mapOperation(source: Observable<T>): Observable<R> {
if (typeof project !== 'function') {
throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
}
return source.lift(new MapOperator(project, thisArg));
};
}

打包機:lift

回去查閱 Observable 內的 lift 方法,可以看到 lift 會建立一個新的 Observable 來包裝目前的 Observable 與 Operator 物件。

Observable.ts
1
2
3
4
5
6
7
8
export class Observable<T> implements Subscribable<T> {
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new Observable<R>();
observable.source = this;
observable.operator = operator;
return observable;
}
}

lift 就像俄羅斯套娃的生產工廠,每串接一個 RxJS Operator Function 就會多包一層 Observable,當然我們也可以知道 Observable 的 source 屬性代表上一層 Observable,operator 代表目前要處理的事情。
img

Operator 只做訂閱

上面我們可以看到 Map 函式會傳遞一個 MapOperator 物件給 lift 函式,而這個 Operator 物件主要時做一個 call 方法。

Observable.ts
1
2
3
4
5
6
7
8
9
10
11
12
export class MapOperator<T, R> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => R, private thisArg: any) {
}

call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
}
}

export interface Operator<T, R> {
call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}

再看看 MapOperator 內的 call 方法竟然只是呼叫 Observable 的訂閱(subscribe),所以 RxJS 程式內的 Operator 物件與我們平常講的 Operator Function 是不一樣的東西,我們知道 subscribe 只接收 Observer,所以 MapSubscriber 應該實作了 Observer 介面。

Subscriber 就是 Observer

從原始碼可以看到 MapSubscriber 繼承自 Subscriber 類別,而 Subscriber 實作了 Observer 介面,比較特別的是 nexterrorcomplete 只管流程規則

  • 只要 isStopped 屬性為 true 所有方法都停止動作。
  • errorcomplete 方法會將 isStopped 屬性設定為 true

由此可知為什麼 Observer 一旦觸發 errorcomplete 方法之後就不會再收到任何回應。
destination 是指後下一個 Observer,可想而知這是外層 Observable 所 subscribe 的對象(Observer)。
nexterrorcomplete 剛好分別還有對應的方法 _next_error_complete,這3個方法預設直接呼叫下一個 Observer 所對應的方法,只是 _error_complete 預設同時會取消訂閱(unsubscribe)。

Observable.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class MapSubscriber<T, R> extends Subscriber<T> {

}

export class Subscriber<T> extends Subscription implements Observer<T> {

protected isStopped: boolean = false;
protected destination: PartialObserver<any>;

next(value?: T): void {
if (!this.isStopped) {
this._next(value);
}
}

error(err?: any): void {
if (!this.isStopped) {
this.isStopped = true;
this._error(err);
}
}

complete(): void {
if (!this.isStopped) {
this.isStopped = true;
this._complete();
}
}

protected _next(value: T): void {
this.destination.next(value);
}

protected _error(err: any): void {
this.destination.error(err);
this.unsubscribe();
}

protected _complete(): void {
this.destination.complete();
this.unsubscribe();
}
}

不論透過 Subscriber 建構式(constructor)或是靜態方法 create 都會將所傳入的 Observer 物件透過 SafeSubscriber 類別轉換成 Subscriber 物件,但是我們可以將他們視為同一個物件來思考。

OperatorSubscriber 才是重點

回來觀看 MapSubscriber 原始碼,可以看到它繼承 Subscriber 類別並覆寫 _next 方法,也就是當上一個 Observer 觸發 next 方法時,我們同時會得到最新的值,接著透過外部設定的函式 project 處理轉換新的值,並傳遞給下一個 Observer 的 next 方法。

Observable.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class MapSubscriber<T, R> extends Subscriber<T> {
count: number = 0;
private thisArg: any;

constructor(destination: Subscriber<R>,
private project: (value: T, index: number) => R,
thisArg: any) {
super(destination);
this.thisArg = thisArg || this;
}

// NOTE: This looks unoptimized, but it's actually purposefully NOT
// using try/catch optimizations.
protected _next(value: T) {
let result: any;
try {
result = this.project.call(this.thisArg, value, this.count++);
} catch (err) {
this.destination.error(err);
return;
}
this.destination.next(result);
}
}

由此可知 OperatorSubscriber 可以說就是 RxJS Operator 最核心的部分,因為它決定很多事情:

  • 資料流程規則 (nexterrorcomplete)
  • 資料的加工方式 (_next_error_complete)
  • 決定處理後要呼叫下一個 Observer 的哪一個方法

所以其實我們也可以自己建立 Operator Function,重點就是先繼承 Subscriber 類別。

next 並不一定要接 next:Single

開啟 rxjs\src\internal\operators\single.ts,直接觀看最下面的 SingleSubscriber 類別,可以看到第2次傳值時會呼叫下一個 Observer 的 error 方法。
this.destination.error('Sequence contains more than one element');

Observable.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class SingleSubscriber<T> extends Subscriber<T> {
private seenValue: boolean = false;
private singleValue: T;
private index: number = 0;

private applySingleValue(value: T): void {
if (this.seenValue) {
this.destination.error('Sequence contains more than one element');
} else {
this.seenValue = true;
this.singleValue = value;
}
}

protected _next(value: T): void {
const index = this.index++;

if (this.predicate) {
this.tryNext(value, index);
} else {
this.applySingleValue(value);
}
}

private tryNext(value: T, index: number): void {
try {
if (this.predicate(value, index, this.source)) {
this.applySingleValue(value);
}
} catch (err) {
this.destination.error(err);
}
}

protected _complete(): void {
const destination = this.destination;

if (this.index > 0) {
destination.next(this.seenValue ? this.singleValue : undefined);
destination.complete();
} else {
destination.error(new EmptyError);
}
}
}

比較特別的是從 _complete 可以看到如果收到一個值之後就完成(complete),下一個 Observer 是會收到 nextcomplete 的,這是從官網的彈珠圖所看不出來的情境。
img

從 SingleSubscriber 類別其實我們只能認知第2次以後的 next 都會呼叫下一個 Observer 的 error,若沒有先了解底層 Subscriber 的流程規則,其實有可能會誤認為後續都會觸發 error

retry :為什麼發生錯誤(error)後還可以繼續通知?

從官網彈珠圖來看 retry 怎麼會再發生 error 之後還可以繼續收到通知,這不是違反規則嗎?
img
開啟 rxjs\src\internal\operators\retry.ts,可以看到它覆寫了 error 而不是 _error,而它所做的事情就是重新訂閱,沒錯它先取消目前訂閱,接著還原自己的屬性參數,最後再從新訂閱。

retry.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class RetrySubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<any>,
private count: number,
private source: Observable<T>) {
super(destination);
}
error(err: any) {
if (!this.isStopped) {
const { source, count } = this;
if (count === 0) {
return super.error(err);
} else if (count > -1) {
this.count = count - 1;
}
source.subscribe(this._unsubscribeAndRecycle());
}
}
}

export class Subscriber<T> extends Subscription implements Observer<T> {
_unsubscribeAndRecycle(): Subscriber<T> {
const { _parent, _parents } = this;
this._parent = null;
this._parents = null;
this.unsubscribe();
this.closed = false;
this.isStopped = false;
this._parent = _parent;
this._parents = _parents;
return this;
}
}

如果看到複寫 nexterrorcomplete 其實就代表有機會改變流程規則,當然大原則之下其實都是遵循規則的,會修改往往是為了達到某些需求,例如:repeat 它做的事情跟 retry 一樣,只是它是改變 complete

Subject 不只是提供給多個 Observer 訂閱

每個醫師身邊都應該要有個白衣天使:RxJS 內我們最後提到 Subject 就跟 Observable 一樣,只是它可以提供給多個 Observer 同時訂閱,可以說是 Observable 的強化版,但是從原始碼(rxjs\src\internal\Subject.ts)來看它還有另一個更強大的功能,可以後續由我們自己來觸發 nexterrorcomplete

retry.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
export class Subject<T> extends Observable<T> implements SubscriptionLike {

observers: Observer<T>[] = [];
next(value?: T) {
if (!this.isStopped) {
...
for (let i = 0; i < len; i++) {
copy[i].next(value);
}
}
}

error(err: any) {
...
for (let i = 0; i < len; i++) {
copy[i].error(err);
}
this.observers.length = 0;
}

complete() {
...
for (let i = 0; i < len; i++) {
copy[i].complete();
}
this.observers.length = 0;
}
}

在使用 Observable 類別時,其實我們必須是先將 nexterrorcomplete 規則擬定好,如果條件多時會讓程式邏輯會變得很複雜,而 Subject 直接提供 nexterrorcomplete 方法,讓我們可以事後依各種狀況來觸發對應的方法,當原始資料來源是我們自己產生時,透過 Subject 封裝應該會比 Observable 有彈性。

BehaviorSubject

BehaviorSubject 應該是 Subject 中最方便的,從原始碼(rxjs\src\internal\BehaviorSubject.ts)來了解它的特性,我們看到它繼承了 Subject 類別,比較特別的是它提供初始值設定,我們可以建立時同時給一個初始值,而起既使沒有訂閱,我們也可以透過 getValue 方法抓取目前最新的值,另一個特點就是一旦訂閱馬上可以透過 next 收到目前的值。

BehaviorSubject.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
export class BehaviorSubject<T> extends Subject<T> {

constructor(private _value: T) {
super();
}

get value(): T {
return this.getValue();
}

/** @deprecated This is an internal implementation detail, do not use. */
_subscribe(subscriber: Subscriber<T>): Subscription {
const subscription = super._subscribe(subscriber);
if (subscription && !(<SubscriptionLike>subscription).closed) {
subscriber.next(this._value);
}
return subscription;
}

getValue(): T {
if (this.hasError) {
throw this.thrownError;
} else if (this.closed) {
throw new ObjectUnsubscribedError();
} else {
return this._value;
}
}

next(value: T): void {
super.next(this._value = value);
}
}

後記

看到目前的感想就是
做事的只有 Subscriber
它決定了:

  • 透過 next 正常傳值時要作什麼處理?

  • 收到 error 錯誤時要如何因應?

  • 收到 complete 完成通知之後要做什麼?

  • 以上處理完之後是否要通知下一個 Subscriber?要用什麼方法通知?通知什麼訊息?

Observable 其實是幫我們將 Subscriber 串接起來。