背景是在项目中需要选定一个处理redux异步数据流控制库,redux-saga相对来说更受欢迎,使用库的过程中,如果能知道它的实现方式,对于更优雅,更有效率的使用库本身有比较大的好处,所以抽空看了它的源码实现,有所收获,记录下来分享给大家!
假设不使用redux-thunk或者redux-saga
使用async await等简单方案 获取用户id数据,发出action将数据保存到store
function fetchData(userId) {// 返回一个promise, 内容是数据}function fetchUser() {// 返回一个promise, 内容是用户信息}class Component {componentDidMount() {const { userInfo: { userId } } = await fetchUser();store.dispatch({type: 'UPDATE_USER_ID', payload: userId});const { data } = await fetchData(userId);store.dispatch({type: 'UPDATE_DATA', payload: data});}}
需要扩展怎么办,相对来说好一点的方案如下
class DataHandler {static fetchData() {const { userInfo: { userId } } = await fetchUser();store.dispatch({type: 'UPDATE_USER_ID', payload: userId});const { data } = await fetchData(userId);store.dispatch({type: 'UPDATE_DATA', payload: data});}}class ComponentA {componentDidMount() {DataHandler.fetchData();}}class ComponentB {componentDidMount() {DataHandler.fetchData();}}
可是如果需要定制数据处理逻辑,有的组件获取到数据含有其他额外处理就没法完成复用,需要额外开发逻辑处理,最好的方法是,把异步数据获取和操作独立出来,通过指令压缩或扩展逻辑
使用redux-saga之后
异步逻辑抽离到了saga文件,可以构建统一的 集中的异步处理中心,可以直接分发action触发对应逻辑,复用也简单,分发同一个action即可,如果有特殊定制,只需另加saga方法特殊处理,是store reducer专注处理state,接口与实现 分离
// storeimport rootSaga from './sagas'const sagaMiddleware = createSagaMiddleware()const store = ...sagaMiddleware.run(rootSaga)// saga.jsimport { delay } from 'redux-saga'import { put, takeEvery, all } from 'redux-saga/effects'function* incrementAsync() {yield delay(1000)//...可以加额外逻辑yield put({ type: 'INCREMENT1' })// ...handler}function* incrementAsync2() {yield delay(1000)//...可以加额外逻辑yield put({ type: 'INCREMENT2' })// ...handler}function* incrementAsync3() {yield delay(1000)//...可以加额外逻辑yield put({ type: 'INCREMENT3' })// ...handler}function* watchIncrementAsync() {yield takeEvery('INCREMENT_ASYNC1', incrementAsync)yield takeEvery('INCREMENT_ASYNC2', incrementAsync2)yield takeEvery('INCREMENT_ASYNC', incrementAsync3)}function* watchIncrementAsync2() {yield takeEvery('INCREMENT_ASYNC2', incrementAsync2)yield takeEvery('INCREMENT_ASYNC3', incrementAsync3)}export default function* rootSaga() {yield all([helloSaga(),watchIncrementAsync(),watchIncrementAsync2()])}// renderfunction render() {ReactDOM.render(<Countervalue={store.getState()}onIncrement={() => action('INCREMENT')}onDecrement={() => action('DECREMENT')}// onIncrementAsync={() => action('INCREMENT_ASYNC1')}// onIncrementAsync={() => action('INCREMENT_ASYNC2')}onIncrementAsync={() => action('INCREMENT_ASYNC3')} />,document.getElementById('root'))}
redux-saga的本质,集中处理副作用
只要是跟函数外部环境发生的交互就都属于副作用
包括但不限于
- 发送一个 http 请求
- 更改文件系统
- 往数据库插入记录
- 使用LocalStorage进行本地存储
- 打印/log
- 获取用户输入
- DOM 查询
- 访问系统状态
redux是一个同步的树形state系统,没有考虑异步的处理,把这些脏活交给了用户,redux-saga就是尽量保持redux的纯粹,自己接管异步脏活,让redux只维护state树
纯粹的state树
reducers: {update: state => {state.value -= 1}}
掺杂了异步脏活的state树
reducers: {update: state => {await api()// ...// ...// ...state.value -= 1}}
// 可以当成一个handlerfunction* incrementAsync() {yield delay(1000)yield put({ type: 'INCREMENT' })}function* watchIncrementAsync() {yield takeEvery('INCREMENT_ASYNC', incrementAsync)}channel = [{effect: takeEvery// 注册的typetype: 'INCREMENT_ASYNC',cb: incrementAsync}]
saga的优点
- 更容易管理副作用
- 程序更高效执行
- 易于测试
- 易于处理错误
javascript的Generator生成器
saga就是一个中间件,实现异步流程控制管理,异步流程管理,ES6特性Generator,调用next()方法推进流程执行,对生成器函数概念陌生的可以移步https://www.infoq.cn/article/es6-in-depth-iterators-and-the-for-of-loop/
function* someSaga() {// yield 一个 promise 应该返回 promise resolve 的值const response = yield fetch('https://example.com/')// yield 一个 take effect 应该返回一个 Actionconst action = yield take('SOME_ACTION')// yield 一个 all effect 应该返回一个数组,该数组记录了 effect1 或 effect2 的执行结果const allResult = yield all([effect1, effect2])}someSaga()
在我们选定用生成器函数来控制异步流程后,怎么有序的,自动的触发next()很关键
for循环消费迭代器
for循环是一个迭代器, 本质上for循环是一个迭代器语法糖,底层调用iterator.next()实现迭代,但是功能有限,不能传参
function* range(start, end) {for (let i = start; i < end; i++) {yield i}}for (let x of range(1, 10)) {console.log(x)}// 输出 1, 2, 3 ... 8, 9//
while (true)消费
同步操作,无法控制等待异步完成继续执行接下来的iterator.next()
const iterator = range(1, 10)while (true) {const { done, value } = iterator.next(/* 我们可以决定这里的参数 */)if (done) {break}if (value === 5) {iterator.throw(new Error('5 is bad input'))}console.log(value)}// 输出 1, 2, 3, 4,然后抛出异常 '5 is bad input'
saga的解决方案-递归方法,next中嵌套next
function* range2(start, end) {for (let i = start; i < end; i++) {const response = yield iconsole.log(`response of ${i} is ${response}`)}}const iterator = range2(1, 10)function next(arg, isErr) {// 注意驱动函数多了参数 arg 和 isErrlet resultif (isErr) {result = iterator.throw(arg)} else {// 这里我们将 arg 作为参数传递给 iterator.next,作为 effect-producer 中 yield 语句的返回值result = iterator.next(arg)}const { done, value } = resultif (done) {return}console.log('getting:', value)if (value === 5) {// 将 isErr 置为 true,就能用递归的方式调用 iterator.throw 方法next(new Error('5 is bad input'), true)} else {// 延迟调用驱动函数;「响应」是「请求」的两倍setTimeout(() => next(value * 2), value * 1000)}}next()// 输出// getting: 1// response of 1 is 2// getting: 2// response of 2 is 4// getting: 3// response of 3 is 6// getting: 4// response of 4 is 8// getting: 5// Uncaught Error: 5 is bad input// 输出 getting: x 之后,输出会暂停一段时间
注册saga中间件
saga本质上是一个redux中间件,如果对redux不熟悉,请移步https://redux.js.org/
redux中间件的经典使用姿势,高阶函数
function sagaMiddleware({ getState, dispatch }) {boundRunSaga = runSaga.bind(null, {...options,context,channel,dispatch,getState,sagaMonitor,})return next => action => {if (sagaMonitor && sagaMonitor.actionDispatched) {sagaMonitor.actionDispatched(action)}const result = next(action) // hit reducerschannel.put(action) // 通过saga处理逻辑return result}}
从这边可以看到一个action是有会经过两个通道,稍微注意action commit type的唯一性,基本上也不会有冲突,示例如下
// sagaconst initialState: WorkflowState = {// 点击某一条记录时 推入该条记录approve: null,permission: null,records: [],userinfo: {},};export function makeFetchRecordsCommit(row: WorkflowRecord): RecordsUpdateCommit {return {type: WorkflowActionType.updateRecords,payload: row,};}export default {namespace: 'workflow',state: initialState,effects: {*fetchRecords(action: StringPayloadAction, { call, put }: EffectsCommandMap) {const formNo = action.payload;const res: Response = yield call(fetchRecordsByFormNo, formNo);if (!res.empty) {const records = res.content;yield put(makeFetchRecordsCommit(records));}},},reducers: {updateRecords(state: WorkflowState, { payload }: ApproveUpdateCommit) {return {...state,records: payload,};},},};
启动saga注册
sagaMiddleware.run = (...args) => {if (process.env.NODE_ENV !== 'production' && !boundRunSaga) {throw new Error('Before running a Saga, you must mount the Saga middleware on the Store using applyMiddleware')}return boundRunSaga(...args)}// store上注册const sagaMiddleware = createSagaMiddleware()const store = createStore(reducer, applyMiddleware(sagaMiddleware))sagaMiddleware.run(rootSaga)
runSaga
export function runSaga({ channel = stdChannel(), dispatch, getState, context = {}, sagaMonitor, effectMiddlewares, onError = logError },saga,...args) {const iterator = saga(...args) // rootSaga执行获取了迭代器const env = {channel,dispatch: wrapSagaDispatch(dispatch),getState,sagaMonitor,onError,finalizeRunEffect,}// 开启taskreturn immediately(() => {const task = proc(env, iterator, context, effectId, getMetaInfo(saga), /* isRoot */ true, undefined)if (sagaMonitor) {sagaMonitor.effectResolved(effectId, task)}return task})}
redux-saga之take
相对最常用的api,捕捉未来的action并作出对应操作
// rootSagaexport function* rootSaga () {yield take(actions.FETCH_USERINFO)// 当dispatch action.type命中actions.FETCH_USERINFO// 执行下列逻辑// ... handlerTakehandlerTake}
- 第一步,执行rootSata,得到一个迭代器
import { take } from "./internal/io"// rootSaga()const iterator = saga(...args)// 执行procconst task = proc(env, iterator, context, effectId, getMetaInfo(saga), /* isRoot */ true, undefined)// 构造一个next方法 = currcbfunction next (arg, isErr) {try {let result = iterator.next(arg)if (!result.done) {digestEffect(result.value, parentEffectId, next)}} catch (error) {...}}// 构造完next方法后直接调用next()// 执行 构造一个effect 本质上是一个纯对象take(actions.FETCH_USERINFO)// patternOrChannel = actions.FETCH_USERINFOmakeEffect(effectTypes.TAKE, { pattern: patternOrChannel })// result = { value: ..., done: false }// payload = { pattern: patternOrChannel }value = {[IO]: true, // 标记 判断是否属于sagacombinator: false,type, // effect类型 Takepayload,}// result.done === false// 进入代码分支 digestEffect 且传递了next作为回调函数// cb = nextfunction digestEffect(effect, parentEffectId, cb, label = '') {let effectSettled // 标记当前effect处理进度function currCb(res, isErr) {if (effectSettled) {return}effectSettled = true // 表示当前effect处理过了 适用于竞赛请求的情况cb(res, isErr)}// currCb = () => { next() } 包裹一层nextfinalRunEffect(effect, effectId, currCb)}// 开始执行runEffectfunction runEffect(effect, effectId, currCb) {if (is.promise(effect)) {// 非promiseresolvePromise(effect, currCb)} else if (is.iterator(effect)) {// 这边当前实例不会进入proc(env, effect, task.context, effectId, meta, /* isRoot */ false, currCb)} else if (effect && effect[IO]) {// 符合当前分支const effectRunner = effectRunnerMap[effect.type]effectRunner(env, effect.payload, currCb, executingContext)} else {// 如果不是saga effect 照常进行 此时将会调用顶层next,rootSaga方法体将继续执行currCb(effect)}}// 执行effectRunner// 匹配到的是runTakeEffect take类型的runnerfunction runTakeEffect(env, { channel = env.channel, pattern, maybe }, cb) {// 构造回调函数 作为后续响应当前actions.FETCH_USERINFO要执行的注册handlerconst takeCb = input => {if (input instanceof Error) {cb(input, true)return}if (isEnd(input) && !maybe) {cb(TERMINATE)return}cb(input)}try {// 注册channel// 注意tabkeCb是包裹了一层上层传递下来的next 也就是说 程序卡在// 卡在准备执行handlerTake的地方// 因为需要iterator.next()才能继续执行handlerTake,暂时没有触发因子channel.take(takeCb, is.notUndef(pattern) ? matcher(pattern) : null)// matcher处理通配符等情况} catch (err) {cb(err, true)return}}// channel.taketake(cb, matcher = matchers.wildcard) {if (closed) {cb(END)return}cb[MATCH] = matcher // 正则生成 根据dispatch的action.type来匹配触发nextTakers.push(cb) // buffer数组保存cb 包含match信息,也就是actions.FETCH_USERINFO这类信息}// 至此saga take注册完成// 触发 在注册saga的地方提过,action.type会进入两个分支 一个是redux分支,一个是saga分支return next => action => {if (sagaMonitor && sagaMonitor.actionDispatched) {sagaMonitor.actionDispatched(action)}const result = next(action)channel.put(action) // actionreturn result}// 看看put做了啥put(input) {if (closed) {return}if (isEnd(input)) {close()return}// 当前维护的所有actions相关takecbconst takers = (currentTakers = nextTakers)for (let i = 0, len = takers.length; i < len; i++) {const taker = takers[i]// 找出正则匹配上的action typeif (taker[MATCH](input)) {taker.cancel()// takecb = 顶层next() 触发iterator.next() 执行handlerTaketaker(input)}}},
redux-saga之takeEvery
发现take有个问题,就是只能触发一个就不再执行,不使用与多次点击
不推荐的解决办法
// rootSagaexport function* rootSaga () {while (true) {yield take(actions.FETCH_USERINFO)// 当dispatch action.type命中actions.FETCH_USERINFO// 执行下列逻辑// ... handlerTakehandlerTake}}
redux-saga提供了一个工具函数takeEvery,也就是说每次分发的actions.FETCH_USERINFO都能收集且触发继而执行handlerTake
// rootSagaexport function* rootSaga () {yield takeEvery(actions.FETCH_USERINFO)// 当dispatch action.type命中actions.FETCH_USERINFO// 执行下列逻辑// ... handlerTakehandlerTake}
解析它的实现,本质上和while…true类似,消费完上一个effect后,生成一个新的take effect
// rootSagaexport function* rootSaga () {yield takeEvery(actions.FETCH_USERINFO)// 当dispatch action.type命中actions.FETCH_USERINFO// 执行下列逻辑// ... handlerTakehandlerTake}
构造takeEvery
// 工具方法 构造迭代器 核心是next方法export function makeIterator(next, thro = kThrow, name = 'iterator') {const iterator = { meta: { name }, next, throw: thro, return: kReturn, isSagaIterator: true }return iterator}// next方法编写export default function fsmIterator(fsm, startState, name) {let stateUpdater,errorState,effect,nextState = startState// 除非报错的情况 否则nextState在q1, q2间切换function next(arg, error) {if (nextState === qEnd) {return done(arg)}if (error && !errorState) {nextState = qEndthrow error} else {stateUpdater && stateUpdater(arg) // 状态流转方法const currentState = error ? fsm[errorState](error) : fsm[nextState]();({ nextState, effect, stateUpdater, errorState } = currentState)return nextState === qEnd ? done(arg) : effect}}return makeIterator(next, error => next(null, error), name)}export default function takeEvery(patternOrChannel, worker, ...args) {// 熟悉的地方 模拟take的处理结果 相当于使用take注册const yTake = { done: false, value: take(patternOrChannel) }// fork类型effectconst yFork = ac => ({ done: false, value: fork(worker, ...args, ac) })// 切换q1, q2let action,setAction = ac => (action = ac)// 原来代码// return fsmIterator(// {// q1() {// return { nextState: 'q2', effect: yTake, stateUpdater: setAction }// },// q2() {// return { nextState: 'q1', effect: yFork(action) }// },// },// 'q1', // 初始状态q1// `takeEvery(${safeName(patternOrChannel)}, ${worker.name})`,// )// 搬运后代码 可以看到状态在q1,q2轮转const fsm = {q1() {return { nextState: 'q2', effect: yTake, stateUpdater: setAction }},q2() {return { nextState: 'q1', effect: yFork(action) }},},function next(arg, error) {if (nextState === qEnd) {return done(arg)}if (error && !errorState) {nextState = qEndthrow error} else {stateUpdater && stateUpdater(arg)const currentState = error ? fsm[errorState](error) : fsm[nextState]();({ nextState, effect, stateUpdater, errorState } = currentState)return nextState === qEnd ? done(arg) : effect}}}
用take类似的分析方法走一遍
// rootSagaconst iterator = saga(...args)// 开启procconst task = proc(env, iterator, context, effectId, getMetaInfo(saga), /* isRoot */ true, undefined)// 构造进程nextfunction next(arg, isErr) {try {let result = iterator.next(arg)if (!result.done) {digestEffect(result.value, parentEffectId, next)}}}// 构造完next 立即执行next()// iterator.next()得到// { nextState: 'q2', effect: yTake, stateUpdater: setAction }// 也就是function next(arg, error) {if (nextState === qEnd) {return done(arg)}if (error && !errorState) {nextState = qEndthrow error} else {stateUpdater && stateUpdater(arg)const currentState = error ? fsm[errorState](error) : fsm[nextState]();({ nextState, effect, stateUpdater, errorState } = currentState)// q1.effectreturn q1.effect}}// 得到一个take类型effect,参照上面take的处理,直到捕捉到一个actions.FETCH_USERINFO// 假设捕捉成功,此时执行最外层定义的proc的next中包裹的result = iterator.next(arg)// 得到result = { nextState: 'q1', effect: yFork(action) }// 因为result.done !== true// 继续执行digestEffect(result.value, .., proc-next)// 继续执行effectRunner(env, effect.payload, currCb, executingContext)// 判断出effect.type === effectTypes.forkfunction runForkEffect(env, { context, fn, args, detached }, cb, { task: parent }) {// 类似rootSagaconst taskIterator = createTaskIterator({ context, fn, args })const meta = getIteratorMetaInfo(taskIterator, fn)immediately(() => {// 开启一个proc,就类似根saga的处理方法 走一遍单个的take注册流程// 这个开启之后是另一个管理流程了 但是之前我们proc的收尾呢const child = proc(env, taskIterator, parent.context, currentEffectId, meta, detached, undefined)if (detached) {cb(child)} else {if (child.isRunning()) {parent.queue.addTask(child)cb(child)} else if (child.isAborted()) {parent.queue.abort(child.error())} else {// cb收尾// 执行handleTakecb(child)}}})}
总结就是,开启生产一个take effect,消费一个的同时再次生产一个take effect待命下一次的action
redux-saga之call
前面类似take,最后的effectRunner不同
function runCallEffect(env, { context, fn, args }, cb, { task }) {// catch synchronous failures; see #152try {const result = fn.apply(context, args)if (is.promise(result)) {// 如果是异步请求 result = then// 只有在then完成后,才会执行cb,也就是proc-next// 所以call是阻塞的resolvePromise(result, cb)return}if (is.iterator(result)) {// resolve iteratorproc(env, result, task.context, currentEffectId, getMetaInfo(fn), /* isRoot */ false, cb)return}cb(result)} catch (error) {cb(error, true)}}
redux-saga之put
前面类似take,最后的effectRunner不同
function runPutEffect(env, { channel, action, resolve }, cb) {// 重点是有调度器 调度器阻塞asap(() => {let resulttry {// 直接dispatchresult = (channel ? channel.put : env.dispatch)(action)} catch (error) {cb(error, true)return}if (resolve && is.promise(result)) {resolvePromise(result, cb)} else {cb(result)}})}
为什么redux-saga需要调度器
源码阅读的时候对一个schedule文件充满好奇,没有了解到它的用处
假设我们有如下场景
function* rootSaga() {// next0()yield fork(genA) // LINE-1// next1()yield fork(genB) // LINE-2}function* genA() {// nextA0()yield put({ type: 'A' })// nextA1()yield take('B')}function* genB() {// nextB0()yield take('A')// nextB1()yield put({ type: 'B' })}// 按照刚刚的分析,如果没有调度器// 执行next0(), fork,开启了一个新的proc// 执行genA, put为同步方法,立即执行 发出一个action// 此时还没有执行nextA1,没有执行genB,所以有可能遗失一个action 'A'// 注册完 takeB后,执行genB,分发 action 'B', 成功捕捉// 有遗失action是不能接受的
调度方案解决action遗失问题
// 任务队列const queue = []// 锁let semaphore = 0// 执行任务function exec(task) {try {suspend()task()} finally {release()}}// 尽快执行任务export function asap(task) {// 压入任务栈queue.push(task)if (!semaphore) {suspend()flush()}}// 立即执行任务export function immediately(task) {try {suspend()return task()} finally {flush()}}// 上锁function suspend() {semaphore++}// 解锁function release() {semaphore--}function flush() {release()let taskwhile (!semaphore && (task = queue.shift()) !== undefined) {exec(task)}}
重新分析上述例子
// runSaga中的immediately 立即执行 rootSaga立即执行// task1return immediately(() => {const task = proc(env, iterator, context, effectId, getMetaInfo(saga), /* isRoot */ true, undefined)if (sagaMonitor) {sagaMonitor.effectResolved(effectId, task)}return task})// forkAfunction runForkEffect(env, { context, fn, args, detached }, cb, { task: parent }) {const taskIterator = createTaskIterator({ context, fn, args })const meta = getIteratorMetaInfo(taskIterator, fn)// 立即执行 task2immediately(() => {// procAconst child = proc(env, taskIterator, parent.context, currentEffectId, meta, detached, undefined)if (detached) {cb(child)} else {if (child.isRunning()) {parent.queue.addTask(child)cb(child)} else if (child.isAborted()) {parent.queue.abort(child.error())} else {cb(child)}}})}// genA put task3, 注意 此时是在procA中,此任务本身处于task2中,而立即执行的任务会上锁asap(() => {let resulttry {result = (channel ? channel.put : env.dispatch)(action)} catch (error) {cb(error, true)return}if (resolve && is.promise(result)) {resolvePromise(result, cb)} else {cb(result)}})// 此时put堵塞,不是put本身堵塞,是注册put,迭代器没有回调proc-next// 调起forkB,注册take 'B',没有启用调度器 用不着 注册不影响后续,而且越早越好// put B同样堵塞,调度器执行flush,put A执行,而后put B执行,// 两个put都是在take注册完成之后,不再有action遗失问题
总结一下 ,调度器本质就是,注册操作优先,触发操作靠后,理论上最好不要写这种交叉逻辑
