observable-duck
- 发布于
目录
RxJS
RxJS 对自己的解释是:RxJS is a library for composing asynchronous and event-based programs by using observable sequences. (RxJS 是一个使用可观察序列编写异步和基于事件的程序的库)
Think of RxJS as Lodash for events.(可以认为是事件版本的 Lodash)
那实际上就是对观察者模式的一个实践,借用官网的例子来说明下用法就是
document.addEventListener('click', () => console.log('Clicked!'))
// 等价于下面的写法
Rx.fromEvent(document, 'click').subscribe(() => console.log('Clicked!'))
上面的 fromEvent 是一个从指定对象获取指定事件并转化成流的工具方法,将用户在 document 的点击事件流动到事件序列中以订阅处。下面说一下核心用法:
import { Observable } from 'rxjs';
const observable = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
setTimeout(() => {
subscriber.next(3);
subscriber.complete();
}, 1000);
});
console.log('just before subscribe');
observable.subscribe((value) => console.log(value));
console.log('just after subscribe');
// just before subscribe
// 1
// 2
// 3
// just after subscribe
// 4
这里 Observable 是 RxJS 中最核心的对象,用来创建一个流,构造函数提供一个 subscriber,可以用来在流中流入值,结束一个流或者抛错,需要注意的是写到 Observable 内部的代码是惰性的,只有被订阅时才会执行,更多请见官方文档 ,这里仅展示如何用 RxJS 来处理异步代码。
Redux
Redux 强调的是单向不可变数据流,所以状态的改变需要 dispatch 一个 action,然后由 reducer 纯函数进行处理返回新的状态,将所有的副作用通过中间件机制混入到 dispatch 的过程中
那这里 redux 的 action 其实就是一个个事件对象,可以创建一个 action 的 Observable,在派发 action 的时候同时也将其流动到 Observable 中,然后再订阅 action 流去处理副作用,还可以利用到 RxJS 强大的流处理能力(过滤,转换、组合等)
依据这个思路去写一个 redux 的中间件:
function createMiddleware() {
const action$ = new Subject()
let subscription
const middleware = store => next => action => {
next(action)
actionSubject$.next(action) // flow to streamer
}
middleware.run = (streamer) => {
if (subscription) {
return subscription.unsubscribe()
}
subscription = streamer(action$)
}
middleware.close = () => {
if (!subscription) return
subscription.unsubscribe()
}
return middleware
}
这里核心就是 middleware.run 方法的参数 streamer,它是一个接受 action 流为参数的方法,有了 action 流后就可以自由订阅完成各种事情,实际用法如下:
import { Action, applyMiddleware, legacy_createStore } from 'redux'
import { Observable } from 'rxjs'
import { createMiddleware } from 'redux-observable-action'
const reducer = (state = 0, action) => {
switch (action.type) {
case 'INCREMENT':
return state + 1
case 'DECREMENT':
return state - 1
default:
return state
}
}
const streamerMiddleware = createMiddleware()
const store = legacy_createStore(reducer, applyMiddleware(streamerMiddleware))
function rootStreamer(action$: Observable<Action>) {
return action$.subscribe(action => {
if (action.type === 'INCREMENT') {
// do something
}
})
}
streamerMiddleware.run(rootStreamer)
实际上 RxJS 社区还有一套实现 redux-observable 会更加复杂一些,提供 action 流的同时还有 state 的流,在 redux-observable 中注册流的方法称作 epic,接受 action 作为参数,并要求返回 action$
个人感觉强制要求返回 action 流的做法并不自由,有时候仅想要订阅后完成一些任务,而不是单纯做过滤、转换
然后介绍一下 duck 模式:在 redux 应用中,各模块 reducer、creaters、types 往往被拆分到各处,加个东西让人挺恼火,因此就有了 duck,反其道而行之,将各组件捆绑在一起组织和维护代码,这套模式已有很多库进行了实践,比如 saga-duck 将各组件组装在一起同时将 redux-saga 一起混入,完整包含状态管理和异步流程控制,可复用、可组装
下面的 observable-duck 则借鉴 saga-duck 的做法,将 redux-saga 的支持替换成 redux-observable-action,同时优化 ts 类型支持,使得 redux store 在 duck 和 react 组件中更加类型完备
observable-duck
将 redux 和 rxjs 的 Observable 组合在一起,可以方便的聚合逻辑并且支持流出 state 到 react 组件,类型完备,开发体验良好
基本使用
安装
npm i observable-duck --save
组织代码
import { Action } from "redux";
import { Base, Action, take } from "observable-duck";
import { Observable } from "rxjs";
import { debounceTime } from 'rxjs/operators'
export default class AppDuck extends Base {
get quickTypes() {
enum Type {
INCREMENT,
DECREMENT,
}
return {
...super.quickTypes,
...Type,
};
}
get reducers() {
const types = this.types;
return {
count: (state = 0, action) => {
switch (action.type) {
case types.INCREMENT:
return state + 1;
case types.DECREMENT:
return state - 1;
default:
return state;
}
},
};
}
get creators() {
const types = this.types;
return {
...super.creators,
increment: () => ({ type: types.INCREMENT }),
decrement: () => ({ type: types.DECREMENT }),
};
}
/**
* 添加 Action 装饰器,注入 redux 的 action 流
*/
@Action
increment(action$: Observable<Action>) {
const duck = this;
return action$
.pipe(
take(duck.types.INCREMENT), // 过滤 action
debounceTime(20), // 加入防抖以实现 redux-saga 中 takeLatest 的效果
)
.subscribe((action) => {
const state = duck.getState();
// preform your effect
});
}
}
// 创建该 duck 的运行时,基本上各部分的组装逻辑都在这里进行
const runtime = Runtime.create(AppDuck)
测试 duck 纯逻辑部分
然后你可以仅测试你的纯逻辑部分确认没有问题
import { expect, test, describe } from 'vitest'
import { Runtime } from 'observable-duck'
import AppDuck from './AppDuck'
describe('AppDuck.test', () => {
test('AppDuck.count', async () => {
const runtime = Runtime.create(AppDuck)
const { dispatch, getState, creators } = runtime.duck
expect(getState().count).toBe(0)
dispatch(creators.increment())
expect(getState().count).toBe(1)
dispatch(creators.decrement())
expect(getState().count).toBe(0)
})
})
连接 React 组件
然后可以将 runtime 连接到 react 组件(由 react-redux 实现),使用 ConnectedProps<AppDuck>
注释后的 props 也将获得完整的类型
import * as React from 'react'
import { ConnectedProps } from 'observable-duck'
import AppDuck from './AppDuck'
function App(props: ConnectedProps<AppDuck>) {
const { duck, store, dispatch } = props
const { creators } = duck
const [count, setCount] = React.useState(0)
return <div>
<h4>React.useState</h4>
<div>
<button onClick={() => setCount((c) => c - 1)}>-</button>
<span>{count}</span>
<button onClick={() => setCount((c) => c + 1)}>+</button>
</div>
<h4>React Redux</h4>
<div>
<button onClick={() => dispatch(creators.decrement())}>-</button>
<span>{store.count}</span>
<button onClick={() => dispatch(creators.increment())}>+</button>
</div>
</div>
}
// 导出连接过后的组件
export default Runtime.create(AppDuck).connect(App)
或者并不一定非得连接到 react 组件上使用,由于是 redux 和 rxjs 组合使用,你可以使用 Observable 任意发挥
另一种方式在 React 组件中使用
还可以用提供的 useDuck hook 在组件内部创建 redux 仓库与 duck 的实例化
// index.tsx
import * as React from 'react'
import { useDuck } from 'observable-duck'
import Duck from './Duck'
export default function () {
const { duck, store, dispatch } = useDuck(Duck)
const { types } = duck
return (
<div>
useDuck:
<div>
<input
value={store.value}
onChange={(v) =>
dispatch({
type: types.SET_VALUE,
payload: v.target.value,
})
}
/>
</div>
<br />
</div>
)
}
// Duck.ts
import { Base, reduceFromPayload } from 'observable-duck'
export default class Duck extends Base {
get quickTypes() {
enum Type {
SET_VALUE,
}
return {
...Type,
}
}
get reducers() {
const { types } = this
return {
value: reduceFromPayload<string>(types.SET_VALUE, ''),
}
}
}
扩展 duck
为了更好的内聚与更低的耦合,duck 也支持将别的逻辑成块的 duck 作为子 duck 扩展进自身,duck 中的 redux store,Observable 都将注册,并且扩展后的 duck 同样类型完备
外层 duck 关注扩展进来的逻辑,可以接受内层 duck 的 action 进行处理,内层 duck 不关注自身所处环境,因此不会处理外层环境 duck 的 action
import { Observable } from 'rxjs'
import { Base, Action } from 'observable-duck'
export default class ParentDuck extends Base {
get quickDucks() {
return {
sub: SubDuck,
}
}
get quickTypes() {
enum Type {
INCREMENT,
DECREMENT,
SET_VALUE,
}
return {
...super.quickTypes,
...Type,
}
}
get reducers() {
const types = this.types
return {
name: (state: string) => 'init name',
timestamp: (state: number) => Date.now(),
value: reduceFromPayload<string>(types.SET_VALUE, ''),
}
}
get creators() {
const types = this.types
return {
...super.creators,
increment: () => ({ type: types.INCREMENT }),
decrement: () => ({ type: types.DECREMENT }),
}
}
@Action
incrementStreamer(action$: Observable<Action>) {
const duck = this
return action$.pipe(filterAction(duck.types.INCREMENT)).subscribe((action) => {
const state = duck.getState()
console.log(state.sub.aaa)
// 可以将派发 action 由子 duck 处理
dispatch({
type: ducks.sub.types.SUB,
payload: 'from parent\'s value',
})
})
}
}
class SubDuck extends Base {
get quickTypes() {
enum Type {
SUB,
}
return {
...super.quickTypes,
...Type,
}
}
get reducers() {
const types = this.types
return {
aaa: (state: string) => 'init name',
value: reduceFromPayload<string>(types.SUB, ''),
}
}
// ...
}
连接外部订阅源
在 duck 中订阅其他 runtime.redux 然后 do anything
// One.ts
import { Runtime } from 'observable-duck'
import Template from './Template'
import Duck from './Duck'
export const runtime = Runtime.create(Duck) // 单独将 runtime 也导出
export default runtime.connect(Template) // 将 runtime 与 react 组件连接后默认导出
// Two.ts
import { Base, From } from 'observable-duck'
import { runtime } from './One.ts'
class Search extends Base {
@From(runtime.redux)
accept(external$: Observable<DuckState<typeof runtime.duck>>) {
const { dispatch } = this
return external$.pipe(/** ... */).subscribe((value) => {
dispatch({
type: "...",
payload: value,
})
})
}
}
或者直接引用外部源可以做到双向同步
import { Observable } from 'rxjs'
import { webSocket } from 'rxjs/webSocket'
import { Base, Action, Cache, take } from 'observable-duck'
export default class Search extends Base {
get quickTypes() {
enum Type {
SET_VALUE,
SEARCH,
}
return {
...Type,
}
}
get reducers() {
const types = this.types
return {
value: reduceFromPayload<string>(types.SET_VALUE, ''),
}
}
get creators() {
const { types } = this
return {
setValue: createToPayload<string>(types.SET_VALUE),
search: createToPayload<void>(types.SEARCH),
}
}
@Cache()
get websocket$() {
const { types, dispatch } = this
const $ = webSocket('wss://***')
this.subscription.add(
$.subscribe((data) => dispatch({
type: types.SET_VALUE,
payload: data,
}))
)
return $
}
@Action
watchSearch(action$: Observable<Action>) {
const duck = this
return action$
.pipe(take(duck.types.SEARCH))
.subscribe((action) => duck.websocket$.next(action.payload))
}
}