observable-duck

发布于

RxJS

RxJS 对自己的解释是:RxJS is a library for composing asynchronous and event-based programs by using observable sequences. (RxJS 是一个使用可观察序列编写异步和基于事件的程序的库)

Think of RxJS as Lodash for events.(可以认为是事件版本的 Lodash)

那实际上就是对观察者模式的一个实践,借用官网的例子来说明下用法就是

document.addEventListener('click', () => console.log('Clicked!'))

// 等价于下面的写法
Rx.fromEvent(document, 'click').subscribe(() => console.log('Clicked!'))

上面的 fromEvent 是一个从指定对象获取指定事件并转化成流的工具方法,将用户在 document 的点击事件流动到事件序列中以订阅处。下面说一下核心用法:

import { Observable } from 'rxjs';

const observable = new Observable((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
});

console.log('just before subscribe');
observable.subscribe((value) => console.log(value));
console.log('just after subscribe');

// just before subscribe
// 1
// 2
// 3
// just after subscribe
// 4

这里 Observable 是 RxJS 中最核心的对象,用来创建一个流,构造函数提供一个 subscriber,可以用来在流中流入值,结束一个流或者抛错,需要注意的是写到 Observable 内部的代码是惰性的,只有被订阅时才会执行,更多请见官方文档 ,这里仅展示如何用 RxJS 来处理异步代码。

Redux

Redux 强调的是单向不可变数据流,所以状态的改变需要 dispatch 一个 action,然后由 reducer 纯函数进行处理返回新的状态,将所有的副作用通过中间件机制混入到 dispatch 的过程中

那这里 redux 的 action 其实就是一个个事件对象,可以创建一个 action 的 Observable,在派发 action 的时候同时也将其流动到 Observable 中,然后再订阅 action 流去处理副作用,还可以利用到 RxJS 强大的流处理能力(过滤,转换、组合等)

依据这个思路去写一个 redux 的中间件:

function createMiddleware() {
  const action$ = new Subject()
  let subscription
  const middleware = store => next => action => {
    next(action)
    actionSubject$.next(action) // flow to streamer
  }
  middleware.run = (streamer) => {
    if (subscription) {
      return subscription.unsubscribe()
    }
    subscription = streamer(action$)
  }
  middleware.close = () => {
    if (!subscription) return
    subscription.unsubscribe()
  }
  return middleware
}

这里核心就是 middleware.run 方法的参数 streamer,它是一个接受 action 流为参数的方法,有了 action 流后就可以自由订阅完成各种事情,实际用法如下:

import { Action, applyMiddleware, legacy_createStore } from 'redux'
import { Observable } from 'rxjs'
import { createMiddleware } from 'redux-observable-action'

const reducer = (state = 0, action) => {
  switch (action.type) {
    case 'INCREMENT':
      return state + 1
    case 'DECREMENT':
      return state - 1
    default:
      return state
  }
}

const streamerMiddleware = createMiddleware()
const store = legacy_createStore(reducer, applyMiddleware(streamerMiddleware))

function rootStreamer(action$: Observable<Action>) {
  return action$.subscribe(action => {
    if (action.type === 'INCREMENT') {
      // do something
    }
  })
}

streamerMiddleware.run(rootStreamer)

实际上 RxJS 社区还有一套实现 redux-observable 会更加复杂一些,提供 action 流的同时还有 state 的流,在 redux-observable 中注册流的方法称作 epic,接受 actionstate state 作为参数,并要求返回 action$

个人感觉强制要求返回 action 流的做法并不自由,有时候仅想要订阅后完成一些任务,而不是单纯做过滤、转换

然后介绍一下 duck 模式:在 redux 应用中,各模块 reducer、creaters、types 往往被拆分到各处,加个东西让人挺恼火,因此就有了 duck,反其道而行之,将各组件捆绑在一起组织和维护代码,这套模式已有很多库进行了实践,比如 saga-duck 将各组件组装在一起同时将 redux-saga 一起混入,完整包含状态管理和异步流程控制,可复用、可组装

下面的 observable-duck 则借鉴 saga-duck 的做法,将 redux-saga 的支持替换成 redux-observable-action,同时优化 ts 类型支持,使得 redux store 在 duck 和 react 组件中更加类型完备

observable-duck

将 redux 和 rxjs 的 Observable 组合在一起,可以方便的聚合逻辑并且支持流出 state 到 react 组件,类型完备,开发体验良好

基本使用

安装

npm i observable-duck --save

组织代码

import { Action } from "redux";
import { Base, Action, take } from "observable-duck";
import { Observable } from "rxjs";
import { debounceTime } from 'rxjs/operators'

export default class AppDuck extends Base {
  get quickTypes() {
    enum Type {
      INCREMENT,
      DECREMENT,
    }
    return {
      ...super.quickTypes,
      ...Type,
    };
  }
  get reducers() {
    const types = this.types;
    return {
      count: (state = 0, action) => {
        switch (action.type) {
          case types.INCREMENT:
            return state + 1;
          case types.DECREMENT:
            return state - 1;
          default:
            return state;
        }
      },
    };
  }
  get creators() {
    const types = this.types;
    return {
      ...super.creators,
      increment: () => ({ type: types.INCREMENT }),
      decrement: () => ({ type: types.DECREMENT }),
    };
  }

  /**
   * 添加 Action 装饰器,注入 redux 的 action 流
   */
  @Action
  increment(action$: Observable<Action>) {
    const duck = this;
    return action$
      .pipe(
        take(duck.types.INCREMENT), // 过滤 action
        debounceTime(20), // 加入防抖以实现 redux-saga 中 takeLatest 的效果
      )
      .subscribe((action) => {
        const state = duck.getState();
        // preform your effect
      });
  }
}

// 创建该 duck 的运行时,基本上各部分的组装逻辑都在这里进行
const runtime = Runtime.create(AppDuck)

测试 duck 纯逻辑部分

然后你可以仅测试你的纯逻辑部分确认没有问题

import { expect, test, describe } from 'vitest'
import { Runtime } from 'observable-duck'
import AppDuck from './AppDuck'

describe('AppDuck.test', () => {
  test('AppDuck.count', async () => {
    const runtime = Runtime.create(AppDuck)
    const { dispatch, getState, creators } = runtime.duck
    expect(getState().count).toBe(0)
    dispatch(creators.increment())
    expect(getState().count).toBe(1)
    dispatch(creators.decrement())
    expect(getState().count).toBe(0)
  })
})

连接 React 组件

然后可以将 runtime 连接到 react 组件(由 react-redux 实现),使用 ConnectedProps<AppDuck> 注释后的 props 也将获得完整的类型

import * as React from 'react'
import { ConnectedProps } from 'observable-duck'
import AppDuck from './AppDuck'

function App(props: ConnectedProps<AppDuck>) {
  const { duck, store, dispatch } = props
  const { creators } = duck
  const [count, setCount] = React.useState(0)
  return <div>
    <h4>React.useState</h4>
    <div>
      <button onClick={() => setCount((c) => c - 1)}>-</button>
      <span>{count}</span>
      <button onClick={() => setCount((c) => c + 1)}>+</button>
    </div>
    <h4>React Redux</h4>
    <div>
      <button onClick={() => dispatch(creators.decrement())}>-</button>
      <span>{store.count}</span>
      <button onClick={() => dispatch(creators.increment())}>+</button>
    </div>
  </div>
}

// 导出连接过后的组件
export default Runtime.create(AppDuck).connect(App)

或者并不一定非得连接到 react 组件上使用,由于是 redux 和 rxjs 组合使用,你可以使用 Observable 任意发挥

另一种方式在 React 组件中使用

还可以用提供的 useDuck hook 在组件内部创建 redux 仓库与 duck 的实例化

// index.tsx
import * as React from 'react'
import { useDuck } from 'observable-duck'
import Duck from './Duck'

export default function () {
  const { duck, store, dispatch } = useDuck(Duck)
  const { types } = duck
  return (
    <div>
      useDuck:
      <div>
        <input
          value={store.value}
          onChange={(v) =>
            dispatch({
              type: types.SET_VALUE,
              payload: v.target.value,
            })
          }
        />
      </div>
      <br />
    </div>
  )
}

// Duck.ts
import { Base, reduceFromPayload } from 'observable-duck'
export default class Duck extends Base {
  get quickTypes() {
    enum Type {
      SET_VALUE,
    }
    return {
      ...Type,
    }
  }
  get reducers() {
    const { types } = this
    return {
      value: reduceFromPayload<string>(types.SET_VALUE, ''),
    }
  }
}

扩展 duck

为了更好的内聚与更低的耦合,duck 也支持将别的逻辑成块的 duck 作为子 duck 扩展进自身,duck 中的 redux store,Observable 都将注册,并且扩展后的 duck 同样类型完备

外层 duck 关注扩展进来的逻辑,可以接受内层 duck 的 action 进行处理,内层 duck 不关注自身所处环境,因此不会处理外层环境 duck 的 action

import { Observable } from 'rxjs'
import { Base, Action } from 'observable-duck'

export default class ParentDuck extends Base {
  get quickDucks() {
    return {
      sub: SubDuck,
    }
  }
  get quickTypes() {
    enum Type {
      INCREMENT,
      DECREMENT,
      SET_VALUE,
    }
    return {
      ...super.quickTypes,
      ...Type,
    }
  }
  get reducers() {
    const types = this.types
    return {
      name: (state: string) => 'init name',
      timestamp: (state: number) => Date.now(),
      value: reduceFromPayload<string>(types.SET_VALUE, ''),
    }
  }
  get creators() {
    const types = this.types
    return {
      ...super.creators,
      increment: () => ({ type: types.INCREMENT }),
      decrement: () => ({ type: types.DECREMENT }),
    }
  }
  @Action
  incrementStreamer(action$: Observable<Action>) {
    const duck = this
    return action$.pipe(filterAction(duck.types.INCREMENT)).subscribe((action) => {
      const state = duck.getState()
      console.log(state.sub.aaa)
      // 可以将派发 action 由子 duck 处理
      dispatch({
        type: ducks.sub.types.SUB,
        payload: 'from parent\'s value',
      })
    })
  }
}

class SubDuck extends Base {
  get quickTypes() {
    enum Type {
      SUB,
    }
    return {
      ...super.quickTypes,
      ...Type,
    }
  }
  get reducers() {
    const types = this.types
    return {
      aaa: (state: string) => 'init name',
      value: reduceFromPayload<string>(types.SUB, ''),
    }
  }
  // ...
}

连接外部订阅源

在 duck 中订阅其他 runtime.redux 然后 do anything

// One.ts
import { Runtime } from 'observable-duck'
import Template from './Template'
import Duck from './Duck'

export const runtime = Runtime.create(Duck) // 单独将 runtime 也导出
export default runtime.connect(Template) // 将 runtime 与 react 组件连接后默认导出
// Two.ts
import { Base, From } from 'observable-duck'
import { runtime } from './One.ts'

class Search extends Base {
  @From(runtime.redux)
  accept(external$: Observable<DuckState<typeof runtime.duck>>) {
    const { dispatch } = this
    return external$.pipe(/** ... */).subscribe((value) => {
      dispatch({
        type: "...",
        payload: value,
      })
    })
  }
}

或者直接引用外部源可以做到双向同步

import { Observable } from 'rxjs'
import { webSocket } from 'rxjs/webSocket'
import { Base, Action, Cache, take } from 'observable-duck'

export default class Search extends Base {
  get quickTypes() {
    enum Type {
      SET_VALUE,
      SEARCH,
    }
    return {
      ...Type,
    }
  }
  get reducers() {
    const types = this.types
    return {
      value: reduceFromPayload<string>(types.SET_VALUE, ''),
    }
  }
  get creators() {
    const { types } = this
    return {
      setValue: createToPayload<string>(types.SET_VALUE),
      search: createToPayload<void>(types.SEARCH),
    }
  }
  @Cache()
  get websocket$() {
    const { types, dispatch } = this
    const $ = webSocket('wss://***')
    this.subscription.add(
      $.subscribe((data) => dispatch({
        type: types.SET_VALUE,
        payload: data,
      }))
    )
    return $
  }
  @Action
  watchSearch(action$: Observable<Action>) {
    const duck = this
    return action$
      .pipe(take(duck.types.SEARCH))
      .subscribe((action) => duck.websocket$.next(action.payload))
  }
}