先说 RxJS、RxJS の説明は次のとおりです:RxJS は、可観測なシーケンスを使用して非同期およびイベントベースのプログラムを構成するためのライブラリです。
RxJS をイベントのための Lodash と考えてください。
実際には、これはオブザーバーパターンの実践であり、公式サイトの例を借りて使い方を説明します。
document.addEventListener('click', () => console.log('Clicked!'))
// 以下の書き方と同等です
Rx.fromEvent(document, 'click').subscribe(() => console.log('Clicked!'))
上記の fromEvent は、指定されたオブジェクトから指定されたイベントを取得し、ストリームに変換するためのツールメソッドであり、ユーザーが document でクリックしたイベントをイベントシーケンスに流し込んで購読します。次に、核心的な使い方について説明します。
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
setTimeout(() => {
subscriber.next(3);
subscriber.complete();
}, 1000);
});
console.log('just before subscribe');
observable.subscribe((value) => console.log(value));
console.log('just after subscribe');
// just before subscribe
// 1
// 2
// 3
// just after subscribe
// 4
ここで Observable は RxJS の最も核心的なオブジェクトであり、ストリームを作成するために使用されます。コンストラクタはサブスクライバーを提供し、ストリームに値を流し込んだり、ストリームを終了したり、エラーを投げたりすることができます。注意が必要なのは、Observable 内部に書かれたコードは遅延実行され、購読されたときにのみ実行されることです。詳細については公式文書を参照してください。ここでは、RxJS を使用して非同期コードを処理する方法を示しています。
次に Redux について簡単に説明します。Redux は単方向の不変データフローを強調しているため、状態の変更にはアクションを dispatch する必要があり、その後 reducer という純関数が処理して新しい状態を返します。すべての副作用はミドルウェアメカニズムを介して dispatch のプロセスに混入します。
ここで Redux のアクションは実際にはイベントオブジェクトであり、アクションの Observable を作成できます。アクションを dispatch する際に同時にそれを Observable に流し込み、その後アクションストリームを購読して副作用を処理できます。また、RxJS の強力なストリーム処理能力(フィルタリング、変換、結合など)を利用することもできます。
この考え方に基づいて、Redux のミドルウェアを作成します。
function createMiddleware() {
const action$ = new Subject()
let subscription
const middleware = store => next => action => {
next(action)
actionSubject$.next(action) // ストリーマーに流す
}
middleware.run = (streamer) => {
if (subscription) {
return subscription.unsubscribe()
}
subscription = streamer(action$)
}
middleware.close = () => {
if (!subscription) return
subscription.unsubscribe()
}
return middleware
}
ここでの核心は middleware.run メソッドの引数 streamer であり、これはアクションストリームを引数として受け取るメソッドです。アクションストリームがあれば、自由に購読してさまざまなことを実行できます。実際の使い方は以下の通りです。
import { Action, applyMiddleware, legacy_createStore } from 'redux'
import { Observable } from 'rxjs'
import { createMiddleware } from 'redux-observable-action'
const reducer = (state = 0, action) => {
switch (action.type) {
case 'INCREMENT':
return state + 1
case 'DECREMENT':
return state - 1
default:
return state
}
}
const streamerMiddleware = createMiddleware()
const store = legacy_createStore(reducer, applyMiddleware(streamerMiddleware))
function rootStreamer(action$: Observable<Action>) {
return action$.subscribe(action => {
if (action.type === 'INCREMENT') {
// 何かをする
}
})
}
streamerMiddleware.run(rootStreamer)
実際、RxJS コミュニティには redux-observable という実装があり、もう少し複雑で、アクションストリームと同時に状態のストリームも提供します。redux-observable でストリームを登録するメソッドは epic と呼ばれ、action$ と state$ を引数として受け取り、アクションを返すことが求められます。
私個人の感想としては、アクションストリームを強制的に返すというやり方は自由度が低いと感じています。時には単に購読していくつかのタスクを完了したいだけで、単純にフィルタリングや変換を行うわけではありません。
次に duck モード を紹介します:Redux アプリケーションでは、各モジュールの reducer、creators、types がしばしば分散されており、何かを追加するのが面倒です。そこで duck が登場し、逆に各コンポーネントを束ねてコードを整理・管理します。このパターンは多くのライブラリで実践されており、例えば saga-duck は各コンポーネントを組み合わせ、redux-saga を混入させて、状態管理と非同期フロー制御を完全に含む、再利用可能で組み立て可能なものです。
以下の observable-duck は saga-duck のアプローチを参考にし、redux-saga のサポートを redux-observable-action に置き換え、ts 型サポートを最適化し、redux ストアが duck と react コンポーネントの中でより完全な型を持つようにします。
observable-duck#
Redux と RxJS の Observable を組み合わせることで、ロジックを簡単に集約し、状態を React コンポーネントに流出させることができ、型が完備され、開発体験が良好です。
基本使用法#
インストール#
npm i observable-duck --save
コードの整理#
import { Action } from "redux";
import { Base, Action, take } from "observable-duck";
import { Observable } from "rxjs";
import { debounceTime } from 'rxjs/operators'
export default class AppDuck extends Base {
get quickTypes() {
enum Type {
INCREMENT,
DECREMENT,
}
return {
...super.quickTypes,
...Type,
};
}
get reducers() {
const types = this.types;
return {
count: (state = 0, action) => {
switch (action.type) {
case types.INCREMENT:
return state + 1;
case types.DECREMENT:
return state - 1;
default:
return state;
}
},
};
}
get creators() {
const types = this.types;
return {
...super.creators,
increment: () => ({ type: types.INCREMENT }),
decrement: () => ({ type: types.DECREMENT }),
};
}
/**
* Action デコレーターを追加し、Redux のアクションストリームを注入します
*/
@Action
increment(action$: Observable<Action>) {
const duck = this;
return action$
.pipe(
take(duck.types.INCREMENT), // アクションをフィルタリング
debounceTime(20), // redux-saga の takeLatest の効果を実現するためにデバウンスを追加
)
.subscribe((action) => {
const state = duck.getState();
// 効果を実行
});
}
}
// この duck のランタイムを作成し、基本的に各部分の組み立てロジックがここで行われます
const runtime = Runtime.create(AppDuck)
duck の純粋なロジック部分をテストする#
次に、純粋なロジック部分をテストして問題がないことを確認できます。
import { expect, test, describe } from 'vitest'
import { Runtime } from 'observable-duck'
import AppDuck from './AppDuck'
describe('AppDuck.test', () => {
test('AppDuck.count', async () => {
const runtime = Runtime.create(AppDuck)
const { dispatch, getState, creators } = runtime.duck
expect(getState().count).toBe(0)
dispatch(creators.increment())
expect(getState().count).toBe(1)
dispatch(creators.decrement())
expect(getState().count).toBe(0)
})
})
React コンポーネントに接続する#
次に、ランタイムを React コンポーネントに接続できます(react-redux によって実現されます)。ConnectedProps<AppDuck>
注釈の付いた props も完全な型を取得します。
import * as React from 'react'
import { ConnectedProps } from 'observable-duck'
import AppDuck from './AppDuck'
function App(props: ConnectedProps<AppDuck>) {
const { duck, store, dispatch } = props
const { creators } = duck
const [count, setCount] = React.useState(0)
return <div>
<h4>React.useState</h4>
<div>
<button onClick={() => setCount((c) => c - 1)}>-</button>
<span>{count}</span>
<button onClick={() => setCount((c) => c + 1)}>+</button>
</div>
<h4>React Redux</h4>
<div>
<button onClick={() => dispatch(creators.decrement())}>-</button>
<span>{store.count}</span>
<button onClick={() => dispatch(creators.increment())}>+</button>
</div>
</div>
}
// 接続されたコンポーネントをエクスポート
export default Runtime.create(AppDuck).connect(App)
また、必ずしも React コンポーネントに接続する必要はなく、Redux と RxJS を組み合わせて使用するため、Observable を自由に活用できます。
React コンポーネント内で使用する別の方法#
提供された useDuck
フックを使用して、コンポーネント内で Redux ストアと duck のインスタンスを作成することもできます。
// index.tsx
import * as React from 'react'
import { useDuck } from 'observable-duck'
import Duck from './Duck'
export default function () {
const { duck, store, dispatch } = useDuck(Duck)
const { types } = duck
return (
<div>
useDuck:
<div>
<input
value={store.value}
onChange={(v) =>
dispatch({
type: types.SET_VALUE,
payload: v.target.value,
})
}
/>
</div>
<br />
</div>
)
}
// Duck.ts
import { Base, reduceFromPayload } from 'observable-duck'
export default class Duck extends Base {
get quickTypes() {
enum Type {
SET_VALUE,
}
return {
...Type,
}
}
get reducers() {
const { types } = this
return {
value: reduceFromPayload<string>(types.SET_VALUE, ''),
}
}
}
duck の拡張#
より良い内聚と低い結合のために、duck は他のロジックをブロックとして子 duck として自身に拡張することをサポートしています。duck 内の Redux ストア、Observable はすべて登録され、拡張された duck も同様に型が完備されます。
外側の duck は拡張されたロジックに関心を持ち、内側の duck のアクションを処理できますが、内側の duck は自身が置かれている環境に関心を持たないため、外側の環境 duck のアクションを処理することはありません。
import { Observable } from 'rxjs'
import { Base, Action } from 'observable-duck'
export default class ParentDuck extends Base {
get quickDucks() {
return {
sub: SubDuck,
}
}
get quickTypes() {
enum Type {
INCREMENT,
DECREMENT,
SET_VALUE,
}
return {
...super.quickTypes,
...Type,
}
}
get reducers() {
const types = this.types
return {
name: (state: string) => 'init name',
timestamp: (state: number) => Date.now(),
value: reduceFromPayload<string>(types.SET_VALUE, ''),
}
}
get creators() {
const types = this.types
return {
...super.creators,
increment: () => ({ type: types.INCREMENT }),
decrement: () => ({ type: types.DECREMENT }),
}
}
@Action
incrementStreamer(action$: Observable<Action>) {
const duck = this
return action$.pipe(filterAction(duck.types.INCREMENT)).subscribe((action) => {
const state = duck.getState()
console.log(state.sub.aaa)
// 子 duck によってアクションを処理させることができます
dispatch({
type: ducks.sub.types.SUB,
payload: 'from parent\'s value',
})
})
}
}
class SubDuck extends Base {
get quickTypes() {
enum Type {
SUB,
}
return {
...super.quickTypes,
...Type,
}
}
get reducers() {
const types = this.types
return {
aaa: (state: string) => 'init name',
value: reduceFromPayload<string>(types.SUB, ''),
}
}
// ...
}
外部購読源に接続する#
duck 内で他の runtime.redux
を購読し、何でも行うことができます。
// One.ts
import { Runtime } from 'observable-duck'
import Template from './Template'
import Duck from './Duck'
export const runtime = Runtime.create(Duck) // 単独でランタイムをエクスポート
export default runtime.connect(Template) // ランタイムを React コンポーネントに接続してデフォルトエクスポート
// Two.ts
import { Base, From } from 'observable-duck'
import { runtime } from './One.ts'
class Search extends Base {
@From(runtime.redux)
accept(external$: Observable<DuckState<typeof runtime.duck>>) {
const { dispatch } = this
return external$.pipe(/** ... */).subscribe((value) => {
dispatch({
type: "...",
payload: value,
})
})
}
}
また、外部ソースを直接参照することで双方向の同期を実現することもできます。
import { Observable } from 'rxjs'
import { webSocket } from 'rxjs/webSocket'
import { Base, Action, Cache, take } from 'observable-duck'
export default class Search extends Base {
get quickTypes() {
enum Type {
SET_VALUE,
SEARCH,
}
return {
...Type,
}
}
get reducers() {
const types = this.types
return {
value: reduceFromPayload<string>(types.SET_VALUE, ''),
}
}
get creators() {
const { types } = this
return {
setValue: createToPayload<string>(types.SET_VALUE),
search: createToPayload<void>(types.SEARCH),
}
}
@Cache()
get websocket$() {
const { types, dispatch } = this
const $ = webSocket('wss://***')
this.subscription.add(
$.subscribe((data) => dispatch({
type: types.SET_VALUE,
payload: data,
}))
)
return $
}
@Action
watchSearch(action$: Observable<Action>) {
const duck = this
return action$
.pipe(take(duck.types.SEARCH))
.subscribe((action) => duck.websocket$.next(action.payload))
}
}