banner
chenyh

chenyh

chenyh.xlog.app
github
telegram
email

Combining RxJS and Redux Code Practice: observable-duck

First, let's talk about RxJS. RxJS describes itself as: RxJS is a library for composing asynchronous and event-based programs by using observable sequences.

Think of RxJS as Lodash for events.

In essence, it is a practical implementation of the observer pattern. To illustrate its usage, let's borrow an example from the official website:

document.addEventListener('click', () => console.log('Clicked!'))

// Equivalent to the following code
Rx.fromEvent(document, 'click').subscribe(() => console.log('Clicked!'))

The fromEvent method above is a utility that retrieves a specified event from a specified object and converts it into a stream, allowing the user's click events on the document to flow into the event sequence for subscription. Now, let's discuss the core usage:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe((value) => console.log(value));
console.log('just after subscribe');

// just before subscribe
// 1
// 2
// 3
// just after subscribe
// 4

Here, Observable is the core object in RxJS, used to create a stream. The constructor provides a subscriber that can be used to emit values into the stream, complete the stream, or throw errors. It is important to note that the code written inside the Observable is lazy and will only execute when subscribed. For more details, please refer to the official documentation. Here, we only demonstrate how to use RxJS to handle asynchronous code.

Next, let's briefly discuss Redux. Redux emphasizes a unidirectional immutable data flow, so state changes require dispatching an action, which is then processed by a pure function called a reducer to return a new state. All side effects are mixed into the dispatch process through middleware mechanisms.

In this context, Redux actions are essentially event objects, and you can create an action Observable. When dispatching an action, it can simultaneously flow into the Observable, allowing you to subscribe to the action stream to handle side effects, while also leveraging RxJS's powerful stream processing capabilities (filtering, transforming, combining, etc.).

Based on this idea, here’s how to write a Redux middleware:

function createMiddleware() {
  const action$ = new Subject()
  let subscription
  const middleware = store => next => action => {
    next(action)
    action$.next(action) // flow to streamer
  }
  middleware.run = (streamer) => {
    if (subscription) {
      return subscription.unsubscribe()
    }
    subscription = streamer(action$)
  }
  middleware.close = () => {
    if (!subscription) return
    subscription.unsubscribe()
  }
  return middleware
}

The core here is the middleware.run method's parameter streamer, which is a method that accepts the action stream as an argument. With the action stream, you can freely subscribe to complete various tasks. The actual usage is as follows:

import { Action, applyMiddleware, legacy_createStore } from 'redux'
import { Observable } from 'rxjs'
import { createMiddleware } from 'redux-observable-action'

const reducer = (state = 0, action) => {
  switch (action.type) {
    case 'INCREMENT':
      return state + 1
    case 'DECREMENT':
      return state - 1
    default:
      return state
  }
}

const streamerMiddleware = createMiddleware()
const store = legacy_createStore(reducer, applyMiddleware(streamerMiddleware))

function rootStreamer(action$: Observable<Action>) {
  return action$.subscribe(action => {
    if (action.type === 'INCREMENT') {
      // do something
    }
  })
}

streamerMiddleware.run(rootStreamer)

In fact, the RxJS community has a more complex implementation called redux-observable, which provides both action streams and state streams. In redux-observable, the method for registering streams is called epic, which accepts action$ and state$ as parameters and requires returning action$.

Personally, I feel that the requirement to return an action stream is not very flexible; sometimes you just want to subscribe and complete some tasks, rather than simply filtering or transforming.

Now, let's introduce the duck pattern: In Redux applications, various modules' reducers, creators, and types are often scattered everywhere, which can be quite annoying. Thus, the duck pattern was created, which bundles components together to organize and maintain code. Many libraries have implemented this pattern, such as saga-duck, which assembles components while integrating redux-saga, fully encompassing state management and asynchronous flow control, making it reusable and composable.

The following observable-duck takes inspiration from saga-duck, replacing redux-saga support with redux-observable-action, while also optimizing TypeScript type support, making the Redux store more type-complete in both duck and React components.

observable-duck#

Combining Redux and RxJS's Observable allows for easy aggregation of logic and supports flowing state into React components, with complete types and a good development experience.

Basic Usage#

Installation#

npm i observable-duck --save

Organizing Code#

import { Action } from "redux";
import { Base, Action, take } from "observable-duck";
import { Observable } from "rxjs";
import { debounceTime } from 'rxjs/operators'

export default class AppDuck extends Base {
  get quickTypes() {
    enum Type {
      INCREMENT,
      DECREMENT,
    }
    return {
      ...super.quickTypes,
      ...Type,
    };
  }
  get reducers() {
    const types = this.types;
    return {
      count: (state = 0, action) => {
        switch (action.type) {
          case types.INCREMENT:
            return state + 1;
          case types.DECREMENT:
            return state - 1;
          default:
            return state;
        }
      },
    };
  }
  get creators() {
    const types = this.types;
    return {
      ...super.creators,
      increment: () => ({ type: types.INCREMENT }),
      decrement: () => ({ type: types.DECREMENT }),
    };
  }

  /**
   * Add Action decorator to inject Redux's action stream
   */
  @Action
  increment(action$: Observable<Action>) {
    const duck = this;
    return action$
      .pipe(
        take(duck.types.INCREMENT), // filter action
        debounceTime(20), // add debounce to achieve redux-saga's takeLatest effect
      )
      .subscribe((action) => {
        const state = duck.getState();
        // perform your effect
      });
  }
}

// Create the runtime for this duck, where the assembly logic for various parts occurs
const runtime = Runtime.create(AppDuck)

Testing the Pure Logic of Duck#

You can test your pure logic part to confirm everything is fine:

import { expect, test, describe } from 'vitest'
import { Runtime } from 'observable-duck'
import AppDuck from './AppDuck'

describe('AppDuck.test', () => {
  test('AppDuck.count', async () => {
    const runtime = Runtime.create(AppDuck)
    const { dispatch, getState, creators } = runtime.duck
    expect(getState().count).toBe(0)
    dispatch(creators.increment())
    expect(getState().count).toBe(1)
    dispatch(creators.decrement())
    expect(getState().count).toBe(0)
  })
})

Connecting React Components#

You can then connect the runtime to React components (implemented by react-redux), and the props annotated with ConnectedProps<AppDuck> will also gain complete types:

import * as React from 'react'
import { ConnectedProps } from 'observable-duck'
import AppDuck from './AppDuck'

function App(props: ConnectedProps<AppDuck>) {
  const { duck, store, dispatch } = props
  const { creators } = duck
  const [count, setCount] = React.useState(0)
  return <div>
    <h4>React.useState</h4>
    <div>
      <button onClick={() => setCount((c) => c - 1)}>-</button>
      <span>{count}</span>
      <button onClick={() => setCount((c) => c + 1)}>+</button>
    </div>
    <h4>React Redux</h4>
    <div>
      <button onClick={() => dispatch(creators.decrement())}>-</button>
      <span>{store.count}</span>
      <button onClick={() => dispatch(creators.increment())}>+</button>
    </div>
  </div>
}

// Export the connected component
export default Runtime.create(AppDuck).connect(App)

Alternatively, you don't necessarily have to connect to React components; since Redux and RxJS are used together, you can use Observable in any way you like.

Another Way to Use in React Components#

You can also use the provided useDuck hook to create a Redux store and instantiate the duck within the component:

// index.tsx
import * as React from 'react'
import { useDuck } from 'observable-duck'
import Duck from './Duck'

export default function () {
  const { duck, store, dispatch } = useDuck(Duck)
  const { types } = duck
  return (
    <div>
      useDuck:
      <div>
        <input
          value={store.value}
          onChange={(v) =>
            dispatch({
              type: types.SET_VALUE,
              payload: v.target.value,
            })
          }
        />
      </div>
      <br />
    </div>
  )
}

// Duck.ts
import { Base, reduceFromPayload } from 'observable-duck'
export default class Duck extends Base {
  get quickTypes() {
    enum Type {
      SET_VALUE,
    }
    return {
      ...Type,
    }
  }
  get reducers() {
    const { types } = this
    return {
      value: reduceFromPayload<string>(types.SET_VALUE, ''),
    }
  }
}

Extending Duck#

To achieve better cohesion and lower coupling, ducks also support extending other logically grouped ducks as child ducks. The Redux store and Observable within the duck will be registered, and the extended duck will also be type-complete.

The outer duck focuses on the logic that is extended, and can handle actions from the inner duck, while the inner duck does not concern itself with its environment and thus will not handle actions from the outer duck.

import { Observable } from 'rxjs'
import { Base, Action } from 'observable-duck'

export default class ParentDuck extends Base {
  get quickDucks() {
    return {
      sub: SubDuck,
    }
  }
  get quickTypes() {
    enum Type {
      INCREMENT,
      DECREMENT,
      SET_VALUE,
    }
    return {
      ...super.quickTypes,
      ...Type,
    }
  }
  get reducers() {
    const types = this.types
    return {
      name: (state: string) => 'init name',
      timestamp: (state: number) => Date.now(),
      value: reduceFromPayload<string>(types.SET_VALUE, ''),
    }
  }
  get creators() {
    const types = this.types
    return {
      ...super.creators,
      increment: () => ({ type: types.INCREMENT }),
      decrement: () => ({ type: types.DECREMENT }),
    }
  }
  @Action
  incrementStreamer(action$: Observable<Action>) {
    const duck = this
    return action$.pipe(filterAction(duck.types.INCREMENT)).subscribe((action) => {
      const state = duck.getState()
      console.log(state.sub.aaa)
      // Dispatch action to be handled by the child duck
      dispatch({
        type: ducks.sub.types.SUB,
        payload: 'from parent\'s value',
      })
    })
  }
}

class SubDuck extends Base {
  get quickTypes() {
    enum Type {
      SUB,
    }
    return {
      ...super.quickTypes,
      ...Type,
    }
  }
  get reducers() {
    const types = this.types
    return {
      aaa: (state: string) => 'init name',
      value: reduceFromPayload<string>(types.SUB, ''),
    }
  }
  // ...
}

Connecting External Subscription Sources#

You can subscribe to other runtime.redux in a duck and do anything you want:

// One.ts
import { Runtime } from 'observable-duck'
import Template from './Template'
import Duck from './Duck'

export const runtime = Runtime.create(Duck) // Export the runtime separately
export default runtime.connect(Template) // Connect the runtime to the React component and default export it
// Two.ts
import { Base, From } from 'observable-duck'
import { runtime } from './One.ts'

class Search extends Base {
  @From(runtime.redux)
  accept(external$: Observable<DuckState<typeof runtime.duck>>) {
    const { dispatch } = this
    return external$.pipe(/** ... */).subscribe((value) => {
      dispatch({
        type: "...",
        payload: value,
      })
    })
  }
}

Alternatively, you can directly reference external sources for bidirectional synchronization:

import { Observable } from 'rxjs'
import { webSocket } from 'rxjs/webSocket'
import { Base, Action, Cache, take } from 'observable-duck'

export default class Search extends Base {
  get quickTypes() {
    enum Type {
      SET_VALUE,
      SEARCH,
    }
    return {
      ...Type,
    }
  }
  get reducers() {
    const types = this.types
    return {
      value: reduceFromPayload<string>(types.SET_VALUE, ''),
    }
  }
  get creators() {
    const { types } = this
    return {
      setValue: createToPayload<string>(types.SET_VALUE),
      search: createToPayload<void>(types.SEARCH),
    }
  }
  @Cache()
  get websocket$() {
    const { types, dispatch } = this
    const $ = webSocket('wss://***')
    this.subscription.add(
      $.subscribe((data) => dispatch({
        type: types.SET_VALUE,
        payload: data,
      }))
    )
    return $
  }
  @Action
  watchSearch(action$: Observable<Action>) {
    const duck = this
    return action$
      .pipe(take(duck.types.SEARCH))
      .subscribe((action) => duck.websocket$.next(action.payload))
  }
}
Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.