banner
chenyh

chenyh

chenyh.xlog.app
github
telegram
email

結合 RxJS 和 Redux 的代碼實踐:observable-duck

先說 RxJS,RxJS 對自己的解釋是:RxJS 是一個使用可觀察序列編寫異步和基於事件的程序的庫。

可以認為是事件版本的 Lodash。

那實際上就是對觀察者模式的一個實踐,借用官網的例子來說明下用法就是

document.addEventListener('click', () => console.log('Clicked!'))

// 等價於下面的寫法
Rx.fromEvent(document, 'click').subscribe(() => console.log('Clicked!'))

上面的 fromEvent 是一個從指定對象獲取指定事件並轉化成流的工具方法,將用戶在 document 的點擊事件流動到事件序列中以訂閱處。下面說一下核心用法:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe((value) => console.log(value));
console.log('just after subscribe');

// just before subscribe
// 1
// 2
// 3
// just after subscribe
// 4

這裡 Observable 是 RxJS 中最核心的對象,用來創建一個流,構造函數提供一個 subscriber,可以用來在流中流入值,結束一個流或者拋錯,需要注意的是寫到 Observable 內部的代碼是惰性的,只有被訂閱時才會執行,更多請見官方文檔,這裡僅展示如何用 RxJS 來處理異步代碼。

然後簡單說一下 Redux,Redux 強調的是單向不可變數據流,所以狀態的改變需要 dispatch 一個 action,然後由 reducer 純函數進行處理返回新的狀態,將所有的副作用通過中間件機制混入到 dispatch 的過程中。

那這裡 redux 的 action 其實就是一個個事件對象,可以創建一個 action 的 Observable,在派發 action 的時候同時也將其流動到 Observable 中,然後再訂閱 action 流去處理副作用,還可以利用到 RxJS 強大的流處理能力(過濾,轉換、組合等)。

依據這個思路去寫一個 redux 的中間件:

function createMiddleware() {
  const action$ = new Subject()
  let subscription
  const middleware = store => next => action => {
    next(action)
    actionSubject$.next(action) // flow to streamer
  }
  middleware.run = (streamer) => {
    if (subscription) {
      return subscription.unsubscribe()
    }
    subscription = streamer(action$)
  }
  middleware.close = () => {
    if (!subscription) return
    subscription.unsubscribe()
  }
  return middleware
}

這裡核心就是 middleware.run 方法的參數 streamer,它是一個接受 action 流為參數的方法,有了 action 流後就可以自由訂閱完成各種事情,實際用法如下:

import { Action, applyMiddleware, legacy_createStore } from 'redux'
import { Observable } from 'rxjs'
import { createMiddleware } from 'redux-observable-action'

const reducer = (state = 0, action) => {
  switch (action.type) {
    case 'INCREMENT':
      return state + 1
    case 'DECREMENT':
      return state - 1
    default:
      return state
  }
}

const streamerMiddleware = createMiddleware()
const store = legacy_createStore(reducer, applyMiddleware(streamerMiddleware))

function rootStreamer(action$: Observable<Action>) {
  return action$.subscribe(action => {
    if (action.type === 'INCREMENT') {
      // do something
    }
  })
}

streamerMiddleware.run(rootStreamer)

實際上 RxJS 社區還有一套實現 redux-observable 會更加複雜一些,提供 action 流的同時還有 state 的流,在 redux-observable 中註冊流的方法稱作 epic,接受 action$ state$ 作為參數,並要求返回 action$。

個人感覺強制要求返回 action 流的做法並不自由,有時候僅想要訂閱後完成一些任務,而不是單純做過濾、轉換。

然後介紹一下 duck 模式:在 redux 應用中,各模塊 reducer、creaters、types 往往被拆分到各處,加個東西讓人挺惱火,因此就有了 duck,反其道而行之,將各組件捆綁在一起組織和維護代碼,這套模式已有很多庫進行了實踐,比如 saga-duck 將各組件組裝在一起同時將 redux-saga 一起混入,完整包含狀態管理和異步流程控制,可重用、可組裝。

下面的 observable-duck 則借鑒 saga-duck 的做法,將 redux-saga 的支持替換成 redux-observable-action,同時優化 ts 類型支持,使得 redux store 在 duck 和 react 組件中更加類型完備。

observable-duck#

將 redux 和 rxjs 的 Observable 組合在一起,可以方便的聚合邏輯並且支持流出 state 到 react 組件,類型完備,開發體驗良好。

基本使用#

安裝#

npm i observable-duck --save

組織代碼#

import { Action } from "redux";
import { Base, Action, take } from "observable-duck";
import { Observable } from "rxjs";
import { debounceTime } from 'rxjs/operators'

export default class AppDuck extends Base {
  get quickTypes() {
    enum Type {
      INCREMENT,
      DECREMENT,
    }
    return {
      ...super.quickTypes,
      ...Type,
    };
  }
  get reducers() {
    const types = this.types;
    return {
      count: (state = 0, action) => {
        switch (action.type) {
          case types.INCREMENT:
            return state + 1;
          case types.DECREMENT:
            return state - 1;
          default:
            return state;
        }
      },
    };
  }
  get creators() {
    const types = this.types;
    return {
      ...super.creators,
      increment: () => ({ type: types.INCREMENT }),
      decrement: () => ({ type: types.DECREMENT }),
    };
  }

  /**
   * 添加 Action 裝飾器,注入 redux 的 action 流
   */
  @Action
  increment(action$: Observable<Action>) {
    const duck = this;
    return action$
      .pipe(
        take(duck.types.INCREMENT), // 过滤 action
        debounceTime(20), // 加入防抖以實現 redux-saga 中 takeLatest 的效果
      )
      .subscribe((action) => {
        const state = duck.getState();
        // preform your effect
      });
  }
}

// 創建該 duck 的運行時,基本上各部分的組裝邏輯都在這裡進行
const runtime = Runtime.create(AppDuck)

測試 duck 純邏輯部分#

然後你可以僅測試你的純邏輯部分確認沒有問題

import { expect, test, describe } from 'vitest'
import { Runtime } from 'observable-duck'
import AppDuck from './AppDuck'

describe('AppDuck.test', () => {
  test('AppDuck.count', async () => {
    const runtime = Runtime.create(AppDuck)
    const { dispatch, getState, creators } = runtime.duck
    expect(getState().count).toBe(0)
    dispatch(creators.increment())
    expect(getState().count).toBe(1)
    dispatch(creators.decrement())
    expect(getState().count).toBe(0)
  })
})

連接 React 組件#

然後可以將 runtime 連接到 react 組件(由 react-redux 實現),使用 ConnectedProps<AppDuck> 註釋後的 props 也將獲得完整的類型

import * as React from 'react'
import { ConnectedProps } from 'observable-duck'
import AppDuck from './AppDuck'

function App(props: ConnectedProps<AppDuck>) {
  const { duck, store, dispatch } = props
  const { creators } = duck
  const [count, setCount] = React.useState(0)
  return <div>
    <h4>React.useState</h4>
    <div>
      <button onClick={() => setCount((c) => c - 1)}>-</button>
      <span>{count}</span>
      <button onClick={() => setCount((c) => c + 1)}>+</button>
    </div>
    <h4>React Redux</h4>
    <div>
      <button onClick={() => dispatch(creators.decrement())}>-</button>
      <span>{store.count}</span>
      <button onClick={() => dispatch(creators.increment())}>+</button>
    </div>
  </div>
}

// 導出連接過後的組件
export default Runtime.create(AppDuck).connect(App)

或者並不一定非得連接到 react 組件上使用,由於是 redux 和 rxjs 組合使用,你可以使用 Observable 任意發揮。

另一種方式在 React 組件中使用#

還可以用提供的 useDuck hook 在組件內部創建 redux 倉庫與 duck 的實例化

// index.tsx
import * as React from 'react'
import { useDuck } from 'observable-duck'
import Duck from './Duck'

export default function () {
  const { duck, store, dispatch } = useDuck(Duck)
  const { types } = duck
  return (
    <div>
      useDuck:
      <div>
        <input
          value={store.value}
          onChange={(v) =>
            dispatch({
              type: types.SET_VALUE,
              payload: v.target.value,
            })
          }
        />
      </div>
      <br />
    </div>
  )
}

// Duck.ts
import { Base, reduceFromPayload } from 'observable-duck'
export default class Duck extends Base {
  get quickTypes() {
    enum Type {
      SET_VALUE,
    }
    return {
      ...Type,
    }
  }
  get reducers() {
    const { types } = this
    return {
      value: reduceFromPayload<string>(types.SET_VALUE, ''),
    }
  }
}

擴展 duck#

為了更好的內聚與更低的耦合,duck 也支持將別的邏輯成塊的 duck 作為子 duck 擴展進自身,duck 中的 redux store,Observable 都將註冊,並且擴展後的 duck 同樣類型完備。

外層 duck 關注擴展進來的邏輯,可以接受內層 duck 的 action 進行處理,內層 duck 不關注自身所處環境,因此不會處理外層環境 duck 的 action。

import { Observable } from 'rxjs'
import { Base, Action } from 'observable-duck'

export default class ParentDuck extends Base {
  get quickDucks() {
    return {
      sub: SubDuck,
    }
  }
  get quickTypes() {
    enum Type {
      INCREMENT,
      DECREMENT,
      SET_VALUE,
    }
    return {
      ...super.quickTypes,
      ...Type,
    }
  }
  get reducers() {
    const types = this.types
    return {
      name: (state: string) => 'init name',
      timestamp: (state: number) => Date.now(),
      value: reduceFromPayload<string>(types.SET_VALUE, ''),
    }
  }
  get creators() {
    const types = this.types
    return {
      ...super.creators,
      increment: () => ({ type: types.INCREMENT }),
      decrement: () => ({ type: types.DECREMENT }),
    }
  }
  @Action
  incrementStreamer(action$: Observable<Action>) {
    const duck = this
    return action$.pipe(filterAction(duck.types.INCREMENT)).subscribe((action) => {
      const state = duck.getState()
      console.log(state.sub.aaa)
      // 可以將派發 action 由子 duck 處理
      dispatch({
        type: ducks.sub.types.SUB,
        payload: 'from parent\'s value',
      })
    })
  }
}

class SubDuck extends Base {
  get quickTypes() {
    enum Type {
      SUB,
    }
    return {
      ...super.quickTypes,
      ...Type,
    }
  }
  get reducers() {
    const types = this.types
    return {
      aaa: (state: string) => 'init name',
      value: reduceFromPayload<string>(types.SUB, ''),
    }
  }
  // ...
}

連接外部訂閱源#

在 duck 中訂閱其他 runtime.redux 然後做任何事情

// One.ts
import { Runtime } from 'observable-duck'
import Template from './Template'
import Duck from './Duck'

export const runtime = Runtime.create(Duck) // 單獨將 runtime 也導出
export default runtime.connect(Template) // 將 runtime 與 react 組件連接後默認導出
// Two.ts
import { Base, From } from 'observable-duck'
import { runtime } from './One.ts'

class Search extends Base {
  @From(runtime.redux)
  accept(external$: Observable<DuckState<typeof runtime.duck>>) {
    const { dispatch } = this
    return external$.pipe(/** ... */).subscribe((value) => {
      dispatch({
        type: "...",
        payload: value,
      })
    })
  }
}

或者直接引用外部源可以做到雙向同步

import { Observable } from 'rxjs'
import { webSocket } from 'rxjs/webSocket'
import { Base, Action, Cache, take } from 'observable-duck'

export default class Search extends Base {
  get quickTypes() {
    enum Type {
      SET_VALUE,
      SEARCH,
    }
    return {
      ...Type,
    }
  }
  get reducers() {
    const types = this.types
    return {
      value: reduceFromPayload<string>(types.SET_VALUE, ''),
    }
  }
  get creators() {
    const { types } = this
    return {
      setValue: createToPayload<string>(types.SET_VALUE),
      search: createToPayload<void>(types.SEARCH),
    }
  }
  @Cache()
  get websocket$() {
    const { types, dispatch } = this
    const $ = webSocket('wss://***')
    this.subscription.add(
      $.subscribe((data) => dispatch({
        type: types.SET_VALUE,
        payload: data,
      }))
    )
    return $
  }
  @Action
  watchSearch(action$: Observable<Action>) {
    const duck = this
    return action$
      .pipe(take(duck.types.SEARCH))
      .subscribe((action) => duck.websocket$.next(action.payload))
  }
}
載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。