先說 RxJS,RxJS 對自己的解釋是:RxJS 是一個使用可觀察序列編寫異步和基於事件的程序的庫。
可以認為是事件版本的 Lodash。
那實際上就是對觀察者模式的一個實踐,借用官網的例子來說明下用法就是
document.addEventListener('click', () => console.log('Clicked!'))
// 等價於下面的寫法
Rx.fromEvent(document, 'click').subscribe(() => console.log('Clicked!'))
上面的 fromEvent 是一個從指定對象獲取指定事件並轉化成流的工具方法,將用戶在 document 的點擊事件流動到事件序列中以訂閱處。下面說一下核心用法:
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
setTimeout(() => {
subscriber.next(3);
subscriber.complete();
}, 1000);
});
console.log('just before subscribe');
observable.subscribe((value) => console.log(value));
console.log('just after subscribe');
// just before subscribe
// 1
// 2
// 3
// just after subscribe
// 4
這裡 Observable 是 RxJS 中最核心的對象,用來創建一個流,構造函數提供一個 subscriber,可以用來在流中流入值,結束一個流或者拋錯,需要注意的是寫到 Observable 內部的代碼是惰性的,只有被訂閱時才會執行,更多請見官方文檔,這裡僅展示如何用 RxJS 來處理異步代碼。
然後簡單說一下 Redux,Redux 強調的是單向不可變數據流,所以狀態的改變需要 dispatch 一個 action,然後由 reducer 純函數進行處理返回新的狀態,將所有的副作用通過中間件機制混入到 dispatch 的過程中。
那這裡 redux 的 action 其實就是一個個事件對象,可以創建一個 action 的 Observable,在派發 action 的時候同時也將其流動到 Observable 中,然後再訂閱 action 流去處理副作用,還可以利用到 RxJS 強大的流處理能力(過濾,轉換、組合等)。
依據這個思路去寫一個 redux 的中間件:
function createMiddleware() {
const action$ = new Subject()
let subscription
const middleware = store => next => action => {
next(action)
actionSubject$.next(action) // flow to streamer
}
middleware.run = (streamer) => {
if (subscription) {
return subscription.unsubscribe()
}
subscription = streamer(action$)
}
middleware.close = () => {
if (!subscription) return
subscription.unsubscribe()
}
return middleware
}
這裡核心就是 middleware.run 方法的參數 streamer,它是一個接受 action 流為參數的方法,有了 action 流後就可以自由訂閱完成各種事情,實際用法如下:
import { Action, applyMiddleware, legacy_createStore } from 'redux'
import { Observable } from 'rxjs'
import { createMiddleware } from 'redux-observable-action'
const reducer = (state = 0, action) => {
switch (action.type) {
case 'INCREMENT':
return state + 1
case 'DECREMENT':
return state - 1
default:
return state
}
}
const streamerMiddleware = createMiddleware()
const store = legacy_createStore(reducer, applyMiddleware(streamerMiddleware))
function rootStreamer(action$: Observable<Action>) {
return action$.subscribe(action => {
if (action.type === 'INCREMENT') {
// do something
}
})
}
streamerMiddleware.run(rootStreamer)
實際上 RxJS 社區還有一套實現 redux-observable 會更加複雜一些,提供 action 流的同時還有 state 的流,在 redux-observable 中註冊流的方法稱作 epic,接受 action$ state$ 作為參數,並要求返回 action$。
個人感覺強制要求返回 action 流的做法並不自由,有時候僅想要訂閱後完成一些任務,而不是單純做過濾、轉換。
然後介紹一下 duck 模式:在 redux 應用中,各模塊 reducer、creaters、types 往往被拆分到各處,加個東西讓人挺惱火,因此就有了 duck,反其道而行之,將各組件捆綁在一起組織和維護代碼,這套模式已有很多庫進行了實踐,比如 saga-duck 將各組件組裝在一起同時將 redux-saga 一起混入,完整包含狀態管理和異步流程控制,可重用、可組裝。
下面的 observable-duck 則借鑒 saga-duck 的做法,將 redux-saga 的支持替換成 redux-observable-action,同時優化 ts 類型支持,使得 redux store 在 duck 和 react 組件中更加類型完備。
observable-duck#
將 redux 和 rxjs 的 Observable 組合在一起,可以方便的聚合邏輯並且支持流出 state 到 react 組件,類型完備,開發體驗良好。
基本使用#
安裝#
npm i observable-duck --save
組織代碼#
import { Action } from "redux";
import { Base, Action, take } from "observable-duck";
import { Observable } from "rxjs";
import { debounceTime } from 'rxjs/operators'
export default class AppDuck extends Base {
get quickTypes() {
enum Type {
INCREMENT,
DECREMENT,
}
return {
...super.quickTypes,
...Type,
};
}
get reducers() {
const types = this.types;
return {
count: (state = 0, action) => {
switch (action.type) {
case types.INCREMENT:
return state + 1;
case types.DECREMENT:
return state - 1;
default:
return state;
}
},
};
}
get creators() {
const types = this.types;
return {
...super.creators,
increment: () => ({ type: types.INCREMENT }),
decrement: () => ({ type: types.DECREMENT }),
};
}
/**
* 添加 Action 裝飾器,注入 redux 的 action 流
*/
@Action
increment(action$: Observable<Action>) {
const duck = this;
return action$
.pipe(
take(duck.types.INCREMENT), // 过滤 action
debounceTime(20), // 加入防抖以實現 redux-saga 中 takeLatest 的效果
)
.subscribe((action) => {
const state = duck.getState();
// preform your effect
});
}
}
// 創建該 duck 的運行時,基本上各部分的組裝邏輯都在這裡進行
const runtime = Runtime.create(AppDuck)
測試 duck 純邏輯部分#
然後你可以僅測試你的純邏輯部分確認沒有問題
import { expect, test, describe } from 'vitest'
import { Runtime } from 'observable-duck'
import AppDuck from './AppDuck'
describe('AppDuck.test', () => {
test('AppDuck.count', async () => {
const runtime = Runtime.create(AppDuck)
const { dispatch, getState, creators } = runtime.duck
expect(getState().count).toBe(0)
dispatch(creators.increment())
expect(getState().count).toBe(1)
dispatch(creators.decrement())
expect(getState().count).toBe(0)
})
})
連接 React 組件#
然後可以將 runtime 連接到 react 組件(由 react-redux 實現),使用 ConnectedProps<AppDuck>
註釋後的 props 也將獲得完整的類型
import * as React from 'react'
import { ConnectedProps } from 'observable-duck'
import AppDuck from './AppDuck'
function App(props: ConnectedProps<AppDuck>) {
const { duck, store, dispatch } = props
const { creators } = duck
const [count, setCount] = React.useState(0)
return <div>
<h4>React.useState</h4>
<div>
<button onClick={() => setCount((c) => c - 1)}>-</button>
<span>{count}</span>
<button onClick={() => setCount((c) => c + 1)}>+</button>
</div>
<h4>React Redux</h4>
<div>
<button onClick={() => dispatch(creators.decrement())}>-</button>
<span>{store.count}</span>
<button onClick={() => dispatch(creators.increment())}>+</button>
</div>
</div>
}
// 導出連接過後的組件
export default Runtime.create(AppDuck).connect(App)
或者並不一定非得連接到 react 組件上使用,由於是 redux 和 rxjs 組合使用,你可以使用 Observable 任意發揮。
另一種方式在 React 組件中使用#
還可以用提供的 useDuck
hook 在組件內部創建 redux 倉庫與 duck 的實例化
// index.tsx
import * as React from 'react'
import { useDuck } from 'observable-duck'
import Duck from './Duck'
export default function () {
const { duck, store, dispatch } = useDuck(Duck)
const { types } = duck
return (
<div>
useDuck:
<div>
<input
value={store.value}
onChange={(v) =>
dispatch({
type: types.SET_VALUE,
payload: v.target.value,
})
}
/>
</div>
<br />
</div>
)
}
// Duck.ts
import { Base, reduceFromPayload } from 'observable-duck'
export default class Duck extends Base {
get quickTypes() {
enum Type {
SET_VALUE,
}
return {
...Type,
}
}
get reducers() {
const { types } = this
return {
value: reduceFromPayload<string>(types.SET_VALUE, ''),
}
}
}
擴展 duck#
為了更好的內聚與更低的耦合,duck 也支持將別的邏輯成塊的 duck 作為子 duck 擴展進自身,duck 中的 redux store,Observable 都將註冊,並且擴展後的 duck 同樣類型完備。
外層 duck 關注擴展進來的邏輯,可以接受內層 duck 的 action 進行處理,內層 duck 不關注自身所處環境,因此不會處理外層環境 duck 的 action。
import { Observable } from 'rxjs'
import { Base, Action } from 'observable-duck'
export default class ParentDuck extends Base {
get quickDucks() {
return {
sub: SubDuck,
}
}
get quickTypes() {
enum Type {
INCREMENT,
DECREMENT,
SET_VALUE,
}
return {
...super.quickTypes,
...Type,
}
}
get reducers() {
const types = this.types
return {
name: (state: string) => 'init name',
timestamp: (state: number) => Date.now(),
value: reduceFromPayload<string>(types.SET_VALUE, ''),
}
}
get creators() {
const types = this.types
return {
...super.creators,
increment: () => ({ type: types.INCREMENT }),
decrement: () => ({ type: types.DECREMENT }),
}
}
@Action
incrementStreamer(action$: Observable<Action>) {
const duck = this
return action$.pipe(filterAction(duck.types.INCREMENT)).subscribe((action) => {
const state = duck.getState()
console.log(state.sub.aaa)
// 可以將派發 action 由子 duck 處理
dispatch({
type: ducks.sub.types.SUB,
payload: 'from parent\'s value',
})
})
}
}
class SubDuck extends Base {
get quickTypes() {
enum Type {
SUB,
}
return {
...super.quickTypes,
...Type,
}
}
get reducers() {
const types = this.types
return {
aaa: (state: string) => 'init name',
value: reduceFromPayload<string>(types.SUB, ''),
}
}
// ...
}
連接外部訂閱源#
在 duck 中訂閱其他 runtime.redux
然後做任何事情
// One.ts
import { Runtime } from 'observable-duck'
import Template from './Template'
import Duck from './Duck'
export const runtime = Runtime.create(Duck) // 單獨將 runtime 也導出
export default runtime.connect(Template) // 將 runtime 與 react 組件連接後默認導出
// Two.ts
import { Base, From } from 'observable-duck'
import { runtime } from './One.ts'
class Search extends Base {
@From(runtime.redux)
accept(external$: Observable<DuckState<typeof runtime.duck>>) {
const { dispatch } = this
return external$.pipe(/** ... */).subscribe((value) => {
dispatch({
type: "...",
payload: value,
})
})
}
}
或者直接引用外部源可以做到雙向同步
import { Observable } from 'rxjs'
import { webSocket } from 'rxjs/webSocket'
import { Base, Action, Cache, take } from 'observable-duck'
export default class Search extends Base {
get quickTypes() {
enum Type {
SET_VALUE,
SEARCH,
}
return {
...Type,
}
}
get reducers() {
const types = this.types
return {
value: reduceFromPayload<string>(types.SET_VALUE, ''),
}
}
get creators() {
const { types } = this
return {
setValue: createToPayload<string>(types.SET_VALUE),
search: createToPayload<void>(types.SEARCH),
}
}
@Cache()
get websocket$() {
const { types, dispatch } = this
const $ = webSocket('wss://***')
this.subscription.add(
$.subscribe((data) => dispatch({
type: types.SET_VALUE,
payload: data,
}))
)
return $
}
@Action
watchSearch(action$: Observable<Action>) {
const duck = this
return action$
.pipe(take(duck.types.SEARCH))
.subscribe((action) => duck.websocket$.next(action.payload))
}
}