title: 別太在意包裝上的照片與實物之間的差異:RxJS
date: 2018-06-22
categories: Training
keywords:
隨著 GitHub 的盛行,越來越多的技術都走向開源,研究原始碼可以讓我們了解相關技術運作原理,藉以修正認知上的落差,畢竟官方文件雖然豐富,但是很多概念難以透過文字描述就可以理解,另一個好處就是學習這些大神的技術。
以上說明都不是本篇的重點,要知道,看完程式碼是賢人,看懂程式碼是神人,這都不是筆者這種”閒”人可以做到的,所以我們只挑選簡單的部分來協助我們更容易的理解 RxJS。
我們可以到 GitHub 網站下載 RxJS 原始碼
開啟 rxjs\src\internal\types.ts
,可以發現 Observer 是一個 interface,所以說只要具備 next
、error
、complete
3個方法的物件都可以稱為 Observer。
{% codeblock types.ts lang:ts %}
export interface Observer
closed?: boolean;
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
{% endcodeblock %}
由介面可以知道只有 next
與 error
可以接收到參數,我們也可以知道 Observer 的責任只決定當發生 next
、error
、complete
時要做什麼,什麼時候發生?誰來通知它?這些都不知道。
開啟 rxjs\src\internal\Observable.ts
,可以看到 Observable 會實作 Subscribable ,而這個介面只有提供 Observer 訂閱(subscribe) 的功能,透過 subscribe 可以在得到一個具有 取消訂閱(unsubscribe) 的物件。
{% codeblock Observable.ts lang:ts %}
export class Observable
}
export interface Subscribable
subscribe(observerOrNext?: PartialObserver
error?: (error: any) => void,
complete?: () => void): Unsubscribable;
}
export interface Unsubscribable {
unsubscribe(): void;
}
{% endcodeblock %}
Observable 透過 subscribe
來與 Observer 建立連結,也就是說我們可以透過 Observable 知道通知對象。
到目前為止我們還不知道 RxJS 的主動通知機制,我們透過 Map 這個最常使用的 RxJS Operator 來研究,開啟 rxjs\src\internal\operators\map.ts
,可以看到 map 只是一個 Function,它最終會呼叫並回傳 Observable 的 lift 方法。
{% codeblock map.ts lang:ts %}
export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
return function mapOperation(source: Observable
if (typeof project !== ‘function’) {
throw new TypeError(‘argument is not a function. Are you looking for mapTo()
?’);
}
return source.lift(new MapOperator(project, thisArg));
};
}
{% endcodeblock %}
回去查閱 Observable 內的 lift
方法,可以看到 lift
會建立一個新的 Observable 來包裝目前的 Observable 與 Operator 物件。
{% codeblock Observable.ts lang:ts %}
export class Observable
lift
const observable = new Observable
observable.source = this;
observable.operator = operator;
return observable;
}
}
{% endcodeblock %}
lift
就像俄羅斯套娃的生產工廠,每串接一個 RxJS Operator Function 就會多包一層 Observable,當然我們也可以知道 Observable 的 source
屬性代表上一層 Observable,operator
代表目前要處理的事情。
上面我們可以看到 Map 函式會傳遞一個 MapOperator 物件給 lift
函式,而這個 Operator 物件主要時做一個 call
方法。
{% codeblock Observable.ts lang:ts %}
export class MapOperator<T, R> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => R, private thisArg: any) {
}
call(subscriber: Subscriber
return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
}
}
export interface Operator<T, R> {
call(subscriber: Subscriber
}
{% endcodeblock %}
再看看 MapOperator 內的 call
方法竟然只是呼叫 Observable 的訂閱(subscribe),所以 RxJS 程式內的 Operator 物件與我們平常講的 Operator Function 是不一樣的東西,我們知道 subscribe
只接收 Observer,所以 MapSubscriber 應該實作了 Observer 介面。
從原始碼可以看到 MapSubscriber 繼承自 Subscriber 類別,而 Subscriber 實作了 Observer 介面,比較特別的是 next
、error
、complete
只管流程規則:
isStopped
屬性為 true
所有方法都停止動作。 error
、complete
方法會將 isStopped
屬性設定為 true
。由此可知為什麼 Observer 一旦觸發 error
、complete
方法之後就不會再收到任何回應。destination
是指後下一個 Observer,可想而知這是外層 Observable 所 subscribe
的對象(Observer)。next
、error
、complete
剛好分別還有對應的方法 _next
、_error
、_complete
,這3個方法預設直接呼叫下一個 Observer 所對應的方法,只是 _error
、_complete
預設同時會取消訂閱(unsubscribe)。
{% codeblock Observable.ts lang:ts %}
class MapSubscriber<T, R> extends Subscriber
}
export class Subscriber
protected isStopped: boolean = false;
protected destination: PartialObserver
next(value?: T): void {
if (!this.isStopped) {
this._next(value);
}
}
error(err?: any): void {
if (!this.isStopped) {
this.isStopped = true;
this._error(err);
}
}
complete(): void {
if (!this.isStopped) {
this.isStopped = true;
this._complete();
}
}
protected _next(value: T): void {
this.destination.next(value);
}
protected _error(err: any): void {
this.destination.error(err);
this.unsubscribe();
}
protected _complete(): void {
this.destination.complete();
this.unsubscribe();
}
}
{% endcodeblock %}
不論透過 Subscriber 建構式(constructor)或是靜態方法
create
都會將所傳入的 Observer 物件透過 SafeSubscriber 類別轉換成 Subscriber 物件,但是我們可以將他們視為同一個物件來思考。
回來觀看 MapSubscriber 原始碼,可以看到它繼承 Subscriber 類別並覆寫 _next
方法,也就是當上一個 Observer 觸發 next
方法時,我們同時會得到最新的值,接著透過外部設定的函式 project
處理轉換新的值,並傳遞給下一個 Observer 的 next
方法。
{% codeblock Observable.ts lang:ts %}
class MapSubscriber<T, R> extends Subscriber
count: number = 0;
private thisArg: any;
constructor(destination: Subscriber
private project: (value: T, index: number) => R,
thisArg: any) {
super(destination);
this.thisArg = thisArg || this;
}
// NOTE: This looks unoptimized, but it’s actually purposefully NOT
// using try/catch optimizations.
protected _next(value: T) {
let result: any;
try {
result = this.project.call(this.thisArg, value, this.count++);
} catch (err) {
this.destination.error(err);
return;
}
this.destination.next(result);
}
}
{% endcodeblock %}
由此可知 OperatorSubscriber 可以說就是 RxJS Operator 最核心的部分,因為它決定很多事情:
next
、error
、complete
)_next
、_error
、_complete
)所以其實我們也可以自己建立 Operator Function,重點就是先繼承 Subscriber 類別。
開啟 rxjs\src\internal\operators\single.ts
,直接觀看最下面的 SingleSubscriber 類別,可以看到第2次傳值時會呼叫下一個 Observer 的 error
方法。this.destination.error('Sequence contains more than one element');
{% codeblock Observable.ts lang:ts %}
class SingleSubscriber
private seenValue: boolean = false;
private singleValue: T;
private index: number = 0;
private applySingleValue(value: T): void {
if (this.seenValue) {
this.destination.error(‘Sequence contains more than one element’);
} else {
this.seenValue = true;
this.singleValue = value;
}
}
protected _next(value: T): void {
const index = this.index++;
if (this.predicate) {
this.tryNext(value, index);
} else {
this.applySingleValue(value);
}
}
private tryNext(value: T, index: number): void {
try {
if (this.predicate(value, index, this.source)) {
this.applySingleValue(value);
}
} catch (err) {
this.destination.error(err);
}
}
protected _complete(): void {
const destination = this.destination;
if (this.index > 0) {
destination.next(this.seenValue ? this.singleValue : undefined);
destination.complete();
} else {
destination.error(new EmptyError);
}
}
}
{% endcodeblock %}
比較特別的是從 _complete
可以看到如果收到一個值之後就完成(complete),下一個 Observer 是會收到 next
與 complete
的,這是從官網的彈珠圖所看不出來的情境。
從 SingleSubscriber 類別其實我們只能認知第2次以後的
next
都會呼叫下一個 Observer 的error
,若沒有先了解底層 Subscriber 的流程規則,其實有可能會誤認為後續都會觸發error
。
從官網彈珠圖來看 retry 怎麼會再發生 error
之後還可以繼續收到通知,這不是違反規則嗎?
開啟 rxjs\src\internal\operators\retry.ts
,可以看到它覆寫了 error
而不是 _error
,而它所做的事情就是重新訂閱,沒錯它先取消目前訂閱,接著還原自己的屬性參數,最後再從新訂閱。
{% codeblock retry.ts lang:ts %}
class RetrySubscriber
constructor(destination: Subscriber
private count: number,
private source: Observable
super(destination);
}
error(err: any) {
if (!this.isStopped) {
const { source, count } = this;
if (count === 0) {
return super.error(err);
} else if (count > -1) {
this.count = count - 1;
}
source.subscribe(this._unsubscribeAndRecycle());
}
}
}
export class Subscriber
_unsubscribeAndRecycle(): Subscriber
const { _parent, _parents } = this;
this._parent = null;
this._parents = null;
this.unsubscribe();
this.closed = false;
this.isStopped = false;
this._parent = _parent;
this._parents = _parents;
return this;
}
}
{% endcodeblock %}
如果看到複寫
next
、error
、complete
其實就代表有機會改變流程規則,當然大原則之下其實都是遵循規則的,會修改往往是為了達到某些需求,例如:repeat 它做的事情跟 retry 一樣,只是它是改變complete
。
在 每個醫師身邊都應該要有個白衣天使:RxJS 內我們最後提到 Subject 就跟 Observable 一樣,只是它可以提供給多個 Observer 同時訂閱,可以說是 Observable 的強化版,但是從原始碼(rxjs\src\internal\Subject.ts
)來看它還有另一個更強大的功能,可以後續由我們自己來觸發 next
、error
、complete
。
{% codeblock retry.ts lang:ts %}
export class Subject
observers: Observer
next(value?: T) {
if (!this.isStopped) {
…
for (let i = 0; i < len; i++) {
copy[i].next(value);
}
}
}
error(err: any) {
…
for (let i = 0; i < len; i++) {
copy[i].error(err);
}
this.observers.length = 0;
}
complete() {
…
for (let i = 0; i < len; i++) {
copy[i].complete();
}
this.observers.length = 0;
}
}
{% endcodeblock %}
在使用 Observable 類別時,其實我們必須是先將 next
、error
、complete
規則擬定好,如果條件多時會讓程式邏輯會變得很複雜,而 Subject 直接提供 next
、error
、complete
方法,讓我們可以事後依各種狀況來觸發對應的方法,當原始資料來源是我們自己產生時,透過 Subject 封裝應該會比 Observable 有彈性。
BehaviorSubject 應該是 Subject 中最方便的,從原始碼(rxjs\src\internal\BehaviorSubject.ts
)來了解它的特性,我們看到它繼承了 Subject 類別,比較特別的是它提供初始值設定,我們可以建立時同時給一個初始值,而起既使沒有訂閱,我們也可以透過 getValue
方法抓取目前最新的值,另一個特點就是一旦訂閱馬上可以透過 next
收到目前的值。
{% codeblock BehaviorSubject.ts lang:ts %}
export class BehaviorSubject
constructor(private _value: T) {
super();
}
get value(): T {
return this.getValue();
}
/** @deprecated This is an internal implementation detail, do not use. */
_subscribe(subscriber: Subscriber
const subscription = super._subscribe(subscriber);
if (subscription && !(
subscriber.next(this._value);
}
return subscription;
}
getValue(): T {
if (this.hasError) {
throw this.thrownError;
} else if (this.closed) {
throw new ObjectUnsubscribedError();
} else {
return this._value;
}
}
next(value: T): void {
super.next(this._value = value);
}
}
{% endcodeblock %}
看到目前的感想就是
做事的只有 Subscriber
它決定了:
透過 next
正常傳值時要作什麼處理?
收到 error
錯誤時要如何因應?
收到 complete
完成通知之後要做什麼?
以上處理完之後是否要通知下一個 Subscriber?要用什麼方法通知?通知什麼訊息?
Observable 其實是幫我們將 Subscriber 串接起來。